GCC Code Coverage Report

Directory: src/
File:      src/LB_numThreads/omptm_omp5.c
Date:      2025-11-21 10:34:40

                 Exec   Total   Coverage
Lines:            137     214      64.0%
Functions:         17      24      70.8%
Branches:          54     130      41.5%

Line Branch Exec Source
1 /*********************************************************************************/
2 /* Copyright 2009-2024 Barcelona Supercomputing Center */
3 /* */
4 /* This file is part of the DLB library. */
5 /* */
6 /* DLB is free software: you can redistribute it and/or modify */
7 /* it under the terms of the GNU Lesser General Public License as published by */
8 /* the Free Software Foundation, either version 3 of the License, or */
9 /* (at your option) any later version. */
10 /* */
11 /* DLB is distributed in the hope that it will be useful, */
12 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
13 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
14 /* GNU Lesser General Public License for more details. */
15 /* */
16 /* You should have received a copy of the GNU Lesser General Public License */
17 /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */
18 /*********************************************************************************/
19
20 #include "LB_numThreads/omptm_omp5.h"
21
22 #include "apis/dlb.h"
23 #include "support/debug.h"
24 #include "support/tracing.h"
25 #include "support/types.h"
26 #include "support/mask_utils.h"
27 #include "LB_comm/shmem_cpuinfo.h"
28 #include "LB_comm/shmem_procinfo.h"
29 #include "LB_numThreads/omptool.h"
30
31 #include <limits.h>
32 #include <sched.h>
33 #include <pthread.h>
34 #include <stdbool.h>
35 #include <stdint.h>
36 #include <string.h>
37 #include <unistd.h>
38 #include <dlfcn.h>
39
40 /* array_cpuid_t */
41 #define ARRAY_T cpuid_t
42 #include "support/array_template.h"
43
44 void omp_set_num_threads(int nthreads) __attribute__((weak));
45 typedef void (*set_num_threads_fn_t)(int);
46 static set_num_threads_fn_t set_num_threads_fn = NULL;
47
48
49 /*********************************************************************************/
50 /* OMP Thread Manager */
51 /*********************************************************************************/
52
53 typedef enum openmp_places_t {
54 OPENMP_PLACE_OTHER,
55 OPENMP_PLACE_CORES,
56 OPENMP_PLACE_THREADS,
57 } openmp_places_t;
58
59 static cpu_set_t active_mask, process_mask;
60 static bool lewi = false;
61 static bool drom = false;
62 static pid_t pid;
63 static int num_omp_threads_per_core;
64 static int hwthreads_per_core;
65 static int num_cores_in_process_mask;
66 static int num_initial_omp_threads;
67 static int num_initial_omp_threads_level1;
68 static omptool_opts_t omptool_opts;
69 static array_cpuid_t cpu_bindings; /* available CPUs sorted by thread_num */
70
71 /* Based on the process_mask, active_mask and num_omp_threads_per_core, compute
72 * the CPU binding for each thread number up to the system's highest CPU id */
73 11 static void compute_cpu_bindings(void) {
74
75 /* clear previous bindings */
76 11 array_cpuid_t_clear(&cpu_bindings);
77
78 /* Iterate the active mask and add potential CPUs to which threads may be bound */
79 11 for (int cpuid = mu_get_first_cpu(&active_mask);
80
2/2
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 11 times.
37 cpuid >= 0;
81 26 cpuid = mu_get_cpu_next_core(&active_mask, cpuid)) {
82
83 /* Add as many CPUs in the core as num_omp_threads_per_core */
84 26 int num_added_cpus = 0;
85 26 const mu_cpuset_t *core_mask = mu_get_core_mask(cpuid);
86 26 for (int cpuid_in_core = core_mask->first_cpuid;
87
3/4
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
44 cpuid_in_core >= 0 && cpuid_in_core != DLB_CPUID_INVALID;
88 18 cpuid_in_core = mu_get_next_cpu(core_mask->set, cpuid_in_core)) {
89
90 /* Add CPU id as potentially available */
91 42 array_cpuid_t_push(&cpu_bindings, cpuid_in_core);
92
93 /* Stop adding if we reach the limit */
94
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 18 times.
42 if (++num_added_cpus == num_omp_threads_per_core) break;
95 }
96 }
97
98 /* Sort list by ownership */
99 11 qsort_r(cpu_bindings.items, cpu_bindings.count, sizeof(cpuid_t),
100 mu_cmp_cpuids_by_ownership, &process_mask);
101 11 }
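
For illustration, a minimal standalone sketch of the same per-core selection using only glibc cpu_set_t calls. The topology and masks are hypothetical (core c owns CPUs {2c, 2c+1}); the real code relies on the mu_* helpers and additionally sorts the result by ownership:

    /* Sketch only: pick the first num_omp_threads_per_core CPUs of every
     * active core, assuming a toy topology of 2 hwthreads per core. */
    #define _GNU_SOURCE
    #include <sched.h>
    #include <stdio.h>

    int main(void) {
        enum { NCPUS = 8, HWTHREADS_PER_CORE = 2 };
        int num_omp_threads_per_core = 1;       /* e.g., OMP_PLACES=cores */

        cpu_set_t active_mask;
        CPU_ZERO(&active_mask);
        for (int cpu = 0; cpu < 6; ++cpu)
            CPU_SET(cpu, &active_mask);         /* cores 0-2 active */

        for (int core = 0; core < NCPUS / HWTHREADS_PER_CORE; ++core) {
            int added = 0;
            for (int hw = 0; hw < HWTHREADS_PER_CORE; ++hw) {
                int cpu = core * HWTHREADS_PER_CORE + hw;
                if (CPU_ISSET(cpu, &active_mask) && added < num_omp_threads_per_core) {
                    printf("binding slot -> CPU %d\n", cpu);
                    ++added;
                }
            }
        }
        return 0;                               /* prints CPUs 0, 2 and 4 */
    }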
102
103 6 static void set_num_threads_from_active_mask(void) {
104
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
6 if (num_omp_threads_per_core == hwthreads_per_core) {
105 /* Equivalent to OMP_PLACES=threads: enable one thread per CPU in the active mask */
106 3 set_num_threads_fn(CPU_COUNT(&active_mask));
107 } else {
108 /* First obtain the number of available cores, then apply the multiplier. */
109 cpu_set_t active_core_set;
110 3 mu_get_cores_intersecting_with_cpuset(&active_core_set, &active_mask);
111 3 int num_active_cores = CPU_COUNT(&active_core_set) / hwthreads_per_core;
112 3 set_num_threads_fn(num_active_cores * num_omp_threads_per_core);
113 }
114 6 }
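
For concreteness, a worked example with hypothetical numbers: on a system with 2 hardware threads per core, an active_mask covering CPUs 0-5 intersects 3 cores, so the else branch computes num_active_cores = 6 / 2 = 3 and requests 3 * num_omp_threads_per_core threads. When num_omp_threads_per_core equals hwthreads_per_core, the first branch simply requests CPU_COUNT(&active_mask) = 6 threads.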
115
116 static void cb_enable_cpu(int cpuid, void *arg) {
117 CPU_SET(cpuid, &active_mask);
118 set_num_threads_from_active_mask();
119 compute_cpu_bindings();
120 }
121
122 2 static void cb_enable_cpu_set(const cpu_set_t *cpu_set, void *arg) {
123
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 2 times.
34 CPU_OR(&active_mask, &active_mask, cpu_set);
124 2 set_num_threads_from_active_mask();
125 2 compute_cpu_bindings();
126 2 }
127
128 static void cb_disable_cpu(int cpuid, void *arg) {
129 CPU_CLR(cpuid, &active_mask);
130 set_num_threads_from_active_mask();
131 compute_cpu_bindings();
132 }
133
134 1 static void cb_disable_cpu_set(const cpu_set_t *cpu_set, void *arg) {
135 1 mu_subtract(&active_mask, &active_mask, cpu_set);
136 1 set_num_threads_from_active_mask();
137 1 compute_cpu_bindings();
138 1 }
139
140 static void cb_set_process_mask(const cpu_set_t *mask, void *arg) {
141 memcpy(&process_mask, mask, sizeof(cpu_set_t));
142 memcpy(&active_mask, mask, sizeof(cpu_set_t));
143
144 /* Update number of cores in process mask */
145 cpu_set_t core_set;
146 mu_get_cores_subset_of_cpuset(&core_set, &process_mask);
147 num_cores_in_process_mask = CPU_COUNT(&core_set) / hwthreads_per_core;
148
149 set_num_threads_from_active_mask();
150 compute_cpu_bindings();
151 }
152
153 1 static void omptm_omp5__borrow(void) {
154 /* The "Exponential Weighted Moving Average" is an average computed
155 * with weighting factors that decrease exponentially on new samples.
156 * I.e,: Most recent values are more significant for the average */
157 typedef struct ExponentialWeightedMovingAverage {
158 float value;
159 unsigned int samples;
160 } ewma_t;
161 static ewma_t ewma = {1.0f, 1};
162 static int cpus_to_borrow = 1;
163
164
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (drom) {
165 DLB_PollDROM_Update();
166 }
167
168
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1 if (lewi && omptool_opts & OMPTOOL_OPTS_BORROW) {
169 int err_return = DLB_Return();
170 int err_reclaim = DLB_Reclaim();
171 int err_borrow = DLB_BorrowCpus(cpus_to_borrow);
172
173 if (err_return == DLB_SUCCESS
174 || err_reclaim == DLB_SUCCESS
175 || err_borrow == DLB_SUCCESS) {
176 verbose(VB_OMPT, "Acquire - Setting new mask to %s", mu_to_str(&active_mask));
177 }
178
179 if (err_borrow == DLB_SUCCESS) {
180 cpus_to_borrow += ewma.value;
181 } else {
182 cpus_to_borrow -= ewma.value;
183 cpus_to_borrow = max_int(1, cpus_to_borrow);
184 }
185
186 /* Update Exponential Weighted Moving Average */
187 ++ewma.samples;
188 float alpha = 2.0f / (ewma.samples + 1);
189 ewma.value = ewma.value + alpha * (cpus_to_borrow - ewma.value);
190 }
191 1 }
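
The update above is the standard EWMA recurrence, value += alpha * (sample - value) with alpha = 2 / (samples + 1). A self-contained sketch of the same adaptation loop, with the DLB calls replaced by a made-up success/failure pattern standing in for the result of DLB_BorrowCpus():

    /* Sketch of the EWMA-driven borrow adaptation; borrow_ok is hypothetical. */
    #include <stdio.h>

    int main(void) {
        float ewma_value = 1.0f;
        unsigned int ewma_samples = 1;
        int cpus_to_borrow = 1;

        int borrow_ok[] = {1, 1, 1, 0, 1};
        for (int i = 0; i < 5; ++i) {
            if (borrow_ok[i]) {
                cpus_to_borrow += ewma_value;        /* success: ask for more */
            } else {
                cpus_to_borrow -= ewma_value;        /* failure: back off */
                if (cpus_to_borrow < 1) cpus_to_borrow = 1;
            }
            ++ewma_samples;
            float alpha = 2.0f / (ewma_samples + 1); /* newer samples weigh more */
            ewma_value += alpha * (cpus_to_borrow - ewma_value);
            printf("step %d: cpus_to_borrow=%d ewma=%.2f\n",
                   i, cpus_to_borrow, ewma_value);
        }
        return 0;
    }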
192
193 1 static void omptm_omp5__lend(void) {
194
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1 if (lewi && omptool_opts & OMPTOOL_OPTS_LEND) {
195 set_num_threads_fn(1);
196 CPU_ZERO(&active_mask);
197 CPU_SET(sched_getcpu(), &active_mask);
198 compute_cpu_bindings();
199 verbose(VB_OMPT, "Release - Setting new mask to %s", mu_to_str(&active_mask));
200 DLB_SetMaxParallelism(1);
201 }
202 1 }
203
204 void omptm_omp5__lend_from_api(void) {
205 if (lewi) {
206 set_num_threads_fn(1);
207 CPU_ZERO(&active_mask);
208 CPU_SET(sched_getcpu(), &active_mask);
209 compute_cpu_bindings();
210 verbose(VB_OMPT, "Release - Setting new mask to %s", mu_to_str(&active_mask));
211 //DLB_SetMaxParallelism(1);
212 }
213 }
214
215
216 /* Parse OMP_PLACES. For now we are only interested in whether the variable
217 * is equal to 'cores' or 'threads' */
218 3 static openmp_places_t parse_omp_places(void) {
219 3 const char *env_places = getenv("OMP_PLACES");
220
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 if (env_places) {
221
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (strcmp(env_places, "cores") == 0) {
222 1 return OPENMP_PLACE_CORES;
223
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 } else if (strcmp(env_places, "threads") == 0) {
224 1 return OPENMP_PLACE_THREADS;
225 }
226 }
227 1 return OPENMP_PLACE_OTHER;
228 }
229
230 /* Parse OMP_NUM_THREADS and return the product of the values in the list,
231 * e.g.: OMP_NUM_THREADS=8 and OMP_NUM_THREADS=2,4 both return 8 */
232 6 static int parse_omp_num_threads(int level) {
233 6 const char *env_num_threads = getenv("OMP_NUM_THREADS");
234
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 2 times.
6 if (env_num_threads == NULL) return 0;
235
236 2 int current_level = 0;
237 2 int num_threads = 0;
238 2 size_t len = strlen(env_num_threads) + 1;
239 2 char *env_copy = malloc(sizeof(char)*len);
240 2 strcpy(env_copy, env_num_threads);
241 2 char *end = NULL;
242 2 char *token = strtok_r(env_copy, ",", &end);
243
3/4
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
4 while (token && current_level != level) {
244 2 int num = strtol(token, NULL, 10);
245
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (num > 0) {
246
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 num_threads = num_threads == 0 ? num : num_threads * num;
247 }
248 2 token = strtok_r(NULL, ",", &end);
249 2 ++current_level;
250 }
251
252 2 free(env_copy);
253
254 2 return num_threads;
255 }
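
A standalone sketch of the multiplication rule, operating on a string literal that stands in for getenv("OMP_NUM_THREADS"):

    /* Sketch: product of a comma-separated OMP_NUM_THREADS-style list. */
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    int main(void) {
        char spec[] = "2,4";                  /* as if OMP_NUM_THREADS=2,4 */
        int num_threads = 0;
        char *end = NULL;
        for (char *token = strtok_r(spec, ",", &end);
                token != NULL;
                token = strtok_r(NULL, ",", &end)) {
            int num = (int)strtol(token, NULL, 10);
            if (num > 0)
                num_threads = num_threads == 0 ? num : num_threads * num;
        }
        printf("-> %d threads\n", num_threads);   /* prints 8 */
        return 0;
    }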
256
257 4 void omptm_omp5__init(pid_t process_id, const options_t *options) {
258
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (omp_set_num_threads) {
259 4 set_num_threads_fn = omp_set_num_threads;
260 } else {
261 void *handle = dlopen("libomp.so", RTLD_LAZY | RTLD_GLOBAL);
262 if (handle == NULL) {
263 handle = dlopen("libiomp5.so", RTLD_LAZY | RTLD_GLOBAL);
264 }
265 if (handle == NULL) {
266 handle = dlopen("libgomp.so", RTLD_LAZY | RTLD_GLOBAL);
267 }
268 if (handle != NULL) {
269 set_num_threads_fn = (set_num_threads_fn_t)dlsym(handle, "omp_set_num_threads");
270 }
271 fatal_cond(set_num_threads_fn == NULL, "omp_set_num_threads cannot be found");
272 }
273
274 4 lewi = options->lewi;
275 4 drom = options->drom;
276 4 omptool_opts = options->lewi_ompt;
277 4 pid = process_id;
278 4 hwthreads_per_core = mu_get_system_hwthreads_per_core();
279
280 /* Get process mask */
281 4 int have_procinfo = shmem_procinfo__getprocessmask(pid, &process_mask, DLB_DROM_FLAGS_NONE);
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (have_procinfo != DLB_SUCCESS) {
283 mu_get_system_mask(&process_mask);
284 }
285 4 memcpy(&active_mask, &process_mask, sizeof(cpu_set_t));
286
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 verbose(VB_OMPT, "Initial mask set to: %s", mu_to_str(&process_mask));
287
288 /* initialize cpu_bindings based on active_mask */
289 4 array_cpuid_t_init(&cpu_bindings, mu_get_system_size());
290 4 compute_cpu_bindings();
291
292
3/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
4 if (lewi || drom) {
293 int err;
294 3 err = DLB_CallbackSet(dlb_callback_enable_cpu, (dlb_callback_t)cb_enable_cpu, NULL);
295
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (err != DLB_SUCCESS) {
296 warning("DLB_CallbackSet enable_cpu: %s", DLB_Strerror(err));
297 }
298 3 err = DLB_CallbackSet(dlb_callback_enable_cpu_set, (dlb_callback_t)cb_enable_cpu_set, NULL);
299
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (err != DLB_SUCCESS) {
300 warning("DLB_CallbackSet enable_cpu_set: %s", DLB_Strerror(err));
301 }
302 3 err = DLB_CallbackSet(dlb_callback_disable_cpu, (dlb_callback_t)cb_disable_cpu, NULL);
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (err != DLB_SUCCESS) {
304 warning("DLB_CallbackSet disable_cpu: %s", DLB_Strerror(err));
305 }
306 3 err = DLB_CallbackSet(dlb_callback_disable_cpu_set, (dlb_callback_t)cb_disable_cpu_set, NULL);
307
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (err != DLB_SUCCESS) {
308 warning("DLB_CallbackSet disable_cpu_set: %s", DLB_Strerror(err));
309 }
310 3 err = DLB_CallbackSet(dlb_callback_set_process_mask,
311 (dlb_callback_t)cb_set_process_mask, NULL);
312
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (err != DLB_SUCCESS) {
313 warning("DLB_CallbackSet set_process_mask: %s", DLB_Strerror(err));
314 }
315
316 /* Update number of cores in process mask */
317 cpu_set_t core_set;
318 3 mu_get_cores_subset_of_cpuset(&core_set, &process_mask);
319 3 num_cores_in_process_mask = CPU_COUNT(&core_set) / hwthreads_per_core;
320
321 /* Best effort trying to compute OMP threads per core */
322 3 openmp_places_t openmp_places = parse_omp_places();
323 3 num_initial_omp_threads = parse_omp_num_threads(INT_MAX);
324 3 num_initial_omp_threads_level1 = parse_omp_num_threads(1);
325
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
3 if (openmp_places == OPENMP_PLACE_CORES) {
326 1 num_omp_threads_per_core = 1;
327
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 } else if (openmp_places == OPENMP_PLACE_THREADS) {
328 1 num_omp_threads_per_core = hwthreads_per_core;
329
2/4
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
1 } else if ( num_initial_omp_threads > 0 && num_cores_in_process_mask > 0) {
330 num_omp_threads_per_core = max_int(
331 1, num_initial_omp_threads / num_cores_in_process_mask);
332 } else {
333 1 num_omp_threads_per_core = 1;
334 }
335
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 verbose(VB_OMPT, "hwthreads per core: %d, omp threads per core: %d",
336 hwthreads_per_core, num_omp_threads_per_core);
337
338 /* If --lewi-ompt=lend (non-default) we lend from the beginning,
339 * otherwise we update the number of threads to match the current active mask */
340
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (lewi && omptool_opts & OMPTOOL_OPTS_LEND) {
341 omptm_omp5__lend();
342 } else {
343 3 set_num_threads_from_active_mask();
344 }
345 }
346 4 }
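
As a worked example of the threads-per-core derivation above (values hypothetical): with OMP_PLACES unset, OMP_NUM_THREADS=16 and a process mask covering 8 cores, the last usable branch gives num_omp_threads_per_core = max_int(1, 16 / 8) = 2; OMP_PLACES=cores or OMP_PLACES=threads short-circuit to 1 or hwthreads_per_core, respectively.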
347
348 4 void omptm_omp5__finalize(void) {
349 4 array_cpuid_t_destroy(&cpu_bindings);
350 4 }
351
352
353 /* lb_funcs.into_blocking_call has already been called and
354 * the current CPU will be lent according to the --lewi-mpi option.
355 * This function just lends the rest of the CPUs.
356 */
357 void omptm_omp5__IntoBlockingCall(void) {
358 if (lewi) {
359 int mycpu = sched_getcpu();
360
361 /* Lend every CPU except the current one */
362 cpu_set_t mask;
363 memcpy(&mask, &active_mask, sizeof(cpu_set_t));
364 CPU_CLR(mycpu, &mask);
365 DLB_LendCpuMask(&mask);
366
367 /* Set active_mask to only the current CPU */
368 CPU_ZERO(&active_mask);
369 CPU_SET(mycpu, &active_mask);
370 set_num_threads_fn(1);
371 compute_cpu_bindings();
372
373 verbose(VB_OMPT, "IntoBlockingCall - lending all");
374 }
375 }
376
377 void omptm_omp5__OutOfBlockingCall(void) {
378 if (lewi) {
379 DLB_Reclaim();
380
381 /* Return to the initial number of threads */
382 set_num_threads_fn(num_initial_omp_threads_level1);
383 }
384 }
385
386
387 1 void omptm_omp5__parallel_begin(omptool_parallel_data_t *parallel_data) {
388
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (parallel_data->level == 1) {
389 1 omptm_omp5__borrow();
390 }
391 1 }
392
393 1 void omptm_omp5__parallel_end(omptool_parallel_data_t *parallel_data) {
394
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (parallel_data->level == 1) {
395 1 omptm_omp5__lend();
396 }
397 1 }
398
399 1 static inline int get_thread_binding(int index) {
400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (unlikely(cpu_bindings.count == 0)) {
401 /* On a regular execution, cpu_bindings *should* have some valid entries,
402 * but we still need to support scenarios with 0 CPUs in the active mask. */
403 return -1;
404 }
405 1 return cpu_bindings.items[index % cpu_bindings.count];
406 }
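
As a hypothetical illustration of the modulo wrap-around: with cpu_bindings.items = {4, 5, 0, 1} (owned CPUs sorted first), thread 0 is bound to CPU 4, thread 3 to CPU 1, and an oversubscribed thread 5 wraps to items[5 % 4] = CPU 5.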
407
408 1 void omptm_omp5__into_parallel_function(
409 omptool_parallel_data_t *parallel_data, unsigned int index) {
410
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (parallel_data->level == 1) {
411 1 int cpuid = get_thread_binding(index);
412 1 int current_cpuid = sched_getcpu();
413
2/4
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 if (cpuid >= 0 && cpuid != current_cpuid) {
414 cpu_set_t thread_mask;
415 1 CPU_ZERO(&thread_mask);
416
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 CPU_SET(cpuid, &thread_mask);
417 1 pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &thread_mask);
418 instrument_event(REBIND_EVENT, cpuid+1, EVENT_BEGIN);
419
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 verbose(VB_OMPT, "Rebinding thread %d to CPU %d", index, cpuid);
420 }
421 }
422 1 }
423
424 1 void omptm_omp5__into_parallel_implicit_barrier(omptool_parallel_data_t *parallel_data) {
425 1 if (parallel_data->level == 1) {
426 /* A thread participating in the level 1 parallel region
427 * has reached the implicit barrier */
428 instrument_event(REBIND_EVENT, 0, EVENT_END);
429 }
430 1 }
431
432
433 /*********************************************************************************/
434 /* Vtable for handling omptool events */
435 /*********************************************************************************/
436
437 const omptool_event_funcs_t omptm_omp5_events_vtable = {
438 .init = omptm_omp5__init,
439 .finalize = omptm_omp5__finalize,
440 .into_mpi = omptm_omp5__IntoBlockingCall,
441 .outof_mpi = omptm_omp5__OutOfBlockingCall,
442 .lend_from_api = omptm_omp5__lend_from_api,
443 .thread_begin = NULL,
444 .thread_end = NULL,
445 .thread_role_shift = NULL,
446 .parallel_begin = omptm_omp5__parallel_begin,
447 .parallel_end = omptm_omp5__parallel_end,
448 .into_parallel_function = omptm_omp5__into_parallel_function,
449 .outof_parallel_function = NULL,
450 .into_parallel_implicit_barrier = omptm_omp5__into_parallel_implicit_barrier,
451 .task_create = NULL,
452 .task_complete = NULL,
453 .task_switch = NULL,
454 };
455
456
457 /*********************************************************************************/
458 /* Functions for testing purposes */
459 /*********************************************************************************/
460
461 const cpu_set_t* omptm_omp5_testing__get_active_mask(void) {
462 return &active_mask;
463 }
464
465 4 const array_cpuid_t* omptm_omp5_testing__compute_and_get_cpu_bindings(void) {
466
467 4 compute_cpu_bindings();
468 4 return &cpu_bindings;
469 }
470
471 2 void omptm_omp5_testing__set_num_threads_fn(void (*fn)(int)) {
472 2 set_num_threads_fn = fn;
473 2 }
474