001/*
002 * $Id$
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.AbstractMap;
010import java.util.ArrayList;
011import java.util.Collection;
012import java.util.Iterator;
013import java.util.List;
014import java.util.Set;
015import java.util.SortedMap;
016import java.util.TreeMap;
017//import java.util.concurrent.ConcurrentSkipListMap;
018
019import org.apache.logging.log4j.Logger;
020import org.apache.logging.log4j.LogManager; 
021
022
023/**
024 * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to
025 * keep the sequence order of elements.
026 * @author Heinz Kredel
027 */
028
029public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{
030
031
032    private static final Logger logger = LogManager.getLogger(DistHashTable.class);
033
034
035    private static final boolean debug = logger.isDebugEnabled();
036
037
038    protected final SortedMap<K, V> theList;
039
040
041    protected final ChannelFactory cf;
042
043
044    protected SocketChannel channel = null;
045
046
047    protected DHTListener<K, V> listener = null;
048
049
050    /**
051     * Constructs a new DistHashTable.
052     * @param host name or IP of server host.
053     */
054    public DistHashTable(String host) {
055        this(host, DistHashTableServer.DEFAULT_PORT);
056    }
057
058
059    /**
060     * DistHashTable.
061     * @param host name or IP of server host.
062     * @param port on server.
063     */
064    public DistHashTable(String host, int port) {
065        this(new ChannelFactory(port + 1), host, port);
066    }
067
068
069    /**
070     * DistHashTable.
071     * @param cf ChannelFactory to use.
072     * @param host name or IP of server host.
073     * @param port on server.
074     */
075    public DistHashTable(ChannelFactory cf, String host, int port) {
076        this.cf = cf;
077        cf.init(); // why? see constructor
078        try {
079            channel = cf.getChannel(host, port);
080        } catch (IOException e) {
081            e.printStackTrace();
082            throw new RuntimeException(e);
083        }
084        if (debug) {
085            logger.debug("dl channel = {}", channel);
086        }
087        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
088        theList = new TreeMap<K, V>();
089        listener = new DHTListener<K, V>(channel, theList);
090        // listener.start() is in initialize()
091    }
092
093
094    /**
095     * DistHashTable.
096     * @param sc SocketChannel to use.
097     */
098    public DistHashTable(SocketChannel sc) {
099        cf = null;
100        channel = sc;
101        //theList = new ConcurrentSkipListMap<K, V>(); // Java 1.6
102        theList = new TreeMap<K, V>();
103        listener = new DHTListener<K, V>(channel, theList);
104        // listener.start() is in initialize()
105    }
106
107
108    /**
109     * Hash code.
110     */
111    @Override
112    public int hashCode() {
113        return theList.hashCode();
114    }
115
116
117    /**
118     * Equals.
119     */
120    @Override
121    public boolean equals(Object o) {
122        return theList.equals(o);
123    }
124
125
126    /**
127     * Contains key.
128     */
129    @Override
130    public boolean containsKey(Object o) {
131        return theList.containsKey(o);
132    }
133
134
135    /**
136     * Contains value.
137     */
138    @Override
139    public boolean containsValue(Object o) {
140        return theList.containsValue(o);
141    }
142
143
144    /**
145     * Get the values as Collection.
146     */
147    @Override
148    public Collection<V> values() {
149        synchronized (theList) {
150            return new ArrayList<V>(theList.values());
151            //return theList.values();
152        }
153    }
154
155
156    /**
157     * Get the keys as set.
158     */
159    @Override
160    public Set<K> keySet() {
161        synchronized (theList) {
162            return theList.keySet();
163        }
164    }
165
166
167    /**
168     * Get the entries as Set.
169     */
170    @Override
171    public Set<Entry<K, V>> entrySet() {
172        synchronized (theList) {
173            return theList.entrySet();
174        }
175    }
176
177
178    /**
179     * Get the internal list, convert from Collection.
180     */
181    // To be fixed?, but is ok.
182    public List<V> getValueList() {
183        synchronized (theList) {
184            return new ArrayList<V>(theList.values());
185        }
186    }
187
188
189    /**
190     * Get the internal sorted map. For synchronization purpose in normalform.
191     */
192    public SortedMap<K, V> getList() {
193        return theList;
194    }
195
196
197    /**
198     * Size of the (local) list.
199     */
200    @Override
201    public int size() {
202        synchronized (theList) {
203            return theList.size();
204        }
205    }
206
207
208    /**
209     * Is the List empty?
210     */
211    @Override
212    public boolean isEmpty() {
213        synchronized (theList) {
214            return theList.isEmpty();
215        }
216    }
217
218
219    /**
220     * List key iterator.
221     */
222    public Iterator<K> iterator() {
223        synchronized (theList) {
224            return theList.keySet().iterator();
225        }
226    }
227
228
229    /**
230     * List value iterator.
231     */
232    public Iterator<V> valueIterator() {
233        synchronized (theList) {
234            return theList.values().iterator();
235        }
236    }
237
238
239    /**
240     * Put object to the distributed hash table. Blocks until the key value pair
241     * is send and received from the server.
242     * @param key
243     * @param value
244     */
245    public void putWait(K key, V value) {
246        //V o = 
247        put(key, value); // = send
248        // assume key does not change multiple times before test:
249        while (!value.equals(getWait(key))) {
250            //System.out.print("#");
251        }
252    }
253
254
255    /**
256     * Put object to the distributed hash table. Returns immediately after
257     * sending, does not block.
258     * @param key
259     * @param value
260     */
261    @Override
262    public V put(K key, V value) {
263        if (key == null || value == null) {
264            throw new NullPointerException("null keys or values not allowed");
265        }
266        try {
267            DHTTransport<K, V> tc = DHTTransport.<K, V> create(key, value);
268            channel.send(tc);
269            //System.out.println("send: "+tc+" @ "+listener);
270        } catch (IOException e) {
271            logger.info("send, exception {}", e);
272            e.printStackTrace();
273        } catch (Exception e) {
274            logger.info("send, exception {}", e);
275            e.printStackTrace();
276        }
277        return null;
278    }
279
280
281    /**
282     * Get value under key from DHT. Blocks until the object is send and
283     * received from the server (actually it blocks until some value under key
284     * is received).
285     * @param key
286     * @return the value stored under the key.
287     */
288    public V getWait(K key) {
289        V value = null;
290        try {
291            synchronized (theList) {
292                //value = theList.get(key);
293                value = get(key);
294                while (value == null) {
295                    //System.out.print("^");
296                    theList.wait(100);
297                    value = theList.get(key);
298                }
299            }
300        } catch (InterruptedException e) {
301            Thread.currentThread().interrupt();
302            e.printStackTrace();
303        }
304        return value;
305    }
306
307
308    /**
309     * Get value under key from DHT. If no value is jet available null is
310     * returned.
311     * @param key
312     * @return the value stored under the key.
313     */
314    @Override
315    public V get(Object key) {
316        synchronized (theList) {
317            return theList.get(key);
318        }
319    }
320
321
322    /**
323     * Clear the List. 
324     * Clearance request is distributed to all clients.
325     */
326    @Override
327    public void clear() {
328        synchronized (theList) {
329            theList.clear();
330        }
331        // done after 11 month: send clear message to others
332        try {
333            DHTTransport<K, V> tc = new DHTTransportClear<K, V>();
334            channel.send(tc);
335            //System.out.println("send: "+tc+" @ "+listener);
336        } catch (IOException e) {
337            logger.info("send, exception {}", e);
338            e.printStackTrace();
339        } catch (Exception e) {
340            logger.info("send, exception {}", e);
341            e.printStackTrace();
342        }
343    }
344
345
346    /**
347     * Initialize and start the list thread.
348     */
349    public void init() {
350        if (listener == null) {
351            return;
352        }
353        if (listener.isDone()) {
354            return;
355        }
356        if (debug) {
357            logger.debug("initialize {}", listener);
358        }
359        synchronized (theList) {
360            listener.start();
361        }
362    }
363
364
365    /**
366     * Terminate the list thread.
367     */
368    public void terminate() {
369        if (cf != null) {
370            cf.terminate();
371        }
372        if (channel != null) {
373            channel.close();
374        }
375        //theList.clear();
376        if (listener == null) {
377            return;
378        }
379        if (debug) {
380            logger.debug("terminate {}", listener);
381        }
382        listener.setDone();
383        try {
384            while (listener.isAlive()) {
385                //System.out.print("+");
386                listener.interrupt();
387                listener.join(100);
388            }
389        } catch (InterruptedException e) {
390            Thread.currentThread().interrupt();
391        }
392        listener = null;
393    }
394
395}
396
397
398/**
399 * Thread to communicate with the list server.
400 */
401class DHTListener<K, V> extends Thread {
402
403
404    private static final Logger logger = LogManager.getLogger(DHTListener.class);
405
406
407    private static final boolean debug = logger.isDebugEnabled();
408
409
410    private final SocketChannel channel;
411
412
413    private final SortedMap<K, V> theList;
414
415
416    private boolean goon;
417
418
419    DHTListener(SocketChannel s, SortedMap<K, V> list) {
420        channel = s;
421        theList = list;
422        goon = true;
423    }
424
425
426    boolean isDone() {
427        return !goon;
428    }
429
430
431    void setDone() {
432        goon = false;
433    }
434
435
436    /**
437     * run.
438     */
439    @SuppressWarnings("unchecked")
440    @Override
441    public void run() {
442        logger.debug("running ");
443        Object o;
444        DHTTransport<K, V> tc;
445        //goon = true;
446        while (goon) {
447            tc = null;
448            o = null;
449            try {
450                o = channel.receive();
451                if (debug) {
452                    logger.debug("receive({})", o);
453                }
454                if (this.isInterrupted()) {
455                    goon = false;
456                    break;
457                }
458                if (o == null) {
459                    goon = false;
460                    break;
461                }
462                if (o instanceof DHTTransportClear) {
463                    logger.debug("receive, clear");
464                    synchronized (theList) {
465                        theList.clear();
466                        theList.notifyAll();
467                    }
468                    continue;
469                }
470                if (o instanceof DHTTransport) {
471                    tc = (DHTTransport<K, V>) o;
472                    K key = tc.key();
473                    if (key != null) {
474                        logger.info("receive, put(key={})", key);
475                        V val = tc.value();
476                        synchronized (theList) {
477                            theList.put(key, val);
478                            theList.notifyAll();
479                        }
480                    }
481                }
482            } catch (IOException e) {
483                goon = false;
484                logger.info("receive, IO exception {}", e);
485                //e.printStackTrace();
486            } catch (ClassNotFoundException e) {
487                goon = false;
488                logger.info("receive, CNF exception {}", e);
489                e.printStackTrace();
490            } catch (Exception e) {
491                goon = false;
492                logger.info("receive, exception {}", e);
493                e.printStackTrace();
494            }
495        }
496    }
497
498}