Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 156 additions & 55 deletions openedx_authz/api/data.py

Large diffs are not rendered by default.

19 changes: 11 additions & 8 deletions openedx_authz/api/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,21 +272,24 @@ def _filter_allowed_assignments(
) -> list[RoleAssignmentData]:
"""
Filter the given role assignments to only include those that the user has permission to view.

Uses OR logic: if the user has any one of the scope's admin view permissions, the
assignment is included.
"""
if not user_external_key:
# If no user is specified, return all assignments
return assignments
allowed_assignments: list[RoleAssignmentData] = []
for assignment in assignments:
permission = None

# Get the permission needed to view the specific scope in the admin console
permission = assignment.scope.get_admin_view_permission().identifier
view_permissions = assignment.scope.get_admin_view_permissions()

if permission and is_user_allowed(
user_external_key=user_external_key,
action_external_key=permission,
scope_external_key=assignment.scope.external_key,
if view_permissions and any(
is_user_allowed(
user_external_key=user_external_key,
action_external_key=perm.identifier,
scope_external_key=assignment.scope.external_key,
)
for perm in view_permissions
):
allowed_assignments.append(assignment)

Expand Down
2 changes: 2 additions & 0 deletions openedx_authz/engine/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from openedx_authz.api.data import (
ContentLibraryData,
CourseOverviewData,
GlobalWildcardScopeData,
OrgContentLibraryGlobData,
OrgCourseOverviewGlobData,
ScopeData,
Expand All @@ -21,6 +22,7 @@
(CourseOverviewData.NAMESPACE, CourseOverviewData),
(OrgContentLibraryGlobData.NAMESPACE, OrgContentLibraryGlobData),
(OrgCourseOverviewGlobData.NAMESPACE, OrgCourseOverviewGlobData),
(GlobalWildcardScopeData.NAMESPACE, GlobalWildcardScopeData),
}


Expand Down
22 changes: 22 additions & 0 deletions openedx_authz/rest_api/v1/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,28 @@ def has_permission(self, request, view) -> bool:
return any(api.get_scopes_for_user_and_permission(request.user.username, permission) for permission in required)


class GlobalScopePermission(BaseScopePermission):
"""Permission handler for the global wildcard scope.

Only superadmins (``is_superuser`` or ``is_staff``) are allowed to assign roles to the
global scope (``*``). Staff members without superuser status are denied.

This class is automatically selected by ``DynamicScopePermission`` when
the request scope resolves to the ``global`` namespace.
"""

NAMESPACE: ClassVar[str] = "global"
"""``global`` for global wildcard scopes."""

def has_permission(self, request, view) -> bool:
"""Allow only superusers to operate on the global scope.

Returns:
bool: True if the user is a superadmin, False otherwise.
"""
return request.user.is_superuser or request.user.is_staff


class ContentLibraryPermission(MethodPermissionMixin, BaseScopePermission):
"""Permission handler for content library scopes.

Expand Down
17 changes: 10 additions & 7 deletions openedx_authz/rest_api/v1/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from rest_framework import serializers

from openedx_authz import api
from openedx_authz.api.data import UserAssignments
from openedx_authz.api.data import GLOBAL_SCOPE_WILDCARD, UserAssignments
from openedx_authz.rest_api.data import (
AssignmentSortField,
ScopesTypeField,
Expand Down Expand Up @@ -95,6 +95,10 @@ def _validate_scope_and_role(self, scope_value: str, role_value: str) -> None:
if not scope.exists():
raise serializers.ValidationError({"scope": f"Scope '{scope_value}' does not exist"})

# Special case for global wildcard
if scope_value == GLOBAL_SCOPE_WILDCARD:
return

role = api.RoleData(external_key=role_value)
generic_scope = get_generic_scope(scope)
role_definitions = api.get_role_definitions_in_scope(generic_scope)
Expand Down Expand Up @@ -160,15 +164,11 @@ def validate(self, attrs) -> dict:
role_value = validated_data["role"]

if scope and scopes is not None:
raise serializers.ValidationError(
"Provide either 'scope' or 'scopes', not both."
)
raise serializers.ValidationError("Provide either 'scope' or 'scopes', not both.")

scopes_list = scopes if scopes is not None else ([scope] if scope else None)
if not scopes_list:
raise serializers.ValidationError(
"Either 'scope' or 'scopes' must be provided."
)
raise serializers.ValidationError("Either 'scope' or 'scopes' must be provided.")

for scope_value in scopes_list:
self._validate_scope_and_role(scope_value, role_value)
Expand Down Expand Up @@ -401,6 +401,9 @@ def get_org(self, obj: api.RoleAssignmentData | api.SuperAdminAssignmentData) ->
case api.SuperAdminAssignmentData():
return "*"
case api.RoleAssignmentData():
if obj.scope.external_key == GLOBAL_SCOPE_WILDCARD:
# Special case for global wildcard scope
return "*"
return getattr(obj.scope, "org", "")

def get_scope(self, obj: api.RoleAssignmentData | api.SuperAdminAssignmentData) -> str:
Expand Down
33 changes: 22 additions & 11 deletions openedx_authz/rest_api/v1/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ def _get_allowed_scope_queryset(
username: str,
scope_cls: type,
glob_cls: type,
get_permission: callable,
get_permissions: callable,
queryset_builder: callable,
extract_ids: callable,
search: str = "",
Expand All @@ -790,15 +790,15 @@ def _get_allowed_scope_queryset(
"""Resolve allowed scopes from Casbin and return a filtered queryset.

This helper encapsulates the shared pattern of:
1. Fetching allowed scopes for a user and permission.
1. Fetching allowed scopes for a user across any of the scope's permissions (OR logic).
2. Partitioning them into specific IDs vs org-level globs.
3. Delegating to the appropriate queryset builder.

Args:
username: The username to check permissions for.
scope_cls: The concrete scope data class (e.g., CourseOverviewData).
glob_cls: The org-level glob class (e.g., OrgCourseOverviewGlobData).
get_permission: Callable that returns the permission for a scope class.
get_permissions: Callable that returns a list of permissions for a scope class.
queryset_builder: Callable that builds the filtered queryset (e.g., _get_courses_queryset).
extract_ids: Callable that extracts specific IDs from non-glob scopes.
search: Optional search term to filter by display name.
Expand All @@ -807,10 +807,19 @@ def _get_allowed_scope_queryset(
Returns:
QuerySet: The filtered queryset projected to the unified scope shape.
"""
allowed_scopes = get_scopes_for_user_and_permission(username, get_permission(scope_cls).identifier)
specific_scopes = [s for s in allowed_scopes if not isinstance(s, glob_cls)]
# Collect allowed scopes across all permissions (OR logic)
all_allowed_scopes = []
seen = set()
for perm in get_permissions(scope_cls):
for scope in get_scopes_for_user_and_permission(username, perm.identifier):
key = scope.namespaced_key
if key not in seen:
seen.add(key)
all_allowed_scopes.append(scope)

specific_scopes = [s for s in all_allowed_scopes if not isinstance(s, glob_cls)]
allowed_ids = extract_ids(specific_scopes)
allowed_orgs = {s.org for s in allowed_scopes if isinstance(s, glob_cls)}
allowed_orgs = {s.org for s in all_allowed_scopes if isinstance(s, glob_cls)}
return queryset_builder(allowed_ids, allowed_orgs, search=search, org=org)

def _build_queryset(self, courses_qs: QuerySet | None, libraries_qs: QuerySet | None) -> QuerySet:
Expand Down Expand Up @@ -852,9 +861,11 @@ def get_queryset(self) -> QuerySet:

management_only = params_serializer.validated_data["management_permission_only"]

# Determine which permission to check based on the query parameter.
def get_permission(scope_cls):
return scope_cls.get_admin_manage_permission() if management_only else scope_cls.get_admin_view_permission()
# Determine which permissions to check based on the query parameter.
def get_permissions(scope_cls):
return (
scope_cls.get_admin_manage_permissions() if management_only else scope_cls.get_admin_view_permissions()
)

# Resolve allowed scopes from Casbin and build filtered querysets.
courses_qs = None
Expand All @@ -863,7 +874,7 @@ def get_permission(scope_cls):
username=user.username,
scope_cls=CourseOverviewData,
glob_cls=OrgCourseOverviewGlobData,
get_permission=get_permission,
get_permissions=get_permissions,
queryset_builder=self._get_courses_queryset,
extract_ids=lambda scopes: {s.external_key for s in scopes},
search=search,
Expand All @@ -876,7 +887,7 @@ def get_permission(scope_cls):
username=user.username,
scope_cls=ContentLibraryData,
glob_cls=OrgContentLibraryGlobData,
get_permission=get_permission,
get_permissions=get_permissions,
queryset_builder=self._get_libraries_queryset,
extract_ids=lambda scopes: {
(s.external_key.split(":")[1], s.external_key.split(":")[2]) for s in scopes
Expand Down
30 changes: 19 additions & 11 deletions openedx_authz/tests/api/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CCXCourseOverviewData,
ContentLibraryData,
CourseOverviewData,
GlobalWildcardScopeData,
OrgContentLibraryGlobData,
OrgCourseOverviewGlobData,
PermissionData,
Expand Down Expand Up @@ -249,7 +250,8 @@ def test_scope_data_registration(self):
"""Test that ScopeData and its subclasses are registered correctly.

Expected Result:
- 'global' namespace maps to ScopeData class
- 'global' namespace maps to ScopeData class in scope_registry
- 'global' namespace maps to GlobalWildcardScopeData in glob_registry
- 'lib' namespace maps to ContentLibraryData class
"""
self.assertIn("global", ScopeData.scope_registry)
Expand All @@ -261,7 +263,9 @@ def test_scope_data_registration(self):
self.assertIn("ccx-v1", ScopeData.scope_registry)
self.assertIs(ScopeData.scope_registry["ccx-v1"], CCXCourseOverviewData)

# Glob registries for organization-level scopes
# Glob registries for organization-level scopes and global wildcard
self.assertIn("global", ScopeMeta.glob_registry)
self.assertIs(ScopeMeta.glob_registry["global"], GlobalWildcardScopeData)
self.assertIn("lib", ScopeMeta.glob_registry)
self.assertIs(ScopeMeta.glob_registry["lib"], OrgContentLibraryGlobData)
self.assertIn("course-v1", ScopeMeta.glob_registry)
Expand Down Expand Up @@ -320,6 +324,7 @@ def test_get_subclass_by_namespaced_key(self, namespaced_key, expected_class):
("course-v1:OpenedX+*", OrgCourseOverviewGlobData),
("lib:edX:Demo", ContentLibraryData),
("global:generic_scope", ScopeData),
("*", GlobalWildcardScopeData),
)
@unpack
def test_get_subclass_by_external_key(self, external_key, expected_class):
Expand Down Expand Up @@ -405,7 +410,7 @@ def exists(self) -> bool:
return False

@classmethod
def get_admin_view_permission(cls):
def get_admin_view_permissions(cls):
raise NotImplementedError("Not implemented for TempScope")

# Metaclass should have recreated the registries on the class
Expand Down Expand Up @@ -464,25 +469,28 @@ def test_empty_external_key_raises_value_error(self):
SubjectData(external_key="")

def test_scope_data_with_wildcard_external_key(self):
"""Test that ScopeData instantiated with wildcard (*) returns base ScopeData.
"""Test that ScopeData instantiated with wildcard (*) returns GlobalWildcardScopeData.

When using the global scope wildcard '*', the metaclass should return a base
ScopeData instance rather than attempting subclass determination.
When using the global scope wildcard '*', the metaclass should return a
GlobalWildcardScopeData instance rather than attempting subclass determination
from the external_key format.

Expected Result:
- ScopeData(external_key='*') creates base ScopeData instance
- ScopeData(external_key='*') creates GlobalWildcardScopeData instance
- namespaced_key is 'global^*'
- No subclass determination occurs
- exists() returns True
- get_object() returns None
"""
scope = ScopeData(external_key="*")

expected_namespaced = f"{ScopeData.NAMESPACE}{ScopeData.SEPARATOR}*"
expected_namespaced = f"{GlobalWildcardScopeData.NAMESPACE}{GlobalWildcardScopeData.SEPARATOR}*"

self.assertIsInstance(scope, ScopeData)
# Ensure it's exactly ScopeData, not a subclass
self.assertEqual(type(scope), ScopeData)
self.assertIsInstance(scope, GlobalWildcardScopeData)
self.assertEqual(scope.external_key, "*")
self.assertEqual(scope.namespaced_key, expected_namespaced)
self.assertTrue(scope.exists())
self.assertIsNone(scope.get_object())


@ddt
Expand Down
Loading