/*
 * $Id: DistributedListServer.java 3279 2010-08-21 20:18:25Z kredel $
 */

package edu.jas.util;

import java.io.IOException;
import java.io.Serializable;

import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Map.Entry;

import org.apache.log4j.Logger;

//import edu.unima.ky.parallel.ChannelFactory;
//import edu.unima.ky.parallel.SocketChannel;


/**
 * Server for the distributed version of a list.
 * For every channel obtained from the channel factory a
 * {@link Broadcaster} thread is started. Each object received by a
 * broadcaster is stored in a shared sorted map under a fresh
 * {@link Counter} key and forwarded to all registered broadcasters.
 * @author Heinz Kredel
 * @todo redistribute list for late coming clients, removal of elements.
 */
public class DistributedListServer extends Thread {

    private static final Logger logger = Logger.getLogger(DistributedListServer.class);

    public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99;

    protected final ChannelFactory cf;

    /**
     * Registered broadcasters; also used as the lock guarding the list.
     * Set to null by {@link #terminate()}.
     */
    protected List<Broadcaster> servers;

    // written by terminate() and read by run(), possibly on different threads
    private volatile boolean goon = true;

    // thread executing run(); set in run() and read in terminate()
    private volatile Thread mythread = null;

    private final Counter listElem;

    protected final SortedMap<Counter,Object> theList;


    /**
     * Constructs a new DistributedListServer on the default port.
     */
    public DistributedListServer() {
        this(DEFAULT_PORT);
    }


    /**
     * DistributedListServer.
     * @param port to run server on.
     */
    public DistributedListServer(int port) {
        this( new ChannelFactory(port) );
    }


    /**
     * DistributedListServer.
     * Initializes the given channel factory and creates the empty
     * broadcaster list and element map.
     * @param cf ChannelFactory to use.
     */
    public DistributedListServer(ChannelFactory cf) {
        listElem = new Counter(0);
        this.cf = cf;
        cf.init();
        servers = new ArrayList<Broadcaster>();
        theList = new TreeMap<Counter,Object>();
    }


    /**
     * main.
     * Usage: DistributedListServer &lt;port&gt;
     * Without a (valid) port argument the default port is used.
     * The server loop is executed in the calling thread.
     * @param args optional port number.
     */
    public static void main(String[] args) {
        int port = DEFAULT_PORT;
        if ( args.length < 1 ) {
            System.out.println("Usage: DistributedListServer <port>");
        } else {
            try {
                port = Integer.parseInt( args[0] );
            } catch (NumberFormatException e) {
                // keep the default port, but tell the user why
                System.out.println("invalid port '" + args[0]
                                   + "', using default port " + DEFAULT_PORT);
            }
        }
        (new DistributedListServer(port)).run();
        // until CTRL-C
    }


    /**
     * thread initialization and start.
     */
    public void init() {
        this.start();
    }


