Skip to content

Commit c6d6bf0

Browse files
implement a simple aggregation scheme
This aggregation scheme is intended to allow OpenMPI to transfer larger messages if the user-reported partitions are too small or too high in number. This is achieved by using an internal partitioning where each internal (transfer) partition corresponds to one or multiple user-reported partitions. The scheme provides a method to mark a user partition as ready, that optionally returns a transfer partition that is ready. This is implemented by associating each transfer partition with an atomic counter, tracking the number of corresponding pready-calls already made. If a counter reaches the number of corresponding user-partitions, the corresponding transfer partition is returned. This implementation is thread-safe. Signed-off-by: Axel Schneewind <axel.schneewind@hlrs.de>
1 parent 0fcac7a commit c6d6bf0

File tree

3 files changed

+180
-1
lines changed

3 files changed

+180
-1
lines changed

ompi/mca/part/persist_aggregated/Makefile.am

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ local_sources = \
3737
part_persist_aggregated_request.h \
3838
part_persist_aggregated_request.c \
3939
part_persist_aggregated_sendreq.h \
40-
part_persist_aggregated_sendreq.c
40+
part_persist_aggregated_sendreq.c \
41+
schemes/part_persist_aggregated_scheme_regular.h \
42+
schemes/part_persist_aggregated_scheme_regular.c
4143

