001/*
002 * $Id: DistHashTable.java 4224 2012-09-29 13:40:12Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.AbstractMap;
010import java.util.ArrayList;
011import java.util.Collection;
012import java.util.Iterator;
013import java.util.List;
014import java.util.Set;
015import java.util.SortedMap;
016import java.util.TreeMap;
017//import java.util.concurrent.ConcurrentSkipListMap;
018
019import org.apache.log4j.Logger;
020
021
022/**
023 * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to
024 * keep the sequence order of elements.
025 * @author Heinz Kredel
026 */
027
028public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{
029
030
031    private static final Logger logger = Logger.getLogger(DistHashTable.class);
032
033
034    private static boolean debug = logger.isDebugEnabled();
035
036
037    protected final SortedMap<K, V> theList;
038
039
040    protected final ChannelFactory cf;
041
042
043    protected SocketChannel channel = null;
044
045
046    protected DHTListener<K, V> listener = null;
047
048
049    /**
050     * Constructs a new DistHashTable.
051     * @param host name or IP of server host.
052     */
053    public DistHashTable(String host) {
054        this(host, DistHashTableServer.DEFAULT_PORT);
055    }
056
057
058    /**
059     * DistHashTable.
060     * @param host name or IP of server host.
061     * @param port on server.
062     */
063    public DistHashTable(String host, int port) {
064        this(new ChannelFactory(port + 1), host, port);
065    }
066
067
068    /**
069     * DistHashTable.
070     * @param cf ChannelFactory to use.
071     * @param host name or IP of server host.
072     * @param port on server.
073     */
074    public DistHashTable(ChannelFactory cf, String host, int port) {
075        this.cf = cf;
076        cf.init(); // why? see constructor
077        try {
078            channel = cf.getChannel(host, port);
079        } catch (IOException e) {
080            e.printStackTrace();
081            throw new RuntimeException(e);
082        }
083        if (debug) {
084            logger.debug("dl channel = " + channel);
085        }
086        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
087        theList = new TreeMap<K, V>();
088        listener = new DHTListener<K, V>(channel, theList);
089        // listener.start() is in initialize()
090    }
091
092
093    /**
094     * DistHashTable.
095     * @param sc SocketChannel to use.
096     */
097    public DistHashTable(SocketChannel sc) {
098        cf = null;
099        channel = sc;
100        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
101        theList = new TreeMap<K, V>();
102        listener = new DHTListener<K, V>(channel, theList);
103        // listener.start() is in initialize()
104    }
105
106
107    /**
108     * Hash code.
109     */
110    @Override
111    public int hashCode() {
112        return theList.hashCode();
113    }
114
115
116    /**
117     * Equals.
118     */
119    @Override
120    public boolean equals(Object o) {
121        return theList.equals(o);
122    }
123
124
125    /**
126     * Contains key.
127     */
128    @Override
129    public boolean containsKey(Object o) {
130        return theList.containsKey(o);
131    }
132
133
134    /**
135     * Contains value.
136     */
137    @Override
138    public boolean containsValue(Object o) {
139        return theList.containsValue(o);
140    }
141
142
143    /**
144     * Get the values as Collection.
145     */
146    @Override
147    public Collection<V> values() {
148        synchronized (theList) {
149            return new ArrayList<V>(theList.values());
150            //return theList.values();
151        }
152    }
153
154
155    /**
156     * Get the keys as set.
157     */
158    @Override
159    public Set<K> keySet() {
160        synchronized (theList) {
161            return theList.keySet();
162        }
163    }
164
165
166    /**
167     * Get the entries as Set.
168     */
169    @Override
170    public Set<Entry<K, V>> entrySet() {
171        synchronized (theList) {
172            return theList.entrySet();
173        }
174    }
175
176
177    /**
178     * Get the internal list, convert from Collection.
179     * @fix but is ok
180     */
181    public List<V> getValueList() {
182        synchronized (theList) {
183            return new ArrayList<V>(theList.values());
184        }
185    }
186
187
188    /**
189     * Get the internal sorted map. For synchronization purpose in normalform.
190     */
191    public SortedMap<K, V> getList() {
192        return theList;
193    }
194
195
196    /**
197     * Size of the (local) list.
198     */
199    @Override
200    public int size() {
201        synchronized (theList) {
202            return theList.size();
203        }
204    }
205
206
207    /**
208     * Is the List empty?
209     */
210    @Override
211    public boolean isEmpty() {
212        synchronized (theList) {
213            return theList.isEmpty();
214        }
215    }
216
217
218    /**
219     * List key iterator.
220     */
221    public Iterator<K> iterator() {
222        synchronized (theList) {
223            return theList.keySet().iterator();
224        }
225    }
226
227
228    /**
229     * List value iterator.
230     */
231    public Iterator<V> valueIterator() {
232        synchronized (theList) {
233            return theList.values().iterator();
234        }
235    }
236
237
238    /**
239     * Put object to the distributed hash table. Blocks until the key value pair
240     * is send and received from the server.
241     * @param key
242     * @param value
243     */
244    public void putWait(K key, V value) {
245        put(key, value); // = send
246        // assume key does not change multiple times before test:
247        while (!value.equals(getWait(key))) {
248            //System.out.print("#");
249        }
250    }
251
252
253    /**
254     * Put object to the distributed hash table. Returns immediately after
255     * sending does not block.
256     * @param key
257     * @param value
258     */
259    @Override
260    public V put(K key, V value) {
261        if (key == null || value == null) {
262            throw new NullPointerException("null keys or values not allowed");
263        }
264        try {
265            DHTTransport<K, V> tc = DHTTransport.<K, V> create(key, value);
266            channel.send(tc);
267            //System.out.println("send: "+tc+" @ "+listener);
268        } catch (IOException e) {
269            logger.info("send, exception " + e);
270            e.printStackTrace();
271        } catch (Exception e) {
272            logger.info("send, exception " + e);
273            e.printStackTrace();
274        }
275        return null;
276    }
277
278
279    /**
280     * Get value under key from DHT. Blocks until the object is send and
281     * received from the server (actually it blocks until some value under key
282     * is received).
283     * @param key
284     * @return the value stored under the key.
285     */
286    public V getWait(K key) {
287        V value = null;
288        try {
289            synchronized (theList) {
290                //value = theList.get(key);
291                value = get(key);
292                while (value == null) {
293                    //System.out.print("^");
294                    theList.wait(100);
295                    value = theList.get(key);
296                }
297            }
298        } catch (InterruptedException e) {
299            Thread.currentThread().interrupt();
300            e.printStackTrace();
301        }
302        return value;
303    }
304
305
306    /**
307     * Get value under key from DHT. If no value is jet available null is
308     * returned.
309     * @param key
310     * @return the value stored under the key.
311     */
312    @Override
313    public V get(Object key) {
314        synchronized (theList) {
315            return theList.get(key);
316        }
317    }
318
319
320    /**
321     * Clear the List. Caveat: must be called on all clients.
322     */
323    @Override
324    public void clear() {
325        // send clear message to others
326        synchronized (theList) {
327            theList.clear();
328        }
329    }
330
331
332    /**
333     * Initialize and start the list thread.
334     */
335    public void init() {
336        if (listener == null) {
337            return;
338        }
339        if (listener.isDone()) {
340            return;
341        }
342        if (debug) {
343            logger.debug("initialize " + listener);
344        }
345        synchronized (theList) {
346            listener.start();
347        }
348    }
349
350
351    /**
352     * Terminate the list thread.
353     */
354    public void terminate() {
355        if (cf != null) {
356            cf.terminate();
357        }
358        if (channel != null) {
359            channel.close();
360        }
361        //theList.clear();
362        if (listener == null) {
363            return;
364        }
365        if (debug) {
366            logger.debug("terminate " + listener);
367        }
368        listener.setDone();
369        try {
370            while (listener.isAlive()) {
371                //System.out.print("+");
372                listener.interrupt();
373                listener.join(100);
374            }
375        } catch (InterruptedException e) {
376            Thread.currentThread().interrupt();
377        }
378        listener = null;
379    }
380
381}
382
383
384/**
385 * Thread to comunicate with the list server.
386 */
387
388class DHTListener<K, V> extends Thread {
389
390
391    private static final Logger logger = Logger.getLogger(DHTListener.class);
392
393
394    private static boolean debug = logger.isDebugEnabled();
395
396
397    private final SocketChannel channel;
398
399
400    private final SortedMap<K, V> theList;
401
402
403    private boolean goon;
404
405
406    DHTListener(SocketChannel s, SortedMap<K, V> list) {
407        channel = s;
408        theList = list;
409        goon = true;
410    }
411
412
413    boolean isDone() {
414        return !goon;
415    }
416
417
418    void setDone() {
419        goon = false;
420    }
421
422
423    /**
424     * run.
425     */
426    @SuppressWarnings("unchecked")
427    @Override
428    public void run() {
429        Object o;
430        DHTTransport<K, V> tc;
431        //goon = true;
432        while (goon) {
433            tc = null;
434            o = null;
435            try {
436                o = channel.receive();
437                if (debug) {
438                    logger.debug("receive(" + o + ")");
439                }
440                if (this.isInterrupted()) {
441                    goon = false;
442                    break;
443                }
444                if (o == null) {
445                    goon = false;
446                    break;
447                }
448                if (o instanceof DHTTransport) {
449                    tc = (DHTTransport<K, V>) o;
450                    K key = tc.key();
451                    if (key != null) {
452                        logger.info("receive, put(key=" + key + ")");
453                        V val = tc.value();
454                        synchronized (theList) {
455                            theList.put(key, val);
456                            theList.notifyAll();
457                        }
458                    }
459                }
460            } catch (IOException e) {
461                goon = false;
462                logger.info("receive, IO exception " + e);
463                //e.printStackTrace();
464            } catch (ClassNotFoundException e) {
465                goon = false;
466                logger.info("receive, CNF exception " + e);
467                e.printStackTrace();
468            } catch (Exception e) {
469                goon = false;
470                logger.info("receive, exception " + e);
471                e.printStackTrace();
472            }
473        }
474    }
475
476}