Streamed upload to s3 with rusoto
Okay. Strap yourself in, this is a fun one.
StreamingBody is an alias for ByteStream, which itself takes a parameter type S: Stream<Item = Bytes, Error = Error> + Send + 'static. In short, it needs to be a stream of bytes.
BufReader does not implement this trait, as it predates futures and streams by a long while. There is also no ready-made conversion from a BufReader to a Stream<Item = Bytes> that .into() could pick up for you.
The first (commented) example works because String::into_bytes().into() follows the conversion chain String -> Vec<u8> -> ByteStream, thanks to the implementation of From<Vec<u8>> on ByteStream.
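To make that conversion chain concrete, here is a minimal std-only sketch. FakeByteStream and body_from_string are hypothetical names invented for this illustration; the real rusoto ByteStream wraps a stream rather than a list of buffers, so treat this only as a picture of how .into() resolves through From<Vec<u8>>.

```rust
// Hypothetical stand-in for rusoto's ByteStream (assumption: the real type
// wraps a stream; here we just keep the buffers so the sketch runs with std).
#[derive(Debug, PartialEq)]
struct FakeByteStream {
    chunks: Vec<Vec<u8>>,
}

// This impl is what lets `.into()` turn a Vec<u8> into the body type.
impl From<Vec<u8>> for FakeByteStream {
    fn from(buf: Vec<u8>) -> Self {
        FakeByteStream { chunks: vec![buf] }
    }
}

fn body_from_string(s: String) -> FakeByteStream {
    // String -> Vec<u8> via into_bytes(), then Vec<u8> -> FakeByteStream via Into.
    s.into_bytes().into()
}

fn main() {
    let body = body_from_string(String::from("hello"));
    println!("{:?}", body);
}
```

A BufReader has no such From impl to lean on, which is why the same .into() call does not compile for it.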
Now that we know why this doesn't work, we can fix it. There is a fast way, and then there is a right way. I'll show you both.
The fast way
The fast (but not optimal) way is simply to call File::read_to_end(). This will fill up a Vec<u8>, which you can then use like you did before:
use std::io::Read; // brings read_to_end() into scope

let mut buf: Vec<u8> = Vec::new();
file.read_to_end(&mut buf)?;
// buf now contains the entire file
This is inefficient and suboptimal for two reasons:
- read_to_end() is a blocking call. Depending on where you are reading the file from, this blocking time may prove unreasonable.
- You are required to have more free RAM than you have bytes in your file (plus a few machine words for the Vec header itself and some allocator overhead we don't really care about).
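The memory point can be sketched with std alone. The helper names read_all and read_chunked below are made up for this illustration, and the sketch is still blocking, so it only addresses the second drawback: reading through a fixed-size buffer keeps peak memory at the buffer size instead of the file size.

```rust
use std::io::{self, Read};

/// Reads everything into memory: peak usage grows with the input size.
fn read_all<R: Read>(mut reader: R) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf)?;
    Ok(buf)
}

/// Reads through one reusable buffer of `chunk_size` bytes and hands each
/// filled slice to `on_chunk`. Returns how many chunks were delivered.
fn read_chunked<R: Read>(
    mut reader: R,
    chunk_size: usize,
    mut on_chunk: impl FnMut(&[u8]),
) -> io::Result<usize> {
    let mut buf = vec![0u8; chunk_size];
    let mut chunks = 0;
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break; // end of input
        }
        on_chunk(&buf[..n]);
        chunks += 1;
    }
    Ok(chunks)
}

fn main() -> io::Result<()> {
    // An in-memory cursor stands in for a file so the sketch is self-contained.
    let data = vec![7u8; 10_000];
    let whole = read_all(io::Cursor::new(&data))?;
    let mut total = 0;
    let chunks = read_chunked(io::Cursor::new(&data), 4096, |c| total += c.len())?;
    println!("whole: {} bytes; chunked: {} bytes in {} chunks", whole.len(), total, chunks);
    Ok(())
}
```

A stream of byte chunks is the asynchronous counterpart of read_chunked: the consumer sees one bounded chunk at a time and never the whole file.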
The good way
The good way turns your file into a structure implementing AsyncRead. From this, we can then form a Stream.
Since you already have a std::fs::File, we will first convert it into a tokio::fs::File. This implements AsyncRead, which is very important for later:
let tokio_file = tokio::fs::File::from_std(file);
From this, we sadly need to do some pipework to get it into a Stream. Multiple crates have implemented it; the way to do so from scratch is the following:
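use futures::TryStreamExt; // for map_ok
use tokio_util::codec;
// With tokio_util each item is a Result, so convert only the Ok side to Vec<u8>.
let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new())
    .map_ok(|chunk| chunk.to_vec());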
tokio_util::codec::FramedRead implements Stream, with an item type determined by its decoder. As our decoder is BytesCodec, it yields BytesMut chunks (with io::Error as the error type), and the mapping step turns each chunk into a Vec<u8>. That mapped stream is byte_stream.
As the playground doesn't know rusoto_core, I cannot show you the full flow. I can, however, show you that you can generate a Stream<Item = Vec<u8>, Error = io::Error>, which is the crux of this: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133
Here's a version with the upcoming Rusoto async-await syntax (for getObject, although it should be straightforward to tweak for upload), possibly out for public consumption in Rusoto 0.4.3:
https://github.com/brainstorm/rusoto-s3-async-await
Namely:
use bytes::BytesMut;
use futures::TryStreamExt; // map_ok, try_concat
use rusoto_s3::{GetObjectRequest, S3, S3Client};

pub async fn bucket_obj_bytes(client: S3Client, bucket: String, _prefix: String, object: String) {
    let get_req = GetObjectRequest {
        bucket,
        key: object,
        ..Default::default()
    };
    let result = client
        .get_object(get_req)
        .await
        .expect("Couldn't GET object");
    println!("get object result: {:#?}", result);
    // The body is a stream of chunks; collect them into one buffer.
    let stream = result.body.unwrap();
    let body = stream.map_ok(|b| BytesMut::from(&b[..])).try_concat().await.unwrap();
    assert!(!body.is_empty());
    dbg!(body);
}
This is essentially borrowed from the integration test suite itself, where you can find snippets of the upload version too.
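For readers unfamiliar with try_concat in the snippet above, the following std-only sketch shows the same idea synchronously. concat_chunks is a hypothetical helper written for this illustration and is not part of futures or Rusoto: it appends the successful chunks in order and returns the first error it meets.

```rust
use std::io;

/// Concatenates chunks in order, stopping at the first error.
fn concat_chunks<I>(chunks: I) -> io::Result<Vec<u8>>
where
    I: IntoIterator<Item = io::Result<Vec<u8>>>,
{
    let mut body = Vec::new();
    for chunk in chunks {
        // `?` returns early on the first Err, so later chunks are never read.
        body.extend_from_slice(&chunk?);
    }
    Ok(body)
}

fn main() {
    let ok = vec![Ok(b"hello ".to_vec()), Ok(b"world".to_vec())];
    println!("{:?}", concat_chunks(ok).map(|b| b.len()));

    let failing = vec![
        Ok(b"partial".to_vec()),
        Err(io::Error::new(io::ErrorKind::Other, "connection reset")),
        Ok(b"never read".to_vec()),
    ];
    println!("{}", concat_chunks(failing).is_err());
}
```

Note that collecting the whole body this way brings back the memory cost of the fast way, so it suits small objects or tests better than large downloads.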