DistColGlb.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.glb;

import java.io.Serializable;
import java.util.function.Consumer;

import handist.collections.Chunk;
import handist.collections.LongRange;
import handist.collections.dist.DistBag;
import handist.collections.dist.DistCol;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.SerializableConsumer;
import handist.collections.function.SerializableFunction;
import handist.collections.function.SerializableLongTBiConsumer;
import handist.collections.function.SerializableSupplier;

/**
 * This class proposes various operations that operate on all the elements of a
 * {@link DistCol} as part of a GLB program. Any call to methods of this class
 * should be made from within a
 * {@link GlobalLoadBalancer#underGLB(apgas.SerializableJob)} method.
 *
 * @author Patrick Finnerty
 * @param <T> type of the elements contained in the underlying distributed
 *            collection
 */
public class DistColGlb<T> extends AbstractGlbHandle implements Serializable {

    /**
     * Runtime exception used to report a {@link Throwable} thrown from a closure
     * given as parameter of a GLB operation. It records the index at which the
     * problem was encountered as well as the range of the assignment that was
     * being processed.
     *
     * @author Patrick Finnerty
     *
     */
    public static class DistColGlbError extends RuntimeException implements Serializable {

        /** Serial Version UID */
        private static final long serialVersionUID = -6284960496356484016L;

        /**
         * Returns a textual description of the given throwable, falling back on its
         * class name when it carries no message (avoids error messages starting with
         * "null").
         *
         * @param t the throwable to describe
         * @return the message of {@code t}, or its class name if the message is
         *         {@code null}
         */
        private static String describe(Throwable t) {
            final String msg = t.getMessage();
            return msg != null ? msg : t.getClass().getName();
        }

        /** Index in the {@link DistCol} on which a problem was encountered */
        public final long index;

        /**
         * Range on which the assignment was operating at the time the exception was
         * encountered
         */
        public final LongRange assignmentRange;

        /**
         * Constructor
         * <p>
         * This constructor is made private as instances of this class do not need to be
         * created outside of {@link DistColGlb}.
         *
         * @param lr range on which the assignment was operating
         * @param l  index at which the throwable was thrown
         * @param t  the {@link Throwable} thrown by the user-supplied closure, kept as
         *           the cause of this exception
         */
        private DistColGlbError(LongRange lr, long l, Throwable t) {
            super(describe(t) + " at index " + l + " in assignment on range " + lr, t);
            // Store a copy of the range rather than the instance handed to the worker
            assignmentRange = new LongRange(lr.from, lr.to);
            index = l;
        }
    }

    /** Serial Version UID */
    private static final long serialVersionUID = 612021438330155918L;

    /** Underlying collection on which the operations of this class operate */
    DistCol<T> col;

    /**
     * Constructor
     *
     * @param c collection on which this handle will operate
     */
    public DistColGlb(DistCol<T> c) {
        col = c;
    }

    /**
     * Applies the specified action to all the elements contained in the
     * {@link DistCol}.
     * <p>
     * Any {@link Throwable} thrown by the action is wrapped into a
     * {@link DistColGlbError} and reported to the GLB runtime; the remaining
     * indices of the assignment are still processed.
     *
     * @param action action to perform on each element
     * @return future representing this "forEach" operation which will return the
     *         underlying {@link DistCol} collection upon termination
     */
    public DistFuture<DistCol<T>> forEach(SerializableConsumer<T> action) {
        final GlobalLoadBalancer glb = getGlb();

        // Initialize the future returned to the programmer in the underGLB method.
        // In this operation, the result is the underlying collection itself.
        final DistFuture<DistCol<T>> future = new DistFuture<>(col);

        // Transform the action to accept a LongRange as parameter: retrieve the T at
        // each index of the range and apply the user-supplied action to it.
        // The WorkerService is used to report any throwable raised by the action.
        final SerializableBiConsumer<LongRange, WorkerService> realAction = (lr, ws) -> {
            for (long l = lr.from; l < lr.to; l++) {
                try {
                    action.accept(col.get(l));
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(lr, l, t));
                }
            }
        };

