001/*
002 * $Id: ComputerThreads.java 3295 2010-08-26 17:01:10Z kredel $
003 */
004
005package edu.jas.kern;
006
007
008import java.util.List;
009import java.util.concurrent.BlockingQueue;
010import java.util.concurrent.ExecutorService;
011import java.util.concurrent.Executors;
012import java.util.concurrent.ThreadPoolExecutor;
013
014import org.apache.log4j.Logger;
015
016
017/**
018 * ComputerThreads, provides global thread / executor service.
019 * @author Heinz Kredel
020 * @usage To obtain a reference to the thread pool use
021 *        <code>ComputerThreads.getPool()</code>. Once a pool has been created
022 *        it must be shutdown to exit JAS with
023 *        <code>ComputerThreads.terminate()</code>.
024 */
025
026public class ComputerThreads {
027
028
029    private static final Logger logger = Logger.getLogger(ComputerThreads.class);
030
031
032    // private boolean debug = logger.isInfoEnabled(); //logger.isInfoEnabled();
033
034
035    /**
036     * Flag for thread usage. <b>Note:</b> Only introduced because Google app
037     * engine does not support threads.
038     * @see edu.jas.ufd.GCDFactory#getProxy(edu.jas.structure.RingFactory)
039     */
040    public static boolean NO_THREADS = false;
041
042
043    /**
044     * Number of processors.
045     */
046    public static final int N_CPUS = Runtime.getRuntime().availableProcessors();
047
048
049    /*
050      * Core number of threads.
051      * N_CPUS x 1.5, x 2, x 2.5, min 3, ?.
052      */
053    public static final int N_THREADS = (N_CPUS < 3 ? 3 : N_CPUS + N_CPUS / 2);
054
055
056    //public static final int N_THREADS = ( N_CPUS < 3 ? 5 : 3*N_CPUS );
057
058
059    /*
060      * Queue capacity.
061      */
062    //public static final int Q_CAPACITY = 1000; // 10000
063
064    /*
065      * WorkQueue.
066      */
067    //private static BlockingQueue<Runnable> workpile; 
068
069    /*
070      * Saturation policy.
071      */
072    //public static final RejectedExecutionHandler REH = new ThreadPoolExecutor.CallerRunsPolicy();
073    //public static final RejectedExecutionHandler REH = new ThreadPoolExecutor.AbortPolicy();
074
075    /**
076     * ExecutorService thread pool.
077     */
078    //static ThreadPoolExecutor pool = null;
079    static ExecutorService pool = null;
080
081
082    /**
083     * No public constructor.
084     */
085    private ComputerThreads() {
086    }
087
088
089    /**
090     * Test if a pool is running.
091     * @return true if a thread pool has been started or is running, else false.
092     */
093    public static synchronized boolean isRunning() {
094        if (pool == null) {
095            return false;
096        }
097        if (pool.isTerminated() || pool.isShutdown()) {
098            return false;
099        }
100        return true;
101    }
102
103
104    /**
105     * Get the thread pool.
106     * @return pool ExecutorService.
107     */
108    public static synchronized ExecutorService getPool() {
109        if (pool == null) {
110            // workpile = new ArrayBlockingQueue<Runnable>(Q_CAPACITY);
111            //            pool = Executors.newFixedThreadPool(N_THREADS);
112            pool = Executors.newCachedThreadPool();
113            //             pool = new ThreadPoolExecutor(N_CPUS, N_THREADS,
114            //                                           100L, TimeUnit.MILLISECONDS,
115            //                                           workpile, REH);
116            //             pool = new ThreadPoolExecutor(N_CPUS, N_THREADS,
117            //                                           1000L, TimeUnit.MILLISECONDS,
118            //                                           workpile);
119        }
120        //System.out.println("pool_init = " + pool);
121        return pool;
122        //return Executors.unconfigurableExecutorService(pool);
123
124        /* not useful, is not run from jython
125        final GCDProxy<C> proxy = this;
126        Runtime.getRuntime().addShutdownHook( 
127                         new Thread() {
128                             public void run() {
129                                    logger.info("running shutdown hook");
130                                    proxy.terminate();
131                             }
132                         }
133        );
134        */
135    }
136
137
138    /**
139     * Stop execution.
140     */
141    public static synchronized void terminate() {
142        if (pool == null) {
143            return;
144        }
145        if (pool instanceof ThreadPoolExecutor) {
146            ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
147            //logger.info("task queue size         " + Q_CAPACITY);
148            //logger.info("reject execution handler" + REH.getClass().getName());
149            logger.info("number of CPUs            " + N_CPUS);
150            logger.info("core number of threads    " + N_THREADS);
151            logger.info("current number of threads " + tpe.getPoolSize());
152            logger.info("maximal number of threads " + tpe.getLargestPoolSize());
153            BlockingQueue<Runnable> workpile = tpe.getQueue();
154            if (workpile != null) {
155                logger.info("queued tasks              " + workpile.size());
156            }
157            List<Runnable> r = tpe.shutdownNow();
158            if (r.size() != 0) {
159                logger.info("unfinished tasks          " + r.size());
160            }
161            logger.info("number of sheduled tasks  " + tpe.getTaskCount());
162            logger.info("number of completed tasks " + tpe.getCompletedTaskCount());
163        }
164        pool = null;
165        //workpile = null;
166    }
167
168
169    /**
170     * Set no thread usage.
171     */
172    public static synchronized void setNoThreads() {
173        NO_THREADS = true;
174    }
175
176
177    /**
178     * Set thread usage.
179     */
180    public static synchronized void setThreads() {
181        NO_THREADS = false;
182    }
183
184}