GlobalLoadBalancer.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.glb;

import static apgas.Constructs.*;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ForkJoinPool;

import apgas.SerializableJob;
import handist.collections.dist.DistLog;
import handist.collections.dist.TeamedPlaceGroup;
import handist.collections.glb.GlbOperation.OperationCompletionManagedBlocker;
import handist.collections.glb.GlbOperation.State;

/**
 * Class presenting the static methods under which GLB operations can operate.
 * Programmers may choose to statically import this class to avoid the lengthy
 * {@code GlobalLoadBalancer#underGLB(SerializableJob)} call in their program.
 *
 * @author Patrick Finnerty
 *
 */
public class GlobalLoadBalancer {

    /**
     * Key used to gather the events related to the GLB system
     */
    public static final String LOGKEY_GLB = "glb_event";

    /**
     * Key used to gather the events related to the creation, interruption, and
     * termination of asynchronous worker activities
     */
    public static final String LOGKEY_WORKER = "glb_worker";

    /**
     * Message used to record the number of workers initialized on a host
     */
    public static final String LOG_INITIALIZED_WORKERS = "WorkerInitialized";

    /**
     * Message used to record the moment at which the GlbComputer instance is
     * initialized on a host
     */
    public static final String LOG_INITIALIZED_AT_NANOTIME = "InitializedAtTime";

    /**
     * Message used to signal that a worker has stopped
     */
    public static final String LOG_WORKER_STOPPED = "Worker Stopped";

    /**
     * Message used to record that a worker was unable to answer a lifeline
     */
    public static final String LOG_LIFELINE_NOT_ANSWERED = "Lifeline not answered";

    /**
     * Message used to record that a worker answered a lifeline
     */
    public static final String LOG_LIFELINE_ANSWERED = "Lifeline answered";

    /**
     * Message used to record that a worker resumed after yielding
     */
    public static final String LOG_WORKER_RESUMED = "Worker resumed";

    /**
     * Message used to record that a worker started yielding so that other
     * activities may run on the host
     */
    public static final String LOG_WORKER_YIELDING = "Worker yielding";

    /**
     * Message used to record that a worker started running
     */
    public static final String LOG_WORKER_STARTED = "Worker Started";

    /**
     * Key under which the start and end timestamps of a program executed through
     * {@link #underGLB(DistLog, SerializableJob)} are logged
     */
    public static final String LOGKEY_UNDER_GLB = "UnderGlb";

    /**
     * Message used to record the {@link System#nanoTime()} at which a GLB program
     * started
     */
    public static final String LOG_PROGRAM_STARTED = "ProgramStarted";

    /**
     * Message used to record the {@link System#nanoTime()} at which a GLB program
     * ended
     */
    public static final String LOG_PROGRAM_ENDED = "ProgramEnded";

    /**
     * Singleton. Non-null only while a program launched through
     * {@link #underGLB(DistLog, SerializableJob)} is running.
     */
    static GlobalLoadBalancer glb = null;

    /**
     * Member keeping the logger instance of the previously executed GLB program. It
     * can be retrieved after method {@link #underGLB(SerializableJob)} has
     * completed through method {@link #getPreviousLog()}.
     */
    static DistLog previousLog = null;

    /**
     * Returns the {@link DistLog} containing the information logged during the
     * previous {@link #underGLB(SerializableJob)} method call.
     * <p>
     * The logs are returned in their non-gathered state, meaning the entries are
     * still distributed across all hosts. Call method
     * {@link DistLog#globalGather()} on the returned {@link DistLog} instance to
     * gather all the logs on the caller host.
     *
     * @return the logger of the last GLB program, or {@code null} if no GLB
     *         program was launched yet
     */
    public static DistLog getPreviousLog() {
        return previousLog;
    }

    /**
     * Triggers the erasure of all tracking information on every host.
     * <p>
     * Experimental. Can only be called safely if there are no ongoing GLB
     * computation.
     */
    public static void reset() {
        TeamedPlaceGroup.getWorld().broadcastFlat(() -> {
            final GlbComputer computer = GlbComputer.getComputer();
            computer.reserve.reset();
        });
    }

    /**
     * Helper method used to launch the computations staged into the global load
     * balancer. Is not blocking.
     * <p>
     * You would only need to call this method if you have some tasks other than the
     * GLB computations that should be executed concurrently. In such a situation,
     * stage the various GLB operations by calling their
     * <em>collection.GLB.method()</em> and setup the potential
     * {@link DistFuture#after(DistFuture)} completion dependencies. Then, call this
     * method. In the remainder of the ongoing block, you lay out the other
     * activities that should run while the GLB computations previously staged are
     * being computed.
     * <p>
     * Must be called while a program launched through
     * {@link #underGLB(DistLog, SerializableJob)} is running, as it relies on the
     * singleton created by that method.
     */
    public static void start() {
        while (!glb.operationsStaged.isEmpty()) {
            final GlbOperation<?, ?, ?, ?, ?, ?> op = glb.operationsStaged.poll();
            boolean needsToBeLaunched;
            synchronized (op) {
                op.state = GlbOperation.State.RUNNING;
                // Operations that still have dependencies are not launched here;
                // presumably they are launched once the operations they depend on
                // complete (see GlbOperation) -- NOTE(review): confirm
                needsToBeLaunched = !op.hasDependencies();
            }

            if (needsToBeLaunched) {
                async(() -> op.compute());
            }
        }
    }

