/*
 * $Id: ExecutableServer.java,v 1.1 2004/09/18 20:28:07 heinz Exp $
 */

//package edu.jas.util;
package comm;

import util.Logger;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;

//import org.apache.log4j.Logger;

//import edu.unima.ky.parallel.ChannelFactory;
//import edu.unima.ky.parallel.SocketChannel;

/**
 * Class ExecutableServer
 * Used to receive and execute objects.
 * Accepts channels from a ChannelFactory and starts one Executor
 * thread per accepted channel. The accept loop can run either in its
 * own thread (see {@link #init()}) or in the calling thread (see
 * {@link #main(String[])}).
 * @author Heinz Kredel.
 */
public class ExecutableServer extends Thread {

    // private static Logger logger = Logger.getLogger(ExecutableServer.class);
    private static Logger logger = new Logger();

    /**
     * DEFAULT_PORT to listen to.
     */
    public static final int DEFAULT_PORT = 7411;

    /**
     * Reply sent back on the channel after a received object has been run.
     */
    public static final String DONE = "Done";

    protected final ChannelFactory cf;

    /**
     * Executors started so far; set to null by {@link #terminate()}.
     * Accesses inside this class are guarded by a private lock.
     */
    protected ArrayList servers = null;

    /** Guards every access to {@link #servers} made by this class. */
    private final Object serversLock = new Object();

    // Written by the thread calling terminate(), read by the accept loop:
    // volatile so the stop request is guaranteed to become visible.
    private volatile boolean goon = true;

    // Thread that executes run(); volatile because terminate() reads it
    // from a different thread.
    private volatile Thread mythread = null;

    /**
     * Server listening on DEFAULT_PORT.
     */
    public ExecutableServer() {
        this(DEFAULT_PORT);
    }

    /**
     * @param port to listen on.
     */
    public ExecutableServer(int port) {
        this( new ChannelFactory(port) );
    }

    /**
     * @param cf (re)use this ChannelFactory.
     */
    public ExecutableServer(ChannelFactory cf) {
        this.cf = cf;
        servers = new ArrayList();
    }


    /**
     * Main method to start serving thread.
     * The accept loop is run in the calling thread, so this method
     * does not return until the loop ends.
     * @param args args[0] is port.
     */
    public static void main(String[] args) {

        int port = DEFAULT_PORT;
        if ( args.length < 1 ) {
            System.out.println("Usage: ExecutableServer <port>");
        } else {
            try {
                port = Integer.parseInt( args[0] );
            } catch (NumberFormatException e) {
                // keep the default port, but tell the user why
                System.out.println("Invalid port '" + args[0]
                                   + "', using default " + DEFAULT_PORT);
            }
        }
        System.out.println("ExecutableServer at port " + port);
        (new ExecutableServer(port)).run();
        // until CRTL-C
    }


    /**
     * Thread initialization and start.
     */
    public void init() {
        this.start();
    }

    /**
     * Number of servers.
     * @return number of executors started so far, 0 after terminate().
     */
    public int size() {
        synchronized (serversLock) {
            return ( servers == null ) ? 0 : servers.size();
        }
    }

    /**
     * Accept channels and setup of executor threads.
     * Loops until terminate() is called or the executing thread is
     * interrupted.
     */
    public void run() {
        SocketChannel channel = null;
        Executor s = null;
        Thread self = Thread.currentThread();
        mythread = self;
        while (goon) {
            logger.debug("execute server " + this + " go on");
            try {
                channel = cf.getChannel();
                logger.debug("execute channel = "+channel);
                if ( self.isInterrupted() ) {
                    goon = false;
                    //logger.info("execute server " + this + " interrupted");
                    // the channel was accepted but will never be served
                    if ( channel != null ) {
                        channel.close();
                    }
                } else {
                    boolean started = false;
                    synchronized (serversLock) {
                        // servers is null once terminate() has run
                        if ( servers != null ) {
                            s = new Executor(channel,servers);
                            servers.add( s );
                            // start under the lock so terminate() only
                            // ever sees executors that are running
                            s.start();
                            started = true;
                        }
                    }
                    if ( started ) {
                        logger.debug("server " + s + " started");
                    } else {
                        goon = false;
                        if ( channel != null ) {
                            channel.close();
                        }
                    }
                }
            } catch (InterruptedException e) {
                goon = false;
                e.printStackTrace();
            }
        }
        logger.debug("execute server " + this + " terminated");
    }


