001 /*
002 * $Id: ChannelFactory.java 3297 2010-08-26 19:09:03Z kredel $
003 */
004
005 //package edu.unima.ky.parallel;
006 package edu.jas.util;
007
008
009 import java.io.IOException;
010
011 import java.net.ServerSocket;
012 import java.net.Socket;
013 import java.net.BindException;
014
015 import java.util.concurrent.BlockingQueue;
016 import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue;
017
018 import org.apache.log4j.Logger;
019
020
021 /**
022 * ChannelFactory implements a symmetric and non blocking way of setting up
023 * sockets on the client and server side. The constructor sets up a ServerSocket
024 * and accepts and stores any Socket creation requests from clients. The created
025 * Sockets can the be retrieved from the store without blocking. Refactored for
026 * java.util.concurrent.
027 * @author Akitoshi Yoshida
028 * @author Heinz Kredel.
029 * @see SocketChannel
030 */
031 public class ChannelFactory extends Thread {
032
033
034 private static final Logger logger = Logger.getLogger(ChannelFactory.class);
035
036
037 /**
038 * default port of socket.
039 */
040 public final static int DEFAULT_PORT = 4711;
041
042
043 /**
044 * port of socket.
045 */
046 private final int port;
047
048
049 /**
050 * BoundedBuffer for sockets.
051 */
052 //private BoundedBuffer buf = new BoundedBuffer(100);
053 private final BlockingQueue<SocketChannel> buf;
054
055
056 /**
057 * local server socket.
058 */
059 private volatile ServerSocket srv;
060
061
062 /**
063 * is local server up and running.
064 */
065 private volatile boolean srvrun = false;
066
067
068 /**
069 * is thread started.
070 */
071 private volatile boolean srvstart = false;
072
073
074 /**
075 * Constructs a ChannelFactory on the DEFAULT_PORT.
076 */
077 public ChannelFactory() {
078 this(DEFAULT_PORT);
079 }
080
081
082 /**
083 * Constructs a ChannelFactory.
084 * @param p port.
085 */
086 public ChannelFactory(int p) {
087 // buf = new ArrayBlockingQueue<SocketChannel>(100);
088 buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/);
089 if (p <= 0) {
090 port = DEFAULT_PORT;
091 } else {
092 port = p;
093 }
094 try {
095 srv = new ServerSocket(port);
096 //this.start(); moved to init and getChannel
097 logger.info("server bound to port " + port);
098 } catch (BindException e) {
099 srv = null;
100 logger.warn("server not started, port used " + port);
101 } catch (IOException e) {
102 logger.debug("IOException " + e);
103 if (logger.isDebugEnabled()) {
104 e.printStackTrace();
105 }
106 }
107 }
108
109
110 /**
111 * toString.
112 */
113 @Override
114 public String toString() {
115 return "" + this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")";
116 }
117
118
119 /**
120 * thread initialization and start.
121 */
122 public void init() {
123 if (srv != null && ! srvstart ) {
124 this.start();
125 srvstart = true;
126 logger.info("ChannelFactory at " + srv);
127 }
128 }
129
130
131 /**
132 * Get a new socket channel from a server socket.
133 */
134 public SocketChannel getChannel() throws InterruptedException {
135 // return (SocketChannel)buf.get();
136 if (srv == null) {
137 if (srvrun) {
138 throw new IllegalArgumentException("dont call when no server listens");
139 }
140 } else if ( ! srvstart ) {
141 init();
142 }
143 return buf.take();
144 }
145
146
147 /**
148 * Get a new socket channel to a given host.
149 * @param h hostname
150 * @param p port
151 */
152 public SocketChannel getChannel(String h, int p) throws IOException {
153 if (p <= 0) {
154 p = port;
155 }
156 SocketChannel c = null;
157 int i = 0;
158 int delay = 5; // 50
159 logger.debug("connecting to " + h);
160 while (c == null) {
161 try {
162 c = new SocketChannel(new Socket(h, p));
163 } catch (IOException e) {
164 //System.out.println(e);
165 // wait server ready
166 i++;
167 if (i % 50 == 0) {
168 delay += delay;
169 logger.info("Server on " + h + " not ready in " + delay + "ms");
170 }
171 System.out.println("Server on " + h + " not ready in " + delay + "ms");
172 try {
173 Thread.sleep(delay);
174 } catch (InterruptedException w) {
175 Thread.currentThread().interrupt();
176 throw new IOException("Interrupted during IO wait " + w);
177 }
178 }
179 }
180 logger.debug("connected, iter = " + i);
181 return c;
182 }
183
184
185 /**
186 * Run the servers accept() in an infinite loop.
187 */
188 @Override
189 public void run() {
190 if (srv == null) {
191 return; // nothing to do
192 }
193 srvrun = true;
194 while (true) {
195 try {
196 logger.info("waiting for connection");
197 Socket s = srv.accept();
198 if (this.isInterrupted()) {
199 //System.out.println("ChannelFactory interrupted");
200 srvrun = false;
201 return;
202 }
203 //logger.debug("Socket = " +s);
204 logger.debug("connection accepted");
205 SocketChannel c = new SocketChannel(s);
206 buf.put(c);
207 } catch (IOException e) {
208 //logger.debug("ChannelFactory IO terminating");
209 srvrun = false;
210 return;
211 } catch (InterruptedException e) {
212 // unfug Thread.currentThread().interrupt();
213 //logger.debug("ChannelFactory IE terminating");
214 srvrun = false;
215 return;
216 }
217 }
218 }
219
220
221 /**
222 * Terminate the Channel Factory
223 */
224 public void terminate() {
225 if ( ! srvstart ) {
226 logger.debug("server not started");
227 return;
228 }
229 this.interrupt();
230 try {
231 if (srv != null) {
232 srv.close();
233 srvrun = false;
234 }
235 this.interrupt();
236 while (!buf.isEmpty()) {
237 logger.debug("closing unused SocketChannel");
238 //((SocketChannel)buf.get()).close();
239 SocketChannel c = buf.poll();
240 if ( c != null ) {
241 c.close();
242 }
243 }
244 } catch (IOException e) {
245 //} catch (InterruptedException e) {
246 // Thread.currentThread().interrupt();
247 }
248 try {
249 this.join();
250 } catch (InterruptedException e) {
251 // unfug Thread.currentThread().interrupt();
252 }
253 logger.debug("ChannelFactory terminated");
254 }
255
256 }