001/*
002 * $Id: DistThreadPool.java 4593 2013-08-21 17:24:54Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.FileNotFoundException;
009import java.io.IOException;
010import java.util.LinkedList;
011
012import org.apache.log4j.Logger;
013
014
015/**
016 * Distributed thread pool. Using stack / list work-pile and Executable Channels
017 * and Servers.
018 * @author Heinz Kredel
019 */
020
021public class DistThreadPool /*extends ThreadPool*/{
022
023
024    /**
025     * machine file to use.
026     */
027    private final String mfile;
028
029
030    /**
031     * default machine file for test.
032     */
033    private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE;
034
035
036    /**
037     * Number of threads to use.
038     */
039    protected final int threads;
040
041
042    /**
043     * Default number of threads to use.
044     */
045    static final int DEFAULT_SIZE = 3;
046
047
048    /**
049     * Channels to remote executable servers.
050     */
051    final ExecutableChannels ec;
052
053
054    /**
055     * Array of workers.
056     */
057    protected DistPoolThread[] workers;
058
059
060    /**
061     * Number of idle workers.
062     */
063    protected int idleworkers = 0;
064
065
066    /**
067     * Work queue / stack.
068     */
069    // should be expressed using strategy pattern
070    // List or Collection is not appropriate
071    // LIFO strategy for recursion
072    protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
073
074
075    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
076
077
078    private static final Logger logger = Logger.getLogger(DistThreadPool.class);
079
080
081    private final boolean debug = true; //logger.isDebugEnabled();
082
083
084    /**
085     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO
086     * and size DEFAULT_SIZE.
087     */
088    public DistThreadPool() {
089        this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null);
090    }
091
092
093    /**
094     * Constructs a new DistThreadPool with size DEFAULT_SIZE.
095     * @param strategy for job processing.
096     */
097    public DistThreadPool(StrategyEnumeration strategy) {
098        this(strategy, DEFAULT_SIZE, null);
099    }
100
101
102    /**
103     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
104     * @param size of the pool.
105     */
106    public DistThreadPool(int size) {
107        this(StrategyEnumeration.FIFO, size, null);
108    }
109
110
111    /**
112     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
113     * @param size of the pool.
114     * @param mfile machine file.
115     */
116    public DistThreadPool(int size, String mfile) {
117        this(StrategyEnumeration.FIFO, size, mfile);
118    }
119
120
121    /**
122     * Constructs a new DistThreadPool.
123     * @param strategy for job processing.
124     * @param size of the pool.
125     * @param mfile machine file.
126     */
127    public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) {
128        this.strategy = strategy;
129        if (size < 0) {
130            this.threads = 0;
131        } else {
132            this.threads = size;
133        }
134        if (mfile == null || mfile.length() == 0) {
135            this.mfile = DEFAULT_MFILE;
136        } else {
137            this.mfile = mfile;
138        }
139        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
140        try {
141            ec = new ExecutableChannels(this.mfile);
142        } catch (FileNotFoundException e) {
143            e.printStackTrace();
144            throw new IllegalArgumentException("DistThreadPool " + e);
145        }
146        if (debug) {
147            logger.info("ec = " + ec);
148        }
149        try {
150            ec.open(threads);
151        } catch (IOException e) {
152            e.printStackTrace();
153            throw new IllegalArgumentException("DistThreadPool " + e);
154        }
155        if (debug) {
156            logger.info("ec = " + ec);
157        }
158        workers = new DistPoolThread[0];
159    }
160
161
162    /**
163     * String representation.
164     */
165    @Override
166    public String toString() {
167        StringBuffer s = new StringBuffer("DistThreadPool(");
168        s.append("threads="+threads);
169        s.append(", strategy="+strategy);
170        s.append(", exchan="+ec);
171        s.append(", workers="+workers.length);
172        s.append(")");
173        return s.toString();
174    }
175
176
177    /**
178     * thread initialization and start.
179     */
180    public void init() {
181        if (workers == null || workers.length == 0) {
182            workers = new DistPoolThread[threads];
183            for (int i = 0; i < workers.length; i++) {
184                workers[i] = new DistPoolThread(this, ec, i);
185                workers[i].start();
186            }
187            logger.info("init: " + this.toString());
188        }
189    }
190
191
192    /**
193     * number of worker threads.
194     */
195    public int getNumber() {
196        if (workers == null || workers.length < threads) {
197            init(); // start threads
198        }
199        return workers.length; // not null
200    }
201
202
203    /**
204     * get used strategy.
205     */
206    public StrategyEnumeration getStrategy() {
207        return strategy;
208    }
209
210
211    /**
212     * the used executable channel.
213     */
214    public ExecutableChannels getEC() {
215        return ec; // not null
216    }
217
218
219    /**
220     * Terminates the threads.
221     * @param shutDown true, if shut-down of the remote executable servers is
222     *            requested, false, if remote executable servers stay alive.
223     */
224    public void terminate(boolean shutDown) {
225        if (shutDown) {
226            ShutdownRequest sdr = new ShutdownRequest();
227            for (int i = 0; i < workers.length; i++) {
228                addJob(sdr);
229            }
230            try {
231                Thread.sleep(20);
232            } catch (InterruptedException e) {
233                Thread.currentThread().interrupt();
234            }
235            logger.info("remaining jobs = " + jobstack.size());
236            try {
237                for (int i = 0; i < workers.length; i++) {
238                    while (workers[i].isAlive()) {
239                        workers[i].interrupt();
240                        workers[i].join(100);
241                    }
242                }
243            } catch (InterruptedException e) {
244                Thread.currentThread().interrupt();
245            }
246        } else {
247            terminate();
248        }
249    }
250
251
252    /**
253     * Terminates the threads.
254     */
255    public void terminate() {
256        while (hasJobs()) {
257            try {
258                Thread.sleep(100);
259            } catch (InterruptedException e) {
260                Thread.currentThread().interrupt();
261            }
262        }
263        for (int i = 0; i < workers.length; i++) {
264            try {
265                while (workers[i].isAlive()) {
266                    workers[i].interrupt();
267                    workers[i].join(100);
268                }
269            } catch (InterruptedException e) {
270                Thread.currentThread().interrupt();
271            }
272        }
273        ec.close();
274    }
275
276
277    /**
278     * adds a job to the workpile.
279     * @param job
280     */
281    public synchronized void addJob(Runnable job) {
282        if (workers == null || workers.length < threads) {
283            init(); // start threads
284        }
285        jobstack.addLast(job);
286        logger.debug("adding job");
287        if (idleworkers > 0) {
288            logger.debug("notifying a jobless worker");
289            notifyAll(); // findbugs
290        }
291    }
292
293
294    /**
295     * get a job for processing.
296     */
297    protected synchronized Runnable getJob() throws InterruptedException {
298        while (jobstack.isEmpty()) {
299            idleworkers++;
300            logger.debug("waiting");
301            wait();
302            idleworkers--;
303        }
304        // is expressed using strategy enumeration
305        if (strategy == StrategyEnumeration.LIFO) {
306            return jobstack.removeLast(); // LIFO
307        }
308        return jobstack.removeFirst(); // FIFO
309    }
310
311
312    /**
313     * check if there are jobs for processing.
314     */
315    public boolean hasJobs() {
316        if (jobstack.size() > 0) {
317            return true;
318        }
319        for (int i = 0; i < workers.length; i++) {
320            if (workers[i].working) {
321                return true;
322            }
323        }
324        return false;
325    }
326
327
328    /**
329     * check if there are more than n jobs for processing.
330     * @param n Integer
331     * @return true, if there are possibly more than n jobs.
332     */
333    public boolean hasJobs(int n) {
334        int j = jobstack.size();
335        if (j > 0 && (j + workers.length > n)) {
336            return true;
337            // if j > 0 no worker should be idle
338            // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
339        }
340        int x = 0;
341        for (int i = 0; i < workers.length; i++) {
342            if (workers[i].working) {
343                x++;
344            }
345        }
346        if ((j + x) > n) {
347            return true;
348        }
349        return false;
350    }
351
352}
353
354
/**
 * Marker job used to request a shutdown. Running it only prints a
 * notice on standard output.
 */
