001/* 002 * $Id: ChannelFactory.java 4944 2014-10-05 18:35:23Z axelclk $ 003 */ 004 005//package edu.unima.ky.parallel; 006package edu.jas.util; 007 008 009import java.io.IOException; 010 011import java.net.ServerSocket; 012import java.net.Socket; 013import java.net.BindException; 014 015import java.util.concurrent.BlockingQueue; 016import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue; 017 018import org.apache.log4j.Logger; 019 020 021/** 022 * ChannelFactory implements a symmetric and non blocking way of setting up 023 * sockets on the client and server side. The constructor sets up a ServerSocket 024 * and accepts and stores any Socket creation requests from clients. The created 025 * Sockets can the be retrieved from the store without blocking. Refactored for 026 * java.util.concurrent. 027 * @author Akitoshi Yoshida 028 * @author Heinz Kredel. 029 * @see SocketChannel 030 */ 031public class ChannelFactory extends Thread { 032 033 034 private static final Logger logger = Logger.getLogger(ChannelFactory.class); 035 036 037 private final boolean debug = logger.isDebugEnabled(); 038 039 040 /** 041 * default port of socket. 042 */ 043 public final static int DEFAULT_PORT = 4711; 044 045 046 /** 047 * port of socket. 048 */ 049 private final int port; 050 051 052 /** 053 * BoundedBuffer for sockets. 054 */ 055 private final BlockingQueue<SocketChannel> buf; 056 057 058 /** 059 * local server socket. 060 */ 061 private volatile ServerSocket srv; 062 063 064 /** 065 * is local server up and running. 066 */ 067 private volatile boolean srvrun = false; 068 069 070 /** 071 * is thread started. 072 */ 073 private volatile boolean srvstart = false; 074 075 076 /** 077 * Constructs a ChannelFactory on the DEFAULT_PORT. 078 */ 079 public ChannelFactory() { 080 this(DEFAULT_PORT); 081 } 082 083 084 /** 085 * Constructs a ChannelFactory. 086 * @param p port. 087 */ 088 public ChannelFactory(int p) { 089 buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/); 090 if (p <= 0) { 091 port = DEFAULT_PORT; 092 } else { 093 port = p; 094 } 095 try { 096 srv = new ServerSocket(port); 097 //this.start(); moved to init and getChannel 098 logger.info("server bound to port " + port); 099 } catch (BindException e) { 100 srv = null; 101 logger.warn("server not started, port used " + port); 102 if (debug) { 103 e.printStackTrace(); 104 } 105 } catch (IOException e) { 106 logger.debug("IOException " + e); 107 if (logger.isDebugEnabled()) { 108 e.printStackTrace(); 109 } 110 } 111 } 112 113 114 /** 115 * toString. 116 */ 117 @Override 118 public String toString() { 119 return this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")"; 120 } 121 122 123 /** 124 * thread initialization and start. 125 */ 126 public void init() { 127 if (srv != null && ! srvstart ) { 128 this.start(); 129 srvstart = true; 130 logger.info("ChannelFactory at " + srv); 131 } 132 } 133 134 135 /** 136 * Get a new socket channel from a server socket. 137 */ 138 public SocketChannel getChannel() throws InterruptedException { 139 // return (SocketChannel)buf.get(); 140 if (srv == null) { 141 if (srvrun) { 142 throw new IllegalArgumentException("dont call when no server listens"); 143 } 144 } else if ( ! srvstart ) { 145 init(); 146 } 147 return buf.take(); 148 } 149 150 151 /** 152 * Get a new socket channel to a given host. 153 * @param h hostname 154 */ 155 public SocketChannel getChannel(String h) throws IOException { 156 return getChannel(h,DEFAULT_PORT); 157 } 158 159 160 /** 161 * Get a new socket channel to a given host. 162 * @param h hostname 163 * @param p port 164 */ 165 public SocketChannel getChannel(String h, int p) throws IOException { 166 if (p <= 0) { 167 p = port; 168 } 169 SocketChannel c = null; 170 int i = 0; 171 int delay = 5; // 50 172 logger.debug("connecting to " + h); 173 while (c == null) { 174 try { 175 c = new SocketChannel(new Socket(h, p)); 176 } catch (IOException e) { 177 //System.out.println(e); 178 // wait server ready 179 i++; 180 if (i % 50 == 0) { 181 delay += delay; 182 logger.info("Server on " + h + ":" + p + " not ready in " + delay + "ms"); 183 } 184 try { 185 Thread.sleep(delay); 186 if (i % 50 == 0 && debug) { 187 throw new Exception("time wait, host = " + h + ", port = " + port); 188 } 189 } catch (InterruptedException w) { 190 Thread.currentThread().interrupt(); 191 throw new IOException("Interrupted during IO wait " + w); 192 } catch (Exception ee) { // debug only 193 ee.printStackTrace(); 194 } 195 } 196 } 197 logger.debug("connected, iter = " + i); 198 return c; 199 } 200 201 202 /** 203 * Run the servers accept() in an infinite loop. 204 */ 205 @Override 206 public void run() { 207 if (srv == null) { 208 return; // nothing to do 209 } 210 srvrun = true; 211 while (true) { 212 try { 213 logger.info("waiting for connection on " + srv); 214 Socket s = srv.accept(); 215 if (this.isInterrupted()) { 216 //System.out.println("ChannelFactory interrupted"); 217 srvrun = false; 218 if (s != null) { // by code-spotter 219 s.close(); 220 } 221 return; 222 } 223 //logger.debug("Socket = " +s); 224 logger.debug("connection accepted"); 225 SocketChannel c = new SocketChannel(s); 226 buf.put(c); 227 } catch (IOException e) { 228 //logger.debug("ChannelFactory IO terminating"); 229 srvrun = false; 230 return; 231 } catch (InterruptedException e) { 232 // unfug Thread.currentThread().interrupt(); 233 //logger.debug("ChannelFactory IE terminating"); 234 srvrun = false; 235 return; 236 } 237 } 238 } 239 240 241 /** 242 * Terminate the Channel Factory 243 */ 244 public void terminate() { 245 if ( ! srvstart ) { 246 logger.debug("server not started"); 247 return; 248 } 249 this.interrupt(); 250 try { 251 if (srv != null) { 252 srv.close(); 253 srvrun = false; 254 } 255 this.interrupt(); 256 while (!buf.isEmpty()) { 257 logger.debug("closing unused SocketChannel"); 258 //((SocketChannel)buf.get()).close(); 259 SocketChannel c = buf.poll(); 260 if ( c != null ) { 261 c.close(); 262 } 263 } 264 } catch (IOException e) { 265 //} catch (InterruptedException e) { 266 // Thread.currentThread().interrupt(); 267 } 268 try { 269 this.join(); 270 } catch (InterruptedException e) { 271 // unfug Thread.currentThread().interrupt(); 272 } 273 logger.debug("ChannelFactory terminated"); 274 } 275 276}