001/*
002 * $Id: DistributedList.java 4096 2012-08-12 11:56:27Z kredel $
003 */
004
005package edu.jas.util;
006
007import java.io.IOException;
008import java.util.Iterator;
009//import java.util.Collection;
010import java.util.List;
011import java.util.ArrayList;
012import java.util.SortedMap;
013import java.util.TreeMap;
014
015import org.apache.log4j.Logger;
016
017//import edu.unima.ky.parallel.ChannelFactory;
018//import edu.unima.ky.parallel.SocketChannel;
019
020
021/**
022 * Distributed version of a List.
023 * Implemented with a SortedMap / TreeMap to keep the sequence order of elements.
024 * @author Heinz Kredel
025 */
026
027public class DistributedList /* implements List not jet */ {
028
029    private static final Logger logger = Logger.getLogger(DistributedList.class);
030
031    protected final SortedMap<Counter,Object> theList;
032    protected final ChannelFactory cf;
033    protected SocketChannel channel = null;
034    protected Listener listener = null;
035
036
037    /**
038     * Constructor for DistributedList.
039     * @param host name or IP of server host.
040     */ 
041    public DistributedList(String host) {
042        this(host,DistributedListServer.DEFAULT_PORT);
043    }
044
045
046    /**
047     * Constructor for DistributedList.
048     * @param host name or IP of server host.
049     * @param port of server.
050     */
051    public DistributedList(String host,int port) {
052        this(new ChannelFactory(port+1),host,port);
053    }
054
055
056    /**
057     * Constructor for DistributedList.
058     * @param cf ChannelFactory to use.
059     * @param host name or IP of server host.
060     * @param port of server.
061     */
062    public DistributedList(ChannelFactory cf,String host,int port) {
063        this.cf = cf;
064        cf.init();
065        try {
066            channel = cf.getChannel(host,port);
067        } catch (IOException e) {
068            e.printStackTrace();
069        }
070        logger.debug("dl channel = " + channel);
071        theList = new TreeMap<Counter,Object>();
072    }
073
074
075    /**
076     * Constructor for DistributedList.
077     * @param sc SocketChannel to use.
078     */
079    public DistributedList(SocketChannel sc) {
080        cf = null;
081        channel = sc;
082        theList = new TreeMap<Counter,Object>();
083    }
084
085
086    /**
087     * List thread initialization and start.
088     */ 
089    public void init() {
090        listener = new Listener(channel,theList);
091        listener.start();
092    }
093
094
095    /**
096     * Terminate the list thread.
097     */ 
098    public void terminate() {
099        if ( cf != null ) {
100            cf.terminate();
101            logger.warn("terminating " + cf);
102        }
103        if ( channel != null ) {
104            channel.close();
105        }
106        //theList.clear();
107        if ( listener == null ) { 
108            return;
109        }
110        logger.debug("terminate " + listener);
111        listener.setDone(); 
112        try { 
113            while ( listener.isAlive() ) {
114                listener.interrupt(); 
115                listener.join(100);
116            }
117        } catch (InterruptedException u) { 
118            Thread.currentThread().interrupt();
119        }
120        listener = null;
121    }
122
123
124    /**
125     * Get the internal list, convert from Collection.
126     */ 
127    public List<Object> getList() {
128        return new ArrayList<Object>( theList.values() );
129    }
130
131
132    /**
133     * Size of the (local) list.
134     */ 
135    public int size() {
136        return theList.size();
137    }
138
139
140    /**
141     * Add object to the list and distribute to other lists.
142     * Blocks until the object is send and received from the server
143     * (actually it blocks until some object is received).
144     * @param o
145     */
146    public synchronized void add(Object o) {
147        int sz1 = theList.size() + 1;
148        try {
149            channel.send(o);
150            //System.out.println("send: "+o+" @ "+listener);
151        } catch (IOException e) {
152            e.printStackTrace();
153        }
154        try {
155            while ( theList.size() < sz1 ) {
156                this.wait(100);
157            }
158        } catch (InterruptedException e) {
159            Thread.currentThread().interrupt();
160            e.printStackTrace();
161        }
162    }
163
164
165    /**
166     * Clear the List.
167     * caveat: must be called on all clients.
168     */ 
169    public synchronized void clear() {
170        theList.clear();
171    }
172
173
174    /**
175     * Is the List empty?
176     */ 
177    public boolean isEmpty() {
178        return theList.isEmpty();
179    }
180
181
182    /**
183     * List iterator.
184     */ 
185    public Iterator iterator() {
186        return theList.values().iterator();
187    }
188
189}
190
191
192/**
193 * Thread to comunicate with the list server.
194 */
195
196class Listener extends Thread {
197
198    private SocketChannel channel;
199    private SortedMap<Counter,Object> theList;
200    private boolean goon;
201
202
203    Listener(SocketChannel s, SortedMap<Counter,Object> list) {
204        channel = s;
205        theList = list;
206    } 
207
208
209    void setDone() {
210        goon = false;
211    }
212
213
214    @Override
215    public void run() {
216        Counter n;
217        Object o;
218        goon = true;
219        while (goon) {
220            n = null;
221            o = null;
222            try {
223                n = (Counter) channel.receive();
224                if ( this.isInterrupted() ) {
225                    goon = false;
226                } else {
227                    o = channel.receive();
228                    //System.out.println("receive("+n+","+o+" @ "+Thread.currentThread());
229                    if ( this.isInterrupted() ) {
230                        goon = false;
231                    }
232                    theList.put(n,o);
233                }
234            } catch (IOException e) {
235                goon = false;
236            } catch (ClassNotFoundException e) {
237                e.printStackTrace();
238                goon = false;
239            }
240        }
241    }
242
243}