package handist.collections.dist;
import static apgas.Constructs.*;
import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import apgas.Place;
import apgas.util.GlobalID;
import handist.collections.ChunkedList;
import handist.collections.LongRange;
import handist.collections.RangedList;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.DeSerializerUsingPlace;
import handist.collections.function.LongTBiConsumer;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.Serializer;
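/**
 * A distributed collection that extends {@link DistCol} with "shared" chunks:
 * a chunk exported through one of the {@code share} methods is replicated on
 * every place of the place group while the exporting place remains its owner.
 * The replicas can then be kept consistent with the collective methods
 * {@code bcast} (owner to replicas), {@code allreduce} (contributions of all
 * places combined), and {@code reduce} (replicas merged back into the owner).
 *
 * <p>
 * A minimal usage sketch; the element type {@code Particle}, its field
 * {@code force}, and the place group variable {@code pg} are assumptions made
 * only for illustration:
 *
 * <pre>{@code
 * CachableChunkedList<Particle> particles = new CachableChunkedList<>(pg);
 * // ... create and add chunks locally on each place ...
 * particles.share();                                          // replicate all local chunks on every place
 * particles.allreduce(p -> p.force, (p, f) -> p.force += f);  // combine the contributions of all places
 * }</pre>
 *
 * @param <T> the type of the elements handled by this collection
 */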
public class CachableChunkedList<T> extends DistCol<T> {
// static class Team<S> extends TeamOperations<S, CachableChunkedList<S>> {
//
// /**
// * Super constructor. Needs to be called by all implementations to initialize
// * the necessary members common to all Team handles.
// *
// * @param localObject local handle of the distributed collection
// */
// public Team(CachableChunkedList<S> localObject) {
// super(localObject);
// }
// @Override
// public void gather(Place root) {
// throw new UnsupportedOperationException("CachableChunkedList does not support gather().");
// }
// @Override
// public void teamedBalance(CollectiveMoveManager mm) {
// throw new UnsupportedOperationException("CachableChunkedList does not support balance operations.");
// }
// }
protected ChunkedList<T> shared = new ChunkedList<>();
protected HashMap<RangedList<T>, Place> shared2owner = new HashMap<>();
public CachableChunkedList(final TeamedPlaceGroup pg) {
this(pg, new GlobalID());
}
private CachableChunkedList(final TeamedPlaceGroup placeGroup, final GlobalID id) {
super(placeGroup, id, (TeamedPlaceGroup pg, GlobalID gid) -> new CachableChunkedList<>(pg, gid));
}
private void addNewShared(Place owner, List<RangedList<T>> chunks) {
for (final RangedList<T> chunk : chunks) {
if (!owner.equals(here())) {
add(chunk);
}
shared.add(chunk);
shared2owner.put(chunk, owner);
}
}
private void addNewShared(Place owner, RangedList<T> chunk) {
if (!owner.equals(here())) {
add(chunk);
}
shared.add(chunk);
shared2owner.put(chunk, owner);
}
public <U> void allreduce(Function<T, U> pack, BiConsumer<T, U> unpack) {
allreduce(new ArrayList<>(ranges()), pack, unpack); // TODO: not good, copying ranges to arraylist
}
public <U> void allreduce(Function<T, U> pack, BiConsumer<T, U> unpack, CollectiveRelocator.Allgather mm) {
allreduce(new ArrayList<>(ranges()), pack, unpack, mm); // TODO: not good, copying ranges to arraylist
}
public <U> void allreduce(List<LongRange> ranges, Function<T, U> pack, BiConsumer<T, U> unpack) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
allreduce(ranges, pack, unpack, mm);
mm.execute();
}
/**
* Conducts an allreduce operation on the shared chunks in the given ranges.
* Note: the same ranges must be specified on all the places.
*
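* <p>
* A minimal usage sketch; {@code cachable} stands for a
* {@code CachableChunkedList<Particle>} where the element type {@code Particle}
* and its field {@code force} are assumptions made only for illustration:
*
* <pre>{@code
* cachable.allreduce(ranges, p -> p.force, (p, f) -> p.force += f, mm);
* mm.execute(); // performs the registered exchange and applies the unpack function
* }</pre>
*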
* @param ranges the list of ranges in which chunks are applied to the
* operation.
* @param pack the function that receives an element and extracts data that
* will be transferred to other places and be reduced by the
* unpack operation.
* @param unpack the function that receives a local element and the transferred
* data from each place and conducts reduction operation to the
* local element.
* @param mm the collective relocator that manages the serialization process
* @param <U> the type of the extracted data
*/
public <U> void allreduce(List<LongRange> ranges, Function<T, U> pack, BiConsumer<T, U> unpack,
CollectiveRelocator.Allgather mm) {
final List<RangedList<T>> chunks = searchSharedChunks(ranges);
final Serializer serProcess = (ObjectOutput s) -> {
for (final RangedList<T> chunk : chunks) {
chunk.forEach((T elem) -> {
s.writeObject(pack.apply(elem));
});
}
};
final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place place) -> {
for (final RangedList<T> chunk : chunks) {
chunk.forEach((T elem) -> {
@SuppressWarnings("unchecked")
final U diff = (U) ds.readObject();
unpack.accept(elem, diff);
});
}
};
mm.request(serProcess, desProcess);
}
private void assertUnshared(LongRange r) {
if (!searchSharedChunks(Collections.singletonList(r)).isEmpty()) {
throw new IllegalStateException("CachableChunkedList found shared chunks in range: " + r);
}
}
public <U> void bcast(Function<T, U> pack, BiConsumer<T, U> unpack) {
bcast((LongRange) null, pack, unpack);
}
public <U> void bcast(Function<T, U> pack, BiConsumer<T, U> unpack, CollectiveRelocator.Allgather mm) {
bcast((LongRange) null, pack, unpack, mm);
}
public <U> void bcast(List<LongRange> ranges, Function<T, U> pack, BiConsumer<T, U> unpack) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
bcast(ranges, pack, unpack, mm);
mm.execute();
}
/**
* Conducts a broadcast operation on chunks that are already shared within the
* place group: the shared chunks owned by the calling place are broadcast to
* their replicas on the other places. The user must call this broadcast method
* on the cachable chunked list in all the places belonging to the place group.
*
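* <p>
* A minimal usage sketch; {@code cachable} stands for a
* {@code CachableChunkedList<Particle>} where the element type {@code Particle}
* and its field {@code position} are assumptions made only for illustration:
*
* <pre>{@code
* // the owner's positions overwrite those of the replicas on the other places
* cachable.bcast(ranges, p -> p.position, (p, x) -> p.position = x, mm);
* mm.execute();
* }</pre>
*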
* @param ranges the list of ranges in which chunks are applied to the
* operation
* @param pack the function that receives an element of a chunk owned by this
* place and extracts the data that will be transferred to the other places
* @param unpack the function that receives a replica element and the
* transferred data and applies the data to the element
* @param mm the collective relocator that manages the serialization process
* @param <U> the type of the extracted data
*/
public <U> void bcast(List<LongRange> ranges, Function<T, U> pack, BiConsumer<T, U> unpack,
CollectiveRelocator.Allgather mm) {
final List<RangedList<T>> chunks = searchSharedChunks(here(), ranges);
final Serializer serProcess = (ObjectOutput s) -> {
s.writeObject(ranges);
for (final RangedList<T> chunk : chunks) {
chunk.forEach((T elem) -> {
s.writeObject(pack.apply(elem));
});
}
};
final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place p) -> {
if (p.equals(here())) {
return;
}
@SuppressWarnings("unchecked")
final List<LongRange> rangesX = (List<LongRange>) ds.readObject();
final List<RangedList<T>> receiving = searchSharedChunks(p, rangesX);
for (final RangedList<T> chunk : receiving) {
chunk.forEach((T elem) -> {
@SuppressWarnings("unchecked")
final U diff = (U) ds.readObject();
unpack.accept(elem, diff);
});
}
};
mm.request(serProcess, desProcess);
}
public <U> void bcast(LongRange range, Function<T, U> pack, BiConsumer<T, U> unpack) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
bcast(range, pack, unpack, mm);
mm.execute();
}
public <U> void bcast(LongRange range, Function<T, U> pack, BiConsumer<T, U> unpack,
CollectiveRelocator.Allgather mm) {
bcast(Collections.singletonList(range), pack, unpack, mm);
}
@Override
public void clear() {
// TODO
// The superclass implementation of clear() assumes a teamed (collective) clear operation.
}
private List<RangedList<T>> exportLocalChunks(List<LongRange> ranges) {
// TODO
// Should we check the overlaps in ranges?
final ArrayList<RangedList<T>> result = new ArrayList<>();
for (final LongRange range : ranges) {
forEachChunk(range, (RangedList<T> chunk) -> {
final LongRange r0 = chunk.getRange();
if (range.contains(r0)) {
addNewShared(here(), chunk);
result.add(chunk);
} else if (r0.from < range.from && r0.to > range.to) {
if (attemptSplitChunkAtTwoPoints(range)) {
final RangedList<T> c = getChunk(range);
addNewShared(here(), c);
result.add(c);
} else {
throw new ConcurrentModificationException();
}
} else {
final long splitPoint = (r0.from >= range.from) ? range.to : range.from;
final LongRange rRange = (r0.from >= range.from) ? new LongRange(r0.from, range.to)
: new LongRange(range.from, r0.to);
if (attemptSplitChunkAtSinglePoint(new LongRange(splitPoint))) {
final RangedList<T> c = getChunk(rRange);
addNewShared(here(), c);
result.add(c);
} else {
throw new ConcurrentModificationException();
}
}
});
}
return result;
}
public void forEachSharedChunk(LongRange range, Consumer<RangedList<T>> func) {
shared.forEachChunk(range, func);
}
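/**
 * Applies the given function to every element in the specified range that
 * belongs to a shared chunk owned by this place. Elements of shared chunks
 * owned by other places are skipped.
 *
 * @param range the range in which the elements are processed
 * @param func the function applied to each element
 */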
public void forEachSharedOwner(LongRange range, Consumer<T> func) {
shared.forEachChunk(range, (RangedList<T> r0) -> {
if (shared2owner.get(r0).equals(here())) {
if (!range.contains(r0.getRange())) {
r0 = r0.subList(range);
}
r0.forEach(func);
}
});
}
public void forEachSharedOwner(LongRange range, LongTBiConsumer<T> func) {
shared.forEachChunk(range, (RangedList<T> r0) -> {
if (shared2owner.get(r0).equals(here())) {
if (!range.contains(r0.getRange())) {
r0 = r0.subList(range);
}
r0.forEach(func);
}
});
}
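/**
 * Returns the place that owns the given shared chunk, or {@code null} if the
 * chunk is not registered as shared on this place.
 *
 * @param chunk the shared chunk whose owner is queried
 * @return the owner of the chunk, or {@code null} if it is not shared
 */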
public Place getSharedOwner(RangedList<T> chunk) {
return shared2owner.get(chunk);
}
@Override
protected void moveAtSync(final List<RangedList<T>> cs, final Place dest, final MoveManager mm) {
// check or filter out shared ones.
for (final RangedList<T> c : cs) {
assertUnshared(c.getRange());
}
super.moveAtSync(cs, dest, mm);
}
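/**
 * Conducts a reduce operation on the shared chunks in the given ranges: the
 * data extracted by {@code pack} from the local replicas of chunks owned by
 * other places is sent back to the respective owners, where {@code unpack}
 * merges it into the owners' elements. The user must call this reduce method
 * on the cachable chunked list in all the places belonging to the place group.
 *
 * <p>
 * A minimal usage sketch; {@code cachable} stands for a
 * {@code CachableChunkedList<Particle>} where the element type {@code Particle}
 * and its field {@code force} are assumptions made only for illustration:
 *
 * <pre>{@code
 * // send the locally accumulated forces back to the owners of the chunks
 * cachable.reduce(ranges, p -> p.force, (p, f) -> p.force += f);
 * }</pre>
 *
 * @param ranges the list of ranges in which chunks are applied to the operation
 * @param pack the function that extracts the data sent back to the owner
 * @param unpack the function that merges the received data into the owner's element
 * @param <U> the type of the extracted data
 */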
public <U> void reduce(List<LongRange> ranges, Function<T, U> pack, SerializableBiConsumer<T, U> unpack) {
final CollectiveMoveManager mm = new CollectiveMoveManager(placeGroup());
reduce(ranges, pack, unpack, mm);
try {
mm.sync();
} catch (final Exception e) {
e.printStackTrace();
throw new Error("Exception raised during CachableChunkedList#reduce().");
}
}
public <U> void reduce(List<LongRange> ranges, final Function<T, U> pack, final SerializableBiConsumer<T, U> unpack,
CollectiveMoveManager mm) {
final CachableChunkedList<T> toBranch = this;
for (final Place p : placeGroup().places()) {
if (p.equals(here())) {
continue;
}
final List<RangedList<T>> chunks = searchSharedChunks(p, ranges);
final Serializer serProcess = (ObjectOutput s) -> {
s.writeInt(chunks.size());
for (final RangedList<T> chunk : chunks) {
s.writeObject(chunk.getRange());
chunk.forEach((T elem) -> {
s.writeObject(pack.apply(elem));
});
}
};
final DeSerializer desProcess = (ObjectInput ds) -> {
final int n = ds.readInt();
for (int i = 0; i < n; i++) {
final LongRange range0 = (LongRange) ds.readObject();
if (!toBranch.containsRange(range0)) {
throw new ConcurrentModificationException(
"The specified range seems to be remove from " + toBranch + " at " + here());
}
toBranch.forEach(range0, (T elem) -> {
@SuppressWarnings("unchecked")
final U diff = (U) ds.readObject();
unpack.accept(elem, diff);
});
}
};
mm.request(p, serProcess, desProcess);
}
}
@Override
public RangedList<T> remove(final LongRange r) {
assertUnshared(r);
return super.remove(r);
}
private List<RangedList<T>> searchSharedChunks(List<LongRange> ranges) {
final ArrayList<RangedList<T>> result = new ArrayList<>();
for (final LongRange range : ranges) {
shared.forEachChunk(range, (RangedList<T> r0) -> {
if (range.contains(r0.getRange())) {
result.add(r0);
} else {
result.add(r0.subList(range));
}
});
}
return result;
}
private List<RangedList<T>> searchSharedChunks(Place owner, List<LongRange> ranges) {
final ArrayList<RangedList<T>> result = new ArrayList<>();
for (final LongRange range : ranges) {
shared.forEachChunk(range, (RangedList<T> r0) -> {
if (shared2owner.get(r0).equals(owner)) {
if (range.contains(r0.getRange())) {
result.add(r0);
} else {
result.add(r0.subList(range));
}
}
});
}
return result;
}
@Override
public void setProxyGenerator(Function<Long, T> func) {
throw new UnsupportedOperationException("CachableChunkedList does not support proxy feature.");
}
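/**
 * Shares all the chunks local to this place with the other places of the place
 * group. See {@link #share(List, CollectiveRelocator.Allgather)} for the
 * detailed semantics of the share operation.
 */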
public void share() {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
share(Collections.singletonList(null), mm);
mm.execute();
}
public void share(CollectiveRelocator.Allgather mm) {
share(Collections.singletonList(null), mm);
}
public void share(List<LongRange> ranges) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
share(ranges, mm);
mm.execute();
}
/**
* Conducts a broadcast operation on chunks that are not yet shared with the
* other places: the local chunks in the given ranges are exported and
* replicated on every place of the place group. The user must call this share
* method on the cachable chunked list in all the places belonging to the place
* group. This method must not be called concurrently with other collective
* methods. The calling place remains the owner of the chunks even after they
* become shared.
*
* Note 1: to share all the local chunks, call {@code share()}. Note 2: to
* specify multiple ranges, use {@code share(List<LongRange>)}. Note 3: to share
* no local chunks from the calling place, specify an empty range or an empty
* list of ranges. Note 4: to conduct the relocation process of multiple
* cachable chunked lists using the same ObjectOutput(Stream), prepare an
* instance of {@link CollectiveRelocator.Allgather} first, call the relocation
* methods of the cachable chunked lists in the same order specifying the
* collective relocator as a parameter, and finally call the execute method of
* the relocator.
*
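* <p>
* A minimal usage sketch; {@code cachable} and {@code myLocalRange} are
* assumptions standing for a {@code CachableChunkedList} instance and the range
* of the chunks local to the calling place, respectively:
*
* <pre>{@code
* // called on every place of the place group, each with its own local range
* cachable.share(Collections.singletonList(myLocalRange));
* }</pre>
*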
* @param ranges The library scans the ranges and exports (the parts of) the
* local chunks in the ranges.
* @param mm You can relocate multiple cachable chunked lists using the same
* collective relocator, specified with {@code mm}.
*/
public void share(final List<LongRange> ranges, CollectiveRelocator.Allgather mm) {
final List<RangedList<T>> chunks = exportLocalChunks(ranges);
final Serializer serProcess = (ObjectOutput s) -> {
s.writeObject(chunks);
};
final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place sender) -> {
@SuppressWarnings("unchecked")
final List<RangedList<T>> received = (List<RangedList<T>>) ds.readObject();
addNewShared(sender, received);
};
mm.request(serProcess, desProcess);
}
public void share(LongRange range) {
final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup());
share(Collections.singletonList(range), mm);
mm.execute();
}
public void share(LongRange range, CollectiveRelocator.Allgather mm) {
share(Collections.singletonList(range), mm);
}
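/**
 * Returns an unmodifiable view of the chunks currently shared on this place,
 * including both the chunks owned by this place and the replicas received from
 * other places.
 *
 * @return an unmodifiable {@link ChunkedList} view of the shared chunks
 */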
public ChunkedList<T> sharedChunks() {
return new ChunkedList.UnmodifiableView<>(shared);
}
@Override
public Object writeReplace() throws ObjectStreamException {
final TeamedPlaceGroup pg1 = manager.placeGroup;
final GlobalID id1 = id();
return new LazyObjectReference<>(pg1, id1, () -> {
return new CachableChunkedList<>(pg1, id1);
});
}
// TODO
// prepare documents for the following methods
// getSizeDistribution: only returns unshared
// getDistribution: only returns unshared
// getRangedDistribution: only returns unshared
// TEAM: only support DistCol methods
}