GCC Code Coverage Report


Directory: src/
File: src/talp/talp_mpi.c
Date: 2026-03-27 16:05:46
Exec Total Coverage
Lines: 43 43 100.0%
Functions: 5 5 100.0%
Branches: 21 32 65.6%

Line Branch Exec Source
1 /*********************************************************************************/
2 /* Copyright 2009-2025 Barcelona Supercomputing Center */
3 /* */
4 /* This file is part of the DLB library. */
5 /* */
6 /* DLB is free software: you can redistribute it and/or modify */
7 /* it under the terms of the GNU Lesser General Public License as published by */
8 /* the Free Software Foundation, either version 3 of the License, or */
9 /* (at your option) any later version. */
10 /* */
11 /* DLB is distributed in the hope that it will be useful, */
12 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
13 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
14 /* GNU Lesser General Public License for more details. */
15 /* */
16 /* You should have received a copy of the GNU Lesser General Public License */
17 /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */
18 /*********************************************************************************/
19
20 #include "talp/talp_mpi.h"
21
22 #include "LB_comm/shmem_talp.h"
23 #include "LB_core/node_barrier.h"
24 #include "LB_core/spd.h"
25 #include "apis/dlb_talp.h"
26 #include "support/atomic.h"
27 #include "support/debug.h"
28 #include "support/mask_utils.h"
29 #include "talp/regions.h"
30 #include "talp/talp.h"
31 #include "talp/talp_record.h"
32 #include "talp/talp_types.h"
33 #ifdef MPI_LIB
34 #include "mpi/mpi_core.h"
35 #endif
36
37 #include <stdio.h>
38 #include <string.h>
39
40 extern __thread bool thread_is_observer;
41
42
43 #ifdef MPI_LIB
44 /* Communicate among all MPI processes so that everyone has the same monitoring regions */
45 static void talp_register_common_mpi_regions(const subprocess_descriptor_t *spd) {
46 /* Note: there's a potential race condition if this function is called
47 * (which happens on talp_mpi_finalize or talp_finalize) while another
48 * thread creates a monitoring region. The solution would be to lock the
49 * entire routine and call a specialized registering function that does not
50 * lock, or use a recursive lock. The situation is strange enough to not
51 * support it */
52
53 talp_info_t *talp_info = spd->talp_info;
54
55 /* Warn about open regions */
56 for (GSList *node = talp_info->open_regions;
57 node != NULL;
58 node = node->next) {
59 const dlb_monitor_t *monitor = node->data;
60 warning("Region %s is still open during MPI_Finalize."
61 " Collected data may be incomplete.",
62 monitor->name);
63 }
64
65 /* Gather recvcounts for each process
66 * (Each process may have different number of monitors) */
67 int nregions = g_tree_nnodes(talp_info->regions);
68 int chars_to_send = nregions * DLB_MONITOR_NAME_MAX;
69 int *recvcounts = malloc(_mpi_size * sizeof(int));
70 PMPI_Allgather(&chars_to_send, 1, MPI_INT,
71 recvcounts, 1, MPI_INT, getWorldComm());
72
73 /* Compute total characters to gather via MPI */
74 int i;
75 int total_chars = 0;
76 for (i=0; i<_mpi_size; ++i) {
77 total_chars += recvcounts[i];
78 }
79
80 if (total_chars > 0) {
81 /* Prepare sendbuffer */
82 char *sendbuffer = malloc(nregions * DLB_MONITOR_NAME_MAX * sizeof(char));
83 char *sendptr = sendbuffer;
84 for (GTreeNode *node = g_tree_node_first(talp_info->regions);
85 node != NULL;
86 node = g_tree_node_next(node)) {
87 const dlb_monitor_t *monitor = g_tree_node_value(node);
88 strcpy(sendptr, monitor->name);
89 sendptr += DLB_MONITOR_NAME_MAX;
90 }
91
92 /* Prepare recvbuffer */
93 char *recvbuffer = malloc(total_chars * sizeof(char));
94
95 /* Compute displacements */
96 int *displs = malloc(_mpi_size * sizeof(int));
97 int next_disp = 0;
98 for (i=0; i<_mpi_size; ++i) {
99 displs[i] = next_disp;
100 next_disp += recvcounts[i];
101 }
102
103 /* Gather all regions */
104 PMPI_Allgatherv(sendbuffer, nregions * DLB_MONITOR_NAME_MAX, MPI_CHAR,
105 recvbuffer, recvcounts, displs, MPI_CHAR, getWorldComm());
106
107 /* Register all regions. Existing ones will be skipped. */
108 for (i=0; i<total_chars; i+=DLB_MONITOR_NAME_MAX) {
109 region_register(spd, &recvbuffer[i]);
110 }
111
112 free(sendbuffer);
113 free(recvbuffer);
114 free(displs);
115 }
116
117 free(recvcounts);
118 }
119 #endif
120
121
122 /*********************************************************************************/
123 /* TALP MPI functions */
124 /*********************************************************************************/
125
126 /* Start global monitoring region (if not already started) */
127 9 void talp_mpi_init(const subprocess_descriptor_t *spd) {
128
129
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 ensure(!thread_is_observer, "An observer thread cannot call talp_mpi_init");
130
131 9 talp_info_t *talp_info = spd->talp_info;
132
133
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (talp_info) {
134 9 talp_info->flags.have_mpi = true;
135
136 /* Start global region (no-op if already started) */
137 9 region_start(spd, talp_info->monitor);
138
139 /* Add MPI_Init statistic and set useful state */
140 9 talp_sample_t *sample = talp_get_thread_sample(spd);
141 9 DLB_ATOMIC_ADD_RLX(&sample->stats.num_mpi_calls, 1);
142 9 talp_set_sample_state(spd, sample, TALP_STATE_USEFUL);
143 }
144 9 }
145
146 /* Stop global monitoring region and gather APP data if needed */
147 6 void talp_mpi_finalize(const subprocess_descriptor_t *spd) {
148
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 ensure(!thread_is_observer, "An observer thread cannot call talp_mpi_finalize");
150
151 6 talp_info_t *talp_info = spd->talp_info;
152
153
2/4
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6 times.
6 if (talp_info == NULL || !talp_info->flags.have_mpi) return;
154
155 #ifdef MPI_LIB
156 /* We also need to measure the waiting time of an MPI_Finalize.
157 * For this, we call an MPI_Barrier and the appropriate TALP functions.
158 * The num_mpi_calls variable is also incremented inside those. */
159 talp_into_sync_call(spd, /* is_blocking_collective */ true);
160 PMPI_Barrier(getWorldComm());
161 talp_out_of_sync_call(spd, /* is_blocking_collective */ true);
162 #else
163 /* Add MPI_Finalize to the number of MPI calls.
164 * Even though talp_mpi_finalize should never be called if no MPI_LIB,
165 * we keep this case for testing purposes. */
166 6 talp_sample_t *sample = talp_get_thread_sample(spd);
167 6 DLB_ATOMIC_ADD_RLX(&sample->stats.num_mpi_calls, 1);
168 #endif
169
170 /* Stop global region */
171 6 region_stop(spd, talp_info->monitor);
172
173 6 monitor_data_t *monitor_data = talp_info->monitor->_data;
174
175 /* Update shared memory values */
176
3/4
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
6 if (talp_info->flags.have_shmem || talp_info->flags.have_minimal_shmem) {
177 // TODO: is it needed? isn't it updated when stopped?
178 1 shmem_talp__set_times(monitor_data->node_shared_id,
179 1 talp_info->monitor->mpi_time,
180 1 talp_info->monitor->useful_time);
181 }
182
183 /* If TALP partial output is enabled, metrics are not merged here.
184 * Output is written per process in talp_finalize() */
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (spd->options.talp_partial_output) return;
186
187 #ifdef MPI_LIB
188 /* If performing any kind of TALP summary, check that the number of processes
189 * registered in the shared memory matches with the number of MPI processes in the node.
190 * This check is needed to avoid deadlocks on finalize. */
191 if (spd->options.talp_summary) {
192 verbose(VB_TALP, "Gathering TALP metrics");
193 /* FIXME: use Named Barrier */
194 /* if (shmem_barrier__get_num_participants(spd->options.barrier_id) == _mpis_per_node) { */
195
196 /* Gather data among processes in the node if node summary is enabled */
197 if (spd->options.talp_summary & SUMMARY_NODE) {
198 talp_record_node_summary(spd);
199 }
200
201 /* Gather data among MPIs if any of these summaries is enabled */
202 if (spd->options.talp_summary
203 & (SUMMARY_POP_METRICS | SUMMARY_PROCESS)) {
204 /* Ensure everyone has the same monitoring regions */
205 talp_register_common_mpi_regions(spd);
206
207 /* Finally, reduce data */
208 for (GTreeNode *node = g_tree_node_first(talp_info->regions);
209 node != NULL;
210 node = g_tree_node_next(node)) {
211 const dlb_monitor_t *monitor = g_tree_node_value(node);
212 if (spd->options.talp_summary & SUMMARY_POP_METRICS) {
213 talp_record_pop_summary(spd, monitor);
214 }
215 if (spd->options.talp_summary & SUMMARY_PROCESS) {
216 talp_record_process_summary(spd, monitor);
217 }
218 }
219 }
220
221 /* Synchronize all processes in node before continuing with DLB finalization */
222 node_barrier(spd, NULL);
223 /* } else { */
224 /* warning("The number of MPI processes and processes registered in DLB differ." */
225 /* " TALP will not print any summary."); */
226 /* } */
227 }
228 #endif
229 }
230
231 /* Decide whether to update the current sample (most likely and cheaper)
232 * or to aggregate all samples.
233 * We will only aggregate all samples if the external profiler is enabled
234 * and this MPI call is a blocking collective call. */
235 7 static inline void update_sample_on_sync_call(const subprocess_descriptor_t *spd,
236 const talp_info_t *talp_info, talp_sample_t *sample, bool is_blocking_collective) {
237
238
4/4
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 2 times.
7 if (!talp_info->flags.external_profiler || !is_blocking_collective) {
239 /* Likely scenario, just update the sample */
240 5 talp_update_sample(spd, sample, TALP_NO_TIMESTAMP);
241 } else {
242 /* If talp_info->flags.external_profiler && is_blocking_collective:
243 * aggregate samples and update all monitoring regions */
244 2 talp_flush_samples_to_regions(spd);
245 }
246 7 }
247
248 8 void talp_into_sync_call(const subprocess_descriptor_t *spd, bool is_blocking_collective) {
249
250 /* Observer threads may call MPI functions, but TALP must ignore them */
251
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7 times.
8 if (unlikely(thread_is_observer)) return;
252
253 7 const talp_info_t *talp_info = spd->talp_info;
254
255
2/4
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7 times.
7 if (talp_info == NULL || !talp_info->flags.have_mpi) return;
256
257 /* Update sample */
258 7 talp_sample_t *sample = talp_get_thread_sample(spd);
259 7 talp_update_sample(spd, sample, TALP_NO_TIMESTAMP);
260
261 /* Into Sync call -> not_useful_mpi */
262 7 talp_set_sample_state(spd, sample, TALP_STATE_NOT_USEFUL_MPI);
263 }
264
265 8 void talp_out_of_sync_call(const subprocess_descriptor_t *spd, bool is_blocking_collective) {
266
267 /* Observer threads may call MPI functions, but TALP must ignore them */
268
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7 times.
8 if (unlikely(thread_is_observer)) return;
269
270 7 const talp_info_t *talp_info = spd->talp_info;
271
272
2/4
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7 times.
7 if (talp_info == NULL || !talp_info->flags.have_mpi) return;
273
274 /* Update sample (and maybe flush) */
275 7 talp_sample_t *sample = talp_get_thread_sample(spd);
276 7 DLB_ATOMIC_ADD_RLX(&sample->stats.num_mpi_calls, 1);
277 7 update_sample_on_sync_call(spd, talp_info, sample, is_blocking_collective);
278
279 /* Out of Sync call -> useful */
280 7 talp_set_sample_state(spd, sample, TALP_STATE_USEFUL);
281 }
282