varnish-cache/bin/varnishtest/vtest2/src/vtc_tunnel.c
0
/*-
1
 * Copyright (c) 2020 Varnish Software
2
 * All rights reserved.
3
 *
4
 * Author: Dridi Boukelmoune <dridi.boukelmoune@gmail.com>
5
 *
6
 * SPDX-License-Identifier: BSD-2-Clause
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 */
29
30
#include "config.h"
31
32
#include <sys/socket.h>
33
#include <sys/types.h>
34
#include <sys/stat.h>
35
36
#include <errno.h>
37
#include <poll.h>
38
#include <stdio.h>
39
#include <stdlib.h>
40
#include <string.h>
41
#include <unistd.h>
42
43
#include "vtc.h"
44
45
#include "vsa.h"
46
#include "vtcp.h"
47
48
/* SECTION: tunnel tunnel
49
 *
50
 * The goal of a tunnel is to help control the data transfer between two
51
 * parties, for example to trigger socket timeouts in the middle of protocol
52
 * frames, without the need to change how both parties are implemented.
53
 *
54
 * A tunnel accepts a connection and then connects on behalf of the source to
55
 * the desired destination. Once both connections are established the tunnel
56
 * will transfer bytes unchanged between the source and destination. Transfer
57
 * can be interrupted, usually with the help of synchronization methods like
58
 * barriers. Once the transfer is paused, it is possible to let a specific
59
 * amount of bytes move in either direction.
60
 *
61
 * SECTION: tunnel.args Arguments
62
 *
63
 * \-start
64
 *        Start the tunnel in background, processing the last given
65
 *        specification.
66
 *
67
 * \-start+pause
68
 *        Start the tunnel, but already paused.
69
 *
70
 * \-wait
71
 *        Block until the thread finishes.
72
 *
73
 * \-listen STRING
74
 *        Dictate the listening socket for the tunnel. STRING is of the form
75
 *        "IP PORT", or "HOST PORT".
76
 *
77
 *        Listens by default to a local random port.
78
 *
79
 * \-connect STRING
80
 *        Indicate the server to connect to. STRING is also of the form
81
 *        "IP PORT", or "HOST PORT".
82
 *
83
 *        Connects by default to a varnish instance called ``v1``.
84
 *
85
 * SECTION: tunnel.spec Specification
86
 *
87
 * The specification contains a list of tunnel commands that can be combined
88
 * with barriers and delays. For example::
89
 *
90
 *     tunnel t1 {
91
 *         barrier b1 sync
92
 *         pause
93
 *         delay 1
94
 *         send 42
95
 *         barrier b2 sync
96
 *         resume
97
 *     } -start
98
 *
99
 * If one end of the tunnel is closed before the end of the specification
100
 * the test case will fail. A specification that ends in a paused state will
101
 * implicitly resume the tunnel.
102
 */
103
104
/*
 * Life cycle of a tunnel, kept in the state field of struct tunnel.
 *
 * The declaration order matters: the poll thread asserts that the state
 * it observes compares below TUNNEL_POLL_DONE.
 */
enum tunnel_state_e {
	TUNNEL_ACCEPT = 0,	/* set by tunnel_start() */
	TUNNEL_RUNNING,		/* set by tunnel_accept() or "resume" */
	TUNNEL_PAUSED,		/* set by tunnel_accept() or "pause" */
	TUNNEL_SPEC_DONE,	/* set when the spec thread is finished */
	TUNNEL_POLL_DONE,	/* set when the poll thread is finished */
	TUNNEL_STOPPED,		/* set by tunnel_new() and tunnel_wait() */
};
112
113
/*
 * One direction of a tunnel: tunnel_read() fills buf from *rfd and
 * tunnel_write() drains it to *wfd.
 */
