GlobalLoadBalancer.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.glb;

import static apgas.Constructs.*;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ForkJoinPool;

import apgas.SerializableJob;
import handist.collections.glb.GlbOperation.OperationCompletionManagedBlocker;
import handist.collections.glb.GlbOperation.State;

/**
 * Class presenting the static method under which GLB operations can operate.
 * Programmers may choose to statically impoirt this class to avoid the lengthy
 * {@code GlobalLoadBalancer#underGLB(SerializableJob)} call in their program.
 *
 * @author Patrick Finnerty
 *
 */
public class GlobalLoadBalancer {
    /**
     * Singleton
     */
    static GlobalLoadBalancer glb = null;

    /**
     * Helper method used to launch the computations submitted to the global load
     * balancer.
     */
    static void start() {
        while (!glb.operationsStaged.isEmpty()) {
            final GlbOperation<?, ?, ?, ?, ?> op = glb.operationsStaged.poll();
            boolean needsToBeLaunched;
            synchronized (op) {
                op.state = GlbOperation.State.RUNNING;
                needsToBeLaunched = !op.hasDependencies();
            }

            if (needsToBeLaunched) {
                async(() -> op.compute());
            }
        }
    }

    /**
     * Helper method used to launch the operations submitted to the global load
     * balancer. In addition, this method will only return when the specified
     * operation completes
     *
     * @param operation the operation whose global termination is be waited upon
     */
    static void startAndWait(GlbOperation<?, ?, ?, ?, ?> operation) {
        // Install a hook on the operation on which we are going to wait
        OperationCompletionManagedBlocker b = null;
        synchronized (operation) {
            if (operation.state != State.TERMINATED) {
                // Install a hook to "unblock the SempaphoreBlocker
                b = new OperationCompletionManagedBlocker();
                final OperationCompletionManagedBlocker blocker = b; // final field for lambda expression
                operation.addHook(() -> blocker.unblock());
            }
        }

        start(); // Start all other operations that were submitted to the GLB

        if (b != null) { // b is different from null iff the operation has not yet terminated
            try {
                // Block until woken up by the installed hook
                ForkJoinPool.managedBlock(b);
            } catch (final InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Method to call to launch a GLB program. A single call to this method can be
     * made at the time. If a second call is made while a program is under way, this
     * second call will be blocked.
     *
     * @param program program to launch under GLB
     * @return collection of all the Exceptions that occurred during the glb program
     */
    public static ArrayList<Exception> underGLB(SerializableJob program) {
        if (GlobalLoadBalancer.glb == null) {
            // Create a new GlobalLoadBalancer instance that will handle the program
            glb = new GlobalLoadBalancer();

            // Also initialize the GlbComputer on all hosts before trying to submit anything
            GlbComputer.getComputer();

            final ArrayList<Exception> exc = new ArrayList<>();
            finish(() -> {
                try {
                    program.run();
                    start(); // This launches any submitted operation that have not been explicitly launched
                    // inside the provided program
                } catch (final Exception e) {
                    System.err.println("ERROR during GLB program execution");
                    e.printStackTrace();
                    exc.add(e);
                } finally {
                    glb = null; // Destroy the singleton for a new one to be created next time this method is
                    // called
                }
            });
            return exc;
        } else {
            throw new IllegalStateException("Method was called even though another glb program is already running");
        }
    }

    /**
     * Collection containing the operation that have been submitted to the GLB as
     * part of a program given to {@link #underGLB(SerializableJob)}.
     */
    @SuppressWarnings("rawtypes")
    private final LinkedList<GlbOperation> operationsStaged;

    /**
     * Private constructor to preserve the singleton pattern
     */
    private GlobalLoadBalancer() {
        operationsStaged = new LinkedList<>();
    }

    /**
     * Makes the global load balancer start the operation given as second argument
     * after the operation represented by the first argument had completed globally.
     *
     * @param before operation to terminate before the second argument can start
     * @param then   operation to start after the first argument has completed
     */
    void scheduleOperationAfter(GlbOperation<?, ?, ?, ?, ?> before, GlbOperation<?, ?, ?, ?, ?> then) {
        GlbOperation.makeDependency(before, then);
    }

    /**
     * Helper method used by the various Glb handles of the distributed collections
     * to submit an operation to the Glb.
     *
     * @param operation operation to perform on a distributed collection
     */
    void submit(@SuppressWarnings("rawtypes") GlbOperation operation) {
        operationsStaged.add(operation);
    }
}