GlbComputer.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections.glb;
import static apgas.Constructs.*;
import static apgas.ExtendedConstructs.*;
import static handist.collections.glb.GlobalLoadBalancer.*;
import static org.junit.Assert.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import apgas.GlobalRuntime;
import apgas.Place;
import apgas.impl.Finish;
import apgas.util.PlaceLocalObject;
import handist.collections.Bag;
import handist.collections.dist.DistLog;
import handist.collections.dist.DistributedCollection;
import handist.collections.dist.TeamedPlaceGroup;
import handist.collections.glb.lifeline.Lifeline;
import handist.collections.glb.lifeline.LifelineFactory;
import handist.collections.glb.lifeline.Loop;
/**
* Distributed object in charge of handling the GLB runtime and the work
* stealing between hosts.
*
* @author Patrick Finnerty
*
*/
class GlbComputer extends PlaceLocalObject {
/**
 * Token materializing the request of a host which wants to obtain some work
 * from another host. It pairs the distributed collection targeted by the steal
 * with the place involved in the exchange.
 *
 * @author Patrick Finnerty
 *
 */
final static class LifelineToken implements Serializable {
    /** Serial Version UID */
    private static final long serialVersionUID = 1445594155772627013L;

    /** Distributed collection from which assignments are desired */
    @SuppressWarnings("rawtypes")
    DistributedCollection collection;

    /** Place performing the steal / answering a lifeline steal */
    Place place;

    /**
     * Builds a token for the specified collection and place.
     *
     * @param c distributed collection from which work is desired
     * @param p place which is trying to steal some work
     */
    private LifelineToken(@SuppressWarnings("rawtypes") DistributedCollection c, Place p) {
        this.collection = c;
        this.place = p;
    }

    /**
     * Textual form of the token: {@code LifelineToken[<place>/<collection>]}.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("LifelineToken[");
        text.append(place).append("/").append(collection).append("]");
        return text.toString();
    }
}
/**
 * Per-worker record holding the identity, the counters and the bound objects of
 * a single worker. The enclosing GlbComputer creates one instance for each
 * worker that may concurrently run on the host.
 *
 * @author Patrick Finnerty
 *
 */
final class WorkerInfo implements WorkerService {
    /**
     * Counts the number of times this worker split its assignment to increase
     * parallelism on the host
     */
    int assignmentSplit;

    /**
     * Counts the number of times this worker tried to split its assignment but
     * failed to do so as the assignment it held was too small
     */
    int assignmentUnabledToSplit;

    /** Current operation being processed by this worker */
    @SuppressWarnings("rawtypes")
    GlbOperation currentOperation;

    /** Unique integer identifier */
    final int id;

    /** Number of times the worker made an answer to a remote thief. */
    public int lifelineAnswer;

    /**
     * Number of times the worker attempted to make an answer to a lifeline thief
     * but this attempt failed because no assignments were available on the local
     * host.
     */
    public int lifelineCannotAnswer;

    /**
     * Time (in nanoseconds) spent working by this worker. The time spent yielding
     * is not included.
     */
    public long timeWorking;

    /** Time spent yielding to other tasks (not counted in {@link #timeWorking}) */
    public long timeYielding;

    /**
     * Counts the number of times this worker successfully takes an assignment from
     * the reserve for itself
     */
    int tookFromReserve;

    /**
     * Objects that need to remain bound to this worker while an operation is being
     * processed, indexed by the key under which they were attached.
     */
    final Map<Object, Object> workerBoundObjects = new ConcurrentHashMap<>();

    /** Counts the number of times this worker was spawned */
    int workerSpawned;

    /**
     * Constructor
     *
     * @param workerId unique identifier of the worker this instance describes
     */
    public WorkerInfo(int workerId) {
        this.id = workerId;
    }

    /** Binds object {@code o} to this worker under the specified key. */
    @Override
    public void attachOperationObject(Object key, Object o) {
        workerBoundObjects.put(key, o);
    }

    /** Returns the object bound to this worker under the specified key, if any. */
    @Override
    public Object retrieveOperationObject(Object key) {
        return workerBoundObjects.get(key);
    }

