/*
 * $Id: ThreadPool.java 4962 2014-10-17 19:05:55Z kredel $
 */

// package edu.unima.ky.parallel;
package edu.jas.util;


import java.util.LinkedList;

import org.apache.log4j.Logger;

import edu.jas.kern.PreemptingException;


/**
 * Thread pool using stack / list workpile.
 * <p>
 * Jobs are queued in {@link #jobstack} and handed to {@link PoolThread}
 * workers either in LIFO or FIFO order, depending on the configured
 * {@link StrategyEnumeration}. The queue and the idle-worker counter are
 * guarded by the monitor of the pool instance.
 * <p>
 * Worker threads are started lazily by {@link #init()} or by the first call
 * to {@link #addJob(Runnable)}. Jobs added after {@link #cancel()} or
 * {@link #terminate()} are queued but no worker is restarted to run them.
 * @author Akitoshi Yoshida
 * @author Heinz Kredel
 */

public class ThreadPool {


    /**
     * Default number of threads to use.
     */
    static final int DEFAULT_SIZE = 3;


    /**
     * Number of threads to use.
     */
    final int size;


    /**
     * Array of workers. Empty until {@link #init()} has run.
     */
    protected PoolThread[] workers;


    /**
     * Number of idle workers, i.e. workers currently waiting in
     * {@link #getJob()}. Modified only while holding the pool monitor.
     */
    protected int idleworkers = 0;


    /**
     * Shutdown request. Set by {@link #cancel()}.
     */
    protected volatile boolean shutdown = false;


    /**
     * Work queue / stack.
     */
    // should be expressed using strategy pattern
    // List or Collection is not appropriate
    // LIFO strategy for recursion
    protected LinkedList<Runnable> jobstack; // FIFO strategy for GB


    /**
     * Order in which queued jobs are handed to workers.
     */
    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;


    private static final Logger logger = Logger.getLogger(ThreadPool.class);


    private static final boolean debug = logger.isDebugEnabled();


    /**
     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and
     * size DEFAULT_SIZE.
     */
    public ThreadPool() {
        this(StrategyEnumeration.FIFO, DEFAULT_SIZE);
    }


    /**
     * Constructs a new ThreadPool with size DEFAULT_SIZE.
     * @param strategy for job processing.
     */
    public ThreadPool(StrategyEnumeration strategy) {
        this(strategy, DEFAULT_SIZE);
    }


    /**
     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO.
     * @param size of the pool.
     */
    public ThreadPool(int size) {
        this(StrategyEnumeration.FIFO, size);
    }


    /**
     * Constructs a new ThreadPool. No threads are started here; see
     * {@link #init()}.
     * @param strategy for job processing.
     * @param size of the pool.
     */
    public ThreadPool(StrategyEnumeration strategy, int size) {
        this.size = size;
        this.strategy = strategy;
        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
        workers = new PoolThread[0];
    }


