ElementLocationManager.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.dist;

import static apgas.Constructs.*;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;

import apgas.Place;
import handist.collections.ElementOverlapException;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializerUsingPlace;
import handist.collections.function.Serializer;

/**
 * This class manages the elements of a distributed collection in a
 * difference-basis. It is meant to be used as a member on each handle of a
 * distributed collection whose distribution needs to be tracked.
 *
 * @param <T> the type of the object used as index or key to identify entries in
 *            the collection whose distribution is being tracked.
 */
class ElementLocationManager<T> {

    /**
     * Error class used when inconsistencies in the distribution attributable to a
     * forbidden operation made by the user are encountered.
     */
    static class ParameterErrorException extends RuntimeException {
        /** Serial Version UID */
        private static final long serialVersionUID = -9038636040813564069L;
        /** Integer code describing the cause of the problem */
        public int reason;

        ParameterErrorException(int reason, String msg) {
            super(msg);
            this.reason = reason;
        }
    };

    /**
     * Error class used when an internal error occurred in the
     * ElementLocationManager. Assuming that direct modifications to members
     * {@link #diff} and {@link #dist} are not made and that this class'
     * implementation is correct, this error should never be thrown.
     */
    static class SystemError extends Error {
        /** Serial Version UID */
        private static final long serialVersionUID = 4466816572543219426L;
        /** Integer Code describing the nature of the problem */
        public int reason;

        SystemError(int reason, String msg) {
            super(msg);
            this.reason = reason;
        }
    }

    /** Code used when a key is added to the local handle of the collection */
    static final int DIST_ADDED = 1;
    /** Code for key received from remote place to this local handle */
    static final int DIST_MOVED_IN = 4;
    /** Code used when a key is removed from the local handle of the collection */
    static final int DIST_REMOVED = 2;

    /**
     * Code used to describe that a newly added entry to the local collection (code
     * {@link #DIST_ADDED} in {@link #diff}) was relocated to a remote host
     */
    static final byte MOVE_NEW = 1;
    /**
     * Code used to describe that an entry that was previously relocated to some
     * place(s) has now returned to its original location.
     */
    static final byte MOVE_NONE = 0;
    /**
     * Code used to describe that an entry already known to all local handles was
     * relocated
     */
    static final byte MOVE_OLD = 2;

    /**
     * Changes since that last time updateDist was called that will need to be
     * notified to remote places. There may be other changes that occurred on remote
     * places that this local handle is not yet aware of.
     */
    ConcurrentHashMap<T, Integer> diff = new ConcurrentHashMap<>();
    /** Current knowledge of the key-holding information on local & remote places */
    ConcurrentHashMap<T, Place> dist = new ConcurrentHashMap<>();

    HashSet<T> importedDiffKeys = new HashSet<>();

    /**
     * WeakHashMap used to contain the distribution which need to receive updates
     * when changes to the distribution are made.
     * <h2>Implementation notes</h2>
     * <p>
     * A {@link WeakHashMap} is used with "null" values is used so that the
     * distribution objects used as keys in this collection can be garbage-collected
     * if they are no longer in use in other parts of the program, as represented in
     * {@link ElementLocationManageable#registerDistribution(UpdatableDistribution)}.
     * Refer to the Java documentation of {@link WeakHashMap} for further details.
     */
    protected Map<UpdatableDistribution<T>, Object> registeredDistribution = new WeakHashMap<>();

    /**
     * Registers the fact that a new key / entry was added to the local collection.
     *
     * @param key the object which identifies the newly added entry to the
     *            collection
     * @throws ElementOverlapException if the added key is known to be present on a
     *                                 remote host
     */
    void add(T key) throws ElementOverlapException {
        if (distHasKey(key)) {
            if (distIsLocal(key)) {
                if (diffHasKey(key)) {
                    if (diffOfKeyIs(key, (DIST_ADDED | DIST_MOVED_IN))) {
                        reject("add", 103, key);
                    } else {
                        systemError("add", 104, key);
                    }
                } else {
                    reject("add", 102, key);
                }
            } else {
                reject("add", 105, key);
            }
        } else {
            if (diffHasKey(key)) {
                if (diffOfKeyIs(key, DIST_REMOVED)) {
                    diff.remove(key);
                    dist.put(key, here());
                    registeredDistributionUpdate(key, here());
                } else {
                    systemError("add", 101, key);
                }
            } else {
                diff.put(key, DIST_ADDED);
                dist.put(key, here());
                registeredDistributionUpdate(key, here());
            }
        }
    }

    /**
     * This method is part of the procedure needed to update the local knwoledge of
     * the global distribution. A call to this method updates the contents of the
     * local distribution knowledge with the knowledge received from remote places.
     * Checks are made to make sure that the received individual change is
     * consistent with information obtained from other hosts using member
     * {@link #importedDiffKeys}.
     *
     *
     * @param key       the key about which some information was received
     * @param operation the operation that the remote operation registered about
     *                  this key
     * @param from      the place from which the present information came from
     * @throws Exception if inconsistencies are detected, such as a remote place
     *                   adding an entry already owned by another place ("duplicate
     *                   key" case)
     * @see #update(TeamedPlaceGroup)
     */
    void applyDiff(T key, int operation, Place from) throws Exception {
        // System.out.println("[" + here.id + "] applyDiff " + key + " op: " + operation
        // + " from: " + from.id);
        if (importedDiffKeys.contains(key) || diff.containsKey(key)) {
            reject("applyDiff with duplicate key ", operation, key);
        } else {
            importedDiffKeys.add(key);
            if ((operation & (DIST_ADDED | DIST_MOVED_IN)) != 0) {
                dist.put(key, from);
                registeredDistributionUpdate(key, from);
            } else {
                // operation == DIST_REMOVED
                dist.remove(key);
                registeredDistributionRemove(key);
            }
        }
    }

    /**
     * Removes all distribution knowledge of this local instance. This method should
     * only be called when all entries in the local handle of the distributed
     * collection are removed.
     */
    void clear() {
        dist.clear();
        diff.clear();
        // FIXME this is not correct.
        // In diff, the fact that all local entries have been removed needs to be
        // recorded.
        // Also, the registered distributions need to be updated accordingly.
    }

    protected boolean diffHasKey(T key) {
        return diff.containsKey(key);
    }

    protected boolean diffOfKeyIs(T key, int operation) {
        return (diff.get(key) & operation) != 0;
    }

    protected boolean distHasKey(T key) {
        return dist.containsKey(key);
    }

    protected boolean distIsLocal(T key) {
        return dist.get(key) == here();
    }

    protected void moveInNew(T key) throws Exception {
        // System.out.println(">>> moveInNew " + key + " distHasKey: " + distHasKey(key)
        // + " diffHasKey: " + diffHasKey(key));

        if (distHasKey(key)) {
            if (distIsLocal(key)) {
                if (diffHasKey(key) && diffOfKeyIs(key, DIST_ADDED)) {
                    reject("moveInNew", 402, key);
                } else {
                    systemError("moveInNew", 403, key);
                }
            } else {
                // !distIsLocal(key)
                if (diffHasKey(key)) {
                    systemError("moveInNew", 404, key);
                } else {
                    // System.out.println(">>> AAA");
                    diff.put(key, DIST_ADDED);
                    dist.put(key, here());
                    registeredDistributionUpdate(key, here());
                }
            }
        } else {
            // !distHasKey(key)
            if (diffHasKey(key)) {
                systemError("moveInNew", 401, key);
            } else {
                // System.out.println(">>> BBB");
                diff.put(key, DIST_ADDED);
                dist.put(key, here());
                registeredDistributionUpdate(key, here());
            }
        }
    }

    void moveInOld(T key) throws Exception {
        if (distHasKey(key)) {
            if (distIsLocal(key)) {
                systemError("moveInOld", 406, key);
            } else {
                // !distIsLocal(key)
                if (diffHasKey(key)) {
                    systemError("moveInOld", 407, key);
                } else {
                    diff.put(key, DIST_MOVED_IN);
                    dist.put(key, here());
                    registeredDistributionUpdate(key, here());
                }
            }
        } else {
            // !distHasKey(key)
            systemError("moveInOld", 405, key);
        }
    }

    byte moveOut(T key, Place dest) {
        if (distHasKey(key)) {
            // System.out.println(">>> distHasKey");
            if (distIsLocal(key)) {
                if (diffHasKey(key)) {
                    if (diffOfKeyIs(key, DIST_ADDED)) {
                        diff.remove(key);
                        dist.remove(key);
                        registeredDistributionRemove(key);
                        return MOVE_NEW;
                    } else if (diffOfKeyIs(key, DIST_MOVED_IN)) {
                        diff.remove(key);
                        dist.put(key, dest);
                        registeredDistributionUpdate(key, dest);
                        return MOVE_OLD;
                    } else {
                        systemError("moveOut", 804, key);
                    }
                } else {
                    dist.put(key, dest);
                    registeredDistributionUpdate(key, dest);
                    return MOVE_OLD;
                }
            } else {
                // !distIsLocal(key)
                reject("moveOut", 805, key);
            }
        } else {
            // !distHasKey(key)
            // System.out.println(">>> !distHasKey");
            if (diffHasKey(key)) {
                // System.out.println(">>> diffHasKey");
                if (diffOfKeyIs(key, DIST_REMOVED)) {
                    reject("moveOut", 802, key);
                } else {
                    systemError("moveOut", 803, key);
                }
            } else {
                // System.out.println(">>> !diffHasKey");
                reject("moveOut", 801, key);
            }
        }
        // System.out.println(">>> MOVE_NONE");
        return MOVE_NONE;
    }

    /**
     * Registers a distribution that will be actively updated by the
     * {@link ElementLocationManageable} from now on. The registered distribution
     * will receive updates until it is garbage-collected as it is internally kept
     * in a "weak" collection.
     * <p>
     * The contents of the given distribution are updated with the current knowledge
     * of this class when this method is called.
     * <p>
     * It is best to avoid inserting and removing entries of a distributed
     * collection when calling this method. This may cause some inconsistencies
     * between the information held by this instance and the information contained
     * in the distribution given as parameter.
     *
     * @param distributionToUpdate distribution into which changes in distribution
     *                             managed by this object need to be reflected
     */
    void registerDistribution(UpdatableDistribution<T> distributionToUpdate) {
        // Update the contents with the current knowledge of the situation
        for (final Map.Entry<T, Place> distEntry : dist.entrySet()) {
            distributionToUpdate.updateLocation(distEntry.getKey(), distEntry.getValue());
        }

        // Register the distribution to reflect the changes from now on
        registeredDistribution.put(distributionToUpdate, null);
    }

    /**
     * Sub-routine used to update the registered distributions with the fact that a
     * key has been removed from the collection
     *
     * @param key the key removed from the collection
     */
    protected void registeredDistributionRemove(T key) {
        for (final UpdatableDistribution<T> distribution : registeredDistribution.keySet()) {
            distribution.removeLocation(key);
        }
    }

    /**
     * Sub-routine used to update the location of an entry into all the registered
     * distribution contained
     *
     * @param key      the key to update
     * @param location the location of the specified key
     */
    protected void registeredDistributionUpdate(T key, Place location) {
        for (final UpdatableDistribution<T> distribution : registeredDistribution.keySet()) {
            distribution.updateLocation(key, location);
        }
    }

    void reject(String method, int reason, T key) throws ParameterErrorException {
        final String msg = "[" + here() + "] Error when calling " + method + " " + key + " on code " + reason;
        System.err.println(msg);
        throw new ParameterErrorException(reason, msg);
    }

    void remove(T key) {
        if (distHasKey(key)) {
            if (distIsLocal(key)) {
                if (diffHasKey(key)) {
                    // System.out.println("[" + here.id + "] remove key " + key);
                    if (diffOfKeyIs(key, DIST_ADDED)) {
                        diff.remove(key);
                        dist.remove(key);
                        registeredDistributionRemove(key);
                    } else if (diffOfKeyIs(key, DIST_MOVED_IN)) {
                        diff.put(key, DIST_REMOVED);
                        dist.remove(key);
                        registeredDistributionRemove(key);
                    } else {
                        systemError("remove", 202, key);
                    }
                } else {
                    diff.put(key, DIST_REMOVED);
                    dist.remove(key);
                    registeredDistributionRemove(key);
                }
            } else {
                // !distIsLocal(key)
                reject("remove", 203, key);
            }
        } else {
            // !distHasKey(key)
            reject("remove", 201, key);
        }
    }

    void setup(Collection<T> keys) {
        assert (keys.isEmpty());
        try {
            for (final T k : keys) {
                add(k);
            }
        } catch (final Exception e) {
            throw new RuntimeException("[DistManager] Duplicate key in " + keys);
        }
    }

    void systemError(String method, int reason, T key) throws SystemError {
        final String msg = "[" + here() + "] System Error when calling " + method + " " + key + " on code " + reason;
        System.err.println(msg);
        if (reason > 0) {
            throw new SystemError(reason, msg);
        }
        throw new SystemError(reason, msg);
    }

    @Override
    public String toString() {
        return "[DistManager] + dist: " + dist + ",  diff: " + diff + ", imported: " + importedDiffKeys + "-----";
    }

    @SuppressWarnings("unchecked")
    void update(TeamedPlaceGroup pg) {
        final Serializer serProcess = (ObjectOutput s) -> {
            s.writeObject(diff);
        };
        final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place from) -> {
            final Map<T, Integer> importedDiff = (Map<T, Integer>) ds.readObject();
            for (final Map.Entry<T, Integer> entry : importedDiff.entrySet()) {
                final T k = entry.getKey();
                final Integer v = entry.getValue();
                applyDiff(k, v, from);
            }
        };
        new CollectiveRelocator.Allgather(pg).request(serProcess, desProcess).execute();
        importedDiffKeys.clear();
        diff.clear();
    }

}