001/*
002 * $Id$
003 */
004
005package edu.jas.gb;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Collections;
011import java.util.List;
012import java.util.ListIterator;
013import java.util.concurrent.Semaphore;
014
015import org.apache.logging.log4j.Logger;
016import org.apache.logging.log4j.LogManager;
017
018import edu.jas.kern.MPJEngine;
019import edu.jas.poly.ExpVector;
020import edu.jas.poly.GenPolynomial;
021import edu.jas.poly.GenPolynomialRing;
022import edu.jas.poly.PolyUtil;
023import edu.jas.structure.RingElem;
024import edu.jas.util.DistHashTableMPJ;
025import edu.jas.util.MPJChannel;
026import edu.jas.util.Terminator;
027import edu.jas.util.ThreadPool;
028
029import mpi.Comm;
030
031
032/**
033 * Groebner Base distributed algorithm with MPJ. Implements a distributed memory
034 * parallel version of Groebner bases. Using MPJ and pairlist class, distributed
035 * tasks do reduction.
036 * @param <C> coefficient type
037 * @author Heinz Kredel
038 */
039
040public class GroebnerBaseDistributedMPJ<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
041
042
043    private static final Logger logger = LogManager.getLogger(GroebnerBaseDistributedMPJ.class);
044
045
046    /**
047     * Number of threads to use.
048     */
049    protected final int threads;
050
051
052    /**
053     * Default number of threads.
054     */
055    public static final int DEFAULT_THREADS = 2;
056
057
058    /*
059     * Pool of threads to use. <b>Note:</b> No ComputerThreads for one node
060     * tests
061     */
062    protected transient final ThreadPool pool;
063
064
065    /*
066     * Underlying MPJ engine.
067     */
068    protected transient final Comm engine;
069
070
071    /**
072     * Constructor.
073     */
074    public GroebnerBaseDistributedMPJ() throws IOException {
075        this(DEFAULT_THREADS);
076    }
077
078
079    /**
080     * Constructor.
081     * @param threads number of threads to use.
082     */
083    public GroebnerBaseDistributedMPJ(int threads) throws IOException {
084        this(threads, new ThreadPool(threads));
085    }
086
087
088    /**
089     * Constructor.
090     * @param threads number of threads to use.
091     * @param pool ThreadPool to use.
092     */
093    public GroebnerBaseDistributedMPJ(int threads, ThreadPool pool) throws IOException {
094        this(threads, pool, new OrderedPairlist<C>());
095    }
096
097
098    /**
099     * Constructor.
100     * @param threads number of threads to use.
101     * @param pl pair selection strategy
102     */
103    public GroebnerBaseDistributedMPJ(int threads, PairList<C> pl) throws IOException {
104        this(threads, new ThreadPool(threads), pl);
105    }
106
107
108    /**
109     * Constructor.
110     * @param threads number of threads to use.
111     * @param pool ThreadPool to use.
112     * @param pl pair selection strategy
113     */
114    public GroebnerBaseDistributedMPJ(int threads, ThreadPool pool, PairList<C> pl) throws IOException {
115        super(new ReductionPar<C>(), pl);
116        this.engine = MPJEngine.getCommunicator();
117        int size = engine.Size();
118        if (size < 2) {
119            throw new IllegalArgumentException("Minimal 2 MPJ processes required, not " + size);
120        }
121        if (threads != size || pool.getNumber() != size) {
122            throw new IllegalArgumentException(
123                            "threads != size: " + threads + " != " + size + ", #pool " + pool.getNumber());
124        }
125        this.threads = threads;
126        this.pool = pool;
127    }
128
129
130    /**
131     * Cleanup and terminate ThreadPool.
132     */
133    @Override
134    public void terminate() {
135        if (pool == null) {
136            return;
137        }
138        //pool.terminate();
139        pool.cancel();
140    }
141
142
143    /**
144     * Distributed Groebner base.
145     * @param modv number of module variables.
146     * @param F polynomial list.
147     * @return GB(F) a Groebner base of F or null, if a IOException occurs or on
148     *         MPJ client part.
149     */
150    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
151        try {
152            if (engine.Rank() == 0) {
153                return GBmaster(modv, F);
154            }
155        } catch (IOException e) {
156            logger.info("GBmaster: " + e);
157            e.printStackTrace();
158            return null;
159        }
160        pool.terminate(); // not used on clients
161        try {
162            clientPart(0);
163        } catch (IOException e) {
164            logger.info("clientPart: " + e);
165            e.printStackTrace();
166        }
167        return null;
168    }
169
170
171    /**
172     * Distributed Groebner base, part for MPJ master.
173     * @param modv number of module variables.
174     * @param F polynomial list.
175     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
176     */
177    List<GenPolynomial<C>> GBmaster(int modv, List<GenPolynomial<C>> F) throws IOException {
178        List<GenPolynomial<C>> G = normalizeZerosOnes(F);
179        G = PolyUtil.<C> monic(G);
180        if (G.size() <= 1) {
181            //return G; 
182        }
183        if (G.isEmpty()) {
184            throw new IllegalArgumentException("empty F / zero ideal not allowed");
185        }
186        GenPolynomialRing<C> ring = G.get(0).ring;
187        if (!ring.coFac.isField()) {
188            throw new IllegalArgumentException("coefficients not from a field");
189        }
190        PairList<C> pairlist = strategy.create(modv, ring);
191        pairlist.put(G);
192
193        /*
194        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
195        GenPolynomial<C> p;
196        PairList<C> pairlist = null;
197        boolean oneInGB = false;
198        int l = F.size();
199        int unused = 0;
200        ListIterator<GenPolynomial<C>> it = F.listIterator();
201        while (it.hasNext()) {
202            p = it.next();
203            if (p.length() > 0) {
204                p = p.monic();
205                if (p.isONE()) {
206                    oneInGB = true;
207                    G.clear();
208                    G.add(p);
209                    //return G; must signal termination to others
210                }
211                if (!oneInGB) {
212                    G.add(p);
213                }
214                if (pairlist == null) {
215                    pairlist = strategy.create(modv, p.ring);
216                    if (!p.ring.coFac.isField()) {
217                        throw new IllegalArgumentException("coefficients not from a field");
218                    }
219                }
220                // theList not updated here
221                if (p.isONE()) {
222                    unused = pairlist.putOne();
223                } else {
224                    unused = pairlist.put(p);
225                }
226            } else {
227                l--;
228            }
229        }
230        //if (l <= 1) {
231        //return G; must signal termination to others
232        //}
233        */
234        logger.info("done pairlist, initialize DHT: " + pairlist);
235
236        DistHashTableMPJ<Integer, GenPolynomial<C>> theList = new DistHashTableMPJ<Integer, GenPolynomial<C>>(
237                        engine);
238        theList.init();
239        //logger.info("done DHT: " + theList);
240
241        List<GenPolynomial<C>> al = pairlist.getList();
242        for (int i = 0; i < al.size(); i++) {
243            // no wait required
244            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
245            if (nn != null) {
246                logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
247            }
248        }
249
250        Terminator fin = new Terminator(threads - 1);
251        MPJReducerServer<C> R;
252        for (int i = 1; i < threads; i++) {
253            logger.debug("addJob " + i + " of " + threads);
254            MPJChannel chan = new MPJChannel(engine, i); // closed in server
255            R = new MPJReducerServer<C>(i, fin, chan, theList, pairlist);
256            pool.addJob(R);
257        }
258        logger.debug("main loop waiting");
259        fin.waitDone();
260        int ps = theList.size();
261        logger.info("#distributed list = " + ps);
262        // make sure all polynomials arrived: not needed in master
263        // G = (ArrayList)theList.values();
264        G = pairlist.getList();
265        if (ps != G.size()) {
266            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
267        }
268        long time = System.currentTimeMillis();
269        List<GenPolynomial<C>> Gp = minimalGB(G); // not jet distributed but threaded
270        time = System.currentTimeMillis() - time;
271        logger.debug("parallel gbmi = " + time);
272        G = Gp;
273        logger.info("theList.terminate()");
274        theList.terminate();
275        logger.info("end" + pairlist);
276        return G;
277    }
278
279
280    /**
281     * GB distributed client.
282     * @param rank of the MPJ where the server runs on.
283     * @throws IOException
284     */
285    public void clientPart(int rank) throws IOException {
286        if (rank != 0) {
287            throw new UnsupportedOperationException("only master at rank 0 implemented: " + rank);
288        }
289        Comm engine = MPJEngine.getCommunicator();
290
291        DistHashTableMPJ<Integer, GenPolynomial<C>> theList = new DistHashTableMPJ<Integer, GenPolynomial<C>>();
292        theList.init();
293
294        MPJChannel chan = new MPJChannel(engine, rank);
295
296        MPJReducerClient<C> R = new MPJReducerClient<C>(chan, theList);
297        R.run();
298
299        chan.close();
300        theList.terminate();
301        return;
302    }
303
304
305    /**
306     * Minimal ordered groebner basis.
307     * @param Fp a Groebner base.
308     * @return a reduced Groebner base of Fp.
309     */
310    @SuppressWarnings("unchecked")
311    @Override
312    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
313        GenPolynomial<C> a;
314        ArrayList<GenPolynomial<C>> G;
315        G = new ArrayList<GenPolynomial<C>>(Fp.size());
316        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
317        while (it.hasNext()) {
318            a = it.next();
319            if (a.length() != 0) { // always true
320                // already monic  a = a.monic();
321                G.add(a);
322            }
323        }
324        if (G.size() <= 1) {
325            return G;
326        }
327
328        ExpVector e;
329        ExpVector f;
330        GenPolynomial<C> p;
331        ArrayList<GenPolynomial<C>> F;
332        F = new ArrayList<GenPolynomial<C>>(G.size());
333        boolean mt;
334
335        while (G.size() > 0) {
336            a = G.remove(0);
337            e = a.leadingExpVector();
338
339            it = G.listIterator();
340            mt = false;
341            while (it.hasNext() && !mt) {
342                p = it.next();
343                f = p.leadingExpVector();
344                mt = e.multipleOf(f);
345            }
346            it = F.listIterator();
347            while (it.hasNext() && !mt) {
348                p = it.next();
349                f = p.leadingExpVector();
350                mt = e.multipleOf(f);
351            }
352            if (!mt) {
353                F.add(a);
354            } else {
355                // System.out.println("dropped " + a.length());
356            }
357        }
358        G = F;
359        if (G.size() <= 1) {
360            return G;
361        }
362        Collections.reverse(G); // important for lex GB
363
364        MiMPJReducerServer<C>[] mirs = (MiMPJReducerServer<C>[]) new MiMPJReducerServer[G.size()];
365        int i = 0;
366        F = new ArrayList<GenPolynomial<C>>(G.size());
367        while (G.size() > 0) {
368            a = G.remove(0);
369            // System.out.println("doing " + a.length());
370            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
371            R.addAll(G);
372            R.addAll(F);
373            mirs[i] = new MiMPJReducerServer<C>(R, a);
374            pool.addJob(mirs[i]);
375            i++;
376            F.add(a);
377        }
378        G = F;
379        F = new ArrayList<GenPolynomial<C>>(G.size());
380        for (i = 0; i < mirs.length; i++) {
381            a = mirs[i].getNF();
382            F.add(a);
383        }
384        return F;
385    }
386
387}
388
389
390/**
391 * Distributed server reducing worker threads.
392 * @param <C> coefficient type
393 */
394
395class MPJReducerServer<C extends RingElem<C>> implements Runnable {
396
397
398    /*
399     * Termination detection coordinator.
400     */
401    private final Terminator finaler;
402
403
404    /*
405     * Underlying MPJ engine.
406     */
407    //protected transient final Comm engine;
408
409
410    /*
411     * MPJ channel.
412     */
413    private final MPJChannel pairChannel;
414
415
416    /*
417     * GB rank.
418     */
419    final int rank;
420
421
422    /*
423     * Distributed HashTable of polynomials.
424     */
425    private final DistHashTableMPJ<Integer, GenPolynomial<C>> theList;
426
427
428    /*
429     * Critical pair list of polynomials.
430     */
431    private final PairList<C> pairlist;
432
433
434    private static final Logger logger = LogManager.getLogger(MPJReducerServer.class);
435
436
437    /**
438     * Constructor.
439     * @param r MPJ rank of partner.
440     * @param fin termination coordinator to use.
441     * @param c MPJ channel to use.
442     * @param dl DHT to use.
443     * @param L pair selection strategy
444     */
445    MPJReducerServer(int r, Terminator fin, MPJChannel c, DistHashTableMPJ<Integer, GenPolynomial<C>> dl,
446                    PairList<C> L) {
447        rank = r;
448        finaler = fin;
449        //engine = e;
450        theList = dl;
451        pairlist = L;
452        pairChannel = c;
453        logger.debug("reducer server constructor: "); // + r);
454    }
455
456
457    /**
458     * Main method.
459     */
460    @SuppressWarnings("unchecked")
461    public void run() {
462        logger.debug("reducer server running: "); // + this);
463        // try {
464        //     pairChannel = new MPJChannel(engine, rank);
465        // } catch (IOException e) {
466        //     e.printStackTrace();
467        //     return;
468        // }
469        if (logger.isInfoEnabled()) {
470            logger.info("reducer server running: pairChannel = " + pairChannel);
471        }
472        Pair<C> pair;
473        GenPolynomial<C> H = null;
474        boolean set = false;
475        boolean goon = true;
476        int polIndex = -1;
477        int red = 0;
478        int sleeps = 0;
479
480        // while more requests
481        while (goon) {
482            // receive request
483            logger.debug("receive request");
484            Object req = null;
485            try {
486                req = pairChannel.receive();
487            } catch (IOException e) {
488                goon = false;
489                e.printStackTrace();
490            } catch (ClassNotFoundException e) {
491                goon = false;
492                e.printStackTrace();
493            }
494            //logger.debug("received request, req = " + req);
495            if (req == null) {
496                goon = false;
497                break;
498            }
499            if (!(req instanceof GBTransportMessReq)) {
500                goon = false;
501                break;
502            }
503
504            // find pair
505            logger.debug("find pair");
506            while (!pairlist.hasNext()) { // wait
507                if (!set) {
508                    finaler.beIdle();
509                    set = true;
510                }
511                if (!finaler.hasJobs() && !pairlist.hasNext()) {
512                    goon = false;
513                    break;
514                }
515                try {
516                    sleeps++;
517                    if (sleeps % 10 == 0) {
518                        logger.info(" reducer is sleeping");
519                    }
520                    Thread.sleep(100);
521                } catch (InterruptedException e) {
522                    goon = false;
523                    break;
524                }
525            }
526            if (!pairlist.hasNext() && !finaler.hasJobs()) {
527                goon = false;
528                break; //continue; //break?
529            }
530            if (set) {
531                set = false;
532                finaler.notIdle();
533            }
534
535            pair = pairlist.removeNext();
536            /*
537             * send pair to client, receive H
538             */
539            logger.debug("send pair = " + pair);
540            GBTransportMess msg = null;
541            if (pair != null) {
542                msg = new GBTransportMessPairIndex(pair);
543            } else {
544                msg = new GBTransportMess(); //End();
545                // goon ?= false;
546            }
547            try {
548                pairChannel.send(msg);
549            } catch (IOException e) {
550                e.printStackTrace();
551                goon = false;
552                break;
553            }
554            logger.debug("#distributed list = " + theList.size());
555            Object rh = null;
556            try {
557                rh = pairChannel.receive();
558            } catch (IOException e) {
559                e.printStackTrace();
560                goon = false;
561                break;
562            } catch (ClassNotFoundException e) {
563                e.printStackTrace();
564                goon = false;
565                break;
566            }
567            //logger.debug("received H polynomial");
568            if (rh == null) {
569                if (pair != null) {
570                    pair.setZero();
571                }
572            } else if (rh instanceof GBTransportMessPoly) {
573                // update pair list
574                red++;
575                H = ((GBTransportMessPoly<C>) rh).pol;
576                if (logger.isDebugEnabled()) {
577                    logger.debug("H = " + H);
578                }
579                if (H == null) {
580                    if (pair != null) {
581                        pair.setZero();
582                    }
583                } else {
584                    if (H.isZERO()) {
585                        pair.setZero();
586                    } else {
587                        if (H.isONE()) {
588                            polIndex = pairlist.putOne();
589                            //GenPolynomial<C> nn = 
590                            theList.putWait(Integer.valueOf(polIndex), H);
591                            goon = false;
592                            break;
593                        }
594                        polIndex = pairlist.put(H);
595                        // use putWait ? but still not all distributed
596                        //GenPolynomial<C> nn = 
597                        theList.putWait(Integer.valueOf(polIndex), H);
598                    }
599                }
600            }
601        }
602        logger.info("terminated, done " + red + " reductions");
603
604        /*
605         * send end mark to client
606         */
607        logger.debug("send end");
608        try {
609            pairChannel.send(new GBTransportMessEnd());
610        } catch (IOException e) {
611            if (logger.isDebugEnabled()) {
612                e.printStackTrace();
613            }
614        }
615        finaler.beIdle();
616        pairChannel.close();
617    }
618
619}
620
621
622/**
623 * Distributed clients reducing worker threads.
624 */
625
626class MPJReducerClient<C extends RingElem<C>> implements Runnable {
627
628
629    private final MPJChannel pairChannel;
630
631
632    private final DistHashTableMPJ<Integer, GenPolynomial<C>> theList;
633
634
635    private final ReductionPar<C> red;
636
637
638    private static final Logger logger = LogManager.getLogger(MPJReducerClient.class);
639
640
641    /**
642     * Constructor.
643     * @param pc MPJ communication channel.
644     * @param dl DHT to use.
645     */
646    MPJReducerClient(MPJChannel pc, DistHashTableMPJ<Integer, GenPolynomial<C>> dl) {
647        pairChannel = pc;
648        theList = dl;
649        red = new ReductionPar<C>();
650    }
651
652
653    /**
654     * Main run method.
655     */
656    @SuppressWarnings("unchecked")
657    public void run() {
658        logger.debug("reducer client running");
659        Pair<C> pair = null;
660        GenPolynomial<C> pi, pj, ps;
661        GenPolynomial<C> S;
662        GenPolynomial<C> H = null;
663        //boolean set = false;
664        boolean goon = true;
665        int reduction = 0;
666        //int sleeps = 0;
667        Integer pix, pjx, psx;
668
669        while (goon) {
670            /* protocol:
671             * request pair, process pair, send result
672             */
673            // pair = (Pair) pairlist.removeNext();
674            Object req = new GBTransportMessReq();
675            logger.debug("send request");
676            try {
677                pairChannel.send(req);
678            } catch (IOException e) {
679                goon = false;
680                e.printStackTrace();
681                break;
682            }
683            logger.debug("receive pair, goon");
684            Object pp = null;
685            try {
686                pp = pairChannel.receive();
687            } catch (IOException e) {
688                goon = false;
689                if (logger.isDebugEnabled()) {
690                    e.printStackTrace();
691                }
692                break;
693            } catch (ClassNotFoundException e) {
694                goon = false;
695                e.printStackTrace();
696            }
697            if (logger.isDebugEnabled()) {
698                logger.debug("received pair = " + pp);
699            }
700            H = null;
701            if (pp == null) { // should not happen
702                continue;
703            }
704            if (pp instanceof GBTransportMessEnd) {
705                goon = false;
706                continue;
707            }
708            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
709                pi = pj = ps = null;
710                if (pp instanceof GBTransportMessPair) {
711                    pair = ((GBTransportMessPair<C>) pp).pair;
712                    if (pair != null) {
713                        pi = pair.pi;
714                        pj = pair.pj;
715                        //logger.debug("pair: pix = " + pair.i 
716                        //               + ", pjx = " + pair.j);
717                    }
718                }
719                if (pp instanceof GBTransportMessPairIndex) {
720                    pix = ((GBTransportMessPairIndex) pp).i;
721                    pjx = ((GBTransportMessPairIndex) pp).j;
722                    psx = ((GBTransportMessPairIndex) pp).s;
723                    pi = theList.getWait(pix);
724                    pj = theList.getWait(pjx);
725                    ps = theList.getWait(psx);
726                }
727
728                if (pi != null && pj != null) {
729                    S = red.SPolynomial(pi, pj);
730                    //System.out.println("S   = " + S);
731                    if (S.isZERO()) {
732                        // pair.setZero(); does not work in dist
733                    } else {
734                        if (logger.isDebugEnabled()) {
735                            logger.info("ht(S) = " + S.leadingExpVector());
736                        }
737                        H = red.normalform(theList, S);
738                        reduction++;
739                        if (H.isZERO()) {
740                            // pair.setZero(); does not work in dist
741                        } else {
742                            H = H.monic();
743                            if (logger.isInfoEnabled()) {
744                                logger.info("ht(H) = " + H.leadingExpVector());
745                            }
746                        }
747                    }
748                } else {
749                    logger.info("pi = " + pi + ", pj = " + pj + ", ps = " + ps);
750                }
751            }
752
753            // send H or must send null
754            if (logger.isDebugEnabled()) {
755                logger.debug("#distributed list = " + theList.size());
756                logger.debug("send H polynomial = " + H);
757            }
758            try {
759                pairChannel.send(new GBTransportMessPoly<C>(H));
760            } catch (IOException e) {
761                goon = false;
762                e.printStackTrace();
763            }
764        }
765        logger.info("terminated, done " + reduction + " reductions");
766        pairChannel.close();
767    }
768}
769
770
771/**
772 * Distributed server reducing worker threads for minimal GB Not jet distributed
773 * but threaded.
774 */
775
776class MiMPJReducerServer<C extends RingElem<C>> implements Runnable {
777
778
779    private final List<GenPolynomial<C>> G;
780
781
782    private GenPolynomial<C> H;
783
784
785    private final Semaphore done = new Semaphore(0);
786
787
788    private final Reduction<C> red;
789
790
791    private static final Logger logger = LogManager.getLogger(MiMPJReducerServer.class);
792
793
794    /**
795     * Constructor.
796     * @param G polynomial list.
797     * @param p polynomial.
798     */
799    @SuppressWarnings("unchecked")
800    MiMPJReducerServer(List<GenPolynomial<C>> G, GenPolynomial<C> p) {
801        this.G = G;
802        H = p;
803        red = new ReductionPar<C>();
804    }
805
806
807    /**
808     * getNF. Blocks until the normal form is computed.
809     * @return the computed normal form.
810     */
811    public GenPolynomial<C> getNF() {
812        try {
813            done.acquire(); //done.P();
814        } catch (InterruptedException e) {
815        }
816        return H;
817    }
818
819
820    /**
821     * Main run method.
822     */
823    public void run() {
824        if (logger.isDebugEnabled()) {
825            logger.debug("ht(H) = " + H.leadingExpVector());
826        }
827        H = red.normalform(G, H); //mod
828        done.release(); //done.V();
829        if (logger.isDebugEnabled()) {
830            logger.debug("ht(H) = " + H.leadingExpVector());
831        }
832        // H = H.monic();
833    }
834}