    /**
     * Records a throwable against the operation currently processed by this
     * worker. The record is kept in the {@code operationErrors} map of the local
     * GlbComputer, which is locked for the duration of the insertion.
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void throwableInOperation(Throwable t) {
        final Map<GlbOperation, ArrayList<Throwable>> errorsByOperation = getComputer().operationErrors;
        synchronized (errorsByOperation) {
            final GlbOperation failing = currentOperation;
            ArrayList<Throwable> recorded = errorsByOperation.get(failing);
            if (recorded == null) {
                // First error for this operation: create its list
                recorded = new ArrayList<>();
                errorsByOperation.put(failing, recorded);
            }
            recorded.add(t);
        }
    }
}
/**
 * Class used on each host to hold the work from which workers and remote steals
 * take from.
 * <p>
 * Internally it manages the (multiple) {@link GlbTask} instances that are being
 * processed on the host. Workers obtain pieces of work through an instance of
 * this class.
 *
 * @author Patrick Finnerty
 *
 */
final class WorkReserve {
    /**
     * Lock used to guarantee availability of {@link WorkerInfo} instances inside
     * the {@link GlbComputer#idleWorkers} collection when they fail to take some
     * work from this instance.
     */
    final ReadWriteLock lock;

    /**
     * Map from distributed collection to their respective GlbTask instance. This
     * map is used to keep track of distributed collections that may already have an
     * operation in progress. If that is the case and an additional operation comes
     * along, this operation will need to be added to the existing {@link GlbTask}
     * of this distributed collection.
     * <p>
     * Insertions are made under this object's monitor (see
     * {@link #newOperation(GlbOperation)}) but the map is also read by workers and
     * lifeline answers without holding that monitor, hence the use of a concurrent
     * implementation.
     */
    Map<Object, GlbTask> allTasks;

    /**
     * Map which contains the operation with work left as keys and the GlbTask in
     * charge of handling the corresponding assignments as values. The mapping of an
     * operation is removed when all its local assignments are completed. This map
     * is traversed when workers try to acquire a new assignment from the reserve in
     * method {@link #getAssignment(WorkerInfo)}.
     */
    @SuppressWarnings("rawtypes")
    ConcurrentSkipListMap<GlbOperation, GlbTask> tasksWithWork;

    /**
     * Constructor
     * <p>
     * Prepares the members of WorkReserve to receive the various GLB operations
     */
    WorkReserve() {
        lock = new ReentrantReadWriteLock();
        // Read outside of any synchronized block by workers / lifeline answers
        allTasks = new ConcurrentHashMap<>();
        tasksWithWork = new ConcurrentSkipListMap<>();
    }

    /**
     * Checks the GlbTask of the current host to provide some work to a worker.
     * <p>
     * If at the time this method is called no work could be selected, the worker
     * info is placed back into the idle workers collection, every worker is asked
     * to feed the reserve, and null is returned.
     * <p>
     * This implementation traverses a {@link ConcurrentSkipListMap} keyed by
     * {@link GlbOperation}, so tasks are visited following the ordering of their
     * operation.
     *
     * @param wInfo the worker info instance which will be placed back into the
     *              idle workers collection if calling this method did not result
     *              in an Assignment being given to the worker
     * @return an assignment obtained from one of the tasks with work, or null if
     *         no work could be obtained
     */
    Assignment getAssignment(WorkerInfo wInfo) {
        lock.readLock().lock();
        try {
            for (final GlbTask t : tasksWithWork.values()) {
                final Assignment a = t.assignWorkToWorker();
                if (a != null) {
                    return a;
                }
            }
            // Unable to get an assignment for the worker. The worker is made idle
            // again while the read lock is still held so that a thread acquiring the
            // write lock afterwards is guaranteed to find it in #idleWorkers.
            idleWorkers.add(wInfo);
        } finally {
            // Always release the lock, even if a GlbTask throws: a leaked read lock
            // would block every later acquisition of the write lock.
            lock.readLock().unlock();
        }
        reserveWasEmptied();
        return null;
    }

    /**
     * Registers a new operation in the local reserve
     *
     * @param op operation newly available to workers
     * @return true if some work was actually made available to workers as a result
     *         of this operation
     */
    @SuppressWarnings("rawtypes")
    public synchronized boolean newOperation(GlbOperation op) {
        // Retrieve the GlbTask of the underlying collection, creating it if this is
        // the first operation on that collection
        GlbTask toPlaceInAvailable = allTasks.get(op.collection);
        if (toPlaceInAvailable == null) {
            toPlaceInAvailable = (GlbTask) op.initializerOfGlbTask.get();
            allTasks.put(op.collection, toPlaceInAvailable);
        }

        final boolean workCreated = toPlaceInAvailable.newOperation(op);
        if (workCreated) {
            // The same GlbTask may already be registered under another operation of
            // the same collection; each operation gets its own entry.
            tasksWithWork.put(op, toPlaceInAvailable);
        }
        return workCreated;
    }

