GlobalOperations.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections.dist;
import java.io.ObjectStreamException;
import java.util.function.BiFunction;
import apgas.Constructs;
import apgas.Place;
import apgas.util.GlobalID;
import apgas.util.SerializableWithReplace;
import handist.collections.dist.util.MemberOfLazyObjectReference;
import handist.collections.function.SerializableBiConsumer;
import handist.collections.function.SerializableConsumer;
/**
* Class that defines the "Global Operations" that distributed collections
* propose.
*
* @param <T> the type of objects manipulated by the distributed collection
* @param <C> implementing type, should be a class that implements
* {@link DistributedCollection}
*/
public class GlobalOperations<T, C extends DistributedCollection<T, C>> implements SerializableWithReplace {
protected final C localHandle;
protected final BiFunction<TeamedPlaceGroup, GlobalID, ? extends C> lazyCreator;
/**
*
* @param handle the target Distributed Collection
* @param lazyCreator the way to create a branch of a distributed collection to
* a new place.
*/
GlobalOperations(C handle, BiFunction<TeamedPlaceGroup, GlobalID, ? extends C> lazyCreator) {
localHandle = handle;
this.lazyCreator = lazyCreator;
}
public void balance() {
final TeamedPlaceGroup pg = localHandle.placeGroup();
pg.broadcastFlat(() -> {
localHandle.team().teamedBalance();
});
}
public void balance(final float[] balance) {
localHandle.balanceSpecCheck(balance);
final TeamedPlaceGroup pg = localHandle.placeGroup();
pg.broadcastFlat(() -> {
localHandle.team().teamedBalance(balance);
});
}
/**
* Performs the specified action on every instance contained on every host of
* the distributed collection and returns when all operations have been
* completed.
* <p>
* The specified action is performed by a single thread on each host.
*
* @param action action to perform
*/
public void forEach(final SerializableConsumer<T> action) {
localHandle.placeGroup().broadcastFlat(() -> {
localHandle.forEach(action);
});
};
public void gather(final Place destination) {
final TeamedPlaceGroup pg = localHandle.placeGroup();
pg.broadcastFlat(() -> {
localHandle.team().gather(destination);
});
}
/**
* Gathers the size of every local collection and returns it in the provided
* array
*
* @param result the array in which the result will be stored
*/
@SuppressWarnings("rawtypes")
public void getSizeDistribution(final long[] result) {
if (localHandle instanceof ElementLocationManageable) {
((ElementLocationManageable) localHandle).getSizeDistribution(result);
} else {
localHandle.placeGroup().broadcastFlat(() -> {
localHandle.team().getSizeDistribution(result);
});
}
}
/**
* Calls the provided action on the local instance of the distributed collection
* on every place the collection is handled and returns.
*
* @param action action to perform, the first parameter is the Place on which
* the local instance is located, the second parameter is the
* local collection object
*/
public void onLocalHandleDo(SerializableBiConsumer<Place, C> action) {
localHandle.placeGroup().broadcastFlat(() -> {
action.accept(Constructs.here(), localHandle);
});
}
/**
* Performs the specified action on every instance contained on every host of
* the distributed collection and returns when all operations have been
* completed.
* <p>
* The specified action is performed by multiple threads on each host.
*
* @param action action to perform
*/
public void parallelForEach(final SerializableConsumer<T> action) {
localHandle.placeGroup().broadcastFlat(() -> {
localHandle.parallelForEach(action);
});
}
/**
* Method used to create an object which will be transferred to a remote place.
* <p>
* This method is defined as <em>abstract</em> in class {@link GlobalOperations}
* to force the implementation in child classes. Implementation should return a
* {@link MemberOfLazyObjectReference} instance capable of initializing the
* local handle of the distributed collection on the remote place and return the
* "GLOBAL" member of this handle's local class.
*
* @return a {@link MemberOfLazyObjectReference} (left to programmer's
* good-will)
* @throws ObjectStreamException if such an exception is thrown during the
* process
*/
public Object writeReplace() throws ObjectStreamException {
final TeamedPlaceGroup pg1 = localHandle.placeGroup();
final GlobalID id1 = localHandle.id();
return new MemberOfLazyObjectReference<>(pg1, id1, () -> {
return lazyCreator.apply(pg1, id1);
}, (handle) -> {
return handle.global();
});
}
}