/*
 * $Id: DistributedListServer.java 4521 2013-07-27 10:16:36Z kredel $
 */

package edu.jas.util;

import java.io.IOException;
import java.io.Serializable;

import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Map.Entry;

import org.apache.log4j.Logger;

//import edu.unima.ky.parallel.ChannelFactory;
//import edu.unima.ky.parallel.SocketChannel;


/**
 * Server for the distributed version of a list.
 * The server thread accepts channels from a ChannelFactory, starts one
 * Broadcaster per channel and replays the elements stored so far to the
 * newly connected client.
 * @author Heinz Kredel
 * TODO: redistribute list for late coming clients, removal of elements.
 */
public class DistributedListServer extends Thread {

    private static final Logger logger = Logger.getLogger(DistributedListServer.class);

    public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99;
    protected final ChannelFactory cf;

    /** Broadcasters of the connected clients; set to null by terminate(). */
    protected List<Broadcaster> servers;

    // written by terminate() from a foreign thread, read by run()
    private volatile boolean goon = true;
    // written by run(), read and cleared by terminate()
    private volatile Thread mythread = null;

    /** Running index of the last stored element; also used as lock object by the Broadcasters. */
    private final Counter listElem;
    protected final SortedMap<Counter,Object> theList;


    /**
     * Constructs a new DistributedListServer on the default port.
     */
    public DistributedListServer() {
        this(DEFAULT_PORT);
    }

    /**
     * DistributedListServer.
     * @param port to run server on.
     */
    public DistributedListServer(int port) {
        this( new ChannelFactory(port) );
    }

    /**
     * DistributedListServer.
     * The channel factory is initialized here via cf.init().
     * @param cf ChannelFactory to use.
     */
    public DistributedListServer(ChannelFactory cf) {
        listElem = new Counter(0);
        this.cf = cf;
        cf.init();
        servers = new ArrayList<Broadcaster>();
        theList = new TreeMap<Counter,Object>();
    }


    /**
     * main.
     * Usage: DistributedListServer &lt;port&gt;
     * Without an argument, or with an unparsable one, the default port is used.
     * @param args command line arguments, args[0] is the optional port.
     * @throws InterruptedException if waiting for the server thread is interrupted.
     */
    public static void main(String[] args) throws InterruptedException {
        int port = DEFAULT_PORT;
        if ( args.length < 1 ) {
            System.out.println("Usage: DistributedListServer <port>");
        } else {
            try {
                port = Integer.parseInt( args[0] );
            } catch (NumberFormatException e) {
                // keep the default port, but tell the user instead of ignoring the input silently
                System.out.println("Invalid port '" + args[0] + "', using default port " + DEFAULT_PORT);
            }
        }
        DistributedListServer dls = new DistributedListServer(port);
        dls.init();
        dls.join();
        // until CRTL-C
    }


    /**
     * thread initialization and start.
     */
    public void init() {
        this.start();
    }


    /**
     * main server method.
     * Loops until terminated or interrupted: takes the next channel from the
     * channel factory, registers and starts a Broadcaster for it and sends the
     * elements stored so far to the new client.
     */
    @Override
    public void run() {
        mythread = Thread.currentThread();
        while (goon) {
            // logger.debug("list server " + this + " go on");
            try {
                SocketChannel channel = cf.getChannel();
                logger.debug("dls channel = "+channel);
                // terminate() may set servers to null concurrently, so work on a local reference
                final List<Broadcaster> active = servers;
                if ( Thread.currentThread().isInterrupted() || active == null ) {
                    goon = false;
                    //logger.info("list server " + this + " interrupted");
                } else {
                    Broadcaster s = new Broadcaster(channel,active,listElem,theList);
                    int ls = 0;
                    synchronized (active) {
                        active.add( s );
                        ls = theList.size();
                        s.start();
                    }
                    //logger.debug("server " + s + " started");
                    if ( ls > 0 ) {
                        logger.info("sending " + ls + " list elements");
                        synchronized (theList) {
                            Iterator<Entry<Counter,Object>> it = theList.entrySet().iterator();
                            // only the first ls entries; later ones reach s via broadcast()
                            for ( int i = 0; i < ls && it.hasNext(); i++ ) {
                                Entry<Counter,Object> e = it.next();
                                try {
                                    s.sendChannel( e.getKey(), e.getValue() );
                                } catch (IOException ioe) {
                                    // further sends on this channel are pointless; the next
                                    // broadcast() that fails on s closes and removes it
                                    logger.warn("sending list elements to " + s + " failed", ioe);
                                    break;
                                }
                            }
                        }
                    }
                }
            } catch (InterruptedException end) {
                goon = false;
                Thread.currentThread().interrupt();
            }
        }
        //logger.debug("listserver " + this + " terminated");
    }


