001/*
002 * $Id: ChannelFactory.java 4229 2012-10-03 17:19:36Z kredel $
003 */
004
005//package edu.unima.ky.parallel;
006package edu.jas.util;
007
008
009import java.io.IOException;
010
011import java.net.ServerSocket;
012import java.net.Socket;
013import java.net.BindException;
014
015import java.util.concurrent.BlockingQueue;
016import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue;
017
018import org.apache.log4j.Logger;
019
020
021/**
022 * ChannelFactory implements a symmetric and non blocking way of setting up
023 * sockets on the client and server side. The constructor sets up a ServerSocket
024 * and accepts and stores any Socket creation requests from clients. The created
025 * Sockets can the be retrieved from the store without blocking. Refactored for
026 * java.util.concurrent.
027 * @author Akitoshi Yoshida
028 * @author Heinz Kredel.
029 * @see SocketChannel
030 */
031public class ChannelFactory extends Thread {
032
033
034    private static final Logger logger = Logger.getLogger(ChannelFactory.class);
035
036
037    private final boolean debug = logger.isDebugEnabled();
038
039
040    /**
041     * default port of socket.
042     */
043    public final static int DEFAULT_PORT = 4711;
044
045
046    /**
047     * port of socket.
048     */
049    private final int port;
050
051
052    /**
053     * BoundedBuffer for sockets.
054     */
055    private final BlockingQueue<SocketChannel> buf;
056
057
058    /**
059     * local server socket.
060     */
061    private volatile ServerSocket srv;
062
063
064    /**
065     * is local server up and running.
066     */
067    private volatile boolean srvrun = false;
068
069
070    /**
071     * is thread started.
072     */
073    private volatile boolean srvstart = false;
074
075
076    /**
077     * Constructs a ChannelFactory on the DEFAULT_PORT.
078     */
079    public ChannelFactory() {
080        this(DEFAULT_PORT);
081    }
082
083
084    /**
085     * Constructs a ChannelFactory.
086     * @param p port.
087     */
088    public ChannelFactory(int p) {
089        buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/);
090        if (p <= 0) {
091            port = DEFAULT_PORT;
092        } else {
093            port = p;
094        }
095        try {
096            srv = new ServerSocket(port);
097            //this.start(); moved to init and getChannel
098            logger.info("server bound to port " + port);
099        } catch (BindException e) {
100            srv = null;
101            logger.warn("server not started, port used " + port);
102            if (debug) {
103                e.printStackTrace();
104            }
105        } catch (IOException e) {
106            logger.debug("IOException " + e);
107            if (logger.isDebugEnabled()) {
108                e.printStackTrace();
109            }
110        }
111    }
112
113
114    /**
115     * toString.
116     */
117    @Override
118    public String toString() {
119        return "" + this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")";
120    }
121
122
123    /**
124     * thread initialization and start.
125     */
126    public void init() {
127        if (srv != null && ! srvstart  ) {
128            this.start();
129            srvstart = true;
130            logger.info("ChannelFactory at " + srv);
131        }
132    }
133
134
135    /**
136     * Get a new socket channel from a server socket.
137     */
138    public SocketChannel getChannel() throws InterruptedException {
139        // return (SocketChannel)buf.get();
140        if (srv == null) {
141            if (srvrun) {
142                throw new IllegalArgumentException("dont call when no server listens");
143            }
144        } else if ( ! srvstart  ) {
145            init();
146        }
147        return buf.take();
148    }
149
150
151    /**
152     * Get a new socket channel to a given host.
153     * @param h hostname
154     * @param p port
155     */
156    public SocketChannel getChannel(String h, int p) throws IOException {
157        if (p <= 0) {
158            p = port;
159        }
160        SocketChannel c = null;
161        int i = 0;
162        int delay = 5; // 50
163        logger.debug("connecting to " + h);
164        while (c == null) {
165            try {
166                c = new SocketChannel(new Socket(h, p));
167            } catch (IOException e) {
168                //System.out.println(e);
169                // wait server ready
170                i++;
171                if (i % 50 == 0) {
172                    delay += delay;
173                    logger.info("Server on " + h + " not ready in " + delay + "ms");
174                }
175                System.out.println("Server on " + h + " not ready in " + delay + "ms");
176                try {
177                    Thread.sleep(delay);
178                    if (i % 50 == 0 && debug) {
179                        throw new Exception("time wait, host = " + h + ", port = " + port);
180                    }
181                } catch (InterruptedException w) {
182                    Thread.currentThread().interrupt();
183                    throw new IOException("Interrupted during IO wait " + w);
184                } catch (Exception ee) { // debug only
185                    ee.printStackTrace();
186                }
187            }
188        }
189        logger.debug("connected, iter = " + i);
190        return c;
191    }
192
193
194    /**
195     * Run the servers accept() in an infinite loop.
196     */
197    @Override
198    public void run() {
199        if (srv == null) {
200            return; // nothing to do
201        }
202        srvrun = true;
203        while (true) {
204            try {
205                logger.info("waiting for connection");
206                Socket s = srv.accept();
207                if (this.isInterrupted()) {
208                    //System.out.println("ChannelFactory interrupted");
209                    srvrun = false;
210                    return;
211                }
212                //logger.debug("Socket = " +s);
213                logger.debug("connection accepted");
214                SocketChannel c = new SocketChannel(s);
215                buf.put(c);
216            } catch (IOException e) {
217                //logger.debug("ChannelFactory IO terminating");
218                srvrun = false;
219                return;
220            } catch (InterruptedException e) {
221                // unfug Thread.currentThread().interrupt();
222                //logger.debug("ChannelFactory IE terminating");
223                srvrun = false;
224                return;
225            }
226        }
227    }
228
229
230    /**
231     * Terminate the Channel Factory
232     */
233    public void terminate() {
234        if ( ! srvstart ) {
235            logger.debug("server not started");
236            return; 
237        }
238        this.interrupt();
239        try {
240            if (srv != null) {
241                srv.close();
242                srvrun = false;
243            }
244            this.interrupt();
245            while (!buf.isEmpty()) {
246                logger.debug("closing unused SocketChannel");
247                //((SocketChannel)buf.get()).close();
248                SocketChannel c  = buf.poll();
249                if ( c != null ) {
250                    c.close();
251                }
252            }
253        } catch (IOException e) {
254            //} catch (InterruptedException e) {
255            // Thread.currentThread().interrupt();
256        }
257        try {
258            this.join();
259        } catch (InterruptedException e) {
260            // unfug Thread.currentThread().interrupt();
261        }
262        logger.debug("ChannelFactory terminated");
263    }
264
265}