struct tunnel_lane {
	char	buf[1024];	/* bytes read and not yet written */
	ssize_t	buf_len;	/* bytes held in buf, -1 once read() returned 0 */
	size_t	wrk_len;	/* bytes left to move for a send/recv command */
	int	*rfd;		/* descriptor this lane reads from */
	int	*wfd;		/* descriptor this lane writes to */
};
120
121
struct tunnel {
122
        unsigned                magic;
123
#define TUNNEL_MAGIC            0x7f59913d
124
        char                    *name;
125
        struct vtclog           *vl;
126
        VTAILQ_ENTRY(tunnel)    list;
127
        enum tunnel_state_e     state;
128
        unsigned                start_paused;
129
130
        char                    *spec;
131
132
        char                    connect[256];
133
        int                     csock;
134
135
        char                    listen[256];
136
        int                     lsock;
137
        char                    laddr[VTCP_ADDRBUFSIZE];
138
        char                    lport[VTCP_PORTBUFSIZE];
139
140
        int                     asock;
141
142
        struct tunnel_lane      send_lane[1];
143
        struct tunnel_lane      recv_lane[1];
144
145
        pthread_mutex_t         mtx;            /* state and lanes->*_len */
146
        pthread_cond_t          cond;
147
        pthread_t               tspec;
148
        pthread_t               tpoll;
149
};
150
151
/* Held around every access to the tunnels list; set up by init_tunnel() */
static pthread_mutex_t          tunnel_mtx;
152
153
/* Every tunnel created by tunnel_new(), until tunnel_reset() reaps it */
static VTAILQ_HEAD(, tunnel)    tunnels = VTAILQ_HEAD_INITIALIZER(tunnels);
154
155
/**********************************************************************
156
 * Is the tunnel still operating?
157
 */
158
159 2618
static unsigned
160
tunnel_is_open(struct tunnel *t)
161
{
162
        unsigned is_open;
163 2618
164 2618
        PTOK(pthread_mutex_lock(&t->mtx));
165 2618
        is_open = (t->send_lane->buf_len >= 0 && t->recv_lane->buf_len >= 0);
166 2618
        PTOK(pthread_mutex_unlock(&t->mtx));
167
        return (is_open);
168
}
169
170
/**********************************************************************
171
 * SECTION: tunnel.spec.pause
172
 *
173
 * pause
174
 *         Wait for in-flight bytes to be transferred and pause the tunnel.
175
 *
176
 *         The tunnel must be running.
177
 */
178
179 240
static void
180
cmd_tunnel_pause(CMD_ARGS)
181
{
182
        struct tunnel *t;
183 240
184 240
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
185
        AZ(av[1]);
186 240
187 0
        if (!tunnel_is_open(t))
188
                vtc_fatal(vl, "Tunnel already closed");
189 240
190 240
        PTOK(pthread_mutex_lock(&t->mtx));
191 0
        if (t->state == TUNNEL_PAUSED) {
192 0
                PTOK(pthread_mutex_unlock(&t->mtx));
193
                vtc_fatal(vl, "Tunnel already paused");
194 240
        }
195 240
        assert(t->state == TUNNEL_RUNNING);
196 240
        t->state = TUNNEL_PAUSED;
197 240
        PTOK(pthread_cond_signal(&t->cond));
198 240
        PTOK(pthread_cond_wait(&t->cond, &t->mtx));
199 240
        PTOK(pthread_mutex_unlock(&t->mtx));
200
}
201
202
/**********************************************************************
203
 * SECTION: tunnel.spec.send
204
 *
205
 * send NUMBER
206
 *         Wait until NUMBER bytes are transferred from source to
207
 *         destination.
208
 *
209
 *         The tunnel must be paused, it remains paused afterwards.
210
 */
211
212 400
static void
213
cmd_tunnel_send(CMD_ARGS)
214
{
215
        struct tunnel *t;
216
        unsigned len;
217 400
218 400
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
219 400
        AN(av[1]);
220
        AZ(av[2]);
221 400
222
        len = atoi(av[1]);
223 400
224 0
        if (!tunnel_is_open(t))
225
                vtc_fatal(vl, "Tunnel already closed");
226 400
227 400
        PTOK(pthread_mutex_lock(&t->mtx));
228 0
        if (t->state == TUNNEL_RUNNING) {
229 0
                PTOK(pthread_mutex_unlock(&t->mtx));
230
                vtc_fatal(vl, "Tunnel still running");
231 400
        }
232 400
        assert(t->state == TUNNEL_PAUSED);
233 400
        AZ(t->send_lane->wrk_len);
234 400
        AZ(t->recv_lane->wrk_len);
235 200
        if (!strcmp(av[0], "send"))
236
                t->send_lane->wrk_len = len;
237 200
        else
238 400
                t->recv_lane->wrk_len = len;
239 400
        PTOK(pthread_cond_signal(&t->cond));
240 400
        PTOK(pthread_cond_wait(&t->cond, &t->mtx));
241 400
        PTOK(pthread_mutex_unlock(&t->mtx));
242
}
243
244
/**********************************************************************
245
 * SECTION: tunnel.spec.recv
246
 *
247
 * recv NUMBER
248
 *         Wait until NUMBER bytes are transferred from destination to
249
 *         source.
250
 *
251
 *         The tunnel must be paused, it remains paused afterwards.
252
 */
