001 /* 002 * $Id: DistHashTable.java 3279 2010-08-21 20:18:25Z kredel $ 003 */ 004 005 package edu.jas.util; 006 007 008 import java.io.IOException; 009 import java.util.AbstractMap; 010 import java.util.ArrayList; 011 import java.util.Collection; 012 import java.util.Iterator; 013 import java.util.List; 014 import java.util.Set; 015 import java.util.SortedMap; 016 import java.util.TreeMap; 017 018 import org.apache.log4j.Logger; 019 020 021 /** 022 * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to 023 * keep the sequence order of elements. 024 * @author Heinz Kredel 025 */ 026 027 public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{ 028 029 030 private static final Logger logger = Logger.getLogger(DistHashTable.class); 031 032 033 protected final SortedMap<K, V> theList; 034 035 036 protected final ChannelFactory cf; 037 038 039 protected SocketChannel channel = null; 040 041 042 protected DHTListener<K, V> listener = null; 043 044 045 /** 046 * Constructs a new DistHashTable. 047 * @param host name or IP of server host. 048 */ 049 public DistHashTable(String host) { 050 this(host, DistHashTableServer.DEFAULT_PORT); 051 } 052 053 054 /** 055 * DistHashTable. 056 * @param host name or IP of server host. 057 * @param port on server. 058 */ 059 public DistHashTable(String host, int port) { 060 this(new ChannelFactory(port + 1), host, port); 061 } 062 063 064 /** 065 * DistHashTable. 066 * @param cf ChannelFactory to use. 067 * @param host name or IP of server host. 068 * @param port on server. 069 */ 070 public DistHashTable(ChannelFactory cf, String host, int port) { 071 this.cf = cf; 072 cf.init(); 073 try { 074 channel = cf.getChannel(host, port); 075 } catch (IOException e) { 076 e.printStackTrace(); 077 } 078 if (logger.isDebugEnabled()) { 079 logger.debug("dl channel = " + channel); 080 } 081 theList = new TreeMap<K, V>(); 082 listener = new DHTListener<K, V>(channel, theList); 083 synchronized (theList) { 084 listener.start(); 085 } 086 } 087 088 089 /** 090 * DistHashTable. 091 * @param sc SocketChannel to use. 092 */ 093 public DistHashTable(SocketChannel sc) { 094 cf = null; 095 channel = sc; 096 theList = new TreeMap<K, V>(); 097 listener = new DHTListener<K, V>(channel, theList); 098 synchronized (theList) { 099 listener.start(); 100 } 101 } 102 103 104 /** 105 * Hash code. 106 */ 107 @Override 108 public int hashCode() { 109 return theList.hashCode(); 110 } 111 112 113 /** 114 * Equals. 115 */ 116 @Override 117 public boolean equals(Object o) { 118 return theList.equals(o); 119 } 120 121 122 /** 123 * Contains key. 124 */ 125 @Override 126 public boolean containsKey(Object o) { 127 return theList.containsKey(o); 128 } 129 130 131 /** 132 * Contains value. 133 */ 134 @Override 135 public boolean containsValue(Object o) { 136 return theList.containsValue(o); 137 } 138 139 140 /** 141 * Get the values as Collection. 142 */ 143 @Override 144 public Collection<V> values() { 145 return theList.values(); 146 } 147 148 149 /** 150 * Get the keys as set. 151 */ 152 @Override 153 public Set<K> keySet() { 154 return theList.keySet(); 155 } 156 157 158 /** 159 * Get the entries as Set. 160 */ 161 @Override 162 public Set<Entry<K, V>> entrySet() { 163 return theList.entrySet(); 164 } 165 166 167 /** 168 * Get the internal list, convert from Collection. 169 * @fix but is ok 170 */ 171 public List<V> getValueList() { 172 synchronized (theList) { 173 return new ArrayList<V>(theList.values()); 174 } 175 } 176 177 178 /** 179 * Get the internal sorted map. For synchronization purpose in normalform. 180 */ 181 public SortedMap<K, V> getList() { 182 return theList; 183 } 184 185 186 /** 187 * Size of the (local) list. 188 */ 189 @Override 190 public int size() { 191 synchronized (theList) { 192 return theList.size(); 193 } 194 } 195 196 197 /** 198 * Is the List empty? 199 */ 200 @Override 201 public boolean isEmpty() { 202 synchronized (theList) { 203 return theList.isEmpty(); 204 } 205 } 206 207 208 /** 209 * List key iterator. 210 */ 211 public Iterator<K> iterator() { 212 synchronized (theList) { 213 return theList.keySet().iterator(); 214 } 215 } 216 217 218 /** 219 * List value iterator. 220 */ 221 public Iterator<V> valueIterator() { 222 synchronized (theList) { 223 return theList.values().iterator(); 224 } 225 } 226 227 228 /** 229 * Put object to the distributed hash table. Blocks until the key value pair 230 * is send and received from the server. 231 * @param key 232 * @param value 233 */ 234 public void putWait(K key, V value) { 235 put(key, value); // = send 236 try { 237 synchronized (theList) { 238 while (!value.equals(theList.get(key))) { 239 //System.out.print("#"); 240 theList.wait(100); 241 } 242 } 243 } catch (InterruptedException e) { 244 Thread.currentThread().interrupt(); 245 e.printStackTrace(); 246 } 247 } 248 249 250 /** 251 * Put object to the distributed hash table. Returns immediately after 252 * sending does not block. 253 * @param key 254 * @param value 255 */ 256 @Override 257 public V put(K key, V value) { 258 if (key == null || value == null) { 259 throw new NullPointerException("null keys or values not allowed"); 260 } 261 try { 262 DHTTransport<K,V> tc = DHTTransport.<K,V> create(key, value); 263 channel.send(tc); 264 //System.out.println("send: "+tc+" @ "+listener); 265 } catch (IOException e) { 266 logger.info("send, exception " + e); 267 e.printStackTrace(); 268 } catch (Exception e) { 269 logger.info("send, exception " + e); 270 e.printStackTrace(); 271 } 272 return null; 273 } 274 275 276 /** 277 * Get value under key from DHT. Blocks until the object is send and 278 * received from the server (actually it blocks until some value under key 279 * is received). 280 * @param key 281 * @return the value stored under the key. 282 */ 283 public V getWait(K key) { 284 V value = null; 285 try { 286 synchronized (theList) { 287 //value = theList.get(key); 288 value = get(key); 289 while (value == null) { 290 //System.out.print("^"); 291 theList.wait(100); 292 value = theList.get(key); 293 } 294 } 295 } catch (InterruptedException e) { 296 Thread.currentThread().interrupt(); 297 e.printStackTrace(); 298 } 299 return value; 300 } 301 302 303 /** 304 * Get value under key from DHT. If no value is jet available null is 305 * returned. 306 * @param key 307 * @return the value stored under the key. 308 */ 309 @Override 310 public V get(Object key) { 311 synchronized (theList) { 312 return theList.get(key); 313 } 314 } 315 316 317 /** 318 * Clear the List. Caveat: must be called on all clients. 319 */ 320 @Override 321 public void clear() { 322 // send clear message to others 323 synchronized (theList) { 324 theList.clear(); 325 } 326 } 327 328 329 /** 330 * Terminate the list thread. 331 */ 332 public void terminate() { 333 if (cf != null) { 334 cf.terminate(); 335 } 336 if (channel != null) { 337 channel.close(); 338 } 339 //theList.clear(); 340 if (listener == null) { 341 return; 342 } 343 if (logger.isDebugEnabled()) { 344 logger.debug("terminate " + listener); 345 } 346 listener.setDone(); 347 try { 348 while (listener.isAlive()) { 349 //System.out.print("+"); 350 listener.interrupt(); 351 listener.join(100); 352 } 353 } catch (InterruptedException e) { 354 Thread.currentThread().interrupt(); 355 } 356 listener = null; 357 } 358 359 } 360 361 362 /** 363 * Thread to comunicate with the list server. 364 */ 365 366 class DHTListener<K, V> extends Thread { 367 368 369 private static final Logger logger = Logger.getLogger(DHTListener.class); 370 371 372 private final SocketChannel channel; 373 374 375 private final SortedMap<K, V> theList; 376 377 378 private boolean goon; 379 380 381 DHTListener(SocketChannel s, SortedMap<K, V> list) { 382 channel = s; 383 theList = list; 384 } 385 386 387 void setDone() { 388 goon = false; 389 } 390 391 392 /** 393 * run. 394 */ 395 @SuppressWarnings("unchecked") 396 @Override 397 public void run() { 398 Object o; 399 DHTTransport<K, V> tc; 400 goon = true; 401 while (goon) { 402 tc = null; 403 o = null; 404 try { 405 o = channel.receive(); 406 if (logger.isDebugEnabled()) { 407 logger.debug("receive(" + o + ")"); 408 } 409 if (this.isInterrupted()) { 410 goon = false; 411 break; 412 } 413 if (o == null) { 414 goon = false; 415 break; 416 } 417 if (o instanceof DHTTransport) { 418 tc = (DHTTransport<K, V>) o; 419 K key = tc.key(); 420 if (key != null) { 421 logger.info("receive, put(key=" + key + ")"); 422 V val = tc.value(); 423 synchronized (theList) { 424 theList.put(key, val); 425 theList.notifyAll(); 426 } 427 } 428 } 429 } catch (IOException e) { 430 goon = false; 431 logger.info("receive, IO exception " + e); 432 //e.printStackTrace(); 433 } catch (ClassNotFoundException e) { 434 goon = false; 435 logger.info("receive, CNF exception " + e); 436 e.printStackTrace(); 437 } catch (Exception e) { 438 goon = false; 439 logger.info("receive, exception " + e); 440 e.printStackTrace(); 441 } 442 } 443 } 444 445 }