001    /*
002     * $Id: ChannelFactory.java,v 1.10 2004/01/21 22:59:20 kredel Exp $
003     */
004    
005    //package edu.unima.ky.parallel;
006    package comm;
007    
008    import util.Logger;
009    import thread.BoundedBuffer;
010    
011    import java.io.IOException;
012    import java.net.ServerSocket;
013    import java.net.Socket;
014    
015    //import org.apache.log4j.Logger;
016    
017    /**
018     * ChannelFactory.
019     * A symmetric and non blocking way of setting up sockets on the 
020     * client and server side.
021     * The constructor sets up a ServerSocket and accepts and stores any Socket 
022     * creation requests from clients. The created Sockets can the be retrieved 
023     * from the store without blocking.
024     * @author Akitoshi Yoshida
025     * @author Heinz Kredel.
026     * @see SocketChannel
027     */
028    public class ChannelFactory extends Thread {
029    
030       // private static final Logger logger = Logger.getLogger(ChannelFactory.class);
031       private static Logger logger = new Logger();
032      
033    
034      /**
035       * default port of socket. 
036       */
037      public final static int DEFAULT_PORT = 4711;
038    
039      /**
040       * port of socket. 
041       */
042      private int port;
043    
044      /**
045       * BoundedBuffer for sockets.
046       */
047      private BoundedBuffer buf = new BoundedBuffer(50);
048    
049      /**
050       * local server socket.
051       */
052      private ServerSocket srv = null;
053    
054      /**
055       * Constructs a ChannelFactory. 
056       * @param p port.
057       */
058      public ChannelFactory(int p) {
059        if (p<=0) { port = DEFAULT_PORT; } 
060           else { port = p; }
061        try {
062            srv = new ServerSocket(port);
063            this.start();
064            logger.info("server started on port "+port);
065        } catch (IOException e) { 
066            logger.debug("IOException "+e);
067            //e.printStackTrace(); 
068        }
069      }
070    
071      /**
072       * Constructs a ChannelFactory.
073       * on the DEFAULT_PORT.
074       */
075      public ChannelFactory() {
076        this(DEFAULT_PORT); 
077      }
078    
079      /**
080       * Get a new socket channel from a server socket.
081       */
082      public SocketChannel getChannel() 
083             throws InterruptedException {
084        return (SocketChannel)buf.get();
085      }
086    
087      /**
088       * Get a new socket channel to a given host.
089       * @param h hostname.
090       * @param p port.
091       */
092      public SocketChannel getChannel(String h, int p) 
093             throws IOException {
094        if (p<=0) { p = port; } 
095        SocketChannel c = null;
096        int i = 0;
097        int delay = 50;
098        logger.debug("connecting to "+h);
099        while (c == null) {
100            try { 
101                c = new SocketChannel( new Socket(h, p) );
102            } catch (IOException e) {
103                //System.out.println(e);
104                // wait server ready
105                i++;
106                if ( i % 50 == 0 ) {
107                   delay += delay;
108                   logger.info("Server on "+h+" not ready in " + delay +"ms" );
109                }
110                try {
111                    Thread.sleep(delay);
112                } catch (InterruptedException ignored) { 
113                } 
114            }
115        }
116        logger.debug("connected, iter = " + i);
117        return c;
118      }
119    
120      public void run() {
121        while (true) {
122          try {
123              logger.info("waiting for connection");
124              Socket s = srv.accept();
125              if ( this.isInterrupted() ) {
126                  //System.out.println("ChannelFactory interrupted");
127                 return;
128              }
129              //logger.debug("Socket = " +s);
130              logger.debug("connection accepted");
131              SocketChannel c = new SocketChannel(s);
132              buf.put( (Object) c );
133          } catch (IOException e) {
134              //logger.debug("ChannelFactory IO terminating");
135            return;
136          } catch (InterruptedException e) {
137              //logger.debug("ChannelFactory IE terminating");
138            return;
139          }
140        }
141      }
142    
143      /**
144       * Terminate the Channel Factory.
145       */
146      public void terminate() {
147        this.interrupt();
148        try {
149            if ( srv != null ) {
150               srv.close();
151            }
152            this.interrupt();
153            while ( !buf.empty() ) { 
154                logger.debug("closing unused SocketChannel");
155                ((SocketChannel)buf.get()).close();
156            }
157        } catch (IOException e) { 
158        } catch (InterruptedException e) { 
159        }
160        try {
161            this.join();
162        } catch (InterruptedException e) { 
163        }
164        logger.debug("ChannelFactory terminated");
165      }
166    
167    }