001/* 002 * $Id: DistHashTable.java 4638 2013-09-13 19:14:05Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.IOException; 009import java.util.AbstractMap; 010import java.util.ArrayList; 011import java.util.Collection; 012import java.util.Iterator; 013import java.util.List; 014import java.util.Set; 015import java.util.SortedMap; 016import java.util.TreeMap; 017//import java.util.concurrent.ConcurrentSkipListMap; 018 019import org.apache.log4j.Logger; 020 021 022/** 023 * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to 024 * keep the sequence order of elements. 025 * @author Heinz Kredel 026 */ 027 028public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{ 029 030 031 private static final Logger logger = Logger.getLogger(DistHashTable.class); 032 033 034 private static boolean debug = logger.isDebugEnabled(); 035 036 037 protected final SortedMap<K, V> theList; 038 039 040 protected final ChannelFactory cf; 041 042 043 protected SocketChannel channel = null; 044 045 046 protected DHTListener<K, V> listener = null; 047 048 049 /** 050 * Constructs a new DistHashTable. 051 * @param host name or IP of server host. 052 */ 053 public DistHashTable(String host) { 054 this(host, DistHashTableServer.DEFAULT_PORT); 055 } 056 057 058 /** 059 * DistHashTable. 060 * @param host name or IP of server host. 061 * @param port on server. 062 */ 063 public DistHashTable(String host, int port) { 064 this(new ChannelFactory(port + 1), host, port); 065 } 066 067 068 /** 069 * DistHashTable. 070 * @param cf ChannelFactory to use. 071 * @param host name or IP of server host. 072 * @param port on server. 073 */ 074 public DistHashTable(ChannelFactory cf, String host, int port) { 075 this.cf = cf; 076 cf.init(); // why? see constructor 077 try { 078 channel = cf.getChannel(host, port); 079 } catch (IOException e) { 080 e.printStackTrace(); 081 throw new RuntimeException(e); 082 } 083 if (debug) { 084 logger.debug("dl channel = " + channel); 085 } 086 //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6 087 theList = new TreeMap<K, V>(); 088 listener = new DHTListener<K, V>(channel, theList); 089 // listener.start() is in initialize() 090 } 091 092 093 /** 094 * DistHashTable. 095 * @param sc SocketChannel to use. 096 */ 097 public DistHashTable(SocketChannel sc) { 098 cf = null; 099 channel = sc; 100 //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6 101 theList = new TreeMap<K, V>(); 102 listener = new DHTListener<K, V>(channel, theList); 103 // listener.start() is in initialize() 104 } 105 106 107 /** 108 * Hash code. 109 */ 110 @Override 111 public int hashCode() { 112 return theList.hashCode(); 113 } 114 115 116 /** 117 * Equals. 118 */ 119 @Override 120 public boolean equals(Object o) { 121 return theList.equals(o); 122 } 123 124 125 /** 126 * Contains key. 127 */ 128 @Override 129 public boolean containsKey(Object o) { 130 return theList.containsKey(o); 131 } 132 133 134 /** 135 * Contains value. 136 */ 137 @Override 138 public boolean containsValue(Object o) { 139 return theList.containsValue(o); 140 } 141 142 143 /** 144 * Get the values as Collection. 145 */ 146 @Override 147 public Collection<V> values() { 148 synchronized (theList) { 149 return new ArrayList<V>(theList.values()); 150 //return theList.values(); 151 } 152 } 153 154 155 /** 156 * Get the keys as set. 157 */ 158 @Override 159 public Set<K> keySet() { 160 synchronized (theList) { 161 return theList.keySet(); 162 } 163 } 164 165 166 /** 167 * Get the entries as Set. 168 */ 169 @Override 170 public Set<Entry<K, V>> entrySet() { 171 synchronized (theList) { 172 return theList.entrySet(); 173 } 174 } 175 176 177 /** 178 * Get the internal list, convert from Collection. 179 */ 180 // To be fixed?, but is ok. 181 public List<V> getValueList() { 182 synchronized (theList) { 183 return new ArrayList<V>(theList.values()); 184 } 185 } 186 187 188 /** 189 * Get the internal sorted map. For synchronization purpose in normalform. 190 */ 191 public SortedMap<K, V> getList() { 192 return theList; 193 } 194 195 196 /** 197 * Size of the (local) list. 198 */ 199 @Override 200 public int size() { 201 synchronized (theList) { 202 return theList.size(); 203 } 204 } 205 206 207 /** 208 * Is the List empty? 209 */ 210 @Override 211 public boolean isEmpty() { 212 synchronized (theList) { 213 return theList.isEmpty(); 214 } 215 } 216 217 218 /** 219 * List key iterator. 220 */ 221 public Iterator<K> iterator() { 222 synchronized (theList) { 223 return theList.keySet().iterator(); 224 } 225 } 226 227 228 /** 229 * List value iterator. 230 */ 231 public Iterator<V> valueIterator() { 232 synchronized (theList) { 233 return theList.values().iterator(); 234 } 235 } 236 237 238 /** 239 * Put object to the distributed hash table. Blocks until the key value pair 240 * is send and received from the server. 241 * @param key 242 * @param value 243 */ 244 public void putWait(K key, V value) { 245 //V o = 246 put(key, value); // = send 247 // assume key does not change multiple times before test: 248 while (!value.equals(getWait(key))) { 249 //System.out.print("#"); 250 } 251 } 252 253 254 /** 255 * Put object to the distributed hash table. Returns immediately after 256 * sending, does not block. 257 * @param key 258 * @param value 259 */ 260 @Override 261 public V put(K key, V value) { 262 if (key == null || value == null) { 263 throw new NullPointerException("null keys or values not allowed"); 264 } 265 try { 266 DHTTransport<K, V> tc = DHTTransport.<K, V> create(key, value); 267 channel.send(tc); 268 //System.out.println("send: "+tc+" @ "+listener); 269 } catch (IOException e) { 270 logger.info("send, exception " + e); 271 e.printStackTrace(); 272 } catch (Exception e) { 273 logger.info("send, exception " + e); 274 e.printStackTrace(); 275 } 276 return null; 277 } 278 279 280 /** 281 * Get value under key from DHT. Blocks until the object is send and 282 * received from the server (actually it blocks until some value under key 283 * is received). 284 * @param key 285 * @return the value stored under the key. 286 */ 287 public V getWait(K key) { 288 V value = null; 289 try { 290 synchronized (theList) { 291 //value = theList.get(key); 292 value = get(key); 293 while (value == null) { 294 //System.out.print("^"); 295 theList.wait(100); 296 value = theList.get(key); 297 } 298 } 299 } catch (InterruptedException e) { 300 Thread.currentThread().interrupt(); 301 e.printStackTrace(); 302 } 303 return value; 304 } 305 306 307 /** 308 * Get value under key from DHT. If no value is jet available null is 309 * returned. 310 * @param key 311 * @return the value stored under the key. 312 */ 313 @Override 314 public V get(Object key) { 315 synchronized (theList) { 316 return theList.get(key); 317 } 318 } 319 320 321 /** 322 * Clear the List. 323 * Clearance request is distributed to all clients. 324 */ 325 @Override 326 public void clear() { 327 synchronized (theList) { 328 theList.clear(); 329 } 330 // done after 11 month: send clear message to others 331 try { 332 DHTTransport<K, V> tc = new DHTTransportClear<K, V>(); 333 channel.send(tc); 334 //System.out.println("send: "+tc+" @ "+listener); 335 } catch (IOException e) { 336 logger.info("send, exception " + e); 337 e.printStackTrace(); 338 } catch (Exception e) { 339 logger.info("send, exception " + e); 340 e.printStackTrace(); 341 } 342 } 343 344 345 /** 346 * Initialize and start the list thread. 347 */ 348 public void init() { 349 if (listener == null) { 350 return; 351 } 352 if (listener.isDone()) { 353 return; 354 } 355 if (debug) { 356 logger.debug("initialize " + listener); 357 } 358 synchronized (theList) { 359 listener.start(); 360 } 361 } 362 363 364 /** 365 * Terminate the list thread. 366 */ 367 public void terminate() { 368 if (cf != null) { 369 cf.terminate(); 370 } 371 if (channel != null) { 372 channel.close(); 373 } 374 //theList.clear(); 375 if (listener == null) { 376 return; 377 } 378 if (debug) { 379 logger.debug("terminate " + listener); 380 } 381 listener.setDone(); 382 try { 383 while (listener.isAlive()) { 384 //System.out.print("+"); 385 listener.interrupt(); 386 listener.join(100); 387 } 388 } catch (InterruptedException e) { 389 Thread.currentThread().interrupt(); 390 } 391 listener = null; 392 } 393 394} 395 396 397/** 398 * Thread to comunicate with the list server. 399 */ 400class DHTListener<K, V> extends Thread { 401 402 403 private static final Logger logger = Logger.getLogger(DHTListener.class); 404 405 406 private static boolean debug = logger.isDebugEnabled(); 407 408 409 private final SocketChannel channel; 410 411 412 private final SortedMap<K, V> theList; 413 414 415 private boolean goon; 416 417 418 DHTListener(SocketChannel s, SortedMap<K, V> list) { 419 channel = s; 420 theList = list; 421 goon = true; 422 } 423 424 425 boolean isDone() { 426 return !goon; 427 } 428 429 430 void setDone() { 431 goon = false; 432 } 433 434 435 /** 436 * run. 437 */ 438 @SuppressWarnings("unchecked") 439 @Override 440 public void run() { 441 logger.debug("running "); 442 Object o; 443 DHTTransport<K, V> tc; 444 //goon = true; 445 while (goon) { 446 tc = null; 447 o = null; 448 try { 449 o = channel.receive(); 450 if (debug) { 451 logger.debug("receive(" + o + ")"); 452 } 453 if (this.isInterrupted()) { 454 goon = false; 455 break; 456 } 457 if (o == null) { 458 goon = false; 459 break; 460 } 461 if (o instanceof DHTTransportClear) { 462 logger.debug("receive, clear"); 463 synchronized (theList) { 464 theList.clear(); 465 theList.notifyAll(); 466 } 467 continue; 468 } 469 if (o instanceof DHTTransport) { 470 tc = (DHTTransport<K, V>) o; 471 K key = tc.key(); 472 if (key != null) { 473 logger.info("receive, put(key=" + key + ")"); 474 V val = tc.value(); 475 synchronized (theList) { 476 theList.put(key, val); 477 theList.notifyAll(); 478 } 479 } 480 } 481 } catch (IOException e) { 482 goon = false; 483 logger.info("receive, IO exception " + e); 484 //e.printStackTrace(); 485 } catch (ClassNotFoundException e) { 486 goon = false; 487 logger.info("receive, CNF exception " + e); 488 e.printStackTrace(); 489 } catch (Exception e) { 490 goon = false; 491 logger.info("receive, exception " + e); 492 e.printStackTrace(); 493 } 494 } 495 } 496 497}