001/*
002 * $Id$
003 */
004
005//package edu.unima.ky.parallel;
006package edu.jas.util;
007
008
009import java.io.IOException;
010
011import java.net.ServerSocket;
012import java.net.Socket;
013import java.net.BindException;
014
015import java.util.concurrent.BlockingQueue;
016import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue;
017
018import org.apache.logging.log4j.Logger;
019import org.apache.logging.log4j.LogManager; 
020
021
022/**
023 * ChannelFactory implements a symmetric and non blocking way of setting up
024 * sockets on the client and server side. The constructor sets up a ServerSocket
025 * and accepts and stores any Socket creation requests from clients. The created
026 * Sockets can the be retrieved from the store without blocking. Refactored for
027 * java.util.concurrent.
028 * @author Akitoshi Yoshida
029 * @author Heinz Kredel
030 * @see SocketChannel
031 */
032public class ChannelFactory extends Thread {
033
034
035    private static final Logger logger = LogManager.getLogger(ChannelFactory.class);
036
037
038    private static final boolean debug = logger.isDebugEnabled();
039
040
041    /**
042     * default port of socket.
043     */
044    public final static int DEFAULT_PORT = 4711;
045
046
047    /**
048     * port of socket.
049     */
050    private final int port;
051
052
053    /**
054     * BoundedBuffer for sockets.
055     */
056    private final BlockingQueue<SocketChannel> buf;
057
058
059    /**
060     * local server socket.
061     */
062    private volatile ServerSocket srv;
063
064
065    /**
066     * is local server up and running.
067     */
068    private volatile boolean srvrun = false;
069
070
071    /**
072     * is thread started.
073     */
074    private volatile boolean srvstart = false;
075
076
077    /**
078     * Constructs a ChannelFactory on the DEFAULT_PORT.
079     */
080    public ChannelFactory() {
081        this(DEFAULT_PORT);
082    }
083
084
085    /**
086     * Constructs a ChannelFactory.
087     * @param p port.
088     */
089    public ChannelFactory(int p) {
090        buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/);
091        if (p <= 0) {
092            port = DEFAULT_PORT;
093        } else {
094            port = p;
095        }
096        try {
097            srv = new ServerSocket(port);
098            //this.start(); moved to init and getChannel
099            logger.info("server bound to port {}", port);
100        } catch (BindException e) {
101            srv = null;
102            logger.warn("server not started, port used {}", port);
103            if (debug) {
104                e.printStackTrace();
105            }
106        } catch (IOException e) {
107            logger.info("IOException {}", e);
108            if (debug) {
109                e.printStackTrace();
110            }
111        }
112    }
113
114
115    /**
116     * toString.
117     */
118    @Override
119    public String toString() {
120        return this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")";
121    }
122
123
124    /**
125     * thread initialization and start.
126     */
127    public void init() {
128        if (srv != null && ! srvstart  ) {
129            this.start();
130            srvstart = true;
131            logger.info("ChannelFactory at {}", srv);
132        }
133    }
134
135
136    /**
137     * Get a new socket channel from a server socket.
138     */
139    public SocketChannel getChannel() throws InterruptedException {
140        // return (SocketChannel)buf.get();
141        if (srv == null) {
142            if (srvrun) {
143                throw new IllegalArgumentException("dont call when no server listens");
144            }
145        } else if ( ! srvstart  ) {
146            init();
147        }
148        return buf.take();
149    }
150
151
152    /**
153     * Get a new socket channel to a given host.
154     * @param h hostname
155     */
156    public SocketChannel getChannel(String h) throws IOException {
157        return getChannel(h,DEFAULT_PORT);
158    }
159
160
161    /**
162     * Get a new socket channel to a given host.
163     * @param h hostname
164     * @param p port
165     */
166    public SocketChannel getChannel(String h, int p) throws IOException {
167        if (p <= 0) {
168            p = port;
169        }
170        SocketChannel c = null;
171        int i = 0;
172        int delay = 5; // 50
173        logger.debug("connecting to {}", h);
174        while (c == null) {
175            try {
176                c = new SocketChannel(new Socket(h, p));
177            } catch (IOException e) {
178                //System.out.println(e);
179                // wait server ready
180                i++;
181                if (i % 50 == 0) {
182                    delay += delay;
183                    logger.info("Server on {}:{} not ready in {} ms", h, p, delay);
184                }
185                try {
186                    Thread.sleep(delay);
187                    if (i % 50 == 0 && debug) {
188                        throw new Exception("time wait, host = " + h + ", port = " + port);
189                    }
190                } catch (InterruptedException w) {
191                    Thread.currentThread().interrupt();
192                    throw new IOException("Interrupted during IO wait " + w);
193                } catch (Exception ee) { // debug only
194                    ee.printStackTrace();
195                }
196            }
197        }
198        logger.debug("connected, iter = {}", i);
199        return c;
200    }
201
202
203    /**
204     * Run the servers accept() in an infinite loop.
205     */
206    @Override
207    public void run() {
208        if (srv == null) {
209            return; // nothing to do
210        }
211        srvrun = true;
212        while (true) {
213            try {
214                logger.info("waiting for connection on {}", srv);
215                Socket s = srv.accept();
216                if (this.isInterrupted()) {
217                    //System.out.println("ChannelFactory interrupted");
218                    srvrun = false;
219                    if (s != null) { // by code-spotter
220                        s.close();
221                    }
222                    return;
223                }
224                //logger.debug("Socket = {}", s);
225                logger.debug("connection accepted");
226                SocketChannel c = new SocketChannel(s);
227                buf.put(c);
228            } catch (IOException e) {
229                //logger.debug("ChannelFactory IO terminating");
230                srvrun = false;
231                return;
232            } catch (InterruptedException e) {
233                // unfug Thread.currentThread().interrupt();
234                //logger.debug("ChannelFactory IE terminating");
235                srvrun = false;
236                return;
237            }
238        }
239    }
240
241
242    /**
243     * Terminate the Channel Factory
244     */
245    public void terminate() {
246        if ( ! srvstart ) {
247            logger.debug("server not started");
248            return; 
249        }
250        this.interrupt();
251        try {
252            if (srv != null) {
253                srv.close();
254                srvrun = false;
255            }
256            this.interrupt();
257            while (!buf.isEmpty()) {
258                logger.debug("closing unused SocketChannel");
259                SocketChannel c  = buf.poll();
260                if ( c != null ) {
261                    c.close();
262                }
263            }
264        } catch (IOException e) {
265            //} catch (InterruptedException e) {
266            // Thread.currentThread().interrupt();
267        }
268        try {
269            this.join();
270        } catch (InterruptedException e) {
271            // unfug Thread.currentThread().interrupt();
272        }
273        logger.debug("ChannelFactory terminated");
274    }
275
276}