001/* 002 * $Id: DistributedList.java 4096 2012-08-12 11:56:27Z kredel $ 003 */ 004 005package edu.jas.util; 006 007import java.io.IOException; 008import java.util.Iterator; 009//import java.util.Collection; 010import java.util.List; 011import java.util.ArrayList; 012import java.util.SortedMap; 013import java.util.TreeMap; 014 015import org.apache.log4j.Logger; 016 017//import edu.unima.ky.parallel.ChannelFactory; 018//import edu.unima.ky.parallel.SocketChannel; 019 020 021/** 022 * Distributed version of a List. 023 * Implemented with a SortedMap / TreeMap to keep the sequence order of elements. 024 * @author Heinz Kredel 025 */ 026 027public class DistributedList /* implements List not jet */ { 028 029 private static final Logger logger = Logger.getLogger(DistributedList.class); 030 031 protected final SortedMap<Counter,Object> theList; 032 protected final ChannelFactory cf; 033 protected SocketChannel channel = null; 034 protected Listener listener = null; 035 036 037 /** 038 * Constructor for DistributedList. 039 * @param host name or IP of server host. 040 */ 041 public DistributedList(String host) { 042 this(host,DistributedListServer.DEFAULT_PORT); 043 } 044 045 046 /** 047 * Constructor for DistributedList. 048 * @param host name or IP of server host. 049 * @param port of server. 050 */ 051 public DistributedList(String host,int port) { 052 this(new ChannelFactory(port+1),host,port); 053 } 054 055 056 /** 057 * Constructor for DistributedList. 058 * @param cf ChannelFactory to use. 059 * @param host name or IP of server host. 060 * @param port of server. 061 */ 062 public DistributedList(ChannelFactory cf,String host,int port) { 063 this.cf = cf; 064 cf.init(); 065 try { 066 channel = cf.getChannel(host,port); 067 } catch (IOException e) { 068 e.printStackTrace(); 069 } 070 logger.debug("dl channel = " + channel); 071 theList = new TreeMap<Counter,Object>(); 072 } 073 074 075 /** 076 * Constructor for DistributedList. 077 * @param sc SocketChannel to use. 078 */ 079 public DistributedList(SocketChannel sc) { 080 cf = null; 081 channel = sc; 082 theList = new TreeMap<Counter,Object>(); 083 } 084 085 086 /** 087 * List thread initialization and start. 088 */ 089 public void init() { 090 listener = new Listener(channel,theList); 091 listener.start(); 092 } 093 094 095 /** 096 * Terminate the list thread. 097 */ 098 public void terminate() { 099 if ( cf != null ) { 100 cf.terminate(); 101 logger.warn("terminating " + cf); 102 } 103 if ( channel != null ) { 104 channel.close(); 105 } 106 //theList.clear(); 107 if ( listener == null ) { 108 return; 109 } 110 logger.debug("terminate " + listener); 111 listener.setDone(); 112 try { 113 while ( listener.isAlive() ) { 114 listener.interrupt(); 115 listener.join(100); 116 } 117 } catch (InterruptedException u) { 118 Thread.currentThread().interrupt(); 119 } 120 listener = null; 121 } 122 123 124 /** 125 * Get the internal list, convert from Collection. 126 */ 127 public List<Object> getList() { 128 return new ArrayList<Object>( theList.values() ); 129 } 130 131 132 /** 133 * Size of the (local) list. 134 */ 135 public int size() { 136 return theList.size(); 137 } 138 139 140 /** 141 * Add object to the list and distribute to other lists. 142 * Blocks until the object is send and received from the server 143 * (actually it blocks until some object is received). 144 * @param o 145 */ 146 public synchronized void add(Object o) { 147 int sz1 = theList.size() + 1; 148 try { 149 channel.send(o); 150 //System.out.println("send: "+o+" @ "+listener); 151 } catch (IOException e) { 152 e.printStackTrace(); 153 } 154 try { 155 while ( theList.size() < sz1 ) { 156 this.wait(100); 157 } 158 } catch (InterruptedException e) { 159 Thread.currentThread().interrupt(); 160 e.printStackTrace(); 161 } 162 } 163 164 165 /** 166 * Clear the List. 167 * caveat: must be called on all clients. 168 */ 169 public synchronized void clear() { 170 theList.clear(); 171 } 172 173 174 /** 175 * Is the List empty? 176 */ 177 public boolean isEmpty() { 178 return theList.isEmpty(); 179 } 180 181 182 /** 183 * List iterator. 184 */ 185 public Iterator iterator() { 186 return theList.values().iterator(); 187 } 188 189} 190 191 192/** 193 * Thread to comunicate with the list server. 194 */ 195 196class Listener extends Thread { 197 198 private SocketChannel channel; 199 private SortedMap<Counter,Object> theList; 200 private boolean goon; 201 202 203 Listener(SocketChannel s, SortedMap<Counter,Object> list) { 204 channel = s; 205 theList = list; 206 } 207 208 209 void setDone() { 210 goon = false; 211 } 212 213 214 @Override 215 public void run() { 216 Counter n; 217 Object o; 218 goon = true; 219 while (goon) { 220 n = null; 221 o = null; 222 try { 223 n = (Counter) channel.receive(); 224 if ( this.isInterrupted() ) { 225 goon = false; 226 } else { 227 o = channel.receive(); 228 //System.out.println("receive("+n+","+o+" @ "+Thread.currentThread()); 229 if ( this.isInterrupted() ) { 230 goon = false; 231 } 232 theList.put(n,o); 233 } 234 } catch (IOException e) { 235 goon = false; 236 } catch (ClassNotFoundException e) { 237 e.printStackTrace(); 238 goon = false; 239 } 240 } 241 } 242 243}