001/*
002 * $Id: ThreadPool.java 4962 2014-10-17 19:05:55Z kredel $
003 */
004
005// package edu.unima.ky.parallel;
006package edu.jas.util;
007
008
009import java.util.LinkedList;
010
011import org.apache.log4j.Logger;
012
013import edu.jas.kern.PreemptingException;
014
015
016/**
017 * Thread pool using stack / list workpile.
018 * @author Akitoshi Yoshida
019 * @author Heinz Kredel
020 */
021
022public class ThreadPool {
023
024
025    /**
026     * Default number of threads to use.
027     */
028    static final int DEFAULT_SIZE = 3;
029
030
031    /**
032     * Number of threads to use.
033     */
034    final int size;
035
036
037    /**
038     * Array of workers.
039     */
040    protected PoolThread[] workers;
041
042
043    /**
044     * Number of idle workers.
045     */
046    protected int idleworkers = 0;
047
048
049    /**
050     * Shutdown request.
051     */
052    protected volatile boolean shutdown = false;
053
054
055    /**
056     * Work queue / stack.
057     */
058    // should be expressed using strategy pattern
059    // List or Collection is not appropriate
060    // LIFO strategy for recursion
061    protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
062
063
064    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
065
066
067    private static final Logger logger = Logger.getLogger(ThreadPool.class);
068
069
070    private static boolean debug = logger.isDebugEnabled();
071
072
073    /**
074     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and
075     * size DEFAULT_SIZE.
076     */
077    public ThreadPool() {
078        this(StrategyEnumeration.FIFO, DEFAULT_SIZE);
079    }
080
081
082    /**
083     * Constructs a new ThreadPool with size DEFAULT_SIZE.
084     * @param strategy for job processing.
085     */
086    public ThreadPool(StrategyEnumeration strategy) {
087        this(strategy, DEFAULT_SIZE);
088    }
089
090
091    /**
092     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO.
093     * @param size of the pool.
094     */
095    public ThreadPool(int size) {
096        this(StrategyEnumeration.FIFO, size);
097    }
098
099
100    /**
101     * Constructs a new ThreadPool.
102     * @param strategy for job processing.
103     * @param size of the pool.
104     */
105    public ThreadPool(StrategyEnumeration strategy, int size) {
106        this.size = size;
107        this.strategy = strategy;
108        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
109        workers = new PoolThread[0];
110    }
111
112
113    /**
114     * thread initialization and start.
115     */
116    public void init() {
117        if (workers == null || workers.length == 0) {
118            workers = new PoolThread[size];
119            for (int i = 0; i < workers.length; i++) {
120                workers[i] = new PoolThread(this);
121                workers[i].start();
122            }
123            logger.info("size = " + size + ", strategy = " + strategy);
124        }
125        if (debug) {
126            Thread.dumpStack();
127        }
128    }
129
130
131    /**
132     * toString.
133     */
134    @Override
135    public String toString() {
136        return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs="
137                        + jobstack.size() + ")";
138    }
139
140
141    /**
142     * number of worker threads.
143     */
144    public int getNumber() {
145        return size;
146        //if (workers == null || workers.length < size) {
147        //    init(); // start threads
148        //}
149        //return workers.length; // not null
150    }
151
152
153    /**
154     * get used strategy.
155     */
156    public StrategyEnumeration getStrategy() {
157        return strategy;
158    }
159
160
161    /**
162     * Terminates the threads.
163     */
164    public void terminate() {
165        while (hasJobs()) {
166            try {
167                Thread.sleep(100);
168                //logger.info("waiting for termination in " + this);
169            } catch (InterruptedException e) {
170                Thread.currentThread().interrupt();
171            }
172        }
173        for (int i = 0; i < workers.length; i++) {
174            try {
175                while (workers[i].isAlive()) {
176                    workers[i].interrupt();
177                    workers[i].join(100);
178                }
179            } catch (InterruptedException e) {
180                Thread.currentThread().interrupt();
181            }
182        }
183    }
184
185
186    /**
187     * Cancels the threads.
188     */
189    public int cancel() {
190        shutdown = true;
191        int s = jobstack.size();
192        if (hasJobs()) {
193            synchronized (this) {
194                logger.info("jobs canceled: " + jobstack);
195                jobstack.clear();
196                notifyAll(); // for getJob
197            }
198        }
199        //int re = 0;
200        for (int i = 0; i < workers.length; i++) {
201            if (workers[i] == null) {
202                continue;
203            }
204            try {
205                while (workers[i].isAlive()) {
206                    synchronized (this) {
207                        shutdown = true;
208                        notifyAll(); // for getJob
209                        workers[i].interrupt();
210                    }
211                    //re++;
212                    //if ( re > 3 * workers.length ) {
213                    //    logger.info("give up on: " + workers[i]);
214                    //    break; // give up
215                    //}
216                    workers[i].join(100);
217                }
218            } catch (InterruptedException e) {
219                Thread.currentThread().interrupt();
220            }
221        }
222        return s;
223    }
224
225
226    /**
227     * adds a job to the workpile.
228     * @param job
229     */
230    public synchronized void addJob(Runnable job) {
231        if (workers == null || workers.length < size) {
232            init(); // start threads
233        }
234        jobstack.addLast(job);
235        logger.debug("adding job");
236        if (idleworkers > 0) {
237            logger.debug("notifying a jobless worker");
238            notifyAll();
239        }
240    }
241
242
243    /**
244     * get a job for processing.
245     */
246    protected synchronized Runnable getJob() throws InterruptedException {
247        while (jobstack.isEmpty()) {
248            idleworkers++;
249            logger.debug("waiting");
250            wait(1000);
251            idleworkers--;
252            if (shutdown) {
253                throw new InterruptedException("shutdown in getJob");
254            }
255        }
256        // is expressed using strategy enumeration
257        if (strategy == StrategyEnumeration.LIFO) {
258            return jobstack.removeLast(); // LIFO
259        }
260        return jobstack.removeFirst(); // FIFO
261    }
262
263
264    /**
265     * check if there are jobs for processing.
266     */
267    public boolean hasJobs() {
268        if (jobstack.size() > 0) {
269            return true;
270        }
271        for (int i = 0; i < workers.length; i++) {
272            if (workers[i] == null) {
273                continue;
274            }
275            if (workers[i].isWorking) {
276                return true;
277            }
278        }
279        return false;
280    }
281
282
283    /**
284     * check if there are more than n jobs for processing.
285     * @param n Integer
286     * @return true, if there are possibly more than n jobs.
287     */
288    public boolean hasJobs(int n) {
289        int j = jobstack.size();
290        if (j > 0 && (j + workers.length > n)) {
291            return true;
292        }
293        // if j > 0 no worker should be idle
294        // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
295        int x = 0;
296        for (int i = 0; i < workers.length; i++) {
297            if (workers[i] == null) {
298                continue;
299            }
300            if (workers[i].isWorking) {
301                x++;
302            }
303        }
304        if ((j + x) > n) {
305            return true;
306        }
307        return false;
308    }
309
310}
311
312
313/**
314 * Implements one Thread of the pool.
315 */
316class PoolThread extends Thread {
317
318
319    ThreadPool pool;
320
321
322    private static final Logger logger = Logger.getLogger(PoolThread.class);
323
324
325    private static boolean debug = logger.isDebugEnabled();
326
327
328    volatile boolean isWorking = false;
329
330
331    /**
332     * @param pool ThreadPool.
333     */
334    public PoolThread(ThreadPool pool) {
335        this.pool = pool;
336    }
337
338
339    /**
340     * Run the thread.
341     */
342    @Override
343    public void run() {
344        logger.info("ready");
345        Runnable job;
346        int done = 0;
347        long time = 0;
348        long t;
349        boolean running = true;
350        while (running) {
351            try {
352                logger.debug("looking for a job");
353                job = pool.getJob();
354                if (job == null) {
355                    break;
356                }
357                if (debug) {
358                    logger.info("working");
359                }
360                t = System.currentTimeMillis();
361                isWorking = true;
362                job.run();
363                isWorking = false;
364                time += System.currentTimeMillis() - t;
365                done++;
366                if (debug) {
367                    logger.info("done");
368                }
369                if (Thread.currentThread().isInterrupted()) {
370                    running = false;
371                    isWorking = false;
372                    //throw new RuntimeException("interrupt in while(running) loop");
373                }
374            } catch (InterruptedException e) {
375                Thread.currentThread().interrupt();
376                running = false;
377                isWorking = false;
378            } catch (PreemptingException e) {
379                logger.debug("catched " + e);
380                //e.printStackTrace();
381            } catch (RuntimeException e) {
382                logger.warn("catched " + e);
383                e.printStackTrace();
384            }
385        }
386        isWorking = false;
387        logger.info("terminated, done " + done + " jobs in " + time + " milliseconds");
388    }
389
390}