001/*
002 * $Id: GroebnerBaseDistributedHybridEC.java 4261 2012-10-21 11:34:02Z kredel $
003 */
004
005package edu.jas.gb;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Collections;
011import java.util.List;
012import java.util.ListIterator;
013import java.util.concurrent.atomic.AtomicInteger;
014
015import org.apache.log4j.Logger;
016
017import edu.jas.poly.ExpVector;
018import edu.jas.poly.GenPolynomial;
019import edu.jas.structure.RingElem;
020import edu.jas.util.ChannelFactory;
021import edu.jas.util.DistHashTable;
022import edu.jas.util.DistHashTableServer;
023import edu.jas.util.SocketChannel;
024import edu.jas.util.TaggedSocketChannel;
025import edu.jas.util.Terminator;
026import edu.jas.util.ThreadPool;
027import edu.jas.util.DistThreadPool;
028import edu.jas.util.RemoteExecutable;
029
030
031/**
032 * Groebner Base distributed hybrid algorithm. Implements a
033 * distributed memory with multi-core CPUs parallel version of
034 * Groebner bases with executable channels. Using pairlist class,
035 * distributed multi-threaded tasks do reduction, one communication
036 * channel per remote node.
037 * @param <C> coefficient type
038 * @author Heinz Kredel
039 */
040
041public class GroebnerBaseDistributedHybridEC<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
042
043
044    public static final Logger logger = Logger.getLogger(GroebnerBaseDistributedHybridEC.class);
045
046
047    public final boolean debug = logger.isDebugEnabled();
048
049
050    /**
051     * Number of threads to use.
052     */
053    protected final int threads;
054
055
056    /**
057     * Default number of threads.
058     */
059    protected static final int DEFAULT_THREADS = 2;
060
061
062    /**
063     * Number of threads per node to use.
064     */
065    protected final int threadsPerNode;
066
067
068    /**
069     * Default number of threads per compute node.
070     */
071    protected static final int DEFAULT_THREADS_PER_NODE = 1;
072
073
074    /**
075     * Pool of threads to use.
076     */
077    //protected final ExecutorService pool; // not for single node tests
078    protected transient final ThreadPool pool;
079
080
081    /**
082     * Default server port.
083     */
084    protected static final int DEFAULT_PORT = 55711;
085
086
087    /**
088     * Default distributed hash table server port.
089     */
090    protected final int DHT_PORT;
091
092
093    /**
094     * machine file to use.
095     */
096    protected final String mfile;
097
098
099    /**
100     * Server port to use.
101     */
102    protected final int port;
103
104
105    /**
106     * Distributed thread pool to use.
107     */
108    private final DistThreadPool dtp;
109
110
111    /**
112     * Distributed hash table server to use.
113     */
114    private final DistHashTableServer<Integer> dhts;
115
116
117    /**
118     * Message tag for pairs.
119     */
120    public static final Integer pairTag = Integer.valueOf(1);
121
122
123    /**
124     * Message tag for results.
125     */
126    public static final Integer resultTag = Integer.valueOf(2);
127
128
129    /**
130     * Message tag for acknowledgments.
131     */
132    public static final Integer ackTag = Integer.valueOf(3);
133
134
135    /**
136     * Constructor.
137     * @param mfile name of the machine file.
138     */
139    public GroebnerBaseDistributedHybridEC(String mfile) {
140        this(mfile, DEFAULT_THREADS, DEFAULT_PORT);
141    }
142
143
144    /**
145     * Constructor.
146     * @param mfile name of the machine file.
147     * @param threads number of threads to use.
148     */
149    public GroebnerBaseDistributedHybridEC(String mfile, int threads) {
150        this(mfile, threads, new ThreadPool(threads), DEFAULT_PORT);
151    }
152
153
154    /**
155     * Constructor.
156     * @param mfile name of the machine file.
157     * @param threads number of threads to use.
158     * @param port server port to use.
159     */
160    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int port) {
161        this(mfile, threads, new ThreadPool(threads), port);
162    }
163
164
165    /**
166     * Constructor.
167     * @param mfile name of the machine file.
168     * @param threads number of threads to use.
169     * @param threadsPerNode threads per node to use.
170     * @param port server port to use.
171     */
172    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, int port) {
173        this(mfile, threads, threadsPerNode, new ThreadPool(threads), port);
174    }
175
176
177    /**
178     * Constructor.
179     * @param mfile name of the machine file.
180     * @param threads number of threads to use.
181     * @param pool ThreadPool to use.
182     * @param port server port to use.
183     */
184    public GroebnerBaseDistributedHybridEC(String mfile, int threads, ThreadPool pool, int port) {
185        this(mfile, threads, DEFAULT_THREADS_PER_NODE, pool, port);
186    }
187
188
189    /**
190     * Constructor.
191     * @param mfile name of the machine file.
192     * @param threads number of threads to use.
193     * @param threadsPerNode threads per node to use.
194     * @param pl pair selection strategy
195     * @param port server port to use.
196     */
197    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, PairList<C> pl, int port) {
198        this(mfile, threads, threadsPerNode, new ThreadPool(threads), pl, port);
199    }
200
201
202    /**
203     * Constructor.
204     * @param mfile name of the machine file.
205     * @param threads number of threads to use.
206     * @param threadsPerNode threads per node to use.
207     * @param port server port to use.
208     */
209    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, 
210                                           ThreadPool pool, int port) {
211        this(mfile, threads, threadsPerNode, pool, new OrderedPairlist<C>(), port);
212    }
213
214
215    /**
216     * Constructor.
217     * @param mfile name of the machine file.
218     * @param threads number of threads to use.
219     * @param threadsPerNode threads per node to use.
220     * @param pool ThreadPool to use.
221     * @param pl pair selection strategy
222     * @param port server port to use.
223     */
224    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ThreadPool pool, 
225                    PairList<C> pl, int port) {
226        super(new ReductionPar<C>(), pl);
227        this.threads = threads;
228        if (mfile == null || mfile.length() == 0) {
229            this.mfile = "../util/machines"; // contains localhost
230        } else {
231            this.mfile = mfile;
232        }
233        if (threads < 1) {
234            threads = 1;
235        }
236        this.threadsPerNode = threadsPerNode;
237        if ( pool == null ) {
238            pool = new ThreadPool(threads);
239        }
240        this.pool = pool;
241        this.port = port;
242        logger.info("machine file " + mfile + ", port = " + port);
243        this.dtp = new DistThreadPool(this.threads, this.mfile);
244        logger.info("running " + dtp);
245        this.DHT_PORT = this.dtp.getEC().getMasterPort() + 100;
246        this.dhts = new DistHashTableServer<Integer>(this.DHT_PORT);
247        this.dhts.init();
248        logger.info("running " + dhts);
249    }
250
251
252    /**
253     * Cleanup and terminate ThreadPool.
254     */
255    @Override
256    public void terminate() {
257        terminate(true);
258    }
259
260
261    /**
262     * Terminates the distributed thread pools.
263     * @param shutDown true, if shut-down of the remote executable servers is
264     *            requested, false, if remote executable servers stay alive.
265     */
266    public void terminate(boolean shutDown) {
267        pool.terminate();
268        dtp.terminate(shutDown);
269        logger.info("dhts.terminate()");
270        dhts.terminate();
271    }
272
273
274    /**
275     * Distributed Groebner base.
276     * @param modv number of module variables.
277     * @param F polynomial list.
278     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
279     */
280    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
281        String master = dtp.getEC().getMasterHost();
282        //int port = dtp.getEC().getMasterPort(); // wrong port
283        GBHybridExerClient<C> gbc = new GBHybridExerClient<C>(master, threadsPerNode, port, DHT_PORT);
284        for (int i = 0; i < threads; i++) {
285            // schedule remote clients
286            dtp.addJob(gbc);
287        }
288        // run master
289        List<GenPolynomial<C>> G = GBMaster(modv, F);
290
291        return G;
292    }
293
294
295    /**
296     * Distributed hybrid Groebner base.
297     * @param modv number of module variables.
298     * @param F polynomial list.
299     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
300     */
301    public List<GenPolynomial<C>> GBMaster(int modv, List<GenPolynomial<C>> F) {
302        long t = System.currentTimeMillis();
303        ChannelFactory cf = new ChannelFactory(port);
304        cf.init();
305
306        GenPolynomial<C> p;
307        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
308        PairList<C> pairlist = null;
309        boolean oneInGB = false;
310        int l = F.size();
311        int unused;
312        ListIterator<GenPolynomial<C>> it = F.listIterator();
313        while (it.hasNext()) {
314            p = it.next();
315            if (p.length() > 0) {
316                p = p.monic();
317                if (p.isONE()) {
318                    oneInGB = true;
319                    G.clear();
320                    G.add(p);
321                    //return G; must signal termination to others
322                }
323                if (!oneInGB) {
324                    G.add(p);
325                }
326                if (pairlist == null) {
327                    //pairlist = new OrderedPairlist<C>(modv, p.ring);
328                    pairlist = strategy.create(modv, p.ring);
329                    if (!p.ring.coFac.isField()) {
330                        throw new IllegalArgumentException("coefficients not from a field");
331                    }
332                }
333                // theList not updated here
334                if (p.isONE()) {
335                    unused = pairlist.putOne();
336                } else {
337                    unused = pairlist.put(p);
338                }
339            } else {
340                l--;
341            }
342        }
343        //if (l <= 1) {
344            //return G; must signal termination to others
345        //}
346        logger.info("pairlist " + pairlist);
347        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(
348                        "localhost", DHT_PORT);
349        theList.init();
350        List<GenPolynomial<C>> al = pairlist.getList();
351        for (int i = 0; i < al.size(); i++) {
352            // no wait required
353            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
354            if (nn != null) {
355                logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
356            }
357        }
358
359        Terminator finner = new Terminator(threads * threadsPerNode);
360        HybridReducerServerEC<C> R;
361        logger.info("using pool = " + pool);
362        for (int i = 0; i < threads; i++) {
363            R = new HybridReducerServerEC<C>(threadsPerNode, finner, cf, theList, pairlist);
364            pool.addJob(R);
365            //logger.info("server submitted " + R);
366        }
367        logger.info("main loop waiting " + finner);
368        finner.waitDone();
369        int ps = theList.size();
370        logger.info("#distributed list = " + ps);
371        // make sure all polynomials arrived: not needed in master
372        // G = (ArrayList)theList.values();
373        G = pairlist.getList();
374        if (ps != G.size()) {
375            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
376        }
377        for (GenPolynomial<C> q : theList.getValueList()) {
378            if (q != null && !q.isZERO()) {
379                logger.debug("final q = " + q.leadingExpVector());
380            }
381        }
382        logger.debug("distributed list end");
383        long time = System.currentTimeMillis();
384        List<GenPolynomial<C>> Gp;
385        Gp = minimalGB(G); // not jet distributed but threaded
386        time = System.currentTimeMillis() - time;
387        logger.info("parallel gbmi time = " + time);
388        G = Gp;
389        logger.debug("server cf.terminate()");
390        cf.terminate();
391        // no more required // 
392        logger.info("server not pool.terminate() " + pool);
393        //pool.terminate();
394        logger.info("server theList.terminate() " + theList.size());
395        theList.terminate();
396        t = System.currentTimeMillis() - t;
397        logger.info("server GB end, time = " + t + ", " + pairlist.toString());
398        return G;
399    }
400
401
402    /**
403     * GB distributed client part.
404     * @param host the server runs on.
405     * @param port the server runs.
406     * @param dhtport of the DHT server.
407     * @throws IOException
408     */
409    public static <C extends RingElem<C>> void clientPart(String host, int threadsPerNode, int port, int dhtport) throws IOException {
410        ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost
411        cf.init();
412        logger.info("clientPart connecting to " + host + ", port = " + port + ", dhtport = " + dhtport);
413        SocketChannel channel = cf.getChannel(host, port);
414        TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel);
415        pairChannel.init();
416
417        DistHashTable<Integer, GenPolynomial<C>> theList 
418            = new DistHashTable<Integer, GenPolynomial<C>>(host, dhtport);
419        theList.init();
420
421        ThreadPool pool = new ThreadPool(threadsPerNode);
422        logger.info("client using pool = " + pool);
423        for (int i = 0; i < threadsPerNode; i++) {
424            HybridReducerClientEC<C> Rr = new HybridReducerClientEC<C>(threadsPerNode, pairChannel, i, theList);
425            pool.addJob(Rr);
426        }
427        logger.debug("clients submitted");
428
429        pool.terminate();
430        logger.debug("client pool.terminate()");
431
432        pairChannel.close();
433        logger.info("client pairChannel.close()");
434
435        theList.terminate();
436        cf.terminate();
437        logger.info("client cf.terminate()");
438
439        channel.close();
440        logger.info("client channel.close()");
441        return;
442    }
443
444
445    /**
446     * Minimal ordered groebner basis.
447     * @param Fp a Groebner base.
448     * @return a reduced Groebner base of Fp.
449     */
450    @Override
451    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
452        GenPolynomial<C> a;
453        ArrayList<GenPolynomial<C>> G;
454        G = new ArrayList<GenPolynomial<C>>(Fp.size());
455        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
456        while (it.hasNext()) {
457            a = it.next();
458            if (a.length() != 0) { // always true
459                // already monic  a = a.monic();
460                G.add(a);
461            }
462        }
463        if (G.size() <= 1) {
464            return G;
465        }
466
467        ExpVector e;
468        ExpVector f;
469        GenPolynomial<C> p;
470        ArrayList<GenPolynomial<C>> F;
471        F = new ArrayList<GenPolynomial<C>>(G.size());
472        boolean mt;
473
474        while (G.size() > 0) {
475            a = G.remove(0);
476            e = a.leadingExpVector();
477
478            it = G.listIterator();
479            mt = false;
480            while (it.hasNext() && !mt) {
481                p = it.next();
482                f = p.leadingExpVector();
483                mt = e.multipleOf(f);
484            }
485            it = F.listIterator();
486            while (it.hasNext() && !mt) {
487                p = it.next();
488                f = p.leadingExpVector();
489                mt = e.multipleOf(f);
490            }
491            if (!mt) {
492                F.add(a);
493            } else {
494                // System.out.println("dropped " + a.length());
495            }
496        }
497        G = F;
498        if (G.size() <= 1) {
499            return G;
500        }
501        Collections.reverse(G); // important for lex GB
502
503        MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()];
504        int i = 0;
505        F = new ArrayList<GenPolynomial<C>>(G.size());
506        while (G.size() > 0) {
507            a = G.remove(0);
508            // System.out.println("doing " + a.length());
509            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
510            R.addAll(G);
511            R.addAll(F);
512            mirs[i] = new MiReducerServer<C>(R, a);
513            pool.addJob(mirs[i]);
514            i++;
515            F.add(a);
516        }
517        G = F;
518        F = new ArrayList<GenPolynomial<C>>(G.size());
519        for (i = 0; i < mirs.length; i++) {
520            a = mirs[i].getNF();
521            F.add(a);
522        }
523        return F;
524    }
525
526}
527
528
529/**
530 * Distributed server reducing worker proxy threads.
531 * @param <C> coefficient type
532 */
533
534class HybridReducerServerEC<C extends RingElem<C>> implements Runnable {
535
536
537    public static final Logger logger = Logger.getLogger(HybridReducerServerEC.class);
538
539
540    public final boolean debug = logger.isDebugEnabled();
541
542
543    private final Terminator finner;
544
545
546    private final ChannelFactory cf;
547
548
549    private TaggedSocketChannel pairChannel;
550
551
552    private final DistHashTable<Integer, GenPolynomial<C>> theList;
553
554
555    private final PairList<C> pairlist;
556
557
558    private final int threadsPerNode;
559
560
561    /**
562     * Message tag for pairs.
563     */
564    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
565
566
567    /**
568     * Message tag for results.
569     */
570    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
571
572
573    /**
574     * Message tag for acknowledgments.
575     */
576    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
577
578
579    /**
580     * Constructor.
581     * @param tpn number of threads per node
582     * @param fin terminator
583     * @param cf channel factory
584     * @param dl distributed hash table
585     * @param L ordered pair list
586     */
587    HybridReducerServerEC(int tpn, Terminator fin, ChannelFactory cf,
588                         DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
589        threadsPerNode = tpn;
590        finner = fin;
591        this.cf = cf;
592        theList = dl;
593        pairlist = L;
594        //logger.info("reducer server created " + this);
595    }
596
597
598    /**
599     * Work loop.
600     * @see java.lang.Runnable#run()
601     */
602    //JAVA6only: @Override
603    public void run() {
604        logger.info("reducer server running with " + cf);
605        SocketChannel channel = null;
606        try {
607            channel = cf.getChannel();
608            pairChannel = new TaggedSocketChannel(channel);
609            pairChannel.init();
610        } catch (InterruptedException e) {
611            logger.debug("get pair channel interrupted");
612            e.printStackTrace();
613            return;
614        }
615        if (debug) {
616            logger.info("pairChannel   = " + pairChannel);
617        }
618        // record idle remote workers (minus one?)
619        //finner.beIdle(threadsPerNode-1);
620        finner.initIdle(threadsPerNode);
621        AtomicInteger active = new AtomicInteger(0);
622
623        // start receiver
624        HybridReducerReceiverEC<C> receiver = new HybridReducerReceiverEC<C>(threadsPerNode, finner, active,
625                        pairChannel, theList, pairlist);
626        receiver.start();
627
628        Pair<C> pair;
629        //boolean set = false;
630        boolean goon = true;
631        //int polIndex = -1;
632        int red = 0;
633        int sleeps = 0;
634
635        // while more requests
636        while (goon) {
637            // receive request if thread is reported incactive
638            logger.debug("receive request");
639            Object req = null;
640            try {
641                req = pairChannel.receive(pairTag);
642            } catch (InterruptedException e) {
643                goon = false;
644                e.printStackTrace();
645            } catch (IOException e) {
646                goon = false;
647                e.printStackTrace();
648            } catch (ClassNotFoundException e) {
649                goon = false;
650                e.printStackTrace();
651            }
652            logger.info("received request, req = " + req);
653            if (req == null) {
654                goon = false;
655                break;
656            }
657            if (!(req instanceof GBTransportMessReq)) {
658                goon = false;
659                break;
660            }
661
662            // find pair and manage termination status
663            logger.info("find pair");
664            while (!pairlist.hasNext()) { // wait
665                if (!finner.hasJobs() && !pairlist.hasNext()) {
666                    goon = false;
667                    break;
668                }
669                try {
670                    sleeps++;
671                    //if (sleeps % 10 == 0) {
672                    logger.info("waiting for reducers, remaining = " + finner.getJobs());
673                    //}
674                    Thread.sleep(100);
675                } catch (InterruptedException e) {
676                    goon = false;
677                    break;
678                }
679            }
680            if (!pairlist.hasNext() && !finner.hasJobs()) {
681                logger.info("termination detection: no pairs and no jobs left");
682                goon = false;
683                break; //continue; //break?
684            }
685            finner.notIdle(); // before pairlist get!!
686            pair = pairlist.removeNext();
687            // send pair to client, even if null
688            if (debug) {
689                logger.info("active count = " + active.get());
690                logger.info("send pair = " + pair);
691            }
692            GBTransportMess msg = null;
693            if (pair != null) {
694                msg = new GBTransportMessPairIndex(pair); //,pairlist.size()-1); // size-1
695            } else {
696                msg = new GBTransportMess(); // not End(); at this time
697                // goon ?= false;
698            }
699            try {
700                red++;
701                pairChannel.send(pairTag, msg);
702                int a = active.getAndIncrement();
703            } catch (IOException e) {
704                e.printStackTrace();
705                goon = false;
706                break;
707            }
708            //logger.debug("#distributed list = " + theList.size());
709        }
710        logger.info("terminated, send " + red + " reduction pairs");
711
712        /*
713         * send end mark to clients
714         */
715        logger.debug("send end");
716        try {
717            for (int i = 0; i < threadsPerNode; i++) { // -1
718                //do not wait: Object rq = pairChannel.receive(pairTag);
719                pairChannel.send(pairTag, new GBTransportMessEnd());
720            }
721            // send also end to receiver
722            pairChannel.send(resultTag, new GBTransportMessEnd());
723            //beware of race condition 
724        } catch (IOException e) {
725            if (logger.isDebugEnabled()) {
726                e.printStackTrace();
727            }
728        }
729        receiver.terminate();
730
731        int d = active.get();
732        logger.info("remaining active tasks = " + d);
733        //logger.info("terminated, send " + red + " reduction pairs");
734        pairChannel.close();
735        logger.info("redServ pairChannel.close()");
736        finner.release();
737
738        channel.close();
739        logger.info("redServ channel.close()");
740    }
741}
742
743
744/**
745 * Distributed server receiving worker thread.
746 * @param <C> coefficient type
747 */
748
749class HybridReducerReceiverEC<C extends RingElem<C>> extends Thread {
750
751
752    public static final Logger logger = Logger.getLogger(HybridReducerReceiverEC.class);
753
754
755    public final boolean debug = logger.isDebugEnabled();
756
757
758    private final DistHashTable<Integer, GenPolynomial<C>> theList;
759
760
761    private final PairList<C> pairlist;
762
763
764    private final TaggedSocketChannel pairChannel;
765
766
767    private final Terminator finner;
768
769
770    private final int threadsPerNode;
771
772
773    private final AtomicInteger active;
774
775
776    private volatile boolean goon;
777
778
779    /**
780     * Message tag for pairs.
781     */
782    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
783
784
785    /**
786     * Message tag for results.
787     */
788    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
789
790
791    /**
792     * Message tag for acknowledgments.
793     */
794    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
795
796
797    /**
798     * Constructor.
799     * @param tpn number of threads per node
800     * @param fin terminator
801     * @param a active remote tasks count
802     * @param pc tagged socket channel
803     * @param dl distributed hash table
804     * @param L ordered pair list
805     */
806    HybridReducerReceiverEC(int tpn, Terminator fin, AtomicInteger a, TaggedSocketChannel pc,
807                            DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
808        active = a;
809        threadsPerNode = tpn;
810        finner = fin;
811        pairChannel = pc;
812        theList = dl;
813        pairlist = L;
814        goon = true;
815        //logger.info("reducer server created " + this);
816    }
817
818
819    /**
820     * Work loop.
821     * @see java.lang.Thread#run()
822     */
823    @Override
824    public void run() {
825        //Pair<C> pair = null;
826        GenPolynomial<C> H = null;
827        int red = 0;
828        int polIndex = -1;
829        //Integer senderId; // obsolete
830
831        // while more requests
832        while (goon) {
833            // receive request
834            logger.debug("receive result");
835            //senderId = null;
836            Object rh = null;
837            try {
838                rh = pairChannel.receive(resultTag);
839                int i = active.getAndDecrement();
840            } catch (InterruptedException e) {
841                goon = false;
842                //e.printStackTrace();
843                //?? finner.initIdle(1);
844                break;
845            } catch (IOException e) {
846                e.printStackTrace();
847                goon = false;
848                finner.initIdle(1);
849                break;
850            } catch (ClassNotFoundException e) {
851                e.printStackTrace();
852                goon = false;
853                finner.initIdle(1);
854                break;
855            }
856            logger.info("received H polynomial");
857            if (rh == null) {
858                if (this.isInterrupted()) {
859                    goon = false;
860                    finner.initIdle(1);
861                    break;
862                }
863                //finner.initIdle(1);
864            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
865                logger.info("received GBTransportMessEnd");
866                goon = false;
867                //?? finner.initIdle(1);
868                break;
869            } else if (rh instanceof GBTransportMessPoly) {
870                // update pair list
871                red++;
872                GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
873                H = mpi.pol;
874                //senderId = mpi.threadId;
875                if (H != null) {
876                    if (debug) {
877                        logger.info("H = " + H.leadingExpVector());
878                    }
879                    if (!H.isZERO()) {
880                        if (H.isONE()) {
881                            logger.info("H = one");
882                            // finner.allIdle();
883                            polIndex = pairlist.putOne();
884                            GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
885                            if (nn != null) {
886                                logger.info("double polynomials nn = " + nn + ", H = " + H);
887                            }
888                            //goon = false; must wait for other clients
889                            //finner.initIdle(1);
890                            //break;
891                        } else {
892                            polIndex = pairlist.put(H);
893                            // use putWait ? but still not all distributed
894                            GenPolynomial<C> nn = theList.put(Integer.valueOf(polIndex), H);
895                            if (nn != null) {
896                                logger.info("double polynomials nn = " + nn + ", H = " + H);
897                            }
898                        }
899                    }
900                }
901            }
902            // only after recording in pairlist !
903            finner.initIdle(1);
904            try {
905                pairChannel.send(ackTag, new GBTransportMess());
906                logger.debug("send acknowledgement");
907            } catch (IOException e) {
908                e.printStackTrace();
909                goon = false;
910                break;
911            }
912        } // end while
913        goon = false;
914        logger.info("terminated, received " + red + " reductions");
915    }
916
917
918    /**
919     * Terminate.
920     */
921    public void terminate() {
922        goon = false;
923        //this.interrupt();
924        try {
925            this.join();
926        } catch (InterruptedException e) {
927            // unfug Thread.currentThread().interrupt();
928        }
929        logger.debug("HybridReducerReceiver terminated");
930    }
931
932}
933
934
935/**
936 * Distributed clients reducing worker threads.
937 */
938
939class HybridReducerClientEC<C extends RingElem<C>> implements Runnable {
940
941
942    private static final Logger logger = Logger.getLogger(HybridReducerClientEC.class);
943
944
945    public final boolean debug = logger.isDebugEnabled();
946
947
948    private final TaggedSocketChannel pairChannel;
949
950
951    private final DistHashTable<Integer, GenPolynomial<C>> theList;
952
953
954    private final ReductionPar<C> red;
955
956
957    private final int threadsPerNode;
958
959
960    /*
961     * Identification number for this thread.
962     */
963    //public final Integer threadId; // obsolete
964
965
966    /**
967     * Message tag for pairs.
968     */
969    public final Integer pairTag = GroebnerBaseDistributedHybrid.pairTag;
970
971
972    /**
973     * Message tag for results.
974     */
975    public final Integer resultTag = GroebnerBaseDistributedHybrid.resultTag;
976
977
978    /**
979     * Message tag for acknowledgments.
980     */
981    public final Integer ackTag = GroebnerBaseDistributedHybrid.ackTag;
982
983
984    /**
985     * Constructor.
986     * @param tpn number of threads per node
987     * @param tc tagged socket channel
988     * @param tid thread identification
989     * @param dl distributed hash table
990     */
991    HybridReducerClientEC(int tpn, TaggedSocketChannel tc, Integer tid,
992                          DistHashTable<Integer, GenPolynomial<C>> dl) {
993        this.threadsPerNode = tpn;
994        pairChannel = tc;
995        //threadId = 100 + tid; // keep distinct from other tags
996        theList = dl;
997        red = new ReductionPar<C>();
998    }
999
1000
1001    /**
1002     * Work loop.
1003     * @see java.lang.Runnable#run()
1004     */
1005    //JAVA6only: @Override
1006    public void run() {
1007        if (debug) {
1008            logger.info("pairChannel   = " + pairChannel + " reducer client running");
1009        }
1010        Pair<C> pair = null;
1011        GenPolynomial<C> pi;
1012        GenPolynomial<C> pj;
1013        GenPolynomial<C> sp = null;
1014        GenPolynomial<C> S;
1015        GenPolynomial<C> H = null;
1016        //boolean set = false;
1017        boolean goon = true;
1018        boolean doEnd = true;
1019        int reduction = 0;
1020        //int sleeps = 0;
1021        Integer pix;
1022        Integer pjx;
1023
1024        while (goon) {
1025            /* protocol:
1026             * request pair, process pair, send result, receive acknowledgment
1027             */
1028            // pair = (Pair) pairlist.removeNext();
1029            Object req = new GBTransportMessReq();
1030            logger.info("send request = " + req);
1031            try {
1032                pairChannel.send(pairTag, req);
1033            } catch (IOException e) {
1034                goon = false;
1035                if (logger.isDebugEnabled()) {
1036                    e.printStackTrace();
1037                }
1038                logger.info("receive pair, exception ");
1039                break;
1040            }
1041            logger.debug("receive pair, goon = " + goon);
1042            doEnd = true;
1043            Object pp = null;
1044            try {
1045                pp = pairChannel.receive(pairTag);
1046            } catch (InterruptedException e) {
1047                goon = false;
1048                e.printStackTrace();
1049            } catch (IOException e) {
1050                goon = false;
1051                if (logger.isDebugEnabled()) {
1052                    e.printStackTrace();
1053                }
1054                break;
1055            } catch (ClassNotFoundException e) {
1056                goon = false;
1057                e.printStackTrace();
1058            }
1059            if (debug) {
1060                logger.info("received pair = " + pp);
1061            }
1062            H = null;
1063            if (pp == null) { // should not happen
1064                continue;
1065            }
1066            if (pp instanceof GBTransportMessEnd) {
1067                goon = false;
1068                //doEnd = false;
1069                continue;
1070            }
1071            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
1072                pi = pj = null;
1073                if (pp instanceof GBTransportMessPair) { // obsolet, for tests
1074                    GBTransportMessPair<C> tmp = (GBTransportMessPair<C>) pp;
1075                    pair = (Pair<C>) tmp.pair;
1076                    if (pair != null) {
1077                        pi = pair.pi;
1078                        pj = pair.pj;
1079                        //logger.debug("pair: pix = " + pair.i 
1080                        //               + ", pjx = " + pair.j);
1081                    }
1082                }
1083                if (pp instanceof GBTransportMessPairIndex) {
1084                    GBTransportMessPairIndex tmpi = (GBTransportMessPairIndex)pp;
1085                    pix = tmpi.i;
1086                    pjx = tmpi.j;
1087                    //Integer sx = tmpi.s; // bug:-1; // last polynomial
1088                    //sp = theList.getWait(sx);
1089                    pi = theList.getWait(pix);
1090                    pj = theList.getWait(pjx);
1091                    //logger.info("pix = " + pix + ", pjx = " + pjx + ", sx = " + sx);
1092                }
1093                if (pi != null && pj != null) {
1094                    S = red.SPolynomial(pi, pj);
1095                    logger.info("ht(S) = " + S.leadingExpVector());
1096                    if (S.isZERO()) {
1097                        // pair.setZero(); does not work in dist
1098                    } else {
1099                        if (logger.isDebugEnabled()) {
1100                            logger.debug("ht(S) = " + S.leadingExpVector());
1101                        }
1102                        H = red.normalform(theList, S);
1103                        logger.info("ht(H) = " + H.leadingExpVector());
1104                        reduction++;
1105                        if (H.isZERO()) {
1106                            // pair.setZero(); does not work in dist
1107                        } else {
1108                            H = H.monic();
1109                            if (logger.isInfoEnabled()) {
1110                                logger.info("ht(H) = " + H.leadingExpVector());
1111                            }
1112                        }
1113                    }
1114                } else {
1115                    logger.info("pi = " + pi + ", pj = " + pj + ", sp = " + sp);
1116                }
1117            }
1118            if (pp instanceof GBTransportMess) {
1119                logger.debug("null pair results in null H poly");
1120            }
1121
1122            // send H or must send null, if not at end
1123            if (logger.isDebugEnabled()) {
1124                logger.debug("#distributed list = " + theList.size());
1125                logger.debug("send H polynomial = " + H);
1126            }
1127            try {
1128                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
1129                doEnd = false;
1130            } catch (IOException e) {
1131                goon = false;
1132                e.printStackTrace();
1133            }
1134            logger.info("done send poly message of " + pp);
1135            try {
1136                //pp = pairChannel.receive(threadId);
1137                pp = pairChannel.receive(ackTag);
1138            } catch (InterruptedException e) {
1139                goon = false;
1140                e.printStackTrace();
1141            } catch (IOException e) {
1142                goon = false;
1143                if (logger.isDebugEnabled()) {
1144                    e.printStackTrace();
1145                }
1146                break;
1147            } catch (ClassNotFoundException e) {
1148                goon = false;
1149                e.printStackTrace();
1150            }
1151            if (!(pp instanceof GBTransportMess)) {
1152                logger.error("invalid acknowledgement " + pp);
1153            }
1154            logger.info("received acknowledgment " + pp);
1155        }
1156        logger.info("terminated, done " + reduction + " reductions");
1157        if (doEnd) {
1158            try {
1159                pairChannel.send(resultTag, new GBTransportMessEnd());
1160            } catch (IOException e) {
1161                //e.printStackTrace();
1162            }
1163            logger.info("terminated, send done");
1164        }
1165    }
1166}
1167
1168
1169/**
1170 * Objects of this class are to be send to a ExecutableServer.
1171 */
1172
1173class GBHybridExerClient<C extends RingElem<C>> implements RemoteExecutable {
1174
1175
1176    String host;
1177
1178
1179    int port;
1180
1181
1182    int dhtport;
1183
1184
1185    int threadsPerNode;
1186
1187
1188    /**
1189     * GBHybridExerClient.
1190     * @param host
1191     * @param port
1192     * @param dhtport
1193     */
1194    public GBHybridExerClient(String host, int threadsPerNode, int port, int dhtport) {
1195        this.host = host;
1196        this.threadsPerNode = threadsPerNode;
1197        this.port = port;
1198        this.dhtport = dhtport;
1199    }
1200
1201
1202    /**
1203     * run.
1204     */
1205    public void run() {
1206        try {
1207            GroebnerBaseDistributedHybridEC. <C>clientPart(host,threadsPerNode,port,dhtport);
1208        } catch (Exception e) {
1209            e.printStackTrace();
1210        }
1211    }
1212
1213
1214    /**
1215     * String representation.
1216     */
1217    @Override
1218    public String toString() {
1219        StringBuffer s = new StringBuffer("GBHybridExerClient(");
1220        s.append("host="+host);
1221        s.append(", threadsPerNode="+threadsPerNode);
1222        s.append(", port="+port);
1223        s.append(", dhtport="+dhtport);
1224        s.append(")");
1225        return s.toString();
1226    }
1227
1228}