001/*
002 * $Id: DistributedListServer.java 4074 2012-07-28 10:04:58Z kredel $
003 */
004
005package edu.jas.util;
006
007import java.io.IOException;
008import java.io.Serializable;
009
010import java.util.Iterator;
011import java.util.List;
012import java.util.ArrayList;
013import java.util.SortedMap;
014import java.util.TreeMap;
015import java.util.Map.Entry;
016
017import org.apache.log4j.Logger;
018
019//import edu.unima.ky.parallel.ChannelFactory;
020//import edu.unima.ky.parallel.SocketChannel;
021
022
023/**
024 * Server for the distributed version of a list.
025 * @author Heinz Kredel
026 * @todo redistribute list for late coming clients, removal of elements.
027 */
028public class DistributedListServer extends Thread {
029
030    private static final Logger logger = Logger.getLogger(DistributedListServer.class);
031
032    public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99;
033    protected final ChannelFactory cf;
034
035    protected List<Broadcaster> servers;
036
037    private boolean goon = true;
038    private Thread mythread = null;
039
040    private Counter listElem = null;
041    protected final SortedMap<Counter,Object> theList;
042
043
044    /**
045     * Constructs a new DistributedListServer.
046     */ 
047
048    public DistributedListServer() {
049        this(DEFAULT_PORT);
050    }
051
052    /**
053     * DistributedListServer.
054     * @param port to run server on.
055     */
056    public DistributedListServer(int port) {
057        this( new ChannelFactory(port) );
058    }
059
060    /**
061     * DistributedListServer.
062     * @param cf ChannelFactory to use.
063     */
064    public DistributedListServer(ChannelFactory cf) {
065        listElem = new Counter(0);
066        this.cf = cf;
067        cf.init();
068        servers = new ArrayList<Broadcaster>();
069        theList = new TreeMap<Counter,Object>();
070    }
071
072
073    /**
074     * main.
075     * Usage: DistributedListServer &lt;port&gt;
076     */
077    public static void main(String[] args) throws InterruptedException {
078        int port = DEFAULT_PORT;
079        if ( args.length < 1 ) {
080            System.out.println("Usage: DistributedListServer <port>");
081        } else {
082            try {
083                port = Integer.parseInt( args[0] );
084            } catch (NumberFormatException e) {
085            }
086        }
087        DistributedListServer dls = new DistributedListServer(port);
088        dls.init();
089        dls.join();
090        // until CRTL-C
091    }
092
093
094    /**
095     * thread initialization and start.
096     */ 
097    public void init() {
098        this.start();
099    }
100
101
102    /**
103     * main server method.
104     */ 
105    @Override
106     public void run() {
107        SocketChannel channel = null;
108        Broadcaster s = null;
109        mythread = Thread.currentThread();
110        Entry e;
111        Object n;
112        Object o;
113        while (goon) {
114            // logger.debug("list server " + this + " go on");
115            try {
116                channel = cf.getChannel();
117                logger.debug("dls channel = "+channel);
118                if ( mythread.isInterrupted() ) {
119                    goon = false;
120                    //logger.info("list server " + this + " interrupted");
121                } else {
122                    s = new Broadcaster(channel,servers,listElem,theList);
123                    int ls = 0;
124                    synchronized (servers) {
125                        servers.add( s );
126                        ls = theList.size();
127                        s.start();
128                    }
129                    //logger.debug("server " + s + " started");
130                    if ( ls > 0 ) {
131                        logger.info("sending " + ls + " list elements");
132                        synchronized (theList) {
133                            Iterator it = theList.entrySet().iterator();
134                            for ( int i = 0; i < ls; i++ ) {
135                                e = (Entry)it.next();
136                                n = e.getKey();
137                                o = e.getValue();
138                                try {
139                                    s.sendChannel( n,o );
140                                } catch (IOException ioe) {
141                                    // stop s
142                                }
143                            }
144                        } 
145                    }
146                }
147            } catch (InterruptedException end) {
148                goon = false;
149                Thread.currentThread().interrupt();
150            }
151        }
152        //logger.debug("listserver " + this + " terminated");
153    }
154
155
156    /**
157     * terminate all servers.
158     */ 
159    public void terminate() {
160        goon = false;
161        logger.debug("terminating ListServer");
162        if ( cf != null ) cf.terminate();
163        if ( servers != null ) {
164            Iterator it = servers.iterator();
165            while ( it.hasNext() ) {
166                Broadcaster br = (Broadcaster) it.next();
167                br.closeChannel();
168                try { 
169                    while ( br.isAlive() ) {
170                        //System.out.print(".");
171                        br.interrupt(); 
172                        br.join(100);
173                    }
174                    //logger.debug("server " + br + " terminated");
175                } catch (InterruptedException e) { 
176                    Thread.currentThread().interrupt();
177                }
178            }
179            servers = null;
180        }
181        logger.debug("Broadcasters terminated");
182        if ( mythread == null ) return;
183        try { 
184            while ( mythread.isAlive() ) {
185                // System.out.print("-");
186                mythread.interrupt(); 
187                mythread.join(100);
188            }
189            //logger.debug("server " + mythread + " terminated");
190        } catch (InterruptedException e) { 
191            Thread.currentThread().interrupt();
192        }
193        mythread = null;
194        logger.debug("ListServer terminated");
195    }
196
197
198    /**
199     * number of servers.
200     */ 
201    public int size() {
202        if ( servers == null ) {
203            return -1;
204        }
205        return servers.size();
206    }
207
208}
209
210
211/**
212 * Class for holding the list index used as key in TreeMap.
213 * Implemented since Integer has no add() method.
214 * Must implement Comparable so that TreeMap works with correct ordering.
215 */ 
216
class Counter implements Serializable, Comparable<Counter> {

