Reducer.java
/*******************************************************************************
* Copyright (c) 2021 Handy Tools for Distributed Computing (HanDist) project.
*
* This program and the accompanying materials are made available to you under
* the terms of the Eclipse Public License 1.0 which accompanies this
* distribution,
* and is available at https://www.eclipse.org/legal/epl-v10.html
*
* SPDX-License-Identifier: EPL-1.0
******************************************************************************/
package handist.collections.dist;
import java.io.Serializable;
import java.util.HashMap;
import apgas.util.GlobalID;
import mpi.Datatype;
import mpi.MPI;
import mpi.Op;
import mpi.User_function;
/**
* Reducer object. This object provides an abstract reduction operation on some
* data element T. Implementations should provide the {@link #reduce(Object)},
* {@link #merge(Reducer)} and {@link #newReducer()} methods.
* <p>
* As an example implementation, we provide the example below which performs the
* sum over {@link Integer}s.
*
* <pre>
* class SumReduction extends Reducer<SumReduction, Integer> {
* int runningSum = 0;
*
* public void merge(SumReduction r) {
* runningSum += r.runningSum;
* }
*
* public void reduce(Integer i) {
* runningSum += i;
* }
*
* public void newReducer() {
* return new SumReduction();
* }
* }
* </pre>
*
* @author Patrick Finnerty
*
* @param <R> the implementing type itself, this is used to correctly implement
* reflective methods (methods that act on the implementing type
* itself)
* @param <T> the type from which data is acquired to compute the reduction
*/
public abstract class Reducer<R extends Reducer<R, T>, T> extends User_function implements Serializable {
/** Serial Version UID */
private static final long serialVersionUID = 956660189595987110L;
/**
* The reduction operations which have been registered for use with the MPI
* reduce calls. The canonical name of the class implementing the reduction
* operation is used as key in this map. As calls to methods
* {@link #globalReduction(TeamedPlaceGroup, GlobalID)} and
* {@link #teamReduction(TeamedPlaceGroup)} are made, new entries are
* initialized and added to this map to be reused when a new reduction using the
* same Reducer implementation is made.
*/
private static transient HashMap<String, Op> registeredUserOperations = new HashMap<>();
/**
* This method is called by the MPI runtime to make the reduction happen between
* objects. You do not need to override this method, and do so at your own
* peril.
*/
@SuppressWarnings("unchecked")
@Override
public void Call(Object firstOperandArray, int firstOperandArrayOffset, Object secondOperandAndResultArray,
int secondOperandArrayOffset, int count, Datatype datatype) {
// The argument casts are somewhat tedious but Java.lang.ClassCastException
// are thrown if we try to perform then in one go.
final Object[] ov = (Object[]) secondOperandAndResultArray;
final Object[] iv = (Object[]) firstOperandArray;
for (int i = secondOperandArrayOffset,
j = firstOperandArrayOffset; i < count + secondOperandArrayOffset; i++, j++) {
final R result = (R) ov[i];
final R operand = (R) iv[j];
result.merge(operand);
}
}
/**
* Performs the global reduction of this instance. This method needs to be
* called when the reduction operation took place over data distributed across
* multiple hosts. In such a case, the reducer will have one instance per host
* in which the local result if contained. This method is used to merge the
* results of all the local reducers and make them reflect the global result of
* the reduction.
* <p>
* An implementation does not need to override this method. However, users
* should make sure that method {@link #merge(Reducer)} was appropriately
* implemented. Otherwise, problems may arise.
*
* @param placeGroup the group of places on which "local" reductions have been
* performed
* @param gid global id under which each local instance is registered
* @throws IllegalStateException if this instance is not allowed to perform
* global reductions
*/
/*
* This method needs improvements. More likely class Reducer needs some
* modifications to handle global reduction operations that are not performed
* under the GLB. Currently, the registering of a local Reducer instance needs
* to be performed manually by the programmer, which does not satisfactorily
* provides the global reduction features.
*/
@SuppressWarnings("deprecation")
public void globalReduction(TeamedPlaceGroup placeGroup, GlobalID gid) {
if (placeGroup == null || gid == null) {
throw new IllegalArgumentException("Method Reducer#globalReduction does not tolerate null parameters");
}
final int reductionRank = placeGroup.myrank;
placeGroup.broadcastFlat(() -> {
// Retrieve the local object through the global id
@SuppressWarnings("unchecked")
final Reducer<R, T> local = (Reducer<R, T>) gid.getHere();
// Prepare the reduction through the MPI reduceAll call
// 1. We need to make sure that the reducer instance was registered in the
// operations
Op mpiOperation;
final String s = local.getClass().getCanonicalName();
synchronized (registeredUserOperations) {
if (!registeredUserOperations.containsKey(s)) {
mpiOperation = new Op(local.newReducer(), true);
registeredUserOperations.put(s, mpiOperation);
} else {
mpiOperation = registeredUserOperations.get(s);
}
}
// 2. Prepare the local instances into an Object Array
final Object[] buffer = new Object[1];
buffer[0] = local;
// 3. Make the MPI AllReduce call
placeGroup.comm.Reduce(buffer, 0, buffer, 0, 1, MPI.OBJECT, mpiOperation, reductionRank);
});
}
/**
* Merges the partial reduction given as parameter into this instance. This
* method is used by the library when the reduction is being parallelized and
* multiple instances of the implementing type obtained through method
* {@link #newReducer()} have been used and the partial result of each of these
* instances need to be combined into a single instance.
*
* @param reducer another instance of the implementing class whose partial
* reduction needs to be merged into this instance
*/
public abstract void merge(R reducer);
/**
* Creates a new R instance and returns it. The returned element should be the
* neutral element of the reduction operation being implemented, that is,
* calling method {@link #merge(Reducer)} with the object returned by this
* method without there being any call to {@link #reduce(Object)} should not
* influence the result of the reduction, or throw any exception.
*
* @return a new R instance which is going to be used to parallelize the
* reduction
*/
public abstract R newReducer();
/**
* Takes the object given as parameter and performs the reduction operation this
* object implements.
*
* @param input an instance of the object on which the reduction operation is
* taking place
*/
public abstract void reduce(T input);
/**
* This
*
* @param placeGroup into which this instance is participating
*/
@SuppressWarnings("deprecation")
public R teamReduction(TeamedPlaceGroup placeGroup) {
if (placeGroup == null) {
throw new IllegalStateException("This Reducer is not allowed to perform any global reduction");
}
// 1. We need to make sure that the reducer instance was registered in the
// operations that MPI can handle
Op mpiOperation;
// The getCanonicalName method returns the name of the implementing class, not
// "handist.collections.dist.Reducer".
final String s = this.getClass().getCanonicalName();
synchronized (registeredUserOperations) {
if (!registeredUserOperations.containsKey(s)) {
mpiOperation = new Op(this.newReducer(), true);
registeredUserOperations.put(s, mpiOperation);
} else {
mpiOperation = registeredUserOperations.get(s);
}
}
// 2. Prepare the local instances into an Object Array
final Object[] buffer = new Object[1];
buffer[0] = this;
// Make the MPI AllReduce call
placeGroup.comm.Allreduce(buffer, 0, buffer, 0, 1, MPI.OBJECT, mpiOperation);
return (R) buffer[0];
}
}