class ShutdownRequest implements Runnable {


    /**
     * Text returned by {@link #toString()}.
     */
    private static final String LABEL = "ShutdownRequest";


    /**
     * Prints a notice that the request is being run.
     */
    @Override
    public void run() {
        System.out.println("running ShutdownRequest");
    }


    /**
     * Fixed name of this request.
     * @return the string "ShutdownRequest".
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return LABEL;
    }

}
379
380
381/**
382 * Implements one local part of the distributed thread.
383 */
384class DistPoolThread extends Thread {
385
386
387    final DistThreadPool pool;
388
389
390    final ExecutableChannels ec;
391
392
393    final int myId;
394
395
396    private static final Logger logger = Logger.getLogger(DistPoolThread.class);
397
398
399    private final boolean debug = logger.isDebugEnabled();
400
401
402    boolean working = false;
403
404
405    /**
406     * @param pool DistThreadPool.
407     */
408    public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) {
409        this.pool = pool;
410        this.ec = ec;
411        myId = i;
412    }
413
414
415    /**
416     * Run the thread.
417     */
418    @Override
419    public void run() {
420        logger.info("ready, myId = " + myId);
421        Runnable job;
422        int done = 0;
423        long time = 0;
424        long t;
425        boolean running = true;
426        while (running) {
427            try {
428                logger.debug("looking for a job");
429                job = pool.getJob();
430                working = true;
431                if (debug) {
432                    logger.info("working " + myId + " on " + job);
433                }
434                t = System.currentTimeMillis();
435                // send and wait, like rmi
436                try {
437                    if (job instanceof ShutdownRequest) {
438                        ec.send(myId, ExecutableServer.STOP);
439                    } else {
440                        ec.send(myId, job);
441                    }
442                    if (debug) {
443                        logger.info("send " + myId + " at " + ec + " send job " + job);
444                    }
445                } catch (IOException e) {
446                    e.printStackTrace();
447                    logger.info("error send " + myId + " at " + ec + " e = " + e);
448                    working = false;
449                }
450                // remote: job.run(); 
451                Object o = null;
452                try {
453                    if (working) {
454                        logger.info("waiting " + myId + " on " + job);
455                        o = ec.receive(myId);
456                        if (debug) {
457                            logger.info("receive " + myId + " at " + ec + " send job " + job + " received " + o);
458                        }
459                    }
460                } catch (IOException e) {
461                    logger.info("receive exception " + myId + " send job " + job + ", " + e);
462                    //e.printStackTrace();
463                    running = false;
464                } catch (ClassNotFoundException e) {
465                    logger.info("receive exception " + myId + " send job " + job + ", " + e);
466                    //e.printStackTrace();
467                    running = false;
468                } finally {
469                    if (debug) {
470                        logger.info("receive finally " + myId + " at " + ec + " send job " + job + " received "
471                                    + o + " running " + running);
472                    }
473                }
474                working = false;
475                time += System.currentTimeMillis() - t;
476                done++;
477                if (debug) {
478                    logger.info("done " + myId + " with " + o);
479                }
480            } catch (InterruptedException e) {
481                running = false;
482                Thread.currentThread().interrupt();
483            }
484        }
485        logger.info("terminated " + myId + " , done " + done + " jobs in " + time + " milliseconds");
486    }
487
488}