001/*
002 * $Id: DistHashTableServer.java 4074 2012-07-28 10:04:58Z kredel $
003 */
004
005package edu.jas.util;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Iterator;
011import java.util.List;
012import java.util.Map.Entry;
013import java.util.SortedMap;
014import java.util.TreeMap;
015
016import org.apache.log4j.Logger;
017
018
/**
 * Server for the distributed version of a hash table.
 * @author Heinz Kredel
 * @todo redistribute list for late coming clients, removal of elements.
 */
024
025public class DistHashTableServer<K> extends Thread {
026
027
028    private static final Logger logger = Logger.getLogger(DistHashTableServer.class);
029
030
031    public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99;
032
033
034    protected final ChannelFactory cf;
035
036
037    protected List<DHTBroadcaster<K>> servers;
038
039
040    private boolean goon = true;
041
042
043    private Thread mythread = null;
044
045
046    protected final SortedMap<K, DHTTransport> theList;
047
048
049    private long etime;
050
051
052    private long dtime;
053
054
055    private long ertime;
056
057
058    private long drtime;
059
060
061    /**
062     * Constructs a new DistHashTableServer.
063     */
064    public DistHashTableServer() {
065        this(DEFAULT_PORT);
066    }
067
068
069    /**
070     * DistHashTableServer.
071     * @param port to run server on.
072     */
073    public DistHashTableServer(int port) {
074        this(new ChannelFactory(port));
075    }
076
077
078    /**
079     * DistHashTableServer.
080     * @param cf ChannelFactory to use.
081     */
082    public DistHashTableServer(ChannelFactory cf) {
083        this.cf = cf;
084        cf.init();
085        servers = new ArrayList<DHTBroadcaster<K>>();
086        theList = new TreeMap<K, DHTTransport>();
087        etime = DHTTransport.etime;
088        dtime = DHTTransport.dtime;
089        ertime = DHTTransport.ertime;
090        drtime = DHTTransport.drtime;
091    }
092
093
094    /**
095     * main. Usage: DistHashTableServer &lt;port&gt;
096     */
097    public static void main(String[] args) throws InterruptedException {
098        int port = DEFAULT_PORT;
099        if (args.length < 1) {
100            System.out.println("Usage: DistHashTableServer <port>");
101        } else {
102            try {
103                port = Integer.parseInt(args[0]);
104            } catch (NumberFormatException e) {
105            }
106        }
107        DistHashTableServer dhts = new DistHashTableServer/*raw: <K>*/(port);
108        dhts.init();
109        dhts.join();
110        // until CRTL-C
111    }
112
113
114    /**
115     * thread initialization and start.
116     */
117    public void init() {
118        this.start();
119    }
120
121
122    /**
123     * main server method.
124     */
125    @Override
126    public void run() {
127        SocketChannel channel = null;
128        DHTBroadcaster<K> s = null;
129        mythread = Thread.currentThread();
130        Entry<K, DHTTransport> e;
131        DHTTransport tc;
132        while (goon) {
133            //logger.debug("list server " + this + " go on");
134            try {
135                channel = cf.getChannel();
136                if (logger.isDebugEnabled()) {
137                    logger.debug("dls channel = " + channel);
138                }
139                if (mythread.isInterrupted()) {
140                    goon = false;
141                    //logger.info("list server " + this + " interrupted");
142                } else {
143                    s = new DHTBroadcaster<K>(channel, servers,/*listElem,*/theList);
144                    int ls = 0;
145                    synchronized (servers) {
146                        if (goon) {
147                            servers.add(s);
148                            ls = theList.size();
149                            s.start();
150                        }
151                    }
152                    if (logger.isInfoEnabled()) {
153                        logger.info("server " + s + " started " + s.isAlive());
154                    }
155                    if (ls > 0) {
156                        //logger.debug("sending " + ls + " list elements");
157                        synchronized (theList) {
158                            Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator();
159                            for (int i = 0; i < ls; i++) {
160                                e = it.next();
161                                // n = e.getKey(); // findbugs, already in tc
162                                tc = e.getValue();
163                                //DHTTransport tc = (DHTTransport) o;                             
164                                try {
165                                    s.sendChannel(tc);
166                                } catch (IOException ioe) {
167                                    // stop s
168                                }
169                            }
170                        }
171                    }
172                }
173            } catch (InterruptedException end) {
174                goon = false;
175                Thread.currentThread().interrupt();
176            }
177        }
178        if (logger.isDebugEnabled()) {
179            logger.debug("listserver " + this + " terminated");
180        }
181    }
182
183
184    /**
185     * terminate all servers.
186     */
187    public void terminate() {
188        goon = false;
189        logger.debug("terminating ListServer");
190        if (cf != null) {
191            cf.terminate();
192        }
193        int svs = 0;
194        if (servers != null) {
195            synchronized (servers) {
196                svs = servers.size();
197                Iterator<DHTBroadcaster<K>> it = servers.iterator();
198                while (it.hasNext()) {
199                    DHTBroadcaster<K> br = it.next();
200                    br.closeChannel();
201                    try {
202                        int c = 0;
203                        while (br.isAlive()) {
204                            c++;
205                            if (c > 10) {
206                                logger.warn("giving up on " + br);
207                                break;
208                            }
209                            //System.out.print(".");
210                            br.interrupt();
211                            br.join(100);
212                        }
213                        if (logger.isDebugEnabled()) {
214                            logger.debug("server " + br + " terminated");
215                        }
216                    } catch (InterruptedException e) {
217                        Thread.currentThread().interrupt();
218                    }
219                }
220                servers.clear();
221            }
222            logger.info(svs + " broadcasters terminated");
223            //? servers = null;
224        }
225        logger.debug("DHTBroadcasters terminated");
226        long enc = DHTTransport.etime - etime;
227        long dec = DHTTransport.dtime - dtime;
228        long encr = DHTTransport.ertime - ertime;
229        long decr = DHTTransport.drtime - drtime;
230        long drest = (encr * dec) / (enc + 1);
231        logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr
232                        + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = "
233                        + (enc + dec + encr + drest)); // +decr not meaningful
234        if (mythread == null) {
235            return;
236        }
237        try {
238            while (mythread.isAlive()) {
239                //System.out.print("-");
240                mythread.interrupt();
241                mythread.join(100);
242            }
243            if (logger.isDebugEnabled()) {
244                logger.debug("server terminated " + mythread);
245            }
246        } catch (InterruptedException e) {
247            Thread.currentThread().interrupt();
248        }
249        mythread = null;
250        logger.debug("ListServer terminated");
251    }
252
253
254    /**
255     * number of servers.
256     */
257    public int size() {
258        if ( servers == null ) {
259            return -1;
260        }
261        synchronized (servers) {
262            return servers.size();
263        }
264    }
265
266
267    /**
268     * toString.
269     * @return a string representation of this.
270     */
271    @Override
272    public String toString() {
273        return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")";
274    }
275
276}
277
278
/**
 * Thread for broadcasting all incoming objects to the connected hash table clients.
 */
