Bag.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections;

import static apgas.Constructs.*;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import handist.collections.dist.DistBag;

/**
 * Container for user-defined types.
 * <p>
 * This class implements the {@link ParallelReceiver} interface and as such, is
 * capable of concurrently receiving instances of type T from multiple threads.
 * Its use is therefore recommended in parallel implementations of
 * {@code forEach} methods.
 * <p>
 * Internally, the elements are stored in several lists held in a
 * {@link ConcurrentLinkedDeque}: one list per call to {@link #getReceiver()} or
 * {@link #addBag(List)}. Only the deque of lists is thread-safe. The lists
 * created by this class are plain {@link ArrayList}s, so each of them should be
 * written to by a single thread at a time.
 *
 * @param <T> type of the object handled
 */
public class Bag<T> implements ParallelReceiver<T>, Serializable, KryoSerializable {

    /**
     * Iterator class for {@link Bag}. Traverses the lists held in
     * {@link Bag#bags} one after the other, skipping empty lists.
     */
    private class It implements Iterator<T> {
        /** Iterator on the contents of the list currently being traversed */
        Iterator<T> cIter;
        /** Iterator on {@link Bag#bags}, iterates on lists of T */
        Iterator<List<T>> oIter;

        /**
         * Constructor. Initializes the iterator on the lists contained in
         * {@link Bag#bags} and the iterator on the first of these lists (if any).
         */
        public It() {
            oIter = bags.iterator();
            if (oIter.hasNext()) {
                cIter = oIter.next().iterator();
            } else {
                cIter = null;
            }
        }

        /**
         * Indicates whether an element remains. Advances past empty lists as a
         * side effect; {@code cIter} is set to {@code null} once all lists are
         * exhausted.
         */
        @Override
        public boolean hasNext() {
            if (cIter == null) {
                return false;
            }
            while (true) {
                if (cIter.hasNext()) {
                    return true;
                }
                if (oIter.hasNext()) {
                    cIter = oIter.next().iterator();
                } else {
                    cIter = null;
                    return false;
                }
            }
        }

