Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/bindings-typescript/src/lib/binary_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export default class BinaryWriter {
this.buffer = typeof init === 'number' ? new ResizableBuffer(init) : init;
}

clear() {
this.offset = 0;
}

reset(buffer: ResizableBuffer) {
this.buffer = buffer;
this.offset = 0;
Expand Down
81 changes: 38 additions & 43 deletions crates/bindings-typescript/src/sdk/db_connection_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ import {
type PendingCallback,
type TableUpdate as CacheTableUpdate,
} from './table_cache.ts';
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts';
import type { WebsocketTestAdapter } from './websocket_test_adapter.ts';
import {
WebsocketDecompressAdapter,
type WebsocketAdapter,
} from './websocket_decompress_adapter.ts';
import {
SubscriptionBuilderImpl,
SubscriptionHandleImpl,
Expand Down Expand Up @@ -146,7 +148,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
#eventId = 0;
#emitter: EventEmitter<ConnectionEvent>;
#messageQueue = Promise.resolve();
#outboundQueue: ClientMessage[] = [];
#outboundQueue: Uint8Array[] = [];
#subscriptionManager = new SubscriptionManager<RemoteModule>();
#remoteModule: RemoteModule;
#reducerCallbacks = new Map<
Expand All @@ -171,10 +173,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
// private fields.
// We use them in testing.
private clientCache: ClientCache<RemoteModule>;
private ws?: WebsocketDecompressAdapter | WebsocketTestAdapter;
private wsPromise: Promise<
WebsocketDecompressAdapter | WebsocketTestAdapter | undefined
>;
private ws?: WebsocketAdapter;
private wsPromise: Promise<WebsocketAdapter | undefined>;

constructor({
uri,
Expand Down Expand Up @@ -302,6 +302,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
#makeReducers(def: RemoteModule): ReducersView<RemoteModule> {
const out: Record<string, unknown> = {};

const writer = new BinaryWriter(1024);

for (const reducer of def.reducers) {
const reducerName = reducer.name;
const key = reducer.accessorName;
Expand All @@ -310,7 +312,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
this.#reducerArgsSerializers[reducerName];

(out as any)[key] = (params: InferTypeOfRow<typeof reducer.params>) => {
const writer = new BinaryWriter(1024);
writer.clear();
serializeArgs(writer, params);
const argsBuffer = writer.getBuffer();
return this.callReducer(reducerName, argsBuffer, params);
Expand All @@ -323,6 +325,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
#makeProcedures(def: RemoteModule): ProceduresView<RemoteModule> {
const out: Record<string, unknown> = {};

const writer = new BinaryWriter(1024);

for (const procedure of def.procedures) {
const procedureName = procedure.name;
const key = procedure.accessorName;
Expand All @@ -333,7 +337,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
(out as any)[key] = (
params: InferTypeOfRow<typeof procedure.params>
): Promise<any> => {
const writer = new BinaryWriter(1024);
writer.clear();
serializeArgs(writer, params);
const argsBuffer = writer.getBuffer();
return this.callProcedure(procedureName, argsBuffer).then(returnBuf => {
Expand Down Expand Up @@ -537,41 +541,36 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
return this.#mergeTableUpdates(updates);
}

#sendEncoded(
wsResolved: WebsocketDecompressAdapter | WebsocketTestAdapter,
message: ClientMessage
): void {
stdbLogger(
'trace',
() => `Sending message to server: ${stringify(message)}`
);
const writer = new BinaryWriter(1024);
ClientMessage.serialize(writer, message);
const encoded = writer.getBuffer();
wsResolved.send(encoded);
}

#flushOutboundQueue(
wsResolved: WebsocketDecompressAdapter | WebsocketTestAdapter
): void {
if (!this.isActive || this.#outboundQueue.length === 0) {
return;
}
#flushOutboundQueue(wsResolved: WebsocketAdapter): void {
const pending = this.#outboundQueue.splice(0);
for (const message of pending) {
this.#sendEncoded(wsResolved, message);
wsResolved.send(message);
}
}

#clientMessageEncoder = new BinaryWriter(1024);
#sendMessage(message: ClientMessage): void {
this.wsPromise.then(wsResolved => {
if (!wsResolved || !this.isActive) {
this.#outboundQueue.push(message);
return;
}
this.#flushOutboundQueue(wsResolved);
this.#sendEncoded(wsResolved, message);
});
const writer = this.#clientMessageEncoder;
writer.clear();
ClientMessage.serialize(writer, message);
const encoded = writer.getBuffer();

if (this.ws && this.isActive) {
if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws);

stdbLogger(
'trace',
() => `Sending message to server: ${stringify(message)}`
);
this.ws.send(encoded);
} else {
stdbLogger(
'trace',
() => `Queuing message to server: ${stringify(message)}`
);
// use slice() to copy, in case the clientMessageEncoder's buffer gets used
this.#outboundQueue.push(encoded.slice());
}
}

#nextEventId(): string {
Expand Down Expand Up @@ -978,11 +977,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
* ```
*/
disconnect(): void {
this.wsPromise.then(wsResolved => {
if (wsResolved) {
wsResolved.close();
}
});
this.wsPromise.then(ws => ws?.close());
}

private on(
Expand Down
87 changes: 42 additions & 45 deletions crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,55 @@
import { decompress } from './decompress';
import { resolveWS } from './ws';

export class WebsocketDecompressAdapter {
onclose?: (...ev: any[]) => void;
onopen?: (...ev: any[]) => void;
onmessage?: (msg: { data: Uint8Array }) => void;
onerror?: (msg: ErrorEvent) => void;

#ws: WebSocket;

async #handleOnMessage(msg: MessageEvent) {
const buffer = new Uint8Array(msg.data);
let decompressed: Uint8Array;

if (buffer[0] === 0) {
decompressed = buffer.slice(1);
} else if (buffer[0] === 1) {
throw new Error(
'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.'
);
} else if (buffer[0] === 2) {
decompressed = await decompress(buffer.slice(1), 'gzip');
} else {
throw new Error(
'Unexpected Compression Algorithm. Please use `gzip` or `none`'
);
}
export interface WebsocketAdapter {
send(msg: Uint8Array): void;
close(): void;

set onclose(handler: (ev: CloseEvent) => void);
set onopen(handler: () => void);
set onmessage(handler: (msg: { data: Uint8Array }) => void);
set onerror(handler: (msg: ErrorEvent) => void);
}

this.onmessage?.({ data: decompressed });
export class WebsocketDecompressAdapter implements WebsocketAdapter {
set onclose(handler: (ev: CloseEvent) => void) {
this.#ws.onclose = handler;
}

#handleOnOpen(msg: any) {
this.onopen?.(msg);
set onopen(handler: () => void) {
this.#ws.onopen = handler;
}

#handleOnError(msg: any) {
this.onerror?.(msg);
set onmessage(handler: (msg: { data: Uint8Array }) => void) {
this.#ws.onmessage = async (msg: MessageEvent<ArrayBuffer>) => {
const data = await this.#decompress(new Uint8Array(msg.data));
handler({ data });
};
}
set onerror(handler: (msg: ErrorEvent) => void) {
this.#ws.onerror = handler as (msg: Event) => void;
}

#handleOnClose(msg: any) {
this.onclose?.(msg);
#ws: WebSocket;

async #decompress(buffer: Uint8Array): Promise<Uint8Array> {
const tag = buffer[0];
const data = buffer.subarray(1);
switch (tag) {
case 0:
return data;
case 1:
throw new Error(
'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.'
);
case 2:
return await decompress(data, 'gzip');
default:
throw new Error(
'Unexpected Compression Algorithm. Please use `gzip` or `none`'
);
}
}

send(msg: any): void {
send(msg: Uint8Array): void {
this.#ws.send(msg);
}

Expand All @@ -51,16 +58,6 @@ export class WebsocketDecompressAdapter {
}

constructor(ws: WebSocket) {
this.onmessage = undefined;
this.onopen = undefined;
this.onmessage = undefined;
this.onerror = undefined;

ws.onmessage = this.#handleOnMessage.bind(this);
ws.onerror = this.#handleOnError.bind(this);
ws.onclose = this.#handleOnClose.bind(this);
ws.onopen = this.#handleOnOpen.bind(this);

ws.binaryType = 'arraybuffer';

this.#ws = ws;
Expand Down
5 changes: 3 additions & 2 deletions crates/bindings-typescript/src/sdk/websocket_test_adapter.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { BinaryReader, BinaryWriter } from '../';
import { ClientMessage, ServerMessage } from './client_api/types';
import type { WebsocketAdapter } from './websocket_decompress_adapter';

class WebsocketTestAdapter {
class WebsocketTestAdapter implements WebsocketAdapter {
onclose: any;
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
onopen!: Function;
onopen!: () => void;
onmessage: any;
onerror: any;

Expand Down
45 changes: 2 additions & 43 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion templates/keynote-2/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"drizzle-orm": "^0.44.7",
"express": "^5.1.0",
"hdr-histogram-js": "^3.0.1",
"spacetimedb": "^2.0",
"spacetimedb": "workspace:^",
"sql.js": "^1.13.0",
"undici": "^6.19.2"
}
Expand Down
1 change: 1 addition & 0 deletions templates/keynote-2/src/connectors/convex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export default function convex(

const root: RpcConnector = {
name: 'convex',
maxInflightPerWorker: 16,

async open() {},
async close() {},
Expand Down
1 change: 1 addition & 0 deletions templates/keynote-2/src/connectors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import planetscale_pg_rpc from './rpc/planetscale_pg_rpc.ts';
export const CONNECTORS = {
convex,
spacetimedb,
spacetimedbRustClient: spacetimedb,
bun,
postgres_rpc,
cockroach_rpc,
Expand Down
Loading
Loading