001 /* 002 * $Id: DistThreadPool.java 3337 2010-09-27 21:05:17Z kredel $ 003 */ 004 005 package edu.jas.util; 006 007 008 import java.io.FileNotFoundException; 009 import java.io.IOException; 010 import java.util.LinkedList; 011 012 import org.apache.log4j.Logger; 013 014 015 /** 016 * Distributed thread pool. Using stack / list workpile and Executable Channels 017 * and Servers. 018 * @author Heinz Kredel 019 */ 020 021 public class DistThreadPool /*extends ThreadPool*/{ 022 023 024 /** 025 * machine file to use. 026 */ 027 private final String mfile; 028 029 030 /** 031 * default machine file for test. 032 */ 033 private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE; 034 035 036 /** 037 * Number of threads to use. 038 */ 039 protected final int threads; 040 041 042 /** 043 * Default number of threads to use. 044 */ 045 static final int DEFAULT_SIZE = 3; 046 047 048 /** 049 * Channels to remote executable servers. 050 */ 051 final ExecutableChannels ec; 052 053 054 /** 055 * Array of workers. 056 */ 057 protected DistPoolThread[] workers; 058 059 060 /** 061 * Number of idle workers. 062 */ 063 protected int idleworkers = 0; 064 065 066 /** 067 * Work queue / stack. 068 */ 069 // should be expressed using strategy pattern 070 // List or Collection is not appropriate 071 // LIFO strategy for recursion 072 protected LinkedList<Runnable> jobstack; // FIFO strategy for GB 073 074 075 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO; 076 077 078 private static final Logger logger = Logger.getLogger(DistThreadPool.class); 079 080 081 private final boolean debug = logger.isDebugEnabled(); 082 083 084 /** 085 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO 086 * and size DEFAULT_SIZE. 087 */ 088 public DistThreadPool() { 089 this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null); 090 } 091 092 093 /** 094 * Constructs a new DistThreadPool with size DEFAULT_SIZE. 095 * @param strategy for job processing. 096 */ 097 public DistThreadPool(StrategyEnumeration strategy) { 098 this(strategy, DEFAULT_SIZE, null); 099 } 100 101 102 /** 103 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO. 104 * @param size of the pool. 105 */ 106 public DistThreadPool(int size) { 107 this(StrategyEnumeration.FIFO, size, null); 108 } 109 110 111 /** 112 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO. 113 * @param size of the pool. 114 * @param mfile machine file. 115 */ 116 public DistThreadPool(int size, String mfile) { 117 this(StrategyEnumeration.FIFO, size, mfile); 118 } 119 120 121 /** 122 * Constructs a new DistThreadPool. 123 * @param strategy for job processing. 124 * @param size of the pool. 125 * @param mfile machine file. 126 */ 127 public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) { 128 this.strategy = strategy; 129 if (size < 0) { 130 this.threads = 0; 131 } else { 132 this.threads = size; 133 } 134 if (mfile == null || mfile.length() == 0) { 135 this.mfile = DEFAULT_MFILE; 136 } else { 137 this.mfile = mfile; 138 } 139 jobstack = new LinkedList<Runnable>(); // ok for all strategies ? 140 try { 141 ec = new ExecutableChannels(this.mfile); 142 } catch (FileNotFoundException e) { 143 e.printStackTrace(); 144 throw new IllegalArgumentException("DistThreadPool " + e); 145 } 146 if (debug) { 147 logger.debug("ExecutableChannels = " + ec); 148 } 149 try { 150 ec.open(threads); 151 } catch (IOException e) { 152 e.printStackTrace(); 153 throw new IllegalArgumentException("DistThreadPool " + e); 154 } 155 if (debug) { 156 logger.debug("ExecutableChannels = " + ec); 157 } 158 workers = new DistPoolThread[0]; 159 } 160 161 162 /** 163 * thread initialization and start. 164 */ 165 public void init() { 166 if (workers == null || workers.length == 0) { 167 workers = new DistPoolThread[threads]; 168 for (int i = 0; i < workers.length; i++) { 169 workers[i] = new DistPoolThread(this, ec, i); 170 workers[i].start(); 171 } 172 logger.info("size = " + threads + ", strategy = " + strategy); 173 } 174 } 175 176 177 /** 178 * number of worker threads. 179 */ 180 public int getNumber() { 181 if (workers == null || workers.length < threads) { 182 init(); // start threads 183 } 184 return workers.length; // not null 185 } 186 187 188 /** 189 * get used strategy. 190 */ 191 public StrategyEnumeration getStrategy() { 192 return strategy; 193 } 194 195 196 /** 197 * the used executable channel. 198 */ 199 public ExecutableChannels getEC() { 200 return ec; // not null 201 } 202 203 204 /** 205 * Terminates the threads. 206 * @param shutDown true, if shut-down of the remote executable servers is 207 * requested, false, if remote executable servers stay alive. 208 */ 209 public void terminate(boolean shutDown) { 210 if (shutDown) { 211 ShutdownRequest sdr = new ShutdownRequest(); 212 for (int i = 0; i < workers.length; i++) { 213 addJob(sdr); 214 } 215 try { 216 Thread.sleep(20); 217 } catch (InterruptedException e) { 218 Thread.currentThread().interrupt(); 219 } 220 logger.info("remaining jobs = " + jobstack.size()); 221 try { 222 for (int i = 0; i < workers.length; i++) { 223 while (workers[i].isAlive()) { 224 workers[i].interrupt(); 225 workers[i].join(100); 226 } 227 } 228 } catch (InterruptedException e) { 229 Thread.currentThread().interrupt(); 230 } 231 } else { 232 terminate(); 233 } 234 } 235 236 237 /** 238 * Terminates the threads. 239 */ 240 public void terminate() { 241 while (hasJobs()) { 242 try { 243 Thread.sleep(100); 244 } catch (InterruptedException e) { 245 Thread.currentThread().interrupt(); 246 } 247 } 248 for (int i = 0; i < workers.length; i++) { 249 try { 250 while (workers[i].isAlive()) { 251 workers[i].interrupt(); 252 workers[i].join(100); 253 } 254 } catch (InterruptedException e) { 255 Thread.currentThread().interrupt(); 256 } 257 } 258 ec.close(); 259 } 260 261 262 /** 263 * adds a job to the workpile. 264 * @param job 265 */ 266 public synchronized void addJob(Runnable job) { 267 if (workers == null || workers.length < threads) { 268 init(); // start threads 269 } 270 jobstack.addLast(job); 271 logger.debug("adding job"); 272 if (idleworkers > 0) { 273 logger.debug("notifying a jobless worker"); 274 notify(); 275 } 276 } 277 278 279 /** 280 * get a job for processing. 281 */ 282 protected synchronized Runnable getJob() throws InterruptedException { 283 while (jobstack.isEmpty()) { 284 idleworkers++; 285 logger.debug("waiting"); 286 wait(); 287 idleworkers--; 288 } 289 // is expressed using strategy enumeration 290 if (strategy == StrategyEnumeration.LIFO) { 291 return jobstack.removeLast(); // LIFO 292 } 293 return jobstack.removeFirst(); // FIFO 294 } 295 296 297 /** 298 * check if there are jobs for processing. 299 */ 300 public boolean hasJobs() { 301 if (jobstack.size() > 0) { 302 return true; 303 } 304 for (int i = 0; i < workers.length; i++) { 305 if (workers[i].working) { 306 return true; 307 } 308 } 309 return false; 310 } 311 312 313 /** 314 * check if there are more than n jobs for processing. 315 * @param n Integer 316 * @return true, if there are possibly more than n jobs. 317 */ 318 public boolean hasJobs(int n) { 319 int j = jobstack.size(); 320 if (j > 0 && (j + workers.length > n)) { 321 return true; 322 // if j > 0 no worker should be idle 323 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n ) 324 } 325 int x = 0; 326 for (int i = 0; i < workers.length; i++) { 327 if (workers[i].working) { 328 x++; 329 } 330 } 331 if ((j + x) > n) { 332 return true; 333 } 334 return false; 335 } 336 337 } 338 339 340 /** 341 * Implements a shutdown task. 342 */ 343 class ShutdownRequest implements Runnable { 344 345 346 /** 347 * Run the thread. 348 */ 349 public void run() { 350 System.out.println("ShutdownRequest"); 351 } 352 } 353 354 355 /** 356 * Implements one local part of the distributed thread. 357 */ 358 class DistPoolThread extends Thread { 359 360 361 final DistThreadPool pool; 362 363 364 final ExecutableChannels ec; 365 366 367 final int myId; 368 369 370 private static final Logger logger = Logger.getLogger(DistPoolThread.class); 371 372 373 private final boolean debug = logger.isInfoEnabled(); 374 375 376 boolean working = false; 377 378 379 /** 380 * @param pool DistThreadPool. 381 */ 382 public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) { 383 this.pool = pool; 384 this.ec = ec; 385 myId = i; 386 } 387 388 389 /** 390 * Run the thread. 391 */ 392 @Override 393 public void run() { 394 logger.info("ready, myId = " + myId); 395 Runnable job; 396 int done = 0; 397 long time = 0; 398 long t; 399 boolean running = true; 400 while (running) { 401 try { 402 logger.debug("looking for a job"); 403 job = pool.getJob(); 404 working = true; 405 if (debug) { 406 logger.info("working " + myId + " on " + job); 407 } 408 t = System.currentTimeMillis(); 409 // send and wait, like rmi 410 try { 411 if (job instanceof ShutdownRequest) { 412 ec.send(myId, ExecutableServer.STOP); 413 } else { 414 ec.send(myId, job); 415 } 416 logger.info("send " + myId + " at " + ec + " send job " + job); 417 } catch (IOException e) { 418 e.printStackTrace(); 419 logger.info("error send " + myId + " at " + ec + " e = " + e); 420 working = false; 421 } 422 //job.run(); 423 Object o = null; 424 try { 425 if (working) { 426 logger.info("waiting " + myId + " on " + job); 427 o = ec.receive(myId); 428 logger.info("receive " + myId + " at " + ec + " send job " + job + " received " + o); 429 } 430 } catch (IOException e) { 431 logger.info("receive exception " + myId + " send job " + job + ", " + e); 432 //e.printStackTrace(); 433 running = false; 434 } catch (ClassNotFoundException e) { 435 logger.info("receive exception " + myId + " send job " + job + ", " + e); 436 //e.printStackTrace(); 437 running = false; 438 } finally { 439 logger.info("receive finally " + myId + " at " + ec + " send job " + job + " received " 440 + o + " running " + running); 441 } 442 working = false; 443 time += System.currentTimeMillis() - t; 444 done++; 445 if (debug) { 446 logger.info("done " + myId + " with " + o); 447 } 448 } catch (InterruptedException e) { 449 running = false; 450 Thread.currentThread().interrupt(); 451 } 452 } 453 logger.info("terminated " + myId + " , done " + done + " jobs in " + time + " milliseconds"); 454 } 455 456 }