DistRangedMap.java

package handist.collections.dist;

import static apgas.Constructs.*;

import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.function.Function;

import apgas.Place;
import apgas.util.GlobalID;
import apgas.util.SerializableWithReplace;
import handist.collections.LongRange;
import handist.collections.RangedMap;
import handist.collections.dist.util.IntLongPair;
import handist.collections.dist.util.LazyObjectReference;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.Serializer;

/**
 *
 *
 * TODO : handle not for LongRange but for object range.
 *
 * @author yoshikikawanishi
 */
public class DistRangedMap<T> extends RangedMap<T> implements DistributedCollection<T, DistRangedMap<T>>,
        RangeRelocatable<LongRange>, ElementLocationManageable<LongRange>, SerializableWithReplace {

    /**
     * Internal class that handles distribution-related operations.
     */
    protected final transient ElementLocationManager<LongRange> ldist;

    /** Handle for GLB operations */
    // public final DistMapGlb<K, V> GLB;

    public GlobalOperations<T, DistRangedMap<T>> GLOBAL;

    final GlobalID id;

    public transient float[] locality;

    public final TeamedPlaceGroup placeGroup;

    protected final TeamOperations<T, DistRangedMap<T>> TEAM;

    @SuppressWarnings("rawtypes")
    private DistCollectionSatellite satellite;

    /**
     * Construct a DistRangedMap.
     */
    public DistRangedMap() {
        this(TeamedPlaceGroup.getWorld());
    }

    /**
     * Construct a DistRangedMap with given argument.
     *
     * @param placeGroup PlaceGroup.
     */
    public DistRangedMap(TeamedPlaceGroup placeGroup) {
        this(placeGroup, new GlobalID());
    }

    /**
     * Construct a DistRangedMap with given arguments.
     *
     * @param placeGroup PlaceGroup
     * @param id         the global ID used to identify this instance
     */
    public DistRangedMap(TeamedPlaceGroup placeGroup, GlobalID id) {
        super();
        ldist = new ElementLocationManager<>();
        this.placeGroup = placeGroup;
        this.id = id;
        locality = new float[placeGroup.size];
        Arrays.fill(locality, 1.0f);
        this.GLOBAL = new GlobalOperations<>(this,
                (TeamedPlaceGroup pg0, GlobalID gid) -> new DistRangedMap<>(pg0, gid));
        // GLB = new DistMapGlb<>(this);
        TEAM = new TeamOperations<>(this);
        id.putHere(this);
    }

    private void addForMove(LongRange range, byte mType) throws Exception {
        switch (mType) {
        case ElementLocationManager.MOVE_NEW:
            ldist.moveInNew(range);
            break;
        case ElementLocationManager.MOVE_OLD:
            ldist.moveInOld(range);
            break;
        default:
            throw new Exception("SystemError when calling addForMove " + range);
        }
        super.addRange(range);
    }

    @Override
    public void addRange(LongRange range) {
        super.addRange(range);
        ldist.add(range);
    }

    @Override
    public void addRange(LongRange range, Function<Long, T> func) {
        super.addRange(range, func);
        ldist.add(range);
    }

    @Override
    public Collection<LongRange> getAllRanges() {
        return ranges;
    }

    /**
     * Returns a newly created snapshot of the current distribution of registered
     * tracking ranges as a {@link LongRangeDistribution}. This returned
     * distribution's contents will become out-of-date if the contents of this class
     * are relocated, added, and/or removed.
     *
     * @return a new {@link LongRangeDistribution} object representing the current
     *         distribution of this collection
     */
    public LongRangeDistribution getDistribution() {
        return new LongRangeDistribution(ldist.dist);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <S extends DistCollectionSatellite<DistRangedMap<T>, S>> S getSatellite() {
        return (S) satellite;
    }

    @Override
    public void getSizeDistribution(long[] result) {
        for (final Map.Entry<LongRange, Place> entry : ldist.dist.entrySet()) {
            final LongRange k = entry.getKey();
            final Place p = entry.getValue();
            result[placeGroup.rank(p)] += k.size();
        }
    }

    @Override
    public GlobalOperations<T, DistRangedMap<T>> global() {
        return GLOBAL;
    }

    @Override
    public GlobalID id() {
        return id;
    }

    @Override
    public float[] locality() {
        return locality;
    }

    @Override
    public long longSize() {
        return size();
    }

    @Override
    public void moveAtSyncCount(final ArrayList<IntLongPair> moveList, final MoveManager mm) throws Exception {
        throw new UnsupportedOperationException("Not implemented yet");
    }

    @Override
    public void moveRangeAtSync(LongRange range, Place dest, MoveManager manager) {
        if (dest.equals(here())) {
            return;
        }
        final DistRangedMap<T> toBranch = this;
        final Serializer serialize = (ObjectOutput s) -> {
            final RangedMap<T> toMove = toBranch.split(range);
            s.writeInt(toMove.ranges().size());
            for (final LongRange r : toMove.ranges()) {
                final byte mType = ldist.moveOut(r, dest);
                s.writeObject(r);
                s.writeByte(mType);
            }

            final ConcurrentNavigableMap<Long, T> sub = data.subMap(range.from, range.to);
            final int num = sub.size();
            s.writeInt(num);
            final Iterator<Entry<Long, T>> iter = sub.entrySet().iterator();
            while (iter.hasNext()) {
                final long key = iter.next().getKey();
                final T value = this.removeForMove(key);
                s.writeLong(key);
                s.writeObject(value);
            }
        };
        final DeSerializer deserialize = (ObjectInput ds) -> {
            final int rangesNum = ds.readInt();
            for (int i = 0; i < rangesNum; i++) {
                final LongRange r = (LongRange) ds.readObject();
                final byte mType = ds.readByte();
                toBranch.addForMove(r, mType);
            }

            final int num = ds.readInt();
            for (int i = 0; i < num; i++) {
                final long index = ds.readLong();
                @SuppressWarnings("unchecked")
                final T t = (T) ds.readObject();
                toBranch.put(index, t);
            }
        };
        manager.request(dest, serialize, deserialize);
    }

    @Override
    public TeamedPlaceGroup placeGroup() {
        return placeGroup;
    }

    @Override
    public void registerDistribution(UpdatableDistribution<LongRange> distributionToUpdate) {
        ldist.registerDistribution(distributionToUpdate);
    }

    private T removeForMove(Long i) {
        final T t = super.remove(i);
        if (t == null) {
            throw new NullPointerException("removeForMove null pointer value of index: " + i);
        }
        return t;
    }

    @Override
    protected boolean removeRangeTemporary(LongRange range) {
        ldist.remove(range);
        return ranges.remove(range);
    }

    @Override
    public <S extends DistCollectionSatellite<DistRangedMap<T>, S>> void setSatellite(S satellite) {
        this.satellite = satellite;
    }

    @Override
    public TeamOperations<T, DistRangedMap<T>> team() {
        return TEAM;
    }

    @Override
    public void updateDist() {
        ldist.update(placeGroup);
    }

    @Override
    public Object writeReplace() throws ObjectStreamException {
        final TeamedPlaceGroup pg1 = placeGroup;
        final GlobalID id1 = id;
        return new LazyObjectReference<>(pg1, id1, () -> {
            return new DistRangedMap<>(pg1, id1);
        });
    }

}