From 2e927ce19c06f11d066aad37f07cd586316fa552 Mon Sep 17 00:00:00 2001 From: Noa Date: Fri, 27 Feb 2026 15:33:05 -0600 Subject: [PATCH] Bring typescript benchmark client to parity with rust --- .../src/lib/binary_writer.ts | 4 + .../src/sdk/db_connection_impl.ts | 81 ++-- .../src/sdk/websocket_decompress_adapter.ts | 87 ++-- .../src/sdk/websocket_test_adapter.ts | 5 +- pnpm-lock.yaml | 45 +- templates/keynote-2/package.json | 2 +- templates/keynote-2/src/connectors/convex.ts | 1 + templates/keynote-2/src/connectors/index.ts | 1 + .../keynote-2/src/connectors/spacetimedb.ts | 9 +- templates/keynote-2/src/core/connectors.ts | 4 +- templates/keynote-2/src/core/runner.ts | 406 +++++++++--------- templates/keynote-2/src/demo.ts | 88 ++-- .../src/scenario_recipes/reducer_single.ts | 4 +- .../src/scenario_recipes/rpc_single_call.ts | 2 +- 14 files changed, 359 insertions(+), 380 deletions(-) diff --git a/crates/bindings-typescript/src/lib/binary_writer.ts b/crates/bindings-typescript/src/lib/binary_writer.ts index 144427284c5..a66310745ce 100644 --- a/crates/bindings-typescript/src/lib/binary_writer.ts +++ b/crates/bindings-typescript/src/lib/binary_writer.ts @@ -42,6 +42,10 @@ export default class BinaryWriter { this.buffer = typeof init === 'number' ? new ResizableBuffer(init) : init; } + clear() { + this.offset = 0; + } + reset(buffer: ResizableBuffer) { this.buffer = buffer; this.offset = 0; diff --git a/crates/bindings-typescript/src/sdk/db_connection_impl.ts b/crates/bindings-typescript/src/sdk/db_connection_impl.ts index 0af455c4d44..c2c26123bfe 100644 --- a/crates/bindings-typescript/src/sdk/db_connection_impl.ts +++ b/crates/bindings-typescript/src/sdk/db_connection_impl.ts @@ -37,8 +37,10 @@ import { type PendingCallback, type TableUpdate as CacheTableUpdate, } from './table_cache.ts'; -import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts'; -import type { WebsocketTestAdapter } from './websocket_test_adapter.ts'; +import { + WebsocketDecompressAdapter, + type WebsocketAdapter, +} from './websocket_decompress_adapter.ts'; import { SubscriptionBuilderImpl, SubscriptionHandleImpl, @@ -146,7 +148,7 @@ export class DbConnectionImpl #eventId = 0; #emitter: EventEmitter; #messageQueue = Promise.resolve(); - #outboundQueue: ClientMessage[] = []; + #outboundQueue: Uint8Array[] = []; #subscriptionManager = new SubscriptionManager(); #remoteModule: RemoteModule; #reducerCallbacks = new Map< @@ -171,10 +173,8 @@ export class DbConnectionImpl // private fields. // We use them in testing. private clientCache: ClientCache; - private ws?: WebsocketDecompressAdapter | WebsocketTestAdapter; - private wsPromise: Promise< - WebsocketDecompressAdapter | WebsocketTestAdapter | undefined - >; + private ws?: WebsocketAdapter; + private wsPromise: Promise; constructor({ uri, @@ -302,6 +302,8 @@ export class DbConnectionImpl #makeReducers(def: RemoteModule): ReducersView { const out: Record = {}; + const writer = new BinaryWriter(1024); + for (const reducer of def.reducers) { const reducerName = reducer.name; const key = reducer.accessorName; @@ -310,7 +312,7 @@ export class DbConnectionImpl this.#reducerArgsSerializers[reducerName]; (out as any)[key] = (params: InferTypeOfRow) => { - const writer = new BinaryWriter(1024); + writer.clear(); serializeArgs(writer, params); const argsBuffer = writer.getBuffer(); return this.callReducer(reducerName, argsBuffer, params); @@ -323,6 +325,8 @@ export class DbConnectionImpl #makeProcedures(def: RemoteModule): ProceduresView { const out: Record = {}; + const writer = new BinaryWriter(1024); + for (const procedure of def.procedures) { const procedureName = procedure.name; const key = procedure.accessorName; @@ -333,7 +337,7 @@ export class DbConnectionImpl (out as any)[key] = ( params: InferTypeOfRow ): Promise => { - const writer = new BinaryWriter(1024); + writer.clear(); serializeArgs(writer, params); const argsBuffer = writer.getBuffer(); return this.callProcedure(procedureName, argsBuffer).then(returnBuf => { @@ -537,41 +541,36 @@ export class DbConnectionImpl return this.#mergeTableUpdates(updates); } - #sendEncoded( - wsResolved: WebsocketDecompressAdapter | WebsocketTestAdapter, - message: ClientMessage - ): void { - stdbLogger( - 'trace', - () => `Sending message to server: ${stringify(message)}` - ); - const writer = new BinaryWriter(1024); - ClientMessage.serialize(writer, message); - const encoded = writer.getBuffer(); - wsResolved.send(encoded); - } - - #flushOutboundQueue( - wsResolved: WebsocketDecompressAdapter | WebsocketTestAdapter - ): void { - if (!this.isActive || this.#outboundQueue.length === 0) { - return; - } + #flushOutboundQueue(wsResolved: WebsocketAdapter): void { const pending = this.#outboundQueue.splice(0); for (const message of pending) { - this.#sendEncoded(wsResolved, message); + wsResolved.send(message); } } + #clientMessageEncoder = new BinaryWriter(1024); #sendMessage(message: ClientMessage): void { - this.wsPromise.then(wsResolved => { - if (!wsResolved || !this.isActive) { - this.#outboundQueue.push(message); - return; - } - this.#flushOutboundQueue(wsResolved); - this.#sendEncoded(wsResolved, message); - }); + const writer = this.#clientMessageEncoder; + writer.clear(); + ClientMessage.serialize(writer, message); + const encoded = writer.getBuffer(); + + if (this.ws && this.isActive) { + if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws); + + stdbLogger( + 'trace', + () => `Sending message to server: ${stringify(message)}` + ); + this.ws.send(encoded); + } else { + stdbLogger( + 'trace', + () => `Queuing message to server: ${stringify(message)}` + ); + // use slice() to copy, in case the clientMessageEncoder's buffer gets used + this.#outboundQueue.push(encoded.slice()); + } } #nextEventId(): string { @@ -978,11 +977,7 @@ export class DbConnectionImpl * ``` */ disconnect(): void { - this.wsPromise.then(wsResolved => { - if (wsResolved) { - wsResolved.close(); - } - }); + this.wsPromise.then(ws => ws?.close()); } private on( diff --git a/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts index 1edfdc6d0d6..40157393dd1 100644 --- a/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts @@ -1,48 +1,55 @@ import { decompress } from './decompress'; import { resolveWS } from './ws'; -export class WebsocketDecompressAdapter { - onclose?: (...ev: any[]) => void; - onopen?: (...ev: any[]) => void; - onmessage?: (msg: { data: Uint8Array }) => void; - onerror?: (msg: ErrorEvent) => void; - - #ws: WebSocket; - - async #handleOnMessage(msg: MessageEvent) { - const buffer = new Uint8Array(msg.data); - let decompressed: Uint8Array; - - if (buffer[0] === 0) { - decompressed = buffer.slice(1); - } else if (buffer[0] === 1) { - throw new Error( - 'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.' - ); - } else if (buffer[0] === 2) { - decompressed = await decompress(buffer.slice(1), 'gzip'); - } else { - throw new Error( - 'Unexpected Compression Algorithm. Please use `gzip` or `none`' - ); - } +export interface WebsocketAdapter { + send(msg: Uint8Array): void; + close(): void; + + set onclose(handler: (ev: CloseEvent) => void); + set onopen(handler: () => void); + set onmessage(handler: (msg: { data: Uint8Array }) => void); + set onerror(handler: (msg: ErrorEvent) => void); +} - this.onmessage?.({ data: decompressed }); +export class WebsocketDecompressAdapter implements WebsocketAdapter { + set onclose(handler: (ev: CloseEvent) => void) { + this.#ws.onclose = handler; } - - #handleOnOpen(msg: any) { - this.onopen?.(msg); + set onopen(handler: () => void) { + this.#ws.onopen = handler; } - - #handleOnError(msg: any) { - this.onerror?.(msg); + set onmessage(handler: (msg: { data: Uint8Array }) => void) { + this.#ws.onmessage = async (msg: MessageEvent) => { + const data = await this.#decompress(new Uint8Array(msg.data)); + handler({ data }); + }; + } + set onerror(handler: (msg: ErrorEvent) => void) { + this.#ws.onerror = handler as (msg: Event) => void; } - #handleOnClose(msg: any) { - this.onclose?.(msg); + #ws: WebSocket; + + async #decompress(buffer: Uint8Array): Promise { + const tag = buffer[0]; + const data = buffer.subarray(1); + switch (tag) { + case 0: + return data; + case 1: + throw new Error( + 'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.' + ); + case 2: + return await decompress(data, 'gzip'); + default: + throw new Error( + 'Unexpected Compression Algorithm. Please use `gzip` or `none`' + ); + } } - send(msg: any): void { + send(msg: Uint8Array): void { this.#ws.send(msg); } @@ -51,16 +58,6 @@ export class WebsocketDecompressAdapter { } constructor(ws: WebSocket) { - this.onmessage = undefined; - this.onopen = undefined; - this.onmessage = undefined; - this.onerror = undefined; - - ws.onmessage = this.#handleOnMessage.bind(this); - ws.onerror = this.#handleOnError.bind(this); - ws.onclose = this.#handleOnClose.bind(this); - ws.onopen = this.#handleOnOpen.bind(this); - ws.binaryType = 'arraybuffer'; this.#ws = ws; diff --git a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts index f5aa35ba610..6ac15f0e7fe 100644 --- a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts @@ -1,10 +1,11 @@ import { BinaryReader, BinaryWriter } from '../'; import { ClientMessage, ServerMessage } from './client_api/types'; +import type { WebsocketAdapter } from './websocket_decompress_adapter'; -class WebsocketTestAdapter { +class WebsocketTestAdapter implements WebsocketAdapter { onclose: any; // eslint-disable-next-line @typescript-eslint/no-unsafe-function-type - onopen!: Function; + onopen!: () => void; onmessage: any; onerror: any; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6fc62ac21a7..195c53848e4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -468,8 +468,8 @@ importers: specifier: ^3.0.1 version: 3.0.1 spacetimedb: - specifier: ^2.0 - version: 2.0.1(@angular/core@21.1.4(@angular/compiler@21.1.4)(rxjs@7.8.2))(@tanstack/react-query@5.90.19(react@19.2.4))(react@19.2.4)(svelte@5.46.4)(undici@6.21.3)(vue@3.5.26(typescript@5.9.3)) + specifier: workspace:^ + version: link:../../crates/bindings-typescript sql.js: specifier: ^1.13.0 version: 1.14.0 @@ -12872,29 +12872,6 @@ packages: space-separated-tokens@2.0.2: resolution: {integrity: sha512-PEGlAwrG8yXGXRjW32fGbg66JAlOAwbObuqVoJpv/mRgoWDQfgH1wDPvtzWyUSNAXBGSk8h755YDbbcEy3SH2Q==} - spacetimedb@2.0.1: - resolution: {integrity: sha512-19YkLz1P+JQDlYvlegDsn3YF4OsE2Pih31e5Xx/kobTExKwx/AtFWRUSA/EiRWvMV9jaIWzMhnsVfG+1fJ39Aw==} - peerDependencies: - '@angular/core': '>=17.0.0' - '@tanstack/react-query': ^5.0.0 - react: ^18.0.0 || ^19.0.0-0 || ^19.0.0 - svelte: ^4.0.0 || ^5.0.0 - undici: ^6.19.2 - vue: ^3.3.0 - peerDependenciesMeta: - '@angular/core': - optional: true - '@tanstack/react-query': - optional: true - react: - optional: true - svelte: - optional: true - undici: - optional: true - vue: - optional: true - spdx-correct@3.2.0: resolution: {integrity: sha512-kN9dJbvnySHULIluDHy32WHRUu3Og7B9sbY7tsFLctQkIqnMh3hErYgdMjTYuqmcXX+lK5T1lnUt3G7zNswmZA==} @@ -30811,24 +30788,6 @@ snapshots: space-separated-tokens@2.0.2: {} - spacetimedb@2.0.1(@angular/core@21.1.4(@angular/compiler@21.1.4)(rxjs@7.8.2))(@tanstack/react-query@5.90.19(react@19.2.4))(react@19.2.4)(svelte@5.46.4)(undici@6.21.3)(vue@3.5.26(typescript@5.9.3)): - dependencies: - base64-js: 1.5.1 - headers-polyfill: 4.0.3 - object-inspect: 1.13.4 - prettier: 3.6.2 - pure-rand: 7.0.1 - safe-stable-stringify: 2.5.0 - statuses: 2.0.2 - url-polyfill: 1.1.14 - optionalDependencies: - '@angular/core': 21.1.4(@angular/compiler@21.1.4)(rxjs@7.8.2) - '@tanstack/react-query': 5.90.19(react@19.2.4) - react: 19.2.4 - svelte: 5.46.4 - undici: 6.21.3 - vue: 3.5.26(typescript@5.9.3) - spdx-correct@3.2.0: dependencies: spdx-expression-parse: 3.0.1 diff --git a/templates/keynote-2/package.json b/templates/keynote-2/package.json index c19cf90fd43..345e2e80a08 100644 --- a/templates/keynote-2/package.json +++ b/templates/keynote-2/package.json @@ -36,7 +36,7 @@ "drizzle-orm": "^0.44.7", "express": "^5.1.0", "hdr-histogram-js": "^3.0.1", - "spacetimedb": "^2.0", + "spacetimedb": "workspace:^", "sql.js": "^1.13.0", "undici": "^6.19.2" } diff --git a/templates/keynote-2/src/connectors/convex.ts b/templates/keynote-2/src/connectors/convex.ts index f60643aa1ac..37922f3e736 100644 --- a/templates/keynote-2/src/connectors/convex.ts +++ b/templates/keynote-2/src/connectors/convex.ts @@ -84,6 +84,7 @@ export default function convex( const root: RpcConnector = { name: 'convex', + maxInflightPerWorker: 16, async open() {}, async close() {}, diff --git a/templates/keynote-2/src/connectors/index.ts b/templates/keynote-2/src/connectors/index.ts index 14a57e4d360..bd50a6b6d83 100644 --- a/templates/keynote-2/src/connectors/index.ts +++ b/templates/keynote-2/src/connectors/index.ts @@ -10,6 +10,7 @@ import planetscale_pg_rpc from './rpc/planetscale_pg_rpc.ts'; export const CONNECTORS = { convex, spacetimedb, + spacetimedbRustClient: spacetimedb, bun, postgres_rpc, cockroach_rpc, diff --git a/templates/keynote-2/src/connectors/spacetimedb.ts b/templates/keynote-2/src/connectors/spacetimedb.ts index 8c6c397aa0b..6837a6d098d 100644 --- a/templates/keynote-2/src/connectors/spacetimedb.ts +++ b/templates/keynote-2/src/connectors/spacetimedb.ts @@ -134,6 +134,7 @@ export function spacetimedb( return { name: 'spacetimedb', + maxInflightPerWorker: 16384, async open() { try { @@ -178,14 +179,14 @@ export function spacetimedb( return worker; }, - async reducer(fn: string, args: Record) { + async call(fn: string, args: Record) { await ready; switch (fn) { case 'seed': { conn.reducers.seed({ - n: args.n, - initialBalance: args.initial_balance, + n: args.accounts, + initialBalance: args.initialBalance, }); return; } @@ -261,7 +262,7 @@ export function spacetimedb( return; } - let initial = BigInt(rawInitial); + const initial = BigInt(rawInitial); const accounts = conn.db?.accounts; if (!accounts) { diff --git a/templates/keynote-2/src/core/connectors.ts b/templates/keynote-2/src/core/connectors.ts index a6767474b45..77b9120dca9 100644 --- a/templates/keynote-2/src/core/connectors.ts +++ b/templates/keynote-2/src/core/connectors.ts @@ -8,6 +8,8 @@ } | null>; verify(): Promise; + maxInflightPerWorker?: number; + createWorker?(opts: { index: number; total: number }): Promise; } @@ -19,7 +21,7 @@ export interface SqlConnector extends BaseConnector { } export interface ReducerConnector extends BaseConnector { - reducer(name: string, args?: Record): Promise; + call(name: string, args?: Record): Promise; } export interface RpcConnector extends BaseConnector { diff --git a/templates/keynote-2/src/core/runner.ts b/templates/keynote-2/src/core/runner.ts index f61efff28c0..1df76da0563 100644 --- a/templates/keynote-2/src/core/runner.ts +++ b/templates/keynote-2/src/core/runner.ts @@ -4,6 +4,7 @@ import { pickTwoDistinct, zipfSampler } from './zipf.ts'; import { getSpacetimeCommittedTransfers } from './spacetimeMetrics.ts'; import { makeCollisionTracker } from './collision_tracker.ts'; import { RunResult } from './types.ts'; +import { BaseConnector } from './connectors.ts'; const OP_TIMEOUT_MS = Number(process.env.BENCH_OP_TIMEOUT_MS ?? '15000'); const MIN_OP_TIMEOUT_MS = Number(process.env.MIN_OP_TIMEOUT_MS ?? '250'); @@ -57,18 +58,9 @@ export async function runOne({ accounts, alpha, }: { - connector: { - name: string; - open(workers?: number): Promise; - close: () => Promise; - verify: () => Promise; - createWorker?: (opts?: { - index: number; - total: number; - }) => Promise; - } & Record; + connector: BaseConnector; scenario: ( - conn: unknown, + conn: BaseConnector, from: number, to: number, amount: number, @@ -90,19 +82,13 @@ export async function runOne({ numberOfSignificantValueDigits: 3, }); - const hasWorkerFactory = - typeof (connector as any).createWorker === 'function'; + const { createWorker } = connector; - const workers: unknown[] = []; + const workers: BaseConnector[] = []; - if (hasWorkerFactory) { + if (createWorker) { await connector.open(concurrency); - const createWorker = (connector as any).createWorker as (opts?: { - index: number; - total: number; - }) => Promise; - for (let i = 0; i < concurrency; i++) { const workerConn = await createWorker({ index: i, total: concurrency }); workers.push(workerConn); @@ -162,222 +148,250 @@ export async function runOne({ `[${connector.name}] precomputed ${transferPairs.count} pairs in ${(precomputeElapsedMs / 1000).toFixed(2)}s`, ); - const start = performance.now(); - const endAt = start + seconds * 1000; - - let completedWithinWindow = 0; - let completedTotal = 0; + const getEnvTernary = (envVal: string | undefined) => { + switch (envVal) { + case '0': + return false; + case '1': + return true; + default: + return null; + } + }; - const PIPELINED = process.env.BENCH_PIPELINED === '1'; + const PIPELINED = + getEnvTernary(process.env.BENCH_PIPELINED) ?? + !!connector.maxInflightPerWorker; const MAX_INFLIGHT_ENV = process.env.MAX_INFLIGHT_PER_WORKER; const MAX_INFLIGHT_PER_WORKER = - MAX_INFLIGHT_ENV === '0' ? Infinity : Number(MAX_INFLIGHT_ENV ?? '8'); + MAX_INFLIGHT_ENV == null + ? (connector.maxInflightPerWorker ?? 8) + : MAX_INFLIGHT_ENV === '0' + ? Infinity + : Number(MAX_INFLIGHT_ENV); console.log( `[${connector.name}] max inflight per worker: ${MAX_INFLIGHT_PER_WORKER}`, ); - - // Track when workers reach end of test window (before waiting for in-flight ops) - let workersReachedEnd = 0; - let resolveTestWindowEnd: () => void; - const testWindowEndPromise = new Promise((resolve) => { - resolveTestWindowEnd = resolve; - }); - - function signalWorkerReachedEnd() { - workersReachedEnd++; - if (workersReachedEnd >= concurrency) { - resolveTestWindowEnd(); - } - } - - async function worker(workerIndex: number) { - const conn = workers[workerIndex]; - const pairsPerWorker = Math.max( - 1, - Math.floor(transferPairs.count / concurrency), - ); - let pairIndex = workerIndex * pairsPerWorker; - - const nextTransferPair = (): [number, number] => { - if (pairIndex >= transferPairs.count) { - pairIndex = 0; + const run = async (seconds: number) => { + const start = performance.now(); + const endAt = start + seconds * 1000; + + let completedWithinWindow = 0; + let completedTotal = 0; + + // Track when workers reach end of test window (before waiting for in-flight ops) + let workersReachedEnd = 0; + let resolveTestWindowEnd: () => void; + const testWindowEndPromise = new Promise((resolve) => { + resolveTestWindowEnd = resolve; + }); + + function signalWorkerReachedEnd() { + workersReachedEnd++; + if (workersReachedEnd >= concurrency) { + resolveTestWindowEnd(); } + } - const from = transferPairs.from[pairIndex]!; - const to = transferPairs.to[pairIndex]!; - pairIndex++; - return [from, to]; - }; + async function worker(workerIndex: number) { + const conn = workers[workerIndex]; + const pairsPerWorker = Math.max( + 1, + Math.floor(transferPairs.count / concurrency), + ); + let pairIndex = workerIndex * pairsPerWorker; - // non-pipelined - if (!PIPELINED) { - while (true) { - const now = performance.now(); - if (now >= endAt) break; + const nextTransferPair = (): [number, number] => { + if (pairIndex >= transferPairs.count) { + pairIndex = 0; + } - const timeLeft = endAt - now; - const dynamicTimeout = Math.max( - MIN_OP_TIMEOUT_MS, - Math.min(OP_TIMEOUT_MS, timeLeft + TAIL_SLACK_MS), - ); + const from = transferPairs.from[pairIndex]!; + const to = transferPairs.to[pairIndex]!; + pairIndex++; + return [from, to]; + }; + + // non-pipelined + if (!PIPELINED) { + while (true) { + const now = performance.now(); + if (now >= endAt) break; + + const timeLeft = endAt - now; + const dynamicTimeout = Math.max( + MIN_OP_TIMEOUT_MS, + Math.min(OP_TIMEOUT_MS, timeLeft + TAIL_SLACK_MS), + ); - const [from, to] = nextTransferPair(); + const [from, to] = nextTransferPair(); - collisionTracker.begin(from); - collisionTracker.begin(to); + collisionTracker.begin(from); + collisionTracker.begin(to); - const t0 = performance.now(); - let ok = false; - try { - await withOpTimeout( - scenario(conn as unknown, from, to, 1), - `${connector.name} scenario ${from}->${to}`, - dynamicTimeout, - ); - ok = true; - } catch (err) { - if (process.env.LOG_ERRORS === '1') { - const msg = - err instanceof Error - ? `${err.name}: ${err.message}` - : String(err); - console.warn( - `[${connector.name}] Scenario failed for ${from} -> ${to}: ${msg}`, + const t0 = performance.now(); + let ok = false; + try { + await withOpTimeout( + scenario(conn, from, to, 1), + `${connector.name} scenario ${from}->${to}`, + dynamicTimeout, ); + ok = true; + } catch (err) { + if (process.env.LOG_ERRORS === '1') { + const msg = + err instanceof Error + ? `${err.name}: ${err.message}` + : String(err); + console.warn( + `[${connector.name}] Scenario failed for ${from} -> ${to}: ${msg}`, + ); + } + } finally { + collisionTracker.end(from); + collisionTracker.end(to); } - } finally { - collisionTracker.end(from); - collisionTracker.end(to); - } - const t1 = performance.now(); - if (ok) { - completedTotal++; - if (t1 <= endAt) { - completedWithinWindow++; - hist.recordValue(Math.max(1, Math.round((t1 - t0) * 1e3))); + const t1 = performance.now(); + if (ok) { + completedTotal++; + if (t1 <= endAt) { + completedWithinWindow++; + hist.recordValue(Math.max(1, Math.round((t1 - t0) * 1e3))); + } } } + signalWorkerReachedEnd(); + return; } - signalWorkerReachedEnd(); - return; - } - // pipelined - const inflight = new Set>(); - const unlimitedInflight = !Number.isFinite(MAX_INFLIGHT_PER_WORKER); + // pipelined + const inflight = new Set>(); + const unlimitedInflight = !Number.isFinite(MAX_INFLIGHT_PER_WORKER); - const launchOp = (dynamicTimeout: number) => { - const [from, to] = nextTransferPair(); + const launchOp = (dynamicTimeout: number) => { + const [from, to] = nextTransferPair(); - collisionTracker.begin(from); - collisionTracker.begin(to); + collisionTracker.begin(from); + collisionTracker.begin(to); - const p = (async () => { - const t0 = performance.now(); - try { - await withOpTimeout( - scenario(conn as unknown, from, to, 1), - `${connector.name} scenario ${from}->${to}`, - dynamicTimeout, - ); - const t1 = performance.now(); - completedTotal++; - if (t1 <= endAt) { - completedWithinWindow++; - hist.recordValue(Math.max(1, Math.round((t1 - t0) * 1e3))); - } - } catch (err) { - if (process.env.LOG_ERRORS === '1') { - const msg = - err instanceof Error - ? `${err instanceof Error ? err.message : String(err)}` - : String(err); - console.warn( - `[${connector.name}] Scenario failed for ${from} -> ${to}: ${msg}`, + const p = (async () => { + const t0 = performance.now(); + try { + await withOpTimeout( + scenario(conn, from, to, 1), + `${connector.name} scenario ${from}->${to}`, + dynamicTimeout, ); + const t1 = performance.now(); + completedTotal++; + if (t1 <= endAt) { + completedWithinWindow++; + hist.recordValue(Math.max(1, Math.round((t1 - t0) * 1e3))); + } + } catch (err) { + if (process.env.LOG_ERRORS === '1') { + const msg = + err instanceof Error + ? `${err instanceof Error ? err.message : String(err)}` + : String(err); + console.warn( + `[${connector.name}] Scenario failed for ${from} -> ${to}: ${msg}`, + ); + } + } finally { + collisionTracker.end(from); + collisionTracker.end(to); } - } finally { - collisionTracker.end(from); - collisionTracker.end(to); - } - })(); - - inflight.add(p); - p.finally(() => { - inflight.delete(p); - }); - }; - - while (true) { - const now = performance.now(); - if (now >= endAt) break; - - const timeLeft = endAt - now; - const dynamicTimeout = Math.max( - MIN_OP_TIMEOUT_MS, - Math.min(OP_TIMEOUT_MS, timeLeft + TAIL_SLACK_MS), - ); + })(); - if (unlimitedInflight || inflight.size < MAX_INFLIGHT_PER_WORKER) { - launchOp(dynamicTimeout); - } else { - await new Promise((resolve) => setTimeout(resolve, 0)); - } - } + inflight.add(p); + p.finally(() => { + inflight.delete(p); + }); + }; - // Signal that this worker has reached end of test window - signalWorkerReachedEnd(); + while (true) { + const now = performance.now(); + if (now >= endAt) break; - await Promise.all(inflight); - } + const timeLeft = endAt - now; + const dynamicTimeout = Math.max( + MIN_OP_TIMEOUT_MS, + Math.min(OP_TIMEOUT_MS, timeLeft + TAIL_SLACK_MS), + ); - console.log(`[${connector.name}] Starting workers for ${seconds}s run...`); + if (unlimitedInflight || inflight.size < MAX_INFLIGHT_PER_WORKER) { + launchOp(dynamicTimeout); + } else { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + } - // Start all workers - they run in parallel - const workerPromises = Array.from({ length: concurrency }, (_, i) => - worker(i), - ); + // Signal that this worker has reached end of test window + signalWorkerReachedEnd(); - // Wait for all workers to reach end of test window (before they wait for in-flight ops) - await testWindowEndPromise; + await Promise.all(inflight); + } - const testWindowEndTime = performance.now(); - console.log( - `[${connector.name}] Test window ended at ${((testWindowEndTime - start) / 1000).toFixed(2)}s; capturing metrics...`, - ); + // Start all workers - they run in parallel + const workerPromises = Array.from({ length: concurrency }, (_, i) => + worker(i), + ); - // Capture metrics immediately when test window ends - let committedDelta: number | null = null; + // Wait for all workers to reach end of test window (before they wait for in-flight ops) + await testWindowEndPromise; - if (useSpacetimeMetrics && beforeTransfers !== null) { - try { - const afterTransfers = await getSpacetimeCommittedTransfers(); - if (afterTransfers !== null && afterTransfers >= beforeTransfers) { - const deltaBig = afterTransfers - beforeTransfers; - const maxSafe = BigInt(Number.MAX_SAFE_INTEGER); - committedDelta = - deltaBig <= maxSafe ? Number(deltaBig) : Number(maxSafe); + const testWindowEndTime = performance.now(); + console.log( + `[${connector.name}] Test window ended at ${((testWindowEndTime - start) / 1000).toFixed(2)}s; capturing metrics...`, + ); - console.log( - `[spacetimedb] metrics at test window end: committed transfer txns = ${afterTransfers.toString()} (delta = ${deltaBig.toString()})`, - ); - } else { + // Capture metrics immediately when test window ends + let committedDelta: number | null = null; + + if (useSpacetimeMetrics && beforeTransfers !== null) { + try { + const afterTransfers = await getSpacetimeCommittedTransfers(); + if (afterTransfers !== null && afterTransfers >= beforeTransfers) { + const deltaBig = afterTransfers - beforeTransfers; + const maxSafe = BigInt(Number.MAX_SAFE_INTEGER); + committedDelta = + deltaBig <= maxSafe ? Number(deltaBig) : Number(maxSafe); + + console.log( + `[spacetimedb] metrics at test window end: committed transfer txns = ${afterTransfers.toString()} (delta = ${deltaBig.toString()})`, + ); + } else { + console.warn( + '[spacetimedb] metrics at test window end missing or decreased; ignoring metrics delta', + ); + } + } catch (err) { console.warn( - '[spacetimedb] metrics at test window end missing or decreased; ignoring metrics delta', + '[spacetimedb] failed to read metrics at test window end; ignoring metrics delta:', + err, ); } - } catch (err) { - console.warn( - '[spacetimedb] failed to read metrics at test window end; ignoring metrics delta:', - err, - ); } - } - // Now wait for all workers to fully complete (including in-flight ops) - await Promise.all(workerPromises); + // Now wait for all workers to fully complete (including in-flight ops) + await Promise.all(workerPromises); + + return { start, completedWithinWindow, completedTotal, committedDelta }; + }; + + const warmUpSeconds = 5; + console.log(`[${connector.name}] Warming up for ${warmUpSeconds}s...`); + await run(warmUpSeconds); + console.log(`[${connector.name}] Finished warmup.`); + + console.log(`[${connector.name}] Starting workers for ${seconds}s run...`); + + const { start, completedWithinWindow, completedTotal, committedDelta } = + await run(seconds); console.log( `[${connector.name}] All workers finished (including in-flight ops)`, @@ -392,7 +406,7 @@ export async function runOne({ } } - if (hasWorkerFactory) { + if (createWorker) { for (const w of workers) { const c = w as { close?: () => Promise }; if (typeof c.close === 'function') { diff --git a/templates/keynote-2/src/demo.ts b/templates/keynote-2/src/demo.ts index 7a4a27e61d5..42dc21d7f8b 100644 --- a/templates/keynote-2/src/demo.ts +++ b/templates/keynote-2/src/demo.ts @@ -3,7 +3,7 @@ import { execSync } from 'node:child_process'; import { mkdir, writeFile } from 'node:fs/promises'; import { createConnection } from 'node:net'; import { join } from 'node:path'; -import { CONNECTORS } from './connectors'; +import { ConnectorKey, CONNECTORS } from './connectors'; import { runOne } from './core/runner'; import { initConvex } from './init/init_convex'; import { sh } from './init/utils'; @@ -74,7 +74,7 @@ const concurrency = getArg('concurrency', 10); const alpha = getArg('alpha', 0.5); const systems = getStringArg('systems', 'convex,spacetimedb') .split(',') - .map((s) => s.trim()); + .map((s) => s.trim()) as ConnectorKey[]; const skipPrep = hasFlag('skip-prep'); const noAnimation = hasFlag('no-animation'); @@ -157,6 +157,11 @@ const serviceConfigs: Record = { healthCheck: async () => spacetimePing(), startCmd: 'spacetime start', }, + spacetimedbRustClient: { + name: 'SpacetimeDB', + healthCheck: async () => spacetimePing(), + startCmd: 'spacetime start', + }, convex: { name: 'Convex', healthCheck: () => ping(3210), @@ -223,8 +228,8 @@ async function checkService(system: string): Promise { // Prep / Seed // ============================================================================ -async function prepSystem(system: string): Promise { - const connector = (CONNECTORS as any)[system]; +async function prepSystem(system: ConnectorKey): Promise { + const connector = CONNECTORS[system]; if (!connector) { console.log(` ${system.padEnd(15)} ${c('yellow', '⚠ SKIPPED')}`); return; @@ -233,10 +238,10 @@ async function prepSystem(system: string): Promise { const spinner = createSpinner(system.padEnd(15)); try { - if (system === 'spacetimedb') { + if (system === 'spacetimedb' || system == 'spacetimedbRustClient') { const moduleName = process.env.STDB_MODULE || 'test-1'; const server = process.env.STDB_SERVER || 'local'; - const server2 = process.env.STDB_SERVER || 'http://localhost:3000'; + // const server2 = process.env.STDB_SERVER || 'http://localhost:3000'; const modulePath = process.env.STDB_MODULE_PATH || './spacetimedb'; // Publish module (creates DB if needed, updates if exists) @@ -248,21 +253,13 @@ async function prepSystem(system: string): Promise { '--module-path', modulePath, ]); - await sh('cargo', [ - 'run', - //"--quiet", - "--manifest-path", - "spacetimedb-rust-client/Cargo.toml", - "--", - "seed", - //"--quiet", + await sh('spacetime', [ + 'call', '--server', - server2, - "--module", + server, moduleName, - "--accounts", + 'seed', String(accounts), - "--initial-balance", String(initialBalance), ]); console.log('[spacetimedb] seed complete.'); @@ -289,8 +286,10 @@ interface BenchResult { tps: number; } -async function runBenchmarkOther(system: string): Promise { - const connectorFactory = (CONNECTORS as any)[system]; +async function runBenchmarkOther( + system: ConnectorKey, +): Promise { + const connectorFactory = CONNECTORS[system]; if (!connectorFactory) { console.log(` ${system}: Unknown connector`); return null; @@ -322,26 +321,26 @@ async function runBenchmarkStdb(): Promise { await sh('cargo', [ 'run', //"--quiet", - "--manifest-path", - "spacetimedb-rust-client/Cargo.toml", - "--", - "bench", + '--manifest-path', + 'spacetimedb-rust-client/Cargo.toml', + '--', + 'bench', //"--quiet", '--server', server2, - "--module", + '--module', moduleName, - "--duration", + '--duration', `${seconds}s`, - "--connections", + '--connections', String(concurrency), - "--alpha", + '--alpha', String(alpha), - "--tps-write-path", - "spacetimedb-tps.tmp.log", + '--tps-write-path', + 'spacetimedb-tps.tmp.log', ]); - const tpsStr = fs.readFileSync("spacetimedb-tps.tmp.log", 'utf-8').trim(); + const tpsStr = fs.readFileSync('spacetimedb-tps.tmp.log', 'utf-8').trim(); const tps = Number(tpsStr); if (isNaN(tps)) { console.warn(`[spacetimedb] Failed to parse TPS from file: ${tpsStr}`); @@ -349,13 +348,13 @@ async function runBenchmarkStdb(): Promise { } return { - system: "spacetimedb", + system: 'spacetimedb', tps: Math.round(tps), }; } -async function runBenchmark(system: string): Promise { - if (system === 'spacetimedb') { +async function runBenchmark(system: ConnectorKey): Promise { + if (system === 'spacetimedbRustClient') { return await runBenchmarkStdb(); } else { return await runBenchmarkOther(system); @@ -413,7 +412,12 @@ async function displayResults(results: BenchResult[]): Promise { const fastest = results[0]; const slowest = results[results.length - 1]; - if (fastest && slowest && fastest.system !== slowest.system && slowest.tps > 0) { + if ( + fastest && + slowest && + fastest.system !== slowest.system && + slowest.tps > 0 + ) { const multiplier = Math.round(fastest.tps / slowest.tps); console.log(''); @@ -435,11 +439,11 @@ async function displayResults(results: BenchResult[]): Promise { console.log(' ' + c('cyan', '║') + ' '.repeat(boxWidth) + c('cyan', '║')); console.log( ' ' + - c('cyan', '║') + - ' '.repeat(msgPadding) + - c('bold', c('green', msgWithEmoji)) + - ' '.repeat(rightPadding) + - c('cyan', '║'), + c('cyan', '║') + + ' '.repeat(msgPadding) + + c('bold', c('green', msgWithEmoji)) + + ' '.repeat(rightPadding) + + c('cyan', '║'), ); console.log(' ' + c('cyan', '║') + ' '.repeat(boxWidth) + c('cyan', '║')); console.log(' ' + c('cyan', '╚' + '═'.repeat(boxWidth) + '╝')); @@ -492,8 +496,8 @@ async function main() { } else { console.log( '\n' + - c('bold', ' [2/4] Preparing databases...') + - c('dim', ' (skipped)\n'), + c('bold', ' [2/4] Preparing databases...') + + c('dim', ' (skipped)\n'), ); } diff --git a/templates/keynote-2/src/scenario_recipes/reducer_single.ts b/templates/keynote-2/src/scenario_recipes/reducer_single.ts index 15a0bc34e9c..df51b348a16 100644 --- a/templates/keynote-2/src/scenario_recipes/reducer_single.ts +++ b/templates/keynote-2/src/scenario_recipes/reducer_single.ts @@ -1,14 +1,14 @@ import type { ReducerConnector } from '../core/connectors'; export async function reducer_single( - conn: unknown, + conn: ReducerConnector, from: number, to: number, amount: number, ): Promise { if (from === to || amount <= 0) return; - await (conn as ReducerConnector).reducer('transfer', { + await conn.call('transfer', { from, to, amount: BigInt(amount), diff --git a/templates/keynote-2/src/scenario_recipes/rpc_single_call.ts b/templates/keynote-2/src/scenario_recipes/rpc_single_call.ts index 37b133fb002..10d09b64483 100644 --- a/templates/keynote-2/src/scenario_recipes/rpc_single_call.ts +++ b/templates/keynote-2/src/scenario_recipes/rpc_single_call.ts @@ -1,7 +1,7 @@ import type { RpcConnector } from '../core/connectors'; export async function rpc_single_call( - conn: unknown, + conn: RpcConnector, from: number, to: number, amount: number,