001/* 002 * $Id: DistThreadPool.java 4593 2013-08-21 17:24:54Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.FileNotFoundException; 009import java.io.IOException; 010import java.util.LinkedList; 011 012import org.apache.log4j.Logger; 013 014 015/** 016 * Distributed thread pool. Using stack / list work-pile and Executable Channels 017 * and Servers. 018 * @author Heinz Kredel 019 */ 020 021public class DistThreadPool /*extends ThreadPool*/{ 022 023 024 /** 025 * machine file to use. 026 */ 027 private final String mfile; 028 029 030 /** 031 * default machine file for test. 032 */ 033 private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE; 034 035 036 /** 037 * Number of threads to use. 038 */ 039 protected final int threads; 040 041 042 /** 043 * Default number of threads to use. 044 */ 045 static final int DEFAULT_SIZE = 3; 046 047 048 /** 049 * Channels to remote executable servers. 050 */ 051 final ExecutableChannels ec; 052 053 054 /** 055 * Array of workers. 056 */ 057 protected DistPoolThread[] workers; 058 059 060 /** 061 * Number of idle workers. 062 */ 063 protected int idleworkers = 0; 064 065 066 /** 067 * Work queue / stack. 068 */ 069 // should be expressed using strategy pattern 070 // List or Collection is not appropriate 071 // LIFO strategy for recursion 072 protected LinkedList<Runnable> jobstack; // FIFO strategy for GB 073 074 075 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO; 076 077 078 private static final Logger logger = Logger.getLogger(DistThreadPool.class); 079 080 081 private final boolean debug = true; //logger.isDebugEnabled(); 082 083 084 /** 085 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO 086 * and size DEFAULT_SIZE. 087 */ 088 public DistThreadPool() { 089 this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null); 090 } 091 092 093 /** 094 * Constructs a new DistThreadPool with size DEFAULT_SIZE. 095 * @param strategy for job processing. 096 */ 097 public DistThreadPool(StrategyEnumeration strategy) { 098 this(strategy, DEFAULT_SIZE, null); 099 } 100 101 102 /** 103 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO. 104 * @param size of the pool. 105 */ 106 public DistThreadPool(int size) { 107 this(StrategyEnumeration.FIFO, size, null); 108 } 109 110 111 /** 112 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO. 113 * @param size of the pool. 114 * @param mfile machine file. 115 */ 116 public DistThreadPool(int size, String mfile) { 117 this(StrategyEnumeration.FIFO, size, mfile); 118 } 119 120 121 /** 122 * Constructs a new DistThreadPool. 123 * @param strategy for job processing. 124 * @param size of the pool. 125 * @param mfile machine file. 126 */ 127 public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) { 128 this.strategy = strategy; 129 if (size < 0) { 130 this.threads = 0; 131 } else { 132 this.threads = size; 133 } 134 if (mfile == null || mfile.length() == 0) { 135 this.mfile = DEFAULT_MFILE; 136 } else { 137 this.mfile = mfile; 138 } 139 jobstack = new LinkedList<Runnable>(); // ok for all strategies ? 140 try { 141 ec = new ExecutableChannels(this.mfile); 142 } catch (FileNotFoundException e) { 143 e.printStackTrace(); 144 throw new IllegalArgumentException("DistThreadPool " + e); 145 } 146 if (debug) { 147 logger.info("ec = " + ec); 148 } 149 try { 150 ec.open(threads); 151 } catch (IOException e) { 152 e.printStackTrace(); 153 throw new IllegalArgumentException("DistThreadPool " + e); 154 } 155 if (debug) { 156 logger.info("ec = " + ec); 157 } 158 workers = new DistPoolThread[0]; 159 } 160 161 162 /** 163 * String representation. 164 */ 165 @Override 166 public String toString() { 167 StringBuffer s = new StringBuffer("DistThreadPool("); 168 s.append("threads="+threads); 169 s.append(", strategy="+strategy); 170 s.append(", exchan="+ec); 171 s.append(", workers="+workers.length); 172 s.append(")"); 173 return s.toString(); 174 } 175 176 177 /** 178 * thread initialization and start. 179 */ 180 public void init() { 181 if (workers == null || workers.length == 0) { 182 workers = new DistPoolThread[threads]; 183 for (int i = 0; i < workers.length; i++) { 184 workers[i] = new DistPoolThread(this, ec, i); 185 workers[i].start(); 186 } 187 logger.info("init: " + this.toString()); 188 } 189 } 190 191 192 /** 193 * number of worker threads. 194 */ 195 public int getNumber() { 196 if (workers == null || workers.length < threads) { 197 init(); // start threads 198 } 199 return workers.length; // not null 200 } 201 202 203 /** 204 * get used strategy. 205 */ 206 public StrategyEnumeration getStrategy() { 207 return strategy; 208 } 209 210 211 /** 212 * the used executable channel. 213 */ 214 public ExecutableChannels getEC() { 215 return ec; // not null 216 } 217 218 219 /** 220 * Terminates the threads. 221 * @param shutDown true, if shut-down of the remote executable servers is 222 * requested, false, if remote executable servers stay alive. 223 */ 224 public void terminate(boolean shutDown) { 225 if (shutDown) { 226 ShutdownRequest sdr = new ShutdownRequest(); 227 for (int i = 0; i < workers.length; i++) { 228 addJob(sdr); 229 } 230 try { 231 Thread.sleep(20); 232 } catch (InterruptedException e) { 233 Thread.currentThread().interrupt(); 234 } 235 logger.info("remaining jobs = " + jobstack.size()); 236 try { 237 for (int i = 0; i < workers.length; i++) { 238 while (workers[i].isAlive()) { 239 workers[i].interrupt(); 240 workers[i].join(100); 241 } 242 } 243 } catch (InterruptedException e) { 244 Thread.currentThread().interrupt(); 245 } 246 } else { 247 terminate(); 248 } 249 } 250 251 252 /** 253 * Terminates the threads. 254 */ 255 public void terminate() { 256 while (hasJobs()) { 257 try { 258 Thread.sleep(100); 259 } catch (InterruptedException e) { 260 Thread.currentThread().interrupt(); 261 } 262 } 263 for (int i = 0; i < workers.length; i++) { 264 try { 265 while (workers[i].isAlive()) { 266 workers[i].interrupt(); 267 workers[i].join(100); 268 } 269 } catch (InterruptedException e) { 270 Thread.currentThread().interrupt(); 271 } 272 } 273 ec.close(); 274 } 275 276 277 /** 278 * adds a job to the workpile. 279 * @param job 280 */ 281 public synchronized void addJob(Runnable job) { 282 if (workers == null || workers.length < threads) { 283 init(); // start threads 284 } 285 jobstack.addLast(job); 286 logger.debug("adding job"); 287 if (idleworkers > 0) { 288 logger.debug("notifying a jobless worker"); 289 notifyAll(); // findbugs 290 } 291 } 292 293 294 /** 295 * get a job for processing. 296 */ 297 protected synchronized Runnable getJob() throws InterruptedException { 298 while (jobstack.isEmpty()) { 299 idleworkers++; 300 logger.debug("waiting"); 301 wait(); 302 idleworkers--; 303 } 304 // is expressed using strategy enumeration 305 if (strategy == StrategyEnumeration.LIFO) { 306 return jobstack.removeLast(); // LIFO 307 } 308 return jobstack.removeFirst(); // FIFO 309 } 310 311 312 /** 313 * check if there are jobs for processing. 314 */ 315 public boolean hasJobs() { 316 if (jobstack.size() > 0) { 317 return true; 318 } 319 for (int i = 0; i < workers.length; i++) { 320 if (workers[i].working) { 321 return true; 322 } 323 } 324 return false; 325 } 326 327 328 /** 329 * check if there are more than n jobs for processing. 330 * @param n Integer 331 * @return true, if there are possibly more than n jobs. 332 */ 333 public boolean hasJobs(int n) { 334 int j = jobstack.size(); 335 if (j > 0 && (j + workers.length > n)) { 336 return true; 337 // if j > 0 no worker should be idle 338 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n ) 339 } 340 int x = 0; 341 for (int i = 0; i < workers.length; i++) { 342 if (workers[i].working) { 343 x++; 344 } 345 } 346 if ((j + x) > n) { 347 return true; 348 } 349 return false; 350 } 351 352} 353 354 355/** 356 * Implements a shutdown task. 357 */ 358class ShutdownRequest implements Runnable { 359 360 361 /** 362 * Run the thread. 363 */ 364 public void run() { 365 System.out.println("running ShutdownRequest"); 366 } 367 368 369 /** 370 * toString. 371 * @see java.lang.Object#toString() 372 */ 373 @Override 374 public String toString() { 375 return "ShutdownRequest"; 376 } 377 378} 379 380 381/** 382 * Implements one local part of the distributed thread. 383 */ 384class DistPoolThread extends Thread { 385 386 387 final DistThreadPool pool; 388 389 390 final ExecutableChannels ec; 391 392 393 final int myId; 394 395 396 private static final Logger logger = Logger.getLogger(DistPoolThread.class); 397 398 399 private final boolean debug = logger.isDebugEnabled(); 400 401 402 boolean working = false; 403 404 405 /** 406 * @param pool DistThreadPool. 407 */ 408 public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) { 409 this.pool = pool; 410 this.ec = ec; 411 myId = i; 412 } 413 414 415 /** 416 * Run the thread. 417 */ 418 @Override 419 public void run() { 420 logger.info("ready, myId = " + myId); 421 Runnable job; 422 int done = 0; 423 long time = 0; 424 long t; 425 boolean running = true; 426 while (running) { 427 try { 428 logger.debug("looking for a job"); 429 job = pool.getJob(); 430 working = true; 431 if (debug) { 432 logger.info("working " + myId + " on " + job); 433 } 434 t = System.currentTimeMillis(); 435 // send and wait, like rmi 436 try { 437 if (job instanceof ShutdownRequest) { 438 ec.send(myId, ExecutableServer.STOP); 439 } else { 440 ec.send(myId, job); 441 } 442 if (debug) { 443 logger.info("send " + myId + " at " + ec + " send job " + job); 444 } 445 } catch (IOException e) { 446 e.printStackTrace(); 447 logger.info("error send " + myId + " at " + ec + " e = " + e); 448 working = false; 449 } 450 // remote: job.run(); 451 Object o = null; 452 try { 453 if (working) { 454 logger.info("waiting " + myId + " on " + job); 455 o = ec.receive(myId); 456 if (debug) { 457 logger.info("receive " + myId + " at " + ec + " send job " + job + " received " + o); 458 } 459 } 460 } catch (IOException e) { 461 logger.info("receive exception " + myId + " send job " + job + ", " + e); 462 //e.printStackTrace(); 463 running = false; 464 } catch (ClassNotFoundException e) { 465 logger.info("receive exception " + myId + " send job " + job + ", " + e); 466 //e.printStackTrace(); 467 running = false; 468 } finally { 469 if (debug) { 470 logger.info("receive finally " + myId + " at " + ec + " send job " + job + " received " 471 + o + " running " + running); 472 } 473 } 474 working = false; 475 time += System.currentTimeMillis() - t; 476 done++; 477 if (debug) { 478 logger.info("done " + myId + " with " + o); 479 } 480 } catch (InterruptedException e) { 481 running = false; 482 Thread.currentThread().interrupt(); 483 } 484 } 485 logger.info("terminated " + myId + " , done " + done + " jobs in " + time + " milliseconds"); 486 } 487 488}