neues Packet in Java 5 (aka JDK 1.5)
bietet für viele wiederkehrende Arbeiten fertig vorbereitete Lösungen, z.B. ThreadPools
verbesserte Klassen für die Synchronisation in kritischen Bereichen, z.B. Read / Write Locks
verbesserte Klassen für vielfältige Bedingungen, z.B. Semaphore, Barrier
Erweiterungen des Collection Frameworks, z.B. Queue, BlockingQueue, Semaphore
Thread (Task) Erzeugung
Kritische Bereiche
Bedingungen
neue Collections
parallele Tasks sind wieder Klassen,
die das Runnable
Interface implementieren
alternativ können Sie auch das
Callable<V>
Interface implementieren
neben der void run()
Methode gibt es damit noch
eine Methode, die einen Wert zurück gibt V call()
die Klasse Executors
erzeugt mit statischen Methoden
passende Thread Pools, genannt ExecutorService
zum Beispiel newFixedThreadPool(int threads)
liefert einen Thread Pool mit einer festen Anzahl von Threads
und einer unbeschränkten Arbeits-Queue
newSingleThreadExecutor()
erzeugt
einen Pool mit einem Thread
newScheduledThreadPool(int threads)
liefert einen Pool mit einer festen Anzahl von Threads
für zeitgesteuerte Tasks
die wichtigsten Methoden von ExecutorService
sind
void execute(Runnable r)
ohne Test auf Terminierung
Future<?> submit(Runnable r)
Future<V> submit(Callable<V> c)
mit Test auf Terminierung
void shutdown()
zum Beenden des Thread Pools
die Klasse Future<V>
bietet die Möglichkeit
auf die Terminierung der parallelen Task zu warten
die wichtigsten Methoden von Future
sind
V get()
blockiert bis die Task
(d.h. die run()-Methode) beendet ist und liefert den Wert von call()
zurück (oder null)
boolean isDone()
testet ob die Task
beendet ist
Der Erzeugung, dem Starten und Stoppen von Threads
Thread[] t = new Thread[anzahl]; for (int i = 0; i < anzahl; i++) { t[i] = new RunnableImpl(...,i); t[i].start(); } for (int i = 0; i < anzahl; i++) { try { t[i].join(); } catch (InterruptedException e) { } }
entspricht der folgende Code
ExecutorService pool = Executors.newFixedThreadPool(anzahl); Future[] f = new Future[anzahl]; for (int i = 0; i < anzahl; i++) { f[i] = pool.submit( new RunnableImpl(...,i) ); } for (int i = 0; i < anzahl; i++) { try { f[i].get(); } catch (InterruptedException ignored) { } catch (ExecutionException ignored) { } } pool.shutdown();
Zur Behandlung von kritischen Bereichen gibt es zwei
Unterpackete java.util.concurrent.locks
und java.util.concurrent.atomic
.
Locks bietet eine ReentrantLock
Klasse
und ein Lock
Interface.
Mit den Methoden lock()
und unlock()
verhält sich ein ReentrantLock
wie das bekannte
synchronized (.)
.
Beide realisieren ein weak fair scheduling.
Zusätzlich gibt es mit AbstractQueuedSynchronizer
eine abstrakte Klasse, die sich zur Implementierung von
Locks mit strong fair scheduling eignen.
Zum Beispiel entspricht dem synchronized Konstrukt
synchronized (mutex) { ... statements ... }
die folgende Lock Konstruktion
Lock mutex = new ReentrantLock(); mutex.lock(); try { ... statements ... } finally { mutex.unlock(); }
Das Interface ReadWriteLock
und die Klasse ReentrantReadWriteLock
bieten optimierte Locks für Situationen mit
vielen read-Operationen und wenigen write-Operationen.
Die Klassen AtomicInteger
,
AtomicLong
und andere, bieten atomare Integer und Long Variablen.
Damit kann in Situationen, in denen nur eine einzelne Variable durch einen kritischen Bereich geschützt werden müsste, der aufwändige Lock vermieden werden.
Die wichtigsten Methoden sind
boolean compareAndSet(erwartet, neu)
entspricht atomar if ( val == erwartet ) val = neu
addAndGet(delta)
entspricht atomar (val += delta)
getAndAdd(delta)
entspricht atomar (t = val; val += delta; t)
.
Zusätzlich gibt es Methoden die den Operationen
val++
, val--
,
++val
und --val
entsprechen.
Weitere Klassen bieten atomare Felder (Arrays) von Integern und Longs.
Für Referenzen und Feldern von Referenzen gibt es ebenfalls entsprechende Klassen, allerdings ohne die arithmetischen Operationen.
Neben den schon genannten Locks gibt es eine ganze Reihe weiterer nützlicher Klassen, wie Semaphore, Barrieren und Latches.
Lock
stellt eine Methode
newCondition()
bereit, die eine
neue Bedingung (gekoppelt an diesen Lock)
Condition
liefert.
Condition
verfügt über die
notwendigen Warte- und Signalisierungsmethoden:
await()
, await(timeout)
,
signal()
und signalAll()
.
Dies kann etwas verwirren, da eine Condition
(als Object) auch über die Methoden
wait()
und notify()
verfügt.
synchronized (mutex) { ... while ( ? ) { mutex.wait(); } ... } synchronized (mutex) { ... if ( ? ) { mutex.notify(); } ... }
Lock mutex = new ReentrantLock(); Condition cond = mutex.newCondition(); mutex.lock(); try { ... while ( ? ) { cond.await(); } ... } finally { mutex.unlock(); } mutex.lock(); try { ... if ( ? ) { cond.signal(); } ... } finally { mutex.unlock(); }
Die Semaphore
Klasse
verfügt über die zu erwartenden P- und V-Operationen,
die hier aber acquire()
und release()
genannt werden.
Eine strong fair Implementierung kann über
einen Konstruktor gewählt werden.
Die CyclicBarrier
Klasse
verfügt über eine await()
Methode,
die Terminiert, falls die vorgegebene Anzahl von Partnern
eingechecked haben.
Mit reset()
lässt sich die Barrier neu
initialisieren.
Die Klasse CountDownLatch
verfügt über eine await()
Methode,
die Terminiert, falls der interne Zähler 0
erreicht.
Mit countDown()
lässt sich der Zähler
herunter zählen.
Eine Reihe von Interfaces und Klassen erweitern die Collection-Tools um nützliche multithreading-Funktionalitäten, wie Queue, BlockingQueue, ConcurrentMap, ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue, Exchanger, CopyOnWriteArrayList, CopyOnWriteArraySet, ConcurrentLinkedQueue, ConcurrentHashMap.
Das Interface Queue
definiert die Methoden für
First-In-First-Out (FIFO) Listen.
Das Interface BlockingQueue
definiert zusätzliche Methoden, die blockieren
bei Entnahmen von einer leeren Queue,
bzw. blockieren beim Einfügen in eine grössenbeschränkte,
schon volle Queue.
Das Interface ConcurrentMap
erweitert Map
um Methoden
zur atomaren Modifikation
z.B. putIfAbsent()
, replace()
.
Die Klassen ArrayBlockingQueue
,
LinkedBlockingQueue
und
PriorityBlockingQueue
bieten Implementierungen des BlockingQueue
Interfaces mit Hilfe von Arrays bzw. Listen.
Die mit Array implementieren Klassen sind immer grössenbeschränkt,
während die mit Listen implementierten Klassen in der Regel
in der Grösse unbeschränkt sind.
SynchronousQueue
erzwingt die
Synchronisierung der Einfüge- mit der Entnahme-Operation.
D.h. ein Thread kann nur etwas eingefügen, wenn ein anderer Thread
gleichzeitig dieses entnimmt.
Exchanger
kennt nur
T exchange(T val)
Methoden. Damit können genau
zwei Threads einen Wert synchron austauschen.
Bei den Klassen CopyOnWriteArrayList
und
CopyOnWriteArraySet
werden bei modifizierenden
Operationen Kopien erzeugt. Dadurch sind "keine" aufwändigen
Synchronisationen notwendig.
ConcurrentLinkedQueue
implementiert eine sehr effiziente, "wartefreie" Queue.
Die Klasse ConcurrentHashMap
stellt eine effiziente multi-threaded Variante von Hashtable
zur Verfügung.
In Collections
gibt es Methoden, die
von Listen, etc. synchronisierte Varianten erzeugen,
z.B. synchronizedList(list)
.
Manchmal kann man auch unmodifizierbare Listen, etc.
verwenden, für die dann natürlich auch keine Synchronisation
notwendig ist, z.B. unmodifiableList(list)
.
(Allerdings kann der ursprüngliche Erzeuger die Liste noch
verändern, d.h. es entspricht nicht final für die Elemente.)
© Universität Mannheim, Rechenzentrum, 2005-2007.
Heinz KredelLast modified: Wed Sep 12 21:29:25 CEST 2007