001/*
002 * $Id: DistributedListServer.java 5872 2018-07-20 16:01:46Z kredel $
003 */
004
005package edu.jas.util;
006
007import java.io.IOException;
008import java.io.Serializable;
009
010import java.util.Iterator;
011import java.util.List;
012import java.util.ArrayList;
013import java.util.SortedMap;
014import java.util.TreeMap;
015import java.util.Map.Entry;
016
017import org.apache.logging.log4j.Logger;
018import org.apache.logging.log4j.LogManager; 
019
020//import edu.unima.ky.parallel.ChannelFactory;
021//import edu.unima.ky.parallel.SocketChannel;
022
023
024/**
025 * Server for the distributed version of a list.
026 * @author Heinz Kredel
027 * TODO: redistribute list for late comming clients, removal of elements.
028 */
029public class DistributedListServer extends Thread {
030
031    private static final Logger logger = LogManager.getLogger(DistributedListServer.class);
032
033    public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99;
034
035
036    protected final ChannelFactory cf;
037
038    protected List<Broadcaster> servers;
039
040
041    private volatile boolean goon = true;
042
043    private volatile Thread mythread = null;
044
045
046    private Counter listElem = null;
047
048    protected final SortedMap<Counter,Object> theList;
049
050
051    /**
052     * Constructs a new DistributedListServer.
053     */ 
054
055    public DistributedListServer() {
056        this(DEFAULT_PORT);
057    }
058
059    /**
060     * DistributedListServer.
061     * @param port to run server on.
062     */
063    public DistributedListServer(int port) {
064        this( new ChannelFactory(port) );
065    }
066
067    /**
068     * DistributedListServer.
069     * @param cf ChannelFactory to use.
070     */
071    public DistributedListServer(ChannelFactory cf) {
072        listElem = new Counter(0);
073        this.cf = cf;
074        cf.init();
075        servers = new ArrayList<Broadcaster>();
076        theList = new TreeMap<Counter,Object>();
077    }
078
079
080    /**
081     * main.
082     * Usage: DistributedListServer &lt;port&gt;
083     */
084    public static void main(String[] args) throws InterruptedException {
085        int port = DEFAULT_PORT;
086        if ( args.length < 1 ) {
087            System.out.println("Usage: DistributedListServer <port>");
088        } else {
089            try {
090                port = Integer.parseInt( args[0] );
091            } catch (NumberFormatException e) {
092            }
093        }
094        DistributedListServer dls = new DistributedListServer(port);
095        dls.init();
096        dls.join();
097        // until CRTL-C
098    }
099
100
101    /**
102     * thread initialization and start.
103     */ 
104    public void init() {
105        this.start();
106    }
107
108
109    /**
110     * main server method.
111     */ 
112    @Override
113     public void run() {
114        SocketChannel channel = null;
115        Broadcaster s = null;
116        mythread = Thread.currentThread();
117        Entry e;
118        Object n;
119        Object o;
120        while (goon) {
121            // logger.debug("list server " + this + " go on");
122            try {
123                channel = cf.getChannel();
124                logger.debug("dls channel = "+channel);
125                if ( mythread.isInterrupted() ) {
126                    goon = false;
127                    //logger.info("list server " + this + " interrupted");
128                } else {
129                    s = new Broadcaster(channel,servers,listElem,theList);
130                    int ls = 0;
131                    synchronized (servers) {
132                        servers.add( s );
133                        ls = theList.size();
134                        s.start();
135                    }
136                    //logger.debug("server " + s + " started");
137                    if ( ls > 0 ) {
138                        logger.info("sending " + ls + " list elements");
139                        synchronized (theList) {
140                            Iterator it = theList.entrySet().iterator();
141                            for ( int i = 0; i < ls; i++ ) {
142                                e = (Entry)it.next();
143                                n = e.getKey();
144                                o = e.getValue();
145                                try {
146                                    s.sendChannel( n,o );
147                                } catch (IOException ioe) {
148                                    // stop s
149                                }
150                            }
151                        } 
152                    }
153                }
154            } catch (InterruptedException end) {
155                goon = false;
156                Thread.currentThread().interrupt();
157            }
158        }
159        //logger.debug("listserver " + this + " terminated");
160    }
161
162
163    /**
164     * terminate all servers.
165     */ 
166    public void terminate() {
167        goon = false;
168        logger.debug("terminating ListServer");
169        if ( cf != null ) cf.terminate();
170        if ( servers != null ) {
171            Iterator it = servers.iterator();
172            while ( it.hasNext() ) {
173                Broadcaster br = (Broadcaster) it.next();
174                br.closeChannel();
175                try { 
176                    while ( br.isAlive() ) {
177                        //System.out.print(".");
178                        br.interrupt(); 
179                        br.join(100);
180                    }
181                    //logger.debug("server " + br + " terminated");
182                } catch (InterruptedException e) { 
183                    Thread.currentThread().interrupt();
184                }
185            }
186            servers = null;
187        }
188        logger.debug("Broadcasters terminated");
189        if ( mythread == null ) return;
190        try { 
191            while ( mythread.isAlive() ) {
192                // System.out.print("-");
193                mythread.interrupt(); 
194                mythread.join(100);
195            }
196            //logger.debug("server " + mythread + " terminated");
197        } catch (InterruptedException e) { 
198            Thread.currentThread().interrupt();
199        }
200        mythread = null;
201        logger.debug("ListServer terminated");
202    }
203
204
205    /**
206     * number of servers.
207     */ 
208    public int size() {
209        if ( servers == null ) {
210            return -1;
211        }
212        return servers.size();
213    }
214
215}
216
217
218/**
219 * Class for holding the list index used as key in TreeMap.
220 * Implemented since Integer has no add() method.
221 * Must implement Comparable so that TreeMap works with correct ordering.
222 */ 
223
224class Counter implements Serializable, Comparable<Counter> {
225
226    private int value;
227
228
229    /**
230     * Counter.
231     */
232    public Counter() {
233        this(0);
234    }
235
236
237    /**
238     * Counter.
239     * @param v
240     */
241    public Counter(int v) {
242        value = v;
243    }
244
245
246    /**
247     * intValue.
248     * @return the value.
249     */
250    public int intValue() {
251        return value;
252    }
253
254
255    /**
256     * add.
257     * @param v
258     */
259    public void add(int v) { // synchronized elsewhere
260        value += v;
261    }
262
263
264    /**
265     * equals.
266     * @param ob an Object.
267     * @return true if this is equal to o, else false.
268     */
269    @Override
270    public boolean equals(Object ob) {
271        if ( ! (ob instanceof Counter) ) {
272           return false;
273        }
274        return 0 == compareTo( (Counter)ob );
275    }
276
277
278    /**
279     * compareTo.
280     * @param c a Counter.
281     * @return 1 if (this &lt; c), 0 if (this == c), -1 if (this &gt; c).
282     */
283    public int compareTo(Counter c) {
284        int x = c.intValue();
285        if ( value > x ) { 
286            return 1;
287        }
288        if ( value < x ) { 
289            return -1;
290        }
291        return 0;
292    }
293
294
295    /**
296     * Hash code for this Counter.
297     * @see java.lang.Object#hashCode()
298     */
299    @Override
300    public int hashCode() {
301        return value;
302    }
303
304
305    /**
306     * toString.
307     */  
308    @Override
309     public String toString() {
310        return "Counter("+value+")";
311    }
312
313}
314
315
316/**
317 * Thread for broadcasting all incoming objects to the list clients.
318 */ 
319
320class Broadcaster extends Thread /*implements Runnable*/ {
321
322    private static final Logger logger = LogManager.getLogger(Broadcaster.class);
323    private final SocketChannel channel;
324    private final List bcaster;
325    private Counter listElem;
326    private final SortedMap<Counter,Object> theList;
327
328
329    /**
330     * Broadcaster.
331     * @param s SocketChannel to use.
332     * @param p list of broadcasters.
333     * @param le counter
334     * @param sm SortedMap with counter value pairs.
335     */
336    public Broadcaster(SocketChannel s, List p, Counter le, SortedMap<Counter,Object> sm) {
337        channel = s;
338        bcaster = p;
339        listElem = le;
340        theList = sm;
341    } 
342
343
344    /**
345     * closeChannel.
346     */
347    public void closeChannel() {
348        channel.close();
349    }
350
351
352    /**
353     * sendChannel.
354     * @param n counter.
355     * @param o value.
356     * @throws IOException
357     */
358    public void sendChannel(Object n, Object o) throws IOException {
359        synchronized (channel) {
360            channel.send(n);
361            channel.send(o);
362        }
363    }
364
365
366    /**
367     * broadcast.
368     * @param o object to store and send.
369     */
370    public void broadcast(Object o) {
371        Counter li = null;
372        synchronized (listElem) {
373            listElem.add(1);
374            li = new Counter( listElem.intValue() );
375        }
376        synchronized (theList) {
377            theList.put( li, o );
378        }
379        synchronized (bcaster) {
380            Iterator it = bcaster.iterator();
381            while ( it.hasNext() ) {
382                Broadcaster br = (Broadcaster) it.next();
383                try {
384                    br.sendChannel(li,o);
385                    //System.out.println("bcast: "+o+" to "+x.channel);
386                } catch (IOException e) {
387                    try { 
388                        br.closeChannel();
389                        while ( br.isAlive() ) {
390                            br.interrupt(); 
391                            br.join(100);
392                        }
393                    } catch (InterruptedException u) { 
394                        Thread.currentThread().interrupt();
395                    }
396                    bcaster.remove( br );
397                }
398            }
399        }
400    }
401
402
403    /**
404     * run.
405     */
406    @Override
407     public void run() {
408        Object o;
409        boolean goon = true;
410        while (goon) {
411            try {
412                o = channel.receive();
413                //System.out.println("receive: "+o+" from "+channel);
414                broadcast(o);
415                if ( this.isInterrupted() ) {
416                    goon = false;
417                }
418            } catch (IOException e) {
419                goon = false;
420            } catch (ClassNotFoundException e) {
421                goon = false;
422                e.printStackTrace();
423
424            }
425        }
426        logger.debug("broadcaster terminated "+this);
427        channel.close();
428    }
429
430
431    /**
432     * toString.
433     * @return a string representation of this.
434     */
435    @Override
436     public String toString() {
437        return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")";
438    }
439
440}