Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions python/keystoneauth-kubeservicetoken/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# File-backed OIDC access token plugin

This package provides a `keystoneauth1` plugin that extends the OIDC
access-token flow by reading the OIDC access token from a file
(`access_token_file`) at authentication and reauthentication time.

## Auth type

- `v3oidcaccesstokenfile`

## Required options

- `auth_url`
- `identity_provider`
- `protocol`
- `access_token_file`

## Service configuration examples

### Nova (`nova.conf`)

```ini
[service_user]
auth_type = v3oidcaccesstokenfile
auth_url = https://keystone.example/v3
identity_provider = k8s-workload-idp
protocol = openid
access_token_file = /var/run/secrets/openstack/nova-oidc-token
send_service_user_token = true
```

### Ironic -> Neutron client (`ironic.conf`)

```ini
[neutron]
auth_type = v3oidcaccesstokenfile
auth_url = https://keystone.internal:5000/v3
identity_provider = k8s-workload-idp
protocol = openid
access_token_file = /var/run/secrets/openstack/ironic-oidc-token
region_name = RegionOne
```

### Neutron -> Placement client (`neutron.conf`)

```ini
[placement]
auth_type = v3oidcaccesstokenfile
auth_url = https://keystone.example/v3
identity_provider = k8s-workload-idp
protocol = openid
access_token_file = /var/run/secrets/openstack/neutron-placement-oidc-token
valid_interfaces = internal
```

## Behavior notes

- Token content is read from file on authentication and reauthentication.
- Whitespace in the token file is trimmed.
- Missing, unreadable, or empty token files fail with an explicit auth error.
- Keystone token caching is preserved; token file updates are consumed when
keystoneauth reauthenticates near Keystone token expiry.

## Rollout notes

1. Install this package where the OpenStack service runs.
2. Configure the service to use `auth_type = v3oidcaccesstokenfile`.
3. Set `access_token_file` to the rotated token file path. This will usually be
path to where the Kubernetes secret is mounted.
4. Optionally verify at least one full Keystone token renewal cycle to confirm
file updates are consumed.
40 changes: 40 additions & 0 deletions python/keystoneauth-kubeservicetoken/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "keystoneauth-kubeservicetoken"
version = "0.1.0"
description = "Keystoneauth plugin that reads OIDC access tokens from a file"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"keystoneauth1>=5.0.0",
]

[dependency-groups]
dev = [
"pytest>=8.0.0",
"pytest-cov>=7.1.0",
]

[project.entry-points."keystoneauth1.plugin"]
v3oidcaccesstokenfile = "keystoneauth_kubeservicetoken.oidc:OpenIDConnectAccessTokenFileLoader"

[tool.setuptools]
package-dir = {"" = "src"}

[tool.setuptools.packages.find]
where = ["src"]

[tool.pytest.ini_options]
testpaths = ["tests"]

