# Ports implied by a scheme: an explicit default port names the same origin
# as an omitted one, so it is dropped before origins are compared.
_DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443}


def _get_origin(url):
    """Return the normalized ``scheme://host[:port]`` origin of a URL.

    The result is meant for equality comparison, so it is canonicalized:
    userinfo (``user:pass@``) is dropped, the host is lower-cased, and a port
    equal to the scheme's default is omitted. Returns ``""`` when the URL is
    empty, has no scheme or host, carries a malformed port, or cannot be
    parsed at all.
    """
    if not url:
        return ""
    try:
        parsed = urlparse(url)
        scheme = parsed.scheme
        host = parsed.hostname  # already lower-cased, userinfo/port stripped
        if not scheme or not host:
            return ""
        port = parsed.port  # raises ValueError for a non-numeric port
        if ":" in host:
            # `hostname` strips the brackets from IPv6 literals; restore them
            # so the host and port stay unambiguous.
            host = f"[{host}]"
        if port is None or port == _DEFAULT_PORTS.get(scheme):
            return f"{scheme}://{host}"
        return f"{scheme}://{host}:{port}"
    except Exception:  # pragma: no cover
        return ""


def _same_origin(url, page_origin):
    """Return True when the origin of `url` equals `page_origin`.

    `page_origin` is expected to be a value produced by `_get_origin`. An
    empty `page_origin` never matches, so an unknown page origin makes every
    frame count as foreign.
    """
    if not page_origin:
        return False
    return _get_origin(url) == page_origin


def _walk_nodes(node, closed_pairs, page_origin=""):
    """Collect closed shadow roots from a CDP DOM tree into `closed_pairs`.

    For every shadow root whose ``shadowRootType`` is ``"closed"`` a dict
    ``{"hostBackendNodeId": ..., "shadowBackendNodeId": ...}`` is appended to
    `closed_pairs`, in document (pre-)order. The walk descends through
    ``shadowRoots``, ``children`` and same-origin ``contentDocument`` entries.

    A node carrying a ``contentDocument`` is abandoned entirely when that
    document has no ``documentURL`` or when its origin differs from
    `page_origin` (the latter is logged at debug level).

    The traversal uses an explicit stack rather than recursion so that a very
    deep DOM cannot raise ``RecursionError``.

    NOTE(review): frames are walked on the assumption that PercyDOM.serialize
    can look up their elements in ``window.__percyClosedShadowRoots``. Confirm
    that the WeakMap written via ``Runtime.callFunctionOn`` for a node inside
    a child frame is the one the serializer actually reads.
    """
    # Each entry is (node, host). `host` is the element that owns `node` when
    # `node` is one of its shadow roots, otherwise None.
    pending = [(node, None)]
    while pending:
        current, host = pending.pop()

        # Record the pair right before descending into the shadow root so the
        # output order matches a depth-first walk.
        if host is not None and current.get("shadowRootType") == "closed":
            closed_pairs.append({
                "hostBackendNodeId": host["backendNodeId"],
                "shadowBackendNodeId": current["backendNodeId"]
            })

        followups = []
        if "contentDocument" in current:
            content_doc = current["contentDocument"]
            document_url = content_doc.get("documentURL")
            if not document_url:
                continue
            if not _same_origin(document_url, page_origin):
                log(
                    "Skipping cross-origin frame document during"
                    f" closed-shadow walk: {document_url}",
                    lvl="debug"
                )
                continue
            followups.append((content_doc, None))
        followups.extend((sr, current) for sr in current.get("shadowRoots") or ())
        followups.extend((child, None) for child in current.get("children") or ())

        # The stack is LIFO, so push in reverse to visit in document order.
        pending.extend(reversed(followups))


def _register_closed_root(cdp_session, pair, fn_decl):
    """Resolve one host/shadow pair and store it through `fn_decl`.

    Raises whatever the CDP session raises, or ``RuntimeError`` when the
    page-side function itself threw (reported by CDP as ``exceptionDetails``
    in the response rather than as a protocol error).
    """
    host_object_id = cdp_session.send(
        "DOM.resolveNode", {"backendNodeId": pair["hostBackendNodeId"]}
    )["object"]["objectId"]
    shadow_object_id = cdp_session.send(
        "DOM.resolveNode", {"backendNodeId": pair["shadowBackendNodeId"]}
    )["object"]["objectId"]

    outcome = cdp_session.send("Runtime.callFunctionOn", {
        "functionDeclaration": fn_decl,
        "objectId": host_object_id,
        "arguments": [{"objectId": shadow_object_id}]
    })
    if isinstance(outcome, dict) and outcome.get("exceptionDetails"):
        details = outcome["exceptionDetails"]
        reason = details.get("text") if isinstance(details, dict) else details
        raise RuntimeError(f"registration threw in page: {reason}")


