FutureN.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
/**
* {@link FutureN} receives a list of {@link Future} and waits for the
* termination of all the futures. There are four variations of {@link FutureN}.
* <ul>
* <li>When a list of {@code Future<?>} is given
* <ul>
* <li>{@link FutureN.OnlyWait} waits for the termination of the given futures.
* <li>{@link FutureN.ReturnGivenResult} receives the result data structure
* {@code R result} and returns it when all the futures are finished.
* </ul>
* <li>When a list of {@code Future<R>} is given
* <ul>
* <li>{@link FutureN.ListResults} receives a {@code List<Future<R>>} and returns
* a {@code List<R>}.
* <li>{@link FutureN.ConsumeResults} receives a {@code List<Future<R>>} and a
* {@code Consumer<R>}. It waits for the termination of the given futures and
* processes the results using the provided consumer.
* </ul>
* </ul>
*
* @param <S> generic type of the group of {@link Future} this class will handle
*/
public abstract class FutureN<S> {

    /**
     * {@link ConsumeResults} receives a {@code List<Future<R>>} and a
     * {@code Consumer<R>}. It waits for the termination of the given futures
     * before transmitting the individual results of type {@code R} to the
     * consumer, in the iteration order of the list.
     *
     * @param <R> type of the result produced by the individual futures
     */
    public static class ConsumeResults<R> extends FutureN<R> implements Future<Void> {
        /** Consumer provided during construction */
        private final Consumer<R> consumer;
        /**
         * Flag used to indicate if {@link #processResult()} previously ran to
         * completion. Guarded by the monitor of this instance.
         */
        private boolean processed = false;

        /**
         * Constructor taking a list of futures and a Consumer as parameter
         *
         * @param futures  the list of futures this instance will wait on
         * @param consumer the consumer which will receive the results produced by
         *                 every future
         */
        public ConsumeResults(List<Future<R>> futures, Consumer<R> consumer) {
            super(futures);
            this.consumer = consumer;
        }

        /**
         * Waits for every future to terminate and gives their individual results to
         * the consumer provided during construction. The consumer receives the
         * results at most once, even if this method is called several times.
         *
         * @return {@code null}
         * @throws InterruptedException if such an exception is thrown by one of the
         *                              futures
         * @throws ExecutionException   if such an exception is thrown by one of the
         *                              futures
         */
        @Override
        public Void get() throws InterruptedException, ExecutionException {
            get0();
            processResult();
            return null;
        }

        /**
         * Waits for every future to terminate within the given timeout and gives
         * their individual results to the consumer provided during construction.
         *
         * @param timeout the total time allowed for all the futures to complete
         * @param unit    time unit of the timeout
         * @return {@code null}
         * @throws InterruptedException if such an exception is thrown by one of the
         *                              futures
         * @throws ExecutionException   if such an exception is thrown by one of the
         *                              futures
         * @throws TimeoutException     if one of the futures handled by this class
         *                              exhausts the timeout
         */
        @Override
        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            get0(timeout, unit);
            processResult();
            return null;
        }

