001 /*
002 * $Id: ExecutableChannels.java,v 1.2 2004/09/19 10:33:04 kredel Exp $
003 */
004
005 //package edu.jas.util;
006 package comm;
007
008 import util.Logger;
009
010 import java.io.IOException;
011 import java.io.FileNotFoundException;
012 //import java.io.Reader;
013 import java.io.FileReader;
014 import java.io.BufferedReader;
015
016 //import java.util.Iterator;
017 import java.util.List;
018 import java.util.ArrayList;
019
020 //import org.apache.log4j.Logger;
021
022 //import edu.unima.ky.parallel.ChannelFactory;
023 //import edu.unima.ky.parallel.SocketChannel;
024
025 /**
026 * Class ExecutableChannels.
027 * Used to establish channels to peer servers
028 * and to provide send and receive methods to each peer.
029 * @author Heinz Kredel.
030 */
031 public class ExecutableChannels {
032
033 // private static Logger logger = Logger.getLogger(ExecutableChannels.class);
034 private static Logger logger = new Logger();
035
036 protected final ChannelFactory cf;
037 protected SocketChannel[] channels = null;
038 protected String[] servers = null;
039 protected int[] ports = null;
040 protected final int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT;
041
042
043 protected ExecutableChannels() {
044 cf = new ChannelFactory();
045 }
046
047
048 /**
049 * Constructor from array of server:port strings.
050 * @param srvs server:port array.
051 */
052 public ExecutableChannels(String[] srvs) {
053 this();
054 if ( srvs == null ) {
055 return;
056 }
057 servers = new String[ srvs.length ];
058 ports = new int[ srvs.length ];
059 for ( int i = 0; i < srvs.length; i++ ) {
060 setServerPort( i, srvs[i] );
061 }
062 }
063
064
065 /**
066 * Constructor from maschine file.
067 * @param fname name of machine file.
068 * @throws FileNotFoundException.
069 */
070 public ExecutableChannels(String fname) throws FileNotFoundException {
071 this();
072 BufferedReader in = new BufferedReader( new FileReader( fname ) );
073 String line = null;
074 List list = new ArrayList();
075 int x;
076 try {
077 while (true) {
078 if ( !in.ready() ) {
079 break;
080 }
081 line = in.readLine();
082 x = line.indexOf("#");
083 if ( x >= 0 ) {
084 line = line.substring(0,x);
085 }
086 line = line.trim();
087 if ( line.length() == 0 ) {
088 continue;
089 }
090 list.add(line);
091 }
092 } catch (IOException e) {
093 }
094 logger.debug("list.size() in " + fname + " = " + list.size());
095 if ( list.size() == 0 ) {
096 return;
097 }
098 servers = new String[ list.size() ];
099 ports = new int[ list.size() ];
100 for ( int i = 0; i < servers.length; i++ ) {
101 setServerPort( i, (String)list.get( i ) );
102 }
103 }
104
105
106 /*
107 * internal method to fill ports array.
108 */
109 protected void setServerPort(int i, String srv) {
110 int x = srv.indexOf(":");
111 ports[i] = DEFAULT_PORT;
112 if ( x < 0 ) {
113 servers[i] = srv;
114 } else {
115 servers[i] = srv.substring(0,x);
116 String p = srv.substring(x+1,srv.length());
117 try {
118 ports[i] = Integer.parseInt( p );
119 } catch (NumberFormatException ignored) {
120 }
121 }
122 }
123
124
125 /**
126 * String representation.
127 */
128 public String toString() {
129 StringBuffer s = new StringBuffer("ExecutableChannels(");
130 if ( servers != null ) {
131 for ( int i = 0; i < servers.length; i++ ) {
132 s.append( servers[i] + ":" + ports[i] );
133 if ( i < servers.length-1 ) {
134 s.append(" ");
135 }
136 }
137 }
138 s.append(")");
139 return s.toString();
140 }
141
142
143 /**
144 * Number of servers.
145 * @return server.length or -1 if not jet initialized.
146 */
147 public int numServers() {
148 if ( servers != null ) {
149 return servers.length;
150 } else {
151 return -1;
152 }
153 }
154
155
156 /**
157 * Get master host, i.e. first host in servers array.
158 * @return servers[0] or null if not jet initialized.
159 */
160 public String getMasterHost() {
161 if ( servers != null && servers.length > 0 ) {
162 return servers[0];
163 } else {
164 return null;
165 }
166 }
167
168
169 /**
170 * Get master port.
171 * @return port for master host or 0 if not jet initialized.
172 */
173 public int getMasterPort() {
174 if ( ports != null && ports.length > 0 ) {
175 return ports[0];
176 } else {
177 return 0;
178 }
179 }
180
181
182 /**
183 * Number of channels.
184 * @return channel.length or -1 if not jet initialized.
185 */
186 public int numChannels() {
187 if ( channels != null ) {
188 return channels.length;
189 } else {
190 return -1;
191 }
192 }
193
194
195 /**
196 * Open, setup of SocketChannels.
197 * Opens a channel to each server peer, except master.
198 */
199 public void open() throws IOException {
200 logger.debug("opening " + servers.length + " channels");
201 if ( servers.length <= 1 ) {
202 throw new IOException("to few servers");
203 }
204 channels = new SocketChannel[ servers.length-1 ];
205 for ( int i = 1; i < servers.length; i++ ) {
206 channels[i-1] = cf.getChannel( servers[i], ports[i] );
207 }
208 }
209
210
211 /**
212 * Open, setup of SocketChannels.
213 * If number of channels is larger than number of server peer,
214 * open channels in round robin fashion.
215 * No channel to master is opened.
216 * @param nc number of channels to open.
217 * @throws IOException.
218 */
219 public void open(int nc) throws IOException {
220 logger.debug("opening " + nc + " channels");
221 if ( servers.length <= 1 ) {
222 throw new IOException("to few servers");
223 }
224 channels = new SocketChannel[ nc ];
225 int j = 1; // 0 is master
226 for ( int i = 0; i < channels.length; i++ ) {
227 if ( j >= servers.length ) { // modulo #servers
228 j = 1;
229 }
230 channels[i] = cf.getChannel( servers[j], ports[j] );
231 j++;
232 }
233 }
234
235
236 /**
237 * Close all channels and ChannelFactory.
238 */
239 public void close() {
240 logger.debug("closing ExecutableChannels");
241 if ( cf != null ) cf.terminate();
242 if ( channels != null ) {
243 for ( int i = 0; i < channels.length; i++ ) {
244 if ( channels[i] != null ) {
245 channels[i].close();
246 channels[i] = null;
247 }
248 }
249 channels = null;
250 }
251 logger.debug("ExecuteChannels closed");
252 }
253
254
255 /**
256 * Get channel.
257 * @param i channel number.
258 */
259 public SocketChannel getChannel(int i) {
260 if ( channels != null && 0 <= i && i < channels.length ) {
261 return channels[i];
262 } else {
263 return null;
264 }
265 }
266
267
268 /**
269 * Get channel array.
270 * @return channel array.
271 */
272 public SocketChannel[] getChannels() {
273 return channels;
274 }
275
276
277 /**
278 * Send on channel i.
279 * @param i channel number.
280 * @param o object to send.
281 */
282 public void send(int i, Object o) throws IOException {
283 if ( channels != null && 0 <= i && i < channels.length ) {
284 channels[i].send(o);
285 }
286 }
287
288
289 /**
290 * Recieve on channel i.
291 * @param i channel number.
292 * @return object recieved.
293 */
294 public Object receive(int i) throws IOException, ClassNotFoundException {
295 if ( channels != null && 0 <= i && i < channels.length ) {
296 return channels[i].receive();
297 } else {
298 return null;
299 }
300
301 }
302
303 }