varnish-cache/bin/varnishtest/vtest2/src/vtc_barrier.c
0
/*-
1
 * Copyright (c) 2005 Varnish Software AS
2
 * All rights reserved.
3
 *
4
 * Author: Dridi Boukelmoune <dridi@varnish-software.com>
5
 *
6
 * SPDX-License-Identifier: BSD-2-Clause
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 */
29
30
#include "config.h"
31
32
#include <poll.h>
33
#include <stdio.h>
34
#include <stdlib.h>
35
#include <string.h>
36
#include <unistd.h>
37
38
#include <sys/socket.h>
39
#include <sys/time.h> /* for MUSL */
40
41
#include "vtc.h"
42
#include "vtcp.h"
43
#include "vtim.h"
44
#include "vsa.h"
45
46
enum barrier_e {
47
        BARRIER_NONE = 0,
48
        BARRIER_COND,
49
        BARRIER_SOCK,
50
};
51
52
struct barrier {
53
        unsigned                magic;
54
#define BARRIER_MAGIC           0x7b54c275
55
        char                    *name;
56
        VTAILQ_ENTRY(barrier)   list;
57
        pthread_mutex_t         mtx;
58
        pthread_cond_t          cond;
59
60
        int                     waiters;
61
        int                     expected;
62
        int                     cyclic;
63
64
        enum barrier_e          type;
65
        union {
66
                int             cond_cycle;
67
                pthread_t       sock_thread;
68
        };
69
};
70
71
static VTAILQ_HEAD(, barrier)   barriers = VTAILQ_HEAD_INITIALIZER(barriers);
72
73
static struct barrier *
74
barrier_new(const char *name, struct vtclog *vl)
75 9200
{
76
        struct barrier *b;
77
78
        ALLOC_OBJ(b, BARRIER_MAGIC);
79 9200
        AN(b);
80 9200
        if (!pthread_equal(pthread_self(), vtc_thread))
81 9200
                vtc_fatal(vl,
82 0
                    "Barrier %s can only be created on the top thread", name);
83 0
        REPLACE(b->name, name);
84 9200
85
        PTOK(pthread_mutex_init(&b->mtx, NULL));
86 9200
        PTOK(pthread_cond_init(&b->cond, NULL));
87 9200
        b->waiters = 0;
88 9200
        b->expected = 0;
89 9200
        VTAILQ_INSERT_TAIL(&barriers, b, list);
90 9200
        return (b);
91 9200
}
92
93
/**********************************************************************
94
 * Init a barrier
95
 */
