 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at
 * SPDX-License-Identifier: EPL-1.0
package handist.collections;

import static apgas.Constructs.*;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;

import handist.collections.dist.DistBag;

 * Container for user-defined types.
 * <p>
 * This class implements the {@link ParallelReceiver} interface and as such, is
 * capable of concurrently receiving instances of type T from multiple threads.
 * Its use is therefore recommended in parallel implementations of
 * {@code forEach} methods.
 * @param <T> type of the object handled
public class Bag<T> implements ParallelReceiver<T>, Serializable, KryoSerializable {

     * Iterator class for {@link Bag}
    // Walks the elements of the enclosing Bag by chaining the iterators of the
    // lists stored in Bag#bags. Non-static inner class: the constructor reads the
    // enclosing instance's bags field.
    private class It implements Iterator<T> {
        /** Iterator on the contents of a List<T> */
        Iterator<T> cIter;
        /** Iterator on {@link Bag#bags}, iterates on lists of T */
        Iterator<List<T>> oIter;

         * Constructor. Initializes the two iterators used to iterate on the lists
         * contained in {@link Bag#bags} and the iterator on these lists.
        public It() {
            oIter = bags.iterator();
            if (oIter.hasNext()) {
                // NOTE(review): the right-hand side of this assignment is missing;
                // presumably the iterator of the first list returned by oIter — confirm.
                cIter =;
            } else {
                // No list at all: hasNext() treats a null cIter as "exhausted"
                cIter = null;

        // Reports whether an element remains. May advance to a following list,
        // so calling it can change cIter.
        public boolean hasNext() {
            // cIter is null when the bag held no list at construction, or once a
            // previous call ran out of lists (see the else branch below)
            if (cIter == null) {
                return false;
            // Loop until the current list iterator has an element or the outer
            // iterator has no further list to offer
            while (true) {
                if (cIter.hasNext()) {
                    return true;
                if (oIter.hasNext()) {
                    // NOTE(review): right-hand side missing here as well; presumably
                    // the iterator of the next list from oIter — confirm.
                    cIter =;
                } else {
                    // Remember exhaustion so later calls return false immediately
                    cIter = null;
                    return false;

        public T next() {
            // NOTE(review): the statement inside this if is not visible; presumably
            // it returns cIter.next() — confirm.
            if (hasNext()) {
            // NOTE(review): java.util.Iterator#next documents NoSuchElementException
            // for an exhausted iteration; this throws IndexOutOfBoundsException.
            throw new IndexOutOfBoundsException();


    /** Serial Version UID */
    private static final long serialVersionUID = 5436363137856754303L;

     * Container of type T. The instances are split in multiple lists, one for each
     * of the calls made to {@link #getReceiver()}.
    // Package-private and not final: read(Kryo, Input) and
    // readObject(ObjectInputStream) assign a fresh deque when an instance is
    // deserialized.
    ConcurrentLinkedDeque<List<T>> bags;

     * Default constructor.
    public Bag() {
        // Starts with an empty deque, i.e. without any inner list
        bags = new ConcurrentLinkedDeque<>();

     * Constructor to create a Bag with the same contents as another {@link Bag} or
     * {@link DistBag}.
     * @param bag the bag to copy
    public Bag(Bag<T> bag) {
        // NOTE(review): this stores the very same deque object as the argument, not
        // a copy, so both Bag instances operate on the same deque and the same
        // lists afterwards. clone() copies each inner list into a new ArrayList
        // instead — confirm that sharing is intended here.
        bags = bag.bags;

     * Adds the instances contained in the provided {@link Bag} to this instance.
     * @param bag Bag of T
    public void addBag(Bag<T> bag) {

     * Adds a list of T instances to this instance.
     * @param l list of T
    public void addBag(List<T> l) {

     * Removes all contents from this bag.
    public void clear() {

     * Copies this {@link Bag}. The individual T elements contained in this instance
     * are not cloned, both this instance and the returned instance will share the
     * same objects.
    public Bag<T> clone() {
        // The copy is built with the default constructor rather than super.clone()
        final Bag<T> result = new Bag<>();
        for (final Collection<T> bag : bags) {
            // Shallow copy of one inner list: new list object, same T references
            final ArrayList<T> nbag = new ArrayList<>(bag);
        return result;

    /**
     * Checks whether one of the lists held by this bag contains the given object.
     * Each list is asked in turn through its own {@code contains} method.
     *
     * @param v the object to look for
     * @return {@code true} as soon as one list reports containing {@code v},
     *         {@code false} if none does
     */
    public boolean contains(Object v) {
        for (final Collection<T> bag : bags) {
            if (bag.contains(v)) {
                return true;
        return false;

     * Converts the bag into a list and clears the bag.
     * @return the contents of this instance as a list
    public List<T> convertToList() {
        // TODO: prepare a smarter implementation
        // Initial capacity is the element count at the time of the call; size()
        // walks every inner list to compute it
        final ArrayList<T> result = new ArrayList<>(this.size());
        // NOTE(review): the body of this loop is not visible; presumably each list
        // is appended to result — confirm, along with the clearing the Javadoc announces.
        for (final List<T> c : bags) {
        return result;

    // Traverses the inner lists through the deque's own forEach.
    // NOTE(review): the part of the lambda applying action to each list is not
    // visible in the reviewed text.
    public void forEach(final Consumer<? super T> action) {
        bags.forEach((Collection<T> bag) -> {

     * Launches a parallel forEach on the elements of this collection. The elements
     * contained in the individual lists (created either through
     * {@link #addBag(List)} or {@link #getReceiver()}) are submitted to the
     * provided {@link ExecutorService}. This method then waits for the completion
     * of the tasks before returning.
     * @param pool   the executor service in charge of processing this instance
     * @param action the action to perform on individual elements
    public void forEach(ExecutorService pool, final Consumer<? super T> action) {
        // forEachConst submits one task per inner list and returns its Future
        final List<Future<?>> futures = forEachConst(pool, action);
        for (final Future<?> f : futures) {
            // NOTE(review): the try body is empty in the reviewed text; presumably
            // it waits on f — confirm.
            try {
            } catch (InterruptedException | ExecutionException e) {
                // Both failure kinds are rethrown as ParallelExecutionException with
                // the original exception kept as cause.
                // NOTE(review): for InterruptedException the interrupt flag is not
                // restored before rethrowing.
                throw new ParallelExecutionException("[Bag] exception raised by worker threads.", e);

    // Submits one task per inner list to the given pool and returns the resulting
    // futures in submission order. No waiting on the futures is visible in this
    // method; the public forEach(ExecutorService, Consumer) iterates over them.
    private List<Future<?>> forEachConst(ExecutorService pool, final Consumer<? super T> action) {
        final ArrayList<Future<?>> futures = new ArrayList<>();
        for (final Collection<T> bag : bags) {
            // NOTE(review): the body of the submitted task is not visible here
            futures.add(pool.submit(() -> {
        return futures;

    // Splits the contents into sub-lists and starts one async per sub-list.
    // async is presumably apgas.Constructs.async, the file's only static import.
    private void forEachParallelBody(final Consumer<List<T>> run) {
        // Twice as many parts as the JVM reports available processors; separate()
        // builds new lists, the lists held in this.bags are not used directly
        final Bag<T> separated = this.separate(Runtime.getRuntime().availableProcessors() * 2);
        for (final List<T> sub : separated.bags) {
            // NOTE(review): the body of the activity is not visible; presumably it
            // hands sub to run — confirm.
            async(() -> {

     * Adds a new list to this instance and returns a {@link Consumer} which will
     * place the T instances it receives into this dedicated list.
    public Consumer<T> getReceiver() {
        // Each call creates its own ArrayList
        final ArrayList<T> bag = new ArrayList<>();
        // NOTE(review): neither the registration of bag into bags nor the body of
        // accept is visible in the reviewed text.
        return new Consumer<T>() {
            public void accept(T t) {

    /**
     * Indicates whether this bag holds no element. A bag whose deque only contains
     * empty lists is reported as empty.
     *
     * @return {@code false} as soon as a non-empty list is found, {@code true}
     *         otherwise
     */
    public boolean isEmpty() {
        for (final Collection<T> bag : bags) {
            if (!bag.isEmpty()) {
                return false;
        return true;

     * Returns an iterator over the elements contained in this instance
    public Iterator<T> iterator() {
        // A new It is created on every call; it obtains its own iterator on bags
        return new It();

     * Returns the number of parallel lists this bag contains. Note that the
     * returned value may differ from the actual value if the
     * {@link #getReceiver()} method is called concurrently.
     * @return number of lists in this bag
    public int listCount() {
        // ConcurrentLinkedDeque#size() is documented as not constant-time: it
        // traverses the deque, and the result may be inaccurate if the deque is
        // modified during the traversal
        return bags.size();

     * Launches a parallel forEach on the elements of this collection. The elements
     * contained in the individual lists (created either through
     * {@link #addBag(List)} or {@link #getReceiver()}) are submitted to the
     * provided {@link ExecutorService}. This method then waits for the completion
     * of the tasks before returning.
     * @param action the action to perform on individual elements
    // NOTE(review): the Javadoc above mentions a "provided ExecutorService", but
    // this method takes none; the visible code goes through finish and
    // forEachParallelBody, which starts async activities.
    public void parallelForEach(final Consumer<? super T> action) {
        finish(() -> {
            // NOTE(review): the body applied to each sub-list is not visible here
            forEachParallelBody((List<T> sub) -> {

    /**
     * Kryo deserialization hook. Reads an {@code int} element count from the input,
     * replaces {@link #bags} with a fresh deque, and reads that many objects into a
     * single {@link ArrayList}.
     *
     * @param kryo  the Kryo instance used to read each element
     * @param input the input to read from
     */
    public void read(Kryo kryo, Input input) {
        final int size = input.readInt();
        bags = new ConcurrentLinkedDeque<>();
        final ArrayList<T> bag1 = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            // Unchecked cast: the runtime type is whatever object Kryo reads back
            bag1.add((T) kryo.readClassAndObject(input));

    /**
     * Java serialization hook. Reads an {@code int} element count from the stream,
     * replaces {@link #bags} with a fresh deque, and reads that many objects into a
     * single {@link ArrayList}.
     *
     * @param in the stream to read from
     * @throws IOException            if reading from the stream fails
     * @throws ClassNotFoundException if the class of a read object cannot be found
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        final int size = in.readInt();
        bags = new ConcurrentLinkedDeque<>();
        final ArrayList<T> bag1 = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            // Unchecked cast: the runtime type is whatever object the stream yields
            bag1.add((T) in.readObject());

     * Removes one element contained in this instance and returns it. If there are
     * no elements in this instance, returns {@code null}.
     * @return the element removed from this collection, or {@code null} if this
     *         instance does not contain anything
    // synchronized on this Bag instance; among the methods visible here only
    // remove() and remove(int) take that lock.
    public synchronized T remove() {
        while (true) {
            if (bags.isEmpty()) {
                // No list left in the deque
                return null;
            // Inspect the list at the tail of the deque
            final List<T> bag = bags.getLast();
            // NOTE(review): the handling of an empty tail list is not visible;
            // presumably it is dropped from the deque before looping — confirm.
            if (bag.isEmpty()) {
            } else {
                // Remove and return the last element of that list
                return bag.remove(bag.size() - 1);

     * Removes and returns {@code n} elements from this instance into a list. If
     * there are less than {@code n} elements contained in this instance, removes
     * and returns all the elements. If this instance is empty, returns an empty
     * list.
     * @param n the number of elements to remove from this instance
     * @return a list containing at most {@code n} elements
    public synchronized List<T> remove(int n) {
        // n is used as initial capacity: ArrayList's constructor throws
        // IllegalArgumentException for a negative value
        final ArrayList<T> result = new ArrayList<>(n);
        while (n > 0) {
            if (bags.isEmpty()) {
                // Fewer than n elements were available: return what was collected
                return result;
            // Inspect the list at the tail of the deque
            final List<T> bag = bags.getLast();
            // NOTE(review): the handling of an empty tail list is not visible here
            if (bag.isEmpty()) {
            // Move the last element of the tail list into the result
            result.add(bag.remove(bag.size() - 1));
        return result;

     * Separates the contents of the Bag into <em>n</em> parts. This can be used to
     * apply a forEach method in parallel using <em>n</em> threads for instance. The
     * method returns a Bag containing <em>n</em> {@link List}s.
     * @param n the number of parts in which to split the Bag
     * @return a {@link Bag} made of <em>n</em> {@link List}s which together contain
     *         the same number of elements as this instance
    public Bag<T> separate(int n) {
        final int totalNum = this.size();
        // n == 0 makes the next two lines throw ArithmeticException
        final int rem = totalNum % n;
        final int quo = totalNum / n;
        final Bag<T> result = new Bag<>();
        // Elements are taken in the order of this bag's iterator
        final Iterator<T> it = this.iterator();

        for (int i = 0; i < n; i++) {
            final List<T> r = new ArrayList<>();
            // Target size of list i: quo, plus one for the first rem lists, so the
            // targets differ by at most one and add up to totalNum
            int rest = quo + ((i < rem) ? 1 : 0);
            // NOTE(review): the loop body is not visible; presumably it moves
            // elements from it into r and r into result — confirm.
            while (rest > 0) {
        return result;

    /**
     * Counts the elements of this bag by adding up the size of every inner list.
     *
     * @return the sum of the sizes of the lists held in {@link #bags}
     */
    public int size() {
        int size = 0;
        for (final Collection<T> bag : bags) {
            size += bag.size();
        return size;

    /**
     * Builds a textual form of this bag: the {@code toString()} of each inner list
     * followed by {@code ":"}, then the literal {@code "end of Bag"}.
     *
     * @return the string described above
     */
    public String toString() {
        final StringBuilder sb = new StringBuilder();
        for (final Collection<T> bag : bags) {
            sb.append(bag.toString() + ":");
        sb.append("end of Bag");
        return sb.toString();

    // Kryo serialization hook: writes every element of every inner list with
    // writeClassAndObject, list after list.
    // NOTE(review): read(Kryo, Input) starts by reading an int count; the
    // statement writing that count is not visible here — confirm it is present.
    public void write(Kryo kryo, Output output) {
        for (final Collection<T> bag : bags) {
            for (final T item : bag) {
                kryo.writeClassAndObject(output, item);

    private void writeObject(ObjectOutputStream out) throws IOException {
        // System.out.println("writeChunk:"+this);
        for (final Collection<T> bag : bags) {
            for (final T item : bag) {