Bag.java

/*******************************************************************************
 * Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
 *
 * This program and the accompanying materials are made available to you under
 * the terms of the Eclipse Public License 1.0 which accompanies this
 * distribution,
 * and is available at https://www.eclipse.org/legal/epl-v10.html
 *
 * SPDX-License-Identifier: EPL-1.0
 ******************************************************************************/
package handist.collections;

import static apgas.Constructs.*;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import apgas.GlobalRuntime;
import handist.collections.dist.DistBag;
import handist.collections.reducer.BoolReducer;
import handist.collections.reducer.DoubleReducer;
import handist.collections.reducer.FloatReducer;
import handist.collections.reducer.IntReducer;
import handist.collections.reducer.LongReducer;
import handist.collections.reducer.Reducer;
import handist.collections.reducer.ShortReducer;

/**
 * Container for user-defined types.
 * <p>
 * This class implements the {@link ParallelReceiver} interface and as such, is
 * capable of concurrently receiving instances of type T from multiple threads.
 * Its use is therefore recommended in parallel implementations of
 * {@code forEach} methods.
 *
 * @param <T> type of the object handled
 */
public class Bag<T> implements ParallelReceiver<T>, Serializable, KryoSerializable {

    /**
     * Iterator class for {@link Bag}
     */
    private class It implements Iterator<T> {
        /** Iterator on the contents of a List<T> */
        Iterator<T> cIter;
        /** Iterator on {@link Bag#bags}, iterates on lists of T */
        Iterator<List<T>> oIter;

        /**
         * Constructor. Initializes the two iterators used to iterate on the lists
         * contained in {@link Bag#bags} and the iterator on these lists.
         */
        public It() {
            oIter = bags.iterator();
            if (oIter.hasNext()) {
                cIter = oIter.next().iterator();
            } else {
                cIter = null;
            }
        }

        @Override
        public boolean hasNext() {
            if (cIter == null) {
                return false;
            }
            while (true) {
                if (cIter.hasNext()) {
                    return true;
                }
                if (oIter.hasNext()) {
                    cIter = oIter.next().iterator();
                } else {
                    cIter = null;
                    return false;
                }
            }
        }

        @Override
        public T next() {
            if (hasNext()) {
                return cIter.next();
            }
            throw new IndexOutOfBoundsException();
        }

    }

    /** Serial Version UID */
    private static final long serialVersionUID = 5436363137856754303L;

    /**
     * Container of type T. The instances are split in multiple lists, one for each
     * of the calls made to {@link #getReceiver()}.
     */
    ConcurrentLinkedDeque<List<T>> bags;

    /**
     * Default constructor.
     */
    public Bag() {
        bags = new ConcurrentLinkedDeque<>();
    }

    /**
     * Constructor to create a Bag with the same contents as another {@link Bag} or
     * {@link DistBag}.
     *
     * @param bag the bag to copy
     */
    public Bag(Bag<T> bag) {
        bags = bag.bags;
    }

    /**
     * Adds a instances contained by the provided {@link Bag} to this instance.
     *
     * @param bag Bag of T
     */
    public void addBag(Bag<T> bag) {
        bags.addAll(bag.bags);
    }

    /**
     * Adds a list of T instances to this instance.
     *
     * @param l list of T
     */
    public void addBag(List<T> l) {
        bags.add(l);
    }

    /**
     * Removes all contents from this bag.
     */
    @Override
    public void clear() {
        bags.clear();
    }

    /**
     * Copies this {@link Bag}. The individual T elements contained in this instance
     * are not cloned, both this instance and the returned instance will share the
     * same objects.
     */
    @Override
    public Bag<T> clone() {
        final Bag<T> result = new Bag<>();
        for (final Collection<T> bag : bags) {
            final ArrayList<T> nbag = new ArrayList<>(bag);
            result.addBag(nbag);
        }
        return result;
    }