96
97
static void
98
barrier_expect(struct barrier *b, const char *av, struct vtclog *vl)
99 9200
{
100
        unsigned expected;
101
102
        if (b->type != BARRIER_NONE)
103 9200
                vtc_fatal(vl,
104 0
                    "Barrier(%s) use error: already initialized", b->name);
105 0
106
        AZ(b->expected);
107 9200
        AZ(b->waiters);
108 9200
        expected = strtoul(av, NULL, 0);
109 9200
        if (expected < 2)
110 9200
                vtc_fatal(vl,
111 0
                    "Barrier(%s) use error: wrong expectation (%u)",
112
                    b->name, expected);
113 0
114
        b->expected = expected;
115 9200
}
116 9200
117
static void
118
barrier_cond(struct barrier *b, const char *av, struct vtclog *vl)
119 6920
{
120
121
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
122 6920
        PTOK(pthread_mutex_lock(&b->mtx));
123 6920
        barrier_expect(b, av, vl);
124 6920
        b->type = BARRIER_COND;
125 6920
        PTOK(pthread_mutex_unlock(&b->mtx));
126 6920
}
127 6920
128
static void *
129
barrier_sock_thread(void *priv)
130 2280
{
131
        struct barrier *b;
132
        struct vtclog *vl;
133
        const char *err;
134
        char buf[vsa_suckaddr_len];
135 2280
        const struct suckaddr *sua;
136
137
        char abuf[VTCP_ADDRBUFSIZE], pbuf[VTCP_PORTBUFSIZE];
138
        int i, sock, *conns;
139
        struct pollfd pfd[1];
140
141
        CAST_OBJ_NOTNULL(b, priv, BARRIER_MAGIC);
142 2280
        assert(b->type == BARRIER_SOCK);
143 2280
144
        PTOK(pthread_mutex_lock(&b->mtx));
145 2280
146
        vl = vtc_logopen("%s", b->name);
147 2280
        pthread_cleanup_push(vtc_logclose, vl);
148 2280
149
        sock = VTCP_listen_on(default_listen_addr, NULL, b->expected, &err);
150 2280
        if (sock < 0) {
151 2280
                PTOK(pthread_cond_signal(&b->cond));
152 0
                PTOK(pthread_mutex_unlock(&b->mtx));
153 0
                vtc_fatal(vl, "Barrier(%s) %s fails: %s (errno=%d)",
154 0
                    b->name, err, strerror(errno), errno);
155 0
        }
156
        assert(sock > 0);
157 2280
        VTCP_nonblocking(sock);
158 2280
        sua = VSA_getsockname(sock, buf, sizeof buf);
159 2280
        AN(sua);
160 2280
        VTCP_name(sua, abuf, sizeof abuf, pbuf, sizeof pbuf);
161 2280
162
        macro_def(vl, b->name, "addr", "%s", abuf);
163 2280
        macro_def(vl, b->name, "port", "%s", pbuf);
164 2280
        if (VSA_Get_Proto(sua) == AF_INET)
165 2280
                macro_def(vl, b->name, "sock", "%s:%s", abuf, pbuf);
166 2280
        else
167
                macro_def(vl, b->name, "sock", "[%s]:%s", abuf, pbuf);
168 0
169
        PTOK(pthread_cond_signal(&b->cond));
170 2280
        PTOK(pthread_mutex_unlock(&b->mtx));
171 2280
172
        conns = calloc(b->expected, sizeof *conns);
173 2280
        AN(conns);
174 2280
175
        while (!vtc_stop && !vtc_error) {
176 47381
                pfd[0].fd = sock;
177 46981
                pfd[0].events = POLLIN;
178 46981
179
                i = poll(pfd, 1, 100);
180 46981
                if (i == 0)
181 46981
                        continue;
182 39661
                if (i < 0) {
183 7320
                        if (errno == EINTR)
184 0
                                continue;
185 0
                        closefd(&sock);
186 0
                        vtc_fatal(vl,
187 0
                            "Barrier(%s) select fails: %s (errno=%d)",
188
                            b->name, strerror(errno), errno);
189 0
                }
190
                assert(i == 1);
191 7320
                assert(b->waiters <= b->expected);
192 7320
                if (b->waiters == b->expected)
193 7320
                        vtc_fatal(vl,
194 0
                            "Barrier(%s) use error: "
195
                            "more waiters than the %u expected",
196
                            b->name, b->expected);
197 0
198
                i = accept(sock, NULL, NULL);
199 7320
                if (i < 0) {
200 7320
                        closefd(&sock);
201 0
                        vtc_fatal(vl,
202 0
                            "Barrier(%s) accept fails: %s (errno=%d)",
203
                            b->name, strerror(errno), errno);
204 0
                }
205
206
                /* NB. We don't keep track of the established connections, only
207
                 *     that connections were made to the barrier's socket.
208
                 */
209
                conns[b->waiters] = i;
210 7320
211
                if (++b->waiters < b->expected) {
212 7320
                        vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
213 9040
                            b->name, b->waiters, b->expected);
214 4520
                        continue;
215 4520
                }
216
217
                vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
218 2800
                for (i = 0; i < b->expected; i++)
219 10120
                        closefd(&conns[i]);
220 7320
221
                if (b->cyclic)
222 2800
                        b->waiters = 0;
223 920
                else
224
                        break;
225 1880
        }
226
227
        if (b->waiters % b->expected > 0) {
228 2280
                /* wake up outstanding waiters */
229
                for (i = 0; i < b->waiters; i++)
230 0
                        closefd(&conns[i]);
231 0
                if (!vtc_error)
232 0
                        vtc_fatal(vl, "Barrier(%s) has %u outstanding waiters",
233 0
                            b->name, b->waiters);
234 0
        }
235 0
236
        macro_undef(vl, b->name, "addr");
237 2280
        macro_undef(vl, b->name, "port");
238 2280
        macro_undef(vl, b->name, "sock");
239 2280
        closefd(&sock);
240 2280
        free(conns);
241 2280
        pthread_cleanup_pop(0);
242 2280
        vtc_logclose(vl);
243 2280
        return (NULL);
