001    /*
002     * $Id: ThreadPool.java 3571 2011-03-18 22:02:51Z kredel $
003     */
004    
005    // package edu.unima.ky.parallel;
006    package edu.jas.util;
007    
008    
009    import java.util.LinkedList;
010    
011    import org.apache.log4j.Logger;
012    
013    
014    /**
015     * Thread pool using stack / list workpile.
016     * @author Akitoshi Yoshida
017     * @author Heinz Kredel
018     */
019    
020    public class ThreadPool {
021    
022    
023        /**
024         * Default number of threads to use.
025         */
026        static final int DEFAULT_SIZE = 3;
027    
028    
029        /**
030         * Number of threads to use.
031         */
032        final int size;
033    
034    
035        /**
036         * Array of workers.
037         */
038        protected PoolThread[] workers;
039    
040    
041        /**
042         * Number of idle workers.
043         */
044        protected int idleworkers = 0;
045    
046    
047        /**
048         * Shutdown request.
049         */
050        protected volatile boolean shutdown = false;
051    
052    
053        /**
054         * Work queue / stack.
055         */
056        // should be expressed using strategy pattern
057        // List or Collection is not appropriate
058        // LIFO strategy for recursion
059        protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
060    
061    
062        protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
063    
064    
065        private static final Logger logger = Logger.getLogger(ThreadPool.class);
066    
067    
068        private static boolean debug = logger.isDebugEnabled();
069    
070    
071        /**
072         * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and
073         * size DEFAULT_SIZE.
074         */
075        public ThreadPool() {
076            this(StrategyEnumeration.FIFO, DEFAULT_SIZE);
077        }
078    
079    
080        /**
081         * Constructs a new ThreadPool with size DEFAULT_SIZE.
082         * @param strategy for job processing.
083         */
084        public ThreadPool(StrategyEnumeration strategy) {
085            this(strategy, DEFAULT_SIZE);
086        }
087    
088    
089        /**
090         * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO.
091         * @param size of the pool.
092         */
093        public ThreadPool(int size) {
094            this(StrategyEnumeration.FIFO, size);
095        }
096    
097    
098        /**
099         * Constructs a new ThreadPool.
100         * @param strategy for job processing.
101         * @param size of the pool.
102         */
103        public ThreadPool(StrategyEnumeration strategy, int size) {
104            this.size = size;
105            this.strategy = strategy;
106            jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
107            workers = new PoolThread[0];
108        }
109    
110    
111        /**
112         * thread initialization and start.
113         */
114        public void init() {
115            if (workers == null || workers.length == 0) {
116                workers = new PoolThread[size];
117                for (int i = 0; i < workers.length; i++) {
118                    workers[i] = new PoolThread(this);
119                    workers[i].start();
120                }
121                logger.info("size = " + size + ", strategy = " + strategy);
122            }
123            if (false) { // debug
124                Thread.dumpStack();
125            }
126        }
127    
128    
129        /**
130         * toString.
131         */
132        @Override
133        public String toString() {
134            return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs="
135                            + jobstack.size() + ")";
136        }
137    
138    
139        /**
140         * number of worker threads.
141         */
142        public int getNumber() {
143            if (workers == null || workers.length < size) {
144                init(); // start threads
145            }
146            return workers.length; // not null
147        }
148    
149    
150        /**
151         * get used strategy.
152         */
153        public StrategyEnumeration getStrategy() {
154            return strategy;
155        }
156    
157    
158        /**
159         * Terminates the threads.
160         */
161        public void terminate() {
162            while (hasJobs()) {
163                try {
164                    Thread.sleep(100);
165                    //logger.info("waiting for termination in " + this);
166                } catch (InterruptedException e) {
167                    Thread.currentThread().interrupt();
168                }
169            }
170            for (int i = 0; i < workers.length; i++) {
171                try {
172                    while (workers[i].isAlive()) {
173                        workers[i].interrupt();
174                        workers[i].join(100);
175                    }
176                } catch (InterruptedException e) {
177                    Thread.currentThread().interrupt();
178                }
179            }
180        }
181    
182    
183        /**
184         * Cancels the threads.
185         */
186        public int cancel() {
187            shutdown = true;
188            int s = jobstack.size();
189            if (hasJobs()) {
190                synchronized (this) {
191                    logger.info("jobs canceled: " + jobstack);
192                    jobstack.clear();
193                }
194            }
195            int re = 0;
196            for (int i = 0; i < workers.length; i++) {
197                if (workers[i] == null) {
198                    continue;
199                }
200                try {
201                    while (workers[i].isAlive()) {
202                        synchronized (this) {
203                            shutdown = true;
204                            notifyAll(); // for getJob
205                            workers[i].interrupt();
206                        }
207                        re++;
208                        //if ( re > 3 * workers.length ) {
209                        //    break; // give up
210                        //}
211                        workers[i].join(100);
212                    }
213                } catch (InterruptedException e) {
214                    Thread.currentThread().interrupt();
215                }
216            }
217            return s;
218        }
219    
220    
221        /**
222         * adds a job to the workpile.
223         * @param job
224         */
225        public synchronized void addJob(Runnable job) {
226            if (workers == null || workers.length < size) {
227                init(); // start threads
228            }
229            jobstack.addLast(job);
230            logger.debug("adding job");
231            if (idleworkers > 0) {
232                logger.debug("notifying a jobless worker");
233                notifyAll();
234            }
235        }
236    
237    
238        /**
239         * get a job for processing.
240         */
241        protected synchronized Runnable getJob() throws InterruptedException {
242            while (jobstack.isEmpty()) {
243                idleworkers++;
244                logger.debug("waiting");
245                wait();
246                idleworkers--;
247                if (shutdown) {
248                    throw new InterruptedException("shutdown in getJob");
249                }
250            }
251            // is expressed using strategy enumeration
252            if (strategy == StrategyEnumeration.LIFO) {
253                return jobstack.removeLast(); // LIFO
254            }
255            return jobstack.removeFirst(); // FIFO
256        }
257    
258    
259        /**
260         * check if there are jobs for processing.
261         */
262        public boolean hasJobs() {
263            if (jobstack.size() > 0) {
264                return true;
265            }
266            for (int i = 0; i < workers.length; i++) {
267                if (workers[i] == null) {
268                    continue;
269                }
270                if (workers[i].isWorking) {
271                    return true;
272                }
273            }
274            return false;
275        }
276    
277    
278        /**
279         * check if there are more than n jobs for processing.
280         * @param n Integer
281         * @return true, if there are possibly more than n jobs.
282         */
283        public boolean hasJobs(int n) {
284            int j = jobstack.size();
285            if (j > 0 && (j + workers.length > n)) {
286                return true;
287            }
288            // if j > 0 no worker should be idle
289            // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
290            int x = 0;
291            for (int i = 0; i < workers.length; i++) {
292                if (workers[i] == null) {
293                    continue;
294                }
295                if (workers[i].isWorking) {
296                    x++;
297                }
298            }
299            if ((j + x) > n) {
300                return true;
301            }
302            return false;
303        }
304    
305    }
306    
307    
308    /**
309     * Implements one Thread of the pool.
310     */
311    class PoolThread extends Thread {
312    
313    
314        ThreadPool pool;
315    
316    
317        private static final Logger logger = Logger.getLogger(ThreadPool.class);
318    
319    
320        private static boolean debug = logger.isDebugEnabled();
321    
322    
323        volatile boolean isWorking = false;
324    
325    
326        /**
327         * @param pool ThreadPool.
328         */
329        public PoolThread(ThreadPool pool) {
330            this.pool = pool;
331        }
332    
333    
334        /**
335         * Run the thread.
336         */
337        @Override
338        public void run() {
339            logger.info("ready");
340            Runnable job;
341            int done = 0;
342            long time = 0;
343            long t;
344            boolean running = true;
345            while (running) {
346                try {
347                    logger.debug("looking for a job");
348                    job = pool.getJob();
349                    if (job == null) {
350                        break;
351                    }
352                    if (debug) {
353                        logger.info("working");
354                    }
355                    t = System.currentTimeMillis();
356                    isWorking = true;
357                    job.run();
358                    isWorking = false;
359                    time += System.currentTimeMillis() - t;
360                    done++;
361                    if (debug) {
362                        logger.info("done");
363                    }
364                } catch (InterruptedException e) {
365                    Thread.currentThread().interrupt();
366                    running = false;
367                    isWorking = false;
368                } catch (RuntimeException e) {
369                    logger.warn("catched " + e);
370                }
371            }
372            isWorking = false;
373            logger.info("terminated, done " + done + " jobs in " + time + " milliseconds");
374        }
375    
376    }