    /**
     * thread initialization and start. Does nothing if the workers have
     * already been created. Synchronized so that concurrent callers cannot
     * each start their own set of workers.
     */
    public synchronized void init() {
        if (workers == null || workers.length == 0) {
            workers = new PoolThread[size];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new PoolThread(this);
                workers[i].start();
            }
            logger.info("size = " + size + ", strategy = " + strategy);
        }
        if (debug) {
            Thread.dumpStack();
        }
    }


    /**
     * toString. The reported idle and job counts are read without locking
     * (so that rendering inside a logging call cannot block on the pool
     * monitor) and are therefore only a snapshot.
     */
    @Override
    public String toString() {
        return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs="
                        + jobstack.size() + ")";
    }


    /**
     * number of worker threads.
     * @return the configured pool size, whether or not the threads have been
     *         started.
     */
    public int getNumber() {
        return size;
    }


    /**
     * get used strategy.
     * @return the strategy used for job processing.
     */
    public StrategyEnumeration getStrategy() {
        return strategy;
    }


    /**
     * Terminates the threads. Waits until no job is queued or running, then
     * interrupts every worker until it has died.
     * <p>
     * If the calling thread is interrupted while waiting, the interrupt is
     * remembered and restored before returning; waiting continues so that
     * the pool is still drained.
     */
    public void terminate() {
        boolean interrupted = false;
        while (hasJobs()) {
            try {
                Thread.sleep(100);
                //logger.info("waiting for termination in " + this);
            } catch (InterruptedException e) {
                // Do not re-set the flag here: sleep() would then throw
                // immediately on every iteration and the loop would spin.
                interrupted = true;
            }
        }
        for (PoolThread worker : workers) {
            if (worker == null) {
                continue;
            }
            try {
                while (worker.isAlive()) {
                    worker.interrupt();
                    worker.join(100);
                }
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }


    /**
     * Cancels the threads. Requests shutdown, discards all queued jobs and
     * interrupts every worker until it has died.
     * @return number of jobs that were queued when cancel was called.
     */
    public int cancel() {
        shutdown = true;
        int s;
        synchronized (this) {
            // read and clear the queue under the same monitor that guards
            // its modification in addJob / getJob
            s = jobstack.size();
            if (hasJobs()) {
                logger.info("jobs canceled: " + jobstack);
                jobstack.clear();
                notifyAll(); // for getJob
            }
        }
        for (int i = 0; i < workers.length; i++) {
            if (workers[i] == null) {
                continue;
            }
            try {
                while (workers[i].isAlive()) {
                    synchronized (this) {
                        shutdown = true;
                        notifyAll(); // for getJob
                        workers[i].interrupt();
                    }
                    workers[i].join(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return s;
    }


    /**
     * adds a job to the workpile. Starts the worker threads first if they
     * are not yet running.
     * @param job the job to be run by one of the workers.
     */
    public synchronized void addJob(Runnable job) {
        if (workers == null || workers.length < size) {
            init(); // start threads
        }
        jobstack.addLast(job);
        logger.debug("adding job");
        if (idleworkers > 0) {
            logger.debug("notifying a jobless worker");
            notifyAll();
        }
    }


    /**
     * get a job for processing. Blocks while the queue is empty.
     * @return the next job according to the strategy.
     * @throws InterruptedException if the waiting thread is interrupted or a
     *             shutdown was requested while waiting.
     */
    protected synchronized Runnable getJob() throws InterruptedException {
        while (jobstack.isEmpty()) {
            idleworkers++;
            try {
                logger.debug("waiting");
                wait(1000);
            } finally {
                // also executed when wait() is interrupted, so the counter
                // does not leak
                idleworkers--;
            }
            if (shutdown) {
                throw new InterruptedException("shutdown in getJob");
            }
        }
        // is expressed using strategy enumeration
        if (strategy == StrategyEnumeration.LIFO) {
            return jobstack.removeLast(); // LIFO
        }
        return jobstack.removeFirst(); // FIFO
    }


    /**
     * check if there are jobs for processing.
     * @return true, if a job is queued or some worker is currently running
     *         a job.
     */
    public boolean hasJobs() {
        synchronized (this) {
            if (!jobstack.isEmpty()) {
                return true;
            }
        }
        for (int i = 0; i < workers.length; i++) {
            if (workers[i] == null) {
                continue;
            }
            if (workers[i].isWorking) {
                return true;
            }
        }
        return false;
    }


    /**
     * check if there are more than n jobs for processing.
     * @param n Integer
     * @return true, if there are possibly more than n jobs.
     */
    public boolean hasJobs(int n) {
        int j;
        synchronized (this) {
            j = jobstack.size();
        }
        if (j > 0 && (j + workers.length > n)) {
            return true;
        }
        // if j > 0 no worker should be idle
        // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
        int x = 0;
        for (int i = 0; i < workers.length; i++) {
            if (workers[i] == null) {
                continue;
            }
            if (workers[i].isWorking) {
                x++;
            }
        }
        return (j + x) > n;
    }

}


/**
 * Implements one Thread of the pool.
 */
class PoolThread extends Thread {


    ThreadPool pool;


    private static final Logger logger = Logger.getLogger(PoolThread.class);


    private static final boolean debug = logger.isDebugEnabled();


    /**
     * True while this thread is executing a job; read by the pool from
     * other threads.
     */
    volatile boolean isWorking = false;


    /**
     * @param pool ThreadPool.
     */
    public PoolThread(ThreadPool pool) {
        this.pool = pool;
    }


    /**
     * Run the thread. Repeatedly fetches a job from the pool and executes
     * it until the thread is interrupted or the pool signals shutdown.
     * Exceptions thrown by a job are logged and do not end the thread.
     */
    @Override
    public void run() {
        logger.info("ready");
        Runnable job;
        int done = 0;
        long time = 0;
        long t;
        boolean running = true;
        while (running) {
            try {
                logger.debug("looking for a job");
                job = pool.getJob();
                if (job == null) {
                    break;
                }
                if (debug) {
                    logger.info("working");
                }
                t = System.currentTimeMillis();
                isWorking = true;
                try {
                    job.run();
                } finally {
                    // reset even if the job throws; otherwise the pool would
                    // keep reporting this idle worker as busy
                    isWorking = false;
                }
                time += System.currentTimeMillis() - t;
                done++;
                if (debug) {
                    logger.info("done");
                }
                if (Thread.currentThread().isInterrupted()) {
                    running = false;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            } catch (PreemptingException e) {
                logger.debug("catched " + e);
            } catch (RuntimeException e) {
                logger.warn("catched " + e, e);
            }
        }
        isWorking = false;
        logger.info("terminated, done " + done + " jobs in " + time + " milliseconds");
    }

}