/*
 * $Id: DistHashTable.java 5872 2018-07-20 16:01:46Z kredel $
 */

package edu.jas.util;


import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
//import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;


/**
 * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to
 * keep the sequence order of elements.
 *
 * The local map is filled by a listener thread from messages received on the
 * channel; all accesses to the local map are guarded by the monitor of the
 * map itself (see {@link #getList()}).
 * @author Heinz Kredel
 */

public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{


    private static final Logger logger = LogManager.getLogger(DistHashTable.class);


    private static final boolean debug = logger.isDebugEnabled();


    /** Local copy of the table; also used as the monitor for all accesses. */
    protected final SortedMap<K, V> theList;


    /** Channel factory, null if the channel was supplied by the caller. */
    protected final ChannelFactory cf;


    protected SocketChannel channel = null;


    protected DHTListener<K, V> listener = null;


    /**
     * Constructs a new DistHashTable.
     * @param host name or IP of server host.
     */
    public DistHashTable(String host) {
        this(host, DistHashTableServer.DEFAULT_PORT);
    }


    /**
     * DistHashTable.
     * @param host name or IP of server host.
     * @param port on server.
     */
    public DistHashTable(String host, int port) {
        this(new ChannelFactory(port + 1), host, port);
    }


    /**
     * DistHashTable.
     * @param cf ChannelFactory to use.
     * @param host name or IP of server host.
     * @param port on server.
     * @throws RuntimeException wrapping the IOException if no channel to the
     *             server can be obtained.
     */
    public DistHashTable(ChannelFactory cf, String host, int port) {
        this.cf = cf;
        cf.init(); // why? see constructor
        try {
            channel = cf.getChannel(host, port);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        if (debug) {
            logger.debug("dl channel = " + channel);
        }
        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
        theList = new TreeMap<K, V>();
        listener = new DHTListener<K, V>(channel, theList);
        // listener.start() is in initialize()
    }


    /**
     * DistHashTable.
     * @param sc SocketChannel to use.
     */
    public DistHashTable(SocketChannel sc) {
        cf = null;
        channel = sc;
        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
        theList = new TreeMap<K, V>();
        listener = new DHTListener<K, V>(channel, theList);
        // listener.start() is in initialize()
    }


    /**
     * Hash code of the local map.
     */
    @Override
    public int hashCode() {
        // the listener thread mutates the map under this monitor
        synchronized (theList) {
            return theList.hashCode();
        }
    }


    /**
     * Equals. Compares the content of the local map with o.
     */
    @Override
    public boolean equals(Object o) {
        // Compare a snapshot, so that the monitor is not held while calling
        // into o (which may itself be a DistHashTable locking its own map).
        SortedMap<K, V> snapshot;
        synchronized (theList) {
            snapshot = new TreeMap<K, V>(theList);
        }
        return snapshot.equals(o);
    }


    /**
     * Contains key.
     */
    @Override
    public boolean containsKey(Object o) {
        synchronized (theList) {
            return theList.containsKey(o);
        }
    }


    /**
     * Contains value.
     */
    @Override
    public boolean containsValue(Object o) {
        synchronized (theList) {
            return theList.containsValue(o);
        }
    }


    /**
     * Get the values as Collection.
     * @return a copy of the values currently in the local map.
     */
    @Override
    public Collection<V> values() {
        synchronized (theList) {
            return new ArrayList<V>(theList.values());
            //return theList.values();
        }
    }


    /**
     * Get the keys as set.
     * @return the key set view of the local map (not a copy); synchronize on
     *         {@link #getList()} while iterating over it.
     */
    @Override
    public Set<K> keySet() {
        synchronized (theList) {
            return theList.keySet();
        }
    }


    /**
     * Get the entries as Set.
     * @return the entry set view of the local map (not a copy); synchronize
     *         on {@link #getList()} while iterating over it.
     */
    @Override
    public Set<Entry<K, V>> entrySet() {
        synchronized (theList) {
            return theList.entrySet();
        }
    }


    /**
     * Get the internal list, convert from Collection.
     * @return a copy of the values currently in the local map.
     */
    // To be fixed?, but is ok.
    public List<V> getValueList() {
        synchronized (theList) {
            return new ArrayList<V>(theList.values());
        }
    }


    /**
     * Get the internal sorted map. For synchronization purpose in normalform.
     */
    public SortedMap<K, V> getList() {
        return theList;
    }


    /**
     * Size of the (local) list.
     */
    @Override
    public int size() {
        synchronized (theList) {
            return theList.size();
        }
    }


    /**
     * Is the List empty?
     */
    @Override
    public boolean isEmpty() {
        synchronized (theList) {
            return theList.isEmpty();
        }
    }


    /**
     * List key iterator.
     * @return an iterator over the key set view of the local map (not a
     *         copy); synchronize on {@link #getList()} while iterating.
     */
    public Iterator<K> iterator() {
        synchronized (theList) {
            return theList.keySet().iterator();
        }
    }


    /**
     * List value iterator.
     * @return an iterator over the values view of the local map (not a
     *         copy); synchronize on {@link #getList()} while iterating.
     */
    public Iterator<V> valueIterator() {
        synchronized (theList) {
            return theList.values().iterator();
        }
    }


    /**
     * Put object to the distributed hash table. Blocks until the key value pair
     * is sent and received from the server, or until the calling thread is
     * interrupted (the interrupt status is then set on return).
     * @param key
     * @param value
     * @throws NullPointerException if key or value is null.
     */
    public void putWait(K key, V value) {
        put(key, value); // = send
        // assume key does not change multiple times before test:
        synchronized (theList) {
            try {
                // wait on the monitor instead of spinning; the listener
                // calls notifyAll() after every update of the local map
                while (!value.equals(theList.get(key))) {
                    theList.wait(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }


    /**
     * Put object to the distributed hash table. Returns immediately after
     * sending, does not block. Failures during sending are logged and not
     * propagated.
     * @param key
     * @param value
     * @return always null, the local map is only updated by the listener.
     * @throws NullPointerException if key or value is null.
     */
    @Override
    public V put(K key, V value) {
        if (key == null || value == null) {
            throw new NullPointerException("null keys or values not allowed");
        }
        try {
            DHTTransport<K, V> tc = DHTTransport.<K, V> create(key, value);
            channel.send(tc);
            //System.out.println("send: "+tc+" @ "+listener);
        } catch (Exception e) { // includes IOException
            logger.info("send, exception " + e);
            e.printStackTrace();
        }
        return null;
    }


    /**
     * Get value under key from DHT. Blocks until the object is sent and
     * received from the server (actually it blocks until some value under key
     * is received).
     * @param key
     * @return the value stored under the key, or null if the calling thread
     *         was interrupted while waiting.
     */
    public V getWait(K key) {
        V value = null;
        try {
            synchronized (theList) {
                //value = theList.get(key);
                value = get(key);
                while (value == null) {
                    //System.out.print("^");
                    theList.wait(100);
                    value = theList.get(key);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return value;
    }


    /**
     * Get value under key from DHT. If no value is yet available null is
     * returned.
     * @param key
     * @return the value stored under the key.
     */
    @Override
    public V get(Object key) {
        synchronized (theList) {
            return theList.get(key);
        }
    }


    /**
     * Clear the List.
     * Clearance request is distributed to all clients. Failures during
     * sending are logged and not propagated.
     */
    @Override
    public void clear() {
        synchronized (theList) {
            theList.clear();
        }
        // done after 11 month: send clear message to others
        try {
            DHTTransport<K, V> tc = new DHTTransportClear<K, V>();
            channel.send(tc);
            //System.out.println("send: "+tc+" @ "+listener);
        } catch (Exception e) { // includes IOException
            logger.info("send, exception " + e);
            e.printStackTrace();
        }
    }


    /**
     * Initialize and start the list thread. Does nothing if there is no
     * listener, if the listener is already done or if it was already started.
     */
    public void init() {
        if (listener == null) {
            return;
        }
        if (listener.isDone()) {
            return;
        }
        if (debug) {
            logger.debug("initialize " + listener);
        }
        synchronized (theList) {
            // Thread.start() throws IllegalThreadStateException when invoked
            // a second time, so only start a thread which is still new
            if (listener.getState() == Thread.State.NEW) {
                listener.start();
            }
        }
    }


    /**
     * Terminate the list thread. Terminates the channel factory (if any),
     * closes the channel and waits for the listener thread to end.
     */
    public void terminate() {
        if (cf != null) {
            cf.terminate();
        }
        if (channel != null) {
            channel.close();
        }
        //theList.clear();
        if (listener == null) {
            return;
        }
        if (debug) {
            logger.debug("terminate " + listener);
        }
        listener.setDone();
        try {
            while (listener.isAlive()) {
                //System.out.print("+");
                listener.interrupt();
                listener.join(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        listener = null;
    }

}


/**
 * Thread to communicate with the list server. Receives objects from the
 * channel and applies them to the shared map, notifying waiting threads.
 */
class DHTListener<K, V> extends Thread {


    private static final Logger logger = LogManager.getLogger(DHTListener.class);


    private static final boolean debug = logger.isDebugEnabled();


    private final SocketChannel channel;


    private final SortedMap<K, V> theList;


    // written by setDone() from other threads, read in the run() loop
    private volatile boolean goon;


    DHTListener(SocketChannel s, SortedMap<K, V> list) {
        channel = s;
        theList = list;
        goon = true;
    }


    /**
     * Test if the listener is (requested to be) done.
     */
    boolean isDone() {
        return !goon;
    }


    /**
     * Request the listener to end its receive loop.
     */
    void setDone() {
        goon = false;
    }


    /**
     * run. Receives objects until done, interrupted, a null object is
     * received or an exception occurs. A clear message empties the map, a
     * transport with non-null key is put into the map; in both cases all
     * threads waiting on the map are notified.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void run() {
        logger.debug("running ");
        Object o;
        DHTTransport<K, V> tc;
        //goon = true;
        while (goon) {
            tc = null;
            o = null;
            try {
                o = channel.receive();
                if (debug) {
                    logger.debug("receive(" + o + ")");
                }
                if (this.isInterrupted()) {
                    goon = false;
                    break;
                }
                if (o == null) {
                    goon = false;
                    break;
                }
                if (o instanceof DHTTransportClear) {
                    logger.debug("receive, clear");
                    synchronized (theList) {
                        theList.clear();
                        theList.notifyAll();
                    }
                    continue;
                }
                if (o instanceof DHTTransport) {
                    tc = (DHTTransport<K, V>) o;
                    K key = tc.key();
                    if (key != null) {
                        logger.info("receive, put(key=" + key + ")");
                        V val = tc.value();
                        synchronized (theList) {
                            theList.put(key, val);
                            theList.notifyAll();
                        }
                    }
                }
            } catch (IOException e) {
                goon = false;
                logger.info("receive, IO exception " + e);
                //e.printStackTrace();
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.info("receive, CNF exception " + e);
                e.printStackTrace();
            } catch (Exception e) {
                goon = false;
                logger.info("receive, exception " + e);
                e.printStackTrace();
            }
        }
    }

}