OneSidedMoveManager.java
package handist.collections.dist;
import static apgas.Constructs.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import apgas.Place;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.Serializer;
import mpi.MPI;
/**
* Implementation of {@link MoveManager} which provides features to make data
* transfer at the sole initiative of the sender.
*
* @author Patrick Finnerty
*
*/
public class OneSidedMoveManager implements MoveManager {
/** Private counter used to generate tags for MPI calls */
private static int intTag = 10;
private static final int MINIMUM_TAG_VALUE = 10;
/**
* Private method used to assign tags for MPI calls.
*
* @return a safe integer to use as tag
*/
/*
* When refactoring of MPI features access is made, this method should be moved.
*/
protected static synchronized int nextTag() {
final int toReturn = intTag++;
if (intTag < 0) {
intTag = MINIMUM_TAG_VALUE;
}
return toReturn;
}
/**
* List of deserializers used to process the received data on the destination
*/
final protected List<DeSerializer> deserializers;
/**
* Place to which the objects need to be sent
*/
final protected Place destination;
/**
* List of serializers used to transform the objects of the local host into
* bytes to be sent to the remote host
*/
final protected List<Serializer> serializers;
/**
* Constructor
*
* @param d the place which will be the destination of all the objects
* transferred
*/
public OneSidedMoveManager(Place d) {
destination = d;
deserializers = new ArrayList<>();
serializers = new ArrayList<>();
}
/**
* Sends the elements that have been requested to this move manager. The
* elements will be received on the remote host with an asynchronous task
* registered within the same Finish instance as the calling thread.
*
* @throws IOException if thrown during the serialization process
*/
@SuppressWarnings("deprecation")
public void asyncSend() throws IOException {
final byte[] bytesToSend = prepareByteArray();
final int nbOfBytes = bytesToSend.length;
final int destinationRank = TeamedPlaceGroup.world.rank(destination);
final int myRank = TeamedPlaceGroup.world.rank();
final int tag = nextTag();
TeamedPlaceGroup.world.comm.Isend(bytesToSend, 0, nbOfBytes, MPI.BYTE, destinationRank, tag);
asyncAt(destination, () -> {
// Receive the array of bytes
TeamedPlaceGroup.world.comm.Recv(new byte[nbOfBytes], 0, nbOfBytes, MPI.BYTE, myRank, tag);
final ByteArrayInputStream inStream = new ByteArrayInputStream(bytesToSend);
final ObjectInput oInput = new ObjectInput(inStream);
// The first object to come out of the byte array is a list of deserializers
@SuppressWarnings("unchecked")
final List<DeSerializer> ds = (List<DeSerializer>) oInput.readObject();
// We know apply each deserializer one after the other
for (final DeSerializer deserializer : ds) {
deserializer.accept(oInput);
}
oInput.close();
});
}
/**
* Applies all the serializers accumulated so far and produces a byte array
* which is going to be transmitted to the destination
*
* @return a byte array containing the deserializers and the serialized form of
* the objects that were targeted by the serializers
* @throws IOException if thrown during serialization of objects
*/
protected byte[] prepareByteArray() throws IOException {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
final ObjectOutput oo = new ObjectOutput(stream);
oo.writeObject(deserializers); // Write all the serializers first
for (final Serializer s : serializers) {
s.accept(oo); // Convert the objects targeted by the serializers into bytes.
}
oo.close();
final byte[] bytesToSend = stream.toByteArray();
return bytesToSend;
}
@Override
public void request(Place dest, Serializer s, DeSerializer d) {
if (dest != destination) {
throw new RuntimeException("OneSidedMoveManager received a request for " + dest
+ " but is only accepting submissions for" + destination);
}
serializers.add(s);
deserializers.add(d);
}
/**
* Sends the elements that have been requested in a synchronous fashion. This
* method will only return when the remote host has completed the reception of
* all the objects and has processed all of them with their deserializers
*
* @throws IOException if thrown during the serialization process
*/
@SuppressWarnings("deprecation")
public void send() throws IOException {
final byte[] bytesToSend = prepareByteArray();
final int nbOfBytes = bytesToSend.length;
final int destinationRank = TeamedPlaceGroup.world.rank(destination);
final int myRank = TeamedPlaceGroup.world.rank();
final int tag = nextTag();
TeamedPlaceGroup.world.comm.Isend(bytesToSend, 0, nbOfBytes, MPI.BYTE, destinationRank, tag);
at(destination, () -> {
// Receive the array of bytes
TeamedPlaceGroup.world.comm.Recv(new byte[nbOfBytes], 0, nbOfBytes, MPI.BYTE, myRank, tag);
final ByteArrayInputStream inStream = new ByteArrayInputStream(bytesToSend);
final ObjectInput oInput = new ObjectInput(inStream);
// The first object to come out of the byte array is a list of deserializers
@SuppressWarnings("unchecked")
final List<DeSerializer> ds = (List<DeSerializer>) oInput.readObject();
// We know apply each deserializer one after the other
for (final DeSerializer deserializer : ds) {
deserializer.accept(oInput);
}
oInput.close();
});
}
}