282
283class DHTBroadcaster<K> extends Thread /*implements Runnable*/{
284
285
286    private static final Logger logger = Logger.getLogger(DHTBroadcaster.class);
287
288
289    private final SocketChannel channel;
290
291
292    private final List<DHTBroadcaster<K>> bcaster;
293
294
295    private final SortedMap<K, DHTTransport> theList;
296
297
298    /**
299     * DHTBroadcaster.
300     * @param s SocketChannel to use.
301     * @param bc list of broadcasters.
302     * @param le DHTCounter.
303     * @param sm SortedMap with key value pairs.
304     */
305    public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) {
306        channel = s;
307        bcaster = bc;
308        theList = sm;
309    }
310
311
312    /**
313     * closeChannel.
314     */
315    public void closeChannel() {
316        channel.close();
317    }
318
319
320    /**
321     * sendChannel.
322     * @param tc DHTTransport.
323     * @throws IOException
324     */
325    public void sendChannel(DHTTransport tc) throws IOException {
326        channel.send(tc);
327    }
328
329
330    /**
331     * broadcast.
332     * @param o DHTTransport element to broadcast.
333     */
334    @SuppressWarnings("unchecked")
335    public void broadcast(DHTTransport o) {
336        if (logger.isDebugEnabled()) {
337            logger.debug("broadcast = " + o);
338        }
339        DHTTransport<K, Object> tc = null;
340        if (o == null) {
341            return;
342        }
343        //if ( ! (o instanceof DHTTransport) ) {
344        //   return;
345        //}
346        tc = (DHTTransport<K, Object>) o;
347        K key = null;
348        synchronized (theList) {
349            //test
350            //Object x = theList.get( tc.key );
351            //if ( x != null ) {
352            //   logger.info("theList duplicate key " + tc.key );
353            //}
354            try {
355                key = tc.key();
356                theList.put(key, tc);
357            } catch (IOException e) {
358                logger.warn("IO exception: tc.key() not ok " + tc);
359                e.printStackTrace();
360            } catch (ClassNotFoundException e) {
361                logger.warn("CNF exception: tc.key() not ok " + tc);
362                e.printStackTrace();
363            } catch (Exception e) {
364                logger.warn("exception:tc.key() not ok " + tc);
365                e.printStackTrace();
366            }
367        }
368        logger.info("sending key=" + key + " to " + bcaster.size() + " nodes");
369        synchronized (bcaster) {
370            Iterator<DHTBroadcaster<K>> it = bcaster.iterator();
371            while (it.hasNext()) {
372                DHTBroadcaster<K> br = it.next();
373                try {
374                    if (logger.isDebugEnabled()) {
375                        logger.debug("bcasting to " + br);
376                    }
377                    br.sendChannel(tc);
378                } catch (IOException e) {
379                    logger.info("bcaster, exception " + e);
380                    try {
381                        br.closeChannel();
382                        while (br.isAlive()) {
383                            br.interrupt();
384                            br.join(100);
385                        }
386                    } catch (InterruptedException w) {
387                        Thread.currentThread().interrupt();
388                    }
389                    it.remove( /*br*/); //ConcurrentModificationException
390                    logger.debug("bcaster.remove() " + br);
391                } catch (Exception e) {
392                    logger.info("bcaster, exception " + e);
393                }
394            }
395        }
396    }
397
398
399    /**
400     * run.
401     */
402    @Override
403    public void run() {
404        boolean goon = true;
405        while (goon) {
406            try {
407                logger.debug("trying to receive");
408                Object o = channel.receive();
409                if (this.isInterrupted()) {
410                    break;
411                }
412                if (logger.isDebugEnabled()) {
413                    logger.debug("received = " + o);
414                }
415                if (!(o instanceof DHTTransport)) {
416                    logger.warn("swallowed: " + o);
417                    continue;
418                }
419                DHTTransport tc = (DHTTransport) o;
420                broadcast(tc);
421                if (this.isInterrupted()) {
422                    goon = false;
423                }
424            } catch (IOException e) {
425                goon = false;
426                logger.info("receive, IO exception " + e);
427                //e.printStackTrace();
428            } catch (ClassNotFoundException e) {
429                goon = false;
430                logger.info("receive, CNF exception " + e);
431                e.printStackTrace();
432            } catch (Exception e) {
433                goon = false;
434                logger.info("receive, exception " + e);
435                e.printStackTrace();
436            }
437        }
438        if (logger.isDebugEnabled()) {
439            logger.debug("DHTBroadcaster terminated " + this);
440        }
441        channel.close();
442    }
443
444
445    /**
446     * toString.
447     * @return a string representation of this.
448     */
449    @Override
450    public String toString() {
451        return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")";
452    }
453
454}