001/*
002 * $Id: DistThreadPool.java 4305 2012-12-01 10:46:45Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.FileNotFoundException;
009import java.io.IOException;
010import java.util.LinkedList;
011
012import org.apache.log4j.Logger;
013
014
015/**
016 * Distributed thread pool. Using stack / list work-pile and Executable Channels
017 * and Servers.
018 * @author Heinz Kredel
019 */
020
021public class DistThreadPool /*extends ThreadPool*/{
022
023
024    /**
025     * machine file to use.
026     */
027    private final String mfile;
028
029
030    /**
031     * default machine file for test.
032     */
033    private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE;
034
035
036    /**
037     * Number of threads to use.
038     */
039    protected final int threads;
040
041
042    /**
043     * Default number of threads to use.
044     */
045    static final int DEFAULT_SIZE = 3;
046
047
048    /**
049     * Channels to remote executable servers.
050     */
051    final ExecutableChannels ec;
052
053
054    /**
055     * Array of workers.
056     */
057    protected DistPoolThread[] workers;
058
059
060    /**
061     * Number of idle workers.
062     */
063    protected int idleworkers = 0;
064
065
066    /**
067     * Work queue / stack.
068     */
069    // should be expressed using strategy pattern
070    // List or Collection is not appropriate
071    // LIFO strategy for recursion
072    protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
073
074
075    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
076
077
078    private static final Logger logger = Logger.getLogger(DistThreadPool.class);
079
080
081    private final boolean debug = true; //logger.isDebugEnabled();
082
083
084    /**
085     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO
086     * and size DEFAULT_SIZE.
087     */
088    public DistThreadPool() {
089        this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null);
090    }
091
092
093    /**
094     * Constructs a new DistThreadPool with size DEFAULT_SIZE.
095     * @param strategy for job processing.
096     */
097    public DistThreadPool(StrategyEnumeration strategy) {
098        this(strategy, DEFAULT_SIZE, null);
099    }
100
101
102    /**
103     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
104     * @param size of the pool.
105     */
106    public DistThreadPool(int size) {
107        this(StrategyEnumeration.FIFO, size, null);
108    }
109
110
111    /**
112     * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
113     * @param size of the pool.
114     * @param mfile machine file.
115     */
116    public DistThreadPool(int size, String mfile) {
117        this(StrategyEnumeration.FIFO, size, mfile);
118    }
119
120
121    /**
122     * Constructs a new DistThreadPool.
123     * @param strategy for job processing.
124     * @param size of the pool.
125     * @param mfile machine file.
126     */
127    public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) {
128        this.strategy = strategy;
129        if (size < 0) {
130            this.threads = 0;
131        } else {
132            this.threads = size;
133        }
134        if (mfile == null || mfile.length() == 0) {
135            this.mfile = DEFAULT_MFILE;
136        } else {
137            this.mfile = mfile;
138        }
139        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
140        try {
141            ec = new ExecutableChannels(this.mfile);
142        } catch (FileNotFoundException e) {
143            e.printStackTrace();
144            throw new IllegalArgumentException("DistThreadPool " + e);
145        }
146        if (debug) {
147            logger.info("ec = " + ec);
148        }
149        try {
150            ec.open(threads);
151        } catch (IOException e) {
152            e.printStackTrace();
153            throw new IllegalArgumentException("DistThreadPool " + e);
154        }
155        if (debug) {
156            logger.info("ec = " + ec);
157        }
158        workers = new DistPoolThread[0];
159    }
160
161
162    /**
163     * String representation.
164     */
165    @Override
166    public String toString() {
167        StringBuffer s = new StringBuffer("DistThreadPool(");
168        s.append("threads="+threads);
169        s.append(", exchan="+ec);
170        s.append(", workers="+workers.length);
171        s.append(")");
172        return s.toString();
173    }
174
175
176    /**
177     * thread initialization and start.
178     */
179    public void init() {
180        if (workers == null || workers.length == 0) {
181            workers = new DistPoolThread[threads];
182            for (int i = 0; i < workers.length; i++) {
183                workers[i] = new DistPoolThread(this, ec, i);
184                workers[i].start();
185            }
186            logger.info("size = " + threads + ", strategy = " + strategy);
187        }
188    }
189
190
191    /**
192     * number of worker threads.
193     */
194    public int getNumber() {
195        if (workers == null || workers.length < threads) {
196            init(); // start threads
197        }
198        return workers.length; // not null
199    }
200
201
202    /**
203     * get used strategy.
204     */
205    public StrategyEnumeration getStrategy() {
206        return strategy;
207    }
208
209
210    /**
211     * the used executable channel.
212     */
213    public ExecutableChannels getEC() {
214        return ec; // not null
215    }
216
217
218    /**
219     * Terminates the threads.
220     * @param shutDown true, if shut-down of the remote executable servers is
221     *            requested, false, if remote executable servers stay alive.
222     */
223    public void terminate(boolean shutDown) {
224        if (shutDown) {
225            ShutdownRequest sdr = new ShutdownRequest();
226            for (int i = 0; i < workers.length; i++) {
227                addJob(sdr);
228            }
229            try {
230                Thread.sleep(20);
231            } catch (InterruptedException e) {
232                Thread.currentThread().interrupt();
233            }
234            logger.info("remaining jobs = " + jobstack.size());
235            try {
236                for (int i = 0; i < workers.length; i++) {
237                    while (workers[i].isAlive()) {
238                        workers[i].interrupt();
239                        workers[i].join(100);
240                    }
241                }
242            } catch (InterruptedException e) {
243                Thread.currentThread().interrupt();
244            }
245        } else {
246            terminate();
247        }
248    }
249
250
251    /**
252     * Terminates the threads.
253     */
254    public void terminate() {
255        while (hasJobs()) {
256            try {
257                Thread.sleep(100);
258            } catch (InterruptedException e) {
259                Thread.currentThread().interrupt();
260            }
261        }
262        for (int i = 0; i < workers.length; i++) {
263            try {
264                while (workers[i].isAlive()) {
265                    workers[i].interrupt();
266                    workers[i].join(100);
267                }
268            } catch (InterruptedException e) {
269                Thread.currentThread().interrupt();
270            }
271        }
272        ec.close();
273    }
274
275
276    /**
277     * adds a job to the workpile.
278     * @param job
279     */
280    public synchronized void addJob(Runnable job) {
281        if (workers == null || workers.length < threads) {
282            init(); // start threads
283        }
284        jobstack.addLast(job);
285        logger.debug("adding job");
286        if (idleworkers > 0) {
287            logger.debug("notifying a jobless worker");
288            notifyAll(); // findbugs
289        }
290    }
291
292
293    /**
294     * get a job for processing.
295     */
296    protected synchronized Runnable getJob() throws InterruptedException {
297        while (jobstack.isEmpty()) {
298            idleworkers++;
299            logger.debug("waiting");
300            wait();
301            idleworkers--;
302        }
303        // is expressed using strategy enumeration
304        if (strategy == StrategyEnumeration.LIFO) {
305            return jobstack.removeLast(); // LIFO
306        }
307        return jobstack.removeFirst(); // FIFO
308    }
309
310
311    /**
312     * check if there are jobs for processing.
313     */
314    public boolean hasJobs() {
315        if (jobstack.size() > 0) {
316            return true;
317        }
318        for (int i = 0; i < workers.length; i++) {
319            if (workers[i].working) {
320                return true;
321            }
322        }
323        return false;
324    }
325
326
327    /**
328     * check if there are more than n jobs for processing.
329     * @param n Integer
330     * @return true, if there are possibly more than n jobs.
331     */
332    public boolean hasJobs(int n) {
333        int j = jobstack.size();
334        if (j > 0 && (j + workers.length > n)) {
335            return true;
336            // if j > 0 no worker should be idle
337            // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
338        }
339        int x = 0;
340        for (int i = 0; i < workers.length; i++) {
341            if (workers[i].working) {
342                x++;
343            }
344        }
345        if ((j + x) > n) {
346            return true;
347        }
348        return false;
349    }
350
351}
352
353
354/**
355 * Implements a shutdown task.
356 */
357class ShutdownRequest implements Runnable {
358
359
360    /**
361     * Run the thread.
362     */
363    public void run() {
364        System.out.println("running ShutdownRequest");
365    }
366
367
368    /**
369     * toString.
370     * @see java.lang.Object#toString()
371     */
372    @Override
373    public String toString() {
374        return "ShutdownRequest";
375    }
376
377}
378
379
380/**
381 * Implements one local part of the distributed thread.
382 */
383class DistPoolThread extends Thread {
384
385
386    final DistThreadPool pool;
387
388
389    final ExecutableChannels ec;
390
391
392    final int myId;
393
394
395    private static final Logger logger = Logger.getLogger(DistPoolThread.class);
396
397
398    private final boolean debug = logger.isDebugEnabled();
399
400
401    boolean working = false;
402
403
404    /**
405     * @param pool DistThreadPool.
406     */
407    public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) {
408        this.pool = pool;
409        this.ec = ec;
410        myId = i;
411    }
412
413
414    /**
415     * Run the thread.
416     */
417    @Override
418    public void run() {
419        logger.info("ready, myId = " + myId);
420        Runnable job;
421        int done = 0;
422        long time = 0;
423        long t;
424        boolean running = true;
425        while (running) {
426            try {
427                logger.debug("looking for a job");
428                job = pool.getJob();
429                working = true;
430                if (debug) {
431                    logger.info("working " + myId + " on " + job);
432                }
433                t = System.currentTimeMillis();
434                // send and wait, like rmi
435                try {
436                    if (job instanceof ShutdownRequest) {
437                        ec.send(myId, ExecutableServer.STOP);
438                    } else {
439                        ec.send(myId, job);
440                    }
441                    if (debug) {
442                        logger.info("send " + myId + " at " + ec + " send job " + job);
443                    }
444                } catch (IOException e) {
445                    e.printStackTrace();
446                    logger.info("error send " + myId + " at " + ec + " e = " + e);
447                    working = false;
448                }
449                // remote: job.run(); 
450                Object o = null;
451                try {
452                    if (working) {
453                        logger.info("waiting " + myId + " on " + job);
454                        o = ec.receive(myId);
455                        if (debug) {
456                            logger.info("receive " + myId + " at " + ec + " send job " + job + " received " + o);
457                        }
458                    }
459                } catch (IOException e) {
460                    logger.info("receive exception " + myId + " send job " + job + ", " + e);
461                    //e.printStackTrace();
462                    running = false;
463                } catch (ClassNotFoundException e) {
464                    logger.info("receive exception " + myId + " send job " + job + ", " + e);
465                    //e.printStackTrace();
466                    running = false;
467                } finally {
468                    if (debug) {
469                        logger.info("receive finally " + myId + " at " + ec + " send job " + job + " received "
470                                    + o + " running " + running);
471                    }
472                }
473                working = false;
474                time += System.currentTimeMillis() - t;
475                done++;
476                if (debug) {
477                    logger.info("done " + myId + " with " + o);
478                }
479            } catch (InterruptedException e) {
480                running = false;
481                Thread.currentThread().interrupt();
482            }
483        }
484        logger.info("terminated " + myId + " , done " + done + " jobs in " + time + " milliseconds");
485    }
486
487}