253
254 200
static void
255
cmd_tunnel_recv(CMD_ARGS)
256
{
257 200
258 200
        cmd_tunnel_send(av, priv, vl);
259
}
260
261
/**********************************************************************
262
 * SECTION: tunnel.spec.resume
263
 *
264
 * resume
265
 *         Resume the transfer of bytes in both directions.
266
 *
267
 *         The tunnel must be paused.
268
 */
269
270 280
static void
271
cmd_tunnel_resume(CMD_ARGS)
272
{
273
        struct tunnel *t;
274 280
275 280
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
276
        AZ(av[1]);
277 280
278 0
        if (!tunnel_is_open(t))
279
                vtc_fatal(vl, "Tunnel already closed");
280 280
281 280
        PTOK(pthread_mutex_lock(&t->mtx));
282 0
        if (t->state == TUNNEL_RUNNING) {
283 0
                PTOK(pthread_mutex_unlock(&t->mtx));
284
                vtc_fatal(vl, "Tunnel already running");
285 280
        }
286 280
        assert(t->state == TUNNEL_PAUSED);
287 280
        t->state = TUNNEL_RUNNING;
288 280
        PTOK(pthread_cond_signal(&t->cond));
289 280
        PTOK(pthread_mutex_unlock(&t->mtx));
290
}
291
292
static const struct cmds tunnel_cmds[] = {
293
#define CMD_TUNNEL(n) { #n, cmd_tunnel_##n },
294
        CMD_TUNNEL(pause)
295
        CMD_TUNNEL(send)
296
        CMD_TUNNEL(recv)
297
        CMD_TUNNEL(resume)
298
#undef CMD_TUNNEL
299
        { NULL, NULL }
300
};
301
302
/**********************************************************************
303
 * Tunnel poll thread
304
 */
305
306 3076
static void
307
tunnel_read(struct tunnel *t, struct vtclog *vl, const struct pollfd *pfd,
308
    struct tunnel_lane *lane)
309
{
310
        size_t len;
311
        ssize_t res;
312
        enum tunnel_state_e state;
313 3076
314 3076
        assert(pfd->fd == *lane->rfd);
315 1796
        if (!(pfd->revents & POLLIN))
316
                return;
317 1280
318 1280
        PTOK(pthread_mutex_lock(&t->mtx));
319 1280
        AZ(lane->buf_len);
320 1280
        len = lane->wrk_len;
321 1280
        state = t->state;
322
        PTOK(pthread_mutex_unlock(&t->mtx));
323 1280
324 400
        if (len == 0 && state == TUNNEL_PAUSED)
325
                return;
326 880
327 480
        if (len == 0 || len > sizeof lane->buf)
328
                len = sizeof lane->buf;
329 880
330 880
        res = read(pfd->fd, lane->buf, len);
331 0
        if (res < 0)
332
                vtc_fatal(vl, "Read failed: %s", strerror(errno));
333 880
334 880
        PTOK(pthread_mutex_lock(&t->mtx));
335 880
        lane->buf_len = (res == 0) ? -1 : res;
336 3076
        PTOK(pthread_mutex_unlock(&t->mtx));
337
}
338
339 3076
static void
340
tunnel_write(struct tunnel *t, struct vtclog *vl, struct tunnel_lane *lane,
341
    const char *action)
