001 /*
002 * $Id: ThreadPool.java 3571 2011-03-18 22:02:51Z kredel $
003 */
004
005 // package edu.unima.ky.parallel;
006 package edu.jas.util;
007
008
009 import java.util.LinkedList;
010
011 import org.apache.log4j.Logger;
012
013
014 /**
015 * Thread pool using stack / list workpile.
016 * @author Akitoshi Yoshida
017 * @author Heinz Kredel
018 */
019
020 public class ThreadPool {
021
022
023 /**
024 * Default number of threads to use.
025 */
026 static final int DEFAULT_SIZE = 3;
027
028
029 /**
030 * Number of threads to use.
031 */
032 final int size;
033
034
035 /**
036 * Array of workers.
037 */
038 protected PoolThread[] workers;
039
040
041 /**
042 * Number of idle workers.
043 */
044 protected int idleworkers = 0;
045
046
047 /**
048 * Shutdown request.
049 */
050 protected volatile boolean shutdown = false;
051
052
053 /**
054 * Work queue / stack.
055 */
056 // should be expressed using strategy pattern
057 // List or Collection is not appropriate
058 // LIFO strategy for recursion
059 protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
060
061
062 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
063
064
065 private static final Logger logger = Logger.getLogger(ThreadPool.class);
066
067
068 private static boolean debug = logger.isDebugEnabled();
069
070
071 /**
072 * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and
073 * size DEFAULT_SIZE.
074 */
075 public ThreadPool() {
076 this(StrategyEnumeration.FIFO, DEFAULT_SIZE);
077 }
078
079
080 /**
081 * Constructs a new ThreadPool with size DEFAULT_SIZE.
082 * @param strategy for job processing.
083 */
084 public ThreadPool(StrategyEnumeration strategy) {
085 this(strategy, DEFAULT_SIZE);
086 }
087
088
089 /**
090 * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO.
091 * @param size of the pool.
092 */
093 public ThreadPool(int size) {
094 this(StrategyEnumeration.FIFO, size);
095 }
096
097
098 /**
099 * Constructs a new ThreadPool.
100 * @param strategy for job processing.
101 * @param size of the pool.
102 */
103 public ThreadPool(StrategyEnumeration strategy, int size) {
104 this.size = size;
105 this.strategy = strategy;
106 jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
107 workers = new PoolThread[0];
108 }
109
110
111 /**
112 * thread initialization and start.
113 */
114 public void init() {
115 if (workers == null || workers.length == 0) {
116 workers = new PoolThread[size];
117 for (int i = 0; i < workers.length; i++) {
118 workers[i] = new PoolThread(this);
119 workers[i].start();
120 }
121 logger.info("size = " + size + ", strategy = " + strategy);
122 }
123 if (false) { // debug
124 Thread.dumpStack();
125 }
126 }
127
128
129 /**
130 * toString.
131 */
132 @Override
133 public String toString() {
134 return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs="
135 + jobstack.size() + ")";
136 }
137
138
139 /**
140 * number of worker threads.
141 */
142 public int getNumber() {
143 if (workers == null || workers.length < size) {
144 init(); // start threads
145 }
146 return workers.length; // not null
147 }
148
149
150 /**
151 * get used strategy.
152 */
153 public StrategyEnumeration getStrategy() {
154 return strategy;
155 }
156
157
158 /**
159 * Terminates the threads.
160 */
161 public void terminate() {
162 while (hasJobs()) {
163 try {
164 Thread.sleep(100);
165 //logger.info("waiting for termination in " + this);
166 } catch (InterruptedException e) {
167 Thread.currentThread().interrupt();
168 }
169 }
170 for (int i = 0; i < workers.length; i++) {
171 try {
172 while (workers[i].isAlive()) {
173 workers[i].interrupt();
174 workers[i].join(100);
175 }
176 } catch (InterruptedException e) {
177 Thread.currentThread().interrupt();
178 }
179 }
180 }
181
182
183 /**
184 * Cancels the threads.
185 */
186 public int cancel() {
187 shutdown = true;
188 int s = jobstack.size();
189 if (hasJobs()) {
190 synchronized (this) {
191 logger.info("jobs canceled: " + jobstack);
192 jobstack.clear();
193 }
194 }
195 int re = 0;
196 for (int i = 0; i < workers.length; i++) {
197 if (workers[i] == null) {
198 continue;
199 }
200 try {
201 while (workers[i].isAlive()) {
202 synchronized (this) {
203 shutdown = true;
204 notifyAll(); // for getJob
205 workers[i].interrupt();
206 }
207 re++;
208 //if ( re > 3 * workers.length ) {
209 // break; // give up
210 //}
211 workers[i].join(100);
212 }
213 } catch (InterruptedException e) {
214 Thread.currentThread().interrupt();
215 }
216 }
217 return s;
218 }
219
220
221 /**
222 * adds a job to the workpile.
223 * @param job
224 */
225 public synchronized void addJob(Runnable job) {
226 if (workers == null || workers.length < size) {
227 init(); // start threads
228 }
229 jobstack.addLast(job);
230 logger.debug("adding job");
231 if (idleworkers > 0) {
232 logger.debug("notifying a jobless worker");
233 notifyAll();
234 }
235 }
236
237
238 /**
239 * get a job for processing.
240 */
241 protected synchronized Runnable getJob() throws InterruptedException {
242 while (jobstack.isEmpty()) {
243 idleworkers++;
244 logger.debug("waiting");
245 wait();
246 idleworkers--;
247 if (shutdown) {
248 throw new InterruptedException("shutdown in getJob");
249 }
250 }
251 // is expressed using strategy enumeration
252 if (strategy == StrategyEnumeration.LIFO) {
253 return jobstack.removeLast(); // LIFO
254 }
255 return jobstack.removeFirst(); // FIFO
256 }
257
258
259 /**
260 * check if there are jobs for processing.
261 */
262 public boolean hasJobs() {
263 if (jobstack.size() > 0) {
264 return true;
265 }
266 for (int i = 0; i < workers.length; i++) {
267 if (workers[i] == null) {
268 continue;
269 }
270 if (workers[i].isWorking) {
271 return true;
272 }
273 }
274 return false;
275 }
276
277
278 /**
279 * check if there are more than n jobs for processing.
280 * @param n Integer
281 * @return true, if there are possibly more than n jobs.
282 */
283 public boolean hasJobs(int n) {
284 int j = jobstack.size();
285 if (j > 0 && (j + workers.length > n)) {
286 return true;
287 }
288 // if j > 0 no worker should be idle
289 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
290 int x = 0;
291 for (int i = 0; i < workers.length; i++) {
292 if (workers[i] == null) {
293 continue;
294 }
295 if (workers[i].isWorking) {
296 x++;
297 }
298 }
299 if ((j + x) > n) {
300 return true;
301 }
302 return false;
303 }
304
305 }
306
307
308 /**
309 * Implements one Thread of the pool.
310 */
311 class PoolThread extends Thread {
312
313
314 ThreadPool pool;
315
316
317 private static final Logger logger = Logger.getLogger(ThreadPool.class);
318
319
320 private static boolean debug = logger.isDebugEnabled();
321
322
323 volatile boolean isWorking = false;
324
325
326 /**
327 * @param pool ThreadPool.
328 */
329 public PoolThread(ThreadPool pool) {
330 this.pool = pool;
331 }
332
333
334 /**
335 * Run the thread.
336 */
337 @Override
338 public void run() {
339 logger.info("ready");
340 Runnable job;
341 int done = 0;
342 long time = 0;
343 long t;
344 boolean running = true;
345 while (running) {
346 try {
347 logger.debug("looking for a job");
348 job = pool.getJob();
349 if (job == null) {
350 break;
351 }
352 if (debug) {
353 logger.info("working");
354 }
355 t = System.currentTimeMillis();
356 isWorking = true;
357 job.run();
358 isWorking = false;
359 time += System.currentTimeMillis() - t;
360 done++;
361 if (debug) {
362 logger.info("done");
363 }
364 } catch (InterruptedException e) {
365 Thread.currentThread().interrupt();
366 running = false;
367 isWorking = false;
368 } catch (RuntimeException e) {
369 logger.warn("catched " + e);
370 }
371 }
372 isWorking = false;
373 logger.info("terminated, done " + done + " jobs in " + time + " milliseconds");
374 }
375
376 }