diff --git a/src/common/schedulers/slurm_commands.py b/src/common/schedulers/slurm_commands.py index 38a0fc51..7691e8b4 100644 --- a/src/common/schedulers/slurm_commands.py +++ b/src/common/schedulers/slurm_commands.py @@ -63,7 +63,8 @@ SCONTROL_OUTPUT_AWK_PARSER = ( 'awk \'BEGIN{{RS="\\n\\n" ; ORS="######\\n";}} {{print}}\' | ' + "grep -oP '^(NodeName=\\S+)|(NodeAddr=\\S+)|(NodeHostName=\\S+)|(? + # comma-separated values are treated as a single literal string instead of being distributed across nodes. + # We have reported the bug. Once SchedMD fixes this, move instance_ids into the batched loop above. + if instance_ids: + node_list = list(nodes) if not isinstance(nodes, list) else nodes + for node_name, instance_id in zip(node_list, instance_ids): + validate_subprocess_argument(node_name) + validate_subprocess_argument(instance_id) + run_command( # nosec B604 + f"{SCONTROL} update nodename={node_name} instanceid={instance_id}", + raise_on_error=raise_on_error, + timeout=command_timeout, + shell=True, + ) + def update_partitions(partitions, state): succeeded_partitions = [] @@ -433,6 +452,7 @@ def _parse_nodes_info(slurm_node_info: str) -> List[SlurmNode]: "SlurmdStartTime": "slurmdstarttime", "LastBusyTime": "lastbusytime", "ReservationName": "reservation_name", + "InstanceId": "instance_id", } date_fields = ["SlurmdStartTime", "LastBusyTime"] diff --git a/src/slurm_plugin/clustermgtd.py b/src/slurm_plugin/clustermgtd.py index c612dea1..723fe7ad 100644 --- a/src/slurm_plugin/clustermgtd.py +++ b/src/slurm_plugin/clustermgtd.py @@ -1145,15 +1145,14 @@ def _parse_scheduler_nodes_data(nodes): @staticmethod def _update_slurm_nodes_with_ec2_info(nodes, cluster_instances): + """Associate EC2 instances with Slurm nodes by matching on instance ID.""" if cluster_instances: - ip_to_slurm_node_map = {node.nodeaddr: node for node in nodes} + instance_id_to_slurm_node_map = {node.instance_id: node for node in nodes if node.instance_id} for instance in cluster_instances: - for private_ip in instance.all_private_ips: - if private_ip in ip_to_slurm_node_map: - slurm_node = ip_to_slurm_node_map.get(private_ip) - slurm_node.instance = instance - instance.slurm_node = slurm_node - break + if instance.id in instance_id_to_slurm_node_map: + slurm_node = instance_id_to_slurm_node_map[instance.id] + slurm_node.instance = instance + instance.slurm_node = slurm_node @staticmethod def get_instance_id_to_active_node_map(partitions: List[SlurmPartition]) -> Dict: diff --git a/src/slurm_plugin/instance_manager.py b/src/slurm_plugin/instance_manager.py index bd60ec57..1cdd3dc0 100644 --- a/src/slurm_plugin/instance_manager.py +++ b/src/slurm_plugin/instance_manager.py @@ -262,7 +262,8 @@ def get_cluster_instances(self, include_head_node=False, alive_states_only=True) """ Get instances that are associated with the cluster. - Instances without all the info set are ignored and not returned + Instances with missing EC2 info (e.g., PrivateIpAddress due to EC2 eventual consistency) are included + with empty IP fields to allow instance-ID-based matching in clustermgtd. """ ec2_client = boto3.client("ec2", region_name=self._region, config=self._boto3_config) paginator = ec2_client.get_paginator("describe_instances") @@ -290,11 +291,25 @@ def get_cluster_instances(self, include_head_node=False, alive_states_only=True) ) ) except Exception as e: + required_fields = {"PrivateIpAddress", "PrivateDnsName", "NetworkInterfaces"} + missing_fields = required_fields - set(instance_info.keys()) logger.warning( - "Ignoring instance %s because not all EC2 info are available, exception: %s, message: %s", + "Instance %s missing some EC2 info, exception: %s, message: %s. " + "Missing top-level fields: %s. " + "Adding with instance ID only to allow fallback matching.", instance_info["InstanceId"], type(e).__name__, e, + missing_fields if missing_fields else "none", + ) + instances.append( + EC2Instance( + instance_info["InstanceId"], + "", + "", + set(), + instance_info.get("LaunchTime"), + ) ) return instances @@ -1077,6 +1092,7 @@ def _update_slurm_node_addrs(self, slurm_nodes: List[str], launched_instances: L slurm_nodes, nodeaddrs=[instance.private_ip for instance in launched_instances], nodehostnames=node_hostnames, + instance_ids=[instance.id for instance in launched_instances], ) logger.info( "Nodes are now configured with instances %s", diff --git a/src/slurm_plugin/slurm_resources.py b/src/slurm_plugin/slurm_resources.py index 65e5e1ff..2be6b42d 100644 --- a/src/slurm_plugin/slurm_resources.py +++ b/src/slurm_plugin/slurm_resources.py @@ -239,6 +239,7 @@ def __init__( slurmdstarttime: datetime = None, lastbusytime: datetime = None, reservation_name: str = None, + instance_id: str = None, ): """Initialize slurm node with attributes.""" self.name = name @@ -249,6 +250,7 @@ def __init__( self.partitions = partitions.strip().split(",") if partitions else None self.reason = reason self.instance = instance + self.instance_id = instance_id self.slurmdstarttime = slurmdstarttime self.lastbusytime = lastbusytime self.reservation_name = reservation_name @@ -529,6 +531,7 @@ def __init__( slurmdstarttime=None, lastbusytime=None, reservation_name=None, + instance_id=None, ): """Initialize slurm node with attributes.""" super().__init__( @@ -542,6 +545,7 @@ def __init__( slurmdstarttime, lastbusytime=lastbusytime, reservation_name=reservation_name, + instance_id=instance_id, ) def is_healthy( @@ -663,6 +667,7 @@ def __init__( slurmdstarttime=None, lastbusytime=None, reservation_name=None, + instance_id=None, ): """Initialize slurm node with attributes.""" super().__init__( @@ -676,6 +681,7 @@ def __init__( slurmdstarttime, lastbusytime=lastbusytime, reservation_name=reservation_name, + instance_id=instance_id, ) def is_state_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True): diff --git a/tests/common/schedulers/test_slurm_commands.py b/tests/common/schedulers/test_slurm_commands.py index 6b6fa0d8..eca2e526 100644 --- a/tests/common/schedulers/test_slurm_commands.py +++ b/tests/common/schedulers/test_slurm_commands.py @@ -245,6 +245,44 @@ def test_is_static_node(nodename, expected_is_static): ], True, ), + # Test case: InstanceId is parsed from scontrol show nodes output + ( + "NodeName=queue1-st-c5xlarge-1\n" + "NodeAddr=10.0.1.1\n" + "NodeHostName=queue1-st-c5xlarge-1\n" + "State=IDLE+CLOUD\n" + "Partitions=queue1\n" + "SlurmdStartTime=2023-01-23T17:57:07\n" + "InstanceId=i-0abc123def456\n" + "######\n" + "NodeName=queue1-dy-c5xlarge-2\n" + "NodeAddr=queue1-dy-c5xlarge-2\n" + "NodeHostName=queue1-dy-c5xlarge-2\n" + "State=IDLE+CLOUD+POWER\n" + "Partitions=queue1\n" + "SlurmdStartTime=None\n" + "######\n", + [ + StaticNode( + "queue1-st-c5xlarge-1", + "10.0.1.1", + "queue1-st-c5xlarge-1", + "IDLE+CLOUD", + "queue1", + slurmdstarttime=datetime(2023, 1, 23, 17, 57, 7).astimezone(tz=timezone.utc), + instance_id="i-0abc123def456", + ), + DynamicNode( + "queue1-dy-c5xlarge-2", + "queue1-dy-c5xlarge-2", + "queue1-dy-c5xlarge-2", + "IDLE+CLOUD+POWER", + "queue1", + slurmdstarttime=None, + ), + ], + False, + ), ], ) def test_parse_nodes_info(node_info, expected_parsed_nodes_output, invalid_name, caplog): @@ -613,10 +651,16 @@ def test_update_nodes(batch_node_info, state, reason, raise_on_error, run_comman mocker.patch("common.schedulers.slurm_commands._batch_node_info", return_value=batch_node_info, autospec=True) if expected_exception is ValueError: with pytest.raises(ValueError): - update_nodes(batch_node_info, "some_nodeaddrs", "some_hostnames", state, reason, raise_on_error) + update_nodes( + batch_node_info, "some_nodeaddrs", "some_hostnames", + state=state, reason=reason, raise_on_error=raise_on_error, + ) else: cmd_mock = mocker.patch("common.schedulers.slurm_commands.run_command", autospec=True) - update_nodes(batch_node_info, "some_nodeaddrs", "some_hostnames", state, reason, raise_on_error) + update_nodes( + batch_node_info, "some_nodeaddrs", "some_hostnames", + state=state, reason=reason, raise_on_error=raise_on_error, + ) cmd_mock.assert_has_calls(run_command_calls) diff --git a/tests/slurm_plugin/test_clustermgtd.py b/tests/slurm_plugin/test_clustermgtd.py index 72a9f8b0..0cc2be0b 100644 --- a/tests/slurm_plugin/test_clustermgtd.py +++ b/tests/slurm_plugin/test_clustermgtd.py @@ -1509,6 +1509,35 @@ def test_terminate_orphaned_instances( ) +def test_update_slurm_nodes_with_ec2_info_instance_id_matching(): + """Test that _update_slurm_nodes_with_ec2_info matches by instance ID instead of IP.""" + # Nodes with instance_id set (as would be after our change) + node1 = StaticNode("queue1-st-c5xlarge-1", "10.0.1.1", "queue1-st-c5xlarge-1", "IDLE+CLOUD", "queue1", + instance_id="i-aaa111") + node2 = DynamicNode("queue1-dy-c5xlarge-2", "10.0.1.2", "queue1-dy-c5xlarge-2", "IDLE+CLOUD", "queue1", + instance_id="i-bbb222") + # Node without instance_id (powered down, not yet assigned) + node3 = DynamicNode("queue1-dy-c5xlarge-3", "queue1-dy-c5xlarge-3", "queue1-dy-c5xlarge-3", + "IDLE+CLOUD+POWER", "queue1") + + # EC2 instances - one with full IP, one with missing IP (eventual consistency) + instance1 = EC2Instance("i-aaa111", "10.0.1.1", "hostname-1", {"10.0.1.1"}, "launch_time_1") + instance2 = EC2Instance("i-bbb222", "", "", set(), "launch_time_2") # missing IP + + nodes = [node1, node2, node3] + cluster_instances = [instance1, instance2] + + ClusterManager._update_slurm_nodes_with_ec2_info(nodes, cluster_instances) + + # Both instances should be matched by instance ID + assert_that(node1.instance).is_equal_to(instance1) + assert_that(instance1.slurm_node).is_equal_to(node1) + assert_that(node2.instance).is_equal_to(instance2) + assert_that(instance2.slurm_node).is_equal_to(node2) + # Node3 has no instance_id, should not be matched + assert_that(node3.instance).is_none() + + @pytest.mark.parametrize( "disable_cluster_management, disable_health_check, mock_cluster_instances, nodes, partitions, status, " "queue_compute_resource_nodes_map", @@ -1732,18 +1761,18 @@ def test_manage_cluster( "default.conf", [ # This node fail scheduler state check and corresponding instance will be terminated and replaced - StaticNode("queue-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+DRAIN", "queue1"), + StaticNode("queue-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+DRAIN", "queue1", instance_id="i-1"), # This node fail scheduler state check and node will be power_down - DynamicNode("queue-dy-c5xlarge-2", "ip-2", "hostname", "DOWN+CLOUD", "queue1"), + DynamicNode("queue-dy-c5xlarge-2", "ip-2", "hostname", "DOWN+CLOUD", "queue1", instance_id="i-2"), # This node is good and should not be touched by clustermgtd - DynamicNode("queue-dy-c5xlarge-3", "ip-3", "hostname", "IDLE+CLOUD", "queue1"), + DynamicNode("queue-dy-c5xlarge-3", "ip-3", "hostname", "IDLE+CLOUD", "queue1", instance_id="i-3"), # This node is in power_saving state but still has running backing instance, it should be terminated DynamicNode("queue-dy-c5xlarge-6", "ip-6", "hostname", "IDLE+CLOUD+POWER", "queue1"), # This node is in powering_down but still has no valid backing instance, no boto3 call DynamicNode("queue-dy-c5xlarge-8", "ip-8", "hostname", "IDLE+CLOUD+POWERING_DOWN", "queue1"), ], [ - StaticNode("queue-st-c5xlarge-4", "ip-4", "hostname", "IDLE+CLOUD", "queue2"), + StaticNode("queue-st-c5xlarge-4", "ip-4", "hostname", "IDLE+CLOUD", "queue2", instance_id="i-4"), DynamicNode("queue-dy-c5xlarge-5", "ip-5", "hostname", "DOWN+CLOUD", "queue2"), ], [ @@ -1944,6 +1973,7 @@ def test_manage_cluster( "DOWN+CLOUD", "queue1", slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc), + instance_id="i-1", ), DynamicNode( "queue-dy-c5xlarge-2", @@ -1952,6 +1982,7 @@ def test_manage_cluster( "DOWN+CLOUD", "queue1", slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc), + instance_id="i-2", ), DynamicNode( "queue-dy-c5xlarge-3", @@ -1960,6 +1991,7 @@ def test_manage_cluster( "IDLE+CLOUD", "queue1", slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc), + instance_id="i-3", ), ], [ @@ -1970,6 +2002,7 @@ def test_manage_cluster( "IDLE+CLOUD", "queue2", slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc), + instance_id="i-4", ), DynamicNode( "queue-dy-c5xlarge-5", diff --git a/tests/slurm_plugin/test_instance_manager.py b/tests/slurm_plugin/test_instance_manager.py index 28f809b0..d7ae8834 100644 --- a/tests/slurm_plugin/test_instance_manager.py +++ b/tests/slurm_plugin/test_instance_manager.py @@ -907,6 +907,7 @@ def get_unhealthy_cluster_instance_status( generate_error=False, ), [ + EC2Instance("i-1", "", "", set(), datetime(2020, 1, 1, tzinfo=timezone.utc)), EC2Instance("i-2", "ip-2", "hostname", {"ip-2"}, datetime(2020, 1, 1, tzinfo=timezone.utc)), ], False, @@ -3106,7 +3107,7 @@ def test_assign_instances_to_nodes( [], False, None, - call(["queue1-st-c5xlarge-1"], nodeaddrs=[], nodehostnames=None), + call(["queue1-st-c5xlarge-1"], nodeaddrs=[], nodehostnames=None, instance_ids=[]), None, ), ( @@ -3114,7 +3115,7 @@ def test_assign_instances_to_nodes( [EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")], False, None, - call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=None), + call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=None, instance_ids=["id-1"]), None, ), ( @@ -3122,7 +3123,7 @@ def test_assign_instances_to_nodes( [EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")], True, None, - call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"]), + call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"], instance_ids=["id-1"]), None, ), ( @@ -3130,7 +3131,7 @@ def test_assign_instances_to_nodes( [EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")], True, subprocess.CalledProcessError(1, "command"), - call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"]), + call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"], instance_ids=["id-1"]), NodeAddrUpdateError(), ), ( @@ -3141,7 +3142,12 @@ def test_assign_instances_to_nodes( ], False, None, - call(["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"], nodeaddrs=["ip-1", "ip-2"], nodehostnames=None), + call( + ["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"], + nodeaddrs=["ip-1", "ip-2"], + nodehostnames=None, + instance_ids=["id-1", "id-2"], + ), None, ), ], diff --git a/tests/slurm_plugin/test_resume.py b/tests/slurm_plugin/test_resume.py index 5601e864..16b2c5cb 100644 --- a/tests/slurm_plugin/test_resume.py +++ b/tests/slurm_plugin/test_resume.py @@ -280,7 +280,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): "ServiceUnavailable": {"queue1-st-c5xlarge-2"}, "LimitedInstanceCapacity": {"queue1-dy-c5xlarge-2", "queue1-st-c5xlarge-1"}, }, - [call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None)], + [call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None, instance_ids=["i-11111"])], dict( zip( ["queue1-dy-c5xlarge-1"], @@ -332,7 +332,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): client_error("InsufficientReservedInstanceCapacity"), ], {"InsufficientReservedInstanceCapacity": {"queue1-st-c5xlarge-2"}}, - [call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None)], + [call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None, instance_ids=["i-11111"])], dict( zip( ["queue1-dy-c5xlarge-1"],