GCC Code Coverage Report


Directory: src/
File: src/LB_numThreads/omptm_omp5.c
Date: 2024-11-22 17:07:10
             Exec   Total   Coverage
Lines:         88     202      43.6%
Functions:     11      24      45.8%
Branches:      28     118      23.7%

Line Branch Exec Source
1 /*********************************************************************************/
2 /* Copyright 2009-2024 Barcelona Supercomputing Center */
3 /* */
4 /* This file is part of the DLB library. */
5 /* */
6 /* DLB is free software: you can redistribute it and/or modify */
7 /* it under the terms of the GNU Lesser General Public License as published by */
8 /* the Free Software Foundation, either version 3 of the License, or */
9 /* (at your option) any later version. */
10 /* */
11 /* DLB is distributed in the hope that it will be useful, */
12 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
13 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
14 /* GNU Lesser General Public License for more details. */
15 /* */
16 /* You should have received a copy of the GNU Lesser General Public License */
17 /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */
18 /*********************************************************************************/
19
20 #include "LB_numThreads/omptm_omp5.h"
21
22 #include "apis/dlb.h"
23 #include "support/debug.h"
24 #include "support/tracing.h"
25 #include "support/types.h"
26 #include "support/mask_utils.h"
27 #include "LB_comm/shmem_cpuinfo.h"
28 #include "LB_comm/shmem_procinfo.h"
29 #include "LB_numThreads/omptool.h"
30
31 #include <limits.h>
32 #include <sched.h>
33 #include <pthread.h>
34 #include <stdbool.h>
35 #include <stdint.h>
36 #include <string.h>
37 #include <unistd.h>
38 #include <dlfcn.h>
39
40 /* array_cpuid_t */
41 #define ARRAY_T cpuid_t
42 #include "support/array_template.h"
43
44 void omp_set_num_threads(int nthreads) __attribute__((weak));
45 static void (*set_num_threads_fn)(int) = NULL;
46
47
48 /*********************************************************************************/
49 /* OMP Thread Manager */
50 /*********************************************************************************/
51
52 typedef enum openmp_places_t {
53 OPENMP_PLACE_OTHER,
54 OPENMP_PLACE_CORES,
55 OPENMP_PLACE_THREADS,
56 } openmp_places_t;
57
58 static cpu_set_t active_mask, process_mask;
59 static bool lewi = false;
60 static bool drom = false;
61 static pid_t pid;
62 static int num_omp_threads_per_core;
63 static int hwthreads_per_core;
64 static int num_cores_in_process_mask;
65 static int num_initial_omp_threads;
66 static int num_initial_omp_threads_level1;
67 static omptool_opts_t omptool_opts;
68 static array_cpuid_t cpu_bindings; /* available CPUs sorted by thread_num */
69
70 /* Based on the process_mask, active_mask and num_omp_threads_per_core, compute
71 * the CPU binding for each thread number up to the system's highest CPU id */
72 9 static void compute_cpu_bindings(void) {
73
74 /* clear previous bindings */
75 9 array_cpuid_t_clear(&cpu_bindings);
76
77 /* Iterate the active mask and add the potential CPUs to which threads may be bound */
78 9 for (int cpuid = mu_get_first_cpu(&active_mask);
79
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 9 times.
33 cpuid >= 0;
80 24 cpuid = mu_get_cpu_next_core(&active_mask, cpuid)) {
81
82 /* Add up to num_omp_threads_per_core CPUs from this core */
83 24 int num_added_cpus = 0;
84 24 const mu_cpuset_t *core_mask = mu_get_core_mask(cpuid);
85 24 for (int cpuid_in_core = core_mask->first_cpuid;
86
1/2
✓ Branch 0 taken 38 times.
✗ Branch 1 not taken.
38 cpuid_in_core >= 0;
87 14 cpuid_in_core = mu_get_next_cpu(core_mask->set, cpuid_in_core)) {
88
89 /* Add CPU id as potentially available */
90 38 array_cpuid_t_push(&cpu_bindings, cpuid_in_core);
91
92 /* Stop adding if we reach the limit */
93
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 14 times.
38 if (++num_added_cpus == num_omp_threads_per_core) break;
94 }
95 }
96
97 /* Sort list by ownership */
98 9 qsort_r(cpu_bindings.items, cpu_bindings.count, sizeof(cpuid_t),
99 mu_cmp_cpuids_by_ownership, &process_mask);
100 9 }
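A minimal worked example of the selection loop above, assuming a hypothetical 2-core
machine (2 hwthreads per core; core 0 = CPUs {0,1}, core 1 = CPUs {2,3}) with the
whole machine in active_mask:

    /* num_omp_threads_per_core == 1 -> cpu_bindings = [0, 2]
     *   (the break fires after the first hwthread of each core)
     * num_omp_threads_per_core == 2 -> cpu_bindings = [0, 1, 2, 3]
     *   (every hwthread of each core is pushed)
     * The final qsort_r then sorts by ownership against process_mask. */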
101
102 3 static void set_num_threads_from_active_mask(void) {
103
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 if (num_omp_threads_per_core == hwthreads_per_core) {
104 2 set_num_threads_fn(CPU_COUNT(&active_mask));
105 } else {
106 cpu_set_t active_core_set;
107 1 mu_get_cores_subset_of_cpuset(&active_core_set, &active_mask);
108 1 int num_active_cores = CPU_COUNT(&active_core_set) / hwthreads_per_core;
109 1 set_num_threads_fn(num_active_cores * num_omp_threads_per_core);
110 }
111 3 }
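An illustrative trace of the two branches above, assuming 2 hwthreads per core and an
active_mask spanning 3 full cores (6 CPUs):

    /* num_omp_threads_per_core == 2 (== hwthreads_per_core):
     *     set_num_threads_fn(CPU_COUNT(&active_mask)) -> 6 threads
     * num_omp_threads_per_core == 1:
     *     num_active_cores = 6 / 2 = 3 -> set_num_threads_fn(3 * 1) -> 3 threads */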
112
113 static void cb_enable_cpu(int cpuid, void *arg) {
114 CPU_SET(cpuid, &active_mask);
115 set_num_threads_from_active_mask();
116 compute_cpu_bindings();
117 }
118
119 2 static void cb_enable_cpu_set(const cpu_set_t *cpu_set, void *arg) {
120
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 2 times.
34 CPU_OR(&active_mask, &active_mask, cpu_set);
121 2 set_num_threads_from_active_mask();
122 2 compute_cpu_bindings();
123 2 }
124
125 static void cb_disable_cpu(int cpuid, void *arg) {
126 CPU_CLR(cpuid, &active_mask);
127 set_num_threads_from_active_mask();
128 compute_cpu_bindings();
129 }
130
131 1 static void cb_disable_cpu_set(const cpu_set_t *cpu_set, void *arg) {
132 1 mu_substract(&active_mask, &active_mask, cpu_set);
133 1 set_num_threads_from_active_mask();
134 1 compute_cpu_bindings();
135 1 }
136
137 static void cb_set_process_mask(const cpu_set_t *mask, void *arg) {
138 memcpy(&process_mask, mask, sizeof(cpu_set_t));
139 memcpy(&active_mask, mask, sizeof(cpu_set_t));
140
141 /* Update number of cores in process mask */
142 cpu_set_t core_set;
143 mu_get_cores_subset_of_cpuset(&core_set, &process_mask);
144 num_cores_in_process_mask = CPU_COUNT(&core_set) / hwthreads_per_core;
145
146 set_num_threads_from_active_mask();
147 compute_cpu_bindings();
148 }
149
150 static void omptm_omp5__borrow(void) {
151 /* The "Exponential Weighted Moving Average" is an average computed
152 * with weighting factors that decrease exponentially on new samples.
153 * I.e,: Most recent values are more significant for the average */
154 typedef struct ExponentialWeightedMovingAverage {
155 float value;
156 unsigned int samples;
157 } ewma_t;
158 static ewma_t ewma = {1.0f, 1};
159 static int cpus_to_borrow = 1;
160
161 if (drom) {
162 DLB_PollDROM_Update();
163 }
164
165 if (lewi && omptool_opts & OMPTOOL_OPTS_BORROW) {
166 int err_return = DLB_Return();
167 int err_reclaim = DLB_Reclaim();
168 int err_borrow = DLB_BorrowCpus(cpus_to_borrow);
169
170 if (err_return == DLB_SUCCESS
171 || err_reclaim == DLB_SUCCESS
172 || err_borrow == DLB_SUCCESS) {
173 verbose(VB_OMPT, "Acquire - Setting new mask to %s", mu_to_str(&active_mask));
174 }
175
176 if (err_borrow == DLB_SUCCESS) {
177 cpus_to_borrow += ewma.value;
178 } else {
179 cpus_to_borrow -= ewma.value;
180 cpus_to_borrow = max_int(1, cpus_to_borrow);
181 }
182
183 /* Update Exponential Weighted Moving Average */
184 ++ewma.samples;
185 float alpha = 2.0f / (ewma.samples + 1);
186 ewma.value = ewma.value + alpha * (cpus_to_borrow - ewma.value);
187 }
188 }
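A standalone sketch of the same adaptive-borrow arithmetic, runnable in isolation
(the three consecutive successes are hypothetical; the formulas mirror the code above):

    #include <stdio.h>

    int main(void) {
        float ewma_value = 1.0f;
        unsigned int samples = 1;
        int cpus_to_borrow = 1;
        for (int i = 0; i < 3; ++i) {            /* pretend DLB_BorrowCpus succeeded */
            cpus_to_borrow += ewma_value;        /* grow the request by the average */
            ++samples;
            float alpha = 2.0f / (samples + 1);  /* newer samples weigh more */
            ewma_value += alpha * (cpus_to_borrow - ewma_value);
            printf("borrow=%d ewma=%.2f\n", cpus_to_borrow, ewma_value);
        }
        return 0;                                /* prints 2/1.67, 3/2.33, 5/3.40 */
    }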
189
190 2 static void omptm_omp5__lend(void) {
191
2/4
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
2 if (lewi && omptool_opts & OMPTOOL_OPTS_LEND) {
192 set_num_threads_fn(1);
193 CPU_ZERO(&active_mask);
194 CPU_SET(sched_getcpu(), &active_mask);
195 compute_cpu_bindings();
196 verbose(VB_OMPT, "Release - Setting new mask to %s", mu_to_str(&active_mask));
197 DLB_SetMaxParallelism(1);
198 }
199 2 }
200
201 void omptm_omp5__lend_from_api(void) {
202 if (lewi) {
203 set_num_threads_fn(1);
204 CPU_ZERO(&active_mask);
205 CPU_SET(sched_getcpu(), &active_mask);
206 compute_cpu_bindings();
207 verbose(VB_OMPT, "Release - Setting new mask to %s", mu_to_str(&active_mask));
208 //DLB_SetMaxParallelism(1);
209 }
210 }
211
212
213 /* Parse OMP_PLACES. For now we are only interested in whether the variable is
214 * equal to 'cores' or 'threads' */
215 2 static openmp_places_t parse_omp_places(void) {
216 2 const char *env_places = getenv("OMP_PLACES");
217
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (env_places) {
218
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (strcmp(env_places, "cores") == 0) {
219 1 return OPENMP_PLACE_CORES;
220
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 } else if (strcmp(env_places, "threads") == 0) {
221 1 return OPENMP_PLACE_THREADS;
222 }
223 }
224 return OPENMP_PLACE_OTHER;
225 }
226
227 /* Parse OMP_NUM_THREADS and return the product of the values in the list,
228 * e.g., both OMP_NUM_THREADS=8 and OMP_NUM_THREADS=2,4 return 8 */
229 4 static int parse_omp_num_threads(int level) {
230 4 const char *env_num_threads = getenv("OMP_NUM_THREADS");
231
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (env_num_threads == NULL) return 0;
232
233 int current_level = 0;
234 int num_threads = 0;
235 size_t len = strlen(env_num_threads) + 1;
236 char *env_copy = malloc(sizeof(char)*len);
237 strcpy(env_copy, env_num_threads);
238 char *end = NULL;
239 char *token = strtok_r(env_copy, ",", &end);
240 while (token && current_level != level) {
241 int num = strtol(token, NULL, 10);
242 if (num > 0) {
243 num_threads = num_threads == 0 ? num : num_threads * num;
244 }
245 token = strtok_r(NULL, ",", &end);
246 ++current_level;
247 }
248
249 free(env_copy);
250
251 return num_threads;
252 }
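A hedged sketch of how the level argument behaves, consistent with the two calls in
omptm_omp5__init below (the environment value is hypothetical):

    /* With OMP_NUM_THREADS=2,4:
     * parse_omp_num_threads(1)       -> 2   (stops after the first list entry)
     * parse_omp_num_threads(INT_MAX) -> 8   (product of all entries, 2*4) */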
253
254 2 void omptm_omp5__init(pid_t process_id, const options_t *options) {
255
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (omp_set_num_threads) {
256 2 set_num_threads_fn = omp_set_num_threads;
257 } else {
258 void *handle = dlopen("libomp.so", RTLD_LAZY | RTLD_GLOBAL);
259 if (handle == NULL) {
260 handle = dlopen("libiomp5.so", RTLD_LAZY | RTLD_GLOBAL);
261 }
262 if (handle == NULL) {
263 handle = dlopen("libgomp.so", RTLD_LAZY | RTLD_GLOBAL);
264 }
265 if (handle != NULL) {
266 set_num_threads_fn = dlsym(handle, "omp_set_num_threads");
267 }
268 fatal_cond(set_num_threads_fn == NULL, "omp_set_num_threads cannot be found");
269 }
270
271 2 lewi = options->lewi;
272 2 drom = options->drom;
273 2 omptool_opts = options->lewi_ompt;
274 2 pid = process_id;
275 2 hwthreads_per_core = mu_get_system_hwthreads_per_core();
276
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2 if (lewi || drom) {
277 int err;
278 2 err = DLB_CallbackSet(dlb_callback_enable_cpu, (dlb_callback_t)cb_enable_cpu, NULL);
279
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (err != DLB_SUCCESS) {
280 warning("DLB_CallbackSet enable_cpu: %s", DLB_Strerror(err));
281 }
282 2 err = DLB_CallbackSet(dlb_callback_enable_cpu_set, (dlb_callback_t)cb_enable_cpu_set, NULL);
283
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (err != DLB_SUCCESS) {
284 warning("DLB_CallbackSet enable_cpu_set: %s", DLB_Strerror(err));
285 }
286 2 err = DLB_CallbackSet(dlb_callback_disable_cpu, (dlb_callback_t)cb_disable_cpu, NULL);
287
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (err != DLB_SUCCESS) {
288 warning("DLB_CallbackSet disable_cpu: %s", DLB_Strerror(err));
289 }
290 2 err = DLB_CallbackSet(dlb_callback_disable_cpu_set, (dlb_callback_t)cb_disable_cpu_set, NULL);
291
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (err != DLB_SUCCESS) {
292 warning("DLB_CallbackSet disable_cpu_set: %s", DLB_Strerror(err));
293 }
294 2 err = DLB_CallbackSet(dlb_callback_set_process_mask,
295 (dlb_callback_t)cb_set_process_mask, NULL);
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (err != DLB_SUCCESS) {
297 warning("DLB_CallbackSet set_process_mask: %s", DLB_Strerror(err));
298 }
299
300 /* Get process mask */
301 2 shmem_procinfo__getprocessmask(pid, &process_mask, 0);
302 2 memcpy(&active_mask, &process_mask, sizeof(cpu_set_t));
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 verbose(VB_OMPT, "Initial mask set to: %s", mu_to_str(&process_mask));
304
305 /* Update number of cores in process mask */
306 cpu_set_t core_set;
307 2 mu_get_cores_subset_of_cpuset(&core_set, &process_mask);
308 2 num_cores_in_process_mask = CPU_COUNT(&core_set) / hwthreads_per_core;
309
310 /* Best-effort attempt to compute the number of OMP threads per core */
311 2 openmp_places_t openmp_places = parse_omp_places();
312 2 num_initial_omp_threads = parse_omp_num_threads(INT_MAX);
313 2 num_initial_omp_threads_level1 = parse_omp_num_threads(1);
314
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (openmp_places == OPENMP_PLACE_CORES) {
315 1 num_omp_threads_per_core = 1;
316
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 } else if (openmp_places == OPENMP_PLACE_THREADS) {
317 1 num_omp_threads_per_core = hwthreads_per_core;
318 } else if ( num_initial_omp_threads > 0) {
319 num_omp_threads_per_core = max_int(
320 1, num_initial_omp_threads / num_cores_in_process_mask);
321 } else {
322 num_omp_threads_per_core = 1;
323 }
324
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 verbose(VB_OMPT, "hwthreads per core: %d, omp threads per core: %d",
325 hwthreads_per_core, num_omp_threads_per_core);
326
327 2 array_cpuid_t_init(&cpu_bindings, 8);
328 2 compute_cpu_bindings();
329
330 2 omptm_omp5__lend();
331 }
332 2 }
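Illustrative outcomes of the best-effort computation above, assuming a hypothetical
machine with 2 hwthreads per core and 4 cores in the process mask:

    /* OMP_PLACES=cores                    -> num_omp_threads_per_core = 1
     * OMP_PLACES=threads                  -> num_omp_threads_per_core = 2
     * OMP_PLACES unset, OMP_NUM_THREADS=8 -> max_int(1, 8 / 4) = 2
     * neither variable set                -> num_omp_threads_per_core = 1 */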
333
334 2 void omptm_omp5__finalize(void) {
335 2 }
336
337
338 /* lb_funcs.into_blocking_call has already been called and
339 * the current CPU will be lent according to the --lewi-mpi option.
340 * This function just lends the rest of the CPUs.
341 */
342 void omptm_omp5__IntoBlockingCall(void) {
343 if (lewi) {
344 int mycpu = sched_getcpu();
345
346 /* Lend every CPU except the current one */
347 cpu_set_t mask;
348 memcpy(&mask, &active_mask, sizeof(cpu_set_t));
349 CPU_CLR(mycpu, &mask);
350 DLB_LendCpuMask(&mask);
351
352 /* Set active_mask to only the current CPU */
353 CPU_ZERO(&active_mask);
354 CPU_SET(mycpu, &active_mask);
355 set_num_threads_fn(1);
356 compute_cpu_bindings();
357
358 verbose(VB_OMPT, "IntoBlockingCall - lending all");
359 }
360 }
361
362 void omptm_omp5__OutOfBlockingCall(void) {
363 if (lewi) {
364 DLB_Reclaim();
365
366 /* Return to the initial number of threads */
367 set_num_threads_fn(num_initial_omp_threads_level1);
368 }
369 }
370
371
372 void omptm_omp5__parallel_begin(omptool_parallel_data_t *parallel_data) {
373 if (parallel_data->level == 1) {
374 omptm_omp5__borrow();
375 }
376 }
377
378 void omptm_omp5__parallel_end(omptool_parallel_data_t *parallel_data) {
379 if (parallel_data->level == 1) {
380 omptm_omp5__lend();
381 }
382 }
383
384 static inline int get_thread_binding(int index) {
385 return cpu_bindings.items[index % cpu_bindings.count];
386 }
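For reference, the modulo above wraps thread numbers past the end of the bindings
array (hypothetical contents):

    /* cpu_bindings = [0, 2], count = 2:
     * get_thread_binding(0) -> 0, get_thread_binding(1) -> 2,
     * get_thread_binding(2) -> 0   (wraps around) */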
387
388 void omptm_omp5__into_parallel_function(
389 omptool_parallel_data_t *parallel_data, unsigned int index) {
390 if (parallel_data->level == 1) {
391 int cpuid = get_thread_binding(index);
392 int current_cpuid = sched_getcpu();
393 if (cpuid >= 0 && cpuid != current_cpuid) {
394 cpu_set_t thread_mask;
395 CPU_ZERO(&thread_mask);
396 CPU_SET(cpuid, &thread_mask);
397 pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &thread_mask);
398 instrument_event(REBIND_EVENT, cpuid+1, EVENT_BEGIN);
399 verbose(VB_OMPT, "Rebinding thread %d to CPU %d", index, cpuid);
400 }
401 }
402 }
403
404 void omptm_omp5__into_parallel_implicit_barrier(omptool_parallel_data_t *parallel_data) {
405 if (parallel_data->level == 1) {
406 /* A thread participating in the level 1 parallel region
407 * has reached the implicit barrier */
408 instrument_event(REBIND_EVENT, 0, EVENT_END);
409 }
410 }
411
412
413 /*********************************************************************************/
414 /* Functions for testing purposes */
415 /*********************************************************************************/
416
417 const cpu_set_t* omptm_omp5_testing__get_active_mask(void) {
418 return &active_mask;
419 }
420
421 4 const array_cpuid_t* omptm_omp5_testing__compute_and_get_cpu_bindings(void) {
422
423 4 compute_cpu_bindings();
424 4 return &cpu_bindings;
425 }
426
427 2 void omptm_omp5_testing__set_num_threads_fn(void (*fn)(int)) {
428 2 set_num_threads_fn = fn;
429 2 }
430