342
{
343
        const char *p;
344
        ssize_t res, l;
345 3076
346 3076
        p = lane->buf;
347
        l = lane->buf_len;
348 3076
349 720
        if (l > 0)
350 3796
                vtc_log(vl, 3, "%s %zd bytes", action, l);
351 720
        while (l > 0) {
352 720
                res = write(*lane->wfd, p, l);
353 0
                if (res <= 0)
354 720
                        vtc_fatal(vl, "Write failed: %s", strerror(errno));
355 720
                l -= res;
356
                p += res;
357
        }
358 3076
359 3076
        PTOK(pthread_mutex_lock(&t->mtx));
360 800
        if (lane->wrk_len > 0 && lane->buf_len != -1) {
361 800
                assert(lane->buf_len >= 0);
362 800
                assert(lane->wrk_len >= (size_t)lane->buf_len);
363 800
                lane->wrk_len -= lane->buf_len;
364 3076
        }
365 3076
        lane->buf_len = l;
366 3076
        PTOK(pthread_mutex_unlock(&t->mtx));
367
}
368
369 160
static void *
370
tunnel_poll_thread(void *priv)
371
{
372
        struct tunnel *t;
373
        struct vtclog *vl;
374
        struct pollfd pfd[2];
375
        enum tunnel_state_e state;
376
        int res;
377 160
378
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
379 160
380 160
        vl = vtc_logopen("%s", t->name);
381
        pthread_cleanup_push(vtc_logclose, vl);
382 1698
383 1538
        while (tunnel_is_open(t) && !vtc_stop) {
384
                PTOK(pthread_mutex_lock(&t->mtx));
385 1704
                /* NB: can be woken up by `tunnel tX -wait` */
386 166
                while (t->state == TUNNEL_ACCEPT && !vtc_stop)
387 1538
                        PTOK(pthread_cond_wait(&t->cond, &t->mtx));
388 1538
                state = t->state;
389
                PTOK(pthread_mutex_unlock(&t->mtx));
390 1538
391 0
                if (vtc_stop)
392
                        break;
393 1538
394
                assert(state < TUNNEL_POLL_DONE);
395 1538
396 1538
                memset(pfd, 0, sizeof pfd);
397 1538
                pfd[0].fd = *t->send_lane->rfd;
398 1538
                pfd[1].fd = *t->recv_lane->rfd;
399 1538
                pfd[0].events = POLLIN;
400 1538
                pfd[1].events = POLLIN;
401 1538
                res = poll(pfd, 2, 100);
402 0
                if (res == -1)
403
                        vtc_fatal(vl, "Poll failed: %s", strerror(errno));
404 1538
405 1538
                tunnel_read(t, vl, &pfd[0], t->send_lane);
406
                tunnel_read(t, vl, &pfd[1], t->recv_lane);
407 1538
408 1538
                PTOK(pthread_mutex_lock(&t->mtx));
409 840
                if (t->state == TUNNEL_PAUSED && t->send_lane->wrk_len == 0 &&
410 640
                    t->recv_lane->wrk_len == 0) {
411 640
                        AZ(t->send_lane->buf_len);
412 640
                        AZ(t->recv_lane->buf_len);
413 640
                        PTOK(pthread_cond_signal(&t->cond));
414 640
                        PTOK(pthread_cond_wait(&t->cond, &t->mtx));
415 1538
                }
416
                PTOK(pthread_mutex_unlock(&t->mtx));
417 1538
418 0
                if (vtc_stop)
419
                        break;
420 1538
421 1538
                tunnel_write(t, vl, t->send_lane, "Sending");
422
                tunnel_write(t, vl, t->recv_lane, "Receiving");
423
        }
424 160
425 160
        PTOK(pthread_mutex_lock(&t->mtx));
426 0
        if (t->state != TUNNEL_SPEC_DONE && !vtc_stop) {
427 0
                PTOK(pthread_cond_signal(&t->cond));
428 0
                PTOK(pthread_cond_wait(&t->cond, &t->mtx));
429 160
        }
430
        PTOK(pthread_mutex_unlock(&t->mtx));
431 160
432 160
        pthread_cleanup_pop(0);
433 160
        vtc_logclose(vl);
434 160
        t->state = TUNNEL_POLL_DONE;
435
        return (NULL);
436
}
437
438
/**********************************************************************
439
 * Tunnel spec thread
440
 */
