001 /* 002 * $Id: ComputerThreads.java 3295 2010-08-26 17:01:10Z kredel $ 003 */ 004 005 package edu.jas.kern; 006 007 008 import java.util.List; 009 import java.util.concurrent.BlockingQueue; 010 import java.util.concurrent.ExecutorService; 011 import java.util.concurrent.Executors; 012 import java.util.concurrent.ThreadPoolExecutor; 013 014 import org.apache.log4j.Logger; 015 016 017 /** 018 * ComputerThreads, provides global thread / executor service. 019 * @author Heinz Kredel 020 * @usage To obtain a reference to the thread pool use 021 * <code>ComputerThreads.getPool()</code>. Once a pool has been created 022 * it must be shutdown to exit JAS with 023 * <code>ComputerThreads.terminate()</code>. 024 */ 025 026 public class ComputerThreads { 027 028 029 private static final Logger logger = Logger.getLogger(ComputerThreads.class); 030 031 032 // private boolean debug = logger.isInfoEnabled(); //logger.isInfoEnabled(); 033 034 035 /** 036 * Flag for thread usage. <b>Note:</b> Only introduced because Google app 037 * engine does not support threads. 038 * @see edu.jas.ufd.GCDFactory#getProxy(edu.jas.structure.RingFactory) 039 */ 040 public static boolean NO_THREADS = false; 041 042 043 /** 044 * Number of processors. 045 */ 046 public static final int N_CPUS = Runtime.getRuntime().availableProcessors(); 047 048 049 /* 050 * Core number of threads. 051 * N_CPUS x 1.5, x 2, x 2.5, min 3, ?. 052 */ 053 public static final int N_THREADS = (N_CPUS < 3 ? 3 : N_CPUS + N_CPUS / 2); 054 055 056 //public static final int N_THREADS = ( N_CPUS < 3 ? 5 : 3*N_CPUS ); 057 058 059 /* 060 * Queue capacity. 061 */ 062 //public static final int Q_CAPACITY = 1000; // 10000 063 064 /* 065 * WorkQueue. 066 */ 067 //private static BlockingQueue<Runnable> workpile; 068 069 /* 070 * Saturation policy. 071 */ 072 //public static final RejectedExecutionHandler REH = new ThreadPoolExecutor.CallerRunsPolicy(); 073 //public static final RejectedExecutionHandler REH = new ThreadPoolExecutor.AbortPolicy(); 074 075 /** 076 * ExecutorService thread pool. 077 */ 078 //static ThreadPoolExecutor pool = null; 079 static ExecutorService pool = null; 080 081 082 /** 083 * No public constructor. 084 */ 085 private ComputerThreads() { 086 } 087 088 089 /** 090 * Test if a pool is running. 091 * @return true if a thread pool has been started or is running, else false. 092 */ 093 public static synchronized boolean isRunning() { 094 if (pool == null) { 095 return false; 096 } 097 if (pool.isTerminated() || pool.isShutdown()) { 098 return false; 099 } 100 return true; 101 } 102 103 104 /** 105 * Get the thread pool. 106 * @return pool ExecutorService. 107 */ 108 public static synchronized ExecutorService getPool() { 109 if (pool == null) { 110 // workpile = new ArrayBlockingQueue<Runnable>(Q_CAPACITY); 111 // pool = Executors.newFixedThreadPool(N_THREADS); 112 pool = Executors.newCachedThreadPool(); 113 // pool = new ThreadPoolExecutor(N_CPUS, N_THREADS, 114 // 100L, TimeUnit.MILLISECONDS, 115 // workpile, REH); 116 // pool = new ThreadPoolExecutor(N_CPUS, N_THREADS, 117 // 1000L, TimeUnit.MILLISECONDS, 118 // workpile); 119 } 120 //System.out.println("pool_init = " + pool); 121 return pool; 122 //return Executors.unconfigurableExecutorService(pool); 123 124 /* not useful, is not run from jython 125 final GCDProxy<C> proxy = this; 126 Runtime.getRuntime().addShutdownHook( 127 new Thread() { 128 public void run() { 129 logger.info("running shutdown hook"); 130 proxy.terminate(); 131 } 132 } 133 ); 134 */ 135 } 136 137 138 /** 139 * Stop execution. 140 */ 141 public static synchronized void terminate() { 142 if (pool == null) { 143 return; 144 } 145 if (pool instanceof ThreadPoolExecutor) { 146 ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool; 147 //logger.info("task queue size " + Q_CAPACITY); 148 //logger.info("reject execution handler" + REH.getClass().getName()); 149 logger.info("number of CPUs " + N_CPUS); 150 logger.info("core number of threads " + N_THREADS); 151 logger.info("current number of threads " + tpe.getPoolSize()); 152 logger.info("maximal number of threads " + tpe.getLargestPoolSize()); 153 BlockingQueue<Runnable> workpile = tpe.getQueue(); 154 if (workpile != null) { 155 logger.info("queued tasks " + workpile.size()); 156 } 157 List<Runnable> r = tpe.shutdownNow(); 158 if (r.size() != 0) { 159 logger.info("unfinished tasks " + r.size()); 160 } 161 logger.info("number of sheduled tasks " + tpe.getTaskCount()); 162 logger.info("number of completed tasks " + tpe.getCompletedTaskCount()); 163 } 164 pool = null; 165 //workpile = null; 166 } 167 168 169 /** 170 * Set no thread usage. 171 */ 172 public static synchronized void setNoThreads() { 173 NO_THREADS = true; 174 } 175 176 177 /** 178 * Set thread usage. 179 */ 180 public static synchronized void setThreads() { 181 NO_THREADS = false; 182 } 183 184 }