001 /*
002 * $Id: DistThreadPool.java 3337 2010-09-27 21:05:17Z kredel $
003 */
004
005 package edu.jas.util;
006
007
008 import java.io.FileNotFoundException;
009 import java.io.IOException;
010 import java.util.LinkedList;
011
012 import org.apache.log4j.Logger;
013
014
015 /**
016 * Distributed thread pool. Using stack / list workpile and Executable Channels
017 * and Servers.
018 * @author Heinz Kredel
019 */
020
021 public class DistThreadPool /*extends ThreadPool*/{
022
023
024 /**
025 * machine file to use.
026 */
027 private final String mfile;
028
029
030 /**
031 * default machine file for test.
032 */
033 private final static String DEFAULT_MFILE = ExecutableChannels.DEFAULT_MFILE;
034
035
036 /**
037 * Number of threads to use.
038 */
039 protected final int threads;
040
041
042 /**
043 * Default number of threads to use.
044 */
045 static final int DEFAULT_SIZE = 3;
046
047
048 /**
049 * Channels to remote executable servers.
050 */
051 final ExecutableChannels ec;
052
053
054 /**
055 * Array of workers.
056 */
057 protected DistPoolThread[] workers;
058
059
060 /**
061 * Number of idle workers.
062 */
063 protected int idleworkers = 0;
064
065
066 /**
067 * Work queue / stack.
068 */
069 // should be expressed using strategy pattern
070 // List or Collection is not appropriate
071 // LIFO strategy for recursion
072 protected LinkedList<Runnable> jobstack; // FIFO strategy for GB
073
074
075 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
076
077
078 private static final Logger logger = Logger.getLogger(DistThreadPool.class);
079
080
081 private final boolean debug = logger.isDebugEnabled();
082
083
084 /**
085 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO
086 * and size DEFAULT_SIZE.
087 */
088 public DistThreadPool() {
089 this(StrategyEnumeration.FIFO, DEFAULT_SIZE, null);
090 }
091
092
093 /**
094 * Constructs a new DistThreadPool with size DEFAULT_SIZE.
095 * @param strategy for job processing.
096 */
097 public DistThreadPool(StrategyEnumeration strategy) {
098 this(strategy, DEFAULT_SIZE, null);
099 }
100
101
102 /**
103 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
104 * @param size of the pool.
105 */
106 public DistThreadPool(int size) {
107 this(StrategyEnumeration.FIFO, size, null);
108 }
109
110
111 /**
112 * Constructs a new DistThreadPool with strategy StrategyEnumeration.FIFO.
113 * @param size of the pool.
114 * @param mfile machine file.
115 */
116 public DistThreadPool(int size, String mfile) {
117 this(StrategyEnumeration.FIFO, size, mfile);
118 }
119
120
121 /**
122 * Constructs a new DistThreadPool.
123 * @param strategy for job processing.
124 * @param size of the pool.
125 * @param mfile machine file.
126 */
127 public DistThreadPool(StrategyEnumeration strategy, int size, String mfile) {
128 this.strategy = strategy;
129 if (size < 0) {
130 this.threads = 0;
131 } else {
132 this.threads = size;
133 }
134 if (mfile == null || mfile.length() == 0) {
135 this.mfile = DEFAULT_MFILE;
136 } else {
137 this.mfile = mfile;
138 }
139 jobstack = new LinkedList<Runnable>(); // ok for all strategies ?
140 try {
141 ec = new ExecutableChannels(this.mfile);
142 } catch (FileNotFoundException e) {
143 e.printStackTrace();
144 throw new IllegalArgumentException("DistThreadPool " + e);
145 }
146 if (debug) {
147 logger.debug("ExecutableChannels = " + ec);
148 }
149 try {
150 ec.open(threads);
151 } catch (IOException e) {
152 e.printStackTrace();
153 throw new IllegalArgumentException("DistThreadPool " + e);
154 }
155 if (debug) {
156 logger.debug("ExecutableChannels = " + ec);
157 }
158 workers = new DistPoolThread[0];
159 }
160
161
162 /**
163 * thread initialization and start.
164 */
165 public void init() {
166 if (workers == null || workers.length == 0) {
167 workers = new DistPoolThread[threads];
168 for (int i = 0; i < workers.length; i++) {
169 workers[i] = new DistPoolThread(this, ec, i);
170 workers[i].start();
171 }
172 logger.info("size = " + threads + ", strategy = " + strategy);
173 }
174 }
175
176
177 /**
178 * number of worker threads.
179 */
180 public int getNumber() {
181 if (workers == null || workers.length < threads) {
182 init(); // start threads
183 }
184 return workers.length; // not null
185 }
186
187
188 /**
189 * get used strategy.
190 */
191 public StrategyEnumeration getStrategy() {
192 return strategy;
193 }
194
195
196 /**
197 * the used executable channel.
198 */
199 public ExecutableChannels getEC() {
200 return ec; // not null
201 }
202
203
204 /**
205 * Terminates the threads.
206 * @param shutDown true, if shut-down of the remote executable servers is
207 * requested, false, if remote executable servers stay alive.
208 */
209 public void terminate(boolean shutDown) {
210 if (shutDown) {
211 ShutdownRequest sdr = new ShutdownRequest();
212 for (int i = 0; i < workers.length; i++) {
213 addJob(sdr);
214 }
215 try {
216 Thread.sleep(20);
217 } catch (InterruptedException e) {
218 Thread.currentThread().interrupt();
219 }
220 logger.info("remaining jobs = " + jobstack.size());
221 try {
222 for (int i = 0; i < workers.length; i++) {
223 while (workers[i].isAlive()) {
224 workers[i].interrupt();
225 workers[i].join(100);
226 }
227 }
228 } catch (InterruptedException e) {
229 Thread.currentThread().interrupt();
230 }
231 } else {
232 terminate();
233 }
234 }
235
236
237 /**
238 * Terminates the threads.
239 */
240 public void terminate() {
241 while (hasJobs()) {
242 try {
243 Thread.sleep(100);
244 } catch (InterruptedException e) {
245 Thread.currentThread().interrupt();
246 }
247 }
248 for (int i = 0; i < workers.length; i++) {
249 try {
250 while (workers[i].isAlive()) {
251 workers[i].interrupt();
252 workers[i].join(100);
253 }
254 } catch (InterruptedException e) {
255 Thread.currentThread().interrupt();
256 }
257 }
258 ec.close();
259 }
260
261
262 /**
263 * adds a job to the workpile.
264 * @param job
265 */
266 public synchronized void addJob(Runnable job) {
267 if (workers == null || workers.length < threads) {
268 init(); // start threads
269 }
270 jobstack.addLast(job);
271 logger.debug("adding job");
272 if (idleworkers > 0) {
273 logger.debug("notifying a jobless worker");
274 notify();
275 }
276 }
277
278
279 /**
280 * get a job for processing.
281 */
282 protected synchronized Runnable getJob() throws InterruptedException {
283 while (jobstack.isEmpty()) {
284 idleworkers++;
285 logger.debug("waiting");
286 wait();
287 idleworkers--;
288 }
289 // is expressed using strategy enumeration
290 if (strategy == StrategyEnumeration.LIFO) {
291 return jobstack.removeLast(); // LIFO
292 }
293 return jobstack.removeFirst(); // FIFO
294 }
295
296
297 /**
298 * check if there are jobs for processing.
299 */
300 public boolean hasJobs() {
301 if (jobstack.size() > 0) {
302 return true;
303 }
304 for (int i = 0; i < workers.length; i++) {
305 if (workers[i].working) {
306 return true;
307 }
308 }
309 return false;
310 }
311
312
313 /**
314 * check if there are more than n jobs for processing.
315 * @param n Integer
316 * @return true, if there are possibly more than n jobs.
317 */
318 public boolean hasJobs(int n) {
319 int j = jobstack.size();
320 if (j > 0 && (j + workers.length > n)) {
321 return true;
322 // if j > 0 no worker should be idle
323 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
324 }
325 int x = 0;
326 for (int i = 0; i < workers.length; i++) {
327 if (workers[i].working) {
328 x++;
329 }
330 }
331 if ((j + x) > n) {
332 return true;
333 }
334 return false;
335 }
336
337 }
338
339
340 /**
341 * Implements a shutdown task.
342 */
class ShutdownRequest implements Runnable {


