001    /*
002     * $Id: DistributedList.java 3279 2010-08-21 20:18:25Z kredel $
003     */
004    
005    package edu.jas.util;
006    
import java.io.IOException;
import java.util.ArrayList;
//import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
014    
015    import org.apache.log4j.Logger;
016    
017    //import edu.unima.ky.parallel.ChannelFactory;
018    //import edu.unima.ky.parallel.SocketChannel;
019    
020    
021    /**
022     * Distributed version of a List.
023     * Implemented with a SortedMap / TreeMap to keep the sequence order of elements.
024     * @author Heinz Kredel
025     */
026    
027    public class DistributedList /* implements List not jet */ {
028    
029        private static final Logger logger = Logger.getLogger(DistributedList.class);
030    
031        protected final SortedMap<Counter,Object> theList;
032        protected final ChannelFactory cf;
033        protected SocketChannel channel = null;
034        protected Listener listener = null;
035    
036    
037        /**
038         * Constructor for DistributedList.
039         * @param host name or IP of server host.
040         */ 
041        public DistributedList(String host) {
042            this(host,DistributedListServer.DEFAULT_PORT);
043        }
044    
045    
046        /**
047         * Constructor for DistributedList.
048         * @param host name or IP of server host.
049         * @param port of server.
050         */
051        public DistributedList(String host,int port) {
052            this(new ChannelFactory(port+1),host,port);
053        }
054    
055    
056        /**
057         * Constructor for DistributedList.
058         * @param cf ChannelFactory to use.
059         * @param host name or IP of server host.
060         * @param port of server.
061         */
062        public DistributedList(ChannelFactory cf,String host,int port) {
063            this.cf = cf;
064            cf.init();
065            try {
066                channel = cf.getChannel(host,port);
067            } catch (IOException e) {
068                e.printStackTrace();
069            }
070            logger.debug("dl channel = " + channel);
071            theList = new TreeMap<Counter,Object>();
072            listener = new Listener(channel,theList);
073            listener.start();
074        }
075    
076    
077        /**
078         * Constructor for DistributedList.
079         * @param sc SocketChannel to use.
080         */
081        public DistributedList(SocketChannel sc) {
082            cf = null;
083            channel = sc;
084            theList = new TreeMap<Counter,Object>();
085            listener = new Listener(channel,theList);
086            listener.start();
087        }
088    
089    
090    /**
091     * Get the internal list, convert from Collection.
092     */ 
093        public List<Object> getList() {
094            return new ArrayList<Object>( theList.values() );
095        }
096    
097    
098    /**
099     * Size of the (local) list.
100     */ 
101        public int size() {
102            return theList.size();
103        }
104    
105    
106    /**
107     * Add object to the list and distribute to other lists.
108     * Blocks until the object is send and received from the server
109     * (actually it blocks until some object is received).
110     * @param o
111     */
112        public synchronized void add(Object o) {
113            int sz1 = theList.size() + 1;
114            try {
115                channel.send(o);
116                //System.out.println("send: "+o+" @ "+listener);
117            } catch (IOException e) {
118                e.printStackTrace();
119            }
120            try {
121                while ( theList.size() < sz1 ) {
122                    this.wait(100);
123                }
124            } catch (InterruptedException e) {
125                Thread.currentThread().interrupt();
126                e.printStackTrace();
127            }
128        }
129    
130    
131    /**
132     * Terminate the list thread.
133     */ 
134        public void terminate() {
135            if ( cf != null ) {
136               cf.terminate();
137            }
138            if ( channel != null ) {
139               channel.close();
140            }
141            //theList.clear();
142            if ( listener == null ) { 
143               return;
144            }
145            logger.debug("terminate " + listener);
146            listener.setDone(); 
147            try { 
148                 while ( listener.isAlive() ) {
149                         listener.interrupt(); 
150                         listener.join(100);
151                 }
152            } catch (InterruptedException u) { 
153                 Thread.currentThread().interrupt();
154            }
155            listener = null;
156        }
157    
158    
159    /**
160     * Clear the List.
161     * caveat: must be called on all clients.
162     */ 
163        public synchronized void clear() {
164            theList.clear();
165        }
166    
167    
168    /**
169     * Is the List empty?
170     */ 
171        public boolean isEmpty() {
172            return theList.isEmpty();
173        }
174    
175    
176    /**
177     * List iterator.
178     */ 
179        public Iterator iterator() {
180            return theList.values().iterator();
181        }
182    
183    }
184    
185    
186    /**
187     * Thread to comunicate with the list server.
188     */
189    
190    class Listener extends Thread {
191    
192        private SocketChannel channel;
193        private SortedMap<Counter,Object> theList;
194        private boolean goon;
195    
196    
197        Listener(SocketChannel s, SortedMap<Counter,Object> list) {
198            channel = s;
199            theList = list;
200        } 
201    
202    
203        void setDone() {
204            goon = false;
205        }
206    
207        @Override
208         public void run() {
209            Counter n;
210            Object o;
211            goon = true;
212            while (goon) {
213                n = null;
214                o = null;
215                try {
216                    n = (Counter) channel.receive();
217                    if ( this.isInterrupted() ) {
218                       goon = false;
219                    } else {
220                       o = channel.receive();
221                       //System.out.println("receive("+n+","+o+" @ "+Thread.currentThread());
222                       if ( this.isInterrupted() ) {
223                          goon = false;
224                       }
225                       theList.put(n,o);
226                    }
227                } catch (IOException e) {
228                       goon = false;
229                } catch (ClassNotFoundException e) {
230                       e.printStackTrace();
231                       goon = false;
232                }
233            }
234        }
235    
236    }