001 /*
002 * $Id: GroebnerBaseDistributedHybrid.java 3626 2011-05-08 09:51:57Z kredel $
003 */
004
005 package edu.jas.gb;
006
007
008 import java.io.IOException;
009 import java.util.ArrayList;
010 import java.util.List;
011 import java.util.ListIterator;
012 import java.util.Collections;
013 import java.util.concurrent.atomic.AtomicInteger;
014
015 import org.apache.log4j.Logger;
016
017 import edu.jas.poly.ExpVector;
018 import edu.jas.poly.GenPolynomial;
019 import edu.jas.structure.RingElem;
020 import edu.jas.util.ChannelFactory;
021 import edu.jas.util.DistHashTable;
022 import edu.jas.util.DistHashTableServer;
023 import edu.jas.util.SocketChannel;
024 import edu.jas.util.TaggedSocketChannel;
025 import edu.jas.util.Terminator;
026 import edu.jas.util.ThreadPool;
027
028
029 /**
030 * Groebner Base distributed hybrid algorithm. Implements a
031 * distributed memory with multi-core CPUs parallel version of
032 * Groebner bases. Using pairlist class, distributed multi-threaded
033 * tasks do reduction, one communication channel per remote node.
034 * @param <C> coefficient type
035 * @author Heinz Kredel
036 */
037
038 public class GroebnerBaseDistributedHybrid<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
039
040
041 public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybrid.class);
042
043
044 public final boolean debug = logger.isDebugEnabled();
045
046
047 /**
048 * Number of threads to use.
049 */
050 protected final int threads;
051
052
053 /**
054 * Default number of threads.
055 */
056 protected static final int DEFAULT_THREADS = 2;
057
058
059 /**
060 * Number of threads per node to use.
061 */
062 protected final int threadsPerNode;
063
064
065 /**
066 * Default number of threads per compute node.
067 */
068 protected static final int DEFAULT_THREADS_PER_NODE = 1;
069
070
071 /**
072 * Pool of threads to use.
073 */
074 //protected final ExecutorService pool; // not for single node tests
075 protected final ThreadPool pool;
076
077
078 /**
079 * Default server port.
080 */
081 protected static final int DEFAULT_PORT = 4711;
082
083
084 /**
085 * Server port to use.
086 */
087 protected final int port;
088
089
090 /**
091 * Message tag for pairs.
092 */
093 public static final Integer pairTag = new Integer(1);
094
095
096 /**
097 * Message tag for results.
098 */
099 public static final Integer resultTag = new Integer(2);
100
101
102 /**
103 * Message tag for acknowledgments.
104 */
105 public static final Integer ackTag = new Integer(3);
106
107
108 /**
109 * Constructor.
110 */
111 public GroebnerBaseDistributedHybrid() {
112 this(DEFAULT_THREADS, DEFAULT_PORT);
113 }
114
115
116 /**
117 * Constructor.
118 * @param threads number of threads to use.
119 */
120 public GroebnerBaseDistributedHybrid(int threads) {
121 this(threads, new ThreadPool(threads), DEFAULT_PORT);
122 }
123
124
125 /**
126 * Constructor.
127 * @param threads number of threads to use.
128 * @param port server port to use.
129 */
130 public GroebnerBaseDistributedHybrid(int threads, int port) {
131 this(threads, new ThreadPool(threads), port);
132 }
133
134
135 /**
136 * Constructor.
137 * @param threads number of threads to use.
138 * @param threadsPerNode threads per node to use.
139 * @param port server port to use.
140 */
141 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, int port) {
142 this(threads, threadsPerNode, new ThreadPool(threads), port);
143 }
144
145
146 /**
147 * Constructor.
148 * @param threads number of threads to use.
149 * @param pool ThreadPool to use.
150 * @param port server port to use.
151 */
152 public GroebnerBaseDistributedHybrid(int threads, ThreadPool pool, int port) {
153 this(threads, DEFAULT_THREADS_PER_NODE, pool, port);
154 }
155
156
157 /**
158 * Constructor.
159 * @param threads number of threads to use.
160 * @param threadsPerNode threads per node to use.
161 * @param pl pair selection strategy
162 * @param port server port to use.
163 */
164 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, PairList<C> pl, int port) {
165 this(threads, threadsPerNode, new ThreadPool(threads), pl, port);
166 }
167
168
169 /**
170 * Constructor.
171 * @param threads number of threads to use.
172 * @param threadsPerNode threads per node to use.
173 * @param port server port to use.
174 */
175 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, int port) {
176 this(threads, threadsPerNode, pool, new OrderedPairlist<C>(), port);
177 }
178
179
180 /**
181 * Constructor.
182 * @param threads number of threads to use.
183 * @param threadsPerNode threads per node to use.
184 * @param pool ThreadPool to use.
185 * @param pl pair selection strategy
186 * @param port server port to use.
187 */
188 public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl, int port) {
189 super( new ReductionPar<C>(), pl );
190 if (threads < 1) {
191 threads = 1;
192 }
193 this.threads = threads;
194 this.threadsPerNode = threadsPerNode;
195 this.pool = pool;
196 this.port = port;
197 //logger.info("generated pool: " + pool);
198 }
199
200
201 /**
202 * Cleanup and terminate.
203 */
204 public void terminate() {
205 if (pool == null) {
206 return;
207 }
208 pool.terminate();
209 }
210
211
212 /**
213 * Distributed hybrid Groebner base.
214 * @param modv number of module variables.
215 * @param F polynomial list.
216 * @return GB(F) a Groebner base of F or null, if a IOException occurs.
217 */
218 public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
219 long t = System.currentTimeMillis();
220 final int DL_PORT = port + 100;
221 ChannelFactory cf = new ChannelFactory(port);
222 cf.init();
223 DistHashTableServer<Integer> dls = new DistHashTableServer<Integer>(DL_PORT);
224 dls.init();
225 logger.debug("dist-list server running");
226
227 GenPolynomial<C> p;
228 List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
229 PairList<C> pairlist = null;
230 boolean oneInGB = false;
231 int l = F.size();
232 int unused;
233 ListIterator<GenPolynomial<C>> it = F.listIterator();
234 while (it.hasNext()) {
235 p = it.next();
236 if (p.length() > 0) {
237 p = p.monic();
238 if (p.isONE()) {
239 oneInGB = true;
240 G.clear();
241 G.add(p);
242 //return G; must signal termination to others
243 }
244 if (!oneInGB) {
245 G.add(p);
246 }
247 if (pairlist == null) {
248 //pairlist = new OrderedPairlist<C>(modv, p.ring);
249 pairlist = strategy.create( modv, p.ring );
250 if ( ! p.ring.coFac.isField() ) {
251 throw new IllegalArgumentException("coefficients not from a field");
252 }
253 }
254 // theList not updated here
255 if (p.isONE()) {
256 unused = pairlist.putOne();
257 } else {
258 unused = pairlist.put(p);
259 }
260 } else {
261 l--;
262 }
263 }
264 if (l <= 1) {
265 //return G; must signal termination to others
266 }
267 logger.info("pairlist " + pairlist);
268
269 logger.debug("looking for clients");
270 //long t = System.currentTimeMillis();
271 // now in DL, uses resend for late clients
272 //while ( dls.size() < threads ) { sleep(); }
273
274 DistHashTable<Integer, GenPolynomial<C>> theList
275 = new DistHashTable<Integer, GenPolynomial<C>>("localhost", DL_PORT);
276 List<GenPolynomial<C>> al = pairlist.getList();
277 for (int i = 0; i < al.size(); i++) {
278 // no wait required
279 GenPolynomial<C> nn = theList.put(new Integer(i), al.get(i));
280 if (nn != null) {
281 logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
282 }
283 }
284
285 Terminator finner = new Terminator(threads*threadsPerNode);
286 HybridReducerServer<C> R;
287 logger.info("using pool = " + pool);
288 for (int i = 0; i < threads; i++) {
289 R = new HybridReducerServer<C>(threadsPerNode, finner, cf, theList, pairlist);
290 pool.addJob(R);
291 //logger.info("server submitted " + R);
292 }
293 logger.info("main loop waiting " + finner);
294 finner.waitDone();
295 int ps = theList.size();
296 logger.info("#distributed list = " + ps);
297 // make sure all polynomials arrived: not needed in master
298 // G = (ArrayList)theList.values();
299 G = pairlist.getList();
300 if (ps != G.size()) {
301 logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
302 }
303 for (GenPolynomial<C> q: theList.getValueList()) {
304 if ( q != null && !q.isZERO() ) {
305 logger.debug("final q = " + q.leadingExpVector());
306 }
307 }
308 logger.debug("distributed list end");
309 long time = System.currentTimeMillis();
310 List<GenPolynomial<C>> Gp;
311 Gp = minimalGB(G); // not jet distributed but threaded
312 time = System.currentTimeMillis() - time;
313 logger.info("parallel gbmi time = " + time);
314 G = Gp;
315 logger.debug("server cf.terminate()");
316 cf.terminate();
317 // no more required //
318 logger.info("server not pool.terminate() " + pool);
319 //pool.terminate();
320 logger.info("server theList.terminate() " + theList.size());
321 theList.terminate();
322 logger.info("server dls.terminate() " + dls);
323 dls.terminate();
324 t = System.currentTimeMillis() - t;
325 logger.info("server GB end, time = " + t + ", " + pairlist.toString());
326 return G;
327 }
328
329
330 /**
331 * GB distributed client.
332 * @param host the server runs on.
333 * @throws IOException
334 */
335 public void clientPart(String host) throws IOException {
336
337 ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost
338 cf.init();
339 SocketChannel channel = cf.getChannel(host, port);
340 TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel);
341 pairChannel.init();
342
343 if (debug) {
344 logger.info("clientPart pairChannel = " + pairChannel);
345 }
346
347 final int DL_PORT = port + 100;
348 DistHashTable<Integer, GenPolynomial<C>> theList
349 = new DistHashTable<Integer, GenPolynomial<C>>(host, DL_PORT);
350
351 //HybridReducerClient<C> R = new HybridReducerClient<C>(threadsPerNode, pairChannel, theList);
352 //R.run();
353
354 ThreadPool pool = new ThreadPool(threadsPerNode);
355 logger.info("client using pool = " +pool);
356 for (int i = 0; i < threadsPerNode; i++) {
357 HybridReducerClient<C> Rr = new HybridReducerClient<C>(threadsPerNode, pairChannel, i, theList);
358 pool.addJob(Rr);
359 }
360 if (debug) {
361 logger.info("clients submitted");
362 }
363 pool.terminate();
364 logger.info("client pool.terminate()");
365
366 pairChannel.close();
367 logger.info("client pairChannel.close()");
368
369 theList.terminate();
370 cf.terminate();
371 logger.info("client cf.terminate()");
372
373 channel.close();
374 logger.info("client channel.close()");
375 return;
376 }
377
378
379 /**
380 * Minimal ordered groebner basis.
381 * @param Fp a Groebner base.
382 * @return a reduced Groebner base of Fp.
383 */
384 @Override
385 public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
386 GenPolynomial<C> a;
387 ArrayList<GenPolynomial<C>> G;
388 G = new ArrayList<GenPolynomial<C>>(Fp.size());
389 ListIterator<GenPolynomial<C>> it = Fp.listIterator();
390 while (it.hasNext()) {
391 a = it.next();
392 if (a.length() != 0) { // always true
393 // already monic a = a.monic();
394 G.add(a);
395 }
396 }
397 if (G.size() <= 1) {
398 return G;
399 }
400
401 ExpVector e;
402 ExpVector f;
403 GenPolynomial<C> p;
404 ArrayList<GenPolynomial<C>> F;
405 F = new ArrayList<GenPolynomial<C>>(G.size());
406 boolean mt;
407
408 while (G.size() > 0) {
409 a = G.remove(0);
410 e = a.leadingExpVector();
411
412 it = G.listIterator();
413 mt = false;
414 while (it.hasNext() && !mt) {
415 p = it.next();
416 f = p.leadingExpVector();
417 mt = e.multipleOf(f);
418 }
419 it = F.listIterator();
420 while (it.hasNext() && !mt) {
421 p = it.next();
422 f = p.leadingExpVector();
423 mt = e.multipleOf(f);
424 }
425 if (!mt) {
426 F.add(a);
427 } else {
428 // System.out.println("dropped " + a.length());
429 }
430 }
431 G = F;
432 if (G.size() <= 1) {
433 return G;
434 }
435 Collections.reverse(G); // important for lex GB
436
437 MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()];
438 int i = 0;
439 F = new ArrayList<GenPolynomial<C>>(G.size());
440 while (G.size() > 0) {
441 a = G.remove(0);
442 // System.out.println("doing " + a.length());
443 List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size()+F.size());
444 R.addAll(G);
445 R.addAll(F);
446 mirs[i] = new MiReducerServer<C>(R, a);
447 pool.addJob(mirs[i]);
448 i++;
449 F.add(a);
450 }
451 G = F;
452 F = new ArrayList<GenPolynomial<C>>(G.size());
453 for (i = 0; i < mirs.length; i++) {
454 a = mirs[i].getNF();
455 F.add(a);
456 }
457 return F;
458 }
459
460 }
461
462
463 /**
464 * Distributed server reducing worker proxy threads.
465 * @param <C> coefficient type
466 */
467
468 class HybridReducerServer<C extends RingElem<C>> implements Runnable {
469
470
471 public static final Logger logger = Logger.getLogger(HybridReducerServer.class);
472
473
474 public final boolean debug = logger.isDebugEnabled();
475
476
477 private final Terminator finner;
478
479
480 private final ChannelFactory cf;
481
482
483 private TaggedSocketChannel pairChannel;
484
485
486 private final DistHashTable<Integer, GenPolynomial<C>> theList;
487
488
489 private final PairList<C> pairlist;
490
491
492 private final int threadsPerNode;
493
494
495 /**
496 * Message tag for pairs.
497 */
498 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
499
500
501 /**
502 * Message tag for results.
503 */
504 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
505
506
507 /**
508 * Message tag for acknowledgments.
509 */
510 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
511
512
513 /**
514 * Constructor.
515 * @param tpn number of threads per node
516 * @param fin terminator
517 * @param cf channel factory
518 * @param dl distributed hash table
519 * @param L ordered pair list
520 */
521 HybridReducerServer(int tpn, Terminator fin, ChannelFactory cf,
522 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
523 threadsPerNode = tpn;
524 finner = fin;
525 this.cf = cf;
526 theList = dl;
527 pairlist = L;
528 //logger.info("reducer server created " + this);
529 }
530
531
532 /**
533 * Work loop.
534 * @see java.lang.Runnable#run()
535 */
536 //JAVA6only: @Override
537 public void run() {
538 logger.info("reducer server running with " + cf);
539 SocketChannel channel = null;
540 try {
541 channel = cf.getChannel();
542 pairChannel = new TaggedSocketChannel(channel);
543 pairChannel.init();
544 } catch (InterruptedException e) {
545 logger.debug("get pair channel interrupted");
546 e.printStackTrace();
547 return;
548 }
549 if (debug) {
550 logger.info("pairChannel = " + pairChannel);
551 }
552 // record idle remote workers (minus one?)
553 //finner.beIdle(threadsPerNode-1);
554 finner.initIdle(threadsPerNode);
555 AtomicInteger active = new AtomicInteger(0);
556
557 // start receiver
558 HybridReducerReceiver<C> receiver = new HybridReducerReceiver<C>(threadsPerNode, finner, active, pairChannel, theList, pairlist);
559 receiver.start();
560
561 Pair<C> pair;
562 boolean set = false;
563 boolean goon = true;
564 int polIndex = -1;
565 int red = 0;
566 int sleeps = 0;
567
568 // while more requests
569 while (goon) {
570 // receive request if thread is reported incactive
571 logger.debug("receive request");
572 Object req = null;
573 try {
574 req = pairChannel.receive(pairTag);
575 } catch (InterruptedException e) {
576 goon = false;
577 e.printStackTrace();
578 } catch (IOException e) {
579 goon = false;
580 e.printStackTrace();
581 } catch (ClassNotFoundException e) {
582 goon = false;
583 e.printStackTrace();
584 }
585 logger.info("received request, req = " + req);
586 if (req == null) {
587 goon = false;
588 break;
589 }
590 if (!(req instanceof GBTransportMessReq)) {
591 goon = false;
592 break;
593 }
594
595 // find pair and manage termination status
596 logger.info("find pair");
597 while (!pairlist.hasNext()) { // wait
598 if (!finner.hasJobs() && !pairlist.hasNext()) {
599 goon = false;
600 break;
601 }
602 try {
603 sleeps++;
604 //if (sleeps % 10 == 0) {
605 logger.info("waiting for reducers, remaining = " + finner.getJobs());
606 //}
607 Thread.sleep(100);
608 } catch (InterruptedException e) {
609 goon = false;
610 break;
611 }
612 }
613 if (!pairlist.hasNext() && !finner.hasJobs()) {
614 logger.info("termination detection: no pairs and no jobs left");
615 goon = false;
616 break; //continue; //break?
617 }
618 finner.notIdle(); // before pairlist get!!
619 pair = pairlist.removeNext();
620 // send pair to client, even if null
621 if ( debug ) {
622 logger.info("active count = " + active.get());
623 logger.info("send pair = " + pair);
624 }
625 GBTransportMess msg = null;
626 if (pair != null) {
627 msg = new GBTransportMessPairIndex(pair);
628 } else {
629 msg = new GBTransportMess(); //not End(); at this time
630 // goon ?= false;
631 }
632 try {
633 red++;
634 pairChannel.send(pairTag, msg);
635 int a = active.getAndIncrement();
636 } catch (IOException e) {
637 e.printStackTrace();
638 goon = false;
639 break;
640 }
641 //logger.debug("#distributed list = " + theList.size());
642 }
643 logger.info("terminated, send " + red + " reduction pairs");
644
645 /*
646 * send end mark to clients
647 */
648 logger.debug("send end");
649 try {
650 for ( int i = 0; i < threadsPerNode; i++ ) { // -1
651 //do not wait: Object rq = pairChannel.receive(pairTag);
652 pairChannel.send(pairTag, new GBTransportMessEnd());
653 }
654 // send also end to receiver
655 pairChannel.send(resultTag, new GBTransportMessEnd());
656 //beware of race condition
657 } catch (IOException e) {
658 if (logger.isDebugEnabled()) {
659 e.printStackTrace();
660 }
661 }
662 receiver.terminate();
663
664 int d = active.get();
665 logger.info("remaining active tasks = " + d);
666 //logger.info("terminated, send " + red + " reduction pairs");
667 pairChannel.close();
668 logger.info("redServ pairChannel.close()");
669 finner.release();
670
671 channel.close();
672 logger.info("redServ channel.close()");
673 }
674 }
675
676
677 /**
678 * Distributed server receiving worker thread.
679 * @param <C> coefficient type
680 */
681
682 class HybridReducerReceiver<C extends RingElem<C>> extends Thread {
683
684
685 public static final Logger logger = Logger.getLogger(HybridReducerReceiver.class);
686
687
688 public final boolean debug = logger.isDebugEnabled();
689
690
691 private final DistHashTable<Integer, GenPolynomial<C>> theList;
692
693
694 private final PairList<C> pairlist;
695
696
697 private final TaggedSocketChannel pairChannel;
698
699
700 private final Terminator finner;
701
702
703 private final int threadsPerNode;
704
705
706 private final AtomicInteger active;
707
708
709 private volatile boolean goon;
710
711
712 /**
713 * Message tag for pairs.
714 */
715 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
716
717
718 /**
719 * Message tag for results.
720 */
721 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
722
723
724 /**
725 * Message tag for acknowledgments.
726 */
727 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
728
729
730 /**
731 * Constructor.
732 * @param tpn number of threads per node
733 * @param fin terminator
734 * @param a active remote tasks count
735 * @param pc tagged socket channel
736 * @param dl distributed hash table
737 * @param L ordered pair list
738 */
739 HybridReducerReceiver(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc,
740 DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
741 active = a;
742 threadsPerNode = tpn;
743 finner = fin;
744 pairChannel = pc;
745 theList = dl;
746 pairlist = L;
747 goon = true;
748 //logger.info("reducer server created " + this);
749 }
750
751
752 /**
753 * Work loop.
754 * @see java.lang.Thread#run()
755 */
756 @Override
757 public void run() {
758 //Pair<C> pair = null;
759 GenPolynomial<C> H = null;
760 int red = 0;
761 int polIndex = -1;
762 //Integer senderId; // obsolete
763
764 // while more requests
765 while (goon) {
766 // receive request
767 logger.debug("receive result");
768 //senderId = null;
769 Object rh = null;
770 try {
771 rh = pairChannel.receive(resultTag);
772 int i = active.getAndDecrement();
773 } catch (InterruptedException e) {
774 goon = false;
775 //e.printStackTrace();
776 //?? finner.initIdle(1);
777 break;
778 } catch (IOException e) {
779 e.printStackTrace();
780 goon = false;
781 finner.initIdle(1);
782 break;
783 } catch (ClassNotFoundException e) {
784 e.printStackTrace();
785 goon = false;
786 finner.initIdle(1);
787 break;
788 }
789 logger.info("received H polynomial");
790 if (rh == null) {
791 if (this.isInterrupted()) {
792 goon = false;
793 finner.initIdle(1);
794 break;
795 }
796 //finner.initIdle(1);
797 } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
798 logger.info("received GBTransportMessEnd");
799 goon = false;
800 //?? finner.initIdle(1);
801 break;
802 } else if (rh instanceof GBTransportMessPoly) {
803 // update pair list
804 red++;
805 GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
806 H = mpi.pol;
807 //senderId = mpi.threadId;
808 if (H != null) {
809 if (debug) {
810 logger.info("H = " + H.leadingExpVector());
811 }
812 if (!H.isZERO()) {
813 if (H.isONE()) {
814 // finner.allIdle();
815 polIndex = pairlist.putOne();
816 GenPolynomial<C> nn = theList.put(new Integer(polIndex), H);
817 if (nn != null) {
818 logger.info("double polynomials nn = " + nn + ", H = " + H);
819 }
820 //goon = false; must wait for other clients
821 //finner.initIdle(1);
822 //break;
823 } else {
824 polIndex = pairlist.put(H);
825 // use putWait ? but still not all distributed
826 GenPolynomial<C> nn = theList.put(new Integer(polIndex), H);
827 if (nn != null) {
828 logger.info("double polynomials nn = " + nn + ", H = " + H);
829 }
830 }
831 }
832 }
833 }
834 // only after recording in pairlist !
835 finner.initIdle(1);
836 // if ( senderId != null ) { // send acknowledgement after recording
837 try {
838 //pairChannel.send(senderId, new GBTransportMess());
839 pairChannel.send(ackTag, new GBTransportMess());
840 logger.debug("send acknowledgement");
841 } catch (IOException e) {
842 e.printStackTrace();
843 goon = false;
844 break;
845 }
846 //}
847 } // end while
848 goon = false;
849 logger.info("terminated, received " + red + " reductions");
850 }
851
852
853 /**
854 * Terminate.
855 */
856 public void terminate() {
857 goon = false;
858 this.interrupt();
859 try {
860 this.join();
861 } catch (InterruptedException e) {
862 // unfug Thread.currentThread().interrupt();
863 }
864 logger.debug("HybridReducerReceiver terminated");
865 }
866
867 }
868
869
870 /**
871 * Distributed clients reducing worker threads.
872 */
873
874 class HybridReducerClient<C extends RingElem<C>> implements Runnable {
875
876
877 private static final Logger logger = Logger.getLogger(HybridReducerClient.class);
878
879
880 public final boolean debug = logger.isDebugEnabled();
881
882
883 private final TaggedSocketChannel pairChannel;
884
885
886 private final DistHashTable<Integer, GenPolynomial<C>> theList;
887
888
889 private final ReductionPar<C> red;
890
891
892 private final int threadsPerNode;
893
894
895 /*
896 * Identification number for this thread.
897 */
898 //public final Integer threadId; // obsolete
899
900
901 /**
902 * Message tag for pairs.
903 */
904 public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
905
906
907 /**
908 * Message tag for results.
909 */
910 public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
911
912
913 /**
914 * Message tag for acknowledgments.
915 */
916 public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
917
918
919 /**
920 * Constructor.
921 * @param tpn number of threads per node
922 * @param tc tagged socket channel
923 * @param tid thread identification
924 * @param dl distributed hash table
925 */
926 HybridReducerClient(int tpn, TaggedSocketChannel tc, Integer tid, DistHashTable<Integer, GenPolynomial<C>> dl) {
927 this.threadsPerNode = tpn;
928 pairChannel = tc;
929 //threadId = 100 + tid; // keep distinct from other tags
930 theList = dl;
931 red = new ReductionPar<C>();
932 }
933
934
935 /**
936 * Work loop.
937 * @see java.lang.Runnable#run()
938 */
939 //JAVA6only: @Override
940 public void run() {
941 if (debug) {
942 logger.info("pairChannel = " + pairChannel + " reducer client running");
943 }
944 Pair<C> pair = null;
945 GenPolynomial<C> pi;
946 GenPolynomial<C> pj;
947 GenPolynomial<C> S;
948 GenPolynomial<C> H = null;
949 //boolean set = false;
950 boolean goon = true;
951 boolean doEnd = false;
952 int reduction = 0;
953 //int sleeps = 0;
954 Integer pix;
955 Integer pjx;
956
957 while (goon) {
958 /* protocol:
959 * request pair, process pair, send result, receive acknowledgment
960 */
961 // pair = (Pair) pairlist.removeNext();
962 Object req = new GBTransportMessReq();
963 logger.info("send request = " + req);
964 try {
965 pairChannel.send(pairTag, req);
966 } catch (IOException e) {
967 goon = false;
968 if ( logger.isDebugEnabled() ) {
969 e.printStackTrace();
970 }
971 logger.info("receive pair, exception ");
972 break;
973 }
974 logger.debug("receive pair, goon = " + goon);
975 doEnd = false;
976 Object pp = null;
977 try {
978 pp = pairChannel.receive(pairTag);
979 } catch (InterruptedException e) {
980 goon = false;
981 e.printStackTrace();
982 } catch (IOException e) {
983 goon = false;
984 if (logger.isDebugEnabled()) {
985 e.printStackTrace();
986 }
987 break;
988 } catch (ClassNotFoundException e) {
989 goon = false;
990 e.printStackTrace();
991 }
992 if (debug) {
993 logger.info("received pair = " + pp);
994 }
995 H = null;
996 if (pp == null) { // should not happen
997 continue;
998 }
999 if (pp instanceof GBTransportMessEnd) {
1000 goon = false;
1001 doEnd = true;
1002 continue;
1003 }
1004 if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
1005 pi = pj = null;
1006 if (pp instanceof GBTransportMessPair) {
1007 pair = ((GBTransportMessPair<C>) pp).pair;
1008 if (pair != null) {
1009 pi = pair.pi;
1010 pj = pair.pj;
1011 //logger.debug("pair: pix = " + pair.i
1012 // + ", pjx = " + pair.j);
1013 }
1014 }
1015 if (pp instanceof GBTransportMessPairIndex) {
1016 pix = ((GBTransportMessPairIndex) pp).i;
1017 pjx = ((GBTransportMessPairIndex) pp).j;
1018 pi = (GenPolynomial<C>) theList.getWait(pix);
1019 pj = (GenPolynomial<C>) theList.getWait(pjx);
1020 //logger.info("pix = " + pix + ", pjx = " +pjx);
1021 }
1022
1023 if (pi != null && pj != null) {
1024 S = red.SPolynomial(pi, pj);
1025 //System.out.println("S = " + S);
1026 if (S.isZERO()) {
1027 // pair.setZero(); does not work in dist
1028 } else {
1029 if (logger.isDebugEnabled()) {
1030 logger.debug("ht(S) = " + S.leadingExpVector());
1031 }
1032 H = red.normalform(theList, S);
1033 reduction++;
1034 if (H.isZERO()) {
1035 // pair.setZero(); does not work in dist
1036 } else {
1037 H = H.monic();
1038 if (logger.isInfoEnabled()) {
1039 logger.info("ht(H) = " + H.leadingExpVector());
1040 }
1041 }
1042 }
1043 }
1044 }
1045 if (pp instanceof GBTransportMess) {
1046 logger.debug("null pair results in null H poly");
1047 }
1048
1049 // send H or must send null, if not at end
1050 if (logger.isDebugEnabled()) {
1051 logger.debug("#distributed list = " + theList.size());
1052 logger.debug("send H polynomial = " + H);
1053 }
1054 try {
1055 pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
1056 doEnd = true;
1057 } catch (IOException e) {
1058 goon = false;
1059 e.printStackTrace();
1060 }
1061 logger.info("done send poly message of " + pp);
1062 try {
1063 //pp = pairChannel.receive(threadId);
1064 pp = pairChannel.receive(ackTag);
1065 } catch (InterruptedException e) {
1066 goon = false;
1067 e.printStackTrace();
1068 } catch (IOException e) {
1069 goon = false;
1070 if (logger.isDebugEnabled()) {
1071 e.printStackTrace();
1072 }
1073 break;
1074 } catch (ClassNotFoundException e) {
1075 goon = false;
1076 e.printStackTrace();
1077 }
1078 if ( ! (pp instanceof GBTransportMess) ) {
1079 logger.error("invalid acknowledgement " + pp);
1080 }
1081 logger.info("received acknowledgment " + pp);
1082 }
1083 logger.info("terminated, done " + reduction + " reductions");
1084 if ( !doEnd ) {
1085 try {
1086 pairChannel.send(resultTag, new GBTransportMessEnd());
1087 } catch (IOException e) {
1088 //e.printStackTrace();
1089 }
1090 logger.info("terminated, send done");
1091 }
1092 }
1093 }