/*
 * $Id: DistHashTable.java 4224 2012-09-29 13:40:12Z kredel $
 */

package edu.jas.util;


import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
//import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.log4j.Logger;


/**
 * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to
 * keep the sequence order of elements.
 * <p>
 * The local map is filled by a {@link DHTListener} thread from objects received
 * on the channel; {@link #put(Object, Object)} only sends the pair over the
 * channel. All accesses to the local map made by this class are guarded by the
 * monitor of the map itself (see {@link #getList()}).
 * @author Heinz Kredel
 */

public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{


    private static final Logger logger = Logger.getLogger(DistHashTable.class);


    private static final boolean debug = logger.isDebugEnabled();


    /** Local copy of the table; also used as the lock and wait/notify monitor. */
    protected final SortedMap<K, V> theList;


    /** Factory the channel was obtained from, or null if a channel was supplied. */
    protected final ChannelFactory cf;


    protected SocketChannel channel = null;


    /** Receiver thread; set to null by {@link #terminate()}. */
    protected DHTListener<K, V> listener = null;


    /**
     * Constructs a new DistHashTable.
     * @param host name or IP of server host.
     */
    public DistHashTable(String host) {
        this(host, DistHashTableServer.DEFAULT_PORT);
    }


    /**
     * DistHashTable.
     * @param host name or IP of server host.
     * @param port on server.
     */
    public DistHashTable(String host, int port) {
        this(new ChannelFactory(port + 1), host, port);
    }


    /**
     * DistHashTable.
     * @param cf ChannelFactory to use.
     * @param host name or IP of server host.
     * @param port on server.
     * @throws RuntimeException wrapping the IOException if the channel cannot
     *             be obtained.
     */
    public DistHashTable(ChannelFactory cf, String host, int port) {
        this.cf = cf;
        cf.init(); // why? see constructor
        try {
            channel = cf.getChannel(host, port);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        if (debug) {
            logger.debug("dl channel = " + channel);
        }
        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
        theList = new TreeMap<K, V>();
        listener = new DHTListener<K, V>(channel, theList);
        // listener.start() is in init()
    }


    /**
     * DistHashTable.
     * @param sc SocketChannel to use.
     */
    public DistHashTable(SocketChannel sc) {
        cf = null;
        channel = sc;
        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
        theList = new TreeMap<K, V>();
        listener = new DHTListener<K, V>(channel, theList);
        // listener.start() is in init()
    }


    /**
     * Hash code.
     * @return the hash code of the local map.
     */
    @Override
    public int hashCode() {
        // the listener thread may modify the map concurrently
        synchronized (theList) {
            return theList.hashCode();
        }
    }


    /**
     * Equals. Compares the content of the local map with the given object.
     * @param o object to compare with.
     * @return true if the local map equals o, else false.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        // Compare a snapshot, so that no methods of a foreign map are called
        // while the lock on the local map is held.
        SortedMap<K, V> snapshot;
        synchronized (theList) {
            snapshot = new TreeMap<K, V>(theList);
        }
        return snapshot.equals(o);
    }


    /**
     * Contains key.
     * @param o key to look for in the local map.
     * @return true if the local map contains the key.
     */
    @Override
    public boolean containsKey(Object o) {
        synchronized (theList) {
            return theList.containsKey(o);
        }
    }


    /**
     * Contains value.
     * @param o value to look for in the local map.
     * @return true if the local map contains the value.
     */
    @Override
    public boolean containsValue(Object o) {
        synchronized (theList) {
            return theList.containsValue(o);
        }
    }


    /**
     * Get the values as Collection.
     * @return a new list with a copy of the values of the local map.
     */
    @Override
    public Collection<V> values() {
        synchronized (theList) {
            return new ArrayList<V>(theList.values());
            //return theList.values();
        }
    }


    /**
     * Get the keys as set.
     * @return the key set view of the local map. It is not a copy: callers
     *         iterating over it should synchronize on {@link #getList()}.
     */
    @Override
    public Set<K> keySet() {
        synchronized (theList) {
            return theList.keySet();
        }
    }


    /**
     * Get the entries as Set.
     * @return the entry set view of the local map. It is not a copy: callers
     *         iterating over it should synchronize on {@link #getList()}.
     */
    @Override
    public Set<Entry<K, V>> entrySet() {
        synchronized (theList) {
            return theList.entrySet();
        }
    }


    /**
     * Get the internal list, convert from Collection.
     * @fix but is ok
     * @return a new list with a copy of the values of the local map.
     */
    public List<V> getValueList() {
        synchronized (theList) {
            return new ArrayList<V>(theList.values());
        }
    }


    /**
     * Get the internal sorted map. For synchronization purpose in normalform.
     * @return the local map itself, not a copy.
     */
    public SortedMap<K, V> getList() {
        return theList;
    }


    /**
     * Size of the (local) list.
     * @return number of entries in the local map.
     */
    @Override
    public int size() {
        synchronized (theList) {
            return theList.size();
        }
    }


    /**
     * Is the List empty?
     * @return true if the local map has no entries.
     */
    @Override
    public boolean isEmpty() {
        synchronized (theList) {
            return theList.isEmpty();
        }
    }


    /**
     * List key iterator.
     * @return an iterator over the key set view of the local map. Only the
     *         creation of the iterator is synchronized, not its use.
     */
    public Iterator<K> iterator() {
        synchronized (theList) {
            return theList.keySet().iterator();
        }
    }


    /**
     * List value iterator.
     * @return an iterator over the values view of the local map. Only the
     *         creation of the iterator is synchronized, not its use.
     */
    public Iterator<V> valueIterator() {
        synchronized (theList) {
            return theList.values().iterator();
        }
    }


    /**
     * Put object to the distributed hash table. Blocks until the key value pair
     * is sent and received from the server. If the calling thread is
     * interrupted while waiting, the method returns with the interrupt status
     * set.
     * @param key
     * @param value
     */
    public void putWait(K key, V value) {
        put(key, value); // = send
        // assume key does not change multiple times before test:
        try {
            synchronized (theList) {
                // wait on the monitor instead of spinning; the listener calls
                // notifyAll() after each received pair
                while (!value.equals(theList.get(key))) {
                    theList.wait(100);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }


    /**
     * Put object to the distributed hash table. Returns immediately after
     * sending, does not block. The local map is not modified by this method;
     * exceptions during sending are logged and not propagated.
     * @param key
     * @param value
     * @return always null.
     * @throws NullPointerException if key or value is null.
     */
    @Override
    public V put(K key, V value) {
        if (key == null || value == null) {
            throw new NullPointerException("null keys or values not allowed");
        }
        try {
            DHTTransport<K, V> tc = DHTTransport.<K, V> create(key, value);
            channel.send(tc);
            //System.out.println("send: "+tc+" @ "+listener);
        } catch (Exception e) { // includes IOException
            logger.info("send, exception " + e);
            e.printStackTrace();
        }
        return null;
    }


    /**
     * Get value under key from DHT. Blocks until the object is sent and
     * received from the server (actually it blocks until some value under key
     * is received).
     * @param key
     * @return the value stored under the key, or null if the calling thread
     *         was interrupted before a value was available.
     */
    public V getWait(K key) {
        V value = null;
        try {
            synchronized (theList) {
                //value = theList.get(key);
                value = get(key);
                while (value == null) {
                    //System.out.print("^");
                    theList.wait(100);
                    value = theList.get(key);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return value;
    }


    /**
     * Get value under key from DHT. If no value is yet available null is
     * returned.
     * @param key
     * @return the value stored under the key.
     */
    @Override
    public V get(Object key) {
        synchronized (theList) {
            return theList.get(key);
        }
    }


    /**
     * Clear the List. Caveat: must be called on all clients, only the local
     * map is cleared.
     */
    @Override
    public void clear() {
        // send clear message to others
        synchronized (theList) {
            theList.clear();
        }
    }


    /**
     * Initialize and start the list thread. Does nothing if there is no
     * listener, if the listener is done, or if it has already been started.
     */
    public void init() {
        DHTListener<K, V> l = listener;
        if (l == null) {
            return;
        }
        if (l.isDone()) {
            return;
        }
        if (debug) {
            logger.debug("initialize " + l);
        }
        synchronized (theList) {
            // Thread.start() throws IllegalThreadStateException when called twice
            if (l.getState() == Thread.State.NEW) {
                l.start();
            }
        }
    }


    /**
     * Terminate the list thread. Terminates the channel factory (if any),
     * closes the channel and waits for the listener thread to end.
     */
    public void terminate() {
        if (cf != null) {
            cf.terminate();
        }
        if (channel != null) {
            channel.close();
        }
        //theList.clear();
        if (listener == null) {
            return;
        }
        if (debug) {
            logger.debug("terminate " + listener);
        }
        listener.setDone();
        try {
            while (listener.isAlive()) {
                //System.out.print("+");
                listener.interrupt();
                listener.join(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        listener = null;
    }

}


/**
 * Thread to communicate with the list server. Receives objects from the
 * channel and stores received key value pairs in the shared map.
 */

class DHTListener<K, V> extends Thread {


    private static final Logger logger = Logger.getLogger(DHTListener.class);


    private static final boolean debug = logger.isDebugEnabled();


    private final SocketChannel channel;


    private final SortedMap<K, V> theList;


    /**
     * Loop flag. Written by other threads via setDone() and read in run(),
     * hence volatile.
     */
    private volatile boolean goon;


    /**
     * DHTListener.
     * @param s channel to receive from.
     * @param list map to store the received pairs in; also used as monitor.
     */
    DHTListener(SocketChannel s, SortedMap<K, V> list) {
        channel = s;
        theList = list;
        goon = true;
    }


    /**
     * Test if the listener is done.
     * @return true if the receive loop has ended or was requested to end.
     */
    boolean isDone() {
        return !goon;
    }


    /**
     * Request the receive loop to end.
     */
    void setDone() {
        goon = false;
    }


    /**
     * run. Receives objects until the loop is stopped, the thread is
     * interrupted, null is received or an exception occurs. For each received
     * DHTTransport with non-null key the pair is put into the map and all
     * threads waiting on the map are notified.
     */
    @Override
    public void run() {
        //goon = true;
        while (goon) {
            try {
                Object o = channel.receive();
                if (debug) {
                    logger.debug("receive(" + o + ")");
                }
                if (this.isInterrupted() || o == null) {
                    goon = false;
                    break;
                }
                if (o instanceof DHTTransport) {
                    // type parameters can not be checked at run time
                    @SuppressWarnings("unchecked")
                    DHTTransport<K, V> tc = (DHTTransport<K, V>) o;
                    K key = tc.key();
                    if (key != null) {
                        logger.info("receive, put(key=" + key + ")");
                        V val = tc.value();
                        synchronized (theList) {
                            theList.put(key, val);
                            theList.notifyAll(); // wake up getWait() / putWait()
                        }
                    }
                }
            } catch (IOException e) {
                goon = false;
                logger.info("receive, IO exception " + e);
                //e.printStackTrace();
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.info("receive, CNF exception " + e);
                e.printStackTrace();
            } catch (Exception e) {
                goon = false;
                logger.info("receive, exception " + e);
                e.printStackTrace();
            }
        }
    }

}