001    /*
002     * $Id: ComputerThreads.java 3295 2010-08-26 17:01:10Z kredel $
003     */
004    
005    package edu.jas.kern;
006    
007    
008    import java.util.List;
009    import java.util.concurrent.BlockingQueue;
010    import java.util.concurrent.ExecutorService;
011    import java.util.concurrent.Executors;
012    import java.util.concurrent.ThreadPoolExecutor;
013    
014    import org.apache.log4j.Logger;
015    
016    
017    /**
018     * ComputerThreads, provides global thread / executor service.
019     * @author Heinz Kredel
020     * @usage To obtain a reference to the thread pool use
021     *        <code>ComputerThreads.getPool()</code>. Once a pool has been created
022     *        it must be shutdown to exit JAS with
023     *        <code>ComputerThreads.terminate()</code>.
024     */
025    
026    public class ComputerThreads {
027    
028    
029        private static final Logger logger = Logger.getLogger(ComputerThreads.class);
030    
031    
032        // private boolean debug = logger.isInfoEnabled(); //logger.isInfoEnabled();
033    
034    
035        /**
036         * Flag for thread usage. <b>Note:</b> Only introduced because Google app
037         * engine does not support threads.
038         * @see edu.jas.ufd.GCDFactory#getProxy(edu.jas.structure.RingFactory)
039         */
040        public static boolean NO_THREADS = false;
041    
042    
043        /**
044         * Number of processors.
045         */
046        public static final int N_CPUS = Runtime.getRuntime().availableProcessors();
047    
048    
049        /*
050          * Core number of threads.
051          * N_CPUS x 1.5, x 2, x 2.5, min 3, ?.
052          */
053        public static final int N_THREADS = (N_CPUS < 3 ? 3 : N_CPUS + N_CPUS / 2);
054    
055    
056        //public static final int N_THREADS = ( N_CPUS < 3 ? 5 : 3*N_CPUS );
057    
058    
059        /*
060          * Queue capacity.
061          */
062        //public static final int Q_CAPACITY = 1000; // 10000
063    
064        /*
065          * WorkQueue.
066          */
067        //private static BlockingQueue<Runnable> workpile; 
068    
069        /*
070          * Saturation policy.
071          */
072        //public static final RejectedExecutionHandler REH = new ThreadPoolExecutor.CallerRunsPolicy();
073        //public static final RejectedExecutionHandler REH = new ThreadPoolExecutor.AbortPolicy();
074    
075        /**
076         * ExecutorService thread pool.
077         */
078        //static ThreadPoolExecutor pool = null;
079        static ExecutorService pool = null;
080    
081    
082        /**
083         * No public constructor.
084         */
085        private ComputerThreads() {
086        }
087    
088    
089        /**
090         * Test if a pool is running.
091         * @return true if a thread pool has been started or is running, else false.
092         */
093        public static synchronized boolean isRunning() {
094            if (pool == null) {
095                return false;
096            }
097            if (pool.isTerminated() || pool.isShutdown()) {
098                return false;
099            }
100            return true;
101        }
102    
103    
104        /**
105         * Get the thread pool.
106         * @return pool ExecutorService.
107         */
108        public static synchronized ExecutorService getPool() {
109            if (pool == null) {
110                // workpile = new ArrayBlockingQueue<Runnable>(Q_CAPACITY);
111                //            pool = Executors.newFixedThreadPool(N_THREADS);
112                pool = Executors.newCachedThreadPool();
113                //             pool = new ThreadPoolExecutor(N_CPUS, N_THREADS,
114                //                                           100L, TimeUnit.MILLISECONDS,
115                //                                           workpile, REH);
116                //             pool = new ThreadPoolExecutor(N_CPUS, N_THREADS,
117                //                                           1000L, TimeUnit.MILLISECONDS,
118                //                                           workpile);
119            }
120            //System.out.println("pool_init = " + pool);
121            return pool;
122            //return Executors.unconfigurableExecutorService(pool);
123    
124            /* not useful, is not run from jython
125            final GCDProxy<C> proxy = this;
126            Runtime.getRuntime().addShutdownHook( 
127                             new Thread() {
128                                 public void run() {
129                                        logger.info("running shutdown hook");
130                                        proxy.terminate();
131                                 }
132                             }
133            );
134            */
135        }
136    
137    
138        /**
139         * Stop execution.
140         */
141        public static synchronized void terminate() {
142            if (pool == null) {
143                return;
144            }
145            if (pool instanceof ThreadPoolExecutor) {
146                ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
147                //logger.info("task queue size         " + Q_CAPACITY);
148                //logger.info("reject execution handler" + REH.getClass().getName());
149                logger.info("number of CPUs            " + N_CPUS);
150                logger.info("core number of threads    " + N_THREADS);
151                logger.info("current number of threads " + tpe.getPoolSize());
152                logger.info("maximal number of threads " + tpe.getLargestPoolSize());
153                BlockingQueue<Runnable> workpile = tpe.getQueue();
154                if (workpile != null) {
155                    logger.info("queued tasks              " + workpile.size());
156                }
157                List<Runnable> r = tpe.shutdownNow();
158                if (r.size() != 0) {
159                    logger.info("unfinished tasks          " + r.size());
160                }
161                logger.info("number of sheduled tasks  " + tpe.getTaskCount());
162                logger.info("number of completed tasks " + tpe.getCompletedTaskCount());
163            }
164            pool = null;
165            //workpile = null;
166        }
167    
168    
169        /**
170         * Set no thread usage.
171         */
172        public static synchronized void setNoThreads() {
173            NO_THREADS = true;
174        }
175    
176    
177        /**
178         * Set thread usage.
179         */
180        public static synchronized void setThreads() {
181            NO_THREADS = false;
182        }
183    
184    }