Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions benchmark/fs/bench-filehandle-pipetosync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Flags: --experimental-stream-iter
// Benchmark: pipeToSync with sync compression transforms.
// Measures fully synchronous file-to-file pipeline (no threadpool, no promises).
'use strict';

// Benchmark harness plus the sync fs helpers used to build the fixture file
// and to inspect/remove the output.
const common = require('../common.js');
const fs = require('fs');
const { openSync, closeSync, writeSync, readFileSync, unlinkSync } = fs;

// Scratch files live in the shared benchmark tmpdir; the PID suffix keeps
// concurrent benchmark runs from clobbering each other's files.
const tmpdir = require('../../test/common/tmpdir');
tmpdir.refresh();
const srcFile = tmpdir.resolve(`.removeme-sync-bench-src-${process.pid}`);
const dstFile = tmpdir.resolve(`.removeme-sync-bench-dst-${process.pid}`);

const bench = common.createBenchmark(main, {
  // One *Sync transform per algorithm is pulled from 'stream/iter' in main().
  compression: ['gzip', 'deflate', 'brotli', 'zstd'],
  // 1 MiB, 16 MiB, 64 MiB inputs.
  filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
  n: [5],
});

/**
 * Benchmark entry point.
 *
 * Builds a `filesize`-byte fixture of repeating lowercase ASCII, then runs a
 * fully synchronous uppercase+compress pipeline over it `n` times via
 * `pipeToSync`, reporting throughput in MiB processed.
 *
 * @param {{ compression: string, filesize: number, n: number }} config
 */
function main({ compression, filesize, n }) {
  // Create the fixture file with repeating lowercase ASCII.
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = openSync(srcFile, 'w');
  let remaining = filesize;
  while (remaining > 0) {
    const toWrite = Math.min(remaining, chunk.length);
    // Trust the actual byte count reported by writeSync rather than
    // assuming the full requested length was written.
    remaining -= writeSync(fd, chunk, 0, toWrite);
  }
  closeSync(fd);

  const {
    pipeToSync,
    compressGzipSync,
    compressDeflateSync,
    compressBrotliSync,
    compressZstdSync,
  } = require('stream/iter');
  const { open } = fs.promises;

  const compressFactory = {
    gzip: compressGzipSync,
    deflate: compressDeflateSync,
    brotli: compressBrotliSync,
    zstd: compressZstdSync,
  }[compression];

  // Stateless uppercase transform (sync). `null` signals end-of-stream and
  // is passed through unchanged.
  const upper = (chunks) => {
    if (chunks === null) return null;
    const out = new Array(chunks.length);
    for (let j = 0; j < chunks.length; j++) {
      const src = chunks[j];
      const buf = Buffer.allocUnsafe(src.length);
      for (let i = 0; i < src.length; i++) {
        const b = src[i];
        // a-z (0x61-0x7a) -> A-Z (0x41-0x5a).
        buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
      }
      out[j] = buf;
    }
    return out;
  };

  // pipeToSync is fully synchronous, but FileHandle (needed for
  // pullSync/writer) can only be obtained via the async open() — so open
  // asynchronously, then run the pipeline synchronously.
  (async () => {
    const srcFh = await open(srcFile, 'r');
    const dstFh = await open(dstFile, 'w');

    // Warm up, closing the handles even if the pipeline throws.
    try {
      runSync(srcFh, dstFh, upper, compressFactory, pipeToSync);
    } finally {
      await srcFh.close();
      await dstFh.close();
    }

    bench.start();
    let totalBytes = 0;
    for (let i = 0; i < n; i++) {
      // Fresh handles each iteration: resets the read position and
      // truncates the destination.
      const src = await open(srcFile, 'r');
      const dst = await open(dstFile, 'w');
      try {
        totalBytes += runSync(src, dst, upper, compressFactory, pipeToSync);
      } finally {
        await src.close();
        await dst.close();
      }
    }
    bench.end(totalBytes / (1024 * 1024));
  })().then(cleanup, (err) => {
    // Don't leave temp files behind on failure; rethrow so the benchmark
    // process exits non-zero instead of silently swallowing the error.
    cleanup();
    throw err;
  });
}

/**
 * Run one fully synchronous pipeline: srcFh -> upper -> compress -> dstFh.
 *
 * @param {FileHandle} srcFh - Source handle (read position at start of file).
 * @param {FileHandle} dstFh - Destination handle (opened with 'w').
 * @param {Function} upper - Batch uppercase transform.
 * @param {Function} compressFactory - Returns a fresh sync compress transform.
 * @param {Function} pipeToSync - stream/iter sync pipe.
 * @returns {number} Size in bytes of the compressed output file.
 */
function runSync(srcFh, dstFh, upper, compressFactory, pipeToSync) {
  const writer = dstFh.writer();
  pipeToSync(srcFh.pullSync(upper, compressFactory()), writer);

  // stat() is enough to learn the compressed size; reading the whole file
  // back (readFileSync) would add up to tens of MB of extra I/O and
  // allocation inside the timed loop.
  return fs.statSync(dstFile).size;
}

// Best-effort removal of both scratch files; missing files are fine.
function cleanup() {
  for (const file of [srcFile, dstFile]) {
    try {
      unlinkSync(file);
    } catch {
      /* Ignore */
    }
  }
}
72 changes: 38 additions & 34 deletions benchmark/fs/bench-filehandle-pull-vs-webstream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Flags: --experimental-stream-iter
// Compare FileHandle.createReadStream() vs readableWebStream() vs pull()
// reading a large file through two transforms: uppercase then gzip compress.
// reading a large file through two transforms: uppercase then compress.
'use strict';

