ParallelMap.java
package handist.collections;
import static apgas.Constructs.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.SerializableConsumer;
public class ParallelMap<K, V> implements Map<K, V>, Serializable {
private static final long serialVersionUID = 2613768150080256264L;
protected Map<K, V> data;
protected Function<K, V> proxyGenerator;
public ParallelMap() {
this(new HashMap<>());
}
protected ParallelMap(Map<K, V> data) {
this.data = data;
}
/**
* Remove the all local entries.
*/
@Override
public void clear() {
data.clear();
}
/**
* Return true if the specified entry is exist in the local collection.
*
* @param key a key.
* @return true is the specified object is a key present in the local map,
*/
@Override
public boolean containsKey(Object key) {
return data.containsKey(key);
}
/**
* Indicates if the provided value is contained in the local map.
*/
@Override
public boolean containsValue(Object value) {
return data.containsValue(value);
}
boolean debugPrint() {
return true;
}
/**
* Removes the provided key from the local map, returns {@code true} if there
* was a previous obejct mapped to this key, {@code false} if there were no
* mapping with this key or if the mapping was a {@code null} object
*
* @param key the key to remove from this local map
* @return true if a mapping was removed as a result of this operation, false
* otherwise
*/
public boolean delete(K key) {
final V result = data.remove(key);
return (result != null);
}
/**
* Return the Set of local entries.
*
* @return the Set of local entries.
*/
@Override
public Set<Entry<K, V>> entrySet() {
return data.entrySet();
}
/**
* Apply the specified operation with each Key/Value pair contained in the local
* collection.
*
* @param action the operation to perform
*/
@Override
public void forEach(BiConsumer<? super K, ? super V> action) {
if (!data.isEmpty()) {
data.forEach(action);
}
}
public void forEach(SerializableConsumer<V> action) {
for (final Entry<K, V> entry : data.entrySet()) {
action.accept(entry.getValue());
}
}
private void forEachParallelBodyLocal(SerializableConsumer<V> action) {
final List<Collection<V>> separated = separateLocalValues(Runtime.getRuntime().availableProcessors() * 2);
for (final Collection<V> sub : separated) {
async(() -> {
sub.forEach(action);
});
}
}
/**
* Helper method which separates the keys contained in the local map into even
* batches for the number of threads available on the system and applies the
* provided action on each key/value pair contained in the collection in
* parallel
*
* @param action action to perform on the key/value pair contained in the map
*/
private void forEachParallelKey(SerializableBiConsumer<? super K, ? super V> action) {
final int batches = Runtime.getRuntime().availableProcessors();
// Dispatch the existing keys into batches
final List<Collection<K>> keys = new ArrayList<>(batches);
for (int i = 0; i < batches; i++) {
keys.add(new HashSet<>());
}
// Round-robin of keys into batches
int i = 0;
for (final K k : data.keySet()) {
keys.get(i++).add(k);
if (i >= batches) {
i = 0;
}
}
// Spawn asynchronous activity for each batch
for (final Collection<K> keysToProcess : keys) {
async(() -> {
// Apply the supplied action on each key in the batch
for (final K key : keysToProcess) {
final V value = data.get(key);
action.accept(key, value);
}
});
}
}
/**
* Return the element for the provided key. If there is no element at the index,
* return null.
*
* When an agent generator is set on this instance and there is no element at
* the index, a proxy value for the index is generated as a return value.
*
* @param key the index of the value to retrieve
* @return the element associated with {@code key}, or null if this map contains
* no mapping for the key
*/
@SuppressWarnings("unchecked")
@Override
public V get(Object key) {
final V result = data.get(key);
if (result != null) {
return result;
}
if (proxyGenerator != null && !data.containsKey(key)) {
return proxyGenerator.apply((K) key);
} else {
return null;
}
}
/**
* Indicates if the local distributed map is empty or not
*
* @return {@code true} if there are no mappings in the local map
*/
@Override
public boolean isEmpty() {
return data.isEmpty();
}
/**
* Return the Set of local keys.
*
* @return the Set of local keys.
*/
@Override
public Set<K> keySet() {
return data.keySet();
}
/**
* Apply the same operation on the all elements including remote places and
* creates a new {@link ParallelMap} with the same keys as this instance and the
* result of the mapping operation as values.
*
* @param <W> result type of mapping operation
* @param op the map operation from type <code>V</code> to <code>W</code>
* @return a DistMap from <code>K</code> to <code>W</code> built from applying
* the mapping operation on each element of this instance
*/
public <W> ParallelMap<K, W> map(Function<V, W> op) {
throw new Error("not supported yet");
// TODO
/*
* return new ParallelMap<T,S>(placeGroup, team, () -> { val dst = new
* HashMap<T,S>(); for (entry in entries()) { val key = entry.getKey(); val
* value = entry.getValue(); dst(key) = op(value); } return dst; });
*/
}
/**
* Parallel version of {@link #forEach(BiConsumer)}
*
* @param action the action to perform on every key/value pair contained in this
* local map
*/
public void parallelForEach(SerializableBiConsumer<? super K, ? super V> action) {
finish(() -> {
forEachParallelKey(action);
});
}
public void parallelForEach(SerializableConsumer<V> action) {
parallelForEachLocal(action);
}
private void parallelForEachLocal(SerializableConsumer<V> action) {
finish(() -> {
forEachParallelBodyLocal(action);
});
}
void printLocalData() {
System.out.println(this);
}
/**
* Put a new entry.
*
* @param key the key of the new entry.
* @param value the value of the new entry.
* @return the previous value associated with {@code key}, or {@code null} if
* there was no mapping for {@code key}.(A {@code null} return can also
* indicate that the map previously associated {@code null} with
* {@code key}.)
*/
@Override
public V put(K key, V value) {
return data.put(key, value);
}
/**
* Adds all the mappings contained in the specified map into this local map.
*/
@Override
public void putAll(java.util.Map<? extends K, ? extends V> m) {
data.putAll(m);
}
/**
* Reduce the all local elements using the given operation.
*
* @param <S> type of the result produced by the reduction operation
* @param op the operation used in the reduction
* @param unit the neutral element of the reduction operation
* @return the result of the reduction
*/
public <S> S reduceLocal(BiFunction<S, V, S> op, S unit) {
S accum = unit;
for (final Map.Entry<K, V> entry : data.entrySet()) {
accum = op.apply(accum, entry.getValue());
}
return accum;
}
/**
* Remove the entry corresponding to the specified key in the local map.
*
* @param key the key corresponding to the value.
* @return the previous value associated with the key, or {@code null} if there
* was no existing mapping (or the key was mapped to {@code null})
*/
@Override
public V remove(Object key) {
return data.remove(key);
}
private List<Collection<V>> separateLocalValues(int n) {
final List<Collection<V>> result = new ArrayList<>(n);
final long totalNum = size();
final long rem = totalNum % n;
final long quo = totalNum / n;
if (data.isEmpty()) {
return result;
}
final Iterator<V> it = data.values().iterator();
List<V> list = new ArrayList<>();
for (long i = 0; i < n; i++) {
list = new ArrayList<>();
final long count = quo + ((i < rem) ? 1 : 0);
for (long j = 0; j < count; j++) {
if (it.hasNext()) {
list.add(it.next());
}
}
result.add(list);
}
return result;
}
/**
* Sets the proxy generator for this instance.
* <p>
* The proxy will be used to generate values when accesses to a key not
* contained in this instance is made. Instead of throwing an exception, the
* proxy will be called with the attempted index and the program will continue
* with the value returned by the proxy.
* <p>
* This feature is similar to {@link Map#getOrDefault(Object, Object)}
* operation, the difference being that instead of returning a predetermined
* default value, the provided function is called with the key.
*
* @param proxy function which takes a key "K" as parameter and returns a "V",
* or {@code null} to remove any previously set proxy
*/
public void setProxyGenerator(Function<K, V> proxy) {
proxyGenerator = proxy;
}
/**
* Return the number of local entries.
*
* @return the number of the local entries.
*/
@Override
public int size() {
return data.size();
}
/**
* Returns all the values of this local map in a collection.
*/
@Override
public Collection<V> values() {
return data.values();
}
}