Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ Change Log
Unreleased
**********

1.2.0 - 2026-03-30
******************

Added
=====

* Add ``get_user_role_assignments_filtered`` api function to fetch user role assignments filtered by user, role, and/or scope.
* Add ``org`` property to ``ContentLibraryData`` and ``CourseOverviewData``.

1.1.0 - 2026-03-17
******************

Expand Down
2 changes: 1 addition & 1 deletion openedx_authz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

import os

__version__ = "1.1.0"
__version__ = "1.2.0"

ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
23 changes: 21 additions & 2 deletions openedx_authz/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
from abc import abstractmethod
from enum import Enum
from functools import cached_property
from typing import Any, ClassVar, Literal, Type

from attrs import define
Expand Down Expand Up @@ -448,6 +449,15 @@ class ContentLibraryData(ScopeData):
NAMESPACE: ClassVar[str] = "lib"
ID_SEPARATOR: ClassVar[str] = ":"

@property
def org(self) -> str:
"""Get the organization name from the library key.

Returns:
str: The organization name (e.g., ``DemoX`` from ``lib:DemoX:CSPROB``).
"""
return self.library_key.org

@property
def library_id(self) -> str:
"""The library identifier as used in Open edX (e.g., 'lib:DemoX:CSPROB').
Expand All @@ -459,7 +469,7 @@ def library_id(self) -> str:
"""
return self.external_key

@property
@cached_property
def library_key(self) -> LibraryLocatorV2:
"""The LibraryLocatorV2 object for the content library.

Expand Down Expand Up @@ -552,6 +562,15 @@ class CourseOverviewData(ScopeData):
NAMESPACE: ClassVar[str] = "course-v1"
ID_SEPARATOR: ClassVar[str] = "+"

@property
def org(self) -> str:
"""Get the organization name from the course key.

Returns:
str: The organization name (e.g., ``DemoX`` from ``course-v1:DemoX+TestCourse+2024_T1``).
"""
return self.course_key.org

@property
def course_id(self) -> str:
"""The course identifier as used in Open edX (e.g., 'course-v1:TestOrg+TestCourse+2024_T1').
Expand All @@ -563,7 +582,7 @@ def course_id(self) -> str:
"""
return self.external_key

@property
@cached_property
def course_key(self) -> CourseKey:
"""The CourseKey object for the course.

Expand Down
107 changes: 97 additions & 10 deletions openedx_authz/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@
from openedx_authz.models import ExtendedCasbinRule

__all__ = [
"get_permissions_for_single_role",
"get_permissions_for_roles",
"get_all_roles_names",
"get_all_roles_in_scope",
"get_permissions_for_active_roles_in_scope",
"get_role_definitions_in_scope",
"assign_role_to_subject_in_scope",
"batch_assign_role_to_subjects_in_scope",
"unassign_role_from_subject_in_scope",
"batch_unassign_role_from_subjects_in_scope",
"get_subject_role_assignments_in_scope",
"get_subject_role_assignments_for_role_in_scope",
"get_all_roles_in_scope",
"get_all_roles_names",
"get_all_subject_role_assignments_in_scope",
"get_subject_role_assignments",
"get_permissions_for_active_roles_in_scope",
"get_permissions_for_roles",
"get_permissions_for_single_role",
"get_role_assignments",
"get_role_definitions_in_scope",
"get_scopes_for_subject_and_permission",
"get_subject_role_assignments",
"get_subject_role_assignments_for_role_in_scope",
"get_subject_role_assignments_in_scope",
"unassign_role_from_subject_in_scope",
"unassign_subject_from_all_roles",
]

Expand Down Expand Up @@ -293,6 +294,92 @@ def get_subject_role_assignments(subject: SubjectData) -> list[RoleAssignmentDat
return role_assignments


def _get_field_index_and_values(
subject: SubjectData | None,
role: RoleData | None,
scope: ScopeData | None,
) -> tuple[int, list[str]]:
"""Build field index and values for Casbin's get_filtered_grouping_policy.

Returns the leftmost non-None field as field_index and a list of consecutive
values starting from that index. Empty strings serve as wildcards for positions
between specified values.

Args:
subject: Optional subject to filter by.
role: Optional role to filter by.
scope: Optional scope to filter by.

Returns:
tuple: (field_index, field_values) where field_index is the starting position
and field_values are the consecutive filter values from that position.

Examples:
>>> _get_field_index_and_values(user, None, None)
(0, ['user^steve'])
>>> _get_field_index_and_values(user, role, None)
(0, ['user^steve', 'role^course_admin'])
>>> _get_field_index_and_values(None, role, scope)
(1, ['role^course_admin', 'course-v1^course-v1:OpenedX+Demo+Course'])
>>> _get_field_index_and_values(user, None, scope)
(0, ['user^steve', '', 'course-v1^course-v1:OpenedX+Demo+Course'])
>>> _get_field_index_and_values(None, None, scope)
(2, ['course-v1^course-v1:OpenedX+Demo+Course'])
"""
fields = [subject, role, scope]
field_index = 0

for index, field in enumerate(fields):
if field is not None:
field_index = index
break

values = [field.namespaced_key if field else "" for field in fields]

# Take slice from first defined field
field_values = values[field_index:]

# Remove trailing wildcards
while field_values and field_values[-1] == "":
field_values.pop()

return field_index, field_values


def get_role_assignments(
*,
subject: SubjectData | None = None,
role: RoleData | None = None,
scope: ScopeData | None = None,
) -> list[RoleAssignmentData]:
"""Get all the roles for a subject across all scopes filtered by the given filters.

Args:
subject: Optional SubjectData object to filter by.
role: Optional RoleData object to filter by.
scope: Optional ScopeData object to filter by.

Returns:
list[RoleAssignmentData]: A list of RoleAssignmentData objects filtered by the given filters.
"""
enforcer = AuthzEnforcer.get_enforcer()
role_assignments = []
field_index, field_values = _get_field_index_and_values(subject, role, scope)
policies = enforcer.get_filtered_grouping_policy(field_index, *field_values)

for policy in policies:
role = RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value])
role.permissions = get_permissions_for_single_role(role)
role_assignments.append(
RoleAssignmentData(
subject=SubjectData(namespaced_key=policy[GroupingPolicyIndex.SUBJECT.value]),
roles=[role],
scope=ScopeData(namespaced_key=policy[GroupingPolicyIndex.SCOPE.value]),
)
)
return role_assignments