244
}
245 2280
246
static void
247
barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
248 2280
{
249
250
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
251 2280
        PTOK(pthread_mutex_lock(&b->mtx));
252 2280
        barrier_expect(b, av, vl);
253 2280
        b->type = BARRIER_SOCK;
254 2280
255
        /* NB. We can use the BARRIER_COND's pthread_cond_t to wait until the
256
         *     socket is ready for convenience.
257
         */
258
        PTOK(pthread_create(&b->sock_thread, NULL, barrier_sock_thread, b));
259 2280
        PTOK(pthread_cond_wait(&b->cond, &b->mtx));
260 2280
        PTOK(pthread_mutex_unlock(&b->mtx));
261 2280
}
262 2280
263
static void
264
barrier_cyclic(struct barrier *b, struct vtclog *vl)
265 760
{
266
        enum barrier_e t;
267
        int w;
268
269
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
270 760
271
        PTOK(pthread_mutex_lock(&b->mtx));
272 760
        t = b->type;
273 760
        w = b->waiters;
274 760
        PTOK(pthread_mutex_unlock(&b->mtx));
275 760
276
        if (t == BARRIER_NONE)
277 760
                vtc_fatal(vl,
278 0
                    "Barrier(%s) use error: not initialized", b->name);
279 0
280
        if (w != 0)
281 760
                vtc_fatal(vl,
282 0
                    "Barrier(%s) use error: already in use", b->name);
283 0
284
        PTOK(pthread_mutex_lock(&b->mtx));
285 760
        b->cyclic = 1;
286 760
        PTOK(pthread_mutex_unlock(&b->mtx));
287 760
}
288 760
289
/**********************************************************************
290
 * Sync a barrier
291
 */
292
293
static void
294
barrier_cond_sync(struct barrier *b, struct vtclog *vl)
295 17160
{
296
        struct timespec ts;
297
        int r, w, c;
298
299
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
300 17160
        assert(b->type == BARRIER_COND);
301 17160
302
        PTOK(pthread_mutex_lock(&b->mtx));
303 17160
        w = b->waiters;
304 17160
        assert(w <= b->expected);
305 17160
306
        if (w == b->expected)
307 17160
                w = -1;
308 0
        else
309
                b->waiters = ++w;
310 17160
311
        c = b->cond_cycle;
312 17160
        PTOK(pthread_mutex_unlock(&b->mtx));
313 17160
314
        if (w < 0)
315 17160
                vtc_fatal(vl,
316 0
                    "Barrier(%s) use error: more waiters than the %u expected",
317
                    b->name, b->expected);
318 0
319
        PTOK(pthread_mutex_lock(&b->mtx));
320 17160
        if (w == b->expected) {
321 17160
                vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
322 7480
                b->cond_cycle++;
323 7480
                if (b->cyclic)
324 7480
                        b->waiters = 0;
325 1000
                PTOK(pthread_cond_broadcast(&b->cond));
326 7480
        } else {
327 7480
                vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
328 19360
                    b->name, b->waiters, b->expected);
329 9680
                do {
330 9680
                        ts = VTIM_timespec(VTIM_real() + .1);
331 32704
                        r = pthread_cond_timedwait(&b->cond, &b->mtx, &ts);
332 32704
                        assert(r == 0 || r == ETIMEDOUT);
333 32704
                } while (!vtc_stop && !vtc_error && r == ETIMEDOUT &&
334 55732
                    c == b->cond_cycle);
335 23028
        }
336
        PTOK(pthread_mutex_unlock(&b->mtx));
