// DistMultiMap.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.dist;

import static apgas.Constructs.*;

import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import apgas.Place;
import apgas.util.GlobalID;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.Serializer;
import mpjbuf.IllegalArgumentException;

/**
 * A Map data structure spread over multiple places which allows several values
 * to be associated with one key. The values mapped to a key are kept in a
 * {@link Collection} obtained from {@link #createEmptyCollection()} (an
 * {@link ArrayList} unless a subclass overrides that method).
 *
 * @param <K> type of the key used in the {@link DistMultiMap}
 * @param <V> type of the elements contained in the collections to which the
 *            keys map
 */
public class DistMultiMap<K, V> extends DistMap<K, Collection<V>> {

    /**
     * Constructs a DistMultiMap on the place group returned by
     * {@link TeamedPlaceGroup#getWorld()}.
     */
    public DistMultiMap() {
        this(TeamedPlaceGroup.getWorld());
    }

    /**
     * Constructs a DistMultiMap on the given place group.
     *
     * @param placeGroup the place group handed to the superclass constructor
     */
    public DistMultiMap(TeamedPlaceGroup placeGroup) {
        // NOTE(review): unlike the constructors taking a GlobalID, this path does not
        // assign the DistMultiMap-specific GLOBAL member -- confirm that what the
        // superclass constructor sets up is adequate.
        super(placeGroup);
    }

    /**
     * Constructs a DistMultiMap with the given place group and global id, backed by
     * a new {@link HashMap}.
     *
     * @param placeGroup the place group handed to the superclass constructor
     * @param id         the global ID used to identify this instance
     */
    public DistMultiMap(TeamedPlaceGroup placeGroup, GlobalID id) {
        this(placeGroup, id, new HashMap<>());
    }

    /**
     * Constructs a DistMultiMap with the given place group, global id and backing
     * container.
     *
     * @param placeGroup the place group handed to the superclass constructor
     * @param id         the global ID used to identify this instance
     * @param data       the container to be used
     */
    protected DistMultiMap(TeamedPlaceGroup placeGroup, GlobalID id, Map<K, Collection<V>> data) {
        super(placeGroup, id, data);
        // Overwrite the superclass member so that the factory handed to
        // GlobalOperations creates DistMultiMap instances.
        super.GLOBAL = new GlobalOperations<>(this,
                (TeamedPlaceGroup pg0, GlobalID gid) -> new DistMultiMap<>(pg0, gid));
    }

    /**
     * Creates an empty collection to hold the values of a key. Subclasses may
     * override this method to supply another kind of container.
     *
     * @return a new, empty {@link ArrayList}
     */
    protected Collection<V> createEmptyCollection() {
        return new ArrayList<>();
    }

    // TODO ...
    // public void setupBranches(DistMap.Generator<T,List<U>> gen)

    /**
     * Applies the given operation to every (key, value) pair held in the local
     * container: for each key, the operation is called once per value contained in
     * the collection mapped to that key.
     *
     * @param op the operation, called with the key and one of its values
     */
    public void forEach1(BiConsumer<K, V> op) {
        for (final Map.Entry<K, Collection<V>> mapping : data.entrySet()) {
            final K key = mapping.getKey();
            for (final V value : mapping.getValue()) {
                op.accept(key, value);
            }
        }
    }

    /**
     * Creates a {@link MultiMapEntryDispatcher} for this instance using the given
     * distribution rule. A new dispatcher is created by each call.
     *
     * @param rule determines the dispatch destination
     * @return a new dispatcher built from this instance and {@code rule}
     */
    @Override
    public MultiMapEntryDispatcher<K, V> getObjectDispatcher(Distribution<K> rule) {
        return new MultiMapEntryDispatcher<>(this, rule);
    }

    /**
     * Creates a {@link MultiMapEntryDispatcher} for this instance using the given
     * distribution rule and place group. A new dispatcher is created by each call.
     *
     * @param rule determines the dispatch destination
     * @param pg   the place group handed to the dispatcher
     * @return a new dispatcher built from this instance, {@code pg} and
     *         {@code rule}
     * @throws IllegalArgumentException as declared by the overridden method;
     *                                  presumably raised by the dispatcher
     *                                  constructor for an unsuitable place group
     *                                  (to be confirmed)
     */
    @Override
    public MultiMapEntryDispatcher<K, V> getObjectDispatcher(Distribution<K> rule, TeamedPlaceGroup pg)
            throws IllegalArgumentException {
        return new MultiMapEntryDispatcher<>(this, pg, rule);
    }

