GCC Code Coverage Report


Directory: src/
File: src/LB_policies/lewi_mask.c
Date: 2026-06-05 08:54:23
Exec Total Coverage
Lines: 334 388 86.1%
Functions: 36 40 90.0%
Branches: 162 233 69.5%

Line Branch Exec Source
1 /*********************************************************************************/
2 /* Copyright 2009-2024 Barcelona Supercomputing Center */
3 /* */
4 /* This file is part of the DLB library. */
5 /* */
6 /* DLB is free software: you can redistribute it and/or modify */
7 /* it under the terms of the GNU Lesser General Public License as published by */
8 /* the Free Software Foundation, either version 3 of the License, or */
9 /* (at your option) any later version. */
10 /* */
11 /* DLB is distributed in the hope that it will be useful, */
12 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
13 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
14 /* GNU Lesser General Public License for more details. */
15 /* */
16 /* You should have received a copy of the GNU Lesser General Public License */
17 /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */
18 /*********************************************************************************/
19
20 #include "LB_policies/lewi_mask.h"
21
22 #include "LB_core/spd.h"
23 #include "LB_comm/shmem_cpuinfo.h"
24 #include "LB_comm/shmem_async.h"
25 #include "apis/dlb_errors.h"
26 #include "support/debug.h"
27 #include "support/mask_utils.h"
28 #include "support/small_array.h"
29 #include "support/types.h"
30
31 #include <sched.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <pthread.h>
35
36 /* array_cpuid_t */
37 #define ARRAY_T cpuid_t
38 #include "support/array_template.h"
39
40 /* array_cpuinfo_task_t */
41 #define ARRAY_T cpuinfo_task_t
42 #define ARRAY_KEY_T pid_t
43 #include "support/array_template.h"
44
45
46 /* Node size will be the same for all processes in the node,
47 * it is safe to be out of the shared memory */
48 static int node_size = -1;
49
50 /* LeWI_mask data is private for each process */
51 typedef struct LeWI_mask_info {
52 int64_t last_borrow;
53 int max_parallelism;
54 array_cpuid_t cpus_priority_array;
55 cpu_set_t pending_reclaimed_cpus; /* CPUs that become reclaimed after an MPI */
56 cpu_set_t in_mpi_cpus; /* CPUs inside an MPI call */
57 pthread_mutex_t mutex; /* Mutex to protect lewi_info */
58 } lewi_info_t;
59
60
61 /* Compute the common elements between a cpuid array a cpu_set:
62 * cpuid_t *result = cpuid_t *op1 AND cpu_set_t *op2
63 * (cpu_set_t* may be NULL, meaning neutral value)
64 */
65 123 static inline void cpu_array_and(array_cpuid_t *result,
66 const array_cpuid_t *op1, const cpu_set_t *op2) {
67
2/2
✓ Branch 0 taken 984 times.
✓ Branch 1 taken 123 times.
1107 for (unsigned int i = 0; i < op1->count; ++i) {
68
7/8
✓ Branch 0 taken 240 times.
✓ Branch 1 taken 744 times.
✓ Branch 2 taken 240 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 158 times.
✓ Branch 5 taken 82 times.
✓ Branch 6 taken 158 times.
✓ Branch 7 taken 82 times.
984 if (op2 == NULL || CPU_ISSET(op1->items[i], op2)) {
69 902 array_cpuid_t_push(result, op1->items[i]);
70 }
71 }
72 123 }
73
74 /* Construct a cpuid array from a cpu_set_t */
75 12 static inline void cpu_array_from_cpuset(array_cpuid_t *result,
76 const cpu_set_t *cpu_set) {
77 12 for (int cpuid = mu_get_first_cpu(cpu_set);
78
3/4
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
24 cpuid >= 0 && cpuid != DLB_CPUID_INVALID;
79 12 cpuid = mu_get_next_cpu(cpu_set, cpuid)) {
80 12 array_cpuid_t_push(result, cpuid);
81 }
82 12 }
83
84
85 /* Construct a priority list of CPUs considering the topology and the affinity mask */
86 40 static void lewi_mask_UpdateOwnershipInfo(const subprocess_descriptor_t *spd,
87 const cpu_set_t *process_mask) {
88
89 40 lewi_info_t *lewi_info = spd->lewi_info;
90 40 lewi_affinity_t lewi_affinity = spd->options.lewi_affinity;
91
92 cpu_set_t affinity_mask;
93 40 mu_get_nodes_intersecting_with_cpuset(&affinity_mask, process_mask);
94
95 /* Reset current priority array */
96 40 array_cpuid_t *cpus_priority_array = &lewi_info->cpus_priority_array;
97 40 array_cpuid_t_clear(cpus_priority_array);
98
99 /* First, construct a list of potentially available CPUs
100 * (owned + nearby, depending on the affinity policy) */
101 cpu_set_t system_mask;
102 40 mu_get_system_mask(&system_mask);
103 40 for (int cpuid = mu_get_first_cpu(&system_mask);
104
3/4
✓ Branch 0 taken 560 times.
✓ Branch 1 taken 40 times.
✓ Branch 2 taken 560 times.
✗ Branch 3 not taken.
600 cpuid >= 0 && cpuid != DLB_CPUID_INVALID;
105 560 cpuid = mu_get_next_cpu(&system_mask, cpuid)) {
106
5/6
✓ Branch 0 taken 560 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 168 times.
✓ Branch 3 taken 392 times.
✓ Branch 4 taken 168 times.
✓ Branch 5 taken 392 times.
560 if (CPU_ISSET(cpuid, process_mask)) {
107 168 array_cpuid_t_push(cpus_priority_array, cpuid);
108 } else {
109
1/5
✓ Branch 0 taken 392 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
392 switch (lewi_affinity) {
110 392 case LEWI_AFFINITY_AUTO:
111 case LEWI_AFFINITY_MASK:
112 case LEWI_AFFINITY_NEARBY_FIRST:
113 392 array_cpuid_t_push(cpus_priority_array, cpuid);
114 392 break;
115 case LEWI_AFFINITY_NEARBY_ONLY:
116 if (CPU_ISSET(cpuid, &affinity_mask)) {
117 array_cpuid_t_push(cpus_priority_array, cpuid);
118 }
119 break;
120 case LEWI_AFFINITY_SPREAD_IFEMPTY:
121 // This case cannot be pre-computed
122 break;
123 case LEWI_AFFINITY_NONE:
124 /* LEWI_AFFINITY_NONE should force LeWI without mask support */
125 fatal("Unhandled LEWI_AFFINITY_NONE in LeWI mask. "
126 "Please report bug");
127 }
128 }
129 }
130
131 /* Sort available CPUs according to the affinity */
132 cpu_set_t affinity[3];
133 40 memcpy(&affinity[0], process_mask, sizeof(cpu_set_t));
134 40 memcpy(&affinity[1], &affinity_mask, sizeof(cpu_set_t));
135 40 CPU_ZERO(&affinity[2]);
136 40 qsort_r(cpus_priority_array->items, cpus_priority_array->count,
137 sizeof(cpuid_t), mu_cmp_cpuids_by_affinity, &affinity);
138 40 }
139
140
141 /*********************************************************************************/
142 /* Thread-local pointers for temporary storage during LeWI calls */
143 /*********************************************************************************/
144
145 /* Array of eligible CPUs. Usually a subset of the CPUs priority array. */
146 static __thread array_cpuid_t *_cpu_subset = NULL;
147
148 static pthread_key_t cpu_subset_key;
149 static pthread_once_t cpu_subset_once = PTHREAD_ONCE_INIT;
150
151 17 static void cpu_subset_destructor(void *p) {
152
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 ensure(p == _cpu_subset, "pthread key does not match _cpu_subset");
153 17 array_cpuid_t_destroy(_cpu_subset);
154 17 free(_cpu_subset);
155 17 _cpu_subset = NULL;
156 17 pthread_setspecific(cpu_subset_key, NULL);
157 17 }
158
159 10 static void make_cpu_subset_key(void) {
160 /* Associate key with destructor, all worker threads will call it on pthread_exit */
161 10 pthread_key_create(&cpu_subset_key, cpu_subset_destructor);
162 10 }
163
164 135 static inline array_cpuid_t* get_cpu_subset(const subprocess_descriptor_t *spd) {
165 /* Thread already has an allocated array, return it */
166
2/2
✓ Branch 0 taken 118 times.
✓ Branch 1 taken 17 times.
135 if (likely(_cpu_subset != NULL)) {
167 118 array_cpuid_t_clear(_cpu_subset);
168 118 return _cpu_subset;
169 }
170
171 /* Otherwise, allocate */
172 17 _cpu_subset = malloc(sizeof(array_cpuid_t));
173 17 array_cpuid_t_init(_cpu_subset, node_size);
174
175 /* Associate pointer to key to clean up array on thread destruction */
176 17 pthread_once(&cpu_subset_once, make_cpu_subset_key);
177 17 pthread_setspecific(cpu_subset_key, _cpu_subset);
178
179 17 return _cpu_subset;
180 }
181
182 /* Array of cpuinfo tasks. For enabling or disabling CPUs. */
183 static __thread array_cpuinfo_task_t *_tasks = NULL;
184
185 static pthread_key_t tasks_key;
186 static pthread_once_t tasks_once = PTHREAD_ONCE_INIT;
187
188 40 static void tasks_destructor(void *p) {
189
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
40 ensure(p == _tasks, "pthread key does not match _tasks");
190 40 array_cpuinfo_task_t_destroy(_tasks);
191 40 free(_tasks);
192 40 _tasks = NULL;
193 40 pthread_setspecific(tasks_key, NULL);
194 40 }
195
196 15 static void make_tasks_key(void) {
197 /* Associate key with destructor, all worker threads will call it on pthread_exit */
198 15 pthread_key_create(&tasks_key, tasks_destructor);
199 15 }
200
201 399 static inline array_cpuinfo_task_t* get_tasks(const subprocess_descriptor_t *spd) {
202 /* Thread already has an allocated array, return it */
203
2/2
✓ Branch 0 taken 359 times.
✓ Branch 1 taken 40 times.
399 if (likely(_tasks != NULL)) {
204 359 array_cpuinfo_task_t_clear(_tasks);
205 359 return _tasks;
206 }
207
208 /* Otherwise, allocate */
209 40 _tasks = malloc(sizeof(array_cpuinfo_task_t));
210 40 array_cpuinfo_task_t_init(_tasks, node_size*2);
211
212 /* Associate pointer to key to clean up structure on thread destruction */
213 40 pthread_once(&tasks_once, make_tasks_key);
214 40 pthread_setspecific(tasks_key, _tasks);
215
216 40 return _tasks;
217 }
218
219
220 /*********************************************************************************/
221 /* Resolve cpuinfo tasks */
222 /*********************************************************************************/
223
224 292 static void resolve_cpuinfo_tasks(const subprocess_descriptor_t *restrict spd,
225 array_cpuinfo_task_t *restrict tasks) {
226
227 292 size_t tasks_count = tasks->count;
228
229 /* We don't need it strictly sorted, but if there are 3 or more tasks
230 * we need to group them by pid */
231
2/2
✓ Branch 0 taken 33 times.
✓ Branch 1 taken 259 times.
292 if (tasks_count > 2 ) {
232 33 array_cpuinfo_task_t_sort(tasks);
233 }
234
235 /* Iterate tasks by group of PIDs */
236
2/2
✓ Branch 0 taken 212 times.
✓ Branch 1 taken 292 times.
504 for (size_t i=0; i<tasks_count; ) {
237
238 /* count how many times this PID is repeated */
239 212 size_t j = i+1;
240 212 while (j < tasks_count
241
4/4
✓ Branch 0 taken 146 times.
✓ Branch 1 taken 180 times.
✓ Branch 2 taken 114 times.
✓ Branch 3 taken 32 times.
326 && tasks->items[j].pid == tasks->items[i].pid) {
242 114 ++j;
243 }
244 212 size_t num_tasks = j - i;
245
246
2/2
✓ Branch 0 taken 139 times.
✓ Branch 1 taken 73 times.
212 if (num_tasks == 1) {
247 /* resolve single task */
248 139 const cpuinfo_task_t *task = &tasks->items[i];
249
2/2
✓ Branch 0 taken 115 times.
✓ Branch 1 taken 24 times.
139 if (task->pid == spd->id) {
250
2/2
✓ Branch 0 taken 98 times.
✓ Branch 1 taken 17 times.
115 if (task->action == ENABLE_CPU) {
251
2/2
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 42 times.
98 verbose(VB_MICROLB, "Enabling CPU %d", task->cpuid);
252 98 enable_cpu(&spd->pm, task->cpuid);
253 }
254
1/2
✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
17 else if (task->action == DISABLE_CPU) {
255
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 11 times.
17 verbose(VB_MICROLB, "Disabling CPU %d", task->cpuid);
256 17 disable_cpu(&spd->pm, task->cpuid);
257 }
258 }
259
2/2
✓ Branch 0 taken 17 times.
✓ Branch 1 taken 7 times.
24 else if (spd->options.mode == MODE_ASYNC) {
260
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 9 times.
17 if (task->action == ENABLE_CPU) {
261
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 6 times.
8 verbose(VB_MICROLB, "Requesting process %d to enable CPU %d",
262 task->pid, task->cpuid);
263 8 shmem_async_enable_cpu(task->pid, task->cpuid);
264 }
265
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 else if (task->action == DISABLE_CPU) {
266
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 3 times.
9 verbose(VB_MICROLB, "Requesting process %d to disable CPU %d",
267 task->pid, task->cpuid);
268 9 shmem_async_disable_cpu(task->pid, task->cpuid);
269 9 shmem_cpuinfo__return_async_cpu(task->pid, task->cpuid);
270 }
271 }
272 } else {
273 /* group tasks */
274 73 cpu_set_t cpus_to_enable = {};
275 73 cpu_set_t cpus_to_disable = {};
276
2/2
✓ Branch 0 taken 187 times.
✓ Branch 1 taken 73 times.
260 for (size_t k = i; k < i+num_tasks; ++k) {
277 187 const cpuinfo_task_t *task = &tasks->items[k];
278
2/2
✓ Branch 0 taken 143 times.
✓ Branch 1 taken 44 times.
187 if (task->action == ENABLE_CPU) {
279
1/2
✓ Branch 0 taken 143 times.
✗ Branch 1 not taken.
143 CPU_SET(task->cpuid, &cpus_to_enable);
280 }
281
1/2
✓ Branch 0 taken 44 times.
✗ Branch 1 not taken.
44 else if (task->action == DISABLE_CPU) {
282
1/2
✓ Branch 0 taken 44 times.
✗ Branch 1 not taken.
44 CPU_SET(task->cpuid, &cpus_to_disable);
283 }
284 }
285
286 /* resolve group of tasks for the same PID */
287 73 const cpuinfo_task_t *task = &tasks->items[i];
288
2/2
✓ Branch 0 taken 53 times.
✓ Branch 1 taken 20 times.
73 if (task->pid == spd->id) {
289
2/2
✓ Branch 1 taken 50 times.
✓ Branch 2 taken 3 times.
53 if (CPU_COUNT(&cpus_to_enable) > 0) {
290
2/2
✓ Branch 0 taken 29 times.
✓ Branch 1 taken 21 times.
50 verbose(VB_MICROLB, "Enabling CPUs %s", mu_to_str(&cpus_to_enable));
291 50 enable_cpu_set(&spd->pm, &cpus_to_enable);
292 }
293
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 50 times.
53 if (CPU_COUNT(&cpus_to_disable) > 0) {
294
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 verbose(VB_MICROLB, "Disabling CPUs %s", mu_to_str(&cpus_to_disable));
295 3 disable_cpu_set(&spd->pm, &cpus_to_disable);
296 }
297 }
298
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 9 times.
20 else if (spd->options.mode == MODE_ASYNC) {
299
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 7 times.
11 if (CPU_COUNT(&cpus_to_enable) > 0) {
300
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 verbose(VB_MICROLB, "Requesting process %d to enable CPUs %s",
301 task->pid, mu_to_str(&cpus_to_enable));
302 4 shmem_async_enable_cpu_set(task->pid, &cpus_to_enable);
303 }
304
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 4 times.
11 if (CPU_COUNT(&cpus_to_disable) > 0) {
305
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 2 times.
7 verbose(VB_MICROLB, "Requesting process %d to disable CPUs %s",
306 task->pid, mu_to_str(&cpus_to_disable));
307 7 shmem_async_disable_cpu_set(task->pid, &cpus_to_disable);
308 7 shmem_cpuinfo__return_async_cpu_mask(task->pid, &cpus_to_disable);
309 }
310 }
311 }
312
313 212 i += num_tasks;
314 }
315 292 }
316
317
318 /*********************************************************************************/
319 /* Init / Finalize */
320 /*********************************************************************************/
321
322 40 int lewi_mask_Init(subprocess_descriptor_t *spd) {
323 /* Value is always updated to allow testing different node sizes */
324 40 node_size = mu_get_system_size();
325
326 /* Allocate and initialize private structure */
327 40 spd->lewi_info = malloc(sizeof(lewi_info_t));
328 40 lewi_info_t *lewi_info = spd->lewi_info;
329 40 *lewi_info = (const lewi_info_t) {
330 40 .max_parallelism = spd->options.lewi_max_parallelism,
331 .mutex = PTHREAD_MUTEX_INITIALIZER,
332 };
333 40 array_cpuid_t_init(&lewi_info->cpus_priority_array, node_size);
334 40 lewi_mask_UpdateOwnershipInfo(spd, &spd->process_mask);
335
336 /* Enable request queues only in async mode */
337
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 24 times.
40 if (spd->options.mode == MODE_ASYNC) {
338 16 shmem_cpuinfo__enable_request_queues();
339 }
340
341 40 return DLB_SUCCESS;
342 }
343
344 40 int lewi_mask_Finalize(subprocess_descriptor_t *spd) {
345 /* De-register subprocess from the shared memory */
346 40 array_cpuinfo_task_t *tasks = get_tasks(spd);
347 40 int error = shmem_cpuinfo__deregister(spd->id, tasks);
348
1/2
✓ Branch 0 taken 40 times.
✗ Branch 1 not taken.
40 if (error == DLB_SUCCESS) {
349 40 resolve_cpuinfo_tasks(spd, tasks);
350 }
351
352 /* Deallocate main thread arrays (worker threads do it on thread_exit) */
353
2/2
✓ Branch 0 taken 17 times.
✓ Branch 1 taken 23 times.
40 if (_cpu_subset != NULL) cpu_subset_destructor(_cpu_subset);
354
1/2
✓ Branch 0 taken 40 times.
✗ Branch 1 not taken.
40 if (_tasks != NULL) tasks_destructor(_tasks);
355
356 /* Deallocate private structure */
357 40 lewi_info_t *lewi_info = spd->lewi_info;
358 40 array_cpuid_t_destroy(&lewi_info->cpus_priority_array);
359 40 free(lewi_info);
360 40 lewi_info = NULL;
361
362 40 return (error >= 0) ? DLB_SUCCESS : error;
363 }
364
365
366 /*********************************************************************************/
367 /* LeWI Modes (enable/disable, max_parallelism, ...) */
368 /*********************************************************************************/
369
370 5 int lewi_mask_EnableDLB(const subprocess_descriptor_t *spd) {
371 /* Reset value of last_borrow */
372 5 ((lewi_info_t*)spd->lewi_info)->last_borrow = 0;
373 5 return DLB_SUCCESS;
374 }
375
376 5 int lewi_mask_DisableDLB(const subprocess_descriptor_t *spd) {
377 5 array_cpuinfo_task_t *tasks = get_tasks(spd);
378 5 int error = shmem_cpuinfo__reset(spd->id, tasks);
379
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5 if (error == DLB_SUCCESS) {
380 5 resolve_cpuinfo_tasks(spd, tasks);
381 }
382 5 return error;
383 }
384
385 4 int lewi_mask_SetMaxParallelism(const subprocess_descriptor_t *spd, int max) {
386 4 int error = DLB_SUCCESS;
387
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (max > 0) {
388 4 lewi_info_t *lewi_info = spd->lewi_info;
389 4 lewi_info->max_parallelism = max;
390 4 array_cpuinfo_task_t *tasks = get_tasks(spd);
391 4 error = shmem_cpuinfo__update_max_parallelism(spd->id, max, tasks);
392
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (error == DLB_SUCCESS) {
393 4 resolve_cpuinfo_tasks(spd, tasks);
394 }
395 }
396 4 return error;
397 }
398
399 4 int lewi_mask_UnsetMaxParallelism(const subprocess_descriptor_t *spd) {
400 4 lewi_info_t *lewi_info = spd->lewi_info;
401 4 lewi_info->max_parallelism = 0;
402 4 return DLB_SUCCESS;
403 }
404
405
406 /*********************************************************************************/
407 /* MPI */
408 /*********************************************************************************/
409
410 /* Obtain thread mask and remove first core if keep_cpu_on_blocking_call */
411 28 static inline void get_mask_for_blocking_call(
412 cpu_set_t *cpu_set, bool keep_cpu_on_blocking_call) {
413
414 /* Obtain current thread's affinity mask */
415 28 pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), cpu_set);
416
417
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 24 times.
28 if (keep_cpu_on_blocking_call) {
418 /* Remove the first core */
419 4 int first_cpuid = mu_get_first_cpu(cpu_set);
420 4 const mu_cpuset_t *first_core = mu_get_core_mask(first_cpuid);
421 4 mu_subtract(cpu_set, cpu_set, first_core->set);
422 }
423 28 }
424
425 /* Lend the CPUs of the thread encountering the blocking call */
426 14 int lewi_mask_IntoBlockingCall(const subprocess_descriptor_t *spd) {
427
428 14 lewi_info_t *lewi_info = spd->lewi_info;
429
430 /* Obtain affinity mask to lend */
431 cpu_set_t cpu_set;
432 14 get_mask_for_blocking_call(&cpu_set,
433 14 spd->options.lewi_keep_cpu_on_blocking_call);
434
435
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 14 times.
14 if (unlikely(mu_intersects(&lewi_info->in_mpi_cpus, &cpu_set))) {
436 #ifdef DEBUG_VERSION
437 fatal("Some CPU in %s already into blocking call", mu_to_str(&cpu_set));
438 #else
439 /* This scenario is not expected to occur; if it does, simply ignore. */
440 return DLB_NOUPDT;
441 #endif
442 }
443
444 14 int error = DLB_NOUPDT;
445
446
2/2
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 2 times.
14 if (mu_count(&cpu_set) > 0) {
447
448 /* Add cpu_set to in_mpi_cpus */
449 12 mu_or(&lewi_info->in_mpi_cpus, &lewi_info->in_mpi_cpus, &cpu_set);
450
451
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 verbose(VB_MICROLB, "In blocking call, lending %s", mu_to_str(&cpu_set));
452
453 /* Finally, lend mask */
454 12 error = lewi_mask_LendCpuMask(spd, &cpu_set);
455 }
456 14 return error;
457 }
458
459 /* Reclaim the CPUs that were lent when encountering the blocking call.
460 * The thread must have not change its affinity mask since then. */
461 14 int lewi_mask_OutOfBlockingCall(const subprocess_descriptor_t *spd) {
462
463 14 int error = DLB_NOUPDT;
464 14 lewi_info_t *lewi_info = spd->lewi_info;
465
466 /* Obtain affinity mask to lend */
467 cpu_set_t cpu_set;
468 14 get_mask_for_blocking_call(&cpu_set,
469 14 spd->options.lewi_keep_cpu_on_blocking_call);
470
471
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 14 times.
14 if (unlikely(!mu_is_subset(&lewi_info->in_mpi_cpus, &cpu_set))) {
472 #ifdef DEBUG_VERSION
473 fatal("Some CPU in %s is not into blocking call", mu_to_str(&cpu_set));
474 #else
475 /* This scenario is not expected to occur; if it does, simply ignore. */
476 return DLB_NOUPDT;
477 #endif
478 }
479
480
2/2
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 2 times.
14 if (mu_count(&lewi_info->in_mpi_cpus) > 0) {
481
482 /* Clear cpu_set from in_mpi_cpus */
483 12 mu_subtract(&lewi_info->in_mpi_cpus, &lewi_info->in_mpi_cpus, &cpu_set);
484
485
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 verbose(VB_MICROLB, "Out of blocking call, acquiring %s", mu_to_str(&cpu_set));
486
487 /* If there are owned CPUs to reclaim: */
488 cpu_set_t owned_cpus;
489 12 mu_and(&owned_cpus, &cpu_set, &spd->process_mask);
490
2/2
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 6 times.
12 if (mu_count(&owned_cpus) > 0) {
491
492 /* Construct a CPU array from owned_cpus */
493 6 array_cpuid_t *cpu_subset = get_cpu_subset(spd);
494 6 cpu_array_from_cpuset(cpu_subset, &owned_cpus);
495
496 /* Acquire CPUs */
497 6 array_cpuinfo_task_t *tasks = get_tasks(spd);
498 6 error = shmem_cpuinfo__acquire_from_cpu_subset(spd->id, cpu_subset, tasks);
499
500 /* Resolve tasks: Even if callbacks weren't called during IntoBlockingCall,
501 * other LeWI actions may have been called after it so we cannot ignore
502 * pending actions */
503
3/4
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
6 if (error == DLB_SUCCESS || error == DLB_NOTED) {
504 6 resolve_cpuinfo_tasks(spd, tasks);
505 }
506 }
507
508 /* If there are ONLY non-owned CPUs to reclaim:
509 * (if some owned CPU was already reclaimed, we forget about non-owned) */
510 cpu_set_t non_owned_cpus;
511 12 mu_subtract(&non_owned_cpus, &cpu_set, &spd->process_mask);
512
2/2
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 6 times.
12 if (mu_count(&non_owned_cpus) > 0
513
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 && mu_count(&owned_cpus) == 0) {
514
515 /* Remove CPUs that are disabled in the shmem */
516 6 for (int cpuid = mu_get_first_cpu(&non_owned_cpus);
517
3/4
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
12 cpuid >= 0 && cpuid != DLB_CPUID_INVALID;
518 6 cpuid = mu_get_next_cpu(&non_owned_cpus, cpuid)) {
519
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 4 times.
6 if (!shmem_cpuinfo__is_cpu_enabled(cpuid)) {
520
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 CPU_CLR(cpuid, &non_owned_cpus);
521 }
522 }
523
524 /* Construct a CPU array from non_owned_cpus */
525 6 array_cpuid_t *cpu_subset = get_cpu_subset(spd);
526 6 cpu_array_from_cpuset(cpu_subset, &non_owned_cpus);
527
528 /* Borrow CPUs, but also ignoring the enable callbacks */
529 6 array_cpuinfo_task_t *tasks = get_tasks(spd);
530 6 error = shmem_cpuinfo__borrow_from_cpu_subset(spd->id, cpu_subset, tasks);
531
532 /* A non-success error means that not all CPUs could be borrowed */
533
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 2 times.
6 if (error != DLB_SUCCESS) {
534 /* Annotate the CPU as pending to bypass the shared memory for the next query */
535 4 mu_or(&lewi_info->pending_reclaimed_cpus, &lewi_info->pending_reclaimed_cpus,
536 &non_owned_cpus);
537
538 /* We lost the CPU while in MPI.
539 * In async mode, we can just disable the CPU.
540 * In polling mode, the process needs to call Return */
541
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (spd->options.mode == MODE_ASYNC) {
542 2 disable_cpu_set(&spd->pm, &non_owned_cpus);
543 }
544 }
545 }
546 }
547
548 14 return error;
549 }
550
551
552 /*********************************************************************************/
553 /* Lend */
554 /*********************************************************************************/
555
556 2 int lewi_mask_Lend(const subprocess_descriptor_t *spd) {
557 cpu_set_t mask;
558 2 mu_get_system_mask(&mask);
559 2 return lewi_mask_LendCpuMask(spd, &mask);
560 }
561
562 83 int lewi_mask_LendCpu(const subprocess_descriptor_t *spd, int cpuid) {
563 83 array_cpuinfo_task_t *tasks = get_tasks(spd);
564 83 int error = shmem_cpuinfo__lend_cpu(spd->id, cpuid, tasks);
565
566
1/2
✓ Branch 0 taken 83 times.
✗ Branch 1 not taken.
83 if (error == DLB_SUCCESS) {
567
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 39 times.
83 if (spd->options.mode == MODE_ASYNC) {
568 44 resolve_cpuinfo_tasks(spd, tasks);
569 }
570
571 /* Clear possible pending reclaimed CPUs */
572 83 lewi_info_t *lewi_info = spd->lewi_info;
573
1/2
✓ Branch 0 taken 83 times.
✗ Branch 1 not taken.
83 CPU_CLR(cpuid, &lewi_info->pending_reclaimed_cpus);
574 }
575 83 return error;
576 }
577
578 51 int lewi_mask_LendCpuMask(const subprocess_descriptor_t *spd, const cpu_set_t *mask) {
579 51 array_cpuinfo_task_t *tasks = get_tasks(spd);
580 51 int error = shmem_cpuinfo__lend_cpu_mask(spd->id, mask, tasks);
581
1/2
✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
51 if (error == DLB_SUCCESS) {
582
2/2
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 25 times.
51 if (spd->options.mode == MODE_ASYNC) {
583 26 resolve_cpuinfo_tasks(spd, tasks);
584 }
585
586 /* Clear possible pending reclaimed CPUs */
587 51 lewi_info_t *lewi_info = spd->lewi_info;
588 51 mu_subtract(&lewi_info->pending_reclaimed_cpus,
589 51 &lewi_info->pending_reclaimed_cpus, mask);
590 }
591 51 return error;
592 }
593
594
595 /*********************************************************************************/
596 /* Reclaim */
597 /*********************************************************************************/
598
599 8 int lewi_mask_Reclaim(const subprocess_descriptor_t *spd) {
600 /* Even though shmem_cpuinfo__reclaim_all exists,
601 * iterating the mask should be more efficient */
602 8 return lewi_mask_ReclaimCpuMask(spd, &spd->process_mask);
603 }
604
605 8 int lewi_mask_ReclaimCpu(const subprocess_descriptor_t *spd, int cpuid) {
606 8 array_cpuinfo_task_t *tasks = get_tasks(spd);
607 8 int error = shmem_cpuinfo__reclaim_cpu(spd->id, cpuid, tasks);
608
4/4
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 6 times.
✓ Branch 3 taken 1 times.
8 if (error == DLB_SUCCESS || error == DLB_NOTED) {
609 7 resolve_cpuinfo_tasks(spd, tasks);
610 }
611 8 return error;
612 }
613
614 int lewi_mask_ReclaimCpus(const subprocess_descriptor_t *spd, int ncpus) {
615 array_cpuinfo_task_t *tasks = get_tasks(spd);
616 int error = shmem_cpuinfo__reclaim_cpus(spd->id, ncpus, tasks);
617 if (error == DLB_SUCCESS || error == DLB_NOTED) {
618 resolve_cpuinfo_tasks(spd, tasks);
619 }
620 return error;
621 }
622
623 19 int lewi_mask_ReclaimCpuMask(const subprocess_descriptor_t *spd, const cpu_set_t *mask) {
624 19 array_cpuinfo_task_t *tasks = get_tasks(spd);
625 19 int error = shmem_cpuinfo__reclaim_cpu_mask(spd->id, mask, tasks);
626 19 resolve_cpuinfo_tasks(spd, tasks);
627 19 return error;
628 }
629
630
631 /*********************************************************************************/
632 /* Acquire */
633 /*********************************************************************************/
634
635 29 int lewi_mask_AcquireCpu(const subprocess_descriptor_t *spd, int cpuid) {
636 29 array_cpuinfo_task_t *tasks = get_tasks(spd);
637 29 int error = shmem_cpuinfo__acquire_cpu(spd->id, cpuid, tasks);
638
4/4
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 17 times.
✓ Branch 2 taken 9 times.
✓ Branch 3 taken 3 times.
29 if (error == DLB_SUCCESS || error == DLB_NOTED) {
639 26 resolve_cpuinfo_tasks(spd, tasks);
640 }
641 29 return error;
642 }
643
644 65 int lewi_mask_AcquireCpus(const subprocess_descriptor_t *spd, int ncpus) {
645 65 int error = DLB_NOUPDT;
646
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 56 times.
65 if (ncpus == 0) {
647 /* AcquireCPUs(0) has a special meaning of removing any previous request */
648 9 shmem_cpuinfo__remove_requests(spd->id);
649 9 error = DLB_SUCCESS;
650
1/2
✓ Branch 0 taken 56 times.
✗ Branch 1 not taken.
56 } else if (ncpus > 0) {
651 56 error = lewi_mask_AcquireCpusInMask(spd, ncpus, NULL);
652 }
653 65 return error;
654 }
655
656 22 int lewi_mask_AcquireCpuMask(const subprocess_descriptor_t *spd, const cpu_set_t *mask) {
657 22 return lewi_mask_AcquireCpusInMask(spd, 0, mask);
658 }
659
660 93 int lewi_mask_AcquireCpusInMask(const subprocess_descriptor_t *spd, int ncpus,
661 const cpu_set_t *mask) {
662 93 lewi_info_t *lewi_info = spd->lewi_info;
663 93 bool async = spd->options.mode == MODE_ASYNC;
664
2/2
✓ Branch 0 taken 36 times.
✓ Branch 1 taken 57 times.
93 int64_t *last_borrow = async ? NULL : &lewi_info->last_borrow;
665
666 /* Construct a CPU array based on cpus_priority_array and mask (if present) */
667 93 array_cpuid_t *cpu_subset = get_cpu_subset(spd);
668 93 cpu_array_and(cpu_subset, &lewi_info->cpus_priority_array, mask);
669
670 /* Provide a number of requested CPUs only if needed */
671
2/2
✓ Branch 0 taken 71 times.
✓ Branch 1 taken 22 times.
93 int *requested_ncpus = ncpus > 0 ? &ncpus : NULL;
672
673 93 array_cpuinfo_task_t *tasks = get_tasks(spd);
674 93 int error = shmem_cpuinfo__acquire_ncpus_from_cpu_subset(spd->id,
675 93 requested_ncpus, cpu_subset, spd->options.lewi_affinity,
676 lewi_info->max_parallelism, last_borrow, tasks);
677
678
2/2
✓ Branch 0 taken 73 times.
✓ Branch 1 taken 20 times.
93 if (error != DLB_NOUPDT) {
679 73 resolve_cpuinfo_tasks(spd, tasks);
680 }
681 93 return error;
682 }
683
684
685 /*********************************************************************************/
686 /* Borrow */
687 /*********************************************************************************/
688
689 8 int lewi_mask_Borrow(const subprocess_descriptor_t *spd) {
690 cpu_set_t system_mask;
691 8 mu_get_system_mask(&system_mask);
692 8 return lewi_mask_BorrowCpusInMask(spd, 0, &system_mask);
693 }
694
695 10 int lewi_mask_BorrowCpu(const subprocess_descriptor_t *spd, int cpuid) {
696 10 array_cpuinfo_task_t *tasks = get_tasks(spd);
697 10 int error = shmem_cpuinfo__borrow_cpu(spd->id, cpuid, tasks);
698
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10 if (error == DLB_SUCCESS) {
699 10 resolve_cpuinfo_tasks(spd, tasks);
700 }
701 10 return error;
702 }
703
704 14 int lewi_mask_BorrowCpus(const subprocess_descriptor_t *spd, int ncpus) {
705 14 return lewi_mask_BorrowCpusInMask(spd, ncpus, NULL);
706 }
707
708 int lewi_mask_BorrowCpuMask(const subprocess_descriptor_t *spd, const cpu_set_t *mask) {
709 return lewi_mask_BorrowCpusInMask(spd, 0, mask);
710 }
711
712 30 int lewi_mask_BorrowCpusInMask(const subprocess_descriptor_t *spd, int ncpus, const cpu_set_t *mask) {
713 30 lewi_info_t *lewi_info = spd->lewi_info;
714 30 bool async = spd->options.mode == MODE_ASYNC;
715
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 15 times.
30 int64_t *last_borrow = async ? NULL : &lewi_info->last_borrow;
716
717 /* Construct a CPU array based on cpus_priority_array and mask (if present) */
718 30 array_cpuid_t *cpu_subset = get_cpu_subset(spd);
719 30 cpu_array_and(cpu_subset, &lewi_info->cpus_priority_array, mask);
720
721 /* Provide a number of requested CPUs only if needed */
722
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 8 times.
30 int *requested_ncpus = ncpus > 0 ? &ncpus : NULL;
723
724 30 array_cpuinfo_task_t *tasks = get_tasks(spd);
725 30 int error = shmem_cpuinfo__borrow_ncpus_from_cpu_subset(spd->id,
726 30 requested_ncpus, cpu_subset, spd->options.lewi_affinity,
727 lewi_info->max_parallelism, last_borrow, tasks);
728
729
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 12 times.
30 if (error == DLB_SUCCESS) {
730 18 resolve_cpuinfo_tasks(spd, tasks);
731 }
732
733 30 return error;
734 }
735
736
737 /*********************************************************************************/
738 /* Return */
739 /*********************************************************************************/
740
741 1 int lewi_mask_Return(const subprocess_descriptor_t *spd) {
742
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (spd->options.mode == MODE_ASYNC) {
743 // Return should not be called in async mode
744 return DLB_ERR_NOCOMP;
745 }
746
747 1 array_cpuinfo_task_t *tasks = get_tasks(spd);
748 1 int error = shmem_cpuinfo__return_all(spd->id, tasks);
749 1 resolve_cpuinfo_tasks(spd, tasks);
750
751 /* Check possible pending reclaimed CPUs */
752 1 lewi_info_t *lewi_info = spd->lewi_info;
753 1 int cpuid = mu_get_first_cpu(&lewi_info->pending_reclaimed_cpus);
754
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (cpuid >= 0) {
755 do {
756 disable_cpu(&spd->pm, cpuid);
757 cpuid = mu_get_next_cpu(&lewi_info->pending_reclaimed_cpus, cpuid);
758 } while (cpuid >= 0 && cpuid != DLB_CPUID_INVALID);
759 CPU_ZERO(&lewi_info->pending_reclaimed_cpus);
760 error = DLB_SUCCESS;
761 }
762
763 1 return error;
764 }
765
766 21 int lewi_mask_ReturnCpu(const subprocess_descriptor_t *spd, int cpuid) {
767
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 14 times.
21 if (spd->options.mode == MODE_ASYNC) {
768 // Return should not be called in async mode
769 7 return DLB_ERR_NOCOMP;
770 }
771
772 14 array_cpuinfo_task_t *tasks = get_tasks(spd);
773 14 int error = shmem_cpuinfo__return_cpu(spd->id, cpuid, tasks);
774
3/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
14 if (error == DLB_SUCCESS || error == DLB_ERR_REQST) {
775 13 resolve_cpuinfo_tasks(spd, tasks);
776
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 } else if (error == DLB_ERR_PERM) {
777 /* Check possible pending reclaimed CPUs */
778 1 lewi_info_t *lewi_info = spd->lewi_info;
779
3/6
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 if (CPU_ISSET(cpuid, &lewi_info->pending_reclaimed_cpus)) {
780
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 CPU_CLR(cpuid, &lewi_info->pending_reclaimed_cpus);
781 1 disable_cpu(&spd->pm, cpuid);
782 1 error = DLB_SUCCESS;
783 }
784 }
785 14 return error;
786 }
787
788 int lewi_mask_ReturnCpuMask(const subprocess_descriptor_t *spd, const cpu_set_t *mask) {
789 if (spd->options.mode == MODE_ASYNC) {
790 // Return should not be called in async mode
791 return DLB_ERR_NOCOMP;
792 }
793
794 array_cpuinfo_task_t *tasks = get_tasks(spd);
795 int error = shmem_cpuinfo__return_cpu_mask(spd->id, mask, tasks);
796 resolve_cpuinfo_tasks(spd, tasks);
797
798 /* Check possible pending reclaimed CPUs */
799 lewi_info_t *lewi_info = spd->lewi_info;
800 if (CPU_COUNT(&lewi_info->pending_reclaimed_cpus) > 0) {
801 cpu_set_t cpus_to_return;
802 CPU_AND(&cpus_to_return, &lewi_info->pending_reclaimed_cpus, mask);
803 for (int cpuid = mu_get_first_cpu(&cpus_to_return);
804 cpuid >= 0 && cpuid != DLB_CPUID_INVALID;
805 cpuid = mu_get_next_cpu(&cpus_to_return, cpuid)) {
806 disable_cpu(&spd->pm, cpuid);
807 }
808 mu_subtract(&lewi_info->pending_reclaimed_cpus,
809 &lewi_info->pending_reclaimed_cpus, &cpus_to_return);
810 error = DLB_SUCCESS;
811 }
812
813 return error;
814 }
815
816
817 // Others
818
819 32 int lewi_mask_CheckCpuAvailability(const subprocess_descriptor_t *spd, int cpuid) {
820 32 return shmem_cpuinfo__check_cpu_availability(spd->id, cpuid);
821 }
822
823 int lewi_mask_UpdateOwnership(const subprocess_descriptor_t *spd,
824 const cpu_set_t *process_mask) {
825 /* Update priority array */
826 lewi_mask_UpdateOwnershipInfo(spd, process_mask);
827
828 /* Update cpuinfo data and return reclaimed CPUs */
829 array_cpuinfo_task_t *tasks = get_tasks(spd);
830 shmem_cpuinfo__update_ownership(spd->id, process_mask, tasks);
831 resolve_cpuinfo_tasks(spd, tasks);
832
833 /* Check possible pending reclaimed CPUs */
834 lewi_info_t *lewi_info = spd->lewi_info;
835 int cpuid = mu_get_first_cpu(&lewi_info->pending_reclaimed_cpus);
836 if (cpuid >= 0) {
837 do {
838 disable_cpu(&spd->pm, cpuid);
839 cpuid = mu_get_next_cpu(&lewi_info->pending_reclaimed_cpus, cpuid);
840 } while (cpuid >= 0 && cpuid != DLB_CPUID_INVALID);
841 CPU_ZERO(&lewi_info->pending_reclaimed_cpus);
842 }
843
844 return DLB_SUCCESS;
845 }
846