441
442 160
static void
443
tunnel_accept(struct tunnel *t, struct vtclog *vl)
444
{
445
        struct vsb *vsb;
446
        const char *addr, *err;
447
        int afd, cfd;
448 160
449 160
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
450 160
        assert(t->lsock >= 0);
451 160
        assert(t->asock < 0);
452 160
        assert(t->csock < 0);
453
        assert(t->state == TUNNEL_ACCEPT);
454 160
455 160
        vtc_log(vl, 4, "Accepting");
456 160
        afd = accept(t->lsock, NULL, NULL);
457 0
        if (afd < 0)
458 160
                vtc_fatal(vl, "Accept failed: %s", strerror(errno));
459
        vtc_log(vl, 3, "Accepted socket fd is %d", afd);
460 160
461 160
        vsb = macro_expand(vl, t->connect);
462 160
        AN(vsb);
463
        addr = VSB_data(vsb);
464 160
465 160
        cfd = VTCP_open(addr, NULL, 10., &err);
466 0
        if (cfd < 0)
467 160
                vtc_fatal(vl, "Failed to open %s: %s", addr, err);
468 160
        vtc_log(vl, 3, "Connected socket fd is %d", cfd);
469
        VSB_destroy(&vsb);
470 160
471 160
        VTCP_blocking(afd);
472
        VTCP_blocking(cfd);
473 160
474 160
        PTOK(pthread_mutex_lock(&t->mtx));
475 160
        t->asock = afd;
476 160
        t->csock = cfd;
477 160
        t->send_lane->buf_len = 0;
478 160
        t->send_lane->wrk_len = 0;
479 160
        t->recv_lane->buf_len = 0;
480 160
        t->recv_lane->wrk_len = 0;
481 40
        if (t->start_paused) {
482 40
                t->state = TUNNEL_PAUSED;
483 40
                t->start_paused = 0;
484 120
        } else
485 160
                t->state = TUNNEL_RUNNING;
486 160
        PTOK(pthread_cond_signal(&t->cond));
487 160
        PTOK(pthread_mutex_unlock(&t->mtx));
488
}
489
490 160
static void *
491
tunnel_spec_thread(void *priv)
492
{
493
        struct tunnel *t;
494
        struct vtclog *vl;
495
        enum tunnel_state_e state;
496 160
497 160
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
498
        AN(*t->connect);
499 160
500 160
        vl = vtc_logopen("%s", t->name);
501 160
        vtc_log_set_cmd(vl, tunnel_cmds);
502
        pthread_cleanup_push(vtc_logclose, vl);
503 160
504 160
        tunnel_accept(t, vl);
505
        parse_string(vl, t, t->spec);
506 160
507 160
        PTOK(pthread_mutex_lock(&t->mtx));
508 160
        state = t->state;
509
        PTOK(pthread_mutex_unlock(&t->mtx));
510 160
511 80
        if (state == TUNNEL_PAUSED && !vtc_stop)
512
                parse_string(vl, t, "resume");
513 160
514 160
        PTOK(pthread_mutex_lock(&t->mtx));
515 160
        t->state = TUNNEL_SPEC_DONE;
516 160
        PTOK(pthread_cond_signal(&t->cond));
517
        PTOK(pthread_mutex_unlock(&t->mtx));
518 160
519 160
        vtc_log(vl, 2, "Ending");
520 160
        pthread_cleanup_pop(0);
521 160
        vtc_logclose(vl);
522
        return (NULL);
523
}
524
525
/**********************************************************************
526
 * Tunnel management
527
 */
