001/* 002 * $Id: DistHashTableServer.java 5872 2018-07-20 16:01:46Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Iterator; 011import java.util.List; 012import java.util.Map.Entry; 013import java.util.SortedMap; 014import java.util.TreeMap; 015 016import org.apache.logging.log4j.Logger; 017import org.apache.logging.log4j.LogManager; 018 019 020/** 021 * Server for the distributed version of a list. 022 * TODO: redistribute list for late coming clients, removal 023 * of elements. 024 * @author Heinz Kredel 025 */ 026 027public class DistHashTableServer<K> extends Thread { 028 029 030 private static final Logger logger = LogManager.getLogger(DistHashTableServer.class); 031 032 033 public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99; 034 035 036 protected final ChannelFactory cf; 037 038 039 protected List<DHTBroadcaster<K>> servers; 040 041 042 private volatile boolean goon = true; 043 044 045 private volatile Thread mythread = null; 046 047 048 protected final SortedMap<K, DHTTransport> theList; 049 050 051 private long etime; 052 053 054 private long dtime; 055 056 057 private long ertime; 058 059 060 private long drtime; 061 062 063 /** 064 * Constructs a new DistHashTableServer. 065 */ 066 public DistHashTableServer() { 067 this(DEFAULT_PORT); 068 } 069 070 071 /** 072 * DistHashTableServer. 073 * @param port to run server on. 074 */ 075 public DistHashTableServer(int port) { 076 this(new ChannelFactory(port)); 077 } 078 079 080 /** 081 * DistHashTableServer. 082 * @param cf ChannelFactory to use. 083 */ 084 public DistHashTableServer(ChannelFactory cf) { 085 this.cf = cf; 086 cf.init(); 087 servers = new ArrayList<DHTBroadcaster<K>>(); 088 theList = new TreeMap<K, DHTTransport>(); 089 etime = DHTTransport.etime; 090 dtime = DHTTransport.dtime; 091 ertime = DHTTransport.ertime; 092 drtime = DHTTransport.drtime; 093 } 094 095 096 /** 097 * main. Usage: DistHashTableServer <port> 098 */ 099 public static void main(String[] args) throws InterruptedException { 100 int port = DEFAULT_PORT; 101 if (args.length < 1) { 102 System.out.println("Usage: DistHashTableServer <port>"); 103 } else { 104 try { 105 port = Integer.parseInt(args[0]); 106 } catch (NumberFormatException e) { 107 } 108 } 109 DistHashTableServer dhts = new DistHashTableServer/*raw: <K>*/(port); 110 dhts.init(); 111 dhts.join(); 112 // until CRTL-C 113 } 114 115 116 /** 117 * thread initialization and start. 118 */ 119 public void init() { 120 this.start(); 121 } 122 123 124 /** 125 * main server method. 126 */ 127 @Override 128 public void run() { 129 SocketChannel channel = null; 130 DHTBroadcaster<K> s = null; 131 mythread = Thread.currentThread(); 132 Entry<K, DHTTransport> e; 133 DHTTransport tc; 134 while (goon) { 135 //logger.debug("list server " + this + " go on"); 136 try { 137 channel = cf.getChannel(); 138 if (logger.isDebugEnabled()) { 139 logger.debug("dls channel = " + channel); 140 } 141 if (mythread.isInterrupted()) { 142 goon = false; 143 //logger.info("list server " + this + " interrupted"); 144 } else { 145 s = new DHTBroadcaster<K>(channel, servers, /*listElem,*/theList); 146 int ls = 0; 147 synchronized (servers) { 148 if (goon) { 149 servers.add(s); 150 ls = theList.size(); 151 s.start(); 152 } 153 } 154 if (logger.isDebugEnabled()) { 155 logger.info("server " + s + " started " + s.isAlive()); 156 } 157 if (ls > 0) { 158 //logger.debug("sending " + ls + " list elements"); 159 synchronized (theList) { 160 Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator(); 161 for (int i = 0; i < ls; i++) { 162 e = it.next(); 163 // n = e.getKey(); // findbugs, already in tc 164 tc = e.getValue(); 165 //DHTTransport tc = (DHTTransport) o; 166 try { 167 s.sendChannel(tc); 168 } catch (IOException ioe) { 169 // stop s 170 } 171 } 172 } 173 } 174 } 175 } catch (InterruptedException end) { 176 goon = false; 177 Thread.currentThread().interrupt(); 178 } 179 } 180 if (logger.isDebugEnabled()) { 181 logger.info("DHTserver " + this + " terminated"); 182 } 183 } 184 185 186 /** 187 * terminate all servers. 188 */ 189 public void terminate() { 190 goon = false; 191 logger.info("terminating"); 192 if (cf != null) { 193 cf.terminate(); 194 } 195 int svs = 0; 196 List<DHTBroadcaster<K>> scopy = null; 197 if (servers != null) { 198 synchronized (servers) { 199 svs = servers.size(); 200 scopy = new ArrayList<DHTBroadcaster<K>>(servers); 201 Iterator<DHTBroadcaster<K>> it = scopy.iterator(); 202 while (it.hasNext()) { 203 DHTBroadcaster<K> br = it.next(); 204 br.goon = false; 205 br.closeChannel(); 206 try { 207 int c = 0; 208 while (br.isAlive()) { 209 c++; 210 if (c > 10) { 211 logger.warn("giving up on " + br); 212 break; 213 } 214 //System.out.print("."); 215 br.interrupt(); 216 br.join(50); 217 } 218 if (logger.isDebugEnabled()) { 219 logger.info("server+ " + br + " terminated"); 220 } 221 // now possible: 222 servers.remove(br); 223 } catch (InterruptedException e) { 224 Thread.currentThread().interrupt(); 225 } 226 } 227 servers.clear(); 228 } 229 logger.info("" + svs + " broadcasters terminated " + scopy); 230 //? servers = null; 231 } 232 logger.debug("DHTBroadcasters terminated"); 233 long enc = DHTTransport.etime - etime; 234 long dec = DHTTransport.dtime - dtime; 235 long encr = DHTTransport.ertime - ertime; 236 long decr = DHTTransport.drtime - drtime; 237 long drest = (encr * dec) / (enc + 1); 238 logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr 239 + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = " 240 + (enc + dec + encr + drest)); // +decr not meaningful 241 if (mythread == null) { 242 return; 243 } 244 try { 245 while (mythread.isAlive()) { 246 //System.out.print("-"); 247 mythread.interrupt(); 248 mythread.join(100); 249 } 250 if (logger.isWarnEnabled()) { 251 logger.warn("server terminated " + mythread); 252 } 253 } catch (InterruptedException e) { 254 Thread.currentThread().interrupt(); 255 } 256 mythread = null; 257 logger.info("terminated"); 258 } 259 260 261 /** 262 * number of servers. 263 */ 264 public int size() { 265 if (servers == null) { 266 return -1; 267 } 268 //synchronized (servers) removed 269 return servers.size(); 270 } 271 272 273 /** 274 * toString. 275 * @return a string representation of this. 276 */ 277 @Override 278 public String toString() { 279 return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")"; 280 } 281 282} 283 284 285/** 286 * Thread for broadcasting all incoming objects to the list clients. 287 */ 288class DHTBroadcaster<K> extends Thread /*implements Runnable*/ { 289 290 291 private static final Logger logger = LogManager.getLogger(DHTBroadcaster.class); 292 293 294 private final SocketChannel channel; 295 296 297 private final List<DHTBroadcaster<K>> bcaster; 298 299 300 private final SortedMap<K, DHTTransport> theList; 301 302 303 volatile boolean goon = true; 304 305 306 /** 307 * DHTBroadcaster. 308 * @param s SocketChannel to use. 309 * @param bc list of broadcasters. 310 * @param le DHTCounter. 311 * @param sm SortedMap with key value pairs. 312 */ 313 public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) { 314 channel = s; 315 bcaster = bc; 316 theList = sm; 317 } 318 319 320 /** 321 * closeChannel. 322 */ 323 public void closeChannel() { 324 channel.close(); 325 } 326 327 328 /** 329 * sendChannel. 330 * @param tc DHTTransport. 331 * @throws IOException 332 */ 333 public void sendChannel(DHTTransport tc) throws IOException { 334 if (goon) { 335 channel.send(tc); 336 } 337 } 338 339 340 /** 341 * broadcast. 342 * @param o DHTTransport element to broadcast. 343 */ 344 @SuppressWarnings({ "unchecked", "cast" }) 345 public void broadcast(DHTTransport o) { 346 if (logger.isDebugEnabled()) { 347 logger.debug("broadcast = " + o); 348 } 349 DHTTransport<K, Object> tc = null; 350 if (o == null) { 351 return; 352 } 353 //if ( ! (o instanceof DHTTransport) ) { 354 // return; 355 //} 356 tc = (DHTTransport<K, Object>) o; 357 K key = null; 358 synchronized (theList) { 359 //test 360 //Object x = theList.get( tc.key ); 361 //if ( x != null ) { 362 // logger.info("theList duplicate key " + tc.key ); 363 //} 364 try { 365 if (!(o instanceof DHTTransportClear)) { 366 key = tc.key(); 367 theList.put(key, tc); 368 } 369 } catch (IOException e) { 370 logger.warn("IO exception: tc.key() not ok " + tc); 371 e.printStackTrace(); 372 } catch (ClassNotFoundException e) { 373 logger.warn("CNF exception: tc.key() not ok " + tc); 374 e.printStackTrace(); 375 } catch (Exception e) { 376 logger.warn("exception: tc.key() not ok " + tc); 377 e.printStackTrace(); 378 } 379 } 380 logger.info("sending key=" + key + " to " + bcaster.size() + " nodes"); 381 List<DHTBroadcaster<K>> bccopy = null; 382 synchronized (bcaster) { 383 bccopy = new ArrayList<DHTBroadcaster<K>>(bcaster); 384 } 385 Iterator<DHTBroadcaster<K>> it = bccopy.iterator(); 386 while (it.hasNext()) { 387 DHTBroadcaster<K> br = it.next(); 388 try { 389 if (logger.isDebugEnabled()) { 390 logger.debug("bcasting to " + br); 391 } 392 br.sendChannel(tc); 393 } catch (IOException e) { 394 logger.info("bcaster, IOexception " + e); 395 synchronized (bcaster) { 396 bcaster.remove(br); //no more: ConcurrentModificationException 397 } 398 try { 399 br.goon = false; 400 br.closeChannel(); 401 while (br.isAlive()) { 402 br.interrupt(); 403 br.join(100); 404 } 405 } catch (InterruptedException w) { 406 Thread.currentThread().interrupt(); 407 } 408 // 409 logger.info("bcaster.remove() " + br); 410 } catch (Exception e) { 411 logger.info("bcaster, exception " + e); 412 } 413 } 414 } 415 416 417 /** 418 * run. 419 */ 420 @Override 421 public void run() { 422 goon = true; 423 while (goon) { 424 try { 425 logger.debug("trying to receive"); 426 Object o = channel.receive(); 427 if (this.isInterrupted()) { 428 goon = false; 429 break; 430 } 431 if (logger.isDebugEnabled()) { 432 logger.debug("received = " + o); 433 } 434 if (!(o instanceof DHTTransport)) { 435 logger.warn("wrong object type: " + o); 436 goon = false; 437 break; //continue; 438 } 439 if (o instanceof DHTTransportClear) { 440 logger.info("receive, clear"); 441 synchronized (theList) { 442 theList.clear(); 443 theList.notifyAll(); 444 } 445 } 446 DHTTransport tc = (DHTTransport) o; 447 broadcast(tc); 448 if (this.isInterrupted()) { 449 goon = false; 450 } 451 } catch (IOException e) { 452 goon = false; 453 logger.info("receive, IO exception " + e); 454 //e.printStackTrace(); 455 } catch (ClassNotFoundException e) { 456 goon = false; 457 logger.info("receive, CNF exception " + e); 458 e.printStackTrace(); 459 } catch (Exception e) { 460 goon = false; 461 logger.info("receive, exception " + e); 462 e.printStackTrace(); 463 } 464 } 465 if (logger.isInfoEnabled()) { 466 logger.info("ending " + this); 467 } 468 synchronized (bcaster) { 469 bcaster.remove(this); 470 } 471 channel.close(); 472 } 473 474 475 /** 476 * toString. 477 * @return a string representation of this. 478 */ 479 @Override 480 public String toString() { 481 return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")"; 482 } 483 484}