001/*
002 * $Id: DistHashTable.java 4638 2013-09-13 19:14:05Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.AbstractMap;
010import java.util.ArrayList;
011import java.util.Collection;
012import java.util.Iterator;
013import java.util.List;
014import java.util.Set;
015import java.util.SortedMap;
016import java.util.TreeMap;
017//import java.util.concurrent.ConcurrentSkipListMap;
018
019import org.apache.log4j.Logger;
020
021
022/**
023 * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to
024 * keep the sequence order of elements.
025 * @author Heinz Kredel
026 */
027
028public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{
029
030
031    private static final Logger logger = Logger.getLogger(DistHashTable.class);
032
033
034    private static boolean debug = logger.isDebugEnabled();
035
036
037    protected final SortedMap<K, V> theList;
038
039
040    protected final ChannelFactory cf;
041
042
043    protected SocketChannel channel = null;
044
045
046    protected DHTListener<K, V> listener = null;
047
048
049    /**
050     * Constructs a new DistHashTable.
051     * @param host name or IP of server host.
052     */
053    public DistHashTable(String host) {
054        this(host, DistHashTableServer.DEFAULT_PORT);
055    }
056
057
058    /**
059     * DistHashTable.
060     * @param host name or IP of server host.
061     * @param port on server.
062     */
063    public DistHashTable(String host, int port) {
064        this(new ChannelFactory(port + 1), host, port);
065    }
066
067
068    /**
069     * DistHashTable.
070     * @param cf ChannelFactory to use.
071     * @param host name or IP of server host.
072     * @param port on server.
073     */
074    public DistHashTable(ChannelFactory cf, String host, int port) {
075        this.cf = cf;
076        cf.init(); // why? see constructor
077        try {
078            channel = cf.getChannel(host, port);
079        } catch (IOException e) {
080            e.printStackTrace();
081            throw new RuntimeException(e);
082        }
083        if (debug) {
084            logger.debug("dl channel = " + channel);
085        }
086        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
087        theList = new TreeMap<K, V>();
088        listener = new DHTListener<K, V>(channel, theList);
089        // listener.start() is in initialize()
090    }
091
092
093    /**
094     * DistHashTable.
095     * @param sc SocketChannel to use.
096     */
097    public DistHashTable(SocketChannel sc) {
098        cf = null;
099        channel = sc;
100        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
101        theList = new TreeMap<K, V>();
102        listener = new DHTListener<K, V>(channel, theList);
103        // listener.start() is in initialize()
104    }
105
106
107    /**
108     * Hash code.
109     */
110    @Override
111    public int hashCode() {
112        return theList.hashCode();
113    }
114
115
116    /**
117     * Equals.
118     */
119    @Override
120    public boolean equals(Object o) {
121        return theList.equals(o);
122    }
123
124
125    /**
126     * Contains key.
127     */
128    @Override
129    public boolean containsKey(Object o) {
130        return theList.containsKey(o);
131    }
132
133
134    /**
135     * Contains value.
136     */
137    @Override
138    public boolean containsValue(Object o) {
139        return theList.containsValue(o);
140    }
141
142
143    /**
144     * Get the values as Collection.
145     */
146    @Override
147    public Collection<V> values() {
148        synchronized (theList) {
149            return new ArrayList<V>(theList.values());
150            //return theList.values();
151        }
152    }
153
154
155    /**
156     * Get the keys as set.
157     */
158    @Override
159    public Set<K> keySet() {
160        synchronized (theList) {
161            return theList.keySet();
162        }
163    }
164
165
166    /**
167     * Get the entries as Set.
168     */
169    @Override
170    public Set<Entry<K, V>> entrySet() {
171        synchronized (theList) {
172            return theList.entrySet();
173        }
174    }
175
176
177    /**
178     * Get the internal list, convert from Collection.
179     */
180    // To be fixed?, but is ok.
181    public List<V> getValueList() {
182        synchronized (theList) {
183            return new ArrayList<V>(theList.values());
184        }
185    }
186
187
188    /**
189     * Get the internal sorted map. For synchronization purpose in normalform.
190     */
191    public SortedMap<K, V> getList() {
192        return theList;
193    }
194
195
196    /**
197     * Size of the (local) list.
198     */
199    @Override
200    public int size() {
201        synchronized (theList) {
202            return theList.size();
203        }
204    }
205
206
207    /**
208     * Is the List empty?
209     */
210    @Override
211    public boolean isEmpty() {
212        synchronized (theList) {
213            return theList.isEmpty();
214        }
215    }
216
217
218    /**
219     * List key iterator.
220     */
221    public Iterator<K> iterator() {
222        synchronized (theList) {
223            return theList.keySet().iterator();
224        }
225    }
226
227
228    /**
229     * List value iterator.
230     */
231    public Iterator<V> valueIterator() {
232        synchronized (theList) {
233            return theList.values().iterator();
234        }
235    }
236
237
238    /**
239     * Put object to the distributed hash table. Blocks until the key value pair
240     * is send and received from the server.
241     * @param key
242     * @param value
243     */
244    public void putWait(K key, V value) {
245        //V o = 
246        put(key, value); // = send
247        // assume key does not change multiple times before test:
248        while (!value.equals(getWait(key))) {
249            //System.out.print("#");
250        }
251    }
252
253
254    /**
255     * Put object to the distributed hash table. Returns immediately after
256     * sending, does not block.
257     * @param key
258     * @param value
259     */
260    @Override
261    public V put(K key, V value) {
262        if (key == null || value == null) {
263            throw new NullPointerException("null keys or values not allowed");
264        }
265        try {
266            DHTTransport<K, V> tc = DHTTransport.<K, V> create(key, value);
267            channel.send(tc);
268            //System.out.println("send: "+tc+" @ "+listener);
269        } catch (IOException e) {
270            logger.info("send, exception " + e);
271            e.printStackTrace();
272        } catch (Exception e) {
273            logger.info("send, exception " + e);
274            e.printStackTrace();
275        }
276        return null;
277    }
278
279
280    /**
281     * Get value under key from DHT. Blocks until the object is send and
282     * received from the server (actually it blocks until some value under key
283     * is received).
284     * @param key
285     * @return the value stored under the key.
286     */
287    public V getWait(K key) {
288        V value = null;
289        try {
290            synchronized (theList) {
291                //value = theList.get(key);
292                value = get(key);
293                while (value == null) {
294                    //System.out.print("^");
295                    theList.wait(100);
296                    value = theList.get(key);
297                }
298            }
299        } catch (InterruptedException e) {
300            Thread.currentThread().interrupt();
301            e.printStackTrace();
302        }
303        return value;
304    }
305
306
307    /**
308     * Get value under key from DHT. If no value is jet available null is
309     * returned.
310     * @param key
311     * @return the value stored under the key.
312     */
313    @Override
314    public V get(Object key) {
315        synchronized (theList) {
316            return theList.get(key);
317        }
318    }
319
320
321    /**
322     * Clear the List. 
323     * Clearance request is distributed to all clients.
324     */
325    @Override
326    public void clear() {
327        synchronized (theList) {
328            theList.clear();
329        }
330        // done after 11 month: send clear message to others
331        try {
332            DHTTransport<K, V> tc = new DHTTransportClear<K, V>();
333            channel.send(tc);
334            //System.out.println("send: "+tc+" @ "+listener);
335        } catch (IOException e) {
336            logger.info("send, exception " + e);
337            e.printStackTrace();
338        } catch (Exception e) {
339            logger.info("send, exception " + e);
340            e.printStackTrace();
341        }
342    }
343
344
345    /**
346     * Initialize and start the list thread.
347     */
348    public void init() {
349        if (listener == null) {
350            return;
351        }
352        if (listener.isDone()) {
353            return;
354        }
355        if (debug) {
356            logger.debug("initialize " + listener);
357        }
358        synchronized (theList) {
359            listener.start();
360        }
361    }
362
363
364    /**
365     * Terminate the list thread.
366     */
367    public void terminate() {
368        if (cf != null) {
369            cf.terminate();
370        }
371        if (channel != null) {
372            channel.close();
373        }
374        //theList.clear();
375        if (listener == null) {
376            return;
377        }
378        if (debug) {
379            logger.debug("terminate " + listener);
380        }
381        listener.setDone();
382        try {
383            while (listener.isAlive()) {
384                //System.out.print("+");
385                listener.interrupt();
386                listener.join(100);
387            }
388        } catch (InterruptedException e) {
389            Thread.currentThread().interrupt();
390        }
391        listener = null;
392    }
393
394}
395
396
397/**
398 * Thread to comunicate with the list server.
399 */
400class DHTListener<K, V> extends Thread {
401
402
403    private static final Logger logger = Logger.getLogger(DHTListener.class);
404
405
406    private static boolean debug = logger.isDebugEnabled();
407
408
409    private final SocketChannel channel;
410
411
412    private final SortedMap<K, V> theList;
413
414
415    private boolean goon;
416
417
418    DHTListener(SocketChannel s, SortedMap<K, V> list) {
419        channel = s;
420        theList = list;
421        goon = true;
422    }
423
424
425    boolean isDone() {
426        return !goon;
427    }
428
429
430    void setDone() {
431        goon = false;
432    }
433
434
435    /**
436     * run.
437     */
438    @SuppressWarnings("unchecked")
439    @Override
440    public void run() {
441        logger.debug("running ");
442        Object o;
443        DHTTransport<K, V> tc;
444        //goon = true;
445        while (goon) {
446            tc = null;
447            o = null;
448            try {
449                o = channel.receive();
450                if (debug) {
451                    logger.debug("receive(" + o + ")");
452                }
453                if (this.isInterrupted()) {
454                    goon = false;
455                    break;
456                }
457                if (o == null) {
458                    goon = false;
459                    break;
460                }
461                if (o instanceof DHTTransportClear) {
462                    logger.debug("receive, clear");
463                    synchronized (theList) {
464                        theList.clear();
465                        theList.notifyAll();
466                    }
467                    continue;
468                }
469                if (o instanceof DHTTransport) {
470                    tc = (DHTTransport<K, V>) o;
471                    K key = tc.key();
472                    if (key != null) {
473                        logger.info("receive, put(key=" + key + ")");
474                        V val = tc.value();
475                        synchronized (theList) {
476                            theList.put(key, val);
477                            theList.notifyAll();
478                        }
479                    }
480                }
481            } catch (IOException e) {
482                goon = false;
483                logger.info("receive, IO exception " + e);
484                //e.printStackTrace();
485            } catch (ClassNotFoundException e) {
486                goon = false;
487                logger.info("receive, CNF exception " + e);
488                e.printStackTrace();
489            } catch (Exception e) {
490                goon = false;
491                logger.info("receive, exception " + e);
492                e.printStackTrace();
493            }
494        }
495    }
496
497}