001 /*
002 * $Id: ExecutableServer.java,v 1.1 2004/09/18 20:28:07 heinz Exp $
003 */
004
005 //package edu.jas.util;
006 package comm;
007
008 import util.Logger;
009
010 import java.io.IOException;
011 import java.util.Iterator;
012 import java.util.List;
013 import java.util.ArrayList;
014
015 //import org.apache.log4j.Logger;
016
017 //import edu.unima.ky.parallel.ChannelFactory;
018 //import edu.unima.ky.parallel.SocketChannel;
019
020 /**
021 * Class ExecutableServer
022 * Used to receive and execute objects.
023 * @author Heinz Kredel.
024 */
025 public class ExecutableServer extends Thread {
026
027 // private static Logger logger = Logger.getLogger(ExecutableServer.class);
028 private static Logger logger = new Logger();
029
030 /**
031 * DEFAULT_PORT to listen to.
032 */
033 public static final int DEFAULT_PORT = 7411;
034 public static final String DONE = "Done";
035
036 protected final ChannelFactory cf;
037 protected ArrayList servers = null;
038
039 private boolean goon = true;
040 private Thread mythread = null;
041
042 public ExecutableServer() {
043 this(DEFAULT_PORT);
044 }
045
046 /**
047 * @param port to listen on.
048 */
049 public ExecutableServer(int port) {
050 this( new ChannelFactory(port) );
051 }
052
053 /**
054 * @param cf (re)use this ChannelFactory.
055 */
056 public ExecutableServer(ChannelFactory cf) {
057 this.cf = cf;
058 servers = new ArrayList();
059 }
060
061
062 /**
063 * Main method to start serving thread.
064 * @param args args[0] is port.
065 */
066 public static void main(String[] args) {
067
068 int port = DEFAULT_PORT;
069 if ( args.length < 1 ) {
070 System.out.println("Usage: ExecutableServer <port>");
071 } else {
072 try {
073 port = Integer.parseInt( args[0] );
074 } catch (NumberFormatException e) {
075 }
076 }
077 System.out.println("ExecutableServer at port " + port);
078 (new ExecutableServer(port)).run();
079 // until CRTL-C
080 }
081
082
083 /**
084 * Thread initialization and start.
085 */
086 public void init() {
087 this.start();
088 }
089
090 /**
091 * Number of servers.
092 */
093 public int size() {
094 return servers.size();
095 }
096
097 /**
098 * Accept channels and setup of executor threads.
099 */
100 public void run() {
101 SocketChannel channel = null;
102 Executor s = null;
103 mythread = Thread.currentThread();
104 while (goon) {
105 logger.debug("execute server " + this + " go on");
106 try {
107 channel = cf.getChannel();
108 logger.debug("execute channel = "+channel);
109 if ( mythread.isInterrupted() ) {
110 goon = false;
111 //logger.info("execute server " + this + " interrupted");
112 } else {
113 s = new Executor(channel,servers);
114 servers.add( s );
115 s.start();
116 logger.debug("server " + s + " started");
117 }
118 } catch (InterruptedException e) {
119 goon = false;
120 e.printStackTrace();
121 }
122 }
123 logger.debug("execute server " + this + " terminated");
124 }
125
126
127 /**
128 * Terminate all servers.
129 */
130 public void terminate() {
131 goon = false;
132 logger.debug("terminating ExecutableServer");
133 if ( cf != null ) cf.terminate();
134 if ( servers != null ) {
135 Iterator it = servers.iterator();
136 while ( it.hasNext() ) {
137 Executor x = (Executor) it.next();
138 x.channel.close();
139 try {
140 while ( x.isAlive() ) {
141 //System.out.print(".");
142 x.interrupt();
143 x.join(100);
144 }
145 logger.debug("server " + x + " terminated");
146 } catch (InterruptedException e) {
147 }
148 }
149 servers = null;
150 }
151 logger.debug("Executors terminated");
152 if ( mythread == null ) return;
153 try {
154 while ( mythread.isAlive() ) {
155 //System.out.print("-");
156 mythread.interrupt();
157 mythread.join(100);
158 }
159 //logger.debug("server " + mythread + " terminated");
160 } catch (InterruptedException e) {
161 }
162 logger.debug("ExecuteServer terminated");
163 }
164
165 }
166
167
168 /**
169 * Class for executing incoming objects.
170 */
171 class Executor extends Thread /*implements Runnable*/ {
172
173 //private static Logger logger = Logger.getLogger(Executor.class);
174 private static Logger logger = new Logger();
175
176 protected final SocketChannel channel;
177 private List list;
178
179 Executor(SocketChannel s, List p) {
180 channel = s;
181 list = p;
182 }
183
184 public void run() {
185 Object o;
186 RemoteExecutable re = null;
187 boolean goon = true;
188 logger.debug("executor started "+this);
189 while (goon) {
190 try {
191 o = channel.receive();
192 if ( this.isInterrupted() ) {
193 goon = false;
194 } else {
195 System.out.println("receive: "+o+" from "+channel);
196 // check permission
197 if ( o instanceof RemoteExecutable ) {
198 re = (RemoteExecutable)o;
199 re.run();
200 if ( this.isInterrupted() ) {
201 goon = false;
202 } else {
203 channel.send(ExecutableServer.DONE);
204 goon = false; // stop this thread
205 }
206 }
207 }
208 } catch (IOException e) {
209 e.printStackTrace();
210 goon = false;
211 } catch (ClassNotFoundException e) {
212 e.printStackTrace();
213 goon = false;
214 }
215 }
216 logger.debug("executor terminated "+this);
217 channel.close();
218 }
219
220 }