001 /* 002 * $Id: ExecutableServer.java 3320 2010-09-12 11:01:57Z kredel $ 003 */ 004 005 package edu.jas.util; 006 007 008 import java.io.IOException; 009 import java.util.Iterator; 010 import java.util.List; 011 import java.util.ArrayList; 012 013 import org.apache.log4j.Logger; 014 import org.apache.log4j.BasicConfigurator; 015 016 017 /** 018 * ExecutableServer is used to receive and execute classes. 019 * @author Heinz Kredel 020 */ 021 022 public class ExecutableServer extends Thread { 023 024 025 private static final Logger logger = Logger.getLogger(ExecutableServer.class); 026 027 028 private final boolean debug = logger.isDebugEnabled(); 029 030 031 /** 032 * ChannelFactory to use. 033 */ 034 protected final ChannelFactory cf; 035 036 037 /** 038 * List of server threads. 039 */ 040 protected List<Executor> servers = null; 041 042 043 /** 044 * Default port to listen to. 045 */ 046 public static final int DEFAULT_PORT = 7411; 047 048 049 /** 050 * Constant to signal completion. 051 */ 052 public static final String DONE = "Done"; 053 054 055 /** 056 * Constant to request shutdown. 057 */ 058 public static final String STOP = "Stop"; 059 060 061 private volatile boolean goon = true; 062 063 064 private Thread mythread = null; 065 066 067 /** 068 * ExecutableServer on default port. 069 */ 070 public ExecutableServer() { 071 this(DEFAULT_PORT); 072 } 073 074 075 /** 076 * ExecutableServer. 077 * @param port 078 */ 079 public ExecutableServer(int port) { 080 this(new ChannelFactory(port)); 081 } 082 083 084 /** 085 * ExecutableServer. 086 * @param cf channel factory to reuse. 087 */ 088 public ExecutableServer(ChannelFactory cf) { 089 this.cf = cf; 090 cf.init(); 091 servers = new ArrayList<Executor>(); 092 } 093 094 095 /** 096 * main method to start serving thread. 097 * @param args args[0] is port 098 */ 099 public static void main(String[] args) { 100 BasicConfigurator.configure(); 101 102 int port = DEFAULT_PORT; 103 if (args.length < 1) { 104 System.out.println("Usage: ExecutableServer <port>"); 105 } else { 106 try { 107 port = Integer.parseInt(args[0]); 108 } catch (NumberFormatException e) { 109 } 110 } 111 logger.info("ExecutableServer at port " + port); 112 (new ExecutableServer(port)).run(); 113 // until CRTL-C 114 } 115 116 117 /** 118 * thread initialization and start. 119 */ 120 public void init() { 121 this.start(); 122 logger.info("ExecutableServer at " + cf); 123 } 124 125 126 /** 127 * number of servers. 128 */ 129 public int size() { 130 return servers.size(); 131 } 132 133 134 /** 135 * run is main server method. 136 */ 137 @Override 138 public void run() { 139 SocketChannel channel = null; 140 Executor s = null; 141 mythread = Thread.currentThread(); 142 while (goon) { 143 if (debug) { 144 logger.info("execute server " + this + " go on"); 145 } 146 try { 147 channel = cf.getChannel(); 148 logger.debug("execute channel = " + channel); 149 if (mythread.isInterrupted()) { 150 goon = false; 151 logger.debug("execute server " + this + " interrupted"); 152 channel.close(); 153 } else { 154 s = new Executor(channel); // ---,servers); 155 if (goon) { // better synchronize with terminate 156 servers.add(s); 157 s.start(); 158 logger.debug("server " + s + " started"); 159 } else { 160 s = null; 161 channel.close(); 162 } 163 } 164 } catch (InterruptedException e) { 165 goon = false; 166 Thread.currentThread().interrupt(); 167 if (logger.isDebugEnabled()) { 168 e.printStackTrace(); 169 } 170 } 171 } 172 if (debug) { 173 logger.info("execute server " + this + " terminated"); 174 } 175 } 176 177 178 /** 179 * terminate all servers. 180 */ 181 public void terminate() { 182 goon = false; 183 logger.debug("terminating ExecutableServer"); 184 if (cf != null) 185 cf.terminate(); 186 if (servers != null) { 187 Iterator<Executor> it = servers.iterator(); 188 while (it.hasNext()) { 189 Executor x = it.next(); 190 if (x.channel != null) { 191 x.channel.close(); 192 } 193 try { 194 while (x.isAlive()) { 195 //System.out.print("."); 196 x.interrupt(); 197 x.join(100); 198 } 199 logger.debug("server " + x + " terminated"); 200 } catch (InterruptedException e) { 201 Thread.currentThread().interrupt(); 202 } 203 } 204 servers = null; 205 } 206 logger.debug("Executors terminated"); 207 if (mythread == null) 208 return; 209 try { 210 while (mythread.isAlive()) { 211 //System.out.print("-"); 212 mythread.interrupt(); 213 mythread.join(100); 214 } 215 //logger.debug("server " + mythread + " terminated"); 216 } catch (InterruptedException e) { 217 Thread.currentThread().interrupt(); 218 } 219 mythread = null; 220 logger.debug("ExecuteServer terminated"); 221 } 222 223 } 224 225 226 /** 227 * class for executing incoming objects. 228 */ 229 230 class Executor extends Thread /*implements Runnable*/{ 231 232 233 private static final Logger logger = Logger.getLogger(Executor.class); 234 235 private final boolean debug = logger.isInfoEnabled(); 236 237 238 protected final SocketChannel channel; 239 240 241 Executor(SocketChannel s) { 242 channel = s; 243 } 244 245 246 /** 247 * run. 248 */ 249 @Override 250 public void run() { 251 Object o; 252 RemoteExecutable re = null; 253 String d; 254 boolean goon = true; 255 logger.debug("executor started " + this); 256 while (goon) { 257 try { 258 o = channel.receive(); 259 logger.info("receive: " + o + " from " + channel); 260 if (this.isInterrupted()) { 261 goon = false; 262 } else { 263 if (logger.isDebugEnabled()) { 264 logger.debug("receive: " + o + " from " + channel); 265 } 266 if (o instanceof String) { 267 d = (String) o; 268 if (ExecutableServer.STOP.equals(d)) { 269 goon = false; // stop this thread 270 channel.send(ExecutableServer.DONE); 271 } else { 272 goon = false; // stop this thread 273 channel.send(ExecutableServer.DONE); 274 } 275 } 276 // check permission 277 if (o instanceof RemoteExecutable) { 278 re = (RemoteExecutable) o; 279 if ( debug ) { 280 logger.info("running " + re); 281 } 282 try { 283 re.run(); 284 } catch(Exception e) { 285 e.printStackTrace(); 286 logger.info("Exception on re.run()", e); 287 } finally { 288 logger.info("finally re.run() " + re); 289 } 290 if ( debug ) { 291 logger.info("finished " + re); 292 } 293 if (this.isInterrupted()) { 294 goon = false; 295 } else { 296 channel.send(ExecutableServer.DONE); 297 logger.info("finished send " + ExecutableServer.DONE); 298 //goon = false; // stop this thread 299 } 300 } 301 } 302 } catch (IOException e) { 303 goon = false; 304 //e.printStackTrace(); 305 logger.info("IOException ", e); 306 } catch (ClassNotFoundException e) { 307 goon = false; 308 e.printStackTrace(); 309 logger.info("ClassNotFoundException ", e); 310 } finally { 311 logger.info("finally " + this); 312 } 313 } 314 logger.info("executor terminated " + this); 315 channel.close(); 316 } 317 318 }