001/*
002 * $Id: GroebnerBaseDistributedHybrid.java 4261 2012-10-21 11:34:02Z kredel $
003 */
004
005package edu.jas.gb;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Collections;
011import java.util.List;
012import java.util.ListIterator;
013import java.util.concurrent.atomic.AtomicInteger;
014
015import org.apache.log4j.Logger;
016
017import edu.jas.poly.ExpVector;
018import edu.jas.poly.GenPolynomial;
019import edu.jas.structure.RingElem;
020import edu.jas.util.ChannelFactory;
021import edu.jas.util.DistHashTable;
022import edu.jas.util.DistHashTableServer;
023import edu.jas.util.SocketChannel;
024import edu.jas.util.TaggedSocketChannel;
025import edu.jas.util.Terminator;
026import edu.jas.util.ThreadPool;
027
028
029/**
030 * Groebner Base distributed hybrid algorithm. Implements a distributed memory
031 * with multi-core CPUs parallel version of Groebner bases. Using pairlist
032 * class, distributed multi-threaded tasks do reduction, one communication
033 * channel per remote node.
034 * @param <C> coefficient type
035 * @author Heinz Kredel
036 * @deprecated use GroebnerBaseDistributedHybridEC
037 */
038
039public class GroebnerBaseDistributedHybrid<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
040
041
042    public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybrid.class);
043
044
045    public final boolean debug = logger.isDebugEnabled();
046
047
048    /**
049     * Number of threads to use.
050     */
051    protected final int threads;
052
053
054    /**
055     * Default number of threads.
056     */
057    protected static final int DEFAULT_THREADS = 2;
058
059
060    /**
061     * Number of threads per node to use.
062     */
063    protected final int threadsPerNode;
064
065
066    /**
067     * Default number of threads per compute node.
068     */
069    protected static final int DEFAULT_THREADS_PER_NODE = 1;
070
071
072    /**
073     * Pool of threads to use.
074     */
075    //protected final ExecutorService pool; // not for single node tests
076    protected transient final ThreadPool pool;
077
078
079    /**
080     * Default server port.
081     */
082    protected static final int DEFAULT_PORT = 4711;
083
084
085    /**
086     * Server port to use.
087     */
088    protected final int port;
089
090
091    /**
092     * Message tag for pairs.
093     */
094    public static final Integer pairTag = Integer.valueOf(1);
095
096
097    /**
098     * Message tag for results.
099     */
100    public static final Integer resultTag = Integer.valueOf(2);
101
102
103    /**
104     * Message tag for acknowledgments.
105     */
106    public static final Integer ackTag = Integer.valueOf(3);
107
108
109    /**
110     * Constructor.
111     */
112    public GroebnerBaseDistributedHybrid() {
113        this(DEFAULT_THREADS, DEFAULT_PORT);
114    }
115
116
117    /**
118     * Constructor.
119     * @param threads number of threads to use.
120     */
121    public GroebnerBaseDistributedHybrid(int threads) {
122        this(threads, new ThreadPool(threads), DEFAULT_PORT);
123    }
124
125
126    /**
127     * Constructor.
128     * @param threads number of threads to use.
129     * @param port server port to use.
130     */
131    public GroebnerBaseDistributedHybrid(int threads, int port) {
132        this(threads, new ThreadPool(threads), port);
133    }
134
135
136    /**
137     * Constructor.
138     * @param threads number of threads to use.
139     * @param threadsPerNode threads per node to use.
140     * @param port server port to use.
141     */
142    public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, int port) {
143        this(threads, threadsPerNode, new ThreadPool(threads), port);
144    }
145
146
147    /**
148     * Constructor.
149     * @param threads number of threads to use.
150     * @param pool ThreadPool to use.
151     * @param port server port to use.
152     */
153    public GroebnerBaseDistributedHybrid(int threads, ThreadPool pool, int port) {
154        this(threads, DEFAULT_THREADS_PER_NODE, pool, port);
155    }
156
157
158    /**
159     * Constructor.
160     * @param threads number of threads to use.
161     * @param threadsPerNode threads per node to use.
162     * @param pl pair selection strategy
163     * @param port server port to use.
164     */
165    public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, PairList<C> pl, int port) {
166        this(threads, threadsPerNode, new ThreadPool(threads), pl, port);
167    }
168
169
170    /**
171     * Constructor.
172     * @param threads number of threads to use.
173     * @param threadsPerNode threads per node to use.
174     * @param port server port to use.
175     */
176    public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, int port) {
177        this(threads, threadsPerNode, pool, new OrderedPairlist<C>(), port);
178    }
179
180
181    /**
182     * Constructor.
183     * @param threads number of threads to use.
184     * @param threadsPerNode threads per node to use.
185     * @param pool ThreadPool to use.
186     * @param pl pair selection strategy
187     * @param port server port to use.
188     */
189    public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl,
190                    int port) {
191        super(new ReductionPar<C>(), pl);
192        if (threads < 1) {
193            threads = 1;
194        }
195        this.threads = threads;
196        this.threadsPerNode = threadsPerNode;
197        this.pool = pool;
198        this.port = port;
199        //logger.info("generated pool: " + pool);
200    }
201
202
203    /**
204     * Cleanup and terminate.
205     */
206    @Override
207    public void terminate() {
208        if (pool == null) {
209            return;
210        }
211        pool.terminate();
212    }
213
214
215    /**
216     * Distributed hybrid Groebner base.
217     * @param modv number of module variables.
218     * @param F polynomial list.
219     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
220     */
221    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
222        long t = System.currentTimeMillis();
223        final int DL_PORT = port + 100;
224        ChannelFactory cf = new ChannelFactory(port);
225        cf.init();
226        DistHashTableServer<Integer> dls = new DistHashTableServer<Integer>(DL_PORT);
227        dls.init();
228        logger.debug("dist-list server running");
229
230        GenPolynomial<C> p;
231        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
232        PairList<C> pairlist = null;
233        boolean oneInGB = false;
234        int l = F.size();
235        int unused;
236        ListIterator<GenPolynomial<C>> it = F.listIterator();
237        while (it.hasNext()) {
238            p = it.next();
239            if (p.length() > 0) {
240                p = p.monic();
241                if (p.isONE()) {
242                    oneInGB = true;
243                    G.clear();
244                    G.add(p);
245                    //return G; must signal termination to others
246                }
247                if (!oneInGB) {
248                    G.add(p);
249                }
250                if (pairlist == null) {
251                    //pairlist = new OrderedPairlist<C>(modv, p.ring);
252                    pairlist = strategy.create(modv, p.ring);
253                    if (!p.ring.coFac.isField()) {
254                        throw new IllegalArgumentException("coefficients not from a field");
255                    }
256                }
257                // theList not updated here
258                if (p.isONE()) {
259                    unused = pairlist.putOne();
260                } else {
261                    unused = pairlist.put(p);
262                }
263            } else {
264                l--;
265            }
266        }
267        //if (l <= 1) {
268            //return G; must signal termination to others
269        //}
270        logger.info("pairlist " + pairlist);
271
272        logger.debug("looking for clients");
273        //long t = System.currentTimeMillis();
274        // now in DL, uses resend for late clients
275        //while ( dls.size() < threads ) { sleep(); }
276
277        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(
278                        "localhost", DL_PORT);
279        theList.init();
280        List<GenPolynomial<C>> al = pairlist.getList();
281        for (int i = 0; i < al.size(); i++) {
282            // no wait required
283            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
284            if (nn != null) {
285                logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
286            }
287        }
288
289        Terminator finner = new Terminator(threads * threadsPerNode);
290        HybridReducerServer<C> R;
291        logger.info("using pool = " + pool);
292        for (int i = 0; i < threads; i++) {
293            R = new HybridReducerServer<C>(threadsPerNode, finner, cf, theList, pairlist);
294            pool.addJob(R);
295            //logger.info("server submitted " + R);
296        }
297        logger.info("main loop waiting " + finner);
298        finner.waitDone();
299        int ps = theList.size();
300        logger.info("#distributed list = " + ps);
301        // make sure all polynomials arrived: not needed in master
302        // G = (ArrayList)theList.values();
303        G = pairlist.getList();
304        if (ps != G.size()) {
305            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
306        }
307        for (GenPolynomial<C> q : theList.getValueList()) {
308            if (q != null && !q.isZERO()) {
309                logger.debug("final q = " + q.leadingExpVector());
310            }
311        }
312        logger.debug("distributed list end");
313        long time = System.currentTimeMillis();
314        List<GenPolynomial<C>> Gp;
315        Gp = minimalGB(G); // not jet distributed but threaded
316        time = System.currentTimeMillis() - time;
317        logger.info("parallel gbmi time = " + time);
318        G = Gp;
319        logger.debug("server cf.terminate()");
320        cf.terminate();
321        // no more required // 
322        logger.info("server not pool.terminate() " + pool);
323        //pool.terminate();
324        logger.info("server theList.terminate() " + theList.size());
325        theList.terminate();
326        logger.info("server dls.terminate() " + dls);
327        dls.terminate();
328        t = System.currentTimeMillis() - t;
329        logger.info("server GB end, time = " + t + ", " + pairlist.toString());
330        return G;
331    }
332
333
334    /**
335     * GB distributed client.
336     * @param host the server runs on.
337     * @throws IOException
338     */
339    public void clientPart(String host) throws IOException {
340
341        ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost
342        cf.init();
343        SocketChannel channel = cf.getChannel(host, port);
344        TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel);
345        pairChannel.init();
346
347        if (debug) {
348            logger.info("clientPart pairChannel   = " + pairChannel);
349        }
350
351        final int DL_PORT = port + 100;
352        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(host,
353                        DL_PORT);
354        theList.init();
355
356        //HybridReducerClient<C> R = new HybridReducerClient<C>(threadsPerNode, pairChannel, theList);
357        //R.run();
358
359        ThreadPool pool = new ThreadPool(threadsPerNode);
360        logger.info("client using pool = " + pool);
361        for (int i = 0; i < threadsPerNode; i++) {
362            HybridReducerClient<C> Rr = new HybridReducerClient<C>(threadsPerNode, pairChannel, i, theList);
363            pool.addJob(Rr);
364        }
365        if (debug) {
366            logger.info("clients submitted");
367        }
368        pool.terminate();
369        logger.info("client pool.terminate()");
370
371        pairChannel.close();
372        logger.info("client pairChannel.close()");
373
374        theList.terminate();
375        cf.terminate();
376        logger.info("client cf.terminate()");
377
378        channel.close();
379        logger.info("client channel.close()");
380        return;
381    }
382
383
384    /**
385     * Minimal ordered groebner basis.
386     * @param Fp a Groebner base.
387     * @return a reduced Groebner base of Fp.
388     */
389    @Override
390    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
391        GenPolynomial<C> a;
392        ArrayList<GenPolynomial<C>> G;
393        G = new ArrayList<GenPolynomial<C>>(Fp.size());
394        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
395        while (it.hasNext()) {
396            a = it.next();
397            if (a.length() != 0) { // always true
398                // already monic  a = a.monic();
399                G.add(a);
400            }
401        }
402        if (G.size() <= 1) {
403            return G;
404        }
405
406        ExpVector e;
407        ExpVector f;
408        GenPolynomial<C> p;
409        ArrayList<GenPolynomial<C>> F;
410        F = new ArrayList<GenPolynomial<C>>(G.size());
411        boolean mt;
412
413        while (G.size() > 0) {
414            a = G.remove(0);
415            e = a.leadingExpVector();
416
417            it = G.listIterator();
418            mt = false;
419            while (it.hasNext() && !mt) {
420                p = it.next();
421                f = p.leadingExpVector();
422                mt = e.multipleOf(f);
423            }
424            it = F.listIterator();
425            while (it.hasNext() && !mt) {
426                p = it.next();
427                f = p.leadingExpVector();
428                mt = e.multipleOf(f);
429            }
430            if (!mt) {
431                F.add(a);
432            } else {
433                // System.out.println("dropped " + a.length());
434            }
435        }
436        G = F;
437        if (G.size() <= 1) {
438            return G;
439        }
440        Collections.reverse(G); // important for lex GB
441
442        MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()];
443        int i = 0;
444        F = new ArrayList<GenPolynomial<C>>(G.size());
445        while (G.size() > 0) {
446            a = G.remove(0);
447            // System.out.println("doing " + a.length());
448            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
449            R.addAll(G);
450            R.addAll(F);
451            mirs[i] = new MiReducerServer<C>(R, a);
452            pool.addJob(mirs[i]);
453            i++;
454            F.add(a);
455        }
456        G = F;
457        F = new ArrayList<GenPolynomial<C>>(G.size());
458        for (i = 0; i < mirs.length; i++) {
459            a = mirs[i].getNF();
460            F.add(a);
461        }
462        return F;
463    }
464
465}
466
467
468/**
469 * Distributed server reducing worker proxy threads.
470 * @param <C> coefficient type
471 */
472
473class HybridReducerServer<C extends RingElem<C>> implements Runnable {
474
475
476    public static final Logger logger = Logger.getLogger(HybridReducerServer.class);
477
478
479    public final boolean debug = logger.isDebugEnabled();
480
481
482    private final Terminator finner;
483
484
485    private final ChannelFactory cf;
486
487
488    private TaggedSocketChannel pairChannel;
489
490
491    private final DistHashTable<Integer, GenPolynomial<C>> theList;
492
493
494    private final PairList<C> pairlist;
495
496
497    private final int threadsPerNode;
498
499
500    /**
501     * Message tag for pairs.
502     */
503    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
504
505
506    /**
507     * Message tag for results.
508     */
509    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
510
511
512    /**
513     * Message tag for acknowledgments.
514     */
515    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
516
517
518    /**
519     * Constructor.
520     * @param tpn number of threads per node
521     * @param fin terminator
522     * @param cf channel factory
523     * @param dl distributed hash table
524     * @param L ordered pair list
525     */
526    HybridReducerServer(int tpn, Terminator fin, ChannelFactory cf,
527                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
528        threadsPerNode = tpn;
529        finner = fin;
530        this.cf = cf;
531        theList = dl;
532        pairlist = L;
533        //logger.info("reducer server created " + this);
534    }
535
536
537    /**
538     * Work loop.
539     * @see java.lang.Runnable#run()
540     */
541    //JAVA6only: @Override
542    public void run() {
543        logger.info("reducer server running with " + cf);
544        SocketChannel channel = null;
545        try {
546            channel = cf.getChannel();
547            pairChannel = new TaggedSocketChannel(channel);
548            pairChannel.init();
549        } catch (InterruptedException e) {
550            logger.debug("get pair channel interrupted");
551            e.printStackTrace();
552            return;
553        }
554        if (debug) {
555            logger.info("pairChannel   = " + pairChannel);
556        }
557        // record idle remote workers (minus one?)
558        //finner.beIdle(threadsPerNode-1);
559        finner.initIdle(threadsPerNode);
560        AtomicInteger active = new AtomicInteger(0);
561
562        // start receiver
563        HybridReducerReceiver<C> receiver = new HybridReducerReceiver<C>(threadsPerNode, finner, active,
564                        pairChannel, theList, pairlist);
565        receiver.start();
566
567        Pair<C> pair;
568        //boolean set = false;
569        boolean goon = true;
570        //int polIndex = -1;
571        int red = 0;
572        int sleeps = 0;
573
574        // while more requests
575        while (goon) {
576            // receive request if thread is reported incactive
577            logger.debug("receive request");
578            Object req = null;
579            try {
580                req = pairChannel.receive(pairTag);
581            } catch (InterruptedException e) {
582                goon = false;
583                e.printStackTrace();
584            } catch (IOException e) {
585                goon = false;
586                e.printStackTrace();
587            } catch (ClassNotFoundException e) {
588                goon = false;
589                e.printStackTrace();
590            }
591            logger.info("received request, req = " + req);
592            if (req == null) {
593                goon = false;
594                break;
595            }
596            if (!(req instanceof GBTransportMessReq)) {
597                goon = false;
598                break;
599            }
600
601            // find pair and manage termination status
602            logger.info("find pair");
603            while (!pairlist.hasNext()) { // wait
604                if (!finner.hasJobs() && !pairlist.hasNext()) {
605                    goon = false;
606                    break;
607                }
608                try {
609                    sleeps++;
610                    //if (sleeps % 10 == 0) {
611                    logger.info("waiting for reducers, remaining = " + finner.getJobs());
612                    //}
613                    Thread.sleep(100);
614                } catch (InterruptedException e) {
615                    goon = false;
616                    break;
617                }
618            }
619            if (!pairlist.hasNext() && !finner.hasJobs()) {
620                logger.info("termination detection: no pairs and no jobs left");
621                goon = false;
622                break; //continue; //break?
623            }
624            finner.notIdle(); // before pairlist get!!
625            pair = pairlist.removeNext();
626            // send pair to client, even if null
627            if (debug) {
628                logger.info("active count = " + active.get());
629                logger.info("send pair = " + pair);
630            }
631            GBTransportMess msg = null;
632            if (pair != null) {
633                msg = new GBTransportMessPairIndex(pair);
634            } else {
635                msg = new GBTransportMess(); //not End(); at this time
636                // goon ?= false;
637            }
638            try {
639                red++;
640                pairChannel.send(pairTag, msg);
641                int a = active.getAndIncrement();
642            } catch (IOException e) {
643                e.printStackTrace();
644                goon = false;
645                break;
646            }
647            //logger.debug("#distributed list = " + theList.size());
648        }
649        logger.info("terminated, send " + red + " reduction pairs");
650
651        /*
652         * send end mark to clients
653         */
654        logger.debug("send end");
655        try {
656            for (int i = 0; i < threadsPerNode; i++) { // -1
657                //do not wait: Object rq = pairChannel.receive(pairTag);
658                pairChannel.send(pairTag, new GBTransportMessEnd());
659            }
660            // send also end to receiver, no more, did not work
661            //pairChannel.send(resultTag, new GBTransportMessEnd());
662        } catch (IOException e) {
663            if (logger.isDebugEnabled()) {
664                e.printStackTrace();
665            }
666        }
667        receiver.terminate();
668
669        int d = active.get();
670        logger.info("remaining active tasks = " + d);
671        //logger.info("terminated, send " + red + " reduction pairs");
672        pairChannel.close();
673        logger.info("redServ pairChannel.close()");
674        finner.release();
675
676        channel.close();
677        logger.info("redServ channel.close()");
678    }
679}
680
681
682/**
683 * Distributed server receiving worker thread.
684 * @param <C> coefficient type
685 */
686
687class HybridReducerReceiver<C extends RingElem<C>> extends Thread {
688
689
690    public static final Logger logger = Logger.getLogger(HybridReducerReceiver.class);
691
692
693    public final boolean debug = logger.isDebugEnabled();
694
695
696    private final DistHashTable<Integer, GenPolynomial<C>> theList;
697
698
699    private final PairList<C> pairlist;
700
701
702    private final TaggedSocketChannel pairChannel;
703
704
705    private final Terminator finner;
706
707
708    private final int threadsPerNode;
709
710
711    private final AtomicInteger active;
712
713
714    private volatile boolean goon;
715
716
717    /**
718     * Message tag for pairs.
719     */
720    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
721
722
723    /**
724     * Message tag for results.
725     */
726    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
727
728
729    /**
730     * Message tag for acknowledgments.
731     */
732    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
733
734
735    /**
736     * Constructor.
737     * @param tpn number of threads per node
738     * @param fin terminator
739     * @param a active remote tasks count
740     * @param pc tagged socket channel
741     * @param dl distributed hash table
742     * @param L ordered pair list
743     */
744    HybridReducerReceiver(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc,
745                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
746        active = a;
747        threadsPerNode = tpn;
748        finner = fin;
749        pairChannel = pc;
750        theList = dl;
751        pairlist = L;
752        goon = true;
753        //logger.info("reducer server created " + this);
754    }
755
756
757    /**
758     * Work loop.
759     * @see java.lang.Thread#run()
760     */
761    @Override
762    public void run() {
763        //Pair<C> pair = null;
764        GenPolynomial<C> H = null;
765        int red = 0;
766        int polIndex = -1;
767        //Integer senderId; // obsolete
768
769        // while more requests
770        while (goon) {
771            // receive request
772            logger.debug("receive result");
773            //senderId = null;
774            Object rh = null;
775            try {
776                rh = pairChannel.receive(resultTag);
777                int i = active.getAndDecrement();
778            } catch (InterruptedException e) {
779                goon = false;
780                //e.printStackTrace();
781                //?? finner.initIdle(1);
782                break;
783            } catch (IOException e) {
784                e.printStackTrace();
785                goon = false;
786                finner.initIdle(1);
787                break;
788            } catch (ClassNotFoundException e) {
789                e.printStackTrace();
790                goon = false;
791                finner.initIdle(1);
792                break;
793            }
794            logger.info("received H polynomial");
795            if (rh == null) {
796                if (this.isInterrupted()) {
797                    goon = false;
798                    finner.initIdle(1);
799                    break;
800                }
801                //finner.initIdle(1);
802            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
803                logger.info("received GBTransportMessEnd");
804                goon = false;
805                //?? finner.initIdle(1);
806                break;
807            } else if (rh instanceof GBTransportMessPoly) {
808                // update pair list
809                red++;
810                GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
811                H = mpi.pol;
812                //senderId = mpi.threadId;
813                if (H != null) {
814                    if (debug) {
815                        logger.info("H = " + H.leadingExpVector());
816                    }
817                    if (!H.isZERO()) {
818                        if (H.isONE()) {
819                            // finner.allIdle();
820                            polIndex = pairlist.putOne();
821                            GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
822                            if (nn != null) {
823                                logger.info("double polynomials nn = " + nn + ", H = " + H);
824                            }
825                            //goon = false; must wait for other clients
826                            //finner.initIdle(1);
827                            //break;
828                        } else {
829                            polIndex = pairlist.put(H);
830                            // use putWait ? but still not all distributed
831                            GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
832                            if (nn != null) {
833                                logger.info("double polynomials nn = " + nn + ", H = " + H);
834                            }
835                        }
836                    }
837                }
838            }
839            // only after recording in pairlist !
840            finner.initIdle(1);
841            try {
842                pairChannel.send(ackTag, new GBTransportMess());
843                logger.debug("send acknowledgement");
844            } catch (IOException e) {
845                e.printStackTrace();
846                goon = false;
847                break;
848            }
849        } // end while
850        goon = false;
851        logger.info("terminated, received " + red + " reductions");
852    }
853
854
855    /**
856     * Terminate.
857     */
858    public void terminate() {
859        goon = false;
860        //this.interrupt();
861        try {
862            this.join();
863        } catch (InterruptedException e) {
864            // unfug Thread.currentThread().interrupt();
865        }
866        logger.debug("HybridReducerReceiver terminated");
867    }
868
869}
870
871
872/**
873 * Distributed clients reducing worker threads.
874 */
875
876class HybridReducerClient<C extends RingElem<C>> implements Runnable {
877
878
879    private static final Logger logger = Logger.getLogger(HybridReducerClient.class);
880
881
882    public final boolean debug = logger.isDebugEnabled();
883
884
885    private final TaggedSocketChannel pairChannel;
886
887
888    private final DistHashTable<Integer, GenPolynomial<C>> theList;
889
890
891    private final ReductionPar<C> red;
892
893
894    private final int threadsPerNode;
895
896
897    /*
898     * Identification number for this thread.
899     */
900    //public final Integer threadId; // obsolete
901
902
903    /**
904     * Message tag for pairs.
905     */
906    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
907
908
909    /**
910     * Message tag for results.
911     */
912    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
913
914
915    /**
916     * Message tag for acknowledgments.
917     */
918    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
919
920
921    /**
922     * Constructor.
923     * @param tpn number of threads per node
924     * @param tc tagged socket channel
925     * @param tid thread identification
926     * @param dl distributed hash table
927     */
928    HybridReducerClient(int tpn, TaggedSocketChannel tc, Integer tid,
929                    DistHashTable<Integer, GenPolynomial<C>> dl) {
930        this.threadsPerNode = tpn;
931        pairChannel = tc;
932        //threadId = 100 + tid; // keep distinct from other tags
933        theList = dl;
934        red = new ReductionPar<C>();
935    }
936
937
938    /**
939     * Work loop.
940     * @see java.lang.Runnable#run()
941     */
942    //JAVA6only: @Override
943    public void run() {
944        if (debug) {
945            logger.info("pairChannel   = " + pairChannel + " reducer client running");
946        }
947        Pair<C> pair = null;
948        GenPolynomial<C> pi;
949        GenPolynomial<C> pj;
950        GenPolynomial<C> S;
951        GenPolynomial<C> H = null;
952        //boolean set = false;
953        boolean goon = true;
954        boolean doEnd = true;
955        int reduction = 0;
956        //int sleeps = 0;
957        Integer pix;
958        Integer pjx;
959
960        while (goon) {
961            /* protocol:
962             * request pair, process pair, send result, receive acknowledgment
963             */
964            // pair = (Pair) pairlist.removeNext();
965            Object req = new GBTransportMessReq();
966            logger.info("send request = " + req);
967            try {
968                pairChannel.send(pairTag, req);
969            } catch (IOException e) {
970                goon = false;
971                if (logger.isDebugEnabled()) {
972                    e.printStackTrace();
973                }
974                logger.info("receive pair, exception ");
975                break;
976            }
977            logger.debug("receive pair, goon = " + goon);
978            doEnd = true;
979            Object pp = null;
980            try {
981                pp = pairChannel.receive(pairTag);
982            } catch (InterruptedException e) {
983                goon = false;
984                e.printStackTrace();
985            } catch (IOException e) {
986                goon = false;
987                if (logger.isDebugEnabled()) {
988                    e.printStackTrace();
989                }
990                break;
991            } catch (ClassNotFoundException e) {
992                goon = false;
993                e.printStackTrace();
994            }
995            if (debug) {
996                logger.info("received pair = " + pp);
997            }
998            H = null;
999            if (pp == null) { // should not happen
1000                continue;
1001            }
1002            if (pp instanceof GBTransportMessEnd) {
1003                goon = false;
1004                //doEnd = false; // bug
1005                continue;
1006            }
1007            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
1008                pi = pj = null;
1009                if (pp instanceof GBTransportMessPair) {
1010                    pair = ((GBTransportMessPair<C>) pp).pair;
1011                    if (pair != null) {
1012                        pi = pair.pi;
1013                        pj = pair.pj;
1014                        //logger.debug("pair: pix = " + pair.i 
1015                        //               + ", pjx = " + pair.j);
1016                    }
1017                }
1018                if (pp instanceof GBTransportMessPairIndex) {
1019                    pix = ((GBTransportMessPairIndex) pp).i;
1020                    pjx = ((GBTransportMessPairIndex) pp).j;
1021                    pi = theList.getWait(pix);
1022                    pj = theList.getWait(pjx);
1023                    //logger.info("pix = " + pix + ", pjx = " +pjx);
1024                }
1025
1026                if (pi != null && pj != null) {
1027                    S = red.SPolynomial(pi, pj);
1028                    //System.out.println("S   = " + S);
1029                    if (S.isZERO()) {
1030                        // pair.setZero(); does not work in dist
1031                    } else {
1032                        if (logger.isDebugEnabled()) {
1033                            logger.debug("ht(S) = " + S.leadingExpVector());
1034                        }
1035                        H = red.normalform(theList, S);
1036                        reduction++;
1037                        if (H.isZERO()) {
1038                            // pair.setZero(); does not work in dist
1039                        } else {
1040                            H = H.monic();
1041                            if (logger.isInfoEnabled()) {
1042                                logger.info("ht(H) = " + H.leadingExpVector());
1043                            }
1044                        }
1045                    }
1046                }
1047            }
1048            if (pp instanceof GBTransportMess) {
1049                logger.debug("null pair results in null H poly");
1050            }
1051
1052            // send H or must send null, if not at end
1053            if (logger.isDebugEnabled()) {
1054                logger.debug("#distributed list = " + theList.size());
1055                logger.debug("send H polynomial = " + H);
1056            }
1057            try {
1058                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
1059                doEnd = false;
1060            } catch (IOException e) {
1061                goon = false;
1062                e.printStackTrace();
1063            }
1064            logger.info("done send poly message of " + pp);
1065            try {
1066                //pp = pairChannel.receive(threadId);
1067                pp = pairChannel.receive(ackTag);
1068            } catch (InterruptedException e) {
1069                goon = false;
1070                e.printStackTrace();
1071            } catch (IOException e) {
1072                goon = false;
1073                if (logger.isDebugEnabled()) {
1074                    e.printStackTrace();
1075                }
1076                break;
1077            } catch (ClassNotFoundException e) {
1078                goon = false;
1079                e.printStackTrace();
1080            }
1081            if (!(pp instanceof GBTransportMess)) {
1082                logger.error("invalid acknowledgement " + pp);
1083            }
1084            logger.info("received acknowledgment " + pp);
1085        }
1086        logger.info("terminated, done " + reduction + " reductions");
1087        if (doEnd) {
1088            try {
1089                pairChannel.send(resultTag, new GBTransportMessEnd());
1090            } catch (IOException e) {
1091                //e.printStackTrace();
1092            }
1093            logger.info("terminated, send done");
1094        }
1095    }
1096}