001/*
002 * $Id: DistHashTableServer.java 5872 2018-07-20 16:01:46Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Iterator;
011import java.util.List;
012import java.util.Map.Entry;
013import java.util.SortedMap;
014import java.util.TreeMap;
015
016import org.apache.logging.log4j.Logger;
017import org.apache.logging.log4j.LogManager; 
018
019
/**
 * Server for the distributed version of a hash table.
 * TODO: redistribute the table for late-coming clients, removal
 *       of elements.
 * @author Heinz Kredel
 */
026
027public class DistHashTableServer<K> extends Thread {
028
029
030    private static final Logger logger = LogManager.getLogger(DistHashTableServer.class);
031
032
033    public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99;
034
035
036    protected final ChannelFactory cf;
037
038
039    protected List<DHTBroadcaster<K>> servers;
040
041
042    private volatile boolean goon = true;
043
044
045    private volatile Thread mythread = null;
046
047
048    protected final SortedMap<K, DHTTransport> theList;
049
050
051    private long etime;
052
053
054    private long dtime;
055
056
057    private long ertime;
058
059
060    private long drtime;
061
062
063    /**
064     * Constructs a new DistHashTableServer.
065     */
066    public DistHashTableServer() {
067        this(DEFAULT_PORT);
068    }
069
070
071    /**
072     * DistHashTableServer.
073     * @param port to run server on.
074     */
075    public DistHashTableServer(int port) {
076        this(new ChannelFactory(port));
077    }
078
079
080    /**
081     * DistHashTableServer.
082     * @param cf ChannelFactory to use.
083     */
084    public DistHashTableServer(ChannelFactory cf) {
085        this.cf = cf;
086        cf.init();
087        servers = new ArrayList<DHTBroadcaster<K>>();
088        theList = new TreeMap<K, DHTTransport>();
089        etime = DHTTransport.etime;
090        dtime = DHTTransport.dtime;
091        ertime = DHTTransport.ertime;
092        drtime = DHTTransport.drtime;
093    }
094
095
096    /**
097     * main. Usage: DistHashTableServer &lt;port&gt;
098     */
099    public static void main(String[] args) throws InterruptedException {
100        int port = DEFAULT_PORT;
101        if (args.length < 1) {
102            System.out.println("Usage: DistHashTableServer <port>");
103        } else {
104            try {
105                port = Integer.parseInt(args[0]);
106            } catch (NumberFormatException e) {
107            }
108        }
109        DistHashTableServer dhts = new DistHashTableServer/*raw: <K>*/(port);
110        dhts.init();
111        dhts.join();
112        // until CRTL-C
113    }
114
115
116    /**
117     * thread initialization and start.
118     */
119    public void init() {
120        this.start();
121    }
122
123
124    /**
125     * main server method.
126     */
127    @Override
128    public void run() {
129        SocketChannel channel = null;
130        DHTBroadcaster<K> s = null;
131        mythread = Thread.currentThread();
132        Entry<K, DHTTransport> e;
133        DHTTransport tc;
134        while (goon) {
135            //logger.debug("list server " + this + " go on");
136            try {
137                channel = cf.getChannel();
138                if (logger.isDebugEnabled()) {
139                    logger.debug("dls channel = " + channel);
140                }
141                if (mythread.isInterrupted()) {
142                    goon = false;
143                    //logger.info("list server " + this + " interrupted");
144                } else {
145                    s = new DHTBroadcaster<K>(channel, servers, /*listElem,*/theList);
146                    int ls = 0;
147                    synchronized (servers) {
148                        if (goon) {
149                            servers.add(s);
150                            ls = theList.size();
151                            s.start();
152                        }
153                    }
154                    if (logger.isDebugEnabled()) {
155                        logger.info("server " + s + " started " + s.isAlive());
156                    }
157                    if (ls > 0) {
158                        //logger.debug("sending " + ls + " list elements");
159                        synchronized (theList) {
160                            Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator();
161                            for (int i = 0; i < ls; i++) {
162                                e = it.next();
163                                // n = e.getKey(); // findbugs, already in tc
164                                tc = e.getValue();
165                                //DHTTransport tc = (DHTTransport) o;                             
166                                try {
167                                    s.sendChannel(tc);
168                                } catch (IOException ioe) {
169                                    // stop s
170                                }
171                            }
172                        }
173                    }
174                }
175            } catch (InterruptedException end) {
176                goon = false;
177                Thread.currentThread().interrupt();
178            }
179        }
180        if (logger.isDebugEnabled()) {
181            logger.info("DHTserver " + this + " terminated");
182        }
183    }
184
185
186    /**
187     * terminate all servers.
188     */
189    public void terminate() {
190        goon = false;
191        logger.info("terminating");
192        if (cf != null) {
193            cf.terminate();
194        }
195        int svs = 0;
196        List<DHTBroadcaster<K>> scopy = null;
197        if (servers != null) {
198            synchronized (servers) {
199                svs = servers.size();
200                scopy = new ArrayList<DHTBroadcaster<K>>(servers);
201                Iterator<DHTBroadcaster<K>> it = scopy.iterator();
202                while (it.hasNext()) {
203                    DHTBroadcaster<K> br = it.next();
204                    br.goon = false;
205                    br.closeChannel();
206                    try {
207                        int c = 0;
208                        while (br.isAlive()) {
209                            c++;
210                            if (c > 10) {
211                                logger.warn("giving up on " + br);
212                                break;
213                            }
214                            //System.out.print(".");
215                            br.interrupt();
216                            br.join(50);
217                        }
218                        if (logger.isDebugEnabled()) {
219                            logger.info("server+ " + br + " terminated");
220                        }
221                        // now possible: 
222                        servers.remove(br);
223                    } catch (InterruptedException e) {
224                        Thread.currentThread().interrupt();
225                    }
226                }
227                servers.clear();
228            }
229            logger.info("" + svs + " broadcasters terminated " + scopy);
230            //? servers = null;
231        }
232        logger.debug("DHTBroadcasters terminated");
233        long enc = DHTTransport.etime - etime;
234        long dec = DHTTransport.dtime - dtime;
235        long encr = DHTTransport.ertime - ertime;
236        long decr = DHTTransport.drtime - drtime;
237        long drest = (encr * dec) / (enc + 1);
238        logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr
239                        + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = "
240                        + (enc + dec + encr + drest)); // +decr not meaningful
241        if (mythread == null) {
242            return;
243        }
244        try {
245            while (mythread.isAlive()) {
246                //System.out.print("-");
247                mythread.interrupt();
248                mythread.join(100);
249            }
250            if (logger.isWarnEnabled()) {
251                logger.warn("server terminated " + mythread);
252            }
253        } catch (InterruptedException e) {
254            Thread.currentThread().interrupt();
255        }
256        mythread = null;
257        logger.info("terminated");
258    }
259
260
261    /**
262     * number of servers.
263     */
264    public int size() {
265        if (servers == null) {
266            return -1;
267        }
268        //synchronized (servers) removed
269        return servers.size();
270    }
271
272
273    /**
274     * toString.
275     * @return a string representation of this.
276     */
277    @Override
278    public String toString() {
279        return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")";
280    }
281
282}
283
284
285/**
286 * Thread for broadcasting all incoming objects to the list clients.
287 */
288class DHTBroadcaster<K> extends Thread /*implements Runnable*/ {
289
290
291    private static final Logger logger = LogManager.getLogger(DHTBroadcaster.class);
292
293
294    private final SocketChannel channel;
295
296
297    private final List<DHTBroadcaster<K>> bcaster;
298
299
300    private final SortedMap<K, DHTTransport> theList;
301
302
303    volatile boolean goon = true;
304
305
306    /**
307     * DHTBroadcaster.
308     * @param s SocketChannel to use.
309     * @param bc list of broadcasters.
310     * @param le DHTCounter.
311     * @param sm SortedMap with key value pairs.
312     */
313    public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) {
314        channel = s;
315        bcaster = bc;
316        theList = sm;
317    }
318
319
320    /**
321     * closeChannel.
322     */
323    public void closeChannel() {
324        channel.close();
325    }
326
327
328    /**
329     * sendChannel.
330     * @param tc DHTTransport.
331     * @throws IOException
332     */
333    public void sendChannel(DHTTransport tc) throws IOException {
334        if (goon) {
335            channel.send(tc);
336        }
337    }
338
339
340    /**
341     * broadcast.
342     * @param o DHTTransport element to broadcast.
343     */
344    @SuppressWarnings({ "unchecked", "cast" })
345    public void broadcast(DHTTransport o) {
346        if (logger.isDebugEnabled()) {
347            logger.debug("broadcast = " + o);
348        }
349        DHTTransport<K, Object> tc = null;
350        if (o == null) {
351            return;
352        }
353        //if ( ! (o instanceof DHTTransport) ) {
354        //   return;
355        //}
356        tc = (DHTTransport<K, Object>) o;
357        K key = null;
358        synchronized (theList) {
359            //test
360            //Object x = theList.get( tc.key );
361            //if ( x != null ) {
362            //   logger.info("theList duplicate key " + tc.key );
363            //}
364            try {
365                if (!(o instanceof DHTTransportClear)) {
366                    key = tc.key();
367                    theList.put(key, tc);
368                }
369            } catch (IOException e) {
370                logger.warn("IO exception: tc.key() not ok " + tc);
371                e.printStackTrace();
372            } catch (ClassNotFoundException e) {
373                logger.warn("CNF exception: tc.key() not ok " + tc);
374                e.printStackTrace();
375            } catch (Exception e) {
376                logger.warn("exception: tc.key() not ok " + tc);
377                e.printStackTrace();
378            }
379        }
380        logger.info("sending key=" + key + " to " + bcaster.size() + " nodes");
381        List<DHTBroadcaster<K>> bccopy = null;
382        synchronized (bcaster) {
383            bccopy = new ArrayList<DHTBroadcaster<K>>(bcaster);
384        }
385        Iterator<DHTBroadcaster<K>> it = bccopy.iterator();
386        while (it.hasNext()) {
387            DHTBroadcaster<K> br = it.next();
388            try {
389                if (logger.isDebugEnabled()) {
390                    logger.debug("bcasting to " + br);
391                }
392                br.sendChannel(tc);
393            } catch (IOException e) {
394                logger.info("bcaster, IOexception " + e);
395                synchronized (bcaster) {
396                    bcaster.remove(br); //no more: ConcurrentModificationException
397                }
398                try {
399                    br.goon = false;
400                    br.closeChannel();
401                    while (br.isAlive()) {
402                        br.interrupt();
403                        br.join(100);
404                    }
405                } catch (InterruptedException w) {
406                    Thread.currentThread().interrupt();
407                }
408                //
409                logger.info("bcaster.remove() " + br);
410            } catch (Exception e) {
411                logger.info("bcaster, exception " + e);
412            }
413        }
414    }
415
416
417    /**
418     * run.
419     */
420    @Override
421    public void run() {
422        goon = true;
423        while (goon) {
424            try {
425                logger.debug("trying to receive");
426                Object o = channel.receive();
427                if (this.isInterrupted()) {
428                    goon = false;
429                    break;
430                }
431                if (logger.isDebugEnabled()) {
432                    logger.debug("received = " + o);
433                }
434                if (!(o instanceof DHTTransport)) {
435                    logger.warn("wrong object type: " + o);
436                    goon = false;
437                    break; //continue;
438                }
439                if (o instanceof DHTTransportClear) {
440                    logger.info("receive, clear");
441                    synchronized (theList) {
442                        theList.clear();
443                        theList.notifyAll();
444                    }
445                }
446                DHTTransport tc = (DHTTransport) o;
447                broadcast(tc);
448                if (this.isInterrupted()) {
449                    goon = false;
450                }
451            } catch (IOException e) {
452                goon = false;
453                logger.info("receive, IO exception " + e);
454                //e.printStackTrace();
455            } catch (ClassNotFoundException e) {
456                goon = false;
457                logger.info("receive, CNF exception " + e);
458                e.printStackTrace();
459            } catch (Exception e) {
460                goon = false;
461                logger.info("receive, exception " + e);
462                e.printStackTrace();
463            }
464        }
465        if (logger.isInfoEnabled()) {
466            logger.info("ending " + this);
467        }
468        synchronized (bcaster) {
469            bcaster.remove(this);
470        }
471        channel.close();
472    }
473
474
475    /**
476     * toString.
477     * @return a string representation of this.
478     */
479    @Override
480    public String toString() {
481        return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")";
482    }
483
484}