DistMap.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.dist;

import static apgas.Constructs.*;

import java.io.ObjectStreamException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

import apgas.Constructs;
import apgas.Place;
import apgas.util.GlobalID;
import apgas.util.SerializableWithReplace;
import handist.collections.ParallelMap;
import handist.collections.dist.util.IntLongPair;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.Serializer;
import handist.collections.glb.DistMapGlb;
import mpjbuf.IllegalArgumentException;

/**
 * A Map data structure spread over the multiple places.
 *
 * @param <K> type of the key used in the {@link DistMap}
 * @param <V> type of the value mapped to each key in the {@link DistMap}
 */
public class DistMap<K, V> extends ParallelMap<K, V>
        implements DistributedCollection<V, DistMap<K, V>>, KeyRelocatable<K>, SerializableWithReplace {

    // TODO
//     public <T, U> void setupBranches(Generator<T,U> gen) {
//         final DistMap<T,U> handle = this;
//         finish(()->{
//             placeGroup.broadcastFlat(()->{
//                 gen.accept(here(), handle);
//             });
//         });
//     }

//  Method moved to TEAM and GLOBAL operations
//  @Override
//  public void distSize(long[] result) {
//      TeamedPlaceGroup pg = this.placeGroup;
//      long localSize = data.size(); // int->long
//      long[] sendbuf = new long[] { localSize };
//      // team.alltoall(tmpOverCounts, 0, overCounts, 0, 1);
//      try {
//          pg.comm.Allgather(sendbuf, 0, 1, MPI.LONG, result, 0, 1, MPI.LONG);
//      } catch (MPIException e) {
//          e.printStackTrace();
//          throw new Error("[DistMap] network error in balance()");
//      }
//  }

    private static int _debug_level = 5;

    /** Handle for GLB operations */
    public final DistMapGlb<K, V> GLB;
    public GlobalOperations<V, DistMap<K, V>> GLOBAL;

    final GlobalID id;

    public transient float[] locality;

    public final TeamedPlaceGroup placeGroup;

    protected final TeamOperations<V, DistMap<K, V>> TEAM;

    @SuppressWarnings("rawtypes")
    private DistCollectionSatellite satellite;

    /**
     * Construct an empty DistMap which can have local handles on all the hosts in
     * the computation.
     */
    public DistMap() {
        this(TeamedPlaceGroup.world);
    }

    /**
     * Construct a DistMap which can have local handles on the hosts of the
     * specified {@link TeamedPlaceGroup}.
     *
     * @param pg the group of hosts that are susceptible to manipulate this
     *           {@link DistMap}
     */
    public DistMap(TeamedPlaceGroup pg) {
        this(pg, new GlobalID());
    }

    DistMap(TeamedPlaceGroup pg, GlobalID globalID) {
        this(pg, globalID, new HashMap<>());
    }

    /**
     * Package private DistMap constructor. This constructor is used to register a
     * new DistMap handle with the specified GlobalId. Programmers that use this
     * library should never have to call this constructor.
     * <p>
     * Specifying a GLobalId which already has object handles registered in other
     * places (potentially objects different from a {@link DistMap} instance) could
     * prove disastrous. Instead, programmers should only call {@link #DistMap()} to
     * create a distributed map with handles on all hosts, or
     * {@link #DistMap(TeamedPlaceGroup)} to restrict their DistMap to a subset of
     * hosts.
     *
     * @param pg       the palceGroup on which this DistMap is defined
     * @param globalId the global id associated to this distributed map
     * @param data     the data container to be used
     */
    DistMap(TeamedPlaceGroup pg, GlobalID globalId, Map<K, V> data) {
        super(data);
        placeGroup = pg;
        id = globalId;
        locality = new float[pg.size];
        Arrays.fill(locality, 1.0f);
        this.GLOBAL = new GlobalOperations<>(this, (TeamedPlaceGroup pg0, GlobalID gid) -> new DistMap<>(pg0, gid));
        GLB = new DistMapGlb<>(this);
        TEAM = new TeamOperations<>(this);
        id.putHere(this);
    }

    @Override
    public Collection<K> getAllKeys() {
        return keySet();
    }

    /**
     * Returns a subset of the keys contained in the local map. If the specified
     * number of keys is greater than the number of keys actually contained in the
     * local map, the entire keyset is returned. If a nil or negative number of keys
     * is asked for, an empty collection is returned.
     *
     * @param count number of keys desired
     * @return a collection containing the specified number of keys, or less if the
     *         local map contains fewer keys than the specified parameter
     */
    private Collection<K> getNKeys(int count) {
        if (count <= 0) {
            return Collections.emptySet();
        }
        final ArrayList<K> keys = new ArrayList<>();
        for (final K key : data.keySet()) {
            keys.add(key);
            --count;
            if (count == 0) {
                return keys;
            }
        }
        return data.keySet();
    }

    /**
     * Return new {@link MapEntryDispatcher} instance that enable fast relocation
     * between places than normal.
     *
     * @param rule Determines the dispatch destination.
     * @return :
     */
    public MapEntryDispatcher<K, V> getObjectDispatcher(Distribution<K> rule) {
        return new MapEntryDispatcher<>(this, placeGroup(), rule);
    }

    /**
     * Return new {@link MapEntryDispatcher} instance that enable fast relocation
     * between places than normal.
     *
     * @param rule Determines the dispatch destination.
     * @param pg   Relocate in this placegroup.
     * @return :
     * @throws IllegalArgumentException :
     */
    public MapEntryDispatcher<K, V> getObjectDispatcher(Distribution<K> rule, TeamedPlaceGroup pg)
            throws IllegalArgumentException {
        if (placeGroup.places.containsAll(pg.places)) {
            throw new IllegalArgumentException("The TeamedlaceGroup passed to DistMapDispatcher must be part of or "
                    + "the same as TeamedPlaceGroup in origin DistMap.");
        }
        return new MapEntryDispatcher<>(this, pg, rule);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <S extends DistCollectionSatellite<DistMap<K, V>, S>> S getSatellite() {
        return (S) satellite;
    }

    @Override
    public GlobalOperations<V, DistMap<K, V>> global() {
        return GLOBAL;
    }

    @Override
    public GlobalID id() {
        return id;
    }

    @Override
    public float[] locality() {
        return locality;
    }

    @Override
    public long longSize() {
        return data.size();
    }

    @Override
    @SuppressWarnings("unchecked")
    public void moveAtSync(Collection<K> keys, Place pl, MoveManager mm) {
        if (pl.equals(Constructs.here())) {
            return;
        }
        final DistMap<K, V> collection = this;
        final Serializer serialize = (ObjectOutput s) -> {
            final int size = keys.size();
            s.writeInt(size);
            for (final K key : keys) {
                final V value = collection.remove(key);
                s.writeObject(key);
                s.writeObject(value);
            }
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final int size = ds.readInt();
            for (int i = 1; i <= size; i++) {
                final K key = (K) ds.readObject();
                final V value = (V) ds.readObject();
                collection.putForMove(key, value);
            }
        };
        mm.request(pl, serialize, deserialize);
    }

    @Override
    public void moveAtSync(Function<K, Place> rule, MoveManager mm) {
        final DistMap<K, V> collection = this;
        final HashMap<Place, List<K>> keysToMove = new HashMap<>();
        collection.forEach((K key, V value) -> {
            final Place destination = rule.apply(key);
            if (!keysToMove.containsKey(destination)) {
                keysToMove.put(destination, new ArrayList<K>());
            }
            keysToMove.get(destination).add(key);
        });
        for (final Map.Entry<Place, List<K>> entry : keysToMove.entrySet()) {
            moveAtSync(entry.getValue(), entry.getKey(), mm);
        }
    }

    /**
     * Request that the specified element is relocated when #sync is called.
     *
     * @param key the key of the relocated entry.
     * @param pl  the destination place.
     * @param mm  MoveManagerLocal
     */
    @Override
    @SuppressWarnings("unchecked")
    public void moveAtSync(K key, Place pl, MoveManager mm) {
        if (pl.equals(Constructs.here())) {
            return;
        }
        final DistMap<K, V> toBranch = this;
        final Serializer serialize = (ObjectOutput s) -> {
            final V value = this.remove(key);
            s.writeObject(key);
            s.writeObject(value);
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final K k = (K) ds.readObject();
            final V v = (V) ds.readObject();
            toBranch.putForMove(k, v);
        };
        mm.request(pl, serialize, deserialize);
    }

    /*
     * void teamedBalance() { LoadBalancer.MapBalancer<T, U> balance = new
     * LoadBalancer.MapBalancer<>(this.data, placeGroup); balance.execute();
     * if(debugPrint()) System.out.println(here() + " balance.check1"); clear();
     * if(debugPrint()) { System.out.println(here() + " balance.check2");
     * System.out.println(here() + " balance.ArrayList.size() : " + data.size()); }
     * long time = - System.nanoTime(); time += System.nanoTime(); if(debugPrint())
     * { // System.out.println(here() + " count : " + (count) + " ms"); //
     * System.out.println(here() + " put : " + (total/(1000000)) + " ms");
     * System.out.println(here() + " for : " + (time/(1000000)) + " ms");
     * System.out.println(here() + " data.size() : " + size());
     * System.out.println(here() + " balance.check3"); }
     *
     * }
     */

    @Override
    public void moveAtSyncCount(final ArrayList<IntLongPair> moveList, final MoveManager mm) throws Exception {
        for (final IntLongPair pair : moveList) {
            if (_debug_level > 5) {
                System.out.println("MOVE src: " + here() + " dest: " + pair.first + " size: " + pair.second);
            }
            if (pair.second > Integer.MAX_VALUE) {
                throw new Error("One place cannot receive so much elements: " + pair.second);
            }
            moveAtSyncCount((int) pair.second, placeGroup.get(pair.first), mm);
        }
    }

    // TODO different naming convention of balance methods with DistMap

    public void moveAtSyncCount(int count, Place dest, MoveManager mm) {
        if (count == 0) {
            return;
        }
        moveAtSync(getNKeys(count), dest, mm);
    }

    /*
     * Abstractovdef create(placeGroup: PlaceGroup, team: TeamOperations, init:
     * ()=>Map[T, U]){ // return new DistMap[T,U](placeGroup, init) as
     * AbstractDistCollection[Map[T,U]]; return null as
     * AbstractDistCollection[Map[T,U]]; }
     */
    /*
     * public def versioningMap(srcName : String){ // return new
     * BranchingManager[DistMap[T,U], Map[T,U]](srcName, this); return null as
     * BranchingManager[DistMap[T,U], Map[T,U]]; }
     */

    @Override
    public TeamedPlaceGroup placeGroup() {
        return placeGroup;
    }

    protected V putForMove(K key, V value) {
        if (data.containsKey(key)) {
            throw new RuntimeException("DistMap cannot override existing entry: " + key);
        }
        return data.put(key, value);
    }

    /**
     * Reduce the all elements including other place using the given operation.
     *
     * @param <S>  type of the result produced by the reduction operation
     * @param lop  the operation using in the local reduction.
     * @param gop  the operation using in the reduction of the results of the local
     *             reduction.
     * @param unit the zero value of the reduction.
     * @return the result of the reduction.
     */
    public <S> S reduce(BiFunction<S, V, S> lop, BiFunction<S, S, S> gop, S unit) {
        // TODO
        throw new Error("Not implemented yet.");
        /*
         * val reducer = new Reducible[S]() { public def zero() = unit; public operator
         * this(a: S, b: S) = gop(a, b); }; return finish (reducer) {
         * placeGroup.broadcastFlat(() => { offer(reduceLocal(lop, unit)); }); };
         */
    }

    /**
     * Reduce the all elements including other place using the given operation.
     *
     * @param op   the operation.
     * @param unit the neutral element of the reduction.
     * @return the result of the reduction.
     */
    public V reduce(BiFunction<V, V, V> op, V unit) {
        return reduce(op, op, unit);
    }

    @Override
    public void relocate(Distribution<K> rule) throws Exception {
        relocate(rule, new CollectiveMoveManager(placeGroup));
    }

    @Override
    public void relocate(Distribution<K> rule, CollectiveMoveManager mm) throws Exception {
        for (final K key : data.keySet()) {
            final Place place = rule.location(key);
            if (place == null) {
                throw new NullPointerException("DistMap.relocate must not relocate entries to null place");
            }
            moveAtSync(key, place, mm);
        }
        mm.sync();
    }

    @Override
    public void relocate(Function<K, Place> rule) throws Exception {
        relocate(rule, new CollectiveMoveManager(placeGroup));
    }

    @Override
    public void relocate(Function<K, Place> rule, CollectiveMoveManager mm) throws Exception {
        for (final K key : data.keySet()) {
            final Place place = rule.apply(key);
            moveAtSync(key, place, mm);
        }
        mm.sync();
    }

    @Override
    public <S extends DistCollectionSatellite<DistMap<K, V>, S>> void setSatellite(S s) {
        satellite = s;
    }

    @Override
    public TeamOperations<V, DistMap<K, V>> team() {
        return TEAM;
    }

    @Override
    public String toString() {
        final StringWriter out0 = new StringWriter();
        final PrintWriter out = new PrintWriter(out0);
        out.println("at " + here());
        for (final Map.Entry<K, V> e : data.entrySet()) {
            out.println("key : " + e.getKey() + ", value : " + e.getValue());
        }
        out.close();
        return out0.toString();
    }

    @Override
    public Object writeReplace() throws ObjectStreamException {
        final TeamedPlaceGroup pg1 = placeGroup;
        final GlobalID id1 = id;
        return new LazyObjectReference<>(pg1, id1, () -> {
            return new DistMap<>(pg1, id1);
        });
    }

}