    /**
     * Prints the text "ShutdownRequest" to standard output.
     * <p>
     * DistPoolThread recognizes instances of this class with instanceof and
     * sends ExecutableServer.STOP instead of the job itself.
     */
    @Override
    public void run() {
        System.out.println("ShutdownRequest");
    }
}
353
354
355 /**
356 * Implements one local part of the distributed thread.
357 */
358 class DistPoolThread extends Thread {
359
360
361 final DistThreadPool pool;
362
363
364 final ExecutableChannels ec;
365
366
367 final int myId;
368
369
370 private static final Logger logger = Logger.getLogger(DistPoolThread.class);
371
372
373 private final boolean debug = logger.isInfoEnabled();
374
375
376 boolean working = false;
377
378
379 /**
380 * @param pool DistThreadPool.
381 */
382 public DistPoolThread(DistThreadPool pool, ExecutableChannels ec, int i) {
383 this.pool = pool;
384 this.ec = ec;
385 myId = i;
386 }
387
388
389 /**
390 * Run the thread.
391 */
392 @Override
393 public void run() {
394 logger.info("ready, myId = " + myId);
395 Runnable job;
396 int done = 0;
397 long time = 0;
398 long t;
399 boolean running = true;
400 while (running) {
401 try {
402 logger.debug("looking for a job");
403 job = pool.getJob();
404 working = true;
405 if (debug) {
406 logger.info("working " + myId + " on " + job);
407 }
408 t = System.currentTimeMillis();
409 // send and wait, like rmi
410 try {
411 if (job instanceof ShutdownRequest) {
412 ec.send(myId, ExecutableServer.STOP);
413 } else {
414 ec.send(myId, job);
415 }
416 logger.info("send " + myId + " at " + ec + " send job " + job);
417 } catch (IOException e) {
418 e.printStackTrace();
419 logger.info("error send " + myId + " at " + ec + " e = " + e);
420 working = false;
421 }
422 //job.run();
423 Object o = null;
424 try {
425 if (working) {
426 logger.info("waiting " + myId + " on " + job);
427 o = ec.receive(myId);
428 logger.info("receive " + myId + " at " + ec + " send job " + job + " received " + o);
429 }
430 } catch (IOException e) {
431 logger.info("receive exception " + myId + " send job " + job + ", " + e);
432 //e.printStackTrace();
433 running = false;
434 } catch (ClassNotFoundException e) {
435 logger.info("receive exception " + myId + " send job " + job + ", " + e);
436 //e.printStackTrace();
437 running = false;
438 } finally {
439 logger.info("receive finally " + myId + " at " + ec + " send job " + job + " received "
440 + o + " running " + running);
441 }
442 working = false;
443 time += System.currentTimeMillis() - t;
444 done++;
445 if (debug) {
446 logger.info("done " + myId + " with " + o);
447 }
448 } catch (InterruptedException e) {
449 running = false;
450 Thread.currentThread().interrupt();
451 }
452 }
453 logger.info("terminated " + myId + " , done " + done + " jobs in " + time + " milliseconds");
454 }
455
456 }