001/* 002 * $Id: ThreadPool.java 5872 2018-07-20 16:01:46Z kredel $ 003 */ 004 005// package edu.unima.ky.parallel; 006package edu.jas.util; 007 008 009import java.util.LinkedList; 010 011import org.apache.logging.log4j.Logger; 012import org.apache.logging.log4j.LogManager; 013 014import edu.jas.kern.PreemptingException; 015 016 017/** 018 * Thread pool using stack / list workpile. 019 * @author Akitoshi Yoshida 020 * @author Heinz Kredel 021 */ 022 023public class ThreadPool { 024 025 026 /** 027 * Default number of threads to use. 028 */ 029 static final int DEFAULT_SIZE = 3; 030 031 032 /** 033 * Number of threads to use. 034 */ 035 final int size; 036 037 038 /** 039 * Array of workers. 040 */ 041 protected PoolThread[] workers; 042 043 044 /** 045 * Number of idle workers. 046 */ 047 protected int idleworkers = 0; 048 049 050 /** 051 * Shutdown request. 052 */ 053 protected volatile boolean shutdown = false; 054 055 056 /** 057 * Work queue / stack. 058 */ 059 // should be expressed using strategy pattern 060 // List or Collection is not appropriate 061 // LIFO strategy for recursion 062 protected LinkedList<Runnable> jobstack; // FIFO strategy for GB 063 064 065 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO; 066 067 068 private static final Logger logger = LogManager.getLogger(ThreadPool.class); 069 070 071 private static final boolean debug = logger.isDebugEnabled(); 072 073 074 /** 075 * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO and 076 * size DEFAULT_SIZE. 077 */ 078 public ThreadPool() { 079 this(StrategyEnumeration.FIFO, DEFAULT_SIZE); 080 } 081 082 083 /** 084 * Constructs a new ThreadPool with size DEFAULT_SIZE. 085 * @param strategy for job processing. 086 */ 087 public ThreadPool(StrategyEnumeration strategy) { 088 this(strategy, DEFAULT_SIZE); 089 } 090 091 092 /** 093 * Constructs a new ThreadPool with strategy StrategyEnumeration.FIFO. 094 * @param size of the pool. 095 */ 096 public ThreadPool(int size) { 097 this(StrategyEnumeration.FIFO, size); 098 } 099 100 101 /** 102 * Constructs a new ThreadPool. 103 * @param strategy for job processing. 104 * @param size of the pool. 105 */ 106 public ThreadPool(StrategyEnumeration strategy, int size) { 107 this.size = size; 108 this.strategy = strategy; 109 jobstack = new LinkedList<Runnable>(); // ok for all strategies ? 110 workers = new PoolThread[0]; 111 } 112 113 114 /** 115 * thread initialization and start. 116 */ 117 public void init() { 118 if (workers == null || workers.length == 0) { 119 workers = new PoolThread[size]; 120 for (int i = 0; i < workers.length; i++) { 121 workers[i] = new PoolThread(this); 122 workers[i].start(); 123 } 124 logger.info("size = " + size + ", strategy = " + strategy); 125 } 126 if (debug) { 127 Thread.dumpStack(); 128 } 129 } 130 131 132 /** 133 * toString. 134 */ 135 @Override 136 public String toString() { 137 return "ThreadPool( size=" + getNumber() + ", idle=" + idleworkers + ", " + getStrategy() + ", jobs=" 138 + jobstack.size() + ")"; 139 } 140 141 142 /** 143 * number of worker threads. 144 */ 145 public int getNumber() { 146 return size; 147 //if (workers == null || workers.length < size) { 148 // init(); // start threads 149 //} 150 //return workers.length; // not null 151 } 152 153 154 /** 155 * get used strategy. 156 */ 157 public StrategyEnumeration getStrategy() { 158 return strategy; 159 } 160 161 162 /** 163 * Terminates the threads. 164 */ 165 public void terminate() { 166 while (hasJobs()) { 167 try { 168 Thread.sleep(100); 169 //logger.info("waiting for termination in " + this); 170 } catch (InterruptedException e) { 171 Thread.currentThread().interrupt(); 172 } 173 } 174 if (workers == null) { 175 return; 176 } 177 for (int i = 0; i < workers.length; i++) { 178 if (workers[i] == null) { 179 continue; 180 } 181 try { 182 while (workers[i].isAlive()) { 183 workers[i].interrupt(); 184 workers[i].join(100); 185 } 186 } catch (InterruptedException e) { 187 Thread.currentThread().interrupt(); 188 } 189 } 190 } 191 192 193 /** 194 * Cancels the threads. 195 */ 196 public int cancel() { 197 shutdown = true; 198 int s = jobstack.size(); 199 if (hasJobs()) { 200 synchronized (this) { 201 logger.info("jobs canceled: " + jobstack); 202 jobstack.clear(); 203 notifyAll(); // for getJob 204 } 205 } 206 //int re = 0; 207 if (workers == null) { 208 return s; 209 } 210 for (int i = 0; i < workers.length; i++) { 211 if (workers[i] == null) { 212 continue; 213 } 214 try { 215 while (workers[i].isAlive()) { 216 synchronized (this) { 217 shutdown = true; 218 notifyAll(); // for getJob 219 workers[i].interrupt(); 220 } 221 //re++; 222 //if ( re > 3 * workers.length ) { 223 // logger.info("give up on: " + workers[i]); 224 // break; // give up 225 //} 226 workers[i].join(100); 227 } 228 } catch (InterruptedException e) { 229 Thread.currentThread().interrupt(); 230 } 231 } 232 return s; 233 } 234 235 236 /** 237 * adds a job to the workpile. 238 * @param job 239 */ 240 public synchronized void addJob(Runnable job) { 241 if (workers == null || workers.length < size) { 242 init(); // start threads 243 } 244 jobstack.addLast(job); 245 logger.debug("adding job"); 246 if (idleworkers > 0) { 247 logger.debug("notifying a jobless worker"); 248 notifyAll(); 249 } 250 } 251 252 253 /** 254 * get a job for processing. 255 */ 256 protected synchronized Runnable getJob() throws InterruptedException { 257 while (jobstack.isEmpty()) { 258 idleworkers++; 259 logger.debug("waiting"); 260 wait(1000); 261 idleworkers--; 262 if (shutdown) { 263 throw new InterruptedException("shutdown in getJob"); 264 } 265 } 266 // is expressed using strategy enumeration 267 if (strategy == StrategyEnumeration.LIFO) { 268 return jobstack.removeLast(); // LIFO 269 } 270 return jobstack.removeFirst(); // FIFO 271 } 272 273 274 /** 275 * check if there are jobs for processing. 276 */ 277 public boolean hasJobs() { 278 if (jobstack.size() > 0) { 279 return true; 280 } 281 for (int i = 0; i < workers.length; i++) { 282 if (workers[i] == null) { 283 continue; 284 } 285 if (workers[i].isWorking) { 286 return true; 287 } 288 } 289 return false; 290 } 291 292 293 /** 294 * check if there are more than n jobs for processing. 295 * @param n Integer 296 * @return true, if there are possibly more than n jobs. 297 */ 298 public boolean hasJobs(int n) { 299 int j = jobstack.size(); 300 if (j > 0 && (j + workers.length > n)) { 301 return true; 302 } 303 // if j > 0 no worker should be idle 304 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n ) 305 int x = 0; 306 for (int i = 0; i < workers.length; i++) { 307 if (workers[i] == null) { 308 continue; 309 } 310 if (workers[i].isWorking) { 311 x++; 312 } 313 } 314 if ((j + x) > n) { 315 return true; 316 } 317 return false; 318 } 319 320} 321 322 323/** 324 * Implements one Thread of the pool. 325 */ 326class PoolThread extends Thread { 327 328 329 ThreadPool pool; 330 331 332 private static final Logger logger = LogManager.getLogger(PoolThread.class); 333 334 335 private static final boolean debug = logger.isDebugEnabled(); 336 337 338 volatile boolean isWorking = false; 339 340 341 /** 342 * @param pool ThreadPool. 343 */ 344 public PoolThread(ThreadPool pool) { 345 this.pool = pool; 346 } 347 348 349 /** 350 * Run the thread. 351 */ 352 @Override 353 public void run() { 354 logger.info("ready"); 355 Runnable job; 356 int done = 0; 357 long time = 0; 358 long t; 359 boolean running = true; 360 while (running) { 361 try { 362 logger.debug("looking for a job"); 363 job = pool.getJob(); 364 if (job == null) { 365 break; 366 } 367 if (debug) { 368 logger.info("working"); 369 } 370 t = System.currentTimeMillis(); 371 isWorking = true; 372 job.run(); 373 isWorking = false; 374 time += System.currentTimeMillis() - t; 375 done++; 376 if (debug) { 377 logger.info("done"); 378 } 379 if (Thread.currentThread().isInterrupted()) { 380 running = false; 381 isWorking = false; 382 //throw new RuntimeException("interrupt in while(running) loop"); 383 } 384 } catch (InterruptedException e) { 385 Thread.currentThread().interrupt(); 386 running = false; 387 isWorking = false; 388 } catch (PreemptingException e) { 389 logger.debug("catched " + e); 390 //e.printStackTrace(); 391 } catch (RuntimeException e) { 392 logger.warn("catched " + e); 393 e.printStackTrace(); 394 } 395 } 396 isWorking = false; 397 logger.info("terminated, done " + done + " jobs in " + time + " milliseconds"); 398 } 399 400}