001 /*
002 * $Id: ThreadPool.java,v 1.7 2004/09/18 20:10:09 heinz Exp $
003 */
004
005 //package edu.unima.ky.parallel;
006 //package edu.jas;
007 package thread;
008
009 import util.Logger;
010
011 //import java.util.Stack;
012 import java.util.LinkedList;
013
014 //import org.apache.log4j.Logger;
015
016 //import edu.unima.ky.parallel.Semaphore;
017
018 /**
019 * Thread Pool using stack / list workpile.
020 * @author Akitoshi Yoshida
021 * @author Heinz Kredel.
022 */
023
024 public class ThreadPool {
025 static final int DEFAULT_SIZE = 3;
026 protected PoolThread[] workers;
027 protected int idleworkers = 0;
028 // should be expressed using strategy pattern
029 // List or Collection is not appropriate
030 // LIFO strategy for recursion
031 protected LinkedList jobstack; // FIFO strategy for GB
032
033 protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
034
035 //private static Logger logger = Logger.getLogger(ThreadPool.class);
036 private static Logger logger = new Logger(1);
037
038 /**
039 * Constructs a new ThreadPool
040 * with strategy StrategyEnumeration.FIFO
041 * and size DEFAULT_SIZE.
042 */
043 public ThreadPool() {
044 this(StrategyEnumeration.FIFO,DEFAULT_SIZE);
045 }
046
047
048 /**
049 * Constructs a new ThreadPool
050 * with size DEFAULT_SIZE.
051 * @param strategy for job processing.
052 */
053 public ThreadPool(StrategyEnumeration strategy) {
054 this(strategy,DEFAULT_SIZE);
055 }
056
057
058 /**
059 * Constructs a new ThreadPool
060 * with strategy StrategyEnumeration.FIFO.
061 * @param size of the pool.
062 */
063 public ThreadPool(int size) {
064 this(StrategyEnumeration.FIFO,size);
065 }
066
067 /**
068 * Constructs a new ThreadPool.
069 * @param strategy for job processing.
070 * @param size of the pool.
071 */
072 public ThreadPool(StrategyEnumeration strategy, int size) {
073 this.strategy = strategy;
074 jobstack = new LinkedList(); // ok for all strategies ?
075 workers = new PoolThread[size];
076 for (int i = 0; i < workers.length; i++) {
077 workers[i] = new PoolThread(this);
078 workers[i].start();
079 }
080 logger.info("strategy = " + strategy);
081 }
082
083 /**
084 * Number of worker threads.
085 * @return number of worker threads.
086 */
087 public int getNumber() {
088 return workers.length; // not null
089 }
090
091 /**
092 * Get used strategy.
093 * @return strategy.
094 */
095 public StrategyEnumeration getStrategy() {
096 return strategy;
097 }
098
099 /**
100 * Terminates the threads.
101 */
102 public void terminate() {
103 while ( hasJobs() ) {
104 try {
105 Thread.sleep(100);
106 } catch (InterruptedException e) {
107 }
108 }
109 for (int i = 0; i < workers.length; i++) {
110 try {
111 while ( workers[i].isAlive() ) {
112 workers[i].interrupt();
113 workers[i].join(100);
114 }
115 } catch (InterruptedException e) {
116 }
117 }
118 }
119
120 /**
121 * Adds a job to the workpile.
122 * @param job Runnable.
123 */
124 public synchronized void addJob(Runnable job) {
125 jobstack.addLast(job);
126 logger.debug("adding job" );
127 if (idleworkers > 0) {
128 logger.debug("notifying a jobless worker");
129 notify();
130 }
131 }
132
133
134 /**
135 * Get a job for processing.
136 * @return a Runnable.
137 * @throws InterruptedException
138 */
139 protected synchronized Runnable getJob() throws InterruptedException {
140 while (jobstack.isEmpty()) {
141 idleworkers++;
142 logger.debug("waiting");
143 wait();
144 idleworkers--;
145 }
146 // is expressed using strategy enumeration
147 if (strategy == StrategyEnumeration.LIFO) {
148 return (Runnable)jobstack.removeLast(); // LIFO
149 } else {
150 return (Runnable)jobstack.removeFirst(); // FIFO
151 }
152 }
153
154
155 /**
156 * Check if there are jobs for processing.
157 * @return true if there are jobs for processing, else false.
158 */
159 public boolean hasJobs() {
160 if ( jobstack.size() > 0 ) {
161 return true;
162 }
163 for (int i = 0; i < workers.length; i++) {
164 if ( workers[i].working ) return true;
165 }
166 return false;
167 }
168
169
170 /**
171 * Check if there are more than n jobs for processing.
172 * @param n number of jobs to check.
173 * @return true if there are more than n jobs for processing, else false.
174 */
175 public boolean hasJobs(int n) {
176 int j = jobstack.size();
177 if ( j > 0 && ( j + workers.length > n ) ) return true;
178 // if j > 0 no worker should be idle
179 // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
180 int x = 0;
181 for (int i=0; i < workers.length; i++) {
182 if ( workers[i].working ) x++;
183 }
184 if ( (j + x) > n ) return true;
185 return false;
186 }
187
188 }
189
190 /**
191 * Implements one Thread of the pool.
192 */
193 class PoolThread extends Thread {
194 ThreadPool pool;
195 //private static Logger logger = Logger.getLogger(ThreadPool.class);
196 private static Logger logger = new Logger(1);
197
198 boolean working = false;
199
200 /**
201 * @param pool a ThreadPool.
202 */
203 public PoolThread(ThreadPool pool) {
204 this.pool = pool;
205 }
206
207
208 /**
209 * Run the thread.
210 */
211 public void run() {
212 logger.info( "ready" );
213 Runnable job;
214 int done = 0;
215 long time = 0;
216 long t;
217 boolean running = true;
218 while (running) {
219 try {
220 logger.debug( "looking for a job" );
221 job = pool.getJob();
222 working = true;
223 logger.info( "working" );
224 t = System.currentTimeMillis();
225 job.run();
226 working = false;
227 time += System.currentTimeMillis() - t;
228 done++;
229 logger.info( "done" );
230 } catch (InterruptedException e) {
231 running = false;
232 }
233 }
234 logger.info( "terminated, done " + done + " jobs in "
235 + time + " milliseconds");
236 }
237
238 }