    /**
     * terminate all servers.
     * Terminates the channel factory, closes the channel of every Broadcaster
     * and waits for it to die, then interrupts the server thread and waits for
     * it to die. Afterwards size() returns -1.
     */
    public void terminate() {
        goon = false;
        logger.debug("terminating ListServer");
        if ( cf != null ) cf.terminate();
        final List<Broadcaster> active = servers;
        if ( active != null ) {
            for ( Broadcaster br : active ) {
                br.closeChannel();
                try {
                    while ( br.isAlive() ) {
                        //System.out.print(".");
                        br.interrupt();
                        br.join(100);
                    }
                    //logger.debug("server " + br + " terminated");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            servers = null;
        }
        logger.debug("Broadcasters terminated");
        final Thread serverThread = mythread;
        if ( serverThread == null ) return;
        try {
            while ( serverThread.isAlive() ) {
                // System.out.print("-");
                serverThread.interrupt();
                serverThread.join(100);
            }
            //logger.debug("server " + serverThread + " terminated");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        mythread = null;
        logger.debug("ListServer terminated");
    }


    /**
     * number of servers.
     * @return the number of registered Broadcasters, or -1 after terminate().
     */
    public int size() {
        final List<Broadcaster> active = servers;
        if ( active == null ) {
            return -1;
        }
        return active.size();
    }

}


/**
 * Class for holding the list index used as key in TreeMap.
 * Implemented since Integer has no add() method.
 * Must implement Comparable so that TreeMap works with correct ordering.
 */

class Counter implements Serializable, Comparable<Counter> {

    private int value;


    /**
     * Counter with initial value 0.
     */
    public Counter() {
        this(0);
    }


    /**
     * Counter.
     * @param v initial value.
     */
    public Counter(int v) {
        value = v;
    }


    /**
     * intValue.
     * @return the value.
     */
    public int intValue() {
        return value;
    }


    /**
     * add.
     * Not synchronized here; callers must synchronize.
     * @param v increment to add to the value.
     */
    public void add(int v) { // synchronized elsewhere
        value += v;
    }


    /**
     * equals.
     * @param ob an Object.
     * @return true if ob is a Counter with the same value, else false.
     */
    @Override
    public boolean equals(Object ob) {
        if ( ! (ob instanceof Counter) ) {
            return false;
        }
        return 0 == compareTo( (Counter)ob );
    }


    /**
     * compareTo.
     * @param c a Counter.
     * @return 1 if (this &gt; c), 0 if (this == c), -1 if (this &lt; c).
     */
    public int compareTo(Counter c) {
        int x = c.intValue();
        if ( value > x ) {
            return 1;
        }
        if ( value < x ) {
            return -1;
        }
        return 0;
    }


    /**
     * Hash code for this Counter.
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        return value;
    }


    /**
     * toString.
     * @return a string representation of this.
     */
    @Override
    public String toString() {
        return "Counter("+value+")";
    }

}


/**
 * Thread for broadcasting all incoming objects to the list clients.
 */

class Broadcaster extends Thread /*implements Runnable*/ {

    private static final Logger logger = Logger.getLogger(Broadcaster.class);
    private final SocketChannel channel;
    private final List<Broadcaster> bcaster;
    private final Counter listElem;
    private final SortedMap<Counter,Object> theList;


    /**
     * Broadcaster.
     * @param s SocketChannel to use.
     * @param p list of broadcasters.
     * @param le counter
     * @param sm SortedMap with counter value pairs.
     */
    public Broadcaster(SocketChannel s, List<Broadcaster> p, Counter le, SortedMap<Counter,Object> sm) {
        channel = s;
        bcaster = p;
        listElem = le;
        theList = sm;
    }


    /**
     * closeChannel.
     */
    public void closeChannel() {
        channel.close();
    }


    /**
     * sendChannel.
     * Key and value are sent under the channel lock so that pairs from
     * different threads are not interleaved.
     * @param n counter.
     * @param o value.
     * @throws IOException if sending on the channel fails.
     */
    public void sendChannel(Object n, Object o) throws IOException {
        synchronized (channel) {
            channel.send(n);
            channel.send(o);
        }
    }


    /**
     * broadcast.
     * Assigns the next index to o, stores the pair in the list and sends it
     * to every registered broadcaster. A broadcaster whose channel fails is
     * closed, interrupted and removed from the list.
     * @param o object to store and send.
     */
    public void broadcast(Object o) {
        Counter li = null;
        synchronized (listElem) {
            listElem.add(1);
            li = new Counter( listElem.intValue() );
        }
        synchronized (theList) {
            theList.put( li, o );
        }
        synchronized (bcaster) {
            Iterator<Broadcaster> it = bcaster.iterator();
            while ( it.hasNext() ) {
                Broadcaster br = it.next();
                try {
                    br.sendChannel(li,o);
                    //System.out.println("bcast: "+o+" to "+x.channel);
                } catch (IOException e) {
                    // Do not join br here: we hold the bcaster lock, and br may itself be
                    // waiting for that lock inside broadcast(), so it could never terminate.
                    // Closing the channel and interrupting makes its run() loop end.
                    br.closeChannel();
                    br.interrupt();
                    // remove via the iterator; bcaster.remove(br) would make the
                    // iteration fail with ConcurrentModificationException
                    it.remove();
                }
            }
        }
    }


    /**
     * run.
     * Receives objects from the channel and broadcasts them until the thread
     * is interrupted or receiving fails; closes the channel at the end.
     */
    @Override
    public void run() {
        boolean goon = true;
        while (goon) {
            try {
                Object o = channel.receive();
                //System.out.println("receive: "+o+" from "+channel);
                broadcast(o);
                if ( this.isInterrupted() ) {
                    goon = false;
                }
            } catch (IOException e) {
                goon = false;
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.warn("received object of unknown class", e);
            }
        }
        logger.debug("broadcaster terminated "+this);
        channel.close();
    }


    /**
     * toString.
     * @return a string representation of this.
     */
    @Override
    public String toString() {
        return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")";
    }

}