001    /*
002     * $Id: ThreadPool.java,v 1.7 2004/09/18 20:10:09 heinz Exp $
003     */
004    
005    //package edu.unima.ky.parallel;
006    //package edu.jas;
007    package thread;
008    
009    import util.Logger;
010    
011    //import java.util.Stack;
012    import java.util.LinkedList;
013    
014    //import org.apache.log4j.Logger;
015    
016    //import edu.unima.ky.parallel.Semaphore;
017    
018    /**
019     * Thread Pool using stack / list workpile.
020     * @author Akitoshi Yoshida
021     * @author Heinz Kredel.
022     */
023    
024    public class ThreadPool {
025        static final int DEFAULT_SIZE = 3;
026        protected PoolThread[] workers;
027        protected int idleworkers = 0; 
028        // should be expressed using strategy pattern
029        // List or Collection is not appropriate
030                                       // LIFO strategy for recursion
031        protected LinkedList jobstack; // FIFO strategy for GB
032    
033        protected StrategyEnumeration strategy = StrategyEnumeration.LIFO;
034    
035        //private static Logger logger = Logger.getLogger(ThreadPool.class);
036        private static Logger logger = new Logger(1);
037    
038    /**
039     * Constructs a new ThreadPool
040     * with strategy StrategyEnumeration.FIFO
041     * and size DEFAULT_SIZE.
042     */ 
043        public ThreadPool() {
044            this(StrategyEnumeration.FIFO,DEFAULT_SIZE);
045        }
046    
047    
048    /**
049     * Constructs a new ThreadPool
050     * with size DEFAULT_SIZE.
051     * @param strategy for job processing.
052     */ 
053        public ThreadPool(StrategyEnumeration strategy) {
054            this(strategy,DEFAULT_SIZE);
055        }
056    
057    
058    /**
059     * Constructs a new ThreadPool
060     * with strategy StrategyEnumeration.FIFO.
061     * @param size of the pool.
062     */
063        public ThreadPool(int size) {
064            this(StrategyEnumeration.FIFO,size);
065        }
066    
067    /**
068     * Constructs a new ThreadPool.
069     * @param strategy for job processing.
070     * @param size of the pool.
071     */ 
072        public ThreadPool(StrategyEnumeration strategy, int size) {
073            this.strategy = strategy;
074            jobstack = new LinkedList(); // ok for all strategies ?
075            workers = new PoolThread[size];
076            for (int i = 0; i < workers.length; i++) {
077                workers[i] = new PoolThread(this);
078                workers[i].start();
079            }
080            logger.info("strategy = " + strategy);
081        }
082    
083    /**
084     * Number of worker threads.
085     * @return number of worker threads.
086     */
087        public int getNumber() {
088            return workers.length; // not null
089        }
090    
091    /**
092     * Get used strategy.
093     * @return strategy.
094     */
095        public StrategyEnumeration getStrategy() {
096            return strategy; 
097        }
098    
099    /**
100     * Terminates the threads.
101     */
102        public void terminate() {
103            while ( hasJobs() ) {
104                try {
105                    Thread.sleep(100);
106                } catch (InterruptedException e) {
107                }
108            }
109            for (int i = 0; i < workers.length; i++) {
110                try { 
111                    while ( workers[i].isAlive() ) {
112                            workers[i].interrupt(); 
113                            workers[i].join(100);
114                    }
115                } catch (InterruptedException e) { 
116                }
117            }
118        }
119    
120    /**
121     * Adds a job to the workpile.
122     * @param job Runnable.
123     */
124        public synchronized void addJob(Runnable job) {
125            jobstack.addLast(job);
126            logger.debug("adding job" );
127            if (idleworkers > 0) {
128                logger.debug("notifying a jobless worker");
129                notify();
130            }
131        }
132    
133    
134    /**
135     * Get a job for processing.
136     * @return a Runnable.
137     * @throws InterruptedException
138     */
139        protected synchronized Runnable getJob() throws InterruptedException {
140            while (jobstack.isEmpty()) {
141                idleworkers++;
142                logger.debug("waiting");
143                wait();
144                idleworkers--;
145            }
146            // is expressed using strategy enumeration
147            if (strategy == StrategyEnumeration.LIFO) { 
148                 return (Runnable)jobstack.removeLast(); // LIFO
149            } else {
150                 return (Runnable)jobstack.removeFirst(); // FIFO
151            }
152        }
153    
154    
155    /**
156     * Check if there are jobs for processing.
157     * @return true if there are jobs for processing, else false.
158     */
159        public boolean hasJobs() {
160            if ( jobstack.size() > 0 ) {
161                return true;
162            }
163            for (int i = 0; i < workers.length; i++) {
164                if ( workers[i].working ) return true;
165            }
166            return false;
167        }
168    
169    
170    /**
171     * Check if there are more than n jobs for processing.
172     * @param n number of jobs to check.
173     * @return true if there are more than n jobs for processing, else false.
174     */
175        public boolean hasJobs(int n) {
176            int j = jobstack.size();
177            if ( j > 0 && ( j + workers.length > n ) ) return true;
178               // if j > 0 no worker should be idle
179               // ( ( j > 0 && ( j+workers.length > n ) ) || ( j > n )
180            int x = 0;
181            for (int i=0; i < workers.length; i++) {
182                if ( workers[i].working ) x++;
183            }
184            if ( (j + x) > n ) return true;
185            return false;
186        }
187    
188    }
189    
190    /**
191     * Implements one Thread of the pool.
192     */
193    class PoolThread extends Thread {
194        ThreadPool pool;
195        //private static Logger logger = Logger.getLogger(ThreadPool.class);
196        private static Logger logger = new Logger(1);
197    
198        boolean working = false;
199    
200    /**
201     * @param pool a ThreadPool.
202     */
203        public PoolThread(ThreadPool pool) {
204            this.pool = pool;
205        }
206    
207    
208    /**
209     * Run the thread.
210     */
211        public void run() {
212            logger.info( "ready" );
213            Runnable job;
214            int done = 0;
215            long time = 0;
216            long t;
217            boolean running = true;
218            while (running) {
219                try {
220                    logger.debug( "looking for a job" );
221                    job = pool.getJob();
222                    working = true;
223                    logger.info( "working" );
224                    t = System.currentTimeMillis();
225                    job.run(); 
226                    working = false;
227                    time += System.currentTimeMillis() - t;
228                    done++;
229                    logger.info( "done" );
230                } catch (InterruptedException e) { 
231                  running = false; 
232                }
233            }
234            logger.info( "terminated, done " + done + " jobs in " 
235                            + time + " milliseconds");
236        }
237    
238    }