001 /*
002 * $Id: DistHashTable.java 3279 2010-08-21 20:18:25Z kredel $
003 */
004
005 package edu.jas.util;
006
007
008 import java.io.IOException;
009 import java.util.AbstractMap;
010 import java.util.ArrayList;
011 import java.util.Collection;
012 import java.util.Iterator;
013 import java.util.List;
014 import java.util.Set;
015 import java.util.SortedMap;
016 import java.util.TreeMap;
017
018 import org.apache.log4j.Logger;
019
020
021 /**
022 * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to
023 * keep the sequence order of elements.
024 * @author Heinz Kredel
025 */
026
027 public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{
028
029
030 private static final Logger logger = Logger.getLogger(DistHashTable.class);
031
032
033 protected final SortedMap<K, V> theList;
034
035
036 protected final ChannelFactory cf;
037
038
039 protected SocketChannel channel = null;
040
041
042 protected DHTListener<K, V> listener = null;
043
044
045 /**
046 * Constructs a new DistHashTable.
047 * @param host name or IP of server host.
048 */
049 public DistHashTable(String host) {
050 this(host, DistHashTableServer.DEFAULT_PORT);
051 }
052
053
054 /**
055 * DistHashTable.
056 * @param host name or IP of server host.
057 * @param port on server.
058 */
059 public DistHashTable(String host, int port) {
060 this(new ChannelFactory(port + 1), host, port);
061 }
062
063
064 /**
065 * DistHashTable.
066 * @param cf ChannelFactory to use.
067 * @param host name or IP of server host.
068 * @param port on server.
069 */
070 public DistHashTable(ChannelFactory cf, String host, int port) {
071 this.cf = cf;
072 cf.init();
073 try {
074 channel = cf.getChannel(host, port);
075 } catch (IOException e) {
076 e.printStackTrace();
077 }
078 if (logger.isDebugEnabled()) {
079 logger.debug("dl channel = " + channel);
080 }
081 theList = new TreeMap<K, V>();
082 listener = new DHTListener<K, V>(channel, theList);
083 synchronized (theList) {
084 listener.start();
085 }
086 }
087
088
089 /**
090 * DistHashTable.
091 * @param sc SocketChannel to use.
092 */
093 public DistHashTable(SocketChannel sc) {
094 cf = null;
095 channel = sc;
096 theList = new TreeMap<K, V>();
097 listener = new DHTListener<K, V>(channel, theList);
098 synchronized (theList) {
099 listener.start();
100 }
101 }
102
103
104 /**
105 * Hash code.
106 */
107 @Override
108 public int hashCode() {
109 return theList.hashCode();
110 }
111
112
113 /**
114 * Equals.
115 */
116 @Override
117 public boolean equals(Object o) {
118 return theList.equals(o);
119 }
120
121
122 /**
123 * Contains key.
124 */
125 @Override
126 public boolean containsKey(Object o) {
127 return theList.containsKey(o);
128 }
129
130
131 /**
132 * Contains value.
133 */
134 @Override
135 public boolean containsValue(Object o) {
136 return theList.containsValue(o);
137 }
138
139
140 /**
141 * Get the values as Collection.
142 */
143 @Override
144 public Collection<V> values() {
145 return theList.values();
146 }
147
148
149 /**
150 * Get the keys as set.
151 */
152 @Override
153 public Set<K> keySet() {
154 return theList.keySet();
155 }
156
157
158 /**
159 * Get the entries as Set.
160 */
161 @Override
162 public Set<Entry<K, V>> entrySet() {
163 return theList.entrySet();
164 }
165
166
167 /**
168 * Get the internal list, convert from Collection.
169 * @fix but is ok
170 */
171 public List<V> getValueList() {
172 synchronized (theList) {
173 return new ArrayList<V>(theList.values());
174 }
175 }
176
177
178 /**
179 * Get the internal sorted map. For synchronization purpose in normalform.
180 */
181 public SortedMap<K, V> getList() {
182 return theList;
183 }
184
185
186 /**
187 * Size of the (local) list.
188 */
189 @Override
190 public int size() {
191 synchronized (theList) {
192 return theList.size();
193 }
194 }
195
196
197 /**
198 * Is the List empty?
199 */
200 @Override
201 public boolean isEmpty() {
202 synchronized (theList) {
203 return theList.isEmpty();
204 }
205 }
206
207
208 /**
209 * List key iterator.
210 */
211 public Iterator<K> iterator() {
212 synchronized (theList) {
213 return theList.keySet().iterator();
214 }
215 }
216
217
218 /**
219 * List value iterator.
220 */
221 public Iterator<V> valueIterator() {
222 synchronized (theList) {
223 return theList.values().iterator();
224 }
225 }
226
227
228 /**
229 * Put object to the distributed hash table. Blocks until the key value pair
230 * is send and received from the server.
231 * @param key
232 * @param value
233 */
234 public void putWait(K key, V value) {
235 put(key, value); // = send
236 try {
237 synchronized (theList) {
238 while (!value.equals(theList.get(key))) {
239 //System.out.print("#");
240 theList.wait(100);
241 }
242 }
243 } catch (InterruptedException e) {
244 Thread.currentThread().interrupt();
245 e.printStackTrace();
246 }
247 }
248
249
250 /**
251 * Put object to the distributed hash table. Returns immediately after
252 * sending does not block.
253 * @param key
254 * @param value
255 */
256 @Override
257 public V put(K key, V value) {
258 if (key == null || value == null) {
259 throw new NullPointerException("null keys or values not allowed");
260 }
261 try {
262 DHTTransport<K,V> tc = DHTTransport.<K,V> create(key, value);
263 channel.send(tc);
264 //System.out.println("send: "+tc+" @ "+listener);
265 } catch (IOException e) {
266 logger.info("send, exception " + e);
267 e.printStackTrace();
268 } catch (Exception e) {
269 logger.info("send, exception " + e);
270 e.printStackTrace();
271 }
272 return null;
273 }
274
275
276 /**
277 * Get value under key from DHT. Blocks until the object is send and
278 * received from the server (actually it blocks until some value under key
279 * is received).
280 * @param key
281 * @return the value stored under the key.
282 */
283 public V getWait(K key) {
284 V value = null;
285 try {
286 synchronized (theList) {
287 //value = theList.get(key);
288 value = get(key);
289 while (value == null) {
290 //System.out.print("^");
291 theList.wait(100);
292 value = theList.get(key);
293 }
294 }
295 } catch (InterruptedException e) {
296 Thread.currentThread().interrupt();
297 e.printStackTrace();
298 }
299 return value;
300 }
301
302
303 /**
304 * Get value under key from DHT. If no value is jet available null is
305 * returned.
306 * @param key
307 * @return the value stored under the key.
308 */
309 @Override
310 public V get(Object key) {
311 synchronized (theList) {
312 return theList.get(key);
313 }
314 }
315
316
317 /**
318 * Clear the List. Caveat: must be called on all clients.
319 */
320 @Override
321 public void clear() {
322 // send clear message to others
323 synchronized (theList) {
324 theList.clear();
325 }
326 }
327
328
329 /**
330 * Terminate the list thread.
331 */
332 public void terminate() {
333 if (cf != null) {
334 cf.terminate();
335 }
336 if (channel != null) {
337 channel.close();
338 }
339 //theList.clear();
340 if (listener == null) {
341 return;
342 }
343 if (logger.isDebugEnabled()) {
344 logger.debug("terminate " + listener);
345 }
346 listener.setDone();
347 try {
348 while (listener.isAlive()) {
349 //System.out.print("+");
350 listener.interrupt();
351 listener.join(100);
352 }
353 } catch (InterruptedException e) {
354 Thread.currentThread().interrupt();
355 }
356 listener = null;
357 }
358
359 }
360
361
362 /**
363 * Thread to comunicate with the list server.
364 */
365
366 class DHTListener<K, V> extends Thread {
367
368
369 private static final Logger logger = Logger.getLogger(DHTListener.class);
370
371
372 private final SocketChannel channel;
373
374
375 private final SortedMap<K, V> theList;
376
377
378 private boolean goon;
379
380
381 DHTListener(SocketChannel s, SortedMap<K, V> list) {
382 channel = s;
383 theList = list;
384 }
385
386
387 void setDone() {
388 goon = false;
389 }
390
391
392 /**
393 * run.
394 */
395 @SuppressWarnings("unchecked")
396 @Override
397 public void run() {
398 Object o;
399 DHTTransport<K, V> tc;
400 goon = true;
401 while (goon) {
402 tc = null;
403 o = null;
404 try {
405 o = channel.receive();
406 if (logger.isDebugEnabled()) {
407 logger.debug("receive(" + o + ")");
408 }
409 if (this.isInterrupted()) {
410 goon = false;
411 break;
412 }
413 if (o == null) {
414 goon = false;
415 break;
416 }
417 if (o instanceof DHTTransport) {
418 tc = (DHTTransport<K, V>) o;
419 K key = tc.key();
420 if (key != null) {
421 logger.info("receive, put(key=" + key + ")");
422 V val = tc.value();
423 synchronized (theList) {
424 theList.put(key, val);
425 theList.notifyAll();
426 }
427 }
428 }
429 } catch (IOException e) {
430 goon = false;
431 logger.info("receive, IO exception " + e);
432 //e.printStackTrace();
433 } catch (ClassNotFoundException e) {
434 goon = false;
435 logger.info("receive, CNF exception " + e);
436 e.printStackTrace();
437 } catch (Exception e) {
438 goon = false;
439 logger.info("receive, exception " + e);
440 e.printStackTrace();
441 }
442 }
443 }
444
445 }