001    /*
002     * $Id: ChannelFactory.java 3297 2010-08-26 19:09:03Z kredel $
003     */
004    
005    //package edu.unima.ky.parallel;
006    package edu.jas.util;
007    
008    
009    import java.io.IOException;
010    
011    import java.net.ServerSocket;
012    import java.net.Socket;
013    import java.net.BindException;
014    
015    import java.util.concurrent.BlockingQueue;
016    import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue;
017    
018    import org.apache.log4j.Logger;
019    
020    
021    /**
022     * ChannelFactory implements a symmetric and non blocking way of setting up
023     * sockets on the client and server side. The constructor sets up a ServerSocket
024     * and accepts and stores any Socket creation requests from clients. The created
025     * Sockets can the be retrieved from the store without blocking. Refactored for
026     * java.util.concurrent.
027     * @author Akitoshi Yoshida
028     * @author Heinz Kredel.
029     * @see SocketChannel
030     */
031    public class ChannelFactory extends Thread {
032    
033    
034        private static final Logger logger = Logger.getLogger(ChannelFactory.class);
035    
036    
037        /**
038         * default port of socket.
039         */
040        public final static int DEFAULT_PORT = 4711;
041    
042    
043        /**
044         * port of socket.
045         */
046        private final int port;
047    
048    
049        /**
050         * BoundedBuffer for sockets.
051         */
052        //private BoundedBuffer buf = new BoundedBuffer(100);
053        private final BlockingQueue<SocketChannel> buf;
054    
055    
056        /**
057         * local server socket.
058         */
059        private volatile ServerSocket srv;
060    
061    
062        /**
063         * is local server up and running.
064         */
065        private volatile boolean srvrun = false;
066    
067    
068        /**
069         * is thread started.
070         */
071        private volatile boolean srvstart = false;
072    
073    
074        /**
075         * Constructs a ChannelFactory on the DEFAULT_PORT.
076         */
077        public ChannelFactory() {
078            this(DEFAULT_PORT);
079        }
080    
081    
082        /**
083         * Constructs a ChannelFactory.
084         * @param p port.
085         */
086        public ChannelFactory(int p) {
087            // buf = new ArrayBlockingQueue<SocketChannel>(100); 
088            buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/);
089            if (p <= 0) {
090                port = DEFAULT_PORT;
091            } else {
092                port = p;
093            }
094            try {
095                srv = new ServerSocket(port);
096                //this.start(); moved to init and getChannel
097                logger.info("server bound to port " + port);
098            } catch (BindException e) {
099                srv = null;
100                logger.warn("server not started, port used " + port);
101            } catch (IOException e) {
102                logger.debug("IOException " + e);
103                if (logger.isDebugEnabled()) {
104                    e.printStackTrace();
105                }
106            }
107        }
108    
109    
110        /**
111         * toString.
112         */
113        @Override
114        public String toString() {
115            return "" + this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")";
116        }
117    
118    
119        /**
120         * thread initialization and start.
121         */
122        public void init() {
123            if (srv != null && ! srvstart  ) {
124                this.start();
125                srvstart = true;
126                logger.info("ChannelFactory at " + srv);
127            }
128        }
129    
130    
131        /**
132         * Get a new socket channel from a server socket.
133         */
134        public SocketChannel getChannel() throws InterruptedException {
135            // return (SocketChannel)buf.get();
136            if (srv == null) {
137                if (srvrun) {
138                    throw new IllegalArgumentException("dont call when no server listens");
139                }
140            } else if ( ! srvstart  ) {
141                init();
142            }
143            return buf.take();
144        }
145    
146    
147        /**
148         * Get a new socket channel to a given host.
149         * @param h hostname
150         * @param p port
151         */
152        public SocketChannel getChannel(String h, int p) throws IOException {
153            if (p <= 0) {
154                p = port;
155            }
156            SocketChannel c = null;
157            int i = 0;
158            int delay = 5; // 50
159            logger.debug("connecting to " + h);
160            while (c == null) {
161                try {
162                    c = new SocketChannel(new Socket(h, p));
163                } catch (IOException e) {
164                    //System.out.println(e);
165                    // wait server ready
166                    i++;
167                    if (i % 50 == 0) {
168                        delay += delay;
169                        logger.info("Server on " + h + " not ready in " + delay + "ms");
170                    }
171                    System.out.println("Server on " + h + " not ready in " + delay + "ms");
172                    try {
173                        Thread.sleep(delay);
174                    } catch (InterruptedException w) {
175                        Thread.currentThread().interrupt();
176                        throw new IOException("Interrupted during IO wait " + w);
177                    }
178                }
179            }
180            logger.debug("connected, iter = " + i);
181            return c;
182        }
183    
184    
185        /**
186         * Run the servers accept() in an infinite loop.
187         */
188        @Override
189        public void run() {
190            if (srv == null) {
191                return; // nothing to do
192            }
193            srvrun = true;
194            while (true) {
195                try {
196                    logger.info("waiting for connection");
197                    Socket s = srv.accept();
198                    if (this.isInterrupted()) {
199                        //System.out.println("ChannelFactory interrupted");
200                        srvrun = false;
201                        return;
202                    }
203                    //logger.debug("Socket = " +s);
204                    logger.debug("connection accepted");
205                    SocketChannel c = new SocketChannel(s);
206                    buf.put(c);
207                } catch (IOException e) {
208                    //logger.debug("ChannelFactory IO terminating");
209                    srvrun = false;
210                    return;
211                } catch (InterruptedException e) {
212                    // unfug Thread.currentThread().interrupt();
213                    //logger.debug("ChannelFactory IE terminating");
214                    srvrun = false;
215                    return;
216                }
217            }
218        }
219    
220    
221        /**
222         * Terminate the Channel Factory
223         */
224        public void terminate() {
225            if ( ! srvstart ) {
226                logger.debug("server not started");
227                return; 
228            }
229            this.interrupt();
230            try {
231                if (srv != null) {
232                    srv.close();
233                    srvrun = false;
234                }
235                this.interrupt();
236                while (!buf.isEmpty()) {
237                    logger.debug("closing unused SocketChannel");
238                    //((SocketChannel)buf.get()).close();
239                    SocketChannel c  = buf.poll();
240                    if ( c != null ) {
241                        c.close();
242                    }
243                }
244            } catch (IOException e) {
245                //} catch (InterruptedException e) {
246                // Thread.currentThread().interrupt();
247            }
248            try {
249                this.join();
250            } catch (InterruptedException e) {
251                // unfug Thread.currentThread().interrupt();
252            }
253            logger.debug("ChannelFactory terminated");
254        }
255    
256    }