Skip to content

Commit

Permalink
srt: shutdown connection if not data for 10s
Browse files Browse the repository at this point in the history
  • Loading branch information
funman committed Dec 15, 2023
1 parent 87b5f9d commit 371b0f9
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion lib/upipe-srt/upipe_srt_handshake.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ struct upipe_srt_handshake {
struct upump_mgr *upump_mgr;
struct upump *upump_timer;
struct upump *upump_timeout;
struct upump *upump_keepalive_timeout;
struct uclock *uclock;
struct urequest uclock_request;

Expand Down Expand Up @@ -137,6 +138,7 @@ UPIPE_HELPER_OUTPUT(upipe_srt_handshake, output, flow_def, output_state, request
UPIPE_HELPER_UPUMP_MGR(upipe_srt_handshake, upump_mgr)
UPIPE_HELPER_UPUMP(upipe_srt_handshake, upump_timer, upump_mgr)
UPIPE_HELPER_UPUMP(upipe_srt_handshake, upump_timeout, upump_mgr)
UPIPE_HELPER_UPUMP(upipe_srt_handshake, upump_keepalive_timeout, upump_mgr)
UPIPE_HELPER_UCLOCK(upipe_srt_handshake, uclock, uclock_request, NULL, upipe_throw_provide_request, NULL)

UPIPE_HELPER_UREF_MGR(upipe_srt_handshake, uref_mgr, uref_mgr_request,
Expand Down Expand Up @@ -262,13 +264,24 @@ static struct uref *upipe_srt_handshake_alloc_hs(struct upipe *upipe, int ext_si
return uref;
}

static void upipe_srt_handshake_keepalive_timeout(struct upump *upump)
{
struct upipe *upipe = upump_get_opaque(upump, struct upipe *);
struct upipe_srt_handshake *upipe_srt_handshake = upipe_srt_handshake_from_upipe(upipe);

upipe_err(upipe, "No data in 10s");
upipe_throw_source_end(upipe);

upipe_srt_handshake->expect_conclusion = false;
}

static void upipe_srt_handshake_timeout(struct upump *upump)
{
struct upipe *upipe = upump_get_opaque(upump, struct upipe *);
struct upipe_srt_handshake *upipe_srt_handshake = upipe_srt_handshake_from_upipe(upipe);

upipe_err(upipe, "Connection timed out");

upipe_srt_handshake_set_upump_keepalive_timeout(upipe, NULL);
upipe_srt_handshake->expect_conclusion = false;
}

Expand Down Expand Up @@ -335,6 +348,7 @@ static struct upipe *upipe_srt_handshake_alloc(struct upipe_mgr *mgr,
upipe_srt_handshake_init_upump_mgr(upipe);
upipe_srt_handshake_init_upump_timer(upipe);
upipe_srt_handshake_init_upump_timeout(upipe);
upipe_srt_handshake_init_upump_keepalive_timeout(upipe);
upipe_srt_handshake_init_uclock(upipe);
upipe_srt_handshake_require_uclock(upipe);

Expand Down Expand Up @@ -510,6 +524,7 @@ static int _upipe_srt_handshake_control(struct upipe *upipe,
case UPIPE_ATTACH_UPUMP_MGR:
upipe_srt_handshake_set_upump_timer(upipe, NULL);
upipe_srt_handshake_set_upump_timeout(upipe, NULL);
upipe_srt_handshake_set_upump_keepalive_timeout(upipe, NULL);
return upipe_srt_handshake_attach_upump_mgr(upipe);

case UPIPE_SET_FLOW_DEF: {
Expand Down Expand Up @@ -1328,6 +1343,14 @@ static void upipe_srt_handshake_input(struct upipe *upipe, struct uref *uref,
return;
}

struct upump *upump =
upump_alloc_timer(upipe_srt_handshake->upump_mgr,
upipe_srt_handshake_keepalive_timeout,
upipe, upipe->refcount,
10*UCLOCK_FREQ, 0);
upump_start(upump);
upipe_srt_handshake_set_upump_keepalive_timeout(upipe, upump);

if (srt_get_packet_control(buf)) {
bool handled = false;
struct uref *reply = upipe_srt_handshake_input_control(upipe, buf, size, &handled);
Expand Down Expand Up @@ -1363,6 +1386,7 @@ static void upipe_srt_handshake_free(struct upipe *upipe)
upipe_srt_handshake_clean_output(upipe);
upipe_srt_handshake_clean_upump_timer(upipe);
upipe_srt_handshake_clean_upump_timeout(upipe);
upipe_srt_handshake_clean_upump_keepalive_timeout(upipe);
upipe_srt_handshake_clean_upump_mgr(upipe);
upipe_srt_handshake_clean_uclock(upipe);
upipe_srt_handshake_clean_ubuf_mgr(upipe);
Expand Down

0 comments on commit 371b0f9

Please sign in to comment.