528
529 120
static struct tunnel *
530
tunnel_new(const char *name)
531
{
532
        struct tunnel *t;
533 120
534 120
        ALLOC_OBJ(t, TUNNEL_MAGIC);
535 120
        AN(t);
536 120
        REPLACE(t->name, name);
537 120
        t->vl = vtc_logopen("%s", name);
538
        AN(t->vl);
539 120
540 120
        t->state = TUNNEL_STOPPED;
541 120
        bprintf(t->connect, "%s", "${v1_sock}");
542 120
        bprintf(t->listen, "%s", default_listen_addr);
543 120
        t->csock = -1;
544 120
        t->lsock = -1;
545 120
        t->asock = -1;
546 120
        t->send_lane->rfd = &t->asock;
547 120
        t->send_lane->wfd = &t->csock;
548 120
        t->recv_lane->rfd = &t->csock;
549 120
        t->recv_lane->wfd = &t->asock;
550 120
        PTOK(pthread_mutex_init(&t->mtx, NULL));
551 120
        PTOK(pthread_cond_init(&t->cond, NULL));
552 120
        PTOK(pthread_mutex_lock(&tunnel_mtx));
553 120
        VTAILQ_INSERT_TAIL(&tunnels, t, list);
554 120
        PTOK(pthread_mutex_unlock(&tunnel_mtx));
555
        return (t);
556
}
557
558 120
static void
559
tunnel_delete(struct tunnel *t)
560
{
561 120
562 120
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
563 120
        assert(t->asock < 0);
564 120
        assert(t->csock < 0);
565 120
        if (t->lsock >= 0)
566 120
                VTCP_close(&t->lsock);
567 120
        macro_undef(t->vl, t->name, "addr");
568 120
        macro_undef(t->vl, t->name, "port");
569 120
        macro_undef(t->vl, t->name, "sock");
570 120
        vtc_logclose(t->vl);
571 120
        (void)pthread_mutex_destroy(&t->mtx);
572 120
        (void)pthread_cond_destroy(&t->cond);
573 120
        free(t->name);
574 120
        FREE_OBJ(t);
575
}
576
577
/**********************************************************************
578
 * Tunnel listen
579
 */
580
581 160
static void
582
tunnel_listen(struct tunnel *t)
583 160
{
584
        char buf[vsa_suckaddr_len];
585
        const struct suckaddr *sua;
586
        const char *err;
587 160
588 40
        if (t->lsock >= 0)
589 160
                VTCP_close(&t->lsock);
590 160
        t->lsock = VTCP_listen_on(t->listen, "0", 1, &err);
591 0
        if (err != NULL)
592
                vtc_fatal(t->vl,
593 0
                    "Tunnel listen address (%s) cannot be resolved: %s",
594 160
                    t->listen, err);
595 160
        assert(t->lsock > 0);
596 160
        sua = VSA_getsockname(t->lsock, buf, sizeof buf);
597 160
        AN(sua);
598
        VTCP_name(sua, t->laddr, sizeof t->laddr, t->lport, sizeof t->lport);
599
600 160
        /* Record the actual port, and reuse it on subsequent starts */
601 160
        if (VSA_Get_Proto(sua) == AF_INET)
602
                bprintf(t->listen, "%s:%s", t->laddr, t->lport);
603 0
        else
604
                bprintf(t->listen, "[%s]:%s", t->laddr, t->lport);
605 160
606 160
        macro_def(t->vl, t->name, "addr", "%s", t->laddr);
607 160
        macro_def(t->vl, t->name, "port", "%s", t->lport);
608 160
        macro_def(t->vl, t->name, "sock", "%s %s", t->laddr, t->lport);
609
}
610
611
/**********************************************************************
612
 * Start the tunnel thread
613
 */
614
615 160
static void
616
tunnel_start(struct tunnel *t)
617
{
618 160
619 160
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
620 160
        vtc_log(t->vl, 2, "Starting tunnel");
621 160
        tunnel_listen(t);
622 160
        vtc_log(t->vl, 1, "Listen on %s", t->listen);
623 160
        assert(t->state == TUNNEL_STOPPED);
624 160
        t->state = TUNNEL_ACCEPT;
625 160
        t->send_lane->buf_len = 0;
626 160
        t->send_lane->wrk_len = 0;
627 160
        t->recv_lane->buf_len = 0;
628 160
        t->recv_lane->wrk_len = 0;
629 160
        PTOK(pthread_create(&t->tpoll, NULL, tunnel_poll_thread, t));
630 160
        PTOK(pthread_create(&t->tspec, NULL, tunnel_spec_thread, t));
631
}
632
633 40
static void
634
tunnel_start_pause(struct tunnel *t)
635
{
636 40
637 40
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
638 40
        t->start_paused = 1;
639 40
        tunnel_start(t);
640
}
641
642
/**********************************************************************
643
 * Wait for tunnel thread to stop
644
 */
