001/* 002 * $Id: GroebnerBaseDistributedHybrid.java 4261 2012-10-21 11:34:02Z kredel $ 003 */ 004 005package edu.jas.gb; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Collections; 011import java.util.List; 012import java.util.ListIterator; 013import java.util.concurrent.atomic.AtomicInteger; 014 015import org.apache.log4j.Logger; 016 017import edu.jas.poly.ExpVector; 018import edu.jas.poly.GenPolynomial; 019import edu.jas.structure.RingElem; 020import edu.jas.util.ChannelFactory; 021import edu.jas.util.DistHashTable; 022import edu.jas.util.DistHashTableServer; 023import edu.jas.util.SocketChannel; 024import edu.jas.util.TaggedSocketChannel; 025import edu.jas.util.Terminator; 026import edu.jas.util.ThreadPool; 027 028 029/** 030 * Groebner Base distributed hybrid algorithm. Implements a distributed memory 031 * with multi-core CPUs parallel version of Groebner bases. Using pairlist 032 * class, distributed multi-threaded tasks do reduction, one communication 033 * channel per remote node. 034 * @param <C> coefficient type 035 * @author Heinz Kredel 036 * @deprecated use GroebnerBaseDistributedHybridEC 037 */ 038 039public class GroebnerBaseDistributedHybrid<C extends RingElem<C>> extends GroebnerBaseAbstract<C> { 040 041 042 public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybrid.class); 043 044 045 public final boolean debug = logger.isDebugEnabled(); 046 047 048 /** 049 * Number of threads to use. 050 */ 051 protected final int threads; 052 053 054 /** 055 * Default number of threads. 056 */ 057 protected static final int DEFAULT_THREADS = 2; 058 059 060 /** 061 * Number of threads per node to use. 062 */ 063 protected final int threadsPerNode; 064 065 066 /** 067 * Default number of threads per compute node. 068 */ 069 protected static final int DEFAULT_THREADS_PER_NODE = 1; 070 071 072 /** 073 * Pool of threads to use. 074 */ 075 //protected final ExecutorService pool; // not for single node tests 076 protected transient final ThreadPool pool; 077 078 079 /** 080 * Default server port. 081 */ 082 protected static final int DEFAULT_PORT = 4711; 083 084 085 /** 086 * Server port to use. 087 */ 088 protected final int port; 089 090 091 /** 092 * Message tag for pairs. 093 */ 094 public static final Integer pairTag = Integer.valueOf(1); 095 096 097 /** 098 * Message tag for results. 099 */ 100 public static final Integer resultTag = Integer.valueOf(2); 101 102 103 /** 104 * Message tag for acknowledgments. 105 */ 106 public static final Integer ackTag = Integer.valueOf(3); 107 108 109 /** 110 * Constructor. 111 */ 112 public GroebnerBaseDistributedHybrid() { 113 this(DEFAULT_THREADS, DEFAULT_PORT); 114 } 115 116 117 /** 118 * Constructor. 119 * @param threads number of threads to use. 120 */ 121 public GroebnerBaseDistributedHybrid(int threads) { 122 this(threads, new ThreadPool(threads), DEFAULT_PORT); 123 } 124 125 126 /** 127 * Constructor. 128 * @param threads number of threads to use. 129 * @param port server port to use. 130 */ 131 public GroebnerBaseDistributedHybrid(int threads, int port) { 132 this(threads, new ThreadPool(threads), port); 133 } 134 135 136 /** 137 * Constructor. 138 * @param threads number of threads to use. 139 * @param threadsPerNode threads per node to use. 140 * @param port server port to use. 141 */ 142 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, int port) { 143 this(threads, threadsPerNode, new ThreadPool(threads), port); 144 } 145 146 147 /** 148 * Constructor. 149 * @param threads number of threads to use. 150 * @param pool ThreadPool to use. 151 * @param port server port to use. 152 */ 153 public GroebnerBaseDistributedHybrid(int threads, ThreadPool pool, int port) { 154 this(threads, DEFAULT_THREADS_PER_NODE, pool, port); 155 } 156 157 158 /** 159 * Constructor. 160 * @param threads number of threads to use. 161 * @param threadsPerNode threads per node to use. 162 * @param pl pair selection strategy 163 * @param port server port to use. 164 */ 165 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, PairList<C> pl, int port) { 166 this(threads, threadsPerNode, new ThreadPool(threads), pl, port); 167 } 168 169 170 /** 171 * Constructor. 172 * @param threads number of threads to use. 173 * @param threadsPerNode threads per node to use. 174 * @param port server port to use. 175 */ 176 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, int port) { 177 this(threads, threadsPerNode, pool, new OrderedPairlist<C>(), port); 178 } 179 180 181 /** 182 * Constructor. 183 * @param threads number of threads to use. 184 * @param threadsPerNode threads per node to use. 185 * @param pool ThreadPool to use. 186 * @param pl pair selection strategy 187 * @param port server port to use. 188 */ 189 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl, 190 int port) { 191 super(new ReductionPar<C>(), pl); 192 if (threads < 1) { 193 threads = 1; 194 } 195 this.threads = threads; 196 this.threadsPerNode = threadsPerNode; 197 this.pool = pool; 198 this.port = port; 199 //logger.info("generated pool: " + pool); 200 } 201 202 203 /** 204 * Cleanup and terminate. 205 */ 206 @Override 207 public void terminate() { 208 if (pool == null) { 209 return; 210 } 211 pool.terminate(); 212 } 213 214 215 /** 216 * Distributed hybrid Groebner base. 217 * @param modv number of module variables. 218 * @param F polynomial list. 219 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 220 */ 221 public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) { 222 long t = System.currentTimeMillis(); 223 final int DL_PORT = port + 100; 224 ChannelFactory cf = new ChannelFactory(port); 225 cf.init(); 226 DistHashTableServer<Integer> dls = new DistHashTableServer<Integer>(DL_PORT); 227 dls.init(); 228 logger.debug("dist-list server running"); 229 230 GenPolynomial<C> p; 231 List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>(); 232 PairList<C> pairlist = null; 233 boolean oneInGB = false; 234 int l = F.size(); 235 int unused; 236 ListIterator<GenPolynomial<C>> it = F.listIterator(); 237 while (it.hasNext()) { 238 p = it.next(); 239 if (p.length() > 0) { 240 p = p.monic(); 241 if (p.isONE()) { 242 oneInGB = true; 243 G.clear(); 244 G.add(p); 245 //return G; must signal termination to others 246 } 247 if (!oneInGB) { 248 G.add(p); 249 } 250 if (pairlist == null) { 251 //pairlist = new OrderedPairlist<C>(modv, p.ring); 252 pairlist = strategy.create(modv, p.ring); 253 if (!p.ring.coFac.isField()) { 254 throw new IllegalArgumentException("coefficients not from a field"); 255 } 256 } 257 // theList not updated here 258 if (p.isONE()) { 259 unused = pairlist.putOne(); 260 } else { 261 unused = pairlist.put(p); 262 } 263 } else { 264 l--; 265 } 266 } 267 //if (l <= 1) { 268 //return G; must signal termination to others 269 //} 270 logger.info("pairlist " + pairlist); 271 272 logger.debug("looking for clients"); 273 //long t = System.currentTimeMillis(); 274 // now in DL, uses resend for late clients 275 //while ( dls.size() < threads ) { sleep(); } 276 277 DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>( 278 "localhost", DL_PORT); 279 theList.init(); 280 List<GenPolynomial<C>> al = pairlist.getList(); 281 for (int i = 0; i < al.size(); i++) { 282 // no wait required 283 GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i)); 284 if (nn != null) { 285 logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i)); 286 } 287 } 288 289 Terminator finner = new Terminator(threads * threadsPerNode); 290 HybridReducerServer<C> R; 291 logger.info("using pool = " + pool); 292 for (int i = 0; i < threads; i++) { 293 R = new HybridReducerServer<C>(threadsPerNode, finner, cf, theList, pairlist); 294 pool.addJob(R); 295 //logger.info("server submitted " + R); 296 } 297 logger.info("main loop waiting " + finner); 298 finner.waitDone(); 299 int ps = theList.size(); 300 logger.info("#distributed list = " + ps); 301 // make sure all polynomials arrived: not needed in master 302 // G = (ArrayList)theList.values(); 303 G = pairlist.getList(); 304 if (ps != G.size()) { 305 logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size()); 306 } 307 for (GenPolynomial<C> q : theList.getValueList()) { 308 if (q != null && !q.isZERO()) { 309 logger.debug("final q = " + q.leadingExpVector()); 310 } 311 } 312 logger.debug("distributed list end"); 313 long time = System.currentTimeMillis(); 314 List<GenPolynomial<C>> Gp; 315 Gp = minimalGB(G); // not jet distributed but threaded 316 time = System.currentTimeMillis() - time; 317 logger.info("parallel gbmi time = " + time); 318 G = Gp; 319 logger.debug("server cf.terminate()"); 320 cf.terminate(); 321 // no more required // 322 logger.info("server not pool.terminate() " + pool); 323 //pool.terminate(); 324 logger.info("server theList.terminate() " + theList.size()); 325 theList.terminate(); 326 logger.info("server dls.terminate() " + dls); 327 dls.terminate(); 328 t = System.currentTimeMillis() - t; 329 logger.info("server GB end, time = " + t + ", " + pairlist.toString()); 330 return G; 331 } 332 333 334 /** 335 * GB distributed client. 336 * @param host the server runs on. 337 * @throws IOException 338 */ 339 public void clientPart(String host) throws IOException { 340 341 ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost 342 cf.init(); 343 SocketChannel channel = cf.getChannel(host, port); 344 TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel); 345 pairChannel.init(); 346 347 if (debug) { 348 logger.info("clientPart pairChannel = " + pairChannel); 349 } 350 351 final int DL_PORT = port + 100; 352 DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(host, 353 DL_PORT); 354 theList.init(); 355 356 //HybridReducerClient<C> R = new HybridReducerClient<C>(threadsPerNode, pairChannel, theList); 357 //R.run(); 358 359 ThreadPool pool = new ThreadPool(threadsPerNode); 360 logger.info("client using pool = " + pool); 361 for (int i = 0; i < threadsPerNode; i++) { 362 HybridReducerClient<C> Rr = new HybridReducerClient<C>(threadsPerNode, pairChannel, i, theList); 363 pool.addJob(Rr); 364 } 365 if (debug) { 366 logger.info("clients submitted"); 367 } 368 pool.terminate(); 369 logger.info("client pool.terminate()"); 370 371 pairChannel.close(); 372 logger.info("client pairChannel.close()"); 373 374 theList.terminate(); 375 cf.terminate(); 376 logger.info("client cf.terminate()"); 377 378 channel.close(); 379 logger.info("client channel.close()"); 380 return; 381 } 382 383 384 /** 385 * Minimal ordered groebner basis. 386 * @param Fp a Groebner base. 387 * @return a reduced Groebner base of Fp. 388 */ 389 @Override 390 public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) { 391 GenPolynomial<C> a; 392 ArrayList<GenPolynomial<C>> G; 393 G = new ArrayList<GenPolynomial<C>>(Fp.size()); 394 ListIterator<GenPolynomial<C>> it = Fp.listIterator(); 395 while (it.hasNext()) { 396 a = it.next(); 397 if (a.length() != 0) { // always true 398 // already monic a = a.monic(); 399 G.add(a); 400 } 401 } 402 if (G.size() <= 1) { 403 return G; 404 } 405 406 ExpVector e; 407 ExpVector f; 408 GenPolynomial<C> p; 409 ArrayList<GenPolynomial<C>> F; 410 F = new ArrayList<GenPolynomial<C>>(G.size()); 411 boolean mt; 412 413 while (G.size() > 0) { 414 a = G.remove(0); 415 e = a.leadingExpVector(); 416 417 it = G.listIterator(); 418 mt = false; 419 while (it.hasNext() && !mt) { 420 p = it.next(); 421 f = p.leadingExpVector(); 422 mt = e.multipleOf(f); 423 } 424 it = F.listIterator(); 425 while (it.hasNext() && !mt) { 426 p = it.next(); 427 f = p.leadingExpVector(); 428 mt = e.multipleOf(f); 429 } 430 if (!mt) { 431 F.add(a); 432 } else { 433 // System.out.println("dropped " + a.length()); 434 } 435 } 436 G = F; 437 if (G.size() <= 1) { 438 return G; 439 } 440 Collections.reverse(G); // important for lex GB 441 442 MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()]; 443 int i = 0; 444 F = new ArrayList<GenPolynomial<C>>(G.size()); 445 while (G.size() > 0) { 446 a = G.remove(0); 447 // System.out.println("doing " + a.length()); 448 List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size()); 449 R.addAll(G); 450 R.addAll(F); 451 mirs[i] = new MiReducerServer<C>(R, a); 452 pool.addJob(mirs[i]); 453 i++; 454 F.add(a); 455 } 456 G = F; 457 F = new ArrayList<GenPolynomial<C>>(G.size()); 458 for (i = 0; i < mirs.length; i++) { 459 a = mirs[i].getNF(); 460 F.add(a); 461 } 462 return F; 463 } 464 465} 466 467 468/** 469 * Distributed server reducing worker proxy threads. 470 * @param <C> coefficient type 471 */ 472 473class HybridReducerServer<C extends RingElem<C>> implements Runnable { 474 475 476 public static final Logger logger = Logger.getLogger(HybridReducerServer.class); 477 478 479 public final boolean debug = logger.isDebugEnabled(); 480 481 482 private final Terminator finner; 483 484 485 private final ChannelFactory cf; 486 487 488 private TaggedSocketChannel pairChannel; 489 490 491 private final DistHashTable<Integer, GenPolynomial<C>> theList; 492 493 494 private final PairList<C> pairlist; 495 496 497 private final int threadsPerNode; 498 499 500 /** 501 * Message tag for pairs. 502 */ 503 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag; 504 505 506 /** 507 * Message tag for results. 508 */ 509 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag; 510 511 512 /** 513 * Message tag for acknowledgments. 514 */ 515 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag; 516 517 518 /** 519 * Constructor. 520 * @param tpn number of threads per node 521 * @param fin terminator 522 * @param cf channel factory 523 * @param dl distributed hash table 524 * @param L ordered pair list 525 */ 526 HybridReducerServer(int tpn, Terminator fin, ChannelFactory cf, 527 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 528 threadsPerNode = tpn; 529 finner = fin; 530 this.cf = cf; 531 theList = dl; 532 pairlist = L; 533 //logger.info("reducer server created " + this); 534 } 535 536 537 /** 538 * Work loop. 539 * @see java.lang.Runnable#run() 540 */ 541 //JAVA6only: @Override 542 public void run() { 543 logger.info("reducer server running with " + cf); 544 SocketChannel channel = null; 545 try { 546 channel = cf.getChannel(); 547 pairChannel = new TaggedSocketChannel(channel); 548 pairChannel.init(); 549 } catch (InterruptedException e) { 550 logger.debug("get pair channel interrupted"); 551 e.printStackTrace(); 552 return; 553 } 554 if (debug) { 555 logger.info("pairChannel = " + pairChannel); 556 } 557 // record idle remote workers (minus one?) 558 //finner.beIdle(threadsPerNode-1); 559 finner.initIdle(threadsPerNode); 560 AtomicInteger active = new AtomicInteger(0); 561 562 // start receiver 563 HybridReducerReceiver<C> receiver = new HybridReducerReceiver<C>(threadsPerNode, finner, active, 564 pairChannel, theList, pairlist); 565 receiver.start(); 566 567 Pair<C> pair; 568 //boolean set = false; 569 boolean goon = true; 570 //int polIndex = -1; 571 int red = 0; 572 int sleeps = 0; 573 574 // while more requests 575 while (goon) { 576 // receive request if thread is reported incactive 577 logger.debug("receive request"); 578 Object req = null; 579 try { 580 req = pairChannel.receive(pairTag); 581 } catch (InterruptedException e) { 582 goon = false; 583 e.printStackTrace(); 584 } catch (IOException e) { 585 goon = false; 586 e.printStackTrace(); 587 } catch (ClassNotFoundException e) { 588 goon = false; 589 e.printStackTrace(); 590 } 591 logger.info("received request, req = " + req); 592 if (req == null) { 593 goon = false; 594 break; 595 } 596 if (!(req instanceof GBTransportMessReq)) { 597 goon = false; 598 break; 599 } 600 601 // find pair and manage termination status 602 logger.info("find pair"); 603 while (!pairlist.hasNext()) { // wait 604 if (!finner.hasJobs() && !pairlist.hasNext()) { 605 goon = false; 606 break; 607 } 608 try { 609 sleeps++; 610 //if (sleeps % 10 == 0) { 611 logger.info("waiting for reducers, remaining = " + finner.getJobs()); 612 //} 613 Thread.sleep(100); 614 } catch (InterruptedException e) { 615 goon = false; 616 break; 617 } 618 } 619 if (!pairlist.hasNext() && !finner.hasJobs()) { 620 logger.info("termination detection: no pairs and no jobs left"); 621 goon = false; 622 break; //continue; //break? 623 } 624 finner.notIdle(); // before pairlist get!! 625 pair = pairlist.removeNext(); 626 // send pair to client, even if null 627 if (debug) { 628 logger.info("active count = " + active.get()); 629 logger.info("send pair = " + pair); 630 } 631 GBTransportMess msg = null; 632 if (pair != null) { 633 msg = new GBTransportMessPairIndex(pair); 634 } else { 635 msg = new GBTransportMess(); //not End(); at this time 636 // goon ?= false; 637 } 638 try { 639 red++; 640 pairChannel.send(pairTag, msg); 641 int a = active.getAndIncrement(); 642 } catch (IOException e) { 643 e.printStackTrace(); 644 goon = false; 645 break; 646 } 647 //logger.debug("#distributed list = " + theList.size()); 648 } 649 logger.info("terminated, send " + red + " reduction pairs"); 650 651 /* 652 * send end mark to clients 653 */ 654 logger.debug("send end"); 655 try { 656 for (int i = 0; i < threadsPerNode; i++) { // -1 657 //do not wait: Object rq = pairChannel.receive(pairTag); 658 pairChannel.send(pairTag, new GBTransportMessEnd()); 659 } 660 // send also end to receiver, no more, did not work 661 //pairChannel.send(resultTag, new GBTransportMessEnd()); 662 } catch (IOException e) { 663 if (logger.isDebugEnabled()) { 664 e.printStackTrace(); 665 } 666 } 667 receiver.terminate(); 668 669 int d = active.get(); 670 logger.info("remaining active tasks = " + d); 671 //logger.info("terminated, send " + red + " reduction pairs"); 672 pairChannel.close(); 673 logger.info("redServ pairChannel.close()"); 674 finner.release(); 675 676 channel.close(); 677 logger.info("redServ channel.close()"); 678 } 679} 680 681 682/** 683 * Distributed server receiving worker thread. 684 * @param <C> coefficient type 685 */ 686 687class HybridReducerReceiver<C extends RingElem<C>> extends Thread { 688 689 690 public static final Logger logger = Logger.getLogger(HybridReducerReceiver.class); 691 692 693 public final boolean debug = logger.isDebugEnabled(); 694 695 696 private final DistHashTable<Integer, GenPolynomial<C>> theList; 697 698 699 private final PairList<C> pairlist; 700 701 702 private final TaggedSocketChannel pairChannel; 703 704 705 private final Terminator finner; 706 707 708 private final int threadsPerNode; 709 710 711 private final AtomicInteger active; 712 713 714 private volatile boolean goon; 715 716 717 /** 718 * Message tag for pairs. 719 */ 720 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag; 721 722 723 /** 724 * Message tag for results. 725 */ 726 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag; 727 728 729 /** 730 * Message tag for acknowledgments. 731 */ 732 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag; 733 734 735 /** 736 * Constructor. 737 * @param tpn number of threads per node 738 * @param fin terminator 739 * @param a active remote tasks count 740 * @param pc tagged socket channel 741 * @param dl distributed hash table 742 * @param L ordered pair list 743 */ 744 HybridReducerReceiver(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc, 745 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 746 active = a; 747 threadsPerNode = tpn; 748 finner = fin; 749 pairChannel = pc; 750 theList = dl; 751 pairlist = L; 752 goon = true; 753 //logger.info("reducer server created " + this); 754 } 755 756 757 /** 758 * Work loop. 759 * @see java.lang.Thread#run() 760 */ 761 @Override 762 public void run() { 763 //Pair<C> pair = null; 764 GenPolynomial<C> H = null; 765 int red = 0; 766 int polIndex = -1; 767 //Integer senderId; // obsolete 768 769 // while more requests 770 while (goon) { 771 // receive request 772 logger.debug("receive result"); 773 //senderId = null; 774 Object rh = null; 775 try { 776 rh = pairChannel.receive(resultTag); 777 int i = active.getAndDecrement(); 778 } catch (InterruptedException e) { 779 goon = false; 780 //e.printStackTrace(); 781 //?? finner.initIdle(1); 782 break; 783 } catch (IOException e) { 784 e.printStackTrace(); 785 goon = false; 786 finner.initIdle(1); 787 break; 788 } catch (ClassNotFoundException e) { 789 e.printStackTrace(); 790 goon = false; 791 finner.initIdle(1); 792 break; 793 } 794 logger.info("received H polynomial"); 795 if (rh == null) { 796 if (this.isInterrupted()) { 797 goon = false; 798 finner.initIdle(1); 799 break; 800 } 801 //finner.initIdle(1); 802 } else if (rh instanceof GBTransportMessEnd) { // should only happen from server 803 logger.info("received GBTransportMessEnd"); 804 goon = false; 805 //?? finner.initIdle(1); 806 break; 807 } else if (rh instanceof GBTransportMessPoly) { 808 // update pair list 809 red++; 810 GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh; 811 H = mpi.pol; 812 //senderId = mpi.threadId; 813 if (H != null) { 814 if (debug) { 815 logger.info("H = " + H.leadingExpVector()); 816 } 817 if (!H.isZERO()) { 818 if (H.isONE()) { 819 // finner.allIdle(); 820 polIndex = pairlist.putOne(); 821 GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H); 822 if (nn != null) { 823 logger.info("double polynomials nn = " + nn + ", H = " + H); 824 } 825 //goon = false; must wait for other clients 826 //finner.initIdle(1); 827 //break; 828 } else { 829 polIndex = pairlist.put(H); 830 // use putWait ? but still not all distributed 831 GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H); 832 if (nn != null) { 833 logger.info("double polynomials nn = " + nn + ", H = " + H); 834 } 835 } 836 } 837 } 838 } 839 // only after recording in pairlist ! 840 finner.initIdle(1); 841 try { 842 pairChannel.send(ackTag, new GBTransportMess()); 843 logger.debug("send acknowledgement"); 844 } catch (IOException e) { 845 e.printStackTrace(); 846 goon = false; 847 break; 848 } 849 } // end while 850 goon = false; 851 logger.info("terminated, received " + red + " reductions"); 852 } 853 854 855 /** 856 * Terminate. 857 */ 858 public void terminate() { 859 goon = false; 860 //this.interrupt(); 861 try { 862 this.join(); 863 } catch (InterruptedException e) { 864 // unfug Thread.currentThread().interrupt(); 865 } 866 logger.debug("HybridReducerReceiver terminated"); 867 } 868 869} 870 871 872/** 873 * Distributed clients reducing worker threads. 874 */ 875 876class HybridReducerClient<C extends RingElem<C>> implements Runnable { 877 878 879 private static final Logger logger = Logger.getLogger(HybridReducerClient.class); 880 881 882 public final boolean debug = logger.isDebugEnabled(); 883 884 885 private final TaggedSocketChannel pairChannel; 886 887 888 private final DistHashTable<Integer, GenPolynomial<C>> theList; 889 890 891 private final ReductionPar<C> red; 892 893 894 private final int threadsPerNode; 895 896 897 /* 898 * Identification number for this thread. 899 */ 900 //public final Integer threadId; // obsolete 901 902 903 /** 904 * Message tag for pairs. 905 */ 906 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag; 907 908 909 /** 910 * Message tag for results. 911 */ 912 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag; 913 914 915 /** 916 * Message tag for acknowledgments. 917 */ 918 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag; 919 920 921 /** 922 * Constructor. 923 * @param tpn number of threads per node 924 * @param tc tagged socket channel 925 * @param tid thread identification 926 * @param dl distributed hash table 927 */ 928 HybridReducerClient(int tpn, TaggedSocketChannel tc, Integer tid, 929 DistHashTable<Integer, GenPolynomial<C>> dl) { 930 this.threadsPerNode = tpn; 931 pairChannel = tc; 932 //threadId = 100 + tid; // keep distinct from other tags 933 theList = dl; 934 red = new ReductionPar<C>(); 935 } 936 937 938 /** 939 * Work loop. 940 * @see java.lang.Runnable#run() 941 */ 942 //JAVA6only: @Override 943 public void run() { 944 if (debug) { 945 logger.info("pairChannel = " + pairChannel + " reducer client running"); 946 } 947 Pair<C> pair = null; 948 GenPolynomial<C> pi; 949 GenPolynomial<C> pj; 950 GenPolynomial<C> S; 951 GenPolynomial<C> H = null; 952 //boolean set = false; 953 boolean goon = true; 954 boolean doEnd = true; 955 int reduction = 0; 956 //int sleeps = 0; 957 Integer pix; 958 Integer pjx; 959 960 while (goon) { 961 /* protocol: 962 * request pair, process pair, send result, receive acknowledgment 963 */ 964 // pair = (Pair) pairlist.removeNext(); 965 Object req = new GBTransportMessReq(); 966 logger.info("send request = " + req); 967 try { 968 pairChannel.send(pairTag, req); 969 } catch (IOException e) { 970 goon = false; 971 if (logger.isDebugEnabled()) { 972 e.printStackTrace(); 973 } 974 logger.info("receive pair, exception "); 975 break; 976 } 977 logger.debug("receive pair, goon = " + goon); 978 doEnd = true; 979 Object pp = null; 980 try { 981 pp = pairChannel.receive(pairTag); 982 } catch (InterruptedException e) { 983 goon = false; 984 e.printStackTrace(); 985 } catch (IOException e) { 986 goon = false; 987 if (logger.isDebugEnabled()) { 988 e.printStackTrace(); 989 } 990 break; 991 } catch (ClassNotFoundException e) { 992 goon = false; 993 e.printStackTrace(); 994 } 995 if (debug) { 996 logger.info("received pair = " + pp); 997 } 998 H = null; 999 if (pp == null) { // should not happen 1000 continue; 1001 } 1002 if (pp instanceof GBTransportMessEnd) { 1003 goon = false; 1004 //doEnd = false; // bug 1005 continue; 1006 } 1007 if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) { 1008 pi = pj = null; 1009 if (pp instanceof GBTransportMessPair) { 1010 pair = ((GBTransportMessPair<C>) pp).pair; 1011 if (pair != null) { 1012 pi = pair.pi; 1013 pj = pair.pj; 1014 //logger.debug("pair: pix = " + pair.i 1015 // + ", pjx = " + pair.j); 1016 } 1017 } 1018 if (pp instanceof GBTransportMessPairIndex) { 1019 pix = ((GBTransportMessPairIndex) pp).i; 1020 pjx = ((GBTransportMessPairIndex) pp).j; 1021 pi = theList.getWait(pix); 1022 pj = theList.getWait(pjx); 1023 //logger.info("pix = " + pix + ", pjx = " +pjx); 1024 } 1025 1026 if (pi != null && pj != null) { 1027 S = red.SPolynomial(pi, pj); 1028 //System.out.println("S = " + S); 1029 if (S.isZERO()) { 1030 // pair.setZero(); does not work in dist 1031 } else { 1032 if (logger.isDebugEnabled()) { 1033 logger.debug("ht(S) = " + S.leadingExpVector()); 1034 } 1035 H = red.normalform(theList, S); 1036 reduction++; 1037 if (H.isZERO()) { 1038 // pair.setZero(); does not work in dist 1039 } else { 1040 H = H.monic(); 1041 if (logger.isInfoEnabled()) { 1042 logger.info("ht(H) = " + H.leadingExpVector()); 1043 } 1044 } 1045 } 1046 } 1047 } 1048 if (pp instanceof GBTransportMess) { 1049 logger.debug("null pair results in null H poly"); 1050 } 1051 1052 // send H or must send null, if not at end 1053 if (logger.isDebugEnabled()) { 1054 logger.debug("#distributed list = " + theList.size()); 1055 logger.debug("send H polynomial = " + H); 1056 } 1057 try { 1058 pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId)); 1059 doEnd = false; 1060 } catch (IOException e) { 1061 goon = false; 1062 e.printStackTrace(); 1063 } 1064 logger.info("done send poly message of " + pp); 1065 try { 1066 //pp = pairChannel.receive(threadId); 1067 pp = pairChannel.receive(ackTag); 1068 } catch (InterruptedException e) { 1069 goon = false; 1070 e.printStackTrace(); 1071 } catch (IOException e) { 1072 goon = false; 1073 if (logger.isDebugEnabled()) { 1074 e.printStackTrace(); 1075 } 1076 break; 1077 } catch (ClassNotFoundException e) { 1078 goon = false; 1079 e.printStackTrace(); 1080 } 1081 if (!(pp instanceof GBTransportMess)) { 1082 logger.error("invalid acknowledgement " + pp); 1083 } 1084 logger.info("received acknowledgment " + pp); 1085 } 1086 logger.info("terminated, done " + reduction + " reductions"); 1087 if (doEnd) { 1088 try { 1089 pairChannel.send(resultTag, new GBTransportMessEnd()); 1090 } catch (IOException e) { 1091 //e.printStackTrace(); 1092 } 1093 logger.info("terminated, send done"); 1094 } 1095 } 1096}