001/*
002 * $Id$
003 */
004
005package edu.jas.util;
006
007
008import java.io.BufferedReader;
009import java.io.FileNotFoundException;
010import java.io.FileInputStream;
011import java.io.InputStreamReader;
012import java.io.IOException;
013import java.nio.charset.Charset;
014import java.util.ArrayList;
015import java.util.List;
016
017import org.apache.logging.log4j.Logger;
018import org.apache.logging.log4j.LogManager; 
019
020
021// import edu.unima.ky.parallel.ChannelFactory;
022// import edu.unima.ky.parallel.SocketChannel;
023
024/**
025 * ExecutableChannels used to receive and execute classes.
026 * @author Heinz Kredel
027 */
028
029
030public class ExecutableChannels {
031
032
033    private static final Logger logger = LogManager.getLogger(ExecutableChannels.class);
034
035
036    /**
037     * default port.
038     */
039    protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT;
040
041
042    /**
043     * default machine file.
044     */
045    protected final static String DEFAULT_MFILE = "examples/machines.test";
046
047
048    protected final ChannelFactory cf;
049
050
051    protected SocketChannel[] channels = null;
052
053
054    protected String[] servers = null;
055
056
057    protected int[] ports = null;
058
059
060    /**
061     * Internal constructor.
062     */
063    protected ExecutableChannels() {
064        cf = new ChannelFactory();
065        cf.init();
066    }
067
068
069    /**
070     * Constructor from array of server:port strings.
071     * @param srvs A String array.
072     */
073    public ExecutableChannels(String[] srvs) {
074        this();
075        if (srvs == null) {
076            return;
077        }
078        servers = new String[srvs.length];
079        ports = new int[srvs.length];
080        for (int i = 0; i < srvs.length; i++) {
081            setServerPort(i, srvs[i]);
082        }
083    }
084
085
086    /**
087     * Constructor from machine file.
088     * @param mfile
089     * @throws FileNotFoundException.
090     */
091    public ExecutableChannels(String mfile) throws FileNotFoundException {
092        this();
093        if (mfile == null || mfile.length() == 0) {
094            mfile = DEFAULT_MFILE;
095        }
096        InputStreamReader isr = new InputStreamReader(new FileInputStream(mfile),Charset.forName("UTF8"));
097        BufferedReader in = new BufferedReader(isr);
098        String line = null;
099        List<String> list = new ArrayList<String>();
100        int x;
101        try {
102            while (true) {
103                if (!in.ready()) {
104                    break;
105                }
106                line = in.readLine();
107                if (line == null) {
108                    break;
109                }
110                x = line.indexOf("#");
111                if (x >= 0) {
112                    line = line.substring(0, x);
113                }
114                line = line.trim();
115                if (line.length() == 0) {
116                    continue;
117                }
118                list.add(line);
119            }
120        } catch (IOException ignored) {
121        } finally {
122            try {
123                in.close();
124                isr.close();
125            } catch (IOException ignore) {
126            }
127        }
128        logger.debug("list.size() in {} = {}", mfile, list.size());
129        if (list.size() == 0) {
130            return;
131        }
132        servers = new String[list.size()];
133        ports = new int[list.size()];
134        for (int i = 0; i < servers.length; i++) {
135            setServerPort(i, list.get(i));
136        }
137    }
138
139
140    /* 
141     * internal method
142     */
143    protected void setServerPort(int i, String srv) {
144        int x = srv.indexOf(":");
145        ports[i] = DEFAULT_PORT;
146        if (x < 0) {
147            servers[i] = srv;
148        } else {
149            servers[i] = srv.substring(0, x);
150            String p = srv.substring(x + 1, srv.length());
151            try {
152                ports[i] = Integer.parseInt(p);
153            } catch (NumberFormatException ignored) {
154            }
155        }
156    }
157
158
159    /**
160     * String representation.
161     */
162    @Override
163    public String toString() {
164        StringBuffer s = new StringBuffer("ExecutableChannels(");
165        if (servers != null) {
166            for (int i = 0; i < servers.length; i++) {
167                s.append(servers[i] + ":" + ports[i]);
168                if (i < servers.length - 1) {
169                    s.append(" ");
170                }
171            }
172        }
173        if (channels != null) {
174            s.append(" channels = ");
175            for (int i = 0; i < channels.length; i++) {
176                s.append(channels[i]);
177                if (i < channels.length - 1) {
178                    s.append(" ");
179                }
180            }
181        }
182        s.append(")");
183        return s.toString();
184    }
185
186
187    /**
188     * number of servers.
189     */
190    public int numServers() {
191        if (servers != null) {
192            return servers.length;
193        }
194        return -1;
195    }
196
197
198    /**
199     * get master host.
200     */
201    public String getMasterHost() {
202        if (servers != null && servers.length > 0) {
203            return servers[0];
204        }
205        return null;
206    }
207
208
209    /**
210     * get master port.
211     */
212    public int getMasterPort() {
213        if (ports != null && ports.length > 0) {
214            return ports[0];
215        }
216        return 0;
217    }
218
219
220    /**
221     * number of channels.
222     */
223    public int numChannels() {
224        if (channels != null) {
225            return channels.length;
226        }
227        return -1;
228    }
229
230
231    /**
232     * open, setup of SocketChannels.
233     * @throws IOException.
234     */
235    public void open() throws IOException {
236        logger.debug("opening {} channels", servers.length);
237        if (servers.length <= 1) {
238            throw new IOException("to few servers");
239        }
240        channels = new SocketChannel[servers.length - 1];
241        for (int i = 1; i < servers.length; i++) {
242            channels[i - 1] = cf.getChannel(servers[i], ports[i]);
243        }
244    }
245
246
247    /**
248     * open, setup of SocketChannels. If nc &gt; servers.length open in round
249     * robin fashion.
250     * @param nc number of channels to open.
251     * @throws IOException.
252     */
253    public void open(int nc) throws IOException {
254        logger.debug("opening {} channels", nc);
255        if (servers.length <= 1) {
256            throw new IOException("to few servers");
257        }
258        channels = new SocketChannel[nc];
259        int j = 1; // 0 is master
260        for (int i = 0; i < channels.length; i++) {
261            if (j >= servers.length) { // modulo #servers
262                j = 1;
263            }
264            channels[i] = cf.getChannel(servers[j], ports[j]);
265            j++;
266        }
267    }
268
269
270    /**
271     * close all channels and ChannelFactory.
272     */
273    public void close() {
274        logger.debug("closing ExecutableChannels");
275        if (cf != null) {
276            cf.terminate();
277        }
278        if (channels != null) {
279            for (int i = 0; i < channels.length; i++) {
280                if (channels[i] != null) {
281                    try {
282                        channels[i].send(ExecutableServer.STOP);
283                    } catch (IOException e) {
284                        if (logger.isDebugEnabled()) {
285                            e.printStackTrace();
286                        }
287                    } finally {
288                        channels[i].close();
289                    }
290                    channels[i] = null;
291                }
292            }
293            channels = null;
294        }
295        logger.debug("ExecuteChannels closed");
296    }
297
298
299    /**
300     * getChannel.
301     * @param i channel number.
302     */
303    public SocketChannel getChannel(int i) {
304        if (channels != null && 0 <= i && i < channels.length) {
305            return channels[i];
306        }
307        return null;
308    }
309
310
311    /**
312     * getChannels.
313     */
314    /*package*/SocketChannel[] getChannels() {
315        return channels;
316    }
317
318
319    /**
320     * send on channel i.
321     * @param i channel number.
322     * @param o object to send.
323     */
324    public void send(int i, Object o) throws IOException {
325        if (channels != null && 0 <= i && i < channels.length) {
326            channels[i].send(o);
327        }
328    }
329
330
331    /**
332     * receive on channel i.
333     * @param i channel number.
334     * @return object received.
335     */
336    public Object receive(int i) throws IOException, ClassNotFoundException {
337        if (channels != null && 0 <= i && i < channels.length) {
338            return channels[i].receive();
339        }
340        return null;
341
342    }
343
344}