CollectiveMoveManager.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.dist;

import static apgas.Constructs.*;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import apgas.Place;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.Serializer;

/**
 * This class is used for relocating elements of distributed collections in a
 * collective manner: the transfer of instances between places occurs when the
 * {@link #sync()} method of this relocator is called on every host involved in
 * the transfer.
 * <p>
 * Transfers are first registered with
 * {@link #request(Place, Serializer, DeSerializer)} and are then all performed
 * together by {@link #sync()}.
 */
public final class CollectiveMoveManager implements MoveManager {
    /**
     * When {@code true}, the progress of the serialization is traced on the
     * standard error output.
     */
    private static final boolean DEBUG = false;

    /**
     * Collection of the deserializers, gathered by destination place.
     */
    private final Map<Place, List<DeSerializer>> builders;

    /**
     * The group of places which are involved in the collective relocation
     * operation.
     */
    private final TeamedPlaceGroup placeGroup;

    /**
     * The collection of serializers, gathered by destination place.
     */
    private final Map<Place, List<Serializer>> serializeListMap;

    /**
     * Constructs a CollectiveMoveManager with the given arguments.
     *
     * @param placeGroup the group of hosts that will transfer objects between
     *                   themselves using this instance.
     */
    public CollectiveMoveManager(TeamedPlaceGroup placeGroup) {
        this.placeGroup = placeGroup;
        serializeListMap = new HashMap<>(placeGroup.size());
        builders = new HashMap<>(placeGroup.size());
        // Every member of the group gets its own (initially empty) lists so that
        // request(...) can register transfers towards any of them.
        for (final Place place : placeGroup.places()) {
            serializeListMap.put(place, new ArrayList<>());
            builders.put(place, new ArrayList<>());
        }
    }

    /**
     * Performs the whole exchange: serializes the pending requests, exchanges
     * the resulting bytes with the other members of the place group, and
     * deserializes what was received from them.
     *
     * @throws Exception if thrown during the serialization, the exchange or the
     *                   deserialization
     */
    private void all2allser() throws Exception {
        // Prepare to send the data
        final int[] sendOffset = new int[placeGroup.size()];
        final int[] sendSize = new int[placeGroup.size()];
        // Rather than initializing an array, use an output stream as we do not know
        // how long the array needs to be in advance.
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        executeSerialization(out, sendOffset, sendSize);

        // Prepare the arrays for receiving the information
        final int[] rcvOffset = new int[placeGroup.size()];
        final int[] rcvSize = new int[placeGroup.size()];

        // Make the MPI calls
        final byte[] buf = CollectiveRelocator.exchangeBytesWithinGroup(placeGroup, out.toByteArray(), sendOffset,
                sendSize, rcvOffset, rcvSize);

        // Deserialize the objects received from the various hosts
        executeDeserialization(buf, rcvOffset, rcvSize);
    }

    /**
     * Removes all the serializers and deserializers contained in this instance.
     */
    public void clear() {
        for (final List<Serializer> list : serializeListMap.values()) {
            list.clear();
        }
        for (final List<DeSerializer> list : builders.values()) {
            list.clear();
        }
    }

