Skip to content

Commit 780b890

Browse files
committed
TMP ms pacing with budget calcul
1 parent bbaf5bc commit 780b890

File tree

5 files changed

+116
-87
lines changed

5 files changed

+116
-87
lines changed

include/haproxy/quic_pacing-t.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77
struct quic_pacer {
88
struct list frms;
99
const struct quic_cc_path *path;
10+
//int next;
11+
//unsigned int curr;
12+
//int pkt_ms;
13+
//int sent;
14+
int burst;
15+
int budget;
16+
int last_sent;
1017
int next;
11-
unsigned int curr;
12-
int pkt_ms;
13-
int sent;
1418
};
1519

1620
#endif /* _HAPROXY_QUIC_PACING_T_H */

include/haproxy/quic_pacing.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,18 @@ static inline void quic_pacing_init(struct quic_pacer *pacer,
1313
LIST_INIT(&pacer->frms);
1414
pacer->path = path;
1515
//pacer->next = TICK_ETERNITY;
16-
pacer->next = now_ms;
16+
//pacer->next = now_ms;
1717

1818
//pacer->curr = now_ms;
19-
pacer->curr = TICK_ETERNITY;
20-
pacer->pkt_ms = 0;
21-
pacer->sent = 0;
19+
//pacer->curr = TICK_ETERNITY;
20+
//pacer->pkt_ms = 0;
21+
//pacer->sent = 0;
22+
23+
pacer->last_sent = now_ms;
24+
//pacer->budget = global.tune.quic_frontend_max_tx_burst;
25+
pacer->budget = 0;
26+
pacer->burst = global.tune.quic_frontend_max_tx_burst;
27+
pacer->next = TICK_ETERNITY;
2228
}
2329

2430
static inline void quic_pacing_reset(struct quic_pacer *pacer)
@@ -47,7 +53,7 @@ static inline int quic_pacing_ns_pkt(const struct quic_pacer *pacer, int sent)
4753
return (pacer->path->cwnd / (pacer->path->mtu + 1)) / (pacer->path->loss.srtt + 1) + 1;
4854
}
4955

50-
int quic_pacing_expired(const struct quic_pacer *pacer);
56+
//int quic_pacing_expired(const struct quic_pacer *pacer);
5157

5258
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc);
5359

@@ -56,4 +62,6 @@ int quic_pacing_prepare(struct quic_pacer *pacer);
5662
//void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
5763
int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err);
5864

65+
int quic_pacing_next(struct quic_pacer *pacer);
66+
5967
#endif /* _HAPROXY_QUIC_PACING_H */

src/mux_quic.c

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ static void qcc_refresh_timeout(struct qcc *qcc)
277277

278278
void qcc_wakeup(struct qcc *qcc)
279279
{
280+
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
280281
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
281282
tasklet_wakeup(qcc->wait_event.tasklet);
282283

@@ -287,12 +288,23 @@ void qcc_wakeup(struct qcc *qcc)
287288

288289
static void qcc_wakeup_pacing(struct qcc *qcc)
289290
{
291+
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
292+
BUG_ON(LIST_ISEMPTY(&qcc->tx.pacer.frms));
290293
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
291294
tasklet_wakeup(qcc->wait_event.tasklet);
292-
//qcc->task->expire = qcc->tx.pacer.next;
293-
//BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
294-
//task_queue(qcc->task);
295-
//TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn);
295+
296+
qcc->task->expire = TICK_ETERNITY;
297+
task_queue(qcc->task);
298+
}
299+
300+
static void qcc_task_pacing(struct qcc *qcc)
301+
{
302+
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
303+
//HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
304+
qcc->task->expire = now_ms == qcc->tx.pacer.next ? tick_add(qcc->tx.pacer.next, 1) : qcc->tx.pacer.next;
305+
BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
306+
task_queue(qcc->task);
307+
TRACE_POINT(QMUX_EV_STRM_WAKE, qcc->conn);
296308
}
297309

298310
/* Mark a stream as open if it was idle. This can be used on every
@@ -2176,6 +2188,7 @@ static int qcc_io_send(struct qcc *qcc)
21762188
*/
21772189

21782190
quic_pacing_reset(pacer);
2191+
//HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
21792192

21802193
/* Check for transport error. */
21812194
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
@@ -2277,9 +2290,17 @@ static int qcc_io_send(struct qcc *qcc)
22772290
}
22782291
}
22792292

