001/* 002 * $Id: ExecutableChannels.java 5872 2018-07-20 16:01:46Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.BufferedReader; 009import java.io.FileNotFoundException; 010import java.io.FileInputStream; 011import java.io.InputStreamReader; 012import java.io.IOException; 013import java.nio.charset.Charset; 014import java.util.ArrayList; 015import java.util.List; 016 017import org.apache.logging.log4j.Logger; 018import org.apache.logging.log4j.LogManager; 019 020 021// import edu.unima.ky.parallel.ChannelFactory; 022// import edu.unima.ky.parallel.SocketChannel; 023 024/** 025 * ExecutableChannels used to receive and execute classes. 026 * @author Heinz Kredel 027 */ 028 029 030public class ExecutableChannels { 031 032 033 private static final Logger logger = LogManager.getLogger(ExecutableChannels.class); 034 035 036 /** 037 * default port. 038 */ 039 protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT; 040 041 042 /** 043 * default machine file. 044 */ 045 protected final static String DEFAULT_MFILE = "examples/machines.test"; 046 047 048 protected final ChannelFactory cf; 049 050 051 protected SocketChannel[] channels = null; 052 053 054 protected String[] servers = null; 055 056 057 protected int[] ports = null; 058 059 060 /** 061 * Internal constructor. 062 */ 063 protected ExecutableChannels() { 064 cf = new ChannelFactory(); 065 cf.init(); 066 } 067 068 069 /** 070 * Constructor from array of server:port strings. 071 * @param srvs A String array. 072 */ 073 public ExecutableChannels(String[] srvs) { 074 this(); 075 if (srvs == null) { 076 return; 077 } 078 servers = new String[srvs.length]; 079 ports = new int[srvs.length]; 080 for (int i = 0; i < srvs.length; i++) { 081 setServerPort(i, srvs[i]); 082 } 083 } 084 085 086 /** 087 * Constructor from machine file. 088 * @param mfile 089 * @throws FileNotFoundException. 090 */ 091 public ExecutableChannels(String mfile) throws FileNotFoundException { 092 this(); 093 if (mfile == null || mfile.length() == 0) { 094 mfile = DEFAULT_MFILE; 095 } 096 InputStreamReader isr = new InputStreamReader(new FileInputStream(mfile),Charset.forName("UTF8")); 097 BufferedReader in = new BufferedReader(isr); 098 String line = null; 099 List<String> list = new ArrayList<String>(); 100 int x; 101 try { 102 while (true) { 103 if (!in.ready()) { 104 break; 105 } 106 line = in.readLine(); 107 if (line == null) { 108 break; 109 } 110 x = line.indexOf("#"); 111 if (x >= 0) { 112 line = line.substring(0, x); 113 } 114 line = line.trim(); 115 if (line.length() == 0) { 116 continue; 117 } 118 list.add(line); 119 } 120 } catch (IOException ignored) { 121 } finally { 122 try { 123 in.close(); 124 } catch (IOException ignore) { 125 } 126 } 127 logger.debug("list.size() in " + mfile + " = " + list.size()); 128 if (list.size() == 0) { 129 return; 130 } 131 servers = new String[list.size()]; 132 ports = new int[list.size()]; 133 for (int i = 0; i < servers.length; i++) { 134 setServerPort(i, list.get(i)); 135 } 136 } 137 138 139 /* 140 * internal method 141 */ 142 protected void setServerPort(int i, String srv) { 143 int x = srv.indexOf(":"); 144 ports[i] = DEFAULT_PORT; 145 if (x < 0) { 146 servers[i] = srv; 147 } else { 148 servers[i] = srv.substring(0, x); 149 String p = srv.substring(x + 1, srv.length()); 150 try { 151 ports[i] = Integer.parseInt(p); 152 } catch (NumberFormatException ignored) { 153 } 154 } 155 } 156 157 158 /** 159 * String representation. 160 */ 161 @Override 162 public String toString() { 163 StringBuffer s = new StringBuffer("ExecutableChannels("); 164 if (servers != null) { 165 for (int i = 0; i < servers.length; i++) { 166 s.append(servers[i] + ":" + ports[i]); 167 if (i < servers.length - 1) { 168 s.append(" "); 169 } 170 } 171 } 172 if (channels != null) { 173 s.append(" channels = "); 174 for (int i = 0; i < channels.length; i++) { 175 s.append(channels[i]); 176 if (i < channels.length - 1) { 177 s.append(" "); 178 } 179 } 180 } 181 s.append(")"); 182 return s.toString(); 183 } 184 185 186 /** 187 * number of servers. 188 */ 189 public int numServers() { 190 if (servers != null) { 191 return servers.length; 192 } 193 return -1; 194 } 195 196 197 /** 198 * get master host. 199 */ 200 public String getMasterHost() { 201 if (servers != null && servers.length > 0) { 202 return servers[0]; 203 } 204 return null; 205 } 206 207 208 /** 209 * get master port. 210 */ 211 public int getMasterPort() { 212 if (ports != null && ports.length > 0) { 213 return ports[0]; 214 } 215 return 0; 216 } 217 218 219 /** 220 * number of channels. 221 */ 222 public int numChannels() { 223 if (channels != null) { 224 return channels.length; 225 } 226 return -1; 227 } 228 229 230 /** 231 * open, setup of SocketChannels. 232 * @throws IOException. 233 */ 234 public void open() throws IOException { 235 logger.debug("opening " + servers.length + " channels"); 236 if (servers.length <= 1) { 237 throw new IOException("to few servers"); 238 } 239 channels = new SocketChannel[servers.length - 1]; 240 for (int i = 1; i < servers.length; i++) { 241 channels[i - 1] = cf.getChannel(servers[i], ports[i]); 242 } 243 } 244 245 246 /** 247 * open, setup of SocketChannels. If nc > servers.length open in round 248 * robin fashion. 249 * @param nc number of channels to open. 250 * @throws IOException. 251 */ 252 public void open(int nc) throws IOException { 253 logger.debug("opening " + nc + " channels"); 254 if (servers.length <= 1) { 255 throw new IOException("to few servers"); 256 } 257 channels = new SocketChannel[nc]; 258 int j = 1; // 0 is master 259 for (int i = 0; i < channels.length; i++) { 260 if (j >= servers.length) { // modulo #servers 261 j = 1; 262 } 263 channels[i] = cf.getChannel(servers[j], ports[j]); 264 j++; 265 } 266 } 267 268 269 /** 270 * close all channels and ChannelFactory. 271 */ 272 public void close() { 273 logger.debug("closing ExecutableChannels"); 274 if (cf != null) { 275 cf.terminate(); 276 } 277 if (channels != null) { 278 for (int i = 0; i < channels.length; i++) { 279 if (channels[i] != null) { 280 try { 281 channels[i].send(ExecutableServer.STOP); 282 } catch (IOException e) { 283 if (logger.isDebugEnabled()) { 284 e.printStackTrace(); 285 } 286 } finally { 287 channels[i].close(); 288 } 289 channels[i] = null; 290 } 291 } 292 channels = null; 293 } 294 logger.debug("ExecuteChannels closed"); 295 } 296 297 298 /** 299 * getChannel. 300 * @param i channel number. 301 */ 302 public SocketChannel getChannel(int i) { 303 if (channels != null && 0 <= i && i < channels.length) { 304 return channels[i]; 305 } 306 return null; 307 } 308 309 310 /** 311 * getChannels. 312 */ 313 /*package*/SocketChannel[] getChannels() { 314 return channels; 315 } 316 317 318 /** 319 * send on channel i. 320 * @param i channel number. 321 * @param o object to send. 322 */ 323 public void send(int i, Object o) throws IOException { 324 if (channels != null && 0 <= i && i < channels.length) { 325 channels[i].send(o); 326 } 327 } 328 329 330 /** 331 * recieve on channel i. 332 * @param i channel number. 333 * @return object recieved. 334 */ 335 public Object receive(int i) throws IOException, ClassNotFoundException { 336 if (channels != null && 0 <= i && i < channels.length) { 337 return channels[i].receive(); 338 } 339 return null; 340 341 } 342 343}