001/*
002 * $Id$
003 */
004
005package edu.jas.util;
006
007
008import java.io.FileNotFoundException;
009import java.io.IOException;
010import java.util.LinkedList;
011
012import org.apache.logging.log4j.Logger;
013import org.apache.logging.log4j.LogManager; 
014
015
016/**
017 * Distributed thread pool. Using stack / list work-pile and Executable Channels
018 * and Servers.
019 * @author Heinz Kredel
020 */
021
022public class DistThreadPool /*extends ThreadPool*/{
023
024
025    /**
026     * machine file to use.
027     */
028    private final String mfile;
029
030
031    /**
032     * default machine file for test.
033     */
034    private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE;
035
036
037    /**
038     * Number of threads to use.
039     */
040    protected final int threads;
041
042
043    /**
044     * Default number of threads to use.
045     */
046    static final int DEFAULT_SIZE = 3;
047
048
049    /**
050     * Channels to remote executable servers.
051     */
052    final ExecutableChannels ec;
053
054
055    /**
056     * Array of workers.
057     */
058    protected DistPoolThread[] workers;
059
060
061    /**
062     * Number of idle workers.
063     */
064    protected int idleworkers = 0;
065
066
067    /**
068     * Work queue / stack.
069     */
070    // should be expressed using strategy pattern
071    // List or Collection is not appropriate
072    // LIFO strategy for recursion
073    protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
074
075
076    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
077
078
079    private static final Logger logger = LogManager.getLogger(DistThreadPool.class);
080
081
082    private static final boolean debug = true; //logger.isDebugEnabled();
083
084
085    /**
086     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO
087     * and size DEFAULT_SIZE.
088     */
089    public DistThreadPool() {
090        this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null);
091    }
092
093
094    /**
095     * Constructs a new DistThreadPool with size DEFAULT_SIZE.
096     * @param strategy for job processing.
097     */
098    public DistThreadPool(StrategyEnumeration strategy) {
099        this(strategy, DEFAULT_SIZE, null);
100    }
101
102
103    /**
104     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
105     * @param size of the pool.
106     */
107    public DistThreadPool(int size) {
108        this(StrategyEnumeration.FIFO, size, null);
109    }
110
111
112    /**
113     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
114     * @param size of the pool.
115     * @param mfile machine file.
116     */
117    public DistThreadPool(int size, String mfile) {
118        this(StrategyEnumeration.FIFO, size, mfile);
119    }
120
121
122    /**
123     * Constructs a new DistThreadPool.
124     * @param strategy for job processing.
125     * @param size of the pool.
126     * @param mfile machine file.
127     */
128    public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) {
129        this.strategy = strategy;
130        if (size < 0) {
131            this.threads = 0;
132        } else {
133            this.threads = size;
134        }
135        if (mfile == null || mfile.length() == 0) {
136            this.mfile = DEFAULT_MFILE;
137        } else {
138            this.mfile = mfile;
139        }
140        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
141        try {
142            ec = new ExecutableChannels(this.mfile);
143        } catch (FileNotFoundException e) {
144            e.printStackTrace();
145            throw new IllegalArgumentException("DistThreadPool " + e);
146        }
147        if (debug) {
148            logger.info("ec = {}", ec);
149        }
150        try {
151            ec.open(threads);
152        } catch (IOException e) {
153            e.printStackTrace();
154            throw new IllegalArgumentException("DistThreadPool " + e);
155        }
156        if (debug) {
157            logger.info("ec = {}", ec);
158        }
159        workers = new DistPoolThread[0];
160    }
161
162
163    /**
164     * String representation.
165     */
166    @Override
167    public String toString() {
168        StringBuffer s = new StringBuffer("DistThreadPool(");
169        s.append("threads="+threads);
170        s.append(", strategy="+strategy);
171        s.append(", exchan="+ec);
172        s.append(", workers="+workers.length);
173        s.append(")");
174        return s.toString();
175    }
176
177
178    /**
179     * thread initialization and start.
180     */
181    public void init() {
182        if (workers == null || workers.length == 0) {
183            workers = new DistPoolThread[threads];
184            for (int i = 0; i < workers.length; i++) {
185                workers[i] = new DistPoolThread(this, ec, i);
186                workers[i].start();
187            }
188            logger.info("init: {}", this);
189        }
190    }
191
192
193    /**
194     * number of worker threads.
195     */
196    public int getNumber() {
197        if (workers == null || workers.length < threads) {
198            init(); // start threads
199        }
200        return workers.length; // not null
201    }
202
203
204    /**
205     * get used strategy.
206     */
207    public StrategyEnumeration getStrategy() {
208        return strategy;
209    }
210
211
212    /**
213     * the used executable channel.
214     */
215    public ExecutableChannels getEC() {
216        return ec; // not null
217    }
218
219
220    /**
221     * Terminates the threads.
222     * @param shutDown true, if shut-down of the remote executable servers is
223     *            requested, false, if remote executable servers stay alive.
224     */
225    public void terminate(boolean shutDown) {
226        if (shutDown) {
227            logger.info("shutdown = {}", this);
228            ShutdownRequest sdr = new ShutdownRequest();
229            for (int i = 0; i < workers.length; i++) {
230                addJob(sdr);
231            }
232            try {
233                Thread.sleep(20);
234            } catch (InterruptedException e) {
235                Thread.currentThread().interrupt();
236            }
237            logger.info("remaining jobs = {}", jobstack.size());
238            try {
239                for (int i = 0; i < workers.length; i++) {
240                    while (workers[i].isAlive()) {
241                        workers[i].interrupt();
242                        workers[i].join(100);
243                    }
244                }
245            } catch (InterruptedException e) {
246                Thread.currentThread().interrupt();
247            }
248            ec.close();
249        } else {
250            terminate();
251        }
252        logger.info("terminated = {}", this);
253    }
254
255
256    /**
257     * Terminates the threads.
258     */
259    public void terminate() {
260        logger.info("terminate = {}", this);
261        while (hasJobs()) {
262            try {
263                Thread.sleep(100);
264            } catch (InterruptedException e) {
265                Thread.currentThread().interrupt();
266            }
267        }
268        for (int i = 0; i < workers.length; i++) {
269            try {
270                while (workers[i].isAlive()) {
271                    workers[i].interrupt();
272                    workers[i].join(100);
273                }
274            } catch (InterruptedException e) {
275                Thread.currentThread().interrupt();
276            }
277        }
278        ec.close();
279        logger.info("terminated = {}", this);
280    }
281
282
283    /**
284     * adds a job to the workpile.
285     * @param job
286     */
287    public synchronized void addJob(Runnable job) {
288        if (workers == null || workers.length < threads) {
289            init(); // start threads
290        }
291        jobstack.addLast(job);
292        logger.debug("adding job");
293        if (idleworkers > 0) {
294            logger.debug("notifying a jobless worker");
295            notifyAll(); // findbugs
296        }
297    }
298
299
300    /**
301     * get a job for processing.
302     */
303    protected synchronized Runnable getJob() throws InterruptedException {
304        while (jobstack.isEmpty()) {
305            idleworkers++;
306            logger.debug("waiting");
307            wait();
308            idleworkers--;
309        }
310        // is expressed using strategy enumeration
311        if (strategy == StrategyEnumeration.LIFO) {
312            return jobstack.removeLast(); // LIFO
313        }
314        return jobstack.removeFirst(); // FIFO
315    }
316
317
318    /**
319     * check if there are jobs for processing.
320     */
321    public boolean hasJobs() {
322        if (jobstack.size() > 0) {
323            return true;
324        }
325        for (int i = 0; i < workers.length; i++) {
326            if (workers[i].working) {
327                return true;
328            }
329        }
330        return false;
331    }
332
333
334    /**
335     * check if there are more than n jobs for processing.
336     * @param n Integer
337     * @return true, if there are possibly more than n jobs.
338     */
339    public boolean hasJobs(int n) {
340        int j = jobstack.size();
341        if (j > 0 && (j + workers.length > n)) {
342            return true;
343            // if j > 0 no worker should be idle
344            // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
345        }
346        int x = 0;
347        for (int i = 0; i < workers.length; i++) {
348            if (workers[i].working) {
349                x++;
350            }
351        }
352        if ((j + x) > n) {
353            return true;
354        }
355        return false;
356    }
357
358}
359
360
361/**
362 * Implements a shutdown task.
363 */
364class ShutdownRequest implements Runnable {
365
366
367    /**
368     * Run the thread.
369     */
370    public void run() {
371        System.out.println("running ShutdownRequest");
372    }
373
374
375    /**
376     * toString.
377     * @see java.lang.Object#toString()
378     */
379    @Override
380    public String toString() {
381        return "ShutdownRequest";
382    }
383
384}
385
386
387/**
388 * Implements one local part of the distributed thread.
389 */
390class DistPoolThread extends Thread {
391
392
393    final DistThreadPool pool;
394
395
396    final ExecutableChannels ec;
397
398
399    final int myId;
400
401
402    private static final Logger logger = LogManager.getLogger(DistPoolThread.class);
403
404
405    private static final boolean debug = logger.isDebugEnabled();
406
407
408    boolean working = false;
409
410
411    /**
412     * @param pool DistThreadPool.
413     */
414    public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) {
415        this.pool = pool;
416        this.ec = ec;
417        myId = i;
418    }
419
420
421    /**
422     * Run the thread.
423     */
424    @Override
425    public void run() {
426        logger.info("ready, myId = {}", myId);
427        Runnable job;
428        int done = 0;
429        long time = 0;
430        long t;
431        boolean running = true;
432        while (running) {
433            try {
434                logger.debug("looking for a job");
435                job = pool.getJob();
436                working = true;
437                if (debug) {
438                    logger.info("working {} on {}", myId, job);
439                }
440                t = System.currentTimeMillis();
441                // send and wait, like rmi
442                try {
443                    if (job instanceof ShutdownRequest) {
444                        ec.send(myId, ExecutableServer.STOP);
445                    } else {
446                        ec.send(myId, job);
447                    }
448                    if (debug) {
449                        logger.info("send {} at {} send job {}", myId, ec, job);
450                    }
451                } catch (IOException e) {
452                    e.printStackTrace();
453                    logger.info("error send {} at {} e = {}", myId, ec, e);
454                    working = false;
455                }
456                // remote: job.run(); 
457                Object o = null;
458                try {
459                    if (working) {
460                        logger.info("waiting {} on {}", myId, job);
461                        o = ec.receive(myId);
462                        if (debug) {
463                            logger.info("receive {} at {} send job {} received {}", myId, ec, job, o);
464                        }
465                    }
466                } catch (IOException e) {
467                    logger.info("receive exception {} send job {}, e = {}", myId, job, e);
468                    //e.printStackTrace();
469                    running = false;
470                } catch (ClassNotFoundException e) {
471                    logger.info("receive exception {} send job {}, e = {}", myId, job, e);
472                    //e.printStackTrace();
473                    running = false;
474                } finally {
475                    if (debug) {
476                        logger.info("receive finally {} at {} send job {} received {} running {}",
477                                    myId, ec, job, o, running);
478                    }
479                }
480                working = false;
481                time += System.currentTimeMillis() - t;
482                done++;
483                if (debug) {
484                    logger.info("done {} with {}", myId, o);
485                }
486            } catch (InterruptedException e) {
487                running = false;
488                Thread.currentThread().interrupt();
489            }
490        }
491        logger.info("terminated {}, done {} jobs in {} milliseconds", myId, done, time);
492    }
493
494}