001
002 package algo;
003
004 import java.io.IOException;
005 import java.io.Serializable;
006 import java.io.FileNotFoundException;
007 import java.util.ArrayList;
008
009 import comm.ChannelFactory;
010 import comm.ExecutableChannels;
011 import comm.SocketChannel;
012 import comm.RemoteExecutable;
013 import thread.Deque;
014 import util.Logger;
015
016 /**
017 * A parallel algorithm for an euclidean 2d TSP Problem.
018 * @author Heinz Kredel.
019 */
020 public class DistTSP implements TSPInf {
021
022 protected Graph graph;
023 protected Point[] points;
024 protected DistBestStore best;
025
026 protected int number = 0;
027 protected String hostfile = "machines";
028 protected int port = -1;
029 protected String master = "localhost";
030 protected ChannelFactory cf;
031 protected DistRunTSP[] threads;
032
033 private static Logger logger = new Logger(2);
034
035 /**
036 * @param p the cities.
037 * @param th number of processes.
038 */
039 public DistTSP(Point[] p, int th) {
040 this.points = p;
041 graph = new PlaneGraph(p);
042 number = th;
043 threads = null;
044 // distribute code
045 ExecutableChannels ec = null;
046 try {
047 ec = new ExecutableChannels( hostfile );
048 ec.open( number );
049 master = ec.getMasterHost();
050 port = ec.getMasterPort();
051 for ( int i = 0; i < number; i++ ) {
052 ec.send(i, new DistParClientTSP(master,port,points) );
053 }
054 logger.info("code distributed");
055 } catch (FileNotFoundException e) {
056 e.printStackTrace();
057 return;
058 } catch (IOException e) {
059 e.printStackTrace();
060 return;
061 }
062 if ( ec != null ) {
063 ec.close();
064 }
065 cf = new ChannelFactory(port);
066 }
067
068
069 /**
070 * @return the number of iterations.
071 */
072 public long getIterations() {
073 if ( threads == null ) {
074 return 0l;
075 }
076 long iter = 0l;
077 for ( int i = 0; i < threads.length; i++ ) {
078 if ( threads[i] != null ) {
079 iter += threads[i].getIterations();
080 }
081 }
082 return iter;
083 }
084
085 /**
086 * @return the maximal number of iterations.
087 */
088 public long getMaxIterations() {
089 if ( threads == null ) {
090 return Long.MAX_VALUE;
091 }
092 long maxiter = Long.MAX_VALUE;
093 for ( int i = 0; i < threads.length; i++ ) {
094 long mi = threads[i].getMaxIterations();
095 if ( maxiter > mi ) {
096 maxiter = mi;
097 }
098 }
099 return maxiter;
100 }
101
102 /**
103 * @param m the maximal number of iterations.
104 * @return the previous maximal number of iterations.
105 */
106 public long setMaxIterations(long m){
107 if ( threads == null ) {
108 return m;
109 }
110 long maxiter = Long.MAX_VALUE;
111 for ( int i = 0; i < threads.length; i++ ) {
112 long mi = threads[i].setMaxIterations(m);
113 if ( maxiter > mi ) {
114 maxiter = mi;
115 }
116 }
117 return maxiter;
118 }
119
120
121 /**
122 * @return best.getPath().
123 */
124 public Path actualBest() {
125 if ( best == null ) {
126 return null;
127 }
128 return best.getPath();
129 }
130
131 /**
132 * @param b a Path.
133 */
134 public void setBest(Path b) {
135 if ( b == null ) {
136 return;
137 }
138 if ( best == null ) {
139 // best.setPath( b );
140 return;
141 }
142 if ( b.cost() < best.getPath().cost() ) {
143 best.setPath( b );
144 }
145 }
146
147
148 /**
149 * @return getBest(Long.MAX_VALUE).
150 */
151 public Path getBest() {
152 return getBest(Long.MAX_VALUE);
153 }
154
155 /**
156 * @param max maximal number of iterations.
157 * @return bestPath.
158 */
159 public Path getBest(long max) {
160 Path bestPath = PathByteArray.standardPath(graph); // standard path
161 if ( bestPath.length() <= 2 ) {
162 return bestPath;
163 }
164 best = new DistBestStore( bestPath, 0, number );
165 Path path = PathByteArray.onePath(graph); // one/empty path
166 Deque globalStack = new Deque(10000);
167 try {
168 globalStack.push( path );
169 } catch (InterruptedException e) {
170 e.printStackTrace();
171 }
172 threads = new DistRunTSP[number];
173 for ( int i = 0; i < number; i++ ) {
174 SocketChannel sc = null;
175 try {
176 sc = cf.getChannel();
177 //System.out.println("getChannel() = "+sc);
178 } catch (InterruptedException e) {
179 e.printStackTrace();
180 }
181 threads[i] = new DistRunTSP(globalStack,best,max,sc);
182 threads[i].start();
183 }
184 try {
185 for ( int i = 0; i < number; i++ ) {
186 threads[i].join();
187 }
188 } catch (InterruptedException e) {
189 e.printStackTrace();
190 }
191 cf.terminate();
192 return best.getPath();
193 }
194
195 /**
196 * @param path a Path.
197 * @todo is not jet implemented
198 */
199 public void getBest(Path path) {
200 System.out.println("method getBest() in DistTSP not implemented");
201 }
202
203 }
204
205 /**
206 * Storage for best path found so far.
207 */
208 class DistBestStore {
209
210 private Path path;
211 private int idlers;
212 public final int threads;
213
214 /**
215 * @param b a Path.
216 * @param i number of idle processes.
217 * @param t total number of processes.
218 */
219 public DistBestStore(Path b, int i, int t) {
220 path = b;
221 idlers = i;
222 threads = t;
223 }
224
225 /**
226 * @return path.
227 */
228 public Path getPath() {
229 return path;
230 }
231
232 /**
233 * @param p a Path.
234 */
235 public void setPath(Path p) {
236 path = p;
237 }
238
239 public synchronized void incIdle() {
240 idlers++;
241 }
242
243 public synchronized void decIdle() {
244 idlers--;
245 }
246
247 public synchronized /*elsewhere*/ boolean allIdle() {
248 return (idlers == threads);
249 }
250 }
251
252
253 /**
254 * Thread to communicate with distributed processes.
255 */
256 class DistRunTSP extends Thread {
257
258 private DistBestStore best;
259 private Deque globalStack;
260 private long maxIter;
261 private long iter;
262 private long localWork;
263 private long pushes;
264 private long requests;
265 private ArrayList stack;
266 private SocketChannel comm;
267 private static Logger logger = new Logger(2);
268
269 /**
270 * @param s work queue.
271 * @param b current best path.
272 * @param max maximal number of iterations.
273 * @param comm a channel to the remote process.
274 */
275 public DistRunTSP(Deque s,
276 DistBestStore b,
277 long max,
278 SocketChannel comm) {
279 globalStack = s;
280 best = b;
281 maxIter = max;
282 stack = new ArrayList(500);
283 this.comm = comm;
284 }
285
286
287 public void run() {
288 requests = 0;
289 pushes = 0;
290 Object o = null;
291 Object m = null;
292 Path p = null;
293 Path w = null;
294 TransportMessage dtc;
295 while (true) {
296 // protocol: receive/push/send
297 o = null;
298 try {
299 o = comm.receive();
300 requests++;
301 logger.debug("DistRunTSP receive "+comm+" request "+o);
302 } catch (IOException e) {
303 e.printStackTrace();
304 //break;
305 } catch (ClassNotFoundException e) {
306 e.printStackTrace();
307 break;
308 }
309 if ( ! (o instanceof TransportMessage) ) {
310 break;
311 }
312 dtc = (TransportMessage)o;
313 if ( dtc instanceof TransportMessageGetWork ) {
314 //logger.debug("DistRunTSP search work");
315 Object work = null;
316 try {
317 best.incIdle();
318 if ( globalStack.empty() && best.allIdle() ) {
319 logger.debug("pushing nulls");
320 for ( int i = 0; i < best.threads; i++) {
321 globalStack.push(null);
322 }
323 //best.decIdle(); break; return;
324 }
325 //logger.debug("work blocking " + this + " " + comm);
326 work = globalStack.get(); // blocking pop/get
327 best.decIdle();
328 } catch (InterruptedException e) {
329 e.printStackTrace();
330 break;
331 }
332 logger.debug("work of " + globalStack.size()
333 + " found @ " + this + " = "+work);
334 try {
335 if ( work == null ) {
336 //logger.debug("no more Work available");
337 comm.send( (TransportMessage)null );
338 break;
339 }
340 p = (Path)work;
341 dtc = new TransportMessage( best.getPath(),
342 p,
343 maxIter,
344 globalStack.size() );
345 comm.send( dtc );
346 logger.debug("response sendWork = " + dtc);
347 } catch (IOException e) {
348 e.printStackTrace();
349 break;
350 }
351 } else { // normal message
352 p = dtc.best;
353 w = dtc.work;
354 iter = dtc.iter;
355 localWork = dtc.items;
356 if ( w != null ) {
357 logger.debug(" put Part to global Stack "+w);
358 try {
359 globalStack.put( w );
360 pushes++;
361 } catch (InterruptedException e) {
362 e.printStackTrace();
363 break;
364 }
365 }
366 if ( p != null ) {
367 synchronized (best) {
368 if ( p.cost() < best.getPath().cost() ) {
369 best.setPath( p );
370 }
371 }
372 }
373 }
374 }
375 logger.info("requests = " + requests
376 + " iterations = " + iter
377 + " pushes = " + pushes);
378 comm.close();
379 }
380
381 /**
382 * @return iter.
383 */
384 public long getIterations() {
385 if ( comm == null ) {
386 return iter;
387 }
388 try {
389 comm.send( new TransportMessage( best.getPath(),
390 (Path)null,
391 maxIter,
392 globalStack.size()) );
393 } catch (IOException e) {
394 if ( logger.isDebugEnabled() ) {
395 e.printStackTrace();
396 }
397 // maxIter = iter;
398 }
399 return iter;
400 }
401
402 /**
403 * @return maxIter.
404 */
405 public long getMaxIterations() {
406 if ( comm == null ) {
407 return maxIter;
408 }
409 try {
410 comm.send( new TransportMessage( best.getPath(),
411 (Path)null,
412 maxIter,
413 globalStack.size()) );
414 } catch (IOException e) {
415 e.printStackTrace();
416 // maxIter = iter;
417 }
418 return maxIter;
419 }
420
421 /**
422 * @param m new maximal number of iterations.
423 * @return old maxIter.
424 */
425 public long setMaxIterations(long m){
426 long x = maxIter;
427 maxIter = m;
428 if ( comm == null ) {
429 return x;
430 }
431 try {
432 comm.send( new TransportMessage( best.getPath(),
433 (Path)null,
434 maxIter,
435 globalStack.size()) );
436 } catch (IOException e) {
437 e.printStackTrace();
438 // maxIter = iter;
439 }
440 return x;
441 }
442
443 }
444
445
446 /**
447 * Objects of this class are to be send to a ExecutableServer.
448 * i.e. these are the remote processes.
449 */
450
451 class DistParClientTSP implements RemoteExecutable {
452
453 protected String host;
454 protected int port;
455 protected Graph graph;
456 protected Point[] points;
457 protected DistParBestStore best;
458 protected int number = 0;
459 protected ArrayList stack;
460 protected int count;
461 protected int depth;
462 protected long iter;
463 protected long maxIter;
464 protected long globalWork;
465 protected int pushes;
466 protected int runs;
467 protected DistParRemoteCommTSP communi;
468 private static Logger logger = null;
469
470 /**
471 * @param host of master process.
472 * @param port of master process.
473 * @param points the cities.
474 */
475 public DistParClientTSP(String host, int port, Point[] points) {
476 this.host = host;
477 this.port = port;
478 this.points = points;
479 iter = 0;
480 maxIter = Long.MAX_VALUE;
481 pushes = 0;
482 runs = 0;
483 globalWork = 0;
484 }
485
486 /**
487 * @return best.getPath().
488 */
489 public Path actualBest() {
490 return best.getPath();
491 }
492
493 /**
494 * @param b new best path.
495 */
496 public void setBest(Path b) {
497 if ( b == null ) {
498 return;
499 }
500 if ( best == null ) {
501 // best.setPath( b );
502 return;
503 }
504 synchronized ( best ) {
505 if ( b.cost() < best.getPath().cost() ) {
506 best.setPath( b );
507 }
508 }
509 }
510
511
512 public void run() {
513 logger = new Logger(2);
514 graph = new PlaneGraph(points);
515 Path bestPath = PathByteArray.standardPath(graph); // standard path
516 best = new DistParBestStore(bestPath);
517 int ms = bestPath.maxsize();
518 count = ms + 4; //*2 / +5 ?
519 depth = ms - 10; // = 3 ?
520 if ( depth < 3 ) {
521 depth = 3;
522 }
523 stack = new ArrayList(500);
524 globalWork = 0;
525
526 communi = new DistParRemoteCommTSP(host,port,this);
527 communi.start();
528 logger.info("DistParClientTSP:"
529 + " host = " + host
530 + " port = " + port
531 + " best = " + actualBest()
532 );
533
534 //Object o;
535 //Path p;
536 //Path w;
537 try {
538 getParBest(); // blocking
539 } catch (InterruptedException e) {
540 e.printStackTrace();
541 }
542 logger.info("dist client runs = " + runs
543 + " iterations = " + iter);
544 communi.terminate();
545 }
546
547
548 /**
549 * @throws InterruptedException.
550 */
551 public void getParBest() throws InterruptedException {
552 // stack.add( path );
553 Object o;
554 Path p;
555 while ( true ) {
556 if ( stack.size() > 0 ) {
557 synchronized (stack) {
558 o = stack.remove( stack.size()-1 );
559 }
560 if ( o == null ) {
561 break; //return;
562 }
563 p = (Path)o;
564 getBest(p); // blocking
565 } else {
566 if ( communi != null ) {
567 communi.sendGetWork();
568 } else {
569 break;
570 }
571 //logger.debug("getParBest wait()");
572 synchronized (stack) {
573 if ( stack.size() == 0 ) { // recheck hard
574 stack.wait();
575 }
576 }
577 }
578 }
579 logger.debug("getParBest terminated");
580 }
581
582 public void addEndMark() {
583 synchronized ( stack ) {
584 stack.add( null );
585 stack.notify(); // wake up main
586 }
587 }
588
589 /**
590 * @param path a Path.
591 */
592 public void putWork(Path path) {
593 // termination ?
594 if ( path == null ) {
595 return;
596 }
597 synchronized (stack) {
598 //int s = stack.size();
599 runs++;
600 stack.add( path );
601 //if ( s == 0 ) {
602 stack.notify();
603 //}
604 }
605 }
606
607 /**
608 * @return a Path.
609 */
610 public Path getWork() {
611 Path w = null;
612 if ( stack.size() > count
613 && ( /*actualBest().length() <= depth &&*/ globalWork <= 5 ) ) {
614 Object o = null;
615 synchronized (stack) {
616 if ( stack.size() > 0 ) {
617 o = stack.remove( 0 ); //stack.size()-1, depth first
618 }
619 }
620 if ( o != null ) {
621 w = (Path)o;
622 pushes++;
623 }
624 }
625 return w;
626 }
627
628 /**
629 * @param len length of current path.
630 * @return true if work is available, else false.
631 */
632 public boolean isWorkAvailable(int len) {
633 //logger.debug("DistParRemoteCommTSP, isWorkAvailable "+globalWork
634 // + " stack " + stack.size()
635 // + " length " + actualBest().length() );
636 if ( stack.size() > count
637 && ( len <= depth && globalWork <= 5 ) ) {
638 communi.sendPutWork();
639 return true;
640 }
641 return false;
642 }
643
644 /**
645 * @param path some starting path.
646 */
647 public void getBest(Path path) {
648 Path[] ps = path.nextPaths();
649 iter += ps.length;
650 double bestCost = best.getPath().cost();
651 if ( path.length()+1 == path.maxsize() ) {
652 for ( int i = 0; i < ps.length; i++ ) { // branch
653 if ( ps[i].cost() < bestCost ) {
654 synchronized ( best ) {
655 if ( ps[i].cost() < best.getPath().cost() ) {
656 bestCost = ps[i].cost();
657 best.setPath( (Path)ps[i].clone() ); // evtl. error
658 logger.info("dist par best = " + ps[i]);
659 }
660 }
661 }
662 }
663 return;
664 }
665 if ( iter >= maxIter ) {
666 return;
667 }
668 if ( path.length()+5 >= path.maxsize() ) {
669 for ( int i = 0; i < ps.length; i++ ) { // branch
670 if ( ps[i].cost() < bestCost ) { // cut
671 //System.out.print("#");
672 getBestRec( ps[i].copyMax(), path.length()+1 ); // changes best and bestPath
673 //stack.add( ps[i] );
674 }
675 }
676 return;
677 }
678 for ( int i = 0; i < ps.length; i++ ) { // branch
679 if ( ps[i].cost() < bestCost ) { // cut
680 // getBest( ps[i] ); // changes best and bestPath
681 //System.out.print("-");
682 synchronized ( stack ) {
683 stack.add( ps[i] );
684 }
685 }
686 }
687 if ( ps.length > 0 ) {
688 isWorkAvailable( ps[0].length() );
689 }
690 return;
691 }
692
693 /**
694 * @param path some starting path.
695 * @param depth current path length.
696 */
697 public void getBestRec(Path path, int depth) {
698 double bestCost = best.getPath().cost();
699 for ( int k = 0; k < path.maxsize()-depth; k++ ) { // branch
700 Path ps = path.nextPath(k);
701 iter++;
702 if ( depth+1 == path.maxsize() ) {
703 if ( ps.cost() < bestCost ) {
704 synchronized ( best ) {
705 if ( ps.cost() < best.getPath().cost() ) {
706 bestCost = ps.cost();
707 best.setPath( (Path)ps.clone() ); // had error
708 logger.info("dist par best = " + ps);
709 }
710 }
711 }
712 }
713 if ( iter >= maxIter ) {
714 return;
715 }
716 if ( ps.cost() < bestCost ) { // cut
717 getBestRec( ps, depth+1 ); // changes best and bestPath
718 //stack.add( ps[i] );
719 }
720 }
721 return;
722 }
723
724 /**
725 * @return iter.
726 */
727 public long getIterations() {
728 return iter;
729 }
730
731 /**
732 * @return maxIter.
733 */
734 public long getMaxIterations() {
735 return maxIter;
736 }
737
738 /**
739 * @param m new maximal number of iterations.
740 * @return old maxIter.
741 */
742 public long setMaxIterations(long m){
743 long x = maxIter;
744 maxIter = m;
745 return x;
746 }
747 /*
748 public long getGlobalWork() {
749 return globalWork;
750 }
751 */
752
753 /**
754 * @param w number of global work items.
755 */
756 public void setGlobalWork(long w){
757 globalWork = w;
758 return;
759 }
760
761 }
762
763
764 /**
765 * Remote storage for best path found so far.
766 */
767 class DistParBestStore implements Serializable {
768
769 private Path path;
770
771 /**
772 * @param b best path so far.
773 */
774 public DistParBestStore(Path b) {
775 path = b;
776 }
777
778 /**
779 * @return path.
780 */
781 public Path getPath() {
782 return path;
783 }
784
785 /**
786 * @param p new best path so far.
787 */
788 public void setPath(Path p) {
789 path = p;
790 }
791 }
792
793
794 /**
795 * Transport container for communication with the master process.
796 */
797 class TransportMessage implements Serializable {
798 final Path best;
799 final Path work;
800 final long iter; // or maxIter
801 final long items; // work items global / local
802
803 /**
804 * @param b best path.
805 * @param w starting path.
806 * @param i maximal iterations.
807 * @param wi work items global / local.
808 */
809 public TransportMessage(Path b, Path w, long i, long wi) {
810 best = b;
811 work = w;
812 iter = i;
813 items = wi;
814 }
815
816 /**
817 * @param b best path.
818 * @param w starting path.
819 * @param i maximal iterations.
820 * @param wi work items global / local.
821 */
822 public TransportMessage(Object b, Object w, long i, long wi) {
823 this((Path)b,(Path)w,i,wi);
824 }
825
826 public String toString() {
827 return "DTP(" + best + "," + work + "," + iter + "," + items + ")";
828 }
829 }
830
831
832 /**
833 * Transport container for communication with the master process.
834 * Meaning: requests new work.
835 */
836 class TransportMessageGetWork extends TransportMessage {
837
838 public TransportMessageGetWork() {
839 super((Path)null,(Path)null,0l,0l);
840 }
841
842 }
843
844
845 /**
846 * Objects of this class communicate with the master server.
847 */
848 class DistParRemoteCommTSP extends Thread implements Serializable {
849
850 protected String host;
851 protected int port;
852 protected SocketChannel comm = null;
853 protected DistParClientTSP rem;
854 private final static Logger logger = new Logger(2);
855
856
857 /**
858 * @param host of master process.
859 * @param port of master process.
860 * @param rem my distributed process for call back.
861 */
862 public DistParRemoteCommTSP(String host, int port, DistParClientTSP rem) {
863 this.host = host;
864 this.port = port;
865 this.rem = rem;
866
867 ChannelFactory cf = new ChannelFactory();
868 try {
869 comm = cf.getChannel( host, port );
870 } catch (IOException e) {
871 e.printStackTrace();
872 }
873 cf.terminate();
874 }
875
876 public void terminate() {
877 try { // wg isBestPath before comm.close()
878 Thread.sleep(200);
879 } catch (InterruptedException e) {
880 }
881 if ( comm != null ) {
882 comm.close();
883 }
884 try {
885 while ( this.isAlive() ) {
886 //System.out.print(".");
887 this.interrupt();
888 this.join(100);
889 }
890 //System.out.println("TSPModelCommRemote terminated");
891 } catch (InterruptedException e) {
892 }
893 }
894
895 public void run() {
896 if ( comm == null ) {
897 return;
898 }
899 TransportMessage dtc;
900 Object m;
901 Path p;
902 Path w;
903 long iter;
904 long maxIter;
905 long items;
906 int mess = 0;
907 while ( true ) {
908 m = null;
909 try {
910 m = comm.receive();
911 mess++;
912 } catch (IOException e) {
913 if ( logger.isDebugEnabled() ) {
914 e.printStackTrace();
915 }
916 } catch (ClassNotFoundException e) {
917 e.printStackTrace();
918 }
919 logger.debug("DistParRemoteCommTSP, receive m = " + m);
920 dtc = null;
921 p = null;
922 w = null;
923 if ( m instanceof TransportMessage ) {
924 dtc = (TransportMessage)m;
925 rem.setBest( dtc.best );
926 rem.putWork( dtc.work );
927 rem.setMaxIterations( dtc.iter );
928 rem.setGlobalWork( dtc.items );
929 //logger.debug("recieved work = " + dtc.work
930 // + ", path = " + dtc.best);
931 }
932 Object response = null;
933 response = new TransportMessage( rem.actualBest(),
934 (Path)null,
935 rem.getIterations(),
936 rem.stack.size() );
937 try {
938 if ( response != null ) {
939 //logger.debug("response to send = " + response);
940 comm.send( response );
941 }
942 } catch (IOException e) {
943 //e.printStackTrace();
944 break;
945 }
946 }
947 finalBest();
948 logger.info("dist client comm messages = " + mess);
949 // notify worker
950 rem.addEndMark();
951 }
952
953 public void finalBest() {
954 if ( comm == null ) {
955 return;
956 }
957 Object response = null;
958 try {
959 response = new TransportMessage( rem.actualBest(),
960 (Path)null,
961 rem.getIterations(),
962 rem.stack.size() );
963 comm.send( response );
964 } catch (IOException e) {
965 //e.printStackTrace();
966 }
967 }
968
969 public void sendGetWork() {
970 try {
971 logger.debug("sendGetWork " +comm);
972 comm.send( new TransportMessageGetWork() );
973 } catch (IOException e) {
974 e.printStackTrace();
975 }
976 }
977
978 public void sendPutWork() {
979 try {
980 Object r = null;
981 r = new TransportMessage( rem.actualBest(),
982 rem.getWork(),
983 rem.getIterations(),
984 rem.stack.size() );
985 comm.send( r );
986 logger.debug("sendPutPart = "+r);
987 } catch (IOException e) {
988 e.printStackTrace();
989 }
990 }
991
992 }