001    /*
002     * $Id: DistHashTable.java 3279 2010-08-21 20:18:25Z kredel $
003     */
004    
005    package edu.jas.util;
006    
007    
008    import java.io.IOException;
009    import java.util.AbstractMap;
010    import java.util.ArrayList;
011    import java.util.Collection;
012    import java.util.Iterator;
013    import java.util.List;
014    import java.util.Set;
015    import java.util.SortedMap;
016    import java.util.TreeMap;
017    
018    import org.apache.log4j.Logger;
019    
020    
021    /**
022     * Distributed version of a HashTable. Implemented with a SortedMap / TreeMap to
023     * keep the sequence order of elements.
024     * @author Heinz Kredel
025     */
026    
027    public class DistHashTable<K, V> extends AbstractMap<K, V> /* implements Map<K,V> */{
028    
029    
030        private static final Logger logger = Logger.getLogger(DistHashTable.class);
031    
032    
033        protected final SortedMap<K, V> theList;
034    
035    
036        protected final ChannelFactory cf;
037    
038    
039        protected SocketChannel channel = null;
040    
041    
042        protected DHTListener<K, V> listener = null;
043    
044    
045        /**
046         * Constructs a new DistHashTable.
047         * @param host name or IP of server host.
048         */
049        public DistHashTable(String host) {
050            this(host, DistHashTableServer.DEFAULT_PORT);
051        }
052    
053    
054        /**
055         * DistHashTable.
056         * @param host name or IP of server host.
057         * @param port on server.
058         */
059        public DistHashTable(String host, int port) {
060            this(new ChannelFactory(port + 1), host, port);
061        }
062    
063    
064        /**
065         * DistHashTable.
066         * @param cf ChannelFactory to use.
067         * @param host name or IP of server host.
068         * @param port on server.
069         */
070        public DistHashTable(ChannelFactory cf, String host, int port) {
071            this.cf = cf;
072            cf.init();
073            try {
074                channel = cf.getChannel(host, port);
075            } catch (IOException e) {
076                e.printStackTrace();
077            }
078            if (logger.isDebugEnabled()) {
079                logger.debug("dl channel = " + channel);
080            }
081            theList = new TreeMap<K, V>();
082            listener = new DHTListener<K, V>(channel, theList);
083            synchronized (theList) {
084                listener.start();
085            }
086        }
087    
088    
089        /**
090         * DistHashTable.
091         * @param sc SocketChannel to use.
092         */
093        public DistHashTable(SocketChannel sc) {
094            cf = null;
095            channel = sc;
096            theList = new TreeMap<K, V>();
097            listener = new DHTListener<K, V>(channel, theList);
098            synchronized (theList) {
099                listener.start();
100            }
101        }
102    
103    
104        /**
105         * Hash code.
106         */
107        @Override
108        public int hashCode() {
109            return theList.hashCode();
110        }
111    
112    
113        /**
114         * Equals.
115         */
116        @Override
117        public boolean equals(Object o) {
118            return theList.equals(o);
119        }
120    
121    
122        /**
123         * Contains key.
124         */
125        @Override
126        public boolean containsKey(Object o) {
127            return theList.containsKey(o);
128        }
129    
130    
131        /**
132         * Contains value.
133         */
134        @Override
135        public boolean containsValue(Object o) {
136            return theList.containsValue(o);
137        }
138    
139    
140        /**
141         * Get the values as Collection.
142         */
143        @Override
144        public Collection<V> values() {
145            return theList.values();
146        }
147    
148    
149        /**
150         * Get the keys as set.
151         */
152        @Override
153        public Set<K> keySet() {
154            return theList.keySet();
155        }
156    
157    
158        /**
159         * Get the entries as Set.
160         */
161        @Override
162        public Set<Entry<K, V>> entrySet() {
163            return theList.entrySet();
164        }
165    
166    
167        /**
168         * Get the internal list, convert from Collection.
169         * @fix but is ok
170         */
171        public List<V> getValueList() {
172            synchronized (theList) {
173                return new ArrayList<V>(theList.values());
174            }
175        }
176    
177    
178        /**
179         * Get the internal sorted map. For synchronization purpose in normalform.
180         */
181        public SortedMap<K, V> getList() {
182            return theList;
183        }
184    
185    
186        /**
187         * Size of the (local) list.
188         */
189        @Override
190        public int size() {
191            synchronized (theList) {
192                return theList.size();
193            }
194        }
195    
196    
197        /**
198         * Is the List empty?
199         */
200        @Override
201        public boolean isEmpty() {
202            synchronized (theList) {
203                return theList.isEmpty();
204            }
205        }
206    
207    
208        /**
209         * List key iterator.
210         */
211        public Iterator<K> iterator() {
212            synchronized (theList) {
213                return theList.keySet().iterator();
214            }
215        }
216    
217    
218        /**
219         * List value iterator.
220         */
221        public Iterator<V> valueIterator() {
222            synchronized (theList) {
223                return theList.values().iterator();
224            }
225        }
226    
227    
228        /**
229         * Put object to the distributed hash table. Blocks until the key value pair
230         * is send and received from the server.
231         * @param key
232         * @param value
233         */
234        public void putWait(K key, V value) {
235            put(key, value); // = send
236            try {
237                synchronized (theList) {
238                    while (!value.equals(theList.get(key))) {
239                        //System.out.print("#");
240                        theList.wait(100);
241                    }
242                }
243            } catch (InterruptedException e) {
244                Thread.currentThread().interrupt();
245                e.printStackTrace();
246            }
247        }
248    
249    
250        /**
251         * Put object to the distributed hash table. Returns immediately after
252         * sending does not block.
253         * @param key
254         * @param value
255         */
256        @Override
257        public V put(K key, V value) {
258            if (key == null || value == null) {
259                throw new NullPointerException("null keys or values not allowed");
260            }
261            try {
262                DHTTransport<K,V> tc = DHTTransport.<K,V> create(key, value);
263                channel.send(tc);
264                //System.out.println("send: "+tc+" @ "+listener);
265            } catch (IOException e) {
266                logger.info("send, exception " + e);
267                e.printStackTrace();
268            } catch (Exception e) {
269                logger.info("send, exception " + e);
270                e.printStackTrace();
271            }
272            return null;
273        }
274    
275    
276        /**
277         * Get value under key from DHT. Blocks until the object is send and
278         * received from the server (actually it blocks until some value under key
279         * is received).
280         * @param key
281         * @return the value stored under the key.
282         */
283        public V getWait(K key) {
284            V value = null;
285            try {
286                synchronized (theList) {
287                    //value = theList.get(key);
288                    value = get(key);
289                    while (value == null) {
290                        //System.out.print("^");
291                        theList.wait(100);
292                        value = theList.get(key);
293                    }
294                }
295            } catch (InterruptedException e) {
296                Thread.currentThread().interrupt();
297                e.printStackTrace();
298            }
299            return value;
300        }
301    
302    
303        /**
304         * Get value under key from DHT. If no value is jet available null is
305         * returned.
306         * @param key
307         * @return the value stored under the key.
308         */
309        @Override
310        public V get(Object key) {
311            synchronized (theList) {
312                return theList.get(key);
313            }
314        }
315    
316    
317        /**
318         * Clear the List. Caveat: must be called on all clients.
319         */
320        @Override
321        public void clear() {
322            // send clear message to others
323            synchronized (theList) {
324                theList.clear();
325            }
326        }
327    
328    
329        /**
330         * Terminate the list thread.
331         */
332        public void terminate() {
333            if (cf != null) {
334                cf.terminate();
335            }
336            if (channel != null) {
337                channel.close();
338            }
339            //theList.clear();
340            if (listener == null) {
341                return;
342            }
343            if (logger.isDebugEnabled()) {
344                logger.debug("terminate " + listener);
345            }
346            listener.setDone();
347            try {
348                while (listener.isAlive()) {
349                    //System.out.print("+");
350                    listener.interrupt();
351                    listener.join(100);
352                }
353            } catch (InterruptedException e) {
354                Thread.currentThread().interrupt();
355            }
356            listener = null;
357        }
358    
359    }
360    
361    
362    /**
363     * Thread to communicate with the list server.
364     */
365    
366    class DHTListener<K, V> extends Thread {
367    
368    
369        private static final Logger logger = Logger.getLogger(DHTListener.class);
370    
371    
372        private final SocketChannel channel;
373    
374    
375        private final SortedMap<K, V> theList;
376    
377    
378        private boolean goon;
379    
380    
381        DHTListener(SocketChannel s, SortedMap<K, V> list) {
382            channel = s;
383            theList = list;
384        }
385    
386    
387        void setDone() {
388            goon = false;
389        }
390    
391    
392        /**
393         * run.
394         */
395        @SuppressWarnings("unchecked")
396        @Override
397        public void run() {
398            Object o;
399            DHTTransport<K, V> tc;
400            goon = true;
401            while (goon) {
402                tc = null;
403                o = null;
404                try {
405                    o = channel.receive();
406                    if (logger.isDebugEnabled()) {
407                        logger.debug("receive(" + o + ")");
408                    }
409                    if (this.isInterrupted()) {
410                        goon = false;
411                        break;
412                    }
413                    if (o == null) {
414                        goon = false;
415                        break;
416                    }
417                    if (o instanceof DHTTransport) {
418                        tc = (DHTTransport<K, V>) o;
419                        K key = tc.key();
420                        if (key != null) {
421                            logger.info("receive, put(key=" + key + ")");
422                            V val = tc.value();
423                            synchronized (theList) {
424                                theList.put(key, val);
425                                theList.notifyAll();
426                            }
427                        }
428                    }
429                } catch (IOException e) {
430                    goon = false;
431                    logger.info("receive, IO exception " + e);
432                    //e.printStackTrace();
433                } catch (ClassNotFoundException e) {
434                    goon = false;
435                    logger.info("receive, CNF exception " + e);
436                    e.printStackTrace();
437                } catch (Exception e) {
438                    goon = false;
439                    logger.info("receive, exception " + e);
440                    e.printStackTrace();
441                }
442            }
443        }
444    
445    }