DistMap.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.dist;

import static apgas.Constructs.*;

import java.io.ObjectStreamException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

import apgas.Constructs;
import apgas.Place;
import apgas.util.GlobalID;
import apgas.util.SerializableWithReplace;
import handist.collections.dist.util.IntLongPair;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.SerializableConsumer;
import handist.collections.function.Serializer;
import handist.collections.glb.DistMapGlb;
import mpjbuf.IllegalArgumentException;

/**
 * A Map data structure spread over the multiple places.
 *
 * @param <K> type of the key used in the {@link DistMap}
 * @param <V> type of the value mapped to each key in the {@link DistMap}
 */
public class DistMap<K, V>
        implements Map<K, V>, DistributedCollection<V, DistMap<K, V>>, KeyRelocatable<K>, SerializableWithReplace {

    // TODO
    /*
     * public void setupBranches(Generator<T,U> gen) { final DistMap<T,U> handle =
     * this; finish(()->{ placeGroup.broadcastFlat(()->{ gen.accept(here(), handle);
     * }); }); }
     */

    // TODO implements Relocatable

    private static int _debug_level = 5;

    /**
     * Implementation of the local Map collection
     */
    protected Map<K, V> data;
    /** Handle for GLB operations */
    public final DistMapGlb<K, V> GLB;
    public GlobalOperations<V, DistMap<K, V>> GLOBAL;

    final GlobalID id;

    public transient float[] locality;

    public final TeamedPlaceGroup placeGroup;

    private Function<K, V> proxyGenerator;

    protected final TeamOperations<V, DistMap<K, V>> TEAM;

    @SuppressWarnings("rawtypes")
    private DistCollectionSatellite satellite;

    private final MapEntryDispatcher<K, V> dispatcher;

    /**
     * Construct an empty DistMap which can have local handles on all the hosts in
     * the computation.
     */
    public DistMap() {
        this(TeamedPlaceGroup.world);
    }

    /**
     * Construct a DistMap which can have local handles on the hosts of the
     * specified {@link TeamedPlaceGroup}.
     *
     * @param pg the group of hosts that are susceptible to manipulate this
     *           {@link DistMap}
     */
    public DistMap(TeamedPlaceGroup pg) {
        this(pg, new GlobalID());
    }

    DistMap(TeamedPlaceGroup pg, GlobalID globalID) {
        this(pg, globalID, new HashMap<>());
    }

//	Method moved to TEAM and GLOBAL operations
//	@Override
//	public void distSize(long[] result) {
//		TeamedPlaceGroup pg = this.placeGroup;
//		long localSize = data.size(); // int->long
//		long[] sendbuf = new long[] { localSize };
//		// team.alltoall(tmpOverCounts, 0, overCounts, 0, 1);
//		try {
//			pg.comm.Allgather(sendbuf, 0, 1, MPI.LONG, result, 0, 1, MPI.LONG);
//		} catch (MPIException e) {
//			e.printStackTrace();
//			throw new Error("[DistMap] network error in balance()");
//		}
//	}

    /**
     * Package private DistMap constructor. This constructor is used to register a
     * new DistMap handle with the specified GlobalId. Programmers that use this
     * library should never have to call this constructor.
     * <p>
     * Specifying a GLobalId which already has object handles registered in other
     * places (potentially objects different from a {@link DistMap} instance) could
     * prove disastrous. Instead, programmers should only call {@link #DistMap()} to
     * create a distributed map with handles on all hosts, or
     * {@link #DistMap(TeamedPlaceGroup)} to restrict their DistMap to a subset of
     * hosts.
     *
     * @param pg       the palceGroup on which this DistMap is defined
     * @param globalId the global id associated to this distributed map
     * @param data     the data container to be used
     */
    DistMap(TeamedPlaceGroup pg, GlobalID globalId, Map<K, V> data) {
        placeGroup = pg;
        id = globalId;
        locality = new float[pg.size];
        Arrays.fill(locality, 1.0f);
        this.data = data;
        this.GLOBAL = new GlobalOperations<>(this, (TeamedPlaceGroup pg0, GlobalID gid) -> new DistMap<>(pg0, gid));
        GLB = new DistMapGlb<>(this);
        TEAM = new TeamOperations<>(this);
        dispatcher = new MapEntryDispatcher<>(this, null);
        id.putHere(this);
    }

    /**
     * Remove the all local entries.
     */
    @Override
    public void clear() {
        this.data.clear();
    }

    /**
     * Return true if the specified entry is exist in the local collection.
     *
     * @param key a key.
     * @return true is the specified object is a key present in the local map,
     */
    @Override
    public boolean containsKey(Object key) {
        return data.containsKey(key);
    }

    /**
     * Indicates if the provided value is contained in the local map.
     */
    @Override
    public boolean containsValue(Object value) {
        return data.containsValue(value);
    }

    boolean debugPrint() {
        return true;
    }

    /**
     * Removes the provided key from the local map, returns {@code true} if there
     * was a previous obejct mapped to this key, {@code false} if there were no
     * mapping with this key or if the mapping was a {@code null} object
     *
     * @param key the key to remove from this local map
     * @return true if a mapping was removed as a result of this operation, false
     *         otherwise
     */
    public boolean delete(K key) {
        final V result = data.remove(key);
        return (result != null);
    }

    /**
     * Return the Set of local entries.
     *
     * @return the Set of local entries.
     */
    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return data.entrySet();
    }

    /**
     * Apply the specified operation with each Key/Value pair contained in the local
     * collection.
     *
     * @param action the operation to perform
     */
    @Override
    public void forEach(BiConsumer<? super K, ? super V> action) {
        if (!data.isEmpty()) {
            data.forEach(action);
        }
    }

    @Override
    public void forEach(SerializableConsumer<V> action) {
        data.values().forEach(action);
    }

    private void forEachParallelBodyLocal(SerializableConsumer<V> action) {
        final List<Collection<V>> separated = separateLocalValues(Runtime.getRuntime().availableProcessors() * 2);
        for (final Collection<V> sub : separated) {
            async(() -> {
                sub.forEach(action);
            });
        }
    }

    /**
     * Helper method which separates the keys contained in the local map into even
     * batches for the number of threads available on the system and applies the
     * provided action on each key/value pair contained in the collection in
     * parallel
     *
     * @param action action to perform on the key/value pair contained in the map
     */
    private void forEachParallelKey(SerializableBiConsumer<? super K, ? super V> action) {
        final int batches = Runtime.getRuntime().availableProcessors();

        // Dispatch the existing keys into batches
        final List<Collection<K>> keys = new ArrayList<>(batches);
        for (int i = 0; i < batches; i++) {
            keys.add(new HashSet<>());
        }
        // Round-robin of keys into batches
        int i = 0;
        for (final K k : data.keySet()) {
            keys.get(i++).add(k);
            if (i >= batches) {
                i = 0;
            }
        }

        // Spawn asynchronous activity for each batch
        for (final Collection<K> keysToProcess : keys) {
            async(() -> {
                // Apply the supplied action on each key in the batch
                for (final K key : keysToProcess) {
                    final V value = data.get(key);
                    action.accept(key, value);
                }
            });
        }
    }

    /**
     * Return the element for the provided key. If there is no element at the index,
     * return null.
     *
     * When an agent generator is set on this instance and there is no element at
     * the index, a proxy value for the index is generated as a return value.
     *
     * @param key the index of the value to retrieve
     * @return the element associated with {@code key}.
     */
    @Override
    @SuppressWarnings("unchecked")
    public V get(Object key) {
        final V result = data.get(key);
        if (result != null) {
            return result;
        }
        if (proxyGenerator != null && !data.containsKey(key)) {
            return proxyGenerator.apply((K) key);
        } else {
            return null;
        }
    }

    @Override
    public Collection<K> getAllKeys() {
        return keySet();
    }

    /**
     * Returns a subset of the keys contained in the local map. If the specified
     * number of keys is greater than the number of keys actually contained in the
     * local map, the entire keyset is returned. If a nil or negative number of keys
     * is asked for, an empty collection is returned.
     *
     * @param count number of keys desired
     * @return a collection containing the specified number of keys, or less if the
     *         local map contains fewer keys than the specified parameter
     */
    private Collection<K> getNKeys(int count) {
        if (count <= 0) {
            return Collections.emptySet();
        }
        final ArrayList<K> keys = new ArrayList<>();
        for (final K key : data.keySet()) {
            keys.add(key);
            --count;
            if (count == 0) {
                return keys;
            }
        }
        return data.keySet();
    }

    /**
     * Return {@link MapEntryDispatcher} instance that enable fast relocation
     * between places than normal. One {@link DistMap} has one dispatcher.
     *
     * @param rule Determines the dispatch destination.
     * @return :
     */
    public MapEntryDispatcher<K, V> getObjectDispatcher(Distribution<K> rule) {
        dispatcher.setDistribution(rule);
        return dispatcher;
    }

    /**
     * Return {@link MapEntryDispatcher} instance that enable fast relocation
     * between places than normal. One {@link DistMap} has one dispatcher.
     *
     * @param rule Determines the dispatch destination.
     * @param pg   Relocate in this placegroup.
     * @return :
     * @throws IllegalArgumentException :
     */
    public MapEntryDispatcher<K, V> getObjectDispatcher(Distribution<K> rule, TeamedPlaceGroup pg)
            throws IllegalArgumentException {
        if (placeGroup.places.containsAll(pg.places)) {
            throw new IllegalArgumentException("The TeamedlaceGroup passed to DistMapDispatcher must be part of or "
                    + "the same as TeamedPlaceGroup in origin DistMap.");
        }
        return new MapEntryDispatcher<>(this, pg, rule);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <S extends DistCollectionSatellite<DistMap<K, V>, S>> S getSatellite() {
        return (S) satellite;
    }

    @Override
    public GlobalOperations<V, DistMap<K, V>> global() {
        return GLOBAL;
    }

    @Override
    public GlobalID id() {
        return id;
    }

    // TODO remove this method, we already have #putAll
    public void integrate(Map<K, V> src) {
        for (final Map.Entry<K, V> e : src.entrySet()) {
            put(e.getKey(), e.getValue());
        }
    }

    /**
     * Indicates if the local distributed map is empty or not
     *
     * @return {@code true} if there are no mappings in the local map
     */
    @Override
    public boolean isEmpty() {
        return data.isEmpty();
    }

    /**
     * Return the Set of local keys.
     *
     * @return the Set of local keys.
     */
    @Override
    public Set<K> keySet() {
        return data.keySet();
    }

    @Override
    public float[] locality() {
        return locality;
    }

    @Override
    public long longSize() {
        return data.size();
    }

    /**
     * Apply the same operation on the all elements including remote places and
     * creates a new {@link DistMap} with the same keys as this instance and the
     * result of the mapping operation as values.
     *
     * @param <W> result type of mapping operation
     * @param op  the map operation from type <code>V</code> to <code>W</code>
     * @return a DistMap from <code>K</code> to <code>W</code> built from applying
     *         the mapping operation on each element of this instance
     */
    public <W> DistMap<K, W> map(Function<V, W> op) {
        throw new Error("not supported yet");
        // TODO
        /*
         * return new DistMap<T,S>(placeGroup, team, () -> { val dst = new
         * HashMap<T,S>(); for (entry in entries()) { val key = entry.getKey(); val
         * value = entry.getValue(); dst(key) = op(value); } return dst; });
         */
    }

    @Override
    @SuppressWarnings("unchecked")
    public void moveAtSync(Collection<K> keys, Place pl, MoveManager mm) {
        if (pl.equals(Constructs.here())) {
            return;
        }
        final DistMap<K, V> collection = this;
        final Serializer serialize = (ObjectOutput s) -> {
            final int size = keys.size();
            s.writeInt(size);
            for (final K key : keys) {
                final V value = collection.remove(key);
                s.writeObject(key);
                s.writeObject(value);
            }
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final int size = ds.readInt();
            for (int i = 1; i <= size; i++) {
                final K key = (K) ds.readObject();
                final V value = (V) ds.readObject();
                collection.putForMove(key, value);
            }
        };
        mm.request(pl, serialize, deserialize);
    }

    @Override
    public void moveAtSync(Distribution<K> dist, MoveManager mm) {
        final Function<K, Place> rule = (K key) -> {
            return dist.place(key);
        };
        moveAtSync(rule, mm);
    }

    public void moveAtSync(Function<K, Place> rule, MoveManager mm) {
        final DistMap<K, V> collection = this;
        final HashMap<Place, List<K>> keysToMove = new HashMap<>();
        collection.forEach((K key, V value) -> {
            final Place destination = rule.apply(key);
            if (!keysToMove.containsKey(destination)) {
                keysToMove.put(destination, new ArrayList<K>());
            }
            keysToMove.get(destination).add(key);
        });
        for (final Map.Entry<Place, List<K>> entry : keysToMove.entrySet()) {
            moveAtSync(entry.getValue(), entry.getKey(), mm);
        }
    }

    /**
     * Request that the specified element is relocated when #sync is called.
     *
     * @param key the key of the relocated entry.
     * @param pl  the destination place.
     * @param mm  MoveManagerLocal
     */
    @Override
    @SuppressWarnings("unchecked")
    public void moveAtSync(K key, Place pl, MoveManager mm) {
        if (pl.equals(Constructs.here())) {
            return;
        }
        final DistMap<K, V> toBranch = this;
        final Serializer serialize = (ObjectOutput s) -> {
            final V value = this.remove(key);
            s.writeObject(key);
            s.writeObject(value);
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final K k = (K) ds.readObject();
            final V v = (V) ds.readObject();
            toBranch.putForMove(k, v);
        };
        mm.request(pl, serialize, deserialize);
    }

    /*
     * void teamedBalance() { LoadBalancer.MapBalancer<T, U> balance = new
     * LoadBalancer.MapBalancer<>(this.data, placeGroup); balance.execute();
     * if(debugPrint()) System.out.println(here() + " balance.check1"); clear();
     * if(debugPrint()) { System.out.println(here() + " balance.check2");
     * System.out.println(here() + " balance.ArrayList.size() : " + data.size()); }
     * long time = - System.nanoTime(); time += System.nanoTime(); if(debugPrint())
     * { // System.out.println(here() + " count : " + (count) + " ms"); //
     * System.out.println(here() + " put : " + (total/(1000000)) + " ms");
     * System.out.println(here() + " for : " + (time/(1000000)) + " ms");
     * System.out.println(here() + " data.size() : " + size());
     * System.out.println(here() + " balance.check3"); }
     *
     * }
     */

    @Override
    public void moveAtSyncCount(final ArrayList<IntLongPair> moveList, final MoveManager mm) throws Exception {
        for (final IntLongPair pair : moveList) {
            if (_debug_level > 5) {
                System.out.println("MOVE src: " + here() + " dest: " + pair.first + " size: " + pair.second);
            }
            if (pair.second > Integer.MAX_VALUE) {
                throw new Error("One place cannot receive so much elements: " + pair.second);
            }
            moveAtSyncCount((int) pair.second, placeGroup.get(pair.first), mm);
        }
    }

    // TODO different naming convention of balance methods with DistMap

    public void moveAtSyncCount(int count, Place dest, MoveManager mm) {
        if (count == 0) {
            return;
        }
        moveAtSync(getNKeys(count), dest, mm);
    }

    /**
     * Parallel version of {@link #forEach(BiConsumer)}
     *
     * @param action the action to perform on every key/value pair contained in this
     *               local map
     */
    public void parallelForEach(SerializableBiConsumer<? super K, ? super V> action) {
        finish(() -> {
            forEachParallelKey(action);
        });
    }

    @Override
    public void parallelForEach(SerializableConsumer<V> action) {
        parallelForEachLocal(action);
    }

    /*
     * Abstractovdef create(placeGroup: PlaceGroup, team: TeamOperations, init:
     * ()=>Map[T, U]){ // return new DistMap[T,U](placeGroup, init) as
     * AbstractDistCollection[Map[T,U]]; return null as
     * AbstractDistCollection[Map[T,U]]; }
     */
    /*
     * public def versioningMap(srcName : String){ // return new
     * BranchingManager[DistMap[T,U], Map[T,U]](srcName, this); return null as
     * BranchingManager[DistMap[T,U], Map[T,U]]; }
     */

    private void parallelForEachLocal(SerializableConsumer<V> action) {
        finish(() -> {
            forEachParallelBodyLocal(action);
        });
    }

    @Override
    public TeamedPlaceGroup placeGroup() {
        return placeGroup;
    }

    void printLocalData() {
        System.out.println(this);
    }

    /**
     * Put a new entry.
     *
     * @param key   the key of the new entry.
     * @param value the value of the new entry.
     * @return the previous value associated with {@code key}, or {@code null} if
     *         there was no mapping for {@code key}.(A {@code null} return can also
     *         indicate that the map previously associated {@code null} with
     *         {@code key}.)
     */
    @Override
    public V put(K key, V value) {
        return data.put(key, value);
    }

    /**
     * Adds all the mappings contained in the specified map into this local map.
     */
    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        data.putAll(m);
    }

    private V putForMove(K key, V value) {
        if (data.containsKey(key)) {
            throw new RuntimeException("DistMap cannot override existing entry: " + key);
        }
        return data.put(key, value);
    }

    /**
     * Reduce the all elements including other place using the given operation.
     *
     * @param <S>  type of the result produced by the reduction operation
     * @param lop  the operation using in the local reduction.
     * @param gop  the operation using in the reduction of the results of the local
     *             reduction.
     * @param unit the zero value of the reduction.
     * @return the result of the reduction.
     */
    public <S> S reduce(BiFunction<S, V, S> lop, BiFunction<S, S, S> gop, S unit) {
        // TODO
        throw new Error("Not implemented yet.");
        /*
         * val reducer = new Reducible[S]() { public def zero() = unit; public operator
         * this(a: S, b: S) = gop(a, b); }; return finish (reducer) {
         * placeGroup.broadcastFlat(() => { offer(reduceLocal(lop, unit)); }); };
         */

    }

    /**
     * Reduce the all elements including other place using the given operation.
     *
     * @param op   the operation.
     * @param unit the neutral element of the reduction.
     * @return the result of the reduction.
     */
    public V reduce(BiFunction<V, V, V> op, V unit) {
        return reduce(op, op, unit);
    }

    /**
     * Reduce the all local elements using the given operation.
     *
     * @param <S>  type of the result produced by the reduction operation
     * @param op   the operation used in the reduction
     * @param unit the neutral element of the reduction operation
     * @return the result of the reduction
     */
    public <S> S reduceLocal(BiFunction<S, V, S> op, S unit) {
        // TODO may be build-in method for Map
        S accum = unit;
        for (final Map.Entry<K, V> entry : data.entrySet()) {
            accum = op.apply(accum, entry.getValue());
        }
        return accum;
    }

    public void relocate(Distribution<K> rule) throws Exception {
        relocate(rule, new CollectiveMoveManager(placeGroup));
    }

    public void relocate(Distribution<K> rule, CollectiveMoveManager mm) throws Exception {
        for (final K key : data.keySet()) {
            final Place place = rule.place(key);
            // TODO
            // if(place==null) throw SomeException();
            moveAtSync(key, place, mm);
        }
        mm.sync();
    }

    public void relocate(Function<K, Place> rule) throws Exception {
        relocate(rule, new CollectiveMoveManager(placeGroup));
    }

    public void relocate(Function<K, Place> rule, CollectiveMoveManager mm) throws Exception {
        for (final K key : data.keySet()) {
            final Place place = rule.apply(key);
            moveAtSync(key, place, mm);
        }
        mm.sync();
    }

    /**
     * Remove the entry corresponding to the specified key in the local map.
     *
     * @param key the key corresponding to the value.
     * @return the previous value associated with the key, or {@code null} if there
     *         was no existing mapping (or the key was mapped to {@code null})
     */
    @Override
    public V remove(Object key) {
        return data.remove(key);
    }

    private List<Collection<V>> separateLocalValues(int n) {
        final List<Collection<V>> result = new ArrayList<>(n);
        final long totalNum = size();
        final long rem = totalNum % n;
        final long quo = totalNum / n;
        if (data.isEmpty()) {
            return result;
        }
        final Iterator<V> it = data.values().iterator();
        List<V> list = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            list = new ArrayList<>();
            final long count = quo + ((i < rem) ? 1 : 0);
            for (long j = 0; j < count; j++) {
                if (it.hasNext()) {
                    list.add(it.next());
                }
            }
            result.add(list);
        }
        return result;
    }

    /**
     * Sets the proxy generator for this instance.
     * <p>
     * The proxy will be used to generate values when accesses to a key not
     * contained in this instance is made. Instead of throwing an exception, the
     * proxy will be called with the attempted index and the program will continue
     * with the value returned by the proxy.
     * <p>
     * This feature is similar to {@link Map#getOrDefault(Object, Object)}
     * operation, the difference being that instead of returning a predetermined
     * default value, the provided function is called with the key.
     *
     * @param proxy function which takes a key "K" as parameter and returns a "V",
     *              or {@code null} to remove any previously set proxy
     */
    public void setProxyGenerator(Function<K, V> proxy) {
        proxyGenerator = proxy;
    }

    @Override
    public <S extends DistCollectionSatellite<DistMap<K, V>, S>> void setSatellite(S s) {
        satellite = s;
    }

    /**
     * Return the number of local entries.
     *
     * @return the number of the local entries.
     */
    @Override
    public int size() {
        return data.size();
    }

    @Override
    public TeamOperations<V, DistMap<K, V>> team() {
        return TEAM;
    }

    @Override
    public String toString() {
        final StringWriter out0 = new StringWriter();
        final PrintWriter out = new PrintWriter(out0);
        out.println("at " + here());
        for (final Map.Entry<K, V> e : data.entrySet()) {
            out.println("key : " + e.getKey() + ", value : " + e.getValue());
        }
        out.close();
        return out0.toString();
    }

    /**
     * Returns all the values of this local map in a collection.
     */
    @Override
    public Collection<V> values() {
        return data.values();
    }

    @Override
    public Object writeReplace() throws ObjectStreamException {
        final TeamedPlaceGroup pg1 = placeGroup;
        final GlobalID id1 = id;
        return new LazyObjectReference<>(pg1, id1, () -> {
            return new DistMap<>(pg1, id1);
        });
    }

}