001/* 002 * $Id: ExecutableServer.java 5940 2018-10-19 08:53:13Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.IOException; 009import java.util.Iterator; 010import java.util.List; 011import java.util.ArrayList; 012 013import org.apache.logging.log4j.Logger; 014import org.apache.logging.log4j.LogManager; 015 016 017/** 018 * ExecutableServer is used to receive and execute classes. 019 * @author Heinz Kredel 020 */ 021 022public class ExecutableServer extends Thread { 023 024 025 private static final Logger logger = LogManager.getLogger(ExecutableServer.class); 026 027 028 private static final boolean debug = logger.isDebugEnabled(); 029 030 031 /** 032 * ChannelFactory to use. 033 */ 034 protected final ChannelFactory cf; 035 036 037 /** 038 * List of server threads. 039 */ 040 protected List<Executor> servers = null; 041 042 043 /** 044 * Default port to listen to. 045 */ 046 public static final int DEFAULT_PORT = 7411; 047 048 049 /** 050 * Constant to signal completion. 051 */ 052 public static final String DONE = "Done"; 053 054 055 /** 056 * Constant to request shutdown. 057 */ 058 public static final String STOP = "Stop"; 059 060 061 private volatile boolean goon = true; 062 063 064 private volatile Thread mythread = null; 065 066 067 /** 068 * ExecutableServer on default port. 069 */ 070 public ExecutableServer() { 071 this(DEFAULT_PORT); 072 } 073 074 075 /** 076 * ExecutableServer. 077 * @param port 078 */ 079 public ExecutableServer(int port) { 080 this(new ChannelFactory(port)); 081 } 082 083 084 /** 085 * ExecutableServer. 086 * @param cf channel factory to reuse. 087 */ 088 public ExecutableServer(ChannelFactory cf) { 089 this.cf = cf; 090 cf.init(); 091 servers = new ArrayList<Executor>(); 092 } 093 094 095 /** 096 * main method to start serving thread. 097 * @param args args[0] is port 098 */ 099 public static void main(String[] args) throws InterruptedException { 100 int port = DEFAULT_PORT; 101 if (args.length < 1) { 102 System.out.println("Usage: ExecutableServer <port>"); 103 } else { 104 try { 105 port = Integer.parseInt(args[0]); 106 } catch (NumberFormatException e) { 107 } 108 } 109 //logger.info("ExecutableServer at port " + port); 110 ExecutableServer es = new ExecutableServer(port); 111 es.init(); 112 es.join(); // do not use terminate() 113 // until CRTL-C 114 } 115 116 117 /** 118 * thread initialization and start. 119 */ 120 public void init() { 121 this.start(); 122 logger.info("ExecutableServer at " + cf); 123 } 124 125 126 /** 127 * number of servers. 128 */ 129 public int size() { 130 if ( servers == null ) { 131 return -1; 132 } 133 return servers.size(); 134 } 135 136 137 /** 138 * run is main server method. 139 */ 140 @Override 141 public void run() { 142 SocketChannel channel = null; 143 Executor s = null; 144 mythread = Thread.currentThread(); 145 while (goon) { 146 if (debug) { 147 logger.info("server " + this + " go on"); 148 } 149 try { 150 channel = cf.getChannel(); 151 logger.info("channel = " + channel); 152 if (mythread.isInterrupted()) { 153 goon = false; 154 logger.debug("execute server " + this + " interrupted"); 155 channel.close(); 156 } else { 157 s = new Executor(channel); // ---,servers); 158 if (goon) { // better synchronize with terminate 159 servers.add(s); 160 s.start(); 161 logger.debug("server " + s + " started"); 162 } else { 163 s = null; 164 channel.close(); 165 } 166 } 167 } catch (InterruptedException e) { 168 goon = false; 169 Thread.currentThread().interrupt(); 170 if (debug) { 171 e.printStackTrace(); 172 } 173 } 174 } 175 if (debug) { 176 logger.info("server " + this + " terminated"); 177 } 178 } 179 180 181 /** 182 * terminate all servers. 183 */ 184 public void terminate() { 185 goon = false; 186 logger.info("terminating ExecutableServer"); 187 if (cf != null) 188 cf.terminate(); 189 if (servers != null) { 190 Iterator<Executor> it = servers.iterator(); 191 while (it.hasNext()) { 192 Executor x = it.next(); 193 if (x.channel != null) { 194 x.channel.close(); 195 } 196 try { 197 while (x.isAlive()) { 198 //System.out.print("."); 199 x.interrupt(); 200 x.join(100); 201 } 202 logger.debug("server " + x + " terminated"); 203 } catch (InterruptedException e) { 204 Thread.currentThread().interrupt(); 205 } 206 } 207 servers = null; 208 } 209 logger.info("Executors terminated"); 210 if (mythread == null) 211 return; 212 try { 213 while (mythread.isAlive()) { 214 //System.out.print("-"); 215 mythread.interrupt(); 216 mythread.join(100); 217 } 218 //logger.debug("server " + mythread + " terminated"); 219 } catch (InterruptedException e) { 220 Thread.currentThread().interrupt(); 221 } 222 mythread = null; 223 logger.info("terminated"); 224 } 225 226 227 /** 228 * String representation. 229 */ 230 @Override 231 public String toString() { 232 StringBuffer s = new StringBuffer("ExecutableServer("); 233 s.append(cf.toString()); 234 s.append(")"); 235 return s.toString(); 236 } 237 238} 239 240 241/** 242 * class for executing incoming objects. 243 */ 244 245class Executor extends Thread /*implements Runnable*/{ 246 247 248 private static final Logger logger = LogManager.getLogger(Executor.class); 249 250 private static final boolean debug = logger.isDebugEnabled(); 251 252 253 protected final SocketChannel channel; 254 255 256 Executor(SocketChannel s) { 257 channel = s; 258 } 259 260 261 /** 262 * run. 263 */ 264 @Override 265 public void run() { 266 Object o; 267 RemoteExecutable re = null; 268 String d; 269 boolean goon = true; 270 logger.debug("executor started " + this); 271 while (goon) { 272 try { 273 o = channel.receive(); 274 logger.info("receive: " + o + " from " + channel); 275 if (this.isInterrupted()) { 276 goon = false; 277 } else { 278 if (debug) { 279 logger.debug("receive: " + o + " from " + channel); 280 } 281 if (o instanceof String) { 282 d = (String) o; 283 if (ExecutableServer.STOP.equals(d)) { 284 goon = false; // stop this thread 285 channel.send(ExecutableServer.DONE); 286 } else { 287 logger.warn("invalid/unknown String: " + d + " from " + channel); 288 goon = false; // stop this thread ? 289 channel.send(ExecutableServer.DONE); 290 } 291 } 292 // check permission 293 if (o instanceof RemoteExecutable) { 294 re = (RemoteExecutable) o; 295 if (debug) { 296 logger.info("running " + re); 297 } 298 try { 299 re.run(); 300 } catch(Exception e) { 301 logger.info("Exception on re.run()" + e); 302 if (logger.isInfoEnabled()) { 303 e.printStackTrace(); 304 } 305 } finally { 306 logger.info("finally re.run() " + re); 307 } 308 if (debug) { 309 logger.info("finished " + re); 310 } 311 if (this.isInterrupted()) { 312 goon = false; 313 } else { 314 channel.send(ExecutableServer.DONE); 315 logger.info("finished send " + ExecutableServer.DONE); 316 //goon = false; // stop this thread 317 } 318 } 319 } 320 } catch (IOException e) { 321 goon = false; 322 logger.info("IOException " + e); 323 if (debug) { 324 e.printStackTrace(); 325 } 326 } catch (ClassNotFoundException e) { 327 goon = false; 328 logger.info("ClassNotFoundException " + e); 329 e.printStackTrace(); 330 } finally { 331 logger.info("finally " + this); 332 } 333 } 334 channel.close(); 335 logger.info("terminated " + this); 336 } 337 338}