DistManager.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections.dist;
import static apgas.Constructs.*;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import apgas.Place;
import handist.collections.ElementOverlapException;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializerUsingPlace;
import handist.collections.function.Serializer;
public class DistManager<T> {
static class ParameterErrorException extends RuntimeException {
/**
*
*/
private static final long serialVersionUID = -9038636040813564069L;
public int reason;
ParameterErrorException(int reason, String msg) {
super(msg);
this.reason = reason;
}
};
/*
* public static class Range extends DistManager<LongRange> { }
*/
static class SystemError extends Error {
/**
*
*/
private static final long serialVersionUID = 4466816572543219426L;
public int reason;
SystemError(int reason, String msg) {
super(msg);
this.reason = reason;
}
}
public static final int DIST_ADDED = 1;
/** Code for key received from remote place to this local handle */
public static final int DIST_MOVED_IN = 4;
public static final int DIST_REMOVED = 2;
public static final byte MOVE_NEW = 1; // New range created on local handle
public static final byte MOVE_NONE = 0;
public static final byte MOVE_OLD = 2;
/**
* Changes since that last time updateDist was called that will need to be
* notified to remote places. There may be other changes that occurred on remote
* places that this local handle is not yet aware of.
*/
public ConcurrentHashMap<T, Integer> diff = new ConcurrentHashMap<>();
/** Current knowledge of the key-holding information on local & remote places */
public ConcurrentHashMap<T, Place> dist = new ConcurrentHashMap<>();
HashSet<T> importedDiffKeys = new HashSet<>();
public void add(T key) throws ElementOverlapException {
if (distHasKey(key)) {
if (distIsLocal(key)) {
if (diffHasKey(key)) {
if (diffOfKeyIs(key, (DIST_ADDED | DIST_MOVED_IN))) {
reject("add", 103, key);
} else {
systemError("add", 104, key);
}
} else {
reject("add", 102, key);
}
} else {
// !distIsLocal(key)
reject("add", 105, key);
}
} else {
if (diffHasKey(key)) {
if (diffOfKeyIs(key, DIST_REMOVED)) {
diff.remove(key);
dist.put(key, here());
} else {
systemError("add", 101, key);
}
} else {
diff.put(key, DIST_ADDED);
dist.put(key, here());
}
}
}
void applyDiff(T key, int operation, Place from) throws Exception {
// System.out.println("[" + here.id + "] applyDiff " + key + " op: " + operation
// + " from: " + from.id);
if (importedDiffKeys.contains(key) || diff.containsKey(key)) {
reject("applyDiff with duplicate key ", operation, key);
} else {
importedDiffKeys.add(key);
if ((operation & (DIST_ADDED | DIST_MOVED_IN)) != 0) {
dist.put(key, from);
} else {
// operation == DIST_REMOVED
dist.remove(key);
}
}
}
public void clear() {
dist.clear();
diff.clear();
}
public boolean diffHasKey(T key) {
return diff.containsKey(key);
}
public boolean diffOfKeyIs(T key, int operation) {
return (diff.get(key) & operation) != 0;
}
public boolean distHasKey(T key) {
return dist.containsKey(key);
}
public boolean distIsLocal(T key) {
return dist.get(key) == here();
}
public void moveInNew(T key) throws Exception {
// System.out.println(">>> moveInNew " + key + " distHasKey: " + distHasKey(key)
// + " diffHasKey: " + diffHasKey(key));
if (distHasKey(key)) {
if (distIsLocal(key)) {
if (diffHasKey(key) && diffOfKeyIs(key, DIST_ADDED)) {
reject("moveInNew", 402, key);
} else {
systemError("moveInNew", 403, key);
}
} else {
// !distIsLocal(key)
if (diffHasKey(key)) {
systemError("moveInNew", 404, key);
} else {
// System.out.println(">>> AAA");
diff.put(key, DIST_ADDED);
dist.put(key, here());
}
}
} else {
// !distHasKey(key)
if (diffHasKey(key)) {
systemError("moveInNew", 401, key);
} else {
// System.out.println(">>> BBB");
diff.put(key, DIST_ADDED);
dist.put(key, here());
}
}
}
public void moveInOld(T key) throws Exception {
if (distHasKey(key)) {
if (distIsLocal(key)) {
systemError("moveInOld", 406, key);
} else {
// !distIsLocal(key)
if (diffHasKey(key)) {
systemError("moveInOld", 407, key);
} else {
diff.put(key, DIST_MOVED_IN);
dist.put(key, here());
}
}
} else {
// !distHasKey(key)
systemError("moveInOld", 405, key);
}
}
public byte moveOut(T key, Place dest) {
if (distHasKey(key)) {
// System.out.println(">>> distHasKey");
if (distIsLocal(key)) {
if (diffHasKey(key)) {
if (diffOfKeyIs(key, DIST_ADDED)) {
diff.remove(key);
dist.remove(key);
return MOVE_NEW;
} else if (diffOfKeyIs(key, DIST_MOVED_IN)) {
diff.remove(key);
dist.put(key, dest);
return MOVE_OLD;
} else {
systemError("moveOut", 804, key);
}
} else {
dist.put(key, dest);
return MOVE_OLD;
}
} else {
// !distIsLocal(key)
reject("moveOut", 805, key);
}
} else {
// !distHasKey(key)
// System.out.println(">>> !distHasKey");
if (diffHasKey(key)) {
// System.out.println(">>> diffHasKey");
if (diffOfKeyIs(key, DIST_REMOVED)) {
reject("moveOut", 802, key);
} else {
systemError("moveOut", 803, key);
}
} else {
// System.out.println(">>> !diffHasKey");
reject("moveOut", 801, key);
}
}
// System.out.println(">>> MOVE_NONE");
return MOVE_NONE;
}
void reject(String method, int reason, T key) throws ParameterErrorException {
final String msg = "[" + here() + "] Error when calling " + method + " " + key + " on code " + reason;
System.err.println(msg);
if (reason > 0) {
throw new ParameterErrorException(reason, msg);
}
throw new ParameterErrorException(reason, msg);
}
public void remove(T key) {
if (distHasKey(key)) {
if (distIsLocal(key)) {
if (diffHasKey(key)) {
// System.out.println("[" + here.id + "] remove key " + key);
if (diffOfKeyIs(key, DIST_ADDED)) {
diff.remove(key);
dist.remove(key);
} else if (diffOfKeyIs(key, DIST_MOVED_IN)) {
diff.put(key, DIST_REMOVED);
dist.remove(key);
} else {
systemError("remove", 202, key);
}
} else {
diff.put(key, DIST_REMOVED);
dist.remove(key);
}
} else {
// !distIsLocal(key)
reject("remove", 203, key);
}
} else {
// !distHasKey(key)
reject("remove", 201, key);
}
}
void setup(Collection<T> keys) {
assert (keys.isEmpty());
try {
for (final T k : keys) {
add(k);
}
} catch (final Exception e) {
throw new RuntimeException("[DistManager] Duplicate key in " + keys);
}
}
void systemError(String method, int reason, T key) throws SystemError {
final String msg = "[" + here() + "] System Error when calling " + method + " " + key + " on code " + reason;
System.err.println(msg);
if (reason > 0) {
throw new SystemError(reason, msg);
}
throw new SystemError(reason, msg);
}
@Override
public String toString() {
return "[DistManager] + dist: " + dist + ", diff: " + diff + ", imported: " + importedDiffKeys + "-----";
}
@SuppressWarnings("unchecked")
void updateDist(TeamedPlaceGroup pg) {
final Serializer serProcess = (ObjectOutput s) -> {
s.writeObject(diff);
};
final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place from) -> {
final Map<T, Integer> importedDiff = (Map<T, Integer>) ds.readObject();
for (final Map.Entry<T, Integer> entry : importedDiff.entrySet()) {
final T k = entry.getKey();
final Integer v = entry.getValue();
applyDiff(k, v, from);
}
};
CollectiveRelocator.allgatherSer(pg, serProcess, desProcess);
importedDiffKeys.clear();
diff.clear();
}
}