2280-
if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
2281-
qcc_wakeup_pacing(qcc);
2282-
return 1;
2293+
//if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
2294+
if (!LIST_ISEMPTY(frms)) {
2295+
if (!qcc->tx.pacer.budget) {
2296+
qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(pacer));
2297+
//fprintf(stderr, "wait for %ldms\n", qcc->tx.pacer.burst * qcc->tx.pacer.path->loss.srtt * qcc->tx.pacer.path->mtu / qcc->tx.pacer.path->cwnd);
2298+
qcc_task_pacing(qcc);
2299+
return 1;
2300+
}
2301+
//else {
2302+
// qcc_wakeup_pacing(qcc);
2303+
//}
22832304
}
22842305

22852306
/* Retry sending until no frame to send, data rejected or connection
@@ -2317,11 +2338,14 @@ static int qcc_io_send(struct qcc *qcc)
23172338

23182339
sent_done:
23192340
if (ret == 1) {
2341+
qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(pacer));
2342+
//fprintf(stderr, "wait for %ldms\n", pacer->burst * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd);
23202343
qcc_wakeup_pacing(qcc);
23212344
}
23222345
else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
23232346
/* Deallocate frames that the transport layer has rejected. */
23242347
quic_pacing_reset(pacer);
2348+
//HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
23252349
}
23262350

23272351
/* Re-insert on-error QCS at the end of the send-list. */
@@ -2643,6 +2667,7 @@ static void qcc_release(struct qcc *qcc)
26432667
}
26442668

26452669
quic_pacing_reset(&qcc->tx.pacer);
2670+
//HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
26462671

26472672
if (qcc->app_ops && qcc->app_ops->release)
26482673
qcc->app_ops->release(qcc->ctx);
@@ -2679,7 +2704,7 @@ static int qcc_purge_sending(struct qcc *qcc)
26792704
if (ret == QUIC_TX_ERR_AGAIN) {
26802705
BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
26812706
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
2682-
qcc_wakeup_pacing(qcc);
2707+
//qcc_wakeup_pacing(qcc);
26832708
return 1;
26842709
}
26852710
else if (ret == QUIC_TX_ERR_FATAL) {
@@ -2693,6 +2718,8 @@ static int qcc_purge_sending(struct qcc *qcc)
26932718
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
26942719
if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
26952720
qcc_subscribe_send(qcc);
2721+
//else
2722+
// HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
26962723
return 0;
26972724
}
26982725
}
@@ -2704,8 +2731,18 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
27042731
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
27052732

27062733
if (status & TASK_F_USR1) {
2734+
++activity[tid].ctr0;
2735+
//HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
27072736
//ABORT_NOW();
2708-
qcc_purge_sending(qcc);
2737+
if (qcc_purge_sending(qcc)) {
2738+
if (!qcc->tx.pacer.budget) {
2739+
qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(&qcc->tx.pacer));
2740+
//fprintf(stderr, "wait for %ldms\n", qcc->tx.pacer.burst * qcc->tx.pacer.path->loss.srtt * qcc->tx.pacer.path->mtu / qcc->tx.pacer.path->cwnd);
2741+
qcc_task_pacing(qcc);
2742+
}
2743+
else
2744+
qcc_wakeup_pacing(qcc);
2745+
}
27092746
return NULL;
27102747
}
27112748

