001/*
002 * $Id: DistHashTableServer.java 4962 2014-10-17 19:05:55Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Iterator;
011import java.util.List;
012import java.util.Map.Entry;
013import java.util.SortedMap;
014import java.util.TreeMap;
015
016import org.apache.log4j.Logger;
017
018
019/**
020 * Server for the distributed version of a list.
021 * @author Heinz Kredel TODO: redistribute list for late coming clients, removal
022 *         of elements.
023 */
024
025public class DistHashTableServer<K> extends Thread {
026
027
028    private static final Logger logger = Logger.getLogger(DistHashTableServer.class);
029
030
031    public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99;
032
033
034    protected final ChannelFactory cf;
035
036
037    protected List<DHTBroadcaster<K>> servers;
038
039
040    private boolean goon = true;
041
042
043    private Thread mythread = null;
044
045
046    protected final SortedMap<K, DHTTransport> theList;
047
048
049    private long etime;
050
051
052    private long dtime;
053
054
055    private long ertime;
056
057
058    private long drtime;
059
060
061    /**
062     * Constructs a new DistHashTableServer.
063     */
064    public DistHashTableServer() {
065        this(DEFAULT_PORT);
066    }
067
068
069    /**
070     * DistHashTableServer.
071     * @param port to run server on.
072     */
073    public DistHashTableServer(int port) {
074        this(new ChannelFactory(port));
075    }
076
077
078    /**
079     * DistHashTableServer.
080     * @param cf ChannelFactory to use.
081     */
082    public DistHashTableServer(ChannelFactory cf) {
083        this.cf = cf;
084        cf.init();
085        servers = new ArrayList<DHTBroadcaster<K>>();
086        theList = new TreeMap<K, DHTTransport>();
087        etime = DHTTransport.etime;
088        dtime = DHTTransport.dtime;
089        ertime = DHTTransport.ertime;
090        drtime = DHTTransport.drtime;
091    }
092
093
094    /**
095     * main. Usage: DistHashTableServer &lt;port&gt;
096     */
097    public static void main(String[] args) throws InterruptedException {
098        int port = DEFAULT_PORT;
099        if (args.length < 1) {
100            System.out.println("Usage: DistHashTableServer <port>");
101        } else {
102            try {
103                port = Integer.parseInt(args[0]);
104            } catch (NumberFormatException e) {
105            }
106        }
107        DistHashTableServer dhts = new DistHashTableServer/*raw: <K>*/(port);
108        dhts.init();
109        dhts.join();
110        // until CRTL-C
111    }
112
113
114    /**
115     * thread initialization and start.
116     */
117    public void init() {
118        this.start();
119    }
120
121
122    /**
123     * main server method.
124     */
125    @Override
126    public void run() {
127        SocketChannel channel = null;
128        DHTBroadcaster<K> s = null;
129        mythread = Thread.currentThread();
130        Entry<K, DHTTransport> e;
131        DHTTransport tc;
132        while (goon) {
133            //logger.debug("list server " + this + " go on");
134            try {
135                channel = cf.getChannel();
136                if (logger.isDebugEnabled()) {
137                    logger.debug("dls channel = " + channel);
138                }
139                if (mythread.isInterrupted()) {
140                    goon = false;
141                    //logger.info("list server " + this + " interrupted");
142                } else {
143                    s = new DHTBroadcaster<K>(channel, servers,/*listElem,*/theList);
144                    int ls = 0;
145                    synchronized (servers) {
146                        if (goon) {
147                            servers.add(s);
148                            ls = theList.size();
149                            s.start();
150                        }
151                    }
152                    if (logger.isDebugEnabled()) {
153                        logger.info("server " + s + " started " + s.isAlive());
154                    }
155                    if (ls > 0) {
156                        //logger.debug("sending " + ls + " list elements");
157                        synchronized (theList) {
158                            Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator();
159                            for (int i = 0; i < ls; i++) {
160                                e = it.next();
161                                // n = e.getKey(); // findbugs, already in tc
162                                tc = e.getValue();
163                                //DHTTransport tc = (DHTTransport) o;                             
164                                try {
165                                    s.sendChannel(tc);
166                                } catch (IOException ioe) {
167                                    // stop s
168                                }
169                            }
170                        }
171                    }
172                }
173            } catch (InterruptedException end) {
174                goon = false;
175                Thread.currentThread().interrupt();
176            }
177        }
178        if (logger.isDebugEnabled()) {
179            logger.info("DHTserver " + this + " terminated");
180        }
181    }
182
183
184    /**
185     * terminate all servers.
186     */
187    public void terminate() {
188        goon = false;
189        logger.debug("terminating");
190        if (cf != null) {
191            cf.terminate();
192        }
193        int svs = 0;
194        List<DHTBroadcaster<K>> scopy = null;
195        if (servers != null) {
196            synchronized (servers) {
197                svs = servers.size();
198                scopy = new ArrayList<DHTBroadcaster<K>>(servers);
199                Iterator<DHTBroadcaster<K>> it = scopy.iterator();
200                while (it.hasNext()) {
201                    DHTBroadcaster<K> br = it.next();
202                    br.goon = false;
203                    br.closeChannel();
204                    try {
205                        int c = 0;
206                        while (br.isAlive()) {
207                            c++;
208                            if (c > 10) {
209                                logger.warn("giving up on " + br);
210                                break;
211                            }
212                            //System.out.print(".");
213                            br.interrupt();
214                            br.join(100);
215                        }
216                        if (logger.isDebugEnabled()) {
217                            logger.info("server+ " + br + " terminated");
218                        }
219                        // now possible: 
220                        servers.remove(br);
221                    } catch (InterruptedException e) {
222                        Thread.currentThread().interrupt();
223                    }
224                }
225                servers.clear();
226            }
227            logger.info("" + svs + " broadcasters terminated " + scopy);
228            //? servers = null;
229        }
230        logger.debug("DHTBroadcasters terminated");
231        long enc = DHTTransport.etime - etime;
232        long dec = DHTTransport.dtime - dtime;
233        long encr = DHTTransport.ertime - ertime;
234        long decr = DHTTransport.drtime - drtime;
235        long drest = (encr * dec) / (enc + 1);
236        logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr
237                        + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = "
238                        + (enc + dec + encr + drest)); // +decr not meaningful
239        if (mythread == null) {
240            return;
241        }
242        try {
243            while (mythread.isAlive()) {
244                //System.out.print("-");
245                mythread.interrupt();
246                mythread.join(100);
247            }
248            if (logger.isDebugEnabled()) {
249                logger.debug("server terminated " + mythread);
250            }
251        } catch (InterruptedException e) {
252            Thread.currentThread().interrupt();
253        }
254        mythread = null;
255        logger.debug("terminated");
256    }
257
258
259    /**
260     * number of servers.
261     */
262    public int size() {
263        if (servers == null) {
264            return -1;
265        }
266        //synchronized (servers) removed
267        return servers.size();
268    }
269
270
271    /**
272     * toString.
273     * @return a string representation of this.
274     */
275    @Override
276    public String toString() {
277        return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")";
278    }
279
280}
281
282
283/**
284 * Thread for broadcasting all incoming objects to the list clients.
285 */
286class DHTBroadcaster<K> extends Thread /*implements Runnable*/{
287
288
289    private static final Logger logger = Logger.getLogger(DHTBroadcaster.class);
290
291
292    private final SocketChannel channel;
293
294
295    private final List<DHTBroadcaster<K>> bcaster;
296
297
298    private final SortedMap<K, DHTTransport> theList;
299
300
301    volatile boolean goon = true;
302
303
304    /**
305     * DHTBroadcaster.
306     * @param s SocketChannel to use.
307     * @param bc list of broadcasters.
308     * @param le DHTCounter.
309     * @param sm SortedMap with key value pairs.
310     */
311    public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) {
312        channel = s;
313        bcaster = bc;
314        theList = sm;
315    }
316
317
318    /**
319     * closeChannel.
320     */
321    public void closeChannel() {
322        channel.close();
323    }
324
325
326    /**
327     * sendChannel.
328     * @param tc DHTTransport.
329     * @throws IOException
330     */
331    public void sendChannel(DHTTransport tc) throws IOException {
332        if (goon) {
333            channel.send(tc);
334        }
335    }
336
337
338    /**
339     * broadcast.
340     * @param o DHTTransport element to broadcast.
341     */
342    @SuppressWarnings("cast")
343    public void broadcast(DHTTransport o) {
344        if (logger.isDebugEnabled()) {
345            logger.debug("broadcast = " + o);
346        }
347        DHTTransport<K, Object> tc = null;
348        if (o == null) {
349            return;
350        }
351        //if ( ! (o instanceof DHTTransport) ) {
352        //   return;
353        //}
354        tc = (DHTTransport<K, Object>) o;
355        K key = null;
356        synchronized (theList) {
357            //test
358            //Object x = theList.get( tc.key );
359            //if ( x != null ) {
360            //   logger.info("theList duplicate key " + tc.key );
361            //}
362            try {
363                if (!(o instanceof DHTTransportClear)) {
364                    key = tc.key();
365                    theList.put(key, tc);
366                }
367            } catch (IOException e) {
368                logger.warn("IO exception: tc.key() not ok " + tc);
369                e.printStackTrace();
370            } catch (ClassNotFoundException e) {
371                logger.warn("CNF exception: tc.key() not ok " + tc);
372                e.printStackTrace();
373            } catch (Exception e) {
374                logger.warn("exception: tc.key() not ok " + tc);
375                e.printStackTrace();
376            }
377        }
378        logger.info("sending key=" + key + " to " + bcaster.size() + " nodes");
379        List<DHTBroadcaster<K>> bccopy = null;
380        synchronized (bcaster) {
381            bccopy = new ArrayList<DHTBroadcaster<K>>(bcaster);
382        }
383        Iterator<DHTBroadcaster<K>> it = bccopy.iterator();
384        while (it.hasNext()) {
385            DHTBroadcaster<K> br = it.next();
386            try {
387                if (logger.isDebugEnabled()) {
388                    logger.debug("bcasting to " + br);
389                }
390                br.sendChannel(tc);
391            } catch (IOException e) {
392                logger.info("bcaster, IOexception " + e);
393                synchronized (bcaster) {
394                    bcaster.remove(br); //no more: ConcurrentModificationException
395                }
396                try {
397                    br.goon = false;
398                    br.closeChannel();
399                    while (br.isAlive()) {
400                        br.interrupt();
401                        br.join(100);
402                    }
403                } catch (InterruptedException w) {
404                    Thread.currentThread().interrupt();
405                }
406                //
407                logger.info("bcaster.remove() " + br);
408            } catch (Exception e) {
409                logger.info("bcaster, exception " + e);
410            }
411        }
412    }
413
414
415    /**
416     * run.
417     */
418    @Override
419    public void run() {
420        goon = true;
421        while (goon) {
422            try {
423                logger.debug("trying to receive");
424                Object o = channel.receive();
425                if (this.isInterrupted()) {
426                    goon = false;
427                    break;
428                }
429                if (logger.isDebugEnabled()) {
430                    logger.debug("received = " + o);
431                }
432                if (!(o instanceof DHTTransport)) {
433                    logger.warn("wrong object type: " + o);
434                    goon = false;
435                    break; //continue;
436                }
437                if (o instanceof DHTTransportClear) {
438                    logger.info("receive, clear");
439                    synchronized (theList) {
440                        theList.clear();
441                        theList.notifyAll();
442                    }
443                }
444                DHTTransport tc = (DHTTransport) o;
445                broadcast(tc);
446                if (this.isInterrupted()) {
447                    goon = false;
448                }
449            } catch (IOException e) {
450                goon = false;
451                logger.info("receive, IO exception " + e);
452                //e.printStackTrace();
453            } catch (ClassNotFoundException e) {
454                goon = false;
455                logger.info("receive, CNF exception " + e);
456                e.printStackTrace();
457            } catch (Exception e) {
458                goon = false;
459                logger.info("receive, exception " + e);
460                e.printStackTrace();
461            }
462        }
463        if (logger.isDebugEnabled()) {
464            logger.info("terminated+ " + this);
465        }
466        synchronized (bcaster) {
467            bcaster.remove(this);
468        }
469        channel.close();
470    }
471
472
473    /**
474     * toString.
475     * @return a string representation of this.
476     */
477    @Override
478    public String toString() {
479        return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")";
480    }
481
482}