001/*
002 * $Id: GroebnerBaseDistributedHybridEC.java 5264 2015-07-27 14:19:57Z kredel $
003 */
004
005package edu.jas.gb;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Collections;
011import java.util.List;
012import java.util.ListIterator;
013import java.util.concurrent.atomic.AtomicInteger;
014
015import org.apache.log4j.Logger;
016
017import edu.jas.poly.ExpVector;
018import edu.jas.poly.GenPolynomial;
019import edu.jas.poly.GenPolynomialRing;
020import edu.jas.poly.PolyUtil;
021import edu.jas.structure.RingElem;
022import edu.jas.util.ChannelFactory;
023import edu.jas.util.DistHashTable;
024import edu.jas.util.DistHashTableServer;
025import edu.jas.util.DistThreadPool;
026import edu.jas.util.RemoteExecutable;
027import edu.jas.util.SocketChannel;
028import edu.jas.util.TaggedSocketChannel;
029import edu.jas.util.Terminator;
030import edu.jas.util.ThreadPool;
031
032
033/**
034 * Groebner Base distributed hybrid algorithm. Implements a distributed memory
035 * with multi-core CPUs parallel version of Groebner bases with executable
036 * channels. Using pairlist class, distributed multi-threaded tasks do
037 * reduction, one communication channel per remote node.
038 * @param <C> coefficient type
039 * @author Heinz Kredel
040 */
041
042public class GroebnerBaseDistributedHybridEC<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
043
044
045    public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybridEC.class);
046
047
048    public final boolean debug = logger.isDebugEnabled();
049
050
051    /**
052     * Number of threads to use.
053     */
054    protected final int threads;
055
056
057    /**
058     * Default number of threads.
059     */
060    protected static final int DEFAULT_THREADS = 2;
061
062
063    /**
064     * Number of threads per node to use.
065     */
066    protected final int threadsPerNode;
067
068
069    /**
070     * Default number of threads per compute node.
071     */
072    protected static final int DEFAULT_THREADS_PER_NODE = 1;
073
074
075    /**
076     * Pool of threads to use.
077     */
078    //protected final ExecutorService pool; // not for single node tests
079    protected transient final ThreadPool pool;
080
081
082    /**
083     * Default server port.
084     */
085    protected static final int DEFAULT_PORT = 55711;
086
087
088    /**
089     * Default distributed hash table server port.
090     */
091    protected final int DHT_PORT;
092
093
094    /**
095     * machine file to use.
096     */
097    protected final String mfile;
098
099
100    /**
101     * Server port to use.
102     */
103    protected final int port;
104
105
106    /**
107     * Distributed thread pool to use.
108     */
109    private final transient DistThreadPool dtp;
110
111
112    /**
113     * Distributed hash table server to use.
114     */
115    private final transient DistHashTableServer<Integer> dhts;
116
117
118    /**
119     * Message tag for pairs.
120     */
121    public static final Integer pairTag = Integer.valueOf(1);
122
123
124    /**
125     * Message tag for results.
126     */
127    public static final Integer resultTag = Integer.valueOf(2);
128
129
130    /**
131     * Message tag for acknowledgments.
132     */
133    public static final Integer ackTag = Integer.valueOf(3);
134
135
136    /**
137     * Constructor.
138     * @param mfile name of the machine file.
139     */
140    public GroebnerBaseDistributedHybridEC(String mfile) {
141        this(mfile, DEFAULT_THREADS, DEFAULT_PORT);
142    }
143
144
145    /**
146     * Constructor.
147     * @param mfile name of the machine file.
148     * @param threads number of threads to use.
149     */
150    public GroebnerBaseDistributedHybridEC(String mfile, int threads) {
151        this(mfile, threads, new ThreadPool(threads), DEFAULT_PORT);
152    }
153
154
155    /**
156     * Constructor.
157     * @param mfile name of the machine file.
158     * @param threads number of threads to use.
159     * @param port server port to use.
160     */
161    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int port) {
162        this(mfile, threads, new ThreadPool(threads), port);
163    }
164
165
166    /**
167     * Constructor.
168     * @param mfile name of the machine file.
169     * @param threads number of threads to use.
170     * @param threadsPerNode threads per node to use.
171     * @param port server port to use.
172     */
173    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, int port) {
174        this(mfile, threads, threadsPerNode, new ThreadPool(threads), port);
175    }
176
177
178    /**
179     * Constructor.
180     * @param mfile name of the machine file.
181     * @param threads number of threads to use.
182     * @param pool ThreadPool to use.
183     * @param port server port to use.
184     */
185    public GroebnerBaseDistributedHybridEC(String mfile, int threads, ThreadPool pool, int port) {
186        this(mfile, threads, DEFAULT_THREADS_PER_NODE, pool, port);
187    }
188
189
190    /**
191     * Constructor.
192     * @param mfile name of the machine file.
193     * @param threads number of threads to use.
194     * @param threadsPerNode threads per node to use.
195     * @param pl pair selection strategy
196     * @param port server port to use.
197     */
198    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, PairList<C> pl,
199                    int port) {
200        this(mfile, threads, threadsPerNode, new ThreadPool(threads), pl, port);
201    }
202
203
204    /**
205     * Constructor.
206     * @param mfile name of the machine file.
207     * @param threads number of threads to use.
208     * @param threadsPerNode threads per node to use.
209     * @param port server port to use.
210     */
211    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ThreadPool pool,
212                    int port) {
213        this(mfile, threads, threadsPerNode, pool, new OrderedPairlist<C>(), port);
214    }
215
216
217    /**
218     * Constructor.
219     * @param mfile name of the machine file.
220     * @param threads number of threads to use.
221     * @param threadsPerNode threads per node to use.
222     * @param pool ThreadPool to use.
223     * @param pl pair selection strategy
224     * @param port server port to use.
225     */
226    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ThreadPool pool,
227                    PairList<C> pl, int port) {
228        super(new ReductionPar<C>(), pl);
229        this.threads = threads;
230        if (mfile == null || mfile.length() == 0) {
231            this.mfile = "../util/machines"; // contains localhost
232        } else {
233            this.mfile = mfile;
234        }
235        if (threads < 1) {
236            threads = 1;
237        }
238        this.threadsPerNode = threadsPerNode;
239        if (pool == null) {
240            pool = new ThreadPool(threads);
241        }
242        this.pool = pool;
243        this.port = port;
244        logger.info("machine file " + mfile + ", port = " + port);
245        this.dtp = new DistThreadPool(this.threads, this.mfile);
246        logger.info("running " + dtp);
247        this.DHT_PORT = this.dtp.getEC().getMasterPort() + 100;
248        this.dhts = new DistHashTableServer<Integer>(this.DHT_PORT);
249        this.dhts.init();
250        logger.info("running " + dhts);
251    }
252
253
254    /**
255     * Cleanup and terminate ThreadPool.
256     */
257    @Override
258    public void terminate() {
259        terminate(true);
260    }
261
262
263    /**
264     * Terminates the distributed thread pools.
265     * @param shutDown true, if shut-down of the remote executable servers is
266     *            requested, false, if remote executable servers stay alive.
267     */
268    public void terminate(boolean shutDown) {
269        pool.terminate();
270        dtp.terminate(shutDown);
271        logger.debug("dhts.terminate()");
272        dhts.terminate();
273    }
274
275
276    /**
277     * Distributed Groebner base.
278     * @param modv number of module variables.
279     * @param F polynomial list.
280     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
281     */
282    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
283        List<GenPolynomial<C>> Fp = normalizeZerosOnes(F);
284        Fp = PolyUtil.<C> monic(Fp);
285        if (Fp.size() <= 1) {
286            return Fp;
287        }
288        if (!Fp.get(0).ring.coFac.isField()) {
289            throw new IllegalArgumentException("coefficients not from a field");
290        }
291
292        String master = dtp.getEC().getMasterHost();
293        //int port = dtp.getEC().getMasterPort(); // wrong port
294        GBHybridExerClient<C> gbc = new GBHybridExerClient<C>(master, threadsPerNode, port, DHT_PORT);
295        for (int i = 0; i < threads; i++) {
296            // schedule remote clients
297            dtp.addJob(gbc);
298        }
299        // run master
300        List<GenPolynomial<C>> G = GBMaster(modv, Fp);
301        return G;
302    }
303
304
305    /**
306     * Distributed hybrid Groebner base.
307     * @param modv number of module variables.
308     * @param F non empty monic polynomial list without zeros.
309     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
310     */
311    List<GenPolynomial<C>> GBMaster(int modv, List<GenPolynomial<C>> F) {
312        long t = System.currentTimeMillis();
313        ChannelFactory cf = new ChannelFactory(port);
314        cf.init();
315
316        List<GenPolynomial<C>> G = F;
317        if (G.isEmpty()) {
318            throw new IllegalArgumentException("empty polynomial list not allowed");
319        }
320        GenPolynomialRing<C> ring = G.get(0).ring;
321        PairList<C> pairlist = strategy.create(modv, ring);
322        pairlist.put(G);
323
324        /*
325        GenPolynomial<C> p;
326        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
327        PairList<C> pairlist = null;
328        boolean oneInGB = false;
329        int l = F.size();
330        int unused;
331        ListIterator<GenPolynomial<C>> it = F.listIterator();
332        while (it.hasNext()) {
333            p = it.next();
334            if (p.length() > 0) {
335                p = p.monic();
336                if (p.isONE()) {
337                    oneInGB = true;
338                    G.clear();
339                    G.add(p);
340                    //return G; must signal termination to others
341                }
342                if (!oneInGB) {
343                    G.add(p);
344                }
345                if (pairlist == null) {
346                    //pairlist = new OrderedPairlist<C>(modv, p.ring);
347                    pairlist = strategy.create(modv, p.ring);
348                    if (!p.ring.coFac.isField()) {
349                        throw new IllegalArgumentException("coefficients not from a field");
350                    }
351                }
352                // theList not updated here
353                if (p.isONE()) {
354                    unused = pairlist.putOne();
355                } else {
356                    unused = pairlist.put(p);
357                }
358            } else {
359                l--;
360            }
361        }
362        //if (l <= 1) {
363        //return G; must signal termination to others
364        //}
365        */
366        logger.info("start " + pairlist);
367        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(
368                        "localhost", DHT_PORT);
369        theList.init();
370        List<GenPolynomial<C>> al = pairlist.getList();
371        for (int i = 0; i < al.size(); i++) {
372            // no wait required
373            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
374            if (nn != null) {
375                logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
376            }
377        }
378
379        Terminator finner = new Terminator(threads * threadsPerNode);
380        HybridReducerServerEC<C> R;
381        logger.info("using pool = " + pool);
382        for (int i = 0; i < threads; i++) {
383            R = new HybridReducerServerEC<C>(threadsPerNode, finner, cf, theList, pairlist);
384            pool.addJob(R);
385            //logger.info("server submitted " + R);
386        }
387        logger.info("main loop waiting " + finner);
388        finner.waitDone();
389        int ps = theList.size();
390        logger.info("#distributed list = " + ps);
391        // make sure all polynomials arrived: not needed in master
392        // G = (ArrayList)theList.values();
393        G = pairlist.getList();
394        if (ps != G.size()) {
395            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
396        }
397        for (GenPolynomial<C> q : theList.getValueList()) {
398            if (debug && q != null && !q.isZERO()) {
399                logger.debug("final q = " + q.leadingExpVector());
400            }
401        }
402        logger.debug("distributed list end");
403        long time = System.currentTimeMillis();
404        List<GenPolynomial<C>> Gp;
405        Gp = minimalGB(G); // not jet distributed but threaded
406        time = System.currentTimeMillis() - time;
407        logger.debug("parallel gbmi time = " + time);
408        G = Gp;
409        logger.debug("server cf.terminate()");
410        cf.terminate();
411        logger.debug("server theList.terminate() " + theList.size());
412        theList.clear();
413        theList.terminate();
414        t = System.currentTimeMillis() - t;
415        logger.info("server GB end, time = " + t + ", " + pairlist.toString());
416        return G;
417    }
418
419
420    /**
421     * GB distributed client part.
422     * @param host the server runs on.
423     * @param port the server runs.
424     * @param dhtport of the DHT server.
425     * @throws IOException
426     */
427    public static <C extends RingElem<C>> void clientPart(String host, int threadsPerNode, int port,
428                    int dhtport) throws IOException {
429        ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost
430        cf.init();
431        logger.info("clientPart connecting to " + host + ", port = " + port + ", dhtport = " + dhtport);
432        SocketChannel channel = cf.getChannel(host, port);
433        TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel);
434        pairChannel.init();
435
436        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(host,
437                        dhtport);
438        theList.init();
439
440        ThreadPool pool = new ThreadPool(threadsPerNode);
441        logger.info("client using pool = " + pool);
442        for (int i = 0; i < threadsPerNode; i++) {
443            HybridReducerClientEC<C> Rr = new HybridReducerClientEC<C>(/*threadsPerNode,*/pairChannel, /*i,*/
444            theList);
445            pool.addJob(Rr);
446        }
447        logger.debug("clients submitted");
448
449        pool.terminate();
450        logger.debug("client pool.terminate()");
451
452        pairChannel.close();
453        logger.debug("client pairChannel.close()");
454
455        //master only: theList.clear();
456        theList.terminate();
457        cf.terminate();
458        logger.info("client cf.terminate()");
459
460        channel.close();
461        logger.info("client channel.close()");
462        return;
463    }
464
465
466    /**
467     * Minimal ordered groebner basis.
468     * @param Fp a Groebner base.
469     * @return a reduced Groebner base of Fp.
470     */
471    @Override
472    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
473        GenPolynomial<C> a;
474        ArrayList<GenPolynomial<C>> G;
475        G = new ArrayList<GenPolynomial<C>>(Fp.size());
476        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
477        while (it.hasNext()) {
478            a = it.next();
479            if (a.length() != 0) { // always true
480                // already monic  a = a.monic();
481                G.add(a);
482            }
483        }
484        if (G.size() <= 1) {
485            return G;
486        }
487
488        ExpVector e;
489        ExpVector f;
490        GenPolynomial<C> p;
491        ArrayList<GenPolynomial<C>> F;
492        F = new ArrayList<GenPolynomial<C>>(G.size());
493        boolean mt;
494
495        while (G.size() > 0) {
496            a = G.remove(0);
497            e = a.leadingExpVector();
498
499            it = G.listIterator();
500            mt = false;
501            while (it.hasNext() && !mt) {
502                p = it.next();
503                f = p.leadingExpVector();
504                mt = e.multipleOf(f);
505            }
506            it = F.listIterator();
507            while (it.hasNext() && !mt) {
508                p = it.next();
509                f = p.leadingExpVector();
510                mt = e.multipleOf(f);
511            }
512            if (!mt) {
513                F.add(a);
514            } else {
515                // System.out.println("dropped " + a.length());
516            }
517        }
518        G = F;
519        if (G.size() <= 1) {
520            return G;
521        }
522        Collections.reverse(G); // important for lex GB
523
524        @SuppressWarnings("cast")
525        MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()];
526        int i = 0;
527        F = new ArrayList<GenPolynomial<C>>(G.size());
528        while (G.size() > 0) {
529            a = G.remove(0);
530            // System.out.println("doing " + a.length());
531            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
532            R.addAll(G);
533            R.addAll(F);
534            mirs[i] = new MiReducerServer<C>(R, a);
535            pool.addJob(mirs[i]);
536            i++;
537            F.add(a);
538        }
539        G = F;
540        F = new ArrayList<GenPolynomial<C>>(G.size());
541        for (i = 0; i < mirs.length; i++) {
542            a = mirs[i].getNF();
543            F.add(a);
544        }
545        return F;
546    }
547
548}
549
550
551/**
552 * Distributed server reducing worker proxy threads.
553 * @param <C> coefficient type
554 */
555class HybridReducerServerEC<C extends RingElem<C>> implements Runnable {
556
557
558    public static final Logger logger = Logger.getLogger(HybridReducerServerEC.class);
559
560
561    public final boolean debug = logger.isDebugEnabled();
562
563
564    private final Terminator finner;
565
566
567    private final ChannelFactory cf;
568
569
570    private TaggedSocketChannel pairChannel;
571
572
573    private final DistHashTable<Integer, GenPolynomial<C>> theList;
574
575
576    private final PairList<C> pairlist;
577
578
579    private final int threadsPerNode;
580
581
582    /**
583     * Message tag for pairs.
584     */
585    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;
586
587
588    /**
589     * Message tag for results.
590     */
591    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;
592
593
594    /**
595     * Message tag for acknowledgments.
596     */
597    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;
598
599
600    /**
601     * Constructor.
602     * @param tpn number of threads per node
603     * @param fin terminator
604     * @param cf channel factory
605     * @param dl distributed hash table
606     * @param L ordered pair list
607     */
608    HybridReducerServerEC(int tpn, Terminator fin, ChannelFactory cf,
609                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
610        threadsPerNode = tpn;
611        finner = fin;
612        this.cf = cf;
613        theList = dl;
614        pairlist = L;
615        //logger.info("reducer server created " + this);
616    }
617
618
619    /**
620     * Work loop.
621     * @see java.lang.Runnable#run()
622     */
623    @Override
624    public void run() {
625        logger.info("reducer server running with " + cf);
626        SocketChannel channel = null;
627        try {
628            channel = cf.getChannel();
629            pairChannel = new TaggedSocketChannel(channel);
630            pairChannel.init();
631        } catch (InterruptedException e) {
632            logger.debug("get pair channel interrupted");
633            e.printStackTrace();
634            return;
635        }
636        if (debug) {
637            logger.info("pairChannel   = " + pairChannel);
638        }
639        // record idle remote workers (minus one?)
640        //finner.beIdle(threadsPerNode-1);
641        finner.initIdle(threadsPerNode);
642        AtomicInteger active = new AtomicInteger(0);
643
644        // start receiver
645        HybridReducerReceiverEC<C> receiver = new HybridReducerReceiverEC<C>(/*threadsPerNode,*/finner,
646                        active, pairChannel, theList, pairlist);
647        receiver.start();
648
649        Pair<C> pair;
650        //boolean set = false;
651        boolean goon = true;
652        //int polIndex = -1;
653        int red = 0;
654        int sleeps = 0;
655
656        // while more requests
657        while (goon) {
658            // receive request if thread is reported incactive
659            logger.debug("receive request");
660            Object req = null;
661            try {
662                req = pairChannel.receive(pairTag);
663            } catch (InterruptedException e) {
664                goon = false;
665                e.printStackTrace();
666            } catch (IOException e) {
667                goon = false;
668                e.printStackTrace();
669            } catch (ClassNotFoundException e) {
670                goon = false;
671                e.printStackTrace();
672            }
673            //logger.info("received request, req = " + req);
674            if (req == null) {
675                goon = false;
676                break;
677            }
678            if (!(req instanceof GBTransportMessReq)) {
679                goon = false;
680                break;
681            }
682
683            // find pair and manage termination status
684            logger.debug("find pair");
685            while (!pairlist.hasNext()) { // wait
686                if (!finner.hasJobs() && !pairlist.hasNext()) {
687                    goon = false;
688                    break;
689                }
690                try {
691                    sleeps++;
692                    if (sleeps % 3 == 0) {
693                        logger.info("waiting for reducers, remaining = " + finner);
694                    }
695                    Thread.sleep(100);
696                } catch (InterruptedException e) {
697                    goon = false;
698                    break;
699                }
700            }
701            if (Thread.currentThread().isInterrupted()) {
702                goon = false;
703                break;
704            }
705            if (!pairlist.hasNext() && !finner.hasJobs()) {
706                logger.info("termination detection: no pairs and no jobs left");
707                goon = false;
708                break; //continue; //break?
709            }
710            finner.notIdle(); // before pairlist get!!
711            pair = pairlist.removeNext();
712            // send pair to client, even if null
713            if (debug) {
714                logger.info("active count = " + active.get());
715                logger.info("send pair = " + pair);
716            }
717            GBTransportMess msg = null;
718            if (pair != null) {
719                msg = new GBTransportMessPairIndex(pair); //,pairlist.size()-1); // size-1
720            } else {
721                msg = new GBTransportMess(); // not End(); at this time
722                // goon ?= false;
723            }
724            try {
725                red++;
726                pairChannel.send(pairTag, msg);
727                @SuppressWarnings("unused")
728                int a = active.getAndIncrement();
729            } catch (IOException e) {
730                e.printStackTrace();
731                goon = false;
732                break;
733            }
734            //logger.debug("#distributed list = " + theList.size());
735        }
736        logger.info("terminated, send " + red + " reduction pairs");
737
738        /*
739         * send end mark to clients
740         */
741        logger.debug("send end");
742        try {
743            for (int i = 0; i < threadsPerNode; i++) { // -1
744                //do not wait: Object rq = pairChannel.receive(pairTag);
745                pairChannel.send(pairTag, new GBTransportMessEnd());
746            }
747            // send also end to receiver
748            pairChannel.send(resultTag, new GBTransportMessEnd());
749            //beware of race condition 
750        } catch (IOException e) {
751            if (logger.isDebugEnabled()) {
752                e.printStackTrace();
753            }
754        }
755        receiver.terminate();
756
757        int d = active.get();
758        if (d > 0) {
759            logger.info("remaining active tasks = " + d);
760        }
761        //logger.info("terminated, send " + red + " reduction pairs");
762        pairChannel.close();
763        logger.debug("redServ pairChannel.close()");
764        finner.release();
765
766        channel.close();
767        logger.info("redServ channel.close()");
768    }
769}
770
771
772/**
773 * Distributed server receiving worker thread.
774 * @param <C> coefficient type
775 */
776class HybridReducerReceiverEC<C extends RingElem<C>> extends Thread {
777
778
779    public static final Logger logger = Logger.getLogger(HybridReducerReceiverEC.class);
780
781
782    public final boolean debug = logger.isDebugEnabled();
783
784
785    private final DistHashTable<Integer, GenPolynomial<C>> theList;
786
787
788    private final PairList<C> pairlist;
789
790
791    private final TaggedSocketChannel pairChannel;
792
793
794    private final Terminator finner;
795
796
797    //private final int threadsPerNode;
798
799
800    private final AtomicInteger active;
801
802
803    private volatile boolean goon;
804
805
806    /**
807     * Message tag for pairs.
808     */
809    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;
810
811
812    /**
813     * Message tag for results.
814     */
815    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;
816
817
818    /**
819     * Message tag for acknowledgments.
820     */
821    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;
822
823
824    /**
825     * Constructor.
826     * @param fin terminator
827     * @param a active remote tasks count
828     * @param pc tagged socket channel
829     * @param dl distributed hash table
830     * @param L ordered pair list
831     */
832    //param tpn number of threads per node
833    HybridReducerReceiverEC(/*int tpn,*/Terminator fin, AtomicInteger a, TaggedSocketChannel pc,
834                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
835        active = a;
836        //threadsPerNode = tpn;
837        finner = fin;
838        pairChannel = pc;
839        theList = dl;
840        pairlist = L;
841        goon = true;
842        //logger.info("reducer server created " + this);
843    }
844
845
846    /**
847     * Work loop.
848     * @see java.lang.Thread#run()
849     */
850    @Override
851    public void run() {
852        //Pair<C> pair = null;
853        GenPolynomial<C> H = null;
854        int red = 0;
855        int polIndex = -1;
856        //Integer senderId; // obsolete
857
858        // while more requests
859        while (goon) {
860            // receive request
861            logger.debug("receive result");
862            //senderId = null;
863            Object rh = null;
864            try {
865                rh = pairChannel.receive(resultTag);
866                @SuppressWarnings("unused")
867                int i = active.getAndDecrement();
868            } catch (InterruptedException e) {
869                goon = false;
870                //e.printStackTrace();
871                //?? finner.initIdle(1);
872                break;
873            } catch (IOException e) {
874                e.printStackTrace();
875                goon = false;
876                finner.initIdle(1);
877                break;
878            } catch (ClassNotFoundException e) {
879                e.printStackTrace();
880                goon = false;
881                finner.initIdle(1);
882                break;
883            }
884            logger.info("received H polynomial");
885            if (rh == null) {
886                if (this.isInterrupted()) {
887                    goon = false;
888                    finner.initIdle(1);
889                    break;
890                }
891                //finner.initIdle(1);
892            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
893                logger.info("received GBTransportMessEnd");
894                goon = false;
895                //?? finner.initIdle(1);
896                break;
897            } else if (rh instanceof GBTransportMessPoly) {
898                // update pair list
899                red++;
900                GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
901                H = mpi.pol;
902                //senderId = mpi.threadId;
903                if (H != null) {
904                    if (debug) {
905                        logger.info("H = " + H.leadingExpVector());
906                    }
907                    if (!H.isZERO()) {
908                        if (H.isONE()) {
909                            // finner.allIdle();
910                            polIndex = pairlist.putOne();
911                            theList.putWait(Integer.valueOf(polIndex), H);
912                            //goon = false; must wait for other clients
913                            //finner.initIdle(1);
914                            //break;
915                        } else {
916                            polIndex = pairlist.put(H);
917                            // use putWait ? but still not all distributed
918                            //GenPolynomial<C> nn = 
919                            theList.putWait(Integer.valueOf(polIndex), H);
920                        }
921                    }
922                }
923            }
924            // only after recording in pairlist !
925            finner.initIdle(1);
926            try {
927                pairChannel.send(ackTag, new GBTransportMess());
928                logger.debug("send acknowledgement");
929            } catch (IOException e) {
930                e.printStackTrace();
931                goon = false;
932                break;
933            }
934        } // end while
935        goon = false;
936        logger.info("terminated, received " + red + " reductions");
937    }
938
939
940    /**
941     * Terminate.
942     */
943    public void terminate() {
944        goon = false;
945        //this.interrupt();
946        try {
947            this.join();
948        } catch (InterruptedException e) {
949            // unfug Thread.currentThread().interrupt();
950        }
951        logger.debug("HybridReducerReceiver terminated");
952    }
953
954}
955
956
957/**
958 * Distributed clients reducing worker threads.
959 */
960class HybridReducerClientEC<C extends RingElem<C>> implements Runnable {
961
962
963    private static final Logger logger = Logger.getLogger(HybridReducerClientEC.class);
964
965
966    public final boolean debug = logger.isDebugEnabled();
967
968
969    private final TaggedSocketChannel pairChannel;
970
971
972    private final DistHashTable<Integer, GenPolynomial<C>> theList;
973
974
975    private final ReductionPar<C> red;
976
977
978    //private final int threadsPerNode;
979
980
981    /*
982     * Identification number for this thread.
983     */
984    //public final Integer threadId; // obsolete
985
986
987    /**
988     * Message tag for pairs.
989     */
990    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;
991
992
993    /**
994     * Message tag for results.
995     */
996    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;
997
998
999    /**
1000     * Message tag for acknowledgments.
1001     */
1002    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;
1003
1004
1005    /**
1006     * Constructor.
1007     * @param tc tagged socket channel
1008     * @param dl distributed hash table
1009     */
1010    //param tpn number of threads per node
1011    //param tid thread identification
1012    HybridReducerClientEC(/*int tpn,*/TaggedSocketChannel tc, /*Integer tid,*/
1013                    DistHashTable<Integer, GenPolynomial<C>> dl) {
1014        //threadsPerNode = tpn;
1015        pairChannel = tc;
1016        //threadId = 100 + tid; // keep distinct from other tags
1017        theList = dl;
1018        red = new ReductionPar<C>();
1019    }
1020
1021
1022    /**
1023     * Work loop.
1024     * @see java.lang.Runnable#run()
1025     */
1026    @Override
1027    public void run() {
1028        if (debug) {
1029            logger.info("pairChannel   = " + pairChannel + " reducer client running");
1030        }
1031        Pair<C> pair = null;
1032        GenPolynomial<C> pi, pj, ps;
1033        GenPolynomial<C> S;
1034        GenPolynomial<C> H = null;
1035        //boolean set = false;
1036        boolean goon = true;
1037        boolean doEnd = true;
1038        int reduction = 0;
1039        //int sleeps = 0;
1040        Integer pix, pjx, psx;
1041
1042        while (goon) {
1043            /* protocol:
1044             * request pair, process pair, send result, receive acknowledgment
1045             */
1046            // pair = (Pair) pairlist.removeNext();
1047            Object req = new GBTransportMessReq();
1048            logger.debug("send request");
1049            try {
1050                pairChannel.send(pairTag, req);
1051            } catch (IOException e) {
1052                goon = false;
1053                if (logger.isDebugEnabled()) {
1054                    e.printStackTrace();
1055                }
1056                logger.info("receive pair, IOexception ");
1057                break;
1058            }
1059            logger.debug("receive pair, goon = " + goon);
1060            doEnd = true;
1061            Object pp = null;
1062            try {
1063                pp = pairChannel.receive(pairTag);
1064            } catch (InterruptedException e) {
1065                goon = false;
1066                e.printStackTrace();
1067            } catch (IOException e) {
1068                goon = false;
1069                if (logger.isDebugEnabled()) {
1070                    e.printStackTrace();
1071                }
1072                break;
1073            } catch (ClassNotFoundException e) {
1074                goon = false;
1075                e.printStackTrace();
1076            }
1077            if (debug) {
1078                logger.info("received pair = " + pp);
1079            }
1080            H = null;
1081            if (pp == null) { // should not happen
1082                continue;
1083            }
1084            if (pp instanceof GBTransportMessEnd) {
1085                goon = false;
1086                //doEnd = false;
1087                continue;
1088            }
1089            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
1090                pi = pj = ps = null;
1091                if (pp instanceof GBTransportMessPair) { // obsolet, for tests
1092                    GBTransportMessPair<C> tmp = (GBTransportMessPair<C>) pp;
1093                    pair = tmp.pair;
1094                    if (pair != null) {
1095                        pi = pair.pi;
1096                        pj = pair.pj;
1097                        //logger.debug("pair: pix = " + pair.i 
1098                        //               + ", pjx = " + pair.j);
1099                    }
1100                }
1101                if (pp instanceof GBTransportMessPairIndex) {
1102                    GBTransportMessPairIndex tmpi = (GBTransportMessPairIndex) pp;
1103                    pix = tmpi.i;
1104                    pjx = tmpi.j;
1105                    psx = tmpi.s;
1106                    pi = theList.getWait(pix);
1107                    pj = theList.getWait(pjx);
1108                    ps = theList.getWait(psx);
1109                    //logger.info("pix = " + pix + ", pjx = " + pjx + ", psx = " + psx);
1110                }
1111                if (pi != null && pj != null) {
1112                    S = red.SPolynomial(pi, pj);
1113                    //logger.info("ht(S) = " + S.leadingExpVector());
1114                    if (S.isZERO()) {
1115                        // pair.setZero(); does not work in dist
1116                    } else {
1117                        if (logger.isDebugEnabled()) {
1118                            logger.debug("ht(S) = " + S.leadingExpVector());
1119                        }
1120                        H = red.normalform(theList, S);
1121                        //logger.info("ht(H) = " + H.leadingExpVector());
1122                        reduction++;
1123                        if (H.isZERO()) {
1124                            // pair.setZero(); does not work in dist
1125                        } else {
1126                            H = H.monic();
1127                            if (logger.isInfoEnabled()) {
1128                                logger.info("ht(H) = " + H.leadingExpVector());
1129                            }
1130                        }
1131                    }
1132                } else {
1133                    logger.info("pi = " + pi + ", pj = " + pj + ", ps = " + ps);
1134                }
1135            }
1136            if (pp instanceof GBTransportMess) {
1137                logger.debug("null pair results in null H poly");
1138            }
1139
1140            // send H or must send null, if not at end
1141            if (logger.isDebugEnabled()) {
1142                logger.debug("#distributed list = " + theList.size());
1143                logger.debug("send H polynomial = " + H);
1144            }
1145            try {
1146                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
1147                doEnd = false;
1148            } catch (IOException e) {
1149                goon = false;
1150                e.printStackTrace();
1151            }
1152            //logger.info("done send poly message of " + pp);
1153            try {
1154                //pp = pairChannel.receive(threadId);
1155                pp = pairChannel.receive(ackTag);
1156            } catch (InterruptedException e) {
1157                goon = false;
1158                e.printStackTrace();
1159            } catch (IOException e) {
1160                goon = false;
1161                if (logger.isDebugEnabled()) {
1162                    e.printStackTrace();
1163                }
1164                break;
1165            } catch (ClassNotFoundException e) {
1166                goon = false;
1167                e.printStackTrace();
1168            }
1169            if (!(pp instanceof GBTransportMess)) {
1170                logger.error("invalid acknowledgement " + pp);
1171            }
1172            logger.info("received acknowledgment ");
1173        }
1174        logger.info("terminated, " + reduction + " reductions, " + theList.size() + " polynomials");
1175        if (doEnd) {
1176            try {
1177                pairChannel.send(resultTag, new GBTransportMessEnd());
1178            } catch (IOException e) {
1179                //e.printStackTrace();
1180            }
1181            logger.debug("terminated, send done");
1182        }
1183    }
1184}
1185
1186
1187/**
1188 * Objects of this class are to be send to a ExecutableServer.
1189 */
1190class GBHybridExerClient<C extends RingElem<C>> implements RemoteExecutable {
1191
1192
1193    String host;
1194
1195
1196    int port;
1197
1198
1199    int dhtport;
1200
1201
1202    int threadsPerNode;
1203
1204
1205    /**
1206     * GBHybridExerClient.
1207     * @param host
1208     * @param port
1209     * @param dhtport
1210     */
1211    public GBHybridExerClient(String host, int threadsPerNode, int port, int dhtport) {
1212        this.host = host;
1213        this.threadsPerNode = threadsPerNode;
1214        this.port = port;
1215        this.dhtport = dhtport;
1216    }
1217
1218
1219    /**
1220     * run.
1221     */
1222    public void run() {
1223        try {
1224            GroebnerBaseDistributedHybridEC.<C> clientPart(host, threadsPerNode, port, dhtport);
1225        } catch (Exception e) {
1226            e.printStackTrace();
1227        }
1228    }
1229
1230
1231    /**
1232     * String representation.
1233     */
1234    @Override
1235    public String toString() {
1236        StringBuffer s = new StringBuffer("GBHybridExerClient(");
1237        s.append("host=" + host);
1238        s.append(", threadsPerNode=" + threadsPerNode);
1239        s.append(", port=" + port);
1240        s.append(", dhtport=" + dhtport);
1241        s.append(")");
1242        return s.toString();
1243    }
1244
1245}