8.2 Kanäle, Senden und Empfangen

Wir besprechen zunächst alle MPI-Funktionen, die zur Implementierung von Kanälen benötigt werden, bevor wir die Implementierung von send und receive erläutern. Für die Implementierung von

channel ch (v1: t1, ...vn: tn);

müssen wir zuerst sicherstellen, dass wir eine Verbindung zu MPI haben. Dazu dienen die Funktionen

   static public String [] Init(String[] args) 
                           throws MPIException 
   static public native void Finalize() 
                             throws MPIException
der Klasse MPI.

Init muss auf allen beteiligten MPI-Prozessen aufgerufen werden und terminiert erst, wenn alle beteiligten Prozesse (die von mpirun gestartet wurden) Init aufgerufen haben. Init übernimmt die Kommandozeilen-Parameter, die über die main(String[] args) Methode bereitgestellt werden, als Argument. Init analysiert, ob unter den angegebenen Parametern welche für MPI darunter sind, und wertet diese aus. Über die MPI-Parameter kann man hauptsächlich mehr Informationen zur Fehlersuche erhalten. Am interessantesten ist der Parameter `-mpiqueue', mit dem der Inhalt der Nachrichtenpuffer von MPI am Ende ausgedruckt werden kann (von Finalize). Die Nachrichtenpuffer sollten am Ende zwar leer sein, aber falls nicht, kann man hier feststellen, welche Nachrichten nicht zugestellt wurden. Weitere Parameter dienen auch zum Starten von externen (grafischen) Debuggern.

Finalize muss auf allen beteiligten MPI-Prozessen aufgerufen werden und beendet die Verbindung zwischen den MPI-Prozessen. Der Zustand von MPI ist danach undefiniert.

Nach der Initialisierung von MPI muss der Prozess seinen Platz innerhalb der gesamten MPI-Umgebung identifizieren. Die MPI-Kommunikationsumgebung wird, wie schon erwähnt, durch einen Kommunikator definiert. Die Klasse MPI besitzt dazu zwei statische Variablen:

  static public Intracomm COMM_WORLD
  static public Comm COMM_SELF
COMM_WORLD aus der Klasse Intracomm (die Comm erweitert) enthält den globalen Kommunikationskontext, der immer zur Verfügung steht. Er wird von Init initialisiert.

COMM_SELF aus der Klasse Comm enthält den aktuellen Kommunikationskontext, der gegebenenfalls eine Teilmenge von COMM_WORLD oder eine Kopie von COMM_WORLD oder sonstige Modifikationen enthält.

Über den Kommunikator COMM_WORLD können wir nun die Anzahl der beteiligten Prozesse und unsere Position feststellen.

  public native int Size() throws MPIException 
  public native int Rank() throws MPIException
Size liefert die Größe der jeweiligen Kommunikationsgruppe und Rank die Position des Prozesses darin. Es gilt 0 <= Rank() <= Size().

Ein Kanal channel ch (v1: t1, ...vn: tn); könnte somit durch folgende Klasse beschrieben werden.

    public class MpiChannel {
       Comm communicator; 
       int size; 
       int rank;  // to destination or from source
       int tag;
       t1 v1; ...; tn vn;
    }

Damit kommen wir zur Implementierung von send und receive.

send chan (e1, ...., en);

Die geeigneten MPI-Funktionen Send und Recv sind Methoden der Klasse Comm. Unter den vielen Varianten von MPI Send wählen wir hier eine relativ einfach benutzbare aus.

  public void Send(Object   buf,
                   int      offset, 
                   int      count, 
                   Datatype type,
                   int      dest,  
                   int      tag) throws MPIException
Die Variable buf definiert ein Array, das die zu übertragenden Daten enthält. offset und count legen den Startindex und die Anzahl der zu übertragenden Array-Elemente fest. type kennzeichnet den Datentyp der Array-Elemente. dest bezeichnet den Rank des Empfängerprozesses. Mit tag können zur Unterscheidung verschiedene Nachrichtenklassen definiert werden.

mpiJava unterstüzt als Datentyp OBJECT für Java Object. In diesem Fall werden die Objekte in buf serialisiert und dann als MPI Byte-Message verschickt. Diese Objekte müssen dann natürlich das Interface Serializable implementieren. Die anderen Datentypen sind unter anderem BYTE, CHAR, SHORT, BOOLEAN, INT, LONG, FLOAT und DOUBLE.

In der MPI-Spezifikation steht: `Send may block until the message is received'. Das heißt, es ist dem Implementierer viel Freiraum bei der Realisierung gelassen. Umgekehrt müssen wir bei der Verwendung damit rechnen, dass Send tatsächlich blockiert und wir einen Deadlock produzieren.

