001/*
002 * $Id$
003 */
004
005// package edu.unima.ky.parallel;
006package edu.jas.util;
007
008
009import java.util.LinkedList;
010
011import org.apache.logging.log4j.Logger;
012import org.apache.logging.log4j.LogManager; 
013
014import edu.jas.kern.PreemptingException;
015
016
017/**
018 * Thread pool using stack / list workpile.
019 * @author Akitoshi Yoshida
020 * @author Heinz Kredel
021 */
022
023public class ThreadPool {
024
025
026    /**
027     * Default number of threads to use.
028     */
029    static final int DEFAULT_SIZE = 3;
030
031
032    /**
033     * Number of threads to use.
034     */
035    final int size;
036
037
038    /**
039     * Array of workers.
040     */
041    protected PoolThread[] workers;
042
043
044    /**
045     * Number of idle workers.
046     */
047    protected int idleworkers = 0;
048
049
050    /**
051     * Shutdown request.
052     */
053    protected volatile boolean shutdown = false;
054
055
056    /**
057     * Work queue / stack.
058     */
059    // should be expressed using strategy pattern
060    // List or Collection is not appropriate
061    // LIFO strategy for recursion
062    protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
063
064
065    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
066
067
068    private static final Logger logger = LogManager.getLogger(ThreadPool.class);
069
070
071    private static final boolean debug = logger.isDebugEnabled();
072
073
074    /**
075     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and
076     * size DEFAULT_SIZE.
077     */
078    public ThreadPool() {
079        this(StrategyEnumeration.FIFO, DEFAULT_SIZE);
080    }
081
082
083    /**
084     * Constructs a new ThreadPool with size DEFAULT_SIZE.
085     * @param strategy for job processing.
086     */
087    public ThreadPool(StrategyEnumeration strategy) {
088        this(strategy, DEFAULT_SIZE);
089    }
090
091
092    /**
093     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO.
094     * @param size of the pool.
095     */
096    public ThreadPool(int size) {
097        this(StrategyEnumeration.FIFO, size);
098    }
099
100
101    /**
102     * Constructs a new ThreadPool.
103     * @param strategy for job processing.
104     * @param size of the pool.
105     */
106    public ThreadPool(StrategyEnumeration strategy, int size) {
107        this.size = size;
108        this.strategy = strategy;
109        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
110        workers = new PoolThread[0];
111    }
112
113
114    /**
115     * thread initialization and start.
116     */
117    public void init() {
118        if (workers == null || workers.length == 0) {
119            workers = new PoolThread[size];
120            for (int i = 0; i < workers.length; i++) {
121                workers[i] = new PoolThread(this);
122                workers[i].start();
123            }
124            logger.info("size = {}, strategy = {}", size, strategy);
125        }
126        if (debug) {
127            Thread.dumpStack();
128        }
129    }
130
131
132    /**
133     * toString.
134     */
135    @Override
136    public String toString() {
137        return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs="
138                        + jobstack.size() + ")";
139    }
140
141
142    /**
143     * number of worker threads.
144     */
145    public int getNumber() {
146        return size;
147        //if (workers == null || workers.length < size) {
148        //    init(); // start threads
149        //}
150        //return workers.length; // not null
151    }
152
153
154    /**
155     * get used strategy.
156     */
157    public StrategyEnumeration getStrategy() {
158        return strategy;
159    }
160
161
162    /**
163     * Terminates the threads.
164     */
165    public void terminate() {
166        while (hasJobs()) {
167            try {
168                Thread.sleep(100);
169                //logger.info("waiting for termination in {}", this);
170            } catch (InterruptedException e) {
171                Thread.currentThread().interrupt();
172            }
173        }
174        if (workers == null) {
175            return;
176        }
177        for (int i = 0; i < workers.length; i++) {
178            if (workers[i] == null) {
179                continue;
180            }
181            try {
182                while (workers[i].isAlive()) {
183                    workers[i].interrupt();
184                    workers[i].join(100);
185                }
186            } catch (InterruptedException e) {
187                Thread.currentThread().interrupt();
188            }
189        }
190    }
191
192
193    /**
194     * Cancels the threads.
195     */
196    public int cancel() {
197        shutdown = true;
198        int s = jobstack.size();
199        if (hasJobs()) {
200            synchronized (this) {
201                logger.info("jobs canceled: {}", jobstack);
202                jobstack.clear();
203                notifyAll(); // for getJob
204            }
205        }
206        //int re = 0;
207        if (workers == null) {
208            return s;
209        }
210        for (int i = 0; i < workers.length; i++) {
211            if (workers[i] == null) {
212                continue;
213            }
214            try {
215                while (workers[i].isAlive()) {
216                    synchronized (this) {
217                        shutdown = true;
218                        notifyAll(); // for getJob
219                        workers[i].interrupt();
220                    }
221                    //re++;
222                    //if ( re > 3 * workers.length ) {
223                    //    logger.info("give up on: {}", workers[i]);
224                    //    break; // give up
225                    //}
226                    workers[i].join(100);
227                }
228            } catch (InterruptedException e) {
229                Thread.currentThread().interrupt();
230            }
231        }
232        return s;
233    }
234
235
236    /**
237     * adds a job to the workpile.
238     * @param job
239     */
240    public synchronized void addJob(Runnable job) {
241        if (workers == null || workers.length < size) {
242            init(); // start threads
243        }
244        jobstack.addLast(job);
245        logger.debug("adding job");
246        if (idleworkers > 0) {
247            logger.debug("notifying a jobless worker");
248            notifyAll();
249        }
250    }
251
252
253    /**
254     * get a job for processing.
255     */
256    protected synchronized Runnable getJob() throws InterruptedException {
257        while (jobstack.isEmpty()) {
258            idleworkers++;
259            logger.debug("waiting");
260            wait(1000);
261            idleworkers--;
262            if (shutdown) {
263                throw new InterruptedException("shutdown in getJob");
264            }
265        }
266        // is expressed using strategy enumeration
267        if (strategy == StrategyEnumeration.LIFO) {
268            return jobstack.removeLast(); // LIFO
269        }
270        return jobstack.removeFirst(); // FIFO
271    }
272
273
274    /**
275     * check if there are jobs for processing.
276     */
277    public boolean hasJobs() {
278        if (jobstack.size() > 0) {
279            return true;
280        }
281        for (int i = 0; i < workers.length; i++) {
282            if (workers[i] == null) {
283                continue;
284            }
285            if (workers[i].isWorking) {
286                return true;
287            }
288        }
289        return false;
290    }
291
292
293    /**
294     * check if there are more than n jobs for processing.
295     * @param n Integer
296     * @return true, if there are possibly more than n jobs.
297     */
298    public boolean hasJobs(int n) {
299        int j = jobstack.size();
300        if (j > 0 && (j + workers.length > n)) {
301            return true;
302        }
303        // if j > 0 no worker should be idle
304        // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
305        int x = 0;
306        for (int i = 0; i < workers.length; i++) {
307            if (workers[i] == null) {
308                continue;
309            }
310            if (workers[i].isWorking) {
311                x++;
312            }
313        }
314        if ((j + x) > n) {
315            return true;
316        }
317        return false;
318    }
319
320}
321
322
323/**
324 * Implements one Thread of the pool.
325 */
326class PoolThread extends Thread {
327
328
329    ThreadPool pool;
330
331
332    private static final Logger logger = LogManager.getLogger(PoolThread.class);
333
334
335    private static final boolean debug = logger.isDebugEnabled();
336
337
338    volatile boolean isWorking = false;
339
340
341    /**
342     * @param pool ThreadPool.
343     */
344    public PoolThread(ThreadPool pool) {
345        this.pool = pool;
346    }
347
348
349    /**
350     * Run the thread.
351     */
352    @Override
353    public void run() {
354        logger.info("ready");
355        Runnable job;
356        int done = 0;
357        long time = 0;
358        long t;
359        boolean running = true;
360        while (running) {
361            try {
362                logger.debug("looking for a job");
363                job = pool.getJob();
364                if (job == null) {
365                    break;
366                }
367                if (debug) {
368                    logger.info("working");
369                }
370                t = System.currentTimeMillis();
371                isWorking = true;
372                job.run();
373                isWorking = false;
374                time += System.currentTimeMillis() - t;
375                done++;
376                if (debug) {
377                    logger.info("done");
378                }
379                if (Thread.currentThread().isInterrupted()) {
380                    running = false;
381                    isWorking = false;
382                    //throw new RuntimeException("interrupt in while(running) loop");
383                }
384            } catch (InterruptedException e) {
385                Thread.currentThread().interrupt();
386                running = false;
387                isWorking = false;
388            } catch (PreemptingException e) {
389                logger.debug("caught {}", e);
390                //e.printStackTrace();
391            } catch (RuntimeException e) {
392                logger.warn("caught {}", e);
393                e.printStackTrace();
394            }
395        }
396        isWorking = false;
397        logger.info("terminated, done {} jobs in {} milliseconds", done, time);
398    }
399
400}