001/* 002 * $Id: GroebnerBaseDistributedHybridEC.java 5264 2015-07-27 14:19:57Z kredel $ 003 */ 004 005package edu.jas.gb; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Collections; 011import java.util.List; 012import java.util.ListIterator; 013import java.util.concurrent.atomic.AtomicInteger; 014 015import org.apache.log4j.Logger; 016 017import edu.jas.poly.ExpVector; 018import edu.jas.poly.GenPolynomial; 019import edu.jas.poly.GenPolynomialRing; 020import edu.jas.poly.PolyUtil; 021import edu.jas.structure.RingElem; 022import edu.jas.util.ChannelFactory; 023import edu.jas.util.DistHashTable; 024import edu.jas.util.DistHashTableServer; 025import edu.jas.util.DistThreadPool; 026import edu.jas.util.RemoteExecutable; 027import edu.jas.util.SocketChannel; 028import edu.jas.util.TaggedSocketChannel; 029import edu.jas.util.Terminator; 030import edu.jas.util.ThreadPool; 031 032 033/** 034 * Groebner Base distributed hybrid algorithm. Implements a distributed memory 035 * with multi-core CPUs parallel version of Groebner bases with executable 036 * channels. Using pairlist class, distributed multi-threaded tasks do 037 * reduction, one communication channel per remote node. 038 * @param <C> coefficient type 039 * @author Heinz Kredel 040 */ 041 042public class GroebnerBaseDistributedHybridEC<C extends RingElem<C>> extends GroebnerBaseAbstract<C> { 043 044 045 public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybridEC.class); 046 047 048 public final boolean debug = logger.isDebugEnabled(); 049 050 051 /** 052 * Number of threads to use. 053 */ 054 protected final int threads; 055 056 057 /** 058 * Default number of threads. 059 */ 060 protected static final int DEFAULT_THREADS = 2; 061 062 063 /** 064 * Number of threads per node to use. 065 */ 066 protected final int threadsPerNode; 067 068 069 /** 070 * Default number of threads per compute node. 071 */ 072 protected static final int DEFAULT_THREADS_PER_NODE = 1; 073 074 075 /** 076 * Pool of threads to use. 077 */ 078 //protected final ExecutorService pool; // not for single node tests 079 protected transient final ThreadPool pool; 080 081 082 /** 083 * Default server port. 084 */ 085 protected static final int DEFAULT_PORT = 55711; 086 087 088 /** 089 * Default distributed hash table server port. 090 */ 091 protected final int DHT_PORT; 092 093 094 /** 095 * machine file to use. 096 */ 097 protected final String mfile; 098 099 100 /** 101 * Server port to use. 102 */ 103 protected final int port; 104 105 106 /** 107 * Distributed thread pool to use. 108 */ 109 private final transient DistThreadPool dtp; 110 111 112 /** 113 * Distributed hash table server to use. 114 */ 115 private final transient DistHashTableServer<Integer> dhts; 116 117 118 /** 119 * Message tag for pairs. 120 */ 121 public static final Integer pairTag = Integer.valueOf(1); 122 123 124 /** 125 * Message tag for results. 126 */ 127 public static final Integer resultTag = Integer.valueOf(2); 128 129 130 /** 131 * Message tag for acknowledgments. 132 */ 133 public static final Integer ackTag = Integer.valueOf(3); 134 135 136 /** 137 * Constructor. 138 * @param mfile name of the machine file. 139 */ 140 public GroebnerBaseDistributedHybridEC(String mfile) { 141 this(mfile, DEFAULT_THREADS, DEFAULT_PORT); 142 } 143 144 145 /** 146 * Constructor. 147 * @param mfile name of the machine file. 148 * @param threads number of threads to use. 149 */ 150 public GroebnerBaseDistributedHybridEC(String mfile, int threads) { 151 this(mfile, threads, new ThreadPool(threads), DEFAULT_PORT); 152 } 153 154 155 /** 156 * Constructor. 157 * @param mfile name of the machine file. 158 * @param threads number of threads to use. 159 * @param port server port to use. 160 */ 161 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int port) { 162 this(mfile, threads, new ThreadPool(threads), port); 163 } 164 165 166 /** 167 * Constructor. 168 * @param mfile name of the machine file. 169 * @param threads number of threads to use. 170 * @param threadsPerNode threads per node to use. 171 * @param port server port to use. 172 */ 173 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, int port) { 174 this(mfile, threads, threadsPerNode, new ThreadPool(threads), port); 175 } 176 177 178 /** 179 * Constructor. 180 * @param mfile name of the machine file. 181 * @param threads number of threads to use. 182 * @param pool ThreadPool to use. 183 * @param port server port to use. 184 */ 185 public GroebnerBaseDistributedHybridEC(String mfile, int threads, ThreadPool pool, int port) { 186 this(mfile, threads, DEFAULT_THREADS_PER_NODE, pool, port); 187 } 188 189 190 /** 191 * Constructor. 192 * @param mfile name of the machine file. 193 * @param threads number of threads to use. 194 * @param threadsPerNode threads per node to use. 195 * @param pl pair selection strategy 196 * @param port server port to use. 197 */ 198 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, PairList<C> pl, 199 int port) { 200 this(mfile, threads, threadsPerNode, new ThreadPool(threads), pl, port); 201 } 202 203 204 /** 205 * Constructor. 206 * @param mfile name of the machine file. 207 * @param threads number of threads to use. 208 * @param threadsPerNode threads per node to use. 209 * @param port server port to use. 210 */ 211 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ThreadPool pool, 212 int port) { 213 this(mfile, threads, threadsPerNode, pool, new OrderedPairlist<C>(), port); 214 } 215 216 217 /** 218 * Constructor. 219 * @param mfile name of the machine file. 220 * @param threads number of threads to use. 221 * @param threadsPerNode threads per node to use. 222 * @param pool ThreadPool to use. 223 * @param pl pair selection strategy 224 * @param port server port to use. 225 */ 226 public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ThreadPool pool, 227 PairList<C> pl, int port) { 228 super(new ReductionPar<C>(), pl); 229 this.threads = threads; 230 if (mfile == null || mfile.length() == 0) { 231 this.mfile = "../util/machines"; // contains localhost 232 } else { 233 this.mfile = mfile; 234 } 235 if (threads < 1) { 236 threads = 1; 237 } 238 this.threadsPerNode = threadsPerNode; 239 if (pool == null) { 240 pool = new ThreadPool(threads); 241 } 242 this.pool = pool; 243 this.port = port; 244 logger.info("machine file " + mfile + ", port = " + port); 245 this.dtp = new DistThreadPool(this.threads, this.mfile); 246 logger.info("running " + dtp); 247 this.DHT_PORT = this.dtp.getEC().getMasterPort() + 100; 248 this.dhts = new DistHashTableServer<Integer>(this.DHT_PORT); 249 this.dhts.init(); 250 logger.info("running " + dhts); 251 } 252 253 254 /** 255 * Cleanup and terminate ThreadPool. 256 */ 257 @Override 258 public void terminate() { 259 terminate(true); 260 } 261 262 263 /** 264 * Terminates the distributed thread pools. 265 * @param shutDown true, if shut-down of the remote executable servers is 266 * requested, false, if remote executable servers stay alive. 267 */ 268 public void terminate(boolean shutDown) { 269 pool.terminate(); 270 dtp.terminate(shutDown); 271 logger.debug("dhts.terminate()"); 272 dhts.terminate(); 273 } 274 275 276 /** 277 * Distributed Groebner base. 278 * @param modv number of module variables. 279 * @param F polynomial list. 280 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 281 */ 282 public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) { 283 List<GenPolynomial<C>> Fp = normalizeZerosOnes(F); 284 Fp = PolyUtil.<C> monic(Fp); 285 if (Fp.size() <= 1) { 286 return Fp; 287 } 288 if (!Fp.get(0).ring.coFac.isField()) { 289 throw new IllegalArgumentException("coefficients not from a field"); 290 } 291 292 String master = dtp.getEC().getMasterHost(); 293 //int port = dtp.getEC().getMasterPort(); // wrong port 294 GBHybridExerClient<C> gbc = new GBHybridExerClient<C>(master, threadsPerNode, port, DHT_PORT); 295 for (int i = 0; i < threads; i++) { 296 // schedule remote clients 297 dtp.addJob(gbc); 298 } 299 // run master 300 List<GenPolynomial<C>> G = GBMaster(modv, Fp); 301 return G; 302 } 303 304 305 /** 306 * Distributed hybrid Groebner base. 307 * @param modv number of module variables. 308 * @param F non empty monic polynomial list without zeros. 309 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 310 */ 311 List<GenPolynomial<C>> GBMaster(int modv, List<GenPolynomial<C>> F) { 312 long t = System.currentTimeMillis(); 313 ChannelFactory cf = new ChannelFactory(port); 314 cf.init(); 315 316 List<GenPolynomial<C>> G = F; 317 if (G.isEmpty()) { 318 throw new IllegalArgumentException("empty polynomial list not allowed"); 319 } 320 GenPolynomialRing<C> ring = G.get(0).ring; 321 PairList<C> pairlist = strategy.create(modv, ring); 322 pairlist.put(G); 323 324 /* 325 GenPolynomial<C> p; 326 List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>(); 327 PairList<C> pairlist = null; 328 boolean oneInGB = false; 329 int l = F.size(); 330 int unused; 331 ListIterator<GenPolynomial<C>> it = F.listIterator(); 332 while (it.hasNext()) { 333 p = it.next(); 334 if (p.length() > 0) { 335 p = p.monic(); 336 if (p.isONE()) { 337 oneInGB = true; 338 G.clear(); 339 G.add(p); 340 //return G; must signal termination to others 341 } 342 if (!oneInGB) { 343 G.add(p); 344 } 345 if (pairlist == null) { 346 //pairlist = new OrderedPairlist<C>(modv, p.ring); 347 pairlist = strategy.create(modv, p.ring); 348 if (!p.ring.coFac.isField()) { 349 throw new IllegalArgumentException("coefficients not from a field"); 350 } 351 } 352 // theList not updated here 353 if (p.isONE()) { 354 unused = pairlist.putOne(); 355 } else { 356 unused = pairlist.put(p); 357 } 358 } else { 359 l--; 360 } 361 } 362 //if (l <= 1) { 363 //return G; must signal termination to others 364 //} 365 */ 366 logger.info("start " + pairlist); 367 DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>( 368 "localhost", DHT_PORT); 369 theList.init(); 370 List<GenPolynomial<C>> al = pairlist.getList(); 371 for (int i = 0; i < al.size(); i++) { 372 // no wait required 373 GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i)); 374 if (nn != null) { 375 logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i)); 376 } 377 } 378 379 Terminator finner = new Terminator(threads * threadsPerNode); 380 HybridReducerServerEC<C> R; 381 logger.info("using pool = " + pool); 382 for (int i = 0; i < threads; i++) { 383 R = new HybridReducerServerEC<C>(threadsPerNode, finner, cf, theList, pairlist); 384 pool.addJob(R); 385 //logger.info("server submitted " + R); 386 } 387 logger.info("main loop waiting " + finner); 388 finner.waitDone(); 389 int ps = theList.size(); 390 logger.info("#distributed list = " + ps); 391 // make sure all polynomials arrived: not needed in master 392 // G = (ArrayList)theList.values(); 393 G = pairlist.getList(); 394 if (ps != G.size()) { 395 logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size()); 396 } 397 for (GenPolynomial<C> q : theList.getValueList()) { 398 if (debug && q != null && !q.isZERO()) { 399 logger.debug("final q = " + q.leadingExpVector()); 400 } 401 } 402 logger.debug("distributed list end"); 403 long time = System.currentTimeMillis(); 404 List<GenPolynomial<C>> Gp; 405 Gp = minimalGB(G); // not jet distributed but threaded 406 time = System.currentTimeMillis() - time; 407 logger.debug("parallel gbmi time = " + time); 408 G = Gp; 409 logger.debug("server cf.terminate()"); 410 cf.terminate(); 411 logger.debug("server theList.terminate() " + theList.size()); 412 theList.clear(); 413 theList.terminate(); 414 t = System.currentTimeMillis() - t; 415 logger.info("server GB end, time = " + t + ", " + pairlist.toString()); 416 return G; 417 } 418 419 420 /** 421 * GB distributed client part. 422 * @param host the server runs on. 423 * @param port the server runs. 424 * @param dhtport of the DHT server. 425 * @throws IOException 426 */ 427 public static <C extends RingElem<C>> void clientPart(String host, int threadsPerNode, int port, 428 int dhtport) throws IOException { 429 ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost 430 cf.init(); 431 logger.info("clientPart connecting to " + host + ", port = " + port + ", dhtport = " + dhtport); 432 SocketChannel channel = cf.getChannel(host, port); 433 TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel); 434 pairChannel.init(); 435 436 DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(host, 437 dhtport); 438 theList.init(); 439 440 ThreadPool pool = new ThreadPool(threadsPerNode); 441 logger.info("client using pool = " + pool); 442 for (int i = 0; i < threadsPerNode; i++) { 443 HybridReducerClientEC<C> Rr = new HybridReducerClientEC<C>(/*threadsPerNode,*/pairChannel, /*i,*/ 444 theList); 445 pool.addJob(Rr); 446 } 447 logger.debug("clients submitted"); 448 449 pool.terminate(); 450 logger.debug("client pool.terminate()"); 451 452 pairChannel.close(); 453 logger.debug("client pairChannel.close()"); 454 455 //master only: theList.clear(); 456 theList.terminate(); 457 cf.terminate(); 458 logger.info("client cf.terminate()"); 459 460 channel.close(); 461 logger.info("client channel.close()"); 462 return; 463 } 464 465 466 /** 467 * Minimal ordered groebner basis. 468 * @param Fp a Groebner base. 469 * @return a reduced Groebner base of Fp. 470 */ 471 @Override 472 public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) { 473 GenPolynomial<C> a; 474 ArrayList<GenPolynomial<C>> G; 475 G = new ArrayList<GenPolynomial<C>>(Fp.size()); 476 ListIterator<GenPolynomial<C>> it = Fp.listIterator(); 477 while (it.hasNext()) { 478 a = it.next(); 479 if (a.length() != 0) { // always true 480 // already monic a = a.monic(); 481 G.add(a); 482 } 483 } 484 if (G.size() <= 1) { 485 return G; 486 } 487 488 ExpVector e; 489 ExpVector f; 490 GenPolynomial<C> p; 491 ArrayList<GenPolynomial<C>> F; 492 F = new ArrayList<GenPolynomial<C>>(G.size()); 493 boolean mt; 494 495 while (G.size() > 0) { 496 a = G.remove(0); 497 e = a.leadingExpVector(); 498 499 it = G.listIterator(); 500 mt = false; 501 while (it.hasNext() && !mt) { 502 p = it.next(); 503 f = p.leadingExpVector(); 504 mt = e.multipleOf(f); 505 } 506 it = F.listIterator(); 507 while (it.hasNext() && !mt) { 508 p = it.next(); 509 f = p.leadingExpVector(); 510 mt = e.multipleOf(f); 511 } 512 if (!mt) { 513 F.add(a); 514 } else { 515 // System.out.println("dropped " + a.length()); 516 } 517 } 518 G = F; 519 if (G.size() <= 1) { 520 return G; 521 } 522 Collections.reverse(G); // important for lex GB 523 524 @SuppressWarnings("cast") 525 MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()]; 526 int i = 0; 527 F = new ArrayList<GenPolynomial<C>>(G.size()); 528 while (G.size() > 0) { 529 a = G.remove(0); 530 // System.out.println("doing " + a.length()); 531 List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size()); 532 R.addAll(G); 533 R.addAll(F); 534 mirs[i] = new MiReducerServer<C>(R, a); 535 pool.addJob(mirs[i]); 536 i++; 537 F.add(a); 538 } 539 G = F; 540 F = new ArrayList<GenPolynomial<C>>(G.size()); 541 for (i = 0; i < mirs.length; i++) { 542 a = mirs[i].getNF(); 543 F.add(a); 544 } 545 return F; 546 } 547 548} 549 550 551/** 552 * Distributed server reducing worker proxy threads. 553 * @param <C> coefficient type 554 */ 555class HybridReducerServerEC<C extends RingElem<C>> implements Runnable { 556 557 558 public static final Logger logger = Logger.getLogger(HybridReducerServerEC.class); 559 560 561 public final boolean debug = logger.isDebugEnabled(); 562 563 564 private final Terminator finner; 565 566 567 private final ChannelFactory cf; 568 569 570 private TaggedSocketChannel pairChannel; 571 572 573 private final DistHashTable<Integer, GenPolynomial<C>> theList; 574 575 576 private final PairList<C> pairlist; 577 578 579 private final int threadsPerNode; 580 581 582 /** 583 * Message tag for pairs. 584 */ 585 public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag; 586 587 588 /** 589 * Message tag for results. 590 */ 591 public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag; 592 593 594 /** 595 * Message tag for acknowledgments. 596 */ 597 public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag; 598 599 600 /** 601 * Constructor. 602 * @param tpn number of threads per node 603 * @param fin terminator 604 * @param cf channel factory 605 * @param dl distributed hash table 606 * @param L ordered pair list 607 */ 608 HybridReducerServerEC(int tpn, Terminator fin, ChannelFactory cf, 609 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 610 threadsPerNode = tpn; 611 finner = fin; 612 this.cf = cf; 613 theList = dl; 614 pairlist = L; 615 //logger.info("reducer server created " + this); 616 } 617 618 619 /** 620 * Work loop. 621 * @see java.lang.Runnable#run() 622 */ 623 @Override 624 public void run() { 625 logger.info("reducer server running with " + cf); 626 SocketChannel channel = null; 627 try { 628 channel = cf.getChannel(); 629 pairChannel = new TaggedSocketChannel(channel); 630 pairChannel.init(); 631 } catch (InterruptedException e) { 632 logger.debug("get pair channel interrupted"); 633 e.printStackTrace(); 634 return; 635 } 636 if (debug) { 637 logger.info("pairChannel = " + pairChannel); 638 } 639 // record idle remote workers (minus one?) 640 //finner.beIdle(threadsPerNode-1); 641 finner.initIdle(threadsPerNode); 642 AtomicInteger active = new AtomicInteger(0); 643 644 // start receiver 645 HybridReducerReceiverEC<C> receiver = new HybridReducerReceiverEC<C>(/*threadsPerNode,*/finner, 646 active, pairChannel, theList, pairlist); 647 receiver.start(); 648 649 Pair<C> pair; 650 //boolean set = false; 651 boolean goon = true; 652 //int polIndex = -1; 653 int red = 0; 654 int sleeps = 0; 655 656 // while more requests 657 while (goon) { 658 // receive request if thread is reported incactive 659 logger.debug("receive request"); 660 Object req = null; 661 try { 662 req = pairChannel.receive(pairTag); 663 } catch (InterruptedException e) { 664 goon = false; 665 e.printStackTrace(); 666 } catch (IOException e) { 667 goon = false; 668 e.printStackTrace(); 669 } catch (ClassNotFoundException e) { 670 goon = false; 671 e.printStackTrace(); 672 } 673 //logger.info("received request, req = " + req); 674 if (req == null) { 675 goon = false; 676 break; 677 } 678 if (!(req instanceof GBTransportMessReq)) { 679 goon = false; 680 break; 681 } 682 683 // find pair and manage termination status 684 logger.debug("find pair"); 685 while (!pairlist.hasNext()) { // wait 686 if (!finner.hasJobs() && !pairlist.hasNext()) { 687 goon = false; 688 break; 689 } 690 try { 691 sleeps++; 692 if (sleeps % 3 == 0) { 693 logger.info("waiting for reducers, remaining = " + finner); 694 } 695 Thread.sleep(100); 696 } catch (InterruptedException e) { 697 goon = false; 698 break; 699 } 700 } 701 if (Thread.currentThread().isInterrupted()) { 702 goon = false; 703 break; 704 } 705 if (!pairlist.hasNext() && !finner.hasJobs()) { 706 logger.info("termination detection: no pairs and no jobs left"); 707 goon = false; 708 break; //continue; //break? 709 } 710 finner.notIdle(); // before pairlist get!! 711 pair = pairlist.removeNext(); 712 // send pair to client, even if null 713 if (debug) { 714 logger.info("active count = " + active.get()); 715 logger.info("send pair = " + pair); 716 } 717 GBTransportMess msg = null; 718 if (pair != null) { 719 msg = new GBTransportMessPairIndex(pair); //,pairlist.size()-1); // size-1 720 } else { 721 msg = new GBTransportMess(); // not End(); at this time 722 // goon ?= false; 723 } 724 try { 725 red++; 726 pairChannel.send(pairTag, msg); 727 @SuppressWarnings("unused") 728 int a = active.getAndIncrement(); 729 } catch (IOException e) { 730 e.printStackTrace(); 731 goon = false; 732 break; 733 } 734 //logger.debug("#distributed list = " + theList.size()); 735 } 736 logger.info("terminated, send " + red + " reduction pairs"); 737 738 /* 739 * send end mark to clients 740 */ 741 logger.debug("send end"); 742 try { 743 for (int i = 0; i < threadsPerNode; i++) { // -1 744 //do not wait: Object rq = pairChannel.receive(pairTag); 745 pairChannel.send(pairTag, new GBTransportMessEnd()); 746 } 747 // send also end to receiver 748 pairChannel.send(resultTag, new GBTransportMessEnd()); 749 //beware of race condition 750 } catch (IOException e) { 751 if (logger.isDebugEnabled()) { 752 e.printStackTrace(); 753 } 754 } 755 receiver.terminate(); 756 757 int d = active.get(); 758 if (d > 0) { 759 logger.info("remaining active tasks = " + d); 760 } 761 //logger.info("terminated, send " + red + " reduction pairs"); 762 pairChannel.close(); 763 logger.debug("redServ pairChannel.close()"); 764 finner.release(); 765 766 channel.close(); 767 logger.info("redServ channel.close()"); 768 } 769} 770 771 772/** 773 * Distributed server receiving worker thread. 774 * @param <C> coefficient type 775 */ 776class HybridReducerReceiverEC<C extends RingElem<C>> extends Thread { 777 778 779 public static final Logger logger = Logger.getLogger(HybridReducerReceiverEC.class); 780 781 782 public final boolean debug = logger.isDebugEnabled(); 783 784 785 private final DistHashTable<Integer, GenPolynomial<C>> theList; 786 787 788 private final PairList<C> pairlist; 789 790 791 private final TaggedSocketChannel pairChannel; 792 793 794 private final Terminator finner; 795 796 797 //private final int threadsPerNode; 798 799 800 private final AtomicInteger active; 801 802 803 private volatile boolean goon; 804 805 806 /** 807 * Message tag for pairs. 808 */ 809 public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag; 810 811 812 /** 813 * Message tag for results. 814 */ 815 public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag; 816 817 818 /** 819 * Message tag for acknowledgments. 820 */ 821 public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag; 822 823 824 /** 825 * Constructor. 826 * @param fin terminator 827 * @param a active remote tasks count 828 * @param pc tagged socket channel 829 * @param dl distributed hash table 830 * @param L ordered pair list 831 */ 832 //param tpn number of threads per node 833 HybridReducerReceiverEC(/*int tpn,*/Terminator fin, AtomicInteger a, TaggedSocketChannel pc, 834 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) { 835 active = a; 836 //threadsPerNode = tpn; 837 finner = fin; 838 pairChannel = pc; 839 theList = dl; 840 pairlist = L; 841 goon = true; 842 //logger.info("reducer server created " + this); 843 } 844 845 846 /** 847 * Work loop. 848 * @see java.lang.Thread#run() 849 */ 850 @Override 851 public void run() { 852 //Pair<C> pair = null; 853 GenPolynomial<C> H = null; 854 int red = 0; 855 int polIndex = -1; 856 //Integer senderId; // obsolete 857 858 // while more requests 859 while (goon) { 860 // receive request 861 logger.debug("receive result"); 862 //senderId = null; 863 Object rh = null; 864 try { 865 rh = pairChannel.receive(resultTag); 866 @SuppressWarnings("unused") 867 int i = active.getAndDecrement(); 868 } catch (InterruptedException e) { 869 goon = false; 870 //e.printStackTrace(); 871 //?? finner.initIdle(1); 872 break; 873 } catch (IOException e) { 874 e.printStackTrace(); 875 goon = false; 876 finner.initIdle(1); 877 break; 878 } catch (ClassNotFoundException e) { 879 e.printStackTrace(); 880 goon = false; 881 finner.initIdle(1); 882 break; 883 } 884 logger.info("received H polynomial"); 885 if (rh == null) { 886 if (this.isInterrupted()) { 887 goon = false; 888 finner.initIdle(1); 889 break; 890 } 891 //finner.initIdle(1); 892 } else if (rh instanceof GBTransportMessEnd) { // should only happen from server 893 logger.info("received GBTransportMessEnd"); 894 goon = false; 895 //?? finner.initIdle(1); 896 break; 897 } else if (rh instanceof GBTransportMessPoly) { 898 // update pair list 899 red++; 900 GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh; 901 H = mpi.pol; 902 //senderId = mpi.threadId; 903 if (H != null) { 904 if (debug) { 905 logger.info("H = " + H.leadingExpVector()); 906 } 907 if (!H.isZERO()) { 908 if (H.isONE()) { 909 // finner.allIdle(); 910 polIndex = pairlist.putOne(); 911 theList.putWait(Integer.valueOf(polIndex), H); 912 //goon = false; must wait for other clients 913 //finner.initIdle(1); 914 //break; 915 } else { 916 polIndex = pairlist.put(H); 917 // use putWait ? but still not all distributed 918 //GenPolynomial<C> nn = 919 theList.putWait(Integer.valueOf(polIndex), H); 920 } 921 } 922 } 923 } 924 // only after recording in pairlist ! 925 finner.initIdle(1); 926 try { 927 pairChannel.send(ackTag, new GBTransportMess()); 928 logger.debug("send acknowledgement"); 929 } catch (IOException e) { 930 e.printStackTrace(); 931 goon = false; 932 break; 933 } 934 } // end while 935 goon = false; 936 logger.info("terminated, received " + red + " reductions"); 937 } 938 939 940 /** 941 * Terminate. 942 */ 943 public void terminate() { 944 goon = false; 945 //this.interrupt(); 946 try { 947 this.join(); 948 } catch (InterruptedException e) { 949 // unfug Thread.currentThread().interrupt(); 950 } 951 logger.debug("HybridReducerReceiver terminated"); 952 } 953 954} 955 956 957/** 958 * Distributed clients reducing worker threads. 959 */ 960class HybridReducerClientEC<C extends RingElem<C>> implements Runnable { 961 962 963 private static final Logger logger = Logger.getLogger(HybridReducerClientEC.class); 964 965 966 public final boolean debug = logger.isDebugEnabled(); 967 968 969 private final TaggedSocketChannel pairChannel; 970 971 972 private final DistHashTable<Integer, GenPolynomial<C>> theList; 973 974 975 private final ReductionPar<C> red; 976 977 978 //private final int threadsPerNode; 979 980 981 /* 982 * Identification number for this thread. 983 */ 984 //public final Integer threadId; // obsolete 985 986 987 /** 988 * Message tag for pairs. 989 */ 990 public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag; 991 992 993 /** 994 * Message tag for results. 995 */ 996 public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag; 997 998 999 /** 1000 * Message tag for acknowledgments. 1001 */ 1002 public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag; 1003 1004 1005 /** 1006 * Constructor. 1007 * @param tc tagged socket channel 1008 * @param dl distributed hash table 1009 */ 1010 //param tpn number of threads per node 1011 //param tid thread identification 1012 HybridReducerClientEC(/*int tpn,*/TaggedSocketChannel tc, /*Integer tid,*/ 1013 DistHashTable<Integer, GenPolynomial<C>> dl) { 1014 //threadsPerNode = tpn; 1015 pairChannel = tc; 1016 //threadId = 100 + tid; // keep distinct from other tags 1017 theList = dl; 1018 red = new ReductionPar<C>(); 1019 } 1020 1021 1022 /** 1023 * Work loop. 1024 * @see java.lang.Runnable#run() 1025 */ 1026 @Override 1027 public void run() { 1028 if (debug) { 1029 logger.info("pairChannel = " + pairChannel + " reducer client running"); 1030 } 1031 Pair<C> pair = null; 1032 GenPolynomial<C> pi, pj, ps; 1033 GenPolynomial<C> S; 1034 GenPolynomial<C> H = null; 1035 //boolean set = false; 1036 boolean goon = true; 1037 boolean doEnd = true; 1038 int reduction = 0; 1039 //int sleeps = 0; 1040 Integer pix, pjx, psx; 1041 1042 while (goon) { 1043 /* protocol: 1044 * request pair, process pair, send result, receive acknowledgment 1045 */ 1046 // pair = (Pair) pairlist.removeNext(); 1047 Object req = new GBTransportMessReq(); 1048 logger.debug("send request"); 1049 try { 1050 pairChannel.send(pairTag, req); 1051 } catch (IOException e) { 1052 goon = false; 1053 if (logger.isDebugEnabled()) { 1054 e.printStackTrace(); 1055 } 1056 logger.info("receive pair, IOexception "); 1057 break; 1058 } 1059 logger.debug("receive pair, goon = " + goon); 1060 doEnd = true; 1061 Object pp = null; 1062 try { 1063 pp = pairChannel.receive(pairTag); 1064 } catch (InterruptedException e) { 1065 goon = false; 1066 e.printStackTrace(); 1067 } catch (IOException e) { 1068 goon = false; 1069 if (logger.isDebugEnabled()) { 1070 e.printStackTrace(); 1071 } 1072 break; 1073 } catch (ClassNotFoundException e) { 1074 goon = false; 1075 e.printStackTrace(); 1076 } 1077 if (debug) { 1078 logger.info("received pair = " + pp); 1079 } 1080 H = null; 1081 if (pp == null) { // should not happen 1082 continue; 1083 } 1084 if (pp instanceof GBTransportMessEnd) { 1085 goon = false; 1086 //doEnd = false; 1087 continue; 1088 } 1089 if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) { 1090 pi = pj = ps = null; 1091 if (pp instanceof GBTransportMessPair) { // obsolet, for tests 1092 GBTransportMessPair<C> tmp = (GBTransportMessPair<C>) pp; 1093 pair = tmp.pair; 1094 if (pair != null) { 1095 pi = pair.pi; 1096 pj = pair.pj; 1097 //logger.debug("pair: pix = " + pair.i 1098 // + ", pjx = " + pair.j); 1099 } 1100 } 1101 if (pp instanceof GBTransportMessPairIndex) { 1102 GBTransportMessPairIndex tmpi = (GBTransportMessPairIndex) pp; 1103 pix = tmpi.i; 1104 pjx = tmpi.j; 1105 psx = tmpi.s; 1106 pi = theList.getWait(pix); 1107 pj = theList.getWait(pjx); 1108 ps = theList.getWait(psx); 1109 //logger.info("pix = " + pix + ", pjx = " + pjx + ", psx = " + psx); 1110 } 1111 if (pi != null && pj != null) { 1112 S = red.SPolynomial(pi, pj); 1113 //logger.info("ht(S) = " + S.leadingExpVector()); 1114 if (S.isZERO()) { 1115 // pair.setZero(); does not work in dist 1116 } else { 1117 if (logger.isDebugEnabled()) { 1118 logger.debug("ht(S) = " + S.leadingExpVector()); 1119 } 1120 H = red.normalform(theList, S); 1121 //logger.info("ht(H) = " + H.leadingExpVector()); 1122 reduction++; 1123 if (H.isZERO()) { 1124 // pair.setZero(); does not work in dist 1125 } else { 1126 H = H.monic(); 1127 if (logger.isInfoEnabled()) { 1128 logger.info("ht(H) = " + H.leadingExpVector()); 1129 } 1130 } 1131 } 1132 } else { 1133 logger.info("pi = " + pi + ", pj = " + pj + ", ps = " + ps); 1134 } 1135 } 1136 if (pp instanceof GBTransportMess) { 1137 logger.debug("null pair results in null H poly"); 1138 } 1139 1140 // send H or must send null, if not at end 1141 if (logger.isDebugEnabled()) { 1142 logger.debug("#distributed list = " + theList.size()); 1143 logger.debug("send H polynomial = " + H); 1144 } 1145 try { 1146 pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId)); 1147 doEnd = false; 1148 } catch (IOException e) { 1149 goon = false; 1150 e.printStackTrace(); 1151 } 1152 //logger.info("done send poly message of " + pp); 1153 try { 1154 //pp = pairChannel.receive(threadId); 1155 pp = pairChannel.receive(ackTag); 1156 } catch (InterruptedException e) { 1157 goon = false; 1158 e.printStackTrace(); 1159 } catch (IOException e) { 1160 goon = false; 1161 if (logger.isDebugEnabled()) { 1162 e.printStackTrace(); 1163 } 1164 break; 1165 } catch (ClassNotFoundException e) { 1166 goon = false; 1167 e.printStackTrace(); 1168 } 1169 if (!(pp instanceof GBTransportMess)) { 1170 logger.error("invalid acknowledgement " + pp); 1171 } 1172 logger.info("received acknowledgment "); 1173 } 1174 logger.info("terminated, " + reduction + " reductions, " + theList.size() + " polynomials"); 1175 if (doEnd) { 1176 try { 1177 pairChannel.send(resultTag, new GBTransportMessEnd()); 1178 } catch (IOException e) { 1179 //e.printStackTrace(); 1180 } 1181 logger.debug("terminated, send done"); 1182 } 1183 } 1184} 1185 1186 1187/** 1188 * Objects of this class are to be send to a ExecutableServer. 1189 */ 1190class GBHybridExerClient<C extends RingElem<C>> implements RemoteExecutable { 1191 1192 1193 String host; 1194 1195 1196 int port; 1197 1198 1199 int dhtport; 1200 1201 1202 int threadsPerNode; 1203 1204 1205 /** 1206 * GBHybridExerClient. 1207 * @param host 1208 * @param port 1209 * @param dhtport 1210 */ 1211 public GBHybridExerClient(String host, int threadsPerNode, int port, int dhtport) { 1212 this.host = host; 1213 this.threadsPerNode = threadsPerNode; 1214 this.port = port; 1215 this.dhtport = dhtport; 1216 } 1217 1218 1219 /** 1220 * run. 1221 */ 1222 public void run() { 1223 try { 1224 GroebnerBaseDistributedHybridEC.<C> clientPart(host, threadsPerNode, port, dhtport); 1225 } catch (Exception e) { 1226 e.printStackTrace(); 1227 } 1228 } 1229 1230 1231 /** 1232 * String representation. 1233 */ 1234 @Override 1235 public String toString() { 1236 StringBuffer s = new StringBuffer("GBHybridExerClient("); 1237 s.append("host=" + host); 1238 s.append(", threadsPerNode=" + threadsPerNode); 1239 s.append(", port=" + port); 1240 s.append(", dhtport=" + dhtport); 1241 s.append(")"); 1242 return s.toString(); 1243 } 1244 1245}