001 /*
002 * $Id: DistributedList.java 3279 2010-08-21 20:18:25Z kredel $
003 */
004
005 package edu.jas.util;
006
007 import java.io.IOException;
008 import java.util.Iterator;
009 //import java.util.Collection;
010 import java.util.List;
011 import java.util.ArrayList;
012 import java.util.SortedMap;
013 import java.util.TreeMap;
014
015 import org.apache.log4j.Logger;
016
017 //import edu.unima.ky.parallel.ChannelFactory;
018 //import edu.unima.ky.parallel.SocketChannel;
019
020
021 /**
022 * Distributed version of a List.
023 * Implemented with a SortedMap / TreeMap to keep the sequence order of elements.
024 * @author Heinz Kredel
025 */
026
027 public class DistributedList /* implements List not jet */ {
028
029 private static final Logger logger = Logger.getLogger(DistributedList.class);
030
031 protected final SortedMap<Counter,Object> theList;
032 protected final ChannelFactory cf;
033 protected SocketChannel channel = null;
034 protected Listener listener = null;
035
036
037 /**
038 * Constructor for DistributedList.
039 * @param host name or IP of server host.
040 */
041 public DistributedList(String host) {
042 this(host,DistributedListServer.DEFAULT_PORT);
043 }
044
045
046 /**
047 * Constructor for DistributedList.
048 * @param host name or IP of server host.
049 * @param port of server.
050 */
051 public DistributedList(String host,int port) {
052 this(new ChannelFactory(port+1),host,port);
053 }
054
055
056 /**
057 * Constructor for DistributedList.
058 * @param cf ChannelFactory to use.
059 * @param host name or IP of server host.
060 * @param port of server.
061 */
062 public DistributedList(ChannelFactory cf,String host,int port) {
063 this.cf = cf;
064 cf.init();
065 try {
066 channel = cf.getChannel(host,port);
067 } catch (IOException e) {
068 e.printStackTrace();
069 }
070 logger.debug("dl channel = " + channel);
071 theList = new TreeMap<Counter,Object>();
072 listener = new Listener(channel,theList);
073 listener.start();
074 }
075
076
077 /**
078 * Constructor for DistributedList.
079 * @param sc SocketChannel to use.
080 */
081 public DistributedList(SocketChannel sc) {
082 cf = null;
083 channel = sc;
084 theList = new TreeMap<Counter,Object>();
085 listener = new Listener(channel,theList);
086 listener.start();
087 }
088
089
090 /**
091 * Get the internal list, convert from Collection.
092 */
093 public List<Object> getList() {
094 return new ArrayList<Object>( theList.values() );
095 }
096
097
098 /**
099 * Size of the (local) list.
100 */
101 public int size() {
102 return theList.size();
103 }
104
105
106 /**
107 * Add object to the list and distribute to other lists.
108 * Blocks until the object is send and received from the server
109 * (actually it blocks until some object is received).
110 * @param o
111 */
112 public synchronized void add(Object o) {
113 int sz1 = theList.size() + 1;
114 try {
115 channel.send(o);
116 //System.out.println("send: "+o+" @ "+listener);
117 } catch (IOException e) {
118 e.printStackTrace();
119 }
120 try {
121 while ( theList.size() < sz1 ) {
122 this.wait(100);
123 }
124 } catch (InterruptedException e) {
125 Thread.currentThread().interrupt();
126 e.printStackTrace();
127 }
128 }
129
130
131 /**
132 * Terminate the list thread.
133 */
134 public void terminate() {
135 if ( cf != null ) {
136 cf.terminate();
137 }
138 if ( channel != null ) {
139 channel.close();
140 }
141 //theList.clear();
142 if ( listener == null ) {
143 return;
144 }
145 logger.debug("terminate " + listener);
146 listener.setDone();
147 try {
148 while ( listener.isAlive() ) {
149 listener.interrupt();
150 listener.join(100);
151 }
152 } catch (InterruptedException u) {
153 Thread.currentThread().interrupt();
154 }
155 listener = null;
156 }
157
158
159 /**
160 * Clear the List.
161 * caveat: must be called on all clients.
162 */
163 public synchronized void clear() {
164 theList.clear();
165 }
166
167
168 /**
169 * Is the List empty?
170 */
171 public boolean isEmpty() {
172 return theList.isEmpty();
173 }
174
175
176 /**
177 * List iterator.
178 */
179 public Iterator iterator() {
180 return theList.values().iterator();
181 }
182
183 }
184
185
186 /**
187 * Thread to communicate with the list server.
188 */
189
190 class Listener extends Thread {
191
192 private SocketChannel channel;
193 private SortedMap<Counter,Object> theList;
194 private boolean goon;
195
196
197 Listener(SocketChannel s, SortedMap<Counter,Object> list) {
198 channel = s;
199 theList = list;
200 }
201
202
203 void setDone() {
204 goon = false;
205 }
206
207 @Override
208 public void run() {
209 Counter n;
210 Object o;
211 goon = true;
212 while (goon) {
213 n = null;
214 o = null;
215 try {
216 n = (Counter) channel.receive();
217 if ( this.isInterrupted() ) {
218 goon = false;
219 } else {
220 o = channel.receive();
221 //System.out.println("receive("+n+","+o+" @ "+Thread.currentThread());
222 if ( this.isInterrupted() ) {
223 goon = false;
224 }
225 theList.put(n,o);
226 }
227 } catch (IOException e) {
228 goon = false;
229 } catch (ClassNotFoundException e) {
230 e.printStackTrace();
231 goon = false;
232 }
233 }
234 }
235
236 }