001 /* 002 * $Id: ExecutableChannels.java,v 1.2 2004/09/19 10:33:04 kredel Exp $ 003 */ 004 005 //package edu.jas.util; 006 package comm; 007 008 import util.Logger; 009 010 import java.io.IOException; 011 import java.io.FileNotFoundException; 012 //import java.io.Reader; 013 import java.io.FileReader; 014 import java.io.BufferedReader; 015 016 //import java.util.Iterator; 017 import java.util.List; 018 import java.util.ArrayList; 019 020 //import org.apache.log4j.Logger; 021 022 //import edu.unima.ky.parallel.ChannelFactory; 023 //import edu.unima.ky.parallel.SocketChannel; 024 025 /** 026 * Class ExecutableChannels. 027 * Used to establish channels to peer servers 028 * and to provide send and receive methods to each peer. 029 * @author Heinz Kredel. 030 */ 031 public class ExecutableChannels { 032 033 // private static Logger logger = Logger.getLogger(ExecutableChannels.class); 034 private static Logger logger = new Logger(); 035 036 protected final ChannelFactory cf; 037 protected SocketChannel[] channels = null; 038 protected String[] servers = null; 039 protected int[] ports = null; 040 protected final int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT; 041 042 043 protected ExecutableChannels() { 044 cf = new ChannelFactory(); 045 } 046 047 048 /** 049 * Constructor from array of server:port strings. 050 * @param srvs server:port array. 051 */ 052 public ExecutableChannels(String[] srvs) { 053 this(); 054 if ( srvs == null ) { 055 return; 056 } 057 servers = new String[ srvs.length ]; 058 ports = new int[ srvs.length ]; 059 for ( int i = 0; i < srvs.length; i++ ) { 060 setServerPort( i, srvs[i] ); 061 } 062 } 063 064 065 /** 066 * Constructor from maschine file. 067 * @param fname name of machine file. 068 * @throws FileNotFoundException. 069 */ 070 public ExecutableChannels(String fname) throws FileNotFoundException { 071 this(); 072 BufferedReader in = new BufferedReader( new FileReader( fname ) ); 073 String line = null; 074 List list = new ArrayList(); 075 int x; 076 try { 077 while (true) { 078 if ( !in.ready() ) { 079 break; 080 } 081 line = in.readLine(); 082 x = line.indexOf("#"); 083 if ( x >= 0 ) { 084 line = line.substring(0,x); 085 } 086 line = line.trim(); 087 if ( line.length() == 0 ) { 088 continue; 089 } 090 list.add(line); 091 } 092 } catch (IOException e) { 093 } 094 logger.debug("list.size() in " + fname + " = " + list.size()); 095 if ( list.size() == 0 ) { 096 return; 097 } 098 servers = new String[ list.size() ]; 099 ports = new int[ list.size() ]; 100 for ( int i = 0; i < servers.length; i++ ) { 101 setServerPort( i, (String)list.get( i ) ); 102 } 103 } 104 105 106 /* 107 * internal method to fill ports array. 108 */ 109 protected void setServerPort(int i, String srv) { 110 int x = srv.indexOf(":"); 111 ports[i] = DEFAULT_PORT; 112 if ( x < 0 ) { 113 servers[i] = srv; 114 } else { 115 servers[i] = srv.substring(0,x); 116 String p = srv.substring(x+1,srv.length()); 117 try { 118 ports[i] = Integer.parseInt( p ); 119 } catch (NumberFormatException ignored) { 120 } 121 } 122 } 123 124 125 /** 126 * String representation. 127 */ 128 public String toString() { 129 StringBuffer s = new StringBuffer("ExecutableChannels("); 130 if ( servers != null ) { 131 for ( int i = 0; i < servers.length; i++ ) { 132 s.append( servers[i] + ":" + ports[i] ); 133 if ( i < servers.length-1 ) { 134 s.append(" "); 135 } 136 } 137 } 138 s.append(")"); 139 return s.toString(); 140 } 141 142 143 /** 144 * Number of servers. 145 * @return server.length or -1 if not jet initialized. 146 */ 147 public int numServers() { 148 if ( servers != null ) { 149 return servers.length; 150 } else { 151 return -1; 152 } 153 } 154 155 156 /** 157 * Get master host, i.e. first host in servers array. 158 * @return servers[0] or null if not jet initialized. 159 */ 160 public String getMasterHost() { 161 if ( servers != null && servers.length > 0 ) { 162 return servers[0]; 163 } else { 164 return null; 165 } 166 } 167 168 169 /** 170 * Get master port. 171 * @return port for master host or 0 if not jet initialized. 172 */ 173 public int getMasterPort() { 174 if ( ports != null && ports.length > 0 ) { 175 return ports[0]; 176 } else { 177 return 0; 178 } 179 } 180 181 182 /** 183 * Number of channels. 184 * @return channel.length or -1 if not jet initialized. 185 */ 186 public int numChannels() { 187 if ( channels != null ) { 188 return channels.length; 189 } else { 190 return -1; 191 } 192 } 193 194 195 /** 196 * Open, setup of SocketChannels. 197 * Opens a channel to each server peer, except master. 198 */ 199 public void open() throws IOException { 200 logger.debug("opening " + servers.length + " channels"); 201 if ( servers.length <= 1 ) { 202 throw new IOException("to few servers"); 203 } 204 channels = new SocketChannel[ servers.length-1 ]; 205 for ( int i = 1; i < servers.length; i++ ) { 206 channels[i-1] = cf.getChannel( servers[i], ports[i] ); 207 } 208 } 209 210 211 /** 212 * Open, setup of SocketChannels. 213 * If number of channels is larger than number of server peer, 214 * open channels in round robin fashion. 215 * No channel to master is opened. 216 * @param nc number of channels to open. 217 * @throws IOException. 218 */ 219 public void open(int nc) throws IOException { 220 logger.debug("opening " + nc + " channels"); 221 if ( servers.length <= 1 ) { 222 throw new IOException("to few servers"); 223 } 224 channels = new SocketChannel[ nc ]; 225 int j = 1; // 0 is master 226 for ( int i = 0; i < channels.length; i++ ) { 227 if ( j >= servers.length ) { // modulo #servers 228 j = 1; 229 } 230 channels[i] = cf.getChannel( servers[j], ports[j] ); 231 j++; 232 } 233 } 234 235 236 /** 237 * Close all channels and ChannelFactory. 238 */ 239 public void close() { 240 logger.debug("closing ExecutableChannels"); 241 if ( cf != null ) cf.terminate(); 242 if ( channels != null ) { 243 for ( int i = 0; i < channels.length; i++ ) { 244 if ( channels[i] != null ) { 245 channels[i].close(); 246 channels[i] = null; 247 } 248 } 249 channels = null; 250 } 251 logger.debug("ExecuteChannels closed"); 252 } 253 254 255 /** 256 * Get channel. 257 * @param i channel number. 258 */ 259 public SocketChannel getChannel(int i) { 260 if ( channels != null && 0 <= i && i < channels.length ) { 261 return channels[i]; 262 } else { 263 return null; 264 } 265 } 266 267 268 /** 269 * Get channel array. 270 * @return channel array. 271 */ 272 public SocketChannel[] getChannels() { 273 return channels; 274 } 275 276 277 /** 278 * Send on channel i. 279 * @param i channel number. 280 * @param o object to send. 281 */ 282 public void send(int i, Object o) throws IOException { 283 if ( channels != null && 0 <= i && i < channels.length ) { 284 channels[i].send(o); 285 } 286 } 287 288 289 /** 290 * Recieve on channel i. 291 * @param i channel number. 292 * @return object recieved. 293 */ 294 public Object receive(int i) throws IOException, ClassNotFoundException { 295 if ( channels != null && 0 <= i && i < channels.length ) { 296 return channels[i].receive(); 297 } else { 298 return null; 299 } 300 301 } 302 303 }