001/* 002 * $Id$ 003 */ 004 005package edu.jas.util; 006 007import java.io.IOException; 008import java.io.Serializable; 009 010import java.util.Iterator; 011import java.util.List; 012import java.util.ArrayList; 013import java.util.SortedMap; 014import java.util.TreeMap; 015import java.util.Map.Entry; 016 017import org.apache.logging.log4j.Logger; 018import org.apache.logging.log4j.LogManager; 019 020//import edu.unima.ky.parallel.ChannelFactory; 021//import edu.unima.ky.parallel.SocketChannel; 022 023 024/** 025 * Server for the distributed version of a list. 026 * @author Heinz Kredel 027 * TODO: redistribute list for late coming clients, removal of elements. 028 */ 029public class DistributedListServer extends Thread { 030 031 private static final Logger logger = LogManager.getLogger(DistributedListServer.class); 032 033 public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99; 034 035 036 protected final ChannelFactory cf; 037 038 protected List<Broadcaster> servers; 039 040 041 private volatile boolean goon = true; 042 043 private volatile Thread mythread = null; 044 045 046 private Counter listElem = null; 047 048 protected final SortedMap<Counter,Object> theList; 049 050 051 /** 052 * Constructs a new DistributedListServer. 053 */ 054 055 public DistributedListServer() { 056 this(DEFAULT_PORT); 057 } 058 059 /** 060 * DistributedListServer. 061 * @param port to run server on. 062 */ 063 public DistributedListServer(int port) { 064 this( new ChannelFactory(port) ); 065 } 066 067 /** 068 * DistributedListServer. 069 * @param cf ChannelFactory to use. 070 */ 071 public DistributedListServer(ChannelFactory cf) { 072 listElem = new Counter(0); 073 this.cf = cf; 074 cf.init(); 075 servers = new ArrayList<Broadcaster>(); 076 theList = new TreeMap<Counter,Object>(); 077 } 078 079 080 /** 081 * main. 082 * Usage: DistributedListServer <port> 083 */ 084 public static void main(String[] args) throws InterruptedException { 085 int port = DEFAULT_PORT; 086 if ( args.length < 1 ) { 087 System.out.println("Usage: DistributedListServer <port>"); 088 } else { 089 try { 090 port = Integer.parseInt( args[0] ); 091 } catch (NumberFormatException e) { 092 } 093 } 094 DistributedListServer dls = new DistributedListServer(port); 095 dls.init(); 096 dls.join(); 097 // until CRTL-C 098 } 099 100 101 /** 102 * thread initialization and start. 103 */ 104 public void init() { 105 this.start(); 106 } 107 108 109 /** 110 * main server method. 111 */ 112 @Override 113 public void run() { 114 SocketChannel channel = null; 115 Broadcaster s = null; 116 mythread = Thread.currentThread(); 117 Entry e; 118 Object n; 119 Object o; 120 while (goon) { 121 // logger.debug("list server {} go on", this); 122 try { 123 channel = cf.getChannel(); 124 logger.debug("dls channel = {}", channel); 125 if ( mythread.isInterrupted() ) { 126 goon = false; 127 //logger.info("list server {} interrupted", this); 128 } else { 129 s = new Broadcaster(channel,servers,listElem,theList); 130 int ls = 0; 131 synchronized (servers) { 132 servers.add( s ); 133 ls = theList.size(); 134 s.start(); 135 } 136 //logger.debug("server {} started", s); 137 if ( ls > 0 ) { 138 logger.info("sending {} list elements", ls); 139 synchronized (theList) { 140 Iterator it = theList.entrySet().iterator(); 141 for ( int i = 0; i < ls; i++ ) { 142 e = (Entry)it.next(); 143 n = e.getKey(); 144 o = e.getValue(); 145 try { 146 s.sendChannel( n,o ); 147 } catch (IOException ioe) { 148 // stop s 149 } 150 } 151 } 152 } 153 } 154 } catch (InterruptedException end) { 155 goon = false; 156 Thread.currentThread().interrupt(); 157 } 158 } 159 //logger.debug("listserver {} terminated", this); 160 } 161 162 163 /** 164 * terminate all servers. 165 */ 166 public void terminate() { 167 goon = false; 168 logger.debug("terminating ListServer"); 169 if ( cf != null ) cf.terminate(); 170 if ( servers != null ) { 171 Iterator it = servers.iterator(); 172 while ( it.hasNext() ) { 173 Broadcaster br = (Broadcaster) it.next(); 174 br.closeChannel(); 175 try { 176 while ( br.isAlive() ) { 177 //System.out.print("."); 178 br.interrupt(); 179 br.join(100); 180 } 181 //logger.debug("server {} terminated", br); 182 } catch (InterruptedException e) { 183 Thread.currentThread().interrupt(); 184 } 185 } 186 servers = null; 187 } 188 logger.debug("Broadcasters terminated"); 189 if ( mythread == null ) return; 190 try { 191 while ( mythread.isAlive() ) { 192 // System.out.print("-"); 193 mythread.interrupt(); 194 mythread.join(100); 195 } 196 //logger.debug("server {} terminated", mythread); 197 } catch (InterruptedException e) { 198 Thread.currentThread().interrupt(); 199 } 200 mythread = null; 201 logger.debug("ListServer terminated"); 202 } 203 204 205 /** 206 * number of servers. 207 */ 208 public int size() { 209 if ( servers == null ) { 210 return -1; 211 } 212 return servers.size(); 213 } 214 215} 216 217 218/** 219 * Class for holding the list index used as key in TreeMap. 220 * Implemented since Integer has no add() method. 221 * Must implement Comparable so that TreeMap works with correct ordering. 222 */ 223 224class Counter implements Serializable, Comparable<Counter> { 225 226 private int value; 227 228 229 /** 230 * Counter. 231 */ 232 public Counter() { 233 this(0); 234 } 235 236 237 /** 238 * Counter. 239 * @param v 240 */ 241 public Counter(int v) { 242 value = v; 243 } 244 245 246 /** 247 * intValue. 248 * @return the value. 249 */ 250 public int intValue() { 251 return value; 252 } 253 254 255 /** 256 * add. 257 * @param v 258 */ 259 public void add(int v) { // synchronized elsewhere 260 value += v; 261 } 262 263 264 /** 265 * equals. 266 * @param ob an Object. 267 * @return true if this is equal to o, else false. 268 */ 269 @Override 270 public boolean equals(Object ob) { 271 if ( ! (ob instanceof Counter) ) { 272 return false; 273 } 274 return 0 == compareTo( (Counter)ob ); 275 } 276 277 278 /** 279 * compareTo. 280 * @param c a Counter. 281 * @return 1 if (this < c), 0 if (this == c), -1 if (this > c). 282 */ 283 public int compareTo(Counter c) { 284 int x = c.intValue(); 285 if ( value > x ) { 286 return 1; 287 } 288 if ( value < x ) { 289 return -1; 290 } 291 return 0; 292 } 293 294 295 /** 296 * Hash code for this Counter. 297 * @see java.lang.Object#hashCode() 298 */ 299 @Override 300 public int hashCode() { 301 return value; 302 } 303 304 305 /** 306 * toString. 307 */ 308 @Override 309 public String toString() { 310 return "Counter("+value+")"; 311 } 312 313} 314 315 316/** 317 * Thread for broadcasting all incoming objects to the list clients. 318 */ 319 320class Broadcaster extends Thread /*implements Runnable*/ { 321 322 private static final Logger logger = LogManager.getLogger(Broadcaster.class); 323 324 private final SocketChannel channel; 325 326 private final List bcaster; 327 328 private Counter listElem; 329 330 private final SortedMap<Counter,Object> theList; 331 332 333 /** 334 * Broadcaster. 335 * @param s SocketChannel to use. 336 * @param p list of broadcasters. 337 * @param le counter 338 * @param sm SortedMap with counter value pairs. 339 */ 340 public Broadcaster(SocketChannel s, List p, Counter le, SortedMap<Counter,Object> sm) { 341 channel = s; 342 bcaster = p; 343 listElem = le; 344 theList = sm; 345 } 346 347 348 /** 349 * closeChannel. 350 */ 351 public void closeChannel() { 352 channel.close(); 353 } 354 355 356 /** 357 * sendChannel. 358 * @param n counter. 359 * @param o value. 360 * @throws IOException 361 */ 362 public void sendChannel(Object n, Object o) throws IOException { 363 synchronized (channel) { 364 channel.send(n); 365 channel.send(o); 366 } 367 } 368 369 370 /** 371 * broadcast. 372 * @param o object to store and send. 373 */ 374 public void broadcast(Object o) { 375 Counter li = null; 376 synchronized (listElem) { 377 listElem.add(1); 378 li = new Counter( listElem.intValue() ); 379 } 380 synchronized (theList) { 381 theList.put( li, o ); 382 } 383 synchronized (bcaster) { 384 Iterator it = bcaster.iterator(); 385 while ( it.hasNext() ) { 386 Broadcaster br = (Broadcaster) it.next(); 387 try { 388 br.sendChannel(li,o); 389 //System.out.println("bcast: "+o+" to "+x.channel); 390 } catch (IOException e) { 391 try { 392 br.closeChannel(); 393 while ( br.isAlive() ) { 394 br.interrupt(); 395 br.join(100); 396 } 397 } catch (InterruptedException u) { 398 Thread.currentThread().interrupt(); 399 } 400 bcaster.remove( br ); 401 } 402 } 403 } 404 } 405 406 407 /** 408 * run. 409 */ 410 @Override 411 public void run() { 412 Object o; 413 boolean goon = true; 414 while (goon) { 415 try { 416 o = channel.receive(); 417 //System.out.println("receive: "+o+" from "+channel); 418 broadcast(o); 419 if ( this.isInterrupted() ) { 420 goon = false; 421 } 422 } catch (IOException e) { 423 goon = false; 424 } catch (ClassNotFoundException e) { 425 goon = false; 426 e.printStackTrace(); 427 428 } 429 } 430 logger.debug("broadcaster terminated {}", this); 431 channel.close(); 432 } 433 434 435 /** 436 * toString. 437 * @return a string representation of this. 438 */ 439 @Override 440 public String toString() { 441 return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")"; 442 } 443 444}