DistSortedMap.java

package handist.collections.dist;

import static apgas.Constructs.*;

import java.io.ObjectStreamException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import apgas.Place;
import apgas.util.GlobalID;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.Serializer;

public class DistSortedMap<K, V> extends DistMap<K, V> implements NavigableMap<K, V>, SortedKeyRelocatable<K> {

    protected ConcurrentNavigableMap<K, V> data;

    /**
     * Construct a DistSortedMap.
     */
    public DistSortedMap() {
        this(TeamedPlaceGroup.getWorld());
    }

    /**
     * Construct a DistSortedMap with given argument.
     *
     * @param placeGroup PlaceGroup.
     */
    public DistSortedMap(TeamedPlaceGroup placeGroup) {
        this(placeGroup, new GlobalID());
    }

    /**
     * Construct a DistSortedMap with given arguments.
     *
     * @param placeGroup PlaceGroup
     * @param id         the global ID used to identify this instance
     */
    public DistSortedMap(TeamedPlaceGroup placeGroup, GlobalID id) {
        this(placeGroup, id, new ConcurrentSkipListMap<>());
    }

    /**
     * Construct a DistSortedMap with given arguments.
     *
     * @param placeGroup PlaceGroup
     * @param id         the global ID used to identify this instance
     * @param data       the container to be used
     */
    protected DistSortedMap(TeamedPlaceGroup placeGroup, GlobalID id, NavigableMap<K, V> data) {
        super(placeGroup, id, data);
        this.data = (ConcurrentNavigableMap<K, V>) super.data;
        super.GLOBAL = new GlobalOperations<>(this,
                (TeamedPlaceGroup pg0, GlobalID gid) -> new DistSortedMap<>(pg0, gid));
    }

    @Override
    public Entry<K, V> ceilingEntry(K key) {
        return data.ceilingEntry(key);
    }

    @Override
    public K ceilingKey(K key) {
        return data.ceilingKey(key);
    }

    @Override
    public Comparator<? super K> comparator() {
        return data.comparator();
    }

    @Override
    public NavigableSet<K> descendingKeySet() {
        return data.descendingKeySet();
    }

    @Override
    public NavigableMap<K, V> descendingMap() {
        return data.descendingMap();
    }

    @Override
    public Entry<K, V> firstEntry() {
        return data.firstEntry();
    }

    @Override
    public K firstKey() {
        return data.firstKey();
    }

    @Override
    public Entry<K, V> floorEntry(K key) {
        return data.floorEntry(key);
    }

    @Override
    public K floorKey(K key) {
        return data.floorKey(key);
    }

    @Override
    public SortedMap<K, V> headMap(K toKey) {
        return data.headMap(toKey);
    }

    @Override
    public NavigableMap<K, V> headMap(K toKey, boolean inclusive) {
        return data.headMap(toKey, inclusive);
    }

    @Override
    public Entry<K, V> higherEntry(K key) {
        return data.higherEntry(key);
    }

    @Override
    public K higherKey(K key) {
        return data.higherKey(key);
    }

    @Override
    public Entry<K, V> lastEntry() {
        return data.lastEntry();
    }

    @Override
    public K lastKey() {
        return data.lastKey();
    }

    @Override
    public Entry<K, V> lowerEntry(K key) {
        return data.lowerEntry(key);
    }

    @Override
    public K lowerKey(K key) {
        return data.lowerKey(key);
    }

    @Override
    public void moveAtSync(K from, K to, Distribution<K> rule, MoveManager mm) {
        final ConcurrentNavigableMap<K, V> sub = data.subMap(from, to);
        if (sub.isEmpty()) {
            return;
        }
        final Iterator<K> iter = sub.keySet().iterator();
        while (iter.hasNext()) {
            final K key = iter.next();
            moveAtSync(key, rule.location(key), mm);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void moveAtSync(K from, K to, Place dest, MoveManager mm) {
        if (dest.equals(here())) {
            return;
        }
        final DistSortedMap<K, V> toBranch = this;
        final Serializer serialize = (ObjectOutput s) -> {
            final ConcurrentNavigableMap<K, V> sub = data.subMap(from, to);
            final int num = sub.size();
            s.writeInt(num);
            if (num == 0) {
                return;
            }
            final Iterator<K> iter = sub.keySet().iterator();
            while (iter.hasNext()) {
                final K key = iter.next();
                final V value = this.remove(key);
                if (value == null) {
                    throw new NullPointerException("DistSortedMap.moveAtSync null pointer value of key: " + key);
                }
                s.writeObject(key);
                s.writeObject(value);
            }
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final int num = ds.readInt();
            for (int i = 0; i < num; i++) {
                final K k = (K) ds.readObject();
                final V v = (V) ds.readObject();
                toBranch.putForMove(k, v);
            }
        };
        mm.request(dest, serialize, deserialize);
    }

    @Override
    public NavigableSet<K> navigableKeySet() {
        return data.navigableKeySet();
    }

    @Override
    public Entry<K, V> pollFirstEntry() {
        return data.pollFirstEntry();
    }

    @Override
    public Entry<K, V> pollLastEntry() {
        return data.pollFirstEntry();
    }

    @Override
    public NavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
        return data.subMap(fromKey, fromInclusive, toKey, toInclusive);
    }

    @Override
    public SortedMap<K, V> subMap(K fromKey, K toKey) {
        return data.subMap(fromKey, toKey);
    }

    @Override
    public SortedMap<K, V> tailMap(K fromKey) {
        return data.tailMap(fromKey);
    }

    @Override
    public NavigableMap<K, V> tailMap(K fromKey, boolean inclusive) {
        return data.tailMap(fromKey, inclusive);
    }

    @Override
    public Object writeReplace() throws ObjectStreamException {
        final TeamedPlaceGroup pg1 = placeGroup;
        final GlobalID id1 = id;
        return new LazyObjectReference<>(pg1, id1, () -> {
            return new DistSortedMap<>(pg1, id1);
        });
    }

}