001 /*
002 * $Id: ExecutableChannels.java 3337 2010-09-27 21:05:17Z kredel $
003 */
004
005 package edu.jas.util;
006
007
008 import java.io.BufferedReader;
009 import java.io.FileNotFoundException;
010 import java.io.FileReader;
011 import java.io.IOException;
012 import java.util.ArrayList;
013 import java.util.List;
014
015 import org.apache.log4j.Logger;
016
017
018 // import edu.unima.ky.parallel.ChannelFactory;
019 // import edu.unima.ky.parallel.SocketChannel;
020
021 /**
022 * ExecutableChannels used to receive and execute classes.
023 * @author Heinz Kredel
024 */
025
026
027 public class ExecutableChannels {
028
029
030 private static final Logger logger = Logger.getLogger(ExecutableChannels.class);
031
032
033 /**
034 * default port.
035 */
036 protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT;
037
038
039 /**
040 * default machine file.
041 */
042 protected final static String DEFAULT_MFILE = "examples/machines.test";
043
044
045 protected final ChannelFactory cf;
046
047
048 protected SocketChannel[] channels = null;
049
050
051 protected String[] servers = null;
052
053
054 protected int[] ports = null;
055
056
057 /**
058 * Internal constructor.
059 */
060 protected ExecutableChannels() {
061 cf = new ChannelFactory();
062 cf.init();
063 }
064
065
066 /**
067 * Constructor from array of server:port strings.
068 * @param srvs A String array.
069 */
070 public ExecutableChannels(String[] srvs) {
071 this();
072 if (srvs == null) {
073 return;
074 }
075 servers = new String[srvs.length];
076 ports = new int[srvs.length];
077 for (int i = 0; i < srvs.length; i++) {
078 setServerPort(i, srvs[i]);
079 }
080 }
081
082
083 /**
084 * Constructor from machine file.
085 * @param mfile
086 * @throws FileNotFoundException.
087 */
088 public ExecutableChannels(String mfile) throws FileNotFoundException {
089 this();
090 if (mfile == null || mfile.length() == 0) {
091 mfile = DEFAULT_MFILE;
092 }
093 BufferedReader in = new BufferedReader(new FileReader(mfile));
094 String line = null;
095 List<String> list = new ArrayList<String>();
096 int x;
097 try {
098 while (true) {
099 if (!in.ready()) {
100 break;
101 }
102 line = in.readLine();
103 if (line == null) {
104 break;
105 }
106 x = line.indexOf("#");
107 if (x >= 0) {
108 line = line.substring(0, x);
109 }
110 line = line.trim();
111 if (line.length() == 0) {
112 continue;
113 }
114 list.add(line);
115 }
116 } catch (IOException ignored) {
117 } finally {
118 try {
119 in.close();
120 } catch (IOException ignore) {
121 }
122 }
123 logger.debug("list.size() in " + mfile + " = " + list.size());
124 if (list.size() == 0) {
125 return;
126 }
127 servers = new String[list.size()];
128 ports = new int[list.size()];
129 for (int i = 0; i < servers.length; i++) {
130 setServerPort(i, list.get(i));
131 }
132 }
133
134
135 /*
136 * internal method
137 */
138 protected void setServerPort(int i, String srv) {
139 int x = srv.indexOf(":");
140 ports[i] = DEFAULT_PORT;
141 if (x < 0) {
142 servers[i] = srv;
143 } else {
144 servers[i] = srv.substring(0, x);
145 String p = srv.substring(x + 1, srv.length());
146 try {
147 ports[i] = Integer.parseInt(p);
148 } catch (NumberFormatException ignored) {
149 }
150 }
151 }
152
153
154 /**
155 * String representation.
156 */
157 @Override
158 public String toString() {
159 StringBuffer s = new StringBuffer("ExecutableChannels(");
160 if (servers != null) {
161 for (int i = 0; i < servers.length; i++) {
162 s.append(servers[i] + ":" + ports[i]);
163 if (i < servers.length - 1) {
164 s.append(" ");
165 }
166 }
167 }
168 if (channels != null) {
169 s.append(" channels = ");
170 for (int i = 0; i < channels.length; i++) {
171 s.append(channels[i]);
172 if (i < channels.length - 1) {
173 s.append(" ");
174 }
175 }
176 }
177 s.append(")");
178 return s.toString();
179 }
180
181
182 /**
183 * number of servers.
184 */
185 public int numServers() {
186 if (servers != null) {
187 return servers.length;
188 }
189 return -1;
190 }
191
192
193 /**
194 * get master host.
195 */
196 public String getMasterHost() {
197 if (servers != null && servers.length > 0) {
198 return servers[0];
199 }
200 return null;
201 }
202
203
204 /**
205 * get master port.
206 */
207 public int getMasterPort() {
208 if (ports != null && ports.length > 0) {
209 return ports[0];
210 }
211 return 0;
212 }
213
214
215 /**
216 * number of channels.
217 */
218 public int numChannels() {
219 if (channels != null) {
220 return channels.length;
221 }
222 return -1;
223 }
224
225
226 /**
227 * open, setup of SocketChannels.
228 * @throws IOException.
229 */
230 public void open() throws IOException {
231 logger.debug("opening " + servers.length + " channels");
232 if (servers.length <= 1) {
233 throw new IOException("to few servers");
234 }
235 channels = new SocketChannel[servers.length - 1];
236 for (int i = 1; i < servers.length; i++) {
237 channels[i - 1] = cf.getChannel(servers[i], ports[i]);
238 }
239 }
240
241
242 /**
243 * open, setup of SocketChannels. If nc > servers.length open in round
244 * robin fashion.
245 * @param nc number of channels to open.
246 * @throws IOException.
247 */
248 public void open(int nc) throws IOException {
249 logger.debug("opening " + nc + " channels");
250 if (servers.length <= 1) {
251 throw new IOException("to few servers");
252 }
253 channels = new SocketChannel[nc];
254 int j = 1; // 0 is master
255 for (int i = 0; i < channels.length; i++) {
256 if (j >= servers.length) { // modulo #servers
257 j = 1;
258 }
259 channels[i] = cf.getChannel(servers[j], ports[j]);
260 j++;
261 }
262 }
263
264
265 /**
266 * close all channels and ChannelFactory.
267 */
268 public void close() {
269 logger.debug("closing ExecutableChannels");
270 if (cf != null) {
271 cf.terminate();
272 }
273 if (channels != null) {
274 for (int i = 0; i < channels.length; i++) {
275 if (channels[i] != null) {
276 try {
277 channels[i].send(ExecutableServer.STOP);
278 } catch (IOException e) {
279 e.printStackTrace();
280 } finally {
281 channels[i].close();
282 }
283 channels[i] = null;
284 }
285 }
286 channels = null;
287 }
288 logger.debug("ExecuteChannels closed");
289 }
290
291
292 /**
293 * getChannel.
294 * @param i channel number.
295 */
296 public SocketChannel getChannel(int i) {
297 if (channels != null && 0 <= i && i < channels.length) {
298 return channels[i];
299 }
300 return null;
301 }
302
303
304 /**
305 * getChannels.
306 */
307 /*package*/SocketChannel[] getChannels() {
308 return channels;
309 }
310
311
312 /**
313 * send on channel i.
314 * @param i channel number.
315 * @param o object to send.
316 */
317 public void send(int i, Object o) throws IOException {
318 if (channels != null && 0 <= i && i < channels.length) {
319 channels[i].send(o);
320 }
321 }
322
323
324 /**
325 * recieve on channel i.
326 * @param i channel number.
327 * @return object recieved.
328 */
329 public Object receive(int i) throws IOException, ClassNotFoundException {
330 if (channels != null && 0 <= i && i < channels.length) {
331 return channels[i].receive();
332 }
333 return null;
334
335 }
336
337 }