Die anderen Varianten von Send sind: Bsend als gepuffertes Send, bei dem wir einen Nachrichtenpuffer verwalten müssen. Rsend verlangt, dass das zugehörige Recv schon auf den Empfang wartet (für sehr große Nachrichten schneller). Ssend ist das synchrone Send, das erst terminiert, wenn die Daten beim Empfänger abgenommen wurden. Isend ist das nicht blockierende Send, bei dem allerdings später auf seine Terminierung explizit gewartet werden muss.

Wir haben uns hier für eine möglichst einfach benutzbare Variante von Send entschieden. Die Implementierung von send chan (e_1, ..., e_n) wäre damit durch das folgende Programmfragment gegeben:

      t1[] x1 = new t1[1];
      ...
      tn[] xn = new tn[1];
      x1[0] = v1;
      communicator.Send(x1, 0, 1, <mpi t1>, dest, tag);
      ...
      xn[0] = vn;
      communicator.Send(xn, 0, 1, <mpi tn>, dest, tag);
send speichert die Ausdrücke ei in die Variablen vi und dann in das Array xi, und verschickt anschliessend die Sende-Arrays mit communicator.Send an den gewünschten Empfänger.

Auch bei der Implementierung von

receive chan (v1, ..., vn);

verwenden wir nur eine möglichst einfache Variante von Recv aus der Klasse Comm.

  public Status Recv(Object   buf, 
                     int      offset, 
                     int      count, 
                     Datatype type,
                     int      source, 
                     int      tag) throws MPIException
Dabei definiert die Variable buf ein Array, das die zu empfangenden Daten enthalten soll. offset und count legen den Startindex und die Anzahl der zu empfangenden Array-Elemente fest. Das heißt, ab offset werden die empfangenen Daten in das Array geschrieben und nur count viele Elemente werden aus dem Empfangspuffer entnommen. type kennzeichnet den Datentyp der Array-Elemente (siehe oben). source bezeichnet den Rank des Senderprozesses. Mit tag können zur Unterscheidung verschiedene Nachrichtenklassen definiert werden. Recv gibt ein Status-Objekt zurück, aus dem diverse Informationen über den Ablauf und Stand der Operation entnommen werden können. Falls zum Beispiel Recv mit einer Ausnahme terminiert, steht in Status unter anderem, wie viele Daten schon angekommen sind.

Recv blockiert, bis alle (d.h. count viele) Elemente eingetroffen sind bzw. bis eine Ausnahme ausgelöst wird. Die anderen Varianten von Receive korrespondieren zu den entsprechenden Send-Varianten, die oben genannt sind.

Die Implementierung von receive chan (v1, ..., vn); wäre damit durch das folgende Pogrammfragment gegeben:

    t1[] x1 = new t1[1];
    ...
    tn[] xn = new tn[1];
    Status stat;
    stat = communic.Recv(x1, 0, 1, <mpi t1>, from, tag);
    v1 = x1[0];
    ...
    stat = communic.Recv(xn, 0, 1, <mpi tn>, from, tag);
    vn = xn[0];
