/*
 * $Id: ExecutableChannels.java 3337 2010-09-27 21:05:17Z kredel $
 */

package edu.jas.util;


import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;


// import edu.unima.ky.parallel.ChannelFactory;
// import edu.unima.ky.parallel.SocketChannel;

/**
 * ExecutableChannels used to receive and execute classes.
 * <p>
 * Holds a list of <code>server:port</code> entries. Entry 0 is treated as the
 * master; channels are opened only to the entries with index 1 and above. The
 * channels are obtained from an internally owned {@link ChannelFactory}, which
 * is terminated by {@link #close()}.
 * @author Heinz Kredel
 */
public class ExecutableChannels {


    private static final Logger logger = Logger.getLogger(ExecutableChannels.class);


    /**
     * default port.
     */
    protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT;


    /**
     * default machine file.
     */
    protected final static String DEFAULT_MFILE = "examples/machines.test";


    /**
     * Factory used to create all channels; initialized in the constructor.
     */
    protected final ChannelFactory cf;


    /**
     * Open channels, or null while no channels are open.
     */
    protected SocketChannel[] channels = null;


    /**
     * Server host names, or null if no servers were configured. Index 0 is the
     * master.
     */
    protected String[] servers = null;


    /**
     * Server ports, parallel to {@link #servers}; null if no servers were
     * configured.
     */
    protected int[] ports = null;


    /**
     * Internal constructor. Creates and initializes the ChannelFactory.
     */
    protected ExecutableChannels() {
        cf = new ChannelFactory();
        cf.init();
    }


    /**
     * Constructor from array of server:port strings.
     * @param srvs A String array of <code>host</code> or
     *            <code>host:port</code> entries; if null, no servers are
     *            configured.
     */
    public ExecutableChannels(String[] srvs) {
        this();
        if (srvs == null) {
            return;
        }
        servers = new String[srvs.length];
        ports = new int[srvs.length];
        for (int i = 0; i < srvs.length; i++) {
            setServerPort(i, srvs[i]);
        }
    }


