001 002 package algo; 003 004 import java.io.IOException; 005 import java.io.Serializable; 006 import java.io.FileNotFoundException; 007 import java.util.ArrayList; 008 009 import comm.ChannelFactory; 010 import comm.ExecutableChannels; 011 import comm.SocketChannel; 012 import comm.RemoteExecutable; 013 import thread.Deque; 014 import util.Logger; 015 016 /** 017 * A parallel algorithm for an euclidean 2d TSP Problem. 018 * @author Heinz Kredel. 019 */ 020 public class DistTSP implements TSPInf { 021 022 protected Graph graph; 023 protected Point[] points; 024 protected DistBestStore best; 025 026 protected int number = 0; 027 protected String hostfile = "machines"; 028 protected int port = -1; 029 protected String master = "localhost"; 030 protected ChannelFactory cf; 031 protected DistRunTSP[] threads; 032 033 private static Logger logger = new Logger(2); 034 035 /** 036 * @param p the cities. 037 * @param th number of processes. 038 */ 039 public DistTSP(Point[] p, int th) { 040 this.points = p; 041 graph = new PlaneGraph(p); 042 number = th; 043 threads = null; 044 // distribute code 045 ExecutableChannels ec = null; 046 try { 047 ec = new ExecutableChannels( hostfile ); 048 ec.open( number ); 049 master = ec.getMasterHost(); 050 port = ec.getMasterPort(); 051 for ( int i = 0; i < number; i++ ) { 052 ec.send(i, new DistParClientTSP(master,port,points) ); 053 } 054 logger.info("code distributed"); 055 } catch (FileNotFoundException e) { 056 e.printStackTrace(); 057 return; 058 } catch (IOException e) { 059 e.printStackTrace(); 060 return; 061 } 062 if ( ec != null ) { 063 ec.close(); 064 } 065 cf = new ChannelFactory(port); 066 } 067 068 069 /** 070 * @return the number of iterations. 071 */ 072 public long getIterations() { 073 if ( threads == null ) { 074 return 0l; 075 } 076 long iter = 0l; 077 for ( int i = 0; i < threads.length; i++ ) { 078 if ( threads[i] != null ) { 079 iter += threads[i].getIterations(); 080 } 081 } 082 return iter; 083 } 084 085 /** 086 * @return the maximal number of iterations. 087 */ 088 public long getMaxIterations() { 089 if ( threads == null ) { 090 return Long.MAX_VALUE; 091 } 092 long maxiter = Long.MAX_VALUE; 093 for ( int i = 0; i < threads.length; i++ ) { 094 long mi = threads[i].getMaxIterations(); 095 if ( maxiter > mi ) { 096 maxiter = mi; 097 } 098 } 099 return maxiter; 100 } 101 102 /** 103 * @param m the maximal number of iterations. 104 * @return the previous maximal number of iterations. 105 */ 106 public long setMaxIterations(long m){ 107 if ( threads == null ) { 108 return m; 109 } 110 long maxiter = Long.MAX_VALUE; 111 for ( int i = 0; i < threads.length; i++ ) { 112 long mi = threads[i].setMaxIterations(m); 113 if ( maxiter > mi ) { 114 maxiter = mi; 115 } 116 } 117 return maxiter; 118 } 119 120 121 /** 122 * @return best.getPath(). 123 */ 124 public Path actualBest() { 125 if ( best == null ) { 126 return null; 127 } 128 return best.getPath(); 129 } 130 131 /** 132 * @param b a Path. 133 */ 134 public void setBest(Path b) { 135 if ( b == null ) { 136 return; 137 } 138 if ( best == null ) { 139 // best.setPath( b ); 140 return; 141 } 142 if ( b.cost() < best.getPath().cost() ) { 143 best.setPath( b ); 144 } 145 } 146 147 148 /** 149 * @return getBest(Long.MAX_VALUE). 150 */ 151 public Path getBest() { 152 return getBest(Long.MAX_VALUE); 153 } 154 155 /** 156 * @param max maximal number of iterations. 157 * @return bestPath. 158 */ 159 public Path getBest(long max) { 160 Path bestPath = PathByteArray.standardPath(graph); // standard path 161 if ( bestPath.length() <= 2 ) { 162 return bestPath; 163 } 164 best = new DistBestStore( bestPath, 0, number ); 165 Path path = PathByteArray.onePath(graph); // one/empty path 166 Deque globalStack = new Deque(10000); 167 try { 168 globalStack.push( path ); 169 } catch (InterruptedException e) { 170 e.printStackTrace(); 171 } 172 threads = new DistRunTSP[number]; 173 for ( int i = 0; i < number; i++ ) { 174 SocketChannel sc = null; 175 try { 176 sc = cf.getChannel(); 177 //System.out.println("getChannel() = "+sc); 178 } catch (InterruptedException e) { 179 e.printStackTrace(); 180 } 181 threads[i] = new DistRunTSP(globalStack,best,max,sc); 182 threads[i].start(); 183 } 184 try { 185 for ( int i = 0; i < number; i++ ) { 186 threads[i].join(); 187 } 188 } catch (InterruptedException e) { 189 e.printStackTrace(); 190 } 191 cf.terminate(); 192 return best.getPath(); 193 } 194 195 /** 196 * @param path a Path. 197 * @todo is not jet implemented 198 */ 199 public void getBest(Path path) { 200 System.out.println("method getBest() in DistTSP not implemented"); 201 } 202 203 } 204 205 /** 206 * Storage for best path found so far. 207 */ 208 class DistBestStore { 209 210 private Path path; 211 private int idlers; 212 public final int threads; 213 214 /** 215 * @param b a Path. 216 * @param i number of idle processes. 217 * @param t total number of processes. 218 */ 219 public DistBestStore(Path b, int i, int t) { 220 path = b; 221 idlers = i; 222 threads = t; 223 } 224 225 /** 226 * @return path. 227 */ 228 public Path getPath() { 229 return path; 230 } 231 232 /** 233 * @param p a Path. 234 */ 235 public void setPath(Path p) { 236 path = p; 237 } 238 239 public synchronized void incIdle() { 240 idlers++; 241 } 242 243 public synchronized void decIdle() { 244 idlers--; 245 } 246 247 public synchronized /*elsewhere*/ boolean allIdle() { 248 return (idlers == threads); 249 } 250 } 251 252 253 /** 254 * Thread to communicate with distributed processes. 255 */ 256 class DistRunTSP extends Thread { 257 258 private DistBestStore best; 259 private Deque globalStack; 260 private long maxIter; 261 private long iter; 262 private long localWork; 263 private long pushes; 264 private long requests; 265 private ArrayList stack; 266 private SocketChannel comm; 267 private static Logger logger = new Logger(2); 268 269 /** 270 * @param s work queue. 271 * @param b current best path. 272 * @param max maximal number of iterations. 273 * @param comm a channel to the remote process. 274 */ 275 public DistRunTSP(Deque s, 276 DistBestStore b, 277 long max, 278 SocketChannel comm) { 279 globalStack = s; 280 best = b; 281 maxIter = max; 282 stack = new ArrayList(500); 283 this.comm = comm; 284 } 285 286 287 public void run() { 288 requests = 0; 289 pushes = 0; 290 Object o = null; 291 Object m = null; 292 Path p = null; 293 Path w = null; 294 TransportMessage dtc; 295 while (true) { 296 // protocol: receive/push/send 297 o = null; 298 try { 299 o = comm.receive(); 300 requests++; 301 logger.debug("DistRunTSP receive "+comm+" request "+o); 302 } catch (IOException e) { 303 e.printStackTrace(); 304 //break; 305 } catch (ClassNotFoundException e) { 306 e.printStackTrace(); 307 break; 308 } 309 if ( ! (o instanceof TransportMessage) ) { 310 break; 311 } 312 dtc = (TransportMessage)o; 313 if ( dtc instanceof TransportMessageGetWork ) { 314 //logger.debug("DistRunTSP search work"); 315 Object work = null; 316 try { 317 best.incIdle(); 318 if ( globalStack.empty() && best.allIdle() ) { 319 logger.debug("pushing nulls"); 320 for ( int i = 0; i < best.threads; i++) { 321 globalStack.push(null); 322 } 323 //best.decIdle(); break; return; 324 } 325 //logger.debug("work blocking " + this + " " + comm); 326 work = globalStack.get(); // blocking pop/get 327 best.decIdle(); 328 } catch (InterruptedException e) { 329 e.printStackTrace(); 330 break; 331 } 332 logger.debug("work of " + globalStack.size() 333 + " found @ " + this + " = "+work); 334 try { 335 if ( work == null ) { 336 //logger.debug("no more Work available"); 337 comm.send( (TransportMessage)null ); 338 break; 339 } 340 p = (Path)work; 341 dtc = new TransportMessage( best.getPath(), 342 p, 343 maxIter, 344 globalStack.size() ); 345 comm.send( dtc ); 346 logger.debug("response sendWork = " + dtc); 347 } catch (IOException e) { 348 e.printStackTrace(); 349 break; 350 } 351 } else { // normal message 352 p = dtc.best; 353 w = dtc.work; 354 iter = dtc.iter; 355 localWork = dtc.items; 356 if ( w != null ) { 357 logger.debug(" put Part to global Stack "+w); 358 try { 359 globalStack.put( w ); 360 pushes++; 361 } catch (InterruptedException e) { 362 e.printStackTrace(); 363 break; 364 } 365 } 366 if ( p != null ) { 367 synchronized (best) { 368 if ( p.cost() < best.getPath().cost() ) { 369 best.setPath( p ); 370 } 371 } 372 } 373 } 374 } 375 logger.info("requests = " + requests 376 + " iterations = " + iter 377 + " pushes = " + pushes); 378 comm.close(); 379 } 380 381 /** 382 * @return iter. 383 */ 384 public long getIterations() { 385 if ( comm == null ) { 386 return iter; 387 } 388 try { 389 comm.send( new TransportMessage( best.getPath(), 390 (Path)null, 391 maxIter, 392 globalStack.size()) ); 393 } catch (IOException e) { 394 if ( logger.isDebugEnabled() ) { 395 e.printStackTrace(); 396 } 397 // maxIter = iter; 398 } 399 return iter; 400 } 401 402 /** 403 * @return maxIter. 404 */ 405 public long getMaxIterations() { 406 if ( comm == null ) { 407 return maxIter; 408 } 409 try { 410 comm.send( new TransportMessage( best.getPath(), 411 (Path)null, 412 maxIter, 413 globalStack.size()) ); 414 } catch (IOException e) { 415 e.printStackTrace(); 416 // maxIter = iter; 417 } 418 return maxIter; 419 } 420 421 /** 422 * @param m new maximal number of iterations. 423 * @return old maxIter. 424 */ 425 public long setMaxIterations(long m){ 426 long x = maxIter; 427 maxIter = m; 428 if ( comm == null ) { 429 return x; 430 } 431 try { 432 comm.send( new TransportMessage( best.getPath(), 433 (Path)null, 434 maxIter, 435 globalStack.size()) ); 436 } catch (IOException e) { 437 e.printStackTrace(); 438 // maxIter = iter; 439 } 440 return x; 441 } 442 443 } 444 445 446 /** 447 * Objects of this class are to be send to a ExecutableServer. 448 * i.e. these are the remote processes. 449 */ 450 451 class DistParClientTSP implements RemoteExecutable { 452 453 protected String host; 454 protected int port; 455 protected Graph graph; 456 protected Point[] points; 457 protected DistParBestStore best; 458 protected int number = 0; 459 protected ArrayList stack; 460 protected int count; 461 protected int depth; 462 protected long iter; 463 protected long maxIter; 464 protected long globalWork; 465 protected int pushes; 466 protected int runs; 467 protected DistParRemoteCommTSP communi; 468 private static Logger logger = null; 469 470 /** 471 * @param host of master process. 472 * @param port of master process. 473 * @param points the cities. 474 */ 475 public DistParClientTSP(String host, int port, Point[] points) { 476 this.host = host; 477 this.port = port; 478 this.points = points; 479 iter = 0; 480 maxIter = Long.MAX_VALUE; 481 pushes = 0; 482 runs = 0; 483 globalWork = 0; 484 } 485 486 /** 487 * @return best.getPath(). 488 */ 489 public Path actualBest() { 490 return best.getPath(); 491 } 492 493 /** 494 * @param b new best path. 495 */ 496 public void setBest(Path b) { 497 if ( b == null ) { 498 return; 499 } 500 if ( best == null ) { 501 // best.setPath( b ); 502 return; 503 } 504 synchronized ( best ) { 505 if ( b.cost() < best.getPath().cost() ) { 506 best.setPath( b ); 507 } 508 } 509 } 510 511 512 public void run() { 513 logger = new Logger(2); 514 graph = new PlaneGraph(points); 515 Path bestPath = PathByteArray.standardPath(graph); // standard path 516 best = new DistParBestStore(bestPath); 517 int ms = bestPath.maxsize(); 518 count = ms + 4; //*2 / +5 ? 519 depth = ms - 10; // = 3 ? 520 if ( depth < 3 ) { 521 depth = 3; 522 } 523 stack = new ArrayList(500); 524 globalWork = 0; 525 526 communi = new DistParRemoteCommTSP(host,port,this); 527 communi.start(); 528 logger.info("DistParClientTSP:" 529 + " host = " + host 530 + " port = " + port 531 + " best = " + actualBest() 532 ); 533 534 //Object o; 535 //Path p; 536 //Path w; 537 try { 538 getParBest(); // blocking 539 } catch (InterruptedException e) { 540 e.printStackTrace(); 541 } 542 logger.info("dist client runs = " + runs 543 + " iterations = " + iter); 544 communi.terminate(); 545 } 546 547 548 /** 549 * @throws InterruptedException. 550 */ 551 public void getParBest() throws InterruptedException { 552 // stack.add( path ); 553 Object o; 554 Path p; 555 while ( true ) { 556 if ( stack.size() > 0 ) { 557 synchronized (stack) { 558 o = stack.remove( stack.size()-1 ); 559 } 560 if ( o == null ) { 561 break; //return; 562 } 563 p = (Path)o; 564 getBest(p); // blocking 565 } else { 566 if ( communi != null ) { 567 communi.sendGetWork(); 568 } else { 569 break; 570 } 571 //logger.debug("getParBest wait()"); 572 synchronized (stack) { 573 if ( stack.size() == 0 ) { // recheck hard 574 stack.wait(); 575 } 576 } 577 } 578 } 579 logger.debug("getParBest terminated"); 580 } 581 582 public void addEndMark() { 583 synchronized ( stack ) { 584 stack.add( null ); 585 stack.notify(); // wake up main 586 } 587 } 588 589 /** 590 * @param path a Path. 591 */ 592 public void putWork(Path path) { 593 // termination ? 594 if ( path == null ) { 595 return; 596 } 597 synchronized (stack) { 598 //int s = stack.size(); 599 runs++; 600 stack.add( path ); 601 //if ( s == 0 ) { 602 stack.notify(); 603 //} 604 } 605 } 606 607 /** 608 * @return a Path. 609 */ 610 public Path getWork() { 611 Path w = null; 612 if ( stack.size() > count 613 && ( /*actualBest().length() <= depth &&*/ globalWork <= 5 ) ) { 614 Object o = null; 615 synchronized (stack) { 616 if ( stack.size() > 0 ) { 617 o = stack.remove( 0 ); //stack.size()-1, depth first 618 } 619 } 620 if ( o != null ) { 621 w = (Path)o; 622 pushes++; 623 } 624 } 625 return w; 626 } 627 628 /** 629 * @param len length of current path. 630 * @return true if work is available, else false. 631 */ 632 public boolean isWorkAvailable(int len) { 633 //logger.debug("DistParRemoteCommTSP, isWorkAvailable "+globalWork 634 // + " stack " + stack.size() 635 // + " length " + actualBest().length() ); 636 if ( stack.size() > count 637 && ( len <= depth && globalWork <= 5 ) ) { 638 communi.sendPutWork(); 639 return true; 640 } 641 return false; 642 } 643 644 /** 645 * @param path some starting path. 646 */ 647 public void getBest(Path path) { 648 Path[] ps = path.nextPaths(); 649 iter += ps.length; 650 double bestCost = best.getPath().cost(); 651 if ( path.length()+1 == path.maxsize() ) { 652 for ( int i = 0; i < ps.length; i++ ) { // branch 653 if ( ps[i].cost() < bestCost ) { 654 synchronized ( best ) { 655 if ( ps[i].cost() < best.getPath().cost() ) { 656 bestCost = ps[i].cost(); 657 best.setPath( (Path)ps[i].clone() ); // evtl. error 658 logger.info("dist par best = " + ps[i]); 659 } 660 } 661 } 662 } 663 return; 664 } 665 if ( iter >= maxIter ) { 666 return; 667 } 668 if ( path.length()+5 >= path.maxsize() ) { 669 for ( int i = 0; i < ps.length; i++ ) { // branch 670 if ( ps[i].cost() < bestCost ) { // cut 671 //System.out.print("#"); 672 getBestRec( ps[i].copyMax(), path.length()+1 ); // changes best and bestPath 673 //stack.add( ps[i] ); 674 } 675 } 676 return; 677 } 678 for ( int i = 0; i < ps.length; i++ ) { // branch 679 if ( ps[i].cost() < bestCost ) { // cut 680 // getBest( ps[i] ); // changes best and bestPath 681 //System.out.print("-"); 682 synchronized ( stack ) { 683 stack.add( ps[i] ); 684 } 685 } 686 } 687 if ( ps.length > 0 ) { 688 isWorkAvailable( ps[0].length() ); 689 } 690 return; 691 } 692 693 /** 694 * @param path some starting path. 695 * @param depth current path length. 696 */ 697 public void getBestRec(Path path, int depth) { 698 double bestCost = best.getPath().cost(); 699 for ( int k = 0; k < path.maxsize()-depth; k++ ) { // branch 700 Path ps = path.nextPath(k); 701 iter++; 702 if ( depth+1 == path.maxsize() ) { 703 if ( ps.cost() < bestCost ) { 704 synchronized ( best ) { 705 if ( ps.cost() < best.getPath().cost() ) { 706 bestCost = ps.cost(); 707 best.setPath( (Path)ps.clone() ); // had error 708 logger.info("dist par best = " + ps); 709 } 710 } 711 } 712 } 713 if ( iter >= maxIter ) { 714 return; 715 } 716 if ( ps.cost() < bestCost ) { // cut 717 getBestRec( ps, depth+1 ); // changes best and bestPath 718 //stack.add( ps[i] ); 719 } 720 } 721 return; 722 } 723 724 /** 725 * @return iter. 726 */ 727 public long getIterations() { 728 return iter; 729 } 730 731 /** 732 * @return maxIter. 733 */ 734 public long getMaxIterations() { 735 return maxIter; 736 } 737 738 /** 739 * @param m new maximal number of iterations. 740 * @return old maxIter. 741 */ 742 public long setMaxIterations(long m){ 743 long x = maxIter; 744 maxIter = m; 745 return x; 746 } 747 /* 748 public long getGlobalWork() { 749 return globalWork; 750 } 751 */ 752 753 /** 754 * @param w number of global work items. 755 */ 756 public void setGlobalWork(long w){ 757 globalWork = w; 758 return; 759 } 760 761 } 762 763 764 /** 765 * Remote storage for best path found so far. 766 */ 767 class DistParBestStore implements Serializable { 768 769 private Path path; 770 771 /** 772 * @param b best path so far. 773 */ 774 public DistParBestStore(Path b) { 775 path = b; 776 } 777 778 /** 779 * @return path. 780 */ 781 public Path getPath() { 782 return path; 783 } 784 785 /** 786 * @param p new best path so far. 787 */ 788 public void setPath(Path p) { 789 path = p; 790 } 791 } 792 793 794 /** 795 * Transport container for communication with the master process. 796 */ 797 class TransportMessage implements Serializable { 798 final Path best; 799 final Path work; 800 final long iter; // or maxIter 801 final long items; // work items global / local 802 803 /** 804 * @param b best path. 805 * @param w starting path. 806 * @param i maximal iterations. 807 * @param wi work items global / local. 808 */ 809 public TransportMessage(Path b, Path w, long i, long wi) { 810 best = b; 811 work = w; 812 iter = i; 813 items = wi; 814 } 815 816 /** 817 * @param b best path. 818 * @param w starting path. 819 * @param i maximal iterations. 820 * @param wi work items global / local. 821 */ 822 public TransportMessage(Object b, Object w, long i, long wi) { 823 this((Path)b,(Path)w,i,wi); 824 } 825 826 public String toString() { 827 return "DTP(" + best + "," + work + "," + iter + "," + items + ")"; 828 } 829 } 830 831 832 /** 833 * Transport container for communication with the master process. 834 * Meaning: requests new work. 835 */ 836 class TransportMessageGetWork extends TransportMessage { 837 838 public TransportMessageGetWork() { 839 super((Path)null,(Path)null,0l,0l); 840 } 841 842 } 843 844 845 /** 846 * Objects of this class communicate with the master server. 847 */ 848 class DistParRemoteCommTSP extends Thread implements Serializable { 849 850 protected String host; 851 protected int port; 852 protected SocketChannel comm = null; 853 protected DistParClientTSP rem; 854 private final static Logger logger = new Logger(2); 855 856 857 /** 858 * @param host of master process. 859 * @param port of master process. 860 * @param rem my distributed process for call back. 861 */ 862 public DistParRemoteCommTSP(String host, int port, DistParClientTSP rem) { 863 this.host = host; 864 this.port = port; 865 this.rem = rem; 866 867 ChannelFactory cf = new ChannelFactory(); 868 try { 869 comm = cf.getChannel( host, port ); 870 } catch (IOException e) { 871 e.printStackTrace(); 872 } 873 cf.terminate(); 874 } 875 876 public void terminate() { 877 try { // wg isBestPath before comm.close() 878 Thread.sleep(200); 879 } catch (InterruptedException e) { 880 } 881 if ( comm != null ) { 882 comm.close(); 883 } 884 try { 885 while ( this.isAlive() ) { 886 //System.out.print("."); 887 this.interrupt(); 888 this.join(100); 889 } 890 //System.out.println("TSPModelCommRemote terminated"); 891 } catch (InterruptedException e) { 892 } 893 } 894 895 public void run() { 896 if ( comm == null ) { 897 return; 898 } 899 TransportMessage dtc; 900 Object m; 901 Path p; 902 Path w; 903 long iter; 904 long maxIter; 905 long items; 906 int mess = 0; 907 while ( true ) { 908 m = null; 909 try { 910 m = comm.receive(); 911 mess++; 912 } catch (IOException e) { 913 if ( logger.isDebugEnabled() ) { 914 e.printStackTrace(); 915 } 916 } catch (ClassNotFoundException e) { 917 e.printStackTrace(); 918 } 919 logger.debug("DistParRemoteCommTSP, receive m = " + m); 920 dtc = null; 921 p = null; 922 w = null; 923 if ( m instanceof TransportMessage ) { 924 dtc = (TransportMessage)m; 925 rem.setBest( dtc.best ); 926 rem.putWork( dtc.work ); 927 rem.setMaxIterations( dtc.iter ); 928 rem.setGlobalWork( dtc.items ); 929 //logger.debug("recieved work = " + dtc.work 930 // + ", path = " + dtc.best); 931 } 932 Object response = null; 933 response = new TransportMessage( rem.actualBest(), 934 (Path)null, 935 rem.getIterations(), 936 rem.stack.size() ); 937 try { 938 if ( response != null ) { 939 //logger.debug("response to send = " + response); 940 comm.send( response ); 941 } 942 } catch (IOException e) { 943 //e.printStackTrace(); 944 break; 945 } 946 } 947 finalBest(); 948 logger.info("dist client comm messages = " + mess); 949 // notify worker 950 rem.addEndMark(); 951 } 952 953 public void finalBest() { 954 if ( comm == null ) { 955 return; 956 } 957 Object response = null; 958 try { 959 response = new TransportMessage( rem.actualBest(), 960 (Path)null, 961 rem.getIterations(), 962 rem.stack.size() ); 963 comm.send( response ); 964 } catch (IOException e) { 965 //e.printStackTrace(); 966 } 967 } 968 969 public void sendGetWork() { 970 try { 971 logger.debug("sendGetWork " +comm); 972 comm.send( new TransportMessageGetWork() ); 973 } catch (IOException e) { 974 e.printStackTrace(); 975 } 976 } 977 978 public void sendPutWork() { 979 try { 980 Object r = null; 981 r = new TransportMessage( rem.actualBest(), 982 rem.getWork(), 983 rem.getIterations(), 984 rem.stack.size() ); 985 comm.send( r ); 986 logger.debug("sendPutPart = "+r); 987 } catch (IOException e) { 988 e.printStackTrace(); 989 } 990 } 991 992 }