@@ -2745,21 +2782,21 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
27452782
goto requeue;
27462783
}
27472784
//fprintf(stderr, "woken up after %dms\n", now_ms - qcc->tx.pacer.next);
2748-
2749-
#if 0
2750-
if (!qcc_may_expire(qcc)) {
2751-
TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn);
2752-
t->expire = TICK_ETERNITY;
2753-
goto requeue;
2754-
}
2755-
#endif
27562785
}
27572786

2787+
++activity[tid].ctr1;
27582788
if (qcc_purge_sending(qcc)) {
2759-
qcc->task->expire = qcc->tx.pacer.next;
2760-
BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
2761-
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
2762-
goto requeue;
2789+
//qcc->task->expire = qcc->tx.pacer.next;
2790+
if (!qcc->tx.pacer.budget) {
2791+
qcc->tx.pacer.next = tick_add(now_ms, quic_pacing_next(&qcc->tx.pacer));
2792+
qcc->task->expire = now_ms == qcc->tx.pacer.next ? tick_add(qcc->tx.pacer.next, 1) : qcc->tx.pacer.next;
2793+
BUG_ON(tick_is_expired(qcc->task->expire, now_ms));
2794+
TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
2795+
goto requeue;
2796+
}
2797+
else {
2798+
qcc_wakeup_pacing(qcc);
2799+
}
27632800
}
27642801
t->expire = TICK_ETERNITY;
27652802
goto requeue;

src/quic_pacing.c

Lines changed: 31 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,84 +5,59 @@
55

66
struct quic_conn;
77

8-
int quic_pacing_expired(const struct quic_pacer *pacer)
9-
{
10-
//return !pacer->next || pacer->next <= now_mono_time();
11-
//return !pacer->next || pacer->next <= now_ms;
12-
return tick_is_expired(pacer->next, now_ms);
13-
}
8+
//int quic_pacing_expired(const struct quic_pacer *pacer)
9+
//{
10+
// //return !pacer->next || pacer->next <= now_mono_time();
11+
// //return !pacer->next || pacer->next <= now_ms;
12+
// return tick_is_expired(pacer->next, now_ms);
13+
//}
1414

1515
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
1616
{
1717
enum quic_tx_err ret;
1818

19-
if (!quic_pacing_expired(pacer))
20-
return QUIC_TX_ERR_AGAIN;
19+
//if (!quic_pacing_expired(pacer))
20+
//if (!pacer->budget)
21+
// return QUIC_TX_ERR_AGAIN;
2122

2223
BUG_ON(LIST_ISEMPTY(&pacer->frms));
2324
ret = qc_send_mux(qc, &pacer->frms, pacer);
24-
BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms));
25+
//BUG_ON(ret == QUIC_TX_ERR_AGAIN && tick_is_expired(pacer->next, now_ms));
2526

2627
/* TODO handle QUIC_TX_ERR_FATAL */
2728
return ret;
2829
}
2930

3031
int quic_pacing_prepare(struct quic_pacer *pacer)
3132
{
32-
if (pacer->curr == now_ms) {
33-
BUG_ON(pacer->sent > pacer->pkt_ms);
34-
return pacer->pkt_ms - pacer->sent;
35-
}
36-
else {
37-
int not_consumed = pacer->pkt_ms - pacer->sent;
38-
BUG_ON(not_consumed < 0);
39-
//if (not_consumed)
40-
// fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pacer->pkt_ms, pacer->sent);
33+
int idle = tick_remain(pacer->last_sent, now_ms);
34+
int pkts = idle * pacer->path->cwnd / (pacer->path->loss.srtt * pacer->path->mtu + 1);
4135

42-
pacer->curr = now_ms;
43-
pacer->sent = 0;
44-
pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0);
45-
//pacer->pkt_ms = quic_pacing_ns_pkt(pacer, 0) + not_consumed;
36+
TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
4637

47-
BUG_ON(!pacer->pkt_ms);
48-
return pacer->pkt_ms;
38+
pacer->budget += pkts;
39+
if (pacer->budget > pacer->burst * 2) {
40+
TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
41+
pacer->budget = pacer->burst * 2;
4942
}
43+
//fprintf(stderr, "prepare = %d %d/%d\n", pkts, pacer->budget, pacer->burst);
44+
return MIN(pacer->budget, pacer->burst);
45+
}
5046

