001 /* 002 * $Id: DistributedList.java 3279 2010-08-21 20:18:25Z kredel $ 003 */ 004 005 package edu.jas.util; 006 007 import java.io.IOException; 008 import java.util.Iterator; 009 //import java.util.Collection; 010 import java.util.List; 011 import java.util.ArrayList; 012 import java.util.SortedMap; 013 import java.util.TreeMap; 014 015 import org.apache.log4j.Logger; 016 017 //import edu.unima.ky.parallel.ChannelFactory; 018 //import edu.unima.ky.parallel.SocketChannel; 019 020 021 /** 022 * Distributed version of a List. 023 * Implemented with a SortedMap / TreeMap to keep the sequence order of elements. 024 * @author Heinz Kredel 025 */ 026 027 public class DistributedList /* implements List not jet */ { 028 029 private static final Logger logger = Logger.getLogger(DistributedList.class); 030 031 protected final SortedMap<Counter,Object> theList; 032 protected final ChannelFactory cf; 033 protected SocketChannel channel = null; 034 protected Listener listener = null; 035 036 037 /** 038 * Constructor for DistributedList. 039 * @param host name or IP of server host. 040 */ 041 public DistributedList(String host) { 042 this(host,DistributedListServer.DEFAULT_PORT); 043 } 044 045 046 /** 047 * Constructor for DistributedList. 048 * @param host name or IP of server host. 049 * @param port of server. 050 */ 051 public DistributedList(String host,int port) { 052 this(new ChannelFactory(port+1),host,port); 053 } 054 055 056 /** 057 * Constructor for DistributedList. 058 * @param cf ChannelFactory to use. 059 * @param host name or IP of server host. 060 * @param port of server. 061 */ 062 public DistributedList(ChannelFactory cf,String host,int port) { 063 this.cf = cf; 064 cf.init(); 065 try { 066 channel = cf.getChannel(host,port); 067 } catch (IOException e) { 068 e.printStackTrace(); 069 } 070 logger.debug("dl channel = " + channel); 071 theList = new TreeMap<Counter,Object>(); 072 listener = new Listener(channel,theList); 073 listener.start(); 074 } 075 076 077 /** 078 * Constructor for DistributedList. 079 * @param sc SocketChannel to use. 080 */ 081 public DistributedList(SocketChannel sc) { 082 cf = null; 083 channel = sc; 084 theList = new TreeMap<Counter,Object>(); 085 listener = new Listener(channel,theList); 086 listener.start(); 087 } 088 089 090 /** 091 * Get the internal list, convert from Collection. 092 */ 093 public List<Object> getList() { 094 return new ArrayList<Object>( theList.values() ); 095 } 096 097 098 /** 099 * Size of the (local) list. 100 */ 101 public int size() { 102 return theList.size(); 103 } 104 105 106 /** 107 * Add object to the list and distribute to other lists. 108 * Blocks until the object is send and received from the server 109 * (actually it blocks until some object is received). 110 * @param o 111 */ 112 public synchronized void add(Object o) { 113 int sz1 = theList.size() + 1; 114 try { 115 channel.send(o); 116 //System.out.println("send: "+o+" @ "+listener); 117 } catch (IOException e) { 118 e.printStackTrace(); 119 } 120 try { 121 while ( theList.size() < sz1 ) { 122 this.wait(100); 123 } 124 } catch (InterruptedException e) { 125 Thread.currentThread().interrupt(); 126 e.printStackTrace(); 127 } 128 } 129 130 131 /** 132 * Terminate the list thread. 133 */ 134 public void terminate() { 135 if ( cf != null ) { 136 cf.terminate(); 137 } 138 if ( channel != null ) { 139 channel.close(); 140 } 141 //theList.clear(); 142 if ( listener == null ) { 143 return; 144 } 145 logger.debug("terminate " + listener); 146 listener.setDone(); 147 try { 148 while ( listener.isAlive() ) { 149 listener.interrupt(); 150 listener.join(100); 151 } 152 } catch (InterruptedException u) { 153 Thread.currentThread().interrupt(); 154 } 155 listener = null; 156 } 157 158 159 /** 160 * Clear the List. 161 * caveat: must be called on all clients. 162 */ 163 public synchronized void clear() { 164 theList.clear(); 165 } 166 167 168 /** 169 * Is the List empty? 170 */ 171 public boolean isEmpty() { 172 return theList.isEmpty(); 173 } 174 175 176 /** 177 * List iterator. 178 */ 179 public Iterator iterator() { 180 return theList.values().iterator(); 181 } 182 183 } 184 185 186 /** 187 * Thread to comunicate with the list server. 188 */ 189 190 class Listener extends Thread { 191 192 private SocketChannel channel; 193 private SortedMap<Counter,Object> theList; 194 private boolean goon; 195 196 197 Listener(SocketChannel s, SortedMap<Counter,Object> list) { 198 channel = s; 199 theList = list; 200 } 201 202 203 void setDone() { 204 goon = false; 205 } 206 207 @Override 208 public void run() { 209 Counter n; 210 Object o; 211 goon = true; 212 while (goon) { 213 n = null; 214 o = null; 215 try { 216 n = (Counter) channel.receive(); 217 if ( this.isInterrupted() ) { 218 goon = false; 219 } else { 220 o = channel.receive(); 221 //System.out.println("receive("+n+","+o+" @ "+Thread.currentThread()); 222 if ( this.isInterrupted() ) { 223 goon = false; 224 } 225 theList.put(n,o); 226 } 227 } catch (IOException e) { 228 goon = false; 229 } catch (ClassNotFoundException e) { 230 e.printStackTrace(); 231 goon = false; 232 } 233 } 234 } 235 236 }