CachableArray.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections.dist;

import static apgas.Constructs.*;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.function.BiConsumer;
import java.util.function.Function;

import apgas.Place;
import apgas.util.PlaceLocalObject;
import apgas.util.SerializableWithReplace;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
import handist.collections.function.DeSerializer;
import handist.collections.function.DeSerializerUsingPlace;
import handist.collections.function.Serializer;

/**
 * A class for handling objects using the Master-Proxy mechanism. The master
 * place has the body of each elements. The proxy places have the branch of each
 * elements.
 *
 * Note: In the current implementation, there are some limitations.
 *
 * o The first place of the PlaceGroup is selected as the master place
 * automatically. o To add any new elements is not allowed. The elements are
 * assigned only in the construction.
 *
 * @param <T> type of the elements handled by the {@link CachableArray}
 */

public class CachableArray<T> extends PlaceLocalObject implements List<T>, SerializableWithReplace {
    /**
     * Create a new CacheableArray using the given arguments. The elements of the
     * newly created CacheableArray and the given collection will be identical. The
     * proxies are also prepared as part of the initialization.
     *
     * @param <T>  type of the elements contained in the instance to create
     * @param pg   {@link TeamedPlaceGroup} on which the {@link CachableArray} will
     *             be prepared
     * @param data initial content to be placed in the {@link CachableArray}
     * @return the handle to the local instance of the {@link CachableArray}
     */

    public static <T> CachableArray<T> make(final TeamedPlaceGroup pg, List<T> data) {
        final Place master = here();
        final ArrayList<T> body = new ArrayList<>();
        body.addAll(data);
        return PlaceLocalObject.make(pg.places(), () -> new CachableArray<>(pg, master, body));
    }

    protected transient ArrayList<T> data;
    public transient Place master;

    public transient TeamedPlaceGroup placeGroup;

    /**
     * Create a new CacheableArray using the given list. data must not be shared
     * with others.
     *
     * @param placeGroup group of hosts suceptible to manipulate this instance
     * @param master     the "master" of the Cachable array
     * @param data       the initial data to create this CachableArray with
     */
    protected CachableArray(TeamedPlaceGroup placeGroup, Place master, ArrayList<T> data) {
        this.data = data;
        this.placeGroup = placeGroup;
        this.master = master;
    }

    @Override
    public void add(int index, T element) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public boolean add(T e) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public boolean addAll(int index, Collection<? extends T> c) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    public <U> void allreduce(Function<T, U> pack, BiConsumer<T, U> unpack) {
        final CollectiveRelocator.Allgather mm = new CollectiveRelocator.Allgather(placeGroup);
        allreduce(pack, unpack, mm);
        mm.execute();
    }

    public <U> void allreduce(Function<T, U> pack, BiConsumer<T, U> unpack, CollectiveRelocator.Allgather mm) {
        final Serializer serProcess = (ObjectOutput s) -> {
            for (final T elem : data) {
                s.writeObject(pack.apply(elem));
            }
        };
        final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place place) -> {
            for (final T elem : data) {
                @SuppressWarnings("unchecked")
                final U diff = (U) ds.readObject();
                unpack.accept(elem, diff);
            }
        };
        mm.request(serProcess, desProcess);
    }

    /**
     * Broadcast from master place to proxy place, packing elements using the
     * specified function. It is assumed that the type U is declared as a struct and
     * that it does not contain any reference.
     * <p>
     * Note: Currently, this method is implemented in too simple way.
     *
     * @param <U>    type used to represent the elements of this instance and that
     *               is going to be transfered to remote hosts. In the
     *               implementation of the <em>pack</em> and <code>unpack</code>,
     *               the programmer may choose to use T, but this allows any other
     *               custom type to be used.
     * @param pack   a function which packs the elements of the master node.
     * @param unpack a function which unpacks the received data and inserts the
     *               unpacked data into the instance local to each proxy.
     */
    public <U> void broadcast(Function<T, U> pack, BiConsumer<T, U> unpack) {
        final CollectiveRelocator.Bcast manager = new CollectiveRelocator.Bcast(placeGroup, master);
        broadcast(pack, unpack, manager);
        manager.execute();
    }

    public <U> void broadcast(Function<T, U> pack, BiConsumer<T, U> unpack, CollectiveRelocator.Bcast manager) {
        final Serializer serProcess = (ObjectOutput s) -> {
            for (final T elem : data) {
                s.writeObject(pack.apply(elem));
            }
        };
        final DeSerializer desProcess = (ObjectInput ds) -> {
            for (final T elem : data) {
                @SuppressWarnings("unchecked")
                final U diff = (U) ds.readObject();
                unpack.accept(elem, diff);
            }
        };
        manager.request(serProcess, desProcess);
    }

    @Override
    public void clear() {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public boolean contains(Object o) {
        return data.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return data.containsAll(c);
    }

    @Override
    public T get(int index) {
        return data.get(index);
    }

    @Override
    public int indexOf(Object o) {
        return data.indexOf(o);
    }

    @Override
    public boolean isEmpty() {
        return data.isEmpty();
    }

    @Override
    public Iterator<T> iterator() {
        return Collections.unmodifiableList(data).iterator();
    }

    @Override
    public int lastIndexOf(Object o) {
        return data.lastIndexOf(o);
    }

    @Override
    public ListIterator<T> listIterator() {
        return Collections.unmodifiableList(data).listIterator();
    }

    @Override
    public ListIterator<T> listIterator(int index) {
        return Collections.unmodifiableList(data).listIterator(index);
    }

    /**
     * Return the PlaceGroup on which this instance was created.
     *
     * @return the {@link TeamedPlaceGroup} on which this instance was replicated
     */
    public TeamedPlaceGroup placeGroup() {
        return placeGroup;
    }

    public <U> void reduce(Function<T, U> pack, BiConsumer<T, U> unpack) {
        final CollectiveRelocator.Gather manager = new CollectiveRelocator.Gather(placeGroup, master);
        reduce(pack, unpack, manager);
        manager.execute();
    }

    public <U> void reduce(Function<T, U> pack, BiConsumer<T, U> unpack, CollectiveRelocator.Gather manager) {
        final Serializer serProcess = (ObjectOutput s) -> {
            for (final T elem : data) {
                s.writeObject(pack.apply(elem));
            }
        };
        final DeSerializerUsingPlace desProcess = (ObjectInput ds, Place place) -> {
            for (final T elem : data) {
                @SuppressWarnings("unchecked")
                final U diff = (U) ds.readObject();
                unpack.accept(elem, diff);
            }
        };
        manager.request(serProcess, desProcess);
    }

    @Override
    public T remove(int index) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public boolean remove(Object o) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public T set(int index, T element) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public int size() {
        return data.size();
    }

    @Override
    public List<T> subList(int fromIndex, int toIndex) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public Object[] toArray() {
        throw new UnsupportedOperationException("[CachableArray] No direct access to members is allowed.");
    }

    @Override
    public <S> S[] toArray(S[] a) {
        throw new UnsupportedOperationException("[CachableArray] No modification of members is allowed.");
    }

    @Override
    public String toString() {
        final StringBuffer sb = new StringBuffer();
        final Iterator<T> ei = this.data.iterator();
        sb.append("CacheableArray[");
        while (true) {
            if (ei.hasNext()) {
                sb.append(ei.next());
            } else {
                break;
            }
            if (ei.hasNext()) {
                sb.append(" ");
            }
        }

        sb.append("]");
        return sb.toString();
    }

}