CustomOneSidedMoveManager.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.glb;

import static apgas.ExtendedConstructs.*;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import apgas.Place;
import apgas.SerializableJob;
import apgas.impl.Finish;
import handist.collections.dist.MoveManager;
import handist.collections.dist.OneSidedMoveManager;
import handist.collections.dist.TeamedPlaceGroup;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.Serializer;
import mpi.MPI;

/**
 * Extension of class {@link OneSidedMoveManager} with more advanced and
 * specific functionalities used in the context of the GLB. The functionalities
 * of this class would be otherwise confusing to programmers. They are
 * therefore presented with package visibility so as to avoid exposing them
 * outside of this package.
 *
 * @author Patrick Finnerty
 *
 */
class CustomOneSidedMoveManager extends OneSidedMoveManager {

    /**
     * Builds a move manager whose transfers target the place given as parameter.
     *
     * @param d place to which instances will be transferred
     */
    public CustomOneSidedMoveManager(Place d) {
        super(d);
    }

    /**
     * Performs the transfer of instances using an asynchronous task, without
     * relying on MPI. The serializers that were registered into this MoveManager
     * are applied to generate a byte array. That byte array and the list of
     * deserializers are captured by the asynchronous task and therefore travel
     * with it, using the default serialization method of the APGAS library (most
     * likely "kryo"). Once every deserializer has been applied on the destination,
     * the job passed as parameter is run within that same asynchronous task, and
     * is therefore registered with the same finish instances.
     *
     * @param j       the job to run after the transfer of instances has completed
     * @param finishs {@link Finish} instances under which the asynchronous task in
     *                charge of making the transfer and the job given as parameter
     *                will be registered.
     * @throws IOException if thrown during the serialization of instances
     */
    public void asyncSendAndDoNoMPI(SerializableJob j, Finish... finishs) throws IOException {
        final byte[] bytesToSend = serializeObjectsOnly();

        // Local alias captured by the lambda below instead of the member itself
        // (presumably to keep this move manager out of the serialized closure).
        final List<DeSerializer> ds = deserializers;

        asyncArbitraryFinish(destination, () -> {
            // The byte array was implicitly serialized along with this lambda; wrap it
            // into a stream the deserializers can read from.
            final ObjectInput oInput = new ObjectInput(new ByteArrayInputStream(bytesToSend));
            try {
                // Apply the deserializers in their registration order
                for (final DeSerializer d : ds) {
                    d.accept(oInput);
                }
            } finally {
                // Release the input even if one of the deserializers failed
                oInput.close();
            }

            // Call the job that was passed as parameter
            j.run();
        }, finishs);
    }

    /**
     * Proceeds to the serialization and sends the resulting bytes to the
     * destination through MPI. Also spawns an asynchronous task on the remote
     * place which receives these bytes through MPI, deserializes them, and then
     * runs the job given as parameter.
     * <p>
     * Only the size of the message, the rank of the sender and the tag are
     * captured by the remote task: the payload itself is read from the buffer
     * filled by the MPI reception, not from the sender-side array.
     *
     * @param j       the job to run after the deserialization of the objects that
     *                were transferred
     * @param finishs the finishes under which the asynchronous task which is
     *                spawned on the destination host is registered.
     * @throws IOException if thrown during serialization
     */
    @SuppressWarnings("deprecation")
    public void asyncSendAndDoWithMPI(SerializableJob j, Finish... finishs) throws IOException {
        final byte[] bytesToSend = prepareByteArray();

        final int nbOfBytes = bytesToSend.length;
        final int destinationRank = TeamedPlaceGroup.getWorld().rank(destination);
        final int myRank = TeamedPlaceGroup.getWorld().rank();
        final int tag = nextTag();

        // NOTE(review): the value returned by Isend is ignored, so the completion of
        // the send is never awaited here -- confirm this is acceptable for the MPI
        // binding in use.
        TeamedPlaceGroup.getWorld().comm.Isend(bytesToSend, 0, nbOfBytes, MPI.BYTE, destinationRank, tag);

        asyncArbitraryFinish(destination, () -> {
            // Receive the array of bytes into a local buffer. "myRank" was evaluated on
            // the sending place and therefore designates the source of the message.
            final byte[] bytesReceived = new byte[nbOfBytes];
            TeamedPlaceGroup.getWorld().comm.Recv(bytesReceived, 0, nbOfBytes, MPI.BYTE, myRank, tag);

            // Deserialize from what was actually received through MPI
            final ObjectInput oInput = new ObjectInput(new ByteArrayInputStream(bytesReceived));
            try {
                // The first object to come out of the byte array is a list of deserializers
                @SuppressWarnings("unchecked")
                final List<DeSerializer> ds = (List<DeSerializer>) oInput.readObject();

                // We now apply each deserializer one after the other
                for (final DeSerializer deserializer : ds) {
                    deserializer.accept(oInput);
                }
            } finally {
                // Release the input even if reading or deserializing failed
                oInput.close();
            }

            // Reception is over, launch the job that was given as parameter
            j.run();
        }, finishs);
    }

    /**
     * Returns the list of the deserializers that have been registered into this
     * move manager. The list held by this instance is returned directly; no copy
     * is made.
     *
     * @return the list of deserializers registered into this {@link MoveManager}
     */
    public List<DeSerializer> getDeSerializer() {
        return deserializers;
    }

    /**
     * Similar to {@link OneSidedMoveManager} method used to prepare bytes, but does
     * not serialize the deserializers at the beginning of the array. Instead, only
     * the serializers are called and their output written to the returned array.
     *
     * @return byte array containing the serialized form of the objects targeted by
     *         the serializers
     * @throws IOException if thrown during the serialization process
     */
    public byte[] serializeObjectsOnly() throws IOException {
        final ByteArrayOutputStream stream = new ByteArrayOutputStream();
        final ObjectOutput oo = new ObjectOutput(stream);
        try {
            for (final Serializer s : serializers) {
                s.accept(oo); // Convert the objects targeted by the serializers into bytes.
            }
        } finally {
            // Closed before the bytes are extracted, and also if a serializer failed
            oo.close();
        }

        return stream.toByteArray();
    }
}