001/* 002 * $Id$ 003 */ 004 005package edu.jas.gb; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Collections; 011import java.util.List; 012import java.util.ListIterator; 013import java.util.concurrent.atomic.AtomicInteger; 014import java.util.concurrent.Executors; 015import java.util.concurrent.ExecutorService; 016import java.util.concurrent.ThreadPoolExecutor; 017import java.util.concurrent.TimeUnit; 018 019import org.apache.logging.log4j.Logger; 020import org.apache.logging.log4j.LogManager; 021 022import edu.jas.poly.ExpVector; 023import edu.jas.poly.GenPolynomial; 024import edu.jas.poly.GenPolynomialRing; 025import edu.jas.poly.PolyUtil; 026import edu.jas.structure.RingElem; 027import edu.jas.util.ChannelFactory; 028import edu.jas.util.DistHashTable; 029import edu.jas.util.DistHashTableServer; 030import edu.jas.util.DistThreadPool; 031import edu.jas.util.RemoteExecutable; 032import edu.jas.util.SocketChannel; 033import edu.jas.util.TaggedSocketChannel; 034import edu.jas.util.Terminator; 035 036 037/** 038 * Groebner Base distributed hybrid algorithm. Implements a distributed memory 039 * with multi-core CPUs parallel version of Groebner bases with executable 040 * channels. Using pairlist class, distributed multi-threaded tasks do 041 * reduction, one communication channel per remote node. 042 * @param <C> coefficient type 043 * @author Heinz Kredel 044 */ 045 046public class GroebnerBaseDistributedHybridEC<C extends RingElem<C>> extends GroebnerBaseAbstract<C> { 047 048 049 private static final Logger logger = LogManager.getLogger(GroebnerBaseDistributedHybridEC.class); 050 051 052 private static final boolean debug = logger.isDebugEnabled(); 053 054 055 /** 056 * Number of threads to use. 057 */ 058 protected final int threads; 059 060 061 /** 062 * Default number of threads. 063 */ 064 protected static final int DEFAULT_THREADS = 2; 065 066 067 /** 068 * Number of threads per node to use. 069 */ 070 protected final int threadsPerNode; 071 072 073 /** 074 * Default number of threads per compute node. 075 */ 076 protected static final int DEFAULT_THREADS_PER_NODE = 1; 077 078 079 /** 080 * Pool of threads to use. 081 */ 082 //protected final ExecutorService pool; // not for single node tests 083 protected transient final ExecutorService pool; 084 085 086 /** 087 * Default server port. 088 */ 089 protected static final int DEFAULT_PORT = 55711; 090 091 092 /** 093 * Default distributed hash table server port. 094 */ 095 protected final int DHT_PORT; 096 097 098 /** 099 * machine file to use. 100 */ 101 protected final String mfile; 102 103 104 /** 105 * Server port to use. 106 */ 107 protected final int port; 108 109 110 /** 111 * Distributed thread pool to use. 112 */ 113 private final transient DistThreadPool dtp; 114 115 116 /** 117 * Distributed hash table server to use. 118 */ 119 private final transient DistHashTableServer<Integer> dhts; 120 121 122 /** 123 * Message tag for pairs. 124 */ 125 public static final Integer pairTag = Integer.valueOf(1); 126 127 128 /** 129 * Message tag for results. 130 */ 131 public static final Integer resultTag = Integer.valueOf(2); 132 133 134 /** 135 * Message tag for acknowledgments. 136 */ 137 public static final Integer ackTag = Integer.valueOf(3); 138 139 140 /** 141 * Constructor. 142 * @param mfile name of the machine file. 143 */ 144 public GroebnerBaseDistributedHybridEC(String mfile) { 145 this(mfile, DEFAULT_THREADS, DEFAULT_PORT); 146 } 147 148 149 /** 150 * Constructor. 151 * @param mfile name of the machine file. 152 * @param threads number of threads to use. 153 */ 154 public GroebnerBaseDistributedHybridEC(String mfile, int threads) { 155 this(mfile, threads, Executors.newFixedThreadPool(threads), DEFAULT_PORT); 156 } 157 158 159 /** 160 * Constructor. 161 * @param mfile name of the machine file. 162 * @param threads number of threads to use. 163 * @param port server port to use. 164 */ 165 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int port) { 166 this(mfile, threads, Executors.newFixedThreadPool(threads), port); 167 } 168 169 170 /** 171 * Constructor. 172 * @param mfile name of the machine file. 173 * @param threads number of threads to use. 174 * @param threadsPerNode threads per node to use. 175 * @param port server port to use. 176 */ 177 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, int port) { 178 this(mfile, threads, threadsPerNode, Executors.newFixedThreadPool(threads), port); 179 } 180 181 182 /** 183 * Constructor. 184 * @param mfile name of the machine file. 185 * @param threads number of threads to use. 186 * @param pool ExecutorService to use. 187 * @param port server port to use. 188 */ 189 public GroebnerBaseDistributedHybridEC(String mfile, int threads, ExecutorService pool, int port) { 190 this(mfile, threads, DEFAULT_THREADS_PER_NODE, pool, port); 191 } 192 193 194 /** 195 * Constructor. 196 * @param mfile name of the machine file. 197 * @param threads number of threads to use. 198 * @param threadsPerNode threads per node to use. 199 * @param pl pair selection strategy 200 * @param port server port to use. 201 */ 202 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, PairList<C> pl, 203 int port) { 204 this(mfile, threads, threadsPerNode, Executors.newFixedThreadPool(threads), pl, port); 205 } 206 207 208 /** 209 * Constructor. 210 * @param mfile name of the machine file. 211 * @param threads number of threads to use. 212 * @param threadsPerNode threads per node to use. 213 * @param port server port to use. 214 */ 215 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ExecutorService pool, 216 int port) { 217 this(mfile, threads, threadsPerNode, pool, new OrderedPairlist<C>(), port); 218 } 219 220 221 /** 222 * Constructor. 223 * @param mfile name of the machine file. 224 * @param threads number of threads to use. 225 * @param threadsPerNode threads per node to use. 226 * @param pool ExecutorService to use. 227 * @param pl pair selection strategy 228 * @param port server port to use. 229 */ 230 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ExecutorService pool, 231 PairList<C> pl, int port) { 232 super(new ReductionPar<C>(), pl); 233 this.threads = threads; 234 if (mfile == null || mfile.length() == 0) { 235 this.mfile = "../util/machines"; // contains localhost 236 } else { 237 this.mfile = mfile; 238 } 239 if (threads < 1) { 240 threads = 1; 241 } 242 this.threadsPerNode = threadsPerNode; 243 if (pool == null) { 244 pool = Executors.newFixedThreadPool(threads); 245 logger.error("pool must not be null: {}", pool); 246 //throw new IllegalArgumentException("pool must not be null"); 247 } 248 this.pool = pool; 249 this.port = port; 250 logger.info("machine file {}, port = {}, pool = {}", mfile, port, pool); 251 this.dtp = new DistThreadPool(this.threads, this.mfile); 252 logger.info("running {}", dtp); 253 this.DHT_PORT = this.dtp.getEC().getMasterPort() + 100; 254 this.dhts = new DistHashTableServer<Integer>(this.DHT_PORT); 255 this.dhts.init(); 256 logger.info("running {}", dhts); 257 } 258 259 260 /** 261 * Cleanup and terminate ExecutorService. 262 */ 263 @Override 264 public void terminate() { 265 terminate(true); 266 } 267 268 269 /** 270 * Terminates the distributed thread pools. 271 * @param shutDown true, if shut-down of the remote executable servers is 272 * requested, false, if remote executable servers stay alive. 273 */ 274 public void terminate(boolean shutDown) { 275 pool.shutdown(); 276 try { 277 while (!pool.isTerminated()) { 278 //logger.info("await"); 279 boolean rest = pool.awaitTermination(1000L, TimeUnit.MILLISECONDS); 280 } 281 } catch (InterruptedException e) { 282 e.printStackTrace(); 283 } 284 logger.info(pool.toString()); 285 dtp.terminate(shutDown); 286 logger.info("dhts.terminate()"); 287 dhts.terminate(); 288 } 289 290 291 /** 292 * Distributed Groebner base. 293 * @param modv number of module variables. 294 * @param F polynomial list. 295 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 296 */ 297 public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) { 298 List<GenPolynomial<C>> Fp = normalizeZerosOnes(F); 299 Fp = PolyUtil.<C> monic(Fp); 300 if (Fp.size() <= 1) { 301 return Fp; 302 } 303 if (!Fp.get(0).ring.coFac.isField()) { 304 throw new IllegalArgumentException("coefficients not from a field"); 305 } 306 307 String master = dtp.getEC().getMasterHost(); 308 //int port = dtp.getEC().getMasterPort(); // wrong port 309 GBHybridExerClient<C> gbc = new GBHybridExerClient<C>(master, threadsPerNode, port, DHT_PORT); 310 for (int i = 0; i < threads; i++) { 311 // schedule remote clients 312 dtp.addJob(gbc); 313 } 314 try { 315 Thread.currentThread().sleep(1000); 316 } catch (InterruptedException e) { 317 e.printStackTrace(); 318 } 319 // run master 320 List<GenPolynomial<C>> G = GBMaster(modv, Fp); 321 return G; 322 } 323 324 325 /** 326 * Distributed hybrid Groebner base. 327 * @param modv number of module variables. 328 * @param F non empty monic polynomial list without zeros. 329 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 330 */ 331 List<GenPolynomial<C>> GBMaster(int modv, List<GenPolynomial<C>> F) { 332 long t = System.currentTimeMillis(); 333 ChannelFactory cf = new ChannelFactory(port); 334 cf.init(); 335 336 List<GenPolynomial<C>> G = F; 337 if (G.isEmpty()) { 338 throw new IllegalArgumentException("empty polynomial list not allowed"); 339 } 340 GenPolynomialRing<C> ring = G.get(0).ring; 341 PairList<C> pairlist = strategy.create(modv, ring); 342 pairlist.put(G); 343 logger.info("start {}", pairlist); 344 DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>( 345 "localhost", DHT_PORT); 346 theList.init(); 347 List<GenPolynomial<C>> al = pairlist.getList(); 348 for (int i = 0; i < al.size(); i++) { 349 // no wait required 350 GenPolynomial<C> nn = null; 351 theList.putWait(Integer.valueOf(i), al.get(i)); 352 if (nn != null) { 353 logger.info("double polynomials {}, nn = {}, al(i) = {}", i, nn, al.get(i)); 354 } 355 } 356 357 Terminator finner = new Terminator(threads * threadsPerNode); 358 HybridReducerServerEC<C> R; 359 logger.info("using pool = {}", pool); 360 for (int i = 0; i < threads; i++) { 361 R = new HybridReducerServerEC<C>(threadsPerNode, finner, cf, theList, pairlist); 362 pool.execute(R); 363 //logger.info("server submitted {}", R); 364 } 365 try { 366 Thread.currentThread().sleep(1000); 367 } catch (InterruptedException e) { 368 e.printStackTrace(); 369 } 370 logger.info("main loop waiting {}", finner); 371 finner.waitDone(); 372 int ps = theList.size(); 373 logger.info("#distributed list = {}", ps); 374 // make sure all polynomials arrived: not needed in master 375 // G = (ArrayList)theList.values(); 376 G = pairlist.getList(); 377 if (ps != G.size()) { 378 logger.info("#distributed list = {}, #pairlist list = {}", theList.size(), G.size()); 379 } 380 for (GenPolynomial<C> q : theList.getValueList()) { 381 if (debug && q != null && !q.isZERO()) { 382 logger.debug("final q = {}", q.leadingExpVector()); 383 } 384 } 385 logger.debug("distributed list end"); 386 long time = System.currentTimeMillis(); 387 List<GenPolynomial<C>> Gp; 388 Gp = minimalGB(G); // not jet distributed but threaded 389 time = System.currentTimeMillis() - time; 390 logger.debug("parallel gbmi time = {}", time); 391 G = Gp; 392 logger.debug("server cf.terminate()"); 393 cf.terminate(); 394 logger.debug("server theList.terminate() {}", theList.size()); 395 theList.clear(); 396 theList.terminate(); 397 t = System.currentTimeMillis() - t; 398 logger.info("server GB end, time = {} with {}", t, pairlist.toString()); 399 return G; 400 } 401 402 403 /** 404 * GB distributed client part. 405 * @param host the server runs on. 406 * @param port the server runs. 407 * @param dhtport of the DHT server. 408 * @throws IOException 409 */ 410 public static <C extends RingElem<C>> void clientPart(String host, int threadsPerNode, int port, 411 int dhtport) throws IOException { 412 ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost 413 cf.init(); 414 logger.info("clientPart connecting to {}, port = {}, dhtport = {}", host, port, dhtport); 415 SocketChannel channel = cf.getChannel(host, port); 416 TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel); 417 pairChannel.init(); 418 419 DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(host, 420 dhtport); 421 theList.init(); 422 423 ExecutorService pool = Executors.newFixedThreadPool(threadsPerNode); 424 //ThreadPool pool = new ThreadPool(threadsPerNode); 425 logger.info("client using pool = {}", pool); 426 for (int i = 0; i < threadsPerNode; i++) { 427 HybridReducerClientEC<C> Rr = new HybridReducerClientEC<C>(/*threadsPerNode,*/pairChannel, /*i,*/ 428 theList); 429 pool.execute(Rr); 430 } 431 logger.info("clients submitted"); 432 433 pool.shutdown(); 434 try { 435 while (!pool.isTerminated()) { 436 //logger.info("await"); 437 boolean rest = pool.awaitTermination(1000L, TimeUnit.MILLISECONDS); 438 } 439 } catch (InterruptedException e) { 440 e.printStackTrace(); 441 } 442 //pool.terminate(); 443 logger.info("client pool.terminate()"); 444 445 pairChannel.close(); 446 logger.debug("client pairChannel.close()"); 447 448 //master only: theList.clear(); 449 theList.terminate(); 450 cf.terminate(); 451 logger.info("client cf.terminate()"); 452 453 channel.close(); 454 logger.info("client channel.close()"); 455 return; 456 } 457 458 459 /** 460 * Minimal ordered groebner basis. 461 * @param Fp a Groebner base. 462 * @return a reduced Groebner base of Fp. 463 */ 464 @Override 465 public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) { 466 GenPolynomial<C> a; 467 ArrayList<GenPolynomial<C>> G; 468 G = new ArrayList<GenPolynomial<C>>(Fp.size()); 469 ListIterator<GenPolynomial<C>> it = Fp.listIterator(); 470 while (it.hasNext()) { 471 a = it.next(); 472 if (a.length() != 0) { // always true 473 // already monic a = a.monic(); 474 G.add(a); 475 } 476 } 477 if (G.size() <= 1) { 478 return G; 479 } 480 481 ExpVector e; 482 ExpVector f; 483 GenPolynomial<C> p; 484 ArrayList<GenPolynomial<C>> F; 485 F = new ArrayList<GenPolynomial<C>>(G.size()); 486 boolean mt; 487 488 while (G.size() > 0) { 489 a = G.remove(0); 490 e = a.leadingExpVector(); 491 492 it = G.listIterator(); 493 mt = false; 494 while (it.hasNext() && !mt) { 495 p = it.next(); 496 f = p.leadingExpVector(); 497 mt = e.multipleOf(f); 498 } 499 it = F.listIterator(); 500 while (it.hasNext() && !mt) { 501 p = it.next(); 502 f = p.leadingExpVector(); 503 mt = e.multipleOf(f); 504 } 505 if (!mt) { 506 F.add(a); 507 } else { 508 // System.out.println("dropped " + a.length()); 509 } 510 } 511 G = F; 512 if (G.size() <= 1) { 513 return G; 514 } 515 Collections.reverse(G); // important for lex GB 516 517 @SuppressWarnings("unchecked") 518 MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()]; 519 int i = 0; 520 F = new ArrayList<GenPolynomial<C>>(G.size()); 521 while (G.size() > 0) { 522 a = G.remove(0); 523 // System.out.println("doing " + a.length()); 524 List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size()); 525 R.addAll(G); 526 R.addAll(F); 527 mirs[i] = new MiReducerServer<C>(R, a); 528 pool.execute(mirs[i]); 529 i++; 530 F.add(a); 531 } 532 G = F; 533 F = new ArrayList<GenPolynomial<C>>(G.size()); 534 for (i = 0; i < mirs.length; i++) { 535 a = mirs[i].getNF(); 536 F.add(a); 537 } 538 return F; 539 } 540 541} 542 543 544/** 545 * Distributed server reducing worker proxy threads. 546 * @param <C> coefficient type 547 */ 548class HybridReducerServerEC<C extends RingElem<C>> implements Runnable { 549 550 551 private static final Logger logger = LogManager.getLogger(HybridReducerServerEC.class); 552 553 554 private static final boolean debug = logger.isDebugEnabled(); 555 556 557 private final Terminator finner; 558 559 560 private final ChannelFactory cf; 561 562 563 private TaggedSocketChannel pairChannel; 564 565 566 private final DistHashTable<Integer, GenPolynomial<C>> theList; 567 568 569 private final PairList<C> pairlist; 570 571 572 private final int threadsPerNode; 573 574 575 /** 576 * Message tag for pairs. 577 */ 578 public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag; 579 580 581 /** 582 * Message tag for results. 583 */ 584 public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag; 585 586 587 /** 588 * Message tag for acknowledgments. 589 */ 590 public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag; 591 592 593 /** 594 * Constructor. 595 * @param tpn number of threads per node 596 * @param fin terminator 597 * @param cf channel factory 598 * @param dl distributed hash table 599 * @param L ordered pair list 600 */ 601 HybridReducerServerEC(int tpn, Terminator fin, ChannelFactory cf, 602 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 603 threadsPerNode = tpn; 604 finner = fin; 605 this.cf = cf; 606 theList = dl; 607 pairlist = L; 608 //logger.info("reducer server created {}", this); 609 } 610 611 612 /** 613 * Work loop. 614 * @see java.lang.Runnable#run() 615 */ 616 @Override 617 public void run() { 618 logger.debug("reducer server running with {}", cf); 619 SocketChannel channel = null; 620 try { 621 channel = cf.getChannel(); 622 pairChannel = new TaggedSocketChannel(channel); 623 pairChannel.init(); 624 } catch (InterruptedException e) { 625 logger.debug("get pair channel interrupted"); 626 e.printStackTrace(); 627 return; 628 } 629 if (logger.isInfoEnabled()) { 630 logger.info("pairChannel = {}", pairChannel); 631 } 632 // record idle remote workers (minus one?) 633 //finner.beIdle(threadsPerNode-1); 634 finner.initIdle(threadsPerNode); 635 AtomicInteger active = new AtomicInteger(0); 636 637 // start receiver 638 HybridReducerReceiverEC<C> receiver = new HybridReducerReceiverEC<C>(/*threadsPerNode,*/finner, 639 active, pairChannel, theList, pairlist); 640 receiver.start(); 641 642 Pair<C> pair; 643 //boolean set = false; 644 boolean goon = true; 645 //int polIndex = -1; 646 int red = 0; 647 int sleeps = 0; 648 649 // while more requests 650 while (goon) { 651 // receive request if thread is reported incactive 652 logger.info("receive request"); 653 Object req = null; 654 try { 655 req = pairChannel.receive(pairTag); 656 } catch (InterruptedException e) { 657 goon = false; 658 //e.printStackTrace(); 659 } catch (IOException e) { 660 goon = false; 661 //e.printStackTrace(); 662 } catch (ClassNotFoundException e) { 663 goon = false; 664 e.printStackTrace(); 665 } 666 logger.info("received request, req = {}", req); 667 if (req == null) { 668 goon = false; 669 break; 670 } 671 if (!(req instanceof GBTransportMessReq)) { 672 goon = false; 673 break; 674 } 675 676 // find pair and manage termination status 677 logger.debug("find pair"); 678 while (!pairlist.hasNext()) { // wait 679 if (!finner.hasJobs() && !pairlist.hasNext()) { 680 goon = false; 681 break; 682 } 683 try { 684 sleeps++; 685 if (sleeps % 3 == 0) { 686 logger.info("waiting for reducers, remaining = {}", finner); 687 } 688 Thread.sleep(100); 689 } catch (InterruptedException e) { 690 goon = false; 691 break; 692 } 693 } 694 if (Thread.currentThread().isInterrupted()) { 695 goon = false; 696 break; 697 } 698 if (!pairlist.hasNext() && !finner.hasJobs()) { 699 logger.info("termination detection: no pairs and no jobs left"); 700 goon = false; 701 break; //continue; //break? 702 } 703 finner.notIdle(); // before pairlist get!! 704 pair = pairlist.removeNext(); 705 // send pair to client, even if null 706 if (logger.isInfoEnabled()) { 707 logger.info("active count = {}", active.get()); 708 logger.info("send pair = {}", pair); 709 } 710 GBTransportMess msg = null; 711 if (pair != null) { 712 msg = new GBTransportMessPairIndex(pair); //,pairlist.size()-1); // size-1 713 } else { 714 msg = new GBTransportMess(); // not End(); at this time 715 // goon ?= false; 716 } 717 try { 718 red++; 719 pairChannel.send(pairTag, msg); 720 @SuppressWarnings("unused") 721 int a = active.getAndIncrement(); 722 } catch (IOException e) { 723 e.printStackTrace(); 724 goon = false; 725 break; 726 } 727 //logger.debug("#distributed list = {}", theList.size()); 728 } 729 logger.info("terminated, send {} reduction pairs", red); 730 731 /* 732 * send end mark to clients 733 */ 734 logger.debug("send end"); 735 try { 736 for (int i = 0; i < threadsPerNode; i++) { // -1 737 //do not wait: Object rq = pairChannel.receive(pairTag); 738 pairChannel.send(pairTag, new GBTransportMessEnd()); 739 } 740 // send also end to receiver 741 pairChannel.send(resultTag, new GBTransportMessEnd()); 742 //beware of race condition 743 } catch (IOException e) { 744 if (logger.isDebugEnabled()) { 745 e.printStackTrace(); 746 } 747 } 748 receiver.terminate(); 749 750 int d = active.get(); 751 if (d > 0) { 752 logger.info("remaining active tasks = {}", d); 753 } 754 //logger.info("terminated, send {} reduction pairs", red); 755 pairChannel.close(); 756 logger.debug("redServ pairChannel.close()"); 757 finner.release(); 758 759 channel.close(); 760 logger.info("redServ channel.close()"); 761 } 762} 763 764 765/** 766 * Distributed server receiving worker thread. 767 * @param <C> coefficient type 768 */ 769class HybridReducerReceiverEC<C extends RingElem<C>> extends Thread { 770 771 772 private static final Logger logger = LogManager.getLogger(HybridReducerReceiverEC.class); 773 774 775 private static final boolean debug = logger.isDebugEnabled(); 776 777 778 private final DistHashTable<Integer, GenPolynomial<C>> theList; 779 780 781 private final PairList<C> pairlist; 782 783 784 private final TaggedSocketChannel pairChannel; 785 786 787 private final Terminator finner; 788 789 790 //private final int threadsPerNode; 791 792 793 private final AtomicInteger active; 794 795 796 private volatile boolean goon; 797 798 799 /** 800 * Message tag for pairs. 801 */ 802 public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag; 803 804 805 /** 806 * Message tag for results. 807 */ 808 public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag; 809 810 811 /** 812 * Message tag for acknowledgments. 813 */ 814 public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag; 815 816 817 /** 818 * Constructor. 819 * @param fin terminator 820 * @param a active remote tasks count 821 * @param pc tagged socket channel 822 * @param dl distributed hash table 823 * @param L ordered pair list 824 */ 825 //param tpn number of threads per node 826 HybridReducerReceiverEC(/*int tpn,*/Terminator fin, AtomicInteger a, TaggedSocketChannel pc, 827 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 828 active = a; 829 //threadsPerNode = tpn; 830 finner = fin; 831 pairChannel = pc; 832 theList = dl; 833 pairlist = L; 834 goon = true; 835 //logger.info("reducer server created {}", this); 836 } 837 838 839 /** 840 * Work loop. 841 * @see java.lang.Thread#run() 842 */ 843 @Override 844 public void run() { 845 //Pair<C> pair = null; 846 GenPolynomial<C> H = null; 847 int red = 0; 848 int polIndex = -1; 849 //Integer senderId; // obsolete 850 851 // while more requests 852 while (goon) { 853 // receive request 854 logger.info("receive result"); 855 //senderId = null; 856 Object rh = null; 857 try { 858 rh = pairChannel.receive(resultTag); 859 @SuppressWarnings("unused") 860 int i = active.getAndDecrement(); 861 } catch (InterruptedException e) { 862 goon = false; 863 //e.printStackTrace(); 864 //?? finner.initIdle(1); 865 break; 866 } catch (IOException e) { 867 //e.printStackTrace(); 868 goon = false; 869 //finner.initIdle(1); 870 break; 871 } catch (ClassNotFoundException e) { 872 e.printStackTrace(); 873 goon = false; 874 finner.initIdle(1); 875 break; 876 } 877 logger.info("received H polynomial {}", rh); 878 if (rh == null) { 879 if (this.isInterrupted()) { 880 goon = false; 881 finner.initIdle(1); 882 break; 883 } 884 //finner.initIdle(1); 885 } else if (rh instanceof GBTransportMessEnd) { // should only happen from server 886 logger.info("received GBTransportMessEnd"); 887 goon = false; 888 //?? finner.initIdle(1); 889 break; 890 } else if (rh instanceof GBTransportMessPoly) { 891 // update pair list 892 red++; 893 @SuppressWarnings("unchecked") 894 GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh; 895 H = mpi.pol; 896 //senderId = mpi.threadId; 897 if (H != null) { 898 if (logger.isInfoEnabled()) { 899 logger.info("H = {}", H.leadingExpVector()); 900 } 901 if (!H.isZERO()) { 902 if (H.isONE()) { 903 // finner.allIdle(); 904 polIndex = pairlist.putOne(); 905 theList.putWait(Integer.valueOf(polIndex), H); 906 //goon = false; must wait for other clients 907 //finner.initIdle(1); 908 //break; 909 } else { 910 polIndex = pairlist.put(H); 911 // use putWait ? but still not all distributed 912 //GenPolynomial<C> nn = 913 theList.putWait(Integer.valueOf(polIndex), H); 914 } 915 } 916 } 917 } 918 // only after recording in pairlist ! 919 finner.initIdle(1); 920 try { 921 pairChannel.send(ackTag, new GBTransportMess()); 922 logger.debug("send acknowledgement"); 923 } catch (IOException e) { 924 e.printStackTrace(); 925 goon = false; 926 break; 927 } 928 } // end while 929 goon = false; 930 logger.info("terminated, received {} reductions", red); 931 } 932 933 934 /** 935 * Terminate. 936 */ 937 public void terminate() { 938 goon = false; 939 //this.interrupt(); 940 try { 941 this.join(); 942 } catch (InterruptedException e) { 943 // unfug Thread.currentThread().interrupt(); 944 } 945 logger.debug("HybridReducerReceiver terminated"); 946 } 947 948} 949 950 951/** 952 * Distributed clients reducing worker threads. 953 */ 954class HybridReducerClientEC<C extends RingElem<C>> implements Runnable { 955 956 957 private static final Logger logger = LogManager.getLogger(HybridReducerClientEC.class); 958 959 960 private static final boolean debug = logger.isDebugEnabled(); 961 962 963 private final TaggedSocketChannel pairChannel; 964 965 966 private final DistHashTable<Integer, GenPolynomial<C>> theList; 967 968 969 private final ReductionPar<C> red; 970 971 972 //private final int threadsPerNode; 973 974 975 /* 976 * Identification number for this thread. 977 */ 978 //public final Integer threadId; // obsolete 979 980 981 /** 982 * Message tag for pairs. 983 */ 984 public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag; 985 986 987 /** 988 * Message tag for results. 989 */ 990 public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag; 991 992 993 /** 994 * Message tag for acknowledgments. 995 */ 996 public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag; 997 998 999 /** 1000 * Constructor. 1001 * @param tc tagged socket channel 1002 * @param dl distributed hash table 1003 */ 1004 //param tpn number of threads per node 1005 //param tid thread identification 1006 HybridReducerClientEC(/*int tpn,*/TaggedSocketChannel tc, /*Integer tid,*/ 1007 DistHashTable<Integer, GenPolynomial<C>> dl) { 1008 //threadsPerNode = tpn; 1009 pairChannel = tc; 1010 //threadId = 100 + tid; // keep distinct from other tags 1011 theList = dl; 1012 red = new ReductionPar<C>(); 1013 } 1014 1015 1016 /** 1017 * Work loop. 1018 * @see java.lang.Runnable#run() 1019 */ 1020 @Override 1021 public void run() { 1022 if (logger.isInfoEnabled()) { 1023 logger.info("pairChannel = {} reducer client running", pairChannel); 1024 } 1025 Pair<C> pair = null; 1026 GenPolynomial<C> pi, pj, ps; 1027 GenPolynomial<C> S; 1028 GenPolynomial<C> H = null; 1029 //boolean set = false; 1030 boolean goon = true; 1031 boolean doEnd = true; 1032 int reduction = 0; 1033 //int sleeps = 0; 1034 Integer pix, pjx, psx; 1035 1036 while (goon) { 1037 /* protocol: 1038 * request pair, process pair, send result, receive acknowledgment 1039 */ 1040 // pair = (Pair) pairlist.removeNext(); 1041 Object req = new GBTransportMessReq(); 1042 logger.info("send request"); 1043 try { 1044 pairChannel.send(pairTag, req); 1045 } catch (IOException e) { 1046 goon = false; 1047 if (logger.isDebugEnabled()) { 1048 e.printStackTrace(); 1049 } 1050 logger.info("receive pair, IOexception "); 1051 break; 1052 } 1053 logger.info("receive on pairTag, goon = {}", goon); 1054 doEnd = true; 1055 Object pp = null; 1056 try { 1057 pp = pairChannel.receive(pairTag); 1058 } catch (InterruptedException e) { 1059 goon = false; 1060 e.printStackTrace(); 1061 } catch (IOException e) { 1062 goon = false; 1063 if (logger.isDebugEnabled()) { 1064 e.printStackTrace(); 1065 } 1066 break; 1067 } catch (ClassNotFoundException e) { 1068 goon = false; 1069 e.printStackTrace(); 1070 } 1071 if (logger.isInfoEnabled()) { 1072 logger.info("received pair = {}", pp); 1073 } 1074 H = null; 1075 if (pp == null) { // should not happen 1076 continue; 1077 } 1078 if (pp instanceof GBTransportMessEnd) { 1079 goon = false; 1080 //doEnd = false; 1081 continue; 1082 } 1083 if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) { 1084 pi = pj = ps = null; 1085 if (pp instanceof GBTransportMessPair) { // obsolet, for tests 1086 @SuppressWarnings("unchecked") 1087 GBTransportMessPair<C> tmp = (GBTransportMessPair<C>) pp; 1088 pair = tmp.pair; 1089 if (pair != null) { 1090 pi = pair.pi; 1091 pj = pair.pj; 1092 //logger.debug("pair: pix = {}, pjx = {}", pair.i, pair.j); 1093 } 1094 } 1095 if (pp instanceof GBTransportMessPairIndex) { 1096 GBTransportMessPairIndex tmpi = (GBTransportMessPairIndex) pp; 1097 pix = tmpi.i; 1098 pjx = tmpi.j; 1099 psx = tmpi.s; 1100 pi = theList.getWait(pix); 1101 pj = theList.getWait(pjx); 1102 ps = theList.getWait(psx); 1103 //logger.info("pix = {}, pjx = {}, psx = {}", pix, pjx, psx); 1104 } 1105 if (pi != null && pj != null) { 1106 S = red.SPolynomial(pi, pj); 1107 //logger.info("ht(S) = {}", S.leadingExpVector()); 1108 if (S.isZERO()) { 1109 // pair.setZero(); does not work in dist 1110 } else { 1111 if (logger.isDebugEnabled()) { 1112 logger.debug("ht(S) = {}", S.leadingExpVector()); 1113 } 1114 H = red.normalform(theList, S); 1115 //logger.info("ht(H) = {}", H.leadingExpVector()); 1116 reduction++; 1117 if (H.isZERO()) { 1118 // pair.setZero(); does not work in dist 1119 } else { 1120 H = H.monic(); 1121 if (logger.isInfoEnabled()) { 1122 logger.info("ht(H) = {}", H.leadingExpVector()); 1123 } 1124 } 1125 } 1126 } else { 1127 logger.info("pi = {}, pj = {}, ps = {}", pi, pj, ps); 1128 } 1129 } 1130 if (pp instanceof GBTransportMess) { 1131 logger.debug("null pair results in null H poly"); 1132 } 1133 1134 // send H or must send null, if not at end 1135 if (logger.isDebugEnabled()) { 1136 logger.debug("#distributed list = {}", theList.size()); 1137 logger.debug("send H polynomial = {}", H); 1138 } 1139 try { 1140 pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId)); 1141 doEnd = false; 1142 } catch (IOException e) { 1143 goon = false; 1144 e.printStackTrace(); 1145 } 1146 //logger.info("done send poly message of {}", pp); 1147 try { 1148 //pp = pairChannel.receive(threadId); 1149 pp = pairChannel.receive(ackTag); 1150 } catch (InterruptedException e) { 1151 goon = false; 1152 e.printStackTrace(); 1153 } catch (IOException e) { 1154 goon = false; 1155 if (logger.isDebugEnabled()) { 1156 e.printStackTrace(); 1157 } 1158 break; 1159 } catch (ClassNotFoundException e) { 1160 goon = false; 1161 e.printStackTrace(); 1162 } 1163 if (!(pp instanceof GBTransportMess)) { 1164 logger.error("invalid acknowledgement {}", pp); 1165 } 1166 logger.info("received acknowledgment "); 1167 } 1168 logger.info("terminated, {} reductions, {} polynomials", reduction, theList.size()); 1169 if (doEnd) { 1170 try { 1171 pairChannel.send(resultTag, new GBTransportMessEnd()); 1172 } catch (IOException e) { 1173 //e.printStackTrace(); 1174 } 1175 logger.debug("terminated, send done"); 1176 } 1177 } 1178} 1179 1180 1181/** 1182 * Objects of this class are to be send to a ExecutableServer. 1183 */ 1184class GBHybridExerClient<C extends RingElem<C>> implements RemoteExecutable { 1185 1186 1187 String host; 1188 1189 1190 int port; 1191 1192 1193 int dhtport; 1194 1195 1196 int threadsPerNode; 1197 1198 1199 /** 1200 * GBHybridExerClient. 1201 * @param host 1202 * @param port 1203 * @param dhtport 1204 */ 1205 public GBHybridExerClient(String host, int threadsPerNode, int port, int dhtport) { 1206 this.host = host; 1207 this.threadsPerNode = threadsPerNode; 1208 this.port = port; 1209 this.dhtport = dhtport; 1210 } 1211 1212 1213 /** 1214 * run. 1215 */ 1216 public void run() { 1217 try { 1218 GroebnerBaseDistributedHybridEC.<C> clientPart(host, threadsPerNode, port, dhtport); 1219 } catch (Exception e) { 1220 e.printStackTrace(); 1221 } 1222 } 1223 1224 1225 /** 1226 * String representation. 1227 */ 1228 @Override 1229 public String toString() { 1230 StringBuffer s = new StringBuffer("GBHybridExerClient("); 1231 s.append("host=" + host); 1232 s.append(", threadsPerNode=" + threadsPerNode); 1233 s.append(", port=" + port); 1234 s.append(", dhtport=" + dhtport); 1235 s.append(")"); 1236 return s.toString(); 1237 } 1238 1239}