645
646 160
static void
647
tunnel_wait(struct tunnel *t)
648
{
649
        void *res;
650 160
651 160
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
652
        vtc_log(t->vl, 2, "Waiting for tunnel");
653 160
654
        PTOK(pthread_cond_signal(&t->cond));
655 160
656 160
        PTOK(pthread_join(t->tspec, &res));
657 0
        if (res != NULL && !vtc_stop)
658
                vtc_fatal(t->vl, "Tunnel spec returned \"%p\"", res);
659 160
660 160
        PTOK(pthread_join(t->tpoll, &res));
661 0
        if (res != NULL && !vtc_stop)
662
                vtc_fatal(t->vl, "Tunnel poll returned \"%p\"", res);
663 160
664 160
        if (t->csock >= 0)
665 160
                VTCP_close(&t->csock);
666 160
        if (t->asock >= 0)
667 160
                VTCP_close(&t->asock);
668 160
        t->tspec = 0;
669 160
        t->tpoll = 0;
670 160
        t->state = TUNNEL_STOPPED;
671
}
672
673
/**********************************************************************
674
 * Reap tunnel
675
 */
676
677 41400
static void
678
tunnel_reset(void)
679
{
680
        struct tunnel *t;
681 41520
682 41520
        while (1) {
683 41520
                PTOK(pthread_mutex_lock(&tunnel_mtx));
684 41520
                t = VTAILQ_FIRST(&tunnels);
685 41520
                CHECK_OBJ_ORNULL(t, TUNNEL_MAGIC);
686 120
                if (t != NULL)
687 41520
                        VTAILQ_REMOVE(&tunnels, t, list);
688 41520
                PTOK(pthread_mutex_unlock(&tunnel_mtx));
689 41400
                if (t == NULL)
690
                        break;
691 120
692 40
                if (t->state != TUNNEL_STOPPED)
693 120
                        tunnel_wait(t);
694
                tunnel_delete(t);
695 41400
        }
696
}
697
698
/**********************************************************************
699
 * Tunnel command dispatch
700
 */
701
702 41680
void
703
cmd_tunnel(CMD_ARGS)
704
{
705
        struct tunnel *t;
706 41680
707
        (void)priv;
708 41680
709
        if (av == NULL) {
710 41400
                /* Reset and free */
711 41400
                tunnel_reset();
712
                return;
713
        }
714 280
715 280
        AZ(strcmp(av[0], "tunnel"));
716
        av++;
717 280
718
        VTC_CHECK_NAME(vl, av[0], "Tunnel", 't');
719 280
720 320
        PTOK(pthread_mutex_lock(&tunnel_mtx));
721 200
        VTAILQ_FOREACH(t, &tunnels, list)
722 160
                if (!strcmp(t->name, av[0]))
723 280
                        break;
724 280
        PTOK(pthread_mutex_unlock(&tunnel_mtx));
725 120
        if (t == NULL)
726 280
                t = tunnel_new(av[0]);
727 280
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
728
        av++;
729 760
730 480
        for (; *av != NULL; av++) {
731 0
                if (vtc_error)
732 480
                        break;
733 120
                if (!strcmp(*av, "-wait")) {
734 0
                        if (t->state == TUNNEL_STOPPED)
735 120
                                vtc_fatal(t->vl, "Tunnel not -started");
736 120
                        tunnel_wait(t);
737
                        continue;
738
                }
739
740 360
                /* Don't mess with a running tunnel */
741 0
                if (t->state != TUNNEL_STOPPED)
742
                        tunnel_wait(t);
743 360
744 360
                assert(t->state == TUNNEL_STOPPED);
745 80
                if (!strcmp(*av, "-connect")) {
746 80
                        bprintf(t->connect, "%s", av[1]);
747 80
                        av++;
748
                        continue;
749 280
                }
750 0
                if (!strcmp(*av, "-listen")) {
751 0
                        bprintf(t->listen, "%s", av[1]);
752 0
                        av++;
753
                        continue;
754 280
                }
755 120
                if (!strcmp(*av, "-start")) {
756 120
                        tunnel_start(t);
757
                        continue;
758 160
                }
759 40
                if (!strcmp(*av, "-start+pause")) {
760 40
                        tunnel_start_pause(t);
761
                        continue;
762 120
                }
763 0
                if (**av == '-')
764 120
                        vtc_fatal(t->vl, "Unknown tunnel argument: %s", *av);
765 120
                t->spec = *av;
766 41680
        }
767
}
768
769 41400
void
770
init_tunnel(void)
771
{
772 41400
773 41400
        PTOK(pthread_mutex_init(&tunnel_mtx, NULL));
774
}