/*
 * $Id: GroebnerBaseDistributedHybridEC.java 4261 2012-10-21 11:34:02Z kredel $
 */

package edu.jas.gb;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import edu.jas.poly.ExpVector;
import edu.jas.poly.GenPolynomial;
import edu.jas.structure.RingElem;
import edu.jas.util.ChannelFactory;
import edu.jas.util.DistHashTable;
import edu.jas.util.DistHashTableServer;
import edu.jas.util.SocketChannel;
import edu.jas.util.TaggedSocketChannel;
import edu.jas.util.Terminator;
import edu.jas.util.ThreadPool;
import edu.jas.util.DistThreadPool;
import edu.jas.util.RemoteExecutable;


/**
 * Groebner Base distributed hybrid algorithm. Implements a
 * distributed memory with multi-core CPUs parallel version of
 * Groebner bases with executable channels. Using pairlist class,
 * distributed multi-threaded tasks do reduction, one communication
 * channel per remote node.
 * @param <C> coefficient type
 * @author Heinz Kredel
 */

public class GroebnerBaseDistributedHybridEC<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {


    public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybridEC.class);


    public final boolean debug = logger.isDebugEnabled();


    /**
     * Number of threads to use. Always at least 1.
     */
    protected final int threads;


    /**
     * Default number of threads.
     */
    protected static final int DEFAULT_THREADS = 2;


    /**
     * Number of threads per node to use.
     */
    protected final int threadsPerNode;


    /**
     * Default number of threads per compute node.
     */
    protected static final int DEFAULT_THREADS_PER_NODE = 1;


    /**
     * Pool of threads to use.
     */
    protected transient final ThreadPool pool;


    /**
     * Default server port.
     */
    protected static final int DEFAULT_PORT = 55711;


    /**
     * Distributed hash table server port. Set in the constructor to the
     * master port of the distributed thread pool plus 100.
     */
    protected final int DHT_PORT;


    /**
     * Machine file to use.
     */
    protected final String mfile;


    /**
     * Server port to use.
     */
    protected final int port;


    /**
     * Distributed thread pool to use.
     */
    private final DistThreadPool dtp;


    /**
     * Distributed hash table server to use.
     */
    private final DistHashTableServer<Integer> dhts;


    /**
     * Message tag for pairs.
     */
    public static final Integer pairTag = Integer.valueOf(1);


    /**
     * Message tag for results.
     */
    public static final Integer resultTag = Integer.valueOf(2);


    /**
     * Message tag for acknowledgments.
     */
    public static final Integer ackTag = Integer.valueOf(3);


    /**
     * Constructor.
     * @param mfile name of the machine file.
     */
    public GroebnerBaseDistributedHybridEC(String mfile) {
        this(mfile, DEFAULT_THREADS, DEFAULT_PORT);
    }


    /**
     * Constructor.
     * @param mfile name of the machine file.
     * @param threads number of threads to use.
     */
    public GroebnerBaseDistributedHybridEC(String mfile, int threads) {
        this(mfile, threads, new ThreadPool(threads), DEFAULT_PORT);
    }


    /**
     * Constructor.
     * @param mfile name of the machine file.
     * @param threads number of threads to use.
     * @param port server port to use.
     */
    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int port) {
        this(mfile, threads, new ThreadPool(threads), port);
    }


    /**
     * Constructor.
     * @param mfile name of the machine file.
     * @param threads number of threads to use.
     * @param threadsPerNode threads per node to use.
     * @param port server port to use.
     */
    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, int port) {
        this(mfile, threads, threadsPerNode, new ThreadPool(threads), port);
    }


    /**
     * Constructor.
     * @param mfile name of the machine file.
     * @param threads number of threads to use.
     * @param pool ThreadPool to use.
     * @param port server port to use.
     */
    public GroebnerBaseDistributedHybridEC(String mfile, int threads, ThreadPool pool, int port) {
        this(mfile, threads, DEFAULT_THREADS_PER_NODE, pool, port);
    }


    /**
     * Constructor.
     * @param mfile name of the machine file.
     * @param threads number of threads to use.
     * @param threadsPerNode threads per node to use.
     * @param pl pair selection strategy
     * @param port server port to use.
     */
    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, PairList<C> pl,
                    int port) {
        this(mfile, threads, threadsPerNode, new ThreadPool(threads), pl, port);
    }


    /**
     * Constructor.
     * @param mfile name of the machine file.
     * @param threads number of threads to use.
     * @param threadsPerNode threads per node to use.
     * @param pool ThreadPool to use.
     * @param port server port to use.
     */
    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ThreadPool pool,
                    int port) {
        this(mfile, threads, threadsPerNode, pool, new OrderedPairlist<C>(), port);
    }


    /**
     * Constructor. Creates the distributed thread pool from the machine file
     * and starts the distributed hash table server.
     * @param mfile name of the machine file, if null or empty
     *            "../util/machines" is used.
     * @param threads number of threads to use, values below 1 are replaced
     *            by 1.
     * @param threadsPerNode threads per node to use.
     * @param pool ThreadPool to use, if null a new pool with
     *            <code>threads</code> threads is created.
     * @param pl pair selection strategy
     * @param port server port to use.
     */
    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ThreadPool pool,
                    PairList<C> pl, int port) {
        super(new ReductionPar<C>(), pl);
        // clamp before storing, so the field and everything derived from it
        // (distributed pool size, number of scheduled clients) is valid
        if (threads < 1) {
            threads = 1;
        }
        this.threads = threads;
        if (mfile == null || mfile.length() == 0) {
            this.mfile = "../util/machines"; // contains localhost
        } else {
            this.mfile = mfile;
        }
        this.threadsPerNode = threadsPerNode;
        if (pool == null) {
            pool = new ThreadPool(threads);
        }
        this.pool = pool;
        this.port = port;
        // log the machine file actually used, not the raw argument
        logger.info("machine file " + this.mfile + ", port = " + port);
        this.dtp = new DistThreadPool(this.threads, this.mfile);
        logger.info("running " + dtp);
        this.DHT_PORT = this.dtp.getEC().getMasterPort() + 100;
        this.dhts = new DistHashTableServer<Integer>(this.DHT_PORT);
        this.dhts.init();
        logger.info("running " + dhts);
    }


    /**
     * Cleanup and terminate ThreadPool. Equivalent to
     * <code>terminate(true)</code>.
     */
    @Override
    public void terminate() {
        terminate(true);
    }


    /**
     * Terminates the local thread pool, the distributed thread pool and the
     * distributed hash table server.
     * @param shutDown true, if shut-down of the remote executable servers is
     *            requested, false, if remote executable servers stay alive.
     */
    public void terminate(boolean shutDown) {
        pool.terminate();
        dtp.terminate(shutDown);
        logger.info("dhts.terminate()");
        dhts.terminate();
    }


    /**
     * Distributed Groebner base. Schedules <code>threads</code> remote client
     * jobs on the distributed thread pool and then runs the master part in
     * the calling thread.
     * @param modv number of module variables.
     * @param F polynomial list.
     * @return GB(F) a Groebner base of F, as computed by
     *         {@link #GBMaster(int, List)}.
     */
    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
        String master = dtp.getEC().getMasterHost();
        // the master port of the execution context is not the port to use here
        GBHybridExerClient<C> gbc = new GBHybridExerClient<C>(master, threadsPerNode, port, DHT_PORT);
        for (int i = 0; i < threads; i++) {
            // schedule remote clients
            dtp.addJob(gbc);
        }
        // run master
        return GBMaster(modv, F);
    }


    /**
     * Distributed hybrid Groebner base, master part. Builds the pair list
     * from F, publishes the polynomials in the distributed hash table,
     * starts one reducer server per thread, waits for termination and
     * finally computes the minimal Groebner base.
     * @param modv number of module variables.
     * @param F polynomial list.
     * @return GB(F) a Groebner base of F.
     * @throws IllegalArgumentException if the coefficients are not from a
     *             field.
     */
    public List<GenPolynomial<C>> GBMaster(int modv, List<GenPolynomial<C>> F) {
        long t = System.currentTimeMillis();
        ChannelFactory cf = new ChannelFactory(port);
        cf.init();

        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
        PairList<C> pairlist = null;
        boolean oneInGB = false;
        for (GenPolynomial<C> f : F) {
            if (f.length() <= 0) {
                continue; // skip zero polynomials
            }
            GenPolynomial<C> p = f.monic();
            if (p.isONE()) {
                oneInGB = true;
                G.clear();
                G.add(p);
                // no early return: termination must be signaled to others
            }
            if (!oneInGB) {
                G.add(p);
            }
            if (pairlist == null) {
                pairlist = strategy.create(modv, p.ring);
                if (!p.ring.coFac.isField()) {
                    throw new IllegalArgumentException("coefficients not from a field");
                }
            }
            // theList not updated here
            if (p.isONE()) {
                pairlist.putOne();
            } else {
                pairlist.put(p);
            }
        }
        // no early return for trivial input: termination must be signaled to others
        logger.info("pairlist " + pairlist);
        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(
                        "localhost", DHT_PORT);
        theList.init();
        // NOTE(review): if F contains no non-zero polynomial, pairlist is still
        // null here and the next call fails with a NullPointerException --
        // confirm whether callers guarantee a non-trivial F.
        List<GenPolynomial<C>> al = pairlist.getList();
        for (int i = 0; i < al.size(); i++) {
            // no wait required
            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
            if (nn != null) {
                logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
            }
        }

        Terminator finner = new Terminator(threads * threadsPerNode);
        logger.info("using pool = " + pool);
        for (int i = 0; i < threads; i++) {
            HybridReducerServerEC<C> R = new HybridReducerServerEC<C>(threadsPerNode, finner, cf, theList,
                            pairlist);
            pool.addJob(R);
        }
        logger.info("main loop waiting " + finner);
        finner.waitDone();
        int ps = theList.size();
        logger.info("#distributed list = " + ps);
        // make sure all polynomials arrived: not needed in master
        G = pairlist.getList();
        if (ps != G.size()) {
            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
        }
        for (GenPolynomial<C> q : theList.getValueList()) {
            if (q != null && !q.isZERO()) {
                logger.debug("final q = " + q.leadingExpVector());
            }
        }
        logger.debug("distributed list end");
        long time = System.currentTimeMillis();
        G = minimalGB(G); // not yet distributed but threaded
        time = System.currentTimeMillis() - time;
        logger.info("parallel gbmi time = " + time);
        logger.debug("server cf.terminate()");
        cf.terminate();
        // the pool is not terminated here, see terminate()
        logger.info("server not pool.terminate() " + pool);
        logger.info("server theList.terminate() " + theList.size());
        theList.terminate();
        t = System.currentTimeMillis() - t;
        logger.info("server GB end, time = " + t + ", " + pairlist.toString());
        return G;
    }


    /**
     * GB distributed client part. Connects to the master, starts
     * <code>threadsPerNode</code> reducer clients on a local thread pool,
     * waits for them to finish and closes all communication resources.
     * @param host the server runs on.
     * @param threadsPerNode number of reducer client threads to start.
     * @param port the server runs.
     * @param dhtport of the DHT server.
     * @throws IOException
     */
    public static <C extends RingElem<C>> void clientPart(String host, int threadsPerNode, int port,
                    int dhtport) throws IOException {
        ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost
        cf.init();
        logger.info("clientPart connecting to " + host + ", port = " + port + ", dhtport = " + dhtport);
        SocketChannel channel = cf.getChannel(host, port);
        TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel);
        pairChannel.init();

        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(
                        host, dhtport);
        theList.init();

        ThreadPool pool = new ThreadPool(threadsPerNode);
        logger.info("client using pool = " + pool);
        for (int i = 0; i < threadsPerNode; i++) {
            HybridReducerClientEC<C> Rr = new HybridReducerClientEC<C>(threadsPerNode, pairChannel, i,
                            theList);
            pool.addJob(Rr);
        }
        logger.debug("clients submitted");

        pool.terminate();
        logger.debug("client pool.terminate()");

        pairChannel.close();
        logger.info("client pairChannel.close()");

        theList.terminate();
        cf.terminate();
        logger.info("client cf.terminate()");

        channel.close();
        logger.info("client channel.close()");
    }


    /**
     * Minimal ordered groebner basis. First removes polynomials whose
     * leading exponent vector is a multiple of the leading exponent vector
     * of another polynomial, then reduces each remaining polynomial with
     * respect to the others using jobs on the thread pool.
     * @param Fp a Groebner base.
     * @return a reduced Groebner base of Fp.
     */
    @Override
    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
        GenPolynomial<C> a;
        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>(Fp.size());
        for (GenPolynomial<C> q : Fp) {
            if (q.length() != 0) { // always true
                // already monic
                G.add(q);
            }
        }
        if (G.size() <= 1) {
            return G;
        }

        // drop polynomials with a leading term divisible by another leading term
        List<GenPolynomial<C>> F = new ArrayList<GenPolynomial<C>>(G.size());
        while (G.size() > 0) {
            a = G.remove(0);
            ExpVector e = a.leadingExpVector();
            boolean mt = false;

            ListIterator<GenPolynomial<C>> it = G.listIterator();
            while (it.hasNext() && !mt) {
                mt = e.multipleOf(it.next().leadingExpVector());
            }
            it = F.listIterator();
            while (it.hasNext() && !mt) {
                mt = e.multipleOf(it.next().leadingExpVector());
            }
            if (!mt) {
                F.add(a);
            }
        }
        G = F;
        if (G.size() <= 1) {
            return G;
        }
        Collections.reverse(G); // important for lex GB

        // generic array creation is not possible, the array only ever holds
        // MiReducerServer<C> instances created below
        @SuppressWarnings("unchecked")
        MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()];
        int i = 0;
        F = new ArrayList<GenPolynomial<C>>(G.size());
        while (G.size() > 0) {
            a = G.remove(0);
            // reduce a with respect to all other polynomials
            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
            R.addAll(G);
            R.addAll(F);
            mirs[i] = new MiReducerServer<C>(R, a);
            pool.addJob(mirs[i]);
            i++;
            F.add(a);
        }
        List<GenPolynomial<C>> reduced = new ArrayList<GenPolynomial<C>>(mirs.length);
        for (i = 0; i < mirs.length; i++) {
            reduced.add(mirs[i].getNF());
        }
        return reduced;
    }

}


