001 /* 002 * $Id: ChannelFactory.java 3297 2010-08-26 19:09:03Z kredel $ 003 */ 004 005 //package edu.unima.ky.parallel; 006 package edu.jas.util; 007 008 009 import java.io.IOException; 010 011 import java.net.ServerSocket; 012 import java.net.Socket; 013 import java.net.BindException; 014 015 import java.util.concurrent.BlockingQueue; 016 import java.util.concurrent.LinkedBlockingQueue; //import java.util.concurrent.ArrayBlockingQueue; 017 018 import org.apache.log4j.Logger; 019 020 021 /** 022 * ChannelFactory implements a symmetric and non blocking way of setting up 023 * sockets on the client and server side. The constructor sets up a ServerSocket 024 * and accepts and stores any Socket creation requests from clients. The created 025 * Sockets can the be retrieved from the store without blocking. Refactored for 026 * java.util.concurrent. 027 * @author Akitoshi Yoshida 028 * @author Heinz Kredel. 029 * @see SocketChannel 030 */ 031 public class ChannelFactory extends Thread { 032 033 034 private static final Logger logger = Logger.getLogger(ChannelFactory.class); 035 036 037 /** 038 * default port of socket. 039 */ 040 public final static int DEFAULT_PORT = 4711; 041 042 043 /** 044 * port of socket. 045 */ 046 private final int port; 047 048 049 /** 050 * BoundedBuffer for sockets. 051 */ 052 //private BoundedBuffer buf = new BoundedBuffer(100); 053 private final BlockingQueue<SocketChannel> buf; 054 055 056 /** 057 * local server socket. 058 */ 059 private volatile ServerSocket srv; 060 061 062 /** 063 * is local server up and running. 064 */ 065 private volatile boolean srvrun = false; 066 067 068 /** 069 * is thread started. 070 */ 071 private volatile boolean srvstart = false; 072 073 074 /** 075 * Constructs a ChannelFactory on the DEFAULT_PORT. 076 */ 077 public ChannelFactory() { 078 this(DEFAULT_PORT); 079 } 080 081 082 /** 083 * Constructs a ChannelFactory. 084 * @param p port. 085 */ 086 public ChannelFactory(int p) { 087 // buf = new ArrayBlockingQueue<SocketChannel>(100); 088 buf = new LinkedBlockingQueue<SocketChannel>(/*infinite*/); 089 if (p <= 0) { 090 port = DEFAULT_PORT; 091 } else { 092 port = p; 093 } 094 try { 095 srv = new ServerSocket(port); 096 //this.start(); moved to init and getChannel 097 logger.info("server bound to port " + port); 098 } catch (BindException e) { 099 srv = null; 100 logger.warn("server not started, port used " + port); 101 } catch (IOException e) { 102 logger.debug("IOException " + e); 103 if (logger.isDebugEnabled()) { 104 e.printStackTrace(); 105 } 106 } 107 } 108 109 110 /** 111 * toString. 112 */ 113 @Override 114 public String toString() { 115 return "" + this.getClass().getSimpleName() + "(" + srv + ", buf = " + buf.size() + ")"; 116 } 117 118 119 /** 120 * thread initialization and start. 121 */ 122 public void init() { 123 if (srv != null && ! srvstart ) { 124 this.start(); 125 srvstart = true; 126 logger.info("ChannelFactory at " + srv); 127 } 128 } 129 130 131 /** 132 * Get a new socket channel from a server socket. 133 */ 134 public SocketChannel getChannel() throws InterruptedException { 135 // return (SocketChannel)buf.get(); 136 if (srv == null) { 137 if (srvrun) { 138 throw new IllegalArgumentException("dont call when no server listens"); 139 } 140 } else if ( ! srvstart ) { 141 init(); 142 } 143 return buf.take(); 144 } 145 146 147 /** 148 * Get a new socket channel to a given host. 149 * @param h hostname 150 * @param p port 151 */ 152 public SocketChannel getChannel(String h, int p) throws IOException { 153 if (p <= 0) { 154 p = port; 155 } 156 SocketChannel c = null; 157 int i = 0; 158 int delay = 5; // 50 159 logger.debug("connecting to " + h); 160 while (c == null) { 161 try { 162 c = new SocketChannel(new Socket(h, p)); 163 } catch (IOException e) { 164 //System.out.println(e); 165 // wait server ready 166 i++; 167 if (i % 50 == 0) { 168 delay += delay; 169 logger.info("Server on " + h + " not ready in " + delay + "ms"); 170 } 171 System.out.println("Server on " + h + " not ready in " + delay + "ms"); 172 try { 173 Thread.sleep(delay); 174 } catch (InterruptedException w) { 175 Thread.currentThread().interrupt(); 176 throw new IOException("Interrupted during IO wait " + w); 177 } 178 } 179 } 180 logger.debug("connected, iter = " + i); 181 return c; 182 } 183 184 185 /** 186 * Run the servers accept() in an infinite loop. 187 */ 188 @Override 189 public void run() { 190 if (srv == null) { 191 return; // nothing to do 192 } 193 srvrun = true; 194 while (true) { 195 try { 196 logger.info("waiting for connection"); 197 Socket s = srv.accept(); 198 if (this.isInterrupted()) { 199 //System.out.println("ChannelFactory interrupted"); 200 srvrun = false; 201 return; 202 } 203 //logger.debug("Socket = " +s); 204 logger.debug("connection accepted"); 205 SocketChannel c = new SocketChannel(s); 206 buf.put(c); 207 } catch (IOException e) { 208 //logger.debug("ChannelFactory IO terminating"); 209 srvrun = false; 210 return; 211 } catch (InterruptedException e) { 212 // unfug Thread.currentThread().interrupt(); 213 //logger.debug("ChannelFactory IE terminating"); 214 srvrun = false; 215 return; 216 } 217 } 218 } 219 220 221 /** 222 * Terminate the Channel Factory 223 */ 224 public void terminate() { 225 if ( ! srvstart ) { 226 logger.debug("server not started"); 227 return; 228 } 229 this.interrupt(); 230 try { 231 if (srv != null) { 232 srv.close(); 233 srvrun = false; 234 } 235 this.interrupt(); 236 while (!buf.isEmpty()) { 237 logger.debug("closing unused SocketChannel"); 238 //((SocketChannel)buf.get()).close(); 239 SocketChannel c = buf.poll(); 240 if ( c != null ) { 241 c.close(); 242 } 243 } 244 } catch (IOException e) { 245 //} catch (InterruptedException e) { 246 // Thread.currentThread().interrupt(); 247 } 248 try { 249 this.join(); 250 } catch (InterruptedException e) { 251 // unfug Thread.currentThread().interrupt(); 252 } 253 logger.debug("ChannelFactory terminated"); 254 } 255 256 }