GCC Code Coverage Report


Directory: src/
File: src/LB_comm/shmem_async.c
Date: 2025-11-21 10:34:40
Exec Total Coverage
Lines: 193 202 95.5%
Functions: 15 15 100.0%
Branches: 100 129 77.5%

Line Branch Exec Source
1 /*********************************************************************************/
2 /* Copyright 2009-2024 Barcelona Supercomputing Center */
3 /* */
4 /* This file is part of the DLB library. */
5 /* */
6 /* DLB is free software: you can redistribute it and/or modify */
7 /* it under the terms of the GNU Lesser General Public License as published by */
8 /* the Free Software Foundation, either version 3 of the License, or */
9 /* (at your option) any later version. */
10 /* */
11 /* DLB is distributed in the hope that it will be useful, */
12 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
13 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
14 /* GNU Lesser General Public License for more details. */
15 /* */
16 /* You should have received a copy of the GNU Lesser General Public License */
17 /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */
18 /*********************************************************************************/
19
20 #include "LB_comm/shmem_async.h"
21
22 #include "LB_core/spd.h"
23 #include "LB_comm/shmem.h"
24 #include "LB_comm/shmem_cpuinfo.h"
25 #include "LB_comm/shmem_procinfo.h"
26 #include "LB_numThreads/numThreads.h"
27 #include "apis/dlb_errors.h"
28 #include "support/mask_utils.h"
29 #include "support/debug.h"
30
31 #include <sched.h>
32 #include <unistd.h>
33 #include <errno.h>
34 #include <pthread.h>
35 #include <string.h>
36
37 enum { NOBODY = 0 };
38 enum { QUEUE_SIZE = 100 };
39
40 typedef enum HelperAction {
41 ACTION_NONE = 0,
42 ACTION_ENABLE_CPU,
43 ACTION_ENABLE_CPU_SET,
44 ACTION_DISABLE_CPU,
45 ACTION_DISABLE_CPU_SET,
46 ACTION_SET_CPU_SET,
47 ACTION_SET_NUM_CPUS,
48 ACTION_JOIN
49 } action_t;
50
51 typedef enum HelperStatus {
52 HELPER_UNKOWN_STATUS = 0,
53 HELPER_WAITING,
54 HELPER_BUSY,
55 } status_t;
56
57 typedef struct Message {
58 action_t action;
59 int cpuid;
60 int ncpus;
61 cpu_set_t cpu_set;
62 } message_t;
63
64 typedef struct {
65 /* Queue attributes */
66 message_t queue[QUEUE_SIZE];
67 unsigned int q_head;
68 unsigned int q_tail;
69 pthread_mutex_t q_lock;
70 pthread_cond_t q_wait_data;
71
72 /* Helper metadata */
73 pid_t pid;
74 pthread_t pth;
75 cpu_set_t mask;
76 const pm_interface_t *pm;
77 bool joinable;
78 status_t status;
79 } helper_t;
80
81
82 typedef struct {
83 bool initialized;
84 int max_helpers; // capacity
85 int num_helpers; // size
86 helper_t helpers[0];
87 } shdata_t;
88
89 enum { SHMEM_ASYNC_VERSION = 5 };
90
91 static int max_helpers = 0;
92 static shdata_t *shdata = NULL;
93 static const char *shmem_name = "async";
94 static shmem_handler_t *shm_handler = NULL;
95 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
96 static int subprocesses_attached = 0;
97
98 346 static helper_t* get_helper(pid_t pid) {
99
1/2
✓ Branch 0 taken 346 times.
✗ Branch 1 not taken.
346 int num_helpers = shdata ? shdata->num_helpers : 0;
100
2/4
✓ Branch 0 taken 581 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 581 times.
✗ Branch 3 not taken.
581 for (int h = 0; shdata && h < num_helpers; ++h) {
101
2/2
✓ Branch 0 taken 346 times.
✓ Branch 1 taken 235 times.
581 if (shdata->helpers[h].pid == pid) {
102 346 return &shdata->helpers[h];
103 }
104 }
105 return NULL;
106 }
107
108 199 static void enqueue_message(helper_t *helper, const message_t *message) {
109 /* Discard message if helper does not accept new inputs */
110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
199 if (helper->joinable) {
111 return;
112 }
113
114 199 pthread_mutex_lock(&helper->q_lock);
115
116 /* If ACTION_JOIN, flag helper to reject further messages */
117
2/2
✓ Branch 0 taken 29 times.
✓ Branch 1 taken 170 times.
199 if (message->action == ACTION_JOIN) {
118 29 helper->joinable = true;
119 }
120
121 /* Get next_head index and check that buffer is not full */
122 199 unsigned int next_head = (helper->q_head + 1) % QUEUE_SIZE;
123
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
199 if (__builtin_expect((next_head == helper->q_tail), 0)) {
124 pthread_mutex_unlock(&helper->q_lock);
125 fatal("Max petitions requested for asynchronous thread");
126 }
127
128 /* Enqueue message and update head */
129
2/2
✓ Branch 0 taken 25 times.
✓ Branch 1 taken 174 times.
199 verbose(VB_ASYNC, "Writing message %d, head is now %d", helper->q_head, next_head);
130 199 helper->queue[helper->q_head] = *message;
131 199 helper->q_head = next_head;
132
133 /* Signal helper */
134 199 pthread_cond_signal(&helper->q_wait_data);
135
136 199 pthread_mutex_unlock(&helper->q_lock);
137 }
138
139 199 static void dequeue_message(helper_t *helper, message_t *message) {
140 199 pthread_mutex_lock(&helper->q_lock);
141
142 /* Block until there's some message in the queue */
143 199 helper->status = HELPER_WAITING;
144
2/2
✓ Branch 0 taken 91 times.
✓ Branch 1 taken 199 times.
290 while (helper->q_head == helper->q_tail) {
145 91 pthread_cond_wait(&helper->q_wait_data, &helper->q_lock);
146 }
147 199 helper->status = HELPER_BUSY;
148
149 /* Dequeue message and update tail */
150 199 unsigned int next_tail = (helper->q_tail + 1) % QUEUE_SIZE;
151
2/2
✓ Branch 0 taken 25 times.
✓ Branch 1 taken 174 times.
199 verbose(VB_ASYNC, "Reading message %d, tail is now %d", helper->q_tail, next_tail);
152 199 *message = helper->queue[helper->q_tail];
153 199 helper->q_tail = next_tail;
154
155 199 pthread_mutex_unlock(&helper->q_lock);
156 199 }
157
158 29 static void* thread_start(void *arg) {
159 29 spd_enter_dlb(thread_spd);
160 29 helper_t *helper = arg;
161 29 const pm_interface_t* const pm = helper->pm;
162 29 pthread_setaffinity_np(helper->pth, sizeof(cpu_set_t), &helper->mask);
163
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 19 times.
29 verbose(VB_ASYNC, "Helper thread started, pinned to %s", mu_to_str(&helper->mask));
164
165 29 bool join = false;
166
3/4
✓ Branch 0 taken 199 times.
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 29 times.
228 while(!join || helper->q_head != helper->q_tail) {
167 message_t message;
168 199 dequeue_message(helper, &message);
169
170 199 int error = 0;
171
6/9
✗ Branch 0 not taken.
✓ Branch 1 taken 109 times.
✓ Branch 2 taken 4 times.
✓ Branch 3 taken 10 times.
✓ Branch 4 taken 7 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 40 times.
✓ Branch 7 taken 29 times.
✗ Branch 8 not taken.
199 switch(message.action) {
172 case ACTION_NONE:
173 verbose(VB_ASYNC, "Helper thread attending petition: NONE");
174 break;
175 109 case ACTION_ENABLE_CPU:
176
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 107 times.
109 verbose(VB_ASYNC,
177 "Helper thread attending petition: ENABLE %d",
178 message.cpuid);
179 109 error = enable_cpu(pm, message.cpuid);
180 109 break;
181 4 case ACTION_ENABLE_CPU_SET:
182
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 verbose(VB_ASYNC, "Helper thread attending petition: ENABLE_CPU_SET %s",
183 mu_to_str(&message.cpu_set));
184 4 error = enable_cpu_set(pm, &message.cpu_set);
185 4 break;
186 10 case ACTION_DISABLE_CPU:
187
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 4 times.
10 verbose(VB_ASYNC, "Helper thread attending petition: DISABLE %d",
188 message.cpuid);
189 10 error = disable_cpu(pm, message.cpuid);
190 10 break;
191 7 case ACTION_DISABLE_CPU_SET:
192
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 2 times.
7 verbose(VB_ASYNC, "Helper thread attending petition: DISABLE_CPU_SET %s",
193 mu_to_str(&message.cpu_set));
194 7 error = disable_cpu_set(pm, &message.cpu_set);
195 7 break;
196 case ACTION_SET_CPU_SET:
197 break;
198 40 case ACTION_SET_NUM_CPUS:
199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
40 verbose(VB_ASYNC, "Helper thread attending petition: SET_NUM_CPUS %d",
200 message.ncpus);
201 40 error = update_threads(pm, message.ncpus);
202 40 break;
203 29 case ACTION_JOIN:
204 29 join = true;
205 29 break;
206 }
207 if (error) {
208 // error ?
209 }
210 }
211
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 19 times.
29 verbose(VB_ASYNC, "Helper thread finalizing");
212 29 return NULL;
213 }
214
215 1 static void cleanup_shmem(void *shdata_ptr, int pid) {
216 1 shdata_t *shared_data = shdata_ptr;
217 int h;
218
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 times.
9 for (h = 0; h < max_helpers; ++h) {
219
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7 times.
8 if (shared_data->helpers[h].pid == pid) {
220 1 shared_data->helpers[h] = (const helper_t){};
221 }
222 }
223 1 }
224
225 35 int shmem_async_init(pid_t pid, const pm_interface_t *pm, const cpu_set_t *process_mask,
226 const char *shmem_key, int shmem_size_multiplier) {
227
228 35 int error = DLB_SUCCESS;
229
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 25 times.
35 verbose(VB_ASYNC, "Creating helper thread");
230
231 // Shared memory creation
232 35 pthread_mutex_lock(&mutex);
233 {
234
2/2
✓ Branch 0 taken 21 times.
✓ Branch 1 taken 14 times.
35 if (shm_handler == NULL) {
235 21 max_helpers = mu_get_system_size() * shmem_size_multiplier;
236 42 shm_handler = shmem_init((void**)&shdata,
237 21 &(const shmem_props_t) {
238 21 .size = shmem_async__size(),
239 .name = shmem_name,
240 .key = shmem_key,
241 .version = SHMEM_ASYNC_VERSION,
242 .cleanup_fn = cleanup_shmem,
243 });
244 21 subprocesses_attached = 1;
245 } else {
246 14 ++subprocesses_attached;
247 }
248 }
249 35 pthread_mutex_unlock(&mutex);
250
251 35 helper_t *helper = NULL;
252 // Lock shmem to register new subprocess
253 35 shmem_lock(shm_handler);
254 {
255
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 21 times.
35 if (!shdata->initialized) {
256 14 shdata->initialized = true;
257 14 shdata->num_helpers = 0;
258 14 shdata->max_helpers = max_helpers;
259 } else {
260
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 20 times.
21 if (shdata->max_helpers != max_helpers) {
261 1 error = DLB_ERR_INIT;
262 }
263 }
264
265
4/4
✓ Branch 0 taken 173 times.
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 172 times.
✓ Branch 3 taken 1 times.
178 for (int h = 0; h < max_helpers && !error; ++h) {
266 // Register helper
267
2/2
✓ Branch 0 taken 29 times.
✓ Branch 1 taken 143 times.
172 if (shdata->helpers[h].pid == NOBODY) {
268 29 helper = &shdata->helpers[h];
269
270 /* Initialize queue structure */
271 29 memset(helper->queue, 0, sizeof(helper->queue));
272 29 helper->q_head = 0;
273 29 helper->q_tail = 0;
274
275 /* Initialize queue sync attributes */
276 pthread_mutexattr_t mutex_attr;
277
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 fatal_cond_strerror( pthread_mutexattr_init(&mutex_attr) );
278
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 fatal_cond_strerror( pthread_mutexattr_setpshared(&mutex_attr,
279 PTHREAD_PROCESS_SHARED) );
280
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 fatal_cond_strerror( pthread_mutex_init(&helper->q_lock, &mutex_attr) );
281
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 fatal_cond_strerror( pthread_mutexattr_destroy(&mutex_attr) );
282 pthread_condattr_t cond_attr;
283
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 fatal_cond_strerror( pthread_condattr_init(&cond_attr) );
284
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 fatal_cond_strerror( pthread_condattr_setpshared(&cond_attr,
285 PTHREAD_PROCESS_SHARED) );
286
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 fatal_cond_strerror( pthread_cond_init(&helper->q_wait_data, &cond_attr) );
287
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 fatal_cond_strerror( pthread_condattr_destroy(&cond_attr) );
288
289 // Initialize helper metadata and create thread
290 29 helper->pm = pm;
291 29 helper->pid = pid;
292 29 memcpy(&helper->mask, process_mask, sizeof(cpu_set_t));
293 29 pthread_create(&helper->pth, NULL, thread_start, (void*)helper);
294
295 29 ++shdata->num_helpers;
296 29 break;
297 }
298 }
299 }
300 35 shmem_unlock(shm_handler);
301
302
4/4
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 29 times.
✓ Branch 2 taken 5 times.
✓ Branch 3 taken 1 times.
35 if (helper == NULL && error == DLB_SUCCESS) {
303 5 error = DLB_ERR_NOMEM;
304 5 warn_error(DLB_ERR_NOMEM);
305 }
306
307
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 34 times.
35 if (error == DLB_ERR_INIT) {
308 1 warning("Cannot attach to shmem_async because existing size differ."
309 " Existing shmem size: %d, expected: %d."
310 " Check for DLB_ARGS consistency among processes or clean up shared memory.",
311 1 shdata->max_helpers, max_helpers);
312 }
313
314
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 29 times.
35 if (error < DLB_SUCCESS) {
315 6 shmem_finalize(shm_handler, NULL);
316 6 shm_handler = NULL;
317 6 shdata = NULL;
318 }
319
320 35 return error;
321 }
322
323 29 int shmem_async_finalize(pid_t pid) {
324
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 19 times.
29 verbose(VB_ASYNC, "Finalizing helper thread for pid: %d", pid);
325 29 helper_t *helper = get_helper(pid);
326
1/2
✓ Branch 0 taken 29 times.
✗ Branch 1 not taken.
29 if (helper) {
327 /* Enqueue JOIN message */
328 29 message_t message = { .action = ACTION_JOIN };
329 29 enqueue_message(helper, &message);
330
331 /* Wait helper thread to finish */
332 29 pthread_join(helper->pth, NULL);
333
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 19 times.
29 verbose(VB_ASYNC, "Helper thread joined");
334
335 /* Clear helper data */
336 29 shmem_lock(shm_handler);
337 {
338 29 pthread_mutex_destroy(&helper->q_lock);
339 29 pthread_cond_destroy(&helper->q_wait_data);
340 29 memset(helper, 0, sizeof(*helper));
341 }
342 29 shmem_unlock(shm_handler);
343
344 /* Shared memory destruction */
345 29 pthread_mutex_lock(&mutex);
346 {
347
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 14 times.
29 if (--subprocesses_attached == 0) {
348 15 shmem_finalize(shm_handler, NULL /* do not check if empty */);
349 15 shm_handler = NULL;
350 15 shdata = NULL;
351 }
352 }
353 29 pthread_mutex_unlock(&mutex);
354 }
355
356
1/2
✓ Branch 0 taken 29 times.
✗ Branch 1 not taken.
29 return helper ? DLB_SUCCESS : DLB_ERR_NOPROC;
357 }
358
359
360 109 void shmem_async_enable_cpu(pid_t pid, int cpuid) {
361
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 107 times.
109 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, enable cpuid %d", pid, cpuid);
362 109 helper_t *helper = get_helper(pid);
363
1/2
✓ Branch 0 taken 109 times.
✗ Branch 1 not taken.
109 if (helper) {
364 109 message_t message = { .action = ACTION_ENABLE_CPU, .cpuid = cpuid };
365 109 enqueue_message(helper, &message);
366 }
367 109 }
368
369 4 void shmem_async_enable_cpu_set(pid_t pid, const cpu_set_t *cpu_set) {
370
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, enable cpu set %s",
371 pid, mu_to_str(cpu_set));
372 4 helper_t *helper = get_helper(pid);
373
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (helper) {
374 4 message_t message = { .action = ACTION_ENABLE_CPU_SET, .cpu_set = *cpu_set };
375 4 enqueue_message(helper, &message);
376 }
377 4 }
378
379 10 void shmem_async_disable_cpu(pid_t pid, int cpuid) {
380
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 4 times.
10 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, disable cpuid %d", pid, cpuid);
381 10 helper_t *helper = get_helper(pid);
382
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10 if (helper) {
383 10 message_t message = { .action = ACTION_DISABLE_CPU, .cpuid = cpuid };
384 10 enqueue_message(helper, &message);
385 }
386 10 }
387
388 7 void shmem_async_disable_cpu_set(pid_t pid, const cpu_set_t *cpu_set) {
389
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 2 times.
7 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, disable cpu set %s",
390 pid, mu_to_str(cpu_set));
391 7 helper_t *helper = get_helper(pid);
392
1/2
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
7 if (helper) {
393 7 message_t message = { .action = ACTION_DISABLE_CPU_SET, .cpu_set = *cpu_set };
394 7 enqueue_message(helper, &message);
395 }
396 7 }
397
398 40 void shmem_async_set_num_cpus(pid_t pid, int ncpus) {
399
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
40 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, set num CPUs %d", pid, ncpus);
400 40 helper_t *helper = get_helper(pid);
401
1/2
✓ Branch 0 taken 40 times.
✗ Branch 1 not taken.
40 if (helper) {
402 40 message_t message = { .action = ACTION_SET_NUM_CPUS, .ncpus = ncpus };
403 40 enqueue_message(helper, &message);
404 }
405 40 }
406
407 1 int shmem_async__version(void) {
408 1 return SHMEM_ASYNC_VERSION;
409 }
410 22 size_t shmem_async__size(void) {
411 // max_helpers contains a value once shmem is initialized,
412 // otherwise return default size
413 22 return sizeof(shdata_t) + sizeof(helper_t) * (
414
2/2
✓ Branch 0 taken 21 times.
✓ Branch 1 taken 1 times.
22 max_helpers > 0 ? max_helpers : mu_get_system_size());
415 }
416
417 /* Only for testing purposes. Block current thread until helper thread
418 * with the given pid has finished its pending requests */
419 147 void shmem_async_wait_for_completion(pid_t pid) {
420 147 helper_t *helper = get_helper(pid);
421 while(1) {
422
1/2
✓ Branch 1 taken 199 times.
✗ Branch 2 not taken.
199 if (pthread_mutex_trylock(&helper->q_lock) == 0) {
423
2/2
✓ Branch 0 taken 196 times.
✓ Branch 1 taken 3 times.
199 if (helper->status == HELPER_WAITING
424
2/2
✓ Branch 0 taken 147 times.
✓ Branch 1 taken 49 times.
196 && helper->q_head == helper->q_tail) {
425 147 pthread_mutex_unlock(&helper->q_lock);
426 147 break;
427 }
428 52 pthread_mutex_unlock(&helper->q_lock);
429 }
430 52 usleep(1000);
431 }
432 147 }
433