47+
int quic_pacing_next(struct quic_pacer *pacer)
48+
{
49+
//return (pacer->burst / 4) * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd;
50+
return 1;
5151
}
5252

5353
int quic_pacing_sent_done(struct quic_pacer *pacer, int sent, enum quic_tx_err err)
5454
{
55-
//const int pkt_ms = quic_pacing_ns_pkt(pacer, 1);
56-
57-
#if 0
58-
if (pacer->curr == now_ms) {
59-
pacer->sent += sent;
60-
}
61-
else {
62-
int not_consumed = pkt_ms - pacer->sent;
63-
if (not_consumed < 0)
64-
not_consumed = 0;
65-
if (not_consumed)
66-
fprintf(stderr, "not consumed %d (%d - %d)\n", not_consumed, pkt_ms, pacer->sent);
67-
68-
//pacer->sent = 0;
69-
//pacer->sent -= not_consumed;
70-
71-
pacer->curr = now_ms;
72-
pacer->sent = sent;
73-
}
74-
#endif
75-
BUG_ON(pacer->curr != now_ms);
76-
pacer->sent += sent;
77-
78-
if (pacer->sent >= pacer->pkt_ms) {
79-
//pacer->next = tick_add(now_ms, 1);
80-
pacer->next = tick_add(now_ms, MAX((pacer->sent / pacer->pkt_ms), 1));
81-
BUG_ON(tick_is_expired(pacer->next, now_ms));
82-
//fprintf(stderr, "pacing in %dms (%d / %d)\n", pacer->sent / pkt_ms, pacer->sent, pkt_ms);
83-
return 1;
84-
}
85-
else {
86-
return 0;
55+
BUG_ON(sent > pacer->budget);
56+
TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
57+
pacer->budget -= sent;
58+
if (sent) {
59+
TRACE_POINT(QMUX_EV_QCC_WAKE, NULL);
60+
pacer->last_sent = now_ms;
8761
}
62+
return 0;
8863
}

src/quic_tx.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,9 +513,12 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
513513
}
514514
#endif
515515
max_dgram = quic_pacing_prepare(pacer);
516-
BUG_ON(!max_dgram);
517-
if (!max_dgram)
516+
//BUG_ON(!max_dgram);
517+
if (!max_dgram) {
518+
pacer->next = tick_add(now_ms, quic_pacing_next(pacer));
519+
//fprintf(stderr, "wait for %ldms\n", pacer->burst * pacer->path->loss.srtt * pacer->path->mtu / pacer->path->cwnd);
518520
return QUIC_TX_ERR_AGAIN;
521+
}
519522
}
520523

521524
TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
@@ -638,6 +641,7 @@ static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
638641
// goto out;
639642
// }
640643
//}
644+
BUG_ON(max_dgrams && dgram_cnt > max_dgrams);
641645
if (max_dgrams && dgram_cnt == max_dgrams) {
642646
BUG_ON(LIST_ISEMPTY(frms));
643647
TRACE_PROTO("reached max allowed built datagrams", QUIC_EV_CONN_PHPKTS, qc, qel);
@@ -890,6 +894,7 @@ int qc_send(struct quic_conn *qc, int old_data, struct list *send_list,
890894
}
891895

892896
ret += prep_pkts;
897+
BUG_ON(max_dgrams && ret > max_dgrams);
893898
if (max_dgrams && ret == max_dgrams && !LIST_ISEMPTY(send_list)) {
894899
TRACE_DEVEL("stopping for artificial pacing", QUIC_EV_CONN_TXPKT, qc);
895900
break;

0 commit comments

Comments
 (0)