/*
 * $Id: ThreadPool.java 3571 2011-03-18 22:02:51Z kredel $
 */

// package edu.unima.ky.parallel;
package edu.jas.util;


import java.util.LinkedList;

import org.apache.log4j.Logger;


/**
 * Thread pool using stack / list workpile.
 * <p>
 * Jobs are queued with {@link #addJob(Runnable)} and executed by a fixed
 * number of {@link PoolThread} workers which are started lazily on first use.
 * The pool's own monitor guards the job queue and the idle counter; workers
 * block in {@link #getJob()} until a job is available or the pool is
 * canceled / terminated.
 * @author Akitoshi Yoshida
 * @author Heinz Kredel
 */

public class ThreadPool {


    /**
     * Default number of threads to use.
     */
    static final int DEFAULT_SIZE = 3;


    /**
     * Number of threads to use.
     */
    final int size;


    /**
     * Array of workers. Empty until {@link #init()} has started the threads.
     */
    protected PoolThread[] workers;


    /**
     * Number of idle workers, i.e. workers currently waiting in
     * {@link #getJob()}. Modified only while holding the pool monitor.
     */
    protected int idleworkers = 0;


    /**
     * Shutdown request, set by {@link #cancel()}.
     */
    protected volatile boolean shutdown = false;


    /**
     * Work queue / stack. Jobs are always appended at the end; the configured
     * strategy decides from which end {@link #getJob()} removes them.
     */
    protected LinkedList<Runnable> jobstack;


    /**
     * Job processing strategy. Always overwritten by the constructors.
     */
    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;


    private static final Logger logger = Logger.getLogger(ThreadPool.class);


    private static final boolean debug = logger.isDebugEnabled();


    /**
     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and
     * size DEFAULT_SIZE.
     */
    public ThreadPool() {
        this(StrategyEnumeration.FIFO, DEFAULT_SIZE);
    }


    /**
     * Constructs a new ThreadPool with size DEFAULT_SIZE.
     * @param strategy for job processing.
     */
    public ThreadPool(StrategyEnumeration strategy) {
        this(strategy, DEFAULT_SIZE);
    }


    /**
     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO.
     * @param size of the pool.
     */
    public ThreadPool(int size) {
        this(StrategyEnumeration.FIFO, size);
    }


    /**
     * Constructs a new ThreadPool. No threads are started here; they are
     * started by {@link #init()}, which is called on demand.
     * @param strategy for job processing.
     * @param size of the pool.
     */
    public ThreadPool(StrategyEnumeration strategy, int size) {
        this.size = size;
        this.strategy = strategy;
        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
        workers = new PoolThread[0];
    }


    /**
     * Thread initialization and start. Does nothing if the workers have
     * already been started.
     * <p>
     * Synchronized because it is reached both from the unsynchronized
     * {@link #getNumber()} and from the synchronized
     * {@link #addJob(Runnable)}; without the lock two callers could each
     * start a full set of workers, the first of which would never be
     * terminated.
     */
    public synchronized void init() {
        if (workers == null || workers.length == 0) {
            // populate a local array first, so that unsynchronized readers
            // of the field never observe a partially filled array
            PoolThread[] started = new PoolThread[size];
            for (int i = 0; i < started.length; i++) {
                started[i] = new PoolThread(this);
                started[i].start();
            }
            workers = started;
            logger.info("size = " + size + ", strategy = " + strategy);
        }
    }


    /**
     * toString. Note that this calls {@link #getNumber()} and therefore
     * starts the worker threads if they are not running yet.
     * @return a description with pool size, idle workers, strategy and
     *         number of queued jobs.
     */
    @Override
    public String toString() {
        return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs="
                        + jobstack.size() + ")";
    }


    /**
     * number of worker threads. Starts the threads if necessary.
     * @return length of the worker array.
     */
    public int getNumber() {
        if (workers == null || workers.length < size) {
            init(); // start threads
        }
        return workers.length; // not null
    }


    /**
     * get used strategy.
     * @return the job processing strategy of this pool.
     */
    public StrategyEnumeration getStrategy() {
        return strategy;
    }


