001/* 002 * $Id$ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Iterator; 011import java.util.List; 012import java.util.Map.Entry; 013import java.util.SortedMap; 014import java.util.TreeMap; 015 016import org.apache.logging.log4j.LogManager; 017import org.apache.logging.log4j.Logger; 018 019 020/** 021 * Server for the distributed version of a list. TODO: redistribute list for 022 * late coming clients, removal of elements. 023 * @author Heinz Kredel 024 */ 025 026public class DistHashTableServer<K> extends Thread { 027 028 029 private static final Logger logger = LogManager.getLogger(DistHashTableServer.class); 030 031 032 private static final boolean debug = logger.isDebugEnabled(); 033 034 035 public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99; 036 037 038 protected final ChannelFactory cf; 039 040 041 protected List<DHTBroadcaster<K>> servers; 042 043 044 private volatile boolean goon = true; 045 046 047 private volatile Thread mythread = null; 048 049 050 protected final SortedMap<K, DHTTransport> theList; 051 052 053 private long etime; 054 055 056 private long dtime; 057 058 059 private long ertime; 060 061 062 private long drtime; 063 064 065 /** 066 * Constructs a new DistHashTableServer. 067 */ 068 public DistHashTableServer() { 069 this(DEFAULT_PORT); 070 } 071 072 073 /** 074 * DistHashTableServer. 075 * @param port to run server on. 076 */ 077 public DistHashTableServer(int port) { 078 this(new ChannelFactory(port)); 079 } 080 081 082 /** 083 * DistHashTableServer. 084 * @param cf ChannelFactory to use. 085 */ 086 public DistHashTableServer(ChannelFactory cf) { 087 this.cf = cf; 088 cf.init(); 089 servers = new ArrayList<DHTBroadcaster<K>>(); 090 theList = new TreeMap<K, DHTTransport>(); 091 etime = DHTTransport.etime; 092 dtime = DHTTransport.dtime; 093 ertime = DHTTransport.ertime; 094 drtime = DHTTransport.drtime; 095 } 096 097 098 /** 099 * main. Usage: DistHashTableServer <port> 100 */ 101 public static void main(String[] args) throws InterruptedException { 102 int port = DEFAULT_PORT; 103 if (args.length < 1) { 104 System.out.println("Usage: DistHashTableServer <port>"); 105 } else { 106 try { 107 port = Integer.parseInt(args[0]); 108 } catch (NumberFormatException e) { 109 } 110 } 111 DistHashTableServer dhts = new DistHashTableServer/*raw: <K>*/(port); 112 dhts.init(); 113 dhts.join(); 114 // until CRTL-C 115 } 116 117 118 /** 119 * thread initialization and start. 120 */ 121 public void init() { 122 this.start(); 123 } 124 125 126 /** 127 * main server method. 128 */ 129 @Override 130 public void run() { 131 SocketChannel channel = null; 132 DHTBroadcaster<K> s = null; 133 mythread = Thread.currentThread(); 134 Entry<K, DHTTransport> e; 135 DHTTransport tc; 136 while (goon) { 137 //logger.debug("list server {} go on", this); 138 try { 139 channel = cf.getChannel(); 140 if (debug) { 141 logger.debug("dls channel = {}", channel); 142 } 143 if (mythread.isInterrupted()) { 144 goon = false; 145 //logger.info("list server {} interrupted", this); 146 } else { 147 s = new DHTBroadcaster<K>(channel, servers, /*listElem,*/theList); 148 int ls = 0; 149 synchronized (servers) { 150 if (goon) { 151 servers.add(s); 152 ls = theList.size(); 153 s.start(); 154 } 155 } 156 if (debug) { 157 logger.info("server {} started {}", s, s.isAlive()); 158 } 159 if (ls > 0) { 160 //logger.debug("sending {} list elements", ls); 161 synchronized (theList) { 162 Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator(); 163 for (int i = 0; i < ls; i++) { 164 e = it.next(); 165 // n = e.getKey(); // findbugs, already in tc 166 tc = e.getValue(); 167 //DHTTransport tc = (DHTTransport) o; 168 try { 169 s.sendChannel(tc); 170 } catch (IOException ioe) { 171 // stop s 172 } 173 } 174 } 175 } 176 } 177 } catch (InterruptedException end) { 178 goon = false; 179 Thread.currentThread().interrupt(); 180 } 181 } 182 if (debug) { 183 logger.info("DHTserver {} terminated", this); 184 } 185 } 186 187 188 /** 189 * terminate all servers. 190 */ 191 public void terminate() { 192 goon = false; 193 logger.info("terminating"); 194 if (cf != null) { 195 cf.terminate(); 196 } 197 int svs = 0; 198 List<DHTBroadcaster<K>> scopy = null; 199 if (servers != null) { 200 synchronized (servers) { 201 svs = servers.size(); 202 scopy = new ArrayList<DHTBroadcaster<K>>(servers); 203 Iterator<DHTBroadcaster<K>> it = scopy.iterator(); 204 while (it.hasNext()) { 205 DHTBroadcaster<K> br = it.next(); 206 br.goon = false; 207 br.closeChannel(); 208 try { 209 int c = 0; 210 while (br.isAlive()) { 211 c++; 212 if (c > 10) { 213 logger.warn("giving up on {}", br); 214 break; 215 } 216 //System.out.print("."); 217 br.interrupt(); 218 br.join(50); 219 } 220 if (debug) { 221 logger.info("server {} terminated", br); 222 } 223 // now possible: 224 servers.remove(br); 225 } catch (InterruptedException e) { 226 Thread.currentThread().interrupt(); 227 } 228 } 229 servers.clear(); 230 } 231 logger.info("{} broadcasters terminated {}", svs, scopy); 232 //? servers = null; 233 } 234 logger.debug("DHTBroadcasters terminated"); 235 long enc = DHTTransport.etime - etime; 236 long dec = DHTTransport.dtime - dtime; 237 long encr = DHTTransport.ertime - ertime; 238 long decr = DHTTransport.drtime - drtime; 239 long drest = (encr * dec) / (enc + 1); 240 long sumest = enc + dec + encr + drest; // +decr not meaningful 241 logger.info("DHT time: encode = {}, decode = {}, enc raw = {}, dec raw wait = {}, dec raw est = {}, sum est = {}", enc, dec, encr, decr, drest, sumest); 242 if (mythread == null) { 243 return; 244 } 245 try { 246 while (mythread.isAlive()) { 247 //System.out.print("-"); 248 mythread.interrupt(); 249 mythread.join(100); 250 } 251 logger.warn("server terminated {}", mythread); 252 } catch (InterruptedException e) { 253 Thread.currentThread().interrupt(); 254 } 255 mythread = null; 256 logger.info("terminated"); 257 } 258 259 260 /** 261 * number of servers. 262 */ 263 public int size() { 264 if (servers == null) { 265 return -1; 266 } 267 //synchronized (servers) removed 268 return servers.size(); 269 } 270 271 272 /** 273 * toString. 274 * @return a string representation of this. 275 */ 276 @Override 277 public String toString() { 278 return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")"; 279 } 280 281} 282 283 284/** 285 * Thread for broadcasting all incoming objects to the list clients. 286 */ 287class DHTBroadcaster<K> extends Thread /*implements Runnable*/ { 288 289 290 private static final Logger logger = LogManager.getLogger(DHTBroadcaster.class); 291 292 293 private static final boolean debug = logger.isDebugEnabled(); 294 295 296 private final SocketChannel channel; 297 298 299 private final List<DHTBroadcaster<K>> bcaster; 300 301 302 private final SortedMap<K, DHTTransport> theList; 303 304 305 volatile boolean goon = true; 306 307 308 /** 309 * DHTBroadcaster. 310 * @param s SocketChannel to use. 311 * @param bc list of broadcasters. 312 * @param sm SortedMap with key value pairs. 313 */ 314 public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) { 315 channel = s; 316 bcaster = bc; 317 theList = sm; 318 } 319 320 321 /** 322 * closeChannel. 323 */ 324 public void closeChannel() { 325 channel.close(); 326 } 327 328 329 /** 330 * sendChannel. 331 * @param tc DHTTransport. 332 * @throws IOException 333 */ 334 public void sendChannel(DHTTransport tc) throws IOException { 335 if (goon) { 336 channel.send(tc); 337 } 338 } 339 340 341 /** 342 * broadcast. 343 * @param o DHTTransport element to broadcast. 344 */ 345 @SuppressWarnings({ "unchecked", "cast" }) 346 public void broadcast(DHTTransport o) { 347 if (debug) { 348 logger.debug("broadcast = {}", o); 349 } 350 DHTTransport<K, Object> tc = null; 351 if (o == null) { 352 return; 353 } 354 //if ( ! (o instanceof DHTTransport) ) { 355 // return; 356 //} 357 tc = (DHTTransport<K, Object>) o; 358 K key = null; 359 synchronized (theList) { 360 //test 361 //Object x = theList.get( tc.key ); 362 //if ( x != null ) { 363 // logger.info("theList duplicate key {}", tc.key ); 364 //} 365 try { 366 if (!(o instanceof DHTTransportClear)) { 367 key = tc.key(); 368 theList.put(key, tc); 369 } 370 } catch (IOException e) { 371 logger.warn("IO exception: tc.key() not ok {}", tc); 372 e.printStackTrace(); 373 } catch (ClassNotFoundException e) { 374 logger.warn("CNF exception: tc.key() not ok {}", tc); 375 e.printStackTrace(); 376 } catch (Exception e) { 377 logger.warn("exception: tc.key() not ok {}", tc); 378 e.printStackTrace(); 379 } 380 } 381 logger.info("sending key={} to {} nodes", key, bcaster.size()); 382 List<DHTBroadcaster<K>> bccopy = null; 383 synchronized (bcaster) { 384 bccopy = new ArrayList<DHTBroadcaster<K>>(bcaster); 385 } 386 Iterator<DHTBroadcaster<K>> it = bccopy.iterator(); 387 while (it.hasNext()) { 388 DHTBroadcaster<K> br = it.next(); 389 try { 390 if (debug) { 391 logger.debug("bcasting to {}", br); 392 } 393 br.sendChannel(tc); 394 } catch (IOException e) { 395 logger.info("bcaster, IOexception {}", e); 396 synchronized (bcaster) { 397 bcaster.remove(br); //no more: ConcurrentModificationException 398 } 399 try { 400 br.goon = false; 401 br.closeChannel(); 402 while (br.isAlive()) { 403 br.interrupt(); 404 br.join(100); 405 } 406 } catch (InterruptedException w) { 407 Thread.currentThread().interrupt(); 408 } 409 // 410 logger.info("bcaster.remove() {}", br); 411 } catch (Exception e) { 412 logger.info("bcaster, exception {}", e); 413 } 414 } 415 } 416 417 418 /** 419 * run. 420 */ 421 @Override 422 public void run() { 423 goon = true; 424 while (goon) { 425 try { 426 logger.debug("trying to receive"); 427 Object o = channel.receive(); 428 if (this.isInterrupted()) { 429 goon = false; 430 break; 431 } 432 if (debug) { 433 logger.debug("received = {}", o); 434 } 435 if (!(o instanceof DHTTransport)) { 436 logger.warn("wrong object type: {}", o); 437 goon = false; 438 break; //continue; 439 } 440 if (o instanceof DHTTransportClear) { 441 logger.info("receive, clear"); 442 synchronized (theList) { 443 theList.clear(); 444 theList.notifyAll(); 445 } 446 } 447 DHTTransport tc = (DHTTransport) o; 448 broadcast(tc); 449 if (this.isInterrupted()) { 450 goon = false; 451 } 452 } catch (IOException e) { 453 goon = false; 454 logger.info("receive, IO exception {}", e); 455 //e.printStackTrace(); 456 } catch (ClassNotFoundException e) { 457 goon = false; 458 logger.info("receive, CNF exception {}", e); 459 e.printStackTrace(); 460 } catch (Exception e) { 461 goon = false; 462 logger.info("receive, exception {}", e); 463 e.printStackTrace(); 464 } 465 } 466 logger.info("ending {}", this); 467 synchronized (bcaster) { 468 bcaster.remove(this); 469 } 470 channel.close(); 471 } 472 473 474 /** 475 * toString. 476 * @return a string representation of this. 477 */ 478 @Override 479 public String toString() { 480 return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")"; 481 } 482 483}