    /**
     * Discards all tracking information kept until that point. Can only be called
     * safely if there are no ongoing GLB computation.
     *
     * @see GlobalLoadBalancer#reset()
     */
    public void reset() {
        allTasks.clear();
        tasksWithWork.clear();
    }
}
/** Singleton, local handle instance */
private static GlbComputer computer = null;
/**
* Code used in member {@link #lifelineEstablished} to represent the fact that
* this place had a lifeline established with the remote host
*
* @see #establishingLifelineOnRemoteHost(DistributedCollection)
*/
private static final int LIFELINE_ESTABLISHED = 1;
/**
* Code used in member {@link #lifelineEstablished} to represent the fact that
* this place has not established a lifeline with a remote host
*
* @see #establishingLifelineOnRemoteHost(DistributedCollection)
*/
private static final int LIFELINE_NOT_ESTABLISHED = 0;
/**
* Setting describing if tracing is activated. If so, a number of output
* messages are made to {@link System#err} as GLB events occur.
*/
static boolean TRACE = Boolean.parseBoolean(System.getProperty(Config.ACTIVATE_TRACE, "false"));
/**
 * Clears the GlbComputer singleton reference on every place of the world
 * group. As {@link #initializeComputer(DistLog)} refuses to run while a
 * previous instance is still registered, this method needs to be called before
 * a new GlbComputer can be initialized.
 */
static void destroyGlbComputer() {
    final TeamedPlaceGroup world = TeamedPlaceGroup.getWorld();
    world.broadcastFlat(() -> {
        GlbComputer.computer = null;
    });
}
/**
 * Method returning the local singleton for GlbComputer.
 * <p>
 * The singleton is set by {@link #initializeComputer(DistLog)} and cleared by
 * {@link #destroyGlbComputer()}. This method never returns null: if no instance
 * is currently registered on this place, an {@link AssertionError} is thrown.
 *
 * @return GlbComputer local handle
 * @throws AssertionError if no GlbComputer is currently registered locally
 */
static GlbComputer getComputer() {
    // Read the static member once so that the instance which was checked is the
    // one returned, even if the singleton is concurrently destroyed.
    final GlbComputer local = computer;
    if (local == null) {
        throw new AssertionError(
                "GlbComputer is not initialized on this place, initializeComputer(DistLog) must be called first");
    }
    return local;
}
/**
 * Creates a GlbComputer on every place of the world group and registers each
 * of them as the local singleton of its place. Any previous instance must have
 * been destroyed before calling this method.
 *
 * @param log the logger instance into which all the events that occur as part
 *            of the GLB program execution will be recorded
 * @return the GlbComputer instance created locally
 * @throws IllegalStateException on a place where a previous GlbComputer is
 *                               still registered
 */
static GlbComputer initializeComputer(DistLog log) {
    return PlaceLocalObject.make(TeamedPlaceGroup.getWorld().places(), () -> {
        final GlbComputer created = new GlbComputer(log);
        if (computer != null) {
            throw new IllegalStateException("Previous GlbComputer was not destroyed on " + here());
        }
        // The static member is assigned as part of the place-local initialization,
        // which avoids the need for a second dedicated finish/asyncAt block
        GlbComputer.computer = created;
        if (TRACE) {
            System.err.println("GlbComputer on " + here() + " is " + created);
        }
        return created;
    });
}
/**
* Map associating every GlbOperation with the semaphore used to block the
* thread representing the presence of work for this operation.
* <p>
* This member needs to be accessed through a synchronized block as it is not
* protected against concurrent modifications. As it is not accessed often, the
* use of a concurrent data structure is not warranted.
*/
@SuppressWarnings("rawtypes")
private final Map<GlbOperation, OperationBlocker> blockers;
/**
* Atomic array used to signal to workers that they need to place some work back
* into the {@link #reserve}. Each worker is identified with a unique index.
* They use this identifier to access this array which is initialized with an
* initial length of {@link #MAX_WORKERS}.
* <p>
* The value used in this array are:
* <ul>
* <li>0: the worker can move on in its {@link #worker(Integer, Assignment)}
* procedure's main loop, the {@link #reserve} does not need to be fed with work
* <li>1: the worker should feed the {@link #reserve} in its main loop before
* setting its value back to 0.
* </ul>
*/
private final AtomicIntegerArray feedReserveRequested;
/**
* Map associating every GlbOperation with the finish instance in which they are
* being executed
*/
/*
* This member needs to be protected against concurrent modifications as there
* is a risk that multiple starting operations may try to insert their "finish"
* value inside this collection concurrently. While we could use synchronized
* blocks for all accesses made to this object, it would be error-prone as
* workers making lifeline answer may access this object at any time from
* outside this object. A concurrent data collection was therefore preferred.
*/
@SuppressWarnings("rawtypes")
final ConcurrentHashMap<GlbOperation, Finish> finishes;
/**
* Number of elements processed by workers in one gulp before they check the
* runtime
*/
volatile int granularity;
/**
* Concurrent linked list which contains the inactive workers that may run on
* the host if they are given an assignment.
*/
private final ConcurrentLinkedDeque<WorkerInfo> idleWorkers;
/**
* Collection used to keep track of the lifelines that are established on remote
* hosts
*/
@SuppressWarnings("rawtypes")
ConcurrentHashMap<DistributedCollection, ConcurrentHashMap<Place, AtomicInteger>> lifelineEstablished;
/**
* Lifeline requests for work coming from remote places in the system.
* <p>
* A lifeline request consists in a token which contains the distributed
* collection which is the target of the steal and the Place which requires the
* work. Refer to {@link LifelineToken}.
*/
ConcurrentLinkedQueue<LifelineToken> lifelineThieves;
/**
* Logger for the events occurring on this host
*/
DistLog logger;
/**
* Maximum number of workers that can concurrently run on the local host
*/
final int MAX_WORKERS;
/**
* Member used for book-keeping of the errors that occur during GLB operations.
*/
@SuppressWarnings("rawtypes")
final transient Map<GlbOperation, ArrayList<Throwable>> operationErrors;
/** Fork Join Pool used by the APGAS runtime */
final ForkJoinPool POOL;
/**
* Instance in which the workers on a host take / place work back into
*/
WorkReserve reserve;
/**
* Collection of Locks which contains the locks that are available for workers
* to pick up to actively yield to other activities.
*/
private final ConcurrentLinkedQueue<TimeoutBlocker> workerAvailableLocks;
/**
* Array containing all the workers initialized on the host. Contrary to
* {@link #idleWorkers}, the contents of this array never change. This array is
* used to access all the workers irrespective of if they are running or idle.
* <p>
* One particular case we need to access all workers is when an operation needs
* to attach an object to each worker which is needed when this operation is
* processed by the workers.
*/
final WorkerInfo[] workers;
/**
* Lock used to force workers to yield execution to allow other activities to
* run
*/
private final TimeoutBlocker workerYieldLock;
/**
 * Private constructor
 * <p>
 * GlbComputer follows a singleton design pattern, instances are only created
 * through {@link #initializeComputer(DistLog)}.
 *
 * @param log the logger instance into which the events that occur on this host
 *            will be recorded throughout the execution
 */
private GlbComputer(DistLog log) {
    // Constants which depend on the runtime environment
    MAX_WORKERS = Config.getMaximumConcurrentWorkers();
    POOL = (ForkJoinPool) GlobalRuntime.getRuntime().getExecutorService();

    // A single lock is made available to workers to actively yield to other
    // activities (lifeline steals / other non-GLB activities)
    workerYieldLock = new TimeoutBlocker();
    workerAvailableLocks = new ConcurrentLinkedQueue<>();
    workerAvailableLocks.add(workerYieldLock);

    // One WorkerInfo per potential worker, all of them idle at first
    workers = new WorkerInfo[MAX_WORKERS];
    idleWorkers = new ConcurrentLinkedDeque<>();
    for (int id = 0; id < workers.length; id++) {
        workers[id] = new WorkerInfo(id);
        idleWorkers.add(workers[id]);
    }

    // Flags used to signal to workers that the #reserve needs to be fed
    feedReserveRequested = new AtomicIntegerArray(MAX_WORKERS);

    // Initial value for the granularity
    granularity = Config.getGranularity();

    // Reserve of assignments for this host
    reserve = new WorkReserve();

    // Book-keeping of the operations: errors, finishes and blockers
    operationErrors = new HashMap<>();
    finishes = new ConcurrentHashMap<>();
    blockers = new HashMap<>();

    // Data structures used to keep track of the lifelines
    lifelineThieves = new ConcurrentLinkedQueue<>();
    lifelineEstablished = new ConcurrentHashMap<>();

    // Logger of this local host, with the first two events of the GLB
    logger = log;
    final String workerCount = Integer.toString(MAX_WORKERS);
    logger.put(LOGKEY_GLB, LOG_INITIALIZED_WORKERS, workerCount);
    final String initializationStamp = Long.toString(System.nanoTime());
    logger.put(LOGKEY_GLB, LOG_INITIALIZED_AT_NANOTIME, initializationStamp);
}
/**
 * Tries to launch one additional (uncounted) worker on this host.
 * <p>
 * A worker is only spawned if both an idle {@link WorkerInfo} and an assignment
 * from the {@link #reserve} could be obtained. Nothing happens if the maximum
 * number of workers is already running (no idle worker left), or if the reserve
 * could not provide any assignment, for instance because the running workers
 * already took everything.
 */
private void attemptToSpawnWorker() {
    final WorkerInfo candidate = idleWorkers.poll();
    if (candidate == null) {
        // Every worker is already running
        return;
    }

    final Assignment initialWork = reserve.getAssignment(candidate);
    if (initialWork == null) {
        // Nothing to do here: when it fails to deliver an assignment, getAssignment
        // places the worker back into #idleWorkers and signals to the workers that
        // the reserve was emptied.
        return;
    }

    if (TRACE) {
        System.err.println(here() + " spawned worker(" + candidate.id + ")");
    }
    uncountedAsyncAt(here(), () -> worker(candidate, initialWork));
}
/**
 * Establishes the lifelines of this host for the specified collection on the
 * places for which no lifeline is currently established.
 * <p>
 * Lifelines are tracked on a "per collection" basis. It is therefore possible
 * that a call to this method does not establish any new lifeline, which is the
 * case when all the lifelines of the collection were already established by a
 * previous call.
 *
 * @param c the collection on which the operation that has terminated operated
 */
void establishingLifelineOnRemoteHost(@SuppressWarnings("rawtypes") DistributedCollection c) {
    final LifelineToken request = new LifelineToken(c, here());
    final ConcurrentHashMap<Place, AtomicInteger> statusByPlace = lifelineEstablished.get(c);

    for (final Map.Entry<Place, AtomicInteger> lifeline : statusByPlace.entrySet()) {
        // The flag is flipped atomically: only the thread which succeeds is allowed
        // to establish the lifeline, which prevents redundant requests
        final boolean claimed = lifeline.getValue().compareAndSet(LIFELINE_NOT_ESTABLISHED, LIFELINE_ESTABLISHED);
        if (!claimed) {
            continue;
        }

        // Asynchronous (uncounted) registration of the request on the remote host
        uncountedAsyncAt(lifeline.getKey(), () -> {
            try {
                if (TRACE) {
                    System.err.println(request.place + " established lifeline on " + here() + " for collection "
                            + request.collection);
                }
                lifelineThieves.add(request);
            } catch (final Exception e) {
                e.printStackTrace();
            }
        });
    }
}
/**
* Procedure called by a remote host when giving some work as part of a lifeline
* answer.
* <p>
* The steps performed are, in order: resetting the lifeline state kept for the
* answering place, initializing the operations which are not yet known to this
* host, merging the received assignments into the GlbTask of the collection,
* and finally launching one {@link #operationActivity(GlbOperation)} per
* received operation under the finish of that operation.
*
* @param token token containing the information about the collection and the
* place making the answer
* @param stolen collection of assignments that will now be under this host
* responsibility
* @param numbers the number of assignments with work that are given for each
* operation that the assignments may contain
* @param finish the finish instance of each operation contained in the answer;
* used to initialize the operations which were not yet registered in
* {@link #finishes} on this host
*/
void lifelineAnswer(final LifelineToken token, final ArrayList<Assignment> stolen,
@SuppressWarnings("rawtypes") final HashMap<GlbOperation, Integer> numbers,
@SuppressWarnings("rawtypes") final HashMap<GlbOperation, Finish> finish) {
if (TRACE) {
System.err.println(here() + " received " + stolen.size() + " assignments from " + token.place);
}
// The lifeline answer has just been received, we set the lifeline tracker back
// to "not established"
final ConcurrentHashMap<Place, AtomicInteger> lifelinesForCollection = lifelineEstablished
.get(token.collection);
final AtomicInteger state = lifelinesForCollection.get(token.place);
final boolean resetLifeline = state.compareAndSet(LIFELINE_ESTABLISHED, LIFELINE_NOT_ESTABLISHED);
assertTrue(resetLifeline); // Check that the previous operation worked properly
// Verification that the operations contained by the lifeline answer have been
// initialized on this host.
/*
* Sometimes a lifeline answer will bring assignments that have the progress
* tracking mechanisms for a newly available operation whose #newOperation call
* was made on the remote host but it has not been done yet on this host. In
* this case, this preparation needs to be done before merging any assignments
* coming from the lifeline answer
*/
// NOTE(review): the GlbTask is retrieved before the preparation loop below; this
// assumes a GlbTask already exists for token.collection on this host - confirm
final GlbTask glbTask = reserve.allTasks.get(token.collection);
// Same monitor as the one used in #newOperation, so that an operation is
// prepared only once on this host
synchronized (this) {
for (@SuppressWarnings("rawtypes")
final GlbOperation op : numbers.keySet()) {
if (null == finishes.get(op)) {
// this lifeline answer is the first activity that brings this new operation to
// the place.
prepareForNewOperation(op, finish.get(op));
}
}
}
// Merge the assignments
glbTask.mergeAssignments(numbers, stolen);
/*
* I know, this is weird. But allow me to explain. This guarantees that workers
* which may have failed to take some work from the reserve (before assignments
* from this lifeline answer were merged into it) have had their WorkerInfo
* instance placed back into the #idleWorkers collection. See method
* WorkReserve#getAssignment.
*
* In the rare (but not impossible) case where all the workers fail to get work
* from the reserve just before this lifeline answer merges new assignments,
* this guarantees that the calls to #attemptToSpawnWorker that are made inside
* method #operationActivity will find these WorkerInfo instances and be able to
* spawn them.
*/
reserve.lock.writeLock().lock();
reserve.lock.writeLock().unlock();
// For each operation that was transmitted, place an asynchronous task that will
// wait till operation termination
for (@SuppressWarnings("rawtypes")
final GlbOperation op : numbers.keySet()) {
final Finish f = finishes.get(op);
// Make the operation visible to workers looking for assignments
reserve.tasksWithWork.put(op, glbTask);
asyncArbitraryFinish(here(), () -> operationActivity(op), f);
}
}
/**
 * Registers a new operation as available to workers on this host.
 * <p>
 * If the registration made some work available locally, the calling thread
 * goes through {@link #operationActivity(GlbOperation)}, which launches a
 * worker if possible and blocks until the operation completes locally. If no
 * work was created, lifelines are established right away. If the operation was
 * already registered on this host (by a lifeline answer), this method returns
 * immediately.
 *
 * @param op the operation newly available to process
 */
@SuppressWarnings("rawtypes")
void newOperation(GlbOperation op) {
    // The tracking of the outgoing lifelines is created the first time an
    // operation is run on a given distributed collection
    lifelineEstablished.computeIfAbsent(op.collection, col -> {
        Lifeline strategy;
        try {
            if (op.lifelineClass != null) {
                strategy = LifelineFactory.newLifeline(col.placeGroup(), op.lifelineClass);
            } else {
                strategy = LifelineFactory.newLifeline(col.placeGroup());
            }
        } catch (final Exception e) {
            e.printStackTrace();
            System.err.println("Faced a " + e + " when attempting to initialize a new lifeline strategy. Using "
                    + Loop.class.getName() + " instead");
            strategy = new Loop(col.placeGroup());
        }

        // Every lifeline of this place starts in the "not established" state
        final ConcurrentHashMap<Place, AtomicInteger> status = new ConcurrentHashMap<>();
        for (final Place neighbor : strategy.lifeline(here())) {
            status.put(neighbor, new AtomicInteger(LIFELINE_NOT_ESTABLISHED));
        }
        return status;
    });

    final boolean localWorkCreated;
    synchronized (this) {
        // Rare case: a lifeline answer already performed the initialization of this
        // operation, this thread is then not needed
        if (finishes.containsKey(op)) {
            return;
        }
        // Prepare the GlbTask of the relevant distributed collection
        localWorkCreated = prepareForNewOperation(op, currentFinish());
    }

    if (!localWorkCreated) {
        // The local handle does not hold any work for this operation: go on to
        // work stealing directly
        establishingLifelineOnRemoteHost(op.collection);
        return;
    }

    // Launch workers / block until local termination
    operationActivity(op);
}
/**
 * Main activity of an operation on this host. It consists of several steps:
 * <ol>
 * <li><em>Spawn phase</em> in which this activity attempts to launch a worker
 * and releases any worker that yielded its execution on the worker yield lock
 * <li><em>Computation phase</em> during which this activity blocks on the
 * {@link java.util.concurrent.ForkJoinPool.ManagedBlocker} of the operation
 * while workers perform the computation. Only one activity blocks for a given
 * operation; an activity which is not allowed to block returns immediately.
 * <li><em>Lifeline phase</em> in which this thread asynchronously signals the
 * neighboring places that it needs work.
 * </ol>
 * If this thread is interrupted while blocked, the event is reported on
 * {@link System#err}, the lifelines are still established, and the interrupt
 * status of the thread is restored before returning.
 *
 * @param op the operation which is being launched on this host
 */
void operationActivity(@SuppressWarnings("rawtypes") GlbOperation op) {
    attemptToSpawnWorker();

    // As this activity is going to block, it should unblock any potential worker
    // that yielded its execution to allow this activity to run
    workerYieldLock.unblock();

    boolean interrupted = false;
    try {
        // Take the blocker through which workers wake this activity up
        final OperationBlocker mb;
        synchronized (blockers) {
            mb = blockers.get(op);
        }

        if (!mb.allowedToBlock()) {
            // There is already another thread blocking on this blocker.
            // This thread was a lifeline answer and can return safely
            if (TRACE) {
                System.err.println(here() + ": thread already waiting on operation " + op);
            }
            return;
        }

        if (TRACE) {
            System.err.println(here() + ": thread waiting on operation " + op);
        }
        ForkJoinPool.managedBlock(mb);
        if (TRACE) {
            // Traces are consistently emitted on System.err
            System.err.println(here() + ": resumed after waiting on " + op);
        }
    } catch (final InterruptedException e) {
        System.err.println(
                "InterruptedException received in operation activity, this should not happen as the managed blocker implementation does not throw this error");
        e.printStackTrace();
        interrupted = true;
    }

    // The operation has completed. Moving on to establishing lifelines.
    establishingLifelineOnRemoteHost(op.collection);

    if (interrupted) {
        // Restore the interrupt status only once the lifelines are established so
        // that the callers of this method can still observe the interruption
        Thread.currentThread().interrupt();
    }
}
/**
 * Sub-routine shared by {@link #newOperation(GlbOperation)} and
 * {@link #lifelineAnswer(LifelineToken, ArrayList, HashMap, HashMap)} which
 * registers everything needed to process a new operation on this host: its
 * blocker, its finish, the worker-specific initialization (if any) and its
 * entry in the {@link #reserve}.
 *
 * @param op the new operation which is now available on the host
 * @param f  the finish which is tracking the completion of this operation
 * @return true if the initialization of the assignments on this host resulted
 *         in work being now available to workers
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean prepareForNewOperation(final GlbOperation op, Finish f) {
    // Members needed for termination management. The blockers map is not
    // protected against concurrent accesses, hence the synchronized block.
    final OperationBlocker blocker = new OperationBlocker();
    synchronized (blockers) {
        blockers.put(op, blocker);
    }
    finishes.put(op, f);

    // Special initialization of every worker, if the operation defines one
    if (op.workerInit != null) {
        for (int i = 0; i < workers.length; i++) {
            op.workerInit.accept(workers[i]);
        }
    }

    // Prepare the GlbTask of the relevant distributed collection
    return reserve.newOperation(op);
}
/**
 * Raises the "feed the reserve" flag of every worker, asking each of them to
 * place some work back into the {@link #reserve} if they can.
 * <p>
 * This procedure is called when a worker could not obtain an assignment from
 * the {@link #reserve}. It only sets flags to the same value and may therefore
 * be called by multiple threads concurrently without any adverse effect.
 */
/*
 * When several workers fail to get work in short succession, some workers may
 * be asked to feed the reserve again just after having done so. More
 * restrictive synchronization could avoid this but is not expected to bring any
 * actual benefit.
 */
private void reserveWasEmptied() {
    final int flagCount = MAX_WORKERS;
    int workerId = 0;
    while (workerId < flagCount) {
        feedReserveRequested.set(workerId, 1);
        workerId++;
    }
}
/**
 * Sub-routine called when all the local assignments of an operation have been
 * completed. The operation is removed from the tasks with work of the local
 * reserve and the thread blocked on this operation is released.
 *
 * @param op the operation whose assignments were entirely processed
 */
void signalLocalOperationCompletion(@SuppressWarnings("rawtypes") GlbOperation op) {
    // The operation no longer has any work to offer to the local workers
    reserve.tasksWithWork.remove(op);

    // The blockers map is not protected against concurrent modifications and a
    // concurrent registration of a new operation may insert a value into it; the
    // access is therefore made under the map's monitor.
    synchronized (blockers) {
        final OperationBlocker waitingActivity = blockers.get(op);
        waitingActivity.unblock();
    }
}
/**
 * Main routine of a worker in the distributed collections library global load
 * balancer.
 * <p>
 * In this load balancer, a worker is not bound to any single task. Instead, it
 * takes work from any available operation and continues running until it
 * completely runs out of work. Workers are spawned with an initial assignment.
 * The main steps of this procedure are:
 * <ol>
 * <li>Process a part of the work fragment the worker has, as defined by
 * {@link #granularity}
 * <li>Attempt to spawn a new parallel worker from the work presumably available
 * in the {@link #reserve}
 * <li>If feeding the reserve was requested of this worker and the current
 * assignment is splittable, split the assignment and clear the request
 * <li>If the pool has queued submissions and a blocker can be taken from the
 * pool of available blockers, yield the execution of this worker by blocking on
 * that blocker, then return the blocker to the pool
 * <li>If a lifeline thief is waiting, try to answer it; when it cannot be
 * answered, the thief is placed back into the collection of waiting thieves
 * </ol>
 * These steps are repeated in a loop until the Assignment held by the worker is
 * completed. When the worker exits that loop, it attempts to get a new
 * Assignment from the {@link #reserve}. If successful, repeat the procedure
 * from step 1. If unsuccessful, the worker stops.
 * <p>
 * Any {@link Throwable} raised while the worker runs is caught, reported on the
 * standard error output, and terminates the worker; it is not propagated to the
 * caller.
 *
 * @param worker information about the worker running this routine. Its
 *               identifier is used to check whether this worker was asked to
 *               feed the reserve, and its fields are updated to record the
 *               operation currently being progressed and to count the actions
 *               taken (assignment split, failed split, assignment taken from
 *               the reserve)
 * @param a      initial assignment to be processed by this worker
 */
private void worker(WorkerInfo worker, Assignment a) {
    logger.put(LOGKEY_WORKER, LOG_WORKER_STARTED, Long.toString(System.nanoTime()));
    try {
        for (;;) {
            worker.currentOperation = a.chooseOperationToProgress();
            while (a.process(granularity, worker, worker.currentOperation)) { // STEP 1: Work is done here
                // STEP 2: Attempt to spawn a new worker from work present in the reserve
                attemptToSpawnWorker();

                // STEP 3: Load Balance operations
                if (feedReserveRequested.get(worker.id) == 1) { // If feeding the reserve is requested
                    if (a.isSplittable(granularity)) {
                        a.splitIntoGlbTask();
                        // The request is only cleared once it was actually served
                        feedReserveRequested.set(worker.id, 0);
                        worker.assignmentSplit++; // Log the action
                    } else {
                        // The request remains pending and will be checked again on the next iteration
                        worker.assignmentUnabledToSplit++; // Log
                    }
                }

                // STEP 4: Yield if need be
                TimeoutBlocker l;
                if (POOL.hasQueuedSubmissions() && (l = workerAvailableLocks.poll()) != null) {
                    try {
                        l.reset();
                        logger.put(LOGKEY_WORKER, LOG_WORKER_YIELDING, Long.toString(System.nanoTime()));
                        ForkJoinPool.managedBlock(l);
                        logger.put(LOGKEY_WORKER, LOG_WORKER_RESUMED, Long.toString(System.nanoTime()));
                    } finally {
                        // The blocker is handed back even if the blocking call is interrupted
                        // or fails. Otherwise it would be lost for the remainder of the
                        // execution, reducing the number of workers allowed to yield.
                        workerAvailableLocks.add(l);
                    }
                }

                // STEP 5: Answer lifeline thieves if there are any and this host is capable of
                // doing so
                final LifelineToken steal = lifelineThieves.poll();
                if (steal != null) {
                    // Check if there are some assignments left for the target collection
                    GlbTask g;
                    // FIXME when using some "After" dependencies, it is possible for a lifeline
                    // request to be received at a time the corresponding operation has not yet gone
                    // through method #newOperation. In this case,
                    // reserve.allTasks.get(steal.collection) will return null. We need to account
                    // for that edge case where the lifeline answer is actually the first to bring
                    // the information to this place that a new operation is available for
                    // computation.
                    if ((g = reserve.allTasks.get(steal.collection)) != null && g.answerLifeline(steal)) {
                        logger.put(LOGKEY_WORKER, LOG_LIFELINE_ANSWERED, Long.toString(System.nanoTime()));
                    } else {
                        logger.put(LOGKEY_WORKER, LOG_LIFELINE_NOT_ANSWERED, Long.toString(System.nanoTime()));
                        // Could not answer: keep the thief for a later attempt
                        lifelineThieves.add(steal);
                    }
                }
            }

            // Assignment#process method returned false. This worker needs a new assignment.
            // Trying to obtain a new assignment determines if the worker continues or stops
            if ((a = reserve.getAssignment(worker)) == null) {
                // The reserve returned null, this worker will stop
                // FIXME potential problem here where the attempt at taking work from the
                // reserve and placing the worker info back into the idle worker collection
                // should be done atomically to protect against concurrent lifeline answers.
                // In the current state it is possible (though unlikely) for work to be merged
                // after workers have failed to take some, and for the activity which attempts
                // to spawn a worker to make this attempt BEFORE the worker's info was placed
                // back into the collection.
                workerYieldLock.unblock(); // As this worker quits, any waiting worker can resume
                // if (TRACE) {
                // System.err.println(here() + " worker(" + worker.id + ") stopped --- " + idleWorkers.size()
                // + " stopped workers");
                // }
                logger.put(LOGKEY_WORKER, LOG_WORKER_STOPPED, Long.toString(System.nanoTime()));
                return;
            } else {
                // This worker was able to take an assignment from the reserve. It is now
                // starting the for(;;) loop again.
                worker.tookFromReserve++;
            }
        }
    } catch (final Throwable t) {
        // Last-resort handler: the failure is reported and the worker terminates
        System.err.println("Worker number " + worker.id + " on " + here() + " sufferred a " + t);
        t.printStackTrace();
    }
}
}