Skip to content

Commit f423c98

Browse files
committed
MINOR: quic: refactor buffered STREAM ACK consuming
1 parent 290c6a5 commit f423c98

File tree

2 files changed

+76
-121
lines changed

2 files changed

+76
-121
lines changed

src/quic_rx.c

Lines changed: 2 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -201,71 +201,6 @@ static int qc_pkt_decrypt(struct quic_conn *qc, struct quic_enc_level *qel,
201201
return ret;
202202
}
203203

204-
/* Remove from <stream> the acknowledged frames.
205-
*
206-
* Returns 1 if at least one frame was removed else 0.
207-
*/
208-
static int quic_stream_try_to_consume(struct quic_conn *qc,
209-
struct qc_stream_desc *stream)
210-
{
211-
int ret;
212-
struct eb64_node *frm_node;
213-
struct qc_stream_buf *stream_buf;
214-
struct eb64_node *buf_node;
215-
216-
TRACE_ENTER(QUIC_EV_CONN_ACKSTRM, qc);
217-
218-
ret = 0;
219-
buf_node = eb64_first(&stream->buf_tree);
220-
if (buf_node) {
221-
stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
222-
offset_node);
223-
224-
frm_node = eb64_first(&stream_buf->acked_frms);
225-
while (frm_node) {
226-
struct qf_stream *strm_frm;
227-
struct quic_frame *frm;
228-
size_t offset;
229-
230-
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
231-
frm = container_of(strm_frm, struct quic_frame, stream);
232-
offset = strm_frm->offset.key;
233-
234-
if (offset > stream->ack_offset)
235-
break;
236-
237-
/* First delete frm from tree. This prevents BUG_ON() if
238-
* stream_buf instance is removed via qc_stream_desc_ack().
239-
*/
240-
eb64_delete(frm_node);
241-
242-
if (qc_stream_desc_ack(stream, frm)) {
243-
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
244-
qc, strm_frm, stream);
245-
ret = 1;
246-
}
247-
qc_release_frm(qc, frm);
248-
249-
/* Always retrieve first buffer as the previously used
250-
* instance could have been removed during qc_stream_desc_ack().
251-
*/
252-
buf_node = eb64_first(&stream->buf_tree);
253-
if (buf_node) {
254-
stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
255-
offset_node);
256-
frm_node = eb64_first(&stream_buf->acked_frms);
257-
BUG_ON(!frm_node && !b_data(&stream_buf->buf));
258-
}
259-
else {
260-
frm_node = NULL;
261-
}
262-
}
263-
}
264-
265-
TRACE_LEAVE(QUIC_EV_CONN_ACKSTRM, qc);
266-
return ret;
267-
}
268-
269204
/* Handle <frm> frame whose packet it is attached to has just been acknowledged. The memory allocated
270205
* for this frame will be at least released in every cases.
271206
* Never fail.
@@ -281,7 +216,6 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
281216
struct qf_stream *strm_frm = &frm->stream;
282217
struct eb64_node *node = NULL;
283218
struct qc_stream_desc *stream = NULL;
284-
int ack;
285219

286220
/* do not use strm_frm->stream as the qc_stream_desc instance
287221
* might be freed at this stage. Use the id to do a proper
@@ -299,14 +233,10 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
299233
}
300234
stream = eb64_entry(node, struct qc_stream_desc, by_id);
301235

302-
TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
303-
ack = qc_stream_desc_ack(stream, frm);
304-
if (!ack) {
305-
TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
236+
if (!qc_stream_desc_ack(stream, frm)) {
237+
TRACE_DEVEL("stream consumed on ACK received", QUIC_EV_CONN_ACKSTRM,
306238
qc, strm_frm, stream);
307239

308-
quic_stream_try_to_consume(qc, stream);
309-
310240
if (qc_stream_desc_done(stream)) {
311241
/* no need to continue if stream freed. */
312242
TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);

src/quic_stream.c

Lines changed: 74 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -134,83 +134,108 @@ void qc_stream_desc_release(struct qc_stream_desc *stream,
134134
}
135135
}
136136