    /**
     * Terminate all servers.
     * Stops the accept loop, terminates the channel factory, closes the
     * channel of every executor and waits for the executor threads and
     * the accept thread to end. If the calling thread is interrupted
     * while waiting, its interrupt status is restored before returning.
     */
    public void terminate() {
        goon = false;
        logger.debug("terminating ExecutableServer");
        if ( cf != null ) cf.terminate();

        // take a private snapshot so the accept loop can not modify the
        // list while it is being traversed
        List running = null;
        synchronized (serversLock) {
            if ( servers != null ) {
                running = new ArrayList( servers );
                servers = null;
            }
        }

        boolean interrupted = false;
        if ( running != null ) {
            Iterator it = running.iterator();
            while ( it.hasNext() ) {
                Executor x = (Executor) it.next();
                x.channel.close();
                try {
                    while ( x.isAlive() ) {
                        //System.out.print(".");
                        x.interrupt();
                        x.join(100);
                    }
                    logger.debug("server " + x + " terminated");
                } catch (InterruptedException e) {
                    // continue with the remaining executors,
                    // remember the interrupt for the caller
                    interrupted = true;
                }
            }
        }
        logger.debug("Executors terminated");

        Thread acceptor = mythread;
        if ( acceptor == null ) {
            // run() was never entered, nothing to wait for
            if ( interrupted ) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        try {
            while ( acceptor.isAlive() ) {
                //System.out.print("-");
                acceptor.interrupt();
                acceptor.join(100);
            }
            //logger.debug("server " + mythread + " terminated");
        } catch (InterruptedException e) {
            interrupted = true;
        }
        logger.debug("ExecuteServer terminated");
        if ( interrupted ) {
            Thread.currentThread().interrupt();
        }
    }

}


/**
 * Class for executing incoming objects.
 * Receives objects from one channel, runs the first RemoteExecutable
 * received, answers with ExecutableServer.DONE and ends.
 */
class Executor extends Thread /*implements Runnable*/ {

    //private static Logger logger = Logger.getLogger(Executor.class);
    private static Logger logger = new Logger();

    protected final SocketChannel channel;
    private List list;

    /**
     * @param s channel to receive objects from.
     * @param p list of executors of the creating server.
     */
    Executor(SocketChannel s, List p) {
        channel = s;
        list = p;
    }

    /**
     * Receive objects until a RemoteExecutable has been run, the thread
     * is interrupted or receiving fails. The channel is closed when the
     * method ends, also when the executed object throws.
     */
    public void run() {
        Object o;
        RemoteExecutable re = null;
        boolean goon = true;
        logger.debug("executor started "+this);
        try {
            while (goon) {
                try {
                    o = channel.receive();
                    if ( this.isInterrupted() ) {
                        goon = false;
                    } else {
                        System.out.println("receive: "+o+" from "+channel);
                        // check permission
                        // NOTE(review): no permission check is implemented;
                        // any RemoteExecutable received on the channel is
                        // run as is, so only trusted peers should be able
                        // to connect.
                        if ( o instanceof RemoteExecutable ) {
                            re = (RemoteExecutable)o;
                            re.run();
                            if ( this.isInterrupted() ) {
                                goon = false;
                            } else {
                                channel.send(ExecutableServer.DONE);
                                goon = false; // stop this thread
                            }
                        }
                        // objects of other types are ignored, keep receiving
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    goon = false;
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                    goon = false;
                }
            }
        } finally {
            // also reached when re.run() throws an unchecked exception
            logger.debug("executor terminated "+this);
            channel.close();
        }
    }

}