001/* 002 * $Id: DistThreadPool.java 5872 2018-07-20 16:01:46Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.FileNotFoundException; 009import java.io.IOException; 010import java.util.LinkedList; 011 012import org.apache.logging.log4j.Logger; 013import org.apache.logging.log4j.LogManager; 014 015 016/** 017 * Distributed thread pool. Using stack / list work-pile and Executable Channels 018 * and Servers. 019 * @author Heinz Kredel 020 */ 021 022public class DistThreadPool /*extends ThreadPool*/{ 023 024 025 /** 026 * machine file to use. 027 */ 028 private final String mfile; 029 030 031 /** 032 * default machine file for test. 033 */ 034 private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE; 035 036 037 /** 038 * Number of threads to use. 039 */ 040 protected final int threads; 041 042 043 /** 044 * Default number of threads to use. 045 */ 046 static final int DEFAULT_SIZE = 3; 047 048 049 /** 050 * Channels to remote executable servers. 051 */ 052 final ExecutableChannels ec; 053 054 055 /** 056 * Array of workers. 057 */ 058 protected DistPoolThread[] workers; 059 060 061 /** 062 * Number of idle workers. 063 */ 064 protected int idleworkers = 0; 065 066 067 /** 068 * Work queue / stack. 069 */ 070 // should be expressed using strategy pattern 071 // List or Collection is not appropriate 072 // LIFO strategy for recursion 073 protected LinkedList<Runnable> jobstack; // FIFO strategy for GB 074 075 076 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO; 077 078 079 private static final Logger logger = LogManager.getLogger(DistThreadPool.class); 080 081 082 private static final boolean debug = true; //logger.isDebugEnabled(); 083 084 085 /** 086 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO 087 * and size DEFAULT_SIZE. 088 */ 089 public DistThreadPool() { 090 this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null); 091 } 092 093 094 /** 095 * Constructs a new DistThreadPool with size DEFAULT_SIZE. 096 * @param strategy for job processing. 097 */ 098 public DistThreadPool(StrategyEnumeration strategy) { 099 this(strategy, DEFAULT_SIZE, null); 100 } 101 102 103 /** 104 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO. 105 * @param size of the pool. 106 */ 107 public DistThreadPool(int size) { 108 this(StrategyEnumeration.FIFO, size, null); 109 } 110 111 112 /** 113 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO. 114 * @param size of the pool. 115 * @param mfile machine file. 116 */ 117 public DistThreadPool(int size, String mfile) { 118 this(StrategyEnumeration.FIFO, size, mfile); 119 } 120 121 122 /** 123 * Constructs a new DistThreadPool. 124 * @param strategy for job processing. 125 * @param size of the pool. 126 * @param mfile machine file. 127 */ 128 public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) { 129 this.strategy = strategy; 130 if (size < 0) { 131 this.threads = 0; 132 } else { 133 this.threads = size; 134 } 135 if (mfile == null || mfile.length() == 0) { 136 this.mfile = DEFAULT_MFILE; 137 } else { 138 this.mfile = mfile; 139 } 140 jobstack = new LinkedList<Runnable>(); // ok for all strategies ? 141 try { 142 ec = new ExecutableChannels(this.mfile); 143 } catch (FileNotFoundException e) { 144 e.printStackTrace(); 145 throw new IllegalArgumentException("DistThreadPool " + e); 146 } 147 if (debug) { 148 logger.info("ec = " + ec); 149 } 150 try { 151 ec.open(threads); 152 } catch (IOException e) { 153 e.printStackTrace(); 154 throw new IllegalArgumentException("DistThreadPool " + e); 155 } 156 if (debug) { 157 logger.info("ec = " + ec); 158 } 159 workers = new DistPoolThread[0]; 160 } 161 162 163 /** 164 * String representation. 165 */ 166 @Override 167 public String toString() { 168 StringBuffer s = new StringBuffer("DistThreadPool("); 169 s.append("threads="+threads); 170 s.append(", strategy="+strategy); 171 s.append(", exchan="+ec); 172 s.append(", workers="+workers.length); 173 s.append(")"); 174 return s.toString(); 175 } 176 177 178 /** 179 * thread initialization and start. 180 */ 181 public void init() { 182 if (workers == null || workers.length == 0) { 183 workers = new DistPoolThread[threads]; 184 for (int i = 0; i < workers.length; i++) { 185 workers[i] = new DistPoolThread(this, ec, i); 186 workers[i].start(); 187 } 188 logger.info("init: " + this.toString()); 189 } 190 } 191 192 193 /** 194 * number of worker threads. 195 */ 196 public int getNumber() { 197 if (workers == null || workers.length < threads) { 198 init(); // start threads 199 } 200 return workers.length; // not null 201 } 202 203 204 /** 205 * get used strategy. 206 */ 207 public StrategyEnumeration getStrategy() { 208 return strategy; 209 } 210 211 212 /** 213 * the used executable channel. 214 */ 215 public ExecutableChannels getEC() { 216 return ec; // not null 217 } 218 219 220 /** 221 * Terminates the threads. 222 * @param shutDown true, if shut-down of the remote executable servers is 223 * requested, false, if remote executable servers stay alive. 224 */ 225 public void terminate(boolean shutDown) { 226 if (shutDown) { 227 logger.info("shutdown = " + this); 228 ShutdownRequest sdr = new ShutdownRequest(); 229 for (int i = 0; i < workers.length; i++) { 230 addJob(sdr); 231 } 232 try { 233 Thread.sleep(20); 234 } catch (InterruptedException e) { 235 Thread.currentThread().interrupt(); 236 } 237 logger.info("remaining jobs = " + jobstack.size()); 238 try { 239 for (int i = 0; i < workers.length; i++) { 240 while (workers[i].isAlive()) { 241 workers[i].interrupt(); 242 workers[i].join(100); 243 } 244 } 245 } catch (InterruptedException e) { 246 Thread.currentThread().interrupt(); 247 } 248 ec.close(); 249 } else { 250 terminate(); 251 } 252 logger.info("terminated = " + this); 253 } 254 255 256 /** 257 * Terminates the threads. 258 */ 259 public void terminate() { 260 logger.info("terminate = " + this); 261 while (hasJobs()) { 262 try { 263 Thread.sleep(100); 264 } catch (InterruptedException e) { 265 Thread.currentThread().interrupt(); 266 } 267 } 268 for (int i = 0; i < workers.length; i++) { 269 try { 270 while (workers[i].isAlive()) { 271 workers[i].interrupt(); 272 workers[i].join(100); 273 } 274 } catch (InterruptedException e) { 275 Thread.currentThread().interrupt(); 276 } 277 } 278 ec.close(); 279 logger.info("terminated = " + this); 280 } 281 282 283 /** 284 * adds a job to the workpile. 285 * @param job 286 */ 287 public synchronized void addJob(Runnable job) { 288 if (workers == null || workers.length < threads) { 289 init(); // start threads 290 } 291 jobstack.addLast(job); 292 logger.debug("adding job"); 293 if (idleworkers > 0) { 294 logger.debug("notifying a jobless worker"); 295 notifyAll(); // findbugs 296 } 297 } 298 299 300 /** 301 * get a job for processing. 302 */ 303 protected synchronized Runnable getJob() throws InterruptedException { 304 while (jobstack.isEmpty()) { 305 idleworkers++; 306 logger.debug("waiting"); 307 wait(); 308 idleworkers--; 309 } 310 // is expressed using strategy enumeration 311 if (strategy == StrategyEnumeration.LIFO) { 312 return jobstack.removeLast(); // LIFO 313 } 314 return jobstack.removeFirst(); // FIFO 315 } 316 317 318 /** 319 * check if there are jobs for processing. 320 */ 321 public boolean hasJobs() { 322 if (jobstack.size() > 0) { 323 return true; 324 } 325 for (int i = 0; i < workers.length; i++) { 326 if (workers[i].working) { 327 return true; 328 } 329 } 330 return false; 331 } 332 333 334 /** 335 * check if there are more than n jobs for processing. 336 * @param n Integer 337 * @return true, if there are possibly more than n jobs. 338 */ 339 public boolean hasJobs(int n) { 340 int j = jobstack.size(); 341 if (j > 0 && (j + workers.length > n)) { 342 return true; 343 // if j > 0 no worker should be idle 344 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n ) 345 } 346 int x = 0; 347 for (int i = 0; i < workers.length; i++) { 348 if (workers[i].working) { 349 x++; 350 } 351 } 352 if ((j + x) > n) { 353 return true; 354 } 355 return false; 356 } 357 358} 359 360 361/** 362 * Implements a shutdown task. 363 */ 364class ShutdownRequest implements Runnable { 365 366 367 /** 368 * Run the thread. 369 */ 370 public void run() { 371 System.out.println("running ShutdownRequest"); 372 } 373 374 375 /** 376 * toString. 377 * @see java.lang.Object#toString() 378 */ 379 @Override 380 public String toString() { 381 return "ShutdownRequest"; 382 } 383 384} 385 386 387/** 388 * Implements one local part of the distributed thread. 389 */ 390class DistPoolThread extends Thread { 391 392 393 final DistThreadPool pool; 394 395 396 final ExecutableChannels ec; 397 398 399 final int myId; 400 401 402 private static final Logger logger = LogManager.getLogger(DistPoolThread.class); 403 404 405 private static final boolean debug = logger.isDebugEnabled(); 406 407 408 boolean working = false; 409 410 411 /** 412 * @param pool DistThreadPool. 413 */ 414 public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) { 415 this.pool = pool; 416 this.ec = ec; 417 myId = i; 418 } 419 420 421 /** 422 * Run the thread. 423 */ 424 @Override 425 public void run() { 426 logger.info("ready, myId = " + myId); 427 Runnable job; 428 int done = 0; 429 long time = 0; 430 long t; 431 boolean running = true; 432 while (running) { 433 try { 434 logger.debug("looking for a job"); 435 job = pool.getJob(); 436 working = true; 437 if (debug) { 438 logger.info("working " + myId + " on " + job); 439 } 440 t = System.currentTimeMillis(); 441 // send and wait, like rmi 442 try { 443 if (job instanceof ShutdownRequest) { 444 ec.send(myId, ExecutableServer.STOP); 445 } else { 446 ec.send(myId, job); 447 } 448 if (debug) { 449 logger.info("send " + myId + " at " + ec + " send job " + job); 450 } 451 } catch (IOException e) { 452 e.printStackTrace(); 453 logger.info("error send " + myId + " at " + ec + " e = " + e); 454 working = false; 455 } 456 // remote: job.run(); 457 Object o = null; 458 try { 459 if (working) { 460 logger.info("waiting " + myId + " on " + job); 461 o = ec.receive(myId); 462 if (debug) { 463 logger.info("receive " + myId + " at " + ec + " send job " + job + " received " + o); 464 } 465 } 466 } catch (IOException e) { 467 logger.info("receive exception " + myId + " send job " + job + ", " + e); 468 //e.printStackTrace(); 469 running = false; 470 } catch (ClassNotFoundException e) { 471 logger.info("receive exception " + myId + " send job " + job + ", " + e); 472 //e.printStackTrace(); 473 running = false; 474 } finally { 475 if (debug) { 476 logger.info("receive finally " + myId + " at " + ec + " send job " + job + " received " 477 + o + " running " + running); 478 } 479 } 480 working = false; 481 time += System.currentTimeMillis() - t; 482 done++; 483 if (debug) { 484 logger.info("done " + myId + " with " + o); 485 } 486 } catch (InterruptedException e) { 487 running = false; 488 Thread.currentThread().interrupt(); 489 } 490 } 491 logger.info("terminated " + myId + " , done " + done + " jobs in " + time + " milliseconds"); 492 } 493 494}