001 /*
002 * $Id: DistributedListServer.java 3279 2010-08-21 20:18:25Z kredel $
003 */
004
005 package edu.jas.util;
006
007 import java.io.IOException;
008 import java.io.Serializable;
009
010 import java.util.Iterator;
011 import java.util.List;
012 import java.util.ArrayList;
013 import java.util.SortedMap;
014 import java.util.TreeMap;
015 import java.util.Map.Entry;
016
017 import org.apache.log4j.Logger;
018
019 //import edu.unima.ky.parallel.ChannelFactory;
020 //import edu.unima.ky.parallel.SocketChannel;
021
022
023 /**
024 * Server for the distributed version of a list.
025 * @author Heinz Kredel
026 * @todo redistribute list for late comming clients, removal of elements.
027 */
028 public class DistributedListServer extends Thread {
029
030 private static final Logger logger = Logger.getLogger(DistributedListServer.class);
031
032 public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99;
033 protected final ChannelFactory cf;
034
035 protected List<Broadcaster> servers;
036
037 private boolean goon = true;
038 private Thread mythread = null;
039
040 private Counter listElem = null;
041 protected final SortedMap<Counter,Object> theList;
042
043
044 /**
045 * Constructs a new DistributedListServer.
046 */
047
048 public DistributedListServer() {
049 this(DEFAULT_PORT);
050 }
051
052 /**
053 * DistributedListServer.
054 * @param port to run server on.
055 */
056 public DistributedListServer(int port) {
057 this( new ChannelFactory(port) );
058 }
059
060 /**
061 * DistributedListServer.
062 * @param cf ChannelFactory to use.
063 */
064 public DistributedListServer(ChannelFactory cf) {
065 listElem = new Counter(0);
066 this.cf = cf;
067 cf.init();
068 servers = new ArrayList<Broadcaster>();
069 theList = new TreeMap<Counter,Object>();
070 }
071
072
073 /**
074 * main.
075 * Usage: DistributedListServer <port>
076 */
077 public static void main(String[] args) {
078 int port = DEFAULT_PORT;
079 if ( args.length < 1 ) {
080 System.out.println("Usage: DistributedListServer <port>");
081 } else {
082 try {
083 port = Integer.parseInt( args[0] );
084 } catch (NumberFormatException e) {
085 }
086 }
087 (new DistributedListServer(port)).run();
088 // until CRTL-C
089 }
090
091
092 /**
093 * thread initialization and start.
094 */
095 public void init() {
096 this.start();
097 }
098
099
100 /**
101 * main server method.
102 */
103 @Override
104 public void run() {
105 SocketChannel channel = null;
106 Broadcaster s = null;
107 mythread = Thread.currentThread();
108 Entry e;
109 Object n;
110 Object o;
111 while (goon) {
112 // logger.debug("list server " + this + " go on");
113 try {
114 channel = cf.getChannel();
115 logger.debug("dls channel = "+channel);
116 if ( mythread.isInterrupted() ) {
117 goon = false;
118 //logger.info("list server " + this + " interrupted");
119 } else {
120 s = new Broadcaster(channel,servers,listElem,theList);
121 int ls = 0;
122 synchronized (servers) {
123 servers.add( s );
124 ls = theList.size();
125 s.start();
126 }
127 //logger.debug("server " + s + " started");
128 if ( ls > 0 ) {
129 logger.info("sending " + ls + " list elements");
130 synchronized (theList) {
131 Iterator it = theList.entrySet().iterator();
132 for ( int i = 0; i < ls; i++ ) {
133 e = (Entry)it.next();
134 n = e.getKey();
135 o = e.getValue();
136 try {
137 s.sendChannel( n,o );
138 } catch (IOException ioe) {
139 // stop s
140 }
141 }
142 }
143 }
144 }
145 } catch (InterruptedException end) {
146 goon = false;
147 Thread.currentThread().interrupt();
148 }
149 }
150 //logger.debug("listserver " + this + " terminated");
151 }
152
153
154 /**
155 * terminate all servers.
156 */
157 public void terminate() {
158 goon = false;
159 logger.debug("terminating ListServer");
160 if ( cf != null ) cf.terminate();
161 if ( servers != null ) {
162 Iterator it = servers.iterator();
163 while ( it.hasNext() ) {
164 Broadcaster br = (Broadcaster) it.next();
165 br.closeChannel();
166 try {
167 while ( br.isAlive() ) {
168 //System.out.print(".");
169 br.interrupt();
170 br.join(100);
171 }
172 //logger.debug("server " + br + " terminated");
173 } catch (InterruptedException e) {
174 Thread.currentThread().interrupt();
175 }
176 }
177 servers = null;
178 }
179 logger.debug("Broadcasters terminated");
180 if ( mythread == null ) return;
181 try {
182 while ( mythread.isAlive() ) {
183 // System.out.print("-");
184 mythread.interrupt();
185 mythread.join(100);
186 }
187 //logger.debug("server " + mythread + " terminated");
188 } catch (InterruptedException e) {
189 Thread.currentThread().interrupt();
190 }
191 mythread = null;
192 logger.debug("ListServer terminated");
193 }
194
195
196 /**
197 * number of servers.
198 */
199 public int size() {
200 return servers.size();
201 }
202
203 }
204
205
206 /**
207 * Class for holding the list index used a key in TreeMap.
208 * Implemented since Integer has no add() method.
209 * Must implement Comparable so that TreeMap works with correct ordering.
210 */
211
212 class Counter implements Serializable, Comparable<Counter> {
213
214 private int value;
215
216
217 /**
218 * Counter.
219 */
220 public Counter() {
221 this(0);
222 }
223
224
225 /**
226 * Counter.
227 * @param v
228 */
229 public Counter(int v) {
230 value = v;
231 }
232
233
234 /**
235 * intValue.
236 * @return the value.
237 */
238 public int intValue() {
239 return value;
240 }
241
242
243 /**
244 * add.
245 * @param v
246 */
247 public void add(int v) { // synchronized elsewhere
248 value += v;
249 }
250
251
252 /**
253 * equals.
254 * @param ob an Object.
255 * @return true if this is equal to o, else false.
256 */
257 @Override
258 public boolean equals(Object ob) {
259 if ( ! (ob instanceof Counter) ) {
260 return false;
261 }
262 return 0 == compareTo( (Counter)ob );
263 }
264
265
266 /**
267 * compareTo.
268 * @param c a Counter.
269 * @return 1 if (this < c), 0 if (this == c), -1 if (this > c).
270 */
271 public int compareTo(Counter c) {
272 int x = c.intValue();
273 if ( value > x ) {
274 return 1;
275 }
276 if ( value < x ) {
277 return -1;
278 }
279 return 0;
280 }
281
282
283 /**
284 * toString.
285 */
286 @Override
287 public String toString() {
288 return "Counter("+value+")";
289 }
290
291 }
292
293
294 /**
295 * Thread for broadcasting all incoming objects to the list clients.
296 */
297
298 class Broadcaster extends Thread /*implements Runnable*/ {
299
300 private static final Logger logger = Logger.getLogger(Broadcaster.class);
301 private final SocketChannel channel;
302 private final List bcaster;
303 private Counter listElem;
304 private final SortedMap<Counter,Object> theList;
305
306
307 /**
308 * Broadcaster.
309 * @param s SocketChannel to use.
310 * @param p list of broadcasters.
311 * @param le counter
312 * @param sm SortedMap with counter value pairs.
313 */
314 public Broadcaster(SocketChannel s, List p, Counter le, SortedMap<Counter,Object> sm) {
315 channel = s;
316 bcaster = p;
317 listElem = le;
318 theList = sm;
319 }
320
321
322 /**
323 * closeChannel.
324 */
325 public void closeChannel() {
326 channel.close();
327 }
328
329
330 /**
331 * sendChannel.
332 * @param n counter.
333 * @param o value.
334 * @throws IOException
335 */
336 public void sendChannel(Object n, Object o) throws IOException {
337 synchronized (channel) {
338 channel.send(n);
339 channel.send(o);
340 }
341 }
342
343
344 /**
345 * broadcast.
346 * @param o object to store and send.
347 */
348 public void broadcast(Object o) {
349 Counter li = null;
350 synchronized (listElem) {
351 listElem.add(1);
352 li = new Counter( listElem.intValue() );
353 }
354 synchronized (theList) {
355 theList.put( li, o );
356 }
357 synchronized (bcaster) {
358 Iterator it = bcaster.iterator();
359 while ( it.hasNext() ) {
360 Broadcaster br = (Broadcaster) it.next();
361 try {
362 br.sendChannel(li,o);
363 //System.out.println("bcast: "+o+" to "+x.channel);
364 } catch (IOException e) {
365 try {
366 br.closeChannel();
367 while ( br.isAlive() ) {
368 br.interrupt();
369 br.join(100);
370 }
371 } catch (InterruptedException u) {
372 Thread.currentThread().interrupt();
373 }
374 bcaster.remove( br );
375 }
376 }
377 }
378 }
379
380
381 /**
382 * run.
383 */
384 @Override
385 public void run() {
386 Object o;
387 boolean goon = true;
388 while (goon) {
389 try {
390 o = channel.receive();
391 //System.out.println("receive: "+o+" from "+channel);
392 broadcast(o);
393 if ( this.isInterrupted() ) {
394 goon = false;
395 }
396 } catch (IOException e) {
397 goon = false;
398 } catch (ClassNotFoundException e) {
399 goon = false;
400 e.printStackTrace();
401
402 }
403 }
404 logger.debug("broadcaster terminated "+this);
405 channel.close();
406 }
407
408
409 /**
410 * toString.
411 * @return a string representation of this.
412 */
413 @Override
414 public String toString() {
415 return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")";
416 }
417
418 }