/**
 * Distributed server reducing worker proxy threads. Serves pair requests of
 * one remote node over one tagged socket channel.
 * @param <C> coefficient type
 */

class HybridReducerServerEC<C extends RingElem<C>> implements Runnable {


    public static final Logger logger = Logger.getLogger(HybridReducerServerEC.class);


    public final boolean debug = logger.isDebugEnabled();


    private final Terminator finner;


    private final ChannelFactory cf;


    private TaggedSocketChannel pairChannel;


    private final DistHashTable<Integer, GenPolynomial<C>> theList;


    private final PairList<C> pairlist;


    private final int threadsPerNode;


    /**
     * Message tag for pairs.
     */
    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;


    /**
     * Message tag for results.
     */
    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;


    /**
     * Message tag for acknowledgments.
     */
    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;


    /**
     * Constructor.
     * @param tpn number of threads per node
     * @param fin terminator
     * @param cf channel factory
     * @param dl distributed hash table
     * @param L ordered pair list
     */
    HybridReducerServerEC(int tpn, Terminator fin, ChannelFactory cf,
                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
        threadsPerNode = tpn;
        finner = fin;
        this.cf = cf;
        theList = dl;
        pairlist = L;
    }


    /**
     * Work loop. Accepts a channel, starts a receiver thread for results and
     * answers each pair request with the next pair from the pair list until
     * no pairs and no jobs are left. Finally sends end marks and closes the
     * channels.
     * @see java.lang.Runnable#run()
     */
    //JAVA6only: @Override
    public void run() {
        logger.info("reducer server running with " + cf);
        SocketChannel channel = null;
        try {
            channel = cf.getChannel();
            pairChannel = new TaggedSocketChannel(channel);
            pairChannel.init();
        } catch (InterruptedException e) {
            logger.warn("get pair channel interrupted", e);
            return;
        }
        if (debug) {
            logger.info("pairChannel = " + pairChannel);
        }
        // record idle remote workers
        finner.initIdle(threadsPerNode);
        AtomicInteger active = new AtomicInteger(0);

        // start receiver
        HybridReducerReceiverEC<C> receiver = new HybridReducerReceiverEC<C>(threadsPerNode, finner,
                        active, pairChannel, theList, pairlist);
        receiver.start();

        Pair<C> pair;
        boolean goon = true;
        int red = 0;

        // while more requests
        while (goon) {
            // receive request if thread is reported inactive
            logger.debug("receive request");
            Object req = null;
            try {
                req = pairChannel.receive(pairTag);
            } catch (InterruptedException e) {
                goon = false;
                logger.warn("receive request interrupted", e);
            } catch (IOException e) {
                goon = false;
                logger.warn("receive request failed", e);
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.warn("receive request failed", e);
            }
            logger.info("received request, req = " + req);
            if (req == null) {
                goon = false;
                break;
            }
            if (!(req instanceof GBTransportMessReq)) {
                goon = false;
                break;
            }

            // find pair and manage termination status
            logger.info("find pair");
            while (!pairlist.hasNext()) { // wait
                if (!finner.hasJobs() && !pairlist.hasNext()) {
                    goon = false;
                    break;
                }
                try {
                    logger.info("waiting for reducers, remaining = " + finner.getJobs());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    goon = false;
                    break;
                }
            }
            if (!pairlist.hasNext() && !finner.hasJobs()) {
                logger.info("termination detection: no pairs and no jobs left");
                goon = false;
                break;
            }
            finner.notIdle(); // before pairlist get!!
            pair = pairlist.removeNext();
            // send pair to client, even if null
            if (debug) {
                logger.info("active count = " + active.get());
                logger.info("send pair = " + pair);
            }
            GBTransportMess msg = null;
            if (pair != null) {
                msg = new GBTransportMessPairIndex(pair);
            } else {
                msg = new GBTransportMess(); // not End(); at this time
            }
            try {
                red++;
                pairChannel.send(pairTag, msg);
                active.getAndIncrement();
            } catch (IOException e) {
                logger.warn("send pair failed", e);
                goon = false;
                break;
            }
        }
        logger.info("terminated, send " + red + " reduction pairs");

        /*
         * send end mark to clients
         */
        logger.debug("send end");
        try {
            for (int i = 0; i < threadsPerNode; i++) {
                // do not wait for a request before sending the end mark
                pairChannel.send(pairTag, new GBTransportMessEnd());
            }
            // send also end to receiver, beware of race condition
            pairChannel.send(resultTag, new GBTransportMessEnd());
        } catch (IOException e) {
            logger.debug("send end failed", e);
        }
        receiver.terminate();

        int d = active.get();
        logger.info("remaining active tasks = " + d);
        pairChannel.close();
        logger.info("redServ pairChannel.close()");
        finner.release();

        channel.close();
        logger.info("redServ channel.close()");
    }
}


