/*
 * $Id: ExecutableServer.java 4236 2012-10-04 22:03:47Z kredel $
 */

package edu.jas.util;


import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;

import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;


/**
 * ExecutableServer is used to receive and execute classes.
 * The server thread obtains channels from a ChannelFactory and starts one
 * Executor thread per channel. Each Executor receives objects from its
 * channel and runs those that are RemoteExecutable.
 * @author Heinz Kredel
 */

public class ExecutableServer extends Thread {


    private static final Logger logger = Logger.getLogger(ExecutableServer.class);


    private final boolean debug = logger.isDebugEnabled();


    /**
     * ChannelFactory to use.
     */
    protected final ChannelFactory cf;


    /**
     * List of server threads. Set to null by terminate(); all accesses in
     * this class are guarded by serversLock.
     */
    protected List<Executor> servers = null;


    /**
     * Guards servers and the decision to register a new Executor, so that
     * run() cannot add to the list while terminate() is detaching it.
     */
    private final Object serversLock = new Object();


    /**
     * Default port to listen to.
     */
    public static final int DEFAULT_PORT = 7411;


    /**
     * Constant to signal completion.
     */
    public static final String DONE = "Done";


    /**
     * Constant to request shutdown.
     */
    public static final String STOP = "Stop";


    private volatile boolean goon = true;


    /**
     * Thread executing run(); written by that thread and read by
     * terminate(), hence volatile.
     */
    private volatile Thread mythread = null;


    /**
     * ExecutableServer on default port.
     */
    public ExecutableServer() {
        this(DEFAULT_PORT);
    }


    /**
     * ExecutableServer.
     * @param port port handed to the new ChannelFactory.
     */
    public ExecutableServer(int port) {
        this(new ChannelFactory(port));
    }


    /**
     * ExecutableServer.
     * @param cf channel factory to reuse.
     */
    public ExecutableServer(ChannelFactory cf) {
        this.cf = cf;
        cf.init();
        servers = new ArrayList<Executor>();
    }


