001 /* 002 * $Id: GroebnerBaseDistributedHybrid.java 3626 2011-05-08 09:51:57Z kredel $ 003 */ 004 005 package edu.jas.gb; 006 007 008 import java.io.IOException; 009 import java.util.ArrayList; 010 import java.util.List; 011 import java.util.ListIterator; 012 import java.util.Collections; 013 import java.util.concurrent.atomic.AtomicInteger; 014 015 import org.apache.log4j.Logger; 016 017 import edu.jas.poly.ExpVector; 018 import edu.jas.poly.GenPolynomial; 019 import edu.jas.structure.RingElem; 020 import edu.jas.util.ChannelFactory; 021 import edu.jas.util.DistHashTable; 022 import edu.jas.util.DistHashTableServer; 023 import edu.jas.util.SocketChannel; 024 import edu.jas.util.TaggedSocketChannel; 025 import edu.jas.util.Terminator; 026 import edu.jas.util.ThreadPool; 027 028 029 /** 030 * Groebner Base distributed hybrid algorithm. Implements a 031 * distributed memory with multi-core CPUs parallel version of 032 * Groebner bases. Using pairlist class, distributed multi-threaded 033 * tasks do reduction, one communication channel per remote node. 034 * @param <C> coefficient type 035 * @author Heinz Kredel 036 */ 037 038 public class GroebnerBaseDistributedHybrid<C extends RingElem<C>> extends GroebnerBaseAbstract<C> { 039 040 041 public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybrid.class); 042 043 044 public final boolean debug = logger.isDebugEnabled(); 045 046 047 /** 048 * Number of threads to use. 049 */ 050 protected final int threads; 051 052 053 /** 054 * Default number of threads. 055 */ 056 protected static final int DEFAULT_THREADS = 2; 057 058 059 /** 060 * Number of threads per node to use. 061 */ 062 protected final int threadsPerNode; 063 064 065 /** 066 * Default number of threads per compute node. 067 */ 068 protected static final int DEFAULT_THREADS_PER_NODE = 1; 069 070 071 /** 072 * Pool of threads to use. 073 */ 074 //protected final ExecutorService pool; // not for single node tests 075 protected final ThreadPool pool; 076 077 078 /** 079 * Default server port. 080 */ 081 protected static final int DEFAULT_PORT = 4711; 082 083 084 /** 085 * Server port to use. 086 */ 087 protected final int port; 088 089 090 /** 091 * Message tag for pairs. 092 */ 093 public static final Integer pairTag = new Integer(1); 094 095 096 /** 097 * Message tag for results. 098 */ 099 public static final Integer resultTag = new Integer(2); 100 101 102 /** 103 * Message tag for acknowledgments. 104 */ 105 public static final Integer ackTag = new Integer(3); 106 107 108 /** 109 * Constructor. 110 */ 111 public GroebnerBaseDistributedHybrid() { 112 this(DEFAULT_THREADS, DEFAULT_PORT); 113 } 114 115 116 /** 117 * Constructor. 118 * @param threads number of threads to use. 119 */ 120 public GroebnerBaseDistributedHybrid(int threads) { 121 this(threads, new ThreadPool(threads), DEFAULT_PORT); 122 } 123 124 125 /** 126 * Constructor. 127 * @param threads number of threads to use. 128 * @param port server port to use. 129 */ 130 public GroebnerBaseDistributedHybrid(int threads, int port) { 131 this(threads, new ThreadPool(threads), port); 132 } 133 134 135 /** 136 * Constructor. 137 * @param threads number of threads to use. 138 * @param threadsPerNode threads per node to use. 139 * @param port server port to use. 140 */ 141 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, int port) { 142 this(threads, threadsPerNode, new ThreadPool(threads), port); 143 } 144 145 146 /** 147 * Constructor. 148 * @param threads number of threads to use. 149 * @param pool ThreadPool to use. 150 * @param port server port to use. 151 */ 152 public GroebnerBaseDistributedHybrid(int threads, ThreadPool pool, int port) { 153 this(threads, DEFAULT_THREADS_PER_NODE, pool, port); 154 } 155 156 157 /** 158 * Constructor. 159 * @param threads number of threads to use. 160 * @param threadsPerNode threads per node to use. 161 * @param pl pair selection strategy 162 * @param port server port to use. 163 */ 164 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, PairList<C> pl, int port) { 165 this(threads, threadsPerNode, new ThreadPool(threads), pl, port); 166 } 167 168 169 /** 170 * Constructor. 171 * @param threads number of threads to use. 172 * @param threadsPerNode threads per node to use. 173 * @param port server port to use. 174 */ 175 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, int port) { 176 this(threads, threadsPerNode, pool, new OrderedPairlist<C>(), port); 177 } 178 179 180 /** 181 * Constructor. 182 * @param threads number of threads to use. 183 * @param threadsPerNode threads per node to use. 184 * @param pool ThreadPool to use. 185 * @param pl pair selection strategy 186 * @param port server port to use. 187 */ 188 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl, int port) { 189 super( new ReductionPar<C>(), pl ); 190 if (threads < 1) { 191 threads = 1; 192 } 193 this.threads = threads; 194 this.threadsPerNode = threadsPerNode; 195 this.pool = pool; 196 this.port = port; 197 //logger.info("generated pool: " + pool); 198 } 199 200 201 /** 202 * Cleanup and terminate. 203 */ 204 public void terminate() { 205 if (pool == null) { 206 return; 207 } 208 pool.terminate(); 209 } 210 211 212 /** 213 * Distributed hybrid Groebner base. 214 * @param modv number of module variables. 215 * @param F polynomial list. 216 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 217 */ 218 public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) { 219 long t = System.currentTimeMillis(); 220 final int DL_PORT = port + 100; 221 ChannelFactory cf = new ChannelFactory(port); 222 cf.init(); 223 DistHashTableServer<Integer> dls = new DistHashTableServer<Integer>(DL_PORT); 224 dls.init(); 225 logger.debug("dist-list server running"); 226 227 GenPolynomial<C> p; 228 List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>(); 229 PairList<C> pairlist = null; 230 boolean oneInGB = false; 231 int l = F.size(); 232 int unused; 233 ListIterator<GenPolynomial<C>> it = F.listIterator(); 234 while (it.hasNext()) { 235 p = it.next(); 236 if (p.length() > 0) { 237 p = p.monic(); 238 if (p.isONE()) { 239 oneInGB = true; 240 G.clear(); 241 G.add(p); 242 //return G; must signal termination to others 243 } 244 if (!oneInGB) { 245 G.add(p); 246 } 247 if (pairlist == null) { 248 //pairlist = new OrderedPairlist<C>(modv, p.ring); 249 pairlist = strategy.create( modv, p.ring ); 250 if ( ! p.ring.coFac.isField() ) { 251 throw new IllegalArgumentException("coefficients not from a field"); 252 } 253 } 254 // theList not updated here 255 if (p.isONE()) { 256 unused = pairlist.putOne(); 257 } else { 258 unused = pairlist.put(p); 259 } 260 } else { 261 l--; 262 } 263 } 264 if (l <= 1) { 265 //return G; must signal termination to others 266 } 267 logger.info("pairlist " + pairlist); 268 269 logger.debug("looking for clients"); 270 //long t = System.currentTimeMillis(); 271 // now in DL, uses resend for late clients 272 //while ( dls.size() < threads ) { sleep(); } 273 274 DistHashTable<Integer, GenPolynomial<C>> theList 275 = new DistHashTable<Integer, GenPolynomial<C>>("localhost", DL_PORT); 276 List<GenPolynomial<C>> al = pairlist.getList(); 277 for (int i = 0; i < al.size(); i++) { 278 // no wait required 279 GenPolynomial<C> nn = theList.put(new Integer(i), al.get(i)); 280 if (nn != null) { 281 logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i)); 282 } 283 } 284 285 Terminator finner = new Terminator(threads*threadsPerNode); 286 HybridReducerServer<C> R; 287 logger.info("using pool = " + pool); 288 for (int i = 0; i < threads; i++) { 289 R = new HybridReducerServer<C>(threadsPerNode, finner, cf, theList, pairlist); 290 pool.addJob(R); 291 //logger.info("server submitted " + R); 292 } 293 logger.info("main loop waiting " + finner); 294 finner.waitDone(); 295 int ps = theList.size(); 296 logger.info("#distributed list = " + ps); 297 // make sure all polynomials arrived: not needed in master 298 // G = (ArrayList)theList.values(); 299 G = pairlist.getList(); 300 if (ps != G.size()) { 301 logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size()); 302 } 303 for (GenPolynomial<C> q: theList.getValueList()) { 304 if ( q != null && !q.isZERO() ) { 305 logger.debug("final q = " + q.leadingExpVector()); 306 } 307 } 308 logger.debug("distributed list end"); 309 long time = System.currentTimeMillis(); 310 List<GenPolynomial<C>> Gp; 311 Gp = minimalGB(G); // not jet distributed but threaded 312 time = System.currentTimeMillis() - time; 313 logger.info("parallel gbmi time = " + time); 314 G = Gp; 315 logger.debug("server cf.terminate()"); 316 cf.terminate(); 317 // no more required // 318 logger.info("server not pool.terminate() " + pool); 319 //pool.terminate(); 320 logger.info("server theList.terminate() " + theList.size()); 321 theList.terminate(); 322 logger.info("server dls.terminate() " + dls); 323 dls.terminate(); 324 t = System.currentTimeMillis() - t; 325 logger.info("server GB end, time = " + t + ", " + pairlist.toString()); 326 return G; 327 } 328 329 330 /** 331 * GB distributed client. 332 * @param host the server runs on. 333 * @throws IOException 334 */ 335 public void clientPart(String host) throws IOException { 336 337 ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost 338 cf.init(); 339 SocketChannel channel = cf.getChannel(host, port); 340 TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel); 341 pairChannel.init(); 342 343 if (debug) { 344 logger.info("clientPart pairChannel = " + pairChannel); 345 } 346 347 final int DL_PORT = port + 100; 348 DistHashTable<Integer, GenPolynomial<C>> theList 349 = new DistHashTable<Integer, GenPolynomial<C>>(host, DL_PORT); 350 351 //HybridReducerClient<C> R = new HybridReducerClient<C>(threadsPerNode, pairChannel, theList); 352 //R.run(); 353 354 ThreadPool pool = new ThreadPool(threadsPerNode); 355 logger.info("client using pool = " +pool); 356 for (int i = 0; i < threadsPerNode; i++) { 357 HybridReducerClient<C> Rr = new HybridReducerClient<C>(threadsPerNode, pairChannel, i, theList); 358 pool.addJob(Rr); 359 } 360 if (debug) { 361 logger.info("clients submitted"); 362 } 363 pool.terminate(); 364 logger.info("client pool.terminate()"); 365 366 pairChannel.close(); 367 logger.info("client pairChannel.close()"); 368 369 theList.terminate(); 370 cf.terminate(); 371 logger.info("client cf.terminate()"); 372 373 channel.close(); 374 logger.info("client channel.close()"); 375 return; 376 } 377 378 379 /** 380 * Minimal ordered groebner basis. 381 * @param Fp a Groebner base. 382 * @return a reduced Groebner base of Fp. 383 */ 384 @Override 385 public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) { 386 GenPolynomial<C> a; 387 ArrayList<GenPolynomial<C>> G; 388 G = new ArrayList<GenPolynomial<C>>(Fp.size()); 389 ListIterator<GenPolynomial<C>> it = Fp.listIterator(); 390 while (it.hasNext()) { 391 a = it.next(); 392 if (a.length() != 0) { // always true 393 // already monic a = a.monic(); 394 G.add(a); 395 } 396 } 397 if (G.size() <= 1) { 398 return G; 399 } 400 401 ExpVector e; 402 ExpVector f; 403 GenPolynomial<C> p; 404 ArrayList<GenPolynomial<C>> F; 405 F = new ArrayList<GenPolynomial<C>>(G.size()); 406 boolean mt; 407 408 while (G.size() > 0) { 409 a = G.remove(0); 410 e = a.leadingExpVector(); 411 412 it = G.listIterator(); 413 mt = false; 414 while (it.hasNext() && !mt) { 415 p = it.next(); 416 f = p.leadingExpVector(); 417 mt = e.multipleOf(f); 418 } 419 it = F.listIterator(); 420 while (it.hasNext() && !mt) { 421 p = it.next(); 422 f = p.leadingExpVector(); 423 mt = e.multipleOf(f); 424 } 425 if (!mt) { 426 F.add(a); 427 } else { 428 // System.out.println("dropped " + a.length()); 429 } 430 } 431 G = F; 432 if (G.size() <= 1) { 433 return G; 434 } 435 Collections.reverse(G); // important for lex GB 436 437 MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()]; 438 int i = 0; 439 F = new ArrayList<GenPolynomial<C>>(G.size()); 440 while (G.size() > 0) { 441 a = G.remove(0); 442 // System.out.println("doing " + a.length()); 443 List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size()+F.size()); 444 R.addAll(G); 445 R.addAll(F); 446 mirs[i] = new MiReducerServer<C>(R, a); 447 pool.addJob(mirs[i]); 448 i++; 449 F.add(a); 450 } 451 G = F; 452 F = new ArrayList<GenPolynomial<C>>(G.size()); 453 for (i = 0; i < mirs.length; i++) { 454 a = mirs[i].getNF(); 455 F.add(a); 456 } 457 return F; 458 } 459 460 } 461 462 463 /** 464 * Distributed server reducing worker proxy threads. 465 * @param <C> coefficient type 466 */ 467 468 class HybridReducerServer<C extends RingElem<C>> implements Runnable { 469 470 471 public static final Logger logger = Logger.getLogger(HybridReducerServer.class); 472 473 474 public final boolean debug = logger.isDebugEnabled(); 475 476 477 private final Terminator finner; 478 479 480 private final ChannelFactory cf; 481 482 483 private TaggedSocketChannel pairChannel; 484 485 486 private final DistHashTable<Integer, GenPolynomial<C>> theList; 487 488 489 private final PairList<C> pairlist; 490 491 492 private final int threadsPerNode; 493 494 495 /** 496 * Message tag for pairs. 497 */ 498 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag; 499 500 501 /** 502 * Message tag for results. 503 */ 504 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag; 505 506 507 /** 508 * Message tag for acknowledgments. 509 */ 510 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag; 511 512 513 /** 514 * Constructor. 515 * @param tpn number of threads per node 516 * @param fin terminator 517 * @param cf channel factory 518 * @param dl distributed hash table 519 * @param L ordered pair list 520 */ 521 HybridReducerServer(int tpn, Terminator fin, ChannelFactory cf, 522 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 523 threadsPerNode = tpn; 524 finner = fin; 525 this.cf = cf; 526 theList = dl; 527 pairlist = L; 528 //logger.info("reducer server created " + this); 529 } 530 531 532 /** 533 * Work loop. 534 * @see java.lang.Runnable#run() 535 */ 536 //JAVA6only: @Override 537 public void run() { 538 logger.info("reducer server running with " + cf); 539 SocketChannel channel = null; 540 try { 541 channel = cf.getChannel(); 542 pairChannel = new TaggedSocketChannel(channel); 543 pairChannel.init(); 544 } catch (InterruptedException e) { 545 logger.debug("get pair channel interrupted"); 546 e.printStackTrace(); 547 return; 548 } 549 if (debug) { 550 logger.info("pairChannel = " + pairChannel); 551 } 552 // record idle remote workers (minus one?) 553 //finner.beIdle(threadsPerNode-1); 554 finner.initIdle(threadsPerNode); 555 AtomicInteger active = new AtomicInteger(0); 556 557 // start receiver 558 HybridReducerReceiver<C> receiver = new HybridReducerReceiver<C>(threadsPerNode, finner, active, pairChannel, theList, pairlist); 559 receiver.start(); 560 561 Pair<C> pair; 562 boolean set = false; 563 boolean goon = true; 564 int polIndex = -1; 565 int red = 0; 566 int sleeps = 0; 567 568 // while more requests 569 while (goon) { 570 // receive request if thread is reported incactive 571 logger.debug("receive request"); 572 Object req = null; 573 try { 574 req = pairChannel.receive(pairTag); 575 } catch (InterruptedException e) { 576 goon = false; 577 e.printStackTrace(); 578 } catch (IOException e) { 579 goon = false; 580 e.printStackTrace(); 581 } catch (ClassNotFoundException e) { 582 goon = false; 583 e.printStackTrace(); 584 } 585 logger.info("received request, req = " + req); 586 if (req == null) { 587 goon = false; 588 break; 589 } 590 if (!(req instanceof GBTransportMessReq)) { 591 goon = false; 592 break; 593 } 594 595 // find pair and manage termination status 596 logger.info("find pair"); 597 while (!pairlist.hasNext()) { // wait 598 if (!finner.hasJobs() && !pairlist.hasNext()) { 599 goon = false; 600 break; 601 } 602 try { 603 sleeps++; 604 //if (sleeps % 10 == 0) { 605 logger.info("waiting for reducers, remaining = " + finner.getJobs()); 606 //} 607 Thread.sleep(100); 608 } catch (InterruptedException e) { 609 goon = false; 610 break; 611 } 612 } 613 if (!pairlist.hasNext() && !finner.hasJobs()) { 614 logger.info("termination detection: no pairs and no jobs left"); 615 goon = false; 616 break; //continue; //break? 617 } 618 finner.notIdle(); // before pairlist get!! 619 pair = pairlist.removeNext(); 620 // send pair to client, even if null 621 if ( debug ) { 622 logger.info("active count = " + active.get()); 623 logger.info("send pair = " + pair); 624 } 625 GBTransportMess msg = null; 626 if (pair != null) { 627 msg = new GBTransportMessPairIndex(pair); 628 } else { 629 msg = new GBTransportMess(); //not End(); at this time 630 // goon ?= false; 631 } 632 try { 633 red++; 634 pairChannel.send(pairTag, msg); 635 int a = active.getAndIncrement(); 636 } catch (IOException e) { 637 e.printStackTrace(); 638 goon = false; 639 break; 640 } 641 //logger.debug("#distributed list = " + theList.size()); 642 } 643 logger.info("terminated, send " + red + " reduction pairs"); 644 645 /* 646 * send end mark to clients 647 */ 648 logger.debug("send end"); 649 try { 650 for ( int i = 0; i < threadsPerNode; i++ ) { // -1 651 //do not wait: Object rq = pairChannel.receive(pairTag); 652 pairChannel.send(pairTag, new GBTransportMessEnd()); 653 } 654 // send also end to receiver 655 pairChannel.send(resultTag, new GBTransportMessEnd()); 656 //beware of race condition 657 } catch (IOException e) { 658 if (logger.isDebugEnabled()) { 659 e.printStackTrace(); 660 } 661 } 662 receiver.terminate(); 663 664 int d = active.get(); 665 logger.info("remaining active tasks = " + d); 666 //logger.info("terminated, send " + red + " reduction pairs"); 667 pairChannel.close(); 668 logger.info("redServ pairChannel.close()"); 669 finner.release(); 670 671 channel.close(); 672 logger.info("redServ channel.close()"); 673 } 674 } 675 676 677 /** 678 * Distributed server receiving worker thread. 679 * @param <C> coefficient type 680 */ 681 682 class HybridReducerReceiver<C extends RingElem<C>> extends Thread { 683 684 685 public static final Logger logger = Logger.getLogger(HybridReducerReceiver.class); 686 687 688 public final boolean debug = logger.isDebugEnabled(); 689 690 691 private final DistHashTable<Integer, GenPolynomial<C>> theList; 692 693 694 private final PairList<C> pairlist; 695 696 697 private final TaggedSocketChannel pairChannel; 698 699 700 private final Terminator finner; 701 702 703 private final int threadsPerNode; 704 705 706 private final AtomicInteger active; 707 708 709 private volatile boolean goon; 710 711 712 /** 713 * Message tag for pairs. 714 */ 715 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag; 716 717 718 /** 719 * Message tag for results. 720 */ 721 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag; 722 723 724 /** 725 * Message tag for acknowledgments. 726 */ 727 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag; 728 729 730 /** 731 * Constructor. 732 * @param tpn number of threads per node 733 * @param fin terminator 734 * @param a active remote tasks count 735 * @param pc tagged socket channel 736 * @param dl distributed hash table 737 * @param L ordered pair list 738 */ 739 HybridReducerReceiver(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc, 740 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 741 active = a; 742 threadsPerNode = tpn; 743 finner = fin; 744 pairChannel = pc; 745 theList = dl; 746 pairlist = L; 747 goon = true; 748 //logger.info("reducer server created " + this); 749 } 750 751 752 /** 753 * Work loop. 754 * @see java.lang.Thread#run() 755 */ 756 @Override 757 public void run() { 758 //Pair<C> pair = null; 759 GenPolynomial<C> H = null; 760 int red = 0; 761 int polIndex = -1; 762 //Integer senderId; // obsolete 763 764 // while more requests 765 while (goon) { 766 // receive request 767 logger.debug("receive result"); 768 //senderId = null; 769 Object rh = null; 770 try { 771 rh = pairChannel.receive(resultTag); 772 int i = active.getAndDecrement(); 773 } catch (InterruptedException e) { 774 goon = false; 775 //e.printStackTrace(); 776 //?? finner.initIdle(1); 777 break; 778 } catch (IOException e) { 779 e.printStackTrace(); 780 goon = false; 781 finner.initIdle(1); 782 break; 783 } catch (ClassNotFoundException e) { 784 e.printStackTrace(); 785 goon = false; 786 finner.initIdle(1); 787 break; 788 } 789 logger.info("received H polynomial"); 790 if (rh == null) { 791 if (this.isInterrupted()) { 792 goon = false; 793 finner.initIdle(1); 794 break; 795 } 796 //finner.initIdle(1); 797 } else if (rh instanceof GBTransportMessEnd) { // should only happen from server 798 logger.info("received GBTransportMessEnd"); 799 goon = false; 800 //?? finner.initIdle(1); 801 break; 802 } else if (rh instanceof GBTransportMessPoly) { 803 // update pair list 804 red++; 805 GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh; 806 H = mpi.pol; 807 //senderId = mpi.threadId; 808 if (H != null) { 809 if (debug) { 810 logger.info("H = " + H.leadingExpVector()); 811 } 812 if (!H.isZERO()) { 813 if (H.isONE()) { 814 // finner.allIdle(); 815 polIndex = pairlist.putOne(); 816 GenPolynomial<C> nn = theList.put(new Integer(polIndex), H); 817 if (nn != null) { 818 logger.info("double polynomials nn = " + nn + ", H = " + H); 819 } 820 //goon = false; must wait for other clients 821 //finner.initIdle(1); 822 //break; 823 } else { 824 polIndex = pairlist.put(H); 825 // use putWait ? but still not all distributed 826 GenPolynomial<C> nn = theList.put(new Integer(polIndex), H); 827 if (nn != null) { 828 logger.info("double polynomials nn = " + nn + ", H = " + H); 829 } 830 } 831 } 832 } 833 } 834 // only after recording in pairlist ! 835 finner.initIdle(1); 836 // if ( senderId != null ) { // send acknowledgement after recording 837 try { 838 //pairChannel.send(senderId, new GBTransportMess()); 839 pairChannel.send(ackTag, new GBTransportMess()); 840 logger.debug("send acknowledgement"); 841 } catch (IOException e) { 842 e.printStackTrace(); 843 goon = false; 844 break; 845 } 846 //} 847 } // end while 848 goon = false; 849 logger.info("terminated, received " + red + " reductions"); 850 } 851 852 853 /** 854 * Terminate. 855 */ 856 public void terminate() { 857 goon = false; 858 this.interrupt(); 859 try { 860 this.join(); 861 } catch (InterruptedException e) { 862 // unfug Thread.currentThread().interrupt(); 863 } 864 logger.debug("HybridReducerReceiver terminated"); 865 } 866 867 } 868 869 870 /** 871 * Distributed clients reducing worker threads. 872 */ 873 874 class HybridReducerClient<C extends RingElem<C>> implements Runnable { 875 876 877 private static final Logger logger = Logger.getLogger(HybridReducerClient.class); 878 879 880 public final boolean debug = logger.isDebugEnabled(); 881 882 883 private final TaggedSocketChannel pairChannel; 884 885 886 private final DistHashTable<Integer, GenPolynomial<C>> theList; 887 888 889 private final ReductionPar<C> red; 890 891 892 private final int threadsPerNode; 893 894 895 /* 896 * Identification number for this thread. 897 */ 898 //public final Integer threadId; // obsolete 899 900 901 /** 902 * Message tag for pairs. 903 */ 904 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag; 905 906 907 /** 908 * Message tag for results. 909 */ 910 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag; 911 912 913 /** 914 * Message tag for acknowledgments. 915 */ 916 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag; 917 918 919 /** 920 * Constructor. 921 * @param tpn number of threads per node 922 * @param tc tagged socket channel 923 * @param tid thread identification 924 * @param dl distributed hash table 925 */ 926 HybridReducerClient(int tpn, TaggedSocketChannel tc, Integer tid, DistHashTable<Integer, GenPolynomial<C>> dl) { 927 this.threadsPerNode = tpn; 928 pairChannel = tc; 929 //threadId = 100 + tid; // keep distinct from other tags 930 theList = dl; 931 red = new ReductionPar<C>(); 932 } 933 934 935 /** 936 * Work loop. 937 * @see java.lang.Runnable#run() 938 */ 939 //JAVA6only: @Override 940 public void run() { 941 if (debug) { 942 logger.info("pairChannel = " + pairChannel + " reducer client running"); 943 } 944 Pair<C> pair = null; 945 GenPolynomial<C> pi; 946 GenPolynomial<C> pj; 947 GenPolynomial<C> S; 948 GenPolynomial<C> H = null; 949 //boolean set = false; 950 boolean goon = true; 951 boolean doEnd = false; 952 int reduction = 0; 953 //int sleeps = 0; 954 Integer pix; 955 Integer pjx; 956 957 while (goon) { 958 /* protocol: 959 * request pair, process pair, send result, receive acknowledgment 960 */ 961 // pair = (Pair) pairlist.removeNext(); 962 Object req = new GBTransportMessReq(); 963 logger.info("send request = " + req); 964 try { 965 pairChannel.send(pairTag, req); 966 } catch (IOException e) { 967 goon = false; 968 if ( logger.isDebugEnabled() ) { 969 e.printStackTrace(); 970 } 971 logger.info("receive pair, exception "); 972 break; 973 } 974 logger.debug("receive pair, goon = " + goon); 975 doEnd = false; 976 Object pp = null; 977 try { 978 pp = pairChannel.receive(pairTag); 979 } catch (InterruptedException e) { 980 goon = false; 981 e.printStackTrace(); 982 } catch (IOException e) { 983 goon = false; 984 if (logger.isDebugEnabled()) { 985 e.printStackTrace(); 986 } 987 break; 988 } catch (ClassNotFoundException e) { 989 goon = false; 990 e.printStackTrace(); 991 } 992 if (debug) { 993 logger.info("received pair = " + pp); 994 } 995 H = null; 996 if (pp == null) { // should not happen 997 continue; 998 } 999 if (pp instanceof GBTransportMessEnd) { 1000 goon = false; 1001 doEnd = true; 1002 continue; 1003 } 1004 if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) { 1005 pi = pj = null; 1006 if (pp instanceof GBTransportMessPair) { 1007 pair = ((GBTransportMessPair<C>) pp).pair; 1008 if (pair != null) { 1009 pi = pair.pi; 1010 pj = pair.pj; 1011 //logger.debug("pair: pix = " + pair.i 1012 // + ", pjx = " + pair.j); 1013 } 1014 } 1015 if (pp instanceof GBTransportMessPairIndex) { 1016 pix = ((GBTransportMessPairIndex) pp).i; 1017 pjx = ((GBTransportMessPairIndex) pp).j; 1018 pi = (GenPolynomial<C>) theList.getWait(pix); 1019 pj = (GenPolynomial<C>) theList.getWait(pjx); 1020 //logger.info("pix = " + pix + ", pjx = " +pjx); 1021 } 1022 1023 if (pi != null && pj != null) { 1024 S = red.SPolynomial(pi, pj); 1025 //System.out.println("S = " + S); 1026 if (S.isZERO()) { 1027 // pair.setZero(); does not work in dist 1028 } else { 1029 if (logger.isDebugEnabled()) { 1030 logger.debug("ht(S) = " + S.leadingExpVector()); 1031 } 1032 H = red.normalform(theList, S); 1033 reduction++; 1034 if (H.isZERO()) { 1035 // pair.setZero(); does not work in dist 1036 } else { 1037 H = H.monic(); 1038 if (logger.isInfoEnabled()) { 1039 logger.info("ht(H) = " + H.leadingExpVector()); 1040 } 1041 } 1042 } 1043 } 1044 } 1045 if (pp instanceof GBTransportMess) { 1046 logger.debug("null pair results in null H poly"); 1047 } 1048 1049 // send H or must send null, if not at end 1050 if (logger.isDebugEnabled()) { 1051 logger.debug("#distributed list = " + theList.size()); 1052 logger.debug("send H polynomial = " + H); 1053 } 1054 try { 1055 pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId)); 1056 doEnd = true; 1057 } catch (IOException e) { 1058 goon = false; 1059 e.printStackTrace(); 1060 } 1061 logger.info("done send poly message of " + pp); 1062 try { 1063 //pp = pairChannel.receive(threadId); 1064 pp = pairChannel.receive(ackTag); 1065 } catch (InterruptedException e) { 1066 goon = false; 1067 e.printStackTrace(); 1068 } catch (IOException e) { 1069 goon = false; 1070 if (logger.isDebugEnabled()) { 1071 e.printStackTrace(); 1072 } 1073 break; 1074 } catch (ClassNotFoundException e) { 1075 goon = false; 1076 e.printStackTrace(); 1077 } 1078 if ( ! (pp instanceof GBTransportMess) ) { 1079 logger.error("invalid acknowledgement " + pp); 1080 } 1081 logger.info("received acknowledgment " + pp); 1082 } 1083 logger.info("terminated, done " + reduction + " reductions"); 1084 if ( !doEnd ) { 1085 try { 1086 pairChannel.send(resultTag, new GBTransportMessEnd()); 1087 } catch (IOException e) { 1088 //e.printStackTrace(); 1089 } 1090 logger.info("terminated, send done"); 1091 } 1092 } 1093 }