const common = require('../common.js');
Expand All @@ -14,11 +14,20 @@ const filename = tmpdir.resolve(`.removeme-benchmark-garbage-${process.pid}`);

const bench = common.createBenchmark(main, {
api: ['classic', 'webstream', 'pull'],
compression: ['gzip', 'deflate', 'brotli', 'zstd'],
filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
n: [5],
}, {
// Classic and webstream only support gzip (native zlib / CompressionStream).
// Brotli, deflate, zstd are pull-only via stream/iter transforms.
combinationFilter({ api, compression }) {
if (api === 'classic' && compression !== 'gzip') return false;
if (api === 'webstream' && compression !== 'gzip') return false;
return true;
},
});

function main({ api, filesize, n }) {
function main({ api, compression, filesize, n }) {
// Create the fixture file with repeating lowercase ASCII
const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
const fd = fs.openSync(filename, 'w');
Expand All @@ -35,19 +44,28 @@ function main({ api, filesize, n }) {
} else if (api === 'webstream') {
benchWebStream(n, filesize).then(() => cleanup());
} else {
benchPull(n, filesize).then(() => cleanup());
benchPull(n, filesize, compression).then(() => cleanup());
}
}

function cleanup() {
try { fs.unlinkSync(filename); } catch { /* ignore */ }
}

// Stateless uppercase transform (shared by all paths)
// Stateless uppercase transform (shared by all paths): maps each byte in
// a-z (0x61-0x7a) to A-Z (0x41-0x5a), leaving every other byte untouched.
function uppercaseChunk(chunk) {
  const out = Buffer.allocUnsafe(chunk.length);
  for (let i = 0; i < chunk.length; i++) {
    const byte = chunk[i];
    const isLower = byte >= 0x61 && byte <= 0x7a;
    out[i] = isLower ? byte - 0x20 : byte;
  }
  return out;
}

// ---------------------------------------------------------------------------
// Classic streams path: createReadStream -> Transform (upper) -> createGzip
// ---------------------------------------------------------------------------
async function benchClassic(n, filesize) {
// Warm up
await runClassic();

bench.start();
Expand All @@ -62,22 +80,14 @@ function runClassic() {
return new Promise((resolve, reject) => {
const rs = fs.createReadStream(filename);

// Transform 1: uppercase
const upper = new Transform({
transform(chunk, encoding, callback) {
const buf = Buffer.allocUnsafe(chunk.length);
for (let i = 0; i < chunk.length; i++) {
const b = chunk[i];
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
}
callback(null, buf);
callback(null, uppercaseChunk(chunk));
},
});

// Transform 2: gzip
const gz = zlib.createGzip();

// Sink: count compressed bytes
let totalBytes = 0;
const sink = new Writable({
write(chunk, encoding, callback) {
Expand All @@ -97,7 +107,6 @@ function runClassic() {
// WebStream path: readableWebStream -> TransformStream (upper) -> CompressionStream
// ---------------------------------------------------------------------------
async function benchWebStream(n, filesize) {
// Warm up
await runWebStream();

bench.start();
Expand All @@ -113,22 +122,18 @@ async function runWebStream() {
try {
const rs = fh.readableWebStream();

// Transform 1: uppercase
const upper = new TransformStream({
transform(chunk, controller) {
const buf = new Uint8Array(chunk.length);
for (let i = 0; i < chunk.length; i++) {
const b = chunk[i];
// a-z (0x61-0x7a) -> A-Z (0x41-0x5a)
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
}
controller.enqueue(buf);
},
});

// Transform 2: gzip via CompressionStream
const compress = new CompressionStream('gzip');

const output = rs.pipeThrough(upper).pipeThrough(compress);
const reader = output.getReader();

Expand All @@ -145,45 +150,44 @@ async function runWebStream() {
}

// ---------------------------------------------------------------------------
// New streams path: pull() with uppercase transform + gzip transform
// Pull/iter path: pull() with uppercase transform + selected compression
// ---------------------------------------------------------------------------
async function benchPull(n, filesize) {
const { pull, compressGzip } = require('stream/iter');
async function benchPull(n, filesize, compression) {
const iter = require('stream/iter');

const compressFactory = {
gzip: iter.compressGzip,
deflate: iter.compressDeflate,
brotli: iter.compressBrotli,
zstd: iter.compressZstd,
}[compression];

// Warm up
await runPull(pull, compressGzip);
await runPull(compressFactory);

bench.start();
let totalBytes = 0;
for (let i = 0; i < n; i++) {
totalBytes += await runPull(pull, compressGzip);
totalBytes += await runPull(compressFactory);
}
bench.end(totalBytes / (1024 * 1024));
}

async function runPull(pull, compressGzip) {
async function runPull(compressFactory) {
const fh = await fs.promises.open(filename, 'r');
try {
// Stateless transform: uppercase each chunk in the batch
const upper = (chunks) => {
if (chunks === null) return null;
const out = new Array(chunks.length);
for (let j = 0; j < chunks.length; j++) {
const src = chunks[j];
const buf = new Uint8Array(src.length);
for (let i = 0; i < src.length; i++) {
const b = src[i];
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
}
out[j] = buf;
out[j] = uppercaseChunk(chunks[j]);
}
return out;
};

const readable = fh.pull(upper, compressGzip());
const readable = fh.pull(upper, compressFactory());

// Count bytes symmetrically with the classic path (no final
// concatenation into a single buffer).
let totalBytes = 0;
for await (const chunks of readable) {
for (let i = 0; i < chunks.length; i++) {
Expand Down
Loading
Loading