/**
 * Distributed server receiving worker thread. Receives reduction results
 * from one remote node, records them in the pair list and the distributed
 * hash table and acknowledges each result.
 * @param <C> coefficient type
 */

class HybridReducerReceiverEC<C extends RingElem<C>> extends Thread {


    public static final Logger logger = Logger.getLogger(HybridReducerReceiverEC.class);


    public final boolean debug = logger.isDebugEnabled();


    private final DistHashTable<Integer, GenPolynomial<C>> theList;


    private final PairList<C> pairlist;


    private final TaggedSocketChannel pairChannel;


    private final Terminator finner;


    private final int threadsPerNode;


    private final AtomicInteger active;


    private volatile boolean goon;


    /**
     * Message tag for pairs.
     */
    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;


    /**
     * Message tag for results.
     */
    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;


    /**
     * Message tag for acknowledgments.
     */
    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;


    /**
     * Constructor.
     * @param tpn number of threads per node
     * @param fin terminator
     * @param a active remote tasks count
     * @param pc tagged socket channel
     * @param dl distributed hash table
     * @param L ordered pair list
     */
    HybridReducerReceiverEC(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc,
                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
        active = a;
        threadsPerNode = tpn;
        finner = fin;
        pairChannel = pc;
        theList = dl;
        pairlist = L;
        goon = true;
    }