def expose_closed_shadow_roots(page):
    """Use CDP to discover closed shadow roots and expose them to PercyDOM.serialize().

    Closed shadow roots are inaccessible from JS (element.shadowRoot === null),
    but CDP's DOM domain can pierce them. Each discovered root is stored in
    ``window.__percyClosedShadowRoots`` (a WeakMap keyed by host element).

    This is best-effort: every failure is logged at debug level and swallowed,
    so the function never raises to the caller because of a CDP problem (for
    example on browsers where a CDP session cannot be created). A failure on
    one shadow root does not stop the remaining roots from being registered.

    Args:
        page: the Playwright page being snapshotted.
    """
    cdp_session = None
    dom_enabled = False
    try:
        cdp_session = page.context.new_cdp_session(page)

        cdp_session.send("DOM.enable")
        dom_enabled = True
        root = cdp_session.send(
            "DOM.getDocument", {"depth": -1, "pierce": True}
        )["root"]

        # Compute the top-level page origin once so the walker can recurse
        # into same-origin child frame documents but skip cross-origin ones.
        closed_pairs = []
        _walk_nodes(root, closed_pairs, _get_origin(page.url))

        if not closed_pairs:
            return

        log(
            f"Found {len(closed_pairs)} closed shadow root(s),"
            " exposing via CDP",
            lvl="debug"
        )

        weakmap_script = (
            "() => { window.__percyClosedShadowRoots ="
            " window.__percyClosedShadowRoots || new WeakMap(); }"
        )
        page.evaluate(weakmap_script)

        fn_decl = (
            "function(shadowRoot) {"
            " window.__percyClosedShadowRoots"
            ".set(this, shadowRoot); }"
        )
        for pair in closed_pairs:
            try:
                _register_closed_root(cdp_session, pair, fn_decl)
            except Exception as err:
                # A node can vanish between getDocument and resolveNode; skip
                # it and keep exposing the others.
                log(
                    "Could not expose closed shadow root for host node"
                    f" {pair['hostBackendNodeId']}: {err}",
                    lvl="debug"
                )
    except Exception as err:
        log(
            f"Could not expose closed shadow roots via CDP: {err}",
            lvl="debug"
        )
    finally:
        # Release the DOM domain so subsequent CDP commands don't keep
        # emitting DOM events for this session. Only sent when DOM.enable
        # succeeded — a failing enable must not emit a spurious disable.
        if dom_enabled:
            try:
                cdp_session.send("DOM.disable")
            except Exception:  # pragma: no cover
                pass
        if cdp_session is not None:  # pragma: no branch
            try:
                cdp_session.detach()
            except Exception:  # pragma: no cover
                pass
