001    /*
002     * $Id: ExecutableServer.java 3320 2010-09-12 11:01:57Z kredel $
003     */
004    
005    package edu.jas.util;
006    
007    
008    import java.io.IOException;
009    import java.util.Iterator;
010    import java.util.List;
011    import java.util.ArrayList;
012    
013    import org.apache.log4j.Logger;
014    import org.apache.log4j.BasicConfigurator;
015    
016    
017    /**
018     * ExecutableServer is used to receive and execute classes.
019     * @author Heinz Kredel
020     */
021    
022    public class ExecutableServer extends Thread {
023    
024    
025        private static final Logger logger = Logger.getLogger(ExecutableServer.class);
026    
027    
028        private final boolean debug = logger.isDebugEnabled();
029    
030    
031        /**
032         * ChannelFactory to use.
033         */
034        protected final ChannelFactory cf;
035    
036    
037        /**
038         * List of server threads.
039         */
040        protected List<Executor> servers = null;
041    
042    
043        /**
044         * Default port to listen to.
045         */
046        public static final int DEFAULT_PORT = 7411;
047    
048    
049        /**
050         * Constant to signal completion.
051         */
052        public static final String DONE = "Done";
053    
054    
055        /**
056         * Constant to request shutdown.
057         */
058        public static final String STOP = "Stop";
059    
060    
061        private volatile boolean goon = true;
062    
063    
064        private Thread mythread = null;
065    
066    
067        /**
068         * ExecutableServer on default port.
069         */
070        public ExecutableServer() {
071            this(DEFAULT_PORT);
072        }
073    
074    
075        /**
076         * ExecutableServer.
077         * @param port
078         */
079        public ExecutableServer(int port) {
080            this(new ChannelFactory(port));
081        }
082    
083    
084        /**
085         * ExecutableServer.
086         * @param cf channel factory to reuse.
087         */
088        public ExecutableServer(ChannelFactory cf) {
089            this.cf = cf;
090            cf.init();
091            servers = new ArrayList<Executor>();
092        }
093    
094    
095        /**
096         * main method to start serving thread.
097         * @param args args[0] is port
098         */
099        public static void main(String[] args) {
100            BasicConfigurator.configure();
101    
102            int port = DEFAULT_PORT;
103            if (args.length < 1) {
104                System.out.println("Usage: ExecutableServer <port>");
105            } else {
106                try {
107                    port = Integer.parseInt(args[0]);
108                } catch (NumberFormatException e) {
109                }
110            }
111            logger.info("ExecutableServer at port " + port);
112            (new ExecutableServer(port)).run();
113            // until CRTL-C
114        }
115    
116    
117        /**
118         * thread initialization and start.
119         */
120        public void init() {
121            this.start();
122            logger.info("ExecutableServer at " + cf);
123        }
124    
125    
126        /**
127         * number of servers.
128         */
129        public int size() {
130            return servers.size();
131        }
132    
133    
134        /**
135         * run is main server method.
136         */
137        @Override
138        public void run() {
139            SocketChannel channel = null;
140            Executor s = null;
141            mythread = Thread.currentThread();
142            while (goon) {
143                if (debug) {
144                    logger.info("execute server " + this + " go on");
145                }
146                try {
147                    channel = cf.getChannel();
148                    logger.debug("execute channel = " + channel);
149                    if (mythread.isInterrupted()) {
150                        goon = false;
151                        logger.debug("execute server " + this + " interrupted");
152                        channel.close();
153                    } else {
154                        s = new Executor(channel); // ---,servers);
155                        if (goon) { // better synchronize with terminate
156                            servers.add(s);
157                            s.start();
158                            logger.debug("server " + s + " started");
159                        } else {
160                            s = null;
161                            channel.close();
162                        }
163                    }
164                } catch (InterruptedException e) {
165                    goon = false;
166                    Thread.currentThread().interrupt();
167                    if (logger.isDebugEnabled()) {
168                        e.printStackTrace();
169                    }
170                }
171            }
172            if (debug) {
173                logger.info("execute server " + this + " terminated");
174            }
175        }
176    
177    
178        /**
179         * terminate all servers.
180         */
181        public void terminate() {
182            goon = false;
183            logger.debug("terminating ExecutableServer");
184            if (cf != null)
185                cf.terminate();
186            if (servers != null) {
187                Iterator<Executor> it = servers.iterator();
188                while (it.hasNext()) {
189                    Executor x = it.next();
190                    if (x.channel != null) {
191                        x.channel.close();
192                    }
193                    try {
194                        while (x.isAlive()) {
195                            //System.out.print(".");
196                            x.interrupt();
197                            x.join(100);
198                        }
199                        logger.debug("server " + x + " terminated");
200                    } catch (InterruptedException e) {
201                        Thread.currentThread().interrupt();
202                    }
203                }
204                servers = null;
205            }
206            logger.debug("Executors terminated");
207            if (mythread == null)
208                return;
209            try {
210                while (mythread.isAlive()) {
211                    //System.out.print("-");
212                    mythread.interrupt();
213                    mythread.join(100);
214                }
215                //logger.debug("server " + mythread + " terminated");
216            } catch (InterruptedException e) {
217                Thread.currentThread().interrupt();
218            }
219            mythread = null;
220            logger.debug("ExecuteServer terminated");
221        }
222    
223    }
224    
225    
226    /**
227     * class for executing incoming objects.
228     */
229    
230    class Executor extends Thread /*implements Runnable*/{
231    
232    
233        private static final Logger logger = Logger.getLogger(Executor.class);
234    
235        private final boolean debug = logger.isInfoEnabled();
236    
237    
238        protected final SocketChannel channel;
239    
240    
241        Executor(SocketChannel s) {
242            channel = s;
243        }
244    
245    
246        /**
247         * run.
248         */
249        @Override
250        public void run() {
251            Object o;
252            RemoteExecutable re = null;
253            String d;
254            boolean goon = true;
255            logger.debug("executor started " + this);
256            while (goon) {
257                try {
258                    o = channel.receive();
259                    logger.info("receive: " + o + " from " + channel);
260                    if (this.isInterrupted()) {
261                        goon = false;
262                    } else {
263                        if (logger.isDebugEnabled()) {
264                            logger.debug("receive: " + o + " from " + channel);
265                        }
266                        if (o instanceof String) {
267                            d = (String) o;
268                            if (ExecutableServer.STOP.equals(d)) {
269                                goon = false; // stop this thread
270                                channel.send(ExecutableServer.DONE);
271                            } else {
272                                goon = false; // stop this thread
273                                channel.send(ExecutableServer.DONE);
274                            }
275                        }
276                        // check permission
277                        if (o instanceof RemoteExecutable) {
278                            re = (RemoteExecutable) o;
279                            if ( debug ) {
280                                logger.info("running " + re);
281                            }
282                            try {
283                                re.run();
284                            } catch(Exception e) {
285                                e.printStackTrace();
286                                logger.info("Exception on re.run()", e);
287                            } finally {
288                                logger.info("finally re.run() " + re);
289                            }
290                            if ( debug ) {
291                                logger.info("finished " + re);
292                            }
293                            if (this.isInterrupted()) {
294                                goon = false;
295                            } else {
296                                channel.send(ExecutableServer.DONE);
297                                logger.info("finished send " + ExecutableServer.DONE);
298                                //goon = false; // stop this thread
299                            }
300                        }
301                    }
302                } catch (IOException e) {
303                    goon = false;
304                    //e.printStackTrace();
305                    logger.info("IOException ", e);
306                } catch (ClassNotFoundException e) {
307                    goon = false;
308                    e.printStackTrace();
309                    logger.info("ClassNotFoundException ", e);
310                } finally {
311                    logger.info("finally " + this);
312                }
313            }
314            logger.info("executor terminated " + this);
315            channel.close();
316        }
317    
318    }