/*
 * $Id: DistHashTableServer.java 4962 2014-10-17 19:05:55Z kredel $
 */

package edu.jas.util;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.log4j.Logger;


/**
 * Server for the distributed version of a list. The server accepts client
 * channels from a ChannelFactory, starts one DHTBroadcaster thread per
 * client, and sends the key/value pairs collected so far to every newly
 * connected client.
 * @author Heinz Kredel TODO: redistribute list for late coming clients, removal
 *         of elements.
 */

public class DistHashTableServer<K> extends Thread {


    private static final Logger logger = Logger.getLogger(DistHashTableServer.class);


    public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99;


    protected final ChannelFactory cf;


    /**
     * Active broadcasters, one per connected client. Also used as the
     * monitor guarding modifications of the list.
     */
    protected List<DHTBroadcaster<K>> servers;


    /**
     * Run flag. Written by terminate() and read by run() on a different
     * thread, hence volatile.
     */
    private volatile boolean goon = true;


    /**
     * The thread executing run(), or null if not running. Shared between
     * run() and terminate(), hence volatile.
     */
    private volatile Thread mythread = null;


    /**
     * Key/value pairs received so far. Also used as the monitor guarding
     * access to the map.
     */
    protected final SortedMap<K, DHTTransport> theList;


    // Snapshots of the DHTTransport timing counters taken at construction,
    // used by terminate() to report the time spent since then.
    private long etime;


    private long dtime;


    private long ertime;


    private long drtime;


    /**
     * Constructs a new DistHashTableServer on the default port.
     */
    public DistHashTableServer() {
        this(DEFAULT_PORT);
    }


    /**
     * DistHashTableServer.
     * @param port to run server on.
     */
    public DistHashTableServer(int port) {
        this(new ChannelFactory(port));
    }


    /**
     * DistHashTableServer.
     * @param cf ChannelFactory to use.
     */
    public DistHashTableServer(ChannelFactory cf) {
        this.cf = cf;
        cf.init();
        servers = new ArrayList<DHTBroadcaster<K>>();
        theList = new TreeMap<K, DHTTransport>();
        etime = DHTTransport.etime;
        dtime = DHTTransport.dtime;
        ertime = DHTTransport.ertime;
        drtime = DHTTransport.drtime;
    }


