001    /*
002     * $Id: DistributedListServer.java 3279 2010-08-21 20:18:25Z kredel $
003     */
004    
005    package edu.jas.util;
006    
007    import java.io.IOException;
008    import java.io.Serializable;
009    
010    import java.util.Iterator;
011    import java.util.List;
012    import java.util.ArrayList;
013    import java.util.SortedMap;
014    import java.util.TreeMap;
015    import java.util.Map.Entry;
016    
017    import org.apache.log4j.Logger;
018    
019    //import edu.unima.ky.parallel.ChannelFactory;
020    //import edu.unima.ky.parallel.SocketChannel;
021    
022    
023    /**
024     * Server for the distributed version of a list.
025     * @author Heinz Kredel
026     * @todo redistribute list for late comming clients, removal of elements.
027     */
028    public class DistributedListServer extends Thread {
029    
030        private static final Logger logger = Logger.getLogger(DistributedListServer.class);
031    
032        public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99;
033        protected final ChannelFactory cf;
034    
035        protected List<Broadcaster> servers;
036    
037        private boolean goon = true;
038        private Thread mythread = null;
039    
040        private Counter listElem = null;
041        protected final SortedMap<Counter,Object> theList;
042    
043    
044        /**
045         * Constructs a new DistributedListServer.
046         */ 
047    
048        public DistributedListServer() {
049            this(DEFAULT_PORT);
050        }
051    
052        /**
053         * DistributedListServer.
054         * @param port to run server on.
055         */
056        public DistributedListServer(int port) {
057            this( new ChannelFactory(port) );
058        }
059    
060        /**
061         * DistributedListServer.
062         * @param cf ChannelFactory to use.
063         */
064        public DistributedListServer(ChannelFactory cf) {
065            listElem = new Counter(0);
066            this.cf = cf;
067            cf.init();
068            servers = new ArrayList<Broadcaster>();
069            theList = new TreeMap<Counter,Object>();
070        }
071    
072    
073        /**
074         * main.
075         * Usage: DistributedListServer &lt;port&gt;
076         */
077        public static void main(String[] args) {
078            int port = DEFAULT_PORT;
079            if ( args.length < 1 ) {
080                System.out.println("Usage: DistributedListServer <port>");
081            } else {
082                try {
083                    port = Integer.parseInt( args[0] );
084                } catch (NumberFormatException e) {
085                }
086            }
087            (new DistributedListServer(port)).run();
088            // until CRTL-C
089        }
090    
091    
092        /**
093         * thread initialization and start.
094         */ 
095        public void init() {
096            this.start();
097        }
098    
099    
100        /**
101         * main server method.
102         */ 
103        @Override
104         public void run() {
105            SocketChannel channel = null;
106            Broadcaster s = null;
107            mythread = Thread.currentThread();
108            Entry e;
109            Object n;
110            Object o;
111            while (goon) {
112                // logger.debug("list server " + this + " go on");
113                try {
114                    channel = cf.getChannel();
115                    logger.debug("dls channel = "+channel);
116                    if ( mythread.isInterrupted() ) {
117                        goon = false;
118                        //logger.info("list server " + this + " interrupted");
119                    } else {
120                        s = new Broadcaster(channel,servers,listElem,theList);
121                        int ls = 0;
122                        synchronized (servers) {
123                            servers.add( s );
124                            ls = theList.size();
125                            s.start();
126                        }
127                        //logger.debug("server " + s + " started");
128                        if ( ls > 0 ) {
129                            logger.info("sending " + ls + " list elements");
130                            synchronized (theList) {
131                                Iterator it = theList.entrySet().iterator();
132                                for ( int i = 0; i < ls; i++ ) {
133                                    e = (Entry)it.next();
134                                    n = e.getKey();
135                                    o = e.getValue();
136                                    try {
137                                        s.sendChannel( n,o );
138                                    } catch (IOException ioe) {
139                                        // stop s
140                                    }
141                                }
142                            } 
143                        }
144                    }
145                } catch (InterruptedException end) {
146                    goon = false;
147                    Thread.currentThread().interrupt();
148                }
149            }
150            //logger.debug("listserver " + this + " terminated");
151        }
152    
153    
154        /**
155         * terminate all servers.
156         */ 
157        public void terminate() {
158            goon = false;
159            logger.debug("terminating ListServer");
160            if ( cf != null ) cf.terminate();
161            if ( servers != null ) {
162                Iterator it = servers.iterator();
163                while ( it.hasNext() ) {
164                    Broadcaster br = (Broadcaster) it.next();
165                    br.closeChannel();
166                    try { 
167                        while ( br.isAlive() ) {
168                            //System.out.print(".");
169                            br.interrupt(); 
170                            br.join(100);
171                        }
172                        //logger.debug("server " + br + " terminated");
173                    } catch (InterruptedException e) { 
174                        Thread.currentThread().interrupt();
175                    }
176                }
177                servers = null;
178            }
179            logger.debug("Broadcasters terminated");
180            if ( mythread == null ) return;
181            try { 
182                while ( mythread.isAlive() ) {
183                    // System.out.print("-");
184                    mythread.interrupt(); 
185                    mythread.join(100);
186                }
187                //logger.debug("server " + mythread + " terminated");
188            } catch (InterruptedException e) { 
189                Thread.currentThread().interrupt();
190            }
191            mythread = null;
192            logger.debug("ListServer terminated");
193        }
194    
195    
196        /**
197         * number of servers.
198         */ 
199        public int size() {
200            return servers.size();
201        }
202    
203    }
204    
205    
206    /**
207     * Class for holding the list index used a key in TreeMap.
208     * Implemented since Integer has no add() method.
209     * Must implement Comparable so that TreeMap works with correct ordering.
210     */ 
211    
212    class Counter implements Serializable, Comparable<Counter> {
213    
214        private int value;
215    
216    
217        /**
218         * Counter.
219         */
220        public Counter() {
221            this(0);
222        }
223    
224    
225        /**
226         * Counter.
227         * @param v
228         */
229        public Counter(int v) {
230            value = v;
231        }
232    
233    
234        /**
235         * intValue.
236         * @return the value.
237         */
238        public int intValue() {
239            return value;
240        }
241    
242    
243        /**
244         * add.
245         * @param v
246         */
247        public void add(int v) { // synchronized elsewhere
248            value += v;
249        }
250    
251    
252        /**
253         * equals.
254         * @param ob an Object.
255         * @return true if this is equal to o, else false.
256         */
257        @Override
258         public boolean equals(Object ob) {
259            if ( ! (ob instanceof Counter) ) {
260               return false;
261            }
262            return 0 == compareTo( (Counter)ob );
263        }
264    
265    
266        /**
267         * compareTo.
268         * @param c a Counter.
269         * @return 1 if (this &lt; c), 0 if (this == c), -1 if (this &gt; c).
270         */
271        public int compareTo(Counter c) {
272            int x = c.intValue();
273            if ( value > x ) { 
274                return 1;
275            }
276            if ( value < x ) { 
277                return -1;
278            }
279            return 0;
280        }
281    
282    
283        /**
284         * toString.
285         */  
286        @Override
287         public String toString() {
288            return "Counter("+value+")";
289        }
290    
291    }
292    
293    
294    /**
295     * Thread for broadcasting all incoming objects to the list clients.
296     */ 
297    
298    class Broadcaster extends Thread /*implements Runnable*/ {
299    
300        private static final Logger logger = Logger.getLogger(Broadcaster.class);
301        private final SocketChannel channel;
302        private final List bcaster;
303        private Counter listElem;
304        private final SortedMap<Counter,Object> theList;
305    
306    
307        /**
308         * Broadcaster.
309         * @param s SocketChannel to use.
310         * @param p list of broadcasters.
311         * @param le counter
312         * @param sm SortedMap with counter value pairs.
313         */
314        public Broadcaster(SocketChannel s, List p, Counter le, SortedMap<Counter,Object> sm) {
315            channel = s;
316            bcaster = p;
317            listElem = le;
318            theList = sm;
319        } 
320    
321    
322        /**
323         * closeChannel.
324         */
325        public void closeChannel() {
326            channel.close();
327        }
328    
329    
330        /**
331         * sendChannel.
332         * @param n counter.
333         * @param o value.
334         * @throws IOException
335         */
336        public void sendChannel(Object n, Object o) throws IOException {
337            synchronized (channel) {
338                channel.send(n);
339                channel.send(o);
340            }
341        }
342    
343    
344        /**
345         * broadcast.
346         * @param o object to store and send.
347         */
348        public void broadcast(Object o) {
349            Counter li = null;
350            synchronized (listElem) {
351                listElem.add(1);
352                li = new Counter( listElem.intValue() );
353            }
354            synchronized (theList) {
355                theList.put( li, o );
356            }
357            synchronized (bcaster) {
358                Iterator it = bcaster.iterator();
359                while ( it.hasNext() ) {
360                    Broadcaster br = (Broadcaster) it.next();
361                    try {
362                        br.sendChannel(li,o);
363                        //System.out.println("bcast: "+o+" to "+x.channel);
364                    } catch (IOException e) {
365                        try { 
366                            br.closeChannel();
367                            while ( br.isAlive() ) {
368                                br.interrupt(); 
369                                br.join(100);
370                            }
371                        } catch (InterruptedException u) { 
372                            Thread.currentThread().interrupt();
373                        }
374                        bcaster.remove( br );
375                    }
376                }
377            }
378        }
379    
380    
381        /**
382         * run.
383         */
384        @Override
385         public void run() {
386            Object o;
387            boolean goon = true;
388            while (goon) {
389                try {
390                    o = channel.receive();
391                    //System.out.println("receive: "+o+" from "+channel);
392                    broadcast(o);
393                    if ( this.isInterrupted() ) {
394                        goon = false;
395                    }
396                } catch (IOException e) {
397                    goon = false;
398                } catch (ClassNotFoundException e) {
399                    goon = false;
400                    e.printStackTrace();
401    
402                }
403            }
404            logger.debug("broadcaster terminated "+this);
405            channel.close();
406        }
407    
408    
409        /**
410         * toString.
411         * @return a string representation of this.
412         */
413        @Override
414         public String toString() {
415            return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")";
416        }
417    
418    }