001/* 002 * $Id: ChannelFactory.java 5872 2018-07-20 16:01:46Z kredel $ 003 */ 004 005//package edu.unima.ky.parallel; 006package edu.jas.util; 007 008 009import java.io.IOException; 010 011import java.net.ServerSocket; 012import java.net.Socket; 013import java.net.BindException; 014 015import java.util.concurrent.BlockingQueue; 016import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue; 017 018import org.apache.logging.log4j.Logger; 019import org.apache.logging.log4j.LogManager; 020 021 022/** 023 * ChannelFactory implements a symmetric and non blocking way of setting up 024 * sockets on the client and server side. The constructor sets up a ServerSocket 025 * and accepts and stores any Socket creation requests from clients. The created 026 * Sockets can the be retrieved from the store without blocking. Refactored for 027 * java.util.concurrent. 028 * @author Akitoshi Yoshida 029 * @author Heinz Kredel 030 * @see SocketChannel 031 */ 032public class ChannelFactory extends Thread { 033 034 035 private static final Logger logger = LogManager.getLogger(ChannelFactory.class); 036 037 038 private static final boolean debug = logger.isDebugEnabled(); 039 040 041 /** 042 * default port of socket. 043 */ 044 public final static int DEFAULT_PORT = 4711; 045 046 047 /** 048 * port of socket. 049 */ 050 private final int port; 051 052 053 /** 054 * BoundedBuffer for sockets. 055 */ 056 private final BlockingQueue<SocketChannel> buf; 057 058 059 /** 060 * local server socket. 061 */ 062 private volatile ServerSocket srv; 063 064 065 /** 066 * is local server up and running. 067 */ 068 private volatile boolean srvrun = false; 069 070 071 /** 072 * is thread started. 073 */ 074 private volatile boolean srvstart = false; 075 076 077 /** 078 * Constructs a ChannelFactory on the DEFAULT_PORT. 079 */ 080 public ChannelFactory() { 081 this(DEFAULT_PORT); 082 } 083 084 085 /** 086 * Constructs a ChannelFactory. 087 * @param p port. 088 */ 089 public ChannelFactory(int p) { 090 buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/); 091 if (p <= 0) { 092 port = DEFAULT_PORT; 093 } else { 094 port = p; 095 } 096 try { 097 srv = new ServerSocket(port); 098 //this.start(); moved to init and getChannel 099 logger.info("server bound to port " + port); 100 } catch (BindException e) { 101 srv = null; 102 logger.warn("server not started, port used " + port); 103 if (debug) { 104 e.printStackTrace(); 105 } 106 } catch (IOException e) { 107 logger.debug("IOException " + e); 108 if (logger.isDebugEnabled()) { 109 e.printStackTrace(); 110 } 111 } 112 } 113 114 115 /** 116 * toString. 117 */ 118 @Override 119 public String toString() { 120 return this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")"; 121 } 122 123 124 /** 125 * thread initialization and start. 126 */ 127 public void init() { 128 if (srv != null && ! srvstart ) { 129 this.start(); 130 srvstart = true; 131 logger.info("ChannelFactory at " + srv); 132 } 133 } 134 135 136 /** 137 * Get a new socket channel from a server socket. 138 */ 139 public SocketChannel getChannel() throws InterruptedException { 140 // return (SocketChannel)buf.get(); 141 if (srv == null) { 142 if (srvrun) { 143 throw new IllegalArgumentException("dont call when no server listens"); 144 } 145 } else if ( ! srvstart ) { 146 init(); 147 } 148 return buf.take(); 149 } 150 151 152 /** 153 * Get a new socket channel to a given host. 154 * @param h hostname 155 */ 156 public SocketChannel getChannel(String h) throws IOException { 157 return getChannel(h,DEFAULT_PORT); 158 } 159 160 161 /** 162 * Get a new socket channel to a given host. 163 * @param h hostname 164 * @param p port 165 */ 166 public SocketChannel getChannel(String h, int p) throws IOException { 167 if (p <= 0) { 168 p = port; 169 } 170 SocketChannel c = null; 171 int i = 0; 172 int delay = 5; // 50 173 logger.debug("connecting to " + h); 174 while (c == null) { 175 try { 176 c = new SocketChannel(new Socket(h, p)); 177 } catch (IOException e) { 178 //System.out.println(e); 179 // wait server ready 180 i++; 181 if (i % 50 == 0) { 182 delay += delay; 183 logger.info("Server on " + h + ":" + p + " not ready in " + delay + "ms"); 184 } 185 try { 186 Thread.sleep(delay); 187 if (i % 50 == 0 && debug) { 188 throw new Exception("time wait, host = " + h + ", port = " + port); 189 } 190 } catch (InterruptedException w) { 191 Thread.currentThread().interrupt(); 192 throw new IOException("Interrupted during IO wait " + w); 193 } catch (Exception ee) { // debug only 194 ee.printStackTrace(); 195 } 196 } 197 } 198 logger.debug("connected, iter = " + i); 199 return c; 200 } 201 202 203 /** 204 * Run the servers accept() in an infinite loop. 205 */ 206 @Override 207 public void run() { 208 if (srv == null) { 209 return; // nothing to do 210 } 211 srvrun = true; 212 while (true) { 213 try { 214 logger.info("waiting for connection on " + srv); 215 Socket s = srv.accept(); 216 if (this.isInterrupted()) { 217 //System.out.println("ChannelFactory interrupted"); 218 srvrun = false; 219 if (s != null) { // by code-spotter 220 s.close(); 221 } 222 return; 223 } 224 //logger.debug("Socket = " +s); 225 logger.debug("connection accepted"); 226 SocketChannel c = new SocketChannel(s); 227 buf.put(c); 228 } catch (IOException e) { 229 //logger.debug("ChannelFactory IO terminating"); 230 srvrun = false; 231 return; 232 } catch (InterruptedException e) { 233 // unfug Thread.currentThread().interrupt(); 234 //logger.debug("ChannelFactory IE terminating"); 235 srvrun = false; 236 return; 237 } 238 } 239 } 240 241 242 /** 243 * Terminate the Channel Factory 244 */ 245 public void terminate() { 246 if ( ! srvstart ) { 247 logger.debug("server not started"); 248 return; 249 } 250 this.interrupt(); 251 try { 252 if (srv != null) { 253 srv.close(); 254 srvrun = false; 255 } 256 this.interrupt(); 257 while (!buf.isEmpty()) { 258 logger.debug("closing unused SocketChannel"); 259 //((SocketChannel)buf.get()).close(); 260 SocketChannel c = buf.poll(); 261 if ( c != null ) { 262 c.close(); 263 } 264 } 265 } catch (IOException e) { 266 //} catch (InterruptedException e) { 267 // Thread.currentThread().interrupt(); 268 } 269 try { 270 this.join(); 271 } catch (InterruptedException e) { 272 // unfug Thread.currentThread().interrupt(); 273 } 274 logger.debug("ChannelFactory terminated"); 275 } 276 277}