GCC Code Coverage Report


Directory: src/
File: src/LB_comm/shmem_async.c
Date: 2024-11-22 17:07:10
Exec Total Coverage
Lines: 154 185 83.2%
Functions: 13 15 86.7%
Branches: 68 113 60.2%

Line Branch Exec Source
1 /*********************************************************************************/
2 /* Copyright 2009-2024 Barcelona Supercomputing Center */
3 /* */
4 /* This file is part of the DLB library. */
5 /* */
6 /* DLB is free software: you can redistribute it and/or modify */
7 /* it under the terms of the GNU Lesser General Public License as published by */
8 /* the Free Software Foundation, either version 3 of the License, or */
9 /* (at your option) any later version. */
10 /* */
11 /* DLB is distributed in the hope that it will be useful, */
12 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
13 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
14 /* GNU Lesser General Public License for more details. */
15 /* */
16 /* You should have received a copy of the GNU Lesser General Public License */
17 /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */
18 /*********************************************************************************/
19
20 #include "LB_comm/shmem_async.h"
21
22 #include "LB_core/spd.h"
23 #include "LB_comm/shmem.h"
24 #include "LB_comm/shmem_cpuinfo.h"
25 #include "LB_comm/shmem_procinfo.h"
26 #include "LB_numThreads/numThreads.h"
27 #include "apis/dlb_errors.h"
28 #include "support/mask_utils.h"
29 #include "support/debug.h"
30
31 #include <sched.h>
32 #include <unistd.h>
33 #include <errno.h>
34 #include <pthread.h>
35 #include <string.h>
36
/* File-local integer constants:
 *  - NOBODY:     pid value that marks a helper slot as unused
 *  - QUEUE_SIZE: number of entries in each helper's circular message queue */
