Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/common/schedulers/slurm_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
SCONTROL_OUTPUT_AWK_PARSER = (
'awk \'BEGIN{{RS="\\n\\n" ; ORS="######\\n";}} {{print}}\' | '
+ "grep -oP '^(NodeName=\\S+)|(NodeAddr=\\S+)|(NodeHostName=\\S+)|(?<!Next)(State=\\S+)|"
+ "(Partitions=\\S+)|(SlurmdStartTime=\\S+)|(LastBusyTime=\\S+)|(ReservationName=\\S+)|(Reason=.*)|(######)'"
+ "(Partitions=\\S+)|(SlurmdStartTime=\\S+)|(LastBusyTime=\\S+)|(ReservationName=\\S+)"
+ "|(InstanceId=\\S+)|(Reason=.*)|(######)'"
)

# Set default timeouts for running different slurm commands.
Expand Down Expand Up @@ -129,6 +130,7 @@ def update_nodes(
nodes,
nodeaddrs=None,
nodehostnames=None,
instance_ids=None,
state=None,
reason=None,
raise_on_error=True,
Expand Down Expand Up @@ -174,6 +176,23 @@ def update_nodes(
f"{update_cmd} {node_info}", raise_on_error=raise_on_error, timeout=command_timeout, shell=True
)

# TODO: InstanceId should ideally be set in the same batched scontrol update command as NodeAddr
# (e.g., "scontrol update nodename=node-[1-100] nodeaddr=ip1,ip2,... instanceid=id1,id2,...").
# However, Slurm has a bug where InstanceId does not support per-node batch assignment ->
# comma-separated values are treated as a single literal string instead of being distributed across nodes.
# We have reported the bug. Once SchedMD fixes this, move instance_ids into the batched loop above.
if instance_ids:
node_list = list(nodes) if not isinstance(nodes, list) else nodes
for node_name, instance_id in zip(node_list, instance_ids):
validate_subprocess_argument(node_name)
validate_subprocess_argument(instance_id)
run_command( # nosec B604
f"{SCONTROL} update nodename={node_name} instanceid={instance_id}",
raise_on_error=raise_on_error,
timeout=command_timeout,
shell=True,
)


def update_partitions(partitions, state):
succeeded_partitions = []
Expand Down Expand Up @@ -433,6 +452,7 @@ def _parse_nodes_info(slurm_node_info: str) -> List[SlurmNode]:
"SlurmdStartTime": "slurmdstarttime",
"LastBusyTime": "lastbusytime",
"ReservationName": "reservation_name",
"InstanceId": "instance_id",
}

date_fields = ["SlurmdStartTime", "LastBusyTime"]
Expand Down
13 changes: 6 additions & 7 deletions src/slurm_plugin/clustermgtd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,15 +1145,14 @@ def _parse_scheduler_nodes_data(nodes):

@staticmethod
def _update_slurm_nodes_with_ec2_info(nodes, cluster_instances):
"""Associate EC2 instances with Slurm nodes by matching on instance ID."""
if cluster_instances:
ip_to_slurm_node_map = {node.nodeaddr: node for node in nodes}
instance_id_to_slurm_node_map = {node.instance_id: node for node in nodes if node.instance_id}
for instance in cluster_instances:
for private_ip in instance.all_private_ips:
if private_ip in ip_to_slurm_node_map:
slurm_node = ip_to_slurm_node_map.get(private_ip)
slurm_node.instance = instance
instance.slurm_node = slurm_node
break
if instance.id in instance_id_to_slurm_node_map:
slurm_node = instance_id_to_slurm_node_map[instance.id]
slurm_node.instance = instance
instance.slurm_node = slurm_node

@staticmethod
def get_instance_id_to_active_node_map(partitions: List[SlurmPartition]) -> Dict:
Expand Down
20 changes: 18 additions & 2 deletions src/slurm_plugin/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ def get_cluster_instances(self, include_head_node=False, alive_states_only=True)
"""
Get instances that are associated with the cluster.

Instances without all the info set are ignored and not returned
Instances with missing EC2 info (e.g., PrivateIpAddress due to EC2 eventual consistency) are included
with empty IP fields to allow instance-ID-based matching in clustermgtd.
"""
ec2_client = boto3.client("ec2", region_name=self._region, config=self._boto3_config)
paginator = ec2_client.get_paginator("describe_instances")
Expand Down Expand Up @@ -290,11 +291,25 @@ def get_cluster_instances(self, include_head_node=False, alive_states_only=True)
)
)
except Exception as e:
required_fields = {"PrivateIpAddress", "PrivateDnsName", "NetworkInterfaces"}
missing_fields = required_fields - set(instance_info.keys())
logger.warning(
"Ignoring instance %s because not all EC2 info are available, exception: %s, message: %s",
"Instance %s missing some EC2 info, exception: %s, message: %s. "
"Missing top-level fields: %s. "
"Adding with instance ID only to allow fallback matching.",
instance_info["InstanceId"],
type(e).__name__,
e,
missing_fields if missing_fields else "none",
)
instances.append(
EC2Instance(
instance_info["InstanceId"],
"",
"",
set(),
instance_info.get("LaunchTime"),
)
)

