001/* 002 * $Id: DistHashTableServer.java 4074 2012-07-28 10:04:58Z kredel $ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Iterator; 011import java.util.List; 012import java.util.Map.Entry; 013import java.util.SortedMap; 014import java.util.TreeMap; 015 016import org.apache.log4j.Logger; 017 018 019/** 020 * Server for the distributed version of a list. 021 * @author Heinz Kredel 022 * @todo redistribute list for late coming clients, removal of elements. 023 */ 024 025public class DistHashTableServer<K> extends Thread { 026 027 028 private static final Logger logger = Logger.getLogger(DistHashTableServer.class); 029 030 031 public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99; 032 033 034 protected final ChannelFactory cf; 035 036 037 protected List<DHTBroadcaster<K>> servers; 038 039 040 private boolean goon = true; 041 042 043 private Thread mythread = null; 044 045 046 protected final SortedMap<K, DHTTransport> theList; 047 048 049 private long etime; 050 051 052 private long dtime; 053 054 055 private long ertime; 056 057 058 private long drtime; 059 060 061 /** 062 * Constructs a new DistHashTableServer. 063 */ 064 public DistHashTableServer() { 065 this(DEFAULT_PORT); 066 } 067 068 069 /** 070 * DistHashTableServer. 071 * @param port to run server on. 072 */ 073 public DistHashTableServer(int port) { 074 this(new ChannelFactory(port)); 075 } 076 077 078 /** 079 * DistHashTableServer. 080 * @param cf ChannelFactory to use. 081 */ 082 public DistHashTableServer(ChannelFactory cf) { 083 this.cf = cf; 084 cf.init(); 085 servers = new ArrayList<DHTBroadcaster<K>>(); 086 theList = new TreeMap<K, DHTTransport>(); 087 etime = DHTTransport.etime; 088 dtime = DHTTransport.dtime; 089 ertime = DHTTransport.ertime; 090 drtime = DHTTransport.drtime; 091 } 092 093 094 /** 095 * main. Usage: DistHashTableServer <port> 096 */ 097 public static void main(String[] args) throws InterruptedException { 098 int port = DEFAULT_PORT; 099 if (args.length < 1) { 100 System.out.println("Usage: DistHashTableServer <port>"); 101 } else { 102 try { 103 port = Integer.parseInt(args[0]); 104 } catch (NumberFormatException e) { 105 } 106 } 107 DistHashTableServer dhts = new DistHashTableServer/*raw: <K>*/(port); 108 dhts.init(); 109 dhts.join(); 110 // until CRTL-C 111 } 112 113 114 /** 115 * thread initialization and start. 116 */ 117 public void init() { 118 this.start(); 119 } 120 121 122 /** 123 * main server method. 124 */ 125 @Override 126 public void run() { 127 SocketChannel channel = null; 128 DHTBroadcaster<K> s = null; 129 mythread = Thread.currentThread(); 130 Entry<K, DHTTransport> e; 131 DHTTransport tc; 132 while (goon) { 133 //logger.debug("list server " + this + " go on"); 134 try { 135 channel = cf.getChannel(); 136 if (logger.isDebugEnabled()) { 137 logger.debug("dls channel = " + channel); 138 } 139 if (mythread.isInterrupted()) { 140 goon = false; 141 //logger.info("list server " + this + " interrupted"); 142 } else { 143 s = new DHTBroadcaster<K>(channel, servers,/*listElem,*/theList); 144 int ls = 0; 145 synchronized (servers) { 146 if (goon) { 147 servers.add(s); 148 ls = theList.size(); 149 s.start(); 150 } 151 } 152 if (logger.isInfoEnabled()) { 153 logger.info("server " + s + " started " + s.isAlive()); 154 } 155 if (ls > 0) { 156 //logger.debug("sending " + ls + " list elements"); 157 synchronized (theList) { 158 Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator(); 159 for (int i = 0; i < ls; i++) { 160 e = it.next(); 161 // n = e.getKey(); // findbugs, already in tc 162 tc = e.getValue(); 163 //DHTTransport tc = (DHTTransport) o; 164 try { 165 s.sendChannel(tc); 166 } catch (IOException ioe) { 167 // stop s 168 } 169 } 170 } 171 } 172 } 173 } catch (InterruptedException end) { 174 goon = false; 175 Thread.currentThread().interrupt(); 176 } 177 } 178 if (logger.isDebugEnabled()) { 179 logger.debug("listserver " + this + " terminated"); 180 } 181 } 182 183 184 /** 185 * terminate all servers. 186 */ 187 public void terminate() { 188 goon = false; 189 logger.debug("terminating ListServer"); 190 if (cf != null) { 191 cf.terminate(); 192 } 193 int svs = 0; 194 if (servers != null) { 195 synchronized (servers) { 196 svs = servers.size(); 197 Iterator<DHTBroadcaster<K>> it = servers.iterator(); 198 while (it.hasNext()) { 199 DHTBroadcaster<K> br = it.next(); 200 br.closeChannel(); 201 try { 202 int c = 0; 203 while (br.isAlive()) { 204 c++; 205 if (c > 10) { 206 logger.warn("giving up on " + br); 207 break; 208 } 209 //System.out.print("."); 210 br.interrupt(); 211 br.join(100); 212 } 213 if (logger.isDebugEnabled()) { 214 logger.debug("server " + br + " terminated"); 215 } 216 } catch (InterruptedException e) { 217 Thread.currentThread().interrupt(); 218 } 219 } 220 servers.clear(); 221 } 222 logger.info(svs + " broadcasters terminated"); 223 //? servers = null; 224 } 225 logger.debug("DHTBroadcasters terminated"); 226 long enc = DHTTransport.etime - etime; 227 long dec = DHTTransport.dtime - dtime; 228 long encr = DHTTransport.ertime - ertime; 229 long decr = DHTTransport.drtime - drtime; 230 long drest = (encr * dec) / (enc + 1); 231 logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr 232 + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = " 233 + (enc + dec + encr + drest)); // +decr not meaningful 234 if (mythread == null) { 235 return; 236 } 237 try { 238 while (mythread.isAlive()) { 239 //System.out.print("-"); 240 mythread.interrupt(); 241 mythread.join(100); 242 } 243 if (logger.isDebugEnabled()) { 244 logger.debug("server terminated " + mythread); 245 } 246 } catch (InterruptedException e) { 247 Thread.currentThread().interrupt(); 248 } 249 mythread = null; 250 logger.debug("ListServer terminated"); 251 } 252 253 254 /** 255 * number of servers. 256 */ 257 public int size() { 258 if ( servers == null ) { 259 return -1; 260 } 261 synchronized (servers) { 262 return servers.size(); 263 } 264 } 265 266 267 /** 268 * toString. 269 * @return a string representation of this. 270 */ 271 @Override 272 public String toString() { 273 return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")"; 274 } 275 276} 277 278 279/** 280 * Thread for broadcasting all incoming objects to the list clients. 281 */ 282 283class DHTBroadcaster<K> extends Thread /*implements Runnable*/{ 284 285 286 private static final Logger logger = Logger.getLogger(DHTBroadcaster.class); 287 288 289 private final SocketChannel channel; 290 291 292 private final List<DHTBroadcaster<K>> bcaster; 293 294 295 private final SortedMap<K, DHTTransport> theList; 296 297 298 /** 299 * DHTBroadcaster. 300 * @param s SocketChannel to use. 301 * @param bc list of broadcasters. 302 * @param le DHTCounter. 303 * @param sm SortedMap with key value pairs. 304 */ 305 public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) { 306 channel = s; 307 bcaster = bc; 308 theList = sm; 309 } 310 311 312 /** 313 * closeChannel. 314 */ 315 public void closeChannel() { 316 channel.close(); 317 } 318 319 320 /** 321 * sendChannel. 322 * @param tc DHTTransport. 323 * @throws IOException 324 */ 325 public void sendChannel(DHTTransport tc) throws IOException { 326 channel.send(tc); 327 } 328 329 330 /** 331 * broadcast. 332 * @param o DHTTransport element to broadcast. 333 */ 334 @SuppressWarnings("unchecked") 335 public void broadcast(DHTTransport o) { 336 if (logger.isDebugEnabled()) { 337 logger.debug("broadcast = " + o); 338 } 339 DHTTransport<K, Object> tc = null; 340 if (o == null) { 341 return; 342 } 343 //if ( ! (o instanceof DHTTransport) ) { 344 // return; 345 //} 346 tc = (DHTTransport<K, Object>) o; 347 K key = null; 348 synchronized (theList) { 349 //test 350 //Object x = theList.get( tc.key ); 351 //if ( x != null ) { 352 // logger.info("theList duplicate key " + tc.key ); 353 //} 354 try { 355 key = tc.key(); 356 theList.put(key, tc); 357 } catch (IOException e) { 358 logger.warn("IO exception: tc.key() not ok " + tc); 359 e.printStackTrace(); 360 } catch (ClassNotFoundException e) { 361 logger.warn("CNF exception: tc.key() not ok " + tc); 362 e.printStackTrace(); 363 } catch (Exception e) { 364 logger.warn("exception:tc.key() not ok " + tc); 365 e.printStackTrace(); 366 } 367 } 368 logger.info("sending key=" + key + " to " + bcaster.size() + " nodes"); 369 synchronized (bcaster) { 370 Iterator<DHTBroadcaster<K>> it = bcaster.iterator(); 371 while (it.hasNext()) { 372 DHTBroadcaster<K> br = it.next(); 373 try { 374 if (logger.isDebugEnabled()) { 375 logger.debug("bcasting to " + br); 376 } 377 br.sendChannel(tc); 378 } catch (IOException e) { 379 logger.info("bcaster, exception " + e); 380 try { 381 br.closeChannel(); 382 while (br.isAlive()) { 383 br.interrupt(); 384 br.join(100); 385 } 386 } catch (InterruptedException w) { 387 Thread.currentThread().interrupt(); 388 } 389 it.remove( /*br*/); //ConcurrentModificationException 390 logger.debug("bcaster.remove() " + br); 391 } catch (Exception e) { 392 logger.info("bcaster, exception " + e); 393 } 394 } 395 } 396 } 397 398 399 /** 400 * run. 401 */ 402 @Override 403 public void run() { 404 boolean goon = true; 405 while (goon) { 406 try { 407 logger.debug("trying to receive"); 408 Object o = channel.receive(); 409 if (this.isInterrupted()) { 410 break; 411 } 412 if (logger.isDebugEnabled()) { 413 logger.debug("received = " + o); 414 } 415 if (!(o instanceof DHTTransport)) { 416 logger.warn("swallowed: " + o); 417 continue; 418 } 419 DHTTransport tc = (DHTTransport) o; 420 broadcast(tc); 421 if (this.isInterrupted()) { 422 goon = false; 423 } 424 } catch (IOException e) { 425 goon = false; 426 logger.info("receive, IO exception " + e); 427 //e.printStackTrace(); 428 } catch (ClassNotFoundException e) { 429 goon = false; 430 logger.info("receive, CNF exception " + e); 431 e.printStackTrace(); 432 } catch (Exception e) { 433 goon = false; 434 logger.info("receive, exception " + e); 435 e.printStackTrace(); 436 } 437 } 438 if (logger.isDebugEnabled()) { 439 logger.debug("DHTBroadcaster terminated " + this); 440 } 441 channel.close(); 442 } 443 444 445 /** 446 * toString. 447 * @return a string representation of this. 448 */ 449 @Override 450 public String toString() { 451 return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")"; 452 } 453 454}