Skip to content

Commit 8d91aff

Browse files
committed
MAJOR: mux-quic: support pacing emission
1 parent d30bae4 commit 8d91aff

File tree

1 file changed

+74
-12
lines changed

1 file changed

+74
-12
lines changed

src/mux_quic.c

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
395395

396396
static void qcc_wakeup(struct qcc *qcc)
397397
{
398+
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
399+
tasklet_wakeup(qcc->wait_event.tasklet);
400+
}
401+
402+
static void qcc_wakeup_pacing(struct qcc *qcc)
403+
{
404+
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
398405
tasklet_wakeup(qcc->wait_event.tasklet);
399406
}
400407

@@ -2083,18 +2090,18 @@ static int qcc_subscribe_send(struct qcc *qcc)
20832090
*
20842091
* Returns 0 if all data sent with success else non-zero.
20852092
*/
2086-
static int qcc_send_frames(struct qcc *qcc, struct list *frms)
2093+
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
20872094
{
20882095
enum quic_tx_err ret;
20892096

20902097
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
20912098

20922099
if (LIST_ISEMPTY(frms)) {
20932100
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
2094-
return 1;
2101+
return -1;
20952102
}
20962103

2097-
ret = qc_send_mux(qcc->conn->handle.qc, frms, 0);
2104+
ret = qc_send_mux(qcc->conn->handle.qc, frms, stream);
20982105
if (ret == QUIC_TX_ERR_FATAL) {
20992106
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
21002107
qcc_subscribe_send(qcc);
@@ -2104,18 +2111,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
21042111
/* If there is frames left at this stage, transport layer is blocked.
21052112
* Subscribe on it to retry later.
21062113
*/
2107-
if (!LIST_ISEMPTY(frms)) {
2114+
if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) {
21082115
TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
21092116
qcc_subscribe_send(qcc);
21102117
goto err;
21112118
}
21122119

21132120
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
2114-
return 0;
2121+
return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;
21152122

21162123
err:
21172124
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
2118-
return 1;
2125+
return -1;
21192126
}
21202127

21212128
/* Emit a RESET_STREAM on <qcs>.
@@ -2140,7 +2147,7 @@ static int qcs_send_reset(struct qcs *qcs)
21402147
frm->reset_stream.final_size = qcs->tx.fc.off_real;
21412148

21422149
LIST_APPEND(&frms, &frm->list);
2143-
if (qcc_send_frames(qcs->qcc, &frms)) {
2150+
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
21442151
if (!LIST_ISEMPTY(&frms))
21452152
qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
21462153
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2191,7 +2198,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
21912198
frm->stop_sending.app_error_code = qcs->err;
21922199

21932200
LIST_APPEND(&frms, &frm->list);
2194-
if (qcc_send_frames(qcs->qcc, &frms)) {
2201+
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
21952202
if (!LIST_ISEMPTY(&frms))
21962203
qc_frm_free(qcc->conn->handle.qc, &frm);
21972204
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2263,7 +2270,7 @@ static int qcc_io_send(struct qcc *qcc)
22632270
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
22642271
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
22652272
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
2266-
int ret, total = 0, resent;
2273+
int ret = 0, total = 0, resent;
22672274

22682275
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
22692276

@@ -2273,6 +2280,8 @@ static int qcc_io_send(struct qcc *qcc)
22732280
* apply for STREAM frames.
22742281
*/
22752282

2283+
quic_pacing_reset(qcc_tx_pacer(qcc));
2284+
22762285
/* Check for transport error. */
22772286
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
22782287
TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
@@ -2297,7 +2306,7 @@ static int qcc_io_send(struct qcc *qcc)
22972306
}
22982307

22992308
if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
2300-
if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
2309+
if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
23012310
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
23022311
goto out;
23032312
}
@@ -2373,10 +2382,15 @@ static int qcc_io_send(struct qcc *qcc)
23732382
}
23742383
}
23752384

2385+
if (!quic_pacing_expired(pacer)) {
2386+
qcc_wakeup_pacing(qcc);
2387+
return 1;
2388+
}
2389+
23762390
/* Retry sending until no frame to send, data rejected or connection
23772391
* flow-control limit reached.
23782392
*/
2379-
while ((ret = qcc_send_frames(qcc, quic_pacing_frms(pacer))) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
2393+
while ((ret = qcc_send_frames(qcc, quic_pacing_frms(pacer), 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
23802394
window_conn = qfctl_rcap(&qcc->tx.fc);
23812395
resent = 0;
23822396

@@ -2408,7 +2422,10 @@ static int qcc_io_send(struct qcc *qcc)
24082422

24092423
sent_done:
24102424
/* Deallocate frames that the transport layer has rejected. */
2411-
if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
2425+
if (ret == 1) {
2426+
qcc_wakeup_pacing(qcc);
2427+
}
2428+
else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
24122429
struct quic_frame *frm, *frm2;
24132430

24142431
list_for_each_entry_safe(frm, frm2, quic_pacing_frms(pacer), list)
@@ -2760,12 +2777,57 @@ static void qcc_release(struct qcc *qcc)
27602777
TRACE_LEAVE(QMUX_EV_QCC_END);
27612778
}
27622779

2780+
static void qcc_purge_sending(struct qcc *qcc)
2781+
{
2782+
#if 0
2783+
int ret;
2784+
2785+
if (qcc->tx.next > now_mono_time()) {
2786+
qcc_wakeup_pacing(qcc);
2787+
return 1;
2788+
}
2789+
2790+
//fprintf(stderr, "%s\n", __func__);
2791+
ret = qcc_send_frames(qcc, &qcc->tx.frms, 1);
2792+
if (ret > 0) {
2793+
//struct quic_conn *qc = qcc->conn->handle.qc;
2794+
//qcc->tx.next = now_ns + global.tune.pipesize;
2795+
//qcc->tx.next = now_mono_time() + qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1);
2796+
qcc_wakeup_pacing(qcc);
2797+
return 1;
2798+
}
2799+
#endif
2800+
2801+
struct quic_conn *qc = qcc->conn->handle.qc;
2802+
enum quic_tx_err ret;
2803+
2804+
ret = quic_pacing_send(qcc_tx_pacer(qcc), qc);
2805+
if (ret == QUIC_TX_ERR_AGAIN) {
2806+
BUG_ON(LIST_ISEMPTY(&qcc_tx_pacer(qcc)->frms));
2807+
qcc_wakeup_pacing(qcc);
2808+
}
2809+
else if (ret == QUIC_TX_ERR_FATAL) {
2810+
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
2811+
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
2812+
qcc_subscribe_send(qcc);
2813+
}
2814+
else {
2815+
if (!LIST_ISEMPTY(&qcc_tx_pacer(qcc)->frms))
2816+
qcc_subscribe_send(qcc);
2817+
}
2818+
}
2819+
27632820
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
27642821
{
27652822
struct qcc *qcc = ctx;
27662823

27672824
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
27682825

2826+
if (status & TASK_F_USR1) {
2827+
qcc_purge_sending(qcc);
2828+
return NULL;
2829+
}
2830+
27692831
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
27702832
qcc_io_send(qcc);
27712833

0 commit comments

Comments
 (0)