Chunk.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import handist.collections.function.LongTBiConsumer;
/**
* Large collection that can contain objects mapped to long indices.
*
* @param <T> type of the elements handled by this instance
*/
public class Chunk<T> extends RangedList<T> implements Serializable, KryoSerializable {
/**
* Iterator class for Chunk
*
* @param <T> type on which the iterator operates
*/
private static class It<T> implements RangedListIterator<T> {
private final Chunk<T> chunk;
private int i; // offset inside the chunk
private int lastReturnedShift = -1;
public It(Chunk<T> chunk) {
this.chunk = chunk;
this.i = -1;
}
public It(Chunk<T> chunk, long i0) {
if (!chunk.range.contains(i0)) {
throw new IndexOutOfBoundsException();
}
this.chunk = chunk;
this.i = (int) (i0 - chunk.range.from - 1);
}
@Override
public boolean hasNext() {
return i + 1 < chunk.size();
}
@Override
public boolean hasPrevious() {
return i > 0;
}
@Override
@SuppressWarnings("unchecked")
public T next() {
lastReturnedShift = 0;
return (T) chunk.a[++i];
}
@Override
public long nextIndex() {
return chunk.range.from + i + 1;
}
@Override
@SuppressWarnings("unchecked")
public T previous() {
lastReturnedShift = 1;
return (T) chunk.a[i--];
}
@Override
public long previousIndex() {
return chunk.range.from + i;
}
@Override
public void set(T e) {
if (lastReturnedShift == -1) {
throw new IllegalStateException("[Chunk.It] Either method "
+ "previous or next needs to be called before method set" + " can be used");
}
chunk.a[i + lastReturnedShift] = e; // FIXME THIS IS NOT CORRECT !!!
}
}
/** Serial Version UID */
private static final long serialVersionUID = -7691832846457812518L;
/** Array containing the T objects */
private Object[] a;
/** Range on which this instance is defined */
private LongRange range;
/**
* Builds a Chunk with the given range and no mapping.
* <p>
* The given LongRange should have a strictly positive size. Giving a
* {@link LongRange} instance with identical lower and upper bounds will result
* in a {@link IllegalArgumentException} being thrown.
* <p>
* If the {@link LongRange} provided has a range that exceeds
* {@value handist.collections.Config#maxChunkSize}, an
* {@link IllegalArgumentException} will be be thrown.
*
* @param range the range of the chunk to build
* @throws IllegalArgumentException if a {@link Chunk} cannot be built with the
* provided range.
*/
public Chunk(LongRange range) {
final long size = range.to - range.from;
if (size > Config.maxChunkSize) {
throw new IllegalArgumentException(
"The given range " + range + " exceeds the maximum Chunk size " + Config.maxChunkSize);
} else if (size <= 0) {
throw new IllegalArgumentException("Cannot build a Chunk with " + "LongRange " + range
+ ", should have a strictly positive" + " size");
}
a = new Object[(int) size];
this.range = range;
}
/**
* Builds a {@link Chunk} with the provided {@link LongRange}. The provided
* initializer generates the initial value of the element for each index. The
* given LongRange should have a strictly positive size. Giving a
* {@link LongRange} instance with identical lower and upper bounds will result
* in a {@link IllegalArgumentException} being thrown.
* <p>
* If the {@link LongRange} provided has a range that exceeds
* {@value handist.collections.Config#maxChunkSize}, an
* {@link IllegalArgumentException} will be be thrown.
*
* @param range the range of the chunk to build
* @param initializer generates the initial value of the element for each index.
* @throws IllegalArgumentException if a {@link Chunk} cannot be built with the
* provided range.
*/
public Chunk(LongRange range, Function<Long, T> initializer) {
this(range);
range.forEach((long index) -> {
a[(int) (index - range.from)] = initializer.apply(index);
});
}
/**
* Builds a {@link Chunk} with the provided {@link LongRange} and an initial
* mapping for each long in the object array. The provided {@link LongRange} and
* Object array should have the same size. An {@link IllegalArgumentException}
* will be thrown otherwise.
* <p>
* The given {@link LongRange} should have a strictly positive size. Giving a
* {@link LongRange} instance with identical lower and upper bounds will result
* in a {@link IllegalArgumentException} being thrown.
* <p>
* If the {@link LongRange} provided has a range that exceeds
* {@value handist.collections.Config#maxChunkSize}, an
* {@link IllegalArgumentException} will be be thrown.
*
* @param range the range of the chunk to build
* @param a array with the initial mapping for every long in the provided
* range
* @throws IllegalArgumentException if a {@link Chunk} cannot be built with the
* provided range and object array.
*/
public Chunk(LongRange range, Object[] a) {
this(range);
if (a.length != range.size()) {
throw new IllegalArgumentException("The length of the provided " + "array <" + a.length
+ "> does not match the size of the " + "LongRange <" + range.size() + ">");
}
// TODO Do we check for objects in array a that are not of type T?
// We can leave as is and let the code fail later in methods get and
// others where a ClassCastException should be thrown.
this.a = a;
}
/**
* Builds a {@link Chunk} with the provided {@link LongRange} with each long in
* the provided range mapped to object t. The given LongRange should have a
* strictly positive size. Giving a {@link LongRange} instance with identical
* lower and upper bounds will result in a {@link IllegalArgumentException}
* being thrown.
* <p>
* If the {@link LongRange} provided has a range that exceeds
* {@value handist.collections.Config#maxChunkSize}, an
* {@link IllegalArgumentException} will be be thrown.
*
* @param range the range of the chunk to build
* @param t initial mapping for every long in the provided range
* @throws IllegalArgumentException if a {@link Chunk} cannot be built with the
* provided range.
*/
public Chunk(LongRange range, T t) {
this(range);
// TODO Is this what we really want to do?
// The mapping will be on the SAME OBJECT for every long in LongRange.
// Don't we need a Generator<T> generator as argument and create an
// instance for each key with Arrays.setAll(a, generator) ?
Arrays.fill(a, t);
}
/**
* Returns a new Chunk defined on the same {@link LongRange} and with the same
* contents as this instance.
*
* @return a copy of this instance
*/
@Override
public Chunk<T> clone() {
// Object[] aClone = a.clone();
final Object[] aClone = new Object[a.length];
//// FIXME: 2018/09/19 Need deep copy?
// for (int i = 0; i < a.length; i++) {
// try {
// aClone[i] = ((Cloneable) a[i]).clone();
// } catch (CloneNotSupportedException e) {
// e.printStackTrace();
// }
// }
Arrays.fill(aClone, a[0]);
System.arraycopy(a, 0, aClone, 0, a.length);
return new Chunk<>(this.range, aClone);
}
@Override
public Chunk<T> cloneRange(LongRange newRange) {
return range == newRange ? clone() : toChunk(newRange);
}
@Override
public boolean contains(Object v) {
for (final Object e : a) {
if (v == null ? e == null : v.equals(e)) {
return true;
}
}
return false;
}
@Override
public boolean equals(Object o) {
return RangedList.equals(this, o);
}
@Override
public <U> void forEach(LongRange range, BiConsumer<? super T, Consumer<? super U>> action,
Consumer<? super U> receiver) {
rangeCheck(range);
// IntStream.range(begin, end).forEach();
for (long i = range.from; i < range.to; i++) {
action.accept(get(i), receiver);
}
}
@Override
public void forEach(LongRange range, final Consumer<? super T> action) {
rangeCheck(range);
final long from = range.from;
final long to = range.to;
for (long i = from; i < to; i++) {
action.accept(get(i));
}
}
@Override
public void forEach(LongRange range, final LongTBiConsumer<? super T> action) {
rangeCheck(range);
// IntStream.range(begin, end).forEach();
for (long i = range.from; i < range.to; i++) {
action.accept(i, get(i));
}
}
/**
* {@inheritDoc}
*/
@Override
public T get(long index) {
if (!getRange().contains(index)) {
throw new IndexOutOfBoundsException(rangeMsg(index));
}
return getUnsafe(index);
}
/**
* {@inheritDoc}
*/
@Override
public LongRange getRange() {
return range;
}
/**
* Returns the element located at the provided index. The provided index is
* presumed valid and as such, no bound checking is done.
*
* @param index index whose value should be returned
* @return the object stored at the provided index, possibly {@code null}
*/
@SuppressWarnings("unchecked")
final T getUnsafe(long index) { // when range check was done
final long offset = index - range.from;
return (T) a[(int) offset];
}
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return RangedList.hashCode(this);
}
/**
* {@inheritDoc}
*/
@Override
public Iterator<T> iterator() {
return new It<>(this);
}
/**
* Creates and returns a new {@link RangedListIterator} on the elements
* contained by this instance
*
* @return a new {@link RangedListIterator}
*/
public RangedListIterator<T> rangedListIterator() {
return new It<>(this);
}
/**
* Creates and returns a new {@link RangedListIterator} starting at the
* specified index on the elements contained by this instance
*
* @param index the index of the first element to be returned by calling method
* {@link RangedListIterator#next()}
* @return a new {@link RangedListIterator} starting at the specified index
*/
public RangedListIterator<T> rangedListIterator(long index) {
return new It<>(this, index);
}
private String rangeMsg(long index) {
return "[Chunk] range " + index + " is out of " + getRange();
}
private String rangeMsg(LongRange range) {
return "[Chunk] range " + range + " is not contained in " + getRange();
}
@Override
public void read(Kryo kryo, Input input) {
this.range = (LongRange) kryo.readClassAndObject(input);
this.a = (Object[]) kryo.readClassAndObject(input);
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
this.range = (LongRange) in.readObject();
this.a = (Object[]) in.readObject();
// System.out.println("readChunk:"+this);
}
/**
* {@inheritDoc}
*/
@Override
public T set(long index, T value) {
if (!getRange().contains(index)) {
throw new IndexOutOfBoundsException(rangeMsg(index));
}
return setUnsafe(index, value);
}
@SuppressWarnings("unchecked")
private final T setUnsafe(long index, T v) { // when range check was done
final long offset = index - range.from;
final T prev = (T) a[(int) offset];
a[(int) offset] = v;
return prev;
}
/**
* {@inheritDoc}
*/
@Override
public <S> void setupFrom(RangedList<S> from, Function<? super S, ? extends T> func) {
rangeCheck(from.getRange());
if (range.size() > Integer.MAX_VALUE) {
throw new Error("[Chunk] the size of RangedList cannot exceed Integer.MAX_VALUE.");
}
final LongTBiConsumer<S> consumer = (long index, S s) -> {
final T r = func.apply(s);
a[(int) (index - range.from)] = r;
};
from.forEach(consumer);
}
/**
* {@inheritDoc}
*/
@Override
public Object[] toArray() {
return a;
}
/**
* {@inheritDoc}
*/
@Override
public Object[] toArray(LongRange newRange) {
if (!range.contains(newRange)) {
throw new IndexOutOfBoundsException(rangeMsg(newRange));
}
if (newRange.from == range.from && newRange.to == range.to) {
return a;
}
if (newRange.from == newRange.to) {
return new Object[0];
}
final long newSize = (newRange.to - newRange.from);
if (newSize > Config.maxChunkSize) {
throw new IllegalArgumentException("[Chunk] the size of the result cannot exceed " + Config.maxChunkSize);
}
final Object[] newRail = new Object[(int) newSize];
Arrays.fill(newRail, a[0]);
System.arraycopy(a, (int) (newRange.from - range.from), newRail, 0, (int) newSize);
return newRail;
}
/**
* {@inheritDoc}
*/
@Override
public Chunk<T> toChunk(LongRange newRange) {
final Object[] newRail = toArray(newRange);
if (newRail == a) {
return this;
}
if (newRail.length == 0) {
throw new IllegalArgumentException("[Chunk] toChunk(emptyRange) is not permitted.");
}
return new Chunk<>(newRange, newRail);
}
/**
* {@inheritDoc}
*/
@Override
public List<T> toList(LongRange r) {
final ArrayList<T> list = new ArrayList<>((int) r.size());
forEach(r, (t) -> list.add(t));
return list;
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
if (range == null) {
return "[Chunk] in Construction";
}
final StringBuilder sb = new StringBuilder();
sb.append("[" + range + "]:");
final long sz = Config.omitElementsToString ? Math.min(size(), Config.maxNumElementsToString) : size();
for (long i = range.from, c = 0; i < range.to && c < sz; i++, c++) {
if (c > 0) {
sb.append(",");
}
sb.append("" + get(i));
// if (c == sz) {
// break;
// }
}
if (sz < size()) {
sb.append("...(omitted " + (size() - sz) + " elements)");
}
return sb.toString();
}
@Override
public void write(Kryo kryo, Output output) {
kryo.writeClassAndObject(output, range);
kryo.writeClassAndObject(output, a);
}
private void writeObject(ObjectOutputStream out) throws IOException {
// System.out.println("writeChunk:"+this);
out.writeObject(range);
out.writeObject(a);
}
/*
* public static void main(String[] args) { long i = 5; Chunk<Integer> c = new
* Chunk<>(new LongRange(10 * i, 11 * i)); System.out.println("prepare: " + c);
* IntStream.range(0, (int) i).forEach(j -> { int v = (int) (10 * i + j);
* System.out.println("set@" + v); c.set(10 * i + j, v); });
* System.out.println("Chunk :" + c);
*
* c.toArray(new LongRange(0, Config.maxChunkSize)); }
*/
}