CachableChunkedList.java
package handist.collections.dist;
import static apgas.Constructs.*;
import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import apgas.Place;
import apgas.util.GlobalID;
import handist.collections.Chunk;
import handist.collections.ChunkedList;
import handist.collections.LongRange;
import handist.collections.RangedList;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.DeSerializerUsingPlace;
import handist.collections.function.LongTBiConsumer;
import handist.collections.function.PrimitiveInput;
import handist.collections.function.PrimitiveOutput;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.Serializer;
import mpi.MPI;
import mpi.MPIException;
import mpi.Op;
/**
* {@link DistCol} with additional features allowing it to replicate some range
* of values held by a process onto other processes
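* <p>
* A minimal usage sketch; the {@code Element} class, its {@code force} field
* and the variable names are illustrative assumptions rather than part of this
* API:
*
* <pre>
* // "cached" is the local handle of a CachableChunkedList of Element objects;
* // the owner place has registered a chunk covering [0, 100).
* cached.share(new LongRange(0, 100)); // collective: replicate that chunk on every place
* cached.bcast((Element e) -> e.force, // the owner pushes its values to the replicas
*         (Element e, Double f) -> e.force = f);
* cached.reduce(Collections.singletonList(new LongRange(0, 100)),
*         (Element e) -> e.force, // the replicas send their values back and
*         (Element e, Double f) -> e.force += f); // the owner accumulates them
* </pre>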
*
* @param <T> type contained by this collection
*/
public class CachableChunkedList<T> extends DistCol<T> {
// public static class Team<T> extends DistCol.Team<T> {
//
// /**
// * Super constructor. Needs to be called by all implementations to initialize
// * the necessary members common to all Team handles.
// *
// * @param localObject local handle of the distributed collection
// */
// private Team(CachableChunkedList<T> localObject) {
// super(localObject);
// }
//
// @Override
// public void gather(Place root) {
// throw new UnsupportedOperationException("CachableChunkedList does not support gather().");
// }
//
// /**
// * Computes and gathers the size of each local collection <b>not including
// * shared collections</b> into the provided array. This operation usually
// * requires that all the hosts that are manipulating the distributed collection
// * call this method before it returns on any host. This is due to the fact some
// * communication between the {@link Place}s in the collection's
// * {@link TeamedPlaceGroup} is needed to compute/gather the result.
// *
// * @param result long array in which the result will be gathered
// */
// @Override
// public void getSizeDistribution(final long[] result) {
// super.getSizeDistribution(result);
// }
//
// @Override
// public <R extends Reducer<R, T>> R parallelReduce(R reducer) {
// return super.parallelReduce(reducer);
// }
//
// @Override
// public <R extends Reducer<R, T>> R reduce(R reducer) {
// return super.reduce(reducer);
// }
//
// @Override
// public void teamedBalance() {
// throw new UnsupportedOperationException("CachableChunkedList does not support balance operations.");
// }
//
// @Override
// public void teamedBalance(final CollectiveMoveManager mm) {
// throw new UnsupportedOperationException("CachableChunkedList does not support balance operations.");
// }
//
// @Override
// public void teamedBalance(final float[] newLocality) {
// throw new UnsupportedOperationException("CachableChunkedList does not support balance operations.");
// }
//
// @Override
// public void teamedBalance(final float[] newLocality, final CollectiveMoveManager mm) {
// throw new UnsupportedOperationException("CachableChunkedList does not support balance operations.");
// }
// }
/**
* List of chunks that have been shared with this local branch, either by a
* remote branch or by this local branch itself
*/
protected ChunkedList<T> shared = new ChunkedList<>();
/**
* Map keeping track of the "owner" of each shared chunk in the collection
*/
protected HashMap<RangedList<T>, Place> shared2owner = new HashMap<>();
/**
* Creates a new {@link CachableChunkedList} on the specified
* {@link TeamedPlaceGroup}
*
* @param pg the group of places on which this collection may have a branch
*/
public CachableChunkedList(final TeamedPlaceGroup pg) {
this(pg, new GlobalID());
}
/**
* Creates a new branch for a {@link CachableChunkedList} with the specified
* group of places and the id identifying the distributed collection to which
* the created branch belongs
*
* @param placeGroup the group of places on which the distributed collection may
* have a handle
* @param id global id identifying the distributed collection
*/
private CachableChunkedList(final TeamedPlaceGroup placeGroup, final GlobalID id) {
super(placeGroup, id, (TeamedPlaceGroup pg, GlobalID gid) -> new CachableChunkedList<>(pg, gid));
}
/**
* Adds several ranged lists shared by a remote branch to this local branch
*
* @param owner the owner of the shared ranged lists
* @param chunks the chunks shared with this branch
*/
private void addNewShared(Place owner, List<RangedList<T>> chunks) {
for (final RangedList<T> chunk : chunks) {
if (!owner.equals(here())) {
add(chunk);
}
shared.add(chunk);
shared2owner.put(chunk, owner);
}
}
/**
* Adds a ranged list shared by a remote branch to this local branch
*
* @param owner the owner of the shared ranged list
* @param chunk the ranged list shared with this branch
*/
private void addNewShared(Place owner, RangedList<T> chunk) {
if (!owner.equals(here())) {
add(chunk);
}
shared.add(chunk);
shared2owner.put(chunk, owner);
}
/**
* Conducts an allreduce operation on shared chunks using an MPI reduce operation.
* <p>
* This variant cannot handle object data, but is faster than
* {@link #allreduce(Function, BiConsumer)} in many cases.
* <p>
* The core idea consists in converting each individual T object into a number
* of {@code long}, {@code int}, and {@code double} values, performing an MPI
* primitive "all reduce" on these raw types, and modifying the T elements
* contained by the {@link CachableChunkedList} based on the resulting
* {@link PrimitiveInput}.
* <p>
* The number of raw type values of each type stored into the
* {@link PrimitiveOutput} must be the same for all T elements. The
* {@code unpack} closure supplied as second parameter must also extract the
* same number of raw type values (even if some of them are ultimately not used
* to modify the T element) so that the data read remains consistent with the
* individual T element being processed.
*
* <p>
* Code sample:
* <pre>
* class Element {
* double d1, d2;
* int i;
* }
*
* cachableChunkedList.allreduce((PrimitiveOutput out, Element e) -> {
* out.writeDouble(e.d1);
* out.writeDouble(e.d2);
* out.writeInt(e.i);
* }, (PrimitiveInput in, Element e) -> {
* e.d1 = in.readDouble(); // Match the unpack order with the pack closure above
* e.d2 = in.readDouble();
* in.readInt(); // (For example purposes) the int is not used to modify e here,
* // but in.readInt() still needs to be called to keep the stream consistent
* }, MPI.SUM);
* </pre>
*
* @param pack the function that receives an element and extracts data to
* transfer and reduce with other places.
* @param unpack the function that receives a local element and raw data that
* was reduced between the hosts using MPI.
* @param op     the MPI reduction operation used to merge the extracted raw
*               values (e.g. {@code MPI.SUM})
*/
public void allreduce(BiConsumer<PrimitiveOutput, T> pack, BiConsumer<PrimitiveInput, T> unpack, Op op) {
allreduce(new ArrayList<>(shared.ranges()), pack, unpack, op); // TODO: not good, copying ranges to arraylist
}
/**
* Conducts an allreduce operation on shared chunks.
*
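* <p>
* A minimal usage sketch (the {@code Element} class and its fields are
* illustrative assumptions). The {@code unpack} closure receives the value
* packed by each participating place, so accumulating into the element is the
* typical pattern:
*
* <pre>
* cached.allreduce((Element e) -> e.partial, (Element e, Double v) -> e.total += v);
* </pre>
*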
* @param pack the function that receives an element and extracts data that
* will be transferred to other places and be reduced by the
* unpack operation.
* @param unpack the function that receives a local element and the transferred
* data from each place and conducts the reduction operation on the
* local element.
* @param <U> the type of the extracted data
*/
public <U> void allreduce(Function<T, U> pack, BiConsumer<T, U> unpack) {
allreduce(new ArrayList<>(shared.ranges()), pack, unpack); // TODO: not good, copying ranges to arraylist
}
/**
* Conducts an allreduce operation on shared chunks.
*
* @param pack the function that receives an element and extracts data that
* will be transferred to other places and be reduced by the
* unpack operation.
* @param unpack the function that receives a local element and the transferred
* data from each place and conducts the reduction operation on the
* local element.
* @param mm     the collective relocator managing the serialization process
* @param <U> the type of the extracted data
*/
public <U> void allreduce(Function<T, U> pack, BiConsumer<T, U> unpack, CollectiveRelocator.Allgather mm) {
allreduce(new ArrayList<>(shared.ranges()), pack, unpack, mm); // TODO: not good, copying ranges to arraylist
}
/**
* Refer to {@link CachableChunkedList#allreduce(BiConsumer, BiConsumer, Op)}.
* <p>
* This method needs to be called with the same ranges on all places on which
* this {@link CachableChunkedList} is defined. Otherwise it will throw
* {@link MPIException}.
*
* @param ranges the ranges on which the common reduction is to be applied
* @param pack   the function extracting raw values from each element into the
*               provided {@link PrimitiveOutput}
* @param unpack the function applying the reduced raw values read from the
*               provided {@link PrimitiveInput} back onto each element
* @param op     the MPI reduction operation used to merge the raw values
* @throws MPIException if called with different ranges on the various hosts
* involved in the common reduction
*/
@SuppressWarnings("deprecation")
public void allreduce(List<LongRange> ranges, BiConsumer<PrimitiveOutput, T> pack,
BiConsumer<PrimitiveInput, T> unpack, Op op) {
final List<RangedList<T>> chunks = searchSharedChunks(ranges);
if (chunks.isEmpty()) {
return; // no shared chunks fall within the given ranges
}
final Iterator<RangedList<T>> listIt = chunks.iterator();
Iterator<T> chunkIt = listIt.next().iterator();
final PrimitiveStream stream = new PrimitiveStream(10);
// Count how many times one pack calls writeDouble, writeInt, writeLong.
pack.accept(stream, chunkIt.next());
// Compute how many T elements there are to pack
int nbOfElements = 0;
for (final RangedList<T> r : chunks) {
nbOfElements += r.size();
}
// Adjust stream size according to how many elements are expected
stream.adjustSize(nbOfElements);
// Process the remainder of the elements ...
// Complete the current chunk
while (chunkIt.hasNext()) {
pack.accept(stream, chunkIt.next());
}
// Deal with all the remaining chunks in the same manner
while (listIt.hasNext()) {
chunkIt = listIt.next().iterator();
while (chunkIt.hasNext()) {
pack.accept(stream, chunkIt.next());
}
}
stream.checkIsFull(); // Sanity check, the arrays inside `stream` should be full.
// communicate
if (stream.doubleArray.length != 0) {
final int size = stream.doubleArray.length;
placeGroup().comm.Allreduce(stream.doubleArray, 0, stream.doubleArray, 0, size, MPI.DOUBLE, op);
}
if (stream.intArray.length != 0) {
final int size = stream.intArray.length;
placeGroup().comm.Allreduce(stream.intArray, 0, stream.intArray, 0, size, MPI.INT, op);
}
if (stream.longArray.length != 0) {
final int size = stream.longArray.length;
placeGroup().comm.Allreduce(stream.longArray, 0, stream.longArray, 0, size, MPI.LONG, op);
}
// do unpack
stream.reset();
for (final RangedList<T> chunk : chunks) {
chunk.forEach((T t) -> {
unpack.accept(stream, t);
});
}
}
/**
* Conducts an allreduce operation on shared chunks in the given ranges. Note:
* the same ranges must be used on all the places.
*
* @param ranges the list of ranges whose chunks take part in the
*               operation.
* @param pack the function that receives an element and extracts data that
* will be transferred to other places and be reduced by the
* unpack operation.
* @param unpack the function that receives a local element and the transferred
* data from each place and conducts the reduction operation on the
* local element.
* @param <U> the type of the extracted data
*/
public <U> void allreduce(List<LongRange> ranges, Function<T, U> pack, BiConsumer<T, U> unpack) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
allreduce(ranges, pack, unpack, mm);
mm.execute();
}
/**
* Conducts an allreduce operation on shared chunks in the given ranges. Note:
* the same ranges must be used on all the places.
*
* @param ranges the list of ranges whose chunks take part in the
*               operation.
* @param pack the function that receives an element and extracts data that
* will be transferred to other places and be reduced by the
* unpack operation.
* @param unpack the function that receives a local element and the transferred
* data from each place and conducts the reduction operation on the
* local element.
* @param mm     the collective relocator managing the serialization process
* @param <U> the type of the extracted data
*/
public <U> void allreduce(List<LongRange> ranges, Function<T, U> pack, BiConsumer<T, U> unpack,
CollectiveRelocator.Allgather mm) {
final List<RangedList<T>> chunks = searchSharedChunks(ranges);
final Serializer serProcess = (ObjectOutput s) -> {
for (final RangedList<T> chunk : chunks) {
chunk.forEach((T elem) -> {
s.writeObject(pack.apply(elem));
});
}
};
final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place place) -> {
for (final RangedList<T> chunk : chunks) {
chunk.forEach((T elem) -> {
@SuppressWarnings("unchecked")
final U diff = (U) ds.readObject();
unpack.accept(elem, diff);
});
}
};
mm.request(serProcess, desProcess);
}
private void assertUnshared(LongRange r) {
if (!searchSharedChunks(Collections.singletonList(r)).isEmpty()) {
throw new IllegalStateException("CachableChunkedList found shared chunks in range: " + r);
}
}
/**
* Conducts a broadcast operation on chunks that are already shared within the
* place group. The user must call the broadcast method of a cachable chunked
* list on all the places belonging to the place group.
*
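* <p>
* A minimal usage sketch (the {@code Element} class and its {@code force} field
* are illustrative assumptions); each place pushes the current values of the
* shared chunks it owns onto the replicas held by the other places:
*
* <pre>
* cached.bcast((Element e) -> e.force, (Element e, Double f) -> e.force = f);
* </pre>
*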
* @param <U> the type used to transfer information from originals to shared
* replicas on remote places
* @param pack the function used to transform T objects into the U type used
* for transfer
* @param unpack the closure used to update the T objects based on the
* received U objects
*/
public <U> void bcast(Function<T, U> pack, BiConsumer<T, U> unpack) {
bcast((LongRange) null, pack, unpack);
}
/**
* Conducts a broadcast operation on chunks that are already shared within the
* place group. The user must call the broadcast method of a cachable chunked
* list on all the places belonging to the place group.
*
* @param <U> the type used to transfer information from originals to shared
* replicas on remote places
* @param pack the function used to transform T objects into the U type used
* for transfer
* @param unpack the closure used to update the T objects based on the
* received U objects
* @param mm the relocator in charge of handling the communication between
* hosts
*/
public <U> void bcast(Function<T, U> pack, BiConsumer<T, U> unpack, CollectiveRelocator.Allgather mm) {
bcast((LongRange) null, pack, unpack, mm);
}
/**
* Conducts a broadcast operation on chunks that are already shared within the
* place group. The user must call the broadcast method of a cachable chunked
* list on all the places belonging to the place group.
*
* @param <U> the type used to transfer information from originals to shared
* replicas on remote places
* @param ranges the ranges to broadcast
* @param pack the function used to transform T objects into the U type used
* for transfer
* @param unpack the closure used to update the T objects based on the
* received U objects
*
*/
public <U> void bcast(List<LongRange> ranges, Function<T, U> pack, BiConsumer<T, U> unpack) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
bcast(ranges, pack, unpack, mm);
mm.execute();
}
/**
* Conducts a broadcast operation on chunks that are already shared within the
* place group. The user must call the broadcast method of a cachable chunked
* list on all the places belonging to the place group.
*
* @param <U> the type used to transfer information from originals to shared
* replicas on remote places
* @param ranges the ranges to broadcast
* @param pack the function used to transform T objects into the U type used
* for transfer
* @param unpack the closure used to update the T objects based on the
* received U objects
* @param mm the relocator in charge of handling the communication between
* hosts
*/
public <U> void bcast(List<LongRange> ranges, Function<T, U> pack, BiConsumer<T, U> unpack,
CollectiveRelocator.Allgather mm) {
final List<RangedList<T>> chunks = searchSharedChunks(here(), ranges);
final Serializer serProcess = (ObjectOutput s) -> {
s.writeObject(ranges);
for (final RangedList<T> chunk : chunks) {
chunk.forEach((T elem) -> {
s.writeObject(pack.apply(elem));
});
}
};
final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place p) -> {
if (p.equals(here())) {
return;
}
@SuppressWarnings("unchecked")
final List<LongRange> rangesX = (List<LongRange>) ds.readObject();
final List<RangedList<T>> receiving = searchSharedChunks(p, rangesX);
for (final RangedList<T> chunk : receiving) {
chunk.forEach((T elem) -> {
@SuppressWarnings("unchecked")
final U diff = (U) ds.readObject();
unpack.accept(elem, diff);
});
}
};
mm.request(serProcess, desProcess);
}
/**
* Conducts a broadcast operation on chunks that are already shared within the
* place group. The user must call the broadcast method of a cachable chunked
* list on all the places belonging to the place group.
*
* @param <U> the type used to transfer information from originals to shared
* replicas on remote places
* @param range  the range to broadcast
* @param pack the function used to transform T objects into the U type used
* for transfer
* @param unpack the closure used to update the T objects based on the
* received U objects
*/
public <U> void bcast(LongRange range, Function<T, U> pack, BiConsumer<T, U> unpack) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
bcast(range, pack, unpack, mm);
mm.execute();
}
/**
* Conducts a broadcast operation on chunks that are already shared within the
* place group. The user must call the broadcast method of a cachable chunked
* list on all the places belonging to the place group.
*
* @param <U> the type used to transfer information from originals to shared
* replicas on remote places
* @param range  the range to broadcast
* @param pack the function used to transform T objects into the U type used
* for transfer
* @param unpack the closure used to update the T objects based on the
* received U objects
* @param mm the relocator in charge of handling the communication between
* hosts
*/
public <U> void bcast(LongRange range, Function<T, U> pack, BiConsumer<T, U> unpack,
CollectiveRelocator.Allgather mm) {
bcast(Collections.singletonList(range), pack, unpack, mm);
}
@Override
public void clear() {
// TODO: not implemented yet; this override is currently a no-op because
// super.clear() assumes a teamed (collective) clear operation.
}
/**
* Extracts the portions of the local chunks that fall within the given ranges
* (splitting chunks when necessary), registers them as shared with this place
* as their owner, and returns them.
*
* @param ranges the ranges whose local chunks are to be exported
* @return the list of (sub-)chunks newly registered as shared
*/
private List<RangedList<T>> exportLocalChunks(List<LongRange> ranges) {
// TODO
// Should we check the overlaps in ranges?
final ArrayList<RangedList<T>> result = new ArrayList<>();
for (final LongRange range : ranges) {
forEachChunk(range, (RangedList<T> chunk) -> {
final LongRange r0 = chunk.getRange();
if (range.contains(r0)) {
addNewShared(here(), chunk);
result.add(chunk);
} else if (r0.from < range.from && r0.to > range.to) {
if (attemptSplitChunkAtTwoPoints(range)) {
final RangedList<T> c = getChunk(range);
addNewShared(here(), c);
result.add(c);
} else {
throw new ConcurrentModificationException();
}
} else {
final long splitPoint = (r0.from >= range.from) ? range.to : range.from;
final LongRange rRange = (r0.from >= range.from) ? new LongRange(r0.from, range.to)
: new LongRange(range.from, r0.to);
if (attemptSplitChunkAtSinglePoint(new LongRange(splitPoint))) {
final RangedList<T> c = getChunk(rRange);
addNewShared(here(), c);
result.add(c);
} else {
throw new ConcurrentModificationException();
}
}
});
}
return result;
}
/**
* Performs the provided operation on each {@link Chunk} that is already
* shared within the place group and overlaps the given range.
*
* @param range range to be scanned
* @param func  operation to perform on each chunk
*/
public void forEachSharedChunk(LongRange range, Consumer<RangedList<T>> func) {
shared.forEachChunk(range, func);
}
/**
* Performs the provided operation on each element contained in an already
* shared {@link Chunk} whose owner place is here and which overlaps the given
* range.
*
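* <p>
* A minimal sketch ({@code Element} and its {@code force} field are
* illustrative assumptions); only elements of shared chunks owned by this place
* are visited:
*
* <pre>
* cached.forEachSharedOwner(new LongRange(0, 100), (Element e) -> e.force = 0.0);
* </pre>
*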
* @param range range to be scanned
* @param func  operation to perform on each element
*/
public void forEachSharedOwner(LongRange range, Consumer<T> func) {
shared.forEachChunk(range, (RangedList<T> r0) -> {
if (shared2owner.get(r0).equals(here())) {
if (!range.contains(r0.getRange())) {
r0 = r0.subList(range);
}
r0.forEach(func);
}
});
}
/**
* Performs the provided operation on each element contained in an already
* shared {@link Chunk} whose owner place is here and which overlaps the given
* range.
*
* @param range range to be scanned
* @param func  the action to perform on each pair of {@code long} index and T
*              element
*/
public void forEachSharedOwner(LongRange range, LongTBiConsumer<T> func) {
shared.forEachChunk(range, (RangedList<T> r0) -> {
if (shared2owner.get(r0).equals(here())) {
if (!range.contains(r0.getRange())) {
r0 = r0.subList(range);
}
r0.forEach(func);
}
});
}
/**
* Returns a newly created snapshot of the current distribution of this
* collection as a {@link LongRangeDistribution}. This returned distribution's
* contents will become out-of-date if the contents of this class are relocated,
* added, and/or removed. <b>The distribution does not include shared
* ranges.</b>
* <p>
* If you need a {@link LongRangeDistribution} to remain up-to-date with the
* actual distribution of a {@link DistCol}, consider using
* {@link #registerDistribution(UpdatableDistribution)}. By registering a
* {@link LongRangeDistribution}, changes in the distribution of entries of this
* {@link DistCol} will be reflected in the {@link LongRangeDistribution} object
* when the distribution information of {@link DistCol} is updated and
* synchronized between hosts using {@link #updateDist()}. This is more
* efficient than allocating a new {@link LongRangeDistribution} object each
* time the distribution of the distributed collection changes.
*
* @return a new {@link LongRangeDistribution} object representing the current
* distribution of this collection
*/
@Override
public LongRangeDistribution getDistribution() {
return super.getDistribution();
}
/**
* Returns the place that owns the given shared chunk.
*
* @param chunk the chunk whose owner place is requested
* @return the place that owns the given chunk, or {@code null} if the chunk is
*         not shared
*/
public Place getSharedOwner(RangedList<T> chunk) {
return shared2owner.get(chunk);
}
@Override
protected void moveAtSync(final List<RangedList<T>> cs, final Place dest, final MoveManager mm) {
// check or filter out shared ones.
for (final RangedList<T> c : cs) {
assertUnshared(c.getRange());
}
super.moveAtSync(cs, dest, mm);
}
/**
* Conducts a reduce operation on chunks that are already shared with other
* places in the given ranges. The reduced result is stored in the owner's
* chunks. The user must call the reduce method of a cachable chunked list on
* all the places belonging to the place group.
*
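* <p>
* A minimal usage sketch (the {@code Element} class and its {@code force} field
* are illustrative assumptions); each non-owner place sends the values of its
* replicas and the owner accumulates them:
*
* <pre>
* cached.reduce(Collections.singletonList(new LongRange(0, 100)),
*         (Element e) -> e.force, (Element e, Double f) -> e.force += f);
* </pre>
*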
* @param ranges the list of ranges whose chunks take part in the
*               operation.
* @param pack the function that receives an element and extracts data that
* will be transferred to other places and be reduced by the
* unpack operation.
* @param unpack the function that receives a local element and the transferred
* data from each place and conducts the reduction operation on the
* local element.
* @param <U> the type of the extracted data
*/
public <U> void reduce(List<LongRange> ranges, Function<T, U> pack, SerializableBiConsumer<T, U> unpack) {
final CollectiveMoveManager mm = new CollectiveMoveManager(placeGroup());
reduce(ranges, pack, unpack, mm);
try {
mm.sync();
} catch (final Exception e) {
e.printStackTrace();
throw new Error("Exception raised during CachableChunkedList#reduce().");
}
}
/**
* Conducts a reduce operation on chunks that are already shared with other
* places in the given ranges. The reduced result is stored in the owner's
* chunks. The user must call the reduce method of a cachable chunked list on
* all the places belonging to the place group.
*
* @param ranges the list of ranges whose chunks take part in the
*               operation.
* @param pack the function that receives an element and extracts data that
* will be transferred to other places and be reduced by the
* unpack operation.
* @param unpack the function that receives a local element and the transferred
* data from each place and conducts the reduction operation on the
* local element.
* @param mm You can relocate multiple cachable chunked lists using the same
* collective relocator, specified with {@code mm}.
* @param <U> the type of the extracted data
*/
public <U> void reduce(List<LongRange> ranges, final Function<T, U> pack, final SerializableBiConsumer<T, U> unpack,
CollectiveMoveManager mm) {
final CachableChunkedList<T> toBranch = this;
for (final Place p : placeGroup().places()) {
if (p.equals(here())) {
continue;
}
final List<RangedList<T>> chunks = searchSharedChunks(p, ranges);
final Serializer serProcess = (ObjectOutput s) -> {
s.writeInt(chunks.size());
for (final RangedList<T> chunk : chunks) {
s.writeObject(chunk.getRange());
chunk.forEach((T elem) -> {
s.writeObject(pack.apply(elem));
});
}
};
final DeSerializer desProcess = (ObjectInput ds) -> {
final int n = ds.readInt();
for (int i = 0; i < n; i++) {
final LongRange range0 = (LongRange) ds.readObject();
if (!toBranch.containsRange(range0)) {
throw new ConcurrentModificationException(
"The specified range seems to be remove from " + toBranch + " at " + here());
}
toBranch.forEach(range0, (T elem) -> {
@SuppressWarnings("unchecked")
final U diff = (U) ds.readObject();
unpack.accept(elem, diff);
});
}
};
mm.request(p, serProcess, desProcess);
}
}
@Override
public RangedList<T> remove(final LongRange r) {
assertUnshared(r);
return super.remove(r);
}
private List<RangedList<T>> searchSharedChunks(List<LongRange> ranges) {
final ArrayList<RangedList<T>> result = new ArrayList<>();
for (final LongRange range : ranges) {
shared.forEachChunk(range, (RangedList<T> r0) -> {
if (range.contains(r0.getRange())) {
result.add(r0);
} else {
result.add(r0.subList(range));
}
});
}
return result;
}
private List<RangedList<T>> searchSharedChunks(Place owner, List<LongRange> ranges) {
final ArrayList<RangedList<T>> result = new ArrayList<>();
for (final LongRange range : ranges) {
shared.forEachChunk(range, (RangedList<T> r0) -> {
if (shared2owner.get(r0).equals(owner)) {
if (range.contains(r0.getRange())) {
result.add(r0);
} else {
result.add(r0.subList(range));
}
}
});
}
return result;
}
@Override
public void setProxyGenerator(Function<Long, T> func) {
throw new UnsupportedOperationException("CachableChunkedList does not support proxy feature.");
}
/**
* Calls for the sharing of chunks between the local handles of the
* {@link CachableChunkedList}, but with this local handle not sharing any chunk
* with the other handles. Instead, it will only receive the chunks shared by
* the other handles.
*/
public void share() {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
share(Collections.emptyList(), mm);
mm.execute();
}
/**
* Conducts a broadcast operation on chunks that are not yet shared with other
* places. The user must call the share method of a cachable chunked list on
* all the places belonging to the place group. This method should not be
* called simultaneously with other collective methods. The calling place is
* treated as the owner even after the chunks become shared.
* <p>
* Note 1: if you want to share all the local chunks, please call
* {@link #share()}
* <p>
* Note 2: if you want to specify multiple ranges, please use
* {@link #share(List)}.
* <p>
* Note 3: if you don't want to share any local chunks from the called place,
* please specify an empty range or an empty list of ranges.
* <p>
* Note 4: if you want to conduct the relocation process of multiple cachable
* chunked lists using the same ObjectOutput(Stream), please prepare an instance
* of {@link CollectiveRelocator.Allgather} first and call the relocation
* methods of the cachable chunked lists in the same order specifying the
* collective relocator as a parameter, and finally call the execute method of
* the relocator.
*
* @param mm You can relocate multiple cachable chunked lists using the same
* collective relocator, specified with {@code mm}.
*/
public void share(CollectiveRelocator.Allgather mm) {
share(Collections.singletonList(null), mm);
}
/**
* Conducts a broadcast operation on chunks that are not yet shared with other
* places. The user must call the share method of a cachable chunked list on
* all the places belonging to the place group. This method should not be
* called simultaneously with other collective methods. The calling place is
* treated as the owner even after the chunks become shared.
* <p>
* Note 1: if you want to share all the local chunks, please call
* {@link #share()}
* <p>
* Note 2: if you want to specify multiple ranges, please use
* {@link #share(List)}.
* <p>
* Note 3: if you don't want to share any local chunks from the called place,
* please specify an empty range or an empty list of ranges.
* <p>
* Note 4: if you want to conduct the relocation process of multiple cachable
* chunked lists using the same ObjectOutput(Stream), please prepare an instance
* of {@link CollectiveRelocator.Allgather} first and call the relocation
* methods of the cachable chunked lists in the same order specifying the
* collective relocator as a parameter, and finally call the execute method of
* the relocator.
*
* @param ranges The library scans the ranges and exports (the parts of) the
* local chunks in the ranges.
*/
public void share(List<LongRange> ranges) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
share(ranges, mm);
mm.execute();
}
/**
* Conducts a broadcast operation on chunks that are not yet shared with other
* places. The user must call the share method of a cachable chunked list on
* all the places belonging to the place group. This method should not be
* called simultaneously with other collective methods. The calling place is
* treated as the owner even after the chunks become shared.
* <p>
* Note 1: if you want to share all the local chunks, please call
* {@link #share()}
* <p>
* Note 2: if you want to specify multiple ranges, please use
* {@link #share(List)}.
* <p>
* Note 3: if you don't want to share any local chunks from the called place,
* please specify an empty range or an empty list of ranges.
* <p>
* Note 4: if you want to conduct the relocation process of multiple cachable
* chunked lists using the same ObjectOutput(Stream), please prepare an instance
* of {@link CollectiveRelocator.Allgather} first and call the relocation
* methods of the cachable chunked lists in the same order specifying the
* collective relocator as a parameter, and finally call the execute method of
* the relocator.
*
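* <p>
* A sketch of Note 4 ({@code listA}, {@code listB}, and their ranges are
* illustrative names), relocating two cachable chunked lists defined on the
* same {@link TeamedPlaceGroup} {@code pg} with a single relocator:
*
* <pre>
* CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(pg);
* listA.share(rangeA, mm);
* listB.share(rangeB, mm);
* mm.execute();
* </pre>
*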
* @param ranges The library scans the ranges and exports (the parts of) the
* local chunks in the ranges.
* @param mm You can relocate multiple cachable chunked lists using the same
* collective relocator, specified with {@code mm}.
*/
public void share(final List<LongRange> ranges, CollectiveRelocator.Allgather mm) {
final List<RangedList<T>> chunks = exportLocalChunks(ranges);
final Serializer serProcess = (ObjectOutput s) -> {
s.writeObject(chunks);
};
final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place sender) -> {
@SuppressWarnings("unchecked")
final List<RangedList<T>> received = (List<RangedList<T>>) ds.readObject();
addNewShared(sender, received);
};
mm.request(serProcess, desProcess);
}
/**
* Conducts a broadcast operation on chunks that are not yet shared with other
* places. The user must call the share method of a cachable chunked list on
* all the places belonging to the place group. This method should not be
* called simultaneously with other collective methods. The calling place is
* treated as the owner even after the chunks become shared.
* <p>
* Note 1: if you want to share all the local chunks, please call
* {@link #share()}
* <p>
* Note 2: if you want to specify multiple ranges, please use
* {@link #share(List)}.
* <p>
* Note 3: if you don't want to share any local chunks from the called place,
* please specify an empty range or an empty list of ranges.
* <p>
* Note 4: if you want to conduct the relocation process of multiple cachable
* chunked lists using the same ObjectOutput(Stream), please prepare an instance
* of {@link CollectiveRelocator.Allgather} first and call the relocation
* methods of the cachable chunked lists in the same order specifying the
* collective relocator as a parameter, and finally call the execute method of
* the relocator.
*
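* <p>
* A minimal usage sketch ({@code cached} is an illustrative handle name); only
* the owner place holds a chunk covering the range beforehand, yet every place
* makes the same collective call:
*
* <pre>
* cached.share(new LongRange(0, 100));
* // from here on, every place holds a replica of that chunk and the owner
* // place is recorded and returned by getSharedOwner()
* </pre>
*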
* @param range The library scans the range and exports (the parts of) the local
* chunks in the range.
*/
public void share(LongRange range) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
share(Collections.singletonList(range), mm);
mm.execute();
}
/**
* Conducts a broadcast operation on chunks that are not yet shared with other
* places. The user must call the share method of a cachable chunked list on
* all the places belonging to the place group. This method should not be
* called simultaneously with other collective methods. The calling place is
* treated as the owner even after the chunks become shared.
* <p>
* Note 1: if you want to share all the local chunks, please call
* {@link #share()}
* <p>
* Note 2: if you want to specify multiple ranges, please use
* {@link #share(List)}.
* <p>
* Note 3: if you don't want to share any local chunks from the called place,
* please specify an empty range or an empty list of ranges.
* <p>
* Note 4: if you want to conduct the relocation process of multiple cachable
* chunked lists using the same ObjectOutput(Stream), please prepare an instance
* of {@link CollectiveRelocator.Allgather} first and call the relocation
* methods of the cachable chunked lists in the same order specifying the
* collective relocator as a parameter, and finally call the execute method of
* the relocator.
*
* @param range The library scans the range and exports (the parts of) the local
* chunks in the range.
* @param mm You can relocate multiple cachable chunked lists using the same
* collective relocator, specified with {@code mm}.
*/
public void share(LongRange range, CollectiveRelocator.Allgather mm) {
share(Collections.singletonList(range), mm);
}
/**
* Returns a {@link ChunkedList} containing the chunks that are already shared
* within the place group. The returned ChunkedList is an unmodifiable view:
* operations such as add and remove cannot be performed on it.
*
* @return a {@link ChunkedList} containing the chunks that are already shared
*/
public ChunkedList<T> sharedChunks() {
return new ChunkedList.UnmodifiableView<>(shared);
}
@Override
public Object writeReplace() throws ObjectStreamException {
final TeamedPlaceGroup pg1 = manager.placeGroup;
final GlobalID id1 = id();
return new LazyObjectReference<>(pg1, id1, () -> {
return new CachableChunkedList<>(pg1, id1);
});
}
// TODO
// prepare documents for the following methods
// getSizeDistribution: only returns unshared
// getDistribution: only returns unshared
// getRangedDistribution: only returns unshared
// TEAM: only support DistCol methods
}