001    /*
002     * $Id: ExecutableChannels.java,v 1.2 2004/09/19 10:33:04 kredel Exp $
003     */
004    
005    //package edu.jas.util;
006    package comm;
007    
008    import util.Logger;
009    
010    import java.io.IOException;
011    import java.io.FileNotFoundException;
012    //import java.io.Reader;
013    import java.io.FileReader;
014    import java.io.BufferedReader;
015    
016    //import java.util.Iterator;
017    import java.util.List;
018    import java.util.ArrayList;
019    
020    //import org.apache.log4j.Logger;
021    
022    //import edu.unima.ky.parallel.ChannelFactory;
023    //import edu.unima.ky.parallel.SocketChannel;
024    
025    /**
026     * Class ExecutableChannels.
027     * Used to establish channels to peer servers 
028     * and to provide send and receive methods to each peer.
029     * @author Heinz Kredel.
030     */
031    public class ExecutableChannels {
032    
033        // private static Logger logger = Logger.getLogger(ExecutableChannels.class);
034        private static Logger logger = new Logger();
035    
036        protected final ChannelFactory cf;
037        protected SocketChannel[] channels = null;
038        protected String[] servers = null;
039        protected int[] ports = null;
040        protected final int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT;
041    
042    
043        protected ExecutableChannels() {
044            cf = new ChannelFactory();
045        }
046    
047    
048    /**
049     * Constructor from array of server:port strings.
050     * @param srvs server:port array.
051     */
052        public ExecutableChannels(String[] srvs) {
053            this();
054            if ( srvs == null ) {
055                return;
056            }
057            servers = new String[ srvs.length ];
058            ports = new int[ srvs.length ];
059            for ( int i = 0; i < srvs.length; i++ ) {
060                setServerPort( i, srvs[i] );
061            }
062        }
063    
064    
065        /**
066         * Constructor from maschine file.
067         * @param fname name of machine file.
068         * @throws FileNotFoundException.
069         */
070        public ExecutableChannels(String fname) throws FileNotFoundException {
071            this();
072            BufferedReader in = new BufferedReader( new FileReader( fname ) );
073            String line = null;
074            List list = new ArrayList();
075            int x;
076            try {
077                while (true) {
078                   if ( !in.ready() ) {
079                      break;
080                   }
081                   line = in.readLine();
082                   x = line.indexOf("#");
083                   if ( x >= 0 ) {
084                       line = line.substring(0,x);
085                   }
086                   line = line.trim();
087                   if ( line.length() == 0 ) {
088                       continue;
089                   }
090                   list.add(line);
091                }
092            } catch (IOException e) {
093            }
094            logger.debug("list.size() in " + fname + " = " + list.size());
095            if ( list.size() == 0 ) {
096                return;
097            }
098            servers = new String[ list.size() ];
099            ports = new int[ list.size() ];
100            for ( int i = 0; i < servers.length; i++ ) {
101                setServerPort( i, (String)list.get( i ) );
102            }
103        }
104    
105    
106        /* 
107         * internal method to fill ports array.
108         */
109        protected void setServerPort(int i, String srv) {
110            int x = srv.indexOf(":");
111            ports[i] = DEFAULT_PORT;
112            if ( x < 0 ) {
113               servers[i] = srv;
114            } else {
115               servers[i] = srv.substring(0,x);
116               String p = srv.substring(x+1,srv.length());
117               try { 
118                    ports[i] = Integer.parseInt( p );
119               } catch (NumberFormatException ignored) {
120               } 
121            }
122        }
123    
124    
125    /**
126     * String representation.
127     */ 
128        public String toString() {
129            StringBuffer s = new StringBuffer("ExecutableChannels(");
130            if ( servers != null ) {
131               for ( int i = 0; i < servers.length; i++ ) {
132                   s.append( servers[i] + ":" + ports[i] );
133                   if ( i < servers.length-1 ) {
134                      s.append(" ");
135                   }
136               }
137            }
138            s.append(")");
139            return s.toString();
140        }
141    
142    
143    /**
144     * Number of servers.
145     * @return server.length or -1 if not jet initialized.
146     */
147        public int numServers() {
148            if ( servers != null ) {
149               return servers.length;
150            } else {
151                return -1;
152            }
153        }
154    
155    
156    /**
157     * Get master host, i.e. first host in servers array.
158     * @return servers[0] or null if not jet initialized.
159     */
160        public String getMasterHost() {
161            if ( servers != null && servers.length > 0 ) {
162               return servers[0];
163            } else {
164                return null;
165            }
166        }
167    
168    
169    /**
170     * Get master port.
171     * @return port for master host or 0 if not jet initialized.
172     */
173        public int getMasterPort() {
174            if ( ports != null && ports.length > 0 ) {
175               return ports[0];
176            } else {
177                return 0;
178            }
179        }
180    
181    
182    /**
183     * Number of channels.
184     * @return channel.length or -1 if not jet initialized.
185     */
186        public int numChannels() {
187            if ( channels != null ) {
188               return channels.length;
189            } else {
190                return -1;
191            }
192        }
193    
194    
195    /**
196     * Open, setup of SocketChannels.
197     * Opens a channel to each server peer, except master.
198     */ 
199        public void open() throws IOException {
200            logger.debug("opening " + servers.length + " channels");
201            if ( servers.length <= 1 ) {
202                throw new IOException("to few servers");
203            }
204            channels = new SocketChannel[ servers.length-1 ];
205            for ( int i = 1; i < servers.length; i++ ) {
206                channels[i-1] = cf.getChannel( servers[i], ports[i] );
207            }
208        }
209    
210    
211    /**
212     * Open, setup of SocketChannels.
213     * If number of channels is larger than number of server peer, 
214     * open channels in round robin fashion.
215     * No channel to master is opened.
216     * @param nc number of channels to open.
217     * @throws IOException.
218     */
219        public void open(int nc) throws IOException {
220            logger.debug("opening " + nc + " channels");
221            if ( servers.length <= 1 ) {
222                throw new IOException("to few servers");
223            }
224            channels = new SocketChannel[ nc ];
225            int j = 1; // 0 is master
226            for ( int i = 0; i < channels.length; i++ ) {
227                if ( j >= servers.length ) { // modulo #servers
228                    j = 1;
229                }
230                channels[i] = cf.getChannel( servers[j], ports[j] );
231                j++;
232           }
233        }
234    
235    
236    /**
237     * Close all channels and ChannelFactory.
238     */ 
239        public void close() {
240            logger.debug("closing ExecutableChannels");
241            if ( cf != null ) cf.terminate();
242            if ( channels != null ) {
243                for ( int i = 0; i < channels.length; i++ ) {
244                   if ( channels[i] != null ) {
245                      channels[i].close();
246                      channels[i] = null;
247                   }
248               }
249               channels = null;
250            }
251            logger.debug("ExecuteChannels closed");
252        }
253    
254    
255    /**
256     * Get channel.
257     * @param i channel number.
258     */ 
259        public SocketChannel getChannel(int i) {
260            if ( channels != null && 0 <= i && i < channels.length ) {
261               return channels[i];
262            } else {
263                return null;
264            }
265        }
266    
267    
268    /**
269     * Get channel array.
270     * @return channel array.
271     */
272        public SocketChannel[] getChannels() {
273            return channels;
274        }
275    
276    
277    /**
278     * Send on channel i.
279     * @param i channel number.
280     * @param o object to send.
281     */ 
282        public void send(int i, Object o) throws IOException {
283            if ( channels != null && 0 <= i && i < channels.length ) {
284               channels[i].send(o);
285            } 
286        }
287    
288    
289    /**
290     * Recieve on channel i.
291     * @param i channel number.
292     * @return object recieved.
293     */ 
294        public Object receive(int i) throws IOException, ClassNotFoundException {
295            if ( channels != null && 0 <= i && i < channels.length ) {
296               return channels[i].receive();
297            } else {
298                return null;
299            }
300    
301        }
302    
303    }