DistBag.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections.dist;
import static apgas.Constructs.*;
import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import apgas.Constructs;
import apgas.Place;
import apgas.util.GlobalID;
import apgas.util.SerializableWithReplace;
import handist.collections.Bag;
import handist.collections.dist.util.IntLongPair;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.MemberOfLazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.DeSerializerUsingPlace;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.SerializableConsumer;
import handist.collections.function.Serializer;
import mpi.MPI;
import mpi.MPIException;
/**
* A class for handling objects at multiple places. It is allowed to add new
* elements dynamically. This class provides methods for load balancing.
* <p>
* Note: In its current implementation, there are some limitations.
* <ul>
* <li>There is only one load balancing method
* <li>The method flattens the number of elements of the all places
* </ul>
*
* @param <T> type of the elements handled by the {@link DistBag}.
*/
public class DistBag<T> extends Bag<T> implements DistributedCollection<T, DistBag<T>>, SerializableWithReplace {
/* implements Container[T], ReceiverHolder[T] */
/**
* Implementation of the GLOBAL handle for class {@link DistBag}
*
* @author Patrick Finnerty
*
*/
public class DistBagGlobal extends GlobalOperations<T, DistBag<T>> {
/**
* Constructor
*
* @param handle handle to the local {@link DistBag} this GLOBAL handle acts on
*/
DistBagGlobal(DistBag<T> handle) {
super(handle);
}
@Override
public Object writeReplace() throws ObjectStreamException {
final TeamedPlaceGroup pg1 = placeGroup;
final GlobalID gId = id;
return new MemberOfLazyObjectReference<>(pg1, gId, () -> {
return new DistBag<>(pg1, gId);
}, (distBag) -> {
return distBag.GLOBAL;
});
}
}
/**
* Implementation of the TEAM handle for class {@link DistBag}
*
* @author Patrick Finnerty
*
*/
public class DistBagTeam extends TeamOperations<T, DistBag<T>> {
/**
* Constructor
*
* @param handle handle to the local {@link DistBag} this TEAM handle acts on
*/
DistBagTeam(DistBag<T> handle) {
super(handle);
}
@SuppressWarnings("deprecation")
@Override
public void size(long[] result) {
final TeamedPlaceGroup pg = handle.placeGroup();
result[pg.myrank] = handle.size();
try {
// THIS WORKS FOR MPJ-NATIVE implementation
pg.comm.Allgather(result, pg.myrank, 1, MPI.LONG, result, 0, 1, MPI.LONG);
} catch (final MPIException e) {
e.printStackTrace();
throw new Error("[DistMap] network error in team().size()");
}
}
@Override
public void updateDist() {
// TODO Auto-generated method stub
}
}
private static int _debug_level = 5;
/** Handle to Global operations on the DistBag instance */
public DistBag<T>.DistBagGlobal GLOBAL;
/**
* Global Id which identifies this DistBag object as part of a number of handles
* to the distributed collection implemented by this instance
*/
final GlobalID id;
/**
* Array keeping track of the number of entries on the various places on which
* this distributed collection is defined.
*/
public transient float[] locality;
/**
* Set of Places on which this {@link DistBag} is defined, i.e. on which Places
* can this distributed collection can hold instances.
*/
public final TeamedPlaceGroup placeGroup;
/**
* Handle to TEAM operations on this DistBag instance
*/
public DistBag<T>.DistBagTeam TEAM;
/**
* Create a new DistBag. Place.places() is used as the PlaceGroup.
*/
public DistBag() {
this(TeamedPlaceGroup.getWorld());
}
/**
* Create a new DistBag using the given arguments.
*
* @param pg the places susceptible to interact with this instance.
*/
public DistBag(TeamedPlaceGroup pg) {
this(pg, new GlobalID());
}
/**
* Constructor for DistBag.
*
* @param pg group of places on which this contruction is to be
* initialized
* @param globalId unique identifier linked to every local {@link DistBag}
* instance which participates in this distributed collection
*/
protected DistBag(TeamedPlaceGroup pg, GlobalID globalId) {
super();
id = globalId;
placeGroup = pg;
locality = new float[pg.size];
Arrays.fill(locality, 1.0f);
id.putHere(this);
GLOBAL = new DistBagGlobal(this);
TEAM = new DistBagTeam(this);
}
@Override
public void forEach(SerializableConsumer<T> action) {
super.forEach(action);
}
/**
* gather all place-local elements to the root Place.
*
* @param root the place where the result of reduction is stored.
*/
@SuppressWarnings("unchecked")
public void gather(Place root) {
final Serializer serProcess = (ObjectOutput s) -> {
s.writeObject(new Bag<>(this));
};
final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place place) -> {
final Bag<T> imported = (Bag<T>) ds.readObject();
addBag(imported);
};
CollectiveRelocator.gatherSer(placeGroup, root, serProcess, desProcess);
if (!here().equals(root)) {
clear();
}
}
/**
* Return a Container that has the same values of DistBag's local storage.
*
* @return a Container that has the same values of local storage.
*/
/*
* public Collection<T> clone(): Container[T] { return data.clone(); }
*/
@Override
public GlobalOperations<T, DistBag<T>> global() {
return GLOBAL;
}
/*
* public def integrate(src : List[T]) { // addAll(src); throw new
* UnsupportedOperationException(); }
*/
@Override
public GlobalID id() {
return id;
}
@Override
public float[] locality() {
return locality;
}
@Override
public void moveAtSyncCount(final ArrayList<IntLongPair> moveList, final MoveManager mm) throws Exception {
for (final IntLongPair pair : moveList) {
if (_debug_level > 5) {
System.out.println("MOVE src: " + here() + " dest: " + pair.first + " size: " + pair.second);
}
if (pair.second > Integer.MAX_VALUE) {
throw new Error("One place cannot receive so much elements: " + pair.second);
}
moveAtSyncCount((int) pair.second, placeGroup.get(pair.first), mm);
}
}
/**
* Removes the specified number of entries from the local Bag and prepares them
* to be transfered to the specified place when the
* {@link CollectiveMoveManager#sync()} method of the
* {@link CollectiveMoveManager} is called.
* <p>
* The objects are not removed from the local collection until method
* {@link CollectiveMoveManager#sync()} is called. If the {@code destination} is
* the local placce, this method has no effects.
*
* @param count number of objects to move from this instance
* @param destination the destination of the objects
* @param mm move manager in charge of making the transfer
*/
@SuppressWarnings("unchecked")
public void moveAtSyncCount(final int count, Place destination, MoveManager mm) {
if (destination.equals(Constructs.here())) {
return;
}
final DistBag<T> collection = this;
final Serializer serialize = (ObjectOutput s) -> {
s.writeObject(this.remove(count));
};
final DeSerializer deserialize = (ObjectInput ds) -> {
final List<T> imported = (List<T>) ds.readObject();
collection.addBag(imported);
};
mm.request(destination, serialize, deserialize);
}
@Override
public void parallelForEach(SerializableConsumer<T> action) {
super.parallelForEach(action);
}
@Override
public TeamedPlaceGroup placeGroup() {
return placeGroup;
}
@SuppressWarnings("unused")
@Deprecated
private void setupBranches(SerializableBiConsumer<Place, DistBag<T>> gen) {
final DistBag<T> handle = this;
finish(() -> {
placeGroup.broadcastFlat(() -> {
gen.accept(here(), handle);
});
});
}
@Override
public TeamOperations<T, DistBag<T>> team() {
return TEAM;
}
@Override
public Object writeReplace() throws ObjectStreamException {
final TeamedPlaceGroup pg1 = placeGroup;
final GlobalID id1 = id;
return new LazyObjectReference<>(pg1, id1, () -> {
return new DistBag<>(pg1, id1);
});
}
}