001 /*
002 * $Id: TaggedSocketChannel.java 3297 2010-08-26 19:09:03Z kredel $
003 */
004
005 package edu.jas.util;
006
007
008 import java.io.IOException;
009 import java.io.Serializable;
010 import java.util.HashMap;
011 import java.util.Map;
012 import java.util.Map.Entry;
013 import java.util.concurrent.BlockingQueue;
014 import java.util.concurrent.LinkedBlockingQueue;
015 import java.util.concurrent.atomic.AtomicInteger;
016
017 import org.apache.log4j.Logger;
018
019
020 /**
021 * TaggedSocketChannel provides a communication channel with message tags for
022 * Java objects using TCP/IP sockets.
023 * @author Heinz Kredel.
024 */
025 public class TaggedSocketChannel extends Thread {
026
027
028 private static final Logger logger = Logger.getLogger(TaggedSocketChannel.class);
029
030
031 private static final boolean debug = logger.isDebugEnabled();
032
033
034 /**
035 * Flag if receiver is running.
036 */
037 private volatile boolean isRunning = false;
038
039
040 /**
041 * End message.
042 */
043 private final static String DONE = "TaggedSocketChannel Done";
044
045
046 /**
047 * Blocked threads count.
048 */
049 private final AtomicInteger blockedCount;
050
051
052 /**
053 * Underlying socket channel.
054 */
055 protected final SocketChannel sc;
056
057
058 /**
059 * Queues for each message tag.
060 */
061 protected final Map<Integer, BlockingQueue> queues;
062
063
064 /**
065 * Constructs a tagged socket channel on the given socket channel s.
066 * @param s A socket channel object.
067 */
068 public TaggedSocketChannel(SocketChannel s) {
069 sc = s;
070 blockedCount = new AtomicInteger(0);
071 queues = new HashMap<Integer, BlockingQueue>();
072 }
073
074
075 /**
076 * thread initialization and start.
077 */
078 public void init() {
079 synchronized (queues) {
080 if ( ! isRunning ) {
081 this.start();
082 isRunning = true;
083 }
084 }
085 logger.info("TaggedSocketChannel at " + sc);
086 }
087
088
089 /**
090 * Get the SocketChannel
091 */
092 public SocketChannel getSocket() {
093 return sc;
094 }
095
096
097 /**
098 * Sends an object.
099 * @param tag message tag
100 * @param v object to send
101 * @throws IOException
102 */
103 public void send(Integer tag, Object v) throws IOException {
104 if (tag == null) {
105 throw new IllegalArgumentException("tag " + tag + " not allowed");
106 }
107 if (v instanceof Exception) {
108 throw new IllegalArgumentException("message " + v + " not allowed");
109 }
110 TaggedMessage tm = new TaggedMessage(tag, v);
111 sc.send(tm);
112 }
113
114
115 /**
116 * Receive an object.
117 * @param tag message tag
118 * @return object received
119 * @throws InterruptedException
120 * @throws IOException
121 * @throws ClassNotFoundException
122 */
123 public Object receive(Integer tag) throws InterruptedException, IOException, ClassNotFoundException {
124 BlockingQueue tq = null;
125 int i = 0;
126 do {
127 synchronized (queues) {
128 tq = queues.get(tag);
129 if (tq == null) {
130 if ( ! isRunning ) { // avoid dead-lock
131 throw new IOException("receiver not running for " + this);
132 }
133 //tq = new LinkedBlockingQueue();
134 //queues.put(tag, tq);
135 try {
136 logger.debug("receive wait, tag = " + tag);
137 i = blockedCount.incrementAndGet();
138 queues.wait();
139 } catch (InterruptedException e) {
140 logger.info("receive wait exception, tag = " + tag + ", blockedCount = " + i);
141 throw e;
142 } finally {
143 i = blockedCount.decrementAndGet();
144 }
145 }
146 }
147 } while ( tq == null );
148 Object v = null;
149 try {
150 i = blockedCount.incrementAndGet();
151 v = tq.take();
152 } finally {
153 i = blockedCount.decrementAndGet();
154 }
155 if ( v instanceof IOException ) {
156 throw (IOException) v;
157 }
158 if ( v instanceof ClassNotFoundException ) {
159 throw (ClassNotFoundException) v;
160 }
161 if ( v instanceof Exception ) {
162 throw new RuntimeException(v.toString());
163 }
164 return v;
165 }
166
167
168 /**
169 * Closes the channel.
170 */
171 public void close() {
172 terminate();
173 }
174
175
176 /**
177 * To string.
178 * @see java.lang.Thread#toString()
179 */
180 @Override
181 public String toString() {
182 return "socketChannel(" + sc + ", tags = " + queues.keySet() + ")";
183 //return "socketChannel(" + sc + ", tags = " + queues.keySet() + ", values = " + queues.values() + ")";
184 }
185
186
187 /**
188 * Number of tags.
189 * @return size of key set.
190 */
191 public int tagSize() {
192 return queues.keySet().size();
193 }
194
195
196 /**
197 * Number of messages.
198 * @return sum of all messages in queues.
199 */
200 public int messages() {
201 int m = 0;
202 synchronized (queues) {
203 for ( BlockingQueue tq : queues.values() ) {
204 m += tq.size();
205 }
206 }
207 return m;
208 }
209
210
211 /**
212 * Run receive() in an infinite loop.
213 * @see java.lang.Thread#run()
214 */
215 @Override
216 public void run() {
217 if (sc == null) {
218 isRunning = false;
219 return; // nothing to do
220 }
221 isRunning = true;
222 while (isRunning) {
223 try {
224 Object r = null;
225 try {
226 logger.debug("waiting for tagged object");
227 r = sc.receive();
228 if (this.isInterrupted()) {
229 //r = new InterruptedException();
230 isRunning = false;
231 }
232 } catch (IOException e) {
233 r = e;
234 } catch (ClassNotFoundException e) {
235 r = e;
236 } catch (Exception e) {
237 r = e;
238 }
239 //logger.debug("Socket = " +s);
240 logger.debug("object recieved");
241 if (r instanceof TaggedMessage) {
242 TaggedMessage tm = (TaggedMessage) r;
243 BlockingQueue tq = null;
244 synchronized (queues) {
245 tq = queues.get(tm.tag);
246 if (tq == null) {
247 tq = new LinkedBlockingQueue();
248 queues.put(tm.tag, tq);
249 queues.notifyAll();
250 }
251 }
252 tq.put(tm.msg);
253 } else if ( r instanceof Exception ){
254 if (debug) {
255 logger.debug("exception " + r);
256 }
257 synchronized (queues) { // deliver to all queues
258 isRunning = false;
259 for ( BlockingQueue q : queues.values() ) {
260 final int bc = blockedCount.get();
261 for ( int i = 0; i <= bc; i++ ) { // one more
262 q.put(r);
263 }
264 if (bc > 0) {
265 logger.debug("put exception to queue, blockedCount = " + bc);
266 }
267 }
268 queues.notifyAll();
269 }
270 //return;
271 } else {
272 if (debug) {
273 logger.debug("no tagged message and no exception " + r);
274 }
275 synchronized (queues) { // deliver to all queues
276 isRunning = false;
277 Exception e;
278 if ( r.equals(DONE) ) {
279 e = new Exception("DONE message");
280 } else {
281 e = new IllegalArgumentException("no tagged message and no exception '" + r + "'");
282 }
283 for ( BlockingQueue q : queues.values() ) {
284 final int bc = blockedCount.get();
285 for ( int i = 0; i <= bc; i++ ) { // one more
286 q.put(e);
287 }
288 if (bc > 0) {
289 logger.debug("put '" + e.toString() + "' to queue, blockedCount = " + bc);
290 }
291 }
292 queues.notifyAll();
293 }
294 if ( r.equals(DONE) ) {
295 logger.info("run terminating by request");
296 try {
297 sc.send(DONE); // terminate other end
298 } catch (IOException e) {
299 logger.warn("send other done failed " + e);
300 }
301 return;
302 }
303 }
304 } catch (InterruptedException e) {
305 // unfug Thread.currentThread().interrupt();
306 //logger.debug("ChannelFactory IE terminating");
307 if (debug) {
308 logger.debug("exception " + e);
309 }
310 synchronized (queues) { // deliver to all queues
311 isRunning = false;
312 for ( BlockingQueue q : queues.values() ) {
313 try {
314 final int bc = blockedCount.get();
315 for ( int i = 0; i <= bc; i++ ) { // one more
316 q.put(e);
317 }
318 if (bc > 0) {
319 logger.debug("put interrupted to queue, blockCount = " + bc);
320 }
321 } catch (InterruptedException ignored) {
322 }
323 }
324 queues.notifyAll();
325 }
326 //return via isRunning
327 }
328 }
329 if (this.isInterrupted()) {
330 Exception e = new InterruptedException("terminating via interrupt");
331 synchronized (queues) { // deliver to all queues
332 for ( BlockingQueue q : queues.values() ) {
333 try {
334 final int bc = blockedCount.get();
335 for ( int i = 0; i <= bc; i++ ) { // one more
336 q.put(e);
337 }
338 if (bc > 0) {
339 logger.debug("put terminating via interrupt to queue, blockCount = " + bc);
340 }
341 } catch (InterruptedException ignored) {
342 }
343 }
344 queues.notifyAll();
345 }
346 }
347 logger.info("run terminated");
348 }
349
350
351 /**
352 * Terminate the TaggedSocketChannel.
353 */
354 public void terminate() {
355 isRunning = false;
356 this.interrupt();
357 if (sc != null) {
358 //sc.close();
359 try {
360 sc.send(DONE);
361 } catch (IOException e) {
362 logger.warn("send done failed " + e);
363 }
364 logger.debug(sc + " not yet closed");
365 }
366 this.interrupt();
367 synchronized(queues) {
368 isRunning = false;
369 for (Entry<Integer, BlockingQueue> tq : queues.entrySet()) {
370 BlockingQueue q = tq.getValue();
371 if (q.size() != 0) {
372 logger.info("queue for tag " + tq.getKey() + " not empty " + q);
373 }
374 int bc = 0;
375 try {
376 bc = blockedCount.get();
377 for ( int i = 0; i <= bc; i++ ) { // one more
378 q.put(new IOException("queue terminate"));
379 }
380 } catch (InterruptedException ignored) {
381 }
382 if ( bc > 0 ) {
383 logger.debug("put IO-end to queue for tag " + tq.getKey() + ", blockCount = " + bc);
384 }
385 }
386 queues.notifyAll();
387 }
388 try {
389 this.join();
390 } catch (InterruptedException e) {
391 // unfug Thread.currentThread().interrupt();
392 }
393 logger.info("terminated");
394 }
395
396 }
397
398
/**
 * Serializable envelope that carries one message object together with the
 * tag it was sent under. Both parts are fixed at construction time.
 * <p>
 * The class declares no explicit serialVersionUID, so its serialized form
 * depends on the class structure.
 * @author kredel
 */
class TaggedMessage implements Serializable {


    /**
     * The message object transported by this envelope.
     */
    public final Object msg;


    /**
     * The tag the message belongs to.
     */
    public final Integer tag;


    /**
     * Wraps a message object and its tag. The arguments are stored as given,
     * no checks are made.
     * @param tag message tag
     * @param msg message object
     */
    public TaggedMessage(Integer tag, Object msg) {
        this.msg = msg;
        this.tag = tag;
    }

}