    /**
     * main method to start serving thread.
     * Without arguments, or with an unparsable port, DEFAULT_PORT is used.
     * @param args args[0] is port
     * @throws InterruptedException if interrupted while waiting for the
     *         server thread to end.
     */
    public static void main(String[] args) throws InterruptedException {
        BasicConfigurator.configure();

        int port = DEFAULT_PORT;
        if (args.length < 1) {
            System.out.println("Usage: ExecutableServer <port>");
        } else {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // fall back to the default instead of failing silently
                logger.warn("invalid port '" + args[0] + "', using default port " + DEFAULT_PORT);
            }
        }
        //logger.info("ExecutableServer at port " + port);
        ExecutableServer es = new ExecutableServer(port);
        es.init();
        es.join(); // do not use terminate()
        // until CTRL-C
    }


    /**
     * thread initialization and start.
     */
    public void init() {
        this.start();
        logger.info("ExecutableServer at " + cf);
    }


    /**
     * number of servers.
     * @return number of registered Executors, or -1 if the list has been
     *         released by terminate().
     */
    public int size() {
        synchronized (serversLock) {
            if (servers == null) {
                return -1;
            }
            return servers.size();
        }
    }


    /**
     * run is main server method.
     * Loops until terminated or interrupted: obtains a channel from the
     * factory and starts an Executor on it. A channel obtained after
     * interruption or termination is closed without starting an Executor.
     */
    @Override
    public void run() {
        // keep a local reference: terminate() may reset the field to null
        final Thread self = Thread.currentThread();
        mythread = self;
        while (goon) {
            if (debug) {
                logger.info("execute server " + this + " go on");
            }
            try {
                SocketChannel channel = cf.getChannel();
                logger.debug("execute channel = " + channel);
                if (self.isInterrupted()) {
                    goon = false;
                    logger.debug("execute server " + this + " interrupted");
                    channel.close();
                } else {
                    Executor s = new Executor(channel);
                    if (register(s)) {
                        logger.debug("server " + s + " started");
                    } else {
                        // terminate() got in first, drop the connection
                        channel.close();
                    }
                }
            } catch (InterruptedException e) {
                goon = false;
                Thread.currentThread().interrupt();
                if (debug) {
                    logger.debug("execute server " + this + " interrupted", e);
                }
            }
        }
        if (debug) {
            logger.info("execute server " + this + " terminated");
        }
    }


    /**
     * Add an Executor to the server list and start it, unless termination
     * has begun. Add and start happen under the same lock terminate() uses
     * to detach the list, so every started Executor is seen by terminate().
     * @param s executor to register.
     * @return true if s was added and started, false otherwise.
     */
    private boolean register(Executor s) {
        synchronized (serversLock) {
            if (!goon || servers == null) {
                return false;
            }
            servers.add(s);
            s.start();
            return true;
        }
    }


    /**
     * terminate all servers.
     * Stops accepting, terminates the channel factory, closes the channel of
     * each Executor and interrupts it until it has ended, then does the same
     * for the server thread. If the calling thread is interrupted while
     * waiting, its interrupt status is restored and waiting is cut short.
     */
    public void terminate() {
        List<Executor> toStop;
        synchronized (serversLock) {
            goon = false;
            // detach the list so run() can no longer add to it
            toStop = servers;
            servers = null;
        }
        logger.debug("terminating ExecutableServer");
        if (cf != null) {
            cf.terminate();
        }
        if (toStop != null) {
            Iterator<Executor> it = toStop.iterator();
            while (it.hasNext()) {
                Executor x = it.next();
                if (x.channel != null) {
                    x.channel.close();
                }
                try {
                    while (x.isAlive()) {
                        //System.out.print(".");
                        x.interrupt();
                        x.join(100);
                    }
                    logger.debug("server " + x + " terminated");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        logger.debug("Executors terminated");
        Thread server = mythread; // read once, the field is reset below
        if (server == null) {
            return;
        }
        try {
            while (server.isAlive()) {
                //System.out.print("-");
                server.interrupt();
                server.join(100);
            }
            //logger.debug("server " + server + " terminated");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        mythread = null;
        logger.debug("ExecuteServer terminated");
    }


    /**
     * String representation.
     */
    @Override
    public String toString() {
        return "ExecutableServer(" + cf.toString() + ")";
    }

}


/**
 * class for executing incoming objects.
 */

class Executor extends Thread /*implements Runnable*/{


    private static final Logger logger = Logger.getLogger(Executor.class);

    private final boolean debug = logger.isDebugEnabled();


    protected final SocketChannel channel;


    Executor(SocketChannel s) {
        channel = s;
    }


    /**
     * run.
     * Receives objects from the channel until stopped. Any String ends the
     * loop and is answered with DONE; a String other than STOP is logged as
     * invalid first. A RemoteExecutable is run and then answered with DONE,
     * unless this thread was interrupted meanwhile. Interruption, an
     * IOException or a ClassNotFoundException ends the loop. The channel is
     * closed when the method exits, also on unexpected failures.
     */
    @Override
    public void run() {
        boolean goon = true;
        logger.debug("executor started " + this);
        try {
            while (goon) {
                try {
                    Object o = channel.receive();
                    logger.info("receive: " + o + " from " + channel);
                    if (this.isInterrupted()) {
                        goon = false;
                    } else if (o instanceof String) {
                        String d = (String) o;
                        if (!ExecutableServer.STOP.equals(d)) {
                            logger.warn("invalid/unknown String: " + d + " from " + channel);
                        }
                        goon = false; // stop this thread
                        channel.send(ExecutableServer.DONE);
                    } else if (o instanceof RemoteExecutable) {
                        // check permission
                        RemoteExecutable re = (RemoteExecutable) o;
                        if (debug) {
                            logger.info("running " + re);
                        }
                        try {
                            re.run();
                        } catch (Exception e) {
                            // a failing job must not kill the executor
                            logger.info("Exception on re.run()" + e, e);
                        } finally {
                            logger.info("finally re.run() " + re);
                        }
                        if (debug) {
                            logger.info("finished " + re);
                        }
                        if (this.isInterrupted()) {
                            goon = false;
                        } else {
                            channel.send(ExecutableServer.DONE);
                            logger.info("finished send " + ExecutableServer.DONE);
                            //goon = false; // stop this thread
                        }
                    }
                } catch (IOException e) {
                    goon = false;
                    logger.info("IOException " + e);
                    if (debug) {
                        logger.debug("IOException on " + channel, e);
                    }
                } catch (ClassNotFoundException e) {
                    goon = false;
                    logger.info("ClassNotFoundException " + e, e);
                } finally {
                    logger.info("finally " + this);
                }
            }
        } finally {
            logger.info("executor terminated " + this);
            if (channel != null) {
                channel.close();
            }
        }
    }

}