receive empfängt mit communicator.Recv die Daten in die entsprechenden Arrays, dann werden die Daten aus den Arrays entnommen und in den Variablen vi abgelegt. In der Variablen stat wird der Status der Operation abgespeichert.

Aufgaben

Aufgabe 43   Entwickeln Sie eine Klasse MpiChannel analog zur Klasse Channel in Abschnitt 4.3. Die Klasse soll die MPI-Funktionen Init(), Finalize(), Size() und Rank() einkapseln. Ein Konstruktor soll den Partner spezifizieren können. Die Klasse soll Methoden send() und recv() haben, die intern die entsprechenden MPI-Funktionen aufrufen.

Aufgabe 44   Summieren Sie die m Elemente eines Arrays A[ ] mit t >= 3 parallel arbeitenden Prozessen. Das heißt, implementieren Sie das Programm in Abbildung 5.4. Benutzen Sie mpiJava mit Send und Recv zur Parallelisierung.

Eine entsprechende Aufgabe für Distributed Memory Computer finden Sie im Buch in Aufgabe 22 und für reines Java in Aufgabe 7. Eine zweite elegantere Lösung mit Jomp enthält Aufgabe 40.

Abbildung 8.2: Vektorsumme mit MPI
\framebox{
\begin{minipage}{10.0cm}
\begin{minipage}[t]{8.6cm}
\begin{quote}
{\...
...send }chan[i]( $s_{{\rm local}}$\ );
\end{tabbing}\end{minipage}\end{minipage}}

Wir besprechen eine Musterlösung für diese Aufgabe. Die Programmteile S(i) realisieren wir alle in einer Klasse. Mit Hilfe der MPI-Rank-Information myid bearbeiten wir in einer Fallunterscheidung entweder S(0) oder die anderen S(i) für i > 0. Zur Vereinfachung nehmen wir an, dass m (counts) ein Vielfaches von t (numprocs) ist. Falls dem nicht so ist, beenden wir das Programm mit einer Fehlermeldung, die nur auf dem Master-Prozess ausgegeben wird. Im Hauptprogramm main() wird MPI initialisiert und beendet. Nach der Initialisierung des Vektors ivec mit Bildung der Kontrollsumme ans beginnen wir mit dem Datentransport.

Die Daten von 0 bis cnt-1 bleiben auf Prozess 0 und die anderen werden mit MPI.COMM_WORLD.Send() an alle Partner geschickt. Die Sendeoperationen werden nur in Prozess 0 ausgeführt. Die anderen Prozesse empfangen mit MPI.COMM_WORLD.Recv() die Daten von Prozess 0.

Jetzt liegen alle Datenpakete in dem Array vec der Länge cnt auf allen Prozessen und die Summation wird durchgeführt.

Anschließend werden die Ergebnisse an Prozess 0 geschickt, der sie empfängt und sofort in seiner Variablen sum aufsummiert. Als Sendepuffer verwenden wir ssum und als Empfangspuffer verwenden wir rsum. Die Ergebnisse werden alle in rsum[0] empfangen.

import java.io.*;
import mpi.* ;
 
public class ExVecMPI {
  public static void main(String[] args) 
         throws MPIException {
    MPI.Init(args);
    (new ExVecMPI()).work(
         new PrintWriter(System.out,true));
    MPI.Finalize();
  }