    /**
     * Constructor from machine file. Each non-empty line of the file, after
     * removing everything from the first <code>#</code> on and trimming white
     * space, is taken as one <code>host</code> or <code>host:port</code>
     * entry. If the file yields no entries, no servers are configured.
     * @param mfile name of the machine file; if null or empty,
     *            {@link #DEFAULT_MFILE} is used.
     * @throws FileNotFoundException if the machine file cannot be opened.
     */
    public ExecutableChannels(String mfile) throws FileNotFoundException {
        this();
        if (mfile == null || mfile.length() == 0) {
            mfile = DEFAULT_MFILE;
        }
        BufferedReader in;
        try {
            in = new BufferedReader(new FileReader(mfile));
        } catch (FileNotFoundException e) {
            // this() has already initialized the factory and the caller never
            // gets a reference to call close() on, so release it here.
            // NOTE(review): assumes terminate() is safe right after init().
            cf.terminate();
            throw e;
        }
        List<String> list = new ArrayList<String>();
        try {
            String line;
            // readLine() == null is the end-of-file test; ready() only reports
            // whether a read would block and is not a reliable EOF indicator.
            while ((line = in.readLine()) != null) {
                int x = line.indexOf("#");
                if (x >= 0) {
                    line = line.substring(0, x); // strip comment
                }
                line = line.trim();
                if (line.length() == 0) {
                    continue;
                }
                list.add(line);
            }
        } catch (IOException e) {
            // best effort: keep the entries read so far, but do not hide the error
            logger.warn("error reading " + mfile + ", using " + list.size() + " entries read so far", e);
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                logger.debug("error closing " + mfile, e);
            }
        }
        logger.debug("list.size() in " + mfile + " = " + list.size());
        if (list.size() == 0) {
            return;
        }
        servers = new String[list.size()];
        ports = new int[list.size()];
        for (int i = 0; i < servers.length; i++) {
            setServerPort(i, list.get(i));
        }
    }


    /**
     * Internal method. Splits a <code>host</code> or <code>host:port</code>
     * string at the first colon and stores the parts at index i of
     * {@link #servers} and {@link #ports}. If there is no colon, or the port
     * part is not a valid integer, {@link #DEFAULT_PORT} is used.
     * @param i index into the servers and ports arrays.
     * @param srv server specification.
     */
    protected void setServerPort(int i, String srv) {
        int x = srv.indexOf(":");
        ports[i] = DEFAULT_PORT;
        if (x < 0) {
            servers[i] = srv;
            return;
        }
        servers[i] = srv.substring(0, x);
        String p = srv.substring(x + 1);
        try {
            ports[i] = Integer.parseInt(p);
        } catch (NumberFormatException e) {
            // keep the default port, but make the misconfiguration visible
            logger.warn("invalid port '" + p + "' for server " + servers[i] + ", using default port "
                            + DEFAULT_PORT);
        }
    }


    /**
     * String representation.
     * @return the configured server:port pairs followed by the open channels,
     *         if any.
     */
    @Override
    public String toString() {
        StringBuilder s = new StringBuilder("ExecutableChannels(");
        if (servers != null) {
            for (int i = 0; i < servers.length; i++) {
                s.append(servers[i]).append(":").append(ports[i]);
                if (i < servers.length - 1) {
                    s.append(" ");
                }
            }
        }
        if (channels != null) {
            s.append(" channels = ");
            for (int i = 0; i < channels.length; i++) {
                s.append(channels[i]);
                if (i < channels.length - 1) {
                    s.append(" ");
                }
            }
        }
        s.append(")");
        return s.toString();
    }


    /**
     * number of servers.
     * @return number of configured servers, or -1 if none are configured.
     */
    public int numServers() {
        if (servers != null) {
            return servers.length;
        }
        return -1;
    }


    /**
     * get master host.
     * @return host name of the first server entry, or null if there is none.
     */
    public String getMasterHost() {
        if (servers != null && servers.length > 0) {
            return servers[0];
        }
        return null;
    }


    /**
     * get master port.
     * @return port of the first server entry, or 0 if there is none.
     */
    public int getMasterPort() {
        if (ports != null && ports.length > 0) {
            return ports[0];
        }
        return 0;
    }


    /**
     * number of channels.
     * @return number of channels, or -1 if no channels are open.
     */
    public int numChannels() {
        if (channels != null) {
            return channels.length;
        }
        return -1;
    }


    /**
     * open, setup of SocketChannels. Opens one channel to every server except
     * the master (index 0).
     * @throws IOException if fewer than two servers are configured or a
     *             channel cannot be obtained.
     */
    public void open() throws IOException {
        int n = (servers == null ? 0 : servers.length);
        logger.debug("opening " + n + " channels");
        // servers is null if no servers were configured; report it like any
        // other "not enough servers" condition instead of failing with a NPE
        if (n <= 1) {
            throw new IOException("to few servers");
        }
        channels = new SocketChannel[n - 1];
        for (int i = 1; i < n; i++) {
            channels[i - 1] = cf.getChannel(servers[i], ports[i]);
        }
    }


    /**
     * open, setup of SocketChannels. If nc &gt; servers.length open in round
     * robin fashion. The master (index 0) is never used as a target.
     * @param nc number of channels to open, must not be negative.
     * @throws IOException if fewer than two servers are configured or a
     *             channel cannot be obtained.
     * @throws IllegalArgumentException if nc is negative.
     */
    public void open(int nc) throws IOException {
        logger.debug("opening " + nc + " channels");
        if (servers == null || servers.length <= 1) {
            throw new IOException("to few servers");
        }
        if (nc < 0) {
            throw new IllegalArgumentException("negative number of channels: " + nc);
        }
        channels = new SocketChannel[nc];
        int j = 1; // 0 is master
        for (int i = 0; i < channels.length; i++) {
            if (j >= servers.length) { // modulo #servers
                j = 1;
            }
            channels[i] = cf.getChannel(servers[j], ports[j]);
            j++;
        }
    }


    /**
     * close all channels and ChannelFactory. Sends
     * <code>ExecutableServer.STOP</code> on every open channel before closing
     * it; a failure to send is logged and the channel is closed anyway.
     */
    public void close() {
        logger.debug("closing ExecutableChannels");
        if (cf != null) {
            cf.terminate();
        }
        if (channels != null) {
            for (int i = 0; i < channels.length; i++) {
                if (channels[i] != null) {
                    try {
                        channels[i].send(ExecutableServer.STOP);
                    } catch (IOException e) {
                        logger.warn("could not send STOP on channel " + i, e);
                    } finally {
                        channels[i].close();
                    }
                    channels[i] = null;
                }
            }
            channels = null;
        }
        logger.debug("ExecuteChannels closed");
    }


    /**
     * getChannel.
     * @param i channel number.
     * @return channel i, or null if no channels are open or i is out of range.
     */
    public SocketChannel getChannel(int i) {
        if (channels != null && 0 <= i && i < channels.length) {
            return channels[i];
        }
        return null;
    }


    /**
     * getChannels.
     * @return the internal channel array (not a copy), or null if no channels
     *         are open.
     */
    /*package*/SocketChannel[] getChannels() {
        return channels;
    }


    /**
     * send on channel i. Does nothing if no channels are open or i is out of
     * range.
     * @param i channel number.
     * @param o object to send.
     * @throws IOException if sending on the channel fails.
     */
    public void send(int i, Object o) throws IOException {
        if (channels != null && 0 <= i && i < channels.length) {
            channels[i].send(o);
        }
    }


    /**
     * receive on channel i.
     * @param i channel number.
     * @return object received, or null if no channels are open or i is out of
     *         range.
     * @throws IOException if receiving on the channel fails.
     * @throws ClassNotFoundException if thrown by the channel's receive.
     */
    public Object receive(int i) throws IOException, ClassNotFoundException {
        if (channels != null && 0 <= i && i < channels.length) {
            return channels[i].receive();
        }
        return null;
    }

}