Das Package java.util.concurrency stellt Möglichkeiten zur parallelen Abarbeitung von Aufgaben zhur Verfügung. Insbesondere sinnvoll ist dies zur Steuerung externer Aufgaben, auf die gewartet wird und die unzuverlässig sind.

Beschreibung#

Der TimeoutExecutor benutzt eine begrenzte Zahl von Threads aus einem bestehenden ExecutorService zum Ausführen von Callables. Wenn das Ergebnis eines Callables nicht innerhalb der angegebenen Zeit vorliegt, wird die Abarbeitung abgebrochen.

Ziel#

Die Threads werden effizient genutzt, indem auf eine Ressource maximal die angegebene Zeit gewartet wird. Sobald ein Thread frei wird, entweder durch liefern des Ergebnisses oder durch den Timeout, wird dieser wieder verwendet.
package com.intersult.util.concurrent

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimeoutExecutor<V> implements Future<List<V>> {
	private class TimeoutFuture extends FutureTask<V> {
		private long end;
		
		TimeoutFuture(Callable<V> callable, long end) {
			super(callable);
			this.end = end;
		}

		@Override
		protected void done() {
			if (!isCancelled())
				completionQueue.add(this);
		}
		
		public boolean isTimeout() {
			return end < System.nanoTime();
		}

		public long getEnd() {
			return end;
		}
	}

	final BlockingQueue<TimeoutFuture> submissionQueue;
	final BlockingQueue<TimeoutFuture> completionQueue;
	private final Executor executor;
	private boolean cancelled;
	private final int maxThreads;
	private final Collection<Callable<V>> callables;

	public TimeoutExecutor(Executor executor, Collection<Callable<V>> callables, int maxThreads) {
		this.maxThreads = maxThreads;
		if (executor == null || callables == null)
			throw new NullPointerException();
		this.executor = executor;
		this.callables = callables;
		submissionQueue = new LinkedBlockingQueue<TimeoutFuture>(maxThreads);
		completionQueue = new LinkedBlockingQueue<TimeoutFuture>(callables.size());
	}

	@Override
	public boolean cancel(boolean mayInterruptIfRunning) {
		if (isDone())
			return false;
		cancelled = true;
		for (Future<V> future : submissionQueue) {
			future.cancel(mayInterruptIfRunning);
		}
		return cancelled;
	}

	@Override
	public List<V> get() throws InterruptedException, ExecutionException {
		return get(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
	}

	@Override
	public List<V> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
		long nanoTimeout = unit.toNanos(timeout);
		List<V> result = new ArrayList<V>(callables.size());
		for (Iterator<Callable<V>> iterator = callables.iterator(); iterator.hasNext() || !submissionQueue.isEmpty();) {
			while (!cancelled && submissionQueue.size() < maxThreads && iterator.hasNext()) {
				long end = System.nanoTime() + nanoTimeout;
				TimeoutFuture future = new TimeoutFuture(iterator.next(), end);
				submissionQueue.add(future);
				executor.execute(future);
			}
			long remain = submissionQueue.peek().getEnd() - System.nanoTime();
			TimeoutFuture future = completionQueue.poll(remain, TimeUnit.NANOSECONDS);
			if (future != null) {
				submissionQueue.remove(future);
				result.add(future.get());
			}
			while (!submissionQueue.isEmpty()) {
				if (submissionQueue.peek().isTimeout()) {
					submissionQueue.poll().cancel(true);
				} else {
					break;
				}
			}
		}
		return result;
	}

	@Override
	public boolean isCancelled() {
		return cancelled;
	}
	@Override
	public boolean isDone() {
		return submissionQueue.isEmpty();
	}
}