    /** Current index value. */
    private int value;


    /**
     * Creates a Counter starting at 0.
     */
    public Counter() {
        this(0);
    }


    /**
     * Creates a Counter starting at the given value.
     * @param v initial value.
     */
    public Counter(int v) {
        value = v;
    }


    /**
     * Current value as int.
     * @return the value.
     */
    public int intValue() {
        return value;
    }


    /**
     * Increments this Counter in place.
     * No locking is done here, callers have to synchronize.
     * @param v amount to add, may be negative.
     */
    public void add(int v) {
        value = value + v;
    }


    /**
     * Equality by value.
     * @param ob an Object.
     * @return true if ob is a Counter with the same value, else false.
     */
    @Override
    public boolean equals(Object ob) {
        return (ob instanceof Counter) && compareTo( (Counter)ob ) == 0;
    }


    /**
     * Ordering by value.
     * @param c a Counter.
     * @return 1 if (this &gt; c), 0 if (this == c), -1 if (this &lt; c).
     */
    public int compareTo(Counter c) {
        final int other = c.intValue();
        if ( value == other ) {
            return 0;
        }
        return ( value < other ) ? -1 : 1;
    }


    /**
     * Hash code for this Counter, which is the value itself.
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        return value;
    }


    /**
     * String representation.
     * @return "Counter(value)".
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("Counter(");
        sb.append(value);
        sb.append(")");
        return sb.toString();
    }

}
307
308
309/**
310 * Thread for broadcasting all incoming objects to the list clients.
311 */ 
312
313class Broadcaster extends Thread /*implements Runnable*/ {
314
315    private static final Logger logger = Logger.getLogger(Broadcaster.class);
316    private final SocketChannel channel;
317    private final List bcaster;
318    private Counter listElem;
319    private final SortedMap<Counter,Object> theList;
320
321
322    /**
323     * Broadcaster.
324     * @param s SocketChannel to use.
325     * @param p list of broadcasters.
326     * @param le counter
327     * @param sm SortedMap with counter value pairs.
328     */
329    public Broadcaster(SocketChannel s, List p, Counter le, SortedMap<Counter,Object> sm) {
330        channel = s;
331        bcaster = p;
332        listElem = le;
333        theList = sm;
334    } 
335
336
337    /**
338     * closeChannel.
339     */
340    public void closeChannel() {
341        channel.close();
342    }
343
344
345    /**
346     * sendChannel.
347     * @param n counter.
348     * @param o value.
349     * @throws IOException
350     */
351    public void sendChannel(Object n, Object o) throws IOException {
352        synchronized (channel) {
353            channel.send(n);
354            channel.send(o);
355        }
356    }
357
358
359    /**
360     * broadcast.
361     * @param o object to store and send.
362     */
363    public void broadcast(Object o) {
364        Counter li = null;
365        synchronized (listElem) {
366            listElem.add(1);
367            li = new Counter( listElem.intValue() );
368        }
369        synchronized (theList) {
370            theList.put( li, o );
371        }
372        synchronized (bcaster) {
373            Iterator it = bcaster.iterator();
374            while ( it.hasNext() ) {
375                Broadcaster br = (Broadcaster) it.next();
376                try {
377                    br.sendChannel(li,o);
378                    //System.out.println("bcast: "+o+" to "+x.channel);
379                } catch (IOException e) {
380                    try { 
381                        br.closeChannel();
382                        while ( br.isAlive() ) {
383                            br.interrupt(); 
384                            br.join(100);
385                        }
386                    } catch (InterruptedException u) { 
387                        Thread.currentThread().interrupt();
388                    }
389                    bcaster.remove( br );
390                }
391            }
392        }
393    }
394
395
396    /**
397     * run.
398     */
399    @Override
400     public void run() {
401        Object o;
402        boolean goon = true;
403        while (goon) {
404            try {
405                o = channel.receive();
406                //System.out.println("receive: "+o+" from "+channel);
407                broadcast(o);
408                if ( this.isInterrupted() ) {
409                    goon = false;
410                }
411            } catch (IOException e) {
412                goon = false;
413            } catch (ClassNotFoundException e) {
414                goon = false;
415                e.printStackTrace();
416
417            }
418        }
419        logger.debug("broadcaster terminated "+this);
420        channel.close();
421    }
422
423
424    /**
425     * toString.
426     * @return a string representation of this.
427     */
428    @Override
429     public String toString() {
430        return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")";
431    }
432
433}