StarPU Handbook - StarPU Extensions
14. MPI Support

The integration of MPI transfers within task parallelism is done in a natural way, by means of asynchronous interactions between the application and StarPU. This is implemented in a separate libstarpumpi library, which basically provides "StarPU" equivalents of the MPI_* functions, where void * buffers are replaced with starpu_data_handle_t, and all GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. Users have to use the usual mpirun command of the MPI implementation to start StarPU on the different MPI nodes.

An MPI Insert Task function provides an even more seamless transition to a distributed application, by automatically issuing all required data transfers according to the task graph and an application-provided distribution.

Some source codes are available in the directory mpi/.

14.1 Building with MPI support

If an mpicc compiler is already in your PATH, StarPU will automatically enable MPI support in the build. If mpicc is not in PATH, you can specify its location by passing --with-mpicc=/where/there/is/mpicc to ./configure.

It can be useful to enable MPI tests during make check by passing --enable-mpi-check to ./configure. Similarly to mpicc, if mpiexec is not in PATH, you can specify its location by passing --with-mpiexec=/where/there/is/mpiexec to ./configure. This is however not needed if it is next to mpicc: configure will look there in addition to PATH.

Similarly, Fortran examples use mpif90, whose location can be specified manually with --with-mpifort if it cannot be found automatically.

If users want to run several MPI processes per machine (e.g. one per NUMA node), STARPU_WORKERS_GETBIND needs to be left to its default value 1, so that StarPU takes into account the binding set by the MPI launcher (otherwise each StarPU instance would try to bind to all cores of the machine...).

However, depending on the architecture of your machine, you may end up with StarPU-MPI processes that have no CPU workers: if a process gets only one CPU core, that core will be used by the MPI thread, and none will be left to start a CPU worker.

One can check that with the following commands.

$ mpirun -np 2 starpu_machine_display --worker CPU --count --notopology
1 CPU worker
1 CPU worker
$ mpirun -np 4 starpu_machine_display --worker CPU --count --notopology
4 CPU workers
4 CPU workers
4 CPU workers
4 CPU workers
$ mpirun --bind-to socket -np 2 starpu_machine_display --worker CPU --count --notopology
4 CPU workers
4 CPU workers
$ STARPU_WORKERS_GETBIND=0 mpirun -np 4 starpu_machine_display --worker CPU --count --notopology
4 CPU workers
4 CPU workers
4 CPU workers
4 CPU workers
$ STARPU_WORKERS_GETBIND=0 mpirun -np 2 starpu_machine_display --worker CPU --count --notopology
4 CPU workers
4 CPU workers

or with hwloc

mpirun --bind-to socket -np 2 hwloc-ls --restrict binding --no-io
mpirun -np 2 hwloc-ls --restrict binding --no-io

14.2 Example Used In This Documentation

The example below will be used as the base for this documentation. It initializes a token on node 0, and the token is passed from node to node, incremented by one on each step. The code is not using StarPU yet.

for (loop = 0; loop < nloops; loop++)
{
    int tag = loop*size + rank;

    if (loop == 0 && rank == 0)
    {
        token = 0;
        fprintf(stdout, "Start with token value %d\n", token);
    }
    else
    {
        MPI_Recv(&token, 1, MPI_INT, (rank+size-1)%size, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }

    token++;

    if (loop == last_loop && rank == last_rank)
    {
        fprintf(stdout, "Finished: token value %d\n", token);
    }
    else
    {
        MPI_Send(&token, 1, MPI_INT, (rank+1)%size, tag+1, MPI_COMM_WORLD);
    }
}

14.3 About Not Using The MPI Support

Although StarPU provides MPI support, the application programmer may want to keep their MPI communications as they are for a start, and only delegate task execution to StarPU. This is possible by simply using starpu_data_acquire(), for instance:

for (loop = 0; loop < nloops; loop++)
{
    int tag = loop*size + rank;

    /* Acquire the data to be able to write to it */
    starpu_data_acquire(token_handle, STARPU_W);
    if (loop == 0 && rank == 0)
    {
        token = 0;
        fprintf(stdout, "Start with token value %d\n", token);
    }
    else
    {
        MPI_Recv(&token, 1, MPI_INT, (rank+size-1)%size, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
    starpu_data_release(token_handle);

    /* Task delegation to StarPU to increment the token. The execution might
     * be performed on a CPU, a GPU, etc. */
    increment_token();

    /* Acquire the updated data to be able to read from it */
    starpu_data_acquire(token_handle, STARPU_R);
    if (loop == last_loop && rank == last_rank)
    {
        fprintf(stdout, "Finished: token value %d\n", token);
    }
    else
    {
        MPI_Send(&token, 1, MPI_INT, (rank+1)%size, tag+1, MPI_COMM_WORLD);
    }
    starpu_data_release(token_handle);
}

In that case, libstarpumpi is not needed. One can also use MPI_Isend() and MPI_Irecv(), by calling starpu_data_release() after MPI_Wait() or MPI_Test() have notified completion.
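Below is a minimal sketch of this alternative, based on the ring example above (token_handle, rank, size and tag are assumed to be defined as previously); the data is kept acquired until MPI_Wait() has notified completion of the request:

starpu_data_acquire(token_handle, STARPU_R);

MPI_Request request;
MPI_Isend(&token, 1, MPI_INT, (rank+1)%size, tag+1, MPI_COMM_WORLD, &request);

/* ... other work can be performed here ... */

MPI_Wait(&request, MPI_STATUS_IGNORE);
/* Release the handle only once MPI has notified completion */
starpu_data_release(token_handle);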

It is however better to use libstarpumpi, to save the application from having to synchronize with starpu_data_acquire(), and instead just submit all tasks and communications asynchronously, and wait for the overall completion.

14.4 Simple Example

The flags required to compile or link against the MPI layer are accessible with the following commands:

$ pkg-config --cflags starpumpi-1.4  # options for the compiler
$ pkg-config --libs starpumpi-1.4    # options for the linker
The following code, based on the ring example above, shows how the token can be incremented by a StarPU task, and how the ring loop can be expressed with StarPU-MPI communications.

void increment_token(void)
{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
}
int main(int argc, char **argv)
{
    int rank, size;

    starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
    starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
    starpu_mpi_comm_size(MPI_COMM_WORLD, &size);

    starpu_vector_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;
    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;

    for (loop = 0; loop < nloops; loop++)
    {
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        {
            starpu_data_acquire(token_handle, STARPU_W);
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
            starpu_data_release(token_handle);
        }
        else
        {
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag, MPI_COMM_WORLD, NULL, NULL);
        }

        increment_token();

        if (loop == last_loop && rank == last_rank)
        {
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        }
        else
        {
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1, MPI_COMM_WORLD, NULL, NULL);
        }
    }

    starpu_task_wait_for_all();

    if (rank == last_rank)
    {
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    }

    starpu_mpi_shutdown();

    return 0;
}

We have here replaced MPI_Recv() and MPI_Send() with starpu_mpi_irecv_detached() and starpu_mpi_isend_detached(), which just submit the communication to be performed. The implicit sequential consistency dependencies provide synchronization between MPI reception and emission and the corresponding tasks. The only remaining synchronization with starpu_data_acquire() is at the beginning and the end.

The full source code is available in the file mpi/tests/ring.c.

14.5 How to Initialize StarPU-MPI

As seen in the previous example, one has to call starpu_mpi_init_conf() to initialize StarPU-MPI. The third parameter of the function indicates if MPI should be initialized by StarPU, or if the application did it itself. If the application initializes MPI itself, it must call MPI_Init_thread() with MPI_THREAD_SERIALIZED or MPI_THREAD_MULTIPLE, since StarPU-MPI uses a separate thread to perform the communications. MPI_THREAD_MULTIPLE is necessary if the application also performs some MPI communications.
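A minimal sketch of the second case, where the application initializes MPI itself and passes 0 as the third parameter of starpu_mpi_init_conf():

int thread_support;

MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thread_support);
if (thread_support < MPI_THREAD_SERIALIZED)
    fprintf(stderr, "MPI does not provide the required thread support\n");

/* 0: MPI was already initialized by the application */
starpu_mpi_init_conf(&argc, &argv, 0, MPI_COMM_WORLD, NULL);

/* ... submit tasks and communications ... */

starpu_mpi_shutdown();
MPI_Finalize();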

14.6 Point To Point Communication

The standard point to point communications of MPI have been implemented. The semantics are similar to MPI's, but adapted to the DSM provided by StarPU: an MPI request will only be submitted when the data is available in the main memory of the node submitting the request.

There are two types of asynchronous communications: classic asynchronous communications and detached communications. Classic asynchronous communications (starpu_mpi_isend() and starpu_mpi_irecv()) need to be followed by a call to starpu_mpi_wait() or starpu_mpi_test() to wait for or test the completion of the communication, as shown in the example mpi/tests/async_ring.c. Waiting for or testing the completion of detached communications is not possible: this is done internally by StarPU-MPI, and on completion the resources are automatically released. This mechanism is similar to the pthread detach state attribute, which determines whether a thread is created in a joinable or a detached state.
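A minimal sketch of a classic (non-detached) send, assuming token_handle, rank, size and tag are defined as in the ring example:

starpu_mpi_req req;
MPI_Status status;

starpu_mpi_isend(token_handle, &req, (rank+1)%size, tag+1, MPI_COMM_WORLD);
/* ... other submissions can proceed here ... */
starpu_mpi_wait(&req, &status);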

For send communications, data is acquired with the mode STARPU_R. When using the configure option --enable-mpi-pedantic-isend, the mode STARPU_RW is used instead, to make sure that there is no more than one concurrent MPI_Isend() call accessing a data, and that StarPU tasks do not read from it during the communication.

Internally, each communication is split into two messages: a first message exchanges an envelope describing the data (i.e. its tag and its size), and the data itself is sent in a second message. All MPI communications submitted by StarPU use a single MPI tag, which has a default value. This value can be retrieved with the function starpu_mpi_get_communication_tag() and changed with the function starpu_mpi_set_communication_tag(). The matching of tags with the corresponding requests is done within StarPU-MPI.

For any userland communication, the call to the corresponding function (e.g. starpu_mpi_isend()) results in the creation of a StarPU-MPI request; the function starpu_data_acquire_cb() is then called to asynchronously request StarPU to fetch the data into main memory. For a reception, when the data is ready and the corresponding buffer has already been received by MPI, it is copied into the memory of the data; otherwise the request is stored in the early requests list. Send requests are stored in the ready requests list.

While requests need to be processed, the StarPU-MPI progression thread does the following:

  1. it polls the ready requests list. For all the ready requests, the appropriate function is called to post the corresponding MPI call. For example, an initial call to starpu_mpi_isend() will result in a call to MPI_Isend(). If the request is marked as detached, the request will then be added to the detached requests list.
  2. it posts an MPI_Irecv() to retrieve a data envelope.
  3. it polls the detached requests list. For all detached requests, it tests the completion of the MPI request by calling MPI_Test(). On completion, the data handle is released, and if a callback was defined, it is called.
  4. finally, it checks if a data envelope has been received. If so, if the data envelope matches a request in the early requests list (i.e. the request has already been posted by the application), the corresponding MPI call is posted (similarly to the first step above).

    If the data envelope does not match any application request, a temporary handle is created to receive the data, a StarPU-MPI request is created and added into the ready requests list, and thus will be processed in the first step of the next loop.

To prevent putting too much pressure on the MPI library, only a limited number of requests are emitted concurrently. This behavior can be tuned with the environment variable STARPU_MPI_NDETACHED_SEND. In the same fashion, the progression thread will poll for termination of existing requests after submitting a defined number of requests. This behavior can be tuned with the environment variable STARPU_MPI_NREADY_PROCESS.

The function starpu_mpi_issend() can be used to perform a synchronous-mode, non-blocking send of a data. This can also be requested when using starpu_mpi_task_insert(), with the parameter STARPU_SSEND.
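A minimal sketch (data_handle, dest, tag, cl and handle are assumed to be defined elsewhere); the communication pattern is the same as with starpu_mpi_isend(), and with starpu_mpi_task_insert() the STARPU_SSEND flag is OR-ed with the access mode:

starpu_mpi_req req;
MPI_Status status;

/* Explicit synchronous-mode, non-blocking send */
starpu_mpi_issend(data_handle, &req, dest, tag, MPI_COMM_WORLD);
starpu_mpi_wait(&req, &status);

/* Request synchronous-mode sends for this data in the task-insert flavor */
starpu_mpi_task_insert(MPI_COMM_WORLD, &cl, STARPU_R | STARPU_SSEND, handle, 0);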

MPIPtpCommunication gives the list of all the point to point communications defined in StarPU-MPI.

14.7 Exchanging User Defined Data Interface

New data interfaces defined as explained in Defining A New Data Interface can also be used within StarPU-MPI and exchanged between nodes. Two functions need to be defined through the type starpu_data_interface_ops. The function starpu_data_interface_ops::pack_data takes a handle and returns a contiguous memory buffer allocated with

starpu_malloc_flags(ptr, size, 0)

along with its size, where data to be conveyed to another node should be copied.

static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
{
    struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
    char *data;

    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    *count = complex_get_size(handle);
    *ptr = (void *) starpu_malloc_on_node_flags(node, *count, 0);
    data = *ptr;
    memcpy(data, complex_interface->real, complex_interface->nx*sizeof(double));
    memcpy(data + complex_interface->nx*sizeof(double), complex_interface->imaginary, complex_interface->nx*sizeof(double));
    return 0;
}

The inverse operation is implemented in the function starpu_data_interface_ops::unpack_data which takes a contiguous memory buffer and recreates the data handle.

static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
{
    struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
    char *data = ptr;

    memcpy(complex_interface->real, data, complex_interface->nx*sizeof(double));
    memcpy(complex_interface->imaginary, data + complex_interface->nx*sizeof(double), complex_interface->nx*sizeof(double));

    starpu_free_on_node_flags(node, (uintptr_t) ptr, count, 0);
    return 0;
}

The starpu_data_interface_ops::peek_data operation does the same, but without freeing the buffer. Of course, one can implement starpu_data_interface_ops::unpack_data as merely calling starpu_data_interface_ops::peek_data and then freeing the buffer:

static int complex_peek_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
{
    struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
    char *data = ptr;

    STARPU_ASSERT(count == complex_get_size(handle));
    memcpy(complex_interface->real, data, complex_interface->nx*sizeof(double));
    memcpy(complex_interface->imaginary, data + complex_interface->nx*sizeof(double), complex_interface->nx*sizeof(double));
    return 0;
}

static struct starpu_data_interface_ops interface_complex_ops =
{
    ...
    .pack_data = complex_pack_data,
    .peek_data = complex_peek_data,
    .unpack_data = complex_unpack_data
};

Instead of defining pack and unpack operations, users may want to attach an MPI datatype to their user-defined data interface. The function starpu_mpi_interface_datatype_register() allows doing so. It takes 3 parameters: the interface ID for which the MPI datatype is going to be defined, a pointer to a function that creates the MPI datatype, and a pointer to a function that frees the MPI datatype. If for some data an MPI datatype cannot be built (e.g. a complex data structure), the creation function can return -1; StarPU-MPI will then fall back to using pack/unpack.

The functions to create and free the MPI datatype are defined and registered as follows.

void starpu_complex_interface_datatype_allocate(starpu_data_handle_t handle, MPI_Datatype *mpi_datatype)
{
    int ret;
    int blocklengths[2];
    MPI_Aint displacements[2];
    MPI_Datatype types[2] = {MPI_DOUBLE, MPI_DOUBLE};
    struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);

    MPI_Get_address(complex_interface, displacements);
    MPI_Get_address(&complex_interface->imaginary, displacements+1);
    displacements[1] -= displacements[0];
    displacements[0] = 0;

    blocklengths[0] = complex_interface->nx;
    blocklengths[1] = complex_interface->nx;

    ret = MPI_Type_create_struct(2, blocklengths, displacements, types, mpi_datatype);
    STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_create_struct failed");

    ret = MPI_Type_commit(mpi_datatype);
    STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_commit failed");
}

void starpu_complex_interface_datatype_free(MPI_Datatype *mpi_datatype)
{
    MPI_Type_free(mpi_datatype);
}

static struct starpu_data_interface_ops interface_complex_ops =
{
    ...
};

interface_complex_ops.interfaceid = starpu_data_interface_get_next_id();
starpu_mpi_interface_datatype_register(interface_complex_ops.interfaceid, starpu_complex_interface_datatype_allocate, starpu_complex_interface_datatype_free);

starpu_data_handle_t handle;
starpu_complex_data_register(&handle, STARPU_MAIN_RAM, real, imaginary, 2);
...

An example is provided in the file mpi/examples/user_datatype/my_interface.c.

It is also possible to use starpu_mpi_datatype_register() to register the functions through a handle rather than the interface ID, but note that in that case it is important to make sure no communication is going to occur before the function starpu_mpi_datatype_register() is called. This would otherwise produce an undefined result as the data may be received before the function is called, and so the MPI datatype would not be known by the StarPU-MPI communication engine, and the data would be processed with the pack and unpack operations. One would thus need to synchronize all nodes:

starpu_data_handle_t handle;
starpu_complex_data_register(&handle, STARPU_MAIN_RAM, real, imaginary, 2);
starpu_mpi_datatype_register(handle, starpu_complex_interface_datatype_allocate, starpu_complex_interface_datatype_free);
starpu_mpi_barrier(MPI_COMM_WORLD);

14.8 MPI Insert Task Utility

To save the programmer from having to specify all communications, StarPU provides an "MPI Insert Task Utility". The principle is that the application decides a distribution of the data over the MPI nodes by allocating it and notifying StarPU of this decision, i.e. it tells StarPU which MPI node "owns" which data. It also decides, for each handle, an MPI tag which will be used to exchange the content of the handle. All MPI nodes then process the whole task graph, and StarPU automatically determines which node actually executes which task and triggers the required MPI transfers.

The list of functions is described in MPIInsertTask.

Here is a stencil example showing how to use starpu_mpi_task_insert(). One first needs to define a distribution function which specifies the locality of the data. Note that the data needs to be registered to MPI by calling starpu_mpi_data_register(). This function sets the distribution information and the MPI tag which should be used when communicating the data. It also allows automatically clearing the MPI communication cache when unregistering the data. A basic example is in the file mpi/tests/insert_task.c.

/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes)
{
    /* Block distrib */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
}

Now the data can be registered within StarPU. Data which are not owned but will be needed for computations can be registered through the lazy allocation mechanism, i.e. with a home_node set to -1. StarPU will automatically allocate the memory when it is used for the first time.

One can note an optimization here (the else if test): we only register data which will be needed by the tasks that we will execute.

unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM, (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own this index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1, (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;

        if (data_handles[x][y])
        {
            starpu_mpi_data_register(data_handles[x][y], x*X+y, mpi_rank);
        }
    }
}

Now starpu_mpi_task_insert() can be called for the different steps of the application.

for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);

The full source code is available in the file mpi/examples/stencil/stencil5.c.

That is, all MPI nodes process the whole task graph, but as mentioned above, for each task, only the MPI node which owns the data being written to (here, data_handles[x][y]) will actually run the task. The other MPI nodes will automatically send the required data.

To tune the placement of tasks among MPI nodes, one can use STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA to specify an explicit node (an example can be found in mpi/tests/insert_task_node_choice.c) or the node of a given data (e.g. one of the parameters), or use starpu_mpi_node_selection_register_policy() and STARPU_NODE_SELECTION_POLICY to provide a dynamic policy (an example can be found in mpi/tests/policy_register.c). The default policy is to execute the task on the node which owns a data that requires write access; if the task requires write access to several data handles, the node executing the task is selected so as to minimize the amount of data to transfer between nodes.
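A minimal sketch of these placement hints, assuming cl is a two-buffer codelet and handleA/handleB are registered handles (names are illustrative):

/* Force execution of this task on MPI node 1 */
starpu_mpi_task_insert(MPI_COMM_WORLD, &cl,
                       STARPU_RW, handleA, STARPU_R, handleB,
                       STARPU_EXECUTE_ON_NODE, 1,
                       0);

/* Execute on the node owning handleB instead of the node owning handleA */
starpu_mpi_task_insert(MPI_COMM_WORLD, &cl,
                       STARPU_RW, handleA, STARPU_R, handleB,
                       STARPU_EXECUTE_ON_DATA, handleB,
                       0);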

The function starpu_mpi_task_build() is also provided, to only construct the task structure. All MPI nodes need to call this function, which posts the required send/recv on the various nodes as needed. Only the node which is to execute the task will get a valid task structure as a return value, the others will get NULL; this node must then submit the task. All nodes then need to call the function starpu_mpi_task_post_build() (with the same list of arguments as starpu_mpi_task_build()) to post all the data communications meant to happen after the task execution.

struct starpu_task *task;

task = starpu_mpi_task_build(MPI_COMM_WORLD, &cl,
                             STARPU_RW, data_handles[0],
                             STARPU_R, data_handles[1],
                             0);
if (task) starpu_task_submit(task);
starpu_mpi_task_post_build(MPI_COMM_WORLD, &cl,
                           STARPU_RW, data_handles[0],
                           STARPU_R, data_handles[1],
                           0);

A full source code using these functions is available in the file mpi/tests/insert_task_compute.c.

It is also possible to create and submit the task outside of the StarPU-MPI functions, and call the functions starpu_mpi_task_exchange_data_before_execution() and starpu_mpi_task_exchange_data_after_execution() to exchange data according to which nodes own the data.

struct starpu_data_descr descrs[2];
struct starpu_mpi_task_exchange_params params;
struct starpu_task *task;

task = starpu_task_create();
task->cl = &mycodelet;
task->handles[0] = data_handles[0];
task->handles[1] = data_handles[1];

starpu_mpi_task_exchange_data_before_execution(MPI_COMM_WORLD, task, descrs, &params);
if (params.do_execute) starpu_task_submit(task);
starpu_mpi_task_exchange_data_after_execution(MPI_COMM_WORLD, descrs, 2, params);

A full source code using these functions is available in the file mpi/tests/mpi_task_submit.c.

If many data handles must be registered with unique tag IDs, or if multiple applications concurrently submit tasks to StarPU, it becomes difficult to keep the tags unique for each piece of data. StarPU provides a tag management system to allocate/free a unique range of tags when registering data, to prevent conflicts between one application and another. The previous code then becomes:

unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

int64_t mintag = starpu_mpi_tags_allocate(X*Y);

for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        ...
        if (data_handles[x][y])
        {
            starpu_mpi_data_register(data_handles[x][y], mintag + y*Y+x, mpi_rank);
        }
    }
}

Then, when all these pieces of data have been unregistered, you may free the range of tags by calling:

void starpu_mpi_tags_free(int64_t mintag)

where mintag was the value returned by starpu_mpi_tags_allocate().

Note that both these functions should be called by all nodes involved in the computations in the exact same order and with the same parameters to keep the tags synchronized between all nodes.

Also note that StarPU will not check if a tag given to starpu_mpi_data_register() has been previously registered; this functionality only aims at preventing different parts of an application from using the same data tags.

14.9 Pruning MPI Task Insertion

Making all MPI nodes process the whole graph can be a concern with a growing number of nodes. To avoid this, the application can prune the task submission loops according to the data distribution, so as to only submit tasks on the nodes which have to care about them (either to execute them, or to send the required data).

A way to do some of this quite easily can be to just add an if like this:

for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            if (my_distrib(x,y,size) == my_rank
             || my_distrib(x-1,y,size) == my_rank
             || my_distrib(x+1,y,size) == my_rank
             || my_distrib(x,y-1,size) == my_rank
             || my_distrib(x,y+1,size) == my_rank)
                starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                                       STARPU_RW, data_handles[x][y],
                                       STARPU_R, data_handles[x-1][y],
                                       STARPU_R, data_handles[x+1][y],
                                       STARPU_R, data_handles[x][y-1],
                                       STARPU_R, data_handles[x][y+1],
                                       0);

This saves the cost of the starpu_mpi_task_insert() call (argument passing and parsing) for the tasks that this node does not have to care about.

An example is available in the file examples/stencil/implicit-stencil-tasks.c.

If the my_distrib function can be inlined by the compiler, the latter can improve the test.

If the size can be made a compile-time constant, the compiler can considerably improve the test further.

If the distribution function is not too complex and the compiler is very good, the latter can even optimize the for loops, thus dramatically reducing the cost of task submission.

To estimate quickly how long task submission takes, and notably how much pruning saves, a quick and easy way is to measure the submission time of just one of the MPI nodes. This can be achieved by running the application on just one MPI node with the following environment variables:

export STARPU_DISABLE_KERNELS=1
export STARPU_MPI_FAKE_RANK=2
export STARPU_MPI_FAKE_SIZE=1024

Here we have disabled the kernel function call to skip the actual computation time and only keep submission time, and we have asked StarPU to fake running on MPI node 2 out of 1024 nodes.

14.10 Temporary Data

To be able to use starpu_mpi_task_insert(), one has to call starpu_mpi_data_register(), so that StarPU-MPI can know what it needs to do for each data. Parameters of starpu_mpi_data_register() are normally the same on all nodes for a given data, so that all nodes agree on which node owns the data, and which tag is used to transfer its value.

It can however be useful to register, for instance, some temporary data on just one node, without having to register a dummy handle on all the other nodes which do not need to know about it. In this case, the nodes which do not need the data can simply pass NULL to starpu_mpi_task_insert():

starpu_data_handle_t data0 = NULL;

if (rank == 0)
{
    starpu_variable_data_register(&data0, STARPU_MAIN_RAM, (uintptr_t) &val0, sizeof(val0));
    starpu_mpi_data_register(data0, 0, rank);
}

starpu_mpi_task_insert(MPI_COMM_WORLD, &cl, STARPU_W, data0, 0); /* Executes on node 0 */

Here, nodes whose rank is not 0 will simply not take care of the data, and consider it to be on another node.

This can be mixed in various ways; for instance, here node 1 determines that it does not have to care about data0, but knows that it should send the value of its data1 to node 0, which owns data and thus will need the value of data1 to execute the task:

starpu_data_handle_t data0 = NULL, data1, data;

if (rank == 0)
{
    starpu_variable_data_register(&data0, STARPU_MAIN_RAM, (uintptr_t) &val0, sizeof(val0));
    starpu_mpi_data_register(data0, -1, rank);
    starpu_variable_data_register(&data1, -1, 0, sizeof(val1));
    starpu_variable_data_register(&data, STARPU_MAIN_RAM, (uintptr_t) &val, sizeof(val));
}
else if (rank == 1)
{
    starpu_variable_data_register(&data1, STARPU_MAIN_RAM, (uintptr_t) &val1, sizeof(val1));
    starpu_variable_data_register(&data, -1, 0, sizeof(val));
}

starpu_mpi_task_insert(MPI_COMM_WORLD, &cl, STARPU_W, data, STARPU_R, data0, STARPU_R, data1, 0); /* Executes on node 0 */

The full source code is available in the file mpi/tests/temporary.c.

14.11 Per-node Data

Beyond temporary data on just one node, one may want per-node data, e.g. to replicate some computation because that is less expensive than communicating the value over MPI:

starpu_data_handle_t pernode, data0, data1;

starpu_variable_data_register(&pernode, -1, 0, sizeof(val));
starpu_mpi_data_register(pernode, -1, STARPU_MPI_PER_NODE);

/* Normal data: one on node0, one on node1 */
if (rank == 0)
{
    starpu_variable_data_register(&data0, STARPU_MAIN_RAM, (uintptr_t) &val0, sizeof(val0));
    starpu_variable_data_register(&data1, -1, 0, sizeof(val1));
}
else if (rank == 1)
{
    starpu_variable_data_register(&data0, -1, 0, sizeof(val0));
    starpu_variable_data_register(&data1, STARPU_MAIN_RAM, (uintptr_t) &val1, sizeof(val1));
}

starpu_mpi_task_insert(MPI_COMM_WORLD, &cl, STARPU_W, pernode, 0); /* Will be replicated on all nodes */
starpu_mpi_task_insert(MPI_COMM_WORLD, &cl2, STARPU_RW, data0, STARPU_R, pernode, 0); /* Will execute on node 0, using its own pernode */
starpu_mpi_task_insert(MPI_COMM_WORLD, &cl2, STARPU_RW, data1, STARPU_R, pernode, 0); /* Will execute on node 1, using its own pernode */

One can turn a normal data into per-node data, by first broadcasting it to all nodes:

starpu_data_handle_t data;

starpu_variable_data_register(&data, -1, 0, sizeof(val));

/* Compute some value */
starpu_mpi_task_insert(MPI_COMM_WORLD, &cl, STARPU_W, data, 0); /* Node 0 computes it */

/* Get it on all nodes */
starpu_mpi_get_data_on_all_nodes_detached(MPI_COMM_WORLD, data);
/* And turn it per-node */
starpu_mpi_data_set_rank(data, STARPU_MPI_PER_NODE);

The data can then be used just like per-node above.

The full source code is available in the file mpi/tests/temporary.c.

14.12 Inter-node reduction

One might want to leverage a reduction pattern across several nodes. Using STARPU_REDUX (see DataReduction), one can obtain such patterns where each core on the contributing nodes spawns its own copy to work with. If the required reductions are too numerous and expensive, the access mode STARPU_MPI_REDUX tells StarPU to spawn only one contribution per contributing node.

The setup and use of STARPU_MPI_REDUX is similar to STARPU_REDUX: the initialization and reduction codelets should be declared through starpu_data_set_reduction_methods(), in the same fashion as for STARPU_REDUX. The example mpi/examples/mpi_redux/mpi_redux.c shows how to use the STARPU_MPI_REDUX mode and compares it with the standard STARPU_REDUX. The function starpu_mpi_redux_data() is automatically called either when a task reading the reduced handle is inserted through the MPI layer of StarPU with starpu_mpi_task_insert(), or when users wait for all communications and tasks to be executed through starpu_mpi_wait_for_all(). The function can also be called by users to fine-tune arguments such as the priority of the reduction tasks. Tasks contributing to the inter-node reduction should be registered as accessing the contribution through the STARPU_RW|STARPU_COMMUTE mode, as for the STARPU_REDUX mode, as in the following example.

static struct starpu_codelet contrib_cl =
{
    .cpu_funcs = {cpu_contrib}, /* cpu implementation(s) of the routine */
    .nbuffers = 1, /* number of data handles referenced by this routine */
    .modes = {STARPU_RW | STARPU_COMMUTE}, /* access modes for the contribution */
    .name = "contribution"
};
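For completeness, a minimal sketch of declaring the reduction methods on the handle, as mentioned above (redux_cl and init_cl are illustrative codelet names, defined as explained in DataReduction):

starpu_data_set_reduction_methods(data, &redux_cl, &init_cl);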

When inserting these tasks, the access mode handed to the StarPU-MPI layer should be STARPU_MPI_REDUX. If a task uses a data owned by node 0 and is executed on node 1, it can be inserted as in the following example.

starpu_mpi_task_insert(MPI_COMM_WORLD, &contrib_cl, STARPU_MPI_REDUX, data, STARPU_EXECUTE_ON_NODE, 1, 0); /* Node 1 computes it */

Note that if the specified node is set to -1, the option is ignored.

More examples are available at mpi/examples/mpi_redux/mpi_redux.c and mpi/examples/mpi_redux/mpi_redux_tree.c.

14.13 Priorities

All send functions have a _prio variant which takes an additional priority parameter, which allows making StarPU-MPI change the order of MPI requests before submitting them to MPI. The default priority is 0.

When using the starpu_mpi_task_insert() helper, STARPU_PRIORITY defines both the task priority and the MPI requests priority. An example is available in the file mpi/examples/benchs/recv_wait_finalize_bench.c.
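A minimal sketch, assuming cl and handle are defined elsewhere; STARPU_PRIORITY here sets both the task priority and the priority of the MPI requests generated for handle:

starpu_mpi_task_insert(MPI_COMM_WORLD, &cl,
                       STARPU_RW, handle,
                       STARPU_PRIORITY, 10,
                       0);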

To check the performance effect of MPI priorities, you can set the environment variable STARPU_MPI_PRIORITIES to 0 to disable the use of priorities in StarPU-MPI.

14.14 MPI Cache Support

StarPU-MPI automatically optimizes duplicate data transmissions: if an MPI node B needs a piece of data D from MPI node A for several tasks, only one transmission of D will take place from A to B, and the value of D will be kept on B as long as no task modifies D.

If a task modifies D, B will wait for all tasks which need the previous value of D, before invalidating the value of D. As a consequence, it releases the memory occupied by D. Whenever a task running on B needs the new value of D, allocation will take place again to receive it.

Since tasks can be submitted dynamically, StarPU-MPI can not know whether the current value of data D will again be used by a newly-submitted task before being modified by another newly-submitted task, so until a task is submitted to modify the current value, it can not decide by itself whether to flush the cache or not. The application can however explicitly tell StarPU-MPI to flush the cache by calling starpu_mpi_cache_flush() or starpu_mpi_cache_flush_all_data(), for instance in case the data will not be used at all anymore (see for instance the cholesky example in mpi/examples/matrix_decomposition), or at least not in the close future. If a newly-submitted task actually needs the value again, another transmission of D will be initiated from A to B. A mere starpu_mpi_cache_flush_all_data() can for instance be added at the end of the whole algorithm, to express that no data will be reused after this (or at least that it is not interesting to keep them in cache). It may however be interesting to add fine-grain starpu_mpi_cache_flush() calls during the algorithm; the effect for the data deallocation will be the same, but it will additionally release some pressure from the StarPU-MPI cache hash table during task submission.
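A minimal sketch, assuming data_handles is the array used in the stencil example above and that x and y are in scope:

/* This piece of data will not be needed anymore: flush it from the cache */
starpu_mpi_cache_flush(MPI_COMM_WORLD, data_handles[x][y]);

/* ... */

/* At the end of the whole algorithm, nothing will be reused */
starpu_mpi_cache_flush_all_data(MPI_COMM_WORLD);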

One can determine whether a piece of data is cached with starpu_mpi_cached_receive() and starpu_mpi_cached_send(). An example is available in the file mpi/examples/cache/cache.c.

Functions starpu_mpi_cached_receive_set() and starpu_mpi_cached_send_set() are automatically called by starpu_mpi_task_insert() but can also be called directly by the application. Functions starpu_mpi_cached_send_clear() and starpu_mpi_cached_receive_clear() must be called to clear data from the cache. They are also automatically called when using starpu_mpi_task_insert().

The whole caching behavior can be disabled thanks to the STARPU_MPI_CACHE environment variable. The variable STARPU_MPI_CACHE_STATS can be set to 1 to enable the runtime to display messages when data are added or removed from the cache holding the received data.

14.15 MPI Data Migration

The application can dynamically change its mind about the data distribution, to balance the load over MPI nodes, for instance. This can be done very simply by requesting an explicit move and then changing the registered rank. For instance, we here switch to a new distribution function my_distrib2: we first register any data which wasn't registered already and will be needed, then migrate the data, and register the new location.

for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        int mpi_rank = my_distrib2(x, y, size);
        if (!data_handles[x][y] && (mpi_rank == my_rank
             || my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
             || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size)))
            /* Register newly-needed data */
            starpu_variable_data_register(&data_handles[x][y], -1, (uintptr_t)NULL, sizeof(unsigned));
        if (data_handles[x][y])
        {
            /* Migrate the data */
            starpu_mpi_data_migrate(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
        }
    }
}

The full example is available in the file mpi/examples/stencil/stencil5.c. From then on, further task submissions will use the new data distribution, which will thus change both MPI communications and task assignments.

Very importantly, since all nodes have to agree on which node owns which data to determine MPI communications and task assignments the same way, all nodes have to perform the same data migration, and at the same point among task submissions. This does not require a strict synchronization, just a clear separation of task submissions before and after the data redistribution.

Before data unregistration, it has to be migrated back to its original home node (the value, at least), since that is where the user-provided buffer resides. Otherwise, the unregistration will complain that it does not have the latest value on the original home node.

for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        if (data_handles[x][y])
        {
            int mpi_rank = my_distrib(x, y, size);
            /* Get back data to original place where the user-provided buffer is. */
            starpu_mpi_data_migrate(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
            /* And unregister it */
            starpu_data_unregister(data_handles[x][y]);
        }
    }
}

14.16 MPI Collective Operations

The functions are described in MPICollectiveOperations.

if (rank == root)
{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    {
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    }
}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
for(x = 0; x < nblocks ; x++)
{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root)
    {
        starpu_vector_data_register(&data_handles[x], STARPU_MAIN_RAM, (uintptr_t)vector[x], block_size, sizeof(float));
    }
    else if ((mpi_rank == rank) || ((rank == mpi_rank+1 || rank == mpi_rank-1)))
    {
        /* I own this index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL, block_size, sizeof(float));
    }
    else
    {
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    }
    if (data_handles[x])
    {
        starpu_mpi_data_register(data_handles[x], x, mpi_rank);
    }
}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD, NULL, NULL, NULL, NULL);

/* Calculation */
for(x = 0; x < nblocks ; x++)
{
    if (data_handles[x])
    {
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank)
        {
            starpu_task_insert(&cl, STARPU_RW, data_handles[x], 0);
        }
    }
}

/* Gather the matrix on main node */
starpu_mpi_gather_detached(data_handles, nblocks, 0, MPI_COMM_WORLD, NULL, NULL, NULL, NULL);

An example is available in mpi/tests/mpi_scatter_gather.c.

With NewMadeleine (see Using the NewMadeleine communication library), broadcasts can automatically be detected and be optimized by using routing trees. This behavior can be controlled with the environment variable STARPU_MPI_COOP_SENDS. See the corresponding paper for more information.

Other collective operations would be easy to define, just ask starpu-devel for them!

14.17 Make StarPU-MPI Progression Thread Execute Tasks

The default behavior of StarPU-MPI is to spawn an MPI thread that takes care only of MPI communications, in an active fashion (i.e. the StarPU-MPI thread sleeps only when there is no active request submitted by the application), with the goal of being as reactive as possible to communications. Knowing that, users usually leave one free core for the MPI thread when starting a distributed execution with StarPU-MPI. However, this could result in a loss of performance for applications that do not require an extreme reactivity to MPI communications.

The starpu_mpi_init_conf() routine allows users to give the starpu_conf configuration structure of StarPU (usually given to the starpu_init() routine) to StarPU-MPI, so that StarPU-MPI reserves for its own use one of the CPU drivers of the current computing node, or one of the CPU cores, and then calls starpu_init() internally.

This allows the MPI communication thread to call a StarPU CPU driver to run tasks when there are no active requests to take care of, and thus recover the computational power of the "lost" core. Since there is a trade-off between executing tasks and polling MPI requests (i.e. how much reactivity to MPI communications the application is willing to lose in order to get back the computing power of the core dedicated to the StarPU-MPI thread), two environment variables are available to pilot the behavior of the MPI thread, so that users can tune this trade-off depending on the behavior of the application.
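A minimal sketch of this initialization:

struct starpu_conf conf;

starpu_conf_init(&conf);
/* Fields of conf can be tuned here, as one would do before starpu_init() */
starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, &conf);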

The STARPU_MPI_DRIVER_CALL_FREQUENCY environment variable sets how many times the MPI progression thread goes through the MPI_Test() loop on each active communication request (and thus tries to make communications progress by going into the MPI layer) before executing tasks. The default value for this environment variable is 0, which means that the support for interleaving task execution and communication polling is deactivated, thus returning the MPI progression thread to its original behavior.

The STARPU_MPI_DRIVER_TASK_FREQUENCY environment variable sets how many tasks are executed by the MPI communication thread before checking all active requests again. While this environment variable allows a better use of the core dedicated to StarPU-MPI for computations, it also decreases the reactivity of the MPI communication thread as much.

14.18 Debugging MPI

Communication trace will be enabled when the environment variable STARPU_MPI_COMM is set to 1, and StarPU has been configured with the option --enable-verbose.

Statistics will be enabled for the communication cache when the environment variable STARPU_MPI_CACHE_STATS is set to 1. It prints messages on the standard output when data are added or removed from the received communication cache.

When the environment variable STARPU_MPI_STATS is set to 1, StarPU will display, at the end of the execution, for each node, the volume and the bandwidth of data sent to all the other nodes. Communication statistics can also be enabled and disabled from the application by calling the functions starpu_mpi_comm_stats_enable() and starpu_mpi_comm_stats_disable(). If communication statistics have been enabled, calling the function starpu_mpi_comm_stats_retrieve() will give the amount of communications between the calling node and all the other nodes. Communication statistics will also be automatically displayed at the end of the execution, as exemplified below.

[starpu_comm_stats][3] TOTAL:   476.000000 B    0.000454 MB      0.000098 B/s    0.000000 MB/s
[starpu_comm_stats][3:0]        248.000000 B    0.000237 MB      0.000051 B/s    0.000000 MB/s
[starpu_comm_stats][3:2]        50.000000 B     0.000217 MB      0.000047 B/s    0.000000 MB/s

[starpu_comm_stats][2] TOTAL:   288.000000 B    0.000275 MB      0.000059 B/s    0.000000 MB/s
[starpu_comm_stats][2:1]        70.000000 B     0.000103 MB      0.000022 B/s    0.000000 MB/s
[starpu_comm_stats][2:3]        288.000000 B    0.000172 MB      0.000037 B/s    0.000000 MB/s

[starpu_comm_stats][1] TOTAL:   188.000000 B    0.000179 MB      0.000038 B/s    0.000000 MB/s
[starpu_comm_stats][1:0]        80.000000 B     0.000114 MB      0.000025 B/s    0.000000 MB/s
[starpu_comm_stats][1:2]        188.000000 B    0.000065 MB      0.000014 B/s    0.000000 MB/s

[starpu_comm_stats][0] TOTAL:   376.000000 B    0.000359 MB      0.000077 B/s    0.000000 MB/s
[starpu_comm_stats][0:1]        376.000000 B    0.000141 MB      0.000030 B/s    0.000000 MB/s
[starpu_comm_stats][0:3]        10.000000 B     0.000217 MB      0.000047 B/s    0.000000 MB/s

These statistics can be plotted as heatmaps using the StarPU tool starpu_mpi_comm_matrix.py, this will produce 2 PDF files, one plot for the bandwidth, and one plot for the data volume.

(Figures: bandwidth heatmap and data volume heatmap produced by starpu_mpi_comm_matrix.py.)

14.19 More MPI examples

MPI examples are available in the StarPU source code in mpi/examples:

  • comm shows how to use communicators with StarPU-MPI
  • complex is a simple example using a user-defined data interface over MPI (complex numbers),
  • stencil5 is a simple stencil example using starpu_mpi_task_insert(),
  • matrix_decomposition is a Cholesky decomposition example using starpu_mpi_task_insert(). The non-distributed version can check for algorithm correctness in a 1-node configuration; the distributed version uses exactly the same source code, to be used over MPI,
  • mpi_lu is an LU decomposition example, provided in three versions: plu_example uses explicit MPI data transfers, plu_implicit_example uses implicit MPI data transfers, plu_outofcore_example uses implicit MPI data transfers and supports data matrices which do not fit in memory (out-of-core).

14.20 Using the NewMadeleine communication library

NewMadeleine (see https://pm2.gitlabpages.inria.fr/newmadeleine/, part of the PM2 project) is an optimizing communication library for high-performance networks. NewMadeleine provides its own interface, but also an MPI interface (called MadMPI). Thus, there are two possibilities to use NewMadeleine with StarPU:

  • using the NewMadeleine's native interface. StarPU supports this interface from its release 1.3.0, by enabling the configure option --enable-nmad. In this case, StarPU relies directly on NewMadeleine to make communications progress and NewMadeleine has to be built with the profile pukabi+madmpi.conf.
  • using the NewMadeleine's MPI interface (MadMPI). StarPU will use the standard MPI API and NewMadeleine will handle the calls to the MPI API. In this case, StarPU makes communications progress and thus communication progress has to be disabled in NewMadeleine by compiling it with the profile pukabi+madmpi-mini.conf.

To build NewMadeleine, download the latest version from the website (or, better, use the Git version to use the most recent version), then:

cd pm2/scripts
./pm2-build-packages ./<the profile you chose> --prefix=<installation prefix>

With Guix, the NewMadeleine's native interface can be used by setting the parameter --with-input=openmpi=nmad and MadMPI can be used with --with-input=openmpi=nmad-mini.

Whatever implementation (NewMadeleine or MadMPI) is used by StarPU, the public MPI interface of StarPU (described in MPI Support) is the same.

14.21 MPI Master Slave Support

StarPU provides another way to execute applications across many nodes. The Master-Slave support permits using remote cores without thinking about data distribution. This support can be activated with the configure option --enable-mpi-master-slave. However, you should not activate both MPI support and MPI Master-Slave support.

The existing kernels for CPU devices can be used as such. They only have to be exposed through the name of the function in the starpu_codelet::cpu_funcs_name field. Functions have to be globally-visible (i.e. not static) for StarPU to be able to look them up, and -rdynamic must be passed to gcc (or -export-dynamic to ld) so that symbols of the main program are visible.
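A minimal sketch of such a codelet (my_cpu_kernel is an illustrative name):

void my_cpu_kernel(void *buffers[], void *cl_arg); /* must not be static */

static struct starpu_codelet cl =
{
    .cpu_funcs = {my_cpu_kernel},
    .cpu_funcs_name = {"my_cpu_kernel"}, /* name used by slave processes to look the kernel up */
    .nbuffers = 1,
    .modes = {STARPU_RW},
};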

By default, one core is dedicated on the master node to manage the entire set of slaves. If the MPI implementation you are using has good multi-threading support, you can set the STARPU_MPI_MS_MULTIPLE_THREAD environment variable to 1 to dedicate one core per slave.

Choosing the number of cores on each slave device is done by setting the environment variable STARPU_NMPIMSTHREADS=<number> with <number> being the requested number of cores. By default, all the slave's cores are used.

Setting the number of slaves nodes is done by changing the -np parameter when executing the application with mpirun or mpiexec.

The master node is by default the node with the MPI rank equal to 0. To select another node, use the environment variable STARPU_MPI_MASTER_NODE=<number> with <number> being the requested MPI rank node.

A simple example tests/main/insert_task.c can be used to test the MPI master slave support.

14.22 MPI Checkpoint Support

StarPU provides an experimental checkpoint mechanism. It is for now only a proof of concept to see what the checkpointing cost is, since the restart part has not been integrated yet.

To enable checkpointing, you should use the configure option --enable-mpi-ft. The application in the directory mpi/examples/matrix_decomposition shows how to enable checkpoints. The API documentation is available in MPI Fault Tolerance Support.

Statistics can also be enabled with the configure option --enable-mpi-ft-stats.