From a118c4b3115dcf1ccf669598141011789d614c49 Mon Sep 17 00:00:00 2001 From: Tal Leibman Date: Fri, 27 Feb 2026 15:42:49 +0200 Subject: [PATCH 1/2] Use per-task Futures instead of shared return queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids potential result/caller mismatch when multiple threads call call_function concurrently—each caller now blocks on its own Future. --- src/py/kaleido/_sync_server.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/py/kaleido/_sync_server.py b/src/py/kaleido/_sync_server.py index 740bd902..50eb804a 100644 --- a/src/py/kaleido/_sync_server.py +++ b/src/py/kaleido/_sync_server.py @@ -6,7 +6,8 @@ from functools import partial from queue import Queue from threading import Thread -from typing import TYPE_CHECKING, NamedTuple +from typing import TYPE_CHECKING, NamedTuple, Any +from concurrent.futures import Future from .kaleido import Kaleido @@ -18,6 +19,7 @@ class Task(NamedTuple): fn: str args: Any kwargs: Any + future:Future[Any] class _BadFunctionName(BaseException): @@ -37,11 +39,10 @@ async def _server(self, *args, **kwargs): if not hasattr(k, task.fn): raise _BadFunctionName(f"Kaleido has no attribute {task.fn}") try: - self._return_queue.put( - await getattr(k, task.fn)(*task.args, **task.kwargs), - ) + result = await getattr(k, task.fn)(*task.args, **task.kwargs) + task.future.set_result(result) except Exception as e: # noqa: BLE001 - self._return_queue.put(e) + task.future.set_exception(e) self._task_queue.task_done() @@ -72,7 +73,6 @@ def open(self, *args: Any, silence_warnings=False, **kwargs: Any) -> None: daemon=True, ) self._task_queue: Queue[Task | None] = Queue() - self._return_queue: Queue[Any] = Queue() self._thread.start() self._initialized = True close = partial(self.close, silence_warnings=True) @@ -92,7 +92,6 @@ def close(self, *, silence_warnings=False): self._thread.join() del self._thread del self._task_queue - del self._return_queue self._initialized = False def call_function(self, cmd: str, *args: Any, **kwargs: Any): @@ -117,13 +116,9 @@ def call_function(self, cmd: str, *args: Any, **kwargs: Any): UserWarning, stacklevel=3, ) - self._task_queue.put(Task(cmd, args, kwargs)) - self._task_queue.join() - res = self._return_queue.get() - if isinstance(res, BaseException): - raise res - else: - return res + future:Future[Any] = Future() + self._task_queue.put(Task(cmd, args, kwargs,future)) + return future.result() def oneshot_async_run( From 8810b2f607c7dcf189fff9053d3ae7cc9c972f96 Mon Sep 17 00:00:00 2001 From: Tal Leibman Date: Wed, 29 Apr 2026 17:52:02 +0300 Subject: [PATCH 2/2] ruff format --- src/py/kaleido/_sync_server.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/py/kaleido/_sync_server.py b/src/py/kaleido/_sync_server.py index 50eb804a..2d420df6 100644 --- a/src/py/kaleido/_sync_server.py +++ b/src/py/kaleido/_sync_server.py @@ -3,11 +3,11 @@ import asyncio import atexit import warnings +from concurrent.futures import Future from functools import partial from queue import Queue from threading import Thread -from typing import TYPE_CHECKING, NamedTuple, Any -from concurrent.futures import Future +from typing import TYPE_CHECKING, Any, NamedTuple from .kaleido import Kaleido @@ -19,7 +19,7 @@ class Task(NamedTuple): fn: str args: Any kwargs: Any - future:Future[Any] + future: Future[Any] class _BadFunctionName(BaseException): @@ -116,8 +116,8 @@ def call_function(self, cmd: str, *args: Any, **kwargs: Any): UserWarning, stacklevel=3, ) - future:Future[Any] = Future() - self._task_queue.put(Task(cmd, args, kwargs,future)) + future: Future[Any] = Future() + self._task_queue.put(Task(cmd, args, kwargs, future)) return future.result()