Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export class ChannelError extends Error implements grpc.ServiceError {
interface StreamTracked {
stream?: PullStream;
receivedStatus?: boolean;
pingTimeout?: NodeJS.Timeout;
}

/**
Expand Down Expand Up @@ -227,6 +228,9 @@ export class MessageStream extends PassThrough {

for (let i = 0; i < this._streams.length; i++) {
const tracker = this._streams[i];
if (tracker.pingTimeout) {
clearTimeout(tracker.pingTimeout);
}
if (tracker.stream) {
this._removeStream(i, 'overall message stream destroyed', 'n/a');
}
Expand Down Expand Up @@ -254,6 +258,8 @@ export class MessageStream extends PassThrough {
tracker.stream = stream;
tracker.receivedStatus = false;

this._resetPingTimer(index);

stream
.on('error', err => this._onError(index, err))
.once('status', status => this._onStatus(index, status))
Expand All @@ -264,10 +270,30 @@ export class MessageStream extends PassThrough {
// Mark this stream as alive again. (reset backoff)
const tracker = this._streams[index];
this._retrier.reset(tracker);
this._resetPingTimer(index);

this.emit('data', data);
}

private _resetPingTimer(index: number): void {
const tracker = this._streams[index];
if (tracker.pingTimeout) {
clearTimeout(tracker.pingTimeout);
}
// We expect a packet from the server at least once every 30 seconds.
// Give it a 1-second grace period.
tracker.pingTimeout = setTimeout(() => {
this._removeStream(
index,
'stream inactive for longer than 30 seconds',
'will be retried',
);
this._retrier.retryLater(tracker, () =>
this._fillOne(index, undefined, 'retry'),
);
}, 31000);
}

/**
* Attempts to create and cache the desired number of StreamingPull requests.
* gRPC does not supply a way to confirm that a stream is connected, so our
Expand Down Expand Up @@ -347,6 +373,8 @@ export class MessageStream extends PassThrough {
maxOutstandingBytes: this._subscriber.useLegacyFlowControl
? 0
: this._subscriber.maxBytes,
clientId: 'node-pubsub',
protocolVersion: 1, // Set protocol version to fulfill keepalive capabilities
};
const otherArgs = {
headers: {
Expand Down Expand Up @@ -511,6 +539,10 @@ export class MessageStream extends PassThrough {
whatNext?: string,
): void {
const tracker = this._streams[index];
if (tracker.pingTimeout) {
clearTimeout(tracker.pingTimeout);
tracker.pingTimeout = undefined;
}
if (tracker.stream) {
logs.subscriberStreams.info(
'closing stream %i; why: %s; next: %s',
Expand Down
67 changes: 67 additions & 0 deletions test/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,24 @@
});

describe('keeping streams alive', () => {
it('should set protocolVersion in the initial packet', async () => {
// The special handling for messageStream and the spy below are
// so that we can test the initial message.
messageStream.destroy();

const spy = sandbox.spy(FakeGrpcStream.prototype, 'write');
const ms = new MessageStream(subscriber);
await ms.start();

assert.strictEqual(spy.callCount, 5);
const {args} = spy.firstCall;
const request = args[0] as any;

Check warning on line 537 in test/message-stream.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type

assert.strictEqual(String(request.protocolVersion), '1');

ms.destroy();
});

it('should keep the streams alive', () => {
const frequency = 30000;
const stubs = client.streams.map(stream => {
Expand All @@ -536,6 +554,55 @@
assert.deepStrictEqual(data, {});
});
});

it('should close stream if no data received for 30 seconds', async () => {
messageStream.destroy();
client.streams.length = 0;

const ms = new MessageStream(subscriber);
await ms.start();

const streamCount = client.streams.length;
const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel'));

sandbox.clock.tick(32000);

cancelSpies.forEach(spy => {
assert.strictEqual(spy.callCount, 1);
});

// The retry minimum backoff is 100ms.
sandbox.clock.tick(150);

// The streams are restarted, wait for next tick for fill stream pool
await promisify(process.nextTick)();
assert.ok(client.streams.length > streamCount);

ms.destroy();
});

it('should not close stream if data received within 30 seconds', async () => {
messageStream.destroy();
// client.streams.length = 0;

const ms = new MessageStream(subscriber);
await ms.start();

const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel'));

sandbox.clock.tick(20000);

// Simulating data prevents timeout
client.streams.forEach(s => s.emit('data', {}));

sandbox.clock.tick(20000);

cancelSpies.forEach(spy => {
assert.strictEqual(spy.callCount, 0);
});

ms.destroy();
});
});

it('should allow updating the ack deadline', async () => {
Expand Down
Loading