MultiMapEntryDispatcher.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections.dist;
import static apgas.Constructs.*;
import java.io.ByteArrayInputStream;
import java.util.Collection;
import java.util.Map;
import apgas.Place;
import handist.collections.MultiMap;
import handist.collections.dist.util.ObjectInput;
import handist.collections.dist.util.ObjectOutput;
/**
*
* {@link MultiMap} uses this class to dispatch entries to the places defined
* by a {@link Distribution}. Entries are relocated between places by calling
* {@link MapEntryDispatcher.TeamOperations#dispatch}. The dispatched entries are
* added to the original {@link MultiMap}.
*
* Please be careful that reference relationships like multiple references to
* one object are not maintained.
*
* @author yoshikikawanishi
*
* @param <K> : The key type of map entry
* @param <V> : The value type of map entry
*/
public class MultiMapEntryDispatcher<K, V> extends MapEntryDispatcher<K, Collection<V>> {

    private static final long serialVersionUID = -8897691961652214931L;

    /**
     * Creates a dispatcher that routes single (key, value) pairs of a
     * {@link MultiMap}. All three arguments are forwarded unchanged to the
     * {@link MapEntryDispatcher} constructor.
     *
     * @param multiMap the multi-map handed to the superclass; presumably this is
     *                 the {@code base} collection into which local and received
     *                 entries are inserted (confirm against
     *                 {@link MapEntryDispatcher})
     * @param pg       the place group handed to the superclass
     * @param dist     the distribution handed to the superclass; {@link #put1}
     *                 uses {@code distribution.location(key)} to pick the
     *                 destination of each key
     */
    MultiMapEntryDispatcher(MultiMap<K, V> multiMap, TeamedPlaceGroup pg, Distribution<K> dist) {
        super(multiMap, pg, dist);
    }

    /**
     * Decodes the entries contained in {@code buf} and inserts each of them into
     * the underlying {@link MultiMap} through {@code put1}.
     * <p>
     * The places of {@code placeGroup} are visited in iteration order; the i-th
     * place corresponds to {@code offsets[i]} and {@code sizes[i]}. The segment of
     * the local place and empty segments are skipped. Every other segment is read
     * as: one {@code int} giving a number of sections (named {@code nThreads},
     * presumably one section per sending thread), then for each section one
     * {@code int} giving the number of serialized objects followed by that many
     * objects, laid out as alternating key and value.
     *
     * @param buf     the byte array holding the serialized segments
     * @param offsets the start offset in {@code buf} of each place's segment
     * @param sizes   the length in bytes of each place's segment
     * @throws Exception if reading from a segment or inserting an entry fails
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void executeDeserialize(byte[] buf, int[] offsets, int[] sizes) throws Exception {
        int current = 0;
        for (final Place p : placeGroup.places()) {
            final int size = sizes[current];
            final int offset = offsets[current];
            current++;
            if (p.equals(here()) || size == 0) {
                continue;
            }
            final ObjectInput ds = new ObjectInput(new ByteArrayInputStream(buf, offset, size), false);
            try {
                final int nThreads = ds.readInt();
                for (int i = 0; i < nThreads; i++) {
                    final int count = ds.readInt();
                    // count is a number of objects, and each entry is two objects
                    // (key then value), hence the step of 2.
                    for (int j = 0; j < count; j += 2) {
                        final K key = (K) ds.readObject();
                        final V value = (V) ds.readObject();
                        // NOTE: if put1 of MultiMap becomes put, this class is no
                        // longer necessary.
                        put1Local(key, value);
                    }
                    ds.reset();
                }
            } finally {
                // Release the input even when decoding or insertion fails part-way.
                ds.close();
            }
        }
    }

    /**
     * Unsupported. Entries of a multi-map are dispatched one value at a time
     * with {@link #put1(Object, Object)}.
     *
     * @param key    ignored
     * @param values ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     * @deprecated use {@link #put1(Object, Object)} instead.
     */
    @Override
    @Deprecated
    public Collection<V> put(K key, Collection<V> values) {
        throw new UnsupportedOperationException();
    }

    /**
     * Puts a new (key, value) entry.
     * <p>
     * The destination is {@code distribution.location(key)}. If that place is the
     * local place, the entry is inserted immediately into the underlying
     * {@link MultiMap}. Otherwise the key and the value are written, in that
     * order, to the {@link ObjectOutput} obtained from {@code getOutput} for the
     * destination place; the transfer itself is not performed by this method.
     *
     * @param key   the key of the new entry.
     * @param value the value of the new entry.
     * @return if the destination associated with the key is the local place, the
     *         result of {@code put1} on the underlying {@link MultiMap};
     *         otherwise {@code false}.
     */
    public boolean put1(K key, V value) {
        final Place next = distribution.location(key);
        if (next.equals(here())) {
            return put1Local(key, value);
        }
        final ObjectOutput out = getOutput(next);
        out.writeObject(key);
        out.writeObject(value);
        out.flush();
        return false;
    }

    /**
     * Inserts the entry directly into the underlying collection, which is
     * accessed as a {@link MultiMap}.
     *
     * @param key   the key of the entry
     * @param value the value of the entry
     * @return the result of {@code put1} on the underlying {@link MultiMap}
     */
    private boolean put1Local(K key, V value) {
        return ((MultiMap<K, V>) base).put1(key, value);
    }

    /**
     * Unsupported. Entries of a multi-map are dispatched one value at a time
     * with {@link #put1(Object, Object)}.
     *
     * @param m ignored
     * @throws UnsupportedOperationException always
     * @deprecated use {@link #put1(Object, Object)} for each entry instead.
     */
    @Override
    @Deprecated
    public void putAll(Map<? extends K, ? extends Collection<V>> m) {
        throw new UnsupportedOperationException();
    }
}