        /**
         * Helper method which takes the results produced by each future and gives
         * them to the consumer. The consumer is invoked while the monitor of this
         * instance is held, so that concurrent callers cannot feed the results
         * twice.
         */
        private void processResult() {
            synchronized (this) {
                if (processed) {
                    return;
                }
                for (final Future<R> f : futures) {
                    consumer.accept(completedResult(f));
                }
                // Only flagged once every result was handed over
                processed = true;
            }
        }
    }

    /**
     * {@link ListResults} receives a {@code List<Future<R>>} and returns a
     * {@code List<R>} containing the result of each of the individual futures, in
     * the same order as the futures.
     *
     * @param <R> type of the result of each future
     */
    public static class ListResults<R> extends FutureN<R> implements Future<List<R>> {
        /**
         * List which is going to contain the result of each future. Remains
         * {@code null} until all the results were gathered.
         */
        List<R> result = null;

        /**
         * Constructor with a list of futures.
         *
         * @param futures list of futures to wait on
         */
        public ListResults(List<Future<R>> futures) {
            super(futures);
        }

        /**
         * Waits for all futures to terminate and return the individual result of
         * each future in a list.
         *
         * @return the result produced by each future, in a list
         * @throws InterruptedException if such an exception is thrown by one of the
         *                              futures
         * @throws ExecutionException   if such an exception is thrown by one of the
         *                              futures
         */
        @Override
        public List<R> get() throws InterruptedException, ExecutionException {
            get0();
            return processResult();
        }

        /**
         * Waits for all futures to terminate within the provided timeout and return
         * the individual result of each future in a list.
         *
         * @param timeout the total time allowed for all the futures to complete
         * @param unit    time unit of the timeout
         * @return the result produced by each future, in a list
         * @throws InterruptedException if such an exception is thrown by one of the
         *                              futures
         * @throws ExecutionException   if such an exception is thrown by one of the
         *                              futures
         * @throws TimeoutException     if one of the futures handled by this class
         *                              exhausts the timeout
         */
        @Override
        public List<R> get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            get0(timeout, unit);
            return processResult();
        }

        /**
         * Helper method which initializes {@link #result} on its first call and
         * returns the same list on every subsequent call.
         *
         * @return the initialized {@link #result}
         */
        List<R> processResult() {
            synchronized (this) {
                if (result == null) {
                    // Gather into a local list first: if a failure interrupts the loop,
                    // a partially filled list is never published as the result.
                    final List<R> collected = new ArrayList<>(futures.size());
                    for (final Future<R> f : futures) {
                        collected.add(completedResult(f));
                    }
                    result = collected;
                }
                return result;
            }
        }
    }

    /**
     * {@link OnlyWait} waits for the termination of all its given futures.
     */
    public static class OnlyWait extends FutureN<Void> implements Future<Void> {
        /**
         * Constructor with a list of futures.
         *
         * @param futures futures on which to wait for completion
         */
        public OnlyWait(List<Future<Void>> futures) {
            super(futures);
        }

        /**
         * Waits for all futures to terminate and returns
         *
         * @return {@code null}
         * @throws InterruptedException if such an exception is thrown by one of the
         *                              futures
         * @throws ExecutionException   if such an exception is thrown by one of the
         *                              futures
         */
        @Override
        public Void get() throws InterruptedException, ExecutionException {
            get0();
            return null;
        }

        /**
         * Waits for all futures to terminate in the specified allocated time and
         * returns
         *
         * @param timeout the total time allowed for all the futures to complete
         * @param unit    time unit of the timeout
         * @return {@code null}
         * @throws InterruptedException if such an exception is thrown by one of the
         *                              futures
         * @throws ExecutionException   if such an exception is thrown by one of the
         *                              futures
         * @throws TimeoutException     if one of the futures handled by this class
         *                              exhausts the timeout
         */
        @Override
        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            get0(timeout, unit);
            return null;
        }
    }

    /**
     * {@link ReturnGivenResult} receives a list of futures and an instance of the
     * generic type R. This instance is returned in method {@link #get()} after all
     * the futures have completed.
     * <p>
     * This nested class is typically used when the futures operate on the elements
     * contained in an object and the "result" of this operation is actually the
     * same object with its contents potentially transformed. Presumably, the object
     * can be safely manipulated again once all the futures have completed.
     * <p>
     * The raw {@link FutureN} super type is used because the futures given to this
     * class are of heterogeneous type ({@code Future<?>}).
     *
     * @param <R> the type of the result type (This class implements
     *            {@code Future<R>})
     */
    @SuppressWarnings("rawtypes")
    public static class ReturnGivenResult<R> extends FutureN implements Future<R> {
        /** Object to return when all the futures have completed */
        R result;

        /**
         * Constructor which takes a list of futures and an object
         *
         * @param futures the list of futures whose completion will be waited on
         * @param r       object to return when all the futures have completed
         */
        @SuppressWarnings("unchecked")
        public ReturnGivenResult(List<Future<?>> futures, R r) {
            super(futures);
            result = r;
        }

        /**
         * Waits for the termination of all the futures and returns the object with
         * which this {@link FutureN} was constructed.
         *
         * @return the object this class was constructed with
         * @throws InterruptedException if such an exception is thrown by one of the
         *                              futures
         * @throws ExecutionException   if such an exception is thrown by one of the
         *                              futures
         */
        @Override
        public R get() throws InterruptedException, ExecutionException {
            get0();
            return result;
        }

        /**
         * Waits for the termination of all the futures within the specified timeout
         * and returns the object this instance was initialized with.
         *
         * @param timeout total timeout allowed for all the futures to finish
         * @param unit    time unit of the timeout
         * @return the object this class was constructed with
         * @throws InterruptedException if such an exception is thrown by one of the
         *                              futures
         * @throws ExecutionException   if such an exception is thrown by one of the
         *                              futures
         * @throws TimeoutException     if one of the futures handled by this class
         *                              exhausts the timeout
         */
        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            get0(timeout, unit);
            return result;
        }
    }

    /**
     * Retrieves the result of a future which is expected to have already completed
     * normally (the callers first go through {@link #get0()} or
     * {@link #get0(long, TimeUnit)}).
     *
     * @param <T>    type of the result produced by the future
     * @param future future whose result is wanted
     * @return the result of the future
     * @throws Error if the future unexpectedly throws an
     *               {@link InterruptedException} or an {@link ExecutionException};
     *               the original exception is attached as the cause
     */
    private static <T> T completedResult(Future<T> future) {
        try {
            return future.get();
        } catch (final InterruptedException e) {
            // Keep the interruption visible to the code further up the stack
            Thread.currentThread().interrupt();
            throw new Error("This should not occur!", e);
        } catch (final ExecutionException e) {
            throw new Error("This should not occur!", e);
        }
    }

    /** List of futures handled by this class */
    protected final List<Future<S>> futures;
    /** Not read or written anywhere in this class; kept for compatibility */
    boolean isCanceled = false;
    /**
     * Cache for {@link #isDone()}: set once all the futures were observed to be
     * done. Accessed while holding the monitor of this instance.
     */
    boolean isDone = false;

    /**
     * Constructor taking a list of futures as argument.
     *
     * @param futures the futures which this instance aggregates
     */
    public FutureN(List<Future<S>> futures) {
        this.futures = futures;
    }

    /**
     * Attempts to cancel the execution of all the contained tasks by calling
     * {@link Future#cancel(boolean)} on each of them. Every future receives the
     * cancellation request, even if an earlier one could not be cancelled.
     *
     * @param mayInterruptIfRunning true if the thread executing the tasks should be
     *                              interrupted; otherwise, in-progress tasks are
     *                              allowed to complete
     * @return {@code false} if at least one of the tasks could not be cancelled
     *         (usually because it has already completed normally), {@code true}
     *         otherwise
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean result = true;
        for (final Future<?> f : futures) {
            // Non short-circuit on purpose: each future must be asked to cancel
            result &= f.cancel(mayInterruptIfRunning);
        }
        return result;
    }

    /**
     * Helper method for child classes, calls method {@link Future#get()} on each
     * future handled.
     * <p>
     * The futures are queried on every call, including after {@link #isDone()}
     * returned {@code true}: a future which terminated with a failure is "done" but
     * its exception must still be reported to the caller.
     *
     * @throws InterruptedException if such an exception is thrown by one of the
     *                              futures
     * @throws ExecutionException   if such an exception is thrown by one of the
     *                              futures
     */
    protected void get0() throws InterruptedException, ExecutionException {
        for (final Future<?> f : futures) {
            f.get();
        }
        synchronized (this) {
            isDone = true;
        }
    }

    /**
     * Helper method for child classes, calls method
     * {@link Future#get(long,TimeUnit)} on each future handled by this class. The
     * timeout is a budget shared by all the futures: each future is only given the
     * time left before the overall deadline.
     * <p>
     * The futures are queried on every call, including after {@link #isDone()}
     * returned {@code true}, so that failures are always reported.
     *
     * @param timeout total timeout allowed for all the futures
     * @param unit    unit of the timeout
     * @throws InterruptedException if such an exception is thrown by one of the
     *                              futures
     * @throws ExecutionException   if such an exception is thrown by one of the
     *                              futures
     * @throws TimeoutException     if one of the futures handled by this class
     *                              exhausts the timeout
     */
    protected void get0(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Monotonic clock and nanosecond resolution: converting the elapsed time
        // back into a coarse unit would truncate it and over-extend the timeout.
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (final Future<?> f : futures) {
            final long remaining = deadline - System.nanoTime();
            f.get(Math.max(0L, remaining), TimeUnit.NANOSECONDS);
        }
        synchronized (this) {
            isDone = true;
        }
    }

    /**
     * Indicates if all the tasks this instance contains were cancelled before they
     * could complete normally
     *
     * @return true if all the tasks were successfully canceled before they could
     *         complete
     */
    public boolean isCancelled() {
        boolean result = true;
        for (final Future<S> f : futures) {
            result &= f.isCancelled();
        }
        return result;
    }

    /**
     * Returns true if all the individual tasks this instance contains completed.
     * Once all the tasks were observed to be done, the answer is cached.
     *
     * @return true if all the tasks completed
     */
    public boolean isDone() {
        synchronized (this) {
            if (isDone) {
                return true;
            }
        }
        for (final Future<S> f : futures) {
            if (!f.isDone()) {
                return false;
            }
        }
        synchronized (this) {
            isDone = true;
        }
        return true;
    }
}