GCC Code Coverage Report

Directory: src/
File:      src/talp/talp_mpi.c
Date:      2025-11-21 10:34:40

                Exec   Total   Coverage
Lines:            43      43     100.0%
Functions:         5       5     100.0%
Branches:         17      24      70.8%

Line Branch Exec Source
1 /*********************************************************************************/
2 /* Copyright 2009-2025 Barcelona Supercomputing Center */
3 /* */
4 /* This file is part of the DLB library. */
5 /* */
6 /* DLB is free software: you can redistribute it and/or modify */
7 /* it under the terms of the GNU Lesser General Public License as published by */
8 /* the Free Software Foundation, either version 3 of the License, or */
9 /* (at your option) any later version. */
10 /* */
11 /* DLB is distributed in the hope that it will be useful, */
12 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
13 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
14 /* GNU Lesser General Public License for more details. */
15 /* */
16 /* You should have received a copy of the GNU Lesser General Public License */
17 /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */
18 /*********************************************************************************/
19
20 #include "talp/talp_mpi.h"
21
22 #include "LB_comm/shmem_talp.h"
23 #include "LB_core/node_barrier.h"
24 #include "LB_core/spd.h"
25 #include "apis/dlb_talp.h"
26 #include "support/atomic.h"
27 #include "support/debug.h"
28 #include "support/mask_utils.h"
29 #include "talp/regions.h"
30 #include "talp/talp.h"
31 #include "talp/talp_record.h"
32 #include "talp/talp_types.h"
33 #ifdef MPI_LIB
34 #include "mpi/mpi_core.h"
35 #endif
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 extern __thread bool thread_is_observer;
41
42
43 #ifdef MPI_LIB
44 /* Communicate among all MPI processes so that everyone has the same monitoring regions */
45 static void talp_register_common_mpi_regions(const subprocess_descriptor_t *spd) {
46 /* Note: there's a potential race condition if this function is called
47 * (which happens on talp_mpi_finalize or talp_finalize) while another
48 * thread creates a monitoring region. The solution would be to lock the
49 * entire routine and call a specialized registering function that does not
50 * lock, or use a recursive lock. The situation is unusual enough that we
51 * do not support it. */
52
53 talp_info_t *talp_info = spd->talp_info;
54
55 /* Warn about open regions */
56 for (GSList *node = talp_info->open_regions;
57 node != NULL;
58 node = node->next) {
59 const dlb_monitor_t *monitor = node->data;
60 warning("Region %s is still open during MPI_Finalize."
61 " Collected data may be incomplete.",
62 monitor->name);
63 }
64
65 /* Gather recvcounts for each process
66 * (each process may have a different number of monitors) */
67 int nregions = g_tree_nnodes(talp_info->regions);
68 int chars_to_send = nregions * DLB_MONITOR_NAME_MAX;
69 int *recvcounts = malloc(_mpi_size * sizeof(int));
70 PMPI_Allgather(&chars_to_send, 1, MPI_INT,
71 recvcounts, 1, MPI_INT, getWorldComm());
72
73 /* Compute total characters to gather via MPI */
74 int i;
75 int total_chars = 0;
76 for (i=0; i<_mpi_size; ++i) {
77 total_chars += recvcounts[i];
78 }
79
80 if (total_chars > 0) {
81 /* Prepare sendbuffer */
82 char *sendbuffer = malloc(nregions * DLB_MONITOR_NAME_MAX * sizeof(char));
83 char *sendptr = sendbuffer;
84 for (GTreeNode *node = g_tree_node_first(talp_info->regions);
85 node != NULL;
86 node = g_tree_node_next(node)) {
87 const dlb_monitor_t *monitor = g_tree_node_value(node);
88 strcpy(sendptr, monitor->name);
89 sendptr += DLB_MONITOR_NAME_MAX;
90 }
91
92 /* Prepare recvbuffer */
93 char *recvbuffer = malloc(total_chars * sizeof(char));
94
95 /* Compute displacements */
96 int *displs = malloc(_mpi_size * sizeof(int));
97 int next_disp = 0;
98 for (i=0; i<_mpi_size; ++i) {
99 displs[i] = next_disp;
100 next_disp += recvcounts[i];
101 }
102
103 /* Gather all regions */
104 PMPI_Allgatherv(sendbuffer, nregions * DLB_MONITOR_NAME_MAX, MPI_CHAR,
105 recvbuffer, recvcounts, displs, MPI_CHAR, getWorldComm());
106
107 /* Register all regions. Existing ones will be skipped. */
108 for (i=0; i<total_chars; i+=DLB_MONITOR_NAME_MAX) {
109 region_register(spd, &recvbuffer[i]);
110 }
111
112 free(sendbuffer);
113 free(recvbuffer);
114 free(displs);
115 }
116
117 free(recvcounts);
118 }
119 #endif
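
The routine above uses a standard two-phase exchange for variable-length data: a fixed-size PMPI_Allgather first shares each rank's byte count, then a PMPI_Allgatherv with per-rank displacements moves the fixed-width name records themselves. Below is a self-contained sketch of the same pattern in plain MPI; NAME_MAX stands in for DLB_MONITOR_NAME_MAX and everything DLB-specific is omitted.

    #include <mpi.h>
    #include <stdlib.h>

    #define NAME_MAX 128   /* stand-in for DLB_MONITOR_NAME_MAX */

    /* Gather 'nlocal' fixed-width names from every rank. On return each rank
     * owns one buffer holding all names from all ranks (caller frees). */
    static char *gather_all_names(const char *local, int nlocal, int *ntotal,
                                  MPI_Comm comm) {
        int size;
        MPI_Comm_size(comm, &size);

        /* Phase 1: every rank learns how many bytes each rank contributes */
        int sendchars = nlocal * NAME_MAX;
        int *recvcounts = malloc(size * sizeof(int));
        MPI_Allgather(&sendchars, 1, MPI_INT, recvcounts, 1, MPI_INT, comm);

        /* Derive the displacement of each rank's block in the result */
        int *displs = malloc(size * sizeof(int));
        int total = 0;
        for (int i = 0; i < size; ++i) {
            displs[i] = total;
            total += recvcounts[i];
        }

        /* Phase 2: exchange the name records themselves */
        char *recvbuf = malloc(total > 0 ? total : 1);
        MPI_Allgatherv(local, sendchars, MPI_CHAR,
                       recvbuf, recvcounts, displs, MPI_CHAR, comm);

        free(recvcounts);
        free(displs);
        *ntotal = total / NAME_MAX;
        return recvbuf;
    }
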
120
121
122 /*********************************************************************************/
123 /* TALP MPI functions */
124 /*********************************************************************************/
125
126 /* Start global monitoring region (if not already started) */
127 6 void talp_mpi_init(const subprocess_descriptor_t *spd) {
128
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 ensure(!thread_is_observer, "An observer thread cannot call talp_mpi_init");
129
130 6 talp_info_t *talp_info = spd->talp_info;
131
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (talp_info) {
132 6 talp_info->flags.have_mpi = true;
133
134 /* Start global region (no-op if already started) */
135 6 region_start(spd, talp_info->monitor);
136
137 /* Add MPI_Init statistic and set useful state */
138 6 talp_sample_t *sample = talp_get_thread_sample(spd);
139 6 DLB_ATOMIC_ADD_RLX(&sample->stats.num_mpi_calls, 1);
140 6 talp_set_sample_state(sample, useful, talp_info->flags.papi);
141 }
142 6 }
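
For context on where the regions that talp_mpi_finalize later iterates over come from: besides the implicit global region started above, applications create their own regions through the public TALP API declared in apis/dlb_talp.h. A hedged usage sketch follows; the DLB_MonitoringRegion* entry points are quoted from the public DLB API, but exact signatures and the installed header name may differ between DLB versions.

    #include <mpi.h>
    #include <dlb_talp.h>   /* installed name of apis/dlb_talp.h (assumed) */

    int main(int argc, char **argv) {
        MPI_Init(&argc, &argv);      /* TALP starts the global region here */

        /* User-defined monitoring region; the name must fit in
         * DLB_MONITOR_NAME_MAX */
        dlb_monitor_t *solver = DLB_MonitoringRegionRegister("solver");
        DLB_MonitoringRegionStart(solver);
        /* ... computation and MPI calls accounted under "solver" ... */
        DLB_MonitoringRegionStop(solver);

        MPI_Finalize();              /* TALP stops the global region here */
        return 0;
    }
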
143
144 /* Stop global monitoring region and gather APP data if needed */
145 6 void talp_mpi_finalize(const subprocess_descriptor_t *spd) {
146
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 ensure(!thread_is_observer, "An observer thread cannot call talp_mpi_finalize");
147 6 talp_info_t *talp_info = spd->talp_info;
148
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (talp_info) {
149
150 #ifdef MPI_LIB
151 /* We also need to measure the waiting time of MPI_Finalize.
152 * For this, we call an MPI_Barrier and the appropriate TALP functions.
153 * The num_mpi_calls variable is also incremented inside those. */
154 talp_into_sync_call(spd, /* is_blocking_collective */ true);
155 PMPI_Barrier(getWorldComm());
156 talp_out_of_sync_call(spd, /* is_blocking_collective */ true);
157 #else
158 /* Add MPI_Finalize to the number of MPI calls.
159 * Even though talp_mpi_finalize should never be called when MPI_LIB is not defined,
160 * we keep this case for testing purposes. */
161 6 talp_sample_t *sample = talp_get_thread_sample(spd);
162 6 DLB_ATOMIC_ADD_RLX(&sample->stats.num_mpi_calls, 1);
163 #endif
164
165 /* Stop global region */
166 6 region_stop(spd, talp_info->monitor);
167
168 6 monitor_data_t *monitor_data = talp_info->monitor->_data;
169
170 /* Update shared memory values */
171
3/4
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
6 if (talp_info->flags.have_shmem || talp_info->flags.have_minimal_shmem) {
172 // TODO: is this needed? Isn't it already updated when the region is stopped?
173 1 shmem_talp__set_times(monitor_data->node_shared_id,
174 1 talp_info->monitor->mpi_time,
175 1 talp_info->monitor->useful_time);
176 }
177
178 #ifdef MPI_LIB
179 /* If performing any kind of TALP summary, check that the number of processes
180 * registered in the shared memory matches the number of MPI processes in the node.
181 * This check is needed to avoid deadlocks on finalize. */
182 if (spd->options.talp_summary) {
183 verbose(VB_TALP, "Gathering TALP metrics");
184 /* FIXME: use Named Barrier */
185 /* if (shmem_barrier__get_num_participants(spd->options.barrier_id) == _mpis_per_node) { */
186
187 /* Gather data among processes in the node if node summary is enabled */
188 if (spd->options.talp_summary & SUMMARY_NODE) {
189 talp_record_node_summary(spd);
190 }
191
192 /* Gather data among MPI processes if any of these summaries is enabled */
193 if (spd->options.talp_summary
194 & (SUMMARY_POP_METRICS | SUMMARY_PROCESS)) {
195 /* Ensure everyone has the same monitoring regions */
196 talp_register_common_mpi_regions(spd);
197
198 /* Finally, reduce data */
199 for (GTreeNode *node = g_tree_node_first(talp_info->regions);
200 node != NULL;
201 node = g_tree_node_next(node)) {
202 const dlb_monitor_t *monitor = g_tree_node_value(node);
203 if (spd->options.talp_summary & SUMMARY_POP_METRICS) {
204 talp_record_pop_summary(spd, monitor);
205 }
206 if (spd->options.talp_summary & SUMMARY_PROCESS) {
207 talp_record_process_summary(spd, monitor);
208 }
209 }
210 }
211
212 /* Synchronize all processes in the node before continuing with DLB finalization */
213 node_barrier(spd, NULL);
214 /* } else { */
215 /* warning("The number of MPI processes and processes registered in DLB differ." */
216 /* " TALP will not print any summary."); */
217 /* } */
218 }
219 #endif
220 }
221 6 }
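
The call to talp_register_common_mpi_regions before the per-region loop is what prevents the deadlock mentioned in the comments above: since recording a summary reduces data across ranks, every rank must walk an identical, identically ordered region set. A minimal plain-MPI sketch of the failure mode this avoids (nothing DLB-specific; run with at least two ranks, and note the program blocks by design):

    #include <mpi.h>

    int main(int argc, char **argv) {
        MPI_Init(&argc, &argv);
        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        /* Ranks disagree on how many "regions" they know about... */
        int nregions = (rank == 0) ? 2 : 1;

        for (int i = 0; i < nregions; ++i) {
            int in = 1, out;
            /* ...so rank 0 enters a second Allreduce that no other rank
             * joins, and every rank blocks here forever. */
            MPI_Allreduce(&in, &out, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
        }

        MPI_Finalize();
        return 0;
    }
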
222
223 /* Decide whether to update the current sample (the most likely and
224 * cheaper option) or to aggregate all samples.
225 * We will only aggregate all samples if the external profiler is enabled
226 * and this MPI call is a blocking collective call. */
227 7 static inline void update_sample_on_sync_call(const subprocess_descriptor_t *spd,
228 const talp_info_t *talp_info, talp_sample_t *sample, bool is_blocking_collective) {
229
230
4/4
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 2 times.
7 if (!talp_info->flags.external_profiler || !is_blocking_collective) {
231 /* Likely scenario, just update the sample */
232 5 talp_update_sample(sample, talp_info->flags.papi, TALP_NO_TIMESTAMP);
233 } else {
234 /* If talp_info->flags.external_profiler && is_blocking_collective:
235 * aggregate samples and update all monitoring regions */
236 2 talp_flush_samples_to_regions(spd);
237 }
238 7 }
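
The report shows 4/4 branches taken for the condition above, meaning the tests exercise every combination of the two flags. A self-contained sketch enumerating those combinations with hypothetical stand-in booleans; only the last case takes the aggregate-and-flush path.

    #include <stdbool.h>
    #include <stdio.h>

    /* Mirror of the decision above, with plain booleans */
    static void decide(bool external_profiler, bool is_blocking_collective) {
        if (!external_profiler || !is_blocking_collective) {
            puts("cheap path: update the current sample only");
        } else {
            puts("flush path: aggregate samples into all regions");
        }
    }

    int main(void) {
        decide(false, false);
        decide(false, true);
        decide(true,  false);
        decide(true,  true);   /* the only flush */
        return 0;
    }
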
239
240 8 void talp_into_sync_call(const subprocess_descriptor_t *spd, bool is_blocking_collective) {
241 /* Observer threads may call MPI functions, but TALP must ignore them */
242
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7 times.
8 if (unlikely(thread_is_observer)) return;
243
244 7 const talp_info_t *talp_info = spd->talp_info;
245
1/2
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
7 if (talp_info) {
246 /* Update sample */
247 7 talp_sample_t *sample = talp_get_thread_sample(spd);
248 7 talp_update_sample(sample, talp_info->flags.papi, TALP_NO_TIMESTAMP);
249
250 /* Into Sync call -> not_useful_mpi */
251 7 talp_set_sample_state(sample, not_useful_mpi, talp_info->flags.papi);
252 }
253 }
254
255 8 void talp_out_of_sync_call(const subprocess_descriptor_t *spd, bool is_blocking_collective) {
256 /* Observer threads may call MPI functions, but TALP must ignore them */
257
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7 times.
8 if (unlikely(thread_is_observer)) return;
258
259 7 const talp_info_t *talp_info = spd->talp_info;
260
1/2
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
7 if (talp_info) {
261 /* Update sample (and maybe flush) */
262 7 talp_sample_t *sample = talp_get_thread_sample(spd);
263 7 DLB_ATOMIC_ADD_RLX(&sample->stats.num_mpi_calls, 1);
264 7 update_sample_on_sync_call(spd, talp_info, sample, is_blocking_collective);
265
266 /* Out of Sync call -> useful */
267 7 talp_set_sample_state(sample, useful, talp_info->flags.papi);
268 }
269 }
270
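
talp_into_sync_call and talp_out_of_sync_call are meant to bracket intercepted MPI calls, which is also how talp_mpi_finalize measures the MPI_Finalize wait above. Below is a minimal sketch of a PMPI-based wrapper driving them; global_spd is a hypothetical stand-in for however DLB's interception layer reaches the current subprocess descriptor.

    #include <mpi.h>
    #include "talp/talp_mpi.h"

    /* Hypothetical handle to the current process' descriptor */
    extern const subprocess_descriptor_t *global_spd;

    /* Intercepted MPI_Barrier: the wait is accounted as not_useful_mpi */
    int MPI_Barrier(MPI_Comm comm) {
        talp_into_sync_call(global_spd, /* is_blocking_collective */ true);
        int ierr = PMPI_Barrier(comm);
        talp_out_of_sync_call(global_spd, /* is_blocking_collective */ true);
        return ierr;
    }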