        // No per-worker initialization is needed for this operation
        return submitOperation(glb, realAction, future, null);
    }

    /**
     * Applies the specified action to all the elements contained in the
     * {@link DistCol}, giving the action both the index and the element.
     * <p>
     * Any {@link Throwable} thrown by the action is wrapped into a
     * {@link DistColGlbError} and reported to the GLB runtime; the remaining
     * indices of the assignment are still processed.
     *
     * @param action action to perform on each element, taking the index and the
     *               object as parameter
     * @return future representing this "forEach" operation which will return the
     *         underlying {@link DistCol} collection upon termination
     */
    public DistFuture<DistCol<T>> forEach(SerializableLongTBiConsumer<T> action) {
        final GlobalLoadBalancer glb = getGlb();

        // Initialize the future returned to the programmer in the underGLB method.
        // In this operation, the result is the underlying collection itself.
        final DistFuture<DistCol<T>> future = new DistFuture<>(col);

        // Transform the action to accept a LongRange as parameter: retrieve the T at
        // each index of the range and apply the user-supplied action to the
        // (index, element) pair.
        // The WorkerService is used to report any throwable raised by the action.
        final SerializableBiConsumer<LongRange, WorkerService> realAction = (lr, ws) -> {
            for (long l = lr.from; l < lr.to; l++) {
                try {
                    action.accept(l, col.get(l));
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(lr, l, t));
                }
            }
        };

        // No per-worker initialization is needed for this operation
        return submitOperation(glb, realAction, future, null);
    }

    /**
     * GLB operation which creates a new {@link DistCol} using the mapping operation
     * provided as parameter. The resulting {@link DistCol} will contain the same
     * indices as this collection. The value stored at each index of the resulting
     * collection will be the result of the provided mapping operation for this
     * collection at the same index. As the GLB moves entries from place to place,
     * the distribution of the resulting collection may differ from that of this
     * collection.
     * <p>
     * If the mapping function throws for some index, the error is reported to the
     * GLB runtime as a {@link DistColGlbError} and no value is set at that index
     * of the resulting collection.
     *
     * @param <U> type of the result of the map function provided as parameter
     * @param map function which takes an object T as input and returns an instance
     *            of type U
     * @return a {@link DistFuture} which will return the newly created
     *         {@link DistCol} upon termination
     */
    public <U> DistFuture<DistCol<U>> map(SerializableFunction<T, U> map) {
        final GlobalLoadBalancer glb = getGlb();

        // Create new collection to contain the result
        final DistCol<U> resultCollection = new DistCol<>(col.placeGroup());

        // Adapt the provided map to represent what the glb workers will actually
        // perform.
        // The WorkerService is used to report any throwable raised by the mapping.
        final SerializableBiConsumer<LongRange, WorkerService> realAction = (lr, ws) -> {
            // First, initialize a Chunk to place the mappings
            final Chunk<U> c = new Chunk<>(lr);

            /*
             * FIXME ChunkedList (parent of DistCol) does not support concurrent insertion
             * of chunks. As a result, inserting a chunk in the result collection must be
             * done in mutual exclusion with any other worker inserting chunks in this
             * collection on the local host.
             */
            synchronized (resultCollection) {
                resultCollection.add(c);
            }

            // Iterate on the elements
            for (long l = lr.from; l < lr.to; l++) {
                try {
                    final T t = col.get(l);
                    final U u = map.apply(t);
                    c.set(l, u);
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(lr, l, t));
                }
            }
        };

        // Initialize the future returned to the programmer in the underGLB method.
        // The result of this operation is the newly created "resultCollection".
        final DistFuture<DistCol<U>> future = new DistFuture<>(resultCollection);

        // No per-worker initialization is needed for this operation
        return submitOperation(glb, realAction, future, null);
    }

    /**
     * Applies the given function to every element contained in this distributed
     * collection and places the results in a new {@link DistBag} collection.
     * <p>
     * If the function throws for some index, the error is reported to the GLB
     * runtime as a {@link DistColGlbError} and no result is produced for that
     * index.
     *
     * @param <U>      type of the objects produced by the function given as
     *                 parameter
     * @param function function taking type T as input and returning U
     * @return a {@link DistFuture} producing a DistBag as a result
     */
    public <U> DistFuture<DistBag<U>> toBag(SerializableFunction<T, U> function) {
        final GlobalLoadBalancer glb = getGlb();

        // Create new collection to contain the result
        final DistBag<U> resultCollection = new DistBag<>(col.placeGroup());

        // Initialization for workers to be made before the computation starts.
        // This will bind a handle to place the U elements into the DistBag to each
        // worker in the system.
        final SerializableConsumer<WorkerService> workerInit = (w) -> w.attachOperationObject(resultCollection,
                resultCollection.getReceiver());

        // Adapt the provided function to represent what the glb workers will actually
        // perform
        final SerializableBiConsumer<LongRange, WorkerService> realAction = (lr, ws) -> {
            // First, retrieve the consumer of U which is bound to the worker
            // The object used as key to retrieve the object bound to workers is the result
            // collection
            @SuppressWarnings("unchecked")
            final Consumer<U> destination = (Consumer<U>) ws.retrieveOperationObject(resultCollection);

            // Iterate on the elements
            for (long l = lr.from; l < lr.to; l++) {
                try {
                    final T t = col.get(l);
                    final U u = function.apply(t);
                    destination.accept(u);
                } catch (final Throwable t) {
                    ws.throwableInOperation(new DistColGlbError(lr, l, t));
                }
            }
        };

        // Initialize the future returned to the programmer in the underGLB method
        // The result of this operation is the DistBag "resultCollection"
        final DistFuture<DistBag<U>> future = new DistFuture<>(resultCollection);

        return submitOperation(glb, realAction, future, workerInit);
    }

    /**
     * Creates the {@link GlbOperation} operating on the indices of the underlying
     * collection with the given worker action, and submits it to the given load
     * balancer.
     *
     * @param <R>        type of the result produced by the operation
     * @param glb        load balancer to which the operation is submitted
     * @param realAction action performed by GLB workers on each assigned range
     * @param future     future returned to the programmer for this operation
     * @param workerInit initialization performed on workers before the computation
     *                   starts, or {@code null} if none is needed
     * @return the {@code future} given as parameter
     */
    private <R> DistFuture<R> submitOperation(GlobalLoadBalancer glb,
            SerializableBiConsumer<LongRange, WorkerService> realAction, DistFuture<R> future,
            SerializableConsumer<WorkerService> workerInit) {
        // Initializer for the GlbTask of this DistCol in case it is not yet initialized
        final SerializableSupplier<GlbTask> initGlbTask = () -> {
            return new DistColGlbTask(col);
        };

        // Create the operation with all the types/arguments
        final GlbOperation<DistCol<T>, T, LongRange, LongRange, R> operation = new GlbOperation<>(col, realAction,
                future, initGlbTask, workerInit);

        // Submit the operation to the GLB
        glb.submit(operation);

        // return the future to the programmer
        return future;
    }
}