class TestClosedShadowDOM(unittest.TestCase):
    """Tests for expose_closed_shadow_roots and _walk_nodes."""

    @staticmethod
    def _frame_node(document_url=None):
        """Build an iframe-like node whose contentDocument holds one closed root.

        The closed root has backendNodeId 4 and its host has backendNodeId 3.
        `documentURL` is left out of the contentDocument when `document_url`
        is None.
        """
        content_doc = {
            "backendNodeId": 2,
            "children": [
                {"backendNodeId": 3, "shadowRoots": [
                    {"backendNodeId": 4, "shadowRootType": "closed",
                     "children": []}
                ], "children": []}
            ]
        }
        if document_url is not None:
            content_doc["documentURL"] = document_url
        return {"backendNodeId": 1, "contentDocument": content_doc, "children": []}

    @staticmethod
    def _page_and_cdp(send=None):
        """Return a mocked (page, cdp) pair; `send` becomes cdp.send's side effect."""
        page = MagicMock()
        page.url = "https://example.com"
        cdp = MagicMock()
        page.context.new_cdp_session.return_value = cdp
        if send is not None:
            cdp.send.side_effect = send
        return page, cdp

    @staticmethod
    def _empty_document(method, _params=None):
        """cdp.send stand-in: a document with no shadow roots, None otherwise."""
        if method == "DOM.getDocument":
            return {"root": {"backendNodeId": 1, "children": []}}
        return None

    @staticmethod
    def _sent_methods(cdp):
        """Names of the CDP methods sent through the mocked session, in order."""
        return [c.args[0] for c in cdp.send.call_args_list]

    def test_walk_nodes_finds_closed_shadow_roots(self):
        node = {
            "backendNodeId": 1,
            "shadowRoots": [
                {"backendNodeId": 2, "shadowRootType": "closed", "children": []},
                {"backendNodeId": 3, "shadowRootType": "open", "children": []}
            ],
            "children": []
        }
        pairs = []
        _walk_nodes(node, pairs)
        self.assertEqual(len(pairs), 1)
        self.assertEqual(pairs[0]["hostBackendNodeId"], 1)
        self.assertEqual(pairs[0]["shadowBackendNodeId"], 2)

    def test_walk_nodes_skips_content_document_missing_url(self):
        # contentDocument with no documentURL -> defensive skip
        pairs = []
        _walk_nodes(self._frame_node(), pairs, "https://example.com")
        self.assertEqual(len(pairs), 0)

    def test_walk_nodes_recurses_into_same_origin_iframe(self):
        # Same-origin contentDocument -> recurse and capture closed roots inside
        pairs = []
        _walk_nodes(
            self._frame_node("https://example.com/inner"),
            pairs,
            "https://example.com"
        )
        self.assertEqual(len(pairs), 1)
        self.assertEqual(pairs[0]["hostBackendNodeId"], 3)
        self.assertEqual(pairs[0]["shadowBackendNodeId"], 4)

    def test_walk_nodes_skips_cross_origin_iframe(self):
        # Cross-origin contentDocument -> skip the nested document entirely
        pairs = []
        _walk_nodes(
            self._frame_node("https://other.com/inner"),
            pairs,
            "https://example.com"
        )
        self.assertEqual(len(pairs), 0)

    def test_walk_nodes_skips_iframe_when_page_origin_unknown(self):
        # documentURL present but no page origin to compare against -> skip
        pairs = []
        _walk_nodes(self._frame_node("https://example.com/inner"), pairs, "")
        self.assertEqual(len(pairs), 0)

    def test_get_origin_and_same_origin(self):
        self.assertEqual(
            _get_origin("https://example.com:8080/a/b"),
            "https://example.com:8080"
        )
        self.assertEqual(_get_origin(""), "")
        self.assertEqual(_get_origin("not a url"), "")
        self.assertTrue(
            _same_origin("https://example.com/x", "https://example.com")
        )
        self.assertFalse(
            _same_origin("https://other.com/x", "https://example.com")
        )
        self.assertFalse(_same_origin("https://example.com/x", ""))

    def test_expose_non_chromium_browser(self):
        page = MagicMock()
        page.context.new_cdp_session.side_effect = Exception("Not Chromium")
        # Should not throw
        expose_closed_shadow_roots(page)

    def test_expose_no_closed_roots(self):
        page, cdp = self._page_and_cdp(self._empty_document)
        expose_closed_shadow_roots(page)
        cdp.detach.assert_called_once()
        page.evaluate.assert_not_called()
        # DOM.disable must be paired with the successful DOM.enable
        sent = self._sent_methods(cdp)
        self.assertIn("DOM.enable", sent)
        self.assertIn("DOM.disable", sent)

    def test_expose_closed_roots_found(self):
        def cdp_send(method, params=None):
            if method == "DOM.getDocument":
                return {"root": {"backendNodeId": 1, "children": [
                    {"backendNodeId": 10, "shadowRoots": [
                        {"backendNodeId": 20, "shadowRootType": "closed",
                         "children": []}
                    ], "children": []}
                ]}}
            if method == "DOM.resolveNode":
                return {"object": {"objectId": f"obj-{params['backendNodeId']}"}}
            return None

        page, cdp = self._page_and_cdp(cdp_send)
        expose_closed_shadow_roots(page)
        page.evaluate.assert_called_once()
        cdp.detach.assert_called_once()

        # Both ends of the pair are resolved, and the shadow root is handed to
        # the page-side function with the host element as the call target.
        cdp.send.assert_any_call("DOM.resolveNode", {"backendNodeId": 10})
        cdp.send.assert_any_call("DOM.resolveNode", {"backendNodeId": 20})
        registrations = [
            c for c in cdp.send.call_args_list
            if c.args[0] == "Runtime.callFunctionOn"
        ]
        self.assertEqual(len(registrations), 1)
        params = registrations[0].args[1]
        self.assertEqual(params["objectId"], "obj-10")
        self.assertEqual(params["arguments"], [{"objectId": "obj-20"}])

    def test_expose_sends_dom_disable_after_mid_walk_failure(self):
        # DOM.enable succeeded but a later send raised -> DOM.disable still sent
        # DOM.enable returns None (success); DOM.getDocument raises mid-walk
        def cdp_send(method, _params=None):
            if method == "DOM.getDocument":
                raise Exception("walk blew up")

        page, cdp = self._page_and_cdp(cdp_send)
        expose_closed_shadow_roots(page)
        self.assertIn("DOM.disable", self._sent_methods(cdp))
        cdp.detach.assert_called_once()

    def test_expose_does_not_send_dom_disable_when_enable_failed(self):
        # DOM.enable itself raised -> no spurious DOM.disable
        def cdp_send(method, _params=None):
            if method == "DOM.enable":
                raise Exception("enable failed")

        page, cdp = self._page_and_cdp(cdp_send)
        expose_closed_shadow_roots(page)
        self.assertNotIn("DOM.disable", self._sent_methods(cdp))
        cdp.detach.assert_called_once()

    def test_expose_cdp_error_non_fatal(self):
        page, cdp = self._page_and_cdp()
        cdp.send.side_effect = Exception("CDP failed")
        # Should not throw
        expose_closed_shadow_roots(page)
        cdp.detach.assert_called_once()

    def test_expose_detach_error_suppressed(self):
        # Exercises the suppressed-exception branch around detach() in the
        # cleanup path.
        page, cdp = self._page_and_cdp(self._empty_document)
        cdp.detach.side_effect = Exception("Detach failed")
        # Should not throw even when detach fails
        expose_closed_shadow_roots(page)
        cdp.detach.assert_called_once()


if __name__ == "__main__":
    unittest.main()