Accumulator.java
package handist.collections.accumulator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import handist.collections.ChunkedList;
import handist.collections.LongRange;
/**
* Class representing a factory for accumulator instances dedicated to a single
* thread.
* <p>
* This allows for multiple threads to record information in a thread-dedicated
* {@link ChunkedList}<A>, with each A object "accumulating" information
* with regards to the "T" object at the matching index in the target
* collection.
* <p>
* Computation with an accumulator takes place in 3 phases:
* <ol>
* <li>An {@link Accumulator} with a certain accumulator-allocation policy is
* created. the type A used to accumulate information and how to initialize new
* instance of this A type are specified to the constructor.
* <li>This accumulator is used in one or multiple accumulation calls where the
* information contained in some collection is used as the source to register
* information into the accumulator objects contained by the {@link Accumulator}
* <li>The information accumulated in the {@link Accumulator} is used to update
* some collection's elements using method
* {@link ChunkedList#accept(Accumulator, java.util.function.BiConsumer)} or
* {@link ChunkedList#parallelAccept(Accumulator, java.util.function.BiConsumer)}.
* The function which determines how to update individual T elements of the
* target collection based on the information contained in the A accumulators is
* specified as parameter to these functions.
* </ol>
* <p>
* There is currently one implementation of {@link Accumulator}:
* <ul>
* <li>{@link AccumulatorCompleteRange} which prepares accumulators for all the
* ranges it is given at the time of construction. Use this variant if you know
* that the elements processed during the accumulation phase will need to record
* information about (almost) all indices contained in the target
* {@link ChunkedList}<T>.
* </ul>
*
* @author Kawanishi Yoshiki
*
* @param <A> the "accumulator" type used to keep information about the
* modification to be performed on the target {@link ChunkedList}
*/
public abstract class Accumulator<A> {
/**
* ThreadLocalAccumulator provided as parameter when accumulating information
* inside accumulators. This interface provides access to the individual
* accumulators of type A.
* <p>
* Internally, various accumulator allocation strategies can be used (on-demand,
* fixed initialization on specific ranges defined in advance) and are
* implemented into the specializations of class {@link Accumulator}.
*
* @author Patrick Finnerty
*
* @param <A> the type of the accumulator used to collect data
*/
public interface ThreadLocalAccumulator<A> {
/**
* Provides the accumulator instance dedicated to this thread for the specified
* index. This method may throw Exceptions if the allocation contract of the
* underlying {@link Accumulator} is not respected.
*
* @param idx index of the accumulator desired
* @return the accumulator object for this index
*/
public A acquire(long idx);
/**
* Obtains the accumulators for the range specified by the parameter.
* <p>
* This method may throw an {@link Exception} if the allocation contract of the
* underlying {@link Accumulator} is not respected.
*
* @param range the range of accumulators to obtain from the thread-local
* accumulator
* @return ranged list of accumulators containing the specified range
*/
public ChunkedList<A> acquire(LongRange range);
/**
* Returns the underlying {@link ChunkedList} containing the accumulators
* <p>
* Internal-use only. Call at your own peril.
*
* @return the underlying {@link ChunkedList} containing the accumulators of
* this {@link ThreadLocalAccumulator}
*/
public ChunkedList<A> getChunkedList();
/**
* Returns the ranges that the current thread-local accumulator has internally
* initialized.
*
* @return collection of {@link LongRange} present in this accumulator
*/
public Collection<LongRange> ranges();
}
/**
* Collection which keeps track of the {@link ChunkedList} created for each
* thread participating in the computation.
*/
protected final ArrayList<ThreadLocalAccumulator<A>> threadLocalAccumulators;
/**
* Function used to create the R type based on the index of the corresponding T
* type
*/
public final Function<Long, A> initFunc;
/**
* Protected constructor. Initializes members common to all
* ThreadLocalAccumulator implementations
*
* @param initializerFunction function used to initialize a new accumulator A
* for the T object located at the specified index
*/
protected Accumulator(Function<Long, A> initializerFunction) {
initFunc = initializerFunction;
threadLocalAccumulators = new ArrayList<>();
}
/**
* Obtain all currently initialized thread-local accumulators.
*
* @return all thread accumulators initialized by this instance up until this
* point
*/
public List<ThreadLocalAccumulator<A>> getAllThreadLocalAccumulator() {
return threadLocalAccumulators;
}
/**
* Factory method which creates a new ThreadLocalAccumulator for an upcoming
* thread to use to accumulate information.
* <p>
* This method is for internal use only. It is used when there are not enough
* thread-local accumulators for the level of parallelism desired. Each
* {@link Accumulator} implementation should implement this method to return a
* {@link ThreadLocalAccumulator} instance fitted with its appropriate
* accumulator allocation policy.
*
* @return a newly created
*/
protected abstract ThreadLocalAccumulator<A> newThreadLocalAccumulator();
/**
* Obtain the specified number of thread-local accumulators. These can then be
* used by as many threads to accumulate information without interfering with
* each-other.
* <p>
* Previously allocate thread-local accumulators will be re-used by this class.
* Only in a case where more accumulators then previously initialized are
* demanded will new TLAs be allocated.
*
* @param nbAccumulators number of accumulator desired
* @return list of accumulators containing as many
*/
public List<ThreadLocalAccumulator<A>> obtainThreadLocalAccumulators(int nbAccumulators) {
// Until we have enough local accumulators prepared, allocate some more
while (threadLocalAccumulators.size() < nbAccumulators) {
threadLocalAccumulators.add(newThreadLocalAccumulator());
}
return threadLocalAccumulators.subList(0, nbAccumulators);
}
/**
* Discards all initialized accumulators prepared up until this point. Call this
* method if you plan to re-use this {@link Accumulator} again for a new
* computation.
*/
public void reset() {
threadLocalAccumulators.clear();
}
}