001 /*
002 * $Id: ChannelFactory.java,v 1.10 2004/01/21 22:59:20 kredel Exp $
003 */
004
005 //package edu.unima.ky.parallel;
006 package comm;
007
008 import util.Logger;
009 import thread.BoundedBuffer;
010
011 import java.io.IOException;
012 import java.net.ServerSocket;
013 import java.net.Socket;
014
015 //import org.apache.log4j.Logger;
016
017 /**
018 * ChannelFactory.
019 * A symmetric and non blocking way of setting up sockets on the
020 * client and server side.
021 * The constructor sets up a ServerSocket and accepts and stores any Socket
022 * creation requests from clients. The created Sockets can the be retrieved
023 * from the store without blocking.
024 * @author Akitoshi Yoshida
025 * @author Heinz Kredel.
026 * @see SocketChannel
027 */
028 public class ChannelFactory extends Thread {
029
030 // private static final Logger logger = Logger.getLogger(ChannelFactory.class);
031 private static Logger logger = new Logger();
032
033
034 /**
035 * default port of socket.
036 */
037 public final static int DEFAULT_PORT = 4711;
038
039 /**
040 * port of socket.
041 */
042 private int port;
043
044 /**
045 * BoundedBuffer for sockets.
046 */
047 private BoundedBuffer buf = new BoundedBuffer(50);
048
049 /**
050 * local server socket.
051 */
052 private ServerSocket srv = null;
053
054 /**
055 * Constructs a ChannelFactory.
056 * @param p port.
057 */
058 public ChannelFactory(int p) {
059 if (p<=0) { port = DEFAULT_PORT; }
060 else { port = p; }
061 try {
062 srv = new ServerSocket(port);
063 this.start();
064 logger.info("server started on port "+port);
065 } catch (IOException e) {
066 logger.debug("IOException "+e);
067 //e.printStackTrace();
068 }
069 }
070
071 /**
072 * Constructs a ChannelFactory.
073 * on the DEFAULT_PORT.
074 */
075 public ChannelFactory() {
076 this(DEFAULT_PORT);
077 }
078
079 /**
080 * Get a new socket channel from a server socket.
081 */
082 public SocketChannel getChannel()
083 throws InterruptedException {
084 return (SocketChannel)buf.get();
085 }
086
087 /**
088 * Get a new socket channel to a given host.
089 * @param h hostname.
090 * @param p port.
091 */
092 public SocketChannel getChannel(String h, int p)
093 throws IOException {
094 if (p<=0) { p = port; }
095 SocketChannel c = null;
096 int i = 0;
097 int delay = 50;
098 logger.debug("connecting to "+h);
099 while (c == null) {
100 try {
101 c = new SocketChannel( new Socket(h, p) );
102 } catch (IOException e) {
103 //System.out.println(e);
104 // wait server ready
105 i++;
106 if ( i % 50 == 0 ) {
107 delay += delay;
108 logger.info("Server on "+h+" not ready in " + delay +"ms" );
109 }
110 try {
111 Thread.sleep(delay);
112 } catch (InterruptedException ignored) {
113 }
114 }
115 }
116 logger.debug("connected, iter = " + i);
117 return c;
118 }
119
120 public void run() {
121 while (true) {
122 try {
123 logger.info("waiting for connection");
124 Socket s = srv.accept();
125 if ( this.isInterrupted() ) {
126 //System.out.println("ChannelFactory interrupted");
127 return;
128 }
129 //logger.debug("Socket = " +s);
130 logger.debug("connection accepted");
131 SocketChannel c = new SocketChannel(s);
132 buf.put( (Object) c );
133 } catch (IOException e) {
134 //logger.debug("ChannelFactory IO terminating");
135 return;
136 } catch (InterruptedException e) {
137 //logger.debug("ChannelFactory IE terminating");
138 return;
139 }
140 }
141 }
142
143 /**
144 * Terminate the Channel Factory.
145 */
146 public void terminate() {
147 this.interrupt();
148 try {
149 if ( srv != null ) {
150 srv.close();
151 }
152 this.interrupt();
153 while ( !buf.empty() ) {
154 logger.debug("closing unused SocketChannel");
155 ((SocketChannel)buf.get()).close();
156 }
157 } catch (IOException e) {
158 } catch (InterruptedException e) {
159 }
160 try {
161 this.join();
162 } catch (InterruptedException e) {
163 }
164 logger.debug("ChannelFactory terminated");
165 }
166
167 }