001 //package edu.unima.ky.parallel;
002
003 import java.io.*;
004 import java.net.*;
005
006 /**
007 * ChannelFactory.
008 * socket channel factory.
009 *
010 * Usage: ChannelFactory(port).
011 * @author Akitoshi Yoshida
012 * @author Heinz Kredel.
013 */
014 public class ChannelFactory extends Thread {
015
016 /**
017 * default port of socket.
018 */
019 public final static int DEFAULT_PORT = 4711;
020
021 /**
022 * port of socket.
023 */
024 private int port;
025
026 /**
027 * BoundedBuffer for sockets.
028 */
029 private BoundedBuffer buf = new BoundedBuffer(10);
030
031 /**
032 * local server socket.
033 */
034 private ServerSocket srv = null;
035
036 /**
037 * Constructs a ChannelFactory.
038 * @param p port.
039 */
040 public ChannelFactory(int p) {
041 if (p<=0) { port = DEFAULT_PORT; }
042 else { port = p; }
043 try {
044 srv = new ServerSocket(port);
045 this.start();
046 System.out.println("server started on port "+port);
047 } catch (IOException e) { System.out.println(e); }
048 }
049
050 /**
051 * Constructs a ChannelFactory.
052 * on the DEFAULT_PORT.
053 */
054 public ChannelFactory() {
055 this(DEFAULT_PORT);
056 }
057
058 /**
059 * Get a new socket channel from a server socket.
060 */
061 public SocketChannel getChannel()
062 throws IOException {
063 return (SocketChannel)buf.get();
064 }
065
066 /**
067 * Get a new socket channel to a given host.
068 * @param h hostname.
069 * @param p port.
070 */
071 public SocketChannel getChannel(String h, int p)
072 throws IOException {
073 if (p<=0) { p = port; }
074 SocketChannel c = null;
075 System.out.println("connecting to "+h);
076 while (c == null) {
077 try { c = new SocketChannel( new Socket(h, p) );
078 } catch (IOException e) {
079 System.out.println(e);
080 // wait server ready
081 System.out.println("Server on "+h+" not ready");
082 try {
083 Thread.currentThread().sleep((int)(5000));
084 } catch (InterruptedException e1) { }
085 }
086 }
087 System.out.println("connected");
088 return c;
089 }
090
091 public void run() {
092 while (true) {
093 try {
094 System.out.println("waiting for connection");
095 Socket s = srv.accept();
096 System.out.println("connection accepted");
097 SocketChannel c = new SocketChannel(s);
098 buf.put( (Object) c );
099 } catch (IOException e) {
100 System.out.println("ChannelFactory terminating");
101 return;
102 }
103 }
104 }
105
106 /**
107 * Terminate the Channel Factory.
108 */
109 public void terminate() {
110 this.interrupt();
111 try {
112 srv.close();
113 while (!buf.empty()) {
114 System.out.println("closing unused SocketChannel");
115 ((SocketChannel)buf.get()).close();
116 }
117 } catch (IOException e) { }
118 }
119
120 }
121