001    /*
002     * $Id: DistThreadPool.java 3337 2010-09-27 21:05:17Z kredel $
003     */
004    
005    package edu.jas.util;
006    
007    
008    import java.io.FileNotFoundException;
009    import java.io.IOException;
010    import java.util.LinkedList;
011    
012    import org.apache.log4j.Logger;
013    
014    
015    /**
016     * Distributed thread pool. Using stack / list workpile and Executable Channels
017     * and Servers.
018     * @author Heinz Kredel
019     */
020    
021    public class DistThreadPool /*extends ThreadPool*/{
022    
023    
024        /**
025         * machine file to use.
026         */
027        private final String mfile;
028    
029    
030        /**
031         * default machine file for test.
032         */
033        private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE;
034    
035    
036        /**
037         * Number of threads to use.
038         */
039        protected final int threads;
040    
041    
042        /**
043         * Default number of threads to use.
044         */
045        static final int DEFAULT_SIZE = 3;
046    
047    
048        /**
049         * Channels to remote executable servers.
050         */
051        final ExecutableChannels ec;
052    
053    
054        /**
055         * Array of workers.
056         */
057        protected DistPoolThread[] workers;
058    
059    
060        /**
061         * Number of idle workers.
062         */
063        protected int idleworkers = 0;
064    
065    
066        /**
067         * Work queue / stack.
068         */
069        // should be expressed using strategy pattern
070        // List or Collection is not appropriate
071        // LIFO strategy for recursion
072        protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
073    
074    
075        protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
076    
077    
078        private static final Logger logger = Logger.getLogger(DistThreadPool.class);
079    
080    
081        private final boolean debug = logger.isDebugEnabled();
082    
083    
084        /**
085         * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO
086         * and size DEFAULT_SIZE.
087         */
088        public DistThreadPool() {
089            this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null);
090        }
091    
092    
093        /**
094         * Constructs a new DistThreadPool with size DEFAULT_SIZE.
095         * @param strategy for job processing.
096         */
097        public DistThreadPool(StrategyEnumeration strategy) {
098            this(strategy, DEFAULT_SIZE, null);
099        }
100    
101    
102        /**
103         * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
104         * @param size of the pool.
105         */
106        public DistThreadPool(int size) {
107            this(StrategyEnumeration.FIFO, size, null);
108        }
109    
110    
111        /**
112         * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
113         * @param size of the pool.
114         * @param mfile machine file.
115         */
116        public DistThreadPool(int size, String mfile) {
117            this(StrategyEnumeration.FIFO, size, mfile);
118        }
119    
120    
121        /**
122         * Constructs a new DistThreadPool.
123         * @param strategy for job processing.
124         * @param size of the pool.
125         * @param mfile machine file.
126         */
127        public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) {
128            this.strategy = strategy;
129            if (size < 0) {
130                this.threads = 0;
131            } else {
132                this.threads = size;
133            }
134            if (mfile == null || mfile.length() == 0) {
135                this.mfile = DEFAULT_MFILE;
136            } else {
137                this.mfile = mfile;
138            }
139            jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
140            try {
141                ec = new ExecutableChannels(this.mfile);
142            } catch (FileNotFoundException e) {
143                e.printStackTrace();
144                throw new IllegalArgumentException("DistThreadPool " + e);
145            }
146            if (debug) {
147                logger.debug("ExecutableChannels = " + ec);
148            }
149            try {
150                ec.open(threads);
151            } catch (IOException e) {
152                e.printStackTrace();
153                throw new IllegalArgumentException("DistThreadPool " + e);
154            }
155            if (debug) {
156                logger.debug("ExecutableChannels = " + ec);
157            }
158            workers = new DistPoolThread[0];
159        }
160    
161    
162        /**
163         * thread initialization and start.
164         */
165        public void init() {
166            if (workers == null || workers.length == 0) {
167                workers = new DistPoolThread[threads];
168                for (int i = 0; i < workers.length; i++) {
169                    workers[i] = new DistPoolThread(this, ec, i);
170                    workers[i].start();
171                }
172                logger.info("size = " + threads + ", strategy = " + strategy);
173            }
174        }
175    
176    
177        /**
178         * number of worker threads.
179         */
180        public int getNumber() {
181            if (workers == null || workers.length < threads) {
182                init(); // start threads
183            }
184            return workers.length; // not null
185        }
186    
187    
188        /**
189         * get used strategy.
190         */
191        public StrategyEnumeration getStrategy() {
192            return strategy;
193        }
194    
195    
196        /**
197         * the used executable channel.
198         */
199        public ExecutableChannels getEC() {
200            return ec; // not null
201        }
202    
203    
204        /**
205         * Terminates the threads.
206         * @param shutDown true, if shut-down of the remote executable servers is
207         *            requested, false, if remote executable servers stay alive.
208         */
209        public void terminate(boolean shutDown) {
210            if (shutDown) {
211                ShutdownRequest sdr = new ShutdownRequest();
212                for (int i = 0; i < workers.length; i++) {
213                    addJob(sdr);
214                }
215                try {
216                    Thread.sleep(20);
217                } catch (InterruptedException e) {
218                    Thread.currentThread().interrupt();
219                }
220                logger.info("remaining jobs = " + jobstack.size());
221                try {
222                    for (int i = 0; i < workers.length; i++) {
223                        while (workers[i].isAlive()) {
224                            workers[i].interrupt();
225                            workers[i].join(100);
226                        }
227                    }
228                } catch (InterruptedException e) {
229                    Thread.currentThread().interrupt();
230                }
231            } else {
232                terminate();
233            }
234        }
235    
236    
237        /**
238         * Terminates the threads.
239         */
240        public void terminate() {
241            while (hasJobs()) {
242                try {
243                    Thread.sleep(100);
244                } catch (InterruptedException e) {
245                    Thread.currentThread().interrupt();
246                }
247            }
248            for (int i = 0; i < workers.length; i++) {
249                try {
250                    while (workers[i].isAlive()) {
251                        workers[i].interrupt();
252                        workers[i].join(100);
253                    }
254                } catch (InterruptedException e) {
255                    Thread.currentThread().interrupt();
256                }
257            }
258            ec.close();
259        }
260    
261    
262        /**
263         * adds a job to the workpile.
264         * @param job
265         */
266        public synchronized void addJob(Runnable job) {
267            if (workers == null || workers.length < threads) {
268                init(); // start threads
269            }
270            jobstack.addLast(job);
271            logger.debug("adding job");
272            if (idleworkers > 0) {
273                logger.debug("notifying a jobless worker");
274                notify();
275            }
276        }
277    
278    
279        /**
280         * get a job for processing.
281         */
282        protected synchronized Runnable getJob() throws InterruptedException {
283            while (jobstack.isEmpty()) {
284                idleworkers++;
285                logger.debug("waiting");
286                wait();
287                idleworkers--;
288            }
289            // is expressed using strategy enumeration
290            if (strategy == StrategyEnumeration.LIFO) {
291                return jobstack.removeLast(); // LIFO
292            }
293            return jobstack.removeFirst(); // FIFO
294        }
295    
296    
297        /**
298         * check if there are jobs for processing.
299         */
300        public boolean hasJobs() {
301            if (jobstack.size() > 0) {
302                return true;
303            }
304            for (int i = 0; i < workers.length; i++) {
305                if (workers[i].working) {
306                    return true;
307                }
308            }
309            return false;
310        }
311    
312    
313        /**
314         * check if there are more than n jobs for processing.
315         * @param n Integer
316         * @return true, if there are possibly more than n jobs.
317         */
318        public boolean hasJobs(int n) {
319            int j = jobstack.size();
320            if (j > 0 && (j + workers.length > n)) {
321                return true;
322                // if j > 0 no worker should be idle
323                // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
324            }
325            int x = 0;
326            for (int i = 0; i < workers.length; i++) {
327                if (workers[i].working) {
328                    x++;
329                }
330            }
331            if ((j + x) > n) {
332                return true;
333            }
334            return false;
335        }
336    
337    }
338    
339    
/**
 * Marker job used to request a shutdown. Running it locally only prints its
 * own name to standard output.
 */
