8.3 Reduktionen

Als Alternative zu Send und Receive bietet MPI, wie auch OpenMP, die Möglichkeit zu Reduktionen. Das heißt, mit einer einzigen Operation werden parallel auf allen Prozessen Variablen eingesammelt, z.B. aufaddiert, und das Ergebnis wird in einem Prozess abgeliefert.

Als Gegenstück dazu bietet Broadcast die Möglichkeit, Daten an alle beteiligten Prozesse in einer Operation zu verteilen. Broadcast hat eine praktische Variante namens Scatter, mit der ein Array von Daten in gleichen Teilstücken an alle beteiligten Prozesse verstreut werden kann (d.h., jeder erhält ein anderes Teilstück).

Wir beginnen mit Broadcast, beschreiben dann Scatter und zum Schluss die Reduktion. Die besprochenen Methoden gehören alle zur Klasse Intracomm.

Unter den vielen Varianten von MPI Bcast wählen wir hier wieder eine einfach benutzbare aus.

  public void Bcast(Object   buf, 
                    int      offset, 
                    int      count, 
                    Datatype type,
                    int      root) throws MPIException

Die Variable buf definiert ein Array, das die zu übertragenden bzw. die zu empfangenden Daten enthält. offset und count legen den Startindex und die Anzahl der zu übertragenden Array-Elemente fest. type kennzeichnet den Datentyp der Array-Elemente. Einen Parameter tag gibt es nicht. Im Gegensatz zu Send benötigen wir hier keinen dest Parameter, sondern einen root-Parameter, der den Rechner bezeichnet, von dem aus die Daten gesendet werden. dest-wird nicht benötigt, da die Daten an alle Prozesse in dem Communicator gesendet werden. buf enthält also auf dem Senderprozess die zu sendenden Daten und nach der Terminierung auf den Empfängerprozessen die empfangenen Daten.

Abbildung 8.3: Schneeball-Broadcast
\includegraphics[width=10.45cm]{figures/Schneeball}

Bcast muss auf allen beteiligten Prozessen aufgerufen werden und terminiert auf allen Prozessen erst, nachdem alle Daten angekommen sind. Intern wird ein Schneeball-Algorithmus verwendet, der die Daten zunächst von einem Prozess zu b Prozessen sendet, von denen jeder die Daten wieder an b weitere Prozesse überträgt, usw. (siehe Abbildung 8.3).

Eine wichtige Variante von MPI Bcast ist Scatter, die wir hier noch besprechen, da wir sie in der nächsten Aufgabe benötigen.

  public void Scatter(Object   sendbuf, 
                      int      sendoffset, 
                      int      sendcount,
                      Datatype sendtype, 
                      Object   recvbuf, 
                      int      recvoffset, 
                      int      recvcount,
                      Datatype recvtype, 
                      int      root) throws MPIException

Da Scatter die Daten aus einem großen Array in kleine Arrays verstreut, benötigen wir hier zwei Puffer sendbuf und recvbuf. Die entsprechenden offset und count legen den Startindex und die Anzahl der zu übertragenden bzw. der zu empfangenden Array-Elemente fest. sendcount legt nicht die Gesamtzahl der zu übertragenden Elemente fest, sondern die an jeden einzelnen Prozess zu sendenden Anzahlen. type kennzeichnet den Datentyp der Array-Elemente. Dass es zwei type Parameter gibt, liegt daran, dass es bis zu einem gewissen Grad möglich ist, Daten zu konvertieren. Der root Parameter bezeichnet wieder den Rechner, von dem die Daten gesendet werden.

Scatter muss auch auf allen beteiligten Prozessen aufgerufen werden und terminiert auf allen Prozessen erst, nachdem alle Daten angekommen sind. Intern wird, wie bei Bcast, ein Schneeball-Algorithmus verwendet (siehe Abbildung 8.3).

Unter den Varianten von MPI Reduce wählen wir hier wieder eine einfache aus.

  public void Reduce(Object   sendbuf, 
                     int      sendoffset,
                     Object   recvbuf, 
                     int      recvoffset, 
                     int      count,
                     Datatype datatype, 
                     Op       op, 
                     int      root) throws MPIException

Da Reduce die Daten aus einem Array in ein einelementiges Array kombiniert, benötigen wir hier zwei Puffer sendbuf und recvbuf. Die entsprechenden offset und count legen den Startindex und die Anzahl der zu übertragenden Array-Elemente fest. Empfangen wird nur ein Element. type kennzeichnet den Datentyp der Array-Elemente. Der root-Parameter bezeichnet hier den Rechner, an den die Daten gesendet werden.

Reduce muss auf allen beteiligten Prozessen aufgerufen werden. Reduce terminiert auf allen Prozessen erst, nachdem alle Daten angekommen sind. Intern wird ein umgekehrter Schneeball-Algorithmus verwendet, der die Daten zunächst von b Prozessen zu einem weiteren Prozess sendet, dort wieder kombiniert, dann wieder von b Prozessen an einen Prozess sendet, usw., bis die Daten auf dem root-Prozess angekommen sind (wie in Abbildung 8.3 mit umgekehrten Pfeilen).

