001/*
002 * $Id$
003 */
004
005package edu.jas.util;
006
import java.io.IOException;
import java.util.ArrayList;
//import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
017
018//import edu.unima.ky.parallel.ChannelFactory;
019//import edu.unima.ky.parallel.SocketChannel;
020
021
022/**
023 * Distributed version of a List.
024 * Implemented with a SortedMap / TreeMap to keep the sequence order of elements.
025 * @author Heinz Kredel
026 */
027
028public class DistributedList /* implements List not jet */ {
029
030    private static final Logger logger = LogManager.getLogger(DistributedList.class);
031
032
033    protected final SortedMap<Counter,Object> theList;
034
035    protected final ChannelFactory cf;
036
037    protected SocketChannel channel = null;
038
039    protected Listener listener = null;
040
041
042    /**
043     * Constructor for DistributedList.
044     * @param host name or IP of server host.
045     */ 
046    public DistributedList(String host) {
047        this(host,DistributedListServer.DEFAULT_PORT);
048    }
049
050
051    /**
052     * Constructor for DistributedList.
053     * @param host name or IP of server host.
054     * @param port of server.
055     */
056    public DistributedList(String host,int port) {
057        this(new ChannelFactory(port+1),host,port);
058    }
059
060
061    /**
062     * Constructor for DistributedList.
063     * @param cf ChannelFactory to use.
064     * @param host name or IP of server host.
065     * @param port of server.
066     */
067    public DistributedList(ChannelFactory cf,String host,int port) {
068        this.cf = cf;
069        cf.init();
070        try {
071            channel = cf.getChannel(host,port);
072        } catch (IOException e) {
073            e.printStackTrace();
074        }
075        logger.debug("dl channel = {}", channel);
076        theList = new TreeMap<Counter,Object>();
077    }
078
079
080    /**
081     * Constructor for DistributedList.
082     * @param sc SocketChannel to use.
083     */
084    public DistributedList(SocketChannel sc) {
085        cf = null;
086        channel = sc;
087        theList = new TreeMap<Counter,Object>();
088    }
089
090
091    /**
092     * List thread initialization and start.
093     */ 
094    public void init() {
095        listener = new Listener(channel,theList);
096        listener.start();
097    }
098
099
100    /**
101     * Terminate the list thread.
102     */ 
103    public void terminate() {
104        if ( cf != null ) {
105            cf.terminate();
106            //logger.warn("terminating {}", cf);
107        }
108        if ( channel != null ) {
109            channel.close();
110        }
111        //theList.clear();
112        if ( listener == null ) { 
113            return;
114        }
115        logger.debug("terminate {}", listener);
116        listener.setDone(); 
117        try { 
118            while ( listener.isAlive() ) {
119                listener.interrupt(); 
120                listener.join(100);
121            }
122        } catch (InterruptedException u) { 
123            Thread.currentThread().interrupt();
124        }
125        listener = null;
126    }
127
128
129    /**
130     * Get the internal list, convert from Collection.
131     */ 
132    public List<Object> getList() {
133        return new ArrayList<Object>( theList.values() );
134    }
135
136
137    /**
138     * Size of the (local) list.
139     */ 
140    public int size() {
141        return theList.size();
142    }
143
144
145    /**
146     * Add object to the list and distribute to other lists.
147     * Blocks until the object is send and received from the server
148     * (actually it blocks until some object is received).
149     * @param o
150     */
151    public synchronized void add(Object o) {
152        int sz1 = theList.size() + 1;
153        try {
154            channel.send(o);
155            //System.out.println("send: "+o+" @ "+listener);
156        } catch (IOException e) {
157            e.printStackTrace();
158        }
159        try {
160            while ( theList.size() < sz1 ) {
161                this.wait(100);
162            }
163        } catch (InterruptedException e) {
164            Thread.currentThread().interrupt();
165            e.printStackTrace();
166        }
167    }
168
169
170    /**
171     * Clear the List.
172     * caveat: must be called on all clients.
173     */ 
174    public synchronized void clear() {
175        theList.clear();
176    }
177
178
179    /**
180     * Is the List empty?
181     */ 
182    public boolean isEmpty() {
183        return theList.isEmpty();
184    }
185
186
187    /**
188     * List iterator.
189     */ 
190    public Iterator iterator() {
191        return theList.values().iterator();
192    }
193
194}
195
196
197/**
198 * Thread to communicate with the list server.
199 */
200
201class Listener extends Thread {
202
203    private SocketChannel channel;
204
205    private SortedMap<Counter,Object> theList;
206
207    private volatile boolean goon;
208
209
210    Listener(SocketChannel s, SortedMap<Counter,Object> list) {
211        channel = s;
212        theList = list;
213    } 
214
215
216    void setDone() {
217        goon = false;
218    }
219
220
221    @Override
222    public void run() {
223        Counter n;
224        Object o;
225        goon = true;
226        while (goon) {
227            n = null;
228            o = null;
229            try {
230                n = (Counter) channel.receive();
231                if ( this.isInterrupted() ) {
232                    goon = false;
233                } else {
234                    o = channel.receive();
235                    //System.out.println("receive("+n+","+o+" @ "+Thread.currentThread());
236                    if ( this.isInterrupted() ) {
237                        goon = false;
238                    }
239                    theList.put(n,o);
240                }
241            } catch (IOException e) {
242                goon = false;
243            } catch (ClassNotFoundException e) {
244                e.printStackTrace();
245                goon = false;
246            }
247        }
248    }
249
250}