001/* 002 * $Id: DistributedList.java 5872 2018-07-20 16:01:46Z kredel $ 003 */ 004 005package edu.jas.util; 006 007import java.io.IOException; 008import java.util.Iterator; 009//import java.util.Collection; 010import java.util.List; 011import java.util.ArrayList; 012import java.util.SortedMap; 013import java.util.TreeMap; 014 015import org.apache.logging.log4j.Logger; 016import org.apache.logging.log4j.LogManager; 017 018//import edu.unima.ky.parallel.ChannelFactory; 019//import edu.unima.ky.parallel.SocketChannel; 020 021 022/** 023 * Distributed version of a List. 024 * Implemented with a SortedMap / TreeMap to keep the sequence order of elements. 025 * @author Heinz Kredel 026 */ 027 028public class DistributedList /* implements List not jet */ { 029 030 private static final Logger logger = LogManager.getLogger(DistributedList.class); 031 032 033 protected final SortedMap<Counter,Object> theList; 034 035 protected final ChannelFactory cf; 036 037 protected SocketChannel channel = null; 038 039 protected Listener listener = null; 040 041 042 /** 043 * Constructor for DistributedList. 044 * @param host name or IP of server host. 045 */ 046 public DistributedList(String host) { 047 this(host,DistributedListServer.DEFAULT_PORT); 048 } 049 050 051 /** 052 * Constructor for DistributedList. 053 * @param host name or IP of server host. 054 * @param port of server. 055 */ 056 public DistributedList(String host,int port) { 057 this(new ChannelFactory(port+1),host,port); 058 } 059 060 061 /** 062 * Constructor for DistributedList. 063 * @param cf ChannelFactory to use. 064 * @param host name or IP of server host. 065 * @param port of server. 066 */ 067 public DistributedList(ChannelFactory cf,String host,int port) { 068 this.cf = cf; 069 cf.init(); 070 try { 071 channel = cf.getChannel(host,port); 072 } catch (IOException e) { 073 e.printStackTrace(); 074 } 075 logger.debug("dl channel = " + channel); 076 theList = new TreeMap<Counter,Object>(); 077 } 078 079 080 /** 081 * Constructor for DistributedList. 082 * @param sc SocketChannel to use. 083 */ 084 public DistributedList(SocketChannel sc) { 085 cf = null; 086 channel = sc; 087 theList = new TreeMap<Counter,Object>(); 088 } 089 090 091 /** 092 * List thread initialization and start. 093 */ 094 public void init() { 095 listener = new Listener(channel,theList); 096 listener.start(); 097 } 098 099 100 /** 101 * Terminate the list thread. 102 */ 103 public void terminate() { 104 if ( cf != null ) { 105 cf.terminate(); 106 //logger.warn("terminating " + cf); 107 } 108 if ( channel != null ) { 109 channel.close(); 110 } 111 //theList.clear(); 112 if ( listener == null ) { 113 return; 114 } 115 logger.debug("terminate " + listener); 116 listener.setDone(); 117 try { 118 while ( listener.isAlive() ) { 119 listener.interrupt(); 120 listener.join(100); 121 } 122 } catch (InterruptedException u) { 123 Thread.currentThread().interrupt(); 124 } 125 listener = null; 126 } 127 128 129 /** 130 * Get the internal list, convert from Collection. 131 */ 132 public List<Object> getList() { 133 return new ArrayList<Object>( theList.values() ); 134 } 135 136 137 /** 138 * Size of the (local) list. 139 */ 140 public int size() { 141 return theList.size(); 142 } 143 144 145 /** 146 * Add object to the list and distribute to other lists. 147 * Blocks until the object is send and received from the server 148 * (actually it blocks until some object is received). 149 * @param o 150 */ 151 public synchronized void add(Object o) { 152 int sz1 = theList.size() + 1; 153 try { 154 channel.send(o); 155 //System.out.println("send: "+o+" @ "+listener); 156 } catch (IOException e) { 157 e.printStackTrace(); 158 } 159 try { 160 while ( theList.size() < sz1 ) { 161 this.wait(100); 162 } 163 } catch (InterruptedException e) { 164 Thread.currentThread().interrupt(); 165 e.printStackTrace(); 166 } 167 } 168 169 170 /** 171 * Clear the List. 172 * caveat: must be called on all clients. 173 */ 174 public synchronized void clear() { 175 theList.clear(); 176 } 177 178 179 /** 180 * Is the List empty? 181 */ 182 public boolean isEmpty() { 183 return theList.isEmpty(); 184 } 185 186 187 /** 188 * List iterator. 189 */ 190 public Iterator iterator() { 191 return theList.values().iterator(); 192 } 193 194} 195 196 197/** 198 * Thread to comunicate with the list server. 199 */ 200 201class Listener extends Thread { 202 203 private SocketChannel channel; 204 205 private SortedMap<Counter,Object> theList; 206 207 private volatile boolean goon; 208 209 210 Listener(SocketChannel s, SortedMap<Counter,Object> list) { 211 channel = s; 212 theList = list; 213 } 214 215 216 void setDone() { 217 goon = false; 218 } 219 220 221 @Override 222 public void run() { 223 Counter n; 224 Object o; 225 goon = true; 226 while (goon) { 227 n = null; 228 o = null; 229 try { 230 n = (Counter) channel.receive(); 231 if ( this.isInterrupted() ) { 232 goon = false; 233 } else { 234 o = channel.receive(); 235 //System.out.println("receive("+n+","+o+" @ "+Thread.currentThread()); 236 if ( this.isInterrupted() ) { 237 goon = false; 238 } 239 theList.put(n,o); 240 } 241 } catch (IOException e) { 242 goon = false; 243 } catch (ClassNotFoundException e) { 244 e.printStackTrace(); 245 goon = false; 246 } 247 } 248 } 249 250}