    /**
     * Reads the bytes received from every other member of the place group and
     * runs the deserializers they contain.
     * <p>
     * For each remote place, the corresponding section of the buffer starts with
     * the list of deserializers, followed by the data these deserializers consume
     * in order. The section matching the local place is skipped.
     *
     * @param buf       buffer containing the bytes received from all the places
     * @param rcvOffset index in the buffer at which the bytes received from each
     *                  place start
     * @param rcvSize   number of bytes received from each place
     * @throws Exception if thrown while reading the list of deserializers or by
     *                   one of the deserializers
     */
    private void executeDeserialization(byte[] buf, int[] rcvOffset, int[] rcvSize) throws Exception {
        final Place self = here();
        int current = 0;
        for (final Place p : placeGroup.places()) {
            final int size = rcvSize[current];
            final int offset = rcvOffset[current];
            current++;
            if (p.equals(self)) {
                continue;
            }

            final ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, size);
            final ObjectInput ds = new ObjectInput(in);
            try {
                // The sender always writes its list of deserializers first (see
                // executeSerialization), which is what makes this cast valid.
                @SuppressWarnings("unchecked")
                final List<DeSerializer> deserializerList = (List<DeSerializer>) ds.readObject();
                for (final DeSerializer deserialize : deserializerList) {
                    deserialize.accept(ds);
                }
            } finally {
                // Close the input even if a deserializer fails
                ds.close();
            }
        }
    }

    /*
     * Should we eventually declare an interface for the types which provide
     * moveAtSync(dist:RangedDistribution, mm)?
     * public def moveAssociativeCollectionsAtSync(dist: RangedDistribution, dists:
     * List[RangedMoballe]) {
     *
     * } public def moveAssosicativeCollectionsAtSync(dist: Distribution[K]) { //
     * add dist to the list to schedule }
     */
    /**
     * Proceeds to call all the serializers held by this instance and prepares
     * the arrays needed by the upcoming MPI call.
     * <p>
     * The local place is skipped: its entries in {@code offsets} and
     * {@code sizes} are left untouched.
     *
     * @param out     output stream into which the serialized objects need to be
     *                placed
     * @param offsets array describing the index in the byte array which is destined
     *                for every destination
     * @param sizes   length in the array which is destined to every host
     * @throws IOException if thrown while serializing the objects
     */
    private void executeSerialization(ByteArrayOutputStream out, int[] offsets, int[] sizes) throws IOException {
        final Place self = here();
        for (int i = 0; i < placeGroup.size(); i++) {
            final Place place = placeGroup.get(i);
            if (place.equals(self)) {
                continue;
            }
            offsets[i] = out.size();
            // TODO should reopen ByteArray...
            if (DEBUG) {
                System.err.println("execSeri: " + self + "->" + place + ":start:" + out.size());
            }
            final ObjectOutput s = new ObjectOutput(out);
            try {
                // First, write all the deserializers which will have to operate on the
                // other end
                s.writeObject(builders.get(place));
                // Then call all the serializers
                for (final Serializer serializer : serializeListMap.get(place)) {
                    serializer.accept(s);
                }
            } finally {
                // Close the output even if a serializer fails. The size of the section is
                // only measured after this call.
                s.close();
            }
            if (DEBUG) {
                System.err.println("execSeri: " + self + "->" + place + ":finish:" + out.size());
            }
            sizes[i] = out.size() - offsets[i];
        }
    }

    /**
     * Registers a transfer towards the specified place. The transfer is performed
     * the next time {@link #sync()} is called.
     * <p>
     * Requests whose destination is the local place are recorded but are not
     * executed by {@link #sync()}, as the local place is skipped during both
     * serialization and deserialization.
     *
     * @param pl           the destination of the transfer, which must be a member
     *                     of the place group of this instance
     * @param serializer   the serializer which writes the objects to transfer
     * @param deserializer the deserializer which will be sent to the destination
     *                     and called there to read the objects
     * @throws IllegalArgumentException if the specified place is not a member of
     *                                  the place group of this instance
     */
    @Override
    public void request(Place pl, Serializer serializer, DeSerializer deserializer) {
        final List<Serializer> serializers = serializeListMap.get(pl);
        final List<DeSerializer> deserializers = builders.get(pl);
        if (serializers == null || deserializers == null) {
            throw new IllegalArgumentException(
                    "Place " + pl + " is not a member of the place group of this CollectiveMoveManager");
        }
        serializers.add(serializer);
        deserializers.add(deserializer);
    }

    /**
     * Executes all the requests synchronously. Whether the transfer of objects
     * completes or fails, this object is cleared afterwards so that it can be
     * safely re-used for another transfer.
     *
     * @throws Exception if an exception is thrown at any stage during the
     *                   relocation
     */
    public void sync() throws Exception {
        try {
            all2allser();
        } finally {
            // Clear the MoveManager to make it safe to reuse. This is also done on
            // failure so that serializers which already ran are not run a second time
            // by a later call.
            clear();
        }
    }
}