class ShutdownRequest implements Runnable {


    /**
     * Prints the request name followed by a line break.
     */
    public void run() {
        final String label = "ShutdownRequest";
        System.out.println(label);
    }
}
353    
354    
355    /**
356     * Implements one local part of the distributed thread.
357     */
358    class DistPoolThread extends Thread {
359    
360    
361        final DistThreadPool pool;
362    
363    
364        final ExecutableChannels ec;
365    
366    
367        final int myId;
368    
369    
370        private static final Logger logger = Logger.getLogger(DistPoolThread.class);
371    
372    
373        private final boolean debug = logger.isInfoEnabled();
374    
375    
376        boolean working = false;
377    
378    
379        /**
380         * @param pool DistThreadPool.
381         */
382        public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) {
383            this.pool = pool;
384            this.ec = ec;
385            myId = i;
386        }
387    
388    
389        /**
390         * Run the thread.
391         */
392        @Override
393        public void run() {
394            logger.info("ready, myId = " + myId);
395            Runnable job;
396            int done = 0;
397            long time = 0;
398            long t;
399            boolean running = true;
400            while (running) {
401                try {
402                    logger.debug("looking for a job");
403                    job = pool.getJob();
404                    working = true;
405                    if (debug) {
406                        logger.info("working " + myId + " on " + job);
407                    }
408                    t = System.currentTimeMillis();
409                    // send and wait, like rmi
410                    try {
411                        if (job instanceof ShutdownRequest) {
412                            ec.send(myId, ExecutableServer.STOP);
413                        } else {
414                            ec.send(myId, job);
415                        }
416                        logger.info("send " + myId + " at " + ec + " send job " + job);
417                    } catch (IOException e) {
418                        e.printStackTrace();
419                        logger.info("error send " + myId + " at " + ec + " e = " + e);
420                        working = false;
421                    }
422                    //job.run(); 
423                    Object o = null;
424                    try {
425                        if (working) {
426                            logger.info("waiting " + myId + " on " + job);
427                            o = ec.receive(myId);
428                            logger.info("receive " + myId + " at " + ec + " send job " + job + " received " + o);
429                        }
430                    } catch (IOException e) {
431                        logger.info("receive exception " + myId + " send job " + job + ", " + e);
432                        //e.printStackTrace();
433                        running = false;
434                    } catch (ClassNotFoundException e) {
435                        logger.info("receive exception " + myId + " send job " + job + ", " + e);
436                        //e.printStackTrace();
437                        running = false;
438                    } finally {
439                        logger.info("receive finally " + myId + " at " + ec + " send job " + job + " received "
440                                + o + " running " + running);
441                    }
442                    working = false;
443                    time += System.currentTimeMillis() - t;
444                    done++;
445                    if (debug) {
446                        logger.info("done " + myId + " with " + o);
447                    }
448                } catch (InterruptedException e) {
449                    running = false;
450                    Thread.currentThread().interrupt();
451                }
452            }
453            logger.info("terminated " + myId + " , done " + done + " jobs in " + time + " milliseconds");
454        }
455    
456    }