From 312b6f88ec35cf66979b56bafd40c7700c4cec5b Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Tue, 24 Feb 2026 15:38:35 -0500 Subject: [PATCH 1/5] feat: implement server to client stream keepalives --- src/message-stream.ts | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/message-stream.ts b/src/message-stream.ts index 60c404bcb..48712eb79 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -139,6 +139,7 @@ export class ChannelError extends Error implements grpc.ServiceError { interface StreamTracked { stream?: PullStream; receivedStatus?: boolean; + pingTimeout?: NodeJS.Timeout; } /** @@ -227,6 +228,9 @@ export class MessageStream extends PassThrough { for (let i = 0; i < this._streams.length; i++) { const tracker = this._streams[i]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + } if (tracker.stream) { this._removeStream(i, 'overall message stream destroyed', 'n/a'); } @@ -254,6 +258,8 @@ export class MessageStream extends PassThrough { tracker.stream = stream; tracker.receivedStatus = false; + this._resetPingTimer(index); + stream .on('error', err => this._onError(index, err)) .once('status', status => this._onStatus(index, status)) @@ -264,10 +270,30 @@ export class MessageStream extends PassThrough { // Mark this stream as alive again. (reset backoff) const tracker = this._streams[index]; this._retrier.reset(tracker); + this._resetPingTimer(index); this.emit('data', data); } + private _resetPingTimer(index: number): void { + const tracker = this._streams[index]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + } + // We expect a packet from the server at least once every 30 seconds. + // Give it a 1-second grace period. + tracker.pingTimeout = setTimeout(() => { + this._removeStream( + index, + 'stream inactive for longer than 30 seconds', + 'will be retried' + ); + this._retrier.retryLater(tracker, () => + this._fillOne(index, undefined, 'retry') + ); + }, 31000); + } + /** * Attempts to create and cache the desired number of StreamingPull requests. * gRPC does not supply a way to confirm that a stream is connected, so our @@ -347,6 +373,8 @@ export class MessageStream extends PassThrough { maxOutstandingBytes: this._subscriber.useLegacyFlowControl ? 0 : this._subscriber.maxBytes, + clientId: 'node-pubsub', + protocolVersion: 1, // Set protocol version to fulfill keepalive capabilities }; const otherArgs = { headers: { @@ -511,6 +539,10 @@ export class MessageStream extends PassThrough { whatNext?: string, ): void { const tracker = this._streams[index]; + if (tracker.pingTimeout) { + clearTimeout(tracker.pingTimeout); + tracker.pingTimeout = undefined; + } if (tracker.stream) { logs.subscriberStreams.info( 'closing stream %i; why: %s; next: %s', From a680652a2b1f911f4bf2bef6601ec317ff36a7d3 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 24 Feb 2026 20:44:21 +0000 Subject: [PATCH 2/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- src/message-stream.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/message-stream.ts b/src/message-stream.ts index 48712eb79..cdfad5903 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -286,10 +286,10 @@ export class MessageStream extends PassThrough { this._removeStream( index, 'stream inactive for longer than 30 seconds', - 'will be retried' + 'will be retried', ); this._retrier.retryLater(tracker, () => - this._fillOne(index, undefined, 'retry') + this._fillOne(index, undefined, 'retry'), ); }, 31000); } From d8ca48ccf91bd2bce30b3d9aca28f1f6110ae828 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 24 Feb 2026 20:46:34 +0000 Subject: [PATCH 3/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- src/message-stream.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/message-stream.ts b/src/message-stream.ts index 48712eb79..cdfad5903 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -286,10 +286,10 @@ export class MessageStream extends PassThrough { this._removeStream( index, 'stream inactive for longer than 30 seconds', - 'will be retried' + 'will be retried', ); this._retrier.retryLater(tracker, () => - this._fillOne(index, undefined, 'retry') + this._fillOne(index, undefined, 'retry'), ); }, 31000); } From 4015ed7d750c81ce966bd7204109f97405abc3ac Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Tue, 3 Mar 2026 15:40:40 -0500 Subject: [PATCH 4/5] tests: add unit tests for keepalives --- test/message-stream.ts | 67 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/test/message-stream.ts b/test/message-stream.ts index 2463d17eb..c9624575b 100644 --- a/test/message-stream.ts +++ b/test/message-stream.ts @@ -523,6 +523,24 @@ describe('MessageStream', () => { }); describe('keeping streams alive', () => { + it('should set protocolVersion in the initial packet', async () => { + // The special handling for messageStream and the spy below are + // so that we can test the initial message. + messageStream.destroy(); + + const spy = sandbox.spy(FakeGrpcStream.prototype, 'write'); + const ms = new MessageStream(subscriber); + await ms.start(); + + assert.strictEqual(spy.callCount, 5); + const {args} = spy.firstCall; + const request = args[0] as any; + + assert.strictEqual(String(request.protocolVersion), '1'); + + ms.destroy(); + }); + it('should keep the streams alive', () => { const frequency = 30000; const stubs = client.streams.map(stream => { @@ -536,6 +554,55 @@ describe('MessageStream', () => { assert.deepStrictEqual(data, {}); }); }); + + it('should close stream if no data received for 30 seconds', async () => { + messageStream.destroy(); + client.streams.length = 0; + + const ms = new MessageStream(subscriber); + await ms.start(); + + const streamCount = client.streams.length; + const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel')); + + sandbox.clock.tick(32000); + + cancelSpies.forEach(spy => { + assert.strictEqual(spy.callCount, 1); + }); + + // The retry minimum backoff is 100ms. + sandbox.clock.tick(150); + + // The streams are restarted, wait for next tick for fill stream pool + await promisify(process.nextTick)(); + assert.ok(client.streams.length > streamCount); + + ms.destroy(); + }); + + it('should not close stream if data received within 30 seconds', async () => { + messageStream.destroy(); + // client.streams.length = 0; + + const ms = new MessageStream(subscriber); + await ms.start(); + + const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel')); + + sandbox.clock.tick(20000); + + // Simulating data prevents timeout + client.streams.forEach(s => s.emit('data', {})); + + sandbox.clock.tick(20000); + + cancelSpies.forEach(spy => { + assert.strictEqual(spy.callCount, 0); + }); + + ms.destroy(); + }); }); it('should allow updating the ack deadline', async () => { From b736ce8b3e238e1bd4b9b7a2db96ce102cbe3427 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 3 Mar 2026 20:46:36 +0000 Subject: [PATCH 5/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- test/message-stream.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/message-stream.ts b/test/message-stream.ts index c9624575b..21945b19e 100644 --- a/test/message-stream.ts +++ b/test/message-stream.ts @@ -537,7 +537,7 @@ describe('MessageStream', () => { const request = args[0] as any; assert.strictEqual(String(request.protocolVersion), '1'); - + ms.destroy(); }); @@ -564,7 +564,7 @@ describe('MessageStream', () => { const streamCount = client.streams.length; const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel')); - + sandbox.clock.tick(32000); cancelSpies.forEach(spy => { @@ -589,12 +589,12 @@ describe('MessageStream', () => { await ms.start(); const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel')); - + sandbox.clock.tick(20000); - + // Simulating data prevents timeout client.streams.forEach(s => s.emit('data', {})); - + sandbox.clock.tick(20000); cancelSpies.forEach(spy => {