001 /* 002 * $Id: ThreadPool.java,v 1.7 2004/09/18 20:10:09 heinz Exp $ 003 */ 004 005 //package edu.unima.ky.parallel; 006 //package edu.jas; 007 package thread; 008 009 import util.Logger; 010 011 //import java.util.Stack; 012 import java.util.LinkedList; 013 014 //import org.apache.log4j.Logger; 015 016 //import edu.unima.ky.parallel.Semaphore; 017 018 /** 019 * Thread Pool using stack / list workpile. 020 * @author Akitoshi Yoshida 021 * @author Heinz Kredel. 022 */ 023 024 public class ThreadPool { 025 static final int DEFAULT_SIZE = 3; 026 protected PoolThread[] workers; 027 protected int idleworkers = 0; 028 // should be expressed using strategy pattern 029 // List or Collection is not appropriate 030 // LIFO strategy for recursion 031 protected LinkedList jobstack; // FIFO strategy for GB 032 033 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO; 034 035 //private static Logger logger = Logger.getLogger(ThreadPool.class); 036 private static Logger logger = new Logger(1); 037 038 /** 039 * Constructs a new ThreadPool 040 * with strategy StrategyEnumeration.FIFO 041 * and size DEFAULT_SIZE. 042 */ 043 public ThreadPool() { 044 this(StrategyEnumeration.FIFO,DEFAULT_SIZE); 045 } 046 047 048 /** 049 * Constructs a new ThreadPool 050 * with size DEFAULT_SIZE. 051 * @param strategy for job processing. 052 */ 053 public ThreadPool(StrategyEnumeration strategy) { 054 this(strategy,DEFAULT_SIZE); 055 } 056 057 058 /** 059 * Constructs a new ThreadPool 060 * with strategy StrategyEnumeration.FIFO. 061 * @param size of the pool. 062 */ 063 public ThreadPool(int size) { 064 this(StrategyEnumeration.FIFO,size); 065 } 066 067 /** 068 * Constructs a new ThreadPool. 069 * @param strategy for job processing. 070 * @param size of the pool. 071 */ 072 public ThreadPool(StrategyEnumeration strategy, int size) { 073 this.strategy = strategy; 074 jobstack = new LinkedList(); // ok for all strategies ? 075 workers = new PoolThread[size]; 076 for (int i = 0; i < workers.length; i++) { 077 workers[i] = new PoolThread(this); 078 workers[i].start(); 079 } 080 logger.info("strategy = " + strategy); 081 } 082 083 /** 084 * Number of worker threads. 085 * @return number of worker threads. 086 */ 087 public int getNumber() { 088 return workers.length; // not null 089 } 090 091 /** 092 * Get used strategy. 093 * @return strategy. 094 */ 095 public StrategyEnumeration getStrategy() { 096 return strategy; 097 } 098 099 /** 100 * Terminates the threads. 101 */ 102 public void terminate() { 103 while ( hasJobs() ) { 104 try { 105 Thread.sleep(100); 106 } catch (InterruptedException e) { 107 } 108 } 109 for (int i = 0; i < workers.length; i++) { 110 try { 111 while ( workers[i].isAlive() ) { 112 workers[i].interrupt(); 113 workers[i].join(100); 114 } 115 } catch (InterruptedException e) { 116 } 117 } 118 } 119 120 /** 121 * Adds a job to the workpile. 122 * @param job Runnable. 123 */ 124 public synchronized void addJob(Runnable job) { 125 jobstack.addLast(job); 126 logger.debug("adding job" ); 127 if (idleworkers > 0) { 128 logger.debug("notifying a jobless worker"); 129 notify(); 130 } 131 } 132 133 134 /** 135 * Get a job for processing. 136 * @return a Runnable. 137 * @throws InterruptedException 138 */ 139 protected synchronized Runnable getJob() throws InterruptedException { 140 while (jobstack.isEmpty()) { 141 idleworkers++; 142 logger.debug("waiting"); 143 wait(); 144 idleworkers--; 145 } 146 // is expressed using strategy enumeration 147 if (strategy == StrategyEnumeration.LIFO) { 148 return (Runnable)jobstack.removeLast(); // LIFO 149 } else { 150 return (Runnable)jobstack.removeFirst(); // FIFO 151 } 152 } 153 154 155 /** 156 * Check if there are jobs for processing. 157 * @return true if there are jobs for processing, else false. 158 */ 159 public boolean hasJobs() { 160 if ( jobstack.size() > 0 ) { 161 return true; 162 } 163 for (int i = 0; i < workers.length; i++) { 164 if ( workers[i].working ) return true; 165 } 166 return false; 167 } 168 169 170 /** 171 * Check if there are more than n jobs for processing. 172 * @param n number of jobs to check. 173 * @return true if there are more than n jobs for processing, else false. 174 */ 175 public boolean hasJobs(int n) { 176 int j = jobstack.size(); 177 if ( j > 0 && ( j + workers.length > n ) ) return true; 178 // if j > 0 no worker should be idle 179 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n ) 180 int x = 0; 181 for (int i=0; i < workers.length; i++) { 182 if ( workers[i].working ) x++; 183 } 184 if ( (j + x) > n ) return true; 185 return false; 186 } 187 188 } 189 190 /** 191 * Implements one Thread of the pool. 192 */ 193 class PoolThread extends Thread { 194 ThreadPool pool; 195 //private static Logger logger = Logger.getLogger(ThreadPool.class); 196 private static Logger logger = new Logger(1); 197 198 boolean working = false; 199 200 /** 201 * @param pool a ThreadPool. 202 */ 203 public PoolThread(ThreadPool pool) { 204 this.pool = pool; 205 } 206 207 208 /** 209 * Run the thread. 210 */ 211 public void run() { 212 logger.info( "ready" ); 213 Runnable job; 214 int done = 0; 215 long time = 0; 216 long t; 217 boolean running = true; 218 while (running) { 219 try { 220 logger.debug( "looking for a job" ); 221 job = pool.getJob(); 222 working = true; 223 logger.info( "working" ); 224 t = System.currentTimeMillis(); 225 job.run(); 226 working = false; 227 time += System.currentTimeMillis() - t; 228 done++; 229 logger.info( "done" ); 230 } catch (InterruptedException e) { 231 running = false; 232 } 233 } 234 logger.info( "terminated, done " + done + " jobs in " 235 + time + " milliseconds"); 236 } 237 238 }