001    /*
002     * $Id: GroebnerBaseDistributedHybrid.java 3626 2011-05-08 09:51:57Z kredel $
003     */
004    
005    package edu.jas.gb;
006    
007    
008    import java.io.IOException;
009    import java.util.ArrayList;
010    import java.util.List;
011    import java.util.ListIterator;
012    import java.util.Collections;
013    import java.util.concurrent.atomic.AtomicInteger;
014    
015    import org.apache.log4j.Logger;
016    
017    import edu.jas.poly.ExpVector;
018    import edu.jas.poly.GenPolynomial;
019    import edu.jas.structure.RingElem;
020    import edu.jas.util.ChannelFactory;
021    import edu.jas.util.DistHashTable;
022    import edu.jas.util.DistHashTableServer;
023    import edu.jas.util.SocketChannel;
024    import edu.jas.util.TaggedSocketChannel;
025    import edu.jas.util.Terminator;
026    import edu.jas.util.ThreadPool;
027    
028    
029    /**
030     * Groebner Base distributed hybrid algorithm. Implements a
031     * distributed memory with multi-core CPUs parallel version of
032     * Groebner bases. Using pairlist class, distributed multi-threaded
033     * tasks do reduction, one communication channel per remote node.
034     * @param <C> coefficient type
035     * @author Heinz Kredel
036     */
037    
038    public class GroebnerBaseDistributedHybrid<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
039    
040    
041        public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybrid.class);
042    
043    
044        public final boolean debug = logger.isDebugEnabled();
045    
046    
047        /**
048         * Number of threads to use.
049         */
050        protected final int threads;
051    
052    
053        /**
054         * Default number of threads.
055         */
056        protected static final int DEFAULT_THREADS = 2;
057    
058    
059        /**
060         * Number of threads per node to use.
061         */
062        protected final int threadsPerNode;
063    
064    
065        /**
066         * Default number of threads per compute node.
067         */
068        protected static final int DEFAULT_THREADS_PER_NODE = 1;
069    
070    
071        /**
072         * Pool of threads to use.
073         */
074        //protected final ExecutorService pool; // not for single node tests
075        protected final ThreadPool pool;
076    
077    
078        /**
079         * Default server port.
080         */
081        protected static final int DEFAULT_PORT = 4711;
082    
083    
084        /**
085         * Server port to use.
086         */
087        protected final int port;
088    
089    
090        /**
091         * Message tag for pairs.
092         */
093        public static final Integer pairTag = new Integer(1);
094    
095    
096        /**
097         * Message tag for results.
098         */
099        public static final Integer resultTag = new Integer(2);
100    
101    
102        /**
103         * Message tag for acknowledgments.
104         */
105        public static final Integer ackTag = new Integer(3);
106    
107    
108        /**
109         * Constructor.
110         */
111        public GroebnerBaseDistributedHybrid() {
112            this(DEFAULT_THREADS, DEFAULT_PORT);
113        }
114    
115    
116        /**
117         * Constructor.
118         * @param threads number of threads to use.
119         */
120        public GroebnerBaseDistributedHybrid(int threads) {
121            this(threads, new ThreadPool(threads), DEFAULT_PORT);
122        }
123    
124    
125        /**
126         * Constructor.
127         * @param threads number of threads to use.
128         * @param port server port to use.
129         */
130        public GroebnerBaseDistributedHybrid(int threads, int port) {
131            this(threads, new ThreadPool(threads), port);
132        }
133    
134    
135        /**
136         * Constructor.
137         * @param threads number of threads to use.
138         * @param threadsPerNode threads per node to use.
139         * @param port server port to use.
140         */
141        public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, int port) {
142            this(threads, threadsPerNode, new ThreadPool(threads), port);
143        }
144    
145    
146        /**
147         * Constructor.
148         * @param threads number of threads to use.
149         * @param pool ThreadPool to use.
150         * @param port server port to use.
151         */
152        public GroebnerBaseDistributedHybrid(int threads, ThreadPool pool, int port) {
153            this(threads, DEFAULT_THREADS_PER_NODE, pool, port);
154        }
155    
156    
157        /**
158         * Constructor.
159         * @param threads number of threads to use.
160         * @param threadsPerNode threads per node to use.
161         * @param pl pair selection strategy
162         * @param port server port to use.
163         */
164        public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, PairList<C> pl, int port) {
165            this(threads, threadsPerNode, new ThreadPool(threads), pl, port);
166        }
167    
168    
169        /**
170         * Constructor.
171         * @param threads number of threads to use.
172         * @param threadsPerNode threads per node to use.
173         * @param port server port to use.
174         */
175        public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, int port) {
176            this(threads, threadsPerNode, pool, new OrderedPairlist<C>(), port);
177        }
178    
179    
180        /**
181         * Constructor.
182         * @param threads number of threads to use.
183         * @param threadsPerNode threads per node to use.
184         * @param pool ThreadPool to use.
185         * @param pl pair selection strategy
186         * @param port server port to use.
187         */
188        public GroebnerBaseDistributedHybrid(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl, int port) {
189            super( new ReductionPar<C>(), pl );
190            if (threads < 1) {
191                threads = 1;
192            }
193            this.threads = threads;
194            this.threadsPerNode = threadsPerNode;
195            this.pool = pool;
196            this.port = port;
197            //logger.info("generated pool: " + pool);
198        }
199    
200    
201        /**
202         * Cleanup and terminate.
203         */
204        public void terminate() {
205            if (pool == null) {
206                return;
207            }
208            pool.terminate();
209        }
210    
211    
212        /**
213         * Distributed hybrid Groebner base. 
214         * @param modv number of module variables.
215         * @param F polynomial list.
216         * @return GB(F) a Groebner base of F or null, if a IOException occurs.
217         */
218        public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
219            long t = System.currentTimeMillis();
220            final int DL_PORT = port + 100;
221            ChannelFactory cf = new ChannelFactory(port);
222            cf.init();
223            DistHashTableServer<Integer> dls = new DistHashTableServer<Integer>(DL_PORT);
224            dls.init();
225            logger.debug("dist-list server running");
226    
227            GenPolynomial<C> p;
228            List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
229            PairList<C> pairlist = null;
230            boolean oneInGB = false;
231            int l = F.size();
232            int unused;
233            ListIterator<GenPolynomial<C>> it = F.listIterator();
234            while (it.hasNext()) {
235                p = it.next();
236                if (p.length() > 0) {
237                    p = p.monic();
238                    if (p.isONE()) {
239                        oneInGB = true;
240                        G.clear();
241                        G.add(p);
242                        //return G; must signal termination to others
243                    }
244                    if (!oneInGB) {
245                        G.add(p);
246                    }
247                    if (pairlist == null) {
248                        //pairlist = new OrderedPairlist<C>(modv, p.ring);
249                        pairlist = strategy.create( modv, p.ring );
250                        if ( ! p.ring.coFac.isField() ) {
251                            throw new IllegalArgumentException("coefficients not from a field");
252                        }
253                    }
254                    // theList not updated here
255                    if (p.isONE()) {
256                        unused = pairlist.putOne();
257                    } else {
258                        unused = pairlist.put(p);
259                    }
260                } else {
261                    l--;
262                }
263            }
264            if (l <= 1) {
265                //return G; must signal termination to others
266            }
267            logger.info("pairlist " + pairlist);
268    
269            logger.debug("looking for clients");
270            //long t = System.currentTimeMillis();
271            // now in DL, uses resend for late clients
272            //while ( dls.size() < threads ) { sleep(); }
273    
274            DistHashTable<Integer, GenPolynomial<C>> theList 
275                = new DistHashTable<Integer, GenPolynomial<C>>("localhost", DL_PORT);
276            List<GenPolynomial<C>> al = pairlist.getList();
277            for (int i = 0; i < al.size(); i++) {
278                // no wait required
279                GenPolynomial<C> nn = theList.put(new Integer(i), al.get(i));
280                if (nn != null) {
281                    logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
282                }
283            }
284    
285            Terminator finner = new Terminator(threads*threadsPerNode);
286            HybridReducerServer<C> R;
287            logger.info("using pool = " + pool);
288            for (int i = 0; i < threads; i++) {
289                R = new HybridReducerServer<C>(threadsPerNode, finner, cf, theList, pairlist);
290                pool.addJob(R);
291                //logger.info("server submitted " + R);
292            }
293            logger.info("main loop waiting " + finner);
294            finner.waitDone();
295            int ps = theList.size();
296            logger.info("#distributed list = " + ps);
297            // make sure all polynomials arrived: not needed in master
298            // G = (ArrayList)theList.values();
299            G = pairlist.getList();
300            if (ps != G.size()) {
301                logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
302            }
303            for (GenPolynomial<C> q: theList.getValueList()) {
304                if ( q != null && !q.isZERO() ) {
305                   logger.debug("final q = " + q.leadingExpVector());
306                }
307            }
308            logger.debug("distributed list end");
309            long time = System.currentTimeMillis();
310            List<GenPolynomial<C>> Gp;
311            Gp = minimalGB(G); // not jet distributed but threaded
312            time = System.currentTimeMillis() - time;
313            logger.info("parallel gbmi time = " + time);
314            G = Gp;
315            logger.debug("server cf.terminate()");
316            cf.terminate();
317            // no more required // 
318            logger.info("server not pool.terminate() " + pool);
319            //pool.terminate();
320            logger.info("server theList.terminate() " + theList.size());
321            theList.terminate();
322            logger.info("server dls.terminate() " + dls);
323            dls.terminate();
324            t = System.currentTimeMillis() - t;
325            logger.info("server GB end, time = " + t + ", " + pairlist.toString());
326            return G;
327        }
328    
329    
330        /**
331         * GB distributed client.
332         * @param host the server runs on.
333         * @throws IOException
334         */
335        public void clientPart(String host) throws IOException {
336    
337            ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost
338            cf.init();
339            SocketChannel channel = cf.getChannel(host, port);
340            TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel);
341            pairChannel.init();
342    
343            if (debug) {
344                logger.info("clientPart pairChannel   = " + pairChannel);
345            }
346    
347            final int DL_PORT = port + 100;
348            DistHashTable<Integer, GenPolynomial<C>> theList 
349                 = new DistHashTable<Integer, GenPolynomial<C>>(host, DL_PORT);
350    
351            //HybridReducerClient<C> R = new HybridReducerClient<C>(threadsPerNode, pairChannel, theList);
352            //R.run();
353    
354            ThreadPool pool = new ThreadPool(threadsPerNode);
355            logger.info("client using pool = " +pool);
356            for (int i = 0; i < threadsPerNode; i++) {
357                HybridReducerClient<C> Rr = new HybridReducerClient<C>(threadsPerNode, pairChannel, i, theList);
358                pool.addJob(Rr);
359            }
360            if (debug) {
361                logger.info("clients submitted");
362            }
363            pool.terminate();
364            logger.info("client pool.terminate()");
365    
366            pairChannel.close();
367            logger.info("client pairChannel.close()");
368    
369            theList.terminate();
370            cf.terminate();
371            logger.info("client cf.terminate()");
372    
373            channel.close();
374            logger.info("client channel.close()");
375            return;
376        }
377    
378    
379        /**
380         * Minimal ordered groebner basis.
381         * @param Fp a Groebner base.
382         * @return a reduced Groebner base of Fp.
383         */
384        @Override
385        public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
386            GenPolynomial<C> a;
387            ArrayList<GenPolynomial<C>> G;
388            G = new ArrayList<GenPolynomial<C>>(Fp.size());
389            ListIterator<GenPolynomial<C>> it = Fp.listIterator();
390            while (it.hasNext()) {
391                a = it.next();
392                if (a.length() != 0) { // always true
393                    // already monic  a = a.monic();
394                    G.add(a);
395                }
396            }
397            if (G.size() <= 1) {
398                return G;
399            }
400    
401            ExpVector e;
402            ExpVector f;
403            GenPolynomial<C> p;
404            ArrayList<GenPolynomial<C>> F;
405            F = new ArrayList<GenPolynomial<C>>(G.size());
406            boolean mt;
407    
408            while (G.size() > 0) {
409                a = G.remove(0);
410                e = a.leadingExpVector();
411    
412                it = G.listIterator();
413                mt = false;
414                while (it.hasNext() && !mt) {
415                    p = it.next();
416                    f = p.leadingExpVector();
417                    mt = e.multipleOf(f);
418                }
419                it = F.listIterator();
420                while (it.hasNext() && !mt) {
421                    p = it.next();
422                    f = p.leadingExpVector();
423                    mt = e.multipleOf(f);
424                }
425                if (!mt) {
426                    F.add(a);
427                } else {
428                    // System.out.println("dropped " + a.length());
429                }
430            }
431            G = F;
432            if (G.size() <= 1) {
433                return G;
434            }
435            Collections.reverse(G); // important for lex GB
436    
437            MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()];
438            int i = 0;
439            F = new ArrayList<GenPolynomial<C>>(G.size());
440            while (G.size() > 0) {
441                a = G.remove(0);
442                // System.out.println("doing " + a.length());
443                List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size()+F.size());
444                R.addAll(G);
445                R.addAll(F);
446                mirs[i] = new MiReducerServer<C>(R, a);
447                pool.addJob(mirs[i]);
448                i++;
449                F.add(a);
450            }
451            G = F;
452            F = new ArrayList<GenPolynomial<C>>(G.size());
453            for (i = 0; i < mirs.length; i++) {
454                a = mirs[i].getNF();
455                F.add(a);
456            }
457            return F;
458        }
459    
460    }
461    
462    
463    /**
464     * Distributed server reducing worker proxy threads.
465     * @param <C> coefficient type
466     */
467    
468    class HybridReducerServer<C extends RingElem<C>> implements Runnable {
469    
470    
471        public static final Logger logger = Logger.getLogger(HybridReducerServer.class);
472    
473    
474        public final boolean debug = logger.isDebugEnabled();
475    
476    
477        private final Terminator finner;
478    
479    
480        private final ChannelFactory cf;
481    
482    
483        private TaggedSocketChannel pairChannel;
484    
485    
486        private final DistHashTable<Integer, GenPolynomial<C>> theList;
487    
488    
489        private final PairList<C> pairlist;
490    
491    
492        private final int threadsPerNode;
493    
494    
495        /**
496         * Message tag for pairs.
497         */
498        public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
499    
500    
501        /**
502         * Message tag for results.
503         */
504        public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
505    
506    
507        /**
508         * Message tag for acknowledgments.
509         */
510        public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
511    
512    
513        /**
514         * Constructor.
515         * @param tpn number of threads per node
516         * @param fin terminator
517         * @param cf channel factory
518         * @param dl distributed hash table
519         * @param L ordered pair list
520         */
521        HybridReducerServer(int tpn, Terminator fin, ChannelFactory cf, 
522                            DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
523            threadsPerNode = tpn;
524            finner = fin;
525            this.cf = cf;
526            theList = dl;
527            pairlist = L;
528            //logger.info("reducer server created " + this);
529        }
530    
531    
532        /**
533         * Work loop.
534         * @see java.lang.Runnable#run()
535         */
536        //JAVA6only: @Override
537        public void run() {
538            logger.info("reducer server running with " + cf);
539            SocketChannel channel = null;
540            try {
541                channel = cf.getChannel();
542                pairChannel = new TaggedSocketChannel(channel);
543                pairChannel.init();
544            } catch (InterruptedException e) {
545                logger.debug("get pair channel interrupted");
546                e.printStackTrace();
547                return;
548            }
549            if (debug) {
550                logger.info("pairChannel   = " + pairChannel);
551            }
552            // record idle remote workers (minus one?)
553            //finner.beIdle(threadsPerNode-1);
554            finner.initIdle(threadsPerNode);
555            AtomicInteger active = new AtomicInteger(0);
556    
557            // start receiver
558            HybridReducerReceiver<C> receiver = new HybridReducerReceiver<C>(threadsPerNode, finner, active, pairChannel, theList, pairlist);
559            receiver.start();
560    
561            Pair<C> pair;
562            boolean set = false;
563            boolean goon = true;
564            int polIndex = -1;
565            int red = 0;
566            int sleeps = 0;
567    
568            // while more requests
569            while (goon) {
570                // receive request if thread is reported incactive
571                logger.debug("receive request");
572                Object req = null;
573                try {
574                    req = pairChannel.receive(pairTag);
575                } catch (InterruptedException e) {
576                    goon = false;
577                    e.printStackTrace();
578                } catch (IOException e) {
579                    goon = false;
580                    e.printStackTrace();
581                } catch (ClassNotFoundException e) {
582                    goon = false;
583                    e.printStackTrace();
584                }
585                logger.info("received request, req = " + req);
586                if (req == null) {
587                    goon = false;
588                    break;
589                }
590                if (!(req instanceof GBTransportMessReq)) {
591                    goon = false;
592                    break;
593                }
594    
595                // find pair and manage termination status
596                logger.info("find pair");
597                while (!pairlist.hasNext()) { // wait
598                    if (!finner.hasJobs() && !pairlist.hasNext()) {
599                        goon = false;
600                        break;
601                    }
602                    try {
603                        sleeps++;
604                        //if (sleeps % 10 == 0) {
605                        logger.info("waiting for reducers, remaining = " + finner.getJobs());
606                        //}
607                        Thread.sleep(100);
608                    } catch (InterruptedException e) {
609                        goon = false;
610                        break;
611                    }
612                }
613                if (!pairlist.hasNext() && !finner.hasJobs()) {
614                    logger.info("termination detection: no pairs and no jobs left");
615                    goon = false;
616                    break; //continue; //break?
617                }
618                finner.notIdle(); // before pairlist get!!
619                pair = pairlist.removeNext();
620                // send pair to client, even if null
621                if ( debug ) {
622                    logger.info("active count = " + active.get());
623                    logger.info("send pair = " + pair);
624                }
625                GBTransportMess msg = null;
626                if (pair != null) {
627                    msg = new GBTransportMessPairIndex(pair);
628                } else {
629                    msg = new GBTransportMess(); //not End(); at this time
630                    // goon ?= false;
631                }
632                try {
633                    red++;
634                    pairChannel.send(pairTag, msg);
635                    int a = active.getAndIncrement();
636                } catch (IOException e) {
637                    e.printStackTrace();
638                    goon = false;
639                    break;
640                }
641                //logger.debug("#distributed list = " + theList.size());
642            }
643            logger.info("terminated, send " + red + " reduction pairs");
644    
645            /*
646             * send end mark to clients
647             */
648            logger.debug("send end");
649            try {
650                for ( int i = 0; i < threadsPerNode; i++ ) { // -1
651                    //do not wait: Object rq = pairChannel.receive(pairTag);
652                    pairChannel.send(pairTag, new GBTransportMessEnd());
653                }
654                // send also end to receiver
655                pairChannel.send(resultTag, new GBTransportMessEnd());
656                //beware of race condition 
657            } catch (IOException e) {
658                if (logger.isDebugEnabled()) {
659                    e.printStackTrace();
660                }
661            }
662            receiver.terminate();
663    
664            int d = active.get();
665            logger.info("remaining active tasks = " + d);
666            //logger.info("terminated, send " + red + " reduction pairs");
667            pairChannel.close();
668            logger.info("redServ pairChannel.close()");
669            finner.release();
670    
671            channel.close();
672            logger.info("redServ channel.close()");
673        }
674    }
675    
676    
677    /**
678     * Distributed server receiving worker thread.
679     * @param <C> coefficient type
680     */
681    
682    class HybridReducerReceiver<C extends RingElem<C>> extends Thread {
683    
684    
685        public static final Logger logger = Logger.getLogger(HybridReducerReceiver.class);
686    
687    
688        public final boolean debug = logger.isDebugEnabled();
689    
690    
691        private final DistHashTable<Integer, GenPolynomial<C>> theList;
692    
693    
694        private final PairList<C> pairlist;
695    
696    
697        private final TaggedSocketChannel pairChannel;
698    
699    
700        private final Terminator finner;
701    
702    
703        private final int threadsPerNode;
704    
705    
706        private final AtomicInteger active;
707    
708    
709        private volatile boolean goon;
710    
711    
712        /**
713         * Message tag for pairs.
714         */
715        public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
716    
717    
718        /**
719         * Message tag for results.
720         */
721        public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
722    
723    
724        /**
725         * Message tag for acknowledgments.
726         */
727        public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
728    
729    
730        /**
731         * Constructor.
732         * @param tpn number of threads per node
733         * @param fin terminator
734         * @param a active remote tasks count
735         * @param pc tagged socket channel
736         * @param dl distributed hash table
737         * @param L ordered pair list
738         */
739        HybridReducerReceiver(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc, 
740                              DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
741            active = a;
742            threadsPerNode = tpn;
743            finner = fin;
744            pairChannel = pc;
745            theList = dl;
746            pairlist = L;
747            goon = true;
748            //logger.info("reducer server created " + this);
749        }
750    
751    
752        /**
753         * Work loop.
754         * @see java.lang.Thread#run()
755         */
756        @Override
757        public void run() {
758            //Pair<C> pair = null;
759            GenPolynomial<C> H = null;
760            int red = 0;
761            int polIndex = -1;
762            //Integer senderId; // obsolete
763    
764            // while more requests
765            while (goon) {
766                // receive request
767                logger.debug("receive result");
768                //senderId = null;
769                Object rh = null;
770                try {
771                    rh = pairChannel.receive(resultTag);
772                    int i = active.getAndDecrement();
773                } catch (InterruptedException e) {
774                    goon = false;
775                    //e.printStackTrace();
776                    //?? finner.initIdle(1);
777                    break;
778                } catch (IOException e) {
779                    e.printStackTrace();
780                    goon = false;
781                    finner.initIdle(1);
782                    break;
783                } catch (ClassNotFoundException e) {
784                    e.printStackTrace();
785                    goon = false;
786                    finner.initIdle(1);
787                    break;
788                }
789                logger.info("received H polynomial");
790                if (rh == null) {
791                    if (this.isInterrupted()) {
792                        goon = false;
793                        finner.initIdle(1);
794                        break;
795                    }
796                    //finner.initIdle(1);
797                } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
798                    logger.info("received GBTransportMessEnd");
799                    goon = false;
800                    //?? finner.initIdle(1);
801                    break;
802                } else if (rh instanceof GBTransportMessPoly) {
803                    // update pair list
804                    red++;
805                    GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
806                    H = mpi.pol;
807                    //senderId = mpi.threadId;
808                    if (H != null) {
809                        if (debug) {
810                           logger.info("H = " + H.leadingExpVector());
811                        }
812                        if (!H.isZERO()) {
813                            if (H.isONE()) {
814                                // finner.allIdle();
815                                polIndex = pairlist.putOne();
816                                GenPolynomial<C> nn = theList.put(new Integer(polIndex), H);
817                                if (nn != null) {
818                                    logger.info("double polynomials nn = " + nn + ", H = " + H);
819                                }
820                                //goon = false; must wait for other clients
821                                //finner.initIdle(1);
822                                //break;
823                            } else {
824                                polIndex = pairlist.put(H);
825                                // use putWait ? but still not all distributed
826                                GenPolynomial<C> nn = theList.put(new Integer(polIndex), H);
827                                if (nn != null) {
828                                    logger.info("double polynomials nn = " + nn + ", H = " + H);
829                                }
830                            }
831                        }
832                    }
833                }
834                // only after recording in pairlist !
835                finner.initIdle(1);
836                // if ( senderId != null ) { // send acknowledgement after recording
837                    try { 
838                        //pairChannel.send(senderId, new GBTransportMess());
839                        pairChannel.send(ackTag, new GBTransportMess());
840                        logger.debug("send acknowledgement");
841                    } catch (IOException e) {
842                        e.printStackTrace();
843                        goon = false;
844                        break;
845                    }
846                //}
847            } // end while
848            goon = false;
849            logger.info("terminated, received " + red + " reductions");
850        }
851    
852    
853        /**
854         * Terminate.
855         */
856        public void terminate() {
857            goon = false;
858            this.interrupt();
859            try {
860                this.join();
861            } catch (InterruptedException e) {
862                // unfug Thread.currentThread().interrupt();
863            }
864            logger.debug("HybridReducerReceiver terminated");
865        }
866    
867    }
868    
869    
870    /**
871     * Distributed clients reducing worker threads.
872     */
873    
874    class HybridReducerClient<C extends RingElem<C>> implements Runnable {
875    
876    
877        private static final Logger logger = Logger.getLogger(HybridReducerClient.class);
878    
879    
880        public final boolean debug = logger.isDebugEnabled();
881    
882    
883        private final TaggedSocketChannel pairChannel;
884    
885    
886        private final DistHashTable<Integer, GenPolynomial<C>> theList;
887    
888    
889        private final ReductionPar<C> red;
890    
891    
892        private final int threadsPerNode;
893    
894    
895        /*
896         * Identification number for this thread.
897         */
898        //public final Integer threadId; // obsolete
899    
900    
901        /**
902         * Message tag for pairs.
903         */
904        public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
905    
906    
907        /**
908         * Message tag for results.
909         */
910        public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
911    
912    
913        /**
914         * Message tag for acknowledgments.
915         */
916        public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
917    
918    
919        /**
920         * Constructor.
921         * @param tpn number of threads per node
922         * @param tc tagged socket channel
923         * @param tid thread identification
924         * @param dl distributed hash table
925         */
926        HybridReducerClient(int tpn, TaggedSocketChannel tc, Integer tid, DistHashTable<Integer, GenPolynomial<C>> dl) {
927            this.threadsPerNode = tpn;
928            pairChannel = tc;
929            //threadId = 100 + tid; // keep distinct from other tags
930            theList = dl;
931            red = new ReductionPar<C>();
932        }
933    
934    
935        /**
936         * Work loop.
937         * @see java.lang.Runnable#run()
938         */
939        //JAVA6only: @Override
940        public void run() {
941            if (debug) {
942                logger.info("pairChannel   = " + pairChannel + " reducer client running");
943            }
944            Pair<C> pair = null;
945            GenPolynomial<C> pi;
946            GenPolynomial<C> pj;
947            GenPolynomial<C> S;
948            GenPolynomial<C> H = null;
949            //boolean set = false;
950            boolean goon = true;
951            boolean doEnd = false;
952            int reduction = 0;
953            //int sleeps = 0;
954            Integer pix;
955            Integer pjx;
956    
957            while (goon) {
958                /* protocol:
959                 * request pair, process pair, send result, receive acknowledgment
960                 */
961                // pair = (Pair) pairlist.removeNext();
962                Object req = new GBTransportMessReq();
963                logger.info("send request = " + req);
964                try {
965                    pairChannel.send(pairTag, req);
966                } catch (IOException e) {
967                    goon = false;
968                    if ( logger.isDebugEnabled() ) {
969                        e.printStackTrace();
970                    }
971                    logger.info("receive pair, exception ");
972                    break;
973                }
974                logger.debug("receive pair, goon = " + goon);
975                doEnd = false;
976                Object pp = null;
977                try {
978                    pp = pairChannel.receive(pairTag);
979                } catch (InterruptedException e) {
980                    goon = false;
981                    e.printStackTrace();
982                } catch (IOException e) {
983                    goon = false;
984                    if (logger.isDebugEnabled()) {
985                        e.printStackTrace();
986                    }
987                    break;
988                } catch (ClassNotFoundException e) {
989                    goon = false;
990                    e.printStackTrace();
991                }
992                if (debug) {
993                    logger.info("received pair = " + pp);
994                }
995                H = null;
996                if (pp == null) { // should not happen
997                    continue;
998                }
999                if (pp instanceof GBTransportMessEnd) {
1000                    goon = false;
1001                    doEnd = true;
1002                    continue;
1003                }
1004                if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
1005                    pi = pj = null;
1006                    if (pp instanceof GBTransportMessPair) {
1007                        pair = ((GBTransportMessPair<C>) pp).pair;
1008                        if (pair != null) {
1009                            pi = pair.pi;
1010                            pj = pair.pj;
1011                            //logger.debug("pair: pix = " + pair.i 
1012                            //               + ", pjx = " + pair.j);
1013                        }
1014                    }
1015                    if (pp instanceof GBTransportMessPairIndex) {
1016                        pix = ((GBTransportMessPairIndex) pp).i;
1017                        pjx = ((GBTransportMessPairIndex) pp).j;
1018                        pi = (GenPolynomial<C>) theList.getWait(pix);
1019                        pj = (GenPolynomial<C>) theList.getWait(pjx);
1020                        //logger.info("pix = " + pix + ", pjx = " +pjx);
1021                    }
1022    
1023                    if (pi != null && pj != null) {
1024                        S = red.SPolynomial(pi, pj);
1025                        //System.out.println("S   = " + S);
1026                        if (S.isZERO()) {
1027                            // pair.setZero(); does not work in dist
1028                        } else {
1029                            if (logger.isDebugEnabled()) {
1030                                logger.debug("ht(S) = " + S.leadingExpVector());
1031                            }
1032                            H = red.normalform(theList, S);
1033                            reduction++;
1034                            if (H.isZERO()) {
1035                                // pair.setZero(); does not work in dist
1036                            } else {
1037                                H = H.monic();
1038                                if (logger.isInfoEnabled()) {
1039                                    logger.info("ht(H) = " + H.leadingExpVector());
1040                                }
1041                            }
1042                        }
1043                    }
1044                }
1045                if (pp instanceof GBTransportMess) {
1046                    logger.debug("null pair results in null H poly");
1047                }
1048    
1049                // send H or must send null, if not at end
1050                if (logger.isDebugEnabled()) {
1051                    logger.debug("#distributed list = " + theList.size());
1052                    logger.debug("send H polynomial = " + H);
1053                }
1054                try {
1055                    pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
1056                    doEnd = true;
1057                } catch (IOException e) {
1058                    goon = false;
1059                    e.printStackTrace();
1060                }
1061                logger.info("done send poly message of " + pp);
1062                try {
1063                    //pp = pairChannel.receive(threadId);
1064                    pp = pairChannel.receive(ackTag);
1065                } catch (InterruptedException e) {
1066                    goon = false;
1067                    e.printStackTrace();
1068                } catch (IOException e) {
1069                    goon = false;
1070                    if (logger.isDebugEnabled()) {
1071                        e.printStackTrace();
1072                    }
1073                    break;
1074                } catch (ClassNotFoundException e) {
1075                    goon = false;
1076                    e.printStackTrace();
1077                }
1078                if ( ! (pp instanceof GBTransportMess) ) {
1079                    logger.error("invalid acknowledgement " + pp);
1080                }
1081                logger.info("received acknowledgment " + pp);
1082            }
1083            logger.info("terminated, done " + reduction + " reductions");
1084            if ( !doEnd ) {
1085                try {
1086                    pairChannel.send(resultTag, new GBTransportMessEnd());
1087                } catch (IOException e) {
1088                    //e.printStackTrace();
1089                }
1090                logger.info("terminated, send done");
1091            }
1092        }
1093    }