001    /*
002     * $Id: DistHashTableServer.java 3296 2010-08-26 17:30:55Z kredel $
003     */
004    
005    package edu.jas.util;
006    
007    
008    import java.io.IOException;
009    import java.util.ArrayList;
010    import java.util.Iterator;
011    import java.util.List;
012    import java.util.SortedMap;
013    import java.util.TreeMap;
014    import java.util.Map.Entry;
015    
016    import org.apache.log4j.Logger;
017    
018    
019    /**
020     * Server for the distributed version of a list.
021     * @author Heinz Kredel
022     * @todo redistribute list for late comming clients, removal of elements.
023     */
024    
025    public class DistHashTableServer<K> extends Thread {
026    
027    
028        private static final Logger logger = Logger.getLogger(DistHashTableServer.class);
029    
030    
031        public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99;
032    
033    
034        protected final ChannelFactory cf;
035    
036    
037        protected List<DHTBroadcaster<K>> servers;
038    
039    
040        private boolean goon = true;
041    
042    
043        private Thread mythread = null;
044    
045    
046        protected final SortedMap<K, DHTTransport> theList;
047    
048    
049        private long etime;
050    
051    
052        private long dtime;
053    
054    
055        private long ertime;
056    
057    
058        private long drtime;
059    
060    
061        /**
062         * Constructs a new DistHashTableServer.
063         */
064        public DistHashTableServer() {
065            this(DEFAULT_PORT);
066        }
067    
068    
069        /**
070         * DistHashTableServer.
071         * @param port to run server on.
072         */
073        public DistHashTableServer(int port) {
074            this(new ChannelFactory(port));
075        }
076    
077    
078        /**
079         * DistHashTableServer.
080         * @param cf ChannelFactory to use.
081         */
082        public DistHashTableServer(ChannelFactory cf) {
083            this.cf = cf;
084            cf.init();
085            servers = new ArrayList<DHTBroadcaster<K>>();
086            theList = new TreeMap<K, DHTTransport>();
087            etime = DHTTransport.etime;
088            dtime = DHTTransport.dtime;
089            ertime = DHTTransport.ertime;
090            drtime = DHTTransport.drtime;
091        }
092    
093    
094        /**
095         * main. Usage: DistHashTableServer &lt;port&gt;
096         */
097        public static void main(String[] args) {
098            int port = DEFAULT_PORT;
099            if (args.length < 1) {
100                System.out.println("Usage: DistHashTableServer <port>");
101            } else {
102                try {
103                    port = Integer.parseInt(args[0]);
104                } catch (NumberFormatException e) {
105                }
106            }
107            (new DistHashTableServer/*raw: <K>*/(port)).run();
108            // until CRTL-C
109        }
110    
111    
112        /**
113         * thread initialization and start.
114         */
115        public void init() {
116            this.start();
117        }
118    
119    
120        /**
121         * main server method.
122         */
123        @Override
124        public void run() {
125            SocketChannel channel = null;
126            DHTBroadcaster<K> s = null;
127            mythread = Thread.currentThread();
128            Entry<K, DHTTransport> e;
129            K n;
130            DHTTransport tc;
131            while (goon) {
132                //logger.debug("list server " + this + " go on");
133                try {
134                    channel = cf.getChannel();
135                    if (logger.isDebugEnabled()) {
136                        logger.debug("dls channel = " + channel);
137                    }
138                    if (mythread.isInterrupted()) {
139                        goon = false;
140                        //logger.info("list server " + this + " interrupted");
141                    } else {
142                        s = new DHTBroadcaster<K>(channel, servers,/*listElem,*/theList);
143                        int ls = 0;
144                        synchronized (servers) {
145                            if (goon) {
146                                servers.add(s);
147                                ls = theList.size();
148                                s.start();
149                            }
150                        }
151                        if (logger.isInfoEnabled()) {
152                            logger.info("server " + s + " started " + s.isAlive());
153                        }
154                        if (ls > 0) {
155                            //logger.debug("sending " + ls + " list elements");
156                            synchronized (theList) {
157                                Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator();
158                                for (int i = 0; i < ls; i++) {
159                                    e = it.next();
160                                    n = e.getKey();
161                                    tc = e.getValue();
162                                    //DHTTransport tc = (DHTTransport) o;                             
163                                    try {
164                                        s.sendChannel(tc);
165                                    } catch (IOException ioe) {
166                                        // stop s
167                                    }
168                                }
169                            }
170                        }
171                    }
172                } catch (InterruptedException end) {
173                    goon = false;
174                    Thread.currentThread().interrupt();
175                }
176            }
177            if (logger.isDebugEnabled()) {
178                logger.debug("listserver " + this + " terminated");
179            }
180        }
181    
182    
183        /**
184         * terminate all servers.
185         */
186        public void terminate() {
187            goon = false;
188            logger.debug("terminating ListServer");
189            if (cf != null) {
190                cf.terminate();
191            }
192            int svs = 0;
193            if (servers != null) {
194                synchronized (servers) {
195                    svs = servers.size();
196                    Iterator<DHTBroadcaster<K>> it = servers.iterator();
197                    while (it.hasNext()) {
198                        DHTBroadcaster<K> br = it.next();
199                        br.closeChannel();
200                        try {
201                            int c = 0;
202                            while (br.isAlive()) {
203                                c++;
204                                if (c > 10) {
205                                    logger.warn("giving up on " + br);
206                                    break;
207                                }
208                                //System.out.print(".");
209                                br.interrupt();
210                                br.join(100);
211                            }
212                            if (logger.isDebugEnabled()) {
213                                logger.debug("server " + br + " terminated");
214                            }
215                        } catch (InterruptedException e) {
216                            Thread.currentThread().interrupt();
217                        }
218                    }
219                    servers.clear();
220                }
221                logger.info(svs + " broadcasters terminated");
222                //? servers = null;
223            }
224            logger.debug("DHTBroadcasters terminated");
225            long enc = DHTTransport.etime - etime;
226            long dec = DHTTransport.dtime - dtime;
227            long encr = DHTTransport.ertime - ertime;
228            long decr = DHTTransport.drtime - drtime;
229            long drest = (encr * dec) / (enc + 1);
230            logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr
231                    + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = "
232                    + (enc + dec + encr + drest)); // +decr not meaningful
233            if (mythread == null) {
234                return;
235            }
236            try {
237                while (mythread.isAlive()) {
238                    //System.out.print("-");
239                    mythread.interrupt();
240                    mythread.join(100);
241                }
242                if (logger.isDebugEnabled()) {
243                    logger.debug("server terminated " + mythread);
244                }
245            } catch (InterruptedException e) {
246                Thread.currentThread().interrupt();
247            }
248            mythread = null;
249            logger.debug("ListServer terminated");
250        }
251    
252    
253        /**
254         * number of servers.
255         */
256        public int size() {
257            synchronized (servers) {
258                return servers.size();
259            }
260        }
261    
262    
263        /**
264         * toString.
265         * @return a string representation of this.
266         */
267        @Override
268        public String toString() {
269            return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")";
270        }
271    
272    }
273    
274    
275    /**
276     * Thread for broadcasting all incoming objects to the list clients.
277     */
278    
279    class DHTBroadcaster<K> extends Thread /*implements Runnable*/{
280    
281    
282        private static final Logger logger = Logger.getLogger(DHTBroadcaster.class);
283    
284    
285        private final SocketChannel channel;
286    
287    
288        private final List<DHTBroadcaster<K>> bcaster;
289    
290    
291        private final SortedMap<K, DHTTransport> theList;
292    
293    
294        /**
295         * DHTBroadcaster.
296         * @param s SocketChannel to use.
297         * @param bc list of broadcasters.
298         * @param le DHTCounter.
299         * @param sm SortedMap with key value pairs.
300         */
301        public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) {
302            channel = s;
303            bcaster = bc;
304            theList = sm;
305        }
306    
307    
308        /**
309         * closeChannel.
310         */
311        public void closeChannel() {
312            channel.close();
313        }
314    
315    
316        /**
317         * sendChannel.
318         * @param tc DHTTransport.
319         * @throws IOException
320         */
321        public void sendChannel(DHTTransport tc) throws IOException {
322            channel.send(tc);
323        }
324    
325    
326        /**
327         * broadcast.
328         * @param o DHTTransport element to broadcast.
329         */
330        @SuppressWarnings("unchecked")
331        public void broadcast(DHTTransport o) {
332            if (logger.isDebugEnabled()) {
333                logger.debug("broadcast = " + o);
334            }
335            DHTTransport<K, Object> tc = null;
336            if (o == null) {
337                return;
338            }
339            //if ( ! (o instanceof DHTTransport) ) {
340            //   return;
341            //}
342            tc = (DHTTransport<K, Object>) o;
343            K key = null;
344            synchronized (theList) {
345                //test
346                //Object x = theList.get( tc.key );
347                //if ( x != null ) {
348                //   logger.info("theList duplicate key " + tc.key );
349                //}
350                try {
351                    key = tc.key();
352                    theList.put(key, tc);
353                } catch (IOException e) {
354                    logger.warn("IO exception: tc.key() not ok " + tc);
355                    e.printStackTrace();
356                } catch (ClassNotFoundException e) {
357                    logger.warn("CNF exception: tc.key() not ok " + tc);
358                    e.printStackTrace();
359                } catch (Exception e) {
360                    logger.warn("exception:tc.key() not ok " + tc);
361                    e.printStackTrace();
362                }
363            }
364            logger.info("sending key=" + key + " to " + bcaster.size() + " nodes");
365            synchronized (bcaster) {
366                Iterator<DHTBroadcaster<K>> it = bcaster.iterator();
367                while (it.hasNext()) {
368                    DHTBroadcaster<K> br = it.next();
369                    try {
370                        if (logger.isDebugEnabled()) {
371                            logger.debug("bcasting to " + br);
372                        }
373                        br.sendChannel(tc);
374                    } catch (IOException e) {
375                        logger.info("bcaster, exception " + e);
376                        try {
377                            br.closeChannel();
378                            while (br.isAlive()) {
379                                br.interrupt();
380                                br.join(100);
381                            }
382                        } catch (InterruptedException w) {
383                            Thread.currentThread().interrupt();
384                        }
385                        it.remove( /*br*/); //ConcurrentModificationException
386                        logger.debug("bcaster.remove() " + br);
387                    } catch (Exception e) {
388                        logger.info("bcaster, exception " + e);
389                    }
390                }
391            }
392        }
393    
394    
395        /**
396         * run.
397         */
398        @Override
399        public void run() {
400            boolean goon = true;
401            while (goon) {
402                try {
403                    logger.debug("trying to receive");
404                    Object o = channel.receive();
405                    if (this.isInterrupted()) {
406                        break;
407                    }
408                    if (logger.isDebugEnabled()) {
409                        logger.debug("received = " + o);
410                    }
411                    if (!(o instanceof DHTTransport)) {
412                        logger.warn("swallowed: " + o);
413                        continue;
414                    }
415                    DHTTransport tc = (DHTTransport) o;
416                    broadcast(tc);
417                    if (this.isInterrupted()) {
418                        goon = false;
419                    }
420                } catch (IOException e) {
421                    goon = false;
422                    logger.info("receive, IO exception " + e);
423                    //e.printStackTrace();
424                } catch (ClassNotFoundException e) {
425                    goon = false;
426                    logger.info("receive, CNF exception " + e);
427                    e.printStackTrace();
428                } catch (Exception e) {
429                    goon = false;
430                    logger.info("receive, exception " + e);
431                    e.printStackTrace();
432                }
433            }
434            if (logger.isDebugEnabled()) {
435                logger.debug("DHTBroadcaster terminated " + this);
436            }
437            channel.close();
438        }
439    
440    
441        /**
442         * toString.
443         * @return a string representation of this.
444         */
445        @Override
446        public String toString() {
447            return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")";
448        }
449    
450    }