In mpiJava sind folgende Operationen für `op' verfügbar: MPI.MAX, MPI.MIN, MPI.SUM, MPI.PROD, MPI.LAND, MPI.BAND, MPI.LOR, MPI.BOR, MPI.LXOR, MPI.BXOR, MPI.MINLOC und MPI.MAXLOC.

Damit haben wir alle MPI-Funktionen zusammen, um die nächste Aufgabe zu lösen.

Aufgaben

Aufgabe 46   Summieren Sie die m Elemente eines Arrays A[ ] mit t >= 3 parallel arbeitenden Prozessen. Das heißt, implementieren Sie das Programm in Abbildung 5.4. Benutzen Sie mpiJava mit Scatter und Reduce zur Parallelisierung.

Wir besprechen eine Musterlösung mit mpiJava. Die Programmteile S(i) realisieren wir alle in einer Klasse. Mit Hilfe der MPI-Rank-Information myid bearbeiten wir in einer Fallunterscheidung entweder S(0) oder die anderen S(i) für i > 0. Zur Vereinfachung nehmen wir an, dass m (counts) ein Vielfaches von t (numprocs) ist. Falls dem nicht so ist, beenden wir das Programm mit einer Fehlermeldung, die nur auf dem Master-Prozess ausgegeben wird. Im Hauptprogramm main() wird MPI initialisiert und beendet. Nach der Initialisierung des Vektors ivec mit Bildung der Kontrollsumme ans beginnen wir mit dem Datentransport.

Die Daten werden mit MPI.COMM_WORLD.Scatter() an alle Partner geschickt. Jeder Prozess erhält in dem Array vec der Länge cnt die zu verarbeitenden Daten. In allen Prozessen wird dann die Summation durchgeführt.

Anschließend werden die Ergebnisse mit MPI.COMM_WORLD.Reduce() an Prozess 0 geschickt und dabei aufsummiert. Als Sendepuffer verwenden wir ssum und als Empfangspuffer verwenden wir rsum. Das Ergebnis befindet sich dann in rsum[0].

import java.io.*;
import mpi.* ;
 
public class ExVecMPIRed {
  public static void main(String[] args) 
         throws MPIException {
    MPI.Init(args) ;
    (new ExVecMPIRed()).work(
         new PrintWriter(System.out,true));
    MPI.Finalize();
  }

  void work(PrintWriter out) throws MPIException {
    int counts = 20000;
    // initialize MPI Communicators
    int myid     = MPI.COMM_WORLD.Rank() ;
    int numprocs = MPI.COMM_WORLD.Size() ;
    System.out.println("ExVecMPIRed on " + myid 
           + " of " + numprocs + " ") ;
    int cnt = counts/numprocs;
    if ( counts % numprocs != 0 ) {
       if ( myid == 0 ) {
          System.out.println("counts " + counts + 
             " is not divisible by numprocs " + numprocs 
             + ", shuting down." ) ;
       }
       return; 
    }
    // initialize data on root process
    int ans = 0;
    int[] ivec = new int[counts];
    if ( myid == 0 ) {
       for (int i=0; i<counts; i++) { 
           ivec[i] = i; ans += i; 
       }
    }
    int[] vec = new int[cnt];
    MPI.COMM_WORLD.Scatter(ivec, 0, cnt, MPI.INT, 
                           vec, 0, cnt, MPI.INT, 0);
    int s = 0;
    for (int i=0; i < cnt; i += 1) {
        s += vec[i];
    }
    int[] ssum = new int[1];
    int[] rsum = new int[1];
    ssum[0] = s;
    MPI.COMM_WORLD.Reduce(ssum, 0, rsum, 0, 1, 
                          MPI.INT, MPI.SUM, 0);
    if ( myid == 0 ) {
       int sum = rsum[0];
       System.out.println("sum = " + sum );
       System.out.println("ans = " + ans );
    }
  }
}

Mit Hilfe des Makefiles wird die Klasse kompiliert und anschließend ausgeführt.

   >make ExVecMPIRed np=4

Ein Beispiel für die Ausgabe der Terminalversion des Programms für vier Prozesse könnte wie folgt aussehen:

   ExVecMPIRed on 0 of 4 ExVecMPIRed on 1 of 4 
   ExVecMPIRed on 3 of 4 ExVecMPIRed on 2 of 4 
   sum = 199990000
   ans = 199990000
Die ersten Anzeigen von `ExVecMPIRed on 0 of 4' zeigen, dass die Arbeitsschritte in der Reihenfolge 0, 1, 3, 2 begonnen wurden. Das Ergebnis sum ist wie erwartet gleich ans.


Heinz Kredel
2002-04-05