        /**
         * Returns the next element of the bag.
         *
         * @return the next element
         * @throws NoSuchElementException if all elements have already been
         *                                returned, as required by the
         *                                {@link Iterator} contract
         */
        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return cIter.next();
        }

    }

    /** Serial Version UID */
    private static final long serialVersionUID = 5436363137856754303L;

    /**
     * Container of type T. The instances are split in multiple lists: one for each
     * call made to {@link #getReceiver()}, one for each list given to
     * {@link #addBag(List)}, and those shared through {@link #addBag(Bag)}.
     */
    ConcurrentLinkedDeque<List<T>> bags;

    /**
     * Default constructor. Creates an empty bag.
     */
    public Bag() {
        bags = new ConcurrentLinkedDeque<>();
    }

    /**
     * Constructor to create a Bag with the same contents as another {@link Bag} or
     * {@link DistBag}.
     * <p>
     * NOTE(review): the internal storage is shared with {@code bag}, not copied.
     * Lists added to or removed from either instance afterwards are visible in
     * both. Use {@link #clone()} to obtain a bag with its own lists.
     *
     * @param bag the bag whose contents are to be shared
     */
    public Bag(Bag<T> bag) {
        bags = bag.bags;
    }

    /**
     * Adds the instances contained by the provided {@link Bag} to this instance.
     * The lists of {@code bag} are added as-is (not copied). Subsequent
     * modifications of those lists are therefore visible from both bags.
     *
     * @param bag Bag of T
     * @throws IllegalArgumentException if {@code bag} shares its internal storage
     *                                  with this instance (for instance
     *                                  {@code bag == this}), as specified by
     *                                  {@link ConcurrentLinkedDeque#addAll}
     */
    public void addBag(Bag<T> bag) {
        bags.addAll(bag.bags);
    }

    /**
     * Adds a list of T instances to this instance. The list is stored as-is (not
     * copied).
     *
     * @param l list of T
     */
    public void addBag(List<T> l) {
        bags.add(l);
    }

    /**
     * Removes all contents from this bag.
     */
    @Override
    public void clear() {
        bags.clear();
    }

    /**
     * Copies this {@link Bag}. Each internal list is copied into a new
     * {@link ArrayList}, but the individual T elements are not cloned. Both this
     * instance and the returned instance will share the same objects.
     *
     * @return a new bag holding the same elements in new lists
     */
    @Override
    public Bag<T> clone() {
        final Bag<T> result = new Bag<>();
        for (final List<T> bag : bags) {
            result.addBag(new ArrayList<>(bag));
        }
        return result;
    }

    /**
     * Indicates whether any of the lists of this bag contains the given object.
     *
     * @param v the object to look for
     * @return {@code true} if {@code v} is contained in this bag
     */
    @Override
    public boolean contains(Object v) {
        for (final Collection<T> bag : bags) {
            if (bag.contains(v)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Converts the bag into a list and clears the bag.
     *
     * @return the contents of this instance as a list
     */
    public List<T> convertToList() {
        // TODO: prepare a smarter implementation
        final ArrayList<T> result = new ArrayList<>(this.size());
        for (final List<T> c : bags) {
            result.addAll(c);
        }
        bags.clear();
        return result;
    }

    /**
     * Applies {@code action} to every element of this bag, sequentially, list
     * after list.
     *
     * @param action the action to perform on individual elements
     */
    @Override
    public void forEach(final Consumer<? super T> action) {
        bags.forEach(bag -> bag.forEach(action));
    }

    /**
     * Launches a parallel forEach on the elements of this collection. Each of the
     * individual lists (created either through {@link #addBag(List)} or
     * {@link #getReceiver()}) is submitted as one task to the provided
     * {@link ExecutorService}. This method then waits for the completion of the
     * tasks before returning.
     *
     * @param pool   the executor service in charge of processing this instance
     * @param action the action to perform on individual elements
     * @throws ParallelExecutionException if a task threw an exception, or if the
     *                                    calling thread was interrupted while
     *                                    waiting (in which case its interrupt
     *                                    status is restored)
     */
    public void forEach(ExecutorService pool, final Consumer<? super T> action) {
        final List<Future<?>> futures = forEachConst(pool, action);
        for (final Future<?> f : futures) {
            try {
                f.get();
            } catch (final InterruptedException e) {
                // Restore the interrupt status so that code further up the stack
                // can still observe that this thread was interrupted.
                Thread.currentThread().interrupt();
                throw new ParallelExecutionException("[Bag] exception raised by worker threads.", e);
            } catch (final ExecutionException e) {
                throw new ParallelExecutionException("[Bag] exception raised by worker threads.", e);
            }
        }
    }

    /**
     * Submits one task per internal list to {@code pool}. Each task applies
     * {@code action} to the elements of its list.
     *
     * @param pool   executor receiving the tasks
     * @param action the action to perform on individual elements
     * @return the futures of the submitted tasks, in submission order
     */
    private List<Future<?>> forEachConst(ExecutorService pool, final Consumer<? super T> action) {
        final ArrayList<Future<?>> futures = new ArrayList<>();
        for (final Collection<T> bag : bags) {
            futures.add(pool.submit(() -> bag.forEach(action)));
        }
        return futures;
    }

    /**
     * Splits the contents of this bag into {@code 2 * availableProcessors} lists
     * (see {@link #separate(int)}) and spawns one APGAS {@code async} activity per
     * list, each applying {@code run} to its list. The caller is expected to wrap
     * this call in a {@code finish} to wait for the activities, as
     * {@link #parallelForEach(Consumer)} does.
     *
     * @param run the processing to apply to each sub-list
     */
    private void forEachParallelBody(final Consumer<List<T>> run) {
        final Bag<T> separated = this.separate(Runtime.getRuntime().availableProcessors() * 2);
        for (final List<T> sub : separated.bags) {
            async(() -> run.accept(sub));
        }
    }

    /**
     * Adds a new list to this instance and returns a {@link Consumer} which will
     * place the T instances it receives into this dedicated list.
     *
     * @return a consumer appending to a list owned by this bag
     */
    @Override
    public Consumer<T> getReceiver() {
        final ArrayList<T> bag = new ArrayList<>();
        bags.add(bag);
        return bag::add;
    }

    /**
     * Indicates whether this bag contains no element (it may still hold empty
     * lists).
     *
     * @return {@code true} if all the internal lists are empty
     */
    @Override
    public boolean isEmpty() {
        for (final Collection<T> bag : bags) {
            if (!bag.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns an iterator over the elements contained in this instance.
     */
    @Override
    public Iterator<T> iterator() {
        return new It();
    }

    /**
     * Returns the number of parallel lists this bag contains. Note that the
     * returned value may already be outdated if the {@link #getReceiver()} method
     * is called concurrently.
     *
     * @return number of lists in this bag
     */
    public int listCount() {
        return bags.size();
    }

    /**
     * Launches a parallel forEach on the elements of this collection using the
     * APGAS runtime. The elements are first redistributed into
     * {@code 2 * availableProcessors} lists (see {@link #separate(int)}). Each of
     * these lists is processed by a separate {@code async} activity. This method
     * then waits, through {@code finish}, for the completion of all the activities
     * before returning.
     *
     * @param action the action to perform on individual elements
     */
    public void parallelForEach(final Consumer<? super T> action) {
        finish(() -> {
            forEachParallelBody((List<T> sub) -> sub.forEach(action));
        });
    }

    /**
     * Kryo deserialization: reads the element count followed by the elements, and
     * stores all of them in a single list. This is the counterpart of
     * {@link #write(Kryo, Output)}.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void read(Kryo kryo, Input input) {
        final int size = input.readInt();
        bags = new ConcurrentLinkedDeque<>();
        final ArrayList<T> bag1 = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            bag1.add((T) kryo.readClassAndObject(input));
        }
        bags.add(bag1);
    }

    /**
     * Java deserialization: reads the element count followed by the elements, and
     * stores all of them in a single list. This is the counterpart of
     * {@link #writeObject(ObjectOutputStream)}.
     */
    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        final int size = in.readInt();
        bags = new ConcurrentLinkedDeque<>();
        final ArrayList<T> bag1 = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            bag1.add((T) in.readObject());
        }
        bags.add(bag1);
    }

    /**
     * Removes one element contained in this instance and returns it. If there are
     * no elements in this instance, returns {@code null}.
     * <p>
     * Empty lists found at the end of the bag are discarded. NOTE(review): an
     * element later added to such a list through a previously obtained
     * {@link #getReceiver() receiver} is no longer visible from this bag.
     *
     * @return the element removed from this collection, or {@code null} if this
     *         instance does not contain anything
     */
    public synchronized T remove() {
        while (true) {
            // peekLast/pollLast return null instead of throwing if the deque is
            // emptied concurrently (e.g. by the unsynchronized clear()).
            final List<T> bag = bags.peekLast();
            if (bag == null) {
                return null;
            }
            if (bag.isEmpty()) {
                bags.pollLast();
            } else {
                return bag.remove(bag.size() - 1);
            }
        }
    }

    /**
     * Removes and returns {@code n} elements from this instance into a list. If
     * there are less than {@code n} elements contained in this instance, removes
     * and returns all the elements. If this instance is empty, returns an empty
     * list.
     *
     * @param n the number of elements to remove from this instance
     * @return a list containing at most {@code n} elements
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public synchronized List<T> remove(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("[Bag] cannot remove a negative number of elements: " + n);
        }
        // Presize with the number of elements that can actually be returned:
        // 'n' may be arbitrarily large (e.g. Integer.MAX_VALUE to drain the bag).
        final ArrayList<T> result = new ArrayList<>(Math.min(n, size()));
        int remaining = n;
        while (remaining > 0) {
            final List<T> bag = bags.peekLast();
            if (bag == null) {
                break;
            }
            if (bag.isEmpty()) {
                bags.pollLast();
                continue;
            }
            result.add(bag.remove(bag.size() - 1));
            remaining--;
        }
        return result;
    }

    /**
     * Separates the contents of the Bag in <em>n</em> parts. This can be used to
     * apply a forEach method in parallel using 'n' threads for instance. The method
     * returns a Bag containing <em>n</em> {@link List}s whose sizes differ by at
     * most one. The elements are not removed from this instance.
     *
     * @param n the number of parts in which to split the Bag, must be positive
     * @return {@link Bag} containing <em>n</em> {@link List}s holding the elements
     *         of this instance
     * @throws IllegalArgumentException if {@code n} is not positive
     */
    public Bag<T> separate(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("[Bag] the number of parts must be positive: " + n);
        }
        final int totalNum = this.size();
        final int rem = totalNum % n;
        final int quo = totalNum / n;
        final Bag<T> result = new Bag<>();
        final Iterator<T> it = this.iterator();

        for (int i = 0; i < n; i++) {
            // The first 'rem' parts receive one extra element so that every
            // element is assigned to exactly one part.
            final int partSize = quo + ((i < rem) ? 1 : 0);
            final List<T> r = new ArrayList<>(partSize);
            for (int j = 0; j < partSize; j++) {
                r.add(it.next());
            }
            result.addBag(r);
        }
        return result;
    }

    /**
     * Returns the total number of elements held in all the lists of this bag.
     *
     * @return the number of elements in this bag
     */
    @Override
    public int size() {
        int size = 0;
        for (final Collection<T> bag : bags) {
            size += bag.size();
        }
        return size;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("[Bag]");
        for (final Collection<T> bag : bags) {
            sb.append(bag).append(':');
        }
        return sb.append("end of Bag").toString();
    }

    /**
     * Kryo serialization: writes the element count followed by every element.
     * NOTE(review): the count is computed before the elements are written, so this
     * must not run while other threads add elements to this bag.
     */
    @Override
    public void write(Kryo kryo, Output output) {
        output.writeInt(size());
        for (final Collection<T> bag : bags) {
            for (final T item : bag) {
                kryo.writeClassAndObject(output, item);
            }
        }
    }

    /**
     * Java serialization: writes the element count followed by every element.
     * NOTE(review): the count is computed before the elements are written, so this
     * must not run while other threads add elements to this bag.
     */
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeInt(size());
        for (final Collection<T> bag : bags) {
            for (final T item : bag) {
                out.writeObject(item);
            }
        }
    }
}