DistColGlb.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.glb;

import java.io.Serializable;
import java.util.function.Consumer;

import apgas.util.GlobalID;
import handist.collections.Chunk;
import handist.collections.ChunkedList;
import handist.collections.LongRange;
import handist.collections.dist.DistBag;
import handist.collections.dist.DistChunkedList;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.SerializableConsumer;
import handist.collections.function.SerializableFunction;
import handist.collections.function.SerializableLongTBiConsumer;
import handist.collections.function.SerializableSupplier;
import handist.collections.glb.DistColGlbTask.DistColLambda;
import handist.collections.glb.GlbComputer.WorkerInfo;
import handist.collections.reducer.Reducer;

/**
 * This class proposes various operations that operate on all the elements of a
 * {@link DistChunkedList} as part of a GLB program. Any call to methods of this
 * class should be made from within a
 * {@link GlobalLoadBalancer#underGLB(apgas.SerializableJob)} method.
 *
 * @author Patrick Finnerty
 * @param <T> type of the elements contained in the underlying distributed
 *            collection
 */
public class DistColGlb<T> extends AbstractGlbHandle implements Serializable {

    /**
     * Runtime exception used when a {@link Throwable} is thrown from a closure
     * given as parameter of a Glb operation.
     * <p>
     * The original throwable is kept as the cause of this exception. The index and
     * the assignment range on which the problem occurred are exposed through
     * public fields.
     *
     * @author Patrick Finnerty
     *
     */
    public static class DistColGlbError extends RuntimeException implements Serializable {

        /** Serial Version UID */
        private static final long serialVersionUID = -6284960496356484016L;

        /** Index in the {@link DistChunkedList} on which a problem was encountered */
        public final long index;

        /**
         * Range on which the assignment was operating at the time the exception was
         * encountered
         */
        public final LongRange assignmentRange;

        /**
         * Constructor
         * <p>
         * This constructor is made private as instances of this class do not need to be
         * created outside of {@link DistColGlb}.
         * <p>
         * The message is built from the string representation of the throwable rather
         * than from {@link Throwable#getMessage()} alone. Throwables created without a
         * detail message therefore still produce an informative description (their
         * class name) instead of the literal {@code "null"}.
         *
         * @param lr range on which the assignment was operating
         * @param l  index at which the throwable was thrown
         * @param t  the {@link Throwable} thrown by the user-supplied closure
         */
        private DistColGlbError(LongRange lr, long l, Throwable t) {
            super(t + " at index " + l + " in assignment on range " + lr, t);
            // Store a copy of the range rather than the caller's instance
            assignmentRange = new LongRange(lr.from, lr.to);
            index = l;
        }
    }

    /** Serial Version UID */
    private static final long serialVersionUID = 612021438330155918L;

    /** Underlying collection on which the operations of this class operate */
    DistChunkedList<T> col;

    /**
     * Constructor
     *
     * @param c collection on which this handle will operate
     */
    public DistColGlb(DistChunkedList<T> c) {
        col = c;
    }

    /**
     * Applies the specified action to all the elements contained in the
     * {@link DistChunkedList} and returns the underlying collection.
     * <p>
     * Any throwable raised by the action is wrapped into a {@link DistColGlbError}
     * and reported to the GLB runtime. The remaining elements of the assignment are
     * still processed.
     *
     * @param action action to perform on each element
     * @return future representing this "forEach" operation which will return the
     *         underlying {@link DistChunkedList} collection upon termination
     */
    public DistFuture<DistChunkedList<T>> forEach(SerializableConsumer<T> action) {
        final GlobalLoadBalancer glb = getGlb();

        // The future returned to the programmer yields the underlying collection
        // itself once the operation has completed
        final DistFuture<DistChunkedList<T>> future = new DistFuture<>(col);

        // Apply the user action to the T at each index of the assigned range.
        // The WorkerService is only used to report throwables raised by the action;
        // the loop moves on to the next index after reporting.
        final DistColLambda<T> realAction = (chunk, from, to, ws) -> {
            for (long l = from; l < to; l++) {
                try {
                    action.accept(chunk.get(l));
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(new LongRange(from, to), l, t));
                }
            }
        };

