DistColGlbTask.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections.glb;
import static apgas.Constructs.*;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import apgas.Place;
import apgas.impl.Finish;
import handist.collections.LongRange;
import handist.collections.dist.DistCol;
import handist.collections.glb.Config.LifelineAnswerMode;
import handist.collections.glb.GlbComputer.LifelineToken;
/**
* Implementation of GlbTask for the {@link DistCol} distributed collection.
* This implementation relies on {@link LongRange} to describe assignments taken
* up by workers.
*
* @author Patrick Finnerty
*
*/
public class DistColGlbTask implements GlbTask {
/**
* Class describing the progress of the various operations taking place on a
* range pertaining to the {@link handist.collections.dist.DistCol}
*
* @author Patrick Finnerty
*
*/
static class DistColAssignment implements Assignment {
/** Serial Version UID */
private static final long serialVersionUID = 5397031649035798704L;
/**
* {@link DistColGlbTask} currently handling this assignment. This member is
* transient (not serialized): it is set by the constructor and can be replaced
* through {@link #setParent(DistColGlbTask)}, which
* {@link DistColGlbTask#mergeAssignments(HashMap, ArrayList)} calls on every
* assignment it receives.
*/
transient DistColGlbTask parent;
/**
* Progress of each operation tracked on this range.
* <p>
* Each operation is mapped to the index of the next element it has to process.
* The value starts at {@code range.from} when the operation is registered by
* {@link DistColGlbTask#newOperation(GlbOperation)} and is advanced by
* {@link #process(int, WorkerService, GlbOperation)}. An operation has no work
* left on this assignment once its recorded progress has reached
* {@code range.to}.
* <p>
* NOTE(review): no code visible in this class removes a mapping once an
* operation is finished; completion is detected by comparing the recorded
* value against {@code range.to}.
*/
@SuppressWarnings("rawtypes")
Map<GlbOperation, Long> progress;
/**
* Range of indices on which this assignment operates. This member is replaced
* by a shorter range when the assignment is split by
* {@link #splitIntoGlbTask()}.
*/
LongRange range;
/**
 * Creates an assignment covering the given range, with no operation tracked
 * yet.
 * <p>
 * The supplied {@link LongRange} is stored as-is (no copy is made here), so the
 * caller is expected to hand over an instance dedicated to this assignment.
 *
 * @param lr range of entries covered by the new assignment and on which the
 *           various operations will operate
 * @param p  parent {@link DistColGlbTask} in charge of this instance
 */
DistColAssignment(LongRange lr, DistColGlbTask p) {
    this.parent = p;
    this.range = lr;
    this.progress = new ConcurrentHashMap<>();
}
/**
 * Picks an operation which still has elements to process on this assignment.
 * <p>
 * The first operation encountered while iterating over {@link #progress} whose
 * recorded progress is strictly below the upper bound of {@link #range} is
 * returned.
 *
 * @return an operation with work left on this assignment
 * @throws RuntimeException if every tracked operation has already reached the
 *                          upper bound of the range
 */
@SuppressWarnings("rawtypes")
@Override
public GlbOperation chooseOperationToProgress() {
    return progress.entrySet().stream()
            .filter(tracked -> tracked.getValue() < range.to)
            .map(tracked -> tracked.getKey())
            .findFirst()
            .orElseThrow(
                    () -> new RuntimeException("Could not obtain an operation with work in assignment " + this));
}
/**
 * Indicates if this assignment of {@link DistCol} can be split in two
 * assignments. This method returns {@code true} when both conditions below
 * hold:
 * <ol>
 * <li>the size of {@link #range} is strictly greater than {@code qtt}
 * <li>at least one tracked operation has {@code qtt} or more elements left to
 * process, i.e. {@code range.to - progress >= qtt}
 * </ol>
 *
 * @param qtt threshold, in number of elements, used for both conditions
 * @return {@code true} if the assignment can be split, {@code false} otherwise
 */
@Override
public boolean isSplittable(int qtt) {
    // The range must be larger than the threshold AND some operation must have at
    // least "qtt" elements remaining
    return range.size() > qtt
            && progress.values().stream().anyMatch(reached -> range.to - reached >= qtt);
}
/**
 * Checks whether at least one tracked operation has not yet reached the upper
 * bound of {@link #range}.
 *
 * @return {@code true} if some operation is still incomplete on this
 *         assignment, {@code false} if all of them have reached
 *         {@code range.to} (or if no operation is tracked)
 */
boolean operationRemaining() {
    return progress.values().stream().anyMatch(reached -> reached < range.to);
}
/**
 * Processes at most the specified number of elements of the given operation on
 * this assignment.
 * <p>
 * The progress recorded for {@code op} is advanced <em>before</em> the action
 * of the operation is run on the corresponding sub-range. When the operation
 * reaches the upper bound of {@link #range}, the parent {@link DistColGlbTask}
 * is notified and this assignment is moved out of the parent's "assigned"
 * queue, either back into the "available" queue (other operations still have
 * work here) or into the "completed" queue.
 *
 * @param qtt number of elements to process
 * @param ws  service provided by the worker to the operation
 * @param op  GLB operation to progress in this assignment; it must be tracked
 *            in {@link #progress}
 * @return true if there is some work remaining in the operation that was
 *         progressed, false if the operation was completed on this assignment
 */
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public boolean process(int qtt, WorkerService ws, GlbOperation op) {
    final long next = progress.get(op);
    long limit = next + qtt;
    boolean operationCompletedForThisAssignment = false;
    if (limit >= range.to) {
        // The operation selected will be completed on this assignment
        limit = range.to;
        operationCompletedForThisAssignment = true;
    }

    /*
     * Some NPE were observed on the following call in the past. The issue might
     * have been fixed but the try/catch remains for a while just in case.
     */
    try {
        progress.put(op, limit);
    } catch (final NullPointerException e) {
        e.printStackTrace();
        System.err.println("Key was: " + op + " and value was " + limit);
        throw e;
    }

    // The computation loop is made on the following LongRange inside the "action"
    // carried by the GlbOperation.
    final LongRange lr = new LongRange(next, limit);
    op.operation.accept(lr, ws);

    if (operationCompletedForThisAssignment) {
        // Signal the parent GlbTask that the operation has completed on this
        // assignment.
        parent.operationTerminatedOnAssignment(op);

        // Depending on whether other operations remain on this assignment, it is
        // placed in the appropriate collection of the parent DistColGlbTask.
        // This part is protected using the read lock against newOperation; the lock
        // is released in a finally block so that it cannot leak on failure.
        parent.lockForWorkAssignmentSplittingAndNewOperation.readLock().lock();
        try {
            final boolean workLeft = operationRemaining();
            parent.assignedAssignments.remove(this);
            if (workLeft) {
                parent.availableAssignments.add(this);
            } else {
                parent.completedAssignments.add(this);
            }
        } finally {
            parent.lockForWorkAssignmentSplittingAndNewOperation.readLock().unlock();
        }
    }
    return !operationCompletedForThisAssignment;
}
/**
 * Sets the {@link DistColGlbTask} in charge of this assignment.
 * <p>
 * The {@link #parent} member is transient, so an assignment received from
 * another host needs to be bound to the local {@link DistColGlbTask} before it
 * is used; {@link DistColGlbTask#mergeAssignments(HashMap, ArrayList)} does so
 * through this method.
 *
 * @param p the new value for member {@link #parent}
 */
void setParent(DistColGlbTask p) {
    this.parent = p;
}
/**
 * Splits this assignment, creating a new one which is stored in the parent
 * {@link DistColGlbTask} immediately.
 * <p>
 * The current assignment is split at the halfway point between the operation
 * with the lowest progress and the {@link #range} upper bound. This ensures
 * that, for that least advanced operation, there is work both in the
 * assignment that is split away and in the one that remains.
 * <p>
 * The whole method runs under the read lock of the parent, which is released
 * in a {@code finally} block so that a failure in the middle of the split
 * cannot leave the lock held.
 * <p>
 * NOTE(review): if {@link #progress} is empty the splitting point is computed
 * from {@link Long#MAX_VALUE}; this method presumably is only called after
 * {@link #isSplittable(int)} returned {@code true} - confirm against callers.
 */
@Override
public void splitIntoGlbTask() {
    parent.lockForWorkAssignmentSplittingAndNewOperation.readLock().lock();
    try {
        // First determine the splitting point
        long minimumProgress = Long.MAX_VALUE;
        for (final Long operationProgress : progress.values()) {
            minimumProgress = Math.min(minimumProgress, operationProgress);
        }
        final long splittingPoint = range.to - ((range.to - minimumProgress) / 2);

        // Create a 'split' assignment
        final LongRange splitRange = new LongRange(splittingPoint, range.to);
        final LongRange thisRange = new LongRange(range.from, splittingPoint);
        final DistColAssignment split = new DistColAssignment(splitRange, parent);
        range = thisRange;

        // Adjust the progress of both "this" and the created assignment
        for (@SuppressWarnings("rawtypes")
        final Map.Entry<GlbOperation, Long> progressEntry : progress.entrySet()) {
            final Long currentProgress = progressEntry.getValue();
            @SuppressWarnings("rawtypes")
            final GlbOperation op = progressEntry.getKey();
            if (currentProgress < splitRange.from) {
                // The progress of this operation remains unchanged for "this" (it is within
                // "thisRange"). The start of "splitRange" is the initial progress for the
                // "split" assignment.
                split.progress.put(op, splittingPoint);
                // There is an extra assignment with work on this operation: increment the
                // assignmentsLeftToProcess counter
                parent.assignmentsLeftToProcess.get(op).incrementAndGet();
            } else {
                // The current progress is out of range for "thisRange", we set it to the
                // upper bound: thisRange.to. This marks the operation as having no work
                // left in "this".
                progress.put(op, thisRange.to);
                // The current progress is carried over to the "split" assignment.
                // Note that "currentProgress" may be equal to "splitRange.to" if the
                // operation had already completed when this method was called.
                split.progress.put(op, currentProgress);
                // Whether there was work or not, the number of assignments that need to
                // complete for this operation remains unchanged: no counter increment.
            }
        }

        // Add the "split" assignment to the DistColGlbTask handling the assignments
        // of the underlying collection.
        // NOTE: The counter for the total number of assignments needs to be
        // incremented BEFORE the assignment is placed in the "available" queue.
        parent.totalAssignments.incrementAndGet();
        parent.availableAssignments.add(split);
    } finally {
        parent.lockForWorkAssignmentSplittingAndNewOperation.readLock().unlock();
    }
}
/**
 * Describes this assignment by the textual representation of its
 * {@link #range}.
 *
 * @return the result of calling {@code toString()} on {@link #range}
 */
@Override
public String toString() {
    final String rangeDescription = range.toString();
    return rangeDescription;
}
}
/**
* Answer mode used to make lifeline answers, obtained from
* {@link Config#getLifelineSerializationMode()}. In
* {@link #answerLifeline(LifelineToken)}, {@link LifelineAnswerMode#MPI}
* selects the MPI-based transfer while any other value selects the non-MPI
* transfer.
*/
private static final LifelineAnswerMode answerMode = Config.getLifelineSerializationMode();
/** Serial Version UID */
private static final long serialVersionUID = -792674800264517475L;
/**
* Upper bound on the number of assignments which can be transferred to a remote
* thief in a single call to {@link #answerLifeline(LifelineToken)}.
*/
private static final int MAX_NUMBER_STOLEN_ASSIGNMENTS = 10;
/**
* Contains all the assignments that are being processed by a worker.
* Assignments enter this queue in {@link #assignWorkToWorker()} and leave it
* when an operation completes on them in
* {@link DistColAssignment#process(int, WorkerService, GlbOperation)}.
*/
ConcurrentLinkedQueue<DistColAssignment> assignedAssignments;
/**
* Map which associates the number of assignments left to process to each
* operation in progress.
* <p>
* The counter of an operation is created by {@link #newOperation(GlbOperation)}
* with the current total number of assignments. It is incremented when an
* assignment with work for that operation is created by a split or received
* through {@link #mergeAssignments(HashMap, ArrayList)}, and decremented when
* the operation completes on an assignment or when assignments with work for it
* are given away in {@link #answerLifeline(LifelineToken)}. When the counter
* reaches 0, local completion of the operation is signaled through
* {@link GlbComputer#signalLocalOperationCompletion(GlbOperation)} (per the
* original author, this releases the operation thread which is blocking on an
* {@link OperationBlocker}).
* <p>
* NOTE(review): this member is initialized as a plain {@link HashMap}; it is
* written under the write lock in {@link #newOperation(GlbOperation)} but read
* without holding the lock in some methods - confirm this is safe.
*/
@SuppressWarnings("rawtypes")
Map<GlbOperation, AtomicInteger> assignmentsLeftToProcess;
/** Contains all the assignments that are available to workers */
ConcurrentLinkedQueue<DistColAssignment> availableAssignments;
/**
* Contains all the assignments that have been completely processed by a worker.
* <p>
* The assignments in this collection have all of the current operations
* completed. They are moved back to {@link #availableAssignments} when a new
* operation is registered by {@link #newOperation(GlbOperation)}.
*/
ConcurrentLinkedQueue<DistColAssignment> completedAssignments;
/**
* This lock is used to maintain consistency between the total assignment
* counter, the three assignment queues and the presence of an entry for each
* operation in {@link DistColAssignment#progress}.
* <p>
* "Readers" are the threads that move or split individual assignments:
* {@link DistColAssignment#splitIntoGlbTask()},
* {@link DistColAssignment#process(int, WorkerService, GlbOperation)},
* {@link #assignWorkToWorker()} and {@link #answerLifeline(LifelineToken)}.
* "Writers" are the threads that call {@link #newOperation(GlbOperation)} or
* {@link #mergeAssignments(HashMap, ArrayList)}. There can be many concurrent
* readers, but not while a new operation becomes available (or assignments are
* merged) and a number of modifications to several members of this class need
* to be made atomically to preserve consistency.
*/
transient final ReadWriteLock lockForWorkAssignmentSplittingAndNewOperation;
/**
* Total number of assignments located on this place. It is initialized with the
* number of ranges of the local handle, incremented when an assignment is split
* or received, and decremented when assignments are given away to a thief.
*/
AtomicInteger totalAssignments;
/**
* Underlying collection on which the assignments operate
*/
@SuppressWarnings("rawtypes")
private final DistCol collection;
/**
 * Constructor
 * <p>
 * Creates one available assignment per range held by the local handle. Each
 * assignment receives its own copy of the {@link LongRange}, and the total
 * number of assignments is initialized with the number of ranges.
 *
 * @param localHandle the local handle of the collection which is going to
 *                    undergo some operations under this class' supervision
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
DistColGlbTask(DistCol localHandle) {
    collection = localHandle;
    lockForWorkAssignmentSplittingAndNewOperation = new ReentrantReadWriteLock();
    assignmentsLeftToProcess = new HashMap<>();
    availableAssignments = new ConcurrentLinkedQueue<>();
    assignedAssignments = new ConcurrentLinkedQueue<>();
    completedAssignments = new ConcurrentLinkedQueue<>();

    // One assignment for each LongRange of the local handle
    final Collection<LongRange> localRanges = localHandle.getAllRanges();
    totalAssignments = new AtomicInteger(localRanges.size());
    for (final LongRange source : localRanges) {
        final LongRange dedicatedCopy = new LongRange(source.from, source.to);
        availableAssignments.add(new DistColAssignment(dedicatedCopy, this));
    }
}
/**
 * Attempts to answer a lifeline by giving away some of the available
 * assignments, along with the matching entries of the underlying collection,
 * to the place that established the lifeline.
 * <p>
 * At most {@link #MAX_NUMBER_STOLEN_ASSIGNMENTS} assignments are taken from the
 * "available" queue. For each operation which still had work in the stolen
 * assignments, the local counter of assignments left to process is decremented
 * once the transfer has been submitted; if a counter reaches 0, local
 * completion of that operation is signaled.
 *
 * @param token lifeline token; its {@code place} member identifies the thief on
 *              entry and is overwritten with {@code here()} when some
 *              assignments are given away
 * @return {@code true} if some assignments were taken and an answer was
 *         initiated, {@code false} if no assignment was available
 */
@SuppressWarnings("rawtypes")
@Override
public boolean answerLifeline(final LifelineToken token) {
    final Place thief = token.place;

    // Obtain some assignments (capped) under the protection of the read lock. The
    // lock is released in a finally block so that it cannot leak on failure.
    final ArrayList<Assignment> stolen = new ArrayList<>();
    lockForWorkAssignmentSplittingAndNewOperation.readLock().lock();
    try {
        DistColAssignment a;
        while (stolen.size() < MAX_NUMBER_STOLEN_ASSIGNMENTS && (a = availableAssignments.poll()) != null) {
            stolen.add(a);
        }
        // Decrement the total number of assignments contained locally
        totalAssignments.getAndAdd(-stolen.size());
    } finally {
        lockForWorkAssignmentSplittingAndNewOperation.readLock().unlock();
    }

    if (stolen.isEmpty()) {
        // No assignment could be taken, there is nothing more to do
        return false;
    }

    // From here onward, we know that some assignments were taken from the reserve.
    // We flip the place to "here" as the token will be used to indicate to the
    // thief that this place is the one making the answer.
    token.place = here();

    /*
     * Detect which operations are contained in the stolen assignments to determine
     * under which finish the lifeline answer needs to be made. By the same
     * occasion, count how many assignments with work left were taken away for each
     * operation. This allows us to decrement the number of assignments left to
     * complete after the assignment transfer has been submitted.
     */
    final HashMap<GlbOperation, Integer> numbers = new HashMap<>();
    for (final Assignment s : stolen) {
        final DistColAssignment assignment = (DistColAssignment) s;
        final long upperBound = assignment.range.to;
        for (final Map.Entry<GlbOperation, Long> entry : assignment.progress.entrySet()) {
            if (entry.getValue() < upperBound) { // The operation has work left
                numbers.merge(entry.getKey(), 1, Integer::sum);
            }
        }
    }

    // Prepare the array of enclosing finishes
    final Set<GlbOperation> operations = numbers.keySet();
    final HashMap<GlbOperation, Finish> finishes = new HashMap<>();
    final Finish[] finishArray = new Finish[operations.size()];
    final GlbComputer glb = GlbComputer.getComputer();
    int fidx = 0;
    for (final GlbOperation op : operations) {
        final Finish f = glb.finishes.get(op);
        finishes.put(op, f);
        finishArray[fidx++] = f;
    }

    /*
     * Make the asynchronous answer to the target place. The entries of the
     * collection and the assignments are submitted to a one-sided move manager;
     * the job run after the transfer hands the assignments to the GlbComputer
     * through its lifelineAnswer method.
     */
    final CustomOneSidedMoveManager m = new CustomOneSidedMoveManager(thief);
    // Submit all the elements of the collection that need to be moved
    for (final Assignment s : stolen) {
        final DistColAssignment assignment = (DistColAssignment) s;
        collection.moveRangeAtSync(assignment.range, thief, m);
    }

    // NOTE(review): a failure of the transfer is only reported on System.err; the
    // counters below are still decremented as if the transfer had succeeded.
    switch (answerMode) {
    case MPI:
        try {
            m.asyncSendAndDoWithMPI(
                    () -> GlbComputer.getComputer().lifelineAnswer(token, stolen, numbers, finishes), finishArray);
        } catch (final Exception e) {
            System.err.println("Error while trying to transfer work");
            e.printStackTrace();
        }
        break;
    case KRYO:
    default:
        try {
            m.asyncSendAndDoNoMPI(() -> GlbComputer.getComputer().lifelineAnswer(token, stolen, numbers, finishes),
                    finishArray);
        } catch (final IOException e) {
            System.err.println("Error while trying to transfer work");
            e.printStackTrace();
        }
    }

    // The entries of the distributed collection have been submitted for transfer,
    // as well as all the assignments. We decrement the numbers of remaining
    // assignments to process as if they had been completed locally.
    for (final Map.Entry<GlbOperation, Integer> entry : numbers.entrySet()) {
        final GlbOperation operation = entry.getKey();
        final Integer removedAssignments = entry.getValue();
        final AtomicInteger remainder = assignmentsLeftToProcess.get(operation);
        if (remainder.addAndGet(-removedAssignments) == 0) { // Decrements the counter
            // All assignments for this operation have completed locally
            GlbComputer.getComputer().signalLocalOperationCompletion(operation);
        }
    }
    return true;
}
/**
 * Hands an available assignment over to a worker.
 * <p>
 * The assignment is taken from the "available" queue and recorded in the
 * "assigned" queue while holding the read lock, which is released in a
 * {@code finally} block.
 *
 * @return the assignment given to the worker, or {@code null} if no assignment
 *         is currently available
 */
@Override
public Assignment assignWorkToWorker() {
    lockForWorkAssignmentSplittingAndNewOperation.readLock().lock();
    try {
        final DistColAssignment a = availableAssignments.poll();
        if (a != null) {
            assignedAssignments.add(a);
        }
        return a;
    } finally {
        lockForWorkAssignmentSplittingAndNewOperation.readLock().unlock();
    }
}
/**
 * Merges the given assignments into this GlbTask. This method is called by a
 * lifeline answer after the instances on which the assignments operate have
 * been integrated into the underlying {@link DistCol}.
 * <p>
 * Every assignment received is bound to this instance (see
 * {@link DistColAssignment#setParent(DistColGlbTask)}) and placed in the
 * "available" queue.
 *
 * @param quantities  the number of assignments which have work for each glb
 *                    operation entered as a key in this map; each operation is
 *                    expected to already be registered in this instance
 * @param assignments the assignments that were stolen; they are expected to be
 *                    {@link DistColAssignment} instances
 */
@Override
@SuppressWarnings("rawtypes")
public void mergeAssignments(HashMap<GlbOperation, Integer> quantities, ArrayList<Assignment> assignments) {
    // As the first part and before placing assignments into the queues, we
    // increment the counters for the number of assignments left to process for
    // each operation.
    for (final Map.Entry<GlbOperation, Integer> entry : quantities.entrySet()) {
        final AtomicInteger i = assignmentsLeftToProcess.get(entry.getKey());
        assertNotNull(i);
        i.addAndGet(entry.getValue());
    }

    // We need to increment the counter for the number of assignments contained
    // locally, as well as placing all the assignments in the
    // "availableAssignments" queue. This needs to be done under STRONG protection:
    // we use the write lock. It is released in a finally block so that a failure
    // (e.g. an assignment of an unexpected type) cannot leave it held forever.
    lockForWorkAssignmentSplittingAndNewOperation.writeLock().lock();
    try {
        // Increment the counter for the total number of assignments handled by this
        // instance
        totalAssignments.addAndGet(assignments.size());
        // Add all assignments to the "availableAssignments" collection
        for (final Assignment a : assignments) {
            final DistColAssignment dca = (DistColAssignment) a; // Cast to the proper type
            dca.setParent(this); // From now on, "this" DistColGlbTask is handling the assignment
            availableAssignments.add(dca);
        }
    } finally {
        // The critical step has ended, we release the write lock
        lockForWorkAssignmentSplittingAndNewOperation.writeLock().unlock();
    }
}
/**
 * Initializes the progress tracking in every assignment contained in this local
 * instance for the provided operation.
 * <p>
 * This method holds the write lock of this instance for its whole duration to
 * be protected against concurrent calls to
 * <ul>
 * <li>{@link #assignWorkToWorker()}
 * <li>the {@link DistColAssignment} method used to split assignments
 * </ul>
 * The lock is released in a {@code finally} block: if the consistency check on
 * the number of assignments fails, the error is propagated to the caller
 * instead of leaving the write lock held.
 *
 * @param op the new operation available for processing
 * @return true if some new work is available on the local host as a result of
 *         this new operation. A case where this method would return false is if
 *         there were no elements in the local handle of {@link DistCol}.
 */
@Override
public boolean newOperation(@SuppressWarnings("rawtypes") GlbOperation op) {
    lockForWorkAssignmentSplittingAndNewOperation.writeLock().lock();
    try {
        // The total number of assignments cannot change while the write lock is
        // held: it is read once and used both as the initial value of the counter
        // and as the expected number of assignments to prepare.
        final int expected = totalAssignments.get();

        // Counter of the assignments left to process for this operation. It is used
        // in #operationTerminatedOnAssignment(GlbOperation) to detect when the
        // operation has been completed on every assignment.
        assignmentsLeftToProcess.put(op, new AtomicInteger(expected));

        // Add a progress tracker for each assignment contained locally
        int prepared = 0;
        for (final DistColAssignment a : availableAssignments) {
            a.progress.put(op, a.range.from);
            prepared++;
        }
        for (final DistColAssignment a : assignedAssignments) {
            a.progress.put(op, a.range.from);
            prepared++;
        }
        for (final DistColAssignment a : completedAssignments) {
            a.progress.put(op, a.range.from);
            prepared++;
        }

        // The number of assignments prepared should be the same as the number of
        // assignments known to be held by this instance
        assertEquals(expected, prepared);

        // The formerly completed assignments now have work in them
        availableAssignments.addAll(completedAssignments);
        completedAssignments.clear();

        return prepared > 0;
    } finally {
        lockForWorkAssignmentSplittingAndNewOperation.writeLock().unlock();
    }
}
/**
 * Signals that the specified operation has been completed for one of the
 * assignments. This method is called by a worker thread from method
 * {@link DistColAssignment#process(int, WorkerService, GlbOperation)} as it was
 * processing the assignment.
 * <p>
 * The counter of assignments left to process for the operation is decremented;
 * when it reaches 0, local completion of the operation is signaled to the
 * {@link GlbComputer}.
 *
 * @param op operation on which an assignment has completed
 */
void operationTerminatedOnAssignment(@SuppressWarnings("rawtypes") GlbOperation op) {
    final int assignmentsStillPending = assignmentsLeftToProcess.get(op).decrementAndGet();
    if (assignmentsStillPending != 0) {
        return;
    }
    GlbComputer.getComputer().signalLocalOperationCompletion(op);
}
}