From c817bb337e8acdc932f7029936af6b0c4d7dc9bb Mon Sep 17 00:00:00 2001 From: Lars Erik Wik Date: Wed, 11 Mar 2026 13:48:59 +0100 Subject: [PATCH 1/2] Add extraPaths to pyrightconfig for local library resolution Signed-off-by: Lars Erik Wik --- pyrightconfig.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyrightconfig.json b/pyrightconfig.json index 389015c..3ab6871 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -1,3 +1,4 @@ { - "reportMissingImports": "none" + "reportMissingImports": "none", + "extraPaths": ["libraries/python"] } From f9ee09ccc211351cdebd4932c7ba2f414d9acb5c Mon Sep 17 00:00:00 2001 From: Lars Erik Wik Date: Thu, 12 Mar 2026 10:37:40 +0100 Subject: [PATCH 2/2] Custom promise type for managing global sshd configuration Signed-off-by: Lars Erik Wik --- cfbs.json | 9 + promise-types/sshd/LICENSE | 21 ++ promise-types/sshd/README.md | 63 ++++ promise-types/sshd/enable.cf | 6 + promise-types/sshd/example.cf | 33 ++ promise-types/sshd/sshd.py | 570 ++++++++++++++++++++++++++++++++++ 6 files changed, 702 insertions(+) create mode 100644 promise-types/sshd/LICENSE create mode 100644 promise-types/sshd/README.md create mode 100644 promise-types/sshd/enable.cf create mode 100644 promise-types/sshd/example.cf create mode 100644 promise-types/sshd/sshd.py diff --git a/cfbs.json b/cfbs.json index 9a1f191..cc2c295 100644 --- a/cfbs.json +++ b/cfbs.json @@ -302,6 +302,15 @@ "append enable.cf services/init.cf" ] }, + "promise-type-sshd": { + "description": "Promise type to configure sshd.", + "subdirectory": "promise-types/sshd", + "dependencies": ["library-for-promise-types-in-python"], + "steps": [ + "copy sshd.py modules/promises/", + "append enable.cf services/init.cf" + ] + }, "promise-type-systemd": { "description": "Promise type to manage systemd services.", "subdirectory": "promise-types/systemd", diff --git a/promise-types/sshd/LICENSE b/promise-types/sshd/LICENSE new file mode 100644 index 0000000..6e1016e --- /dev/null +++ b/promise-types/sshd/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Northern.tech + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/promise-types/sshd/README.md b/promise-types/sshd/README.md new file mode 100644 index 0000000..4f47b01 --- /dev/null +++ b/promise-types/sshd/README.md @@ -0,0 +1,63 @@ +# `sshd` promise type + +Configures sshd and restarts the service when configuration changes. + +## Promiser +An arbitrary human-readable label that appears in log messages and reports. +Since there is only one global sshd configuration, the promiser is not used to identify a resource. +Example: `"global sshd config"`. + +## Attributes +- Named using sshd's native directive names (e.g. `PermitRootLogin`, not `permit_root_login`) +- Values can be strings or slists +- Validated against `sshd -t` during promise evaluation + +## What the module manages internally +1. **Include directive** — ensures the base `sshd_config` includes the drop-in directory (`sshd_config.d/`) as its first non-comment directive +2. **Drop-in directory** — creates the drop-in directory if it doesn't exist +3. **Drop-in file** — writes directives to `sshd_config.d/00-cfengine.conf` +4. **Service restart** — restarts sshd if configuration was changed and the service is already running +5. **Verification** — verifies the desired attributes appear in the effective sshd config (`sshd -T`) + +## What the module does NOT do +- Install sshd — that is a `packages:` promise +- Ensure sshd is running — that is a `services:` promise +- Manage match blocks — those are a policy-level concern + +## Policy +```cf3 +bundle agent sshd_config +{ + packages: + "openssh-server" + policy => "present"; + + services: + "sshd" + service_policy => "start"; + + vars: + "allowed_users" slist => { "alice", "bob" }; + + sshd: + "global" + PermitRootLogin => "no", + PasswordAuthentication => "no", + Port => "22", + AllowUsers => @(allowed_users); +} +``` + +## Authors + +This software was created by the team at [Northern.tech](https://northern.tech), with many contributions from the community. +Thanks everyone! + +## Contribute + +Feel free to open pull requests to expand this documentation, add features, or fix problems. +You can also pick up an existing task or file an issue in [our bug tracker](https://northerntech.atlassian.net/). + +## License + +This software is licensed under the MIT License. See LICENSE in the root of the repository for the full license text. diff --git a/promise-types/sshd/enable.cf b/promise-types/sshd/enable.cf new file mode 100644 index 0000000..89b2c9a --- /dev/null +++ b/promise-types/sshd/enable.cf @@ -0,0 +1,6 @@ +promise agent sshd +# @brief Define sshd promise type +{ + path => "$(sys.workdir)/modules/promises/sshd.py"; + interpreter => "/usr/bin/python3"; +} diff --git a/promise-types/sshd/example.cf b/promise-types/sshd/example.cf new file mode 100644 index 0000000..4070cdb --- /dev/null +++ b/promise-types/sshd/example.cf @@ -0,0 +1,33 @@ +promise agent sshd +# @brief Define sshd promise type +{ + path => "$(sys.workdir)/modules/promises/sshd.py"; + interpreter => "/usr/bin/python3"; +} + +bundle agent example +{ + packages: + "openssh-server" + policy => "present"; + + services: + "sshd" + service_policy => "start"; + + vars: + "allowed_users" slist => { "alice", "bob" }; + + sshd: + "global" + PermitRootLogin => "no", + PasswordAuthentication => "no", + Port => "22", + AllowUsers => @(allowed_users); +} + +bundle agent __main__ +{ + methods: + "example"; +} diff --git a/promise-types/sshd/sshd.py b/promise-types/sshd/sshd.py new file mode 100644 index 0000000..c0c882e --- /dev/null +++ b/promise-types/sshd/sshd.py @@ -0,0 +1,570 @@ +import os +import re +import sys +import subprocess +import tempfile + +try: + from cfengine_module_library import PromiseModule, ValidationError, Result +except ImportError: + sys.path.append(os.path.join(os.path.dirname(__file__), "../../libraries/python")) + from cfengine_module_library import PromiseModule, ValidationError, Result + + +BASE_CONFIG = "/etc/ssh/sshd_config" +DROP_IN_DIR = "/etc/ssh/sshd_config.d/" +CFE_CONFIG = os.path.join(DROP_IN_DIR, "00-cfengine.conf") + + +def sshd_quote(value: str) -> str: + """Quote a string for sshd_config. Values containing whitespace, '#', or + '\"' are wrapped in double quotes, with internal backslashes and double + quotes escaped.""" + if not value: + return '""' + if re.search(r'[\s#"]', value): + escaped = value.replace("\\", "\\\\").replace('"', '\\"') + return f'"{escaped}"' + return value + + +def to_sshd_value(value: str | list[str]) -> str: + """Convert a Python value to an sshd config value. Lists are space-joined, + individual strings are quoted when necessary.""" + if isinstance(value, list): + return " ".join(sshd_quote(v) for v in value) + if isinstance(value, str): + return sshd_quote(value) + raise TypeError(f"Expected str or list[str], got {type(value).__name__}") + + +def try_unlink(path: str): + """Remove a file, ignoring errors if it no longer exists.""" + try: + os.unlink(path) + except OSError: + pass + + +def safe_write(path: str, lines: list[str]): + """Atomically write lines to a file via a temporary file in the same directory.""" + dir = os.path.dirname(path) + base = os.path.basename(path) + prefix, suffix = os.path.splitext(base) + + fd, tmp_path = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=dir) + try: + os.fchmod(fd, 0o600) # rw------- + with os.fdopen(fd, "w") as f: + f.writelines(lines) + os.replace(tmp_path, path) + except BaseException: + # BaseException (not Exception) to also clean up on KeyboardInterrupt/SystemExit + try_unlink(tmp_path) + raise + + +def is_drop_in_directive(directive: str) -> bool: + """Check if a directive is an Include for the drop-in config directory.""" + m = re.match( + rf"include(\s+|\s*=\s*){re.escape(DROP_IN_DIR)}\*\.conf", + directive.strip(), + re.IGNORECASE, + ) + return m is not None + + +def update_result(old: str, new: str) -> str: + """Return the worst of two results. Severity: KEPT < REPAIRED < NOT_KEPT.""" + if old == Result.NOT_KEPT or new == Result.NOT_KEPT: + return Result.NOT_KEPT + if old == Result.REPAIRED or new == Result.REPAIRED: + return Result.REPAIRED + return Result.KEPT + + +def strip_trailing_comment(directive: str) -> str: + """Remove trailing comment from a directive, respecting double-quoted strings.""" + in_quotes = False + i = 0 + while i < len(directive): + c = directive[i] + if c == "\\" and in_quotes: + i += 2 # skip escaped character + continue + if c == '"': + in_quotes = not in_quotes + elif c == "#" and not in_quotes: + return directive[:i].rstrip() + i += 1 + return directive + + +def normalize_directive(directive: str) -> str: + """Normalize a directive by removing trailing comments and replacing = with a space.""" + directive = strip_trailing_comment(directive) + # Normalize separator (= or whitespace) to a single space + directive = re.sub(r"\s*=\s*", " ", directive, count=1) + return directive.strip().lower() + + +def get_directives(lines: list[str]) -> set[str]: + """Extract and normalize all non-comment, non-empty directives from lines.""" + return { + normalize_directive(line) + for line in lines + if line.strip() and not line.strip().startswith("#") + } + + +def has_same_directives(a: list[str], b: list[str]) -> bool: + """Check if two sets of lines contain the same directives, ignoring comments and order.""" + return get_directives(a) == get_directives(b) + + +def get_first_directive(lines: list[str]) -> str | None: + """Return the first non-comment, non-empty directive, or None if not found.""" + for line in lines: + stripped = line.strip() + if stripped and not stripped.startswith("#"): + return stripped + return None + + +class SshdPromiseTypeModule(PromiseModule): + + def __init__(self): + super().__init__("sshd_promise_module", "0.0.0") + + def validate_promise( # pyright: ignore[reportImplicitOverride] + self, promiser: str, attributes: dict[str, object], metadata: dict[str, str] + ): + for attr, value in attributes.items(): + if not isinstance(value, (str, list)): + raise ValidationError(f"Attribute '{attr}' must be a string or slist") + + def ensure_include_directive(self) -> str: + """Ensure the base sshd config includes the drop-in directory.""" + try: + with open(BASE_CONFIG, "r") as f: + lines = f.readlines() + except FileNotFoundError: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Base configuration file '{BASE_CONFIG}' does not exist" + ) + return Result.NOT_KEPT + + first_directive = get_first_directive(lines) + + if first_directive is None or not is_drop_in_directive(first_directive): + include_directive = f"Include {DROP_IN_DIR}*.conf" + + self.log_debug( # pyright: ignore[reportUnknownMemberType] + f"Expected first directive in '{BASE_CONFIG}' to be '{include_directive}'" + ) + + lines.insert(0, f"{include_directive} # Added by CFEngine\n") + try: + safe_write(BASE_CONFIG, lines) + except Exception as e: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Failed to write '{BASE_CONFIG}': {e}" + ) + return Result.NOT_KEPT + + self.log_info( # pyright: ignore[reportUnknownMemberType] + f"Added include directive to '{BASE_CONFIG}'" + ) + return Result.REPAIRED + + return Result.KEPT + + def ensure_drop_in_dir(self) -> str: + """Ensure the drop-in config directory exists.""" + if os.path.isdir(DROP_IN_DIR): + return Result.KEPT + + try: + os.makedirs(DROP_IN_DIR, mode=0o755) # rwxr-xr-x + except Exception as e: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Failed to create drop-in directory '{DROP_IN_DIR}': {e}" + ) + return Result.NOT_KEPT + + self.log_info( # pyright: ignore[reportUnknownMemberType] + f"Created drop-in directory '{DROP_IN_DIR}'" + ) + return Result.REPAIRED + + def ensure_drop_in_config(self, attributes: dict[str, object]) -> str: + """Write the CFEngine drop-in config file with the given attributes.""" + lines = ["# Managed by CFEngine\n"] + for attr, value in attributes.items(): + # Ensured by validate_promise + assert isinstance(value, (str, list)) + lines.append( + f"{attr} {to_sshd_value(value)}\n" # pyright: ignore[reportUnknownArgumentType] + ) + + try: + with open(CFE_CONFIG, "r") as f: + existing = f.readlines() + except FileNotFoundError: + existing = [] + + if has_same_directives(lines, existing): + return Result.KEPT + + try: + safe_write(CFE_CONFIG, lines) + except Exception as e: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Failed to write drop-in config '{CFE_CONFIG}': {e}" + ) + return Result.NOT_KEPT + + self.log_info( # pyright: ignore[reportUnknownMemberType] + f"Updated drop-in config '{CFE_CONFIG}'" + ) + return Result.REPAIRED + + def validate_config(self) -> str: + """Validate the sshd configuration using sshd -t.""" + r = subprocess.run( + ["/usr/sbin/sshd", "-t"], + capture_output=True, + text=True, + ) + if r.returncode != 0: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Configuration validation failed: {r.stderr.strip()}" + ) + return Result.NOT_KEPT + return Result.KEPT + + def restart_sshd(self) -> str: + """Restart the sshd service if it is currently running.""" + r = subprocess.run( + ["systemctl", "is-active", "--quiet", "sshd"], + ) + if r.returncode != 0: + # If sshd is not running, do nothing + self.log_debug( # pyright: ignore[reportUnknownMemberType] + "The service sshd is not running" + ) + return Result.KEPT + + r = subprocess.run( + ["systemctl", "restart", "--quiet", "sshd"], + ) + if r.returncode != 0: + self.log_error( # pyright: ignore[reportUnknownMemberType] + "Failed to restart sshd service" + ) + return Result.NOT_KEPT + + self.log_info( # pyright: ignore[reportUnknownMemberType] + "Restarted sshd service" + ) + return Result.REPAIRED + + def verify_effective_config(self, attributes: dict[str, object]) -> str: + """Verify that the desired attributes appear in the effective sshd config.""" + r = subprocess.run( + ["/usr/sbin/sshd", "-T"], + capture_output=True, + text=True, + ) + if r.returncode != 0: + self.log_error( # pyright: ignore[reportUnknownMemberType] + "Failed to retrieve effective sshd configuration" + ) + return Result.NOT_KEPT + + effective = get_directives(r.stdout.splitlines()) + + desired: list[str] = [] + for attr, value in attributes.items(): + # Ensured by validate_promise + assert isinstance(value, (str, list)) + if isinstance(value, list): + # sshd -T splits multi-argument keywords into separate + # lines (e.g. "AllowUsers user1 user2" becomes two lines: + # "allowusers user1" and "allowusers user2"), so we must + # expand list values into individual directives to match + # the effective config format for set comparison. + for item in value: + desired.append(f"{attr} {to_sshd_value(item)}") + else: + desired.append(f"{attr} {to_sshd_value(value)}") + + missing = get_directives(desired) - effective + if missing: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Missing directives in effective sshd config: {missing}" + ) + return Result.NOT_KEPT + + return Result.KEPT + + def evaluate_promise( # pyright: ignore[reportImplicitOverride] + self, promiser: str, attributes: dict[str, object], metadata: dict[str, str] + ) -> str: + result = Result.KEPT + + # Step 1: Ensure the base config includes the drop-in directory + result = update_result(result, self.ensure_include_directive()) + + # Step 2: Ensure the drop-in directory exists + result = update_result(result, self.ensure_drop_in_dir()) + + # Step 3: Ensure the drop-in config file contains the desired attributes + result = update_result(result, self.ensure_drop_in_config(attributes)) + + # Step 4: Validate config before restarting sshd + result = update_result(result, self.validate_config()) + + # Step 5: Restart sshd only if configuration was changed + if result == Result.REPAIRED: + result = update_result(result, self.restart_sshd()) + + # Step 6: Verify the effective config matches the desired attributes + if result != Result.NOT_KEPT: + result = update_result(result, self.verify_effective_config(attributes)) + + return result + + +def test_sshd_quote_simple(): + assert sshd_quote("no") == "no" + + +def test_sshd_quote_empty(): + assert sshd_quote("") == '""' + + +def test_sshd_quote_space(): + assert sshd_quote("some value") == '"some value"' + + +def test_sshd_quote_tab(): + assert sshd_quote("some\tvalue") == '"some\tvalue"' + + +def test_sshd_quote_hash(): + assert sshd_quote("before#after") == '"before#after"' + + +def test_sshd_quote_double_quote(): + assert sshd_quote('say "hello"') == '"say \\"hello\\""' + + +def test_sshd_quote_backslash(): + assert sshd_quote("path\\to") == "path\\to" + + +def test_sshd_quote_backslash_and_space(): + assert sshd_quote("path\\to dir") == '"path\\\\to dir"' + + +def test_to_sshd_value_str(): + assert to_sshd_value("no") == "no" + + +def test_to_sshd_value_str_with_spaces(): + assert to_sshd_value("some value") == '"some value"' + + +def test_to_sshd_value_list(): + assert to_sshd_value(["user1", "user2"]) == "user1 user2" + + +def test_to_sshd_value_list_with_quoting(): + assert to_sshd_value(["user1", "user 2"]) == 'user1 "user 2"' + + +def test_get_first_directive(): + lines = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_no_comments(): + lines = ["PermitRootLogin no\n", "Port 22\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_all_comments(): + lines = ["# comment\n", "# another comment\n"] + assert get_first_directive(lines) is None + + +def test_get_first_directive_empty(): + assert get_first_directive([]) is None + + +def test_get_first_directive_extra_whitespace(): + lines = ["# comment\n", "PermitRootLogin no\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_equal_sign(): + lines = ["# comment\n", "PermitRootLogin=no\n"] + assert get_first_directive(lines) == "PermitRootLogin=no" + + +def test_get_first_directive_blank_lines(): + lines = ["\n", " \n", "# comment\n", "Port 22\n"] + assert get_first_directive(lines) == "Port 22" + + +def test_normalize_directive_simple(): + assert normalize_directive("permitrootlogin no") == "permitrootlogin no" + + +def test_normalize_directive_trailing_comment(): + assert normalize_directive("PermitRootLogin no # comment") == "permitrootlogin no" + + +def test_normalize_directive_equal_sign(): + assert normalize_directive("PermitRootLogin=no") == "permitrootlogin no" + + +def test_normalize_directive_space_equal_space(): + assert normalize_directive("PermitRootLogin = no") == "permitrootlogin no" + + +def test_normalize_directive_equal_and_comment(): + assert normalize_directive("PermitRootLogin=no # comment") == "permitrootlogin no" + + +def test_normalize_directive_leading_trailing_whitespace(): + assert normalize_directive(" PermitRootLogin no ") == "permitrootlogin no" + + +def test_normalize_directive_hash_in_quoted_value(): + assert ( + normalize_directive('Banner "/etc/banner #info"') + == 'banner "/etc/banner #info"' + ) + + +def test_get_directives_filters_comments(): + lines = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] + assert get_directives(lines) == {"permitrootlogin no", "port 22"} + + +def test_get_directives_filters_blank_lines(): + lines = ["\n", "PermitRootLogin no\n", " \n"] + assert get_directives(lines) == {"permitrootlogin no"} + + +def test_get_directives_normalizes(): + lines = ["PermitRootLogin=no # managed\n", "Port 22\n"] + assert get_directives(lines) == {"permitrootlogin no", "port 22"} + + +def test_get_directives_empty(): + assert get_directives([]) == set() + + +def test_has_same_directives_same(): + a = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] + b = ["Port 22\n", "PermitRootLogin no\n"] + assert has_same_directives(a, b) + + +def test_has_same_directives_different_comments(): + a = ["# managed by X\n", "PermitRootLogin no\n"] + b = ["# managed by Y\n", "PermitRootLogin no\n"] + assert has_same_directives(a, b) + + +def test_has_same_directives_different_format(): + a = ["PermitRootLogin=no # comment\n"] + b = ["PermitRootLogin no\n"] + assert has_same_directives(a, b) + + +def test_has_same_directives_missing(): + a = ["PermitRootLogin no\n", "Port 22\n"] + b = ["PermitRootLogin no\n"] + assert not has_same_directives(a, b) + + +def test_has_same_directives_extra(): + a = ["PermitRootLogin no\n"] + b = ["PermitRootLogin no\n", "Port 22\n"] + assert not has_same_directives(a, b) + + +def test_is_drop_in_directive_space(): + assert is_drop_in_directive(f"Include {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_equal(): + assert is_drop_in_directive(f"Include={DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_space_equal_space(): + assert is_drop_in_directive(f"Include = {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_case_insensitive(): + assert is_drop_in_directive(f"include {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_extra_files(): + assert is_drop_in_directive(f"Include {DROP_IN_DIR}*.conf /other/path") + + +def test_is_drop_in_directive_wrong_path(): + assert not is_drop_in_directive("Include /other/path/*.conf") + + +def test_is_drop_in_directive_no_separator(): + assert not is_drop_in_directive(f"Include{DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_not_include(): + assert not is_drop_in_directive("permitrootlogin no") + + +def test_update_result_kept_kept(): + assert update_result(Result.KEPT, Result.KEPT) == Result.KEPT + + +def test_update_result_kept_repaired(): + assert update_result(Result.KEPT, Result.REPAIRED) == Result.REPAIRED + + +def test_update_result_kept_not_kept(): + assert update_result(Result.KEPT, Result.NOT_KEPT) == Result.NOT_KEPT + + +def test_update_result_repaired_kept(): + assert update_result(Result.REPAIRED, Result.KEPT) == Result.REPAIRED + + +def test_update_result_repaired_repaired(): + assert update_result(Result.REPAIRED, Result.REPAIRED) == Result.REPAIRED + + +def test_update_result_repaired_not_kept(): + assert update_result(Result.REPAIRED, Result.NOT_KEPT) == Result.NOT_KEPT + + +def test_update_result_not_kept_kept(): + assert update_result(Result.NOT_KEPT, Result.KEPT) == Result.NOT_KEPT + + +def test_update_result_not_kept_repaired(): + assert update_result(Result.NOT_KEPT, Result.REPAIRED) == Result.NOT_KEPT + + +def test_update_result_not_kept_not_kept(): + assert update_result(Result.NOT_KEPT, Result.NOT_KEPT) == Result.NOT_KEPT + + +if __name__ == "__main__": + SshdPromiseTypeModule().start() # pyright: ignore[reportUnknownMemberType]