001/*
002 * $Id: ChannelFactory.java 4944 2014-10-05 18:35:23Z axelclk $
003 */
004
005//package edu.unima.ky.parallel;
006package edu.jas.util;
007
008
009import java.io.IOException;
010
011import java.net.ServerSocket;
012import java.net.Socket;
013import java.net.BindException;
014
015import java.util.concurrent.BlockingQueue;
016import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue;
017
018import org.apache.log4j.Logger;
019
020
021/**
022 * ChannelFactory implements a symmetric and non blocking way of setting up
023 * sockets on the client and server side. The constructor sets up a ServerSocket
024 * and accepts and stores any Socket creation requests from clients. The created
025 * Sockets can the be retrieved from the store without blocking. Refactored for
026 * java.util.concurrent.
027 * @author Akitoshi Yoshida
028 * @author Heinz Kredel.
029 * @see SocketChannel
030 */
031public class ChannelFactory extends Thread {
032
033
034    private static final Logger logger = Logger.getLogger(ChannelFactory.class);
035
036
037    private final boolean debug = logger.isDebugEnabled();
038
039
040    /**
041     * default port of socket.
042     */
043    public final static int DEFAULT_PORT = 4711;
044
045
046    /**
047     * port of socket.
048     */
049    private final int port;
050
051
052    /**
053     * BoundedBuffer for sockets.
054     */
055    private final BlockingQueue<SocketChannel> buf;
056
057
058    /**
059     * local server socket.
060     */
061    private volatile ServerSocket srv;
062
063
064    /**
065     * is local server up and running.
066     */
067    private volatile boolean srvrun = false;
068
069
070    /**
071     * is thread started.
072     */
073    private volatile boolean srvstart = false;
074
075
076    /**
077     * Constructs a ChannelFactory on the DEFAULT_PORT.
078     */
079    public ChannelFactory() {
080        this(DEFAULT_PORT);
081    }
082
083
084    /**
085     * Constructs a ChannelFactory.
086     * @param p port.
087     */
088    public ChannelFactory(int p) {
089        buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/);
090        if (p <= 0) {
091            port = DEFAULT_PORT;
092        } else {
093            port = p;
094        }
095        try {
096            srv = new ServerSocket(port);
097            //this.start(); moved to init and getChannel
098            logger.info("server bound to port " + port);
099        } catch (BindException e) {
100            srv = null;
101            logger.warn("server not started, port used " + port);
102            if (debug) {
103                e.printStackTrace();
104            }
105        } catch (IOException e) {
106            logger.debug("IOException " + e);
107            if (logger.isDebugEnabled()) {
108                e.printStackTrace();
109            }
110        }
111    }
112
113
114    /**
115     * toString.
116     */
117    @Override
118    public String toString() {
119        return this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")";
120    }
121
122
123    /**
124     * thread initialization and start.
125     */
126    public void init() {
127        if (srv != null && ! srvstart  ) {
128            this.start();
129            srvstart = true;
130            logger.info("ChannelFactory at " + srv);
131        }
132    }
133
134
135    /**
136     * Get a new socket channel from a server socket.
137     */
138    public SocketChannel getChannel() throws InterruptedException {
139        // return (SocketChannel)buf.get();
140        if (srv == null) {
141            if (srvrun) {
142                throw new IllegalArgumentException("dont call when no server listens");
143            }
144        } else if ( ! srvstart  ) {
145            init();
146        }
147        return buf.take();
148    }
149
150
151    /**
152     * Get a new socket channel to a given host.
153     * @param h hostname
154     */
155    public SocketChannel getChannel(String h) throws IOException {
156        return getChannel(h,DEFAULT_PORT);
157    }
158
159
160    /**
161     * Get a new socket channel to a given host.
162     * @param h hostname
163     * @param p port
164     */
165    public SocketChannel getChannel(String h, int p) throws IOException {
166        if (p <= 0) {
167            p = port;
168        }
169        SocketChannel c = null;
170        int i = 0;
171        int delay = 5; // 50
172        logger.debug("connecting to " + h);
173        while (c == null) {
174            try {
175                c = new SocketChannel(new Socket(h, p));
176            } catch (IOException e) {
177                //System.out.println(e);
178                // wait server ready
179                i++;
180                if (i % 50 == 0) {
181                    delay += delay;
182                    logger.info("Server on " + h + ":" + p + " not ready in " + delay + "ms");
183                }
184                try {
185                    Thread.sleep(delay);
186                    if (i % 50 == 0 && debug) {
187                        throw new Exception("time wait, host = " + h + ", port = " + port);
188                    }
189                } catch (InterruptedException w) {
190                    Thread.currentThread().interrupt();
191                    throw new IOException("Interrupted during IO wait " + w);
192                } catch (Exception ee) { // debug only
193                    ee.printStackTrace();
194                }
195            }
196        }
197        logger.debug("connected, iter = " + i);
198        return c;
199    }
200
201
202    /**
203     * Run the servers accept() in an infinite loop.
204     */
205    @Override
206    public void run() {
207        if (srv == null) {
208            return; // nothing to do
209        }
210        srvrun = true;
211        while (true) {
212            try {
213                logger.info("waiting for connection on " + srv);
214                Socket s = srv.accept();
215                if (this.isInterrupted()) {
216                    //System.out.println("ChannelFactory interrupted");
217                    srvrun = false;
218                    if (s != null) { // by code-spotter
219                        s.close();
220                    }
221                    return;
222                }
223                //logger.debug("Socket = " +s);
224                logger.debug("connection accepted");
225                SocketChannel c = new SocketChannel(s);
226                buf.put(c);
227            } catch (IOException e) {
228                //logger.debug("ChannelFactory IO terminating");
229                srvrun = false;
230                return;
231            } catch (InterruptedException e) {
232                // unfug Thread.currentThread().interrupt();
233                //logger.debug("ChannelFactory IE terminating");
234                srvrun = false;
235                return;
236            }
237        }
238    }
239
240
241    /**
242     * Terminate the Channel Factory
243     */
244    public void terminate() {
245        if ( ! srvstart ) {
246            logger.debug("server not started");
247            return; 
248        }
249        this.interrupt();
250        try {
251            if (srv != null) {
252                srv.close();
253                srvrun = false;
254            }
255            this.interrupt();
256            while (!buf.isEmpty()) {
257                logger.debug("closing unused SocketChannel");
258                //((SocketChannel)buf.get()).close();
259                SocketChannel c  = buf.poll();
260                if ( c != null ) {
261                    c.close();
262                }
263            }
264        } catch (IOException e) {
265            //} catch (InterruptedException e) {
266            // Thread.currentThread().interrupt();
267        }
268        try {
269            this.join();
270        } catch (InterruptedException e) {
271            // unfug Thread.currentThread().interrupt();
272        }
273        logger.debug("ChannelFactory terminated");
274    }
275
276}