001    //package edu.unima.ky.parallel;
002    
003    import java.io.*;
004    import java.net.*;
005    
006    /**
007     * ChannelFactory.
008     * socket channel factory.
009     *
010     * Usage: ChannelFactory(port).
011     * @author Akitoshi Yoshida
012     * @author Heinz Kredel.
013     */
014    public class ChannelFactory extends Thread {
015      
016    /**
017     * default port of socket. 
018     */
019      public final static int DEFAULT_PORT = 4711;
020    
021    /**
022     * port of socket. 
023     */
024      private int port;
025    
026    /**
027     * BoundedBuffer for sockets.
028     */
029      private BoundedBuffer buf = new BoundedBuffer(10);
030    
031    /**
032     * local server socket.
033     */
034      private ServerSocket srv = null;
035    
036    /**
037     * Constructs a ChannelFactory. 
038     * @param p port.
039     */
040      public ChannelFactory(int p) {
041        if (p<=0) { port = DEFAULT_PORT; } 
042           else { port = p; }
043        try {
044            srv = new ServerSocket(port);
045            this.start();
046            System.out.println("server started on port "+port);
047        } catch (IOException e) { System.out.println(e); }
048      }
049    
050    /**
051     * Constructs a ChannelFactory.
052     * on the DEFAULT_PORT.
053     */
054      public ChannelFactory() {
055        this(DEFAULT_PORT); 
056      }
057    
058    /**
059     * Get a new socket channel from a server socket.
060     */
061      public SocketChannel getChannel() 
062             throws IOException {
063        return (SocketChannel)buf.get();
064      }
065    
066    /**
067     * Get a new socket channel to a given host.
068     * @param h hostname.
069     * @param p port.
070     */
071      public SocketChannel getChannel(String h, int p) 
072             throws IOException {
073        if (p<=0) { p = port; } 
074        SocketChannel c = null;
075        System.out.println("connecting to "+h);
076        while (c == null) {
077            try { c = new SocketChannel( new Socket(h, p) );
078            } catch (IOException e) {
079              System.out.println(e);
080              // wait server ready
081              System.out.println("Server on "+h+" not ready");
082              try {
083                  Thread.currentThread().sleep((int)(5000));
084              } catch (InterruptedException e1) { } 
085            }
086        }
087        System.out.println("connected");
088        return c;
089      }
090    
091      public void run() {
092        while (true) {
093          try {
094              System.out.println("waiting for connection");
095              Socket s = srv.accept();
096              System.out.println("connection accepted");
097              SocketChannel c = new SocketChannel(s);
098              buf.put( (Object) c );
099          } catch (IOException e) {
100            System.out.println("ChannelFactory terminating");
101            return;
102          }
103        }
104      }
105    
106    /**
107     * Terminate the Channel Factory.
108     */
109      public void terminate() {
110        this.interrupt();
111        try {
112            srv.close();
113            while (!buf.empty()) { 
114                System.out.println("closing unused SocketChannel");
115                ((SocketChannel)buf.get()).close();
116            }
117        } catch (IOException e) { }
118      }
119    
120    }
121