enum {
    NOBODY     = 0,
    QUEUE_SIZE = 100
};
39
/* Kind of petition carried by a message sent to a helper thread. */
typedef enum HelperAction {
    ACTION_NONE            = 0,     /* no operation */
    ACTION_ENABLE_CPU      = 1,     /* enable the CPU given in message.cpuid */
    ACTION_ENABLE_CPU_SET  = 2,     /* enable the CPUs given in message.cpu_set */
    ACTION_DISABLE_CPU     = 3,     /* disable the CPU given in message.cpuid */
    ACTION_DISABLE_CPU_SET = 4,     /* disable the CPUs given in message.cpu_set */
    ACTION_SET_CPU_SET     = 5,     /* accepted by the helper but currently a no-op */
    ACTION_SET_NUM_CPUS    = 6,     /* update the number of threads to message.ncpus */
    ACTION_JOIN            = 7      /* ask the helper thread to finish */
} action_t;
50
/* State of a helper thread with respect to its message queue. */
typedef enum HelperStatus {
    HELPER_UNKOWN_STATUS = 0,   /* value of a zeroed helper slot */
    HELPER_WAITING       = 1,   /* set by dequeue_message before waiting for data */
    HELPER_BUSY          = 2,   /* set by dequeue_message once a message is available */
} status_t;
56
57 typedef struct Message {
58 action_t action;
59 int cpuid;
60 int ncpus;
61 cpu_set_t cpu_set;
62 } message_t;
63
64 typedef struct {
65 /* Queue attributes */
66 message_t queue[QUEUE_SIZE];
67 unsigned int q_head;
68 unsigned int q_tail;
69 pthread_mutex_t q_lock;
70 pthread_cond_t q_wait_data;
71
72 /* Helper metadata */
73 pid_t pid;
74 pthread_t pth;
75 cpu_set_t mask;
76 const pm_interface_t *pm;
77 bool joinable;
78 status_t status;
79 } helper_t;
80
81
82 typedef struct {
83 helper_t helpers[0];
84 } shdata_t;
85
86 enum { SHMEM_ASYNC_VERSION = 4 };
87
88 static int max_helpers = 0;
89 static shdata_t *shdata = NULL;
90 static const char *shmem_name = "async";
91 static shmem_handler_t *shm_handler = NULL;
92 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
93 static int subprocesses_attached = 0;
94
95 256 static helper_t* get_helper(pid_t pid) {
96 int h;
97
2/4
✓ Branch 0 taken 441 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 441 times.
✗ Branch 3 not taken.
441 for (h = 0; shdata && h < max_helpers; ++h) {
98
2/2
✓ Branch 0 taken 256 times.
✓ Branch 1 taken 185 times.
441 if (shdata->helpers[h].pid == pid) {
99 256 return &shdata->helpers[h];
100 }
101 }
102 return NULL;
103 }
104
105 160 static void enqueue_message(helper_t *helper, const message_t *message) {
106 /* Discard message if helper does not accept new inputs */
107
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 160 times.
160 if (helper->joinable) {
108 return;
109 }
110
111 160 pthread_mutex_lock(&helper->q_lock);
112
113 /* If ACTION_JOIN, flag helper to reject further messages */
114
2/2
✓ Branch 0 taken 17 times.
✓ Branch 1 taken 143 times.
160 if (message->action == ACTION_JOIN) {
115 17 helper->joinable = true;
116 }
117
118 /* Get next_head index and check that buffer is not full */
119 160 unsigned int next_head = (helper->q_head + 1) % QUEUE_SIZE;
120
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 160 times.
160 if (__builtin_expect((next_head == helper->q_tail), 0)) {
121 pthread_mutex_unlock(&helper->q_lock);
122 fatal("Max petitions requested for asynchronous thread");
123 }
124
125 /* Enqueue message and update head */
126
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 154 times.
160 verbose(VB_ASYNC, "Writing message %d, head is now %d", helper->q_head, next_head);
127 160 helper->queue[helper->q_head] = *message;
128 160 helper->q_head = next_head;
129
130 /* Signal helper */
131 160 pthread_cond_signal(&helper->q_wait_data);
132
133 160 pthread_mutex_unlock(&helper->q_lock);
134 }
135
136 160 static void dequeue_message(helper_t *helper, message_t *message) {
137 160 pthread_mutex_lock(&helper->q_lock);
138
139 /* Block until there's some message in the queue */
140 160 helper->status = HELPER_WAITING;
141
2/2
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 160 times.
216 while (helper->q_head == helper->q_tail) {
142 56 pthread_cond_wait(&helper->q_wait_data, &helper->q_lock);
143 }
144 160 helper->status = HELPER_BUSY;
145
146 /* Dequeue message and update tail */
147 160 unsigned int next_tail = (helper->q_tail + 1) % QUEUE_SIZE;
148
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 154 times.
160 verbose(VB_ASYNC, "Reading message %d, tail is now %d", helper->q_tail, next_tail);
149 160 *message = helper->queue[helper->q_tail];
150 160 helper->q_tail = next_tail;
151
152 160 pthread_mutex_unlock(&helper->q_lock);
153 160 }
154
155 17 static void* thread_start(void *arg) {
156 17 spd_enter_dlb(thread_spd);
157 17 helper_t *helper = arg;
158 17 const pm_interface_t* const pm = helper->pm;
159 17 pthread_setaffinity_np(helper->pth, sizeof(cpu_set_t), &helper->mask);
160
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
17 verbose(VB_ASYNC, "Helper thread started, pinned to %s", mu_to_str(&helper->mask));
161
162 17 bool join = false;
163
3/4
✓ Branch 0 taken 160 times.
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 17 times.
177 while(!join || helper->q_head != helper->q_tail) {
164 message_t message;
165 160 dequeue_message(helper, &message);
166
167 160 int error = 0;
168
4/9
✗ Branch 0 not taken.
✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 40 times.
✓ Branch 7 taken 17 times.
✗ Branch 8 not taken.
160 switch(message.action) {
169 case ACTION_NONE:
170 verbose(VB_ASYNC, "Helper thread attending petition: NONE");
171 break;
172 101 case ACTION_ENABLE_CPU:
173
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 101 times.
101 verbose(VB_ASYNC,
174 "Helper thread attending petition: ENABLE %d",
175 message.cpuid);
176 101 error = enable_cpu(pm, message.cpuid);
177 101 break;
178 case ACTION_ENABLE_CPU_SET:
179 verbose(VB_ASYNC, "Helper thread attending petition: ENABLE_CPU_SET %s",
180 mu_to_str(&message.cpu_set));
181 error = enable_cpu_set(pm, &message.cpu_set);
182 break;
183 2 case ACTION_DISABLE_CPU:
184
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 verbose(VB_ASYNC, "Helper thread attending petition: DISABLE %d",
185 message.cpuid);
186 2 error = disable_cpu(pm, message.cpuid);
187 2 break;
188 case ACTION_DISABLE_CPU_SET:
189 verbose(VB_ASYNC, "Helper thread attending petition: DISABLE_CPU_SET %s",
190 mu_to_str(&message.cpu_set));
191 error = disable_cpu_set(pm, &message.cpu_set);
192 break;
193 case ACTION_SET_CPU_SET:
194 break;
195 40 case ACTION_SET_NUM_CPUS:
196
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
40 verbose(VB_ASYNC, "Helper thread attending petition: SET_NUM_CPUS %d",
197 message.ncpus);
198 40 error = update_threads(pm, message.ncpus);
199 40 break;
200 17 case ACTION_JOIN:
201 17 join = true;
202 17 break;
203 }
204 if (error) {
205 // error ?
206 }
207 }
208
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
17 verbose(VB_ASYNC, "Helper thread finalizing");
209 17 return NULL;
210 }
211
212 1 static void cleanup_shmem(void *shdata_ptr, int pid) {
213 1 shdata_t *shared_data = shdata_ptr;
214 int h;
215
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 times.
9 for (h = 0; h < max_helpers; ++h) {
216
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7 times.
8 if (shared_data->helpers[h].pid == pid) {
217 1 shared_data->helpers[h] = (const helper_t){};
218 }
219 }
220 1 }
221
222 17 int shmem_async_init(pid_t pid, const pm_interface_t *pm, const cpu_set_t *process_mask,
223 const char *shmem_key) {
224 17 int error = DLB_ERR_UNKNOWN;
225
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
17 verbose(VB_ASYNC, "Creating helper thread");
226
227 // Shared memory creation
228 17 pthread_mutex_lock(&mutex);
229 {
230
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 8 times.
17 if (shm_handler == NULL) {
231 9 max_helpers = mu_get_system_size();
232 18 shm_handler = shmem_init((void**)&shdata,
233 9 &(const shmem_props_t) {
234 9 .size = shmem_async__size(),
235 .name = shmem_name,
236 .key = shmem_key,
237 .version = SHMEM_ASYNC_VERSION,
238 .cleanup_fn = cleanup_shmem,
239 });
240 9 subprocesses_attached = 1;
241 } else {
242 8 ++subprocesses_attached;
243 }
244 }
245 17 pthread_mutex_unlock(&mutex);
246
247 17 helper_t *helper = NULL;
248 // Lock shmem to register new subprocess
249 17 shmem_lock(shm_handler);
250 {
251 int h;
252
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
31 for (h = 0; h < max_helpers; ++h) {
253 // Register helper
254
2/2
✓ Branch 0 taken 17 times.
✓ Branch 1 taken 14 times.
31 if (shdata->helpers[h].pid == NOBODY) {
255 17 helper = &shdata->helpers[h];
256
257 /* Initialize queue structure */
258 17 memset(helper->queue, 0, sizeof(helper->queue));
259 17 helper->q_head = 0;
260 17 helper->q_tail = 0;
261
262 /* Initialize queue sync attributes */
263 pthread_mutexattr_t mutex_attr;
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 fatal_cond_strerror( pthread_mutexattr_init(&mutex_attr) );
265
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 fatal_cond_strerror( pthread_mutexattr_setpshared(&mutex_attr,
266 PTHREAD_PROCESS_SHARED) );
267
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 fatal_cond_strerror( pthread_mutex_init(&helper->q_lock, &mutex_attr) );
268
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 fatal_cond_strerror( pthread_mutexattr_destroy(&mutex_attr) );
269 pthread_condattr_t cond_attr;
270
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 fatal_cond_strerror( pthread_condattr_init(&cond_attr) );
271
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 fatal_cond_strerror( pthread_condattr_setpshared(&cond_attr,
272 PTHREAD_PROCESS_SHARED) );
273
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 fatal_cond_strerror( pthread_cond_init(&helper->q_wait_data, &cond_attr) );
274
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 fatal_cond_strerror( pthread_condattr_destroy(&cond_attr) );
275
276 // Initialize helper metadata and create thread
277 17 helper->pm = pm;
278 17 helper->pid = pid;
279 17 memcpy(&helper->mask, process_mask, sizeof(cpu_set_t));
280 17 pthread_create(&helper->pth, NULL, thread_start, (void*)helper);
281
282 17 error = DLB_SUCCESS;
283 17 break;
284 }
285 }
286 }
287 17 shmem_unlock(shm_handler);
288
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 if (helper == NULL) {
290 error = DLB_ERR_NOMEM;
291 warn_error(DLB_ERR_NOMEM);
292 }
293
294 17 return error;
295 }
296
297 17 int shmem_async_finalize(pid_t pid) {
298
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
17 verbose(VB_ASYNC, "Finalizing helper thread for pid: %d", pid);
299 17 helper_t *helper = get_helper(pid);
300
1/2
✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
17 if (helper) {
301 /* Enqueue JOIN message */
302 17 message_t message = { .action = ACTION_JOIN };
303 17 enqueue_message(helper, &message);
304
305 /* Wait helper thread to finish */
306 17 pthread_join(helper->pth, NULL);
307
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
17 verbose(VB_ASYNC, "Helper thread joined");
308
309 /* Clear helper data */
310 17 shmem_lock(shm_handler);
311 {
312 17 pthread_mutex_destroy(&helper->q_lock);
313 17 pthread_cond_destroy(&helper->q_wait_data);
314 17 memset(helper, 0, sizeof(*helper));
315 }
316 17 shmem_unlock(shm_handler);
317
318 /* Shared memory destruction */
319 17 pthread_mutex_lock(&mutex);
320 {
321
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 8 times.
17 if (--subprocesses_attached == 0) {
322 9 shmem_finalize(shm_handler, NULL /* do not check if empty */);
323 9 shm_handler = NULL;
324 9 shdata = NULL;
325 }
326 }
327 17 pthread_mutex_unlock(&mutex);
328 }
329
330
1/2
✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
17 return helper ? DLB_SUCCESS : DLB_ERR_NOPROC;
331 }
332
333
334 101 void shmem_async_enable_cpu(pid_t pid, int cpuid) {
335
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 101 times.
101 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, enable cpuid %d", pid, cpuid);
336 101 helper_t *helper = get_helper(pid);
337
1/2
✓ Branch 0 taken 101 times.
✗ Branch 1 not taken.
101 if (helper) {
338 101 message_t message = { .action = ACTION_ENABLE_CPU, .cpuid = cpuid };
339 101 enqueue_message(helper, &message);
340 }
341 101 }
342
343 void shmem_async_enable_cpu_set(pid_t pid, const cpu_set_t *cpu_set) {
344 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, enable cpu set %s",
345 pid, mu_to_str(cpu_set));
346 helper_t *helper = get_helper(pid);
347 if (helper) {
348 message_t message = { .action = ACTION_ENABLE_CPU_SET, .cpu_set = *cpu_set };
349 enqueue_message(helper, &message);
350 }
351 }
352
353 2 void shmem_async_disable_cpu(pid_t pid, int cpuid) {
354
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, disable cpuid %d", pid, cpuid);
355 2 helper_t *helper = get_helper(pid);
356
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (helper) {
357 2 message_t message = { .action = ACTION_DISABLE_CPU, .cpuid = cpuid };
358 2 enqueue_message(helper, &message);
359 }
360 2 }
361
362 void shmem_async_disable_cpu_set(pid_t pid, const cpu_set_t *cpu_set) {
363 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, disable cpu set %s",
364 pid, mu_to_str(cpu_set));
365 helper_t *helper = get_helper(pid);
366 if (helper) {
367 message_t message = { .action = ACTION_DISABLE_CPU_SET, .cpu_set = *cpu_set };
368 enqueue_message(helper, &message);
369 }
370 }
371
372 40 void shmem_async_set_num_cpus(pid_t pid, int ncpus) {
373
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
40 verbose(VB_ASYNC, "Enqueuing petition for pid: %d, set num CPUs %d", pid, ncpus);
374 40 helper_t *helper = get_helper(pid);
375
1/2
✓ Branch 0 taken 40 times.
✗ Branch 1 not taken.
40 if (helper) {
376 40 message_t message = { .action = ACTION_SET_NUM_CPUS, .ncpus = ncpus };
377 40 enqueue_message(helper, &message);
378 }
379 40 }
380
381 1 int shmem_async__version(void) {
382 1 return SHMEM_ASYNC_VERSION;
383 }
384 10 size_t shmem_async__size(void) {
385 10 return sizeof(shdata_t) + sizeof(helper_t)*mu_get_system_size();
386 }
387
388 /* Only for testing purposes. Block current thread until helper thread
389 * with the given pid has finished its pending requests */
390 96 void shmem_async_wait_for_completion(pid_t pid) {
391 96 helper_t *helper = get_helper(pid);
392 while(1) {
393
1/2
✓ Branch 1 taken 128 times.
✗ Branch 2 not taken.
128 if (pthread_mutex_trylock(&helper->q_lock) == 0) {
394
1/2
✓ Branch 0 taken 128 times.
✗ Branch 1 not taken.
128 if (helper->status == HELPER_WAITING
395
2/2
✓ Branch 0 taken 96 times.
✓ Branch 1 taken 32 times.
128 && helper->q_head == helper->q_tail) {
396 96 pthread_mutex_unlock(&helper->q_lock);
397 96 break;
398 }
399 32 pthread_mutex_unlock(&helper->q_lock);
400 }
401 32 usleep(1000);
402 }
403 96 }
404