[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = ["E", "F", "B", "I", "UP", "S"]

[tool.ruff.lint.per-file-ignores]
"tests/*.py" = ["S101", "S106"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""keystoneauth_kubeservicetoken package."""

from keystoneauth_kubeservicetoken.oidc import (
OpenIDConnectAccessTokenFile,
OpenIDConnectAccessTokenFileLoader,
)

__all__ = [
"OpenIDConnectAccessTokenFile",
"OpenIDConnectAccessTokenFileLoader",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""File-backed OIDC access token plugin for keystoneauth."""

from __future__ import annotations

from keystoneauth1 import exceptions, loading
from keystoneauth1.identity.v3 import oidc
from keystoneauth1.loading._plugins.identity import v3 as identity_v3_loading


class OpenIDConnectAccessTokenFile(oidc.OidcAccessToken):
"""OIDC access-token auth plugin that reads the token from a file."""

def __init__(
self, *args, access_token_file: str, access_token: str | None = None, **kwargs
):
if not access_token_file:
raise exceptions.OptionError("'access_token_file' is required")

self.access_token_file = access_token_file

super().__init__(*args, access_token=access_token or "", **kwargs)

def _read_access_token(self) -> str:
try:
with open(self.access_token_file, encoding="utf-8") as token_file:
token = token_file.read().strip()
except FileNotFoundError as exc:
msg = f"OIDC access token file does not exist: {self.access_token_file}"
raise exceptions.AuthorizationFailure(msg) from exc
except OSError as exc:
msg = (
"Unable to read OIDC access token file "
f"'{self.access_token_file}': {exc}"
)
raise exceptions.AuthorizationFailure(msg) from exc

if not token:
msg = f"OIDC access token file is empty: {self.access_token_file}"
raise exceptions.AuthorizationFailure(msg)

return token

def get_unscoped_auth_ref(self, session):
self.access_token = self._read_access_token()
return super().get_unscoped_auth_ref(session)


class OpenIDConnectAccessTokenFileLoader(identity_v3_loading.OpenIDConnectAccessToken):
"""Loader for the file-backed OIDC access-token auth plugin."""

@property
def plugin_class(self):
return OpenIDConnectAccessTokenFile

def get_options(self):
options = [
option for option in super().get_options() if option.dest != "access_token"
]

options.append(
loading.Opt(
"access-token-file",
required=True,
help="Path to a file containing the OIDC access token.",
)
)
return options
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from __future__ import annotations

from pathlib import Path
from unittest.mock import patch

import pytest
from keystoneauth1 import exceptions, loading
from keystoneauth1.identity.v3 import oidc as upstream_oidc

from keystoneauth_kubeservicetoken.oidc import (
OpenIDConnectAccessTokenFile,
OpenIDConnectAccessTokenFileLoader,
)


class FakeAuthRef:
def __init__(self, auth_token: str, expires_soon: bool):
self.auth_token = auth_token
self._expires_soon = expires_soon

def will_expire_soon(self, _stale_duration: int) -> bool:
return self._expires_soon


def _create_plugin(token_file: Path) -> OpenIDConnectAccessTokenFile:
return OpenIDConnectAccessTokenFile(
auth_url="https://keystone.example/v3",
identity_provider="example-idp",
protocol="openid",
access_token_file=str(token_file),
)


@pytest.fixture
def auth_options() -> dict[str, str]:
return {
"auth_url": "https://keystone.example/v3",
"identity_provider": "example-idp",
"protocol": "openid",
}


def test_loader_options_require_access_token_file_and_not_access_token():
loader = OpenIDConnectAccessTokenFileLoader()

options = {option.dest: option for option in loader.get_options()}

assert "access_token_file" in options
assert options["access_token_file"].required
assert "access_token" not in options


def test_loader_can_initialize_plugin_with_access_token_file_only(
tmp_path, auth_options
):
token_file = tmp_path / "token"
token_file.write_text("oidc-token", encoding="utf-8")

loader = OpenIDConnectAccessTokenFileLoader()
plugin = loader.load_from_options(
**auth_options,
access_token_file=str(token_file),
)

assert isinstance(plugin, OpenIDConnectAccessTokenFile)
assert plugin.access_token_file == str(token_file)


def test_plugin_loader_is_discoverable_by_auth_type():
loader = loading.get_plugin_loader("v3oidcaccesstokenfile")

assert isinstance(loader, OpenIDConnectAccessTokenFileLoader)


def test_missing_access_token_file_configuration_fails(auth_options):
with pytest.raises(exceptions.OptionError, match="access_token_file"):
OpenIDConnectAccessTokenFile(
**auth_options,
access_token_file="",
)


def test_auth_reads_trimmed_token_from_file_each_time(tmp_path):
token_file = tmp_path / "token"
token_file.write_text(" token-a\n", encoding="utf-8")

plugin = _create_plugin(token_file)
observed_tokens: list[str] = []

def fake_super_get_unscoped_auth_ref(self, _session):
observed_tokens.append(self.access_token)
return object()

with patch.object(
upstream_oidc.OidcAccessToken,
"get_unscoped_auth_ref",
autospec=True,
side_effect=fake_super_get_unscoped_auth_ref,
):
plugin.get_unscoped_auth_ref(session=None)
token_file.write_text("token-b\n", encoding="utf-8")
plugin.get_unscoped_auth_ref(session=None)

assert observed_tokens == ["token-a", "token-b"]


def test_auth_fails_for_missing_file(tmp_path):
plugin = _create_plugin(tmp_path / "missing-token")

with pytest.raises(exceptions.AuthorizationFailure, match="does not exist"):
plugin._read_access_token()


def test_auth_fails_for_unreadable_file(tmp_path):
token_file = tmp_path / "token"
token_file.write_text("token", encoding="utf-8")
plugin = _create_plugin(token_file)

with patch("builtins.open", side_effect=OSError("permission denied")):
with pytest.raises(exceptions.AuthorizationFailure, match="Unable to read"):
plugin._read_access_token()


def test_auth_fails_for_empty_file(tmp_path):
token_file = tmp_path / "token"
token_file.write_text("\n\t", encoding="utf-8")
plugin = _create_plugin(token_file)

with pytest.raises(exceptions.AuthorizationFailure, match="is empty"):
plugin._read_access_token()


def test_file_updates_consumed_on_reauth_not_every_request(tmp_path):
token_file = tmp_path / "token"
token_file.write_text("token-a", encoding="utf-8")
plugin = _create_plugin(token_file)

plugin.auth_ref = FakeAuthRef(
auth_token="cached-keystone-token", expires_soon=False
)

def fake_get_auth_ref(_session):
plugin.get_unscoped_auth_ref(session=None)
return FakeAuthRef(auth_token=f"ks-{plugin.access_token}", expires_soon=False)

with (
patch.object(
upstream_oidc.OidcAccessToken,
"get_unscoped_auth_ref",
autospec=True,
return_value=object(),
),
patch.object(plugin, "get_auth_ref", side_effect=fake_get_auth_ref),
):
token_file.write_text("token-b", encoding="utf-8")

assert plugin.get_token(session=None) == "cached-keystone-token"

plugin.auth_ref = FakeAuthRef(auth_token="expiring-token", expires_soon=True)
assert plugin.get_token(session=None) == "ks-token-b"
Loading
Loading