137+
/* Acknowledged data for buffer <buf> attached to <stream> instance. The
138+
* acknowledged STREAM starts at <offset> and is of length <len> with <fin>
139+
* sets for the last frame of the stream.
140+
*
141+
* Returns <buf> if there is still data to acknowledged after completing the
142+
* operation. Else, the next buffer instance of stream is returned if it exists
143+
* or NULL in the latter case.
144+
*/
145+
static struct qc_stream_buf *qc_stream_buf_ack(struct qc_stream_buf *buf,
146+
struct qc_stream_desc *stream,
147+
uint64_t offset, uint64_t len, int fin)
148+
{
149+
if (offset + len > stream->ack_offset) {
150+
const uint64_t diff = offset + len - stream->ack_offset;
151+
b_del(&buf->buf, diff);
152+
stream->ack_offset += diff;
153+
}
154+
155+
if (!b_data(&buf->buf) && eb_is_empty(&buf->acked_frms)) {
156+
qc_stream_buf_free(stream, &buf);
157+
buf = NULL;
158+
}
159+
160+
if (fin) {
161+
/* Mark FIN as acknowledged. */
162+
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
163+
}
164+
165+
if (!buf && !eb_is_empty(&stream->buf_tree))
166+
buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node);
167+
return buf;
168+
}
169+
137170
/* Acknowledge <frm> STREAM frame whose content is managed by <stream>
138171
* descriptor.
139172
*
140173
* Returns 0 if the frame has been handled and can be removed.
141-
* Returns a positive value if the frame cannot be acknowledged and has been
142-
* buffered.
174+
* Returns a positive value if acknowledgement is out-of-order and
175+
* corresponding STREAM frame has been buffered.
143176
*/
144177
int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm)
145178
{
146179
struct qf_stream *strm_frm = &frm->stream;
147-
uint64_t offset = strm_frm->offset.key;
148-
uint64_t len = strm_frm->len;
180+
size_t offset = strm_frm->offset.key;
181+
size_t len = strm_frm->len;
149182
int fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
150-
151183
struct qc_stream_buf *stream_buf = NULL;
152184
struct eb64_node *buf_node;
153-
struct buffer *buf = NULL;
154-
size_t diff;
185+
struct eb64_node *frm_node;
186+
int ret = 0;
155187

156188
/* Cannot advertise FIN for an inferior data range. */
157189
BUG_ON(fin && offset + len < stream->ack_offset);
158190

159-
if (offset + len < stream->ack_offset) {
160-
return 0;
191+
if (!len) {
192+
BUG_ON(!fin); /* An empty STREAM frame can only be used to advertise FIN */
193+
/* An empty FIN STREAM cannot be inferior to last ack offset. */
194+
BUG_ON(offset < stream->ack_offset);
195+
196+
/* Empty STREAM frame with FIN can be acknowledged immediately. */
197+
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
161198
}
162199
else if (offset > stream->ack_offset) {
163200
buf_node = eb64_lookup_le(&stream->buf_tree, offset);
164-
if (buf_node) {
165-
stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
166-
eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
167-
return 1;
168-
}
169-
else {
170-
ABORT_NOW();
171-
return 0;
172-
}
201+
BUG_ON(!buf_node); /* Cannot acknowledged a STREAM frame for a non existing buffer. */
202+
stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
203+
eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
204+
ret = 1;
173205
}
174-
175-
diff = offset + len - stream->ack_offset;
176-
if (diff) {
206+
else if (offset + len > stream->ack_offset) {
177207
/* Buf list cannot be empty if there is still unacked data. */
178208
BUG_ON(eb_is_empty(&stream->buf_tree));
179209

180210
/* get oldest buffer from buf tree */
181211
stream_buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node);
182-
buf = &stream_buf->buf;
212+
stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
183213

184-
stream->ack_offset += diff;
185-
b_del(buf, diff);
186-
187-
/* Free oldest buffer if all data acknowledged. */
188-
if (!b_data(buf)) {
189-
/* Remove buffered ACK before deleting buffer instance. */
190-
while (!eb_is_empty(&stream_buf->acked_frms)) {
191-
struct quic_conn *qc = stream->qc;
192-
struct eb64_node *frm_node;
193-
struct qf_stream *strm_frm;
194-
struct quic_frame *frm;
195-
196-
frm_node = eb64_first(&stream_buf->acked_frms);
197-
eb64_delete(frm_node);
198-
199-
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
200-
frm = container_of(strm_frm, struct quic_frame, stream);
201-
qc_release_frm(qc, frm);
202-
}
203-
qc_stream_buf_free(stream, &stream_buf);
204-
buf = NULL;
205-
}
206-
}
214+
/* some data were acknowledged, try to consume any remaining buffered ACK. */
215+
frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL;
216+
while (frm_node) {
217+
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
218+
frm = container_of(strm_frm, struct quic_frame, stream);
207219

208-
if (fin) {
209-
/* Mark FIN as acknowledged. */
210-
stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
220+
offset = strm_frm->offset.key;
221+
len = strm_frm->len;
222+
fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
223+
224+
if (offset > stream->ack_offset)
225+
break;
226+
227+
/* Delete frame before acknowledged it. This prevents BUG_ON()
228+
* on non-empty acked_frms tree when stream_buf is empty and removed.
229+
*/
230+
eb64_delete(frm_node);
231+
stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
232+
qc_release_frm(NULL, frm);
233+
234+
frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL;
235+
}
211236
}
212237

213-
return 0;
238+
return ret;
214239
}
215240

216241
/* Free the stream descriptor <stream> content. This function should be used

0 commit comments

Comments
 (0)