From f0e47ebc72d8deb119968bf6faceec295388ea89 Mon Sep 17 00:00:00 2001 From: tommasofaedo Date: Fri, 19 Jun 2026 08:02:03 +0200 Subject: [PATCH 1/3] fix: browse() and list_datablocks() for V3 multi-frame EXPLORE (S7-1200 FW V4.5) On V3 PLCs (FW >= V4.5) the EXPLORE response for RID 0x8A11FFFF spans multiple TPKT frames and uses a zlib-compressed PlcContentInfo XML format instead of the PObject tree expected by _parse_explore_datablocks(). The existing reassemble=True path does not strip V3 HMAC prefixes from continuation frames, so list_datablocks() returned [] on these PLCs. Changes: connection.py: - Add collect_explore_frames(): collects V3 multi-fragment EXPLORE responses by receiving continuation frames and stripping their HMAC prefix, stopping when a shorter-than-reference frame is detected. _s7commplus_client.py: - Add _build_explore_payload_v3(): VLQ-encoded EXPLORE payload for V3 PLCs (required format for 0x8A11FFFF and per-DB RID explores). - Add _parse_explore_datablocks_xml(): decompresses the zlib PlcContentInfo XML blob and extracts Entity[@Id="Block"] entries whose Header child has Type="DB"; falls back to _parse_explore_datablocks() when no zlib magic is found. - list_datablocks(): when protocol_version >= V3, use _build_explore_payload_v3 + collect_explore_frames + _parse_explore_datablocks_xml. - browse(): when protocol_version >= V3, use V3 payload builder and frame collector for each per-DB EXPLORE. - _parse_explore_fields(): four fixes for V3 PLCs: * Accept WSTRING dtype 0x15 in addition to 0x13 for name attributes. * Auto-detect encoding: UTF-8 (V3, no null bytes) vs UTF-16-BE (V1/V2). * BLOB skip: account for the extra 0x00 byte V3 PLCs insert before VLQ len. * WSTRING skip: advance past string data bytes (was only skipping VLQ). Tested on S7-1200 CPU 1212C DC/DC/DC, firmware V4.5 (V3 protocol, no TLS): - list_datablocks() now returns [{"name": "Data_block_1", "number": 100, "rid": 2316173412}] where it previously returned []. 
- The PlcContentInfo XML (6131 bytes after decompression) is correctly parsed from a 3-frame response (first 946-byte frame + two continuations). Known limitation: on FW V4.5, DB field definitions and I/Q/M tag names are stored in zlib BLOBs with a Siemens preset dictionary (magic 78 7D, FDICT flag set). Python zlib.decompress() returns Z_NEED_DICT. browse() returns DB names/numbers but cannot enumerate individual field names on V3 PLCs. --- s7/_s7commplus_client.py | 127 ++++++++++++++++++++++++++++++++++++--- s7/connection.py | 50 +++++++++++++++ 2 files changed, 170 insertions(+), 7 deletions(-) diff --git a/s7/_s7commplus_client.py b/s7/_s7commplus_client.py index 0246eb17..10e77594 100644 --- a/s7/_s7commplus_client.py +++ b/s7/_s7commplus_client.py @@ -12,7 +12,7 @@ from typing import Any, Optional from .connection import S7CommPlusConnection -from .protocol import FunctionCode, Ids, ElementID, DataType, ObjectId +from .protocol import FunctionCode, Ids, ElementID, DataType, ObjectId, ProtocolVersion from .vlq import encode_uint32_vlq, decode_uint32_vlq, decode_uint64_vlq from .codec import ( encode_item_address, @@ -365,6 +365,15 @@ def list_datablocks(self) -> list[dict[str, Any]]: if self._connection is None: raise RuntimeError("Not connected") + if self._connection._protocol_version >= ProtocolVersion.V3: + # V3 PLCs (FW >= V4.5): EXPLORE 0x8A11FFFF returns a multi-frame + # zlib-compressed PlcContentInfo XML blob. The existing reassemble + # path does not strip V3 HMAC prefixes, so we collect frames manually. 
+ payload = _build_explore_payload_v3(0x8A11FFFF) + first_response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5) + response = self._connection.collect_explore_frames(first_response) + return _parse_explore_datablocks_xml(response) + payload = _build_explore_request(Ids.NATIVE_THE_PLC_PROGRAM_RID, [Ids.OBJECT_VARIABLE_TYPE_NAME, Ids.BLOCK_BLOCK_NUMBER]) response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True) return _parse_explore_datablocks(response) @@ -394,9 +403,17 @@ def browse(self) -> list[dict[str, Any]]: db_rid = db_info.get("rid", 0) if db_rid == 0: continue - payload = _build_explore_request(db_rid, [Ids.OBJECT_VARIABLE_TYPE_NAME]) + is_v3 = self._connection._protocol_version >= ProtocolVersion.V3 + if is_v3: + payload = _build_explore_payload_v3(db_rid) + else: + payload = _build_explore_request(db_rid, [Ids.OBJECT_VARIABLE_TYPE_NAME]) try: - response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True) + if is_v3: + first_response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5) + response = self._connection.collect_explore_frames(first_response) + else: + response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True) fields = _parse_explore_fields(response, db_info["number"], db_info["name"]) variables.extend(fields) except Exception: @@ -763,6 +780,80 @@ def _build_explore_request(explore_id: int, attribute_ids: list[int]) -> bytes: return bytes(payload) +def _build_explore_payload_v3(explore_id: int) -> bytes: + """Build a V3-style EXPLORE payload targeting a specific RID. + + V3 PLCs (FW >= V4.5) use a compact VLQ-encoded format instead of + the fixed big-endian layout of _build_explore_request(). The RID + 0x8A11FFFF triggers the PLC to return a ``PlcContentInfo`` XML blob + compressed with zlib (magic ``78 DA``) spanning multiple TPKT frames. 
+ + Args: + explore_id: RID to explore (e.g. ``0x8A11FFFF`` for all blocks). + + Returns: + Encoded EXPLORE payload. + """ + payload = bytearray() + payload += encode_uint32_vlq(explore_id) + # Trailing UInt32 fill + filler byte (same tail as _build_explore_request) + payload += struct.pack(">I", 0) + bytes([0]) + return bytes(payload) + + +def _parse_explore_datablocks_xml(response: bytes) -> list[dict[str, Any]]: + """Parse a V3 EXPLORE response containing a zlib-compressed PlcContentInfo XML blob. + + On V3 PLCs the ``0x8A11FFFF`` EXPLORE returns a ``PlcContentInfo`` XML + document compressed with standard zlib (magic ``78 DA``) embedded inside a + large BLOB attribute that spans multiple TPKT frames. This parser locates + the zlib header in the concatenated response, decompresses it, and extracts + DB entities. + + Falls back to :func:`_parse_explore_datablocks` when no ``78 DA`` magic is + found so that V1/V2 responses are handled transparently. + + Returns: + List of dicts: ``{"name": str, "number": int, "rid": int}`` + """ + import zlib + import xml.etree.ElementTree as ET + + zlib_pos = response.find(b"\x78\xda") + if zlib_pos < 0: + logger.debug("_parse_explore_datablocks_xml: no zlib magic, falling back to PObject parser") + return _parse_explore_datablocks(response) + + try: + xml_bytes = zlib.decompress(response[zlib_pos:]) + except zlib.error as exc: + logger.debug(f"_parse_explore_datablocks_xml: zlib error {exc}") + return [] + + try: + root = ET.fromstring(xml_bytes.decode("utf-8")) + except Exception as exc: + logger.debug(f"_parse_explore_datablocks_xml: XML parse error {exc}") + return [] + + datablocks: list[dict[str, Any]] = [] + for entity in root.findall('.//Entity[@Id="Block"]'): + header = entity.find("Header") + if header is None or header.get("Type") != "DB": + continue + name = header.get("Name", "") + try: + number = int(header.get("Number", "0")) + rid = int(entity.get("Rid", "0")) + except ValueError: + continue + if name and number > 
0: + datablocks.append({"name": name, "number": number, "rid": rid}) + + logger.debug(f"_parse_explore_datablocks_xml: found {len(datablocks)} DB(s)") + return datablocks + + def _parse_explore_datablocks(response: bytes) -> list[dict[str, Any]]: """Parse an EXPLORE(thePLCProgram) response to extract datablock info. @@ -910,26 +1001,48 @@ def _parse_explore_fields(response: bytes, db_number: int, db_name: str) -> list datatype = response[offset + 1] offset += 2 - if attr_id == Ids.OBJECT_VARIABLE_TYPE_NAME and datatype == 0x13: + if attr_id == Ids.OBJECT_VARIABLE_TYPE_NAME and datatype in (0x13, 0x15): # S7String / WSTRING if offset >= len(response): break str_len, consumed = _vlq32(response, offset) offset += consumed if offset + str_len <= len(response): + raw_str = response[offset : offset + str_len] try: - field_name = response[offset : offset + str_len].decode("utf-16-be", errors="replace") + # V3 PLCs send UTF-8; V1/V2 send UTF-16-BE. + # UTF-16-BE always contains null bytes for ASCII names; + # UTF-8 ASCII names never do — use that as the discriminator. + if b"\x00" in raw_str: + field_name = raw_str.decode("utf-16-be", errors="replace").rstrip("\x00") + else: + field_name = raw_str.decode("utf-8", errors="replace") except Exception: field_name = "" offset += str_len continue - # Skip attribute value - if flags & 0x10: + # Skip attribute value. V3 PLCs insert an extra 0x00 byte before + # the VLQ length of BLOB (0x14) attributes; WSTRING (0x15) skip + # must also advance past the string data bytes. 
+ if flags & 0x10: # array if offset >= len(response): break count, consumed = _vlq32(response, offset) offset += consumed offset += count + elif datatype == 0x14: # BLOB — V3 adds an extra 0x00 before VLQ length + if offset >= len(response): + break + offset += 1 # extra 0x00 byte present in V3 encoding + if offset >= len(response): + break + blob_len, consumed = _vlq32(response, offset) + offset += consumed + blob_len + elif datatype in (0x13, 0x15): # S7String / WSTRING not matched above + if offset >= len(response): + break + str_len, consumed = _vlq32(response, offset) + offset += consumed + str_len else: if offset >= len(response): break diff --git a/s7/connection.py b/s7/connection.py index 5b861402..7566befe 100644 --- a/s7/connection.py +++ b/s7/connection.py @@ -408,6 +408,56 @@ def _send_legitimation_legacy(self, response: bytes) -> None: raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}") logger.debug(f"Legacy legitimation return_value={return_value}") + def collect_explore_frames(self, first_payload: bytes) -> bytes: + """Collect multi-fragment EXPLORE continuation frames for V3 PLCs. + + On V3 PLCs (FW >= V4.5) a large EXPLORE response (e.g. RID 0x8A11FFFF) + spans multiple TPKT frames. The first frame is the normal response + (already stripped of its 10-byte header by send_request). Continuation + frames carry **no** response header — they are raw BLOB data protected + only by a V3 HMAC prefix. The caller must concatenate them before + parsing. + + Detection of the last fragment: a frame whose body (after HMAC strip) + is measurably shorter than the first frame body is the last fragment. + We use a 5-byte tolerance to absorb minor size jitter. + + Args: + first_payload: First EXPLORE response payload, already returned by + send_request() (10-byte response header already stripped). + + Returns: + All fragment payloads concatenated (first_payload + continuations). 
+ """ + # The first frame body (already header-stripped) was originally + # len(first_payload) + 10 bytes on the wire (10-byte response header). + # Continuation frames of the same "full" size will be that long after + # HMAC strip; a shorter body signals the last fragment. + reference_size = len(first_payload) + 10 + all_data = first_payload + while True: + try: + raw = self._recv_s7_data() + if not raw: + break + # Strip the 4-byte S7CommPlus fragment header (0x72 ver len:2) + if len(raw) < 4 or raw[0] != 0x72: + break + frag_len = (raw[2] << 8) | raw[3] + body = raw[4 : 4 + frag_len] + # V3 non-TLS: strip the HMAC prefix ([hash_len][hash_bytes]) + if self._protocol_version >= ProtocolVersion.V3 and len(body) > 33: + hash_len = body[0] + body = body[1 + hash_len :] + if not body: + break + all_data += body + if len(body) < reference_size - 5: + break # last fragment + except Exception: + break + return all_data + def disconnect(self) -> None: """Disconnect from PLC.""" if self._connected and self._session_id: From 906c877173800fba7f398f19c7f283806e145ef0 Mon Sep 17 00:00:00 2001 From: tommasofaedo Date: Mon, 29 Jun 2026 16:23:16 +0200 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20PR=20#753=20review=20?= =?UTF-8?q?=E2=80=94=20add=20caps=20and=20trailer=20detection=20to=20colle?= =?UTF-8?q?ct=5Fexplore=5Fframes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add fragment count and byte size caps using _MAX_REASSEMBLED_FRAGMENTS / _MAX_REASSEMBLED_BYTES (same limits already used by _recv_reassembled_payload) - Add frag_len == 0 check as primary end-of-stream trailer detection - Keep shorter-than-full-frame heuristic as fallback - Update docstring to document termination and safety limits --- s7/connection.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/s7/connection.py b/s7/connection.py index 7566befe..2385c5fc 100644 --- a/s7/connection.py +++ b/s7/connection.py @@ -418,9 
+418,14 @@ def collect_explore_frames(self, first_payload: bytes) -> bytes: only by a V3 HMAC prefix. The caller must concatenate them before parsing. - Detection of the last fragment: a frame whose body (after HMAC strip) - is measurably shorter than the first frame body is the last fragment. - We use a 5-byte tolerance to absorb minor size jitter. + Termination: a ``frag_len == 0`` frame is the standard S7CommPlus + end-of-stream trailer. As a fallback, a frame whose body (after HMAC + strip) is measurably shorter than the first frame body is treated as the + last fragment (5-byte tolerance). + + Collection is capped by ``_MAX_REASSEMBLED_FRAGMENTS`` and + ``_MAX_REASSEMBLED_BYTES`` to prevent unbounded allocation on malformed + or adversarial responses. Args: first_payload: First EXPLORE response payload, already returned by @@ -435,7 +440,14 @@ def collect_explore_frames(self, first_payload: bytes) -> bytes: # HMAC strip; a shorter body signals the last fragment. reference_size = len(first_payload) + 10 all_data = first_payload + fragment_count = 0 while True: + if len(all_data) > self._MAX_REASSEMBLED_BYTES or fragment_count >= self._MAX_REASSEMBLED_FRAGMENTS: + from snap7.error import S7ConnectionError + raise S7ConnectionError( + f"collect_explore_frames: response too large " + f"({len(all_data)} bytes, {fragment_count} fragments)" + ) try: raw = self._recv_s7_data() if not raw: @@ -444,6 +456,8 @@ def collect_explore_frames(self, first_payload: bytes) -> bytes: if len(raw) < 4 or raw[0] != 0x72: break frag_len = (raw[2] << 8) | raw[3] + if frag_len == 0: + break # standard S7CommPlus end-of-stream trailer body = raw[4 : 4 + frag_len] # V3 non-TLS: strip the HMAC prefix ([hash_len][hash_bytes]) if self._protocol_version >= ProtocolVersion.V3 and len(body) > 33: @@ -452,8 +466,9 @@ def collect_explore_frames(self, first_payload: bytes) -> bytes: if not body: break all_data += body + fragment_count += 1 if len(body) < reference_size - 5: - break # last 
fragment + break # fallback: shorter-than-full frame signals last fragment except Exception: break return all_data From d1fd133af3f56b685487c17528b9391bae1c1c37 Mon Sep 17 00:00:00 2001 From: tommasofaedo Date: Mon, 29 Jun 2026 16:26:21 +0200 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20address=20PR=20#753=20review=20?= =?UTF-8?q?=E2=80=94=20guard=20BLOB=20V3=20offset,=20add=20XXE=20comment,?= =?UTF-8?q?=20pass=20protocol=5Fversion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _parse_explore_fields: add protocol_version param (default 0 = backward compat) Guard the BLOB extra-0x00 skip with `if protocol_version >= ProtocolVersion.V3` so V1/V2 EXPLORE responses with BLOB attributes are not mis-parsed - browse(): pass self._connection._protocol_version to _parse_explore_fields - _parse_explore_datablocks_xml: add comment noting ET.fromstring XXE safety assumption (safe by default in Python 3.8+, XML source is the PLC) --- s7/_s7commplus_client.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/s7/_s7commplus_client.py b/s7/_s7commplus_client.py index 10e77594..4ad2c7ab 100644 --- a/s7/_s7commplus_client.py +++ b/s7/_s7commplus_client.py @@ -414,7 +414,8 @@ def browse(self) -> list[dict[str, Any]]: response = self._connection.collect_explore_frames(first_response) else: response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True) - fields = _parse_explore_fields(response, db_info["number"], db_info["name"]) + fields = _parse_explore_fields(response, db_info["number"], db_info["name"], + protocol_version=self._connection._protocol_version) variables.extend(fields) except Exception: logger.debug(f"Failed to explore DB {db_info['name']} (rid={db_rid:#x})") @@ -831,6 +832,8 @@ def _parse_explore_datablocks_xml(response: bytes) -> list[dict[str, Any]]: return [] try: + # ET.fromstring is safe against XXE by default in Python 3.8+: external entities + 
# are not expanded. The XML source is the PLC (trusted local network device). root = ET.fromstring(xml_bytes.decode("utf-8")) except Exception as exc: logger.debug(f"_parse_explore_datablocks_xml: XML parse error {exc}") @@ -935,9 +938,15 @@ def _parse_explore_datablocks(response: bytes) -> list[dict[str, Any]]: return datablocks -def _parse_explore_fields(response: bytes, db_number: int, db_name: str) -> list[dict[str, Any]]: +def _parse_explore_fields(response: bytes, db_number: int, db_name: str, protocol_version: int = 0) -> list[dict[str, Any]]: """Parse an EXPLORE response for a single DB to extract field layout. + Args: + protocol_version: Protocol version from the connection (0 = V1/unknown). + Pass ``ProtocolVersion.V3`` (3) for V3 PLCs so that V3-specific + encoding differences (extra 0x00 before BLOB VLQ length) are handled + correctly without corrupting V1/V2 responses. + Returns: List of dicts with keys: ``name``, ``db_number``, ``byte_offset``, ``data_type``, ``lid``, @@ -1030,10 +1039,11 @@ def _parse_explore_fields(response: bytes, db_number: int, db_name: str) -> list count, consumed = _vlq32(response, offset) offset += consumed offset += count - elif datatype == 0x14: # BLOB — V3 adds an extra 0x00 before VLQ length + elif datatype == 0x14: # BLOB if offset >= len(response): break - offset += 1 # extra 0x00 byte present in V3 encoding + if protocol_version >= ProtocolVersion.V3: + offset += 1 # V3 inserts an extra 0x00 before the VLQ length if offset >= len(response): break blob_len, consumed = _vlq32(response, offset)