001 /*
002 * $Id: DistHashTableServer.java 3296 2010-08-26 17:30:55Z kredel $
003 */
004
005 package edu.jas.util;
006
007
008 import java.io.IOException;
009 import java.util.ArrayList;
010 import java.util.Iterator;
011 import java.util.List;
012 import java.util.SortedMap;
013 import java.util.TreeMap;
014 import java.util.Map.Entry;
015
016 import org.apache.log4j.Logger;
017
018
019 /**
020 * Server for the distributed version of a list.
021 * @author Heinz Kredel
022 * @todo redistribute list for late comming clients, removal of elements.
023 */
024
025 public class DistHashTableServer<K> extends Thread {
026
027
028 private static final Logger logger = Logger.getLogger(DistHashTableServer.class);
029
030
031 public final static int DEFAULT_PORT = 9009; //ChannelFactory.DEFAULT_PORT + 99;
032
033
034 protected final ChannelFactory cf;
035
036
037 protected List<DHTBroadcaster<K>> servers;
038
039
040 private boolean goon = true;
041
042
043 private Thread mythread = null;
044
045
046 protected final SortedMap<K, DHTTransport> theList;
047
048
049 private long etime;
050
051
052 private long dtime;
053
054
055 private long ertime;
056
057
058 private long drtime;
059
060
061 /**
062 * Constructs a new DistHashTableServer.
063 */
064 public DistHashTableServer() {
065 this(DEFAULT_PORT);
066 }
067
068
069 /**
070 * DistHashTableServer.
071 * @param port to run server on.
072 */
073 public DistHashTableServer(int port) {
074 this(new ChannelFactory(port));
075 }
076
077
078 /**
079 * DistHashTableServer.
080 * @param cf ChannelFactory to use.
081 */
082 public DistHashTableServer(ChannelFactory cf) {
083 this.cf = cf;
084 cf.init();
085 servers = new ArrayList<DHTBroadcaster<K>>();
086 theList = new TreeMap<K, DHTTransport>();
087 etime = DHTTransport.etime;
088 dtime = DHTTransport.dtime;
089 ertime = DHTTransport.ertime;
090 drtime = DHTTransport.drtime;
091 }
092
093
094 /**
095 * main. Usage: DistHashTableServer <port>
096 */
097 public static void main(String[] args) {
098 int port = DEFAULT_PORT;
099 if (args.length < 1) {
100 System.out.println("Usage: DistHashTableServer <port>");
101 } else {
102 try {
103 port = Integer.parseInt(args[0]);
104 } catch (NumberFormatException e) {
105 }
106 }
107 (new DistHashTableServer/*raw: <K>*/(port)).run();
108 // until CRTL-C
109 }
110
111
112 /**
113 * thread initialization and start.
114 */
115 public void init() {
116 this.start();
117 }
118
119
120 /**
121 * main server method.
122 */
123 @Override
124 public void run() {
125 SocketChannel channel = null;
126 DHTBroadcaster<K> s = null;
127 mythread = Thread.currentThread();
128 Entry<K, DHTTransport> e;
129 K n;
130 DHTTransport tc;
131 while (goon) {
132 //logger.debug("list server " + this + " go on");
133 try {
134 channel = cf.getChannel();
135 if (logger.isDebugEnabled()) {
136 logger.debug("dls channel = " + channel);
137 }
138 if (mythread.isInterrupted()) {
139 goon = false;
140 //logger.info("list server " + this + " interrupted");
141 } else {
142 s = new DHTBroadcaster<K>(channel, servers,/*listElem,*/theList);
143 int ls = 0;
144 synchronized (servers) {
145 if (goon) {
146 servers.add(s);
147 ls = theList.size();
148 s.start();
149 }
150 }
151 if (logger.isInfoEnabled()) {
152 logger.info("server " + s + " started " + s.isAlive());
153 }
154 if (ls > 0) {
155 //logger.debug("sending " + ls + " list elements");
156 synchronized (theList) {
157 Iterator<Entry<K, DHTTransport>> it = theList.entrySet().iterator();
158 for (int i = 0; i < ls; i++) {
159 e = it.next();
160 n = e.getKey();
161 tc = e.getValue();
162 //DHTTransport tc = (DHTTransport) o;
163 try {
164 s.sendChannel(tc);
165 } catch (IOException ioe) {
166 // stop s
167 }
168 }
169 }
170 }
171 }
172 } catch (InterruptedException end) {
173 goon = false;
174 Thread.currentThread().interrupt();
175 }
176 }
177 if (logger.isDebugEnabled()) {
178 logger.debug("listserver " + this + " terminated");
179 }
180 }
181
182
183 /**
184 * terminate all servers.
185 */
186 public void terminate() {
187 goon = false;
188 logger.debug("terminating ListServer");
189 if (cf != null) {
190 cf.terminate();
191 }
192 int svs = 0;
193 if (servers != null) {
194 synchronized (servers) {
195 svs = servers.size();
196 Iterator<DHTBroadcaster<K>> it = servers.iterator();
197 while (it.hasNext()) {
198 DHTBroadcaster<K> br = it.next();
199 br.closeChannel();
200 try {
201 int c = 0;
202 while (br.isAlive()) {
203 c++;
204 if (c > 10) {
205 logger.warn("giving up on " + br);
206 break;
207 }
208 //System.out.print(".");
209 br.interrupt();
210 br.join(100);
211 }
212 if (logger.isDebugEnabled()) {
213 logger.debug("server " + br + " terminated");
214 }
215 } catch (InterruptedException e) {
216 Thread.currentThread().interrupt();
217 }
218 }
219 servers.clear();
220 }
221 logger.info(svs + " broadcasters terminated");
222 //? servers = null;
223 }
224 logger.debug("DHTBroadcasters terminated");
225 long enc = DHTTransport.etime - etime;
226 long dec = DHTTransport.dtime - dtime;
227 long encr = DHTTransport.ertime - ertime;
228 long decr = DHTTransport.drtime - drtime;
229 long drest = (encr * dec) / (enc + 1);
230 logger.info("DHT time: encode = " + enc + ", decode = " + dec + ", enc raw = " + encr
231 + ", dec raw wait = " + decr + ", dec raw est = " + drest + ", sum est = "
232 + (enc + dec + encr + drest)); // +decr not meaningful
233 if (mythread == null) {
234 return;
235 }
236 try {
237 while (mythread.isAlive()) {
238 //System.out.print("-");
239 mythread.interrupt();
240 mythread.join(100);
241 }
242 if (logger.isDebugEnabled()) {
243 logger.debug("server terminated " + mythread);
244 }
245 } catch (InterruptedException e) {
246 Thread.currentThread().interrupt();
247 }
248 mythread = null;
249 logger.debug("ListServer terminated");
250 }
251
252
253 /**
254 * number of servers.
255 */
256 public int size() {
257 synchronized (servers) {
258 return servers.size();
259 }
260 }
261
262
263 /**
264 * toString.
265 * @return a string representation of this.
266 */
267 @Override
268 public String toString() {
269 return "DHTServer(" + servers.size() + ", " + cf + ", " + super.toString() + ")";
270 }
271
272 }
273
274
275 /**
276 * Thread for broadcasting all incoming objects to the list clients.
277 */
278
279 class DHTBroadcaster<K> extends Thread /*implements Runnable*/{
280
281
282 private static final Logger logger = Logger.getLogger(DHTBroadcaster.class);
283
284
285 private final SocketChannel channel;
286
287
288 private final List<DHTBroadcaster<K>> bcaster;
289
290
291 private final SortedMap<K, DHTTransport> theList;
292
293
294 /**
295 * DHTBroadcaster.
296 * @param s SocketChannel to use.
297 * @param bc list of broadcasters.
298 * @param le DHTCounter.
299 * @param sm SortedMap with key value pairs.
300 */
301 public DHTBroadcaster(SocketChannel s, List<DHTBroadcaster<K>> bc, SortedMap<K, DHTTransport> sm) {
302 channel = s;
303 bcaster = bc;
304 theList = sm;
305 }
306
307
308 /**
309 * closeChannel.
310 */
311 public void closeChannel() {
312 channel.close();
313 }
314
315
316 /**
317 * sendChannel.
318 * @param tc DHTTransport.
319 * @throws IOException
320 */
321 public void sendChannel(DHTTransport tc) throws IOException {
322 channel.send(tc);
323 }
324
325
326 /**
327 * broadcast.
328 * @param o DHTTransport element to broadcast.
329 */
330 @SuppressWarnings("unchecked")
331 public void broadcast(DHTTransport o) {
332 if (logger.isDebugEnabled()) {
333 logger.debug("broadcast = " + o);
334 }
335 DHTTransport<K, Object> tc = null;
336 if (o == null) {
337 return;
338 }
339 //if ( ! (o instanceof DHTTransport) ) {
340 // return;
341 //}
342 tc = (DHTTransport<K, Object>) o;
343 K key = null;
344 synchronized (theList) {
345 //test
346 //Object x = theList.get( tc.key );
347 //if ( x != null ) {
348 // logger.info("theList duplicate key " + tc.key );
349 //}
350 try {
351 key = tc.key();
352 theList.put(key, tc);
353 } catch (IOException e) {
354 logger.warn("IO exception: tc.key() not ok " + tc);
355 e.printStackTrace();
356 } catch (ClassNotFoundException e) {
357 logger.warn("CNF exception: tc.key() not ok " + tc);
358 e.printStackTrace();
359 } catch (Exception e) {
360 logger.warn("exception:tc.key() not ok " + tc);
361 e.printStackTrace();
362 }
363 }
364 logger.info("sending key=" + key + " to " + bcaster.size() + " nodes");
365 synchronized (bcaster) {
366 Iterator<DHTBroadcaster<K>> it = bcaster.iterator();
367 while (it.hasNext()) {
368 DHTBroadcaster<K> br = it.next();
369 try {
370 if (logger.isDebugEnabled()) {
371 logger.debug("bcasting to " + br);
372 }
373 br.sendChannel(tc);
374 } catch (IOException e) {
375 logger.info("bcaster, exception " + e);
376 try {
377 br.closeChannel();
378 while (br.isAlive()) {
379 br.interrupt();
380 br.join(100);
381 }
382 } catch (InterruptedException w) {
383 Thread.currentThread().interrupt();
384 }
385 it.remove( /*br*/); //ConcurrentModificationException
386 logger.debug("bcaster.remove() " + br);
387 } catch (Exception e) {
388 logger.info("bcaster, exception " + e);
389 }
390 }
391 }
392 }
393
394
395 /**
396 * run.
397 */
398 @Override
399 public void run() {
400 boolean goon = true;
401 while (goon) {
402 try {
403 logger.debug("trying to receive");
404 Object o = channel.receive();
405 if (this.isInterrupted()) {
406 break;
407 }
408 if (logger.isDebugEnabled()) {
409 logger.debug("received = " + o);
410 }
411 if (!(o instanceof DHTTransport)) {
412 logger.warn("swallowed: " + o);
413 continue;
414 }
415 DHTTransport tc = (DHTTransport) o;
416 broadcast(tc);
417 if (this.isInterrupted()) {
418 goon = false;
419 }
420 } catch (IOException e) {
421 goon = false;
422 logger.info("receive, IO exception " + e);
423 //e.printStackTrace();
424 } catch (ClassNotFoundException e) {
425 goon = false;
426 logger.info("receive, CNF exception " + e);
427 e.printStackTrace();
428 } catch (Exception e) {
429 goon = false;
430 logger.info("receive, exception " + e);
431 e.printStackTrace();
432 }
433 }
434 if (logger.isDebugEnabled()) {
435 logger.debug("DHTBroadcaster terminated " + this);
436 }
437 channel.close();
438 }
439
440
441 /**
442 * toString.
443 * @return a string representation of this.
444 */
445 @Override
446 public String toString() {
447 return "DHTBroadcaster(" + channel + "," + bcaster.size() + ")";
448 }
449
450 }