diff --git a/.gitignore b/.gitignore index 6e1054d..275b353 100644 --- a/.gitignore +++ b/.gitignore @@ -108,6 +108,10 @@ ipython_config.py # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. #pdm.lock + +# similar for uv.lock +uv.lock + # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/#use-with-ide diff --git a/cmping.py b/cmping.py index a7e6a76..44650c7 100644 --- a/cmping.py +++ b/cmping.py @@ -36,9 +36,15 @@ from dataclasses import dataclass from statistics import stdev -from deltachat_rpc_client import DeltaChat, EventType, Rpc +from deltachat_rpc_client import AttrDict, DeltaChat, EventType, Rpc from xdg_base_dirs import xdg_cache_home + +class CMPingError(Exception): + """Raised when cmping encounters a non-recoverable error during probing.""" + pass + + # Spinner characters for progress display SPINNER_CHARS = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] @@ -192,7 +198,11 @@ def main(): if not args.relay2: args.relay2 = args.relay1 - pinger = perform_ping(args) + try: + pinger = perform_ping(args) + except CMPingError as e: + print(f"\r✗ {e}") + raise SystemExit(1) expected_total = pinger.sent * args.numrecipients raise SystemExit(0 if pinger.received == expected_total else 1) @@ -211,12 +221,20 @@ def _log_event(self, event, addr): else: print(f" {event.kind} [{addr}]") - def wait_all_online(self): + def wait_all_online(self, timeout=None): + deadline = time.time() + timeout if timeout is not None else None remaining = list(self.online) while remaining: ac = remaining.pop() + eq = ac._rpc.get_queue(ac.id) while True: - event = ac.wait_for_event() + if deadline is not None and time.time() >= deadline: + addr = ac.get_config("addr") + raise CMPingError(f"Timeout waiting for {addr} to come online") + try: + event = AttrDict(eq.get(timeout=1.0)) + except queue.Empty: + continue if event.kind == EventType.IMAP_INBOX_IDLE: if self.verbose >= 3: addr = ac.get_config("addr") @@ -298,8 +316,7 @@ def setup_accounts(args, sender_maker, receiver_maker): profiles_created += 1 print_progress("Setting up profiles", profiles_created, total_profiles, profiles_created) except Exception as e: - print(f"\r✗ Failed to setup sender profile on {args.relay1}: {e}") - sys.exit(1) + raise CMPingError(f"Failed to setup sender profile on {args.relay1}: {e}") from e # Create receiver accounts receivers = [] @@ -310,8 +327,9 @@ def setup_accounts(args, sender_maker, receiver_maker): profiles_created += 1 print_progress("Setting up profiles", profiles_created, total_profiles, profiles_created) except Exception as e: - print(f"\r✗ Failed to setup receiver profile {i+1} on {args.relay2}: {e}") - sys.exit(1) + raise CMPingError( + f"Failed to setup receiver profile {i+1} on {args.relay2}: {e}" + ) from e # Profile setup complete print_progress("Setting up profiles", done=True) @@ -495,11 +513,12 @@ def wait_for_receiver_join(idx, receiver, deadline): return len(joined_receivers) -def wait_profiles_online(maker): +def wait_profiles_online(maker, timeout=None): """Wait for all profiles to be online with spinner progress. Args: maker: AccountMaker instance with accounts to wait for + timeout: Optional seconds before giving up and raising CMPingError Raises: SystemExit: If waiting for profiles fails @@ -511,7 +530,7 @@ def wait_profiles_online(maker): def wait_online_thread(): nonlocal online_error try: - maker.wait_all_online() + maker.wait_all_online(timeout=timeout) except Exception as e: online_error = e finally: @@ -531,17 +550,19 @@ def wait_online_thread(): wait_thread.join() if online_error: - print(f"\n✗ Timeout or error waiting for profiles to be online: {online_error}") - sys.exit(1) + raise CMPingError( + f"Timeout or error waiting for profiles to be online: {online_error}" + ) from online_error print_progress("Waiting for profiles to be online", done=True) -def wait_profiles_online_multi(makers): +def wait_profiles_online_multi(makers, timeout=None): """Wait for all profiles to be online with spinner progress. Args: makers: List of AccountMaker instances with accounts to wait for + timeout: Optional seconds before giving up and raising CMPingError Raises: SystemExit: If waiting for profiles fails @@ -550,7 +571,7 @@ def wait_profiles_online_multi(makers): def wait_online_thread(maker): try: - maker.wait_all_online() + maker.wait_all_online(timeout=timeout) except Exception as e: online_errors.append(e) @@ -572,15 +593,23 @@ def wait_online_thread(maker): t.join() if online_errors: - print(f"\n✗ Timeout or error waiting for profiles to be online: {online_errors[0]}") - sys.exit(1) + raise CMPingError( + f"Timeout or error waiting for profiles to be online: {online_errors[0]}" + ) from online_errors[0] print_progress("Waiting for profiles to be online", done=True) -def perform_ping(args): +def perform_ping(args, accounts_dir=None, timeout=None): """Main ping execution function with timing measurements. + Args: + args: Namespace with relay1, relay2, count, interval, verbose, + numrecipients, reset attributes. + accounts_dir: Optional base directory for account storage. + Defaults to $XDG_CACHE_HOME/cmping. Override this to isolate + concurrent probes (each needs its own DB to avoid locking). + Timing Phases: 1. account_setup_time: Time to create and configure all accounts 2. group_join_time: Time for all receivers to join the group @@ -588,8 +617,17 @@ def perform_ping(args): Returns: Pinger: The pinger object with results + Also has account_setup_time, group_join_time, message_time (float, seconds) + and results list of (seq, ms_duration, receiver_idx) tuples. + + Raises: + CMPingError: On account setup or connectivity failures. """ - base_accounts_dir = xdg_cache_home().joinpath("cmping") + if accounts_dir is not None: + from pathlib import Path + base_accounts_dir = Path(accounts_dir) + else: + base_accounts_dir = xdg_cache_home().joinpath("cmping") # Determine unique relays being tested. Using a set to deduplicate when # relay1 == relay2 (same relay testing), so we only create one RPC context. @@ -641,7 +679,7 @@ def perform_ping(args): # Wait for all accounts to be online with timeout feedback # Combine all makers for waiting all_makers = [relay_contexts[r].maker for r in relays] - wait_profiles_online_multi(all_makers) + wait_profiles_online_multi(all_makers, timeout=timeout) account_setup_time = time.time() - account_setup_start @@ -660,6 +698,8 @@ def perform_ping(args): message_start = time.time() pinger = Pinger(args, sender, group, receivers) + if timeout is not None: + pinger.deadline = time.time() + timeout received = {} # Track current sequence for output formatting current_seq = None @@ -670,6 +710,7 @@ def perform_ping(args): if seq not in received: received[seq] = [] received[seq].append(ms_duration) + pinger.results.append((seq, ms_duration, receiver_idx)) # Track timing for this sequence if seq not in seq_tracking: @@ -715,6 +756,7 @@ def perform_ping(args): except KeyboardInterrupt: pass + pinger._send_thread.join(timeout=2.0) message_time = time.time() - message_start if current_seq is not None: @@ -753,6 +795,11 @@ def perform_ping(args): recv_rate = pinger.received / message_time print(f"recv rate: {recv_rate:.2f} msg/s") + # Store timing data on pinger + pinger.account_setup_time = account_setup_time + pinger.group_join_time = group_join_time + pinger.message_time = message_time + return pinger finally: # Clean up all RPC contexts @@ -803,10 +850,18 @@ def __init__(self, args, sender, group, receivers): ) ALPHANUMERIC = string.ascii_lowercase + string.digits self.tx = "".join(random.choices(ALPHANUMERIC, k=30)) - t = threading.Thread(target=self.send_pings, daemon=True) self.sent = 0 self.received = 0 - t.start() + self.results = [] # list of (seq, ms_duration, receiver_idx) + self.account_setup_time = 0.0 + self.group_join_time = 0.0 + self.message_time = 0.0 + # Optional wall-clock deadline for the messaging phase. When set, + # send_pings() stops sending and receive() stops waiting at this time. + # Set externally (e.g. by perform_ping) after setup phases complete. + self.deadline = None + self._send_thread = threading.Thread(target=self.send_pings, daemon=True) + self._send_thread.start() @property def loss(self): @@ -818,15 +873,21 @@ def send_pings(self): Each message contains: unique_id timestamp sequence_number Flow: Sender -> SMTP relay1 -> IMAP relay2 -> All receivers + + Respects self.deadline: stops sending early when the wall clock passes + the deadline so we don't fire pings we'll never wait for. """ for seq in range(self.args.count): + if self.deadline is not None and time.time() >= self.deadline: + break text = f"{self.tx} {time.time():.4f} {seq:17}" self.group.send_text(text) self.sent += 1 time.sleep(self.args.interval) # we sent all pings, let's wait a bit, then force quit if main didn't finish - time.sleep(60) - os.kill(os.getpid(), signal.SIGINT) + if self.deadline is None: + time.sleep(60) + os.kill(os.getpid(), signal.SIGINT) def receive(self): """Receive ping messages from all receivers. @@ -846,12 +907,20 @@ def receive(self): # Create a queue to collect events from all receivers event_queue = queue.Queue() + stop_event = threading.Event() + def receiver_thread(receiver_idx, receiver): """Thread function to listen to events from a single receiver.""" - while True: + # Use a timeout-based poll so the thread exits promptly when + # stop_event is set, rather than blocking indefinitely on + # queue.get() inside wait_for_event() and leaking across rounds. + account_queue = receiver._rpc.get_queue(receiver.id) + while not stop_event.is_set(): try: - event = receiver.wait_for_event() - event_queue.put((receiver_idx, receiver, event)) + item = account_queue.get(timeout=1.0) + event_queue.put((receiver_idx, receiver, AttrDict(item))) + except queue.Empty: + continue except Exception: # If there's an error, put it in the queue event_queue.put((receiver_idx, receiver, None)) @@ -866,53 +935,60 @@ def receiver_thread(receiver_idx, receiver): t.start() threads.append(t) - while num_pending > 0: - try: - receiver_idx, receiver, event = event_queue.get(timeout=1.0) - if event is None: - continue - - if event.kind == EventType.INCOMING_MSG: - msg = receiver.get_message_by_id(event.msg_id) - text = msg.get_snapshot().text - parts = text.strip().split() - if len(parts) == 3 and parts[0] == self.tx: - seq = int(parts[2]) - if seq not in received_by_receiver: - received_by_receiver[seq] = set() - if receiver_idx not in received_by_receiver[seq]: - ms_duration = (time.time() - float(parts[1])) * 1000 - self.received += 1 - num_pending -= 1 - received_by_receiver[seq].add(receiver_idx) - yield seq, ms_duration, len(text), receiver_idx - start_clock = time.time() + try: + while num_pending > 0: + if self.deadline is not None and time.time() >= self.deadline: + break + try: + receiver_idx, receiver, event = event_queue.get(timeout=1.0) + if event is None: + continue + + if event.kind == EventType.INCOMING_MSG: + msg = receiver.get_message_by_id(event.msg_id) + text = msg.get_snapshot().text + parts = text.strip().split() + if len(parts) == 3 and parts[0] == self.tx: + seq = int(parts[2]) + if seq not in received_by_receiver: + received_by_receiver[seq] = set() + if receiver_idx not in received_by_receiver[seq]: + ms_duration = (time.time() - float(parts[1])) * 1000 + self.received += 1 + num_pending -= 1 + received_by_receiver[seq].add(receiver_idx) + yield seq, ms_duration, len(text), receiver_idx + start_clock = time.time() + elif self.args.verbose >= 3: + # Log non-ping messages at verbose level 3 + receiver_addr = self.receivers_addrs[receiver_idx] + ellipsis = "..." if len(text) > 50 else "" + print( + f" [{receiver_addr}] INCOMING_MSG (non-ping): {text[:50]}{ellipsis}" + ) + elif event.kind == EventType.ERROR and self.args.verbose >= 1: + print(f"✗ ERROR: {event.msg}") + elif event.kind == EventType.MSG_FAILED and self.args.verbose >= 1: + msg = receiver.get_message_by_id(event.msg_id) + text = msg.get_snapshot().text + print(f"✗ Message failed: {text}") + elif ( + event.kind in (EventType.INFO, EventType.WARNING) + and self.args.verbose >= 1 + ): + ms_now = (time.time() - start_clock) * 1000 + print(f"INFO {ms_now:07.1f}ms: {event.msg}") elif self.args.verbose >= 3: - # Log non-ping messages at verbose level 3 + # Log all other events at verbose level 3 receiver_addr = self.receivers_addrs[receiver_idx] - ellipsis = "..." if len(text) > 50 else "" - print( - f" [{receiver_addr}] INCOMING_MSG (non-ping): {text[:50]}{ellipsis}" - ) - elif event.kind == EventType.ERROR and self.args.verbose >= 1: - print(f"✗ ERROR: {event.msg}") - elif event.kind == EventType.MSG_FAILED and self.args.verbose >= 1: - msg = receiver.get_message_by_id(event.msg_id) - text = msg.get_snapshot().text - print(f"✗ Message failed: {text}") - elif ( - event.kind in (EventType.INFO, EventType.WARNING) - and self.args.verbose >= 1 - ): - ms_now = (time.time() - start_clock) * 1000 - print(f"INFO {ms_now:07.1f}ms: {event.msg}") - elif self.args.verbose >= 3: - # Log all other events at verbose level 3 - receiver_addr = self.receivers_addrs[receiver_idx] - log_event_verbose(event, receiver_addr) - except queue.Empty: - # Timeout occurred, check if we should continue - continue + log_event_verbose(event, receiver_addr) + except queue.Empty: + # Timeout occurred, check if we should continue + continue + finally: + stop_event.set() + for t in threads: + t.join(timeout=2.0) if __name__ == "__main__":