001/*
002 * $Id: ExecutableChannels.java 4078 2012-07-28 12:40:12Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.BufferedReader;
009import java.io.FileNotFoundException;
010import java.io.FileInputStream;
011import java.io.InputStreamReader;
012import java.io.IOException;
013import java.nio.charset.Charset;
014import java.util.ArrayList;
015import java.util.List;
016
017import org.apache.log4j.Logger;
018
019
020// import edu.unima.ky.parallel.ChannelFactory;
021// import edu.unima.ky.parallel.SocketChannel;
022
023/**
024 * ExecutableChannels used to receive and execute classes.
025 * @author Heinz Kredel
026 */
027
028
029public class ExecutableChannels {
030
031
032    private static final Logger logger = Logger.getLogger(ExecutableChannels.class);
033
034
035    /**
036     * default port.
037     */
038    protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT;
039
040
041    /**
042     * default machine file.
043     */
044    protected final static String DEFAULT_MFILE = "examples/machines.test";
045
046
047    protected final ChannelFactory cf;
048
049
050    protected SocketChannel[] channels = null;
051
052
053    protected String[] servers = null;
054
055
056    protected int[] ports = null;
057
058
059    /**
060     * Internal constructor.
061     */
062    protected ExecutableChannels() {
063        cf = new ChannelFactory();
064        cf.init();
065    }
066
067
068    /**
069     * Constructor from array of server:port strings.
070     * @param srvs A String array.
071     */
072    public ExecutableChannels(String[] srvs) {
073        this();
074        if (srvs == null) {
075            return;
076        }
077        servers = new String[srvs.length];
078        ports = new int[srvs.length];
079        for (int i = 0; i < srvs.length; i++) {
080            setServerPort(i, srvs[i]);
081        }
082    }
083
084
085    /**
086     * Constructor from machine file.
087     * @param mfile
088     * @throws FileNotFoundException.
089     */
090    public ExecutableChannels(String mfile) throws FileNotFoundException {
091        this();
092        if (mfile == null || mfile.length() == 0) {
093            mfile = DEFAULT_MFILE;
094        }
095        InputStreamReader isr = new InputStreamReader(new FileInputStream(mfile),Charset.forName("UTF8"));
096        BufferedReader in = new BufferedReader(isr);
097        String line = null;
098        List<String> list = new ArrayList<String>();
099        int x;
100        try {
101            while (true) {
102                if (!in.ready()) {
103                    break;
104                }
105                line = in.readLine();
106                if (line == null) {
107                    break;
108                }
109                x = line.indexOf("#");
110                if (x >= 0) {
111                    line = line.substring(0, x);
112                }
113                line = line.trim();
114                if (line.length() == 0) {
115                    continue;
116                }
117                list.add(line);
118            }
119        } catch (IOException ignored) {
120        } finally {
121            try {
122                in.close();
123            } catch (IOException ignore) {
124            }
125        }
126        logger.debug("list.size() in " + mfile + " = " + list.size());
127        if (list.size() == 0) {
128            return;
129        }
130        servers = new String[list.size()];
131        ports = new int[list.size()];
132        for (int i = 0; i < servers.length; i++) {
133            setServerPort(i, list.get(i));
134        }
135    }
136
137
138    /* 
139     * internal method
140     */
141    protected void setServerPort(int i, String srv) {
142        int x = srv.indexOf(":");
143        ports[i] = DEFAULT_PORT;
144        if (x < 0) {
145            servers[i] = srv;
146        } else {
147            servers[i] = srv.substring(0, x);
148            String p = srv.substring(x + 1, srv.length());
149            try {
150                ports[i] = Integer.parseInt(p);
151            } catch (NumberFormatException ignored) {
152            }
153        }
154    }
155
156
157    /**
158     * String representation.
159     */
160    @Override
161    public String toString() {
162        StringBuffer s = new StringBuffer("ExecutableChannels(");
163        if (servers != null) {
164            for (int i = 0; i < servers.length; i++) {
165                s.append(servers[i] + ":" + ports[i]);
166                if (i < servers.length - 1) {
167                    s.append(" ");
168                }
169            }
170        }
171        if (channels != null) {
172            s.append(" channels = ");
173            for (int i = 0; i < channels.length; i++) {
174                s.append(channels[i]);
175                if (i < channels.length - 1) {
176                    s.append(" ");
177                }
178            }
179        }
180        s.append(")");
181        return s.toString();
182    }
183
184
185    /**
186     * number of servers.
187     */
188    public int numServers() {
189        if (servers != null) {
190            return servers.length;
191        }
192        return -1;
193    }
194
195
196    /**
197     * get master host.
198     */
199    public String getMasterHost() {
200        if (servers != null && servers.length > 0) {
201            return servers[0];
202        }
203        return null;
204    }
205
206
207    /**
208     * get master port.
209     */
210    public int getMasterPort() {
211        if (ports != null && ports.length > 0) {
212            return ports[0];
213        }
214        return 0;
215    }
216
217
218    /**
219     * number of channels.
220     */
221    public int numChannels() {
222        if (channels != null) {
223            return channels.length;
224        }
225        return -1;
226    }
227
228
229    /**
230     * open, setup of SocketChannels.
231     * @throws IOException.
232     */
233    public void open() throws IOException {
234        logger.debug("opening " + servers.length + " channels");
235        if (servers.length <= 1) {
236            throw new IOException("to few servers");
237        }
238        channels = new SocketChannel[servers.length - 1];
239        for (int i = 1; i < servers.length; i++) {
240            channels[i - 1] = cf.getChannel(servers[i], ports[i]);
241        }
242    }
243
244
245    /**
246     * open, setup of SocketChannels. If nc &gt; servers.length open in round
247     * robin fashion.
248     * @param nc number of channels to open.
249     * @throws IOException.
250     */
251    public void open(int nc) throws IOException {
252        logger.debug("opening " + nc + " channels");
253        if (servers.length <= 1) {
254            throw new IOException("to few servers");
255        }
256        channels = new SocketChannel[nc];
257        int j = 1; // 0 is master
258        for (int i = 0; i < channels.length; i++) {
259            if (j >= servers.length) { // modulo #servers
260                j = 1;
261            }
262            channels[i] = cf.getChannel(servers[j], ports[j]);
263            j++;
264        }
265    }
266
267
268    /**
269     * close all channels and ChannelFactory.
270     */
271    public void close() {
272        logger.debug("closing ExecutableChannels");
273        if (cf != null) {
274            cf.terminate();
275        }
276        if (channels != null) {
277            for (int i = 0; i < channels.length; i++) {
278                if (channels[i] != null) {
279                    try {
280                        channels[i].send(ExecutableServer.STOP);
281                    } catch (IOException e) {
282                        e.printStackTrace();
283                    } finally {
284                        channels[i].close();
285                    }
286                    channels[i] = null;
287                }
288            }
289            channels = null;
290        }
291        logger.debug("ExecuteChannels closed");
292    }
293
294
295    /**
296     * getChannel.
297     * @param i channel number.
298     */
299    public SocketChannel getChannel(int i) {
300        if (channels != null && 0 <= i && i < channels.length) {
301            return channels[i];
302        }
303        return null;
304    }
305
306
307    /**
308     * getChannels.
309     */
310    /*package*/SocketChannel[] getChannels() {
311        return channels;
312    }
313
314
315    /**
316     * send on channel i.
317     * @param i channel number.
318     * @param o object to send.
319     */
320    public void send(int i, Object o) throws IOException {
321        if (channels != null && 0 <= i && i < channels.length) {
322            channels[i].send(o);
323        }
324    }
325
326
327    /**
328     * recieve on channel i.
329     * @param i channel number.
330     * @return object recieved.
331     */
332    public Object receive(int i) throws IOException, ClassNotFoundException {
333        if (channels != null && 0 <= i && i < channels.length) {
334            return channels[i].receive();
335        }
336        return null;
337
338    }
339
340}