001/*
002 * $Id$
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.Iterator;
010import java.util.List;
011import java.util.ArrayList;
012
013import org.apache.logging.log4j.Logger;
014import org.apache.logging.log4j.LogManager; 
015
016
017/**
018 * ExecutableServer is used to receive and execute classes.
019 * @author Heinz Kredel
020 */
021
022public class ExecutableServer extends Thread {
023
024
025    private static final Logger logger = LogManager.getLogger(ExecutableServer.class);
026
027
028    private static final boolean debug = logger.isDebugEnabled();
029
030
031    /**
032     * ChannelFactory to use.
033     */
034    protected final ChannelFactory cf;
035
036
037    /**
038     * List of server threads.
039     */
040    protected List<Executor> servers = null;
041
042
043    /**
044     * Default port to listen to.
045     */
046    public static final int DEFAULT_PORT = 7411;
047
048
049    /**
050     * Constant to signal completion.
051     */
052    public static final String DONE = "Done";
053
054
055    /**
056     * Constant to request shutdown.
057     */
058    public static final String STOP = "Stop";
059
060
061    private volatile boolean goon = true;
062
063
064    private volatile Thread mythread = null;
065
066
067    /**
068     * ExecutableServer on default port.
069     */
070    public ExecutableServer() {
071        this(DEFAULT_PORT);
072    }
073
074
075    /**
076     * ExecutableServer.
077     * @param port
078     */
079    public ExecutableServer(int port) {
080        this(new ChannelFactory(port));
081    }
082
083
084    /**
085     * ExecutableServer.
086     * @param cf channel factory to reuse.
087     */
088    public ExecutableServer(ChannelFactory cf) {
089        this.cf = cf;
090        cf.init();
091        servers = new ArrayList<Executor>();
092    }
093
094
095    /**
096     * main method to start serving thread.
097     * @param args args[0] is port
098     */
099    public static void main(String[] args) throws InterruptedException {
100        int port = DEFAULT_PORT;
101        if (args.length < 1) {
102            System.out.println("Usage: ExecutableServer <port>");
103        } else {
104            try {
105                port = Integer.parseInt(args[0]);
106            } catch (NumberFormatException e) {
107            }
108        }
109        //logger.info("ExecutableServer at port {}", port);
110        ExecutableServer es = new ExecutableServer(port);
111        es.init();
112        es.join(); // do not use terminate()
113        // until CRTL-C
114    }
115
116
117    /**
118     * thread initialization and start.
119     */
120    public void init() {
121        this.start();
122        logger.info("ExecutableServer at {}", cf);
123    }
124
125
126    /**
127     * number of servers.
128     */
129    public int size() {
130        if ( servers == null ) {
131            return -1;
132        }
133        return servers.size();
134    }
135
136
137    /**
138     * run is main server method.
139     */
140    @Override
141    public void run() {
142        SocketChannel channel = null;
143        Executor s = null;
144        mythread = Thread.currentThread();
145        while (goon) {
146            if (debug) {
147                logger.info("server {} go on", this);
148            }
149            try {
150                channel = cf.getChannel();
151                logger.info("channel = {}", channel);
152                if (mythread.isInterrupted()) {
153                    goon = false;
154                    logger.debug("execute server {} interrupted", this);
155                    channel.close();
156                } else {
157                    s = new Executor(channel); // ---,servers);
158                    if (goon) { // better synchronize with terminate
159                        servers.add(s);
160                        s.start();
161                        logger.debug("server {} started", s);
162                    } else {
163                        s = null;
164                        channel.close();
165                    }
166                }
167            } catch (InterruptedException e) {
168                goon = false;
169                Thread.currentThread().interrupt();
170                if (debug) {
171                    e.printStackTrace();
172                }
173            }
174        }
175        if (debug) {
176            logger.info("server {} terminated", this);
177        }
178    }
179
180
181    /**
182     * terminate all servers.
183     */
184    public void terminate() {
185        goon = false;
186        logger.info("terminating ExecutableServer");
187        if (cf != null)
188            cf.terminate();
189        if (servers != null) {
190            Iterator<Executor> it = servers.iterator();
191            while (it.hasNext()) {
192                Executor x = it.next();
193                if (x.channel != null) {
194                    x.channel.close();
195                }
196                try {
197                    while (x.isAlive()) {
198                        //System.out.print(".");
199                        x.interrupt();
200                        x.join(100);
201                    }
202                    logger.debug("server {} terminated", x);
203                } catch (InterruptedException e) {
204                    Thread.currentThread().interrupt();
205                }
206            }
207            servers = null;
208        }
209        logger.info("Executors terminated");
210        if (mythread == null)
211            return;
212        try {
213            while (mythread.isAlive()) {
214                //System.out.print("-");
215                mythread.interrupt();
216                mythread.join(100);
217            }
218            //logger.debug("server {} terminated", mythread);
219        } catch (InterruptedException e) {
220            Thread.currentThread().interrupt();
221        }
222        mythread = null;
223        logger.info("terminated");
224    }
225
226
227    /**
228     * String representation.
229     */
230    @Override
231    public String toString() {
232        StringBuffer s = new StringBuffer("ExecutableServer(");
233        s.append(cf.toString());
234        s.append(")");
235        return s.toString();
236    }
237
238}
239
240
241/**
242 * class for executing incoming objects.
243 */
244
245class Executor extends Thread /*implements Runnable*/{
246
247
248    private static final Logger logger = LogManager.getLogger(Executor.class);
249
250    private static final boolean debug = logger.isDebugEnabled();
251
252
253    protected final SocketChannel channel;
254
255
256    Executor(SocketChannel s) {
257        channel = s;
258    }
259
260
261    /**
262     * run.
263     */
264    @Override
265    public void run() {
266        Object o;
267        RemoteExecutable re = null;
268        String d;
269        boolean goon = true;
270        logger.debug("executor started {}", this);
271        while (goon) {
272            try {
273                o = channel.receive();
274                logger.info("receive: {} from {}", o, channel);
275                if (this.isInterrupted()) {
276                    goon = false;
277                } else {
278                    if (debug) {
279                        logger.debug("receive: {} from {}", o, channel);
280                    }
281                    if (o instanceof String) {
282                        d = (String) o;
283                        if (ExecutableServer.STOP.equals(d)) {
284                            goon = false; // stop this thread
285                            channel.send(ExecutableServer.DONE);
286                        } else {
287                            logger.warn("invalid/unknown String: {} from {}", d, channel);
288                            goon = false; // stop this thread ?
289                            channel.send(ExecutableServer.DONE);
290                        }
291                    }
292                    // check permission
293                    if (o instanceof RemoteExecutable) {
294                        re = (RemoteExecutable) o;
295                        if (debug) {
296                            logger.info("running {}", re);
297                        }
298                        try {
299                            re.run();
300                        } catch(Exception e) {
301                            logger.info("Exception on re.run() {}", e);
302                            if (logger.isInfoEnabled()) {
303                                e.printStackTrace();
304                            }
305                        } finally {
306                            logger.info("finally re.run() {}", re);
307                        }
308                        if (debug) {
309                            logger.info("finished {}", re);
310                        }
311                        if (this.isInterrupted()) {
312                            goon = false;
313                        } else {
314                            channel.send(ExecutableServer.DONE);
315                            logger.info("finished send {}", ExecutableServer.DONE);
316                            //goon = false; // stop this thread
317                        }
318                    }
319                }
320            } catch (IOException e) {
321                goon = false;
322                logger.info("IOException {}", e);
323                if (debug) {
324                    e.printStackTrace();
325                }
326            } catch (ClassNotFoundException e) {
327                goon = false;
328                logger.info("ClassNotFoundException {}", e);
329                e.printStackTrace();
330            } finally {
331                logger.info("finally {}", this);
332            }
333        }
334        channel.close();
335        logger.info("terminated {}", this);
336    }
337
338}