    /**
     * Helper method used to launch the operations submitted to the global load
     * balancer. In addition, this method will only return when the specified
     * operation completes (or when the waiting thread is interrupted, in which case
     * the interrupt status of the thread is restored).
     *
     * @param operation the operation whose global termination is waited upon
     */
    static void startAndWait(GlbOperation<?, ?, ?, ?, ?, ?> operation) {
        // Install a hook on the operation on which we are going to wait
        OperationCompletionManagedBlocker b = null;
        synchronized (operation) {
            if (operation.state != State.TERMINATED) {
                // Install a hook to "unblock" the blocker once the operation terminates
                b = new OperationCompletionManagedBlocker();
                final OperationCompletionManagedBlocker blocker = b; // final field for lambda expression
                operation.addHook(() -> blocker.unblock());
            }
        }

        start(); // Start all other operations that were submitted to the GLB

        if (b != null) { // b is different from null iff the operation has not yet terminated
            try {
                // Block until woken up by the installed hook
                ForkJoinPool.managedBlock(b);
            } catch (final InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt status so that the interruption is not lost
                // for the callers further up the stack
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Method to call to launch a GLB program. A single call to this method can be
     * made at a time. If a second call is made while a program is under way, this
     * second call fails with an {@link IllegalStateException}.
     * <p>
     * The GLB singletons are torn down when this method exits, even if the
     * program terminates with an exception that escapes the enclosing
     * {@code finish}, so that subsequent calls remain possible.
     *
     * @param logger  existing {@link DistLog} into which the logged events of the
     *                GLB will be kept
     * @param program program to launch under GLB
     * @return collection of all the Exceptions that occurred during the glb program
     * @throws IllegalStateException if another GLB program is already running
     */
    public static ArrayList<Exception> underGLB(DistLog logger, SerializableJob program) {
        if (GlobalLoadBalancer.glb != null) {
            throw new IllegalStateException("Method was called even though another glb program is already running");
        }

        // Logger instance for the entire GLB
        previousLog = logger;
        logger.put(LOGKEY_UNDER_GLB, LOG_PROGRAM_STARTED, Long.toString(System.nanoTime()));

        // Create a new GlobalLoadBalancer instance that will handle the program
        glb = new GlobalLoadBalancer();

        final ArrayList<Exception> exc = new ArrayList<>();
        try {
            // Also initialize the GlbComputer on all hosts before trying to submit anything
            GlbComputer.initializeComputer(logger);

            finish(() -> {
                try {
                    program.run();
                    start(); // This launches any submitted operation that have not been explicitly launched
                    // inside the provided program
                } catch (final Exception e) {
                    System.err.println("ERROR during GLB program execution");
                    e.printStackTrace();
                    exc.add(e);
                }
            });
        } finally {
            // Executed even if an exception escapes the finish (e.g. raised by an
            // asynchronous activity), otherwise the singleton would stay set and every
            // later call to this method would be rejected
            logger.put(LOGKEY_UNDER_GLB, LOG_PROGRAM_ENDED, Long.toString(System.nanoTime()));

            // Destroy the singletons for new ones to be created next time this method is
            // called
            glb = null;

            GlbComputer.destroyGlbComputer();
            // Also reset the priority for the future GlbOperations to be created
            GlbOperation.nextPriority = 0;
        }

        return exc;
    }

    /**
     * Method to call to launch a GLB program. A single call to this method can be
     * made at a time. If a second call is made while a program is under way, this
     * second call fails with an {@link IllegalStateException}.
     * <p>
     * A fresh {@link DistLog} is created to record the events of the program; it
     * can be retrieved afterwards through {@link #getPreviousLog()}.
     *
     * @param program program to launch under GLB
     * @return collection of all the Exceptions that occurred during the glb program
     * @throws IllegalStateException if another GLB program is already running
     */
    public static ArrayList<Exception> underGLB(SerializableJob program) {
        return underGLB(new DistLog(), program);
    }

    /**
     * Collection containing the operations that have been submitted to the GLB as
     * part of a program given to {@link #underGLB(SerializableJob)}.
     */
    @SuppressWarnings("rawtypes")
    private final LinkedList<GlbOperation> operationsStaged;

    /**
     * Private constructor to preserve the singleton pattern
     */
    private GlobalLoadBalancer() {
        operationsStaged = new LinkedList<>();
    }

    /**
     * Makes the global load balancer start the operation given as second argument
     * after the operation represented by the first argument has completed globally.
     *
     * @param before operation to terminate before the second argument can start
     * @param then   operation to start after the first argument has completed
     */
    void scheduleOperationAfter(GlbOperation<?, ?, ?, ?, ?, ?> before, GlbOperation<?, ?, ?, ?, ?, ?> then) {
        GlbOperation.makeDependency(before, then);
    }

    /**
     * Helper method used by the various Glb handles of the distributed collections
     * to submit an operation to the Glb. The operation is only staged; it is
     * launched by a later call to {@link #start()}.
     *
     * @param operation operation to perform on a distributed collection
     */
    void submit(@SuppressWarnings("rawtypes") GlbOperation operation) {
        operationsStaged.add(operation);
    }
}