        // Create the operation with all the types/arguments and submit it to the GLB
        final GlbOperation<DistChunkedList<T>, T, LongRange, LongRange, DistChunkedList<T>, DistColLambda<T>> operation = new GlbOperation<>(
                col, realAction, future, glbTaskInitializer(), null, lifelineClass);
        glb.submit(operation);

        // return the future to the programmer
        return future;
    }

    /**
     * Applies the specified action to all the elements contained in the
     * {@link DistChunkedList} and returns the underlying collection.
     * <p>
     * Any throwable raised by the action is wrapped into a {@link DistColGlbError}
     * and reported to the GLB runtime. The remaining elements of the assignment are
     * still processed.
     *
     * @param action action to perform on each element, taking the index and the
     *               object as parameter
     * @return future representing this "forEach" operation which will return the
     *         underlying {@link DistChunkedList} collection upon termination
     */
    public DistFuture<DistChunkedList<T>> forEach(SerializableLongTBiConsumer<T> action) {
        final GlobalLoadBalancer glb = getGlb();

        // The future returned to the programmer yields the underlying collection
        // itself once the operation has completed
        final DistFuture<DistChunkedList<T>> future = new DistFuture<>(col);

        // Apply the user action to each (index, T) pair of the assigned range.
        // The WorkerService is only used to report throwables raised by the action.
        final DistColLambda<T> realAction = (chunk, from, to, ws) -> {
            for (long l = from; l < to; l++) {
                try {
                    action.accept(l, chunk.get(l));
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(new LongRange(from, to), l, t));
                }
            }
        };

        // Create the operation with all the types/arguments and submit it to the GLB
        final GlbOperation<DistChunkedList<T>, T, LongRange, LongRange, DistChunkedList<T>, DistColLambda<T>> operation = new GlbOperation<>(
                col, realAction, future, glbTaskInitializer(), null, lifelineClass);
        glb.submit(operation);

        // return the future to the programmer
        return future;
    }

    /**
     * GLB operation which creates a new {@link DistChunkedList} using the mapping
     * operation provided as parameter.
     * <p>
     * The resulting {@link DistChunkedList} will contain the same indices as this
     * collection. The value stored at each index of the resulting collection will
     * be the result of the provided mapping operation applied to the element of
     * this collection at the same index. Because the GLB may move work from place
     * to place, the distribution of the resulting collection may differ from the
     * distribution of this collection.
     * <p>
     * If the mapping throws for some index, the throwable is wrapped into a
     * {@link DistColGlbError} and reported to the GLB runtime. No value is set at
     * that index of the resulting collection.
     *
     * @param <U> type of the result of the map function provided as parameter
     * @param map function which takes an object T as input and returns an instance
     *            of type U
     * @return a {@link DistFuture} which will return the newly created
     *         {@link DistChunkedList} upon termination
     */
    public <U> DistFuture<DistChunkedList<U>> map(SerializableFunction<T, U> map) {
        final GlobalLoadBalancer glb = getGlb();

        // Create new collection to contain the result, on the same place group
        final DistChunkedList<U> resultCollection = new DistChunkedList<>(col.placeGroup());

        // For each assignment, a Chunk covering the same range is added to the
        // result collection and filled with the mapped values. The WorkerService is
        // only used to report throwables raised by the map.
        final DistColLambda<T> realAction = (chunk, from, to, ws) -> {
            final LongRange lr = new LongRange(from, to);
            final Chunk<U> c = new Chunk<>(lr);
            resultCollection.add(c);

            for (long l = from; l < to; l++) {
                try {
                    final T t = chunk.get(l);
                    final U u = map.apply(t);
                    c.set(l, u);
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(lr, l, t));
                }
            }
        };

        // The future returned to the programmer yields the newly created result
        // collection once the operation has completed
        final DistFuture<DistChunkedList<U>> future = new DistFuture<>(resultCollection);

        // Create the operation with all the types/arguments and submit it to the GLB
        final GlbOperation<DistChunkedList<T>, T, LongRange, LongRange, DistChunkedList<U>, DistColLambda<T>> operation = new GlbOperation<>(
                col, realAction, future, glbTaskInitializer(), null, lifelineClass);
        glb.submit(operation);

        // return the future to the programmer
        return future;
    }

    /**
     * GLB operation which reduces all the elements of the underlying
     * {@link DistChunkedList} into the {@link Reducer} given as parameter.
     * <p>
     * Each worker is bound to its own reducer obtained through
     * {@link Reducer#newReducer()}. Once all the elements have been traversed, the
     * reducers of the workers are merged into the given reducer on each place. A
     * team reduction is then performed across the place group of the collection
     * so that the given reducer contains the global result of the operation.
     *
     * @param <R>     type of the reducer
     * @param reducer reducer into which the elements of the collection are reduced
     * @return a {@link DistFuture} which will return the given reducer, holding the
     *         result of the reduction, upon termination
     */
    public <R extends Reducer<R, T>> DistFuture<R> reduce(final R reducer) {
        final GlobalLoadBalancer glb = getGlb();
        // Key under which the reducer private to each worker is bound
        final GlobalID gid = new GlobalID();

        // Bind a fresh reducer to each worker before the computation starts
        final SerializableConsumer<WorkerService> workerInit = (w) -> w.attachOperationObject(gid,
                reducer.newReducer());

        // Reduce the elements of the assigned range into the reducer of the worker.
        // The WorkerService also reports throwables raised during the reduction.
        final DistColLambda<T> realAction = (chunk, from, to, ws) -> {
            @SuppressWarnings("unchecked")
            final R workerLocalReducer = (R) ws.retrieveOperationObject(gid);

            for (long l = from; l < to; l++) {
                try {
                    workerLocalReducer.reduce(chunk.get(l));
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(new LongRange(from, to), l, t));
                }
            }
        };

        final DistFuture<R> future = new DistFuture<>(reducer);

        final GlbOperation<DistChunkedList<T>, T, LongRange, LongRange, R, DistColLambda<T>> operation = new GlbOperation<>(
                col, realAction, future, glbTaskInitializer(), workerInit, lifelineClass);

        // This operation needs a specific hook after all the entries have been
        // traversed. The R instances of the workers are merged back into a single
        // instance on each host, and the global reduction is then performed so that
        // the given reducer contains the global result of the operation.
        // The hook is registered before the operation is submitted so that it is
        // already in place whenever the GLB runs the operation.
        operation.addHook(() -> {
            col.placeGroup().broadcastFlat(() -> {
                for (final WorkerInfo wi : GlbComputer.getComputer().workers) {
                    @SuppressWarnings("unchecked")
                    final R workerReducer = (R) wi.workerBoundObjects.remove(gid);
                    reducer.merge(workerReducer);
                }
                reducer.teamReduction(col.placeGroup());
            });
        });

        glb.submit(operation);

        return future;
    }

    /**
     * GLB variant of
     * {@link ChunkedList#parallelForEach(java.util.function.BiConsumer, handist.collections.ParallelReceiver)}
     *
     * @param <U>              type of elements accepted by the parallel receiver
     * @param action           user-specified action. It generally extracts some
     *                         "U" object from an element of the distributed
     *                         collection and places it in the Consumer given as
     *                         second parameter. Unlike
     *                         {@link #toBag(SerializableFunction)}, which applies
     *                         a function to each element and places every result
     *                         in the bag, the present variant lets the action
     *                         perform intermediary checks and choose which
     *                         elements (if any) to place in the bag.
     * @param resultCollection {@link DistBag} instance into which the various U
     *                         elements are placed
     * @return {@link DistFuture} waiting on the completion of this operation and
     *         returning the {@link DistBag} provided as parameter as the result
     * @throws IllegalArgumentException if the provided bag is not defined on the
     *                                  same place group instance as the underlying
     *                                  collection
     */
    public <U> DistFuture<DistBag<U>> toBag(SerializableBiConsumer<T, Consumer<U>> action,
            DistBag<U> resultCollection) {
        final GlobalLoadBalancer glb = getGlb();

        // Check that the provided bag is defined on the same place group (same
        // instance) as the distributed collection
        if (resultCollection.placeGroup != col.placeGroup()) {
            throw new IllegalArgumentException(
                    "The provided bag should be defined on the same place group as the underlying DistributedChunkedList");
        }

        // Initialization for workers to be made before the computation starts.
        // This binds to each worker a handle used to place the U elements into
        // the DistBag, using the result collection as key.
        final SerializableConsumer<WorkerService> workerInit = (w) -> w.attachOperationObject(resultCollection,
                resultCollection.getReceiver());

        // Adapt the provided action to represent what the glb workers will actually
        // perform
        final DistColLambda<T> realAction = (chunk, from, to, ws) -> {
            // Retrieve the consumer of U bound to this worker by workerInit
            @SuppressWarnings("unchecked")
            final Consumer<U> destination = (Consumer<U>) ws.retrieveOperationObject(resultCollection);

            for (long l = from; l < to; l++) {
                try {
                    final T t = chunk.get(l);
                    action.accept(t, destination);
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(new LongRange(from, to), l, t));
                }
            }
        };

        // The result of this operation is the DistBag "resultCollection"
        final DistFuture<DistBag<U>> future = new DistFuture<>(resultCollection);

        // Create the operation with all the types/arguments and submit it to the GLB
        final GlbOperation<DistChunkedList<T>, T, LongRange, LongRange, DistBag<U>, DistColLambda<T>> operation = new GlbOperation<>(
                col, realAction, future, glbTaskInitializer(), workerInit, lifelineClass);
        glb.submit(operation);

        // return the future to the programmer
        return future;
    }

    /**
     * Applies the given function to every element contained in this distributed
     * collection and places the results in a new {@link DistBag} collection.
     *
     * @param <U>      type of the objects produced by the function given as
     *                 parameter
     * @param function function taking type T as input and returning U
     * @return a {@link DistFuture} producing a DistBag as a result
     */
    public <U> DistFuture<DistBag<U>> toBag(SerializableFunction<T, U> function) {
        final GlobalLoadBalancer glb = getGlb();

        // Create new collection to contain the result, on the same place group
        final DistBag<U> resultCollection = new DistBag<>(col.placeGroup());

        // Initialization for workers to be made before the computation starts.
        // This binds to each worker a handle used to place the U elements into
        // the DistBag, using the result collection as key.
        final SerializableConsumer<WorkerService> workerInit = (w) -> w.attachOperationObject(resultCollection,
                resultCollection.getReceiver());

        // Adapt the provided function to represent what the glb workers will actually
        // perform
        final DistColLambda<T> realAction = (chunk, from, to, ws) -> {
            // Retrieve the consumer of U bound to this worker by workerInit
            @SuppressWarnings("unchecked")
            final Consumer<U> destination = (Consumer<U>) ws.retrieveOperationObject(resultCollection);

            for (long l = from; l < to; l++) {
                try {
                    final T t = chunk.get(l);
                    final U u = function.apply(t);
                    destination.accept(u);
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(new LongRange(from, to), l, t));
                }
            }
        };

        // The result of this operation is the DistBag "resultCollection"
        final DistFuture<DistBag<U>> future = new DistFuture<>(resultCollection);

        // Create the operation with all the types/arguments and submit it to the GLB
        final GlbOperation<DistChunkedList<T>, T, LongRange, LongRange, DistBag<U>, DistColLambda<T>> operation = new GlbOperation<>(
                col, realAction, future, glbTaskInitializer(), workerInit, lifelineClass);
        glb.submit(operation);

        // return the future to the programmer
        return future;
    }

    /**
     * Creates the initializer the GLB uses to build the {@link GlbTask} of the
     * underlying collection if that task is not yet initialized. All the
     * operations of this class share this initializer.
     *
     * @return supplier of a new {@link DistColGlbTask} operating on the underlying
     *         collection
     */
    private SerializableSupplier<GlbTask> glbTaskInitializer() {
        return () -> new DistColGlbTask(col);
    }
}