DistCol.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections.dist;
import static apgas.Constructs.*;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import apgas.Place;
import apgas.util.GlobalID;
import apgas.util.SerializableWithReplace;
import handist.collections.ChunkedList;
import handist.collections.ElementOverlapException;
import handist.collections.LongRange;
import handist.collections.RangedList;
import handist.collections.dist.util.IntLongPair;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.MemberOfLazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.dist.util.Pair;
import handist.collections.function.DeSerializer;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.SerializableConsumer;
import handist.collections.function.Serializer;
import handist.collections.glb.DistColGlb;
/**
* A class for handling objects at multiple places. New elements may be added
* dynamically. This class provides a method for load balancing.
* <p>
* Note: In the current implementation, there are some limitations.
* <ul>
* <li>There is only one load balancing method: it evens out the number of
* elements held by all the places.
* </ul>
*
* @param <T> the type of elements handled by this {@link DistCol}
*/
@DefaultSerializer(JavaSerializer.class)
public class DistCol<T> extends ChunkedList<T>
implements DistributedCollection<T, DistCol<T>>, RangeRelocatable<LongRange>, SerializableWithReplace {
/**
 * Describes a chunk that has to be cut at a single index so that one of the
 * resulting pieces can be handled separately from the other (for instance to
 * be transferred to another place).
 *
 * @param <T> type of the individual elements contained in the chunk
 * @see ChunkExtractMiddle
 * @see ChunkExtractRight
 */
static class ChunkExtractLeft<T> {
    /** Ranged list that is to be split */
    public RangedList<T> original;
    /** Index at which {@link #original} is to be split */
    public long splitPoint;

    /**
     * Records the ranged list to split and the index at which it is cut.
     *
     * @param original   ranged list which is being split
     * @param splitPoint index at which the ranged list is split
     */
    ChunkExtractLeft(final RangedList<T> original, final long splitPoint) {
        this.splitPoint = splitPoint;
        this.original = original;
    }

    /**
     * Splits the recorded ranged list at the recorded index.
     *
     * @return the ranged lists returned by {@code splitRange} when the original
     *         is split at the recorded index
     */
    List<RangedList<T>> extract() {
        final List<RangedList<T>> pieces = original.splitRange(splitPoint);
        return pieces;
    }
}
/**
 * Describes a chunk that has to be cut at two indices so that the piece located
 * between the two indices can be handled separately from the pieces on either
 * side (for instance to be transferred to another place).
 *
 * @param <T> type of the individual elements contained in the chunk
 * @see ChunkExtractLeft
 * @see ChunkExtractRight
 */
static class ChunkExtractMiddle<T> {
    /** Ranged list that is to be split */
    public RangedList<T> original;
    /** First index at which {@link #original} is to be split */
    public long splitPoint1;
    /** Second index at which {@link #original} is to be split */
    public long splitPoint2;

    /**
     * Records the ranged list to split and the two indices at which it is cut.
     *
     * @param original    ranged list which is being split
     * @param splitPoint1 first index at which the ranged list is split
     * @param splitPoint2 second index at which the ranged list is split
     */
    ChunkExtractMiddle(final RangedList<T> original, final long splitPoint1, final long splitPoint2) {
        this.splitPoint2 = splitPoint2;
        this.splitPoint1 = splitPoint1;
        this.original = original;
    }

    /**
     * Splits the recorded ranged list at the two recorded indices.
     *
     * @return the ranged lists returned by {@code splitRange} when the original
     *         is split at the two recorded indices
     */
    List<RangedList<T>> extract() {
        final List<RangedList<T>> pieces = original.splitRange(splitPoint1, splitPoint2);
        return pieces;
    }
}
/**
 * Describes a chunk that has to be cut at a single index so that one of the
 * resulting pieces can be handled separately from the other (for instance to
 * be transferred to another place).
 *
 * @param <T> type of the individual elements contained in the chunk
 * @see ChunkExtractLeft
 * @see ChunkExtractMiddle
 */
static class ChunkExtractRight<T> {
    /** Ranged list that is to be split */
    public RangedList<T> original;
    /** Index at which {@link #original} is to be split */
    public long splitPoint;

    /**
     * Records the ranged list to split and the index at which it is cut.
     *
     * @param original   ranged list which is being split
     * @param splitPoint index at which the ranged list is split
     */
    ChunkExtractRight(final RangedList<T> original, final long splitPoint) {
        this.splitPoint = splitPoint;
        this.original = original;
    }

    /**
     * Splits the recorded ranged list at the recorded index.
     *
     * @return the ranged lists returned by {@code splitRange} when the original
     *         is split at the recorded index
     */
    List<RangedList<T>> extract() {
        final List<RangedList<T>> pieces = original.splitRange(splitPoint);
        return pieces;
    }
}
/**
 * Global handle for the {@link DistCol} class. This class makes the operations
 * that act on all the local handles of the distributed collection accessible.
 *
 * @author Patrick Finnerty
 *
 */
public class DistColGlobal extends GlobalOperations<T, DistCol<T>> implements Serializable {
    /** Serial Version UID */
    private static final long serialVersionUID = 4584810477237588857L;

    /**
     * Constructor
     *
     * @param handle handle to the local DistCol instance
     */
    public DistColGlobal(DistCol<T> handle) {
        super(handle);
    }

    /**
     * Submits the given action through {@code broadcastFlat} on the place group
     * of the local handle. The action receives the place on which it runs and
     * the local handle of the collection.
     *
     * @param action action to perform with each place and local handle
     */
    @Override
    public void onLocalHandleDo(SerializableBiConsumer<Place, DistCol<T>> action) {
        final TeamedPlaceGroup group = localHandle.placeGroup();
        group.broadcastFlat(() -> {
            action.accept(here(), localHandle);
        });
    }

    /**
     * Replaces this object during serialization by a lazy reference built from
     * the place group and the global id of the local handle. The reference is
     * given an initializer creating a {@link DistCol} with that place group and
     * global id, and an accessor returning the {@link DistCol#GLOBAL} member of
     * a {@link DistCol} instance.
     *
     * @return the object to serialize in place of this instance
     * @throws ObjectStreamException declared by the serialization contract
     */
    @Override
    public Object writeReplace() throws ObjectStreamException {
        final TeamedPlaceGroup group = localHandle.placeGroup();
        final GlobalID globalId = localHandle.id();
        return new MemberOfLazyObjectReference<>(group, globalId, () -> {
            return new DistCol<>(group, globalId);
        }, (handle) -> {
            return handle.GLOBAL;
        });
    }
}
/**
 * TEAM handle for methods that can be called concurrently to other methods on
 * all local handles of {@link DistCol}
 *
 * @author Patrick Finnerty
 *
 */
public class DistColTeam extends TeamOperations<T, DistCol<T>> implements Serializable {
    /** Serial Version UID */
    private static final long serialVersionUID = -5392694230295904290L;

    /**
     * Constructor
     *
     * @param handle local handle of {@link DistCol} that this instance is managing
     */
    DistColTeam(DistCol<T> handle) {
        super(handle);
    }

    /**
     * Adds to the given array the size of every range recorded in the local
     * distribution information. The size of a range is added at the index given
     * by the rank, in the place group, of the place associated with that range.
     *
     * @param result array indexed by place rank to which the sizes are added
     */
    @Override
    public void size(long[] result) {
        ldist.dist.forEach((range, owner) -> {
            result[manager.placeGroup.rank(owner)] += range.size();
        });
    }

    /**
     * Updates the distribution information by delegating to the private
     * {@code team_updateDist()} method of the enclosing {@link DistCol}.
     */
    @Override
    public void updateDist() {
        team_updateDist();
    }
}
/**
 * Distribution manager dedicated to {@link DistCol}. It relies on the local
 * handle (the "branch") it was created with to answer distribution queries and
 * to register count-based relocations.
 *
 * @param <T> type of the elements of the managed {@link DistCol}
 */
static class DistributionManager<T> extends GeneralDistManager<DistCol<T>> implements Serializable {
    /** Serial Version UID */
    private static final long serialVersionUID = 456677767130722164L;

    /**
     * Constructor
     *
     * @param placeGroup place group of the managed collection
     * @param id         global id of the managed collection
     * @param branch     local handle of the managed collection
     */
    public DistributionManager(TeamedPlaceGroup placeGroup, GlobalID id, DistCol<T> branch) {
        super(placeGroup, id, branch);
    }

    /**
     * Adds to the given array the size of every range recorded in the
     * distribution information of the branch. The size of a range is added at
     * the index given by the rank, in the place group, of the place associated
     * with that range.
     *
     * @param result array indexed by place rank to which the sizes are added
     */
    @Override
    public void checkDistInfo(long[] result) {
        branch.ldist.dist.forEach((range, owner) -> {
            result[placeGroup.rank(owner)] += range.size();
        });
    }

    /**
     * Delegates to {@link DistCol#moveAtSyncCount(ArrayList, MoveManager)} of
     * the branch.
     *
     * @param moveList pairs describing the relocations to register
     * @param mm       move manager with which the relocations are registered
     * @throws Exception if thrown by the branch
     */
    @Override
    protected void moveAtSyncCount(ArrayList<IntLongPair> moveList, CollectiveMoveManager mm) throws Exception {
        branch.moveAtSyncCount(moveList, mm);
    }
}
/**
* Debug verbosity level. The debug traces of this class are only printed when
* this value is strictly greater than 5.
*/
private static int _debug_level = 5;
/**
 * Creates a locality array of the specified length in which every value is
 * {@code 1.0f}.
 *
 * @param size number of entries of the array to create
 * @return a new array of length {@code size} filled with {@code 1.0f}
 */
private static float[] initialLocality(final int size) {
    final float[] locality = new float[size];
    for (int rank = 0; rank < locality.length; rank++) {
        locality[rank] = 1.0f;
    }
    return locality;
}
/**
* Handle to operations that can benefit from load balance when called inside an
* "underGLB" method.
*/
public transient final DistColGlb<T> GLB;
/**
* Handle to Global Operations implemented by {@link DistCol}.
*/
public transient final DistColGlobal GLOBAL;
/**
* Internal class that handles distribution-related operations.
*/
protected final transient DistManager<LongRange> ldist;
/**
* Manager of this local handle. It holds the place group, the global id and the
* locality array respectively returned by {@link #placeGroup()}, {@link #id()}
* and {@link #locality()}.
*/
protected transient DistributionManager<T> manager;
/**
* Function kept and used when the local handle does not contain the specified
* index in method {@link #get(long)}. This proxy will return a value to be
* returned by the {@link #get(long)} method rather than throwing an
* {@link Exception}.
*/
private Function<Long, T> proxyGenerator;
/**
* Handle to Team Operations implemented by {@link DistCol}.
*/
public final transient DistColTeam TEAM;
/**
* Creates a new DistCol whose handles may be held by all the hosts participating
* in the distributed computation. This constructor is equivalent to calling
* {@link #DistCol(TeamedPlaceGroup)} with {@link TeamedPlaceGroup#getWorld()} as
* argument.
*/
public DistCol() {
this(TeamedPlaceGroup.getWorld());
}
/**
* Constructor for {@link DistCol} whose handles are restricted to the
* {@link TeamedPlaceGroup} passed as parameter. A new {@link GlobalID} is
* created to identify the local handles of the created collection.
*
* @param placeGroup the places on which the DistCol will hold handles.
*/
public DistCol(final TeamedPlaceGroup placeGroup) {
this(placeGroup, new GlobalID());
}
/**
* Private constructor by which the locality and the GlobalId associated with
* the {@link DistCol} are explicitly given. This constructor should only be
* used internally when creating the local handle of a DistCol already created
* on a remote place. Calling this constructor with an existing {@link GlobalID}
* which is already linked with existing and potentially different objects could
* prove disastrous.
*
* @param placeGroup the hosts on which the distributed collection the created
* instance may have handles on
* @param id the global id used to identify all the local handles
*/
private DistCol(final TeamedPlaceGroup placeGroup, final GlobalID id) {
super();
// Associate this local handle with the global id before anything else is set up
id.putHere(this);
manager = new DistributionManager<>(placeGroup, id, this);
// The locality array has one entry per place of the group, all set to 1.0f
manager.locality = initialLocality(placeGroup.size);
ldist = new DistManager<>();
// The operation handles are created last and receive this local handle
TEAM = new DistColTeam(this);
GLOBAL = new DistColGlobal(this);
GLB = new DistColGlb<>(this);
}
/**
* Adds the given ranged list to this local handle. The range of the ranged list
* is first recorded in the distribution information {@link #ldist}, then the
* ranged list is added through the implementation of the parent class.
*
* @param c ranged list to add to this local handle
* @throws ElementOverlapException declared by the implementation of the parent
* class
*/
@Override
public void add(final RangedList<T> c) throws ElementOverlapException {
// NOTE(review): the range is recorded in ldist before super.add(c) is called; if
// super.add(c) throws, the range presumably stays recorded in ldist -- confirm
// that this is intended.
ldist.add(c.getRange());
super.add(c);
}
/**
* Adds the given ranged list to this local handle through the
* {@code add_unchecked} implementation of the parent class, after recording
* its range in the distribution information {@link #ldist}.
*
* @param c ranged list to add to this local handle
*/
@Override
public void add_unchecked(RangedList<T> c) {
ldist.add(c.getRange());
super.add_unchecked(c);
}
/**
 * Clears this local handle: the parent class implementation is called, the
 * distribution information {@link #ldist} is cleared, and every entry of the
 * locality array is set back to {@code 1.0f}.
 */
@Override
public void clear() {
    super.clear();
    ldist.clear();
    final float[] locality = manager.locality;
    for (int rank = 0; rank < locality.length; rank++) {
        locality[rank] = 1.0f;
    }
}
/**
* Delegates to the {@code forEach} implementation of the parent class with the
* given action.
*
* @param action action handed to the parent class implementation
*/
@Override
public void forEach(SerializableConsumer<T> action) {
super.forEach(action);
}
/**
 * Returns the value stored at the specified index.
 *
 * When no proxy generator is set, the lookup of the parent class is used as is
 * and its {@link IndexOutOfBoundsException} is propagated. When a proxy
 * generator is set, an {@link IndexOutOfBoundsException} thrown by the lookup
 * is caught and the value produced by the proxy generator for that index is
 * returned instead.
 *
 * @param index index whose value needs to be retrieved
 * @throws IndexOutOfBoundsException if the specified index is not contained in
 *                                   this local collection and no proxy was
 *                                   defined
 * @return the value corresponding to the provided index, or the value generated
 *         by the proxy if it was defined and the lookup raised an
 *         {@link IndexOutOfBoundsException}
 * @see #setProxyGenerator(Function)
 */
@Override
public T get(long index) {
    if (proxyGenerator != null) {
        try {
            return super.get(index);
        } catch (final IndexOutOfBoundsException outOfLocalRange) {
            return proxyGenerator.apply(index);
        }
    }
    return super.get(index);
}
/**
* Returns the ranges of this local handle, as given by {@code ranges()}.
*
* @return the collection returned by {@code ranges()}
*/
@Override
public Collection<LongRange> getAllRanges() {
return ranges();
}
/**
* Returns the {@code diff} map kept by the distribution information
* {@link #ldist}. The map itself is returned, not a copy.
*
* @return the {@code diff} map of {@link #ldist}
*/
Map<LongRange, Integer> getDiff() {
return ldist.diff;
}
/**
* Returns the map kept by the distribution information {@link #ldist} which
* associates ranges with places. The map itself is returned, not a copy.
*
* @return the {@code dist} map of {@link #ldist}
*/
public ConcurrentHashMap<LongRange, Place> getDist() {
return ldist.dist;
}
/**
* Converts the map returned by {@link #getDist()} into a
* {@link LongDistribution} using {@code LongDistribution.convert}.
*
* @return the {@link LongDistribution} obtained from the current distribution
* map
*/
public LongDistribution getDistributionLong() {
return LongDistribution.convert(getDist());
}
/**
* Creates a new {@link LongRangeDistribution} from the map returned by
* {@link #getDist()}.
*
* @return a new {@link LongRangeDistribution} built with the current
* distribution map
*/
public LongRangeDistribution getRangedDistributionLong() {
return new LongRangeDistribution(getDist());
}
/**
* Returns the handle to the global operations of this collection.
*
* @return the {@link #GLOBAL} member of this instance
*/
@Override
public GlobalOperations<T, DistCol<T>> global() {
return GLOBAL;
}
/**
* Returns a hash code derived from the global id of this collection: the value
* returned by {@code gid()} narrowed to an {@code int}.
*
* @return the {@code gid} of the global id cast to {@code int}
*/
@Override
public int hashCode() {
return (int) id().gid();
}
/**
* Returns the global id of this collection, as kept by {@link #manager}.
*
* @return the {@link GlobalID} of this collection
*/
@Override
public GlobalID id() {
return manager.id;
}
/**
* Returns the locality array kept by {@link #manager}. The array itself is
* returned, not a copy.
*
* @return the locality array of this local handle
*/
@Override
public float[] locality() {
// TODO check if this is correct
return manager.locality;
}
/**
* Registers with the given move manager the transfer of the given chunks to the
* specified destination. Nothing is registered when the destination is the
* current place.
* <p>
* Two routines are handed to {@code mm.request}:
* <ul>
* <li>a serializer which, for every chunk, calls {@code ldist.moveOut} with the
* range of the chunk and the destination, removes the chunk from this local
* handle, and finally writes the list of bytes returned by {@code moveOut}
* followed by the list of chunks;
* <li>a deserializer which reads these two lists back and calls
* {@link #putForMove(RangedList, byte)} for every chunk with the matching byte.
* </ul>
* Presumably the serializer runs on the sending place and the deserializer on
* the destination -- to be confirmed against the {@link MoveManager}
* implementation.
*
* @param cs chunks to transfer
* @param dest place to which the chunks are transferred
* @param mm move manager with which the transfer is registered
*/
@Deprecated
@SuppressWarnings("unchecked")
private void moveAtSync(final List<RangedList<T>> cs, final Place dest, final MoveManager mm) {
if (_debug_level > 5) {
System.out.print("[" + here().id + "] moveAtSync List[RangedList[T]]: ");
for (final RangedList<T> rl : cs) {
System.out.print("" + rl.getRange() + ", ");
}
System.out.println(" dest: " + dest.id);
}
// Chunks already located on the destination do not need to be transferred
if (dest.equals(here())) {
return;
}
final DistCol<T> toBranch = this; // using plh@AbstractCol
final Serializer serialize = (ObjectOutput s) -> {
// One byte per chunk, in the same order as the chunks of cs
final ArrayList<Byte> keyTypeList = new ArrayList<>();
for (final RangedList<T> c : cs) {
keyTypeList.add(ldist.moveOut(c.getRange(), dest));
this.removeForMove(c.getRange());
}
s.writeObject(keyTypeList);
s.writeObject(cs);
};
final DeSerializer deserialize = (ObjectInput ds) -> {
// The two lists are read in the order in which they were written
final List<Byte> keyTypeList = (List<Byte>) ds.readObject();
final Iterator<Byte> keyTypeListIt = keyTypeList.iterator();
final List<RangedList<T>> chunks = (List<RangedList<T>>) ds.readObject();
for (final RangedList<T> c : chunks) {
final byte keyType = keyTypeListIt.next();
final LongRange key = c.getRange();
if (_debug_level > 5) {
System.out.println("[" + here() + "] putForMove key: " + key + " keyType: " + keyType);
}
toBranch.putForMove(c, keyType);
}
};
mm.request(dest, serialize, deserialize);
}
/**
 * Registers with the given move manager the relocation of the specified numbers
 * of entries to the specified places.
 * <p>
 * Each pair of the list holds the rank of the destination in the place group of
 * this collection (first member) and the number of entries to send to it
 * (second member). Pairs whose destination is the current place are ignored.
 * The local ranges are considered from the shortest to the longest; a range
 * longer than what remains to be sent is cut and only its beginning is sent, the
 * remainder staying available for the following pairs.
 *
 * @param moveList pairs (destination rank, number of entries) describing the
 *                 relocations to register
 * @param mm       move manager with which the relocations are registered
 * @throws IndexOutOfBoundsException if the local ranges are exhausted before the
 *                                   requested number of entries could be
 *                                   registered
 * @throws Exception                 declared by the overridden method
 */
@Override
public void moveAtSyncCount(final ArrayList<IntLongPair> moveList, final MoveManager mm) throws Exception {
    // TODO ->LinkedList? sort??
    final ArrayList<LongRange> localKeys = new ArrayList<>(ranges());
    // Shortest ranges first. Long.compare is used rather than casting the
    // difference of the lengths to int, which could truncate and yield the
    // wrong sign for large ranges.
    localKeys.sort((LongRange range1, LongRange range2) -> {
        final long len1 = range1.to - range1.from;
        final long len2 = range2.to - range2.from;
        return Long.compare(len1, len2);
    });
    if (_debug_level > 5) {
        System.out.print("[" + here() + "] ");
        for (int i = 0; i < localKeys.size(); i++) {
            System.out.print("" + localKeys.get(i).from + ".." + localKeys.get(i).to + ", ");
        }
        System.out.println();
    }
    for (final IntLongPair moveinfo : moveList) {
        final long count = moveinfo.second;
        final Place dest = manager.placeGroup.get(moveinfo.first);
        if (_debug_level > 5) {
            System.out.println("[" + here() + "] move count=" + count + " to dest " + dest.id);
        }
        if (dest.equals(here())) {
            continue;
        }
        long sizeToSend = count;
        while (sizeToSend > 0) {
            if (localKeys.isEmpty()) {
                // Same exception type as the one localKeys.remove(0) would raise,
                // with a message that explains what went wrong
                throw new IndexOutOfBoundsException("[" + here() + "] no local range left to move the remaining "
                        + sizeToSend + " entries (out of " + count + ") to dest " + dest.id);
            }
            final LongRange lk = localKeys.remove(0);
            final long len = lk.to - lk.from;
            if (len > sizeToSend) {
                // Only the beginning of the range is sent; the rest is put back at
                // the head of the list for the following pairs
                moveRangeAtSync(new LongRange(lk.from, lk.from + sizeToSend), dest, mm);
                localKeys.add(0, new LongRange(lk.from + sizeToSend, lk.to));
                break;
            } else {
                moveRangeAtSync(lk, dest, mm);
                sizeToSend -= len;
            }
        }
    }
}
/**
 * Registers with the given move manager the relocation of the entries of this
 * local handle according to the given distribution. Every index of every local
 * chunk is assigned, as a range containing only that index, to the place
 * returned by the distribution for that index.
 *
 * @param dist distribution giving the destination of each index
 * @param mm   move manager with which the relocations are registered
 */
public void moveRangeAtSync(final Distribution<Long> dist, final CollectiveMoveManager mm) {
    moveRangeAtSync((LongRange range) -> {
        final ArrayList<Pair<Place, LongRange>> assignments = new ArrayList<>();
        for (final Long index : range) {
            final Place owner = dist.place(index);
            final LongRange single = new LongRange(index, index + 1);
            assignments.add(new Pair<>(owner, single));
        }
        return assignments;
    }, mm);
}
/**
 * Registers with the given move manager the relocation of the entries of this
 * local handle according to the given rule. The rule is applied to the range of
 * every local chunk and returns the (place, range) pairs to relocate. The pairs
 * are first gathered for all the chunks, and the relocations are registered
 * only once the traversal of the chunks is complete.
 *
 * @param rule function returning, for the range of a chunk, the ranges to
 *             relocate and their respective destination
 * @param mm   move manager with which the relocations are registered
 */
public void moveRangeAtSync(Function<LongRange, List<Pair<Place, LongRange>>> rule, CollectiveMoveManager mm) {
    final HashMap<Place, ArrayList<LongRange>> rangesByDestination = new HashMap<>();
    forEachChunk((RangedList<T> chunk) -> {
        for (final Pair<Place, LongRange> assignment : rule.apply(chunk.getRange())) {
            rangesByDestination.computeIfAbsent(assignment.first, unused -> new ArrayList<>())
                    .add(assignment.second);
        }
    });
    for (final Map.Entry<Place, ArrayList<LongRange>> entry : rangesByDestination.entrySet()) {
        for (final LongRange range : entry.getValue()) {
            moveRangeAtSync(range, entry.getKey(), mm);
        }
    }
}
/**
* Registers with the given move manager the relocation to {@code dest} of the
* entries of this local handle whose index lies in the specified range (whose
* upper bound {@code to} is treated as exclusive).
* <p>
* Chunks entirely contained in the range are moved whole. Chunks which only
* partly overlap the range are split at the boundaries of the range: the pieces
* replace the original chunk in this local handle and only the piece inside the
* range is selected for relocation. The selection and the splitting of the
* chunks are performed while holding the lock on this instance; the selected
* chunks are then handed to {@code moveAtSync} outside of that lock.
*
* @param range range of indices to relocate
* @param dest place to which the entries are relocated
* @param mm move manager with which the relocation is registered
*/
@Override
public void moveRangeAtSync(final LongRange range, final Place dest, final MoveManager mm) {
if (_debug_level > 5) {
System.out.println("[" + here().id + "] moveAtSync range: " + range + " dest: " + dest.id);
}
// Chunks moved whole, and chunks that need to be split before being moved
final ArrayList<RangedList<T>> chunksToMove = new ArrayList<>();
final ArrayList<ChunkExtractLeft<T>> chunksToExtractLeft = new ArrayList<>();
final ArrayList<ChunkExtractMiddle<T>> chunksToExtractMiddle = new ArrayList<>();
final ArrayList<ChunkExtractRight<T>> chunksToExtractRight = new ArrayList<>();
// This method needs to be protected against concurrent calls
// We don't want to risk splitting the same Chunk concurrently (as could be the
// case with the GLB)
synchronized (this) {
// First pass: classify every chunk without modifying the collection
forEachChunk((RangedList<T> c) -> {
final LongRange cRange = c.getRange();
if (cRange.from <= range.from) {
if (cRange.to <= range.from) { // cRange.max < range.min) {
// skip
} else {
// range.min <= cRange.max
if (cRange.from == range.from) {
if (cRange.to <= range.to) {
// add cRange.min..cRange.max
chunksToMove.add(c);
} else {
// range.max < cRange.max
// split at range.max/range.max+1
// add cRange.min..range.max
chunksToExtractLeft.add(new ChunkExtractLeft<>(c, range.to/* max + 1 */));
}
} else {
// cRange.min < range.min
if (range.to < cRange.to) {
// split at range.min-1/range.min
// split at range.max/range.max+1
// add range.min..range.max
chunksToExtractMiddle
.add(new ChunkExtractMiddle<>(c, range.from, range.to/* max + 1 */));
} else {
// split at range.min-1/range.min
// cRange.max <= range.max
// add range.min..cRange.max
chunksToExtractRight.add(new ChunkExtractRight<>(c, range.from));
}
}
}
} else {
// range.min < cRange.min
if (range.to <= cRange.from) { // range.max < cRange.min) {
// skip
} else {
// cRange.min <= range.max
if (cRange.to <= range.to) {
// add cRange.min..cRange.max
chunksToMove.add(c);
} else {
// split at range.max/range.max+1
// add cRange.min..range.max
chunksToExtractLeft.add(new ChunkExtractLeft<>(c, range.to/* max + 1 */));
}
}
}
});
// Second pass: the pieces are added to this local handle, then the original
// chunk is removed; the first piece is the one to move
for (final ChunkExtractLeft<T> chunkToExtractLeft : chunksToExtractLeft) {
final RangedList<T> original = chunkToExtractLeft.original;
final List<RangedList<T>> splits = chunkToExtractLeft.extract();
// System.out.println("[" + here.id + "] putChunk " + splits.first.getRange());
add_unchecked(splits.get(0)/* first */);
// System.out.println("[" + here.id + "] putChunk " + splits.second.getRange());
add_unchecked(splits.get(1)/* second */);
// System.out.println("[" + here.id + "] removeChunk " + original.getRange());
remove(original);
chunksToMove.add(splits.get(0)/* first */);
}
// Same as above with three pieces; the second (middle) piece is the one to move
for (final ChunkExtractMiddle<T> chunkToExtractMiddle : chunksToExtractMiddle) {
final RangedList<T> original = chunkToExtractMiddle.original;
final List<RangedList<T>> splits = chunkToExtractMiddle.extract();
// System.out.println("[" + here.id + "] putChunk " + splits.first.getRange());
add_unchecked(splits.get(0)/* first */);
// System.out.println("[" + here.id + "] putChunk " + splits.second.getRange());
add_unchecked(splits.get(1)/* second */);
// System.out.println("[" + here.id + "] putChunk " + splits.third.getRange());
add_unchecked(splits.get(2)/* third */);
// System.out.println("[" + here.id + "] removeChunk " + original.getRange());
remove(original);
chunksToMove.add(splits.get(1)/* second */);
}
// Same as above with two pieces; the second piece is the one to move
for (final ChunkExtractRight<T> chunkToExtractRight : chunksToExtractRight) {
final RangedList<T> original = chunkToExtractRight.original;
final List<RangedList<T>> splits = chunkToExtractRight.extract();
// System.out.println("[" + here.id + "] putChunk " + splits.first.getRange());
add_unchecked(splits.get(0)/* first */);
// System.out.println("[" + here.id + "] putChunk " + splits.second.getRange());
add_unchecked(splits.get(1)/* second */);
// System.out.println("[" + here.id + "] removeChunk " + original.getRange());
remove(original);
chunksToMove.add(splits.get(1)/* second */);
}
} // End of the synchronized block
moveAtSync(chunksToMove, dest, mm);
}
/**
* Registers with the given move manager the relocation of the entries of this
* local handle according to the given ranged distribution. The range of every
* local chunk is submitted to {@code dist.placeRanges} to obtain the ranges to
* relocate and their destination.
*
* @param dist ranged distribution giving the destination of the ranges
* @param mm move manager with which the relocations are registered
* @throws Exception declared by this method; the visible body does not throw a
* checked exception itself
*/
public void moveRangeAtSync(final RangedDistribution<LongRange> dist, final CollectiveMoveManager mm)
throws Exception {
moveRangeAtSync((LongRange range) -> {
return dist.placeRanges(range);
}, mm);
}
/**
* Delegates to the {@code parallelForEach} implementation of the parent class
* with the given action.
*
* @param action action handed to the parent class implementation
*/
@Override
public void parallelForEach(SerializableConsumer<T> action) {
super.parallelForEach(action);
}
/**
* Returns the place group of this collection, as kept by {@link #manager}.
*
* @return the {@link TeamedPlaceGroup} of this collection
*/
@Override
public TeamedPlaceGroup placeGroup() {
return manager.placeGroup;
}
/**
 * Inserts into this local handle a chunk received as part of a relocation. The
 * range of the chunk is first recorded in the distribution information through
 * {@code moveInNew} or {@code moveInOld} depending on the given type, then the
 * chunk is added through the {@code add} implementation of the parent class.
 *
 * @param c     chunk to insert
 * @param mType either {@code DistManager.MOVE_NEW} or
 *              {@code DistManager.MOVE_OLD}
 * @throws Exception if the given type is neither of the two expected values, or
 *                   if thrown by the insertion
 */
private void putForMove(final RangedList<T> c, final byte mType) throws Exception {
    final LongRange key = c.getRange();
    if (mType == DistManager.MOVE_NEW) {
        ldist.moveInNew(key);
    } else if (mType == DistManager.MOVE_OLD) {
        ldist.moveInOld(key);
    } else {
        throw new Exception("SystemError when calling putForMove " + key);
    }
    super.add(c);
}
// Method moved to GLOBAL and TEAM operations
// @Override
// public void distSize(long[] result) {
// for (final Map.Entry<LongRange, Place> entry : ldist.dist.entrySet()) {
// final LongRange k = entry.getKey();
// final Place p = entry.getValue();
// result[manager.placeGroup.rank(p)] += k.size();
// }
// }
/**
* Removes the specified range from the distribution information {@link #ldist},
* then removes it from this local handle through the implementation of the
* parent class.
*
* @param r range to remove
* @return the value returned by the implementation of the parent class
*/
@Override
public RangedList<T> remove(final LongRange r) {
ldist.remove(r);
return super.remove(r);
}
/**
* Removes the range of the given ranged list from the distribution information
* {@link #ldist}, then removes the ranged list from this local handle through
* the implementation of the parent class.
*
* @param c ranged list to remove
* @return the value returned by the implementation of the parent class
*/
@Deprecated
@Override
public RangedList<T> remove(final RangedList<T> c) {
ldist.remove(c.getRange());
return super.remove(c);
}
/**
 * Removes the specified range from this local handle through the implementation
 * of the parent class, without touching the distribution information.
 *
 * @param r range to remove
 * @throws RuntimeException if the parent class implementation returns
 *                          {@code null}
 */
private void removeForMove(final LongRange r) {
    final RangedList<T> removed = super.remove(r);
    if (removed != null) {
        return;
    }
    throw new RuntimeException("DistCol#removeForMove");
}
/**
* Sets this instance's proxy generator.
*
* The proxy feature is used to supply an element when accessing an index that
* is not contained in the local range. Instead of throwing an
* {@link IndexOutOfBoundsException}, {@link #get(long)} returns the value
* generated by the proxy. It resembles `getOrDefault(key, defaultValue)`.
*
* @param proxyGenerator function that takes a {@link Long} index as parameter
* and returns a T
*/
public void setProxyGenerator(Function<Long, T> proxyGenerator) {
this.proxyGenerator = proxyGenerator;
}
/**
* Returns the handle to the team operations of this collection.
*
* @return the {@link #TEAM} member of this instance
*/
@Override
public TeamOperations<T, DistCol<T>> team() {
return TEAM;
}
/**
* Calls {@code updateDist} on the distribution information {@link #ldist} with
* the place group of this collection. This method is called by
* {@code DistColTeam#updateDist()}.
*/
private void team_updateDist() {
ldist.updateDist(manager.placeGroup);
}
/**
 * Returns the simple name of the class of this instance followed by a colon and
 * the global id of this collection.
 *
 * @return textual representation of this collection
 */
@Override
public String toString() {
    final String className = getClass().getSimpleName();
    return className + ":" + id();
}
/**
 * Replaces this object during serialization by a lazy reference built from the
 * place group and the global id of this collection. The reference is given an
 * initializer which creates a {@link DistCol} with that place group and global
 * id.
 *
 * @return the object to serialize in place of this instance
 * @throws ObjectStreamException declared by the serialization contract
 */
@Override
public Object writeReplace() throws ObjectStreamException {
    final TeamedPlaceGroup group = manager.placeGroup;
    final GlobalID globalId = id();
    return new LazyObjectReference<>(group, globalId, () -> {
        return new DistCol<>(group, globalId);
    });
}
}