Pipe a stream to s3.upload()

None of the answers worked for me because I wanted to:

  • Pipe into s3.upload()
  • Pipe the result of s3.upload() into another stream

The accepted answer doesn't do the latter. The others rely on the promise API, which is cumbersome to use when working with stream pipes.

This is my modification of the accepted answer.

const { S3 } = require('aws-sdk');
const stream = require('stream');

const s3 = new S3();

function writeToS3({ Key, Bucket }) {
  // The PassThrough is returned to the caller as the pipe destination
  // and handed to S3 as the upload Body at the same time.
  const Body = new stream.PassThrough();

  s3.upload({ Body, Key, Bucket })
    .on('httpUploadProgress', progress => {
      console.log('progress', progress);
    })
    .send((err, data) => {
      if (err) {
        Body.destroy(err);
      } else {
        console.log(`File uploaded and available at ${data.Location}`);
        Body.destroy();
      }
    });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({ Key, Bucket }));

pipeline.on('close', () => {
  // upload finished, do something else
});
pipeline.on('error', (err) => {
  // upload wasn't successful; handle err here
});
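
For a concrete end-to-end wiring, it could look roughly like this (a sketch; the file path, Key, Bucket, and the gzip step are placeholder choices, included only to show that any transform can sit in front of writeToS3()):

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('/path/to/local/file')
  .pipe(zlib.createGzip()) // any transform stream can sit here
  .pipe(writeToS3({ Key: 'file.gz', Bucket: 'my-bucket' }))
  .on('close', () => console.log('upload finished'))
  .on('error', (err) => console.error('upload failed', err));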

A bit of a late answer, but hopefully it helps someone else. You can return both the writable stream and the promise, so you can get the response data when the upload finishes.

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

And you can use the function as follows:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

const pipeline = readStream.pipe(writeStream);

Now you can either check the promise:

promise.then(() => {
  console.log('upload completed successfully');
}).catch((err) => {
  console.log('upload failed.', err.message);
});

Or use async/await:

try {
    await promise;
    console.log('upload completed successfully');
} catch (error) {
    console.log('upload failed.', error.message);
}

Or, since stream.pipe() returns the destination stream (the writeStream variable above), which allows pipes to be chained, we can also use its events:

 pipeline.on('close', () => {
   console.log('upload successful');
 });
 pipeline.on('error', (err) => {
   console.log('upload failed', err.message)
 });
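
Alternatively, Node's built-in stream.pipeline() can replace the manual .pipe() call above, so read-side and pipe errors land in one callback while the upload result still comes from the promise (a sketch, reusing the same uploadStream() helper):

const fs = require('fs');
const { pipeline } = require('stream');

const { writeStream, promise } = uploadStream({ Bucket: 'yourbucket', Key: 'yourfile.mp4' });

pipeline(fs.createReadStream('/path/to/yourfile.mp4'), writeStream, (err) => {
  if (err) {
    console.log('read or pipe failed.', err.message);
  }
});

promise
  .then((data) => console.log('upload completed successfully', data.Location))
  .catch((err) => console.log('upload failed.', err.message));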

Wrap the S3 upload() function with the Node.js stream.PassThrough() stream.

Here's an example:

const AWS = require('aws-sdk');
const stream = require('stream');

const s3 = new AWS.S3();

// inputStream is any readable stream, e.g. fs.createReadStream(...)
inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  // The PassThrough stream is both the pipe destination and the upload Body
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}

In the accepted answer, the function ends before the upload is complete, and thus, it's incorrect. The code below pipes correctly from a readable stream.

Upload reference

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}
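
If the source stream is large, s3.upload() also accepts a second options argument to tune the multipart behaviour. A sketch (uploadLargeReadableStream is just a hypothetical variant of the function above; the 10 MB part size and queue size of 4 are arbitrary examples, not recommendations):

async function uploadLargeReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  // partSize sets the multipart chunk size; queueSize sets how many parts upload concurrently
  const options = {partSize: 10 * 1024 * 1024, queueSize: 4};
  return s3.upload(params, options).promise();
}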

You can also go a step further and output progress info using ManagedUpload, like so:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});
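
Since the ManagedUpload instance also exposes promise(), progress reporting and the final result can come from the same object (a sketch; note that total may be undefined when the Body length isn't known up front):

const manager = s3.upload(params);
manager.on('httpUploadProgress', ({ loaded, total }) => {
  console.log(`uploaded ${loaded} of ${total || 'unknown'} bytes`);
});

const result = await manager.promise();
console.log('upload complete', result.Location);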

ManagedUpload reference

A list of available events