001 /* 002 * $Id: DistHashTableServer.java 3296 2010-08-26 17:30:55Z kredel $ 003 */ 004 005 package edu.jas.util; 006 007 008 import java.io.IOException; 009 import java.util.ArrayList; 010 import java.util.Iterator; 011 import java.util.List; 012 import java.util.SortedMap; 013 import java.util.TreeMap; 014 import java.util.Map.Entry; 015 016 import org.apache.log4j.Logger; 017 018 019 /** 020 * Server for the distributed version of a list. 021 * @author Heinz Kredel 022 * @todo redistribute list for late comming clients, removal of elements. 023 */ 024 025 public class DistHashTableServer<K> extends Thread { 026 027 028 private static final Logger logger = Logger.getLogger(DistHashTableServer.class); 029 030 031 public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99; 032 033 034 protected final ChannelFactory cf; 035 036 037 protected List<DHTBroadcaster<K>> servers; 038 039 040 private boolean goon = true; 041 042 043 private Thread mythread = null; 044 045 046 protected final SortedMap<K, DHTTransport> theList; 047 048 049 private long etime; 050 051 052 private long dtime; 053 054 055 private long ertime; 056 057 058 private long drtime; 059 060 061 /** 062 * Constructs a new DistHashTableServer. 063 */ 064 public DistHashTableServer() { 065 this(DEFAULT_PORT); 066 } 067 068 069 /** 070 * DistHashTableServer. 071 * @param port to run server on. 072 */ 073 public DistHashTableServer(int port) { 074 this(new ChannelFactory(port)); 075 } 076 077 078 /** 079 * DistHashTableServer. 080 * @param cf ChannelFactory to use. 081 */ 082 public DistHashTableServer(ChannelFactory cf) { 083 this.cf = cf; 084 cf.init(); 085 servers = new ArrayList<DHTBroadcaster<K>>(); 086 theList = new TreeMap<K, DHTTransport>(); 087 etime = DHTTransport.etime; 088 dtime = DHTTransport.dtime; 089 ertime = DHTTransport.ertime; 090 drtime = DHTTransport.drtime; 091 } 092 093 094 /** 095 * main. Usage: DistHashTableServer <port> 096 */ 097 public static void main(String[] args) { 098 int port = DEFAULT_PORT; 099 if (args.length < 1) { 100 System.out.println("Usage: DistHashTableServer <port>"); 101 } else { 102 try { 103 port = Integer.parseInt(args[0]); 104 } catch (NumberFormatException e) { 105 } 106 } 107 (new DistHashTableServer/*raw: <K>*/(port)).run(); 108 // until CRTL-C 109 } 110 111 112 /** 113 * thread initialization and start. 114 */ 115 public void init() { 116 this.start(); 117 } 118 119 120 /** 121 * main server method. 122 */ 123 @Override 124 public void run() { 125 SocketChannel channel = null; 126 DHTBroadcaster<K> s = null; 127 mythread = Thread.currentThread(); 128 Entry<K, DHTTransport> e; 129 K n; 130 DHTTransport tc; 131 while (goon) { 132 //logger.debug("list server " + this + " go on"); 133 try { 134 channel = cf.getChannel(); 135 if (logger.isDebugEnabled()) { 136 logger.debug("dls channel = " + channel); 137 } 138 if (mythread.isInterrupted()) { 139 goon = false; 140 //logger.info("list server " + this + " interrupted"); 141 } else { 142 s = new DHTBroadcaster<K>(channel, servers,/*listElem,*/theList); 143 int ls = 0; 144 synchronized (servers) { 145 if (goon) { 146 servers.add(s); 147 ls = theList.size(); 148 s.start(); 149 } 150 } 151 if (logger.isInfoEnabled()) { 152 logger.info("server " + s + " started " + s.isAlive()); 153 } 154 if (ls > 0) { 155 //logger.debug("sending " + ls + " list elements"); 156 synchronized (theList) { 157 Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator(); 158 for (int i = 0; i < ls; i++) { 159 e = it.next(); 160 n = e.getKey(); 161 tc = e.getValue(); 162 //DHTTransport tc = (DHTTransport) o; 163 try { 164 s.sendChannel(tc); 165 } catch (IOException ioe) { 166 // stop s 167 } 168 } 169 } 170 } 171 } 172 } catch (InterruptedException end) { 173 goon = false; 174 Thread.currentThread().interrupt(); 175 } 176 } 177 if (logger.isDebugEnabled()) { 178 logger.debug("listserver " + this + " terminated"); 179 } 180 } 181 182 183 /** 184 * terminate all servers. 185 */ 186 public void terminate() { 187 goon = false; 188 logger.debug("terminating ListServer"); 189 if (cf != null) { 190 cf.terminate(); 191 } 192 int svs = 0; 193 if (servers != null) { 194 synchronized (servers) { 195 svs = servers.size(); 196 Iterator<DHTBroadcaster<K>> it = servers.iterator(); 197 while (it.hasNext()) { 198 DHTBroadcaster<K> br = it.next(); 199 br.closeChannel(); 200 try { 201 int c = 0; 202 while (br.isAlive()) { 203 c++; 204 if (c > 10) { 205 logger.warn("giving up on " + br); 206 break; 207 } 208 //System.out.print("."); 209 br.interrupt(); 210 br.join(100); 211 } 212 if (logger.isDebugEnabled()) { 213 logger.debug("server " + br + " terminated"); 214 } 215 } catch (InterruptedException e) { 216 Thread.currentThread().interrupt(); 217 } 218 } 219 servers.clear(); 220 } 221 logger.info(svs + " broadcasters terminated"); 222 //? servers = null; 223 } 224 logger.debug("DHTBroadcasters terminated"); 225 long enc = DHTTransport.etime - etime; 226 long dec = DHTTransport.dtime - dtime; 227 long encr = DHTTransport.ertime - ertime; 228 long decr = DHTTransport.drtime - drtime; 229 long drest = (encr * dec) / (enc + 1); 230 logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr 231 + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = " 232 + (enc + dec + encr + drest)); // +decr not meaningful 233 if (mythread == null) { 234 return; 235 } 236 try { 237 while (mythread.isAlive()) { 238 //System.out.print("-"); 239 mythread.interrupt(); 240 mythread.join(100); 241 } 242 if (logger.isDebugEnabled()) { 243 logger.debug("server terminated " + mythread); 244 } 245 } catch (InterruptedException e) { 246 Thread.currentThread().interrupt(); 247 } 248 mythread = null; 249 logger.debug("ListServer terminated"); 250 } 251 252 253 /** 254 * number of servers. 255 */ 256 public int size() { 257 synchronized (servers) { 258 return servers.size(); 259 } 260 } 261 262 263 /** 264 * toString. 265 * @return a string representation of this. 266 */ 267 @Override 268 public String toString() { 269 return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")"; 270 } 271 272 } 273 274 275 /** 276 * Thread for broadcasting all incoming objects to the list clients. 277 */ 278 279 class DHTBroadcaster<K> extends Thread /*implements Runnable*/{ 280 281 282 private static final Logger logger = Logger.getLogger(DHTBroadcaster.class); 283 284 285 private final SocketChannel channel; 286 287 288 private final List<DHTBroadcaster<K>> bcaster; 289 290 291 private final SortedMap<K, DHTTransport> theList; 292 293 294 /** 295 * DHTBroadcaster. 296 * @param s SocketChannel to use. 297 * @param bc list of broadcasters. 298 * @param le DHTCounter. 299 * @param sm SortedMap with key value pairs. 300 */ 301 public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) { 302 channel = s; 303 bcaster = bc; 304 theList = sm; 305 } 306 307 308 /** 309 * closeChannel. 310 */ 311 public void closeChannel() { 312 channel.close(); 313 } 314 315 316 /** 317 * sendChannel. 318 * @param tc DHTTransport. 319 * @throws IOException 320 */ 321 public void sendChannel(DHTTransport tc) throws IOException { 322 channel.send(tc); 323 } 324 325 326 /** 327 * broadcast. 328 * @param o DHTTransport element to broadcast. 329 */ 330 @SuppressWarnings("unchecked") 331 public void broadcast(DHTTransport o) { 332 if (logger.isDebugEnabled()) { 333 logger.debug("broadcast = " + o); 334 } 335 DHTTransport<K, Object> tc = null; 336 if (o == null) { 337 return; 338 } 339 //if ( ! (o instanceof DHTTransport) ) { 340 // return; 341 //} 342 tc = (DHTTransport<K, Object>) o; 343 K key = null; 344 synchronized (theList) { 345 //test 346 //Object x = theList.get( tc.key ); 347 //if ( x != null ) { 348 // logger.info("theList duplicate key " + tc.key ); 349 //} 350 try { 351 key = tc.key(); 352 theList.put(key, tc); 353 } catch (IOException e) { 354 logger.warn("IO exception: tc.key() not ok " + tc); 355 e.printStackTrace(); 356 } catch (ClassNotFoundException e) { 357 logger.warn("CNF exception: tc.key() not ok " + tc); 358 e.printStackTrace(); 359 } catch (Exception e) { 360 logger.warn("exception:tc.key() not ok " + tc); 361 e.printStackTrace(); 362 } 363 } 364 logger.info("sending key=" + key + " to " + bcaster.size() + " nodes"); 365 synchronized (bcaster) { 366 Iterator<DHTBroadcaster<K>> it = bcaster.iterator(); 367 while (it.hasNext()) { 368 DHTBroadcaster<K> br = it.next(); 369 try { 370 if (logger.isDebugEnabled()) { 371 logger.debug("bcasting to " + br); 372 } 373 br.sendChannel(tc); 374 } catch (IOException e) { 375 logger.info("bcaster, exception " + e); 376 try { 377 br.closeChannel(); 378 while (br.isAlive()) { 379 br.interrupt(); 380 br.join(100); 381 } 382 } catch (InterruptedException w) { 383 Thread.currentThread().interrupt(); 384 } 385 it.remove( /*br*/); //ConcurrentModificationException 386 logger.debug("bcaster.remove() " + br); 387 } catch (Exception e) { 388 logger.info("bcaster, exception " + e); 389 } 390 } 391 } 392 } 393 394 395 /** 396 * run. 397 */ 398 @Override 399 public void run() { 400 boolean goon = true; 401 while (goon) { 402 try { 403 logger.debug("trying to receive"); 404 Object o = channel.receive(); 405 if (this.isInterrupted()) { 406 break; 407 } 408 if (logger.isDebugEnabled()) { 409 logger.debug("received = " + o); 410 } 411 if (!(o instanceof DHTTransport)) { 412 logger.warn("swallowed: " + o); 413 continue; 414 } 415 DHTTransport tc = (DHTTransport) o; 416 broadcast(tc); 417 if (this.isInterrupted()) { 418 goon = false; 419 } 420 } catch (IOException e) { 421 goon = false; 422 logger.info("receive, IO exception " + e); 423 //e.printStackTrace(); 424 } catch (ClassNotFoundException e) { 425 goon = false; 426 logger.info("receive, CNF exception " + e); 427 e.printStackTrace(); 428 } catch (Exception e) { 429 goon = false; 430 logger.info("receive, exception " + e); 431 e.printStackTrace(); 432 } 433 } 434 if (logger.isDebugEnabled()) { 435 logger.debug("DHTBroadcaster terminated " + this); 436 } 437 channel.close(); 438 } 439 440 441 /** 442 * toString. 443 * @return a string representation of this. 444 */ 445 @Override 446 public String toString() { 447 return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")"; 448 } 449 450 }