This page (revision-1) was last changed on 10-Nov-2011 11:47 by Dieter Käppel

Only authorized users are allowed to rename pages.

Only authorized users are allowed to delete pages.

Page revision history

Version Date Modified Size Author Changes ... Change note
1 10-Nov-2011 11:47 3 KB Dieter Käppel

Page References

Incoming links Outgoing links
TimeoutExecutor ...nobody

Version management

Difference between version and

At line 1 added 118 lines
Das Package java.util.concurrent stellt Möglichkeiten zur parallelen Abarbeitung von Aufgaben zur Verfügung. Insbesondere sinnvoll ist dies zur Steuerung externer Aufgaben, auf die gewartet wird und die unzuverlässig sind.
!Beschreibung
Der TimeoutExecutor benutzt eine begrenzte Zahl von Threads aus einem bestehenden ExecutorService zum Ausführen von Callables. Wenn das Ergebnis eines Callables nicht innerhalb der angegebenen Zeit vorliegt, wird die Abarbeitung abgebrochen.
!Ziel
Die Threads werden effizient genutzt, indem auf eine Ressource maximal die angegebene Zeit gewartet wird. Sobald ein Thread frei wird, entweder durch Liefern des Ergebnisses oder durch den Timeout, wird dieser wiederverwendet.
{{{
package com.intersult.util.concurrent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Runs a collection of {@link Callable}s on an existing {@link Executor}, keeping at most
 * {@code maxThreads} of them in flight at a time and cancelling every task whose result is not
 * available within the timeout passed to {@link #get(long, TimeUnit)}.
 *
 * <p>The work is driven by the thread that calls {@code get}: it submits tasks, waits for
 * completions and cancels tasks whose deadline has passed. Results are collected in completion
 * order; tasks that time out or are cancelled contribute no element to the result list.
 *
 * <p>NOTE(review): presumably only one thread calls {@code get} at a time; concurrent callers
 * would share both queues without coordination - confirm against the intended usage.
 *
 * @param <V> result type of the callables
 */
public class TimeoutExecutor<V> implements Future<List<V>> {

    /**
     * A task with an absolute deadline (in {@link System#nanoTime()} units) that reports itself
     * to the completion queue of the enclosing executor once it is finished.
     */
    private class TimeoutFuture extends FutureTask<V> {
        private final long end;

        TimeoutFuture(Callable<V> callable, long end) {
            super(callable);
            this.end = end;
        }

        /**
         * Enqueues this task on the completion queue whenever it finishes, including when it is
         * cancelled, so that a thread blocked in {@code get} is woken up by a cancellation
         * instead of waiting for the deadline of the oldest task.
         */
        @Override
        protected void done() {
            completionQueue.add(this);
        }

        /**
         * Returns whether the deadline of this task has passed.
         *
         * <p>The comparison is done on the difference of the two nanoTime values because the
         * deadline may have wrapped around numerically (for example with a timeout of
         * {@code Long.MAX_VALUE} nanoseconds); comparing the raw values would then report a
         * timeout immediately.
         */
        public boolean isTimeout() {
            return end - System.nanoTime() < 0;
        }

        /** Returns the deadline of this task in {@link System#nanoTime()} units. */
        public long getEnd() {
            return end;
        }
    }

    /** Tasks currently in flight, oldest (earliest deadline) first; bounded by maxThreads. */
    final BlockingQueue<TimeoutFuture> submissionQueue;
    /** Finished tasks (completed or cancelled) waiting to be picked up by get. */
    final BlockingQueue<TimeoutFuture> completionQueue;
    private final Executor executor;
    /** Written by cancel and read by the thread running get, hence volatile. */
    private volatile boolean cancelled;
    private final int maxThreads;
    private final Collection<Callable<V>> callables;

    /**
     * Creates an executor for the given callables. Nothing is submitted until {@code get} is
     * called.
     *
     * @param executor   executor that runs the tasks
     * @param callables  tasks to run; may be empty
     * @param maxThreads maximum number of tasks in flight at the same time, must be positive
     * @throws NullPointerException     if {@code executor} or {@code callables} is null
     * @throws IllegalArgumentException if {@code maxThreads} is not positive
     */
    public TimeoutExecutor(Executor executor, Collection<Callable<V>> callables, int maxThreads) {
        if (executor == null)
            throw new NullPointerException("executor");
        if (callables == null)
            throw new NullPointerException("callables");
        if (maxThreads <= 0)
            throw new IllegalArgumentException("maxThreads must be positive: " + maxThreads);
        this.maxThreads = maxThreads;
        this.executor = executor;
        this.callables = callables;
        submissionQueue = new LinkedBlockingQueue<TimeoutFuture>(maxThreads);
        // Unbounded: a bounded queue cannot be created with capacity 0 (empty callables), and
        // every finished task, cancelled ones included, must always be accepted.
        completionQueue = new LinkedBlockingQueue<TimeoutFuture>();
    }

    /**
     * Stops the submission of further tasks and cancels the tasks currently in flight.
     *
     * <p>Because {@link #isDone()} reports an empty submission queue, calling this method while
     * no task is in flight (for example before {@code get} has started) has no effect and
     * returns {@code false}.
     *
     * @param mayInterruptIfRunning passed on to {@link Future#cancel(boolean)} of each task
     * @return {@code false} if no task was in flight, {@code true} otherwise
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (isDone())
            return false;
        cancelled = true;
        for (Future<V> future : submissionQueue) {
            future.cancel(mayInterruptIfRunning);
        }
        return true;
    }

    /**
     * Runs all callables without an effective per-task time limit by delegating to
     * {@link #get(long, TimeUnit)} with {@code Long.MAX_VALUE} nanoseconds.
     */
    @Override
    public List<V> get() throws InterruptedException, ExecutionException {
        return get(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    /**
     * Submits the callables, at most {@code maxThreads} at a time, and collects their results.
     * The timeout applies to each task individually, measured from its submission; a task that
     * exceeds it is cancelled with interruption and contributes no result.
     *
     * @param timeout per-task timeout; negative values are treated as zero
     * @param unit    unit of {@code timeout}
     * @return the results of the tasks that completed in time, in completion order
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if one of the callables threw an exception
     */
    @Override
    public List<V> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
        long nanoTimeout = Math.max(0L, unit.toNanos(timeout));
        List<V> result = new ArrayList<V>(callables.size());
        Iterator<Callable<V>> iterator = callables.iterator();
        // After a cancellation the remaining callables are skipped; only tasks that are still
        // in flight keep the loop alive.
        while ((!cancelled && iterator.hasNext()) || !submissionQueue.isEmpty()) {
            submitPending(iterator, nanoTimeout);
            TimeoutFuture head = submissionQueue.peek();
            if (head == null) {
                // Cancelled between the loop test and the submission: nothing to wait for.
                continue;
            }
            long remain = head.getEnd() - System.nanoTime();
            TimeoutFuture future = completionQueue.poll(remain, TimeUnit.NANOSECONDS);
            if (future != null) {
                submissionQueue.remove(future);
                // Cancelled tasks (timeout or cancel()) are enqueued only to wake this thread.
                if (!future.isCancelled()) {
                    result.add(future.get());
                }
            }
            cancelExpired();
        }
        return result;
    }

    /** Submits further callables until maxThreads tasks are in flight or none are left. */
    private void submitPending(Iterator<Callable<V>> iterator, long nanoTimeout) {
        while (!cancelled && submissionQueue.size() < maxThreads && iterator.hasNext()) {
            // May wrap around numerically; deadlines are only ever compared by difference.
            long end = System.nanoTime() + nanoTimeout;
            TimeoutFuture future = new TimeoutFuture(iterator.next(), end);
            submissionQueue.add(future);
            executor.execute(future);
        }
    }

    /** Removes and cancels the oldest in-flight tasks whose deadline has passed. */
    private void cancelExpired() {
        TimeoutFuture head = submissionQueue.peek();
        while (head != null && head.isTimeout()) {
            submissionQueue.remove(head);
            head.cancel(true);
            head = submissionQueue.peek();
        }
    }

    /** Returns whether {@link #cancel(boolean)} has taken effect. */
    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    /** Returns whether no task is currently in flight. */
    @Override
    public boolean isDone() {
        return submissionQueue.isEmpty();
    }
}
}}}