001 /* 002 * $Id: ChannelFactory.java,v 1.10 2004/01/21 22:59:20 kredel Exp $ 003 */ 004 005 //package edu.unima.ky.parallel; 006 package comm; 007 008 import util.Logger; 009 import thread.BoundedBuffer; 010 011 import java.io.IOException; 012 import java.net.ServerSocket; 013 import java.net.Socket; 014 015 //import org.apache.log4j.Logger; 016 017 /** 018 * ChannelFactory. 019 * A symmetric and non blocking way of setting up sockets on the 020 * client and server side. 021 * The constructor sets up a ServerSocket and accepts and stores any Socket 022 * creation requests from clients. The created Sockets can the be retrieved 023 * from the store without blocking. 024 * @author Akitoshi Yoshida 025 * @author Heinz Kredel. 026 * @see SocketChannel 027 */ 028 public class ChannelFactory extends Thread { 029 030 // private static final Logger logger = Logger.getLogger(ChannelFactory.class); 031 private static Logger logger = new Logger(); 032 033 034 /** 035 * default port of socket. 036 */ 037 public final static int DEFAULT_PORT = 4711; 038 039 /** 040 * port of socket. 041 */ 042 private int port; 043 044 /** 045 * BoundedBuffer for sockets. 046 */ 047 private BoundedBuffer buf = new BoundedBuffer(50); 048 049 /** 050 * local server socket. 051 */ 052 private ServerSocket srv = null; 053 054 /** 055 * Constructs a ChannelFactory. 056 * @param p port. 057 */ 058 public ChannelFactory(int p) { 059 if (p<=0) { port = DEFAULT_PORT; } 060 else { port = p; } 061 try { 062 srv = new ServerSocket(port); 063 this.start(); 064 logger.info("server started on port "+port); 065 } catch (IOException e) { 066 logger.debug("IOException "+e); 067 //e.printStackTrace(); 068 } 069 } 070 071 /** 072 * Constructs a ChannelFactory. 073 * on the DEFAULT_PORT. 074 */ 075 public ChannelFactory() { 076 this(DEFAULT_PORT); 077 } 078 079 /** 080 * Get a new socket channel from a server socket. 081 */ 082 public SocketChannel getChannel() 083 throws InterruptedException { 084 return (SocketChannel)buf.get(); 085 } 086 087 /** 088 * Get a new socket channel to a given host. 089 * @param h hostname. 090 * @param p port. 091 */ 092 public SocketChannel getChannel(String h, int p) 093 throws IOException { 094 if (p<=0) { p = port; } 095 SocketChannel c = null; 096 int i = 0; 097 int delay = 50; 098 logger.debug("connecting to "+h); 099 while (c == null) { 100 try { 101 c = new SocketChannel( new Socket(h, p) ); 102 } catch (IOException e) { 103 //System.out.println(e); 104 // wait server ready 105 i++; 106 if ( i % 50 == 0 ) { 107 delay += delay; 108 logger.info("Server on "+h+" not ready in " + delay +"ms" ); 109 } 110 try { 111 Thread.sleep(delay); 112 } catch (InterruptedException ignored) { 113 } 114 } 115 } 116 logger.debug("connected, iter = " + i); 117 return c; 118 } 119 120 public void run() { 121 while (true) { 122 try { 123 logger.info("waiting for connection"); 124 Socket s = srv.accept(); 125 if ( this.isInterrupted() ) { 126 //System.out.println("ChannelFactory interrupted"); 127 return; 128 } 129 //logger.debug("Socket = " +s); 130 logger.debug("connection accepted"); 131 SocketChannel c = new SocketChannel(s); 132 buf.put( (Object) c ); 133 } catch (IOException e) { 134 //logger.debug("ChannelFactory IO terminating"); 135 return; 136 } catch (InterruptedException e) { 137 //logger.debug("ChannelFactory IE terminating"); 138 return; 139 } 140 } 141 } 142 143 /** 144 * Terminate the Channel Factory. 145 */ 146 public void terminate() { 147 this.interrupt(); 148 try { 149 if ( srv != null ) { 150 srv.close(); 151 } 152 this.interrupt(); 153 while ( !buf.empty() ) { 154 logger.debug("closing unused SocketChannel"); 155 ((SocketChannel)buf.get()).close(); 156 } 157 } catch (IOException e) { 158 } catch (InterruptedException e) { 159 } 160 try { 161 this.join(); 162 } catch (InterruptedException e) { 163 } 164 logger.debug("ChannelFactory terminated"); 165 } 166 167 }