001 /*
002 * $Id: ExecutableServer.java 3320 2010-09-12 11:01:57Z kredel $
003 */
004
005 package edu.jas.util;
006
007
008 import java.io.IOException;
009 import java.util.Iterator;
010 import java.util.List;
011 import java.util.ArrayList;
012
013 import org.apache.log4j.Logger;
014 import org.apache.log4j.BasicConfigurator;
015
016
017 /**
018 * ExecutableServer is used to receive and execute classes.
019 * @author Heinz Kredel
020 */
021
022 public class ExecutableServer extends Thread {
023
024
025 private static final Logger logger = Logger.getLogger(ExecutableServer.class);
026
027
028 private final boolean debug = logger.isDebugEnabled();
029
030
031 /**
032 * ChannelFactory to use.
033 */
034 protected final ChannelFactory cf;
035
036
037 /**
038 * List of server threads.
039 */
040 protected List<Executor> servers = null;
041
042
043 /**
044 * Default port to listen to.
045 */
046 public static final int DEFAULT_PORT = 7411;
047
048
049 /**
050 * Constant to signal completion.
051 */
052 public static final String DONE = "Done";
053
054
055 /**
056 * Constant to request shutdown.
057 */
058 public static final String STOP = "Stop";
059
060
061 private volatile boolean goon = true;
062
063
064 private Thread mythread = null;
065
066
067 /**
068 * ExecutableServer on default port.
069 */
070 public ExecutableServer() {
071 this(DEFAULT_PORT);
072 }
073
074
075 /**
076 * ExecutableServer.
077 * @param port
078 */
079 public ExecutableServer(int port) {
080 this(new ChannelFactory(port));
081 }
082
083
084 /**
085 * ExecutableServer.
086 * @param cf channel factory to reuse.
087 */
088 public ExecutableServer(ChannelFactory cf) {
089 this.cf = cf;
090 cf.init();
091 servers = new ArrayList<Executor>();
092 }
093
094
095 /**
096 * main method to start serving thread.
097 * @param args args[0] is port
098 */
099 public static void main(String[] args) {
100 BasicConfigurator.configure();
101
102 int port = DEFAULT_PORT;
103 if (args.length < 1) {
104 System.out.println("Usage: ExecutableServer <port>");
105 } else {
106 try {
107 port = Integer.parseInt(args[0]);
108 } catch (NumberFormatException e) {
109 }
110 }
111 logger.info("ExecutableServer at port " + port);
112 (new ExecutableServer(port)).run();
113 // until CRTL-C
114 }
115
116
117 /**
118 * thread initialization and start.
119 */
120 public void init() {
121 this.start();
122 logger.info("ExecutableServer at " + cf);
123 }
124
125
126 /**
127 * number of servers.
128 */
129 public int size() {
130 return servers.size();
131 }
132
133
134 /**
135 * run is main server method.
136 */
137 @Override
138 public void run() {
139 SocketChannel channel = null;
140 Executor s = null;
141 mythread = Thread.currentThread();
142 while (goon) {
143 if (debug) {
144 logger.info("execute server " + this + " go on");
145 }
146 try {
147 channel = cf.getChannel();
148 logger.debug("execute channel = " + channel);
149 if (mythread.isInterrupted()) {
150 goon = false;
151 logger.debug("execute server " + this + " interrupted");
152 channel.close();
153 } else {
154 s = new Executor(channel); // ---,servers);
155 if (goon) { // better synchronize with terminate
156 servers.add(s);
157 s.start();
158 logger.debug("server " + s + " started");
159 } else {
160 s = null;
161 channel.close();
162 }
163 }
164 } catch (InterruptedException e) {
165 goon = false;
166 Thread.currentThread().interrupt();
167 if (logger.isDebugEnabled()) {
168 e.printStackTrace();
169 }
170 }
171 }
172 if (debug) {
173 logger.info("execute server " + this + " terminated");
174 }
175 }
176
177
178 /**
179 * terminate all servers.
180 */
181 public void terminate() {
182 goon = false;
183 logger.debug("terminating ExecutableServer");
184 if (cf != null)
185 cf.terminate();
186 if (servers != null) {
187 Iterator<Executor> it = servers.iterator();
188 while (it.hasNext()) {
189 Executor x = it.next();
190 if (x.channel != null) {
191 x.channel.close();
192 }
193 try {
194 while (x.isAlive()) {
195 //System.out.print(".");
196 x.interrupt();
197 x.join(100);
198 }
199 logger.debug("server " + x + " terminated");
200 } catch (InterruptedException e) {
201 Thread.currentThread().interrupt();
202 }
203 }
204 servers = null;
205 }
206 logger.debug("Executors terminated");
207 if (mythread == null)
208 return;
209 try {
210 while (mythread.isAlive()) {
211 //System.out.print("-");
212 mythread.interrupt();
213 mythread.join(100);
214 }
215 //logger.debug("server " + mythread + " terminated");
216 } catch (InterruptedException e) {
217 Thread.currentThread().interrupt();
218 }
219 mythread = null;
220 logger.debug("ExecuteServer terminated");
221 }
222
223 }
224
225
226 /**
227 * class for executing incoming objects.
228 */
229
230 class Executor extends Thread /*implements Runnable*/{
231
232
233 private static final Logger logger = Logger.getLogger(Executor.class);
234
235 private final boolean debug = logger.isInfoEnabled();
236
237
238 protected final SocketChannel channel;
239
240
241 Executor(SocketChannel s) {
242 channel = s;
243 }
244
245
246 /**
247 * run.
248 */
249 @Override
250 public void run() {
251 Object o;
252 RemoteExecutable re = null;
253 String d;
254 boolean goon = true;
255 logger.debug("executor started " + this);
256 while (goon) {
257 try {
258 o = channel.receive();
259 logger.info("receive: " + o + " from " + channel);
260 if (this.isInterrupted()) {
261 goon = false;
262 } else {
263 if (logger.isDebugEnabled()) {
264 logger.debug("receive: " + o + " from " + channel);
265 }
266 if (o instanceof String) {
267 d = (String) o;
268 if (ExecutableServer.STOP.equals(d)) {
269 goon = false; // stop this thread
270 channel.send(ExecutableServer.DONE);
271 } else {
272 goon = false; // stop this thread
273 channel.send(ExecutableServer.DONE);
274 }
275 }
276 // check permission
277 if (o instanceof RemoteExecutable) {
278 re = (RemoteExecutable) o;
279 if ( debug ) {
280 logger.info("running " + re);
281 }
282 try {
283 re.run();
284 } catch(Exception e) {
285 e.printStackTrace();
286 logger.info("Exception on re.run()", e);
287 } finally {
288 logger.info("finally re.run() " + re);
289 }
290 if ( debug ) {
291 logger.info("finished " + re);
292 }
293 if (this.isInterrupted()) {
294 goon = false;
295 } else {
296 channel.send(ExecutableServer.DONE);
297 logger.info("finished send " + ExecutableServer.DONE);
298 //goon = false; // stop this thread
299 }
300 }
301 }
302 } catch (IOException e) {
303 goon = false;
304 //e.printStackTrace();
305 logger.info("IOException ", e);
306 } catch (ClassNotFoundException e) {
307 goon = false;
308 e.printStackTrace();
309 logger.info("ClassNotFoundException ", e);
310 } finally {
311 logger.info("finally " + this);
312 }
313 }
314 logger.info("executor terminated " + this);
315 channel.close();
316 }
317
318 }