001/*
002 * $Id: ExecutableServer.java 4236 2012-10-04 22:03:47Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.Iterator;
010import java.util.List;
011import java.util.ArrayList;
012
013import org.apache.log4j.Logger;
014import org.apache.log4j.BasicConfigurator;
015
016
017/**
018 * ExecutableServer is used to receive and execute classes.
019 * @author Heinz Kredel
020 */
021
022public class ExecutableServer extends Thread {
023
024
025    private static final Logger logger = Logger.getLogger(ExecutableServer.class);
026
027
028    private final boolean debug = logger.isDebugEnabled();
029
030
031    /**
032     * ChannelFactory to use.
033     */
034    protected final ChannelFactory cf;
035
036
037    /**
038     * List of server threads.
039     */
040    protected List<Executor> servers = null;
041
042
043    /**
044     * Default port to listen to.
045     */
046    public static final int DEFAULT_PORT = 7411;
047
048
049    /**
050     * Constant to signal completion.
051     */
052    public static final String DONE = "Done";
053
054
055    /**
056     * Constant to request shutdown.
057     */
058    public static final String STOP = "Stop";
059
060
061    private volatile boolean goon = true;
062
063
064    private Thread mythread = null;
065
066
067    /**
068     * ExecutableServer on default port.
069     */
070    public ExecutableServer() {
071        this(DEFAULT_PORT);
072    }
073
074
075    /**
076     * ExecutableServer.
077     * @param port
078     */
079    public ExecutableServer(int port) {
080        this(new ChannelFactory(port));
081    }
082
083
084    /**
085     * ExecutableServer.
086     * @param cf channel factory to reuse.
087     */
088    public ExecutableServer(ChannelFactory cf) {
089        this.cf = cf;
090        cf.init();
091        servers = new ArrayList<Executor>();
092    }
093
094
095    /**
096     * main method to start serving thread.
097     * @param args args[0] is port
098     */
099    public static void main(String[] args) throws InterruptedException {
100        BasicConfigurator.configure();
101
102        int port = DEFAULT_PORT;
103        if (args.length < 1) {
104            System.out.println("Usage: ExecutableServer <port>");
105        } else {
106            try {
107                port = Integer.parseInt(args[0]);
108            } catch (NumberFormatException e) {
109            }
110        }
111        //logger.info("ExecutableServer at port " + port);
112        ExecutableServer es = new ExecutableServer(port);
113        es.init();
114        es.join(); // do not use terminate()
115        // until CRTL-C
116    }
117
118
119    /**
120     * thread initialization and start.
121     */
122    public void init() {
123        this.start();
124        logger.info("ExecutableServer at " + cf);
125    }
126
127
128    /**
129     * number of servers.
130     */
131    public int size() {
132        if ( servers == null ) {
133            return -1;
134        }
135        return servers.size();
136    }
137
138
139    /**
140     * run is main server method.
141     */
142    @Override
143    public void run() {
144        SocketChannel channel = null;
145        Executor s = null;
146        mythread = Thread.currentThread();
147        while (goon) {
148            if (debug) {
149                logger.info("execute server " + this + " go on");
150            }
151            try {
152                channel = cf.getChannel();
153                logger.debug("execute channel = " + channel);
154                if (mythread.isInterrupted()) {
155                    goon = false;
156                    logger.debug("execute server " + this + " interrupted");
157                    channel.close();
158                } else {
159                    s = new Executor(channel); // ---,servers);
160                    if (goon) { // better synchronize with terminate
161                        servers.add(s);
162                        s.start();
163                        logger.debug("server " + s + " started");
164                    } else {
165                        s = null;
166                        channel.close();
167                    }
168                }
169            } catch (InterruptedException e) {
170                goon = false;
171                Thread.currentThread().interrupt();
172                if (debug) {
173                    e.printStackTrace();
174                }
175            }
176        }
177        if (debug) {
178            logger.info("execute server " + this + " terminated");
179        }
180    }
181
182
183    /**
184     * terminate all servers.
185     */
186    public void terminate() {
187        goon = false;
188        logger.debug("terminating ExecutableServer");
189        if (cf != null)
190            cf.terminate();
191        if (servers != null) {
192            Iterator<Executor> it = servers.iterator();
193            while (it.hasNext()) {
194                Executor x = it.next();
195                if (x.channel != null) {
196                    x.channel.close();
197                }
198                try {
199                    while (x.isAlive()) {
200                        //System.out.print(".");
201                        x.interrupt();
202                        x.join(100);
203                    }
204                    logger.debug("server " + x + " terminated");
205                } catch (InterruptedException e) {
206                    Thread.currentThread().interrupt();
207                }
208            }
209            servers = null;
210        }
211        logger.debug("Executors terminated");
212        if (mythread == null)
213            return;
214        try {
215            while (mythread.isAlive()) {
216                //System.out.print("-");
217                mythread.interrupt();
218                mythread.join(100);
219            }
220            //logger.debug("server " + mythread + " terminated");
221        } catch (InterruptedException e) {
222            Thread.currentThread().interrupt();
223        }
224        mythread = null;
225        logger.debug("ExecuteServer terminated");
226    }
227
228
229    /**
230     * String representation.
231     */
232    @Override
233    public String toString() {
234        StringBuffer s = new StringBuffer("ExecutableServer(");
235        s.append(cf.toString());
236        s.append(")");
237        return s.toString();
238    }
239
240}
241
242
243/**
244 * class for executing incoming objects.
245 */
246
247class Executor extends Thread /*implements Runnable*/{
248
249
250    private static final Logger logger = Logger.getLogger(Executor.class);
251
252    private final boolean debug = logger.isDebugEnabled();
253
254
255    protected final SocketChannel channel;
256
257
258    Executor(SocketChannel s) {
259        channel = s;
260    }
261
262
263    /**
264     * run.
265     */
266    @Override
267    public void run() {
268        Object o;
269        RemoteExecutable re = null;
270        String d;
271        boolean goon = true;
272        logger.debug("executor started " + this);
273        while (goon) {
274            try {
275                o = channel.receive();
276                logger.info("receive: " + o + " from " + channel);
277                if (this.isInterrupted()) {
278                    goon = false;
279                } else {
280                    if (debug) {
281                        logger.debug("receive: " + o + " from " + channel);
282                    }
283                    if (o instanceof String) {
284                        d = (String) o;
285                        if (ExecutableServer.STOP.equals(d)) {
286                            goon = false; // stop this thread
287                            channel.send(ExecutableServer.DONE);
288                        } else {
289                            logger.warn("invalid/unknown String: " + d + " from " + channel);
290                            goon = false; // stop this thread ?
291                            channel.send(ExecutableServer.DONE);
292                        }
293                    }
294                    // check permission
295                    if (o instanceof RemoteExecutable) {
296                        re = (RemoteExecutable) o;
297                        if (debug) {
298                            logger.info("running " + re);
299                        }
300                        try {
301                            re.run();
302                        } catch(Exception e) {
303                            logger.info("Exception on re.run()" + e);
304                            e.printStackTrace();
305                        } finally {
306                            logger.info("finally re.run() " + re);
307                        }
308                        if (debug) {
309                            logger.info("finished " + re);
310                        }
311                        if (this.isInterrupted()) {
312                            goon = false;
313                        } else {
314                            channel.send(ExecutableServer.DONE);
315                            logger.info("finished send " + ExecutableServer.DONE);
316                            //goon = false; // stop this thread
317                        }
318                    }
319                }
320            } catch (IOException e) {
321                goon = false;
322                logger.info("IOException " + e);
323                if (debug) {
324                    e.printStackTrace();
325                }
326            } catch (ClassNotFoundException e) {
327                goon = false;
328                logger.info("ClassNotFoundException " + e);
329                e.printStackTrace();
330            } finally {
331                logger.info("finally " + this);
332            }
333        }
334        logger.info("executor terminated " + this);
335        channel.close();
336    }
337
338}