001/*
002 * $Id$
003 */
004
005package edu.jas.util;
006
007import java.io.IOException;
008import java.io.Serializable;
009
010import java.util.Iterator;
011import java.util.List;
012import java.util.ArrayList;
013import java.util.SortedMap;
014import java.util.TreeMap;
015import java.util.Map.Entry;
016
017import org.apache.logging.log4j.Logger;
018import org.apache.logging.log4j.LogManager; 
019
020//import edu.unima.ky.parallel.ChannelFactory;
021//import edu.unima.ky.parallel.SocketChannel;
022
023
024/**
025 * Server for the distributed version of a list.
026 * @author Heinz Kredel
027 * TODO: redistribute list for late coming clients, removal of elements.
028 */
029public class DistributedListServer extends Thread {
030
031    private static final Logger logger = LogManager.getLogger(DistributedListServer.class);
032
033    public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99;
034
035
036    protected final ChannelFactory cf;
037
038    protected List<Broadcaster> servers;
039
040
041    private volatile boolean goon = true;
042
043    private volatile Thread mythread = null;
044
045
046    private Counter listElem = null;
047
048    protected final SortedMap<Counter,Object> theList;
049
050
051    /**
052     * Constructs a new DistributedListServer.
053     */ 
054
055    public DistributedListServer() {
056        this(DEFAULT_PORT);
057    }
058
059    /**
060     * DistributedListServer.
061     * @param port to run server on.
062     */
063    public DistributedListServer(int port) {
064        this( new ChannelFactory(port) );
065    }
066
067    /**
068     * DistributedListServer.
069     * @param cf ChannelFactory to use.
070     */
071    public DistributedListServer(ChannelFactory cf) {
072        listElem = new Counter(0);
073        this.cf = cf;
074        cf.init();
075        servers = new ArrayList<Broadcaster>();
076        theList = new TreeMap<Counter,Object>();
077    }
078
079
080    /**
081     * main.
082     * Usage: DistributedListServer &lt;port&gt;
083     */
084    public static void main(String[] args) throws InterruptedException {
085        int port = DEFAULT_PORT;
086        if ( args.length < 1 ) {
087            System.out.println("Usage: DistributedListServer <port>");
088        } else {
089            try {
090                port = Integer.parseInt( args[0] );
091            } catch (NumberFormatException e) {
092            }
093        }
094        DistributedListServer dls = new DistributedListServer(port);
095        dls.init();
096        dls.join();
097        // until CRTL-C
098    }
099
100
101    /**
102     * thread initialization and start.
103     */ 
104    public void init() {
105        this.start();
106    }
107
108
109    /**
110     * main server method.
111     */ 
112    @Override
113     public void run() {
114        SocketChannel channel = null;
115        Broadcaster s = null;
116        mythread = Thread.currentThread();
117        Entry e;
118        Object n;
119        Object o;
120        while (goon) {
121            // logger.debug("list server {} go on", this);
122            try {
123                channel = cf.getChannel();
124                logger.debug("dls channel = {}", channel);
125                if ( mythread.isInterrupted() ) {
126                    goon = false;
127                    //logger.info("list server {} interrupted", this);
128                } else {
129                    s = new Broadcaster(channel,servers,listElem,theList);
130                    int ls = 0;
131                    synchronized (servers) {
132                        servers.add( s );
133                        ls = theList.size();
134                        s.start();
135                    }
136                    //logger.debug("server {} started", s);
137                    if ( ls > 0 ) {
138                        logger.info("sending {} list elements", ls);
139                        synchronized (theList) {
140                            Iterator it = theList.entrySet().iterator();
141                            for ( int i = 0; i < ls; i++ ) {
142                                e = (Entry)it.next();
143                                n = e.getKey();
144                                o = e.getValue();
145                                try {
146                                    s.sendChannel( n,o );
147                                } catch (IOException ioe) {
148                                    // stop s
149                                }
150                            }
151                        } 
152                    }
153                }
154            } catch (InterruptedException end) {
155                goon = false;
156                Thread.currentThread().interrupt();
157            }
158        }
159        //logger.debug("listserver {} terminated", this);
160    }
161
162
163    /**
164     * terminate all servers.
165     */ 
166    public void terminate() {
167        goon = false;
168        logger.debug("terminating ListServer");
169        if ( cf != null ) cf.terminate();
170        if ( servers != null ) {
171            Iterator it = servers.iterator();
172            while ( it.hasNext() ) {
173                Broadcaster br = (Broadcaster) it.next();
174                br.closeChannel();
175                try { 
176                    while ( br.isAlive() ) {
177                        //System.out.print(".");
178                        br.interrupt(); 
179                        br.join(100);
180                    }
181                    //logger.debug("server {} terminated", br);
182                } catch (InterruptedException e) { 
183                    Thread.currentThread().interrupt();
184                }
185            }
186            servers = null;
187        }
188        logger.debug("Broadcasters terminated");
189        if ( mythread == null ) return;
190        try { 
191            while ( mythread.isAlive() ) {
192                // System.out.print("-");
193                mythread.interrupt(); 
194                mythread.join(100);
195            }
196            //logger.debug("server {} terminated", mythread);
197        } catch (InterruptedException e) { 
198            Thread.currentThread().interrupt();
199        }
200        mythread = null;
201        logger.debug("ListServer terminated");
202    }
203
204
205    /**
206     * number of servers.
207     */ 
208    public int size() {
209        if ( servers == null ) {
210            return -1;
211        }
212        return servers.size();
213    }
214
215}
216
217
218/**
219 * Class for holding the list index used as key in TreeMap.
220 * Implemented since Integer has no add() method.
221 * Must implement Comparable so that TreeMap works with correct ordering.
222 */ 
223
224class Counter implements Serializable, Comparable<Counter> {
225
226    private int value;
227
228
229    /**
230     * Counter.
231     */
232    public Counter() {
233        this(0);
234    }
235
236
237    /**
238     * Counter.
239     * @param v
240     */
241    public Counter(int v) {
242        value = v;
243    }
244
245
246    /**
247     * intValue.
248     * @return the value.
249     */
250    public int intValue() {
251        return value;
252    }
253
254
255    /**
256     * add.
257     * @param v
258     */
259    public void add(int v) { // synchronized elsewhere
260        value += v;
261    }
262
263
264    /**
265     * equals.
266     * @param ob an Object.
267     * @return true if this is equal to o, else false.
268     */
269    @Override
270    public boolean equals(Object ob) {
271        if ( ! (ob instanceof Counter) ) {
272           return false;
273        }
274        return 0 == compareTo( (Counter)ob );
275    }
276
277
278    /**
279     * compareTo.
280     * @param c a Counter.
281     * @return 1 if (this &lt; c), 0 if (this == c), -1 if (this &gt; c).
282     */
283    public int compareTo(Counter c) {
284        int x = c.intValue();
285        if ( value > x ) { 
286            return 1;
287        }
288        if ( value < x ) { 
289            return -1;
290        }
291        return 0;
292    }
293
294
295    /**
296     * Hash code for this Counter.
297     * @see java.lang.Object#hashCode()
298     */
299    @Override
300    public int hashCode() {
301        return value;
302    }
303
304
305    /**
306     * toString.
307     */  
308    @Override
309     public String toString() {
310        return "Counter("+value+")";
311    }
312
313}
314
315
316/**
317 * Thread for broadcasting all incoming objects to the list clients.
318 */ 
319
320class Broadcaster extends Thread /*implements Runnable*/ {
321
322    private static final Logger logger = LogManager.getLogger(Broadcaster.class);
323
324    private final SocketChannel channel;
325
326    private final List bcaster;
327
328    private Counter listElem;
329
330    private final SortedMap<Counter,Object> theList;
331
332
333    /**
334     * Broadcaster.
335     * @param s SocketChannel to use.
336     * @param p list of broadcasters.
337     * @param le counter
338     * @param sm SortedMap with counter value pairs.
339     */
340    public Broadcaster(SocketChannel s, List p, Counter le, SortedMap<Counter,Object> sm) {
341        channel = s;
342        bcaster = p;
343        listElem = le;
344        theList = sm;
345    } 
346
347
348    /**
349     * closeChannel.
350     */
351    public void closeChannel() {
352        channel.close();
353    }
354
355
356    /**
357     * sendChannel.
358     * @param n counter.
359     * @param o value.
360     * @throws IOException
361     */
362    public void sendChannel(Object n, Object o) throws IOException {
363        synchronized (channel) {
364            channel.send(n);
365            channel.send(o);
366        }
367    }
368
369
370    /**
371     * broadcast.
372     * @param o object to store and send.
373     */
374    public void broadcast(Object o) {
375        Counter li = null;
376        synchronized (listElem) {
377            listElem.add(1);
378            li = new Counter( listElem.intValue() );
379        }
380        synchronized (theList) {
381            theList.put( li, o );
382        }
383        synchronized (bcaster) {
384            Iterator it = bcaster.iterator();
385            while ( it.hasNext() ) {
386                Broadcaster br = (Broadcaster) it.next();
387                try {
388                    br.sendChannel(li,o);
389                    //System.out.println("bcast: "+o+" to "+x.channel);
390                } catch (IOException e) {
391                    try { 
392                        br.closeChannel();
393                        while ( br.isAlive() ) {
394                            br.interrupt(); 
395                            br.join(100);
396                        }
397                    } catch (InterruptedException u) { 
398                        Thread.currentThread().interrupt();
399                    }
400                    bcaster.remove( br );
401                }
402            }
403        }
404    }
405
406
407    /**
408     * run.
409     */
410    @Override
411     public void run() {
412        Object o;
413        boolean goon = true;
414        while (goon) {
415            try {
416                o = channel.receive();
417                //System.out.println("receive: "+o+" from "+channel);
418                broadcast(o);
419                if ( this.isInterrupted() ) {
420                    goon = false;
421                }
422            } catch (IOException e) {
423                goon = false;
424            } catch (ClassNotFoundException e) {
425                goon = false;
426                e.printStackTrace();
427
428            }
429        }
430        logger.debug("broadcaster terminated {}", this);
431        channel.close();
432    }
433
434
435    /**
436     * toString.
437     * @return a string representation of this.
438     */
439    @Override
440     public String toString() {
441        return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")";
442    }
443
444}