return instances
Expand Down Expand Up @@ -1077,6 +1092,7 @@ def _update_slurm_node_addrs(self, slurm_nodes: List[str], launched_instances: L
slurm_nodes,
nodeaddrs=[instance.private_ip for instance in launched_instances],
nodehostnames=node_hostnames,
instance_ids=[instance.id for instance in launched_instances],
)
logger.info(
"Nodes are now configured with instances %s",
Expand Down
6 changes: 6 additions & 0 deletions src/slurm_plugin/slurm_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ def __init__(
slurmdstarttime: datetime = None,
lastbusytime: datetime = None,
reservation_name: str = None,
instance_id: str = None,
):
"""Initialize slurm node with attributes."""
self.name = name
Expand All @@ -249,6 +250,7 @@ def __init__(
self.partitions = partitions.strip().split(",") if partitions else None
self.reason = reason
self.instance = instance
self.instance_id = instance_id
self.slurmdstarttime = slurmdstarttime
self.lastbusytime = lastbusytime
self.reservation_name = reservation_name
Expand Down Expand Up @@ -529,6 +531,7 @@ def __init__(
slurmdstarttime=None,
lastbusytime=None,
reservation_name=None,
instance_id=None,
):
"""Initialize slurm node with attributes."""
super().__init__(
Expand All @@ -542,6 +545,7 @@ def __init__(
slurmdstarttime,
lastbusytime=lastbusytime,
reservation_name=reservation_name,
instance_id=instance_id,
)

def is_healthy(
Expand Down Expand Up @@ -663,6 +667,7 @@ def __init__(
slurmdstarttime=None,
lastbusytime=None,
reservation_name=None,
instance_id=None,
):
"""Initialize slurm node with attributes."""
super().__init__(
Expand All @@ -676,6 +681,7 @@ def __init__(
slurmdstarttime,
lastbusytime=lastbusytime,
reservation_name=reservation_name,
instance_id=instance_id,
)

def is_state_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
Expand Down
48 changes: 46 additions & 2 deletions tests/common/schedulers/test_slurm_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,44 @@ def test_is_static_node(nodename, expected_is_static):
],
True,
),
# Test case: InstanceId is parsed from scontrol show nodes output
(
"NodeName=queue1-st-c5xlarge-1\n"
"NodeAddr=10.0.1.1\n"
"NodeHostName=queue1-st-c5xlarge-1\n"
"State=IDLE+CLOUD\n"
"Partitions=queue1\n"
"SlurmdStartTime=2023-01-23T17:57:07\n"
"InstanceId=i-0abc123def456\n"
"######\n"
"NodeName=queue1-dy-c5xlarge-2\n"
"NodeAddr=queue1-dy-c5xlarge-2\n"
"NodeHostName=queue1-dy-c5xlarge-2\n"
"State=IDLE+CLOUD+POWER\n"
"Partitions=queue1\n"
"SlurmdStartTime=None\n"
"######\n",
[
StaticNode(
"queue1-st-c5xlarge-1",
"10.0.1.1",
"queue1-st-c5xlarge-1",
"IDLE+CLOUD",
"queue1",
slurmdstarttime=datetime(2023, 1, 23, 17, 57, 7).astimezone(tz=timezone.utc),
instance_id="i-0abc123def456",
),
DynamicNode(
"queue1-dy-c5xlarge-2",
"queue1-dy-c5xlarge-2",
"queue1-dy-c5xlarge-2",
"IDLE+CLOUD+POWER",
"queue1",
slurmdstarttime=None,
),
],
False,
),
],
)
def test_parse_nodes_info(node_info, expected_parsed_nodes_output, invalid_name, caplog):
Expand Down Expand Up @@ -613,10 +651,16 @@ def test_update_nodes(batch_node_info, state, reason, raise_on_error, run_comman
mocker.patch("common.schedulers.slurm_commands._batch_node_info", return_value=batch_node_info, autospec=True)
if expected_exception is ValueError:
with pytest.raises(ValueError):
update_nodes(batch_node_info, "some_nodeaddrs", "some_hostnames", state, reason, raise_on_error)
update_nodes(
batch_node_info, "some_nodeaddrs", "some_hostnames",
state=state, reason=reason, raise_on_error=raise_on_error,
)
else:
cmd_mock = mocker.patch("common.schedulers.slurm_commands.run_command", autospec=True)
update_nodes(batch_node_info, "some_nodeaddrs", "some_hostnames", state, reason, raise_on_error)
update_nodes(
batch_node_info, "some_nodeaddrs", "some_hostnames",
state=state, reason=reason, raise_on_error=raise_on_error,
)
cmd_mock.assert_has_calls(run_command_calls)


Expand Down
41 changes: 37 additions & 4 deletions tests/slurm_plugin/test_clustermgtd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,6 +1509,35 @@ def test_terminate_orphaned_instances(
)


def test_update_slurm_nodes_with_ec2_info_instance_id_matching():
    """Verify _update_slurm_nodes_with_ec2_info links nodes to instances via instance ID.

    Matching must succeed even when the backing EC2 instance has no private IP
    yet (EC2 eventual consistency), and a node lacking an instance_id must stay
    unlinked.
    """
    matched_static = StaticNode(
        "queue1-st-c5xlarge-1",
        "10.0.1.1",
        "queue1-st-c5xlarge-1",
        "IDLE+CLOUD",
        "queue1",
        instance_id="i-aaa111",
    )
    matched_dynamic = DynamicNode(
        "queue1-dy-c5xlarge-2",
        "10.0.1.2",
        "queue1-dy-c5xlarge-2",
        "IDLE+CLOUD",
        "queue1",
        instance_id="i-bbb222",
    )
    # Powered-down node: no instance_id assigned yet, so it must not be matched.
    unmatched_dynamic = DynamicNode(
        "queue1-dy-c5xlarge-3",
        "queue1-dy-c5xlarge-3",
        "queue1-dy-c5xlarge-3",
        "IDLE+CLOUD+POWER",
        "queue1",
    )
    backing_with_ip = EC2Instance("i-aaa111", "10.0.1.1", "hostname-1", {"10.0.1.1"}, "launch_time_1")
    # Instance whose IP info is still missing due to EC2 eventual consistency.
    backing_without_ip = EC2Instance("i-bbb222", "", "", set(), "launch_time_2")

    ClusterManager._update_slurm_nodes_with_ec2_info(
        [matched_static, matched_dynamic, unmatched_dynamic],
        [backing_with_ip, backing_without_ip],
    )

    # Both instances are matched purely by instance ID, bidirectionally.
    for node, instance in ((matched_static, backing_with_ip), (matched_dynamic, backing_without_ip)):
        assert_that(node.instance).is_equal_to(instance)
        assert_that(instance.slurm_node).is_equal_to(node)
    assert_that(unmatched_dynamic.instance).is_none()


@pytest.mark.parametrize(
"disable_cluster_management, disable_health_check, mock_cluster_instances, nodes, partitions, status, "
"queue_compute_resource_nodes_map",
Expand Down Expand Up @@ -1732,18 +1761,18 @@ def test_manage_cluster(
"default.conf",
[
# This node fail scheduler state check and corresponding instance will be terminated and replaced
StaticNode("queue-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+DRAIN", "queue1"),
StaticNode("queue-st-c5xlarge-1", "ip-1", "hostname", "IDLE+CLOUD+DRAIN", "queue1", instance_id="i-1"),
# This node fail scheduler state check and node will be power_down
DynamicNode("queue-dy-c5xlarge-2", "ip-2", "hostname", "DOWN+CLOUD", "queue1"),
DynamicNode("queue-dy-c5xlarge-2", "ip-2", "hostname", "DOWN+CLOUD", "queue1", instance_id="i-2"),
# This node is good and should not be touched by clustermgtd
DynamicNode("queue-dy-c5xlarge-3", "ip-3", "hostname", "IDLE+CLOUD", "queue1"),
DynamicNode("queue-dy-c5xlarge-3", "ip-3", "hostname", "IDLE+CLOUD", "queue1", instance_id="i-3"),
# This node is in power_saving state but still has running backing instance, it should be terminated
DynamicNode("queue-dy-c5xlarge-6", "ip-6", "hostname", "IDLE+CLOUD+POWER", "queue1"),
# This node is in powering_down but still has no valid backing instance, no boto3 call
DynamicNode("queue-dy-c5xlarge-8", "ip-8", "hostname", "IDLE+CLOUD+POWERING_DOWN", "queue1"),
],
[
StaticNode("queue-st-c5xlarge-4", "ip-4", "hostname", "IDLE+CLOUD", "queue2"),
StaticNode("queue-st-c5xlarge-4", "ip-4", "hostname", "IDLE+CLOUD", "queue2", instance_id="i-4"),
DynamicNode("queue-dy-c5xlarge-5", "ip-5", "hostname", "DOWN+CLOUD", "queue2"),
],
[
Expand Down Expand Up @@ -1944,6 +1973,7 @@ def test_manage_cluster(
"DOWN+CLOUD",
"queue1",
slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc),
instance_id="i-1",
),
DynamicNode(
"queue-dy-c5xlarge-2",
Expand All @@ -1952,6 +1982,7 @@ def test_manage_cluster(
"DOWN+CLOUD",
"queue1",
slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc),
instance_id="i-2",
),
DynamicNode(
"queue-dy-c5xlarge-3",
Expand All @@ -1960,6 +1991,7 @@ def test_manage_cluster(
"IDLE+CLOUD",
"queue1",
slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc),
instance_id="i-3",
),
],
[
Expand All @@ -1970,6 +2002,7 @@ def test_manage_cluster(
"IDLE+CLOUD",
"queue2",
slurmdstarttime=datetime(2020, 1, 1, tzinfo=timezone.utc),
instance_id="i-4",
),
DynamicNode(
"queue-dy-c5xlarge-5",
Expand Down
16 changes: 11 additions & 5 deletions tests/slurm_plugin/test_instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ def get_unhealthy_cluster_instance_status(
generate_error=False,
),
[
EC2Instance("i-1", "", "", set(), datetime(2020, 1, 1, tzinfo=timezone.utc)),
EC2Instance("i-2", "ip-2", "hostname", {"ip-2"}, datetime(2020, 1, 1, tzinfo=timezone.utc)),
],
False,
Expand Down Expand Up @@ -3106,31 +3107,31 @@ def test_assign_instances_to_nodes(
[],
False,
None,
call(["queue1-st-c5xlarge-1"], nodeaddrs=[], nodehostnames=None),
call(["queue1-st-c5xlarge-1"], nodeaddrs=[], nodehostnames=None, instance_ids=[]),
None,
),
(
["queue1-st-c5xlarge-1"],
[EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")],
False,
None,
call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=None),
call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=None, instance_ids=["id-1"]),
None,
),
(
["queue1-st-c5xlarge-1"],
[EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")],
True,
None,
call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"]),
call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"], instance_ids=["id-1"]),
None,
),
(
["queue1-st-c5xlarge-1"],
[EC2Instance("id-1", "ip-1", "hostname-1", {"ip-1"}, "some_launch_time")],
True,
subprocess.CalledProcessError(1, "command"),
call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"]),
call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"], instance_ids=["id-1"]),
NodeAddrUpdateError(),
),
(
Expand All @@ -3141,7 +3142,12 @@ def test_assign_instances_to_nodes(
],
False,
None,
call(["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"], nodeaddrs=["ip-1", "ip-2"], nodehostnames=None),
call(
["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"],
nodeaddrs=["ip-1", "ip-2"],
nodehostnames=None,
instance_ids=["id-1", "id-2"],
),
None,
),
],
Expand Down
4 changes: 2 additions & 2 deletions tests/slurm_plugin/test_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker):
"ServiceUnavailable": {"queue1-st-c5xlarge-2"},
"LimitedInstanceCapacity": {"queue1-dy-c5xlarge-2", "queue1-st-c5xlarge-1"},
},
[call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None)],
[call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None, instance_ids=["i-11111"])],
dict(
zip(
["queue1-dy-c5xlarge-1"],
Expand Down Expand Up @@ -332,7 +332,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker):
client_error("InsufficientReservedInstanceCapacity"),
],
{"InsufficientReservedInstanceCapacity": {"queue1-st-c5xlarge-2"}},
[call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None)],
[call(["queue1-dy-c5xlarge-1"], nodeaddrs=["ip.1.0.0.1"], nodehostnames=None, instance_ids=["i-11111"])],
dict(
zip(
["queue1-dy-c5xlarge-1"],
Expand Down
Loading