    /**
     * Terminates the threads. Polls until {@link #hasJobs()} reports no more
     * work and then interrupts each worker until it is no longer alive.
     * <p>
     * If the calling thread is interrupted while waiting for the jobs to
     * finish, waiting continues and the interrupt status is restored
     * afterwards.
     */
    public void terminate() {
        // Do not re-assert the interrupt inside the loop: with the flag set
        // every subsequent sleep() throws immediately and the loop spins.
        boolean interrupted = false;
        while (hasJobs()) {
            try {
                Thread.sleep(100);
                //logger.info("waiting for termination in " + this);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        PoolThread[] ws = workers;
        for (int i = 0; i < ws.length; i++) {
            if (ws[i] == null) {
                continue;
            }
            try {
                while (ws[i].isAlive()) {
                    ws[i].interrupt();
                    ws[i].join(100);
                }
            } catch (InterruptedException e) {
                // give up on this worker, keep the interrupt for the caller
                Thread.currentThread().interrupt();
            }
        }
    }


    /**
     * Cancels the threads. Sets the shutdown flag, discards all queued jobs
     * and interrupts / notifies each worker until it is no longer alive.
     * @return number of jobs that were in the queue when cancel was called.
     */
    public int cancel() {
        shutdown = true;
        int s;
        // snapshot and clear under the monitor, the queue is not thread safe
        synchronized (this) {
            s = jobstack.size();
            if (hasJobs()) {
                logger.info("jobs canceled: " + jobstack);
                jobstack.clear();
            }
        }
        PoolThread[] ws = workers;
        for (int i = 0; i < ws.length; i++) {
            if (ws[i] == null) {
                continue;
            }
            try {
                while (ws[i].isAlive()) {
                    synchronized (this) {
                        shutdown = true;
                        notifyAll(); // for getJob
                        ws[i].interrupt();
                    }
                    ws[i].join(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return s;
    }


    /**
     * adds a job to the workpile. Starts the worker threads if necessary and
     * wakes up waiting workers.
     * @param job to be executed by one of the workers.
     */
    public synchronized void addJob(Runnable job) {
        if (workers == null || workers.length < size) {
            init(); // start threads
        }
        jobstack.addLast(job);
        logger.debug("adding job");
        if (idleworkers > 0) {
            logger.debug("notifying a jobless worker");
            notifyAll();
        }
    }


    /**
     * get a job for processing. Blocks while the queue is empty.
     * @return the last queued job for strategy LIFO, otherwise the first
     *         queued job.
     * @throws InterruptedException if the waiting thread is interrupted or
     *             if the shutdown flag is set when it wakes up.
     */
    protected synchronized Runnable getJob() throws InterruptedException {
        while (jobstack.isEmpty()) {
            idleworkers++;
            logger.debug("waiting");
            try {
                wait();
            } finally {
                // also undo the count when wait() is interrupted
                idleworkers--;
            }
            if (shutdown) {
                throw new InterruptedException("shutdown in getJob");
            }
        }
        // is expressed using strategy enumeration
        if (strategy == StrategyEnumeration.LIFO) {
            return jobstack.removeLast(); // LIFO
        }
        return jobstack.removeFirst(); // FIFO
    }


    /**
     * check if there are jobs for processing.
     * @return true, if the queue is not empty or at least one worker is
     *         currently executing a job.
     */
    public boolean hasJobs() {
        if (jobstack.size() > 0) {
            return true;
        }
        PoolThread[] ws = workers;
        for (int i = 0; i < ws.length; i++) {
            if (ws[i] == null) {
                continue;
            }
            if (ws[i].isWorking) {
                return true;
            }
        }
        return false;
    }


    /**
     * check if there are more than n jobs for processing.
     * @param n Integer
     * @return true, if there are possibly more than n jobs.
     */
    public boolean hasJobs(int n) {
        int j = jobstack.size();
        PoolThread[] ws = workers;
        // if j > 0 no worker should be idle, so count all workers as busy
        if (j > 0 && (j + ws.length > n)) {
            return true;
        }
        // otherwise count only the workers which are executing a job
        int x = 0;
        for (int i = 0; i < ws.length; i++) {
            if (ws[i] == null) {
                continue;
            }
            if (ws[i].isWorking) {
                x++;
            }
        }
        return (j + x) > n;
    }

}


/**
 * Implements one Thread of the pool.
 */
class PoolThread extends Thread {


    /**
     * The pool this worker takes its jobs from.
     */
    ThreadPool pool;


    private static final Logger logger = Logger.getLogger(ThreadPool.class);


    private static final boolean debug = logger.isDebugEnabled();


    /**
     * True while this worker is executing a job; read by
     * ThreadPool.hasJobs().
     */
    volatile boolean isWorking = false;


    /**
     * @param pool ThreadPool.
     */
    public PoolThread(ThreadPool pool) {
        this.pool = pool;
    }


    /**
     * Run the thread. Repeatedly fetches a job from the pool and executes
     * it, until getJob() throws an InterruptedException or returns null.
     * A RuntimeException thrown by a job is logged and the worker continues
     * with the next job.
     */
    @Override
    public void run() {
        logger.info("ready");
        Runnable job;
        int done = 0;
        long time = 0;
        long t;
        boolean running = true;
        while (running) {
            try {
                logger.debug("looking for a job");
                job = pool.getJob();
                if (job == null) {
                    break;
                }
                if (debug) {
                    logger.info("working");
                }
                t = System.currentTimeMillis();
                isWorking = true;
                try {
                    job.run();
                } finally {
                    // must also be reset when the job throws, otherwise
                    // ThreadPool.hasJobs() reports work forever
                    isWorking = false;
                }
                time += System.currentTimeMillis() - t;
                done++;
                if (debug) {
                    logger.info("done");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
                isWorking = false;
            } catch (RuntimeException e) {
                logger.warn("catched " + e);
            }
        }
        isWorking = false;
        logger.info("terminated, done " + done + " jobs in " + time + " milliseconds");
    }

}