001    /*
002     * $Id: ExecutableServer.java,v 1.1 2004/09/18 20:28:07 heinz Exp $
003     */
004    
005    //package edu.jas.util;
006    package comm;
007    
008    import util.Logger;
009    
010    import java.io.IOException;
011    import java.util.Iterator;
012    import java.util.List;
013    import java.util.ArrayList;
014    
015    //import org.apache.log4j.Logger;
016    
017    //import edu.unima.ky.parallel.ChannelFactory;
018    //import edu.unima.ky.parallel.SocketChannel;
019    
020    /**
021     * Class ExecutableServer
022     * Used to receive and execute objects.
023     * @author Heinz Kredel.
024     */
025    public class ExecutableServer extends Thread {
026    
027        // private static Logger logger = Logger.getLogger(ExecutableServer.class);
028        private static Logger logger = new Logger();
029    
030    /**
031     * DEFAULT_PORT to listen to.
032     */
033        public static final int DEFAULT_PORT = 7411;
034        public static final String DONE = "Done";
035    
036        protected final ChannelFactory cf;
037        protected ArrayList servers = null;
038    
039        private boolean goon = true;
040        private Thread mythread = null;
041    
042        public ExecutableServer() {
043            this(DEFAULT_PORT);
044        }
045    
046    /**
047     * @param port to listen on.
048     */
049        public ExecutableServer(int port) {
050            this( new ChannelFactory(port) );
051        }
052    
053    /**
054     * @param cf (re)use this ChannelFactory.
055     */
056        public ExecutableServer(ChannelFactory cf) {
057            this.cf = cf;
058            servers = new ArrayList();
059        }
060    
061    
062    /**
063     * Main method to start serving thread.
064     * @param args args[0] is port.
065     */
066        public static void main(String[] args) {
067    
068            int port = DEFAULT_PORT;
069            if ( args.length < 1 ) {
070                System.out.println("Usage: ExecutableServer <port>");
071            } else {
072                try {
073                    port = Integer.parseInt( args[0] );
074                } catch (NumberFormatException e) {
075                }
076            }
077            System.out.println("ExecutableServer at port " + port);
078            (new ExecutableServer(port)).run();
079            // until CRTL-C
080        }
081    
082    
083    /**
084     * Thread initialization and start.
085     */ 
086        public void init() {
087            this.start();
088        }
089    
090    /**
091     * Number of servers.
092     */ 
093        public int size() {
094            return servers.size();
095        }
096    
097    /**
098     * Accept channels and setup of executor threads.
099     */ 
100        public void run() {
101           SocketChannel channel = null;
102           Executor s = null;
103           mythread = Thread.currentThread();
104           while (goon) {
105              logger.debug("execute server " + this + " go on");
106              try {
107                   channel = cf.getChannel();
108                   logger.debug("execute channel = "+channel);
109                   if ( mythread.isInterrupted() ) {
110                       goon = false;
111                       //logger.info("execute server " + this + " interrupted");
112                   } else {
113                      s = new Executor(channel,servers);
114                      servers.add( s );
115                      s.start();
116                      logger.debug("server " + s + " started");
117                   }
118              } catch (InterruptedException e) {
119                   goon = false;
120                   e.printStackTrace();
121              }
122           }
123           logger.debug("execute server " + this + " terminated");
124        }
125    
126    
127    /**
128     * Terminate all servers.
129     */ 
130        public void terminate() {
131            goon = false;
132            logger.debug("terminating ExecutableServer");
133            if ( cf != null ) cf.terminate();
134            if ( servers != null ) {
135               Iterator it = servers.iterator();
136               while ( it.hasNext() ) {
137                  Executor x = (Executor) it.next();
138                  x.channel.close();
139                  try { 
140                      while ( x.isAlive() ) {
141                              //System.out.print(".");
142                              x.interrupt(); 
143                              x.join(100);
144                      }
145                      logger.debug("server " + x + " terminated");
146                  } catch (InterruptedException e) { 
147                  }
148               }
149               servers = null;
150            }
151            logger.debug("Executors terminated");
152            if ( mythread == null ) return;
153            try { 
154                while ( mythread.isAlive() ) {
155                        //System.out.print("-");
156                        mythread.interrupt(); 
157                        mythread.join(100);
158                }
159                //logger.debug("server " + mythread + " terminated");
160            } catch (InterruptedException e) { 
161            }
162            logger.debug("ExecuteServer terminated");
163        }
164    
165    }
166    
167    
168    /**
169     * Class for executing incoming objects.
170     */
171    class Executor extends Thread /*implements Runnable*/ {
172    
173        //private static Logger logger = Logger.getLogger(Executor.class);
174        private static Logger logger = new Logger();
175    
176        protected final SocketChannel channel;
177        private List list;
178    
179        Executor(SocketChannel s, List p) {
180            channel = s;
181            list = p;
182        } 
183    
184        public void run() {
185            Object o;
186            RemoteExecutable re = null;
187            boolean goon = true;
188            logger.debug("executor started "+this);
189            while (goon) {
190                  try {
191                       o = channel.receive();
192                       if ( this.isInterrupted() ) {
193                            goon = false;
194                       } else {
195                         System.out.println("receive: "+o+" from "+channel);
196                         // check permission
197                         if ( o instanceof RemoteExecutable ) {
198                             re = (RemoteExecutable)o;
199                             re.run();
200                             if ( this.isInterrupted() ) {
201                                goon = false;
202                             } else {
203                               channel.send(ExecutableServer.DONE);
204                               goon = false; // stop this thread
205                             }
206                         }
207                       }
208                  } catch (IOException e) {
209                      e.printStackTrace();
210                      goon = false;
211                  } catch (ClassNotFoundException e) {
212                      e.printStackTrace();
213                      goon = false;
214                  }
215            }
216            logger.debug("executor terminated "+this);
217            channel.close();
218        }
219    
220    }