337 17160
}
338 17160
339
static void
340
barrier_sock_sync(const struct barrier *b, struct vtclog *vl)
341 4320
{
342
        struct vsb *vsb;
343
        const char *err;
344
        char buf[32];
345
        int i, sock;
346
        ssize_t sz;
347
348
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
349 4320
        assert(b->type == BARRIER_SOCK);
350 4320
351
        vsb = macro_expandf(vl, "${%s_sock}", b->name);
352 4320
        vtc_log(vl, 4, "Barrier(%s) sync with socket", b->name);
353 4320
354
        sock = VTCP_open(VSB_data(vsb), NULL, 0., &err);
355 4320
        if (sock < 0)
356 4320
                vtc_fatal(vl, "Barrier(%s) connection failed: %s",
357 0
                    b->name, err);
358 0
359
        VSB_destroy(&vsb);
360 4320
361
        sz = read(sock, buf, sizeof buf); /* XXX loop with timeout? */
362 4320
        i = errno;
363 4320
        closefd(&sock);
364 4320
365
        if (sz < 0)
366 4320
                vtc_fatal(vl, "Barrier(%s) read failed: %s (errno=%d)",
367 0
                    b->name, strerror(i), i);
368 0
        if (sz > 0)
369 4320
                vtc_fatal(vl, "Barrier(%s) unexpected data (%zdB)",
370 0
                    b->name, sz);
371 0
}
372 4320
373
static void
374
barrier_sync(struct barrier *b, struct vtclog *vl)
375 21474
{
376
377
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
378 21474
379
        switch (b->type) {
380 21474
        case BARRIER_NONE:
381
                vtc_fatal(vl,
382 0
                    "Barrier(%s) use error: not initialized", b->name);
383 0
                break;
384
        case BARRIER_COND:
385
                barrier_cond_sync(b, vl);
386 17154
                break;
387 17154
        case BARRIER_SOCK:
388
                barrier_sock_sync(b, vl);
389 4320
                break;
390 4320
        default:
391
                WRONG("Wrong barrier type");
392 0
        }
393 0
}
394 21474
395
/* SECTION: barrier barrier
396
 *
397
 * NOTE: This command is available everywhere commands are given.
398
 *
399
 * Barriers allows you to synchronize different threads to make sure events
400
 * occur in the right order. It's even possible to use them in VCL.
401
 *
402
 * First, it's necessary to declare the barrier::
403
 *
404
 *         barrier bNAME TYPE NUMBER [-cyclic]
405
 *
406
 * With the arguments being:
407
 *
408
 * bNAME
409
 *         this is the name of the barrier, used to identify it when you'll
410
 *         create sync points. It must start with 'b'.
411
 *
412
 * TYPE
413
 *         it can be "cond" (mutex) or "sock" (socket) and sets internal
414
 *         behavior. If you don't need VCL synchronization, use cond.
415
 *
416
 * NUMBER
417
 *         number of sync point needed to go through the barrier.
418
 *
419
 * \-cyclic
420
 *         if present, the barrier will reset itself and be ready for another
421
 *         round once gotten through.
422
 *
423
 * Then, to add a sync point::
424
 *
425
 *         barrier bNAME sync
426
 *
427
 * This will block the parent thread until the number of sync points for bNAME
428
 * reaches the NUMBER given in the barrier declaration.
429
 *
430
 * If you wish to synchronize the VCL, you need to declare a "sock" barrier.
431
 * This will emit a macro definition named "bNAME_sock" that you can use in
432
 * VCL (after importing the vtc vmod)::
433
 *
434
 *         vtc.barrier_sync("${bNAME_sock}");
435
 *
436
 * This function returns 0 if everything went well and is the equivalent of
437
 * ``barrier bNAME sync`` at the VTC top-level.
438
 *
439
 *
440
 */
441
442
void
443
cmd_barrier(CMD_ARGS)
444 72075
{
445
        struct barrier *b, *b2;
446
        int r;
447
448
        (void)priv;
449 72075
450
        if (av == NULL) {
451 72075
                /* Reset and free */
452
                VTAILQ_FOREACH_SAFE(b, &barriers, list, b2) {
453 50600
                        r = pthread_mutex_trylock(&b->mtx);
454 9200
                        assert(r == 0 || r == EBUSY);
455 9200
                        switch (b->type) {
456 9200
                        case BARRIER_COND:
457
                                break;
458 6920
                        case BARRIER_SOCK:
459
                                PTOK(pthread_join(b->sock_thread, NULL));
460 2280
                                break;
461 2280
                        default:
462
                                WRONG("Wrong barrier type");
463 0
                        }
464 0
                        if (r == 0)
465 9200
                                PTOK(pthread_mutex_unlock(&b->mtx));
466 9200
                }
467 9200
                return;
468 41400
        }
469
470
        AZ(strcmp(av[0], "barrier"));
471 30675
        av++;
472 30675
473
        VTC_CHECK_NAME(vl, av[0], "Barrier", 'b');
474 30675
        VTAILQ_FOREACH(b, &barriers, list)
475 59504
                if (!strcmp(b->name, av[0]))
476 50303
                        break;
477 21474
        if (b == NULL)
478 30675
                b = barrier_new(av[0], vl);
479 9200
        av++;
480 30675
481
        for (; *av != NULL; av++) {
482 62110
                if (!strcmp(*av, "cond")) {
483 31435
                        av++;
484 6920
                        AN(*av);
485 6920
                        barrier_cond(b, *av, vl);
486 6920
                        continue;
487 6920
                }
488
                if (!strcmp(*av, "sock")) {
489 24515
                        av++;
490 2280
                        AN(*av);
491 2280
                        barrier_sock(b, *av, vl);
492 2280
                        continue;
493 2280
                }
494
                if (!strcmp(*av, "sync")) {
495 22235
                        barrier_sync(b, vl);
496 21475
                        continue;
497 21475
                }
498
                if (!strcmp(*av, "-cyclic")) {
499 760
                        barrier_cyclic(b, vl);
500 760
                        continue;
501 760
                }
502
                vtc_fatal(vl, "Unknown barrier argument: %s", *av);
503 0
        }
504
}