Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/workos/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
from pydantic import TypeAdapter
from typing_extensions import TypedDict

from workos.types.authorization.access_evaluation import AccessEvaluation
from workos.types.authorization.environment_role import (
EnvironmentRole,
EnvironmentRoleList,
)
from workos.types.authorization.organization_role import OrganizationRole
from workos.types.authorization.permission import Permission
from workos.types.authorization.resource_identifier import ResourceIdentifier
from workos.types.authorization.resource import Resource
from workos.types.authorization.role import Role, RoleList
from workos.types.list_resource import (
Expand Down Expand Up @@ -268,6 +270,14 @@ def delete_resource_by_external_id(
cascade_delete: Optional[bool] = None,
) -> SyncOrAsync[None]: ...

def check(
self,
organization_membership_id: str,
*,
permission_slug: str,
resource: ResourceIdentifier,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's rename to resource_identifier

) -> SyncOrAsync[AccessEvaluation]: ...


class Authorization(AuthorizationModule):
_http_client: SyncHTTPClient
Expand Down Expand Up @@ -722,6 +732,24 @@ def delete_resource_by_external_id(
params=params if params else None,
)

def check(
self,
organization_membership_id: str,
*,
permission_slug: str,
resource: ResourceIdentifier,
) -> AccessEvaluation:
json: Dict[str, Any] = {"permission_slug": permission_slug}
json.update(resource)

response = self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/check",
method=REQUEST_METHOD_POST,
json=json,
)

return AccessEvaluation.model_validate(response)


class AsyncAuthorization(AuthorizationModule):
_http_client: AsyncHTTPClient
Expand Down Expand Up @@ -1175,3 +1203,21 @@ async def delete_resource_by_external_id(
method=REQUEST_METHOD_DELETE,
params=params if params else None,
)

async def check(
self,
organization_membership_id: str,
*,
permission_slug: str,
resource: ResourceIdentifier,
) -> AccessEvaluation:
json: Dict[str, Any] = {"permission_slug": permission_slug}
json.update(resource)

response = await self._http_client.request(
f"authorization/organization_memberships/{organization_membership_id}/check",
method=REQUEST_METHOD_POST,
json=json,
)

return AccessEvaluation.model_validate(response)
5 changes: 5 additions & 0 deletions src/workos/types/authorization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
)
from workos.types.authorization.permission import Permission
from workos.types.authorization.resource import Resource
from workos.types.authorization.resource_identifier import (
ResourceIdentifier,
ResourceIdentifierByExternalId,
ResourceIdentifierById,
)
from workos.types.authorization.role import (
Role,
RoleList,
Expand Down
15 changes: 15 additions & 0 deletions src/workos/types/authorization/resource_identifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Union

from typing_extensions import TypedDict


class ResourceIdentifierById(TypedDict):
resource_id: str


class ResourceIdentifierByExternalId(TypedDict):
resource_external_id: str
resource_type_slug: str


ResourceIdentifier = Union[ResourceIdentifierById, ResourceIdentifierByExternalId]
127 changes: 127 additions & 0 deletions tests/test_authorization_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from typing import Union

import pytest
from tests.utils.syncify import syncify
from workos.authorization import AsyncAuthorization, Authorization
from workos.types.authorization.resource_identifier import (
ResourceIdentifierByExternalId,
ResourceIdentifierById,
)


@pytest.mark.sync_and_async(Authorization, AsyncAuthorization)
class TestAuthorizationCheck:
@pytest.fixture(autouse=True)
def setup(self, module_instance: Union[Authorization, AsyncAuthorization]):
self.http_client = module_instance._http_client
self.authorization = module_instance

@pytest.fixture
def mock_check_authorized(self):
return {"authorized": True}

@pytest.fixture
def mock_check_unauthorized(self):
return {"authorized": False}

def test_check_authorized(
self, mock_check_authorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_authorized, 200
)

result = syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:read",
resource=ResourceIdentifierById(resource_id="res_01ABC"),
)
)

assert result.authorized is True
assert request_kwargs["method"] == "post"
assert request_kwargs["url"].endswith(
"/authorization/organization_memberships/om_01ABC/check"
)

def test_check_unauthorized(
self, mock_check_unauthorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_unauthorized, 200
)

result = syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:write",
resource=ResourceIdentifierById(resource_id="res_01ABC"),
)
)

assert result.authorized is False
assert request_kwargs["method"] == "post"

def test_check_with_resource_id(
self, mock_check_authorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_authorized, 200
)

syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:read",
resource=ResourceIdentifierById(resource_id="res_01XYZ"),
)
)

assert request_kwargs["json"] == {
"permission_slug": "documents:read",
"resource_id": "res_01XYZ",
}

def test_check_with_resource_external_id(
self, mock_check_authorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_authorized, 200
)

syncify(
self.authorization.check(
"om_01ABC",
permission_slug="documents:read",
resource=ResourceIdentifierByExternalId(
resource_external_id="ext_doc_123",
resource_type_slug="document",
),
)
)

assert request_kwargs["json"] == {
"permission_slug": "documents:read",
"resource_external_id": "ext_doc_123",
"resource_type_slug": "document",
}

def test_check_url_construction(
self, mock_check_authorized, capture_and_mock_http_client_request
):
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock_check_authorized, 200
)

syncify(
self.authorization.check(
"om_01MEMBERSHIP",
permission_slug="admin:access",
resource=ResourceIdentifierById(resource_id="res_01ABC"),
)
)

assert request_kwargs["url"].endswith(
"/authorization/organization_memberships/om_01MEMBERSHIP/check"
)