001/* 002 * $Id: ChannelFactory.java 4229 2012-10-03 17:19:36Z kredel $ 003 */ 004 005//package edu.unima.ky.parallel; 006package edu.jas.util; 007 008 009import java.io.IOException; 010 011import java.net.ServerSocket; 012import java.net.Socket; 013import java.net.BindException; 014 015import java.util.concurrent.BlockingQueue; 016import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue; 017 018import org.apache.log4j.Logger; 019 020 021/** 022 * ChannelFactory implements a symmetric and non blocking way of setting up 023 * sockets on the client and server side. The constructor sets up a ServerSocket 024 * and accepts and stores any Socket creation requests from clients. The created 025 * Sockets can the be retrieved from the store without blocking. Refactored for 026 * java.util.concurrent. 027 * @author Akitoshi Yoshida 028 * @author Heinz Kredel. 029 * @see SocketChannel 030 */ 031public class ChannelFactory extends Thread { 032 033 034 private static final Logger logger = Logger.getLogger(ChannelFactory.class); 035 036 037 private final boolean debug = logger.isDebugEnabled(); 038 039 040 /** 041 * default port of socket. 042 */ 043 public final static int DEFAULT_PORT = 4711; 044 045 046 /** 047 * port of socket. 048 */ 049 private final int port; 050 051 052 /** 053 * BoundedBuffer for sockets. 054 */ 055 private final BlockingQueue<SocketChannel> buf; 056 057 058 /** 059 * local server socket. 060 */ 061 private volatile ServerSocket srv; 062 063 064 /** 065 * is local server up and running. 066 */ 067 private volatile boolean srvrun = false; 068 069 070 /** 071 * is thread started. 072 */ 073 private volatile boolean srvstart = false; 074 075 076 /** 077 * Constructs a ChannelFactory on the DEFAULT_PORT. 078 */ 079 public ChannelFactory() { 080 this(DEFAULT_PORT); 081 } 082 083 084 /** 085 * Constructs a ChannelFactory. 086 * @param p port. 087 */ 088 public ChannelFactory(int p) { 089 buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/); 090 if (p <= 0) { 091 port = DEFAULT_PORT; 092 } else { 093 port = p; 094 } 095 try { 096 srv = new ServerSocket(port); 097 //this.start(); moved to init and getChannel 098 logger.info("server bound to port " + port); 099 } catch (BindException e) { 100 srv = null; 101 logger.warn("server not started, port used " + port); 102 if (debug) { 103 e.printStackTrace(); 104 } 105 } catch (IOException e) { 106 logger.debug("IOException " + e); 107 if (logger.isDebugEnabled()) { 108 e.printStackTrace(); 109 } 110 } 111 } 112 113 114 /** 115 * toString. 116 */ 117 @Override 118 public String toString() { 119 return "" + this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")"; 120 } 121 122 123 /** 124 * thread initialization and start. 125 */ 126 public void init() { 127 if (srv != null && ! srvstart ) { 128 this.start(); 129 srvstart = true; 130 logger.info("ChannelFactory at " + srv); 131 } 132 } 133 134 135 /** 136 * Get a new socket channel from a server socket. 137 */ 138 public SocketChannel getChannel() throws InterruptedException { 139 // return (SocketChannel)buf.get(); 140 if (srv == null) { 141 if (srvrun) { 142 throw new IllegalArgumentException("dont call when no server listens"); 143 } 144 } else if ( ! srvstart ) { 145 init(); 146 } 147 return buf.take(); 148 } 149 150 151 /** 152 * Get a new socket channel to a given host. 153 * @param h hostname 154 * @param p port 155 */ 156 public SocketChannel getChannel(String h, int p) throws IOException { 157 if (p <= 0) { 158 p = port; 159 } 160 SocketChannel c = null; 161 int i = 0; 162 int delay = 5; // 50 163 logger.debug("connecting to " + h); 164 while (c == null) { 165 try { 166 c = new SocketChannel(new Socket(h, p)); 167 } catch (IOException e) { 168 //System.out.println(e); 169 // wait server ready 170 i++; 171 if (i % 50 == 0) { 172 delay += delay; 173 logger.info("Server on " + h + " not ready in " + delay + "ms"); 174 } 175 System.out.println("Server on " + h + " not ready in " + delay + "ms"); 176 try { 177 Thread.sleep(delay); 178 if (i % 50 == 0 && debug) { 179 throw new Exception("time wait, host = " + h + ", port = " + port); 180 } 181 } catch (InterruptedException w) { 182 Thread.currentThread().interrupt(); 183 throw new IOException("Interrupted during IO wait " + w); 184 } catch (Exception ee) { // debug only 185 ee.printStackTrace(); 186 } 187 } 188 } 189 logger.debug("connected, iter = " + i); 190 return c; 191 } 192 193 194 /** 195 * Run the servers accept() in an infinite loop. 196 */ 197 @Override 198 public void run() { 199 if (srv == null) { 200 return; // nothing to do 201 } 202 srvrun = true; 203 while (true) { 204 try { 205 logger.info("waiting for connection"); 206 Socket s = srv.accept(); 207 if (this.isInterrupted()) { 208 //System.out.println("ChannelFactory interrupted"); 209 srvrun = false; 210 return; 211 } 212 //logger.debug("Socket = " +s); 213 logger.debug("connection accepted"); 214 SocketChannel c = new SocketChannel(s); 215 buf.put(c); 216 } catch (IOException e) { 217 //logger.debug("ChannelFactory IO terminating"); 218 srvrun = false; 219 return; 220 } catch (InterruptedException e) { 221 // unfug Thread.currentThread().interrupt(); 222 //logger.debug("ChannelFactory IE terminating"); 223 srvrun = false; 224 return; 225 } 226 } 227 } 228 229 230 /** 231 * Terminate the Channel Factory 232 */ 233 public void terminate() { 234 if ( ! srvstart ) { 235 logger.debug("server not started"); 236 return; 237 } 238 this.interrupt(); 239 try { 240 if (srv != null) { 241 srv.close(); 242 srvrun = false; 243 } 244 this.interrupt(); 245 while (!buf.isEmpty()) { 246 logger.debug("closing unused SocketChannel"); 247 //((SocketChannel)buf.get()).close(); 248 SocketChannel c = buf.poll(); 249 if ( c != null ) { 250 c.close(); 251 } 252 } 253 } catch (IOException e) { 254 //} catch (InterruptedException e) { 255 // Thread.currentThread().interrupt(); 256 } 257 try { 258 this.join(); 259 } catch (InterruptedException e) { 260 // unfug Thread.currentThread().interrupt(); 261 } 262 logger.debug("ChannelFactory terminated"); 263 } 264 265}