    /**
     * Apply the same operation on each element including remote places and creates
     * a new {@link DistMultiMap} with the same keys as this instance and the result
     * of the mapping operation as values.
     * <p>
     * This method is not implemented yet.
     *
     * @param <W> the type of the result of the map operation
     * @param op  the mapping operation from {@code V} to {@code W}
     * @return a new DistMultiMap which consists of the result of the operation.
     * @throws UnsupportedOperationException always, until this method is
     *                                       implemented
     */
    public <W> DistMultiMap<K, W> map(BiFunction<K, V, W> op) {
        // TODO implement. Earlier (non-Java) sketch kept for reference:
        //
        // return new DistMultiMap[T,S](placeGroup, team, () => {
        //     val dst = new HashMap[T,List[S]]();
        //     for (entry in data.entries()) {
        //         val key = entry.getKey();
        //         val old = entry.getValue();
        //         val list = new ArrayList[S](old.size());
        //         for (v in old) { list.add(op(key, v)); }
        //         dst(entry.getKey()) = list;
        //     }
        //     return dst;
        // });
        throw new UnsupportedOperationException("not implemented yet");
    }

    /**
     * Requests the transfer of the entries of the given keys to the specified
     * place. Nothing is done if the destination is the current place. Otherwise a
     * serializer/deserializer pair is registered with the move manager: the
     * serializer removes each key from this instance and writes the key followed by
     * its values, the deserializer reads them back and merges them into the
     * receiving instance with {@link #putForMove(Object, Collection)}.
     * <p>
     * The keys are not checked when this method is called. A key without mapping
     * is written with {@code null} values.
     *
     * @param keys the keys of the entries to transfer
     * @param pl   the destination place
     * @param mm   the move manager with which the transfer is registered
     */
    @Override
    @SuppressWarnings("unchecked")
    public void moveAtSync(Collection<K> keys, Place pl, MoveManager mm) {
        if (pl.equals(here())) {
            return;
        }
        final DistMultiMap<K, V> collection = this;
        final Serializer serialize = (ObjectOutput s) -> {
            // The number of entries is written first so the receiver knows how many
            // (key, values) pairs to read.
            s.writeInt(keys.size());
            for (final K key : keys) {
                // Same removal hook as the single-key overload.
                final Collection<V> values = collection.removeForMove(key);
                s.writeObject(key);
                s.writeObject(values);
            }
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final int size = ds.readInt();
            for (int i = 0; i < size; i++) {
                final K key = (K) ds.readObject();
                final Collection<V> values = (Collection<V>) ds.readObject();
                collection.putForMove(key, values);
            }
        };
        mm.request(pl, serialize, deserialize);
    }

    /**
     * Requests the transfer of the entry of the given key to the specified place.
     * Nothing is done if the destination is the current place. Otherwise a
     * serializer/deserializer pair is registered with the move manager: the
     * serializer removes the key from this instance and writes the key followed by
     * its values, the deserializer reads them back and merges them into the
     * receiving instance with {@link #putForMove(Object, Collection)}.
     *
     * @param key the key of the entry to transfer
     * @param pl  the destination place
     * @param mm  the move manager with which the transfer is registered
     * @throws RuntimeException if this instance holds no mapping for {@code key}
     *                          and the destination is not the current place
     */
    @Override
    @SuppressWarnings("unchecked")
    public void moveAtSync(K key, Place pl, MoveManager mm) {
        if (pl.equals(here())) {
            return;
        }
        if (!containsKey(key)) {
            throw new RuntimeException("DistMultiMap cannot move uncontained entry: " + key);
        }
        final DistMultiMap<K, V> toBranch = this; // using plh@AbstractCol
        final Serializer serialize = (ObjectOutput s) -> {
            final Collection<V> values = this.removeForMove(key);
            // TODO we should check values!=null before transportation
            s.writeObject(key);
            s.writeObject(values);
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final K k = (K) ds.readObject();
            // TODO we should check values!=null before transportation
            final Collection<V> v = (Collection<V>) ds.readObject();
            toBranch.putForMove(k, v);
        };
        mm.request(pl, serialize, deserialize);
    }

