/*
 * $Id: ThreadPool.java 4254 2012-10-14 18:13:32Z kredel $
 */

// package edu.unima.ky.parallel;
package edu.jas.util;


import java.util.LinkedList;

import org.apache.log4j.Logger;

import edu.jas.kern.PreemptingException;


/**
 * Thread pool using stack / list workpile.
 * Jobs are {@link Runnable}s kept in a linked list; worker threads take them
 * from the end (LIFO) or from the front (all other strategies) of the list.
 * Worker threads are started by {@link #init()}, which {@link #addJob(Runnable)}
 * calls when no workers exist yet.
 * @author Akitoshi Yoshida
 * @author Heinz Kredel
 */

public class ThreadPool {


    /**
     * Default number of threads to use.
     */
    static final int DEFAULT_SIZE = 3;


    /**
     * Number of threads to use.
     */
    final int size;


    /**
     * Array of workers. Empty until init() has run; slots may be null while
     * init() is filling the array.
     */
    protected PoolThread[] workers;


    /**
     * Number of idle workers, i.e. workers currently waiting in getJob().
     * Only modified while holding the lock of this pool.
     */
    protected int idleworkers = 0;


    /**
     * Shutdown request. Set by cancel(), checked by workers waking up in
     * getJob().
     */
    protected volatile boolean shutdown = false;


    /**
     * Work queue / stack.
     */
    // should be expressed using strategy pattern
    // List or Collection is not appropriate
    // LIFO strategy for recursion
    protected LinkedList<Runnable> jobstack; // FIFO strategy for GB


    /**
     * Strategy used by getJob() to select the next job.
     */
    protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;


    private static final Logger logger = Logger.getLogger(ThreadPool.class);


    private static final boolean debug = logger.isDebugEnabled();


    /**
     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and
     * size DEFAULT_SIZE.
     */
    public ThreadPool() {
        this(StrategyEnumeration.FIFO, DEFAULT_SIZE);
    }


    /**
     * Constructs a new ThreadPool with size DEFAULT_SIZE.
     * @param strategy for job processing.
     */
    public ThreadPool(StrategyEnumeration strategy) {
        this(strategy, DEFAULT_SIZE);
    }


    /**
     * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO.
     * @param size of the pool.
     */
    public ThreadPool(int size) {
        this(StrategyEnumeration.FIFO, size);
    }


    /**
     * Constructs a new ThreadPool. No threads are started here; see init().
     * @param strategy for job processing.
     * @param size of the pool.
     */
    public ThreadPool(StrategyEnumeration strategy, int size) {
        this.size = size;
        this.strategy = strategy;
        jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
        workers = new PoolThread[0];
    }