    /**
     * Work loop. Receives result messages until an end mark arrives, an
     * error occurs or the thread is asked to stop.
     * @see java.lang.Thread#run()
     */
    @Override
    public void run() {
        GenPolynomial<C> H = null;
        int red = 0;
        int polIndex = -1;

        // while more requests
        while (goon) {
            // receive request
            logger.debug("receive result");
            Object rh = null;
            try {
                rh = pairChannel.receive(resultTag);
                active.getAndDecrement();
            } catch (InterruptedException e) {
                goon = false;
                break;
            } catch (IOException e) {
                logger.warn("receive result failed", e);
                goon = false;
                finner.initIdle(1);
                break;
            } catch (ClassNotFoundException e) {
                logger.warn("receive result failed", e);
                goon = false;
                finner.initIdle(1);
                break;
            }
            logger.info("received H polynomial");
            if (rh == null) {
                if (this.isInterrupted()) {
                    goon = false;
                    finner.initIdle(1);
                    break;
                }
            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
                logger.info("received GBTransportMessEnd");
                goon = false;
                break;
            } else if (rh instanceof GBTransportMessPoly) {
                // update pair list
                red++;
                GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
                H = mpi.pol;
                if (H != null) {
                    if (debug) {
                        logger.info("H = " + H.leadingExpVector());
                    }
                    if (!H.isZERO()) {
                        if (H.isONE()) {
                            logger.info("H = one");
                            polIndex = pairlist.putOne();
                            GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
                            if (nn != null) {
                                logger.info("double polynomials nn = " + nn + ", H = " + H);
                            }
                            // do not stop here: must wait for other clients
                        } else {
                            polIndex = pairlist.put(H);
                            // use putWait ? but still not all distributed
                            GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
                            if (nn != null) {
                                logger.info("double polynomials nn = " + nn + ", H = " + H);
                            }
                        }
                    }
                }
            }
            // only after recording in pairlist !
            finner.initIdle(1);
            try {
                pairChannel.send(ackTag, new GBTransportMess());
                logger.debug("send acknowledgement");
            } catch (IOException e) {
                logger.warn("send acknowledgement failed", e);
                goon = false;
                break;
            }
        } // end while
        goon = false;
        logger.info("terminated, received " + red + " reductions");
    }


