001/*
002 * $Id$
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Iterator;
011import java.util.List;
012import java.util.Map.Entry;
013import java.util.SortedMap;
014import java.util.TreeMap;
015
016import org.apache.logging.log4j.LogManager;
017import org.apache.logging.log4j.Logger;
018
019
020/**
021 * Server for the distributed version of a list. TODO: redistribute list for
022 * late coming clients, removal of elements.
023 * @author Heinz Kredel
024 */
025
026public class DistHashTableServer<K> extends Thread {
027
028
029    private static final Logger logger = LogManager.getLogger(DistHashTableServer.class);
030
031
032    private static final boolean debug = logger.isDebugEnabled();
033
034
035    public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99;
036
037
038    protected final ChannelFactory cf;
039
040
041    protected List<DHTBroadcaster<K>> servers;
042
043
044    private volatile boolean goon = true;
045
046
047    private volatile Thread mythread = null;
048
049
050    protected final SortedMap<K, DHTTransport> theList;
051
052
053    private long etime;
054
055
056    private long dtime;
057
058
059    private long ertime;
060
061
062    private long drtime;
063
064
065    /**
066     * Constructs a new DistHashTableServer.
067     */
068    public DistHashTableServer() {
069        this(DEFAULT_PORT);
070    }
071
072
073    /**
074     * DistHashTableServer.
075     * @param port to run server on.
076     */
077    public DistHashTableServer(int port) {
078        this(new ChannelFactory(port));
079    }
080
081
082    /**
083     * DistHashTableServer.
084     * @param cf ChannelFactory to use.
085     */
086    public DistHashTableServer(ChannelFactory cf) {
087        this.cf = cf;
088        cf.init();
089        servers = new ArrayList<DHTBroadcaster<K>>();
090        theList = new TreeMap<K, DHTTransport>();
091        etime = DHTTransport.etime;
092        dtime = DHTTransport.dtime;
093        ertime = DHTTransport.ertime;
094        drtime = DHTTransport.drtime;
095    }
096
097
098    /**
099     * main. Usage: DistHashTableServer &lt;port&gt;
100     */
101    public static void main(String[] args) throws InterruptedException {
102        int port = DEFAULT_PORT;
103        if (args.length < 1) {
104            System.out.println("Usage: DistHashTableServer <port>");
105        } else {
106            try {
107                port = Integer.parseInt(args[0]);
108            } catch (NumberFormatException e) {
109            }
110        }
111        DistHashTableServer dhts = new DistHashTableServer/*raw: <K>*/(port);
112        dhts.init();
113        dhts.join();
114        // until CRTL-C
115    }
116
117
118    /**
119     * thread initialization and start.
120     */
121    public void init() {
122        this.start();
123    }
124
125
126    /**
127     * main server method.
128     */
129    @Override
130    public void run() {
131        SocketChannel channel = null;
132        DHTBroadcaster<K> s = null;
133        mythread = Thread.currentThread();
134        Entry<K, DHTTransport> e;
135        DHTTransport tc;
136        while (goon) {
137            //logger.debug("list server {} go on", this);
138            try {
139                channel = cf.getChannel();
140                if (debug) {
141                    logger.debug("dls channel = {}", channel);
142                }
143                if (mythread.isInterrupted()) {
144                    goon = false;
145                    //logger.info("list server {} interrupted", this);
146                } else {
147                    s = new DHTBroadcaster<K>(channel, servers, /*listElem,*/theList);
148                    int ls = 0;
149                    synchronized (servers) {
150                        if (goon) {
151                            servers.add(s);
152                            ls = theList.size();
153                            s.start();
154                        }
155                    }
156                    if (debug) {
157                        logger.info("server {} started {}", s, s.isAlive());
158                    }
159                    if (ls > 0) {
160                        //logger.debug("sending {} list elements", ls);
161                        synchronized (theList) {
162                            Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator();
163                            for (int i = 0; i < ls; i++) {
164                                e = it.next();
165                                // n = e.getKey(); // findbugs, already in tc
166                                tc = e.getValue();
167                                //DHTTransport tc = (DHTTransport) o;                             
168                                try {
169                                    s.sendChannel(tc);
170                                } catch (IOException ioe) {
171                                    // stop s
172                                }
173                            }
174                        }
175                    }
176                }
177            } catch (InterruptedException end) {
178                goon = false;
179                Thread.currentThread().interrupt();
180            }
181        }
182        if (debug) {
183            logger.info("DHTserver {} terminated", this);
184        }
185    }
186
187
188    /**
189     * terminate all servers.
190     */
191    public void terminate() {
192        goon = false;
193        logger.info("terminating");
194        if (cf != null) {
195            cf.terminate();
196        }
197        int svs = 0;
198        List<DHTBroadcaster<K>> scopy = null;
199        if (servers != null) {
200            synchronized (servers) {
201                svs = servers.size();
202                scopy = new ArrayList<DHTBroadcaster<K>>(servers);
203                Iterator<DHTBroadcaster<K>> it = scopy.iterator();
204                while (it.hasNext()) {
205                    DHTBroadcaster<K> br = it.next();
206                    br.goon = false;
207                    br.closeChannel();
208                    try {
209                        int c = 0;
210                        while (br.isAlive()) {
211                            c++;
212                            if (c > 10) {
213                                logger.warn("giving up on {}", br);
214                                break;
215                            }
216                            //System.out.print(".");
217                            br.interrupt();
218                            br.join(50);
219                        }
220                        if (debug) {
221                            logger.info("server {} terminated", br);
222                        }
223                        // now possible: 
224                        servers.remove(br);
225                    } catch (InterruptedException e) {
226                        Thread.currentThread().interrupt();
227                    }
228                }
229                servers.clear();
230            }
231            logger.info("{} broadcasters terminated {}", svs, scopy);
232            //? servers = null;
233        }
234        logger.debug("DHTBroadcasters terminated");
235        long enc = DHTTransport.etime - etime;
236        long dec = DHTTransport.dtime - dtime;
237        long encr = DHTTransport.ertime - ertime;
238        long decr = DHTTransport.drtime - drtime;
239        long drest = (encr * dec) / (enc + 1);
240        long sumest = enc + dec + encr + drest; // +decr not meaningful
241        logger.info("DHT time: encode = {}, decode = {}, enc raw = {}, dec raw wait = {}, dec raw est = {}, sum est = {}", enc, dec, encr, decr, drest, sumest);
242        if (mythread == null) {
243            return;
244        }
245        try {
246            while (mythread.isAlive()) {
247                //System.out.print("-");
248                mythread.interrupt();
249                mythread.join(100);
250            }
251            logger.warn("server terminated {}", mythread);
252        } catch (InterruptedException e) {
253            Thread.currentThread().interrupt();
254        }
255        mythread = null;
256        logger.info("terminated");
257    }
258
259
260    /**
261     * number of servers.
262     */
263    public int size() {
264        if (servers == null) {
265            return -1;
266        }
267        //synchronized (servers) removed
268        return servers.size();
269    }
270
271
272    /**
273     * toString.
274     * @return a string representation of this.
275     */
276    @Override
277    public String toString() {
278        return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")";
279    }
280
281}
282
283
284/**
285 * Thread for broadcasting all incoming objects to the list clients.
286 */
287class DHTBroadcaster<K> extends Thread /*implements Runnable*/ {
288
289
290    private static final Logger logger = LogManager.getLogger(DHTBroadcaster.class);
291
292
293    private static final boolean debug = logger.isDebugEnabled();
294
295
296    private final SocketChannel channel;
297
298
299    private final List<DHTBroadcaster<K>> bcaster;
300
301
302    private final SortedMap<K, DHTTransport> theList;
303
304
305    volatile boolean goon = true;
306
307
308    /**
309     * DHTBroadcaster.
310     * @param s SocketChannel to use.
311     * @param bc list of broadcasters.
312     * @param sm SortedMap with key value pairs.
313     */
314    public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) {
315        channel = s;
316        bcaster = bc;
317        theList = sm;
318    }
319
320
321    /**
322     * closeChannel.
323     */
324    public void closeChannel() {
325        channel.close();
326    }
327
328
329    /**
330     * sendChannel.
331     * @param tc DHTTransport.
332     * @throws IOException
333     */
334    public void sendChannel(DHTTransport tc) throws IOException {
335        if (goon) {
336            channel.send(tc);
337        }
338    }
339
340
341    /**
342     * broadcast.
343     * @param o DHTTransport element to broadcast.
344     */
345    @SuppressWarnings({ "unchecked", "cast" })
346    public void broadcast(DHTTransport o) {
347        if (debug) {
348            logger.debug("broadcast = {}", o);
349        }
350        DHTTransport<K, Object> tc = null;
351        if (o == null) {
352            return;
353        }
354        //if ( ! (o instanceof DHTTransport) ) {
355        //   return;
356        //}
357        tc = (DHTTransport<K, Object>) o;
358        K key = null;
359        synchronized (theList) {
360            //test
361            //Object x = theList.get( tc.key );
362            //if ( x != null ) {
363            //   logger.info("theList duplicate key {}", tc.key );
364            //}
365            try {
366                if (!(o instanceof DHTTransportClear)) {
367                    key = tc.key();
368                    theList.put(key, tc);
369                }
370            } catch (IOException e) {
371                logger.warn("IO exception: tc.key() not ok {}", tc);
372                e.printStackTrace();
373            } catch (ClassNotFoundException e) {
374                logger.warn("CNF exception: tc.key() not ok {}", tc);
375                e.printStackTrace();
376            } catch (Exception e) {
377                logger.warn("exception: tc.key() not ok {}", tc);
378                e.printStackTrace();
379            }
380        }
381        logger.info("sending key={} to {} nodes", key, bcaster.size());
382        List<DHTBroadcaster<K>> bccopy = null;
383        synchronized (bcaster) {
384            bccopy = new ArrayList<DHTBroadcaster<K>>(bcaster);
385        }
386        Iterator<DHTBroadcaster<K>> it = bccopy.iterator();
387        while (it.hasNext()) {
388            DHTBroadcaster<K> br = it.next();
389            try {
390                if (debug) {
391                    logger.debug("bcasting to {}", br);
392                }
393                br.sendChannel(tc);
394            } catch (IOException e) {
395                logger.info("bcaster, IOexception {}", e);
396                synchronized (bcaster) {
397                    bcaster.remove(br); //no more: ConcurrentModificationException
398                }
399                try {
400                    br.goon = false;
401                    br.closeChannel();
402                    while (br.isAlive()) {
403                        br.interrupt();
404                        br.join(100);
405                    }
406                } catch (InterruptedException w) {
407                    Thread.currentThread().interrupt();
408                }
409                //
410                logger.info("bcaster.remove() {}", br);
411            } catch (Exception e) {
412                logger.info("bcaster, exception {}", e);
413            }
414        }
415    }
416
417
418    /**
419     * run.
420     */
421    @Override
422    public void run() {
423        goon = true;
424        while (goon) {
425            try {
426                logger.debug("trying to receive");
427                Object o = channel.receive();
428                if (this.isInterrupted()) {
429                    goon = false;
430                    break;
431                }
432                if (debug) {
433                    logger.debug("received = {}", o);
434                }
435                if (!(o instanceof DHTTransport)) {
436                    logger.warn("wrong object type: {}", o);
437                    goon = false;
438                    break; //continue;
439                }
440                if (o instanceof DHTTransportClear) {
441                    logger.info("receive, clear");
442                    synchronized (theList) {
443                        theList.clear();
444                        theList.notifyAll();
445                    }
446                }
447                DHTTransport tc = (DHTTransport) o;
448                broadcast(tc);
449                if (this.isInterrupted()) {
450                    goon = false;
451                }
452            } catch (IOException e) {
453                goon = false;
454                logger.info("receive, IO exception {}", e);
455                //e.printStackTrace();
456            } catch (ClassNotFoundException e) {
457                goon = false;
458                logger.info("receive, CNF exception {}", e);
459                e.printStackTrace();
460            } catch (Exception e) {
461                goon = false;
462                logger.info("receive, exception {}", e);
463                e.printStackTrace();
464            }
465        }
466        logger.info("ending {}", this);
467        synchronized (bcaster) {
468            bcaster.remove(this);
469        }
470        channel.close();
471    }
472
473
474    /**
475     * toString.
476     * @return a string representation of this.
477     */
478    @Override
479    public String toString() {
480        return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")";
481    }
482
483}