001/*
002 * $Id: ExecutableChannels.java 5872 2018-07-20 16:01:46Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.BufferedReader;
009import java.io.FileNotFoundException;
010import java.io.FileInputStream;
011import java.io.InputStreamReader;
012import java.io.IOException;
013import java.nio.charset.Charset;
014import java.util.ArrayList;
015import java.util.List;
016
017import org.apache.logging.log4j.Logger;
018import org.apache.logging.log4j.LogManager; 
019
020
021// import edu.unima.ky.parallel.ChannelFactory;
022// import edu.unima.ky.parallel.SocketChannel;
023
024/**
025 * ExecutableChannels used to receive and execute classes.
026 * @author Heinz Kredel
027 */
028
029
030public class ExecutableChannels {
031
032
033    private static final Logger logger = LogManager.getLogger(ExecutableChannels.class);
034
035
036    /**
037     * default port.
038     */
039    protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT;
040
041
042    /**
043     * default machine file.
044     */
045    protected final static String DEFAULT_MFILE = "examples/machines.test";
046
047
048    protected final ChannelFactory cf;
049
050
051    protected SocketChannel[] channels = null;
052
053
054    protected String[] servers = null;
055
056
057    protected int[] ports = null;
058
059
060    /**
061     * Internal constructor.
062     */
063    protected ExecutableChannels() {
064        cf = new ChannelFactory();
065        cf.init();
066    }
067
068
069    /**
070     * Constructor from array of server:port strings.
071     * @param srvs A String array.
072     */
073    public ExecutableChannels(String[] srvs) {
074        this();
075        if (srvs == null) {
076            return;
077        }
078        servers = new String[srvs.length];
079        ports = new int[srvs.length];
080        for (int i = 0; i < srvs.length; i++) {
081            setServerPort(i, srvs[i]);
082        }
083    }
084
085
086    /**
087     * Constructor from machine file.
088     * @param mfile
089     * @throws FileNotFoundException.
090     */
091    public ExecutableChannels(String mfile) throws FileNotFoundException {
092        this();
093        if (mfile == null || mfile.length() == 0) {
094            mfile = DEFAULT_MFILE;
095        }
096        InputStreamReader isr = new InputStreamReader(new FileInputStream(mfile),Charset.forName("UTF8"));
097        BufferedReader in = new BufferedReader(isr);
098        String line = null;
099        List<String> list = new ArrayList<String>();
100        int x;
101        try {
102            while (true) {
103                if (!in.ready()) {
104                    break;
105                }
106                line = in.readLine();
107                if (line == null) {
108                    break;
109                }
110                x = line.indexOf("#");
111                if (x >= 0) {
112                    line = line.substring(0, x);
113                }
114                line = line.trim();
115                if (line.length() == 0) {
116                    continue;
117                }
118                list.add(line);
119            }
120        } catch (IOException ignored) {
121        } finally {
122            try {
123                in.close();
124            } catch (IOException ignore) {
125            }
126        }
127        logger.debug("list.size() in " + mfile + " = " + list.size());
128        if (list.size() == 0) {
129            return;
130        }
131        servers = new String[list.size()];
132        ports = new int[list.size()];
133        for (int i = 0; i < servers.length; i++) {
134            setServerPort(i, list.get(i));
135        }
136    }
137
138
139    /* 
140     * internal method
141     */
142    protected void setServerPort(int i, String srv) {
143        int x = srv.indexOf(":");
144        ports[i] = DEFAULT_PORT;
145        if (x < 0) {
146            servers[i] = srv;
147        } else {
148            servers[i] = srv.substring(0, x);
149            String p = srv.substring(x + 1, srv.length());
150            try {
151                ports[i] = Integer.parseInt(p);
152            } catch (NumberFormatException ignored) {
153            }
154        }
155    }
156
157
158    /**
159     * String representation.
160     */
161    @Override
162    public String toString() {
163        StringBuffer s = new StringBuffer("ExecutableChannels(");
164        if (servers != null) {
165            for (int i = 0; i < servers.length; i++) {
166                s.append(servers[i] + ":" + ports[i]);
167                if (i < servers.length - 1) {
168                    s.append(" ");
169                }
170            }
171        }
172        if (channels != null) {
173            s.append(" channels = ");
174            for (int i = 0; i < channels.length; i++) {
175                s.append(channels[i]);
176                if (i < channels.length - 1) {
177                    s.append(" ");
178                }
179            }
180        }
181        s.append(")");
182        return s.toString();
183    }
184
185
186    /**
187     * number of servers.
188     */
189    public int numServers() {
190        if (servers != null) {
191            return servers.length;
192        }
193        return -1;
194    }
195
196
197    /**
198     * get master host.
199     */
200    public String getMasterHost() {
201        if (servers != null && servers.length > 0) {
202            return servers[0];
203        }
204        return null;
205    }
206
207
208    /**
209     * get master port.
210     */
211    public int getMasterPort() {
212        if (ports != null && ports.length > 0) {
213            return ports[0];
214        }
215        return 0;
216    }
217
218
219    /**
220     * number of channels.
221     */
222    public int numChannels() {
223        if (channels != null) {
224            return channels.length;
225        }
226        return -1;
227    }
228
229
230    /**
231     * open, setup of SocketChannels.
232     * @throws IOException.
233     */
234    public void open() throws IOException {
235        logger.debug("opening " + servers.length + " channels");
236        if (servers.length <= 1) {
237            throw new IOException("to few servers");
238        }
239        channels = new SocketChannel[servers.length - 1];
240        for (int i = 1; i < servers.length; i++) {
241            channels[i - 1] = cf.getChannel(servers[i], ports[i]);
242        }
243    }
244
245
246    /**
247     * open, setup of SocketChannels. If nc &gt; servers.length open in round
248     * robin fashion.
249     * @param nc number of channels to open.
250     * @throws IOException.
251     */
252    public void open(int nc) throws IOException {
253        logger.debug("opening " + nc + " channels");
254        if (servers.length <= 1) {
255            throw new IOException("to few servers");
256        }
257        channels = new SocketChannel[nc];
258        int j = 1; // 0 is master
259        for (int i = 0; i < channels.length; i++) {
260            if (j >= servers.length) { // modulo #servers
261                j = 1;
262            }
263            channels[i] = cf.getChannel(servers[j], ports[j]);
264            j++;
265        }
266    }
267
268
269    /**
270     * close all channels and ChannelFactory.
271     */
272    public void close() {
273        logger.debug("closing ExecutableChannels");
274        if (cf != null) {
275            cf.terminate();
276        }
277        if (channels != null) {
278            for (int i = 0; i < channels.length; i++) {
279                if (channels[i] != null) {
280                    try {
281                        channels[i].send(ExecutableServer.STOP);
282                    } catch (IOException e) {
283                        if (logger.isDebugEnabled()) {
284                            e.printStackTrace();
285                        }
286                    } finally {
287                        channels[i].close();
288                    }
289                    channels[i] = null;
290                }
291            }
292            channels = null;
293        }
294        logger.debug("ExecuteChannels closed");
295    }
296
297
298    /**
299     * getChannel.
300     * @param i channel number.
301     */
302    public SocketChannel getChannel(int i) {
303        if (channels != null && 0 <= i && i < channels.length) {
304            return channels[i];
305        }
306        return null;
307    }
308
309
310    /**
311     * getChannels.
312     */
313    /*package*/SocketChannel[] getChannels() {
314        return channels;
315    }
316
317
318    /**
319     * send on channel i.
320     * @param i channel number.
321     * @param o object to send.
322     */
323    public void send(int i, Object o) throws IOException {
324        if (channels != null && 0 <= i && i < channels.length) {
325            channels[i].send(o);
326        }
327    }
328
329
330    /**
331     * recieve on channel i.
332     * @param i channel number.
333     * @return object recieved.
334     */
335    public Object receive(int i) throws IOException, ClassNotFoundException {
336        if (channels != null && 0 <= i && i < channels.length) {
337            return channels[i].receive();
338        }
339        return null;
340
341    }
342
343}