    @Override
    public boolean contains(Object v) {
        for (final Collection<T> bag : bags) {
            if (bag.contains(v)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Converts the bag into a list and clears the bag.
     *
     * @return the contents of this instance as a list
     */
    public List<T> convertToList() {
        // TODO: prepare a smarter implementation
        final ArrayList<T> result = new ArrayList<>(this.size());
        for (final List<T> c : bags) {
            result.addAll(c);
        }
        bags.clear();
        return result;
    }

    @Override
    public void forEach(final Consumer<? super T> action) {
        bags.forEach((Collection<T> bag) -> {
            bag.forEach(action);
        });
    }

    /**
     * Launches a parallel forEach on the elements of this collection. The elements
     * contained in the individual lists (created either through
     * {@link #addBag(List)} or {@link #getReceiver()}) are submitted to the
     * provided {@link ExecutorService}. This method than waits for the completion
     * of the tasks to return.
     *
     * @param pool   the executor service in charge of processing this instance
     * @param action the action to perform on individual elements
     */
    public void forEach(ExecutorService pool, final Consumer<? super T> action) {
        final List<Future<?>> futures = forEachConst(pool, action);
        for (final Future<?> f : futures) {
            try {
                f.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new ParallelExecutionException("[Bag] exception raised by worker threads.", e);
            }
        }
    }

    private List<Future<?>> forEachConst(ExecutorService pool, final Consumer<? super T> action) {
        final ArrayList<Future<?>> futures = new ArrayList<>();
        for (final Collection<T> bag : bags) {
            futures.add(pool.submit(() -> {
                bag.forEach(action);
            }));
        }
        return futures;
    }

    private void forEachParallelBody(final Consumer<List<T>> run) {
        final Bag<T> separated = this.separate(Runtime.getRuntime().availableProcessors() * 2);
        for (final List<T> sub : separated.bags) {
            async(() -> {
                run.accept(sub);
            });
        }
    }

    /**
     * Adds a new list to this instance and returns a {@link Consumer} which will
     * place the T instances it receives into this dedicated list.
     */
    @Override
    public Consumer<T> getReceiver() {
        final ArrayList<T> bag = new ArrayList<>();
        bags.add(bag);
        return new Consumer<T>() {
            @Override
            public void accept(T t) {
                bag.add(t);
            }
        };
    }

    @Override
    public boolean isEmpty() {
        for (final Collection<T> bag : bags) {
            if (!bag.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns an iterator over the elements contained in this instance
     */
    @Override
    public Iterator<T> iterator() {
        return new It();
    }

    /**
     * Returns the number of parallel lists this bag contains. Note that the
     * returned value may be actually different from the actual value if the
     * {@link #getReceiver()} method is called concurrently.
     *
     * @return number of lists in this bag
     */
    public int listCount() {
        return bags.size();
    }

    /**
     * Launches a parallel forEach on the elements of this collection. The elements
     * contained in the individual lists (created either through
     * {@link #addBag(List)} or {@link #getReceiver()}) are submitted to the
     * provided {@link ExecutorService}. This method than waits for the completion
     * of the tasks to return.
     *
     * @param action the action to perform on individual elements
     */
    public void parallelForEach(final Consumer<? super T> action) {
        finish(() -> {
            forEachParallelBody((List<T> sub) -> {
                sub.forEach(action);
            });
        });
    }

    /**
     * Performs the specified reduction on the elements contained in this bag in
     * parallel, using an operation provided by default. This {@link Bag} will be
     * emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param boolFunc    defines the value to be reduced
     * @param parallelism the maximum number of concurrent threads allocated to this
     *                    reduction operation (must be greater or equal to 1)
     * @return the value after the reduction has completed
     */
    public boolean parallelReduce(int parallelism, BoolReducer.Op op, Function<T, Boolean> boolFunc) {
        final ExecutorService pool = GlobalRuntime.getRuntime().getExecutorService();
        final BoolReducer reducer = new BoolReducer(op);
        final ArrayList<Future<BoolReducer>> reducerInstances = new ArrayList<>(parallelism);

        for (int i = 0; i < parallelism; i++) {
            // The individual job consists in reducing every element in every list present
            // in member #bags until we run out of lists
            final BoolReducer r = new BoolReducer(op);
            final Future<BoolReducer> future = pool.submit(() -> {
                List<T> list = bags.poll();
                while (list != null) {
                    for (final T t : list) {
                        r.reduce(boolFunc.apply(t));
                    }
                    list = bags.poll();
                }
            }, r);
            reducerInstances.add(future);
        }

        // Here we wait for each future to terminate and we merge then into the instance
        // given as parameter
        for (final Future<BoolReducer> future : reducerInstances) {
            try {
                reducer.reduce(future.get().value());
            } catch (final InterruptedException e) {
                e.printStackTrace();
            } catch (final ExecutionException e) {
                e.printStackTrace();
            }
        }

        // All the individual reducers have been merged into the given instance, we
        // return this result
        return reducer.value();
    }

    /**
     * Performs the specified reduction on the elements contained in this bag in
     * parallel, using an operation provided by default. This {@link Bag} will be
     * emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param doubleFunc  defines the value to be reduced
     * @param parallelism the maximum number of concurrent threads allocated to this
     *                    reduction operation (must be greater or equal to 1)
     * @return the value after the reduction has completed
     */
    public double parallelReduce(int parallelism, DoubleReducer.Op op, Function<T, Double> doubleFunc) {
        final ExecutorService pool = GlobalRuntime.getRuntime().getExecutorService();
        final DoubleReducer reducer = new DoubleReducer(op);
        final ArrayList<Future<DoubleReducer>> reducerInstances = new ArrayList<>(parallelism);

        for (int i = 0; i < parallelism; i++) {
            // The individual job consists in reducing every element in every list present
            // in member #bags until we run out of lists
            final DoubleReducer r = new DoubleReducer(op);
            final Future<DoubleReducer> future = pool.submit(() -> {
                List<T> list = bags.poll();
                while (list != null) {
                    for (final T t : list) {
                        r.reduce(doubleFunc.apply(t));
                    }
                    list = bags.poll();
                }
            }, r);
            reducerInstances.add(future);
        }

        // Here we wait for each future to terminate and we merge then into the instance
        // given as parameter
        for (final Future<DoubleReducer> future : reducerInstances) {
            try {
                reducer.reduce(future.get().value());
            } catch (final InterruptedException e) {
                e.printStackTrace();
            } catch (final ExecutionException e) {
                e.printStackTrace();
            }
        }

        // All the individual reducers have been merged into the given instance, we
        // return this result
        return reducer.value();
    }

    /**
     * Performs the specified reduction on the elements contained in this bag in
     * parallel, using an operation provided by default. This {@link Bag} will be
     * emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param floatFunc   defines the value to be reduced
     * @param parallelism the maximum number of concurrent threads allocated to this
     *                    reduction operation (must be greater or equal to 1)
     * @return the value after the reduction has completed
     */
    public float parallelReduce(int parallelism, FloatReducer.Op op, Function<T, Float> floatFunc) {
        final ExecutorService pool = GlobalRuntime.getRuntime().getExecutorService();
        final FloatReducer reducer = new FloatReducer(op);
        final ArrayList<Future<FloatReducer>> reducerInstances = new ArrayList<>(parallelism);

        for (int i = 0; i < parallelism; i++) {
            // The individual job consists in reducing every element in every list present
            // in member #bags until we run out of lists
            final FloatReducer r = new FloatReducer(op);
            final Future<FloatReducer> future = pool.submit(() -> {
                List<T> list = bags.poll();
                while (list != null) {
                    for (final T t : list) {
                        r.reduce(floatFunc.apply(t));
                    }
                    list = bags.poll();
                }
            }, r);
            reducerInstances.add(future);
        }

        // Here we wait for each future to terminate and we merge then into the instance
        // given as parameter
        for (final Future<FloatReducer> future : reducerInstances) {
            try {
                reducer.reduce(future.get().value());
            } catch (final InterruptedException e) {
                e.printStackTrace();
            } catch (final ExecutionException e) {
                e.printStackTrace();
            }
        }

        // All the individual reducers have been merged into the given instance, we
        // return this result
        return reducer.value();
    }

    /**
     * Performs the specified reduction on the elements contained in this bag in
     * parallel, using an operation provided by default. This {@link Bag} will be
     * emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param intFunc     defines the value to be reduced
     * @param parallelism the maximum number of concurrent threads allocated to this
     *                    reduction operation (must be greater or equal to 1)
     * @return the value after the reduction has completed
     */
    public int parallelReduce(int parallelism, IntReducer.Op op, Function<T, Integer> intFunc) {
        final ExecutorService pool = GlobalRuntime.getRuntime().getExecutorService();
        final IntReducer reducer = new IntReducer(op);
        final ArrayList<Future<IntReducer>> reducerInstances = new ArrayList<>(parallelism);

        for (int i = 0; i < parallelism; i++) {
            // The individual job consists in reducing every element in every list present
            // in member #bags until we run out of lists
            final IntReducer r = new IntReducer(op);
            final Future<IntReducer> future = pool.submit(() -> {
                List<T> list = bags.poll();
                while (list != null) {
                    for (final T t : list) {
                        r.reduce(intFunc.apply(t));
                    }
                    list = bags.poll();
                }
            }, r);
            reducerInstances.add(future);
        }

        // Here we wait for each future to terminate and we merge then into the instance
        // given as parameter
        for (final Future<IntReducer> future : reducerInstances) {
            try {
                reducer.reduce(future.get().value());
            } catch (final InterruptedException e) {
                e.printStackTrace();
            } catch (final ExecutionException e) {
                e.printStackTrace();
            }
        }

        // All the individual reducers have been merged into the given instance, we
        // return this result
        return reducer.value();
    }

    /**
     * Performs the specified reduction on the elements contained in this bag in
     * parallel, using an operation provided by default. This {@link Bag} will be
     * emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param longFunc    defines the value to be reduced
     * @param parallelism the maximum number of concurrent threads allocated to this
     *                    reduction operation (must be greater or equal to 1)
     * @return the value after the reduction has completed
     */
    public long parallelReduce(int parallelism, LongReducer.Op op, Function<T, Long> longFunc) {
        final ExecutorService pool = GlobalRuntime.getRuntime().getExecutorService();
        final LongReducer reducer = new LongReducer(op);
        final ArrayList<Future<LongReducer>> reducerInstances = new ArrayList<>(parallelism);

        for (int i = 0; i < parallelism; i++) {
            // The individual job consists in reducing every element in every list present
            // in member #bags until we run out of lists
            final LongReducer r = new LongReducer(op);
            final Future<LongReducer> future = pool.submit(() -> {
                List<T> list = bags.poll();
                while (list != null) {
                    for (final T t : list) {
                        r.reduce(longFunc.apply(t));
                    }
                    list = bags.poll();
                }
            }, r);
            reducerInstances.add(future);
        }

        // Here we wait for each future to terminate and we merge then into the instance
        // given as parameter
        for (final Future<LongReducer> future : reducerInstances) {
            try {
                reducer.reduce(future.get().value());
            } catch (final InterruptedException e) {
                e.printStackTrace();
            } catch (final ExecutionException e) {
                e.printStackTrace();
            }
        }

        // All the individual reducers have been merged into the given instance, we
        // return this result
        return reducer.value();
    }

    /**
     * Performs the specified reduction on the elements contained in this bag in
     * parallel. This {@link Bag} will be emptied as a result
     *
     * @param <R>         type of the reducer
     * @param reducer     instance into which the result will be placed
     * @param parallelism the maximum number of concurrent threads allocated to this
     *                    reduction operation (must be greater or equal to 1)
     * @return the instance given as parameter after the reduction has terminated
     */
    public <R extends Reducer<R, T>> R parallelReduce(int parallelism, R reducer) {
        final ExecutorService pool = GlobalRuntime.getRuntime().getExecutorService();
        final ArrayList<Future<R>> reducerInstances = new ArrayList<>(parallelism);

        for (int i = 0; i < parallelism; i++) {
            // The individual job consists in reducing every element in every list present
            // in member #bags until we run out of lists
            final R r = reducer.newReducer();
            final Future<R> future = pool.submit(() -> {
                List<T> list = bags.poll();
                while (list != null) {
                    for (final T t : list) {
                        r.reduce(t);
                    }
                    list = bags.poll();
                }
            }, r);
            reducerInstances.add(future);
        }

        // Here we wait for each future to terminate and we merge then into the instance
        // given as parameter
        for (final Future<R> future : reducerInstances) {
            try {
                reducer.merge(future.get());
            } catch (final InterruptedException e) {
                e.printStackTrace();
            } catch (final ExecutionException e) {
                e.printStackTrace();
            }
        }

        // All the individual reducers have been merged into the given instance, we
        // return this result
        return reducer;
    }

    /**
     * Performs the specified reduction on the elements contained in this bag in
     * parallel, using an operation provided by default. This {@link Bag} will be
     * emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param shortFunc   defines the value to be reduced
     * @param parallelism the maximum number of concurrent threads allocated to this
     *                    reduction operation (must be greater or equal to 1)
     * @return the value after the reduction has completed
     */
    public short parallelReduce(int parallelism, ShortReducer.Op op, Function<T, Short> shortFunc) {
        final ExecutorService pool = GlobalRuntime.getRuntime().getExecutorService();
        final ShortReducer reducer = new ShortReducer(op);
        final ArrayList<Future<ShortReducer>> reducerInstances = new ArrayList<>(parallelism);

        for (int i = 0; i < parallelism; i++) {
            // The individual job consists in reducing every element in every list present
            // in member #bags until we run out of lists
            final ShortReducer r = new ShortReducer(op);
            final Future<ShortReducer> future = pool.submit(() -> {
                List<T> list = bags.poll();
                while (list != null) {
                    for (final T t : list) {
                        r.reduce(shortFunc.apply(t));
                    }
                    list = bags.poll();
                }
            }, r);
            reducerInstances.add(future);
        }

        // Here we wait for each future to terminate and we merge then into the instance
        // given as parameter
        for (final Future<ShortReducer> future : reducerInstances) {
            try {
                reducer.reduce(future.get().value());
            } catch (final InterruptedException e) {
                e.printStackTrace();
            } catch (final ExecutionException e) {
                e.printStackTrace();
            }
        }

        // All the individual reducers have been merged into the given instance, we
        // return this result
        return reducer.value();
    }

    /**
     * Performs the specified reduction on the elements contained in this bag in
     * parallel. This {@link Bag} will be emptied as a result
     *
     * @param <R>     type of the reducer
     * @param reducer instance into which the result will be placed
     * @return the instance given as parameter after the reduction has terminated
     */
    public <R extends Reducer<R, T>> R parallelReduce(R reducer) {
        return parallelReduce(Runtime.getRuntime().availableProcessors(), reducer);
    }

    /**
     * Performs the reduction operation specified as parameter in parallel with the
     * specified level of parallelism. This {@link Bag} is emptied as a result.
     *
     * @param <R>         the type of the reducer
     * @param parallelism the maximum number of concurrent threads allocated to this
     *                    reduction operation (must be greater or equal to 1)
     * @param reducer     the instance into which the reduction is going to be
     *                    performed
     * @return the instance specified as parameter after the reduction has completed
     *         and has been fully reduced into that instance
     */
    public <R extends Reducer<R, List<T>>> R parallelReduceList(final int parallelism, final R reducer) {
        final ExecutorService pool = GlobalRuntime.getRuntime().getExecutorService();
        final ArrayList<Future<R>> reducerInstances = new ArrayList<>(parallelism);

        for (int i = 0; i < parallelism; i++) {
            // The individual job consists in reducing every list present in member #bags
            // until we run out of lists
            final R r = reducer.newReducer();
            final Future<R> future = pool.submit(() -> {
                List<T> list = bags.poll();
                while (list != null) {
                    r.reduce(list);
                    list = bags.poll();
                }
            }, r);
            reducerInstances.add(future);
        }

        // Here we wait for each future to terminate and we merge then into the instance
        // given as parameter
        for (final Future<R> future : reducerInstances) {
            try {
                reducer.merge(future.get());
            } catch (final InterruptedException e) {
                e.printStackTrace();
            } catch (final ExecutionException e) {
                e.printStackTrace();
            }
        }

        // All the individual reducers have been merged into the given instance, we
        // return this result
        return reducer;
    }

    /**
     * Performs the reduction operation in parallel using the host's level of
     * parallelism. This {@link Bag} instance will be emptied as a result of this
     * method being called.
     *
     * @param <R>     the type of the reducer used
     * @param reducer instance in which the reduction is going to be performed
     * @return the instance specified as parameter after the reduction has completed
     */
    public <R extends Reducer<R, List<T>>> R parallelReduceList(R reducer) {
        return parallelReduceList(Runtime.getRuntime().availableProcessors(), reducer);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void read(Kryo kryo, Input input) {
        final int size = input.readInt();
        bags = new ConcurrentLinkedDeque<>();
        final ArrayList<T> bag1 = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            bag1.add((T) kryo.readClassAndObject(input));
        }
        bags.add(bag1);
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        final int size = in.readInt();
        bags = new ConcurrentLinkedDeque<>();
        final ArrayList<T> bag1 = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            bag1.add((T) in.readObject());
        }
        bags.add(bag1);
    }

    /**
     * Sequentially reduces all the elements contained in this {@link Bag}, using an
     * operation provided by default. This {@link Bag} will be emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param extractFunc defines the value to be reduced
     * @return the value after the reduction has completed
     */
    public boolean reduce(BoolReducer.Op op, Function<T, Boolean> extractFunc) {
        final BoolReducer reducer = new BoolReducer(op);
        while (!isEmpty()) {
            if (reducer.reduce(extractFunc.apply(remove()))) {
                clear();
                return reducer.value();
            }
        }
        return reducer.value();
    }

    /**
     * Sequentially reduces all the elements contained in this {@link Bag}, using an
     * operation provided by default. This {@link Bag} will be emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param extractFunc defines the value to be reduced
     * @return the value after the reduction has completed
     */
    public double reduce(DoubleReducer.Op op, Function<T, Double> extractFunc) {
        final DoubleReducer reducer = new DoubleReducer(op);
        while (!isEmpty()) {
            reducer.reduce(extractFunc.apply(remove()));
        }
        return reducer.value();
    }

    /**
     * Sequentially reduces all the elements contained in this {@link Bag}, using an
     * operation provided by default. This {@link Bag} will be emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param extractFunc defines the value to be reduced
     * @return the value after the reduction has completed
     */
    public float reduce(FloatReducer.Op op, Function<T, Float> extractFunc) {
        final FloatReducer reducer = new FloatReducer(op);
        while (!isEmpty()) {
            reducer.reduce(extractFunc.apply(remove()));
        }
        return reducer.value();
    }

    /**
     * Sequentially reduces all the elements contained in this {@link Bag}, using an
     * operation provided by default. This {@link Bag} will be emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param extractFunc defines the value to be reduced
     * @return the value after the reduction has completed
     */
    public int reduce(IntReducer.Op op, Function<T, Integer> extractFunc) {
        final IntReducer reducer = new IntReducer(op);
        while (!isEmpty()) {
            reducer.reduce(extractFunc.apply(remove()));
        }
        return reducer.value();
    }

    /**
     * Sequentially reduces all the elements contained in this {@link Bag}, using an
     * operation provided by default. This {@link Bag} will be emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param extractFunc defines the value to be reduced
     * @return the value after the reduction has completed
     */
    public long reduce(LongReducer.Op op, Function<T, Long> extractFunc) {
        final LongReducer reducer = new LongReducer(op);
        while (!isEmpty()) {
            reducer.reduce(extractFunc.apply(remove()));
        }
        return reducer.value();
    }

    /**
     * Sequentially reduces all the elements contained in this bag using the reducer
     * provided as parameter. This {@link Bag} will be emptied as a result
     *
     * @param <R>     type of the reducer
     * @param reducer reducer to be used to reduce this parameter
     * @return the reducer provided as parameter after the reduction has completed
     */
    public <R extends Reducer<R, T>> R reduce(R reducer) {
        while (!isEmpty()) {
            reducer.reduce(remove());
        }
        return reducer;
    }

    /**
     * Sequentially reduces all the elements contained in this {@link Bag}, using an
     * operation provided by default. This {@link Bag} will be emptied as a result
     *
     * @param op          specifies the type of reduction operation
     * @param extractFunc defines the value to be reduced
     * @return the value after the reduction has completed
     */
    public short reduce(ShortReducer.Op op, Function<T, Short> extractFunc) {
        final ShortReducer reducer = new ShortReducer(op);
        while (!isEmpty()) {
            reducer.reduce(extractFunc.apply(remove()));
        }
        return reducer.value();
    }

    /**
     * Sequentially reduces all the lists of Ts contained in this bag into the
     * provided reducer and returns that reducer.This {@link Bag} will be emptied as
     * a result
     *
     * @param <R>     the type of the reducer
     * @param reducer the reducer into which this bag needs to be reduced
     * @return the reducer given as parameter after it has been applied to every
     *         list in this {@link Bag}
     */
    public <R extends Reducer<R, List<T>>> R reduceList(R reducer) {
        while (!isEmpty()) {
            reducer.reduce(bags.poll());
        }
        return reducer;
    }

    /**
     * Removes one element contained in this instance and returns it. If there are
     * no elements in this instance, returns {@code null}.
     *
     * @return the element removed from this collection, or {@code null} if this
     *         instance does not contain anything
     */
    public synchronized T remove() {
        while (true) {
            if (bags.isEmpty()) {
                return null;
            }
            final List<T> bag = bags.getLast();
            if (bag.isEmpty()) {
                bags.removeLast();
            } else {
                return bag.remove(bag.size() - 1);
            }
        }
    }

    /**
     * Removes and returns {@code n} elements from this instance into a list. If
     * there are less than {@code n} elements contained in this instance, removes
     * and returns all the elements. If this instance is empty, returns an empty
     * list.
     *
     * @param n the number of elements to remove from this instance
     * @return a list containing at most {@code n} elements
     */
    public synchronized List<T> remove(int n) {
        final ArrayList<T> result = new ArrayList<>(n);
        while (n > 0) {
            if (bags.isEmpty()) {
                return result;
            }
            final List<T> bag = bags.getLast();
            if (bag.isEmpty()) {
                bags.removeLast();
                continue;
            }
            result.add(bag.remove(bag.size() - 1));
            n--;
        }
        return result;
    }

    /**
     * Separates the contents of the Bag in <em>n</em> parts. This can be used to
     * apply a forEach method in parallel using 'n' threads for instance. The method
     * returns Bag containing <em>n</em> {@link List}.
     *
     * @param n the number of parts in which to split the Bag
     * @return {@link Bag} containing the same number of elements {@link List}s
     */
    public Bag<T> separate(int n) {
        final int totalNum = this.size();
        final int rem = totalNum % n;
        final int quo = totalNum / n;
        final Bag<T> result = new Bag<>();
        final Iterator<T> it = this.iterator();

        for (int i = 0; i < n; i++) {
            final List<T> r = new ArrayList<>();
            result.addBag(r);
            int rest = quo + ((i < rem) ? 1 : 0);
            while (rest > 0) {
                r.add(it.next());
                rest--;
            }
        }
        return result;
    }

    @Override
    public int size() {
        int size = 0;
        for (final Collection<T> bag : bags) {
            size += bag.size();
        }
        return size;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder();
        sb.append("[Bag]");
        for (final Collection<T> bag : bags) {
            sb.append(bag.toString() + ":");
        }
        sb.append("end of Bag");
        return sb.toString();
    }

    @Override
    public void write(Kryo kryo, Output output) {
        output.writeInt(size());
        for (final Collection<T> bag : bags) {
            for (final T item : bag) {
                kryo.writeClassAndObject(output, item);
            }
        }
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        // System.out.println("writeChunk:"+this);
        out.writeInt(size());
        for (final Collection<T> bag : bags) {
            for (final T item : bag) {
                out.writeObject(item);
            }
        }
    }
}