From d564d04d6da75a9acbe285ea9bb1340bc5dbb5c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodrigo=20M=C3=A9ndez?= Date: Tue, 21 Apr 2026 15:28:46 -0600 Subject: [PATCH 1/3] feat: Add support for global wildcard scope assignation --- openedx_authz/api/data.py | 105 ++++++++++- openedx_authz/engine/matcher.py | 2 + openedx_authz/rest_api/v1/permissions.py | 22 +++ openedx_authz/rest_api/v1/serializers.py | 17 +- openedx_authz/tests/api/test_data.py | 36 ++-- openedx_authz/tests/api/test_roles.py | 2 +- openedx_authz/tests/rest_api/test_views.py | 207 +++++++++++++++++++++ 7 files changed, 364 insertions(+), 27 deletions(-) diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index 042c2e32..f0173616 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -32,6 +32,7 @@ "AuthzBaseClass", "ContentLibraryData", "CourseOverviewData", + "GlobalWildcardScopeData", "GroupingPolicyIndex", "OrgCourseOverviewGlobData", "OrgGlobData", @@ -154,11 +155,11 @@ def __call__(cls, *args, **kwargs): # When working with global scopes, we can't determine subclass with an external_key since # a global scope it's not attached to a specific resource type. So we only use * as - # an external_key to mean generic scope which maps to base ScopeData class. + # an external_key to mean the global wildcard scope which maps to GlobalWildcardScopeData. # The only remaining issue is that internally the namespace key used in policies will be # The global scope namespace (global^*), so we need to handle that case here. if kwargs.get("external_key") == GLOBAL_SCOPE_WILDCARD: - return super().__call__(*args, **kwargs) + return super(ScopeMeta, GlobalWildcardScopeData).__call__(*args, **kwargs) if "namespaced_key" in kwargs: scope_cls = cls.get_subclass_by_namespaced_key(kwargs["namespaced_key"]) @@ -249,6 +250,11 @@ def get_subclass_by_external_key(mcs, external_key: str) -> Type["ScopeData"]: - This won't work for org scopes that don't have explicit namespace prefixes. TODO: Handle org scopes differently. """ + + # Special case: handle the global wildcard scope: + if external_key == "*": + return mcs.glob_registry.get("global") + if EXTERNAL_KEY_SEPARATOR not in external_key: raise ValueError(f"Invalid external_key format: {external_key}") @@ -288,7 +294,7 @@ def get_all_namespaces(mcs) -> dict[str, Type["ScopeData"]]: Examples: >>> ScopeMeta.get_all_namespaces() - {'global': ScopeData, 'lib': ContentLibraryData, 'org': OrganizationData} + {'global': GlobalWildcardScopeData, 'lib': ContentLibraryData, 'org': OrganizationData} """ return mcs.scope_registry @@ -326,8 +332,7 @@ class ScopeData(AuthZData, metaclass=ScopeMeta): # The 'global' namespace is used for scopes that aren't tied to a specific resource type. # This base class supports: - # 1. Global wildcard scopes (external_key='*') that apply across all resource types - # 2. Custom global scopes that don't map to specific domain objects (e.g., 'global:some_scope') + # - Custom scopes that don't map to specific domain objects (e.g., 'global:some_scope') # Subclasses like ContentLibraryData ('lib') represent concrete resource types with their own namespaces. NAMESPACE: ClassVar[str] = "global" IS_GLOB: ClassVar[bool] = False @@ -400,6 +405,93 @@ def exists(self) -> bool: raise NotImplementedError("Subclasses must implement exists method.") +@define +class GlobalWildcardScopeData(ScopeData): + """The global wildcard scope representing access across all scopes. + + This scope is used when a role assignment should apply globally (i.e., not + tied to any specific resource). It corresponds to the ``global^*`` namespaced + key in Casbin policies. + + The global wildcard scope always exists and does not map to a concrete domain + object. It is automatically instantiated by the ``ScopeMeta`` metaclass when + ``ScopeData(external_key='*')`` is called. + + Attributes: + NAMESPACE (str): 'global'. + external_key (str): Always ``'*'``. + namespaced_key (str): Always ``'global^*'``. + + Examples: + >>> scope = ScopeData(external_key='*') + >>> isinstance(scope, GlobalWildcardScopeData) + True + >>> scope.exists() + True + >>> scope.namespaced_key + 'global^*' + """ + + # The 'global' namespace is used for scopes that aren't tied to a specific resource type. + # This class supports Global wildcard scopes (external_key='*') that apply across all resource types + NAMESPACE: ClassVar[str] = "global" + IS_GLOB: ClassVar[bool] = True + + @classmethod + def validate_external_key(cls, external_key: str) -> bool: + """Validate the external_key format for GlobalWildcardScopeData. + + Only the wildcard ``'*'`` is accepted. + + Args: + external_key: The external key to validate. + + Returns: + bool: True if the key is ``'*'``, False otherwise. + """ + return external_key == GLOBAL_SCOPE_WILDCARD + + @classmethod + def get_admin_view_permission(cls) -> PermissionData: + """No admin view permission for the global scope. + + Global scope assignments are managed exclusively by superadmins + at the REST API layer, so no granular permission is needed. + + Returns: + None + """ + return VIEW_LIBRARY_TEAM + + @classmethod + def get_admin_manage_permission(cls) -> PermissionData: + """No admin manage permission for the global scope. + + Global scope assignments are managed exclusively by superadmins + at the REST API layer, so no granular permission is needed. + + Returns: + None + """ + return None + + def get_object(self) -> None: + """The global wildcard scope does not map to a concrete domain object. + + Returns: + None: Always returns None. + """ + return None + + def exists(self) -> bool: + """The global wildcard scope always exists. + + Returns: + bool: Always True. + """ + return True + + @define class ContentLibraryData(ScopeData): """A content library scope for authorization in the Open edX platform. @@ -690,6 +782,9 @@ class OrgGlobData(ScopeData): - ``course-v1:DemoX+*`` (all courses in org ``DemoX``) """ + # This NAMESPACE should be overriden by specific scope subclasses, + # Setting this here so it doesn't conflict with GlobalWildcardScopeData's 'global' namespace + NAMESPACE: ClassVar[str] = "orgglob" IS_GLOB: ClassVar[bool] = True ID_SEPARATOR: ClassVar[str] ORG_NAME_VALID_PATTERN: ClassVar[re.Pattern] = r"^[a-zA-Z0-9._-]*$" diff --git a/openedx_authz/engine/matcher.py b/openedx_authz/engine/matcher.py index b1cde5bb..2fcfe5c5 100644 --- a/openedx_authz/engine/matcher.py +++ b/openedx_authz/engine/matcher.py @@ -6,6 +6,7 @@ from openedx_authz.api.data import ( ContentLibraryData, CourseOverviewData, + GlobalWildcardScopeData, OrgContentLibraryGlobData, OrgCourseOverviewGlobData, ScopeData, @@ -21,6 +22,7 @@ (CourseOverviewData.NAMESPACE, CourseOverviewData), (OrgContentLibraryGlobData.NAMESPACE, OrgContentLibraryGlobData), (OrgCourseOverviewGlobData.NAMESPACE, OrgCourseOverviewGlobData), + (GlobalWildcardScopeData.NAMESPACE, GlobalWildcardScopeData), } diff --git a/openedx_authz/rest_api/v1/permissions.py b/openedx_authz/rest_api/v1/permissions.py index 8d5c18f7..cdcd497e 100644 --- a/openedx_authz/rest_api/v1/permissions.py +++ b/openedx_authz/rest_api/v1/permissions.py @@ -282,6 +282,28 @@ def has_permission(self, request, view) -> bool: return any(api.get_scopes_for_user_and_permission(request.user.username, permission) for permission in required) +class GlobalScopePermission(BaseScopePermission): + """Permission handler for the global wildcard scope. + + Only superadmins (``is_superuser`` or ``is_staff``) are allowed to assign roles to the + global scope (``*``). Staff members without superuser status are denied. + + This class is automatically selected by ``DynamicScopePermission`` when + the request scope resolves to the ``global`` namespace. + """ + + NAMESPACE: ClassVar[str] = "global" + """``global`` for global wildcard scopes.""" + + def has_permission(self, request, view) -> bool: + """Allow only superusers to operate on the global scope. + + Returns: + bool: True if the user is a superadmin, False otherwise. + """ + return request.user.is_superuser or request.user.is_staff + + class ContentLibraryPermission(MethodPermissionMixin, BaseScopePermission): """Permission handler for content library scopes. diff --git a/openedx_authz/rest_api/v1/serializers.py b/openedx_authz/rest_api/v1/serializers.py index fd85064e..44541c6c 100644 --- a/openedx_authz/rest_api/v1/serializers.py +++ b/openedx_authz/rest_api/v1/serializers.py @@ -6,7 +6,7 @@ from rest_framework import serializers from openedx_authz import api -from openedx_authz.api.data import UserAssignments +from openedx_authz.api.data import GLOBAL_SCOPE_WILDCARD, UserAssignments from openedx_authz.rest_api.data import ( AssignmentSortField, ScopesTypeField, @@ -95,6 +95,10 @@ def _validate_scope_and_role(self, scope_value: str, role_value: str) -> None: if not scope.exists(): raise serializers.ValidationError({"scope": f"Scope '{scope_value}' does not exist"}) + # Special case for global wildcard + if scope_value == GLOBAL_SCOPE_WILDCARD: + return + role = api.RoleData(external_key=role_value) generic_scope = get_generic_scope(scope) role_definitions = api.get_role_definitions_in_scope(generic_scope) @@ -160,15 +164,11 @@ def validate(self, attrs) -> dict: role_value = validated_data["role"] if scope and scopes is not None: - raise serializers.ValidationError( - "Provide either 'scope' or 'scopes', not both." - ) + raise serializers.ValidationError("Provide either 'scope' or 'scopes', not both.") scopes_list = scopes if scopes is not None else ([scope] if scope else None) if not scopes_list: - raise serializers.ValidationError( - "Either 'scope' or 'scopes' must be provided." - ) + raise serializers.ValidationError("Either 'scope' or 'scopes' must be provided.") for scope_value in scopes_list: self._validate_scope_and_role(scope_value, role_value) @@ -401,6 +401,9 @@ def get_org(self, obj: api.RoleAssignmentData | api.SuperAdminAssignmentData) -> case api.SuperAdminAssignmentData(): return "*" case api.RoleAssignmentData(): + if obj.scope.external_key == GLOBAL_SCOPE_WILDCARD: + # Special case for global wildcard scope + return "*" return getattr(obj.scope, "org", "") def get_scope(self, obj: api.RoleAssignmentData | api.SuperAdminAssignmentData) -> str: diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index 81d312fd..f8cde650 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -11,6 +11,7 @@ CCXCourseOverviewData, ContentLibraryData, CourseOverviewData, + GlobalWildcardScopeData, OrgContentLibraryGlobData, OrgCourseOverviewGlobData, PermissionData, @@ -249,7 +250,8 @@ def test_scope_data_registration(self): """Test that ScopeData and its subclasses are registered correctly. Expected Result: - - 'global' namespace maps to ScopeData class + - 'global' namespace maps to ScopeData class in scope_registry + - 'global' namespace maps to GlobalWildcardScopeData in glob_registry - 'lib' namespace maps to ContentLibraryData class """ self.assertIn("global", ScopeData.scope_registry) @@ -261,7 +263,9 @@ def test_scope_data_registration(self): self.assertIn("ccx-v1", ScopeData.scope_registry) self.assertIs(ScopeData.scope_registry["ccx-v1"], CCXCourseOverviewData) - # Glob registries for organization-level scopes + # Glob registries for organization-level scopes and global wildcard + self.assertIn("global", ScopeMeta.glob_registry) + self.assertIs(ScopeMeta.glob_registry["global"], GlobalWildcardScopeData) self.assertIn("lib", ScopeMeta.glob_registry) self.assertIs(ScopeMeta.glob_registry["lib"], OrgContentLibraryGlobData) self.assertIn("course-v1", ScopeMeta.glob_registry) @@ -320,6 +324,7 @@ def test_get_subclass_by_namespaced_key(self, namespaced_key, expected_class): ("course-v1:OpenedX+*", OrgCourseOverviewGlobData), ("lib:edX:Demo", ContentLibraryData), ("global:generic_scope", ScopeData), + ("*", GlobalWildcardScopeData), ) @unpack def test_get_subclass_by_external_key(self, external_key, expected_class): @@ -361,11 +366,11 @@ def test_scope_validate_external_key(self, external_key, expected_valid, expecte self.assertEqual(result, expected_valid) @data( - "unknown:DemoX", - "unknown:DemoX:*", + "undefined:DemoX", + "undefined:DemoX:*", ) - def test_get_subclass_by_external_key_unknown_scope_raises_value_error(self, external_key): - """Unknown namespace should raise ValueError, including wildcard keys.""" + def test_get_subclass_by_external_key_undefined_scope_raises_value_error(self, external_key): + """Undefined namespace should raise ValueError, including wildcard keys.""" with self.assertRaises(ValueError): ScopeMeta.get_subclass_by_external_key(external_key) @@ -464,25 +469,28 @@ def test_empty_external_key_raises_value_error(self): SubjectData(external_key="") def test_scope_data_with_wildcard_external_key(self): - """Test that ScopeData instantiated with wildcard (*) returns base ScopeData. + """Test that ScopeData instantiated with wildcard (*) returns GlobalWildcardScopeData. - When using the global scope wildcard '*', the metaclass should return a base - ScopeData instance rather than attempting subclass determination. + When using the global scope wildcard '*', the metaclass should return a + GlobalWildcardScopeData instance rather than attempting subclass determination + from the external_key format. Expected Result: - - ScopeData(external_key='*') creates base ScopeData instance + - ScopeData(external_key='*') creates GlobalWildcardScopeData instance - namespaced_key is 'global^*' - - No subclass determination occurs + - exists() returns True + - get_object() returns None """ scope = ScopeData(external_key="*") - expected_namespaced = f"{ScopeData.NAMESPACE}{ScopeData.SEPARATOR}*" + expected_namespaced = f"{GlobalWildcardScopeData.NAMESPACE}{GlobalWildcardScopeData.SEPARATOR}*" self.assertIsInstance(scope, ScopeData) - # Ensure it's exactly ScopeData, not a subclass - self.assertEqual(type(scope), ScopeData) + self.assertIsInstance(scope, GlobalWildcardScopeData) self.assertEqual(scope.external_key, "*") self.assertEqual(scope.namespaced_key, expected_namespaced) + self.assertTrue(scope.exists()) + self.assertIsNone(scope.get_object()) @ddt diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index ecdb80e2..3b33ab38 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -922,7 +922,7 @@ class TestRoleAssignmentAPI(RolesTestSetupMixin): """ @ddt_data( - (["mary", "john"], roles.LIBRARY_USER.external_key, "global:batch_test", True), + (["mary", "john"], roles.LIBRARY_USER.external_key, "*", True), ( ["paul", "diana", "lila"], roles.LIBRARY_CONTRIBUTOR.external_key, diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 5ad5055a..30b0be92 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -937,6 +937,213 @@ def test_put_accepts_valid_full_course_key_scope(self, _mock_exists, _mock_assig self.assertEqual(len(response.data["completed"]), 1) +@ddt +class TestRoleUserAPIViewGlobalWildcardScope(ViewTestMixin): + """Test suite for global wildcard scope (*) permission enforcement on role assignment endpoints. + + Verifies that only superusers and staff can assign/unassign roles to the + global wildcard scope via the REST API. Regular authenticated users must + receive 403 Forbidden. + """ + + def setUp(self): + super().setUp() + self.url = reverse("openedx_authz:role-user-list") + + # -- helpers ---------------------------------------------------------- + + def _create_staff_only_user(self): + """Return a user with is_staff=True but is_superuser=False.""" + user, _ = User.objects.get_or_create( + username="staff_only", + defaults={"email": "staff_only@example.com", "is_staff": True, "is_superuser": False}, + ) + return user + + def _create_superuser(self): + """Return a user with is_superuser=True and is_staff=True.""" + user, _ = User.objects.get_or_create( + username="superadmin", + defaults={"email": "superadmin@example.com", "is_staff": True, "is_superuser": True}, + ) + return user + + def _put_global_scope(self, user): + """Issue a PUT to assign a role in the global wildcard scope.""" + self.client.force_authenticate(user=user) + return self.client.put( + self.url, + data={ + "role": roles.LIBRARY_ADMIN.external_key, + "scope": "*", + "users": ["regular_1"], + }, + format="json", + ) + + def _delete_global_scope(self, user): + """Issue a DELETE to unassign a role from the global wildcard scope.""" + self.client.force_authenticate(user=user) + query_params = { + "role": roles.LIBRARY_ADMIN.external_key, + "scope": "*", + "users": "regular_1", + } + return self.client.delete(f"{self.url}?{urlencode(query_params)}") + + # -- PUT tests -------------------------------------------------------- + + def test_put_global_scope_allowed_for_superuser(self): + """Superusers can assign roles to the global wildcard scope.""" + user = self._create_superuser() + with ( + patch.object(api, "assign_role_to_user_in_scope", return_value=True), + patch.object( + api, + "get_role_definitions_in_scope", + return_value=[api.RoleData(external_key=roles.LIBRARY_ADMIN.external_key)], + ), + ): + response = self._put_global_scope(user) + + self.assertEqual(response.status_code, status.HTTP_207_MULTI_STATUS) + + def test_put_global_scope_allowed_for_staff(self): + """Staff users can assign roles to the global wildcard scope. + + DynamicScopePermission grants access to is_staff before dispatching + to the scope-specific permission class. + """ + user = self._create_staff_only_user() + with ( + patch.object(api, "assign_role_to_user_in_scope", return_value=True), + patch.object( + api, + "get_role_definitions_in_scope", + return_value=[api.RoleData(external_key=roles.LIBRARY_ADMIN.external_key)], + ), + ): + response = self._put_global_scope(user) + + self.assertEqual(response.status_code, status.HTTP_207_MULTI_STATUS) + + def test_put_global_scope_denied_for_regular_user(self): + """Regular users without staff/superuser status are denied.""" + self.client.force_authenticate(user=self.regular_user) + response = self.client.put( + self.url, + data={ + "role": roles.LIBRARY_ADMIN.external_key, + "scope": "*", + "users": ["regular_2"], + }, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_put_global_scope_denied_for_unauthenticated(self): + """Unauthenticated requests are denied.""" + self.client.force_authenticate(user=None) + response = self.client.put( + self.url, + data={ + "role": roles.LIBRARY_ADMIN.external_key, + "scope": "*", + "users": ["regular_1"], + }, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # -- DELETE tests ----------------------------------------------------- + + def test_delete_global_scope_allowed_for_superuser(self): + """Superusers can unassign roles from the global wildcard scope.""" + user = self._create_superuser() + with ( + patch.object(api, "unassign_role_from_user", return_value=True), + patch.object( + api, + "get_role_definitions_in_scope", + return_value=[api.RoleData(external_key=roles.LIBRARY_ADMIN.external_key)], + ), + ): + response = self._delete_global_scope(user) + + self.assertEqual(response.status_code, status.HTTP_207_MULTI_STATUS) + + def test_delete_global_scope_allowed_for_staff(self): + """Staff users can unassign roles from the global wildcard scope.""" + user = self._create_staff_only_user() + with ( + patch.object(api, "unassign_role_from_user", return_value=True), + patch.object( + api, + "get_role_definitions_in_scope", + return_value=[api.RoleData(external_key=roles.LIBRARY_ADMIN.external_key)], + ), + ): + response = self._delete_global_scope(user) + + self.assertEqual(response.status_code, status.HTTP_207_MULTI_STATUS) + + def test_delete_global_scope_denied_for_regular_user(self): + """Regular users without staff/superuser status are denied.""" + self.client.force_authenticate(user=self.regular_user) + query_params = { + "role": roles.LIBRARY_ADMIN.external_key, + "scope": "*", + "users": "regular_2", + } + + response = self.client.delete(f"{self.url}?{urlencode(query_params)}") + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_delete_global_scope_denied_for_unauthenticated(self): + """Unauthenticated requests are denied.""" + self.client.force_authenticate(user=None) + query_params = { + "role": roles.LIBRARY_ADMIN.external_key, + "scope": "*", + "users": "regular_1", + } + + response = self.client.delete(f"{self.url}?{urlencode(query_params)}") + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # -- GET tests -------------------------------------------------------- + + def test_get_global_scope_denied_for_regular_user(self): + """Regular users cannot list role assignments for the global scope.""" + self.client.force_authenticate(user=self.regular_user) + + response = self.client.get(self.url, {"scope": "*"}) + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_get_global_scope_allowed_for_superuser(self): + """Superusers can list role assignments for the global scope.""" + user = self._create_superuser() + self.client.force_authenticate(user=user) + + response = self.client.get(self.url, {"scope": "*"}) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_get_global_scope_allowed_for_staff(self): + """Staff users can list role assignments for the global scope.""" + user = self._create_staff_only_user() + self.client.force_authenticate(user=user) + + response = self.client.get(self.url, {"scope": "*"}) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @ddt class TestScopesAPIView(ViewTestMixin): """ From 7583cedc9babb0d529f0f019725b07f1b2f9caaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodrigo=20M=C3=A9ndez?= Date: Tue, 21 Apr 2026 17:36:25 -0600 Subject: [PATCH 2/3] squash!: cleanup --- openedx_authz/api/data.py | 2 +- openedx_authz/tests/api/test_data.py | 8 ++++---- openedx_authz/tests/api/test_roles.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index f0173616..fb8c21dc 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -252,7 +252,7 @@ def get_subclass_by_external_key(mcs, external_key: str) -> Type["ScopeData"]: """ # Special case: handle the global wildcard scope: - if external_key == "*": + if external_key == GLOBAL_SCOPE_WILDCARD: return mcs.glob_registry.get("global") if EXTERNAL_KEY_SEPARATOR not in external_key: diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index f8cde650..905e9d8b 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -366,11 +366,11 @@ def test_scope_validate_external_key(self, external_key, expected_valid, expecte self.assertEqual(result, expected_valid) @data( - "undefined:DemoX", - "undefined:DemoX:*", + "unknown:DemoX", + "unknown:DemoX:*", ) - def test_get_subclass_by_external_key_undefined_scope_raises_value_error(self, external_key): - """Undefined namespace should raise ValueError, including wildcard keys.""" + def test_get_subclass_by_external_key_unknown_scope_raises_value_error(self, external_key): + """Inknown namespace should raise ValueError, including wildcard keys.""" with self.assertRaises(ValueError): ScopeMeta.get_subclass_by_external_key(external_key) diff --git a/openedx_authz/tests/api/test_roles.py b/openedx_authz/tests/api/test_roles.py index 3b33ab38..ecdb80e2 100644 --- a/openedx_authz/tests/api/test_roles.py +++ b/openedx_authz/tests/api/test_roles.py @@ -922,7 +922,7 @@ class TestRoleAssignmentAPI(RolesTestSetupMixin): """ @ddt_data( - (["mary", "john"], roles.LIBRARY_USER.external_key, "*", True), + (["mary", "john"], roles.LIBRARY_USER.external_key, "global:batch_test", True), ( ["paul", "diana", "lila"], roles.LIBRARY_CONTRIBUTOR.external_key, From 0accbde95b40476ecfc55ae35eed4219d59dcdce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodrigo=20M=C3=A9ndez?= Date: Wed, 22 Apr 2026 09:30:44 -0600 Subject: [PATCH 3/3] squash!: Support specifying multiple admin view and manage permissions on scope data --- openedx_authz/api/data.py | 130 ++++++++--------- openedx_authz/api/users.py | 19 +-- openedx_authz/rest_api/v1/views.py | 33 +++-- openedx_authz/tests/api/test_data.py | 4 +- openedx_authz/tests/api/test_users.py | 155 ++++++++++++++++++++- openedx_authz/tests/rest_api/test_views.py | 4 +- 6 files changed, 259 insertions(+), 86 deletions(-) diff --git a/openedx_authz/api/data.py b/openedx_authz/api/data.py index fb8c21dc..162b8a65 100644 --- a/openedx_authz/api/data.py +++ b/openedx_authz/api/data.py @@ -355,31 +355,37 @@ def validate_external_key(cls, _: str) -> bool: @classmethod @abstractmethod - def get_admin_view_permission(cls) -> PermissionData: - """Get the permission required to view this scope + def get_admin_view_permissions(cls) -> list[PermissionData]: + """Get the permissions required to view this scope. This method should be implemented on every ScopeData subclass to define - which permission to check against when a user tries to see assignations + which permissions to check against when a user tries to see assignations related to this scope in the Admin Console. + The consumer uses OR logic: if the user has any one of the returned + permissions, access is granted. An empty list means access is denied. + Returns: - PermissionData: The permission required to view this scope in the admin console. + list[PermissionData]: The permissions required to view this scope in the admin console. """ - raise NotImplementedError("Subclasses must implement get_admin_view_permission method.") + raise NotImplementedError("Subclasses must implement get_admin_view_permissions method.") @classmethod @abstractmethod - def get_admin_manage_permission(cls) -> PermissionData: - """Get the permission required to manage this scope + def get_admin_manage_permissions(cls) -> list[PermissionData]: + """Get the permissions required to manage this scope. This method should be implemented on every ScopeData subclass to define - which permission to check against when a user tries to manage assignations + which permissions to check against when a user tries to manage assignations related to this scope in the Admin Console. + The consumer uses OR logic: if the user has any one of the returned + permissions, access is granted. An empty list means access is denied. + Returns: - PermissionData: The permission required to manage this scope in the admin console. + list[PermissionData]: The permissions required to manage this scope in the admin console. """ - raise NotImplementedError("Subclasses must implement get_admin_manage_permission method.") + raise NotImplementedError("Subclasses must implement get_admin_manage_permissions method.") @abstractmethod def get_object(self) -> Any | None: @@ -452,28 +458,28 @@ def validate_external_key(cls, external_key: str) -> bool: return external_key == GLOBAL_SCOPE_WILDCARD @classmethod - def get_admin_view_permission(cls) -> PermissionData: - """No admin view permission for the global scope. + def get_admin_view_permissions(cls) -> list[PermissionData]: + """Get the permissions required to view the global scope. - Global scope assignments are managed exclusively by superadmins - at the REST API layer, so no granular permission is needed. + Since the global scope spans all resource types, a user needs any one + of the scope-specific view permissions to be granted access. Returns: - None + list[PermissionData]: VIEW_LIBRARY_TEAM or COURSES_VIEW_COURSE_TEAM (OR logic). """ - return VIEW_LIBRARY_TEAM + return [VIEW_LIBRARY_TEAM, COURSES_VIEW_COURSE_TEAM] @classmethod - def get_admin_manage_permission(cls) -> PermissionData: - """No admin manage permission for the global scope. + def get_admin_manage_permissions(cls) -> list[PermissionData]: + """No manage permissions for the global scope. - Global scope assignments are managed exclusively by superadmins - at the REST API layer, so no granular permission is needed. + Global scope management is restricted to superadmins at the REST API + layer, so no granular permission grants access. Returns: - None + list[PermissionData]: Empty list — access is always denied via permissions. """ - return None + return [] def get_object(self) -> None: """The global wildcard scope does not map to a concrete domain object. @@ -566,22 +572,22 @@ def validate_external_key(cls, external_key: str) -> bool: return False @classmethod - def get_admin_view_permission(cls) -> PermissionData: - """Get the permission required to view this scope + def get_admin_view_permissions(cls) -> list[PermissionData]: + """Get the permissions required to view this scope. Returns: - PermissionData: The permission required to view this scope in the admin console. + list[PermissionData]: The permissions required to view this scope in the admin console. """ - return VIEW_LIBRARY_TEAM + return [VIEW_LIBRARY_TEAM] @classmethod - def get_admin_manage_permission(cls) -> PermissionData: - """Get the permission required to manage this scope + def get_admin_manage_permissions(cls) -> list[PermissionData]: + """Get the permissions required to manage this scope. Returns: - PermissionData: The permission required to manage this scope in the admin console. + list[PermissionData]: The permissions required to manage this scope in the admin console. """ - return MANAGE_LIBRARY_TEAM + return [MANAGE_LIBRARY_TEAM] def get_object(self) -> ContentLibrary | None: """Retrieve the ContentLibrary instance associated with this scope. @@ -697,22 +703,22 @@ def validate_external_key(cls, external_key: str) -> bool: return False @classmethod - def get_admin_view_permission(cls) -> PermissionData: - """Get the permission required to view this scope + def get_admin_view_permissions(cls) -> list[PermissionData]: + """Get the permissions required to view this scope. Returns: - PermissionData: The permission required to view this scope in the admin console. + list[PermissionData]: The permissions required to view this scope in the admin console. """ - return COURSES_VIEW_COURSE_TEAM + return [COURSES_VIEW_COURSE_TEAM] @classmethod - def get_admin_manage_permission(cls) -> PermissionData: - """Get the permission required to manage this scope + def get_admin_manage_permissions(cls) -> list[PermissionData]: + """Get the permissions required to manage this scope. Returns: - PermissionData: The permission required to manage this scope in the admin console. + list[PermissionData]: The permissions required to manage this scope in the admin console. """ - return COURSES_MANAGE_COURSE_TEAM + return [COURSES_MANAGE_COURSE_TEAM] def get_object(self) -> CourseOverview | None: """Retrieve the CourseOverview instance associated with this scope. @@ -821,13 +827,13 @@ def validate_external_key(cls, external_key: str) -> bool: return True @classmethod - def get_admin_view_permission(cls) -> PermissionData: - """Get the permission required to view this scope + def get_admin_view_permissions(cls) -> list[PermissionData]: + """Get the permissions required to view this scope. Returns: - PermissionData: The permission required to view this scope in the admin console. + list[PermissionData]: The permissions required to view this scope in the admin console. """ - raise NotImplementedError("Subclasses must implement get_admin_view_permission method.") + raise NotImplementedError("Subclasses must implement get_admin_view_permissions method.") @classmethod def build_external_key(cls, org: str) -> str: @@ -848,13 +854,13 @@ def build_external_key(cls, org: str) -> str: return f"{cls.NAMESPACE}{EXTERNAL_KEY_SEPARATOR}{org}{cls.ID_SEPARATOR}{GLOBAL_SCOPE_WILDCARD}" @classmethod - def get_admin_manage_permission(cls) -> PermissionData: - """Get the permission required to manage this scope + def get_admin_manage_permissions(cls) -> list[PermissionData]: + """Get the permissions required to manage this scope. Returns: - PermissionData: The permission required to manage this scope in the admin console. + list[PermissionData]: The permissions required to manage this scope in the admin console. """ - raise NotImplementedError("Subclasses must implement get_admin_manage_permission method.") + raise NotImplementedError("Subclasses must implement get_admin_manage_permissions method.") @classmethod def get_org(cls, external_key: str) -> str | None: @@ -946,22 +952,22 @@ class OrgContentLibraryGlobData(OrgGlobData): ID_SEPARATOR: ClassVar[str] = ":" @classmethod - def get_admin_view_permission(cls) -> PermissionData: - """Get the permission required to view this scope + def get_admin_view_permissions(cls) -> list[PermissionData]: + """Get the permissions required to view this scope. Returns: - PermissionData: The permission required to view this scope in the admin console. + list[PermissionData]: The permissions required to view this scope in the admin console. """ - return VIEW_LIBRARY_TEAM + return [VIEW_LIBRARY_TEAM] @classmethod - def get_admin_manage_permission(cls) -> PermissionData: - """Get the permission required to manage this scope + def get_admin_manage_permissions(cls) -> list[PermissionData]: + """Get the permissions required to manage this scope. Returns: - PermissionData: The permission required to manage this scope in the admin console. + list[PermissionData]: The permissions required to manage this scope in the admin console. """ - return MANAGE_LIBRARY_TEAM + return [MANAGE_LIBRARY_TEAM] @define @@ -1004,22 +1010,22 @@ class OrgCourseOverviewGlobData(OrgGlobData): ID_SEPARATOR: ClassVar[str] = "+" @classmethod - def get_admin_view_permission(cls) -> PermissionData: - """Get the permission required to view this scope + def get_admin_view_permissions(cls) -> list[PermissionData]: + """Get the permissions required to view this scope. Returns: - PermissionData: The permission required to view this scope in the admin console. + list[PermissionData]: The permissions required to view this scope in the admin console. """ - return COURSES_VIEW_COURSE_TEAM + return [COURSES_VIEW_COURSE_TEAM] @classmethod - def get_admin_manage_permission(cls) -> PermissionData: - """Get the permission required to manage this scope + def get_admin_manage_permissions(cls) -> list[PermissionData]: + """Get the permissions required to manage this scope. Returns: - PermissionData: The permission required to manage this scope in the admin console. + list[PermissionData]: The permissions required to manage this scope in the admin console. """ - return COURSES_MANAGE_COURSE_TEAM + return [COURSES_MANAGE_COURSE_TEAM] class CCXCourseOverviewData(CourseOverviewData): diff --git a/openedx_authz/api/users.py b/openedx_authz/api/users.py index e84bd590..8265877d 100644 --- a/openedx_authz/api/users.py +++ b/openedx_authz/api/users.py @@ -272,21 +272,24 @@ def _filter_allowed_assignments( ) -> list[RoleAssignmentData]: """ Filter the given role assignments to only include those that the user has permission to view. + + Uses OR logic: if the user has any one of the scope's admin view permissions, the + assignment is included. """ if not user_external_key: # If no user is specified, return all assignments return assignments allowed_assignments: list[RoleAssignmentData] = [] for assignment in assignments: - permission = None - - # Get the permission needed to view the specific scope in the admin console - permission = assignment.scope.get_admin_view_permission().identifier + view_permissions = assignment.scope.get_admin_view_permissions() - if permission and is_user_allowed( - user_external_key=user_external_key, - action_external_key=permission, - scope_external_key=assignment.scope.external_key, + if view_permissions and any( + is_user_allowed( + user_external_key=user_external_key, + action_external_key=perm.identifier, + scope_external_key=assignment.scope.external_key, + ) + for perm in view_permissions ): allowed_assignments.append(assignment) diff --git a/openedx_authz/rest_api/v1/views.py b/openedx_authz/rest_api/v1/views.py index 7bf4f832..6b91a47d 100644 --- a/openedx_authz/rest_api/v1/views.py +++ b/openedx_authz/rest_api/v1/views.py @@ -781,7 +781,7 @@ def _get_allowed_scope_queryset( username: str, scope_cls: type, glob_cls: type, - get_permission: callable, + get_permissions: callable, queryset_builder: callable, extract_ids: callable, search: str = "", @@ -790,7 +790,7 @@ def _get_allowed_scope_queryset( """Resolve allowed scopes from Casbin and return a filtered queryset. This helper encapsulates the shared pattern of: - 1. Fetching allowed scopes for a user and permission. + 1. Fetching allowed scopes for a user across any of the scope's permissions (OR logic). 2. Partitioning them into specific IDs vs org-level globs. 3. Delegating to the appropriate queryset builder. @@ -798,7 +798,7 @@ def _get_allowed_scope_queryset( username: The username to check permissions for. scope_cls: The concrete scope data class (e.g., CourseOverviewData). glob_cls: The org-level glob class (e.g., OrgCourseOverviewGlobData). - get_permission: Callable that returns the permission for a scope class. + get_permissions: Callable that returns a list of permissions for a scope class. queryset_builder: Callable that builds the filtered queryset (e.g., _get_courses_queryset). extract_ids: Callable that extracts specific IDs from non-glob scopes. search: Optional search term to filter by display name. @@ -807,10 +807,19 @@ def _get_allowed_scope_queryset( Returns: QuerySet: The filtered queryset projected to the unified scope shape. """ - allowed_scopes = get_scopes_for_user_and_permission(username, get_permission(scope_cls).identifier) - specific_scopes = [s for s in allowed_scopes if not isinstance(s, glob_cls)] + # Collect allowed scopes across all permissions (OR logic) + all_allowed_scopes = [] + seen = set() + for perm in get_permissions(scope_cls): + for scope in get_scopes_for_user_and_permission(username, perm.identifier): + key = scope.namespaced_key + if key not in seen: + seen.add(key) + all_allowed_scopes.append(scope) + + specific_scopes = [s for s in all_allowed_scopes if not isinstance(s, glob_cls)] allowed_ids = extract_ids(specific_scopes) - allowed_orgs = {s.org for s in allowed_scopes if isinstance(s, glob_cls)} + allowed_orgs = {s.org for s in all_allowed_scopes if isinstance(s, glob_cls)} return queryset_builder(allowed_ids, allowed_orgs, search=search, org=org) def _build_queryset(self, courses_qs: QuerySet | None, libraries_qs: QuerySet | None) -> QuerySet: @@ -852,9 +861,11 @@ def get_queryset(self) -> QuerySet: management_only = params_serializer.validated_data["management_permission_only"] - # Determine which permission to check based on the query parameter. - def get_permission(scope_cls): - return scope_cls.get_admin_manage_permission() if management_only else scope_cls.get_admin_view_permission() + # Determine which permissions to check based on the query parameter. + def get_permissions(scope_cls): + return ( + scope_cls.get_admin_manage_permissions() if management_only else scope_cls.get_admin_view_permissions() + ) # Resolve allowed scopes from Casbin and build filtered querysets. courses_qs = None @@ -863,7 +874,7 @@ def get_permission(scope_cls): username=user.username, scope_cls=CourseOverviewData, glob_cls=OrgCourseOverviewGlobData, - get_permission=get_permission, + get_permissions=get_permissions, queryset_builder=self._get_courses_queryset, extract_ids=lambda scopes: {s.external_key for s in scopes}, search=search, @@ -876,7 +887,7 @@ def get_permission(scope_cls): username=user.username, scope_cls=ContentLibraryData, glob_cls=OrgContentLibraryGlobData, - get_permission=get_permission, + get_permissions=get_permissions, queryset_builder=self._get_libraries_queryset, extract_ids=lambda scopes: { (s.external_key.split(":")[1], s.external_key.split(":")[2]) for s in scopes diff --git a/openedx_authz/tests/api/test_data.py b/openedx_authz/tests/api/test_data.py index 905e9d8b..e8431702 100644 --- a/openedx_authz/tests/api/test_data.py +++ b/openedx_authz/tests/api/test_data.py @@ -370,7 +370,7 @@ def test_scope_validate_external_key(self, external_key, expected_valid, expecte "unknown:DemoX:*", ) def test_get_subclass_by_external_key_unknown_scope_raises_value_error(self, external_key): - """Inknown namespace should raise ValueError, including wildcard keys.""" + """Unknown namespace should raise ValueError, including wildcard keys.""" with self.assertRaises(ValueError): ScopeMeta.get_subclass_by_external_key(external_key) @@ -410,7 +410,7 @@ def exists(self) -> bool: return False @classmethod - def get_admin_view_permission(cls): + def get_admin_view_permissions(cls): raise NotImplementedError("Not implemented for TempScope") # Metaclass should have recreated the registries on the class diff --git a/openedx_authz/tests/api/test_users.py b/openedx_authz/tests/api/test_users.py index da0b3b75..293bd494 100644 --- a/openedx_authz/tests/api/test_users.py +++ b/openedx_authz/tests/api/test_users.py @@ -5,8 +5,16 @@ from ddt import data, ddt, unpack from django.contrib.auth import get_user_model -from openedx_authz.api.data import ContentLibraryData, RoleAssignmentData, RoleData, UserData +from openedx_authz.api.data import ( + ContentLibraryData, + CourseOverviewData, + GlobalWildcardScopeData, + RoleAssignmentData, + RoleData, + UserData, +) from openedx_authz.api.users import ( + _filter_allowed_assignments, assign_role_to_user_in_scope, batch_assign_role_to_users_in_scope, batch_unassign_role_from_users, @@ -616,3 +624,148 @@ def test_mixed_active_inactive_subjects_in_assignments(self): self.assertGreater(len(eve_assignments), 0) self.assertEqual(grace_assignments, []) + + +class TestFilterAllowedAssignmentsPermissionLogic(UserAssignmentsSetupMixin): + """Test the OR logic and empty-list behavior of _filter_allowed_assignments. + + The function iterates each assignment's ``get_admin_view_permissions()`` list + and includes the assignment if the user has *any* of those permissions (OR). + An empty permissions list means the assignment is always excluded. + """ + + def _make_assignment(self, scope_cls, scope_key, role_key="library_admin"): + """Build a minimal RoleAssignmentData for testing.""" + return RoleAssignmentData( + subject=UserData(external_key="alice"), + roles=[RoleData(external_key=role_key)], + scope=scope_cls(external_key=scope_key), + ) + + # -- empty list → always excluded ------------------------------------ + + def test_empty_permissions_list_excludes_assignment(self): + """When get_admin_view_permissions returns [], the assignment is excluded + regardless of the user's actual permissions.""" + + assignment = self._make_assignment(ContentLibraryData, "lib:Org1:math_101") + + with patch.object(ContentLibraryData, "get_admin_view_permissions", return_value=[]): + result = _filter_allowed_assignments([assignment], user_external_key="alice") + + self.assertEqual(result, []) + + # -- single permission → standard check ------------------------------ + + def test_single_permission_granted_includes_assignment(self): + """A single-element permissions list where the user has the permission + results in the assignment being included.""" + + assignment = self._make_assignment(ContentLibraryData, "lib:Org1:math_101") + + # alice is library_admin on lib:Org1:math_101 → has VIEW_LIBRARY_TEAM + result = _filter_allowed_assignments([assignment], user_external_key="alice") + + self.assertEqual(len(result), 1) + self.assertEqual(result[0], assignment) + + def test_single_permission_denied_excludes_assignment(self): + """A single-element permissions list where the user lacks the permission + results in the assignment being excluded.""" + + # mallory has no roles in lib:Org1:math_101 + assignment = self._make_assignment(ContentLibraryData, "lib:Org1:math_101") + + result = _filter_allowed_assignments([assignment], user_external_key="mallory") + + self.assertEqual(result, []) + + # -- multiple permissions → OR logic --------------------------------- + + def test_or_logic_first_permission_granted(self): + """When the permissions list has two entries and the user has only the + first one, the assignment is included (OR).""" + + assignment = self._make_assignment(ContentLibraryData, "lib:Org1:math_101") + + # alice has VIEW_LIBRARY_TEAM on this scope but not COURSES_VIEW_COURSE_TEAM + with patch.object( + ContentLibraryData, + "get_admin_view_permissions", + return_value=[permissions.VIEW_LIBRARY_TEAM, permissions.COURSES_VIEW_COURSE_TEAM], + ): + result = _filter_allowed_assignments([assignment], user_external_key="alice") + + self.assertEqual(len(result), 1) + + def test_or_logic_second_permission_granted(self): + """When the permissions list has two entries and the user has only the + second one, the assignment is included (OR).""" + # daniel is course_staff on course-v1:TestOrg+TestCourse+2024_T1 + # → has COURSES_VIEW_COURSE_TEAM but not VIEW_LIBRARY_TEAM on that scope + + assignment = self._make_assignment( + CourseOverviewData, "course-v1:TestOrg+TestCourse+2024_T1", role_key="course_staff" + ) + + with patch.object( + CourseOverviewData, + "get_admin_view_permissions", + return_value=[permissions.VIEW_LIBRARY_TEAM, permissions.COURSES_VIEW_COURSE_TEAM], + ): + result = _filter_allowed_assignments([assignment], user_external_key="daniel") + + self.assertEqual(len(result), 1) + + def test_or_logic_no_permission_granted(self): + """When the permissions list has two entries and the user has neither, + the assignment is excluded.""" + + # mallory has no roles anywhere + assignment = self._make_assignment(ContentLibraryData, "lib:Org1:math_101") + + with patch.object( + ContentLibraryData, + "get_admin_view_permissions", + return_value=[permissions.VIEW_LIBRARY_TEAM, permissions.COURSES_VIEW_COURSE_TEAM], + ): + result = _filter_allowed_assignments([assignment], user_external_key="mallory") + + self.assertEqual(result, []) + + # -- no user specified → return all ---------------------------------- + + def test_no_user_returns_all_assignments(self): + """When user_external_key is None, all assignments are returned + regardless of permissions.""" + + assignment = self._make_assignment(ContentLibraryData, "lib:Org1:math_101") + + with patch.object(ContentLibraryData, "get_admin_view_permissions", return_value=[]): + result = _filter_allowed_assignments([assignment], user_external_key=None) + + self.assertEqual(len(result), 1) + + # -- mixed assignments ----------------------------------------------- + + def test_mixed_assignments_filtered_correctly(self): + """A mix of assignments with different permission lists are filtered + correctly: empty-list scopes excluded, single-perm scopes checked, + multi-perm scopes use OR.""" + + # alice has VIEW_LIBRARY_TEAM on lib:Org1:math_101 + lib_assignment = self._make_assignment(ContentLibraryData, "lib:Org1:math_101") + + # GlobalWildcardScopeData returns [VIEW_LIBRARY_TEAM, COURSES_VIEW_COURSE_TEAM] + # alice has VIEW_LIBRARY_TEAM on lib:Org1:math_101 but the scope here is * + # which won't match — so this tests that the scope_external_key matters + global_assignment = RoleAssignmentData( + subject=UserData(external_key="alice"), + roles=[RoleData(external_key="library_admin")], + scope=GlobalWildcardScopeData(external_key="*"), + ) + + result = _filter_allowed_assignments([lib_assignment, global_assignment], user_external_key="alice") + + # lib assignment should be included (alice has VIEW_LIBRARY_TEAM there) + self.assertIn(lib_assignment, result) diff --git a/openedx_authz/tests/rest_api/test_views.py b/openedx_authz/tests/rest_api/test_views.py index 30b0be92..0b0da5c2 100644 --- a/openedx_authz/tests/rest_api/test_views.py +++ b/openedx_authz/tests/rest_api/test_views.py @@ -1605,7 +1605,7 @@ def test_org_glob_scope_returns_all_org_courses(self): self.assertNotIn(self.COURSE_ORG2, external_keys) def test_manage_permission_only_uses_manage_permission(self): - """management_permission_only=true calls get_admin_manage_permission, not get_admin_view_permission.""" + """management_permission_only=true calls get_admin_manage_permissions, not get_admin_view_permissions.""" user = User.objects.get(username="regular_1") self.client.force_authenticate(user=user) @@ -1620,7 +1620,7 @@ def test_manage_permission_only_uses_manage_permission(self): self.assertIn(permissions.COURSES_MANAGE_COURSE_TEAM.identifier, called_permissions) def test_view_permission_only_uses_view_permission(self): - """management_permission_only=false (default) calls get_admin_view_permission.""" + """management_permission_only=false (default) calls get_admin_view_permissions.""" user = User.objects.get(username="regular_1") self.client.force_authenticate(user=user)