001/* 002 * $Id: DistributedListServer.java 5872 2018-07-20 16:01:46Z kredel $ 003 */ 004 005package edu.jas.util; 006 007import java.io.IOException; 008import java.io.Serializable; 009 010import java.util.Iterator; 011import java.util.List; 012import java.util.ArrayList; 013import java.util.SortedMap; 014import java.util.TreeMap; 015import java.util.Map.Entry; 016 017import org.apache.logging.log4j.Logger; 018import org.apache.logging.log4j.LogManager; 019 020//import edu.unima.ky.parallel.ChannelFactory; 021//import edu.unima.ky.parallel.SocketChannel; 022 023 024/** 025 * Server for the distributed version of a list. 026 * @author Heinz Kredel 027 * TODO: redistribute list for late comming clients, removal of elements. 028 */ 029public class DistributedListServer extends Thread { 030 031 private static final Logger logger = LogManager.getLogger(DistributedListServer.class); 032 033 public final static int DEFAULT_PORT = ChannelFactory.DEFAULT_PORT + 99; 034 035 036 protected final ChannelFactory cf; 037 038 protected List<Broadcaster> servers; 039 040 041 private volatile boolean goon = true; 042 043 private volatile Thread mythread = null; 044 045 046 private Counter listElem = null; 047 048 protected final SortedMap<Counter,Object> theList; 049 050 051 /** 052 * Constructs a new DistributedListServer. 053 */ 054 055 public DistributedListServer() { 056 this(DEFAULT_PORT); 057 } 058 059 /** 060 * DistributedListServer. 061 * @param port to run server on. 062 */ 063 public DistributedListServer(int port) { 064 this( new ChannelFactory(port) ); 065 } 066 067 /** 068 * DistributedListServer. 069 * @param cf ChannelFactory to use. 070 */ 071 public DistributedListServer(ChannelFactory cf) { 072 listElem = new Counter(0); 073 this.cf = cf; 074 cf.init(); 075 servers = new ArrayList<Broadcaster>(); 076 theList = new TreeMap<Counter,Object>(); 077 } 078 079 080 /** 081 * main. 082 * Usage: DistributedListServer <port> 083 */ 084 public static void main(String[] args) throws InterruptedException { 085 int port = DEFAULT_PORT; 086 if ( args.length < 1 ) { 087 System.out.println("Usage: DistributedListServer <port>"); 088 } else { 089 try { 090 port = Integer.parseInt( args[0] ); 091 } catch (NumberFormatException e) { 092 } 093 } 094 DistributedListServer dls = new DistributedListServer(port); 095 dls.init(); 096 dls.join(); 097 // until CRTL-C 098 } 099 100 101 /** 102 * thread initialization and start. 103 */ 104 public void init() { 105 this.start(); 106 } 107 108 109 /** 110 * main server method. 111 */ 112 @Override 113 public void run() { 114 SocketChannel channel = null; 115 Broadcaster s = null; 116 mythread = Thread.currentThread(); 117 Entry e; 118 Object n; 119 Object o; 120 while (goon) { 121 // logger.debug("list server " + this + " go on"); 122 try { 123 channel = cf.getChannel(); 124 logger.debug("dls channel = "+channel); 125 if ( mythread.isInterrupted() ) { 126 goon = false; 127 //logger.info("list server " + this + " interrupted"); 128 } else { 129 s = new Broadcaster(channel,servers,listElem,theList); 130 int ls = 0; 131 synchronized (servers) { 132 servers.add( s ); 133 ls = theList.size(); 134 s.start(); 135 } 136 //logger.debug("server " + s + " started"); 137 if ( ls > 0 ) { 138 logger.info("sending " + ls + " list elements"); 139 synchronized (theList) { 140 Iterator it = theList.entrySet().iterator(); 141 for ( int i = 0; i < ls; i++ ) { 142 e = (Entry)it.next(); 143 n = e.getKey(); 144 o = e.getValue(); 145 try { 146 s.sendChannel( n,o ); 147 } catch (IOException ioe) { 148 // stop s 149 } 150 } 151 } 152 } 153 } 154 } catch (InterruptedException end) { 155 goon = false; 156 Thread.currentThread().interrupt(); 157 } 158 } 159 //logger.debug("listserver " + this + " terminated"); 160 } 161 162 163 /** 164 * terminate all servers. 165 */ 166 public void terminate() { 167 goon = false; 168 logger.debug("terminating ListServer"); 169 if ( cf != null ) cf.terminate(); 170 if ( servers != null ) { 171 Iterator it = servers.iterator(); 172 while ( it.hasNext() ) { 173 Broadcaster br = (Broadcaster) it.next(); 174 br.closeChannel(); 175 try { 176 while ( br.isAlive() ) { 177 //System.out.print("."); 178 br.interrupt(); 179 br.join(100); 180 } 181 //logger.debug("server " + br + " terminated"); 182 } catch (InterruptedException e) { 183 Thread.currentThread().interrupt(); 184 } 185 } 186 servers = null; 187 } 188 logger.debug("Broadcasters terminated"); 189 if ( mythread == null ) return; 190 try { 191 while ( mythread.isAlive() ) { 192 // System.out.print("-"); 193 mythread.interrupt(); 194 mythread.join(100); 195 } 196 //logger.debug("server " + mythread + " terminated"); 197 } catch (InterruptedException e) { 198 Thread.currentThread().interrupt(); 199 } 200 mythread = null; 201 logger.debug("ListServer terminated"); 202 } 203 204 205 /** 206 * number of servers. 207 */ 208 public int size() { 209 if ( servers == null ) { 210 return -1; 211 } 212 return servers.size(); 213 } 214 215} 216 217 218/** 219 * Class for holding the list index used as key in TreeMap. 220 * Implemented since Integer has no add() method. 221 * Must implement Comparable so that TreeMap works with correct ordering. 222 */ 223 224class Counter implements Serializable, Comparable<Counter> { 225 226 private int value; 227 228 229 /** 230 * Counter. 231 */ 232 public Counter() { 233 this(0); 234 } 235 236 237 /** 238 * Counter. 239 * @param v 240 */ 241 public Counter(int v) { 242 value = v; 243 } 244 245 246 /** 247 * intValue. 248 * @return the value. 249 */ 250 public int intValue() { 251 return value; 252 } 253 254 255 /** 256 * add. 257 * @param v 258 */ 259 public void add(int v) { // synchronized elsewhere 260 value += v; 261 } 262 263 264 /** 265 * equals. 266 * @param ob an Object. 267 * @return true if this is equal to o, else false. 268 */ 269 @Override 270 public boolean equals(Object ob) { 271 if ( ! (ob instanceof Counter) ) { 272 return false; 273 } 274 return 0 == compareTo( (Counter)ob ); 275 } 276 277 278 /** 279 * compareTo. 280 * @param c a Counter. 281 * @return 1 if (this < c), 0 if (this == c), -1 if (this > c). 282 */ 283 public int compareTo(Counter c) { 284 int x = c.intValue(); 285 if ( value > x ) { 286 return 1; 287 } 288 if ( value < x ) { 289 return -1; 290 } 291 return 0; 292 } 293 294 295 /** 296 * Hash code for this Counter. 297 * @see java.lang.Object#hashCode() 298 */ 299 @Override 300 public int hashCode() { 301 return value; 302 } 303 304 305 /** 306 * toString. 307 */ 308 @Override 309 public String toString() { 310 return "Counter("+value+")"; 311 } 312 313} 314 315 316/** 317 * Thread for broadcasting all incoming objects to the list clients. 318 */ 319 320class Broadcaster extends Thread /*implements Runnable*/ { 321 322 private static final Logger logger = LogManager.getLogger(Broadcaster.class); 323 private final SocketChannel channel; 324 private final List bcaster; 325 private Counter listElem; 326 private final SortedMap<Counter,Object> theList; 327 328 329 /** 330 * Broadcaster. 331 * @param s SocketChannel to use. 332 * @param p list of broadcasters. 333 * @param le counter 334 * @param sm SortedMap with counter value pairs. 335 */ 336 public Broadcaster(SocketChannel s, List p, Counter le, SortedMap<Counter,Object> sm) { 337 channel = s; 338 bcaster = p; 339 listElem = le; 340 theList = sm; 341 } 342 343 344 /** 345 * closeChannel. 346 */ 347 public void closeChannel() { 348 channel.close(); 349 } 350 351 352 /** 353 * sendChannel. 354 * @param n counter. 355 * @param o value. 356 * @throws IOException 357 */ 358 public void sendChannel(Object n, Object o) throws IOException { 359 synchronized (channel) { 360 channel.send(n); 361 channel.send(o); 362 } 363 } 364 365 366 /** 367 * broadcast. 368 * @param o object to store and send. 369 */ 370 public void broadcast(Object o) { 371 Counter li = null; 372 synchronized (listElem) { 373 listElem.add(1); 374 li = new Counter( listElem.intValue() ); 375 } 376 synchronized (theList) { 377 theList.put( li, o ); 378 } 379 synchronized (bcaster) { 380 Iterator it = bcaster.iterator(); 381 while ( it.hasNext() ) { 382 Broadcaster br = (Broadcaster) it.next(); 383 try { 384 br.sendChannel(li,o); 385 //System.out.println("bcast: "+o+" to "+x.channel); 386 } catch (IOException e) { 387 try { 388 br.closeChannel(); 389 while ( br.isAlive() ) { 390 br.interrupt(); 391 br.join(100); 392 } 393 } catch (InterruptedException u) { 394 Thread.currentThread().interrupt(); 395 } 396 bcaster.remove( br ); 397 } 398 } 399 } 400 } 401 402 403 /** 404 * run. 405 */ 406 @Override 407 public void run() { 408 Object o; 409 boolean goon = true; 410 while (goon) { 411 try { 412 o = channel.receive(); 413 //System.out.println("receive: "+o+" from "+channel); 414 broadcast(o); 415 if ( this.isInterrupted() ) { 416 goon = false; 417 } 418 } catch (IOException e) { 419 goon = false; 420 } catch (ClassNotFoundException e) { 421 goon = false; 422 e.printStackTrace(); 423 424 } 425 } 426 logger.debug("broadcaster terminated "+this); 427 channel.close(); 428 } 429 430 431 /** 432 * toString. 433 * @return a string representation of this. 434 */ 435 @Override 436 public String toString() { 437 return "Broadcaster("+channel+","+bcaster.size()+","+listElem+")"; 438 } 439 440}