001/* 002 * $Id: GroebnerBaseDistributedHybrid.java 5313 2015-10-26 22:46:38Z kredel $ 003 */ 004 005package edu.jas.gb; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Collections; 011import java.util.List; 012import java.util.ListIterator; 013import java.util.concurrent.atomic.AtomicInteger; 014 015import org.apache.log4j.Logger; 016 017import edu.jas.poly.ExpVector; 018import edu.jas.poly.GenPolynomial; 019import edu.jas.structure.RingElem; 020import edu.jas.util.ChannelFactory; 021import edu.jas.util.DistHashTable; 022import edu.jas.util.DistHashTableServer; 023import edu.jas.util.SocketChannel; 024import edu.jas.util.TaggedSocketChannel; 025import edu.jas.util.Terminator; 026import edu.jas.util.ThreadPool; 027 028 029/** 030 * Groebner Base distributed hybrid algorithm. Implements a distributed memory 031 * with multi-core CPUs parallel version of Groebner bases. Using pairlist 032 * class, distributed multi-threaded tasks do reduction, one communication 033 * channel per remote node. 034 * @param <C> coefficient type 035 * @author Heinz Kredel 036 * @deprecated use GroebnerBaseDistributedHybridEC 037 */ 038 039@Deprecated 040public class GroebnerBaseDistributedHybrid<C extends RingElem<C>> extends GroebnerBaseAbstract<C> { 041 042 043 public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybrid.class); 044 045 046 public final boolean debug = logger.isDebugEnabled(); 047 048 049 /** 050 * Number of threads to use. 051 */ 052 protected final int threads; 053 054 055 /** 056 * Default number of threads. 057 */ 058 protected static final int DEFAULT_THREADS = 2; 059 060 061 /** 062 * Number of threads per node to use. 063 */ 064 protected final int threadsPerNode; 065 066 067 /** 068 * Default number of threads per compute node. 069 */ 070 protected static final int DEFAULT_THREADS_PER_NODE = 1; 071 072 073 /** 074 * Pool of threads to use. 
075 */ 076 //protected final ExecutorService pool; // not for single node tests 077 protected transient final ThreadPool pool; 078 079 080 /** 081 * Default server port. 082 */ 083 protected static final int DEFAULT_PORT = 4711; 084 085 086 /** 087 * Server port to use. 088 */ 089 protected final int port; 090 091 092 /** 093 * Message tag for pairs. 094 */ 095 public static final Integer pairTag = Integer.valueOf(1); 096 097 098 /** 099 * Message tag for results. 100 */ 101 public static final Integer resultTag = Integer.valueOf(2); 102 103 104 /** 105 * Message tag for acknowledgments. 106 */ 107 public static final Integer ackTag = Integer.valueOf(3); 108 109 110 /** 111 * Constructor. 112 */ 113 public GroebnerBaseDistributedHybrid() { 114 this(DEFAULT_THREADS, DEFAULT_PORT); 115 } 116 117 118 /** 119 * Constructor. 120 * @param threads number of threads to use. 121 */ 122 public GroebnerBaseDistributedHybrid(int threads) { 123 this(threads, new ThreadPool(threads), DEFAULT_PORT); 124 } 125 126 127 /** 128 * Constructor. 129 * @param threads number of threads to use. 130 * @param port server port to use. 131 */ 132 public GroebnerBaseDistributedHybrid(int threads, int port) { 133 this(threads, new ThreadPool(threads), port); 134 } 135 136 137 /** 138 * Constructor. 139 * @param threads number of threads to use. 140 * @param threadsPerNode threads per node to use. 141 * @param port server port to use. 142 */ 143 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, int port) { 144 this(threads, threadsPerNode, new ThreadPool(threads), port); 145 } 146 147 148 /** 149 * Constructor. 150 * @param threads number of threads to use. 151 * @param pool ThreadPool to use. 152 * @param port server port to use. 153 */ 154 public GroebnerBaseDistributedHybrid(int threads, ThreadPool pool, int port) { 155 this(threads, DEFAULT_THREADS_PER_NODE, pool, port); 156 } 157 158 159 /** 160 * Constructor. 161 * @param threads number of threads to use. 
162 * @param threadsPerNode threads per node to use. 163 * @param pl pair selection strategy 164 * @param port server port to use. 165 */ 166 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, PairList<C> pl, int port) { 167 this(threads, threadsPerNode, new ThreadPool(threads), pl, port); 168 } 169 170 171 /** 172 * Constructor. 173 * @param threads number of threads to use. 174 * @param threadsPerNode threads per node to use. 175 * @param port server port to use. 176 */ 177 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, int port) { 178 this(threads, threadsPerNode, pool, new OrderedPairlist<C>(), port); 179 } 180 181 182 /** 183 * Constructor. 184 * @param threads number of threads to use. 185 * @param threadsPerNode threads per node to use. 186 * @param pool ThreadPool to use. 187 * @param pl pair selection strategy 188 * @param port server port to use. 189 */ 190 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl, 191 int port) { 192 super(new ReductionPar<C>(), pl); 193 if (threads < 1) { 194 threads = 1; 195 } 196 this.threads = threads; 197 this.threadsPerNode = threadsPerNode; 198 this.pool = pool; 199 this.port = port; 200 //logger.info("generated pool: " + pool); 201 } 202 203 204 /** 205 * Cleanup and terminate. 206 */ 207 @Override 208 public void terminate() { 209 if (pool == null) { 210 return; 211 } 212 pool.terminate(); 213 } 214 215 216 /** 217 * Distributed hybrid Groebner base. 218 * @param modv number of module variables. 219 * @param F polynomial list. 220 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 
221 */ 222 public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) { 223 long t = System.currentTimeMillis(); 224 final int DL_PORT = port + 100; 225 ChannelFactory cf = new ChannelFactory(port); 226 cf.init(); 227 DistHashTableServer<Integer> dls = new DistHashTableServer<Integer>(DL_PORT); 228 dls.init(); 229 logger.debug("dist-list server running"); 230 231 GenPolynomial<C> p; 232 List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>(); 233 PairList<C> pairlist = null; 234 boolean oneInGB = false; 235 //nt l = F.size(); 236 @SuppressWarnings("unused") 237 int unused; 238 ListIterator<GenPolynomial<C>> it = F.listIterator(); 239 while (it.hasNext()) { 240 p = it.next(); 241 if (p.length() > 0) { 242 p = p.monic(); 243 if (p.isONE()) { 244 oneInGB = true; 245 G.clear(); 246 G.add(p); 247 //return G; must signal termination to others 248 } 249 if (!oneInGB) { 250 G.add(p); 251 } 252 if (pairlist == null) { 253 //pairlist = new OrderedPairlist<C>(modv, p.ring); 254 pairlist = strategy.create(modv, p.ring); 255 if (!p.ring.coFac.isField()) { 256 throw new IllegalArgumentException("coefficients not from a field"); 257 } 258 } 259 // theList not updated here 260 if (p.isONE()) { 261 unused = pairlist.putOne(); 262 } else { 263 unused = pairlist.put(p); 264 } 265 } else { 266 //l--; 267 } 268 } 269 //if (l <= 1) { 270 // return G; must signal termination to others 271 //} 272 logger.info("pairlist " + pairlist); 273 274 logger.debug("looking for clients"); 275 //long t = System.currentTimeMillis(); 276 // now in DL, uses resend for late clients 277 //while ( dls.size() < threads ) { sleep(); } 278 279 DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>( 280 "localhost", DL_PORT); 281 theList.init(); 282 List<GenPolynomial<C>> al = pairlist.getList(); 283 if (G.size() != al.size()) { 284 logger.info("G != al: " + G + " != " + al); 285 } 286 for (int i = 0; i < al.size(); i++) { 287 // no wait required 288 
GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i)); 289 if (nn != null) { 290 logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i)); 291 } 292 } 293 294 Terminator finner = new Terminator(threads * threadsPerNode); 295 HybridReducerServer<C> R; 296 logger.info("using pool = " + pool); 297 for (int i = 0; i < threads; i++) { 298 R = new HybridReducerServer<C>(threadsPerNode, finner, cf, theList, pairlist); 299 pool.addJob(R); 300 //logger.info("server submitted " + R); 301 } 302 logger.info("main loop waiting " + finner); 303 finner.waitDone(); 304 int ps = theList.size(); 305 logger.info("#distributed list = " + ps); 306 // make sure all polynomials arrived: not needed in master 307 // G = (ArrayList)theList.values(); 308 G = pairlist.getList(); 309 if (ps != G.size()) { 310 logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size()); 311 } 312 for (GenPolynomial<C> q : theList.getValueList()) { 313 if (q != null && !q.isZERO()) { 314 logger.debug("final q = " + q.leadingExpVector()); 315 } 316 } 317 logger.debug("distributed list end"); 318 long time = System.currentTimeMillis(); 319 List<GenPolynomial<C>> Gp; 320 Gp = minimalGB(G); // not jet distributed but threaded 321 time = System.currentTimeMillis() - time; 322 logger.info("parallel gbmi time = " + time); 323 G = Gp; 324 logger.debug("server cf.terminate()"); 325 cf.terminate(); 326 // no more required // 327 logger.info("server not pool.terminate() " + pool); 328 //pool.terminate(); 329 logger.info("server theList.terminate() " + theList.size()); 330 theList.terminate(); 331 logger.info("server dls.terminate() " + dls); 332 dls.terminate(); 333 t = System.currentTimeMillis() - t; 334 logger.info("server GB end, time = " + t + ", " + pairlist.toString()); 335 return G; 336 } 337 338 339 /** 340 * GB distributed client. 341 * @param host the server runs on. 
342 * @throws IOException 343 */ 344 public void clientPart(String host) throws IOException { 345 346 ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost 347 cf.init(); 348 SocketChannel channel = cf.getChannel(host, port); 349 TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel); 350 pairChannel.init(); 351 352 if (debug) { 353 logger.info("clientPart pairChannel = " + pairChannel); 354 } 355 356 final int DL_PORT = port + 100; 357 DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(host, 358 DL_PORT); 359 theList.init(); 360 361 //HybridReducerClient<C> R = new HybridReducerClient<C>(threadsPerNode, pairChannel, theList); 362 //R.run(); 363 364 ThreadPool pool = new ThreadPool(threadsPerNode); 365 logger.info("client using pool = " + pool); 366 for (int i = 0; i < threadsPerNode; i++) { 367 HybridReducerClient<C> Rr = new HybridReducerClient<C>(threadsPerNode, pairChannel, i, theList); 368 pool.addJob(Rr); 369 } 370 if (debug) { 371 logger.info("clients submitted"); 372 } 373 pool.terminate(); 374 logger.info("client pool.terminate()"); 375 376 pairChannel.close(); 377 logger.info("client pairChannel.close()"); 378 379 theList.terminate(); 380 cf.terminate(); 381 logger.info("client cf.terminate()"); 382 383 channel.close(); 384 logger.info("client channel.close()"); 385 return; 386 } 387 388 389 /** 390 * Minimal ordered groebner basis. 391 * @param Fp a Groebner base. 392 * @return a reduced Groebner base of Fp. 
393 */ 394 @SuppressWarnings("cast") 395 @Override 396 public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) { 397 GenPolynomial<C> a; 398 ArrayList<GenPolynomial<C>> G; 399 G = new ArrayList<GenPolynomial<C>>(Fp.size()); 400 ListIterator<GenPolynomial<C>> it = Fp.listIterator(); 401 while (it.hasNext()) { 402 a = it.next(); 403 if (a.length() != 0) { // always true 404 // already monic a = a.monic(); 405 G.add(a); 406 } 407 } 408 if (G.size() <= 1) { 409 return G; 410 } 411 412 ExpVector e; 413 ExpVector f; 414 GenPolynomial<C> p; 415 ArrayList<GenPolynomial<C>> F; 416 F = new ArrayList<GenPolynomial<C>>(G.size()); 417 boolean mt; 418 419 while (G.size() > 0) { 420 a = G.remove(0); 421 e = a.leadingExpVector(); 422 423 it = G.listIterator(); 424 mt = false; 425 while (it.hasNext() && !mt) { 426 p = it.next(); 427 f = p.leadingExpVector(); 428 mt = e.multipleOf(f); 429 } 430 it = F.listIterator(); 431 while (it.hasNext() && !mt) { 432 p = it.next(); 433 f = p.leadingExpVector(); 434 mt = e.multipleOf(f); 435 } 436 if (!mt) { 437 F.add(a); 438 } else { 439 // System.out.println("dropped " + a.length()); 440 } 441 } 442 G = F; 443 if (G.size() <= 1) { 444 return G; 445 } 446 Collections.reverse(G); // important for lex GB 447 448 MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()]; 449 int i = 0; 450 F = new ArrayList<GenPolynomial<C>>(G.size()); 451 while (G.size() > 0) { 452 a = G.remove(0); 453 // System.out.println("doing " + a.length()); 454 List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size()); 455 R.addAll(G); 456 R.addAll(F); 457 mirs[i] = new MiReducerServer<C>(R, a); 458 pool.addJob(mirs[i]); 459 i++; 460 F.add(a); 461 } 462 G = F; 463 F = new ArrayList<GenPolynomial<C>>(G.size()); 464 for (i = 0; i < mirs.length; i++) { 465 a = mirs[i].getNF(); 466 F.add(a); 467 } 468 return F; 469 } 470 471} 472 473 474/** 475 * Distributed server reducing worker proxy threads. 
476 * @param <C> coefficient type 477 */ 478 479class HybridReducerServer<C extends RingElem<C>> implements Runnable { 480 481 482 public static final Logger logger = Logger.getLogger(HybridReducerServer.class); 483 484 485 public final boolean debug = logger.isDebugEnabled(); 486 487 488 private final Terminator finner; 489 490 491 private final ChannelFactory cf; 492 493 494 private TaggedSocketChannel pairChannel; 495 496 497 private final DistHashTable<Integer, GenPolynomial<C>> theList; 498 499 500 private final PairList<C> pairlist; 501 502 503 private final int threadsPerNode; 504 505 506 /** 507 * Message tag for pairs. 508 */ 509 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag; 510 511 512 /** 513 * Message tag for results. 514 */ 515 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag; 516 517 518 /** 519 * Message tag for acknowledgments. 520 */ 521 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag; 522 523 524 /** 525 * Constructor. 526 * @param tpn number of threads per node 527 * @param fin terminator 528 * @param cf channel factory 529 * @param dl distributed hash table 530 * @param L ordered pair list 531 */ 532 HybridReducerServer(int tpn, Terminator fin, ChannelFactory cf, 533 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 534 threadsPerNode = tpn; 535 finner = fin; 536 this.cf = cf; 537 theList = dl; 538 pairlist = L; 539 //logger.info("reducer server created " + this); 540 } 541 542 543 /** 544 * Work loop. 
545 * @see java.lang.Runnable#run() 546 */ 547 @Override 548 public void run() { 549 logger.info("reducer server running with " + cf); 550 SocketChannel channel = null; 551 try { 552 channel = cf.getChannel(); 553 pairChannel = new TaggedSocketChannel(channel); 554 pairChannel.init(); 555 } catch (InterruptedException e) { 556 logger.debug("get pair channel interrupted"); 557 e.printStackTrace(); 558 return; 559 } 560 if (debug) { 561 logger.info("pairChannel = " + pairChannel); 562 } 563 // record idle remote workers (minus one?) 564 //finner.beIdle(threadsPerNode-1); 565 finner.initIdle(threadsPerNode); 566 AtomicInteger active = new AtomicInteger(0); 567 568 // start receiver 569 HybridReducerReceiver<C> receiver = new HybridReducerReceiver<C>(threadsPerNode, finner, active, 570 pairChannel, theList, pairlist); 571 receiver.start(); 572 573 Pair<C> pair; 574 //boolean set = false; 575 boolean goon = true; 576 //int polIndex = -1; 577 int red = 0; 578 int sleeps = 0; 579 580 // while more requests 581 while (goon) { 582 // receive request if thread is reported incactive 583 logger.debug("receive request"); 584 Object req = null; 585 try { 586 req = pairChannel.receive(pairTag); 587 } catch (InterruptedException e) { 588 goon = false; 589 e.printStackTrace(); 590 } catch (IOException e) { 591 goon = false; 592 e.printStackTrace(); 593 } catch (ClassNotFoundException e) { 594 goon = false; 595 e.printStackTrace(); 596 } 597 logger.info("received request, req = " + req); 598 if (req == null) { 599 goon = false; 600 break; 601 } 602 if (!(req instanceof GBTransportMessReq)) { 603 goon = false; 604 break; 605 } 606 607 // find pair and manage termination status 608 logger.info("find pair"); 609 while (!pairlist.hasNext()) { // wait 610 if (!finner.hasJobs() && !pairlist.hasNext()) { 611 goon = false; 612 break; 613 } 614 try { 615 sleeps++; 616 if (sleeps % 3 == 0) { 617 logger.info("waiting for reducers, remaining = " + finner.getJobs()); 618 } 619 Thread.sleep(100); 
620 } catch (InterruptedException e) { 621 goon = false; 622 break; 623 } 624 } 625 if (!pairlist.hasNext() && !finner.hasJobs()) { 626 logger.info("termination detection: no pairs and no jobs left"); 627 goon = false; 628 break; //continue; //break? 629 } 630 finner.notIdle(); // before pairlist get!! 631 pair = pairlist.removeNext(); 632 // send pair to client, even if null 633 if (debug) { 634 logger.info("active count = " + active.get()); 635 logger.info("send pair = " + pair); 636 } 637 GBTransportMess msg = null; 638 if (pair != null) { 639 msg = new GBTransportMessPairIndex(pair); 640 } else { 641 msg = new GBTransportMess(); //not End(); at this time 642 // goon ?= false; 643 } 644 try { 645 red++; 646 pairChannel.send(pairTag, msg); 647 int a = active.getAndIncrement(); 648 } catch (IOException e) { 649 e.printStackTrace(); 650 goon = false; 651 break; 652 } 653 //logger.debug("#distributed list = " + theList.size()); 654 } 655 logger.info("terminated, send " + red + " reduction pairs"); 656 657 /* 658 * send end mark to clients 659 */ 660 logger.debug("send end"); 661 try { 662 for (int i = 0; i < threadsPerNode; i++) { // -1 663 //do not wait: Object rq = pairChannel.receive(pairTag); 664 pairChannel.send(pairTag, new GBTransportMessEnd()); 665 } 666 // send also end to receiver, no more, did not work 667 //pairChannel.send(resultTag, new GBTransportMessEnd()); 668 } catch (IOException e) { 669 if (logger.isDebugEnabled()) { 670 e.printStackTrace(); 671 } 672 } 673 receiver.terminate(); 674 675 int d = active.get(); 676 logger.info("remaining active tasks = " + d); 677 //logger.info("terminated, send " + red + " reduction pairs"); 678 pairChannel.close(); 679 logger.info("redServ pairChannel.close()"); 680 finner.release(); 681 682 channel.close(); 683 logger.info("redServ channel.close()"); 684 } 685} 686 687 688/** 689 * Distributed server receiving worker thread. 
* @param <C> coefficient type
 */
class HybridReducerReceiver<C extends RingElem<C>> extends Thread {


    public static final Logger logger = Logger.getLogger(HybridReducerReceiver.class);


    public final boolean debug = logger.isDebugEnabled();


    private final DistHashTable<Integer, GenPolynomial<C>> theList;


    private final PairList<C> pairlist;


    private final TaggedSocketChannel pairChannel;


    private final Terminator finner;


    private final int threadsPerNode;


    private final AtomicInteger active;


    private volatile boolean goon;


    /**
     * Message tag for pairs.
     */
    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;


    /**
     * Message tag for results.
     */
    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;


    /**
     * Message tag for acknowledgments.
     */
    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;


    /**
     * Constructor.
     * @param tpn number of threads per node
     * @param fin terminator
     * @param a active remote tasks count
     * @param pc tagged socket channel
     * @param dl distributed hash table
     * @param L ordered pair list
     */
    HybridReducerReceiver(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc,
                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
        active = a;
        threadsPerNode = tpn;
        finner = fin;
        pairChannel = pc;
        theList = dl;
        pairlist = L;
        goon = true;
        //logger.info("reducer server created " + this);
    }


    /**
     * Work loop. Receives reduction results from the clients, records each
     * non-zero polynomial in the pair list and the distributed hash table and
     * acknowledges every result.
     * @see java.lang.Thread#run()
     */
    @Override
    public void run() {
        int red = 0;

        // while more requests
        while (goon) {
            // receive request
            logger.debug("receive result");
            Object rh = null;
            try {
                rh = pairChannel.receive(resultTag);
                active.getAndDecrement();
            } catch (InterruptedException e) {
                goon = false;
                //?? finner.initIdle(1);
                break;
            } catch (IOException e) {
                logger.warn("receive result failed", e);
                goon = false;
                finner.initIdle(1);
                break;
            } catch (ClassNotFoundException e) {
                logger.warn("receive result failed", e);
                goon = false;
                finner.initIdle(1);
                break;
            }
            logger.info("received H polynomial");
            if (rh == null) {
                if (this.isInterrupted()) {
                    goon = false;
                    finner.initIdle(1);
                    break;
                }
            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
                logger.info("received GBTransportMessEnd");
                goon = false;
                //?? finner.initIdle(1);
                break;
            } else if (rh instanceof GBTransportMessPoly) {
                // update pair list
                red++;
                GenPolynomial<C> H = ((GBTransportMessPoly<C>) rh).pol;
                if (H != null) {
                    if (debug) {
                        logger.info("H = " + H.leadingExpVector());
                    }
                    if (!H.isZERO()) {
                        // a one must be recorded via putOne(), the loop goes
                        // on because the other clients must still be served
                        int polIndex = H.isONE() ? pairlist.putOne() : pairlist.put(H);
                        // use putWait ? but still not all distributed
                        GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
                        if (nn != null) {
                            logger.info("double polynomials nn = " + nn + ", H = " + H);
                        }
                    }
                }
            }
            // only after recording in pairlist !
            finner.initIdle(1);
            try {
                pairChannel.send(ackTag, new GBTransportMess());
                logger.debug("send acknowledgement");
            } catch (IOException e) {
                logger.warn("send acknowledgement failed", e);
                goon = false;
                break;
            }
        } // end while
        goon = false;
        logger.info("terminated, received " + red + " reductions");
    }


    /**
     * Terminate. Clears the run flag and waits for this thread to finish; the
     * thread is not interrupted.
     */
    public void terminate() {
        goon = false;
        //this.interrupt();
        try {
            this.join();
        } catch (InterruptedException e) {
            // deliberately not re-interrupting the calling thread
        }
        logger.debug("HybridReducerReceiver terminated");
    }

}


/**
 * Distributed clients reducing worker threads.
 * @param <C> coefficient type
 */
class HybridReducerClient<C extends RingElem<C>> implements Runnable {


    private static final Logger logger = Logger.getLogger(HybridReducerClient.class);


    public final boolean debug = logger.isDebugEnabled();


    private final TaggedSocketChannel pairChannel;


    private final DistHashTable<Integer, GenPolynomial<C>> theList;


    private final ReductionPar<C> red;


    private final int threadsPerNode;


    /*
     * Identification number for this thread.
     */
    //public final Integer threadId; // obsolete


    /**
     * Message tag for pairs.
     */
    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;


    /**
     * Message tag for results.
     */
    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;


    /**
     * Message tag for acknowledgments.
     */
    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;


    /**
     * Constructor.
     * @param tpn number of threads per node
     * @param tc tagged socket channel
     * @param tid thread identification, currently not used
     * @param dl distributed hash table
     */
    HybridReducerClient(int tpn, TaggedSocketChannel tc, Integer tid,
                    DistHashTable<Integer, GenPolynomial<C>> dl) {
        this.threadsPerNode = tpn;
        pairChannel = tc;
        //threadId = 100 + tid; // keep distinct from other tags
        theList = dl;
        red = new ReductionPar<C>();
    }


    /**
     * Work loop. Protocol per iteration: request pair, process pair, send
     * result, receive acknowledgment. The loop ends on an end message or on a
     * communication failure.
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        if (debug) {
            logger.info("pairChannel = " + pairChannel + " reducer client running");
        }
        boolean goon = true;
        boolean doEnd = true;
        int reduction = 0;

        while (goon) {
            // request a pair
            Object req = new GBTransportMessReq();
            logger.info("send request = " + req);
            try {
                pairChannel.send(pairTag, req);
            } catch (IOException e) {
                goon = false;
                logger.debug("send request failed", e);
                logger.info("receive pair, exception ");
                break;
            }
            logger.debug("receive pair, goon = " + goon);
            doEnd = true;
            Object pp = null;
            try {
                pp = pairChannel.receive(pairTag);
            } catch (InterruptedException e) {
                goon = false;
                logger.warn("receive pair interrupted", e);
            } catch (IOException e) {
                goon = false;
                logger.debug("receive pair failed", e);
                break;
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.warn("receive pair failed", e);
            }
            if (debug) {
                logger.info("received pair = " + pp);
            }
            GenPolynomial<C> H = null;
            if (pp == null) { // should not happen
                continue;
            }
            if (pp instanceof GBTransportMessEnd) {
                goon = false;
                //doEnd = false; // bug
                continue;
            }

            // process the pair
            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
                GenPolynomial<C> pi = null;
                GenPolynomial<C> pj = null;
                if (pp instanceof GBTransportMessPair) {
                    Pair<C> pair = ((GBTransportMessPair<C>) pp).pair;
                    if (pair != null) {
                        pi = pair.pi;
                        pj = pair.pj;
                    }
                }
                if (pp instanceof GBTransportMessPairIndex) {
                    // only the indexes are transported, the polynomials are
                    // looked up in the distributed hash table
                    Integer pix = ((GBTransportMessPairIndex) pp).i;
                    Integer pjx = ((GBTransportMessPairIndex) pp).j;
                    pi = theList.getWait(pix);
                    pj = theList.getWait(pjx);
                }

                if (pi != null && pj != null) {
                    GenPolynomial<C> S = red.SPolynomial(pi, pj);
                    if (!S.isZERO()) { // pair.setZero() does not work in dist
                        if (logger.isDebugEnabled()) {
                            logger.debug("ht(S) = " + S.leadingExpVector());
                        }
                        H = red.normalform(theList, S);
                        reduction++;
                        if (!H.isZERO()) { // pair.setZero() does not work in dist
                            H = H.monic();
                            if (logger.isInfoEnabled()) {
                                logger.info("ht(H) = " + H.leadingExpVector());
                            }
                        }
                    }
                }
            } else if (pp instanceof GBTransportMess) {
                // plain message without a pair
                logger.debug("null pair results in null H poly");
            }

            // send H or must send null, if not at end
            if (logger.isDebugEnabled()) {
                logger.debug("#distributed list = " + theList.size());
                logger.debug("send H polynomial = " + H);
            }
            try {
                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
                doEnd = false;
            } catch (IOException e) {
                goon = false;
                logger.warn("send H polynomial failed", e);
                // nothing was sent, so no acknowledgment can be expected
                break;
            }
            logger.info("done send poly message of " + pp);

            // receive acknowledgment
            try {
                pp = pairChannel.receive(ackTag);
            } catch (InterruptedException e) {
                goon = false;
                logger.warn("receive acknowledgment interrupted", e);
            } catch (IOException e) {
                goon = false;
                logger.debug("receive acknowledgment failed", e);
                break;
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.warn("receive acknowledgment failed", e);
            }
            if (!(pp instanceof GBTransportMess)) {
                logger.error("invalid acknowledgement " + pp);
            }
            logger.info("received acknowledgment " + pp);
        }
        logger.info("terminated, done " + reduction + " reductions");
        if (doEnd) {
            try {
                pairChannel.send(resultTag, new GBTransportMessEnd());
            } catch (IOException e) {
                // best effort only, the channel may already be closed
                logger.debug("send end failed", e);
            }
            logger.info("terminated, send done");
        }
    }
}