001/* 002 * $Id: DistThreadPool.java 4305 2012-12-01 10:46:45Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.FileNotFoundException; 009import java.io.IOException; 010import java.util.LinkedList; 011 012import org.apache.log4j.Logger; 013 014 015/** 016 * Distributed thread pool. Using stack / list work-pile and Executable Channels 017 * and Servers. 018 * @author Heinz Kredel 019 */ 020 021public class DistThreadPool /*extends ThreadPool*/{ 022 023 024 /** 025 * machine file to use. 026 */ 027 private final String mfile; 028 029 030 /** 031 * default machine file for test. 032 */ 033 private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE; 034 035 036 /** 037 * Number of threads to use. 038 */ 039 protected final int threads; 040 041 042 /** 043 * Default number of threads to use. 044 */ 045 static final int DEFAULT_SIZE = 3; 046 047 048 /** 049 * Channels to remote executable servers. 050 */ 051 final ExecutableChannels ec; 052 053 054 /** 055 * Array of workers. 056 */ 057 protected DistPoolThread[] workers; 058 059 060 /** 061 * Number of idle workers. 062 */ 063 protected int idleworkers = 0; 064 065 066 /** 067 * Work queue / stack. 068 */ 069 // should be expressed using strategy pattern 070 // List or Collection is not appropriate 071 // LIFO strategy for recursion 072 protected LinkedList<Runnable> jobstack; // FIFO strategy for GB 073 074 075 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO; 076 077 078 private static final Logger logger = Logger.getLogger(DistThreadPool.class); 079 080 081 private final boolean debug = true; //logger.isDebugEnabled(); 082 083 084 /** 085 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO 086 * and size DEFAULT_SIZE. 087 */ 088 public DistThreadPool() { 089 this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null); 090 } 091 092 093 /** 094 * Constructs a new DistThreadPool with size DEFAULT_SIZE. 095 * @param strategy for job processing. 096 */ 097 public DistThreadPool(StrategyEnumeration strategy) { 098 this(strategy, DEFAULT_SIZE, null); 099 } 100 101 102 /** 103 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO. 104 * @param size of the pool. 105 */ 106 public DistThreadPool(int size) { 107 this(StrategyEnumeration.FIFO, size, null); 108 } 109 110 111 /** 112 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO. 113 * @param size of the pool. 114 * @param mfile machine file. 115 */ 116 public DistThreadPool(int size, String mfile) { 117 this(StrategyEnumeration.FIFO, size, mfile); 118 } 119 120 121 /** 122 * Constructs a new DistThreadPool. 123 * @param strategy for job processing. 124 * @param size of the pool. 125 * @param mfile machine file. 126 */ 127 public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) { 128 this.strategy = strategy; 129 if (size < 0) { 130 this.threads = 0; 131 } else { 132 this.threads = size; 133 } 134 if (mfile == null || mfile.length() == 0) { 135 this.mfile = DEFAULT_MFILE; 136 } else { 137 this.mfile = mfile; 138 } 139 jobstack = new LinkedList<Runnable>(); // ok for all strategies ? 140 try { 141 ec = new ExecutableChannels(this.mfile); 142 } catch (FileNotFoundException e) { 143 e.printStackTrace(); 144 throw new IllegalArgumentException("DistThreadPool " + e); 145 } 146 if (debug) { 147 logger.info("ec = " + ec); 148 } 149 try { 150 ec.open(threads); 151 } catch (IOException e) { 152 e.printStackTrace(); 153 throw new IllegalArgumentException("DistThreadPool " + e); 154 } 155 if (debug) { 156 logger.info("ec = " + ec); 157 } 158 workers = new DistPoolThread[0]; 159 } 160 161 162 /** 163 * String representation. 164 */ 165 @Override 166 public String toString() { 167 StringBuffer s = new StringBuffer("DistThreadPool("); 168 s.append("threads="+threads); 169 s.append(", exchan="+ec); 170 s.append(", workers="+workers.length); 171 s.append(")"); 172 return s.toString(); 173 } 174 175 176 /** 177 * thread initialization and start. 178 */ 179 public void init() { 180 if (workers == null || workers.length == 0) { 181 workers = new DistPoolThread[threads]; 182 for (int i = 0; i < workers.length; i++) { 183 workers[i] = new DistPoolThread(this, ec, i); 184 workers[i].start(); 185 } 186 logger.info("size = " + threads + ", strategy = " + strategy); 187 } 188 } 189 190 191 /** 192 * number of worker threads. 193 */ 194 public int getNumber() { 195 if (workers == null || workers.length < threads) { 196 init(); // start threads 197 } 198 return workers.length; // not null 199 } 200 201 202 /** 203 * get used strategy. 204 */ 205 public StrategyEnumeration getStrategy() { 206 return strategy; 207 } 208 209 210 /** 211 * the used executable channel. 212 */ 213 public ExecutableChannels getEC() { 214 return ec; // not null 215 } 216 217 218 /** 219 * Terminates the threads. 220 * @param shutDown true, if shut-down of the remote executable servers is 221 * requested, false, if remote executable servers stay alive. 222 */ 223 public void terminate(boolean shutDown) { 224 if (shutDown) { 225 ShutdownRequest sdr = new ShutdownRequest(); 226 for (int i = 0; i < workers.length; i++) { 227 addJob(sdr); 228 } 229 try { 230 Thread.sleep(20); 231 } catch (InterruptedException e) { 232 Thread.currentThread().interrupt(); 233 } 234 logger.info("remaining jobs = " + jobstack.size()); 235 try { 236 for (int i = 0; i < workers.length; i++) { 237 while (workers[i].isAlive()) { 238 workers[i].interrupt(); 239 workers[i].join(100); 240 } 241 } 242 } catch (InterruptedException e) { 243 Thread.currentThread().interrupt(); 244 } 245 } else { 246 terminate(); 247 } 248 } 249 250 251 /** 252 * Terminates the threads. 253 */ 254 public void terminate() { 255 while (hasJobs()) { 256 try { 257 Thread.sleep(100); 258 } catch (InterruptedException e) { 259 Thread.currentThread().interrupt(); 260 } 261 } 262 for (int i = 0; i < workers.length; i++) { 263 try { 264 while (workers[i].isAlive()) { 265 workers[i].interrupt(); 266 workers[i].join(100); 267 } 268 } catch (InterruptedException e) { 269 Thread.currentThread().interrupt(); 270 } 271 } 272 ec.close(); 273 } 274 275 276 /** 277 * adds a job to the workpile. 278 * @param job 279 */ 280 public synchronized void addJob(Runnable job) { 281 if (workers == null || workers.length < threads) { 282 init(); // start threads 283 } 284 jobstack.addLast(job); 285 logger.debug("adding job"); 286 if (idleworkers > 0) { 287 logger.debug("notifying a jobless worker"); 288 notifyAll(); // findbugs 289 } 290 } 291 292 293 /** 294 * get a job for processing. 295 */ 296 protected synchronized Runnable getJob() throws InterruptedException { 297 while (jobstack.isEmpty()) { 298 idleworkers++; 299 logger.debug("waiting"); 300 wait(); 301 idleworkers--; 302 } 303 // is expressed using strategy enumeration 304 if (strategy == StrategyEnumeration.LIFO) { 305 return jobstack.removeLast(); // LIFO 306 } 307 return jobstack.removeFirst(); // FIFO 308 } 309 310 311 /** 312 * check if there are jobs for processing. 313 */ 314 public boolean hasJobs() { 315 if (jobstack.size() > 0) { 316 return true; 317 } 318 for (int i = 0; i < workers.length; i++) { 319 if (workers[i].working) { 320 return true; 321 } 322 } 323 return false; 324 } 325 326 327 /** 328 * check if there are more than n jobs for processing. 329 * @param n Integer 330 * @return true, if there are possibly more than n jobs. 331 */ 332 public boolean hasJobs(int n) { 333 int j = jobstack.size(); 334 if (j > 0 && (j + workers.length > n)) { 335 return true; 336 // if j > 0 no worker should be idle 337 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n ) 338 } 339 int x = 0; 340 for (int i = 0; i < workers.length; i++) { 341 if (workers[i].working) { 342 x++; 343 } 344 } 345 if ((j + x) > n) { 346 return true; 347 } 348 return false; 349 } 350 351} 352 353 354/** 355 * Implements a shutdown task. 356 */ 357class ShutdownRequest implements Runnable { 358 359 360 /** 361 * Run the thread. 362 */ 363 public void run() { 364 System.out.println("running ShutdownRequest"); 365 } 366 367 368 /** 369 * toString. 370 * @see java.lang.Object#toString() 371 */ 372 @Override 373 public String toString() { 374 return "ShutdownRequest"; 375 } 376 377} 378 379 380/** 381 * Implements one local part of the distributed thread. 382 */ 383class DistPoolThread extends Thread { 384 385 386 final DistThreadPool pool; 387 388 389 final ExecutableChannels ec; 390 391 392 final int myId; 393 394 395 private static final Logger logger = Logger.getLogger(DistPoolThread.class); 396 397 398 private final boolean debug = logger.isDebugEnabled(); 399 400 401 boolean working = false; 402 403 404 /** 405 * @param pool DistThreadPool. 406 */ 407 public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) { 408 this.pool = pool; 409 this.ec = ec; 410 myId = i; 411 } 412 413 414 /** 415 * Run the thread. 416 */ 417 @Override 418 public void run() { 419 logger.info("ready, myId = " + myId); 420 Runnable job; 421 int done = 0; 422 long time = 0; 423 long t; 424 boolean running = true; 425 while (running) { 426 try { 427 logger.debug("looking for a job"); 428 job = pool.getJob(); 429 working = true; 430 if (debug) { 431 logger.info("working " + myId + " on " + job); 432 } 433 t = System.currentTimeMillis(); 434 // send and wait, like rmi 435 try { 436 if (job instanceof ShutdownRequest) { 437 ec.send(myId, ExecutableServer.STOP); 438 } else { 439 ec.send(myId, job); 440 } 441 if (debug) { 442 logger.info("send " + myId + " at " + ec + " send job " + job); 443 } 444 } catch (IOException e) { 445 e.printStackTrace(); 446 logger.info("error send " + myId + " at " + ec + " e = " + e); 447 working = false; 448 } 449 // remote: job.run(); 450 Object o = null; 451 try { 452 if (working) { 453 logger.info("waiting " + myId + " on " + job); 454 o = ec.receive(myId); 455 if (debug) { 456 logger.info("receive " + myId + " at " + ec + " send job " + job + " received " + o); 457 } 458 } 459 } catch (IOException e) { 460 logger.info("receive exception " + myId + " send job " + job + ", " + e); 461 //e.printStackTrace(); 462 running = false; 463 } catch (ClassNotFoundException e) { 464 logger.info("receive exception " + myId + " send job " + job + ", " + e); 465 //e.printStackTrace(); 466 running = false; 467 } finally { 468 if (debug) { 469 logger.info("receive finally " + myId + " at " + ec + " send job " + job + " received " 470 + o + " running " + running); 471 } 472 } 473 working = false; 474 time += System.currentTimeMillis() - t; 475 done++; 476 if (debug) { 477 logger.info("done " + myId + " with " + o); 478 } 479 } catch (InterruptedException e) { 480 running = false; 481 Thread.currentThread().interrupt(); 482 } 483 } 484 logger.info("terminated " + myId + " , done " + done + " jobs in " + time + " milliseconds"); 485 } 486 487}