    /**
     * main server method.
     * Repeatedly obtains a channel from the channel factory, starts a
     * Broadcaster for it and sends the elements already stored in the
     * list to the new client. The loop ends when the executing thread
     * is interrupted or {@link #terminate()} has been called.
     */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        mythread = self;
        while (goon) {
            // logger.debug("list server " + this + " go on");
            try {
                SocketChannel channel = cf.getChannel();
                logger.debug("dls channel = "+channel);
                // local copy: terminate() may set the field to null concurrently
                final List<Broadcaster> current = servers;
                if ( self.isInterrupted() || current == null ) {
                    goon = false;
                    //logger.info("list server " + this + " interrupted");
                } else {
                    Broadcaster s = new Broadcaster(channel,current,listElem,theList);
                    int ls = 0;
                    synchronized (current) {
                        current.add( s );
                        ls = theList.size();
                        s.start();
                    }
                    //logger.debug("server " + s + " started");
                    if ( ls > 0 ) {
                        logger.info("sending " + ls + " list elements");
                        sendExisting( s, ls );
                    }
                }
            } catch (InterruptedException end) {
                goon = false;
                Thread.currentThread().interrupt();
            }
        }
        //logger.debug("listserver " + this + " terminated");
    }


    /**
     * Send the first ls stored list elements to a newly connected client.
     * Sending stops at the first I/O failure.
     * @param s broadcaster of the new client.
     * @param ls number of elements to send.
     */
    private void sendExisting(Broadcaster s, int ls) {
        synchronized (theList) {
            Iterator<Entry<Counter,Object>> it = theList.entrySet().iterator();
            for ( int i = 0; i < ls && it.hasNext(); i++ ) {
                Entry<Counter,Object> e = it.next();
                try {
                    s.sendChannel( e.getKey(), e.getValue() );
                } catch (IOException ioe) {
                    // the channel failed, further sends would fail as well
                    logger.warn("sending list elements to " + s + " failed", ioe);
                    break;
                }
            }
        }
    }


    /**
     * terminate all servers.
     * Terminates the channel factory, closes the channel of every
     * broadcaster and waits for the broadcaster threads and for the
     * thread executing {@link #run()} to end.
     */
    public void terminate() {
        goon = false;
        logger.debug("terminating ListServer");
        if ( cf != null ) cf.terminate();
        final List<Broadcaster> current = servers;
        if ( current != null ) {
            // Copy under the same lock run() and broadcast() use; join outside
            // of it so a broadcaster waiting for the lock can still terminate.
            List<Broadcaster> snapshot;
            synchronized (current) {
                snapshot = new ArrayList<Broadcaster>( current );
            }
            for ( Broadcaster br : snapshot ) {
                br.closeChannel();
                try {
                    while ( br.isAlive() ) {
                        //System.out.print(".");
                        br.interrupt();
                        br.join(100);
                    }
                    //logger.debug("server " + br + " terminated");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            servers = null;
        }
        logger.debug("Broadcasters terminated");
        final Thread server = mythread;
        if ( server == null ) return;
        try {
            while ( server.isAlive() ) {
                // System.out.print("-");
                server.interrupt();
                server.join(100);
            }
            //logger.debug("server " + server + " terminated");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        mythread = null;
        logger.debug("ListServer terminated");
    }


    /**
     * number of servers.
     * @return the number of registered broadcasters, 0 after
     *         {@link #terminate()}.
     */
    public int size() {
        final List<Broadcaster> current = servers;
        return ( current == null ) ? 0 : current.size();
    }

}


/**
 * Class for holding the list index used as key in TreeMap.
 * Implemented since Integer has no add() method.
 * Must implement Comparable so that TreeMap works with correct ordering.
 */

class Counter implements Serializable, Comparable<Counter> {

    private int value;


    /**
     * Counter with initial value 0.
     */
    public Counter() {
        this(0);
    }


    /**
     * Counter.
     * @param v initial value.
     */
    public Counter(int v) {
        value = v;
    }


    /**
     * intValue.
     * @return the value.
     */
    public int intValue() {
        return value;
    }


    /**
     * add.
     * @param v amount to add to the value.
     */
    public void add(int v) { // synchronized elsewhere
        value += v;
    }


    /**
     * equals.
     * @param ob an Object.
     * @return true if ob is a Counter with the same value, else false.
     */
    @Override
    public boolean equals(Object ob) {
        if ( ! (ob instanceof Counter) ) {
            return false;
        }
        return 0 == compareTo( (Counter)ob );
    }


    /**
     * hashCode.
     * Consistent with equals: equal counters have equal hash codes.
     * Note: Counter declares no serialVersionUID, so adding this method
     * changes the default one; server and clients must use the same build.
     * @return the value.
     */
    @Override
    public int hashCode() {
        return value;
    }


    /**
     * compareTo.
     * @param c a Counter.
     * @return 1 if (this &gt; c), 0 if (this == c), -1 if (this &lt; c).
     */
    public int compareTo(Counter c) {
        int x = c.intValue();
        if ( value > x ) {
            return 1;
        }
        if ( value < x ) {
            return -1;
        }
        return 0;
    }


    /**
     * toString.
     * @return a string representation of this.
     */
    @Override
    public String toString() {
        return "Counter("+value+")";
    }

}


/**
 * Thread for broadcasting all incoming objects to the list clients.
 */

class Broadcaster extends Thread /*implements Runnable*/ {

    private static final Logger logger = Logger.getLogger(Broadcaster.class);

    private final SocketChannel channel;

    // shared list of all broadcasters, also used as lock
    private final List<Broadcaster> bcaster;

    private final Counter listElem;

    private final SortedMap<Counter,Object> theList;


    /**
     * Broadcaster.
     * @param s SocketChannel to use.
     * @param p list of broadcasters.
     * @param le counter
     * @param sm SortedMap with counter value pairs.
     */
    public Broadcaster(SocketChannel s, List<Broadcaster> p, Counter le, SortedMap<Counter,Object> sm) {
        channel = s;
        bcaster = p;
        listElem = le;
        theList = sm;
    }


    /**
     * closeChannel.
     */
    public void closeChannel() {
        channel.close();
    }


    /**
     * sendChannel.
     * Key and value are sent as one unit while holding the channel lock.
     * @param n counter.
     * @param o value.
     * @throws IOException
     */
    public void sendChannel(Object n, Object o) throws IOException {
        synchronized (channel) {
            channel.send(n);
            channel.send(o);
        }
    }


    /**
     * broadcast.
     * Increments the shared counter, stores the object under the new
     * counter value and sends the pair to all registered broadcasters.
     * A broadcaster whose channel fails is closed, stopped and removed
     * from the list.
     * @param o object to store and send.
     */
    public void broadcast(Object o) {
        Counter li = null;
        synchronized (listElem) {
            listElem.add(1);
            li = new Counter( listElem.intValue() );
        }
        synchronized (theList) {
            theList.put( li, o );
        }
        synchronized (bcaster) {
            Iterator<Broadcaster> it = bcaster.iterator();
            while ( it.hasNext() ) {
                Broadcaster br = it.next();
                try {
                    br.sendChannel(li,o);
                    //System.out.println("bcast: "+o+" to "+x.channel);
                } catch (IOException e) {
                    try {
                        br.closeChannel();
                        while ( br.isAlive() ) {
                            br.interrupt();
                            br.join(100);
                        }
                    } catch (InterruptedException u) {
                        Thread.currentThread().interrupt();
                    }
                    // remove via the iterator; removing from the list directly
                    // would invalidate the running iteration
                    it.remove();
                }
            }
        }
    }


    /**
     * run.
     * Receives objects from the channel and broadcasts them until the
     * thread is interrupted or receiving fails; the channel is closed
     * when the loop ends.
     */
    @Override
    public void run() {
        Object o;
        boolean goon = true;
        try {
            while (goon) {
                try {
                    o = channel.receive();
                    //System.out.println("receive: "+o+" from "+channel);
                    broadcast(o);
                    if ( this.isInterrupted() ) {
                        goon = false;
                    }
                } catch (IOException e) {
                    goon = false;
                } catch (ClassNotFoundException e) {
                    goon = false;
                    logger.error("received object of unknown class", e);
                }
            }
        } finally {
            // close the channel even if broadcast() fails unexpectedly
            logger.debug("broadcaster terminated "+this);
            channel.close();
        }
    }


    /**
     * toString.
     * @return a string representation of this.
     */
    @Override
    public String toString() {
        return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")";
    }

}