Skip to content

Commit 4e62e12

Browse files
committed
Bring typescript benchmark client to parity with rust
1 parent c29a44c commit 4e62e12

13 files changed

Lines changed: 357 additions & 378 deletions

File tree

crates/bindings-typescript/src/lib/binary_writer.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ export default class BinaryWriter {
4242
this.buffer = typeof init === 'number' ? new ResizableBuffer(init) : init;
4343
}
4444

45+
clear() {
46+
this.offset = 0;
47+
}
48+
4549
reset(buffer: ResizableBuffer) {
4650
this.buffer = buffer;
4751
this.offset = 0;

crates/bindings-typescript/src/sdk/db_connection_impl.ts

Lines changed: 38 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@ import {
3737
type PendingCallback,
3838
type TableUpdate as CacheTableUpdate,
3939
} from './table_cache.ts';
40-
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts';
41-
import type { WebsocketTestAdapter } from './websocket_test_adapter.ts';
40+
import {
41+
WebsocketDecompressAdapter,
42+
type WebsocketAdapter,
43+
} from './websocket_decompress_adapter.ts';
4244
import {
4345
SubscriptionBuilderImpl,
4446
SubscriptionHandleImpl,
@@ -146,7 +148,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
146148
#eventId = 0;
147149
#emitter: EventEmitter<ConnectionEvent>;
148150
#messageQueue = Promise.resolve();
149-
#outboundQueue: ClientMessage[] = [];
151+
#outboundQueue: Uint8Array[] = [];
150152
#subscriptionManager = new SubscriptionManager<RemoteModule>();
151153
#remoteModule: RemoteModule;
152154
#reducerCallbacks = new Map<
@@ -171,10 +173,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
171173
// private fields.
172174
// We use them in testing.
173175
private clientCache: ClientCache<RemoteModule>;
174-
private ws?: WebsocketDecompressAdapter | WebsocketTestAdapter;
175-
private wsPromise: Promise<
176-
WebsocketDecompressAdapter | WebsocketTestAdapter | undefined
177-
>;
176+
private ws?: WebsocketAdapter;
177+
private wsPromise: Promise<WebsocketAdapter | undefined>;
178178

179179
constructor({
180180
uri,
@@ -302,6 +302,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
302302
#makeReducers(def: RemoteModule): ReducersView<RemoteModule> {
303303
const out: Record<string, unknown> = {};
304304

305+
const writer = new BinaryWriter(1024);
306+
305307
for (const reducer of def.reducers) {
306308
const reducerName = reducer.name;
307309
const key = reducer.accessorName;
@@ -310,7 +312,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
310312
this.#reducerArgsSerializers[reducerName];
311313

312314
(out as any)[key] = (params: InferTypeOfRow<typeof reducer.params>) => {
313-
const writer = new BinaryWriter(1024);
315+
writer.clear();
314316
serializeArgs(writer, params);
315317
const argsBuffer = writer.getBuffer();
316318
return this.callReducer(reducerName, argsBuffer, params);
@@ -323,6 +325,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
323325
#makeProcedures(def: RemoteModule): ProceduresView<RemoteModule> {
324326
const out: Record<string, unknown> = {};
325327

328+
const writer = new BinaryWriter(1024);
329+
326330
for (const procedure of def.procedures) {
327331
const procedureName = procedure.name;
328332
const key = procedure.accessorName;
@@ -333,7 +337,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
333337
(out as any)[key] = (
334338
params: InferTypeOfRow<typeof procedure.params>
335339
): Promise<any> => {
336-
const writer = new BinaryWriter(1024);
340+
writer.clear();
337341
serializeArgs(writer, params);
338342
const argsBuffer = writer.getBuffer();
339343
return this.callProcedure(procedureName, argsBuffer).then(returnBuf => {
@@ -537,41 +541,36 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
537541
return this.#mergeTableUpdates(updates);
538542
}
539543

540-
#sendEncoded(
541-
wsResolved: WebsocketDecompressAdapter | WebsocketTestAdapter,
542-
message: ClientMessage
543-
): void {
544-
stdbLogger(
545-
'trace',
546-
() => `Sending message to server: ${stringify(message)}`
547-
);
548-
const writer = new BinaryWriter(1024);
549-
ClientMessage.serialize(writer, message);
550-
const encoded = writer.getBuffer();
551-
wsResolved.send(encoded);
552-
}
553-
554-
#flushOutboundQueue(
555-
wsResolved: WebsocketDecompressAdapter | WebsocketTestAdapter
556-
): void {
557-
if (!this.isActive || this.#outboundQueue.length === 0) {
558-
return;
559-
}
544+
#flushOutboundQueue(wsResolved: WebsocketAdapter): void {
560545
const pending = this.#outboundQueue.splice(0);
561546
for (const message of pending) {
562-
this.#sendEncoded(wsResolved, message);
547+
wsResolved.send(message);
563548
}
564549
}
565550

551+
#clientMessageEncoder = new BinaryWriter(1024);
566552
#sendMessage(message: ClientMessage): void {
567-
this.wsPromise.then(wsResolved => {
568-
if (!wsResolved || !this.isActive) {
569-
this.#outboundQueue.push(message);
570-
return;
571-
}
572-
this.#flushOutboundQueue(wsResolved);
573-
this.#sendEncoded(wsResolved, message);
574-
});
553+
const writer = this.#clientMessageEncoder;
554+
writer.clear();
555+
ClientMessage.serialize(writer, message);
556+
const encoded = writer.getBuffer();
557+
558+
if (this.ws && this.isActive) {
559+
if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws);
560+
561+
stdbLogger(
562+
'trace',
563+
() => `Sending message to server: ${stringify(message)}`
564+
);
565+
this.ws.send(encoded);
566+
} else {
567+
stdbLogger(
568+
'trace',
569+
() => `Queuing message to server: ${stringify(message)}`
570+
);
571+
// use slice() to copy, in case the clientMessageEncoder's buffer gets used
572+
this.#outboundQueue.push(encoded.slice());
573+
}
575574
}
576575

577576
#nextEventId(): string {
@@ -978,11 +977,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
978977
* ```
979978
*/
980979
disconnect(): void {
981-
this.wsPromise.then(wsResolved => {
982-
if (wsResolved) {
983-
wsResolved.close();
984-
}
985-
});
980+
this.wsPromise.then(ws => ws?.close());
986981
}
987982

988983
private on(

crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts

Lines changed: 42 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,55 @@
11
import { decompress } from './decompress';
22
import { resolveWS } from './ws';
33

4-
export class WebsocketDecompressAdapter {
5-
onclose?: (...ev: any[]) => void;
6-
onopen?: (...ev: any[]) => void;
7-
onmessage?: (msg: { data: Uint8Array }) => void;
8-
onerror?: (msg: ErrorEvent) => void;
9-
10-
#ws: WebSocket;
11-
12-
async #handleOnMessage(msg: MessageEvent) {
13-
const buffer = new Uint8Array(msg.data);
14-
let decompressed: Uint8Array;
15-
16-
if (buffer[0] === 0) {
17-
decompressed = buffer.slice(1);
18-
} else if (buffer[0] === 1) {
19-
throw new Error(
20-
'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.'
21-
);
22-
} else if (buffer[0] === 2) {
23-
decompressed = await decompress(buffer.slice(1), 'gzip');
24-
} else {
25-
throw new Error(
26-
'Unexpected Compression Algorithm. Please use `gzip` or `none`'
27-
);
28-
}
4+
export interface WebsocketAdapter {
5+
send(msg: Uint8Array): void;
6+
close(): void;
7+
8+
set onclose(handler: (ev: CloseEvent) => void);
9+
set onopen(handler: () => void);
10+
set onmessage(handler: (msg: { data: Uint8Array }) => void);
11+
set onerror(handler: (msg: ErrorEvent) => void);
12+
}
2913

30-
this.onmessage?.({ data: decompressed });
14+
export class WebsocketDecompressAdapter implements WebsocketAdapter {
15+
set onclose(handler: (ev: CloseEvent) => void) {
16+
this.#ws.onclose = handler;
3117
}
32-
33-
#handleOnOpen(msg: any) {
34-
this.onopen?.(msg);
18+
set onopen(handler: () => void) {
19+
this.#ws.onopen = handler;
3520
}
36-
37-
#handleOnError(msg: any) {
38-
this.onerror?.(msg);
21+
set onmessage(handler: (msg: { data: Uint8Array }) => void) {
22+
this.#ws.onmessage = async (msg: MessageEvent<ArrayBuffer>) => {
23+
const data = await this.#decompress(new Uint8Array(msg.data));
24+
handler({ data });
25+
};
26+
}
27+
set onerror(handler: (msg: ErrorEvent) => void) {
28+
this.#ws.onerror = handler as (msg: Event) => void;
3929
}
4030

41-
#handleOnClose(msg: any) {
42-
this.onclose?.(msg);
31+
#ws: WebSocket;
32+
33+
async #decompress(buffer: Uint8Array): Promise<Uint8Array> {
34+
const tag = buffer[0];
35+
const data = buffer.subarray(1);
36+
switch (tag) {
37+
case 0:
38+
return data;
39+
case 1:
40+
throw new Error(
41+
'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.'
42+
);
43+
case 2:
44+
return await decompress(data, 'gzip');
45+
default:
46+
throw new Error(
47+
'Unexpected Compression Algorithm. Please use `gzip` or `none`'
48+
);
49+
}
4350
}
4451

45-
send(msg: any): void {
52+
send(msg: Uint8Array): void {
4653
this.#ws.send(msg);
4754
}
4855

@@ -51,16 +58,6 @@ export class WebsocketDecompressAdapter {
5158
}
5259

5360
constructor(ws: WebSocket) {
54-
this.onmessage = undefined;
55-
this.onopen = undefined;
56-
this.onmessage = undefined;
57-
this.onerror = undefined;
58-
59-
ws.onmessage = this.#handleOnMessage.bind(this);
60-
ws.onerror = this.#handleOnError.bind(this);
61-
ws.onclose = this.#handleOnClose.bind(this);
62-
ws.onopen = this.#handleOnOpen.bind(this);
63-
6461
ws.binaryType = 'arraybuffer';
6562

6663
this.#ws = ws;

crates/bindings-typescript/src/sdk/websocket_test_adapter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { BinaryReader, BinaryWriter } from '../';
22
import { ClientMessage, ServerMessage } from './client_api/types';
3+
import type { WebsocketAdapter } from './websocket_decompress_adapter';
34

4-
class WebsocketTestAdapter {
5+
class WebsocketTestAdapter implements WebsocketAdapter {
56
onclose: any;
67
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
7-
onopen!: Function;
8+
onopen!: () => void;
89
onmessage: any;
910
onerror: any;
1011

pnpm-lock.yaml

Lines changed: 1 addition & 42 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

templates/keynote-2/src/connectors/convex.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export default function convex(
8484

8585
const root: RpcConnector = {
8686
name: 'convex',
87+
maxInflightPerWorker: 16,
8788

8889
async open() {},
8990
async close() {},

templates/keynote-2/src/connectors/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import planetscale_pg_rpc from './rpc/planetscale_pg_rpc.ts';
1010
export const CONNECTORS = {
1111
convex,
1212
spacetimedb,
13+
spacetimedbRustClient: spacetimedb,
1314
bun,
1415
postgres_rpc,
1516
cockroach_rpc,

templates/keynote-2/src/connectors/spacetimedb.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ export function spacetimedb(
134134

135135
return {
136136
name: 'spacetimedb',
137+
maxInflightPerWorker: 16384,
137138

138139
async open() {
139140
try {
@@ -178,14 +179,14 @@ export function spacetimedb(
178179
return worker;
179180
},
180181

181-
async reducer(fn: string, args: Record<string, any>) {
182+
async call(fn: string, args: Record<string, any>) {
182183
await ready;
183184

184185
switch (fn) {
185186
case 'seed': {
186187
conn.reducers.seed({
187-
n: args.n,
188-
initialBalance: args.initial_balance,
188+
n: args.accounts,
189+
initialBalance: args.initialBalance,
189190
});
190191
return;
191192
}
@@ -261,7 +262,7 @@ export function spacetimedb(
261262
return;
262263
}
263264

264-
let initial = BigInt(rawInitial);
265+
const initial = BigInt(rawInitial);
265266

266267
const accounts = conn.db?.accounts;
267268
if (!accounts) {

0 commit comments

Comments
 (0)