001/*
002 * $Id: GroebnerBaseDistributedHybrid.java 5313 2015-10-26 22:46:38Z kredel $
003 */
004
005package edu.jas.gb;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Collections;
011import java.util.List;
012import java.util.ListIterator;
013import java.util.concurrent.atomic.AtomicInteger;
014
015import org.apache.log4j.Logger;
016
017import edu.jas.poly.ExpVector;
018import edu.jas.poly.GenPolynomial;
019import edu.jas.structure.RingElem;
020import edu.jas.util.ChannelFactory;
021import edu.jas.util.DistHashTable;
022import edu.jas.util.DistHashTableServer;
023import edu.jas.util.SocketChannel;
024import edu.jas.util.TaggedSocketChannel;
025import edu.jas.util.Terminator;
026import edu.jas.util.ThreadPool;
027
028
029/**
 * Groebner base distributed hybrid algorithm. Implements a parallel version
 * of the Groebner base computation for distributed memory systems with
 * multi-core CPUs. Using the pairlist class, distributed multi-threaded
 * tasks do the reductions, with one communication channel per remote node.
034 * @param <C> coefficient type
035 * @author Heinz Kredel
036 * @deprecated use GroebnerBaseDistributedHybridEC
037 */
038
039@Deprecated
040public class GroebnerBaseDistributedHybrid<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
041
042
043    public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybrid.class);
044
045
046    public final boolean debug = logger.isDebugEnabled();
047
048
049    /**
050     * Number of threads to use.
051     */
052    protected final int threads;
053
054
055    /**
056     * Default number of threads.
057     */
058    protected static final int DEFAULT_THREADS = 2;
059
060
061    /**
062     * Number of threads per node to use.
063     */
064    protected final int threadsPerNode;
065
066
067    /**
068     * Default number of threads per compute node.
069     */
070    protected static final int DEFAULT_THREADS_PER_NODE = 1;
071
072
073    /**
074     * Pool of threads to use.
075     */
076    //protected final ExecutorService pool; // not for single node tests
077    protected transient final ThreadPool pool;
078
079
080    /**
081     * Default server port.
082     */
083    protected static final int DEFAULT_PORT = 4711;
084
085
086    /**
087     * Server port to use.
088     */
089    protected final int port;
090
091
092    /**
093     * Message tag for pairs.
094     */
095    public static final Integer pairTag = Integer.valueOf(1);
096
097
098    /**
099     * Message tag for results.
100     */
101    public static final Integer resultTag = Integer.valueOf(2);
102
103
104    /**
105     * Message tag for acknowledgments.
106     */
107    public static final Integer ackTag = Integer.valueOf(3);
108
109
110    /**
111     * Constructor.
112     */
113    public GroebnerBaseDistributedHybrid() {
114        this(DEFAULT_THREADS, DEFAULT_PORT);
115    }
116
117
118    /**
119     * Constructor.
120     * @param threads number of threads to use.
121     */
122    public GroebnerBaseDistributedHybrid(int threads) {
123        this(threads, new ThreadPool(threads), DEFAULT_PORT);
124    }
125
126
127    /**
128     * Constructor.
129     * @param threads number of threads to use.
130     * @param port server port to use.
131     */
132    public GroebnerBaseDistributedHybrid(int threads, int port) {
133        this(threads, new ThreadPool(threads), port);
134    }
135
136
137    /**
138     * Constructor.
139     * @param threads number of threads to use.
140     * @param threadsPerNode threads per node to use.
141     * @param port server port to use.
142     */
143    public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, int port) {
144        this(threads, threadsPerNode, new ThreadPool(threads), port);
145    }
146
147
148    /**
149     * Constructor.
150     * @param threads number of threads to use.
151     * @param pool ThreadPool to use.
152     * @param port server port to use.
153     */
154    public GroebnerBaseDistributedHybrid(int threads, ThreadPool pool, int port) {
155        this(threads, DEFAULT_THREADS_PER_NODE, pool, port);
156    }
157
158
159    /**
160     * Constructor.
161     * @param threads number of threads to use.
162     * @param threadsPerNode threads per node to use.
163     * @param pl pair selection strategy
164     * @param port server port to use.
165     */
166    public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, PairList<C> pl, int port) {
167        this(threads, threadsPerNode, new ThreadPool(threads), pl, port);
168    }
169
170
171    /**
172     * Constructor.
173     * @param threads number of threads to use.
174     * @param threadsPerNode threads per node to use.
175     * @param port server port to use.
176     */
177    public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, int port) {
178        this(threads, threadsPerNode, pool, new OrderedPairlist<C>(), port);
179    }
180
181
182    /**
183     * Constructor.
184     * @param threads number of threads to use.
185     * @param threadsPerNode threads per node to use.
186     * @param pool ThreadPool to use.
187     * @param pl pair selection strategy
188     * @param port server port to use.
189     */
190    public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl,
191                    int port) {
192        super(new ReductionPar<C>(), pl);
193        if (threads < 1) {
194            threads = 1;
195        }
196        this.threads = threads;
197        this.threadsPerNode = threadsPerNode;
198        this.pool = pool;
199        this.port = port;
200        //logger.info("generated pool: " + pool);
201    }
202
203
204    /**
205     * Cleanup and terminate.
206     */
207    @Override
208    public void terminate() {
209        if (pool == null) {
210            return;
211        }
212        pool.terminate();
213    }
214
215
216    /**
217     * Distributed hybrid Groebner base.
218     * @param modv number of module variables.
219     * @param F polynomial list.
220     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
221     */
222    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
223        long t = System.currentTimeMillis();
224        final int DL_PORT = port + 100;
225        ChannelFactory cf = new ChannelFactory(port);
226        cf.init();
227        DistHashTableServer<Integer> dls = new DistHashTableServer<Integer>(DL_PORT);
228        dls.init();
229        logger.debug("dist-list server running");
230
231        GenPolynomial<C> p;
232        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
233        PairList<C> pairlist = null;
234        boolean oneInGB = false;
235        //nt l = F.size();
236        @SuppressWarnings("unused")
237        int unused;
238        ListIterator<GenPolynomial<C>> it = F.listIterator();
239        while (it.hasNext()) {
240            p = it.next();
241            if (p.length() > 0) {
242                p = p.monic();
243                if (p.isONE()) {
244                    oneInGB = true;
245                    G.clear();
246                    G.add(p);
247                    //return G; must signal termination to others
248                }
249                if (!oneInGB) {
250                    G.add(p);
251                }
252                if (pairlist == null) {
253                    //pairlist = new OrderedPairlist<C>(modv, p.ring);
254                    pairlist = strategy.create(modv, p.ring);
255                    if (!p.ring.coFac.isField()) {
256                        throw new IllegalArgumentException("coefficients not from a field");
257                    }
258                }
259                // theList not updated here
260                if (p.isONE()) {
261                    unused = pairlist.putOne();
262                } else {
263                    unused = pairlist.put(p);
264                }
265            } else {
266                //l--;
267            }
268        }
269        //if (l <= 1) {
270        //    return G; must signal termination to others
271        //}
272        logger.info("pairlist " + pairlist);
273
274        logger.debug("looking for clients");
275        //long t = System.currentTimeMillis();
276        // now in DL, uses resend for late clients
277        //while ( dls.size() < threads ) { sleep(); }
278
279        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(
280                        "localhost", DL_PORT);
281        theList.init();
282        List<GenPolynomial<C>> al = pairlist.getList();
283        if (G.size() != al.size()) {
284            logger.info("G != al: " + G + " != " + al);
285        }
286        for (int i = 0; i < al.size(); i++) {
287            // no wait required
288            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
289            if (nn != null) {
290                logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
291            }
292        }
293
294        Terminator finner = new Terminator(threads * threadsPerNode);
295        HybridReducerServer<C> R;
296        logger.info("using pool = " + pool);
297        for (int i = 0; i < threads; i++) {
298            R = new HybridReducerServer<C>(threadsPerNode, finner, cf, theList, pairlist);
299            pool.addJob(R);
300            //logger.info("server submitted " + R);
301        }
302        logger.info("main loop waiting " + finner);
303        finner.waitDone();
304        int ps = theList.size();
305        logger.info("#distributed list = " + ps);
306        // make sure all polynomials arrived: not needed in master
307        // G = (ArrayList)theList.values();
308        G = pairlist.getList();
309        if (ps != G.size()) {
310            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
311        }
312        for (GenPolynomial<C> q : theList.getValueList()) {
313            if (q != null && !q.isZERO()) {
314                logger.debug("final q = " + q.leadingExpVector());
315            }
316        }
317        logger.debug("distributed list end");
318        long time = System.currentTimeMillis();
319        List<GenPolynomial<C>> Gp;
320        Gp = minimalGB(G); // not jet distributed but threaded
321        time = System.currentTimeMillis() - time;
322        logger.info("parallel gbmi time = " + time);
323        G = Gp;
324        logger.debug("server cf.terminate()");
325        cf.terminate();
326        // no more required // 
327        logger.info("server not pool.terminate() " + pool);
328        //pool.terminate();
329        logger.info("server theList.terminate() " + theList.size());
330        theList.terminate();
331        logger.info("server dls.terminate() " + dls);
332        dls.terminate();
333        t = System.currentTimeMillis() - t;
334        logger.info("server GB end, time = " + t + ", " + pairlist.toString());
335        return G;
336    }
337
338
339    /**
340     * GB distributed client.
341     * @param host the server runs on.
342     * @throws IOException
343     */
344    public void clientPart(String host) throws IOException {
345
346        ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost
347        cf.init();
348        SocketChannel channel = cf.getChannel(host, port);
349        TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel);
350        pairChannel.init();
351
352        if (debug) {
353            logger.info("clientPart pairChannel   = " + pairChannel);
354        }
355
356        final int DL_PORT = port + 100;
357        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(host,
358                        DL_PORT);
359        theList.init();
360
361        //HybridReducerClient<C> R = new HybridReducerClient<C>(threadsPerNode, pairChannel, theList);
362        //R.run();
363
364        ThreadPool pool = new ThreadPool(threadsPerNode);
365        logger.info("client using pool = " + pool);
366        for (int i = 0; i < threadsPerNode; i++) {
367            HybridReducerClient<C> Rr = new HybridReducerClient<C>(threadsPerNode, pairChannel, i, theList);
368            pool.addJob(Rr);
369        }
370        if (debug) {
371            logger.info("clients submitted");
372        }
373        pool.terminate();
374        logger.info("client pool.terminate()");
375
376        pairChannel.close();
377        logger.info("client pairChannel.close()");
378
379        theList.terminate();
380        cf.terminate();
381        logger.info("client cf.terminate()");
382
383        channel.close();
384        logger.info("client channel.close()");
385        return;
386    }
387
388
389    /**
390     * Minimal ordered groebner basis.
391     * @param Fp a Groebner base.
392     * @return a reduced Groebner base of Fp.
393     */
394    @SuppressWarnings("cast")
395    @Override
396    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
397        GenPolynomial<C> a;
398        ArrayList<GenPolynomial<C>> G;
399        G = new ArrayList<GenPolynomial<C>>(Fp.size());
400        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
401        while (it.hasNext()) {
402            a = it.next();
403            if (a.length() != 0) { // always true
404                // already monic  a = a.monic();
405                G.add(a);
406            }
407        }
408        if (G.size() <= 1) {
409            return G;
410        }
411
412        ExpVector e;
413        ExpVector f;
414        GenPolynomial<C> p;
415        ArrayList<GenPolynomial<C>> F;
416        F = new ArrayList<GenPolynomial<C>>(G.size());
417        boolean mt;
418
419        while (G.size() > 0) {
420            a = G.remove(0);
421            e = a.leadingExpVector();
422
423            it = G.listIterator();
424            mt = false;
425            while (it.hasNext() && !mt) {
426                p = it.next();
427                f = p.leadingExpVector();
428                mt = e.multipleOf(f);
429            }
430            it = F.listIterator();
431            while (it.hasNext() && !mt) {
432                p = it.next();
433                f = p.leadingExpVector();
434                mt = e.multipleOf(f);
435            }
436            if (!mt) {
437                F.add(a);
438            } else {
439                // System.out.println("dropped " + a.length());
440            }
441        }
442        G = F;
443        if (G.size() <= 1) {
444            return G;
445        }
446        Collections.reverse(G); // important for lex GB
447
448        MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()];
449        int i = 0;
450        F = new ArrayList<GenPolynomial<C>>(G.size());
451        while (G.size() > 0) {
452            a = G.remove(0);
453            // System.out.println("doing " + a.length());
454            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
455            R.addAll(G);
456            R.addAll(F);
457            mirs[i] = new MiReducerServer<C>(R, a);
458            pool.addJob(mirs[i]);
459            i++;
460            F.add(a);
461        }
462        G = F;
463        F = new ArrayList<GenPolynomial<C>>(G.size());
464        for (i = 0; i < mirs.length; i++) {
465            a = mirs[i].getNF();
466            F.add(a);
467        }
468        return F;
469    }
470
471}
472
473
474/**
475 * Distributed server reducing worker proxy threads.
476 * @param <C> coefficient type
477 */
478
479class HybridReducerServer<C extends RingElem<C>> implements Runnable {
480
481
482    public static final Logger logger = Logger.getLogger(HybridReducerServer.class);
483
484
485    public final boolean debug = logger.isDebugEnabled();
486
487
488    private final Terminator finner;
489
490
491    private final ChannelFactory cf;
492
493
494    private TaggedSocketChannel pairChannel;
495
496
497    private final DistHashTable<Integer, GenPolynomial<C>> theList;
498
499
500    private final PairList<C> pairlist;
501
502
503    private final int threadsPerNode;
504
505
506    /**
507     * Message tag for pairs.
508     */
509    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
510
511
512    /**
513     * Message tag for results.
514     */
515    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
516
517
518    /**
519     * Message tag for acknowledgments.
520     */
521    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
522
523
524    /**
525     * Constructor.
526     * @param tpn number of threads per node
527     * @param fin terminator
528     * @param cf channel factory
529     * @param dl distributed hash table
530     * @param L ordered pair list
531     */
532    HybridReducerServer(int tpn, Terminator fin, ChannelFactory cf,
533                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
534        threadsPerNode = tpn;
535        finner = fin;
536        this.cf = cf;
537        theList = dl;
538        pairlist = L;
539        //logger.info("reducer server created " + this);
540    }
541
542
543    /**
544     * Work loop.
545     * @see java.lang.Runnable#run()
546     */
547    @Override
548    public void run() {
549        logger.info("reducer server running with " + cf);
550        SocketChannel channel = null;
551        try {
552            channel = cf.getChannel();
553            pairChannel = new TaggedSocketChannel(channel);
554            pairChannel.init();
555        } catch (InterruptedException e) {
556            logger.debug("get pair channel interrupted");
557            e.printStackTrace();
558            return;
559        }
560        if (debug) {
561            logger.info("pairChannel   = " + pairChannel);
562        }
563        // record idle remote workers (minus one?)
564        //finner.beIdle(threadsPerNode-1);
565        finner.initIdle(threadsPerNode);
566        AtomicInteger active = new AtomicInteger(0);
567
568        // start receiver
569        HybridReducerReceiver<C> receiver = new HybridReducerReceiver<C>(threadsPerNode, finner, active,
570                        pairChannel, theList, pairlist);
571        receiver.start();
572
573        Pair<C> pair;
574        //boolean set = false;
575        boolean goon = true;
576        //int polIndex = -1;
577        int red = 0;
578        int sleeps = 0;
579
580        // while more requests
581        while (goon) {
582            // receive request if thread is reported incactive
583            logger.debug("receive request");
584            Object req = null;
585            try {
586                req = pairChannel.receive(pairTag);
587            } catch (InterruptedException e) {
588                goon = false;
589                e.printStackTrace();
590            } catch (IOException e) {
591                goon = false;
592                e.printStackTrace();
593            } catch (ClassNotFoundException e) {
594                goon = false;
595                e.printStackTrace();
596            }
597            logger.info("received request, req = " + req);
598            if (req == null) {
599                goon = false;
600                break;
601            }
602            if (!(req instanceof GBTransportMessReq)) {
603                goon = false;
604                break;
605            }
606
607            // find pair and manage termination status
608            logger.info("find pair");
609            while (!pairlist.hasNext()) { // wait
610                if (!finner.hasJobs() && !pairlist.hasNext()) {
611                    goon = false;
612                    break;
613                }
614                try {
615                    sleeps++;
616                    if (sleeps % 3 == 0) {
617                        logger.info("waiting for reducers, remaining = " + finner.getJobs());
618                    }
619                    Thread.sleep(100);
620                } catch (InterruptedException e) {
621                    goon = false;
622                    break;
623                }
624            }
625            if (!pairlist.hasNext() && !finner.hasJobs()) {
626                logger.info("termination detection: no pairs and no jobs left");
627                goon = false;
628                break; //continue; //break?
629            }
630            finner.notIdle(); // before pairlist get!!
631            pair = pairlist.removeNext();
632            // send pair to client, even if null
633            if (debug) {
634                logger.info("active count = " + active.get());
635                logger.info("send pair = " + pair);
636            }
637            GBTransportMess msg = null;
638            if (pair != null) {
639                msg = new GBTransportMessPairIndex(pair);
640            } else {
641                msg = new GBTransportMess(); //not End(); at this time
642                // goon ?= false;
643            }
644            try {
645                red++;
646                pairChannel.send(pairTag, msg);
647                int a = active.getAndIncrement();
648            } catch (IOException e) {
649                e.printStackTrace();
650                goon = false;
651                break;
652            }
653            //logger.debug("#distributed list = " + theList.size());
654        }
655        logger.info("terminated, send " + red + " reduction pairs");
656
657        /*
658         * send end mark to clients
659         */
660        logger.debug("send end");
661        try {
662            for (int i = 0; i < threadsPerNode; i++) { // -1
663                //do not wait: Object rq = pairChannel.receive(pairTag);
664                pairChannel.send(pairTag, new GBTransportMessEnd());
665            }
666            // send also end to receiver, no more, did not work
667            //pairChannel.send(resultTag, new GBTransportMessEnd());
668        } catch (IOException e) {
669            if (logger.isDebugEnabled()) {
670                e.printStackTrace();
671            }
672        }
673        receiver.terminate();
674
675        int d = active.get();
676        logger.info("remaining active tasks = " + d);
677        //logger.info("terminated, send " + red + " reduction pairs");
678        pairChannel.close();
679        logger.info("redServ pairChannel.close()");
680        finner.release();
681
682        channel.close();
683        logger.info("redServ channel.close()");
684    }
685}
686
687
688/**
689 * Distributed server receiving worker thread.
690 * @param <C> coefficient type
691 */
692
693class HybridReducerReceiver<C extends RingElem<C>> extends Thread {
694
695
696    public static final Logger logger = Logger.getLogger(HybridReducerReceiver.class);
697
698
699    public final boolean debug = logger.isDebugEnabled();
700
701
702    private final DistHashTable<Integer, GenPolynomial<C>> theList;
703
704
705    private final PairList<C> pairlist;
706
707
708    private final TaggedSocketChannel pairChannel;
709
710
711    private final Terminator finner;
712
713
714    private final int threadsPerNode;
715
716
717    private final AtomicInteger active;
718
719
720    private volatile boolean goon;
721
722
723    /**
724     * Message tag for pairs.
725     */
726    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
727
728
729    /**
730     * Message tag for results.
731     */
732    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
733
734
735    /**
736     * Message tag for acknowledgments.
737     */
738    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
739
740
741    /**
742     * Constructor.
743     * @param tpn number of threads per node
744     * @param fin terminator
745     * @param a active remote tasks count
746     * @param pc tagged socket channel
747     * @param dl distributed hash table
748     * @param L ordered pair list
749     */
750    HybridReducerReceiver(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc,
751                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
752        active = a;
753        threadsPerNode = tpn;
754        finner = fin;
755        pairChannel = pc;
756        theList = dl;
757        pairlist = L;
758        goon = true;
759        //logger.info("reducer server created " + this);
760    }
761
762
763    /**
764     * Work loop.
765     * @see java.lang.Thread#run()
766     */
767    @Override
768    public void run() {
769        //Pair<C> pair = null;
770        GenPolynomial<C> H = null;
771        int red = 0;
772        int polIndex = -1;
773        //Integer senderId; // obsolete
774
775        // while more requests
776        while (goon) {
777            // receive request
778            logger.debug("receive result");
779            //senderId = null;
780            Object rh = null;
781            try {
782                rh = pairChannel.receive(resultTag);
783                int i = active.getAndDecrement();
784            } catch (InterruptedException e) {
785                goon = false;
786                //e.printStackTrace();
787                //?? finner.initIdle(1);
788                break;
789            } catch (IOException e) {
790                e.printStackTrace();
791                goon = false;
792                finner.initIdle(1);
793                break;
794            } catch (ClassNotFoundException e) {
795                e.printStackTrace();
796                goon = false;
797                finner.initIdle(1);
798                break;
799            }
800            logger.info("received H polynomial");
801            if (rh == null) {
802                if (this.isInterrupted()) {
803                    goon = false;
804                    finner.initIdle(1);
805                    break;
806                }
807                //finner.initIdle(1);
808            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
809                logger.info("received GBTransportMessEnd");
810                goon = false;
811                //?? finner.initIdle(1);
812                break;
813            } else if (rh instanceof GBTransportMessPoly) {
814                // update pair list
815                red++;
816                GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
817                H = mpi.pol;
818                //senderId = mpi.threadId;
819                if (H != null) {
820                    if (debug) {
821                        logger.info("H = " + H.leadingExpVector());
822                    }
823                    if (!H.isZERO()) {
824                        if (H.isONE()) {
825                            // finner.allIdle();
826                            polIndex = pairlist.putOne();
827                            GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
828                            if (nn != null) {
829                                logger.info("double polynomials nn = " + nn + ", H = " + H);
830                            }
831                            //goon = false; must wait for other clients
832                            //finner.initIdle(1);
833                            //break;
834                        } else {
835                            polIndex = pairlist.put(H);
836                            // use putWait ? but still not all distributed
837                            GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
838                            if (nn != null) {
839                                logger.info("double polynomials nn = " + nn + ", H = " + H);
840                            }
841                        }
842                    }
843                }
844            }
845            // only after recording in pairlist !
846            finner.initIdle(1);
847            try {
848                pairChannel.send(ackTag, new GBTransportMess());
849                logger.debug("send acknowledgement");
850            } catch (IOException e) {
851                e.printStackTrace();
852                goon = false;
853                break;
854            }
855        } // end while
856        goon = false;
857        logger.info("terminated, received " + red + " reductions");
858    }
859
860
861    /**
862     * Terminate.
863     */
864    public void terminate() {
865        goon = false;
866        //this.interrupt();
867        try {
868            this.join();
869        } catch (InterruptedException e) {
870            // unfug Thread.currentThread().interrupt();
871        }
872        logger.debug("HybridReducerReceiver terminated");
873    }
874
875}
876
877
878/**
879 * Distributed clients reducing worker threads.
880 */
881
882class HybridReducerClient<C extends RingElem<C>> implements Runnable {
883
884
885    private static final Logger logger = Logger.getLogger(HybridReducerClient.class);
886
887
888    public final boolean debug = logger.isDebugEnabled();
889
890
891    private final TaggedSocketChannel pairChannel;
892
893
894    private final DistHashTable<Integer, GenPolynomial<C>> theList;
895
896
897    private final ReductionPar<C> red;
898
899
900    private final int threadsPerNode;
901
902
903    /*
904     * Identification number for this thread.
905     */
906    //public final Integer threadId; // obsolete
907
908
909    /**
910     * Message tag for pairs.
911     */
912    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
913
914
915    /**
916     * Message tag for results.
917     */
918    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
919
920
921    /**
922     * Message tag for acknowledgments.
923     */
924    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
925
926
927    /**
928     * Constructor.
929     * @param tpn number of threads per node
930     * @param tc tagged socket channel
931     * @param tid thread identification
932     * @param dl distributed hash table
933     */
934    HybridReducerClient(int tpn, TaggedSocketChannel tc, Integer tid,
935                    DistHashTable<Integer, GenPolynomial<C>> dl) {
936        this.threadsPerNode = tpn;
937        pairChannel = tc;
938        //threadId = 100 + tid; // keep distinct from other tags
939        theList = dl;
940        red = new ReductionPar<C>();
941    }
942
943
944    /**
945     * Work loop.
946     * @see java.lang.Runnable#run()
947     */
948    @Override
949    public void run() {
950        if (debug) {
951            logger.info("pairChannel   = " + pairChannel + " reducer client running");
952        }
953        Pair<C> pair = null;
954        GenPolynomial<C> pi;
955        GenPolynomial<C> pj;
956        GenPolynomial<C> S;
957        GenPolynomial<C> H = null;
958        //boolean set = false;
959        boolean goon = true;
960        boolean doEnd = true;
961        int reduction = 0;
962        //int sleeps = 0;
963        Integer pix;
964        Integer pjx;
965
966        while (goon) {
967            /* protocol:
968             * request pair, process pair, send result, receive acknowledgment
969             */
970            // pair = (Pair) pairlist.removeNext();
971            Object req = new GBTransportMessReq();
972            logger.info("send request = " + req);
973            try {
974                pairChannel.send(pairTag, req);
975            } catch (IOException e) {
976                goon = false;
977                if (logger.isDebugEnabled()) {
978                    e.printStackTrace();
979                }
980                logger.info("receive pair, exception ");
981                break;
982            }
983            logger.debug("receive pair, goon = " + goon);
984            doEnd = true;
985            Object pp = null;
986            try {
987                pp = pairChannel.receive(pairTag);
988            } catch (InterruptedException e) {
989                goon = false;
990                e.printStackTrace();
991            } catch (IOException e) {
992                goon = false;
993                if (logger.isDebugEnabled()) {
994                    e.printStackTrace();
995                }
996                break;
997            } catch (ClassNotFoundException e) {
998                goon = false;
999                e.printStackTrace();
1000            }
1001            if (debug) {
1002                logger.info("received pair = " + pp);
1003            }
1004            H = null;
1005            if (pp == null) { // should not happen
1006                continue;
1007            }
1008            if (pp instanceof GBTransportMessEnd) {
1009                goon = false;
1010                //doEnd = false; // bug
1011                continue;
1012            }
1013            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
1014                pi = pj = null;
1015                if (pp instanceof GBTransportMessPair) {
1016                    pair = ((GBTransportMessPair<C>) pp).pair;
1017                    if (pair != null) {
1018                        pi = pair.pi;
1019                        pj = pair.pj;
1020                        //logger.debug("pair: pix = " + pair.i 
1021                        //               + ", pjx = " + pair.j);
1022                    }
1023                }
1024                if (pp instanceof GBTransportMessPairIndex) {
1025                    pix = ((GBTransportMessPairIndex) pp).i;
1026                    pjx = ((GBTransportMessPairIndex) pp).j;
1027                    pi = theList.getWait(pix);
1028                    pj = theList.getWait(pjx);
1029                    //logger.info("pix = " + pix + ", pjx = " +pjx);
1030                }
1031
1032                if (pi != null && pj != null) {
1033                    S = red.SPolynomial(pi, pj);
1034                    //System.out.println("S   = " + S);
1035                    if (S.isZERO()) {
1036                        // pair.setZero(); does not work in dist
1037                    } else {
1038                        if (logger.isDebugEnabled()) {
1039                            logger.debug("ht(S) = " + S.leadingExpVector());
1040                        }
1041                        H = red.normalform(theList, S);
1042                        reduction++;
1043                        if (H.isZERO()) {
1044                            // pair.setZero(); does not work in dist
1045                        } else {
1046                            H = H.monic();
1047                            if (logger.isInfoEnabled()) {
1048                                logger.info("ht(H) = " + H.leadingExpVector());
1049                            }
1050                        }
1051                    }
1052                }
1053            }
1054            if (pp instanceof GBTransportMess) {
1055                logger.debug("null pair results in null H poly");
1056            }
1057
1058            // send H or must send null, if not at end
1059            if (logger.isDebugEnabled()) {
1060                logger.debug("#distributed list = " + theList.size());
1061                logger.debug("send H polynomial = " + H);
1062            }
1063            try {
1064                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
1065                doEnd = false;
1066            } catch (IOException e) {
1067                goon = false;
1068                e.printStackTrace();
1069            }
1070            logger.info("done send poly message of " + pp);
1071            try {
1072                //pp = pairChannel.receive(threadId);
1073                pp = pairChannel.receive(ackTag);
1074            } catch (InterruptedException e) {
1075                goon = false;
1076                e.printStackTrace();
1077            } catch (IOException e) {
1078                goon = false;
1079                if (logger.isDebugEnabled()) {
1080                    e.printStackTrace();
1081                }
1082                break;
1083            } catch (ClassNotFoundException e) {
1084                goon = false;
1085                e.printStackTrace();
1086            }
1087            if (!(pp instanceof GBTransportMess)) {
1088                logger.error("invalid acknowledgement " + pp);
1089            }
1090            logger.info("received acknowledgment " + pp);
1091        }
1092        logger.info("terminated, done " + reduction + " reductions");
1093        if (doEnd) {
1094            try {
1095                pairChannel.send(resultTag, new GBTransportMessEnd());
1096            } catch (IOException e) {
1097                //e.printStackTrace();
1098            }
1099            logger.info("terminated, send done");
1100        }
1101    }
1102}