From 315bcb6122604c18e6531edd232c84ee59ea38f9 Mon Sep 17 00:00:00 2001 From: j4n Date: Thu, 5 Mar 2026 16:03:46 +0100 Subject: [PATCH 1/4] Make cmping importable as a library Fix several roadblocks preventing usage of cmping as library: Changes: - Add CMPingError exception, replace sys.exit(1) with raise CMPingError() - Add collection of result tuples in Pinger.results - Return times of account setup, group join and message time from Pinger - Add accounts_dir parameter to perform_ping() to allow callers to isolate concurrent probes in separate DB directories, avoiding the deltachat-rpc-server exclusive lock - Add timeout support to Pinger --- cmping.py | 80 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 66 insertions(+), 14 deletions(-) diff --git a/cmping.py b/cmping.py index a7e6a76..fb977a7 100644 --- a/cmping.py +++ b/cmping.py @@ -39,6 +39,12 @@ from deltachat_rpc_client import DeltaChat, EventType, Rpc from xdg_base_dirs import xdg_cache_home + +class CMPingError(Exception): + """Raised when cmping encounters a non-recoverable error during probing.""" + pass + + # Spinner characters for progress display SPINNER_CHARS = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] @@ -192,7 +198,11 @@ def main(): if not args.relay2: args.relay2 = args.relay1 - pinger = perform_ping(args) + try: + pinger = perform_ping(args) + except CMPingError as e: + print(f"\r✗ {e}") + raise SystemExit(1) expected_total = pinger.sent * args.numrecipients raise SystemExit(0 if pinger.received == expected_total else 1) @@ -298,8 +308,7 @@ def setup_accounts(args, sender_maker, receiver_maker): profiles_created += 1 print_progress("Setting up profiles", profiles_created, total_profiles, profiles_created) except Exception as e: - print(f"\r✗ Failed to setup sender profile on {args.relay1}: {e}") - sys.exit(1) + raise CMPingError(f"Failed to setup sender profile on {args.relay1}: {e}") from e # Create receiver accounts receivers = [] @@ -310,8 +319,9 @@ def 
setup_accounts(args, sender_maker, receiver_maker): profiles_created += 1 print_progress("Setting up profiles", profiles_created, total_profiles, profiles_created) except Exception as e: - print(f"\r✗ Failed to setup receiver profile {i+1} on {args.relay2}: {e}") - sys.exit(1) + raise CMPingError( + f"Failed to setup receiver profile {i+1} on {args.relay2}: {e}" + ) from e # Profile setup complete print_progress("Setting up profiles", done=True) @@ -531,8 +541,9 @@ def wait_online_thread(): wait_thread.join() if online_error: - print(f"\n✗ Timeout or error waiting for profiles to be online: {online_error}") - sys.exit(1) + raise CMPingError( + f"Timeout or error waiting for profiles to be online: {online_error}" + ) from online_error print_progress("Waiting for profiles to be online", done=True) @@ -572,15 +583,23 @@ def wait_online_thread(maker): t.join() if online_errors: - print(f"\n✗ Timeout or error waiting for profiles to be online: {online_errors[0]}") - sys.exit(1) + raise CMPingError( + f"Timeout or error waiting for profiles to be online: {online_errors[0]}" + ) from online_errors[0] print_progress("Waiting for profiles to be online", done=True) -def perform_ping(args): +def perform_ping(args, accounts_dir=None, timeout=None): """Main ping execution function with timing measurements. + Args: + args: Namespace with relay1, relay2, count, interval, verbose, + numrecipients, reset attributes. + accounts_dir: Optional base directory for account storage. + Defaults to $XDG_CACHE_HOME/cmping. Override this to isolate + concurrent probes (each needs its own DB to avoid locking). + Timing Phases: 1. account_setup_time: Time to create and configure all accounts 2. group_join_time: Time for all receivers to join the group @@ -588,8 +607,17 @@ def perform_ping(args): Returns: Pinger: The pinger object with results + Also has account_setup_time, group_join_time, message_time (float, seconds) + and results list of (seq, ms_duration, receiver_idx) tuples. 
+ + Raises: + CMPingError: On account setup or connectivity failures. """ - base_accounts_dir = xdg_cache_home().joinpath("cmping") + if accounts_dir is not None: + from pathlib import Path + base_accounts_dir = Path(accounts_dir) + else: + base_accounts_dir = xdg_cache_home().joinpath("cmping") # Determine unique relays being tested. Using a set to deduplicate when # relay1 == relay2 (same relay testing), so we only create one RPC context. @@ -660,6 +688,8 @@ def perform_ping(args): message_start = time.time() pinger = Pinger(args, sender, group, receivers) + if timeout is not None: + pinger.deadline = time.time() + timeout received = {} # Track current sequence for output formatting current_seq = None @@ -670,6 +700,7 @@ def perform_ping(args): if seq not in received: received[seq] = [] received[seq].append(ms_duration) + pinger.results.append((seq, ms_duration, receiver_idx)) # Track timing for this sequence if seq not in seq_tracking: @@ -753,6 +784,11 @@ def perform_ping(args): recv_rate = pinger.received / message_time print(f"recv rate: {recv_rate:.2f} msg/s") + # Store timing data on pinger + pinger.account_setup_time = account_setup_time + pinger.group_join_time = group_join_time + pinger.message_time = message_time + return pinger finally: # Clean up all RPC contexts @@ -803,9 +839,17 @@ def __init__(self, args, sender, group, receivers): ) ALPHANUMERIC = string.ascii_lowercase + string.digits self.tx = "".join(random.choices(ALPHANUMERIC, k=30)) - t = threading.Thread(target=self.send_pings, daemon=True) self.sent = 0 self.received = 0 + self.results = [] # list of (seq, ms_duration, receiver_idx) + self.account_setup_time = 0.0 + self.group_join_time = 0.0 + self.message_time = 0.0 + # Optional wall-clock deadline for the messaging phase. When set, + # send_pings() stops sending and receive() stops waiting at this time. + # Set externally (e.g. by perform_ping) after setup phases complete. 
+ self.deadline = None + t = threading.Thread(target=self.send_pings, daemon=True) t.start() @property @@ -818,15 +862,21 @@ def send_pings(self): Each message contains: unique_id timestamp sequence_number Flow: Sender -> SMTP relay1 -> IMAP relay2 -> All receivers + + Respects self.deadline: stops sending early when the wall clock passes + the deadline so we don't fire pings we'll never wait for. """ for seq in range(self.args.count): + if self.deadline is not None and time.time() >= self.deadline: + break text = f"{self.tx} {time.time():.4f} {seq:17}" self.group.send_text(text) self.sent += 1 time.sleep(self.args.interval) # we sent all pings, let's wait a bit, then force quit if main didn't finish - time.sleep(60) - os.kill(os.getpid(), signal.SIGINT) + if self.deadline is None: + time.sleep(60) + os.kill(os.getpid(), signal.SIGINT) def receive(self): """Receive ping messages from all receivers. @@ -867,6 +917,8 @@ def receiver_thread(receiver_idx, receiver): threads.append(t) while num_pending > 0: + if self.deadline is not None and time.time() >= self.deadline: + break try: receiver_idx, receiver, event = event_queue.get(timeout=1.0) if event is None: From 6013f717e01fc8deba1c97b8e5ca5b503c8e149a Mon Sep 17 00:00:00 2001 From: j4n Date: Wed, 11 Mar 2026 17:43:10 +0100 Subject: [PATCH 2/4] Add timeout support to wait_all_online and improve thread cleanup Previously wait_all_online blocked indefinitely on wait_for_event(), making it impossible to enforce a timeout from the caller. Now it accepts a timeout parameter and polls the account event queue with a 1s granularity, raising CMPingError if the deadline is exceeded. The timeout is propagated throughout. 
Thread cleanup improvements: - Store Pinger._send_thread so it can be joined after the ping loop preventing the thread from running orphaned after the deadline - Wrap receiver thread joins in a try/finally block in Pinger.receive() (cmping.py:985) so threads are always cleaned up even if the deadline fires or an exception occurs --- cmping.py | 129 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 72 insertions(+), 57 deletions(-) diff --git a/cmping.py b/cmping.py index fb977a7..551748e 100644 --- a/cmping.py +++ b/cmping.py @@ -36,7 +36,7 @@ from dataclasses import dataclass from statistics import stdev -from deltachat_rpc_client import DeltaChat, EventType, Rpc +from deltachat_rpc_client import AttrDict, DeltaChat, EventType, Rpc from xdg_base_dirs import xdg_cache_home @@ -221,12 +221,20 @@ def _log_event(self, event, addr): else: print(f" {event.kind} [{addr}]") - def wait_all_online(self): + def wait_all_online(self, timeout=None): + deadline = time.time() + timeout if timeout is not None else None remaining = list(self.online) while remaining: ac = remaining.pop() + eq = ac._rpc.get_queue(ac.id) while True: - event = ac.wait_for_event() + if deadline is not None and time.time() >= deadline: + addr = ac.get_config("addr") + raise CMPingError(f"Timeout waiting for {addr} to come online") + try: + event = AttrDict(eq.get(timeout=1.0)) + except queue.Empty: + continue if event.kind == EventType.IMAP_INBOX_IDLE: if self.verbose >= 3: addr = ac.get_config("addr") @@ -505,11 +513,12 @@ def wait_for_receiver_join(idx, receiver, deadline): return len(joined_receivers) -def wait_profiles_online(maker): +def wait_profiles_online(maker, timeout=None): """Wait for all profiles to be online with spinner progress. 
Args: maker: AccountMaker instance with accounts to wait for + timeout: Optional seconds before giving up and raising CMPingError Raises: SystemExit: If waiting for profiles fails @@ -521,7 +530,7 @@ def wait_profiles_online(maker): def wait_online_thread(): nonlocal online_error try: - maker.wait_all_online() + maker.wait_all_online(timeout=timeout) except Exception as e: online_error = e finally: @@ -548,11 +557,12 @@ def wait_online_thread(): print_progress("Waiting for profiles to be online", done=True) -def wait_profiles_online_multi(makers): +def wait_profiles_online_multi(makers, timeout=None): """Wait for all profiles to be online with spinner progress. Args: makers: List of AccountMaker instances with accounts to wait for + timeout: Optional seconds before giving up and raising CMPingError Raises: SystemExit: If waiting for profiles fails @@ -561,7 +571,7 @@ def wait_profiles_online_multi(makers): def wait_online_thread(maker): try: - maker.wait_all_online() + maker.wait_all_online(timeout=timeout) except Exception as e: online_errors.append(e) @@ -669,7 +679,7 @@ def perform_ping(args, accounts_dir=None, timeout=None): # Wait for all accounts to be online with timeout feedback # Combine all makers for waiting all_makers = [relay_contexts[r].maker for r in relays] - wait_profiles_online_multi(all_makers) + wait_profiles_online_multi(all_makers, timeout=timeout) account_setup_time = time.time() - account_setup_start @@ -746,6 +756,7 @@ def perform_ping(args, accounts_dir=None, timeout=None): except KeyboardInterrupt: pass + pinger._send_thread.join(timeout=2.0) message_time = time.time() - message_start if current_seq is not None: @@ -849,8 +860,8 @@ def __init__(self, args, sender, group, receivers): # send_pings() stops sending and receive() stops waiting at this time. # Set externally (e.g. by perform_ping) after setup phases complete. 
self.deadline = None - t = threading.Thread(target=self.send_pings, daemon=True) - t.start() + self._send_thread = threading.Thread(target=self.send_pings, daemon=True) + self._send_thread.start() @property def loss(self): @@ -916,55 +927,59 @@ def receiver_thread(receiver_idx, receiver): t.start() threads.append(t) - while num_pending > 0: - if self.deadline is not None and time.time() >= self.deadline: - break - try: - receiver_idx, receiver, event = event_queue.get(timeout=1.0) - if event is None: - continue - - if event.kind == EventType.INCOMING_MSG: - msg = receiver.get_message_by_id(event.msg_id) - text = msg.get_snapshot().text - parts = text.strip().split() - if len(parts) == 3 and parts[0] == self.tx: - seq = int(parts[2]) - if seq not in received_by_receiver: - received_by_receiver[seq] = set() - if receiver_idx not in received_by_receiver[seq]: - ms_duration = (time.time() - float(parts[1])) * 1000 - self.received += 1 - num_pending -= 1 - received_by_receiver[seq].add(receiver_idx) - yield seq, ms_duration, len(text), receiver_idx - start_clock = time.time() + try: + while num_pending > 0: + if self.deadline is not None and time.time() >= self.deadline: + break + try: + receiver_idx, receiver, event = event_queue.get(timeout=1.0) + if event is None: + continue + + if event.kind == EventType.INCOMING_MSG: + msg = receiver.get_message_by_id(event.msg_id) + text = msg.get_snapshot().text + parts = text.strip().split() + if len(parts) == 3 and parts[0] == self.tx: + seq = int(parts[2]) + if seq not in received_by_receiver: + received_by_receiver[seq] = set() + if receiver_idx not in received_by_receiver[seq]: + ms_duration = (time.time() - float(parts[1])) * 1000 + self.received += 1 + num_pending -= 1 + received_by_receiver[seq].add(receiver_idx) + yield seq, ms_duration, len(text), receiver_idx + start_clock = time.time() + elif self.args.verbose >= 3: + # Log non-ping messages at verbose level 3 + receiver_addr = self.receivers_addrs[receiver_idx] + 
ellipsis = "..." if len(text) > 50 else "" + print( + f" [{receiver_addr}] INCOMING_MSG (non-ping): {text[:50]}{ellipsis}" + ) + elif event.kind == EventType.ERROR and self.args.verbose >= 1: + print(f"✗ ERROR: {event.msg}") + elif event.kind == EventType.MSG_FAILED and self.args.verbose >= 1: + msg = receiver.get_message_by_id(event.msg_id) + text = msg.get_snapshot().text + print(f"✗ Message failed: {text}") + elif ( + event.kind in (EventType.INFO, EventType.WARNING) + and self.args.verbose >= 1 + ): + ms_now = (time.time() - start_clock) * 1000 + print(f"INFO {ms_now:07.1f}ms: {event.msg}") elif self.args.verbose >= 3: - # Log non-ping messages at verbose level 3 + # Log all other events at verbose level 3 receiver_addr = self.receivers_addrs[receiver_idx] - ellipsis = "..." if len(text) > 50 else "" - print( - f" [{receiver_addr}] INCOMING_MSG (non-ping): {text[:50]}{ellipsis}" - ) - elif event.kind == EventType.ERROR and self.args.verbose >= 1: - print(f"✗ ERROR: {event.msg}") - elif event.kind == EventType.MSG_FAILED and self.args.verbose >= 1: - msg = receiver.get_message_by_id(event.msg_id) - text = msg.get_snapshot().text - print(f"✗ Message failed: {text}") - elif ( - event.kind in (EventType.INFO, EventType.WARNING) - and self.args.verbose >= 1 - ): - ms_now = (time.time() - start_clock) * 1000 - print(f"INFO {ms_now:07.1f}ms: {event.msg}") - elif self.args.verbose >= 3: - # Log all other events at verbose level 3 - receiver_addr = self.receivers_addrs[receiver_idx] - log_event_verbose(event, receiver_addr) - except queue.Empty: - # Timeout occurred, check if we should continue - continue + log_event_verbose(event, receiver_addr) + except queue.Empty: + # Timeout occurred, check if we should continue + continue + finally: + for t in threads: + t.join(timeout=2.0) if __name__ == "__main__": From c3871105a99e5f42ad5180ce1e349f20b3818253 Mon Sep 17 00:00:00 2001 From: j4n Date: Thu, 12 Mar 2026 10:34:57 +0100 Subject: [PATCH 3/4] Pinger.receive(): fix 
receiver_thread leak via timeout-based poll receiver_thread previously called receiver.wait_for_event(), which internally blocks on queue.Queue.get() with no timeout. After Rpc.close() terminates the rpc-server subprocess and stops the events_loop, nothing puts a sentinel in the account event queues, so every receiver_thread blocked indefinitely. With N^2 pairs and multiple probe rounds, these daemon threads accumulated in threading._dangling. When a new probe tried to fork deltachat-rpc-server, _after_fork called threads.update(_dangling) in the child, which OOM'd because the set had grown to thousands of entries. Symptoms: MemoryError + "can't start new thread" mid-round. Fix: replace the blocking wait_for_event() call with an account_queue.get(timeout=1.0) loop that checks stop_event. The finally block in receive() sets stop_event before joining, so threads exit within ~1s instead of blocking until process exit. --- cmping.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/cmping.py b/cmping.py index 551748e..44650c7 100644 --- a/cmping.py +++ b/cmping.py @@ -907,12 +907,20 @@ def receive(self): # Create a queue to collect events from all receivers event_queue = queue.Queue() + stop_event = threading.Event() + def receiver_thread(receiver_idx, receiver): """Thread function to listen to events from a single receiver.""" - while True: + # Use a timeout-based poll so the thread exits promptly when + # stop_event is set, rather than blocking indefinitely on + # queue.get() inside wait_for_event() and leaking across rounds. 
+ account_queue = receiver._rpc.get_queue(receiver.id) + while not stop_event.is_set(): try: - event = receiver.wait_for_event() - event_queue.put((receiver_idx, receiver, event)) + item = account_queue.get(timeout=1.0) + event_queue.put((receiver_idx, receiver, AttrDict(item))) + except queue.Empty: + continue except Exception: # If there's an error, put it in the queue event_queue.put((receiver_idx, receiver, None)) @@ -978,6 +986,7 @@ def receiver_thread(receiver_idx, receiver): # Timeout occurred, check if we should continue continue finally: + stop_event.set() for t in threads: t.join(timeout=2.0) From 46ffa78049eaf713c9aca2dd6cfb04a9033ddc44 Mon Sep 17 00:00:00 2001 From: j4n Date: Thu, 12 Mar 2026 15:29:29 +0100 Subject: [PATCH 4/4] add uv.lock to gitignore --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 6e1054d..275b353 100644 --- a/.gitignore +++ b/.gitignore @@ -108,6 +108,10 @@ ipython_config.py # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. #pdm.lock + +# similar for uv.lock +uv.lock + # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/#use-with-ide