GCC Code Coverage Report


Directory: src/
File: src/LB_policies/lewi_async.c
Date: 2025-11-21 10:34:40
Exec Total Coverage
Lines: 104 119 87.4%
Functions: 14 14 100.0%
Branches: 36 66 54.5%

Line Branch Exec Source
1 /*********************************************************************************/
2 /* Copyright 2009-2024 Barcelona Supercomputing Center */
3 /* */
4 /* This file is part of the DLB library. */
5 /* */
6 /* DLB is free software: you can redistribute it and/or modify */
7 /* it under the terms of the GNU Lesser General Public License as published by */
8 /* the Free Software Foundation, either version 3 of the License, or */
9 /* (at your option) any later version. */
10 /* */
11 /* DLB is distributed in the hope that it will be useful, */
12 /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
13 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
14 /* GNU Lesser General Public License for more details. */
15 /* */
16 /* You should have received a copy of the GNU Lesser General Public License */
17 /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */
18 /*********************************************************************************/
19
20
21 #include "LB_policies/lewi_async.h"
22
23 #include "LB_core/spd.h"
24 #include "apis/dlb_errors.h"
25 #include "LB_comm/shmem_lewi_async.h"
26 #include "LB_comm/shmem_async.h"
27 #include "LB_numThreads/numThreads.h"
28 #include "support/debug.h"
29 #include "support/mask_utils.h"
30 #include "support/options.h"
31 #include "support/queues.h"
32 #include "support/small_array.h"
33 #include "support/types.h"
34
35 #include <sched.h>
36 #include <limits.h>
37
38
39 typedef struct LeWI_async_info {
40 unsigned int prev_requested;
41 } lewi_info_t;
42
43 static int node_size;
44
45 81 static void lewi_async_set_num_threads(const pm_interface_t *pm, unsigned int num_threads) {
46 81 int signed_num_threads = (int)num_threads;
47
1/2
✓ Branch 0 taken 81 times.
✗ Branch 1 not taken.
81 if (signed_num_threads >= 0) {
48
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 81 times.
81 verbose(VB_MICROLB, "Using %d cpus", signed_num_threads);
49 81 update_threads(pm, signed_num_threads);
50 }
51 81 }
52
53 /* Helper Lend function, Lend everything except new_ncpus */
54 18 static int lewi_async_Lend_keep_cpus(const subprocess_descriptor_t *spd,
55 unsigned int new_ncpus) {
56
57 /* Lend CPUs to the shmem, obtain potential requests */
58 unsigned int num_requests;
59 unsigned int prev_requested;
60 18 SMALL_ARRAY(lewi_request_t, requests, node_size);
61 18 int error = shmem_lewi_async__lend_keep_cpus(spd->id, new_ncpus,
62 requests, &num_requests, node_size, &prev_requested);
63
64
1/2
✓ Branch 0 taken 18 times.
✗ Branch 1 not taken.
18 if (error == DLB_SUCCESS) {
65
66 /* Save previously requested CPUs to push request again later */
67
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 4 times.
18 if (prev_requested > 0 ) {
68 14 lewi_info_t *lewi_info = spd->lewi_info;
69 14 lewi_info->prev_requested = prev_requested;
70 }
71
72 /* Set num_threads = new_ncpus */
73 18 lewi_async_set_num_threads(&spd->pm, new_ncpus);
74
75 /* Trigger other processes's helper thread */
76
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 18 times.
36 for (unsigned int i=0; i<num_requests; ++i) {
77 18 shmem_async_set_num_cpus(requests[i].pid, requests[i].howmany);
78 }
79 }
80
81 18 return error;
82 }
83
84
85 10 int lewi_async_Init(subprocess_descriptor_t *spd) {
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 verbose(VB_MICROLB, "LeWI Init");
87
88 10 unsigned int initial_ncpus = spd->lewi_ncpus;
89 10 node_size = mu_get_system_size();
90
91 10 spd->lewi_info = malloc(sizeof(lewi_info_t));
92 10 lewi_info_t *lewi_info = spd->lewi_info;
93 10 lewi_info->prev_requested = 0;
94
95 10 lewi_async_set_num_threads(&spd->pm, initial_ncpus);
96
97 10 info0("Default cpus per process: %d", initial_ncpus);
98
99 // Initialize shared memory
100 10 shmem_lewi_async__init(spd->id, initial_ncpus, spd->options.shm_key,
101 spd->options.shm_size_multiplier);
102
103 10 return DLB_SUCCESS;
104 }
105
106 10 int lewi_async_Finalize(subprocess_descriptor_t *spd) {
107
108 unsigned int new_ncpus;
109 unsigned int num_requests;
110 10 SMALL_ARRAY(lewi_request_t, requests, node_size);
111 10 shmem_lewi_async__finalize(spd->id, &new_ncpus, requests,
112 &num_requests, node_size);
113
114 /* Set num_threads = new_ncpus */
115 10 lewi_async_set_num_threads(&spd->pm, new_ncpus);
116
117 /* Trigger other processes's helper thread */
118
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 for (unsigned int i=0; i<num_requests; ++i) {
119 shmem_async_set_num_cpus(requests[i].pid, requests[i].howmany);
120 }
121
122 10 free(spd->lewi_info);
123 10 spd->lewi_info = NULL;
124
125 10 return DLB_SUCCESS;
126 }
127
128 2 int lewi_async_Enable(const subprocess_descriptor_t *spd) {
129
130 2 int error = DLB_SUCCESS;
131
132 /* Restore previously requested CPUs */
133 2 lewi_info_t *lewi_info = spd->lewi_info;
134
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (lewi_info->prev_requested > 0) {
135 error = lewi_async_AcquireCpus(spd, lewi_info->prev_requested);
136 if (error == DLB_SUCCESS || error == DLB_NOTED) {
137 lewi_info->prev_requested = 0;
138 }
139 }
140
141 2 return error;
142 }
143
144 2 int lewi_async_Disable(const subprocess_descriptor_t *spd) {
145
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 verbose(VB_MICROLB, "Reset LeWI");
146
147 unsigned int new_ncpus;
148 unsigned int num_requests;
149 unsigned int prev_requested;
150 2 SMALL_ARRAY(lewi_request_t, requests, node_size);
151 2 int error = shmem_lewi_async__reset(spd->id, &new_ncpus, requests,
152 &num_requests, node_size, &prev_requested);
153
154
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (error == DLB_SUCCESS) {
155
156 /* Save previously requested CPUs to push request again later */
157 if (prev_requested > 0 ) {
158 lewi_info_t *lewi_info = spd->lewi_info;
159 lewi_info->prev_requested = prev_requested;
160 }
161
162 /* Set num_threads = new_ncpus */
163 lewi_async_set_num_threads(&spd->pm, new_ncpus);
164
165 /* Trigger other processes's helper thread */
166 for (unsigned int i=0; i<num_requests; ++i) {
167 shmem_async_set_num_cpus(requests[i].pid, requests[i].howmany);
168 }
169 }
170
171
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 return error == DLB_NOUPDT ? DLB_SUCCESS : error;
172 }
173
174 16 int lewi_async_IntoBlockingCall(const subprocess_descriptor_t *spd) {
175
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
16 if (spd->options.lewi_keep_cpu_on_blocking_call) {
176 /* 1CPU */
177 lewi_async_Lend_keep_cpus(spd, 1);
178 } else {
179 /* BLOCK */
180 16 lewi_async_Lend_keep_cpus(spd, 0);
181 }
182 16 return DLB_SUCCESS;
183 }
184
185 16 int lewi_async_OutOfBlockingCall(const subprocess_descriptor_t *spd) {
186 16 return lewi_async_Reclaim(spd);
187 }
188
189
190 /* Lend everything except 1 CPU */
191 2 int lewi_async_Lend(const subprocess_descriptor_t *spd) {
192 2 return lewi_async_Lend_keep_cpus(spd, 1);
193 }
194
195 /* Lend ncpus */
196 7 int lewi_async_LendCpus(const subprocess_descriptor_t *spd, int ncpus) {
197
198
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (ncpus < 0) return DLB_ERR_PERM;
199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (ncpus == 0) return DLB_NOUPDT;
200
201 /* Lend CPUs to the shmem, obtain potential requests */
202 unsigned int new_ncpus;
203 unsigned int num_requests;
204 unsigned int prev_requested;
205 7 SMALL_ARRAY(lewi_request_t, requests, node_size);
206 7 int error = shmem_lewi_async__lend_cpus(spd->id, ncpus, &new_ncpus,
207 requests, &num_requests, node_size, &prev_requested);
208
209
1/2
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
7 if (error == DLB_SUCCESS) {
210
211 /* Save previously requested CPUs to push request again later */
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (prev_requested > 0 ) {
213 lewi_info_t *lewi_info = spd->lewi_info;
214 lewi_info->prev_requested = prev_requested;
215 }
216
217 /* Set num_threads = new_ncpus */
218 7 lewi_async_set_num_threads(&spd->pm, new_ncpus);
219
220 /* Trigger other processes's helper thread */
221
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 7 times.
13 for (unsigned int i=0; i<num_requests; ++i) {
222 6 shmem_async_set_num_cpus(requests[i].pid, requests[i].howmany);
223 }
224 }
225
226 7 return error;
227 }
228
229 22 int lewi_async_Reclaim(const subprocess_descriptor_t *spd) {
230 22 lewi_info_t *lewi_info = spd->lewi_info;
231 unsigned int new_ncpus;
232 unsigned int num_requests;
233 22 SMALL_ARRAY(lewi_request_t, requests, node_size);
234 22 int error = shmem_lewi_async__reclaim(spd->id, &new_ncpus, requests,
235 &num_requests, node_size, lewi_info->prev_requested);
236
2/2
✓ Branch 0 taken 21 times.
✓ Branch 1 taken 1 times.
22 if (error == DLB_SUCCESS) {
237
238 /* Reset saved request */
239 21 lewi_info->prev_requested = 0;
240
241 /* Set num_threads = new_ncpus */
242 21 lewi_async_set_num_threads(&spd->pm, new_ncpus);
243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 21 times.
21 verbose(VB_MICROLB, "ACQUIRING %d cpus", new_ncpus);
244
245 /* Trigger other processes's helper thread */
246
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 21 times.
35 for (unsigned int i=0; i<num_requests; ++i) {
247 14 shmem_async_set_num_cpus(requests[i].pid, requests[i].howmany);
248 }
249 }
250 22 return error;
251 }
252
253 24 int lewi_async_AcquireCpus(const subprocess_descriptor_t *spd, int ncpus) {
254 24 int error = DLB_ERR_UNKNOWN;
255 unsigned int new_ncpus;
256 24 unsigned int num_requests = 0;
257 24 SMALL_ARRAY(lewi_request_t, requests, node_size);
258
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 13 times.
24 if (ncpus == DLB_DELETE_REQUESTS) {
259 /* If ncpus is special value, remove previous requests */
260 11 shmem_lewi_async__remove_requests(spd->id);
261
262 /* Remove also local data */
263 11 lewi_info_t *lewi_info = spd->lewi_info;
264 11 lewi_info->prev_requested = 0;
265
266 11 error = DLB_SUCCESS;
267
1/2
✓ Branch 0 taken 13 times.
✗ Branch 1 not taken.
13 } else if (ncpus > 0) {
268
269 /* If there are saved requests and the new one is positive, add them */
270 13 lewi_info_t *lewi_info = spd->lewi_info;
271
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 if (lewi_info->prev_requested > 0) {
272 ncpus += lewi_info->prev_requested;
273 lewi_info->prev_requested = 0;
274 }
275
276 /* Otherwise, borrow as usual, and request remaining CPUs */
277 13 error = shmem_lewi_async__acquire_cpus(spd->id, ncpus, &new_ncpus,
278 requests, &num_requests, node_size);
279
280
2/4
✓ Branch 0 taken 13 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
13 if (error == DLB_SUCCESS || error == DLB_NOTED) {
281
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 verbose(VB_MICROLB, "Using %d cpus", new_ncpus);
282 13 lewi_async_set_num_threads(&spd->pm, new_ncpus);
283
284 /* Trigger other processes's helper thread */
285
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 13 times.
15 for (unsigned int i=0; i<num_requests; ++i) {
286 2 shmem_async_set_num_cpus(requests[i].pid, requests[i].howmany);
287 }
288 }
289 }
290
291 24 return error;
292 }
293
294 1 int lewi_async_Borrow(const subprocess_descriptor_t *spd) {
295 1 return lewi_async_BorrowCpus(spd, INT_MAX);
296 }
297
298 2 int lewi_async_BorrowCpus(const subprocess_descriptor_t *spd, int ncpus) {
299 unsigned int new_ncpus;
300 2 int error = shmem_lewi_async__borrow_cpus(spd->id, ncpus, &new_ncpus);
301
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (error == DLB_SUCCESS) {
302
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 verbose(VB_MICROLB, "Using %d cpus", new_ncpus);
303 2 lewi_async_set_num_threads(&spd->pm, new_ncpus);
304 }
305 2 return error;
306 }
307