How to use ES8 async/await with streams?

If you are using node version >= v10.0.0 then you can use stream.pipeline and util.promisify.

const fs = require('fs');
const crypto = require('crypto');
const util = require('util');
const stream = require('stream');

const pipeline = util.promisify(stream.pipeline);

const hash = crypto.createHash('sha1');
hash.setEncoding('hex');

async function run() {
  await pipeline(
    fs.createReadStream('/some/file/name.txt'),
    hash
  );
  console.log('Pipeline succeeded');
}

run().catch(console.error);

Node V15 now has a promisfiy pipeline in stream/promises. This is the cleanest and most official way.

const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

We all should appreciate how much works it's done here:

  • Capture errors in all the streams.
  • Destroy unfinished streams when error is raised.
  • Only return when the last writable stream is finished.

This pipe thing is one of the most powerful feature Node.JS has. Making it fully async is not easy. Now we have it.


Something like this works:

for (var res of fetchResponses){ //node-fetch package responses
    const dest = fs.createWriteStream(filePath,{flags:'a'});
    totalBytes += Number(res.headers.get('content-length'));
    await new Promise((resolve, reject) => {
        res.body.pipe(dest);
        res.body.on("error", (err) => {
            reject(err);
        });
        dest.on("finish", function() {
            resolve();
        });
    });         
}

async/await only works with promises, not with streams. There are ideas to make an extra stream-like data type that would get its own syntax, but those are highly experimental if at all and I won't go into details.

Anyway, your callback is only waiting for the end of the stream, which is a perfect fit for a promise. You'd just have to wrap the stream:

var fd = fs.createReadStream('/some/file/name.txt');
var hash = crypto.createHash('sha1');
hash.setEncoding('hex');
// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

var end = new Promise(function(resolve, reject) {
    hash.on('end', () => resolve(hash.read()));
    fd.on('error', reject); // or something like that. might need to close `hash`
});

Now you can await that promise:

(async function() {
    let sha1sum = await end;
    console.log(sha1sum);
}());