    /**
     * main. Usage: DistHashTableServer &lt;port&gt;
     * @param args optional port number as first argument.
     * @throws InterruptedException if waiting for the server is interrupted.
     */
    public static void main(String[] args) throws InterruptedException {
        int port = DEFAULT_PORT;
        if (args.length < 1) {
            System.out.println("Usage: DistHashTableServer <port>");
        } else {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // keep the default port, but tell the user instead of silently ignoring the argument
                System.out.println("invalid port '" + args[0] + "', using default port " + DEFAULT_PORT);
            }
        }
        DistHashTableServer<Object> dhts = new DistHashTableServer<Object>(port);
        dhts.init();
        dhts.join();
        // until CRTL-C
    }


    /**
     * thread initialization and start.
     */
    public void init() {
        this.start();
    }


    /**
     * main server method. Accepts channels until terminated or interrupted;
     * for each channel a DHTBroadcaster is registered and started, and the
     * elements already in the list are sent to the new client.
     */
    @Override
    public void run() {
        SocketChannel channel = null;
        DHTBroadcaster<K> s = null;
        // use a local reference: terminate() may reset the field to null
        final Thread self = Thread.currentThread();
        mythread = self;
        while (goon) {
            //logger.debug("list server " + this + " go on");
            try {
                channel = cf.getChannel();
                if (logger.isDebugEnabled()) {
                    logger.debug("dls channel = " + channel);
                }
                if (self.isInterrupted()) {
                    goon = false;
                    //logger.info("list server " + this + " interrupted");
                    if (channel != null) {
                        channel.close(); // do not leak a channel accepted during shutdown
                    }
                } else {
                    s = new DHTBroadcaster<K>(channel, servers,/*listElem,*/theList);
                    boolean started = false;
                    synchronized (servers) {
                        if (goon) {
                            servers.add(s);
                            s.start();
                            started = true;
                        }
                    }
                    if (logger.isDebugEnabled()) {
                        logger.info("server " + s + " started " + s.isAlive());
                    }
                    if (started) {
                        //logger.debug("sending list elements");
                        // Iterate the entries actually present while holding the lock.
                        // Counting the elements outside of the lock and then calling
                        // next() that many times can fail if the list is cleared or
                        // changed by a broadcaster in between.
                        synchronized (theList) {
                            for (Entry<K, DHTTransport> e : theList.entrySet()) {
                                try {
                                    s.sendChannel(e.getValue());
                                } catch (IOException ioe) {
                                    logger.info("sending existing element to " + s + " failed: " + ioe);
                                }
                            }
                        }
                    } else if (channel != null) {
                        // terminated while accepting: broadcaster was never started
                        channel.close();
                    }
                }
            } catch (InterruptedException end) {
                goon = false;
                Thread.currentThread().interrupt();
            }
        }
        if (logger.isDebugEnabled()) {
            logger.info("DHTserver " + this + " terminated");
        }
    }


    /**
     * terminate all servers. Stops the channel factory, stops and joins all
     * broadcasters, logs the accumulated transport times and finally stops
     * and joins the server thread itself.
     */
    public void terminate() {
        goon = false;
        logger.debug("terminating");
        if (cf != null) {
            cf.terminate();
        }
        final List<DHTBroadcaster<K>> svrs = servers;
        if (svrs != null) {
            int svs = 0;
            List<DHTBroadcaster<K>> scopy = null;
            synchronized (svrs) {
                svs = svrs.size();
                scopy = new ArrayList<DHTBroadcaster<K>>(svrs);
                svrs.clear();
            }
            // Join outside of the lock: a broadcaster needs the lock on the
            // server list to remove itself at the end of its run() method, so
            // joining while holding the lock could only ever time out.
            for (DHTBroadcaster<K> br : scopy) {
                br.goon = false;
                br.closeChannel();
                try {
                    int c = 0;
                    while (br.isAlive()) {
                        c++;
                        if (c > 10) {
                            logger.warn("giving up on " + br);
                            break;
                        }
                        //System.out.print(".");
                        br.interrupt();
                        br.join(100);
                    }
                    if (logger.isDebugEnabled()) {
                        logger.info("server+ " + br + " terminated");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            logger.info("" + svs + " broadcasters terminated " + scopy);
            //? servers = null;
        }
        logger.debug("DHTBroadcasters terminated");
        long enc = DHTTransport.etime - etime;
        long dec = DHTTransport.dtime - dtime;
        long encr = DHTTransport.ertime - ertime;
        long decr = DHTTransport.drtime - drtime;
        long drest = (encr * dec) / (enc + 1);
        logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr
                        + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = "
                        + (enc + dec + encr + drest)); // +decr not meaningful
        final Thread t = mythread;
        if (t == null) {
            return;
        }
        try {
            while (t.isAlive()) {
                //System.out.print("-");
                t.interrupt();
                t.join(100);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("server terminated " + t);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        mythread = null;
        logger.debug("terminated");
    }


    /**
     * number of servers.
     * @return the number of registered broadcasters, or -1 if there is no
     *         server list.
     */
    public int size() {
        final List<DHTBroadcaster<K>> svrs = servers;
        if (svrs == null) {
            return -1;
        }
        //synchronized (servers) removed
        return svrs.size();
    }


    /**
     * toString.
     * @return a string representation of this.
     */
    @Override
    public String toString() {
        // size() tolerates a missing server list
        return "DHTServer(" + size() + ", " + cf + ", " + super.toString() + ")";
    }

}


/**
 * Thread for broadcasting all incoming objects to the list clients.
 */
class DHTBroadcaster<K> extends Thread /*implements Runnable*/{


    private static final Logger logger = Logger.getLogger(DHTBroadcaster.class);


    private final SocketChannel channel;


    private final List<DHTBroadcaster<K>> bcaster;


    private final SortedMap<K, DHTTransport> theList;


    /**
     * Run flag, cleared by other threads to stop this broadcaster.
     */
    volatile boolean goon = true;


    /**
     * DHTBroadcaster.
     * @param s SocketChannel to use.
     * @param bc list of broadcasters.
     * @param sm SortedMap with key value pairs.
     */
    public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) {
        channel = s;
        bcaster = bc;
        theList = sm;
    }


    /**
     * closeChannel.
     */
    public void closeChannel() {
        channel.close();
    }


    /**
     * sendChannel. Sends only while this broadcaster is still running.
     * @param tc DHTTransport.
     * @throws IOException
     */
    public void sendChannel(DHTTransport tc) throws IOException {
        if (goon) {
            channel.send(tc);
        }
    }


    /**
     * broadcast. Stores the element in the list (unless it is a clear
     * request) and sends it to all registered broadcasters. Broadcasters
     * whose channel fails with an IOException are removed and stopped.
     * @param o DHTTransport element to broadcast.
     */
    @SuppressWarnings("cast")
    public void broadcast(DHTTransport o) {
        if (logger.isDebugEnabled()) {
            logger.debug("broadcast = " + o);
        }
        DHTTransport<K, Object> tc = null;
        if (o == null) {
            return;
        }
        //if ( ! (o instanceof DHTTransport) ) {
        //   return;
        //}
        tc = (DHTTransport<K, Object>) o;
        K key = null;
        synchronized (theList) {
            //test
            //Object x = theList.get( tc.key );
            //if ( x != null ) {
            //   logger.info("theList duplicate key " + tc.key );
            //}
            try {
                if (!(o instanceof DHTTransportClear)) {
                    key = tc.key();
                    theList.put(key, tc);
                }
            } catch (IOException e) {
                logger.warn("IO exception: tc.key() not ok " + tc, e);
            } catch (ClassNotFoundException e) {
                logger.warn("CNF exception: tc.key() not ok " + tc, e);
            } catch (Exception e) {
                logger.warn("exception: tc.key() not ok " + tc, e);
            }
        }
        List<DHTBroadcaster<K>> bccopy = null;
        synchronized (bcaster) {
            bccopy = new ArrayList<DHTBroadcaster<K>>(bcaster);
        }
        // report the size of the snapshot, the shared list must not be read without its lock
        logger.info("sending key=" + key + " to " + bccopy.size() + " nodes");
        Iterator<DHTBroadcaster<K>> it = bccopy.iterator();
        while (it.hasNext()) {
            DHTBroadcaster<K> br = it.next();
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("bcasting to " + br);
                }
                br.sendChannel(tc);
            } catch (IOException e) {
                logger.info("bcaster, IOexception " + e);
                synchronized (bcaster) {
                    bcaster.remove(br); //no more: ConcurrentModificationException
                }
                try {
                    br.goon = false;
                    br.closeChannel();
                    while (br.isAlive()) {
                        br.interrupt();
                        br.join(100);
                    }
                } catch (InterruptedException w) {
                    Thread.currentThread().interrupt();
                }
                //
                logger.info("bcaster.remove() " + br);
            } catch (Exception e) {
                logger.info("bcaster, exception " + e);
            }
        }
    }


    /**
     * run. Receives objects from the channel and broadcasts them until
     * stopped, interrupted, or an error occurs; then removes itself from
     * the broadcaster list and closes the channel.
     */
    @Override
    public void run() {
        // goon is initialized to true at construction; it is not reset here,
        // so that a stop request issued before this thread was scheduled is
        // not lost.
        while (goon) {
            try {
                logger.debug("trying to receive");
                Object o = channel.receive();
                if (this.isInterrupted()) {
                    goon = false;
                    break;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("received = " + o);
                }
                if (!(o instanceof DHTTransport)) {
                    logger.warn("wrong object type: " + o);
                    goon = false;
                    break; //continue;
                }
                if (o instanceof DHTTransportClear) {
                    logger.info("receive, clear");
                    synchronized (theList) {
                        theList.clear();
                        theList.notifyAll();
                    }
                }
                DHTTransport tc = (DHTTransport) o;
                broadcast(tc);
                if (this.isInterrupted()) {
                    goon = false;
                }
            } catch (IOException e) {
                goon = false;
                logger.info("receive, IO exception " + e);
                //e.printStackTrace();
            } catch (ClassNotFoundException e) {
                goon = false;
                logger.info("receive, CNF exception " + e, e);
            } catch (Exception e) {
                goon = false;
                logger.info("receive, exception " + e, e);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.info("terminated+ " + this);
        }
        synchronized (bcaster) {
            bcaster.remove(this);
        }
        channel.close();
    }


    /**
     * toString.
     * @return a string representation of this.
     */
    @Override
    public String toString() {
        return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")";
    }

}