Skip to content

Commit b33a42c

Browse files
committed
port fix from futures to tokio
1 parent cdc1ed9 commit b33a42c

File tree

2 files changed

+32
-38
lines changed

2 files changed

+32
-38
lines changed

src/codec/flate/decoder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl Decode for FlateDecoder {
5151
match self.decode(input, output, FlushDecompress::None)? {
5252
Status::Ok => Ok(false),
5353
Status::StreamEnd => Ok(true),
54-
Status::BufError => Ok(true), // Waiting for more input.
54+
Status::BufError => Err(io::Error::new(io::ErrorKind::Other, "unexpected BufError")),
5555
}
5656
}
5757

src/tokio/bufread/generic/decoder.rs

Lines changed: 31 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -65,49 +65,43 @@ impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
6565
) -> Poll<Result<()>> {
6666
let mut this = self.project();
6767

68+
let mut first = true;
69+
6870
loop {
6971
*this.state = match this.state {
7072
State::Decoding => {
71-
let fill_buf_result = this.reader.as_mut().poll_fill_buf(cx);
72-
73-
match fill_buf_result {
74-
Poll::Pending => {
75-
// Try to decode even if there is no new data.
76-
// Some data may be left in the internal state of the decoder
77-
// because there was not enough space in the output buffer.
78-
let written_before = output.written().len();
79-
80-
let mut input: Vec<u8> = vec![];
81-
let mut input = PartialBuffer::new(input);
82-
let done = this.decoder.decode(&mut input, output)?;
83-
if output.written().len() == written_before {
84-
return Poll::Pending;
85-
}
73+
let input = if first {
74+
&[][..]
75+
} else {
76+
ready!(this.reader.as_mut().poll_fill_buf(cx))?
77+
};
8678

87-
if done {
88-
State::Flushing
89-
} else {
90-
State::Decoding
91-
}
92-
}
93-
Poll::Ready(input) => {
94-
let input = input?;
95-
if input.is_empty() {
96-
// Avoid attempting to reinitialise the decoder if the reader
97-
// has returned EOF.
98-
*this.multiple_members = false;
99-
State::Flushing
79+
if input.is_empty() && !first {
80+
// Avoid attempting to reinitialise the decoder if the reader
81+
// has returned EOF.
82+
*this.multiple_members = false;
83+
84+
State::Flushing
85+
} else {
86+
let mut input = PartialBuffer::new(input);
87+
let done = this.decoder.decode(&mut input, output).or_else(|err| {
88+
// ignore the first error, occurs when input is empty
89+
// but we need to run decode to flush
90+
if first {
91+
Ok(false)
10092
} else {
101-
let mut input = PartialBuffer::new(input);
102-
let done = this.decoder.decode(&mut input, output)?;
103-
let len = input.written().len();
104-
this.reader.as_mut().consume(len);
105-
if done {
106-
State::Flushing
107-
} else {
108-
State::Decoding
109-
}
93+
Err(err)
11094
}
95+
})?;
96+
97+
first = false;
98+
99+
let len = input.written().len();
100+
this.reader.as_mut().consume(len);
101+
if done {
102+
State::Flushing
103+
} else {
104+
State::Decoding
111105
}
112106
}
113107
}

0 commit comments

Comments
 (0)