From b0679a2e5242069c7ad54dcec5655adf1053f924 Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Mon, 23 Mar 2026 17:18:20 -0400 Subject: [PATCH 1/3] Replace IP-based matching with instance-ID-based matching to handle EC2 eventual consistency When launching large numbers of instances, DescribeInstances may return instances with missing PrivateIpAddress due to EC2 API eventual consistency. Previously, clustermgtd matched EC2 instances to Slurm nodes by IP address, causing these instances to be treated as non-existent and terminated. This commit replaces IP-based matching with instance-ID-based matching by leveraging Slurm's native scontrol update InstanceId support (since 23.11). --- src/common/schedulers/slurm_commands.py | 22 ++++++++++++++++++++- src/slurm_plugin/clustermgtd.py | 13 ++++++------ src/slurm_plugin/instance_manager.py | 20 +++++++++++++++++-- src/slurm_plugin/slurm_resources.py | 2 ++ tests/slurm_plugin/test_instance_manager.py | 1 + 5 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/common/schedulers/slurm_commands.py b/src/common/schedulers/slurm_commands.py index 38a0fc51..7691e8b4 100644 --- a/src/common/schedulers/slurm_commands.py +++ b/src/common/schedulers/slurm_commands.py @@ -63,7 +63,8 @@ SCONTROL_OUTPUT_AWK_PARSER = ( 'awk \'BEGIN{{RS="\\n\\n" ; ORS="######\\n";}} {{print}}\' | ' + "grep -oP '^(NodeName=\\S+)|(NodeAddr=\\S+)|(NodeHostName=\\S+)|(? + # comma-separated values are treated as a single literal string instead of being distributed across nodes. + # We have reported the bug. Once SchedMD fixes this, move instance_ids into the batched loop above. 
+ if instance_ids: + node_list = list(nodes) if not isinstance(nodes, list) else nodes + for node_name, instance_id in zip(node_list, instance_ids): + validate_subprocess_argument(node_name) + validate_subprocess_argument(instance_id) + run_command( # nosec B604 + f"{SCONTROL} update nodename={node_name} instanceid={instance_id}", + raise_on_error=raise_on_error, + timeout=command_timeout, + shell=True, + ) + def update_partitions(partitions, state): succeeded_partitions = [] @@ -433,6 +452,7 @@ def _parse_nodes_info(slurm_node_info: str) -> List[SlurmNode]: "SlurmdStartTime": "slurmdstarttime", "LastBusyTime": "lastbusytime", "ReservationName": "reservation_name", + "InstanceId": "instance_id", } date_fields = ["SlurmdStartTime", "LastBusyTime"] diff --git a/src/slurm_plugin/clustermgtd.py b/src/slurm_plugin/clustermgtd.py index c612dea1..723fe7ad 100644 --- a/src/slurm_plugin/clustermgtd.py +++ b/src/slurm_plugin/clustermgtd.py @@ -1145,15 +1145,14 @@ def _parse_scheduler_nodes_data(nodes): @staticmethod def _update_slurm_nodes_with_ec2_info(nodes, cluster_instances): + """Associate EC2 instances with Slurm nodes by matching on instance ID.""" if cluster_instances: - ip_to_slurm_node_map = {node.nodeaddr: node for node in nodes} + instance_id_to_slurm_node_map = {node.instance_id: node for node in nodes if node.instance_id} for instance in cluster_instances: - for private_ip in instance.all_private_ips: - if private_ip in ip_to_slurm_node_map: - slurm_node = ip_to_slurm_node_map.get(private_ip) - slurm_node.instance = instance - instance.slurm_node = slurm_node - break + if instance.id in instance_id_to_slurm_node_map: + slurm_node = instance_id_to_slurm_node_map[instance.id] + slurm_node.instance = instance + instance.slurm_node = slurm_node @staticmethod def get_instance_id_to_active_node_map(partitions: List[SlurmPartition]) -> Dict: diff --git a/src/slurm_plugin/instance_manager.py b/src/slurm_plugin/instance_manager.py index bd60ec57..1cdd3dc0 100644 --- 
a/src/slurm_plugin/instance_manager.py +++ b/src/slurm_plugin/instance_manager.py @@ -262,7 +262,8 @@ def get_cluster_instances(self, include_head_node=False, alive_states_only=True) """ Get instances that are associated with the cluster. - Instances without all the info set are ignored and not returned + Instances with missing EC2 info (e.g., PrivateIpAddress due to EC2 eventual consistency) are included + with empty IP fields to allow instance-ID-based matching in clustermgtd. """ ec2_client = boto3.client("ec2", region_name=self._region, config=self._boto3_config) paginator = ec2_client.get_paginator("describe_instances") @@ -290,11 +291,25 @@ def get_cluster_instances(self, include_head_node=False, alive_states_only=True) ) ) except Exception as e: + required_fields = {"PrivateIpAddress", "PrivateDnsName", "NetworkInterfaces"} + missing_fields = required_fields - set(instance_info.keys()) logger.warning( - "Ignoring instance %s because not all EC2 info are available, exception: %s, message: %s", + "Instance %s missing some EC2 info, exception: %s, message: %s. " + "Missing top-level fields: %s. 
" + "Adding with instance ID only to allow fallback matching.", instance_info["InstanceId"], type(e).__name__, e, + missing_fields if missing_fields else "none", + ) + instances.append( + EC2Instance( + instance_info["InstanceId"], + "", + "", + set(), + instance_info.get("LaunchTime"), + ) ) return instances @@ -1077,6 +1092,7 @@ def _update_slurm_node_addrs(self, slurm_nodes: List[str], launched_instances: L slurm_nodes, nodeaddrs=[instance.private_ip for instance in launched_instances], nodehostnames=node_hostnames, + instance_ids=[instance.id for instance in launched_instances], ) logger.info( "Nodes are now configured with instances %s", diff --git a/src/slurm_plugin/slurm_resources.py b/src/slurm_plugin/slurm_resources.py index 65e5e1ff..bbf4a1c6 100644 --- a/src/slurm_plugin/slurm_resources.py +++ b/src/slurm_plugin/slurm_resources.py @@ -239,6 +239,7 @@ def __init__( slurmdstarttime: datetime = None, lastbusytime: datetime = None, reservation_name: str = None, + instance_id: str = None, ): """Initialize slurm node with attributes.""" self.name = name @@ -249,6 +250,7 @@ def __init__( self.partitions = partitions.strip().split(",") if partitions else None self.reason = reason self.instance = instance + self.instance_id = instance_id self.slurmdstarttime = slurmdstarttime self.lastbusytime = lastbusytime self.reservation_name = reservation_name diff --git a/tests/slurm_plugin/test_instance_manager.py b/tests/slurm_plugin/test_instance_manager.py index 28f809b0..12ace2ed 100644 --- a/tests/slurm_plugin/test_instance_manager.py +++ b/tests/slurm_plugin/test_instance_manager.py @@ -907,6 +907,7 @@ def get_unhealthy_cluster_instance_status( generate_error=False, ), [ + EC2Instance("i-1", "", "", set(), datetime(2020, 1, 1, tzinfo=timezone.utc)), EC2Instance("i-2", "ip-2", "hostname", {"ip-2"}, datetime(2020, 1, 1, tzinfo=timezone.utc)), ], False, From e5e09cd7736c1e5dde10a9abd47e6b35f0f404c7 Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Mon, 23 Mar 2026 18:56:52 
-0400 Subject: [PATCH 2/3] Add unit tests to verify instance ID matching logic --- src/slurm_plugin/slurm_resources.py | 4 ++ .../common/schedulers/test_slurm_commands.py | 48 ++++++++++++++++++- tests/slurm_plugin/test_clustermgtd.py | 29 +++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/src/slurm_plugin/slurm_resources.py b/src/slurm_plugin/slurm_resources.py index bbf4a1c6..2be6b42d 100644 --- a/src/slurm_plugin/slurm_resources.py +++ b/src/slurm_plugin/slurm_resources.py @@ -531,6 +531,7 @@ def __init__( slurmdstarttime=None, lastbusytime=None, reservation_name=None, + instance_id=None, ): """Initialize slurm node with attributes.""" super().__init__( @@ -544,6 +545,7 @@ def __init__( slurmdstarttime, lastbusytime=lastbusytime, reservation_name=reservation_name, + instance_id=instance_id, ) def is_healthy( @@ -665,6 +667,7 @@ def __init__( slurmdstarttime=None, lastbusytime=None, reservation_name=None, + instance_id=None, ): """Initialize slurm node with attributes.""" super().__init__( @@ -678,6 +681,7 @@ def __init__( slurmdstarttime, lastbusytime=lastbusytime, reservation_name=reservation_name, + instance_id=instance_id, ) def is_state_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True): diff --git a/tests/common/schedulers/test_slurm_commands.py b/tests/common/schedulers/test_slurm_commands.py index 6b6fa0d8..eca2e526 100644 --- a/tests/common/schedulers/test_slurm_commands.py +++ b/tests/common/schedulers/test_slurm_commands.py @@ -245,6 +245,44 @@ def test_is_static_node(nodename, expected_is_static): ], True, ), + # Test case: InstanceId is parsed from scontrol show nodes output + ( + "NodeName=queue1-st-c5xlarge-1\n" + "NodeAddr=10.0.1.1\n" + "NodeHostName=queue1-st-c5xlarge-1\n" + "State=IDLE+CLOUD\n" + "Partitions=queue1\n" + "SlurmdStartTime=2023-01-23T17:57:07\n" + "InstanceId=i-0abc123def456\n" + "######\n" + "NodeName=queue1-dy-c5xlarge-2\n" + "NodeAddr=queue1-dy-c5xlarge-2\n" + 
"NodeHostName=queue1-dy-c5xlarge-2\n" + "State=IDLE+CLOUD+POWER\n" + "Partitions=queue1\n" + "SlurmdStartTime=None\n" + "######\n", + [ + StaticNode( + "queue1-st-c5xlarge-1", + "10.0.1.1", + "queue1-st-c5xlarge-1", + "IDLE+CLOUD", + "queue1", + slurmdstarttime=datetime(2023, 1, 23, 17, 57, 7).astimezone(tz=timezone.utc), + instance_id="i-0abc123def456", + ), + DynamicNode( + "queue1-dy-c5xlarge-2", + "queue1-dy-c5xlarge-2", + "queue1-dy-c5xlarge-2", + "IDLE+CLOUD+POWER", + "queue1", + slurmdstarttime=None, + ), + ], + False, + ), ], ) def test_parse_nodes_info(node_info, expected_parsed_nodes_output, invalid_name, caplog): @@ -613,10 +651,16 @@ def test_update_nodes(batch_node_info, state, reason, raise_on_error, run_comman mocker.patch("common.schedulers.slurm_commands._batch_node_info", return_value=batch_node_info, autospec=True) if expected_exception is ValueError: with pytest.raises(ValueError): - update_nodes(batch_node_info, "some_nodeaddrs", "some_hostnames", state, reason, raise_on_error) + update_nodes( + batch_node_info, "some_nodeaddrs", "some_hostnames", + state=state, reason=reason, raise_on_error=raise_on_error, + ) else: cmd_mock = mocker.patch("common.schedulers.slurm_commands.run_command", autospec=True) - update_nodes(batch_node_info, "some_nodeaddrs", "some_hostnames", state, reason, raise_on_error) + update_nodes( + batch_node_info, "some_nodeaddrs", "some_hostnames", + state=state, reason=reason, raise_on_error=raise_on_error, + ) cmd_mock.assert_has_calls(run_command_calls) diff --git a/tests/slurm_plugin/test_clustermgtd.py b/tests/slurm_plugin/test_clustermgtd.py index 72a9f8b0..a769cfce 100644 --- a/tests/slurm_plugin/test_clustermgtd.py +++ b/tests/slurm_plugin/test_clustermgtd.py @@ -1509,6 +1509,35 @@ def test_terminate_orphaned_instances( ) +def test_update_slurm_nodes_with_ec2_info_instance_id_matching(): + """Test that _update_slurm_nodes_with_ec2_info matches by instance ID instead of IP.""" + # Nodes with instance_id set (as would 
be after our change) + node1 = StaticNode("queue1-st-c5xlarge-1", "10.0.1.1", "queue1-st-c5xlarge-1", "IDLE+CLOUD", "queue1", + instance_id="i-aaa111") + node2 = DynamicNode("queue1-dy-c5xlarge-2", "10.0.1.2", "queue1-dy-c5xlarge-2", "IDLE+CLOUD", "queue1", + instance_id="i-bbb222") + # Node without instance_id (powered down, not yet assigned) + node3 = DynamicNode("queue1-dy-c5xlarge-3", "queue1-dy-c5xlarge-3", "queue1-dy-c5xlarge-3", + "IDLE+CLOUD+POWER", "queue1") + + # EC2 instances - one with full IP, one with missing IP (eventual consistency) + instance1 = EC2Instance("i-aaa111", "10.0.1.1", "hostname-1", {"10.0.1.1"}, "launch_time_1") + instance2 = EC2Instance("i-bbb222", "", "", set(), "launch_time_2") # missing IP + + nodes = [node1, node2, node3] + cluster_instances = [instance1, instance2] + + ClusterManager._update_slurm_nodes_with_ec2_info(nodes, cluster_instances) + + # Both instances should be matched by instance ID + assert_that(node1.instance).is_equal_to(instance1) + assert_that(instance1.slurm_node).is_equal_to(node1) + assert_that(node2.instance).is_equal_to(instance2) + assert_that(instance2.slurm_node).is_equal_to(node2) + # Node3 has no instance_id, should not be matched + assert_that(node3.instance).is_none() + + @pytest.mark.parametrize( "disable_cluster_management, disable_health_check, mock_cluster_instances, nodes, partitions, status, " "queue_compute_resource_nodes_map", From 56b0fb7b5c581d0504915b061d8ccc949b4eb980 Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Thu, 26 Mar 2026 14:37:56 -0400 Subject: [PATCH 3/3] Fix tests for instance-ID-based node matching --- tests/slurm_plugin/test_clustermgtd.py | 12 ++++++++---- tests/slurm_plugin/test_instance_manager.py | 15 ++++++++++----- tests/slurm_plugin/test_resume.py | 4 ++-- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/tests/slurm_plugin/test_clustermgtd.py b/tests/slurm_plugin/test_clustermgtd.py index a769cfce..0cc2be0b 100644 --- 
a/tests/slurm_plugin/test_clustermgtd.py +++ b/tests/slurm_plugin/test_clustermgtd.py @@ -1761,18 +1761,18 @@ def test_manage_cluster( "default.conf", [ # This node fail scheduler state check and corresponding instance will be terminated and replaced - StaticNode("queue-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+DRAIN", "queue1"), + StaticNode("queue-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+DRAIN", "queue1", instance_id="i-1"), # This node fail scheduler state check and node will be power_down - DynamicNode("queue-dy-c5xlarge-2", "ip-2", "hostname", "DOWN+CLOUD", "queue1"), + DynamicNode("queue-dy-c5xlarge-2", "ip-2", "hostname", "DOWN+CLOUD", "queue1", instance_id="i-2"), # This node is good and should not be touched by clustermgtd - DynamicNode("queue-dy-c5xlarge-3", "ip-3", "hostname", "IDLE+CLOUD", "queue1"), + DynamicNode("queue-dy-c5xlarge-3", "ip-3", "hostname", "IDLE+CLOUD", "queue1", instance_id="i-3"), # This node is in power_saving state but still has running backing instance, it should be terminated DynamicNode("queue-dy-c5xlarge-6", "ip-6", "hostname", "IDLE+CLOUD+POWER", "queue1"), # This node is in powering_down but still has no valid backing instance, no boto3 call DynamicNode("queue-dy-c5xlarge-8", "ip-8", "hostname", "IDLE+CLOUD+POWERING_DOWN", "queue1"), ], [ - StaticNode("queue-st-c5xlarge-4", "ip-4", "hostname", "IDLE+CLOUD", "queue2"), + StaticNode("queue-st-c5xlarge-4", "ip-4", "hostname", "IDLE+CLOUD", "queue2", instance_id="i-4"), DynamicNode("queue-dy-c5xlarge-5", "ip-5", "hostname", "DOWN+CLOUD", "queue2"), ], [ @@ -1973,6 +1973,7 @@ def test_manage_cluster( "DOWN+CLOUD", "queue1", slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc), + instance_id="i-1", ), DynamicNode( "queue-dy-c5xlarge-2", @@ -1981,6 +1982,7 @@ def test_manage_cluster( "DOWN+CLOUD", "queue1", slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc), + instance_id="i-2", ), DynamicNode( "queue-dy-c5xlarge-3", @@ -1989,6 +1991,7 @@ def 
test_manage_cluster( "IDLE+CLOUD", "queue1", slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc), + instance_id="i-3", ), ], [ @@ -1999,6 +2002,7 @@ def test_manage_cluster( "IDLE+CLOUD", "queue2", slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc), + instance_id="i-4", ), DynamicNode( "queue-dy-c5xlarge-5", diff --git a/tests/slurm_plugin/test_instance_manager.py b/tests/slurm_plugin/test_instance_manager.py index 12ace2ed..d7ae8834 100644 --- a/tests/slurm_plugin/test_instance_manager.py +++ b/tests/slurm_plugin/test_instance_manager.py @@ -3107,7 +3107,7 @@ def test_assign_instances_to_nodes( [], False, None, - call(["queue1-st-c5xlarge-1"], nodeaddrs=[], nodehostnames=None), + call(["queue1-st-c5xlarge-1"], nodeaddrs=[], nodehostnames=None, instance_ids=[]), None, ), ( @@ -3115,7 +3115,7 @@ def test_assign_instances_to_nodes( [EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")], False, None, - call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=None), + call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=None, instance_ids=["id-1"]), None, ), ( @@ -3123,7 +3123,7 @@ def test_assign_instances_to_nodes( [EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")], True, None, - call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"]), + call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"], instance_ids=["id-1"]), None, ), ( @@ -3131,7 +3131,7 @@ def test_assign_instances_to_nodes( [EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")], True, subprocess.CalledProcessError(1, "command"), - call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"]), + call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"], instance_ids=["id-1"]), NodeAddrUpdateError(), ), ( @@ -3142,7 +3142,12 @@ def test_assign_instances_to_nodes( ], False, None, - call(["queue1-st-c5xlarge-1", 
"queue1-st-c5xlarge-2"], nodeaddrs=["ip-1", "ip-2"], nodehostnames=None), + call( + ["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"], + nodeaddrs=["ip-1", "ip-2"], + nodehostnames=None, + instance_ids=["id-1", "id-2"], + ), None, ), ], diff --git a/tests/slurm_plugin/test_resume.py b/tests/slurm_plugin/test_resume.py index 5601e864..16b2c5cb 100644 --- a/tests/slurm_plugin/test_resume.py +++ b/tests/slurm_plugin/test_resume.py @@ -280,7 +280,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): "ServiceUnavailable": {"queue1-st-c5xlarge-2"}, "LimitedInstanceCapacity": {"queue1-dy-c5xlarge-2", "queue1-st-c5xlarge-1"}, }, - [call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None)], + [call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None, instance_ids=["i-11111"])], dict( zip( ["queue1-dy-c5xlarge-1"], @@ -332,7 +332,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): client_error("InsufficientReservedInstanceCapacity"), ], {"InsufficientReservedInstanceCapacity": {"queue1-st-c5xlarge-2"}}, - [call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None)], + [call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None, instance_ids=["i-11111"])], dict( zip( ["queue1-dy-c5xlarge-1"],