def get_subject_role_assignments_in_scope(subject: SubjectData, scope: ScopeData) -> list[RoleAssignmentData]:
"""Get the roles for a subject in a specific scope.

Expand Down
38 changes: 37 additions & 1 deletion openedx_authz/api/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@
(e.g., 'user^john_doe').
"""

from openedx_authz.api.data import ActionData, PermissionData, RoleAssignmentData, RoleData, ScopeData, UserData
from openedx_authz.api.data import (
ActionData,
PermissionData,
RoleAssignmentData,
RoleData,
ScopeData,
UserData,
)
from openedx_authz.api.permissions import is_subject_allowed
from openedx_authz.api.roles import (
assign_role_to_subject_in_scope,
batch_assign_role_to_subjects_in_scope,
batch_unassign_role_from_subjects_in_scope,
get_all_subject_role_assignments_in_scope,
get_role_assignments,
get_scopes_for_subject_and_permission,
get_subject_role_assignments,
get_subject_role_assignments_for_role_in_scope,
Expand All @@ -33,6 +41,7 @@
"get_user_role_assignments",
"get_user_role_assignments_in_scope",
"get_user_role_assignments_for_role_in_scope",
"get_user_role_assignments_filtered",
"get_all_user_role_assignments_in_scope",
"is_user_allowed",
"get_scopes_for_user_and_permission",
Expand Down Expand Up @@ -155,6 +164,33 @@ def get_user_role_assignments_for_role_in_scope(
)


def get_user_role_assignments_filtered(
*,
user_external_key: str | None = None,
role_external_key: str | None = None,
scope_external_key: str | None = None,
) -> list[RoleAssignmentData]:
"""Get role assignments filtered by user, role, and/or scope.

This function provides flexible filtering of role assignments by any combination
of user, role, and scope. At least one filter parameter should be provided for
meaningful results.

Args:
user_external_key: Optional user ID to filter by (e.g., 'john_doe').
role_external_key: Optional role name to filter by (e.g., 'library_admin').
scope_external_key: Optional scope to filter by (e.g., 'lib:DemoX:CSPROB').

Returns:
list[RoleAssignmentData]: Filtered role assignments.
"""
return get_role_assignments(
subject=UserData(external_key=user_external_key) if user_external_key else None,
role=RoleData(external_key=role_external_key) if role_external_key else None,
scope=ScopeData(external_key=scope_external_key) if scope_external_key else None,
)


def get_all_user_role_assignments_in_scope(
scope_external_key: str,
) -> list[RoleAssignmentData]:
Expand Down
22 changes: 22 additions & 0 deletions openedx_authz/tests/api/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,28 @@ def test_scope_content_lib_data_namespace(self, external_key):

self.assertEqual(scope.namespaced_key, expected)

@data(
("lib:DemoX:CSPROB", "DemoX"),
("lib:Org1:math_101", "Org1"),
)
@unpack
def test_content_library_data_org_property(self, external_key, expected_org):
"""Test that ContentLibraryData returns the correct organization name."""
scope = ContentLibraryData(external_key=external_key)

self.assertEqual(scope.org, expected_org)

@data(
("course-v1:DemoX+TestCourse+2024_T1", "DemoX"),
("course-v1:WGU+CS002+2025_T1", "WGU"),
)
@unpack
def test_course_overview_data_org_property(self, external_key, expected_org):
"""Test that CourseOverviewData returns the correct organization name."""
scope = CourseOverviewData(external_key=external_key)

self.assertEqual(scope.org, expected_org)


@ddt
class TestPolymorphicData(TestCase):
Expand Down
Loading
Loading