  void work(PrintWriter out) throws MPIException {
    int counts = 40000;
    // initialize MPI Communicators
    int myid     = MPI.COMM_WORLD.Rank() ;
    int numprocs = MPI.COMM_WORLD.Size() ;
    System.out.println("ExVecMPI on " + myid 
           + " of " + numprocs + " ") ;
    int cnt = counts/numprocs;
    if ( counts % numprocs != 0 ) {
       if ( myid == 0 ) {
          System.out.println("counts " + counts + 
             " is not divisible by numprocs " + numprocs 
             + ", shuting down." ) ;
       }
       return; 
    }
    // initialize data on root process
    int ans = 0;
    int[] ivec = new int[counts];
    if ( myid == 0 ) {
       for (int i=0; i<counts; i++) { 
           ivec[i] = i; ans += i; 
       }
    }
    int msgtag = 4711;
    int[] vec = new int[cnt];
    if ( myid == 0 ) {
       for (int i=0; i < cnt; i++ ) 
           vec[i] = ivec[i];
       for (int j=1; j < numprocs; j++ ) {
           MPI.COMM_WORLD.Send(ivec, j*cnt, cnt, 
                               MPI.INT, j, msgtag);
       }
    } else {
       MPI.COMM_WORLD.Recv(vec, 0, cnt, 
                           MPI.INT, 0, msgtag);
    }
    int s = 0;
    for (int i=0; i < cnt; i += 1) {
        s += vec[i];
    }
    int[] ssum = new int[1];
    int[] rsum = new int[1];
    if ( myid == 0 ) {
       int sum = s;
       for (int j=1; j < numprocs; j++ ) {
           MPI.COMM_WORLD.Recv(rsum, 0, 1, 
                               MPI.INT, j, msgtag);
           sum += rsum[0];
       }
       System.out.println("sum = " + sum );
       System.out.println("ans = " + ans );
    } else {
      ssum[0] = s;
      MPI.COMM_WORLD.Send(ssum, 0, 1, 
                          MPI.INT, 0, msgtag);
    }
  }
}

Zur Kompilierung des Programms verwenden wir ein (GNU-)Makefile, das die erforderlichen Einstellungen des mpiJava-Systems vornimmt. Das Makefile finden Sie auf unserer Website. Mit diesem Makefile genügt es

   >make ExVecMPI
einzugeben, um die Übersetzung mit den MPI-Klassen und schließlich zur Ausführung anzustoßen.

Das mpiJava Programm muss sich wie ein normales Java-Programm in einer Datei befinden, die die Endung java hat. Die Übersetzung mit mpiJava geschieht (durch das Makefile) wie folgt:

   /opt/jdk1.2.2/bin/javac \
      -classpath /home/kredel/java/mpiJava/lib/classes \
      ExVecMPI.java

Falls keine Fehler aufgetreten sind, wird das Programm mit dem mpiJava-Laufzeitsystem sogleich ausgeführt.

   /home/kredel/mpiJava/src/scripts/prunjava 4 ExVecMPI

Das Makefile kennt einen Parameter `np=x', mit dem die Anzahl `x' der gewünschten MPI-Prozesse an das Skript prunjava weitergegeben wird.

   >make ExVecMPI np=4
Die Voreinstellung sind 2 Prozesse. In diesem Beispiel werden 4 Prozesse gestartet. Die Prozesse werden in diesem Beispiel alle auf dem lokalen Rechner gestartet, da nur dieser in das machine-File eingetragen ist. Trotzdem muss natürlich die rsh-Umgebung richtig aufgesetzt sein.

Ein Beispiel für die Ausgabe der Terminalversion des Programms für vier Prozesse könnte wie folgt aussehen:

   >make ExVecMPI np=4
   ExVecMPI on 0 of 4 ExVecMPI on 2 of 4 
   ExVecMPI on 3 of 4 
   ExVecMPI on 1 of 4 
   sum = 799980000
   ans = 799980000

Die ersten Anzeigen von `ExVecMPI on 0 of 4' zeigen, dass die Arbeitsschritte in der Reihenfolge 0, 2, 3, 1 begonnen wurden. Das Ergebnis sum ist wie erwartet gleich ans.

Aufgabe 45   Bei größer dimensioniertem Array rsum (int[numprocs]) könnten wir auch die Ergebnisse von Prozess i an die Stelle rsum[i] ablegen und anschließend die Elemente von rsum[] aufaddieren. Wie müssen dazu die Parameter von Recv angepasst werden ?


Heinz Kredel
2002-04-05