Line | Branch | Exec | Source |
---|---|---|---|
1 | /*********************************************************************************/ | ||
2 | /* Copyright 2009-2024 Barcelona Supercomputing Center */ | ||
3 | /* */ | ||
4 | /* This file is part of the DLB library. */ | ||
5 | /* */ | ||
6 | /* DLB is free software: you can redistribute it and/or modify */ | ||
7 | /* it under the terms of the GNU Lesser General Public License as published by */ | ||
8 | /* the Free Software Foundation, either version 3 of the License, or */ | ||
9 | /* (at your option) any later version. */ | ||
10 | /* */ | ||
11 | /* DLB is distributed in the hope that it will be useful, */ | ||
12 | /* but WITHOUT ANY WARRANTY; without even the implied warranty of */ | ||
13 | /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ | ||
14 | /* GNU Lesser General Public License for more details. */ | ||
15 | /* */ | ||
16 | /* You should have received a copy of the GNU Lesser General Public License */ | ||
17 | /* along with DLB. If not, see <https://www.gnu.org/licenses/>. */ | ||
18 | /*********************************************************************************/ | ||
19 | |||
20 | #include "LB_comm/shmem_async.h" | ||
21 | |||
22 | #include "LB_core/spd.h" | ||
23 | #include "LB_comm/shmem.h" | ||
24 | #include "LB_comm/shmem_cpuinfo.h" | ||
25 | #include "LB_comm/shmem_procinfo.h" | ||
26 | #include "LB_numThreads/numThreads.h" | ||
27 | #include "apis/dlb_errors.h" | ||
28 | #include "support/mask_utils.h" | ||
29 | #include "support/debug.h" | ||
30 | |||
31 | #include <sched.h> | ||
32 | #include <unistd.h> | ||
33 | #include <errno.h> | ||
34 | #include <pthread.h> | ||
35 | #include <string.h> | ||
36 | |||
37 | enum { NOBODY = 0 }; | ||
38 | enum { QUEUE_SIZE = 100 }; | ||
39 | |||
40 | typedef enum HelperAction { | ||
41 | ACTION_NONE = 0, | ||
42 | ACTION_ENABLE_CPU, | ||
43 | ACTION_ENABLE_CPU_SET, | ||
44 | ACTION_DISABLE_CPU, | ||
45 | ACTION_DISABLE_CPU_SET, | ||
46 | ACTION_SET_CPU_SET, | ||
47 | ACTION_SET_NUM_CPUS, | ||
48 | ACTION_JOIN | ||
49 | } action_t; | ||
50 | |||
51 | typedef enum HelperStatus { | ||
52 | HELPER_UNKOWN_STATUS = 0, | ||
53 | HELPER_WAITING, | ||
54 | HELPER_BUSY, | ||
55 | } status_t; | ||
56 | |||
57 | typedef struct Message { | ||
58 | action_t action; | ||
59 | int cpuid; | ||
60 | int ncpus; | ||
61 | cpu_set_t cpu_set; | ||
62 | } message_t; | ||
63 | |||
64 | typedef struct { | ||
65 | /* Queue attributes */ | ||
66 | message_t queue[QUEUE_SIZE]; | ||
67 | unsigned int q_head; | ||
68 | unsigned int q_tail; | ||
69 | pthread_mutex_t q_lock; | ||
70 | pthread_cond_t q_wait_data; | ||
71 | |||
72 | /* Helper metadata */ | ||
73 | pid_t pid; | ||
74 | pthread_t pth; | ||
75 | cpu_set_t mask; | ||
76 | const pm_interface_t *pm; | ||
77 | bool joinable; | ||
78 | status_t status; | ||
79 | } helper_t; | ||
80 | |||
81 | |||
82 | typedef struct { | ||
83 | helper_t helpers[0]; | ||
84 | } shdata_t; | ||
85 | |||
86 | enum { SHMEM_ASYNC_VERSION = 4 }; | ||
87 | |||
88 | static int max_helpers = 0; | ||
89 | static shdata_t *shdata = NULL; | ||
90 | static const char *shmem_name = "async"; | ||
91 | static shmem_handler_t *shm_handler = NULL; | ||
92 | static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; | ||
93 | static int subprocesses_attached = 0; | ||
94 | |||
95 | 256 | static helper_t* get_helper(pid_t pid) { | |
96 | int h; | ||
97 |
2/4✓ Branch 0 taken 441 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 441 times.
✗ Branch 3 not taken.
|
441 | for (h = 0; shdata && h < max_helpers; ++h) { |
98 |
2/2✓ Branch 0 taken 256 times.
✓ Branch 1 taken 185 times.
|
441 | if (shdata->helpers[h].pid == pid) { |
99 | 256 | return &shdata->helpers[h]; | |
100 | } | ||
101 | } | ||
102 | ✗ | return NULL; | |
103 | } | ||
104 | |||
105 | 160 | static void enqueue_message(helper_t *helper, const message_t *message) { | |
106 | /* Discard message if helper does not accept new inputs */ | ||
107 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 160 times.
|
160 | if (helper->joinable) { |
108 | ✗ | return; | |
109 | } | ||
110 | |||
111 | 160 | pthread_mutex_lock(&helper->q_lock); | |
112 | |||
113 | /* If ACTION_JOIN, flag helper to reject further messages */ | ||
114 |
2/2✓ Branch 0 taken 17 times.
✓ Branch 1 taken 143 times.
|
160 | if (message->action == ACTION_JOIN) { |
115 | 17 | helper->joinable = true; | |
116 | } | ||
117 | |||
118 | /* Get next_head index and check that buffer is not full */ | ||
119 | 160 | unsigned int next_head = (helper->q_head + 1) % QUEUE_SIZE; | |
120 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 160 times.
|
160 | if (__builtin_expect((next_head == helper->q_tail), 0)) { |
121 | ✗ | pthread_mutex_unlock(&helper->q_lock); | |
122 | ✗ | fatal("Max petitions requested for asynchronous thread"); | |
123 | } | ||
124 | |||
125 | /* Enqueue message and update head */ | ||
126 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 154 times.
|
160 | verbose(VB_ASYNC, "Writing message %d, head is now %d", helper->q_head, next_head); |
127 | 160 | helper->queue[helper->q_head] = *message; | |
128 | 160 | helper->q_head = next_head; | |
129 | |||
130 | /* Signal helper */ | ||
131 | 160 | pthread_cond_signal(&helper->q_wait_data); | |
132 | |||
133 | 160 | pthread_mutex_unlock(&helper->q_lock); | |
134 | } | ||
135 | |||
136 | 160 | static void dequeue_message(helper_t *helper, message_t *message) { | |
137 | 160 | pthread_mutex_lock(&helper->q_lock); | |
138 | |||
139 | /* Block until there's some message in the queue */ | ||
140 | 160 | helper->status = HELPER_WAITING; | |
141 |
2/2✓ Branch 0 taken 56 times.
✓ Branch 1 taken 160 times.
|
216 | while (helper->q_head == helper->q_tail) { |
142 | 56 | pthread_cond_wait(&helper->q_wait_data, &helper->q_lock); | |
143 | } | ||
144 | 160 | helper->status = HELPER_BUSY; | |
145 | |||
146 | /* Dequeue message and update tail */ | ||
147 | 160 | unsigned int next_tail = (helper->q_tail + 1) % QUEUE_SIZE; | |
148 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 154 times.
|
160 | verbose(VB_ASYNC, "Reading message %d, tail is now %d", helper->q_tail, next_tail); |
149 | 160 | *message = helper->queue[helper->q_tail]; | |
150 | 160 | helper->q_tail = next_tail; | |
151 | |||
152 | 160 | pthread_mutex_unlock(&helper->q_lock); | |
153 | 160 | } | |
154 | |||
155 | 17 | static void* thread_start(void *arg) { | |
156 | 17 | spd_enter_dlb(thread_spd); | |
157 | 17 | helper_t *helper = arg; | |
158 | 17 | const pm_interface_t* const pm = helper->pm; | |
159 | 17 | pthread_setaffinity_np(helper->pth, sizeof(cpu_set_t), &helper->mask); | |
160 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
|
17 | verbose(VB_ASYNC, "Helper thread started, pinned to %s", mu_to_str(&helper->mask)); |
161 | |||
162 | 17 | bool join = false; | |
163 |
3/4✓ Branch 0 taken 160 times.
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 17 times.
|
177 | while(!join || helper->q_head != helper->q_tail) { |
164 | message_t message; | ||
165 | 160 | dequeue_message(helper, &message); | |
166 | |||
167 | 160 | int error = 0; | |
168 |
4/9✗ Branch 0 not taken.
✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 40 times.
✓ Branch 7 taken 17 times.
✗ Branch 8 not taken.
|
160 | switch(message.action) { |
169 | ✗ | case ACTION_NONE: | |
170 | ✗ | verbose(VB_ASYNC, "Helper thread attending petition: NONE"); | |
171 | ✗ | break; | |
172 | 101 | case ACTION_ENABLE_CPU: | |
173 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 101 times.
|
101 | verbose(VB_ASYNC, |
174 | "Helper thread attending petition: ENABLE %d", | ||
175 | message.cpuid); | ||
176 | 101 | error = enable_cpu(pm, message.cpuid); | |
177 | 101 | break; | |
178 | ✗ | case ACTION_ENABLE_CPU_SET: | |
179 | ✗ | verbose(VB_ASYNC, "Helper thread attending petition: ENABLE_CPU_SET %s", | |
180 | mu_to_str(&message.cpu_set)); | ||
181 | ✗ | error = enable_cpu_set(pm, &message.cpu_set); | |
182 | ✗ | break; | |
183 | 2 | case ACTION_DISABLE_CPU: | |
184 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | verbose(VB_ASYNC, "Helper thread attending petition: DISABLE %d", |
185 | message.cpuid); | ||
186 | 2 | error = disable_cpu(pm, message.cpuid); | |
187 | 2 | break; | |
188 | ✗ | case ACTION_DISABLE_CPU_SET: | |
189 | ✗ | verbose(VB_ASYNC, "Helper thread attending petition: DISABLE_CPU_SET %s", | |
190 | mu_to_str(&message.cpu_set)); | ||
191 | ✗ | error = disable_cpu_set(pm, &message.cpu_set); | |
192 | ✗ | break; | |
193 | ✗ | case ACTION_SET_CPU_SET: | |
194 | ✗ | break; | |
195 | 40 | case ACTION_SET_NUM_CPUS: | |
196 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
|
40 | verbose(VB_ASYNC, "Helper thread attending petition: SET_NUM_CPUS %d", |
197 | message.ncpus); | ||
198 | 40 | error = update_threads(pm, message.ncpus); | |
199 | 40 | break; | |
200 | 17 | case ACTION_JOIN: | |
201 | 17 | join = true; | |
202 | 17 | break; | |
203 | } | ||
204 | if (error) { | ||
205 | // error ? | ||
206 | } | ||
207 | } | ||
208 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
|
17 | verbose(VB_ASYNC, "Helper thread finalizing"); |
209 | 17 | return NULL; | |
210 | } | ||
211 | |||
212 | 1 | static void cleanup_shmem(void *shdata_ptr, int pid) { | |
213 | 1 | shdata_t *shared_data = shdata_ptr; | |
214 | int h; | ||
215 |
2/2✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 times.
|
9 | for (h = 0; h < max_helpers; ++h) { |
216 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7 times.
|
8 | if (shared_data->helpers[h].pid == pid) { |
217 | 1 | shared_data->helpers[h] = (const helper_t){}; | |
218 | } | ||
219 | } | ||
220 | 1 | } | |
221 | |||
222 | 17 | int shmem_async_init(pid_t pid, const pm_interface_t *pm, const cpu_set_t *process_mask, | |
223 | const char *shmem_key) { | ||
224 | 17 | int error = DLB_ERR_UNKNOWN; | |
225 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
|
17 | verbose(VB_ASYNC, "Creating helper thread"); |
226 | |||
227 | // Shared memory creation | ||
228 | 17 | pthread_mutex_lock(&mutex); | |
229 | { | ||
230 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 8 times.
|
17 | if (shm_handler == NULL) { |
231 | 9 | max_helpers = mu_get_system_size(); | |
232 | 18 | shm_handler = shmem_init((void**)&shdata, | |
233 | 9 | &(const shmem_props_t) { | |
234 | 9 | .size = shmem_async__size(), | |
235 | .name = shmem_name, | ||
236 | .key = shmem_key, | ||
237 | .version = SHMEM_ASYNC_VERSION, | ||
238 | .cleanup_fn = cleanup_shmem, | ||
239 | }); | ||
240 | 9 | subprocesses_attached = 1; | |
241 | } else { | ||
242 | 8 | ++subprocesses_attached; | |
243 | } | ||
244 | } | ||
245 | 17 | pthread_mutex_unlock(&mutex); | |
246 | |||
247 | 17 | helper_t *helper = NULL; | |
248 | // Lock shmem to register new subprocess | ||
249 | 17 | shmem_lock(shm_handler); | |
250 | { | ||
251 | int h; | ||
252 |
1/2✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
|
31 | for (h = 0; h < max_helpers; ++h) { |
253 | // Register helper | ||
254 |
2/2✓ Branch 0 taken 17 times.
✓ Branch 1 taken 14 times.
|
31 | if (shdata->helpers[h].pid == NOBODY) { |
255 | 17 | helper = &shdata->helpers[h]; | |
256 | |||
257 | /* Initialize queue structure */ | ||
258 | 17 | memset(helper->queue, 0, sizeof(helper->queue)); | |
259 | 17 | helper->q_head = 0; | |
260 | 17 | helper->q_tail = 0; | |
261 | |||
262 | /* Initialize queue sync attributes */ | ||
263 | pthread_mutexattr_t mutex_attr; | ||
264 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
|
17 | fatal_cond_strerror( pthread_mutexattr_init(&mutex_attr) ); |
265 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
|
17 | fatal_cond_strerror( pthread_mutexattr_setpshared(&mutex_attr, |
266 | PTHREAD_PROCESS_SHARED) ); | ||
267 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
|
17 | fatal_cond_strerror( pthread_mutex_init(&helper->q_lock, &mutex_attr) ); |
268 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
|
17 | fatal_cond_strerror( pthread_mutexattr_destroy(&mutex_attr) ); |
269 | pthread_condattr_t cond_attr; | ||
270 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
|
17 | fatal_cond_strerror( pthread_condattr_init(&cond_attr) ); |
271 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
|
17 | fatal_cond_strerror( pthread_condattr_setpshared(&cond_attr, |
272 | PTHREAD_PROCESS_SHARED) ); | ||
273 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
|
17 | fatal_cond_strerror( pthread_cond_init(&helper->q_wait_data, &cond_attr) ); |
274 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
|
17 | fatal_cond_strerror( pthread_condattr_destroy(&cond_attr) ); |
275 | |||
276 | // Initialize helper metadata and create thread | ||
277 | 17 | helper->pm = pm; | |
278 | 17 | helper->pid = pid; | |
279 | 17 | memcpy(&helper->mask, process_mask, sizeof(cpu_set_t)); | |
280 | 17 | pthread_create(&helper->pth, NULL, thread_start, (void*)helper); | |
281 | |||
282 | 17 | error = DLB_SUCCESS; | |
283 | 17 | break; | |
284 | } | ||
285 | } | ||
286 | } | ||
287 | 17 | shmem_unlock(shm_handler); | |
288 | |||
289 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
|
17 | if (helper == NULL) { |
290 | ✗ | error = DLB_ERR_NOMEM; | |
291 | ✗ | warn_error(DLB_ERR_NOMEM); | |
292 | } | ||
293 | |||
294 | 17 | return error; | |
295 | } | ||
296 | |||
297 | 17 | int shmem_async_finalize(pid_t pid) { | |
298 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
|
17 | verbose(VB_ASYNC, "Finalizing helper thread for pid: %d", pid); |
299 | 17 | helper_t *helper = get_helper(pid); | |
300 |
1/2✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
|
17 | if (helper) { |
301 | /* Enqueue JOIN message */ | ||
302 | 17 | message_t message = { .action = ACTION_JOIN }; | |
303 | 17 | enqueue_message(helper, &message); | |
304 | |||
305 | /* Wait helper thread to finish */ | ||
306 | 17 | pthread_join(helper->pth, NULL); | |
307 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 12 times.
|
17 | verbose(VB_ASYNC, "Helper thread joined"); |
308 | |||
309 | /* Clear helper data */ | ||
310 | 17 | shmem_lock(shm_handler); | |
311 | { | ||
312 | 17 | pthread_mutex_destroy(&helper->q_lock); | |
313 | 17 | pthread_cond_destroy(&helper->q_wait_data); | |
314 | 17 | memset(helper, 0, sizeof(*helper)); | |
315 | } | ||
316 | 17 | shmem_unlock(shm_handler); | |
317 | |||
318 | /* Shared memory destruction */ | ||
319 | 17 | pthread_mutex_lock(&mutex); | |
320 | { | ||
321 |
2/2✓ Branch 0 taken 9 times.
✓ Branch 1 taken 8 times.
|
17 | if (--subprocesses_attached == 0) { |
322 | 9 | shmem_finalize(shm_handler, NULL /* do not check if empty */); | |
323 | 9 | shm_handler = NULL; | |
324 | 9 | shdata = NULL; | |
325 | } | ||
326 | } | ||
327 | 17 | pthread_mutex_unlock(&mutex); | |
328 | } | ||
329 | |||
330 |
1/2✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
|
17 | return helper ? DLB_SUCCESS : DLB_ERR_NOPROC; |
331 | } | ||
332 | |||
333 | |||
334 | 101 | void shmem_async_enable_cpu(pid_t pid, int cpuid) { | |
335 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 101 times.
|
101 | verbose(VB_ASYNC, "Enqueuing petition for pid: %d, enable cpuid %d", pid, cpuid); |
336 | 101 | helper_t *helper = get_helper(pid); | |
337 |
1/2✓ Branch 0 taken 101 times.
✗ Branch 1 not taken.
|
101 | if (helper) { |
338 | 101 | message_t message = { .action = ACTION_ENABLE_CPU, .cpuid = cpuid }; | |
339 | 101 | enqueue_message(helper, &message); | |
340 | } | ||
341 | 101 | } | |
342 | |||
343 | ✗ | void shmem_async_enable_cpu_set(pid_t pid, const cpu_set_t *cpu_set) { | |
344 | ✗ | verbose(VB_ASYNC, "Enqueuing petition for pid: %d, enable cpu set %s", | |
345 | pid, mu_to_str(cpu_set)); | ||
346 | ✗ | helper_t *helper = get_helper(pid); | |
347 | ✗ | if (helper) { | |
348 | ✗ | message_t message = { .action = ACTION_ENABLE_CPU_SET, .cpu_set = *cpu_set }; | |
349 | ✗ | enqueue_message(helper, &message); | |
350 | } | ||
351 | } | ||
352 | |||
353 | 2 | void shmem_async_disable_cpu(pid_t pid, int cpuid) { | |
354 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | verbose(VB_ASYNC, "Enqueuing petition for pid: %d, disable cpuid %d", pid, cpuid); |
355 | 2 | helper_t *helper = get_helper(pid); | |
356 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | if (helper) { |
357 | 2 | message_t message = { .action = ACTION_DISABLE_CPU, .cpuid = cpuid }; | |
358 | 2 | enqueue_message(helper, &message); | |
359 | } | ||
360 | 2 | } | |
361 | |||
362 | ✗ | void shmem_async_disable_cpu_set(pid_t pid, const cpu_set_t *cpu_set) { | |
363 | ✗ | verbose(VB_ASYNC, "Enqueuing petition for pid: %d, disable cpu set %s", | |
364 | pid, mu_to_str(cpu_set)); | ||
365 | ✗ | helper_t *helper = get_helper(pid); | |
366 | ✗ | if (helper) { | |
367 | ✗ | message_t message = { .action = ACTION_DISABLE_CPU_SET, .cpu_set = *cpu_set }; | |
368 | ✗ | enqueue_message(helper, &message); | |
369 | } | ||
370 | } | ||
371 | |||
372 | 40 | void shmem_async_set_num_cpus(pid_t pid, int ncpus) { | |
373 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
|
40 | verbose(VB_ASYNC, "Enqueuing petition for pid: %d, set num CPUs %d", pid, ncpus); |
374 | 40 | helper_t *helper = get_helper(pid); | |
375 |
1/2✓ Branch 0 taken 40 times.
✗ Branch 1 not taken.
|
40 | if (helper) { |
376 | 40 | message_t message = { .action = ACTION_SET_NUM_CPUS, .ncpus = ncpus }; | |
377 | 40 | enqueue_message(helper, &message); | |
378 | } | ||
379 | 40 | } | |
380 | |||
381 | 1 | int shmem_async__version(void) { | |
382 | 1 | return SHMEM_ASYNC_VERSION; | |
383 | } | ||
384 | 10 | size_t shmem_async__size(void) { | |
385 | 10 | return sizeof(shdata_t) + sizeof(helper_t)*mu_get_system_size(); | |
386 | } | ||
387 | |||
388 | /* Only for testing purposes. Block current thread until helper thread | ||
389 | * with the given pid has finished its pending requests */ | ||
390 | 96 | void shmem_async_wait_for_completion(pid_t pid) { | |
391 | 96 | helper_t *helper = get_helper(pid); | |
392 | while(1) { | ||
393 |
1/2✓ Branch 1 taken 128 times.
✗ Branch 2 not taken.
|
128 | if (pthread_mutex_trylock(&helper->q_lock) == 0) { |
394 |
1/2✓ Branch 0 taken 128 times.
✗ Branch 1 not taken.
|
128 | if (helper->status == HELPER_WAITING |
395 |
2/2✓ Branch 0 taken 96 times.
✓ Branch 1 taken 32 times.
|
128 | && helper->q_head == helper->q_tail) { |
396 | 96 | pthread_mutex_unlock(&helper->q_lock); | |
397 | 96 | break; | |
398 | } | ||
399 | 32 | pthread_mutex_unlock(&helper->q_lock); | |
400 | } | ||
401 | 32 | usleep(1000); | |
402 | } | ||
403 | 96 | } | |
404 |