Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bytes no longer implements Extend<u8> #324

Open
sfackler opened this issue Nov 27, 2019 · 9 comments
Open

Bytes no longer implements Extend<u8> #324

sfackler opened this issue Nov 27, 2019 · 9 comments

Comments

@sfackler
Copy link
Contributor

This means that in particular you can't use StreamExt::concat on a stream of Bytes. Is that change intentional or something that got lost in the rewrites?

@carllerche
Copy link
Member

I think it was accidental. @seanmonstar ?

@seanmonstar
Copy link
Member

Partly intentional, partly just punted. It was intentionally removed in the pass that removed all mutation of a Bytes. Should we just add in a simple Extend<u8> for Bytes?

Another idea is to encourage using some aggregate utility instead, which would aggregate into some BufList.

@dylanede
Copy link

dylanede commented Dec 6, 2019

If Bytes doesn't implement Extend<u8>, then code like this in the implementation of warp will no longer work:

body.try_concat().await

where body is a hyper Body, which is a Stream that produces Bytes. try_concat relies on the output of the stream being Extend<u8> for this to still compile.

Unless you can think of a suitable replacement for that use of try_concat?

@danieleades
Copy link

body.try_concat().await led me here too.

without this i have-

let mut response = request.await?.into_body();
let mut v = Vec::default();

while let Some(bytes_result) = response.next().await {
    let bytes = bytes_result?;
    v.extend(bytes)
}

or maybe

let bytes = response_body
        .try_fold(Vec::default(), |mut v, bytes| async {
            v.extend(bytes);
            Ok(v)
        })
        .await?;

or worst of all,

let mut response = request.await?.into_body();
let chunks: Vec<Vec<u8>> = response.collect().await?;
let bytes: Vec<u8> = chunks.into_iter().flatten().collect();

none of which i love

@danieleades
Copy link

danieleades commented Dec 28, 2019

you could implement it directly on hyper::Body?

/// Returns a future of the concatenated raw bytes from the [`Body`](Body).
/// 
/// # Example
/// ```
/// # async {
/// # let body = hyper::Body::empty()
/// let bytes = body.concat().await?;
/// # Ok(())
/// # };
/// ```
#[cfg(feature = "stream")]
pub async fn concat(mut self) -> crate::Result<Bytes> {
    let mut buf = BytesMut::new();
    while let Some(bytes) = self.next().await {
        buf.extend(bytes?)
    }
    Ok(buf.freeze())
}

@cetra3
Copy link

cetra3 commented Jan 21, 2020

Trying to update my code and I am hitting this bug:

https://github.com/cetra3/mpart-async/blob/541a2e7680d5a0f77f73940b651b408ead720f26/src/filestream.rs#L59

The workaround I had was as follows:

let bytes = FileStream::new("Cargo.toml").try_fold(BytesMut::new(), |mut buf, bytes| {
    buf.extend_from_slice(&bytes);
    future::ready(Ok(buf))
})
.await?;

I can submit a PR for this if needed? I think it makes things a lot more ergonomic if we can use try_concat():

let bytes = FileStream::new("Cargo.toml")
  .try_concat()
  .await?;

@carllerche
Copy link
Member

Tokio will be providing a version of collect that supports Bytes. It is on master but not released yet.

@NeoLegends
Copy link

NeoLegends commented Oct 17, 2020

PSA: https://docs.rs/tokio/0.2.22/tokio/stream/trait.FromStream.html is released.

Tokio 0.3 removed this impl again.

@taiki-e
Copy link
Member

taiki-e commented Oct 17, 2020

tokio 0.3 removed FromStream impl for bytes types... (tokio-rs/tokio#2879)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants