001/*
002 * $Id: ThreadPool.java 4254 2012-10-14 18:13:32Z kredel $
003 */
004
005// package edu.unima.ky.parallel;
006package edu.jas.util;
007
008
009import java.util.LinkedList;
010
011import org.apache.log4j.Logger;
012
013import edu.jas.kern.PreemptingException;
014
015/**
016 * Thread pool using stack / list workpile.
017 * @author Akitoshi Yoshida
018 * @author Heinz Kredel
019 */
020
021public class ThreadPool {
022
023
024    /**
025     * Default number of threads to use.
026     */
027    static final int DEFAULT_SIZE = 3;
028
029
030    /**
031     * Number of threads to use.
032     */
033    final int size;
034
035
036    /**
037     * Array of workers.
038     */
039    protected PoolThread[] workers;
040
041
042    /**
043     * Number of idle workers.
044     */
045    protected int idleworkers = 0;
046
047
048    /**
049     * Shutdown request.
050     */
051    protected volatile boolean shutdown = false;
052
053
054    /**
055     * Work queue / stack.
056     */
057    // should be expressed using strategy pattern
058    // List or Collection is not appropriate
059    // LIFO strategy for recursion
060    protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
061
062
063    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
064
065
066    private static final Logger logger = Logger.getLogger(ThreadPool.class);
067
068
069    private static boolean debug = logger.isDebugEnabled();
070
071
072    /**
073     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and
074     * size DEFAULT_SIZE.
075     */
076    public ThreadPool() {
077        this(StrategyEnumeration.FIFO, DEFAULT_SIZE);
078    }
079
080
081    /**
082     * Constructs a new ThreadPool with size DEFAULT_SIZE.
083     * @param strategy for job processing.
084     */
085    public ThreadPool(StrategyEnumeration strategy) {
086        this(strategy, DEFAULT_SIZE);
087    }
088
089
090    /**
091     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO.
092     * @param size of the pool.
093     */
094    public ThreadPool(int size) {
095        this(StrategyEnumeration.FIFO, size);
096    }
097
098
099    /**
100     * Constructs a new ThreadPool.
101     * @param strategy for job processing.
102     * @param size of the pool.
103     */
104    public ThreadPool(StrategyEnumeration strategy, int size) {
105        this.size = size;
106        this.strategy = strategy;
107        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
108        workers = new PoolThread[0];
109    }
110
111
112    /**
113     * thread initialization and start.
114     */
115    public void init() {
116        if (workers == null || workers.length == 0) {
117            workers = new PoolThread[size];
118            for (int i = 0; i < workers.length; i++) {
119                workers[i] = new PoolThread(this);
120                workers[i].start();
121            }
122            logger.info("size = " + size + ", strategy = " + strategy);
123        }
124        if (debug) {
125            Thread.dumpStack();
126        }
127    }
128
129
130    /**
131     * toString.
132     */
133    @Override
134    public String toString() {
135        return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs="
136                        + jobstack.size() + ")";
137    }
138
139
140    /**
141     * number of worker threads.
142     */
143    public int getNumber() {
144        return size;
145        //if (workers == null || workers.length < size) {
146        //    init(); // start threads
147        //}
148        //return workers.length; // not null
149    }
150
151
152    /**
153     * get used strategy.
154     */
155    public StrategyEnumeration getStrategy() {
156        return strategy;
157    }
158
159
160    /**
161     * Terminates the threads.
162     */
163    public void terminate() {
164        while (hasJobs()) {
165            try {
166                Thread.sleep(100);
167                //logger.info("waiting for termination in " + this);
168            } catch (InterruptedException e) {
169                Thread.currentThread().interrupt();
170            }
171        }
172        for (int i = 0; i < workers.length; i++) {
173            try {
174                while (workers[i].isAlive()) {
175                    workers[i].interrupt();
176                    workers[i].join(100);
177                }
178            } catch (InterruptedException e) {
179                Thread.currentThread().interrupt();
180            }
181        }
182    }
183
184
185    /**
186     * Cancels the threads.
187     */
188    public int cancel() {
189        shutdown = true;
190        int s = jobstack.size();
191        if (hasJobs()) {
192            synchronized (this) {
193                logger.info("jobs canceled: " + jobstack);
194                jobstack.clear();
195            }
196        }
197        int re = 0;
198        for (int i = 0; i < workers.length; i++) {
199            if (workers[i] == null) {
200                continue;
201            }
202            try {
203                while (workers[i].isAlive()) {
204                    synchronized (this) {
205                        shutdown = true;
206                        notifyAll(); // for getJob
207                        workers[i].interrupt();
208                    }
209                    re++;
210                    //if ( re > 3 * workers.length ) {
211                    //    break; // give up
212                    //}
213                    workers[i].join(100);
214                }
215            } catch (InterruptedException e) {
216                Thread.currentThread().interrupt();
217            }
218        }
219        return s;
220    }
221
222
223    /**
224     * adds a job to the workpile.
225     * @param job
226     */
227    public synchronized void addJob(Runnable job) {
228        if (workers == null || workers.length < size) {
229            init(); // start threads
230        }
231        jobstack.addLast(job);
232        logger.debug("adding job");
233        if (idleworkers > 0) {
234            logger.debug("notifying a jobless worker");
235            notifyAll();
236        }
237    }
238
239
240    /**
241     * get a job for processing.
242     */
243    protected synchronized Runnable getJob() throws InterruptedException {
244        while (jobstack.isEmpty()) {
245            idleworkers++;
246            logger.debug("waiting");
247            wait();
248            idleworkers--;
249            if (shutdown) {
250                throw new InterruptedException("shutdown in getJob");
251            }
252        }
253        // is expressed using strategy enumeration
254        if (strategy == StrategyEnumeration.LIFO) {
255            return jobstack.removeLast(); // LIFO
256        }
257        return jobstack.removeFirst(); // FIFO
258    }
259
260
261    /**
262     * check if there are jobs for processing.
263     */
264    public boolean hasJobs() {
265        if (jobstack.size() > 0) {
266            return true;
267        }
268        for (int i = 0; i < workers.length; i++) {
269            if (workers[i] == null) {
270                continue;
271            }
272            if (workers[i].isWorking) {
273                return true;
274            }
275        }
276        return false;
277    }
278
279
280    /**
281     * check if there are more than n jobs for processing.
282     * @param n Integer
283     * @return true, if there are possibly more than n jobs.
284     */
285    public boolean hasJobs(int n) {
286        int j = jobstack.size();
287        if (j > 0 && (j + workers.length > n)) {
288            return true;
289        }
290        // if j > 0 no worker should be idle
291        // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
292        int x = 0;
293        for (int i = 0; i < workers.length; i++) {
294            if (workers[i] == null) {
295                continue;
296            }
297            if (workers[i].isWorking) {
298                x++;
299            }
300        }
301        if ((j + x) > n) {
302            return true;
303        }
304        return false;
305    }
306
307}
308
309
310/**
311 * Implements one Thread of the pool.
312 */
313class PoolThread extends Thread {
314
315
316    ThreadPool pool;
317
318
319    private static final Logger logger = Logger.getLogger(ThreadPool.class);
320
321
322    private static boolean debug = logger.isDebugEnabled();
323
324
325    volatile boolean isWorking = false;
326
327
328    /**
329     * @param pool ThreadPool.
330     */
331    public PoolThread(ThreadPool pool) {
332        this.pool = pool;
333    }
334
335
336    /**
337     * Run the thread.
338     */
339    @Override
340    public void run() {
341        logger.info("ready");
342        Runnable job;
343        int done = 0;
344        long time = 0;
345        long t;
346        boolean running = true;
347        while (running) {
348            try {
349                logger.debug("looking for a job");
350                job = pool.getJob();
351                if (job == null) {
352                    break;
353                }
354                if (debug) {
355                    logger.info("working");
356                }
357                t = System.currentTimeMillis();
358                isWorking = true;
359                job.run();
360                isWorking = false;
361                time += System.currentTimeMillis() - t;
362                done++;
363                if (debug) {
364                    logger.info("done");
365                }
366            } catch (InterruptedException e) {
367                Thread.currentThread().interrupt();
368                running = false;
369                isWorking = false;
370            } catch (PreemptingException e) {
371                logger.debug("catched " + e);
372                //e.printStackTrace();
373            } catch (RuntimeException e) {
374                logger.warn("catched " + e);
375                e.printStackTrace();
376            }
377        }
378        isWorking = false;
379        logger.info("terminated, done " + done + " jobs in " + time + " milliseconds");
380    }
381
382}