    /**
     * Terminate. Clears the continuation flag and waits for this thread to
     * finish.
     */
    public void terminate() {
        goon = false;
        // the thread is not interrupted, it ends with the end mark message
        try {
            this.join();
        } catch (InterruptedException e) {
            // deliberately not re-interrupting the calling thread
        }
        logger.debug("HybridReducerReceiver terminated");
    }

}


/**
 * Distributed clients reducing worker threads. Requests pairs from the
 * server, computes the S-polynomial and its normal form and sends the result
 * back.
 * @param <C> coefficient type
 */

class HybridReducerClientEC<C extends RingElem<C>> implements Runnable {


    private static final Logger logger = Logger.getLogger(HybridReducerClientEC.class);


    public final boolean debug = logger.isDebugEnabled();


    private final TaggedSocketChannel pairChannel;


    private final DistHashTable<Integer, GenPolynomial<C>> theList;


    private final ReductionPar<C> red;


    private final int threadsPerNode;


    /**
     * Message tag for pairs.
     */
    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;


    /**
     * Message tag for results.
     */
    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;


    /**
     * Message tag for acknowledgments.
     */
    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;


    /**
     * Constructor.
     * @param tpn number of threads per node
     * @param tc tagged socket channel
     * @param tid thread identification, currently not used
     * @param dl distributed hash table
     */
    HybridReducerClientEC(int tpn, TaggedSocketChannel tc, Integer tid,
                    DistHashTable<Integer, GenPolynomial<C>> dl) {
        this.threadsPerNode = tpn;
        pairChannel = tc;
        theList = dl;
        red = new ReductionPar<C>();
    }