4244
mcacomponentdir = $(ompilibdir)
4345
mcacomponent_LTLIBRARIES = $(component_install)
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) 2024 High Performance Computing Center Stuttgart,
3+
* University of Stuttgart. All rights reserved.
4+
* $COPYRIGHT$
5+
*
6+
* Additional copyrights may follow
7+
*
8+
* $HEADER$
9+
*
10+
*/
11+
12+
#include "part_persist_aggregated_scheme_regular.h"
13+
14+
#include <stdlib.h>
15+
#include <string.h>
16+
17+
// converts the index of a public partition to the index of its corresponding internal partition
18+
static int internal_partition(struct part_persist_aggregation_state *state, int public_part)
19+
{
20+
return public_part / state->aggregation_count;
21+
}
22+
23+
void part_persist_aggregate_regular_init(struct part_persist_aggregation_state *state,
24+
int internal_partition_count, int factor,
25+
int last_internal_partition_size)
26+
{
27+
state->public_partition_count = (internal_partition_count - 1) * factor
28+
+ last_internal_partition_size;
29+
state->internal_partition_count = internal_partition_count;
30+
31+
// number of user-partitions per internal partition (except for the last one)
32+
state->aggregation_count = factor;
33+
// number of user-partitions corresponding to the last internal partition
34+
state->last_internal_partition_size = last_internal_partition_size;
35+
36+
// initialize counters
37+
state->public_parts_ready = (opal_atomic_int32_t *) calloc(state->internal_partition_count,
38+
sizeof(opal_atomic_uint32_t));
39+
}
40+
41+
void part_persist_aggregate_regular_reset(struct part_persist_aggregation_state *state)
42+
{
43+
// reset flags
44+
if (NULL != state->public_parts_ready) {
45+
memset((void *) state->public_parts_ready, 0,
46+
state->internal_partition_count * sizeof(opal_atomic_uint32_t));
47+
}
48+
}
49+
50+
static inline int is_last_partition(struct part_persist_aggregation_state *state, int partition)
51+
{
52+
return (partition == state->internal_partition_count - 1);
53+
}
54+
55+
static inline int num_public_parts(struct part_persist_aggregation_state *state, int partition)
56+
{
57+
return is_last_partition(state, partition) ? state->last_internal_partition_size
58+
: state->aggregation_count;
59+
}
60+
61+
void part_persist_aggregate_regular_pready(struct part_persist_aggregation_state *state,
62+
int partition, int *available_partition)
63+
{
64+
int internal_part = internal_partition(state, partition);
65+
66+
// this is the new value (after adding)
67+
int count = opal_atomic_add_fetch_32(&state->public_parts_ready[internal_part], 1);
68+
69+
// push to buffer if internal partition is ready
70+
if (count == num_public_parts(state, internal_part)) {
71+
*available_partition = internal_part;
72+
} else {
73+
*available_partition = -1;
74+
}
75+
}
76+
77+
void part_persist_aggregate_regular_free(struct part_persist_aggregation_state *state)
78+
{
79+
if (state->public_parts_ready != NULL)
80+
free((void *) state->public_parts_ready);
81+
state->public_parts_ready = NULL;
82+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2+
/*
3+
* Copyright (c) 2024 High Performance Computing Center Stuttgart,
4+
* University of Stuttgart. All rights reserved.
5+
* $COPYRIGHT$
6+
*
7+
* Additional copyrights may follow
8+
*
9+
* $HEADER$
10+
*
11+
*/
12+
13+
/**
14+
* @file
15+
* This file defines a simple message aggregation scheme:
16+
* A user-provided partitioning into n partitions is mapped
17+
* to an internal partitioning of ceil(n/k) partitions where
18+
* each internal partition corresponds to k public ones
19+
* (with the last partition potentially having a lower size).
20+
* The factor k can be defined to optimize the internal
21+
* number/size of internal partitions.
22+
*/
23+
24+
#ifndef PART_PERSIST_AGGREGATED_SCHEME_REGULAR_H
25+
#define PART_PERSIST_AGGREGATED_SCHEME_REGULAR_H
26+
27+
#include "ompi_config.h"
28+
29+
#include "opal/include/opal/sys/atomic.h"
30+
31+
/**
32+
* @brief tracks the number of pready calls corresponding to internal partitions
33+
*
34+
*/
35+
struct part_persist_aggregation_state {
36+
// counters for each internal partition
37+
opal_atomic_int32_t *public_parts_ready;
38+
39+
// number of public partitions
40+
int public_partition_count;
41+
42+
// number of internal partitions
43+
int internal_partition_count;
44+
45+
// how many public partitions are aggregated into an internal one
46+
int aggregation_count;
47+
// number of public partitions corresponding to last internal partition
48+
int last_internal_partition_size;
49+
};
50+
51+
/**
52+
* @brief initializes the aggregation state
53+
*
54+
* @param[out] state pointer to aggregation state object
55+
* @param[in] internal_partition_count number of internal partitions (i.e. number of messages
56+
* per partitioned transfer)
57+
* @param[in] factor number of public partitions corresponding to each
58+
* internal one other than the last
59+
* @param[in] last_internal_partition_size number of public partitions corresponding to the last
60+
* internal partition internal partition
61+
*/
62+
OMPI_DECLSPEC void part_persist_aggregate_regular_init(struct part_persist_aggregation_state *state,
63+
int internal_partition_count, int factor,
64+
int last_internal_partition_size);
65+
66+
/**
67+
* @brief resets the aggregation state
68+
*
69+
* @param[out] state pointer to aggregation state object
70+
*/
71+
OMPI_DECLSPEC void
72+
part_persist_aggregate_regular_reset(struct part_persist_aggregation_state *state);
73+
74+
/**
75+
* @brief marks a public partition as ready and optionally outputs an internal partition that can be
76+
* sent.
77+
*
78+
* @param[in,out] state pointer to aggregation state object
79+
* @param[in] partition index of the public partition to mark ready
80+
* @param[out] available_partition index of the corresponding internal partition if it is ready,
81+
* otherwise -1
82+
*/
83+
OMPI_DECLSPEC void
84+
part_persist_aggregate_regular_pready(struct part_persist_aggregation_state *state, int partition,
85+
int *available_partition);
86+
87+
/**
88+
* @brief destroys the aggregation scheme
89+
*
90+
* @param[in,out] state pointer to aggregation state object
91+
*/
92+
OMPI_DECLSPEC void
93+
part_persist_aggregate_regular_free(struct part_persist_aggregation_state *state);
94+
95+
#endif

0 commit comments

Comments
 (0)