    /**
     * thread initialization and start. Creates and starts size worker
     * threads if there are none yet; otherwise does nothing besides the
     * optional debug stack dump. This method is not synchronized itself.
     */
    public void init() {
        if (workers == null || workers.length == 0) {
            workers = new PoolThread[size];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new PoolThread(this);
                workers[i].start();
            }
            logger.info("size = " + size + ", strategy = " + strategy);
        }
        if (debug) {
            Thread.dumpStack();
        }
    }


    /**
     * toString.
     * @return a description with pool size, idle worker count, strategy and
     *         number of queued jobs.
     */
    @Override
    public String toString() {
        return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs="
                        + jobstack.size() + ")";
    }


    /**
     * number of worker threads.
     * @return the configured pool size, independent of whether the threads
     *         have been started already.
     */
    public int getNumber() {
        return size;
    }


    /**
     * get used strategy.
     * @return the strategy for job processing.
     */
    public StrategyEnumeration getStrategy() {
        return strategy;
    }


    /**
     * Terminates the threads. Waits, polling every 100 milliseconds, until
     * hasJobs() reports no more work, then interrupts each worker until it
     * is no longer alive.
     * If the calling thread is interrupted while waiting, waiting continues
     * and the interrupt status is restored before this method returns.
     */
    public void terminate() {
        // Remember an interrupt instead of re-setting the flag right away:
        // with the flag set, every following sleep()/join() would throw
        // immediately and the polling loop would spin without pausing.
        boolean interrupted = false;
        while (hasJobs()) {
            try {
                Thread.sleep(100);
                //logger.info("waiting for termination in " + this);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        PoolThread[] ws = workers;
        for (int i = 0; i < ws.length; i++) {
            PoolThread worker = ws[i];
            if (worker == null) { // same guard as in cancel() and hasJobs()
                continue;
            }
            while (worker.isAlive()) {
                worker.interrupt();
                try {
                    worker.join(100);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }


    /**
     * Cancels the threads. Sets the shutdown flag, discards all queued jobs
     * and then notifies and interrupts each worker until it is no longer
     * alive. If the calling thread is interrupted while joining a worker,
     * its interrupt status is restored and the loop moves on to the next
     * worker.
     * @return number of jobs that were in the queue when cancel was called.
     */
    public int cancel() {
        shutdown = true;
        int s;
        // read the size and clear the queue in one critical section, so the
        // returned count matches the jobs that are actually discarded
        synchronized (this) {
            s = jobstack.size();
            if (hasJobs()) {
                logger.info("jobs canceled: " + jobstack);
                jobstack.clear();
            }
        }
        for (int i = 0; i < workers.length; i++) {
            if (workers[i] == null) {
                continue;
            }
            try {
                while (workers[i].isAlive()) {
                    synchronized (this) {
                        shutdown = true;
                        notifyAll(); // for getJob
                        workers[i].interrupt();
                    }
                    workers[i].join(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return s;
    }


    /**
     * adds a job to the workpile. Starts the worker threads first if that
     * has not happened yet, and wakes up waiting workers.
     * @param job to be executed by one of the workers.
     */
    public synchronized void addJob(Runnable job) {
        if (workers == null || workers.length < size) {
            init(); // start threads
        }
        jobstack.addLast(job);
        logger.debug("adding job");
        if (idleworkers > 0) {
            logger.debug("notifying a jobless worker");
            notifyAll();
        }
    }


    /**
     * get a job for processing. Blocks while the workpile is empty.
     * @return the last added job for strategy LIFO, otherwise the first job
     *         of the list.
     * @throws InterruptedException if the waiting thread is interrupted or
     *             if shutdown is requested when the thread wakes up.
     */
    protected synchronized Runnable getJob() throws InterruptedException {
        while (jobstack.isEmpty()) {
            idleworkers++;
            logger.debug("waiting");
            try {
                wait();
            } finally {
                // also executed when wait() is interrupted, so the idle
                // counter does not drift upwards
                idleworkers--;
            }
            if (shutdown) {
                throw new InterruptedException("shutdown in getJob");
            }
        }
        // is expressed using strategy enumeration
        if (strategy == StrategyEnumeration.LIFO) {
            return jobstack.removeLast(); // LIFO
        }
        return jobstack.removeFirst(); // FIFO
    }


    /**
     * check if there are jobs for processing.
     * @return true, if the workpile is not empty or at least one worker is
     *         marked as working.
     */
    // NOTE(review): a worker that has taken a job but has not yet set
    // isWorking is not seen here, so a false answer can be momentarily stale.
    public boolean hasJobs() {
        if (jobstack.size() > 0) {
            return true;
        }
        for (int i = 0; i < workers.length; i++) {
            if (workers[i] == null) {
                continue;
            }
            if (workers[i].isWorking) {
                return true;
            }
        }
        return false;
    }


    /**
     * check if there are more than n jobs for processing.
     * @param n Integer
     * @return true, if there are possibly more than n jobs.
     */
    public boolean hasJobs(int n) {
        int j = jobstack.size();
        if (j > 0 && (j + workers.length > n)) {
            return true;
        }
        // if j > 0 no worker should be idle
        // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
        int x = 0; // number of workers currently marked as working
        for (int i = 0; i < workers.length; i++) {
            if (workers[i] == null) {
                continue;
            }
            if (workers[i].isWorking) {
                x++;
            }
        }
        if ((j + x) > n) {
            return true;
        }
        return false;
    }

}


/**
 * Implements one Thread of the pool.
 */
class PoolThread extends Thread {


    /**
     * The pool this thread takes its jobs from.
     */
    ThreadPool pool;


    private static final Logger logger = Logger.getLogger(ThreadPool.class);


    private static final boolean debug = logger.isDebugEnabled();


    /**
     * True while this thread is executing a job; read by ThreadPool.hasJobs().
     */
    volatile boolean isWorking = false;


    /**
     * @param pool ThreadPool.
     */
    public PoolThread(ThreadPool pool) {
        this.pool = pool;
    }


    /**
     * Run the thread. Repeatedly takes a job from the pool and runs it,
     * until getJob() throws an InterruptedException or returns null.
     * PreemptingExceptions and other RuntimeExceptions thrown by a job are
     * logged and the thread continues with the next job.
     */
    @Override
    public void run() {
        logger.info("ready");
        Runnable job;
        int done = 0;
        long time = 0;
        long t;
        boolean running = true;
        while (running) {
            try {
                logger.debug("looking for a job");
                job = pool.getJob();
                if (job == null) {
                    break;
                }
                if (debug) {
                    logger.info("working");
                }
                t = System.currentTimeMillis();
                isWorking = true;
                try {
                    job.run();
                } finally {
                    // reset also when the job throws; otherwise hasJobs()
                    // keeps reporting work while this thread is idle again
                    // and terminate() never returns
                    isWorking = false;
                }
                time += System.currentTimeMillis() - t;
                done++;
                if (debug) {
                    logger.info("done");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
                isWorking = false;
            } catch (PreemptingException e) {
                logger.debug("catched " + e);
            } catch (RuntimeException e) {
                // hand the throwable to the logger so the stack trace ends up
                // in the configured log
                logger.warn("catched " + e, e);
            }
        }
        isWorking = false;
        logger.info("terminated, done " + done + " jobs in " + time + " milliseconds");
    }

}