    /**
     * Work loop. Protocol per iteration: request pair, process pair, send
     * result, receive acknowledgment.
     * @see java.lang.Runnable#run()
     */
    //JAVA6only: @Override
    public void run() {
        if (debug) {
            logger.info("pairChannel = " + pairChannel + " reducer client running");
        }
        Pair<C> pair = null;
        GenPolynomial<C> pi;
        GenPolynomial<C> pj;
        GenPolynomial<C> sp = null; // never assigned, only shown in a log message
        GenPolynomial<C> S;
        GenPolynomial<C> H = null;
        boolean goon = true;
        boolean doEnd = true;
        int reduction = 0;
        Integer pix;
        Integer pjx;

        while (goon) {
            /* protocol:
             * request pair, process pair, send result, receive acknowledgment
             */
            Object req = new GBTransportMessReq();
            logger.info("send request = " + req);
            try {
                pairChannel.send(pairTag, req);
            } catch (IOException e) {
                goon = false;
                logger.debug("send request failed", e);
                logger.info("receive pair, exception ");
                break;
            }
            logger.debug("receive pair, goon = " + goon);
            doEnd = true;
            Object pp = null;
            try {
                pp = pairChannel.receive(pairTag);
            } catch (InterruptedException e) {
                goon = false;
                logger.warn("receive pair interrupted", e);
            } catch (IOException e) {
                goon = false;
                logger.debug("receive pair failed", e);
                break;
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.warn("receive pair failed", e);
            }
            if (debug) {
                logger.info("received pair = " + pp);
            }
            H = null;
            if (pp == null) { // should not happen
                continue;
            }
            if (pp instanceof GBTransportMessEnd) {
                goon = false;
                continue;
            }
            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
                pi = pj = null;
                if (pp instanceof GBTransportMessPair) { // obsolete, for tests
                    GBTransportMessPair<C> tmp = (GBTransportMessPair<C>) pp;
                    pair = (Pair<C>) tmp.pair;
                    if (pair != null) {
                        pi = pair.pi;
                        pj = pair.pj;
                    }
                }
                if (pp instanceof GBTransportMessPairIndex) {
                    GBTransportMessPairIndex tmpi = (GBTransportMessPairIndex) pp;
                    pix = tmpi.i;
                    pjx = tmpi.j;
                    // fetch the polynomials by index from the distributed hash table
                    pi = theList.getWait(pix);
                    pj = theList.getWait(pjx);
                }
                if (pi != null && pj != null) {
                    S = red.SPolynomial(pi, pj);
                    logger.info("ht(S) = " + S.leadingExpVector());
                    if (S.isZERO()) {
                        // pair.setZero(); does not work in dist
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("ht(S) = " + S.leadingExpVector());
                        }
                        H = red.normalform(theList, S);
                        logger.info("ht(H) = " + H.leadingExpVector());
                        reduction++;
                        if (H.isZERO()) {
                            // pair.setZero(); does not work in dist
                        } else {
                            H = H.monic();
                            if (logger.isInfoEnabled()) {
                                logger.info("ht(H) = " + H.leadingExpVector());
                            }
                        }
                    }
                } else {
                    logger.info("pi = " + pi + ", pj = " + pj + ", sp = " + sp);
                }
            }
            if (pp instanceof GBTransportMess) {
                logger.debug("null pair results in null H poly");
            }

