001    /*
002     * $Id: ExecutableChannels.java 3337 2010-09-27 21:05:17Z kredel $
003     */
004    
005    package edu.jas.util;
006    
007    
008    import java.io.BufferedReader;
009    import java.io.FileNotFoundException;
010    import java.io.FileReader;
011    import java.io.IOException;
012    import java.util.ArrayList;
013    import java.util.List;
014    
015    import org.apache.log4j.Logger;
016    
017    
018    // import edu.unima.ky.parallel.ChannelFactory;
019    // import edu.unima.ky.parallel.SocketChannel;
020    
021    /**
022     * ExecutableChannels used to receive and execute classes.
023     * @author Heinz Kredel
024     */
025    
026    
027    public class ExecutableChannels {
028    
029    
030        private static final Logger logger = Logger.getLogger(ExecutableChannels.class);
031    
032    
033        /**
034         * default port.
035         */
036        protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT;
037    
038    
039        /**
040         * default machine file.
041         */
042        protected final static String DEFAULT_MFILE = "examples/machines.test";
043    
044    
045        protected final ChannelFactory cf;
046    
047    
048        protected SocketChannel[] channels = null;
049    
050    
051        protected String[] servers = null;
052    
053    
054        protected int[] ports = null;
055    
056    
057        /**
058         * Internal constructor.
059         */
060        protected ExecutableChannels() {
061            cf = new ChannelFactory();
062            cf.init();
063        }
064    
065    
066        /**
067         * Constructor from array of server:port strings.
068         * @param srvs A String array.
069         */
070        public ExecutableChannels(String[] srvs) {
071            this();
072            if (srvs == null) {
073                return;
074            }
075            servers = new String[srvs.length];
076            ports = new int[srvs.length];
077            for (int i = 0; i < srvs.length; i++) {
078                setServerPort(i, srvs[i]);
079            }
080        }
081    
082    
083        /**
084         * Constructor from machine file.
085         * @param mfile
086         * @throws FileNotFoundException.
087         */
088        public ExecutableChannels(String mfile) throws FileNotFoundException {
089            this();
090            if (mfile == null || mfile.length() == 0) {
091                mfile = DEFAULT_MFILE;
092            }
093            BufferedReader in = new BufferedReader(new FileReader(mfile));
094            String line = null;
095            List<String> list = new ArrayList<String>();
096            int x;
097            try {
098                while (true) {
099                    if (!in.ready()) {
100                        break;
101                    }
102                    line = in.readLine();
103                    if (line == null) {
104                        break;
105                    }
106                    x = line.indexOf("#");
107                    if (x >= 0) {
108                        line = line.substring(0, x);
109                    }
110                    line = line.trim();
111                    if (line.length() == 0) {
112                        continue;
113                    }
114                    list.add(line);
115                }
116            } catch (IOException ignored) {
117            } finally {
118                try {
119                    in.close();
120                } catch (IOException ignore) {
121                }
122            }
123            logger.debug("list.size() in " + mfile + " = " + list.size());
124            if (list.size() == 0) {
125                return;
126            }
127            servers = new String[list.size()];
128            ports = new int[list.size()];
129            for (int i = 0; i < servers.length; i++) {
130                setServerPort(i, list.get(i));
131            }
132        }
133    
134    
135        /* 
136         * internal method
137         */
138        protected void setServerPort(int i, String srv) {
139            int x = srv.indexOf(":");
140            ports[i] = DEFAULT_PORT;
141            if (x < 0) {
142                servers[i] = srv;
143            } else {
144                servers[i] = srv.substring(0, x);
145                String p = srv.substring(x + 1, srv.length());
146                try {
147                    ports[i] = Integer.parseInt(p);
148                } catch (NumberFormatException ignored) {
149                }
150            }
151        }
152    
153    
154        /**
155         * String representation.
156         */
157        @Override
158        public String toString() {
159            StringBuffer s = new StringBuffer("ExecutableChannels(");
160            if (servers != null) {
161                for (int i = 0; i < servers.length; i++) {
162                    s.append(servers[i] + ":" + ports[i]);
163                    if (i < servers.length - 1) {
164                        s.append(" ");
165                    }
166                }
167            }
168            if (channels != null) {
169                s.append(" channels = ");
170                for (int i = 0; i < channels.length; i++) {
171                    s.append(channels[i]);
172                    if (i < channels.length - 1) {
173                        s.append(" ");
174                    }
175                }
176            }
177            s.append(")");
178            return s.toString();
179        }
180    
181    
182        /**
183         * number of servers.
184         */
185        public int numServers() {
186            if (servers != null) {
187                return servers.length;
188            }
189            return -1;
190        }
191    
192    
193        /**
194         * get master host.
195         */
196        public String getMasterHost() {
197            if (servers != null && servers.length > 0) {
198                return servers[0];
199            }
200            return null;
201        }
202    
203    
204        /**
205         * get master port.
206         */
207        public int getMasterPort() {
208            if (ports != null && ports.length > 0) {
209                return ports[0];
210            }
211            return 0;
212        }
213    
214    
215        /**
216         * number of channels.
217         */
218        public int numChannels() {
219            if (channels != null) {
220                return channels.length;
221            }
222            return -1;
223        }
224    
225    
226        /**
227         * open, setup of SocketChannels.
228         * @throws IOException.
229         */
230        public void open() throws IOException {
231            logger.debug("opening " + servers.length + " channels");
232            if (servers.length <= 1) {
233                throw new IOException("to few servers");
234            }
235            channels = new SocketChannel[servers.length - 1];
236            for (int i = 1; i < servers.length; i++) {
237                channels[i - 1] = cf.getChannel(servers[i], ports[i]);
238            }
239        }
240    
241    
242        /**
243         * open, setup of SocketChannels. If nc &gt; servers.length open in round
244         * robin fashion.
245         * @param nc number of channels to open.
246         * @throws IOException.
247         */
248        public void open(int nc) throws IOException {
249            logger.debug("opening " + nc + " channels");
250            if (servers.length <= 1) {
251                throw new IOException("to few servers");
252            }
253            channels = new SocketChannel[nc];
254            int j = 1; // 0 is master
255            for (int i = 0; i < channels.length; i++) {
256                if (j >= servers.length) { // modulo #servers
257                    j = 1;
258                }
259                channels[i] = cf.getChannel(servers[j], ports[j]);
260                j++;
261            }
262        }
263    
264    
265        /**
266         * close all channels and ChannelFactory.
267         */
268        public void close() {
269            logger.debug("closing ExecutableChannels");
270            if (cf != null) {
271                cf.terminate();
272            }
273            if (channels != null) {
274                for (int i = 0; i < channels.length; i++) {
275                    if (channels[i] != null) {
276                        try {
277                            channels[i].send(ExecutableServer.STOP);
278                        } catch (IOException e) {
279                            e.printStackTrace();
280                        } finally {
281                            channels[i].close();
282                        }
283                        channels[i] = null;
284                    }
285                }
286                channels = null;
287            }
288            logger.debug("ExecuteChannels closed");
289        }
290    
291    
292        /**
293         * getChannel.
294         * @param i channel number.
295         */
296        public SocketChannel getChannel(int i) {
297            if (channels != null && 0 <= i && i < channels.length) {
298                return channels[i];
299            }
300            return null;
301        }
302    
303    
304        /**
305         * getChannels.
306         */
307        /*package*/SocketChannel[] getChannels() {
308            return channels;
309        }
310    
311    
312        /**
313         * send on channel i.
314         * @param i channel number.
315         * @param o object to send.
316         */
317        public void send(int i, Object o) throws IOException {
318            if (channels != null && 0 <= i && i < channels.length) {
319                channels[i].send(o);
320            }
321        }
322    
323    
324        /**
325         * recieve on channel i.
326         * @param i channel number.
327         * @return object recieved.
328         */
329        public Object receive(int i) throws IOException, ClassNotFoundException {
330            if (channels != null && 0 <= i && i < channels.length) {
331                return channels[i].receive();
332            }
333            return null;
334    
335        }
336    
337    }