    /**
     * Adds a value to the collection mapped to the specified key. If the key has no
     * mapping yet, a collection is first created with
     * {@link #createEmptyCollection()} and mapped to the key.
     *
     * @param key   the key of the entry
     * @param value the new value to be added to the mappings of {@code key}.
     * @return the result of {@link Collection#add(Object)} on the collection mapped
     *         to the key, i.e. {@code true} if that collection was modified
     */
    public boolean put1(K key, V value) {
        return data.computeIfAbsent(key, k -> createEmptyCollection()).add(value);
    }

    /**
     * Request that the specified value be put in the list of the given key on the
     * specified place when the method {@link CollectiveMoveManager#sync()} of the
     * specified {@link CollectiveMoveManager} instance is called. The value is
     * added on the receiving side with {@link #put1(Object, Object)}.
     *
     * @param key   the key of the list.
     * @param value the value to be added to the mapping of {@code key}
     * @param pl    the destination place
     * @param mm    the {@link CollectiveMoveManager} with which the transfer is
     *              registered
     */
    @SuppressWarnings("unchecked")
    public void putAtSync(K key, V value, Place pl, CollectiveMoveManager mm) {
        final DistMultiMap<K, V> toBranch = this; // using plh@AbstractCol
        final Serializer serialize = (ObjectOutput s) -> {
            s.writeObject(key);
            s.writeObject(value);
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final K k = (K) ds.readObject();
            final V v = (V) ds.readObject();
            toBranch.put1(k, v);
        };
        mm.request(pl, serialize, deserialize);
    }

    /**
     * Merges the given values into the collection mapped to the specified key. If
     * the key has no mapping yet, a collection is first created with
     * {@link #createEmptyCollection()} and mapped to the key; this happens even
     * when {@code values} is {@code null}, in which case the mapping is left empty.
     *
     * @param key    the key of the entry
     * @param values the values to add to the mappings of {@code key}, may be
     *               {@code null}
     * @return always {@code false}
     */
    public boolean putForMove(K key, Collection<V> values) {
        final Collection<V> list = data.computeIfAbsent(key, k -> createEmptyCollection());
        // TODO we should check values!=null before transportation
        if (values != null) {
            list.addAll(values);
        }
        // NOTE(review): the result does not reflect whether the collection was
        // modified -- confirm no caller expects the Collection#addAll result.
        return false;
    }

    /**
     * Removes the entry corresponding to the specified key from the local
     * container.
     *
     * @param key the key whose mapping need to be removed from this instance
     * @return the collection of all the values that were mapped to the specified
     *         key, or whatever the underlying container returns when the key has
     *         no mapping ({@code null} for a {@link HashMap})
     */
    public Collection<V> removeForMove(K key) {
        return data.remove(key);
    }

    /**
     * Substitutes this instance with a {@link LazyObjectReference} built from its
     * place group and global id. The supplier given to the reference creates a new
     * DistMultiMap with that same place group and id.
     *
     * @return the {@link LazyObjectReference} standing in for this instance
     * @throws ObjectStreamException as declared by the overridden method; not
     *                               thrown directly by this implementation
     */
    @Override
    public Object writeReplace() throws ObjectStreamException {
        // Copied to locals so that the lambda below captures only these two values
        // rather than this instance.
        final TeamedPlaceGroup pg1 = placeGroup;
        final GlobalID id1 = id;
        return new LazyObjectReference<>(pg1, id1, () -> {
            return new DistMultiMap<>(pg1, id1);
        });
    }

    /*
     * TODO methods of the earlier (non-Java) implementation which remain to be
     * ported.
     *
     * reduceLocal: reduce all the local elements using the given function.
     *   op   - the operation.
     *   unit - the zero value of the reduction.
     *   returns the result of the reduction.
     *
     * public def reduceLocal[S](op: (S,U)=>S, unit: S): S { var accum: S = unit;
     * for (entry in data.entries()) { for (value in entry.getValue()) { accum =
     * op(accum, value); } } return accum; }
     *
     * def create(placeGroup: PlaceGroup, team: TeamOperations, init: ()=>Map[T,
     * List[U]]){ // return new DistMultiMap[T,U](placeGroup, init) as
     * AbstractDistCollection[Map[T,List[U]]]; return null as
     * AbstractDistCollection[Map[T, List[U]]]; }
     *
     * public def versioningMapList(srcName : String){ // return new
     * BranchingManager[DistMultiMap[T,U], Map[T,List[U]]](srcName, this); return
     * null as BranchingManager[DistMultiMap[T,U], Map[T,List[U]]]; }
     */
    // TODO
    // In the current implementation of balance(),
    // DistIdMap treats the number of keys as the load of the PE, not the number
    // of elements in the value lists.
}