            // send H or must send null, if not at end
            if (logger.isDebugEnabled()) {
                logger.debug("#distributed list = " + theList.size());
                logger.debug("send H polynomial = " + H);
            }
            try {
                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H));
                doEnd = false;
            } catch (IOException e) {
                goon = false;
                logger.warn("send H polynomial failed", e);
            }
            logger.info("done send poly message of " + pp);
            try {
                pp = pairChannel.receive(ackTag);
            } catch (InterruptedException e) {
                goon = false;
                logger.warn("receive acknowledgment interrupted", e);
            } catch (IOException e) {
                goon = false;
                logger.debug("receive acknowledgment failed", e);
                break;
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.warn("receive acknowledgment failed", e);
            }
            if (!(pp instanceof GBTransportMess)) {
                logger.error("invalid acknowledgement " + pp);
            }
            logger.info("received acknowledgment " + pp);
        }
        logger.info("terminated, done " + reduction + " reductions");
        if (doEnd) {
            try {
                pairChannel.send(resultTag, new GBTransportMessEnd());
            } catch (IOException e) {
                // best effort: the channel may already be closed at termination
                logger.debug("send end failed", e);
            }
            logger.info("terminated, send done");
        }
    }
}


/**
 * Objects of this class are to be send to a ExecutableServer.
 * @param <C> coefficient type
 */

class GBHybridExerClient<C extends RingElem<C>> implements RemoteExecutable {


    String host;


    int port;


    int dhtport;


    int threadsPerNode;


    /**
     * GBHybridExerClient.
     * @param host the server runs on.
     * @param threadsPerNode number of threads per node.
     * @param port the server runs.
     * @param dhtport of the DHT server.
     */
    public GBHybridExerClient(String host, int threadsPerNode, int port, int dhtport) {
        this.host = host;
        this.threadsPerNode = threadsPerNode;
        this.port = port;
        this.dhtport = dhtport;
    }


    /**
     * run. Executes the client part, exceptions are logged and not
     * propagated.
     */
    public void run() {
        try {
            GroebnerBaseDistributedHybridEC.<C> clientPart(host, threadsPerNode, port, dhtport);
        } catch (Exception e) {
            GroebnerBaseDistributedHybridEC.logger.error("clientPart failed in " + this, e);
        }
    }


    /**
     * String representation.
     */
    @Override
    public String toString() {
        StringBuilder s = new StringBuilder("GBHybridExerClient(");
        s.append("host=" + host);
        s.append(", threadsPerNode=" + threadsPerNode);
        s.append(", port=" + port);
        s.append(", dhtport=" + dhtport);
        s.append(")");
        return s.toString();
    }

}