001/* 002 * $Id: ExecutableChannels.java 4078 2012-07-28 12:40:12Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.BufferedReader; 009import java.io.FileNotFoundException; 010import java.io.FileInputStream; 011import java.io.InputStreamReader; 012import java.io.IOException; 013import java.nio.charset.Charset; 014import java.util.ArrayList; 015import java.util.List; 016 017import org.apache.log4j.Logger; 018 019 020// import edu.unima.ky.parallel.ChannelFactory; 021// import edu.unima.ky.parallel.SocketChannel; 022 023/** 024 * ExecutableChannels used to receive and execute classes. 025 * @author Heinz Kredel 026 */ 027 028 029public class ExecutableChannels { 030 031 032 private static final Logger logger = Logger.getLogger(ExecutableChannels.class); 033 034 035 /** 036 * default port. 037 */ 038 protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT; 039 040 041 /** 042 * default machine file. 043 */ 044 protected final static String DEFAULT_MFILE = "examples/machines.test"; 045 046 047 protected final ChannelFactory cf; 048 049 050 protected SocketChannel[] channels = null; 051 052 053 protected String[] servers = null; 054 055 056 protected int[] ports = null; 057 058 059 /** 060 * Internal constructor. 061 */ 062 protected ExecutableChannels() { 063 cf = new ChannelFactory(); 064 cf.init(); 065 } 066 067 068 /** 069 * Constructor from array of server:port strings. 070 * @param srvs A String array. 071 */ 072 public ExecutableChannels(String[] srvs) { 073 this(); 074 if (srvs == null) { 075 return; 076 } 077 servers = new String[srvs.length]; 078 ports = new int[srvs.length]; 079 for (int i = 0; i < srvs.length; i++) { 080 setServerPort(i, srvs[i]); 081 } 082 } 083 084 085 /** 086 * Constructor from machine file. 087 * @param mfile 088 * @throws FileNotFoundException. 089 */ 090 public ExecutableChannels(String mfile) throws FileNotFoundException { 091 this(); 092 if (mfile == null || mfile.length() == 0) { 093 mfile = DEFAULT_MFILE; 094 } 095 InputStreamReader isr = new InputStreamReader(new FileInputStream(mfile),Charset.forName("UTF8")); 096 BufferedReader in = new BufferedReader(isr); 097 String line = null; 098 List<String> list = new ArrayList<String>(); 099 int x; 100 try { 101 while (true) { 102 if (!in.ready()) { 103 break; 104 } 105 line = in.readLine(); 106 if (line == null) { 107 break; 108 } 109 x = line.indexOf("#"); 110 if (x >= 0) { 111 line = line.substring(0, x); 112 } 113 line = line.trim(); 114 if (line.length() == 0) { 115 continue; 116 } 117 list.add(line); 118 } 119 } catch (IOException ignored) { 120 } finally { 121 try { 122 in.close(); 123 } catch (IOException ignore) { 124 } 125 } 126 logger.debug("list.size() in " + mfile + " = " + list.size()); 127 if (list.size() == 0) { 128 return; 129 } 130 servers = new String[list.size()]; 131 ports = new int[list.size()]; 132 for (int i = 0; i < servers.length; i++) { 133 setServerPort(i, list.get(i)); 134 } 135 } 136 137 138 /* 139 * internal method 140 */ 141 protected void setServerPort(int i, String srv) { 142 int x = srv.indexOf(":"); 143 ports[i] = DEFAULT_PORT; 144 if (x < 0) { 145 servers[i] = srv; 146 } else { 147 servers[i] = srv.substring(0, x); 148 String p = srv.substring(x + 1, srv.length()); 149 try { 150 ports[i] = Integer.parseInt(p); 151 } catch (NumberFormatException ignored) { 152 } 153 } 154 } 155 156 157 /** 158 * String representation. 159 */ 160 @Override 161 public String toString() { 162 StringBuffer s = new StringBuffer("ExecutableChannels("); 163 if (servers != null) { 164 for (int i = 0; i < servers.length; i++) { 165 s.append(servers[i] + ":" + ports[i]); 166 if (i < servers.length - 1) { 167 s.append(" "); 168 } 169 } 170 } 171 if (channels != null) { 172 s.append(" channels = "); 173 for (int i = 0; i < channels.length; i++) { 174 s.append(channels[i]); 175 if (i < channels.length - 1) { 176 s.append(" "); 177 } 178 } 179 } 180 s.append(")"); 181 return s.toString(); 182 } 183 184 185 /** 186 * number of servers. 187 */ 188 public int numServers() { 189 if (servers != null) { 190 return servers.length; 191 } 192 return -1; 193 } 194 195 196 /** 197 * get master host. 198 */ 199 public String getMasterHost() { 200 if (servers != null && servers.length > 0) { 201 return servers[0]; 202 } 203 return null; 204 } 205 206 207 /** 208 * get master port. 209 */ 210 public int getMasterPort() { 211 if (ports != null && ports.length > 0) { 212 return ports[0]; 213 } 214 return 0; 215 } 216 217 218 /** 219 * number of channels. 220 */ 221 public int numChannels() { 222 if (channels != null) { 223 return channels.length; 224 } 225 return -1; 226 } 227 228 229 /** 230 * open, setup of SocketChannels. 231 * @throws IOException. 232 */ 233 public void open() throws IOException { 234 logger.debug("opening " + servers.length + " channels"); 235 if (servers.length <= 1) { 236 throw new IOException("to few servers"); 237 } 238 channels = new SocketChannel[servers.length - 1]; 239 for (int i = 1; i < servers.length; i++) { 240 channels[i - 1] = cf.getChannel(servers[i], ports[i]); 241 } 242 } 243 244 245 /** 246 * open, setup of SocketChannels. If nc > servers.length open in round 247 * robin fashion. 248 * @param nc number of channels to open. 249 * @throws IOException. 250 */ 251 public void open(int nc) throws IOException { 252 logger.debug("opening " + nc + " channels"); 253 if (servers.length <= 1) { 254 throw new IOException("to few servers"); 255 } 256 channels = new SocketChannel[nc]; 257 int j = 1; // 0 is master 258 for (int i = 0; i < channels.length; i++) { 259 if (j >= servers.length) { // modulo #servers 260 j = 1; 261 } 262 channels[i] = cf.getChannel(servers[j], ports[j]); 263 j++; 264 } 265 } 266 267 268 /** 269 * close all channels and ChannelFactory. 270 */ 271 public void close() { 272 logger.debug("closing ExecutableChannels"); 273 if (cf != null) { 274 cf.terminate(); 275 } 276 if (channels != null) { 277 for (int i = 0; i < channels.length; i++) { 278 if (channels[i] != null) { 279 try { 280 channels[i].send(ExecutableServer.STOP); 281 } catch (IOException e) { 282 e.printStackTrace(); 283 } finally { 284 channels[i].close(); 285 } 286 channels[i] = null; 287 } 288 } 289 channels = null; 290 } 291 logger.debug("ExecuteChannels closed"); 292 } 293 294 295 /** 296 * getChannel. 297 * @param i channel number. 298 */ 299 public SocketChannel getChannel(int i) { 300 if (channels != null && 0 <= i && i < channels.length) { 301 return channels[i]; 302 } 303 return null; 304 } 305 306 307 /** 308 * getChannels. 309 */ 310 /*package*/SocketChannel[] getChannels() { 311 return channels; 312 } 313 314 315 /** 316 * send on channel i. 317 * @param i channel number. 318 * @param o object to send. 319 */ 320 public void send(int i, Object o) throws IOException { 321 if (channels != null && 0 <= i && i < channels.length) { 322 channels[i].send(o); 323 } 324 } 325 326 327 /** 328 * recieve on channel i. 329 * @param i channel number. 330 * @return object recieved. 331 */ 332 public Object receive(int i) throws IOException, ClassNotFoundException { 333 if (channels != null && 0 <= i && i < channels.length) { 334 return channels[i].receive(); 335 } 336 return null; 337 338 } 339 340}