001 /*
002 * $Id: ComputerThreads.java 3295 2010-08-26 17:01:10Z kredel $
003 */
004
005 package edu.jas.kern;
006
007
008 import java.util.List;
009 import java.util.concurrent.BlockingQueue;
010 import java.util.concurrent.ExecutorService;
011 import java.util.concurrent.Executors;
012 import java.util.concurrent.ThreadPoolExecutor;
013
014 import org.apache.log4j.Logger;
015
016
017 /**
018 * ComputerThreads, provides global thread / executor service.
019 * @author Heinz Kredel
020 * @usage To obtain a reference to the thread pool use
021 * <code>ComputerThreads.getPool()</code>. Once a pool has been created
022 * it must be shutdown to exit JAS with
023 * <code>ComputerThreads.terminate()</code>.
024 */
025
026 public class ComputerThreads {
027
028
029 private static final Logger logger = Logger.getLogger(ComputerThreads.class);
030
031
032 // private boolean debug = logger.isInfoEnabled(); //logger.isInfoEnabled();
033
034
035 /**
036 * Flag for thread usage. <b>Note:</b> Only introduced because Google app
037 * engine does not support threads.
038 * @see edu.jas.ufd.GCDFactory#getProxy(edu.jas.structure.RingFactory)
039 */
040 public static boolean NO_THREADS = false;
041
042
043 /**
044 * Number of processors.
045 */
046 public static final int N_CPUS = Runtime.getRuntime().availableProcessors();
047
048
049 /*
050 * Core number of threads.
051 * N_CPUS x 1.5, x 2, x 2.5, min 3, ?.
052 */
053 public static final int N_THREADS = (N_CPUS < 3 ? 3 : N_CPUS + N_CPUS / 2);
054
055
056 //public static final int N_THREADS = ( N_CPUS < 3 ? 5 : 3*N_CPUS );
057
058
059 /*
060 * Queue capacity.
061 */
062 //public static final int Q_CAPACITY = 1000; // 10000
063
064 /*
065 * WorkQueue.
066 */
067 //private static BlockingQueue<Runnable> workpile;
068
069 /*
070 * Saturation policy.
071 */
072 //public static final RejectedExecutionHandler REH = new ThreadPoolExecutor.CallerRunsPolicy();
073 //public static final RejectedExecutionHandler REH = new ThreadPoolExecutor.AbortPolicy();
074
075 /**
076 * ExecutorService thread pool.
077 */
078 //static ThreadPoolExecutor pool = null;
079 static ExecutorService pool = null;
080
081
082 /**
083 * No public constructor.
084 */
085 private ComputerThreads() {
086 }
087
088
089 /**
090 * Test if a pool is running.
091 * @return true if a thread pool has been started or is running, else false.
092 */
093 public static synchronized boolean isRunning() {
094 if (pool == null) {
095 return false;
096 }
097 if (pool.isTerminated() || pool.isShutdown()) {
098 return false;
099 }
100 return true;
101 }
102
103
104 /**
105 * Get the thread pool.
106 * @return pool ExecutorService.
107 */
108 public static synchronized ExecutorService getPool() {
109 if (pool == null) {
110 // workpile = new ArrayBlockingQueue<Runnable>(Q_CAPACITY);
111 // pool = Executors.newFixedThreadPool(N_THREADS);
112 pool = Executors.newCachedThreadPool();
113 // pool = new ThreadPoolExecutor(N_CPUS, N_THREADS,
114 // 100L, TimeUnit.MILLISECONDS,
115 // workpile, REH);
116 // pool = new ThreadPoolExecutor(N_CPUS, N_THREADS,
117 // 1000L, TimeUnit.MILLISECONDS,
118 // workpile);
119 }
120 //System.out.println("pool_init = " + pool);
121 return pool;
122 //return Executors.unconfigurableExecutorService(pool);
123
124 /* not useful, is not run from jython
125 final GCDProxy<C> proxy = this;
126 Runtime.getRuntime().addShutdownHook(
127 new Thread() {
128 public void run() {
129 logger.info("running shutdown hook");
130 proxy.terminate();
131 }
132 }
133 );
134 */
135 }
136
137
138 /**
139 * Stop execution.
140 */
141 public static synchronized void terminate() {
142 if (pool == null) {
143 return;
144 }
145 if (pool instanceof ThreadPoolExecutor) {
146 ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
147 //logger.info("task queue size " + Q_CAPACITY);
148 //logger.info("reject execution handler" + REH.getClass().getName());
149 logger.info("number of CPUs " + N_CPUS);
150 logger.info("core number of threads " + N_THREADS);
151 logger.info("current number of threads " + tpe.getPoolSize());
152 logger.info("maximal number of threads " + tpe.getLargestPoolSize());
153 BlockingQueue<Runnable> workpile = tpe.getQueue();
154 if (workpile != null) {
155 logger.info("queued tasks " + workpile.size());
156 }
157 List<Runnable> r = tpe.shutdownNow();
158 if (r.size() != 0) {
159 logger.info("unfinished tasks " + r.size());
160 }
161 logger.info("number of sheduled tasks " + tpe.getTaskCount());
162 logger.info("number of completed tasks " + tpe.getCompletedTaskCount());
163 }
164 pool = null;
165 //workpile = null;
166 }
167
168
169 /**
170 * Set no thread usage.
171 */
172 public static synchronized void setNoThreads() {
173 NO_THREADS = true;
174 }
175
176
177 /**
178 * Set thread usage.
179 */
180 public static synchronized void setThreads() {
181 NO_THREADS = false;
182 }
183
184 }