diff --git a/gpoa/frontend/appliers/systemd.py b/gpoa/frontend/appliers/systemd.py index 3659ebd1..fe5a804c 100644 --- a/gpoa/frontend/appliers/systemd.py +++ b/gpoa/frontend/appliers/systemd.py @@ -16,32 +16,306 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import dbus +import re +import subprocess from util.logging import log -class systemd_unit: - def __init__(self, unit_name, state): - self.system_bus = dbus.SystemBus() - self.systemd_dbus = self.system_bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1') - self.manager = dbus.Interface(self.systemd_dbus, 'org.freedesktop.systemd1.Manager') +SYSTEMD_BUS_NAME = 'org.freedesktop.systemd1' +SYSTEMD_OBJECT_PATH = '/org/freedesktop/systemd1' +SYSTEMD_MANAGER_IFACE = 'org.freedesktop.systemd1.Manager' +SYSTEMD_UNIT_IFACE = 'org.freedesktop.systemd1.Unit' +DBUS_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' +NO_SUCH_UNIT_ERRORS = { + 'org.freedesktop.systemd1.NoSuchUnit', + 'org.freedesktop.systemd1.LoadFailed', +} +UNIT_NAME_RE = re.compile( + r'^[A-Za-z0-9:_.@-]{1,255}\.(service|socket|timer|path|mount|automount|swap|target|device|slice|scope)$' +) + + +class SystemdManagerError(Exception): + def __init__(self, message, action=None, unit=None, dbus_name=None): + super().__init__(message) + self.action = action + self.unit = unit + self.dbus_name = dbus_name + + +def is_valid_unit_name(unit_name): + if not isinstance(unit_name, str): + return False + if not unit_name: + return False + if any(ord(ch) < 32 or ord(ch) == 127 for ch in unit_name): + return False + if not UNIT_NAME_RE.match(unit_name): + return False + name_part = unit_name.rsplit('.', 1)[0] + if name_part.startswith('-') or name_part.endswith('-'): + return False + return True + + +def _import_dbus(): + import dbus + return dbus + + +class SystemdManager: + def __init__(self, mode='machine'): + self.mode = mode + self.dbus = None + self.bus = None + self.systemd = None + self.manager = None + + if mode == 'global_user': + return + + self.dbus = _import_dbus() + if mode == 'user': + self.bus = self.dbus.SessionBus() + else: + self.bus = self.dbus.SystemBus() + + self.systemd = self.bus.get_object(SYSTEMD_BUS_NAME, SYSTEMD_OBJECT_PATH) + self.manager = self.dbus.Interface(self.systemd, SYSTEMD_MANAGER_IFACE) + + def _fail(self, action, exc, unit=None): + dbus_name = None + if hasattr(exc, 'get_dbus_name'): + dbus_name = exc.get_dbus_name() + raise SystemdManagerError(str(exc), action=action, unit=unit, dbus_name=dbus_name) + + def _run_global(self, args): + return subprocess.run( + ['systemctl', '--global'] + list(args), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=False, + ) + + def _global_output(self, result): + output = '{}\n{}'.format(result.stdout or '', result.stderr or '').strip() + return output or 'systemctl --global command failed' + + def _global_not_found(self, result): + output = self._global_output(result).lower() + return 'no files found' in output or 'not-found' in output or 'not found' in output + + def _fail_global(self, action, result, unit=None): + raise SystemdManagerError(self._global_output(result), action=action, unit=unit) + + def _unsupported_global_action(self, action, unit=None): + log('W48', { + 'reason': 'systemctl --global does not support runtime action {}'.format(action), + 'action': action, + 'unit': unit, + }) + return + + def _load_unit(self, unit_name): + try: + return self.manager.LoadUnit(self.dbus.String(unit_name)) + except self.dbus.DBusException as exc: + self._fail('load_unit', exc, unit=unit_name) + + def exists(self, unit_name): + if not is_valid_unit_name(unit_name): + return False + + if self.mode == 'global_user': + result = self._run_global(['cat', unit_name]) + if result.returncode == 0: + return True + if self._global_not_found(result): + return False + self._fail_global('exists', result, unit=unit_name) + + try: + unit_path = self.manager.LoadUnit(self.dbus.String(unit_name)) + proxy = self.bus.get_object(SYSTEMD_BUS_NAME, str(unit_path)) + properties = self.dbus.Interface(proxy, dbus_interface=DBUS_PROPERTIES_IFACE) + load_state = str(properties.Get(SYSTEMD_UNIT_IFACE, 'LoadState')) + return load_state != 'not-found' + except self.dbus.DBusException as exc: + if exc.get_dbus_name() in NO_SUCH_UNIT_ERRORS: + return False + self._fail('exists', exc, unit=unit_name) + + def _unit_properties(self, unit_name): + unit_path = self._load_unit(unit_name) + proxy = self.bus.get_object(SYSTEMD_BUS_NAME, str(unit_path)) + return self.dbus.Interface(proxy, dbus_interface=DBUS_PROPERTIES_IFACE) + + def active_state(self, unit_name): + if not is_valid_unit_name(unit_name): + return None + if self.mode == 'global_user': + return None + try: + properties = self._unit_properties(unit_name) + return str(properties.Get(SYSTEMD_UNIT_IFACE, 'ActiveState')) + except self.dbus.DBusException as exc: + self._fail('active_state', exc, unit=unit_name) + + def reload(self): + if self.mode == 'global_user': + return + try: + self.manager.Reload() + except self.dbus.DBusException as exc: + self._fail('reload', exc) + + def start(self, unit_name): + if self.mode == 'global_user': + self._unsupported_global_action('start', unit=unit_name) + return + try: + self.manager.StartUnit(unit_name, 'replace') + except self.dbus.DBusException as exc: + self._fail('start', exc, unit=unit_name) + + def stop(self, unit_name): + if self.mode == 'global_user': + self._unsupported_global_action('stop', unit=unit_name) + return + try: + self.manager.StopUnit(unit_name, 'replace') + except self.dbus.DBusException as exc: + self._fail('stop', exc, unit=unit_name) + + def restart(self, unit_name): + if self.mode == 'global_user': + self._unsupported_global_action('restart', unit=unit_name) + return + try: + self.manager.RestartUnit(unit_name, 'replace') + except self.dbus.DBusException as exc: + self._fail('restart', exc, unit=unit_name) + + def enable(self, unit_name): + if self.mode == 'global_user': + result = self._run_global(['enable', unit_name]) + if result.returncode != 0: + self._fail_global('enable', result, unit=unit_name) + return + try: + self.manager.EnableUnitFiles([unit_name], self.dbus.Boolean(False), self.dbus.Boolean(True)) + except self.dbus.DBusException as exc: + self._fail('enable', exc, unit=unit_name) + + def disable(self, unit_name): + if self.mode == 'global_user': + result = self._run_global(['disable', unit_name]) + if result.returncode != 0: + self._fail_global('disable', result, unit=unit_name) + return + try: + self.manager.DisableUnitFiles([unit_name], self.dbus.Boolean(False)) + except self.dbus.DBusException as exc: + self._fail('disable', exc, unit=unit_name) + + def mask(self, unit_name): + if self.mode == 'global_user': + result = self._run_global(['mask', unit_name]) + if result.returncode != 0: + self._fail_global('mask', result, unit=unit_name) + return + try: + self.manager.MaskUnitFiles([unit_name], self.dbus.Boolean(False), self.dbus.Boolean(True)) + except self.dbus.DBusException as exc: + self._fail('mask', exc, unit=unit_name) + + def unmask(self, unit_name): + if self.mode == 'global_user': + result = self._run_global(['unmask', unit_name]) + if result.returncode != 0: + self._fail_global('unmask', result, unit=unit_name) + return + try: + self.manager.UnmaskUnitFiles([unit_name], self.dbus.Boolean(False)) + except self.dbus.DBusException as exc: + self._fail('unmask', exc, unit=unit_name) + + def preset(self, unit_name): + if self.mode == 'global_user': + result = self._run_global(['preset', unit_name]) + if result.returncode != 0: + self._fail_global('preset', result, unit=unit_name) + return + try: + self.manager.PresetUnitFiles([unit_name], self.dbus.Boolean(False), self.dbus.Boolean(True)) + except self.dbus.DBusException as exc: + self._fail('preset', exc, unit=unit_name) + + def get_unit_file_state(self, unit_name): + if self.mode == 'global_user': + result = self._run_global(['is-enabled', unit_name]) + state = (result.stdout or result.stderr or '').strip() + if state: + return state.splitlines()[-1].strip() + self._fail_global('get_unit_file_state', result, unit=unit_name) + try: + return str(self.manager.GetUnitFileState(self.dbus.String(unit_name))) + except self.dbus.DBusException as exc: + self._fail('get_unit_file_state', exc, unit=unit_name) + + def apply_state(self, unit_name, state, now): + if state == 'as_is': + return + if state == 'enable': + self.unmask(unit_name) + self.enable(unit_name) + if now and self.mode != 'global_user': + self.start(unit_name) + return + if state == 'disable': + if now and self.mode != 'global_user': + self.stop(unit_name) + self.disable(unit_name) + return + if state == 'mask': + if now and self.mode != 'global_user': + self.stop(unit_name) + self.mask(unit_name) + return + if state == 'unmask': + self.unmask(unit_name) + if now and self.mode != 'global_user': + self.start(unit_name) + return + if state == 'preset': + self.preset(unit_name) + if now and self.mode != 'global_user': + self.start(unit_name) + return + raise ValueError('Unsupported state: {}'.format(state)) + + +class systemd_unit: + def __init__(self, unit_name, state, manager=None): + if not is_valid_unit_name(unit_name): + raise ValueError('Invalid unit name: {}'.format(unit_name)) self.unit_name = unit_name - self.desired_state = state - self.unit = self.manager.LoadUnit(dbus.String(self.unit_name)) - self.unit_proxy = self.system_bus.get_object('org.freedesktop.systemd1', str(self.unit)) - self.unit_interface = dbus.Interface(self.unit_proxy, dbus_interface='org.freedesktop.systemd1.Unit') - self.unit_properties = dbus.Interface(self.unit_proxy, dbus_interface='org.freedesktop.DBus.Properties') + self.desired_state = int(state) + if self.desired_state not in (0, 1): + raise ValueError('Invalid desired state for {}: {}'.format(unit_name, state)) + self.manager = manager if manager is not None else SystemdManager(mode='machine') def apply(self): logdata = {'unit': self.unit_name} if self.desired_state == 1: - self.manager.UnmaskUnitFiles([self.unit_name], dbus.Boolean(False)) - self.manager.EnableUnitFiles([self.unit_name], dbus.Boolean(False), dbus.Boolean(True)) + self.manager.unmask(self.unit_name) + self.manager.enable(self.unit_name) if self.unit_name == 'gpupdate.service': - if self.manager.GetUnitFileState(dbus.String(self.unit_name)) == 'enabled': + if self.manager.get_unit_file_state(self.unit_name) == 'enabled': return - self.manager.StartUnit(self.unit_name, 'replace') + self.manager.start(self.unit_name) log('I6', logdata) # In case the service has 'RestartSec' property set it @@ -51,14 +325,16 @@ def apply(self): if service_state not in ('active', 'activating'): service_timer_name = self.unit_name.replace(".service", ".timer") - self.unit = self.manager.LoadUnit(dbus.String(service_timer_name)) - service_state = self._get_state() - if service_state not in ('active', 'activating'): + if not is_valid_unit_name(service_timer_name) or not self.manager.exists(service_timer_name): + log('E46', logdata) + return + service_state = self.manager.active_state(service_timer_name) + if str(service_state) not in ('active', 'activating'): log('E46', logdata) else: - self.manager.StopUnit(self.unit_name, 'replace') - self.manager.DisableUnitFiles([self.unit_name], dbus.Boolean(False)) - self.manager.MaskUnitFiles([self.unit_name], dbus.Boolean(False), dbus.Boolean(True)) + self.manager.stop(self.unit_name) + self.manager.disable(self.unit_name) + self.manager.mask(self.unit_name) log('I6', logdata) service_state = self._get_state() @@ -70,7 +346,7 @@ def _get_state(self): ''' Get the string describing service state. ''' - return self.unit_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState') + return self.manager.active_state(self.unit_name) def restart(self): """ @@ -78,13 +354,11 @@ def restart(self): """ logdata = {'unit': self.unit_name, 'action': 'restart'} try: - self.unit = self.manager.LoadUnit(dbus.String(self.unit_name)) - self.manager.RestartUnit(self.unit_name, 'replace') + self.manager.restart(self.unit_name) log('I13', logdata) service_state = self._get_state() if service_state not in ('active', 'activating'): log('E77', logdata) - except dbus.DBusException as exc: + except SystemdManagerError as exc: log('E77', {**logdata, 'error': str(exc)}) - diff --git a/gpoa/frontend/change_journal.py b/gpoa/frontend/change_journal.py new file mode 100644 index 00000000..281c9033 --- /dev/null +++ b/gpoa/frontend/change_journal.py @@ -0,0 +1,218 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import hashlib +from pathlib import Path +import stat + + +_watched_paths = {} +_current_snapshots = {} +_current_snapshot_keys = {} +_changed_paths = set() +_presence_changed_paths = set() + + +def _normalize(path): + if not path: + return None + try: + return str(Path(path).resolve(strict=False)) + except Exception: + return str(path) + + +def _kind_from_mode(st_mode): + if stat.S_ISREG(st_mode): + return 'file' + if stat.S_ISDIR(st_mode): + return 'dir' + if stat.S_ISLNK(st_mode): + return 'symlink' + return 'other' + + +def _sha256(path_obj): + digest = hashlib.sha256() + with path_obj.open('rb') as file_obj: + while True: + chunk = file_obj.read(1024 * 1024) + if not chunk: + break + digest.update(chunk) + return digest.hexdigest() + + +def _present_snapshot_key(stat_result): + return ( + 'present', + stat_result.st_mode, + stat_result.st_uid, + stat_result.st_gid, + stat_result.st_size, + stat_result.st_mtime_ns, + stat_result.st_ctime_ns, + stat_result.st_ino, + ) + + +def _missing_snapshot(snapshot_key): + return { + 'exists': False, + 'kind': None, + 'stat': None, + 'sha256': None, + '_snapshot_key': snapshot_key, + } + + +def _present_snapshot(path_obj, stat_result, snapshot_key): + sha256 = None + if stat.S_ISREG(stat_result.st_mode): + try: + sha256 = _sha256(path_obj) + except Exception: + sha256 = '__error__' + + return { + 'exists': True, + 'kind': _kind_from_mode(stat_result.st_mode), + 'stat': ( + stat_result.st_mode, + stat_result.st_uid, + stat_result.st_gid, + stat_result.st_size, + stat_result.st_mtime_ns, + stat_result.st_ctime_ns, + stat_result.st_ino, + ), + 'sha256': sha256, + '_snapshot_key': snapshot_key, + } + + +def _snapshot(path): + path_obj = Path(path) + try: + stat_result = path_obj.lstat() + snapshot_key = _present_snapshot_key(stat_result) + except FileNotFoundError: + return _missing_snapshot(('absent',)) + except Exception: + return _missing_snapshot(('error',)) + + return _present_snapshot(path_obj, stat_result, snapshot_key) + + +def reset(): + _watched_paths.clear() + _current_snapshots.clear() + _current_snapshot_keys.clear() + _changed_paths.clear() + _presence_changed_paths.clear() + + +def watch(path): + normalized = _normalize(path) + if not normalized: + return + if normalized not in _watched_paths: + _watched_paths[normalized] = _snapshot(normalized) + _current_snapshots.pop(normalized, None) + _current_snapshot_keys.pop(normalized, None) + + +def watch_many(paths): + if not paths: + return + for path in paths: + watch(path) + + +def record_changed(path): + normalized = _normalize(path) + if normalized: + _changed_paths.add(normalized) + + +def record_presence_changed(path): + normalized = _normalize(path) + if normalized: + _presence_changed_paths.add(normalized) + _changed_paths.add(normalized) + + +def _snapshot_current(path): + path_obj = Path(path) + try: + stat_result = path_obj.lstat() + key = _present_snapshot_key(stat_result) + snapshot_factory = lambda: _present_snapshot(path_obj, stat_result, key) + except FileNotFoundError: + key = ('absent',) + snapshot_factory = lambda: _missing_snapshot(key) + except Exception: + key = ('error',) + snapshot_factory = lambda: _missing_snapshot(key) + + if _current_snapshot_keys.get(path) == key: + return _current_snapshots[path] + current = snapshot_factory() + _current_snapshot_keys[path] = key + _current_snapshots[path] = current + return current + + +def _presence_changed(path): + baseline = _watched_paths.get(path) + if baseline is None: + return False + current = _snapshot_current(path) + return baseline['exists'] != current['exists'] + + +def _changed(path): + baseline = _watched_paths.get(path) + if baseline is None: + return False + current = _snapshot_current(path) + + if baseline['exists'] != current['exists']: + return True + + if not baseline['exists'] and not current['exists']: + return False + + return ( + baseline['kind'] != current['kind'] + or baseline['stat'] != current['stat'] + or baseline['sha256'] != current['sha256'] + ) + + +def query(path, mode='changed'): + normalized = _normalize(path) + if not normalized: + return False + if mode == 'presence_changed': + if normalized in _presence_changed_paths: + return True + return _presence_changed(normalized) + if normalized in _changed_paths: + return True + return _changed(normalized) diff --git a/gpoa/frontend/frontend_manager.py b/gpoa/frontend/frontend_manager.py index af9de300..8b013122 100644 --- a/gpoa/frontend/frontend_manager.py +++ b/gpoa/frontend/frontend_manager.py @@ -27,6 +27,7 @@ ) from .chromium_applier import chromium_applier +from .change_journal import reset as reset_change_journal from .cifs_applier import cifs_applier, cifs_applier_user from .control_applier import control_applier from .cups_applier import cups_applier @@ -46,10 +47,24 @@ from .scripts_applier import scripts_applier, scripts_applier_user from .shortcut_applier import shortcut_applier, shortcut_applier_user from .systemd_applier import systemd_applier +from .systemd_preferences_applier import ( + systemd_preferences_applier, + systemd_preferences_applier_user, +) from .thunderbird_applier import thunderbird_applier from .yandex_browser_applier import yandex_browser_applier +def prime_dependency_journal(appliers): + applier = appliers.get('systemd_preferences') + if not applier: + return + + prime = getattr(applier, 'prime_dependency_journal', None) + if callable(prime): + prime() + + def determine_username(username=None): ''' Checks if the specified username is valid in order to prevent @@ -129,6 +144,11 @@ def _init_machine_appliers(self): self.machine_appliers['ini'] = ini_applier(self.storage) self.machine_appliers['kde'] = kde_applier(self.storage) self.machine_appliers['package'] = package_applier(self.storage) + # systemd_preferences must be registered last: its post_restart() checks + # dependency paths that other appliers (e.g. ini) may have modified during + # the same apply cycle. New appliers that write files must be added before + # this entry to ensure their changes are visible to the dependency journal. + self.machine_appliers['systemd_preferences'] = systemd_preferences_applier(self.storage) def _init_user_appliers(self): # User appliers are expected to work with user-writable @@ -149,6 +169,7 @@ def _init_user_appliers(self): self.user_appliers['ini'] = ini_applier_user(self.storage, self.username) self.user_appliers['kde'] = kde_applier_user(self.storage, self.username, self.file_cache) self.user_appliers['package'] = package_applier_user(self.storage, self.username) + self.user_appliers['systemd_preferences'] = systemd_preferences_applier_user(self.storage, self.username) def machine_apply(self): ''' @@ -158,6 +179,12 @@ def machine_apply(self): log('E13') return log('D16') + reset_change_journal() + try: + prime_dependency_journal(self.machine_appliers) + except Exception as exc: + logdata = {'applier_name': 'systemd_preferences', 'msg': str(exc)} + log('E24', logdata) for applier_name, applier_object in self.machine_appliers.items(): try: @@ -170,6 +197,12 @@ def user_apply(self): ''' Run appliers for users. ''' + reset_change_journal() + try: + prime_dependency_journal(self.user_appliers) + except Exception as exc: + logdata = {'applier': 'systemd_preferences', 'exception': str(exc)} + log('E19', logdata) if is_root(): for applier_name, applier_object in self.user_appliers.items(): try: @@ -199,4 +232,3 @@ def apply_parameters(self): self.machine_apply() else: self.user_apply() - diff --git a/gpoa/frontend/systemd_applier.py b/gpoa/frontend/systemd_applier.py index 7916d680..bd52cacf 100644 --- a/gpoa/frontend/systemd_applier.py +++ b/gpoa/frontend/systemd_applier.py @@ -19,7 +19,11 @@ from util.logging import log from .applier_frontend import applier_frontend, check_enabled -from .appliers.systemd import systemd_unit +from .appliers.systemd import ( + SystemdManagerError, + is_valid_unit_name, + systemd_unit, +) class systemd_applier(applier_frontend): @@ -40,18 +44,22 @@ def __init__(self, storage): def run(self): for setting in self.systemd_unit_settings: + unit_name = str(setting.valuename) try: - self.units.append(systemd_unit(setting.valuename, int(setting.data))) - logdata = {'unit': format(setting.valuename)} + desired_state = int(setting.data) + if not is_valid_unit_name(unit_name): + raise ValueError('Invalid unit name') + self.units.append(systemd_unit(unit_name, desired_state)) + logdata = {'unit': unit_name} log('I4', logdata) - except Exception as exc: - logdata = {'unit': format(setting.valuename), 'exc': exc} + except (TypeError, ValueError, SystemdManagerError) as exc: + logdata = {'unit': unit_name, 'exc': str(exc)} log('I5', logdata) for unit in self.units: try: unit.apply() - except: - logdata = {'unit': unit.unit_name} + except SystemdManagerError as exc: + logdata = {'unit': unit.unit_name, 'error': str(exc)} log('E45', logdata) def apply(self): @@ -78,4 +86,3 @@ def user_context_apply(self): def admin_context_apply(self): pass - diff --git a/gpoa/frontend/systemd_preferences_applier.py b/gpoa/frontend/systemd_preferences_applier.py new file mode 100644 index 00000000..ff082a5d --- /dev/null +++ b/gpoa/frontend/systemd_preferences_applier.py @@ -0,0 +1,783 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import base64 +import binascii +import os +from pathlib import Path +import stat +import tempfile + +from util.logging import log +from util.util import get_homedir, get_uid_by_username, string_to_literal_eval + +from .applier_frontend import applier_frontend, check_enabled +from .appliers.systemd import ( + SystemdManager, + SystemdManagerError, + is_valid_unit_name, +) +from .change_journal import query, record_changed, record_presence_changed, watch_many +from gpt.systemds_constants import ( + DEFAULT_DROPIN_NAME, + DROPIN_NAME_RE, + MAX_DEPENDENCIES_PER_RULE, + MAX_DEPENDENCY_PATH_LEN, + MAX_UNIT_FILE_SIZE, + NON_RESTARTABLE_TYPES, + VALID_APPLY_MODES, + VALID_DEP_MODES, + VALID_POLICY_TARGETS, + VALID_STATES, +) + +MANAGED_HEADER = '# gpupdate-managed uid: {}' +MAX_RULES_PER_SCOPE = 512 + + +class _Context: + def __init__(self, mode='machine', username=None): + self.mode = mode + self.username = username + self.systemd_dir = '/etc/systemd/system' + if mode == 'user': + self.systemd_dir = os.path.join(get_homedir(username), '.config/systemd/user') + elif mode == 'global_user': + self.systemd_dir = '/etc/systemd/user' + + +def _syslog(level, message, data=None): + payload = {'plugin': 'SystemdPreferencesApplier', 'message': message} + if data: + payload['data'] = data + log(level, payload) + + +def _as_bool(value): + if isinstance(value, bool): + return value + if value is None: + return False + return str(value).lower() in ('1', 'true', 'yes') + + +def _has_control_chars(value): + return any(ord(ch) < 32 or ord(ch) == 127 for ch in str(value)) + + +def _is_valid_dropin_name(value): + if not isinstance(value, str): + return False + if not value or value != value.strip(): + return False + if '\x00' in value or '/' in value or '\\' in value: + return False + if _has_control_chars(value): + return False + return bool(DROPIN_NAME_RE.match(value)) + + +def _expand_windows_var(path, username=None): + if not path: + return path + variables = { + 'HOME': '/etc/skel', + 'HOMEPATH': '/etc/skel', + 'HOMEDRIVE': '/', + 'SystemRoot': '/', + 'SystemDrive': '/', + 'USERNAME': username if username else '', + } + if username: + variables['HOME'] = get_homedir(username) + variables['HOMEPATH'] = variables['HOME'] + result = path + for key, value in variables.items(): + replacement = str(value) + if key not in ('USERNAME',) and not replacement.endswith('/'): + replacement = '{}{}'.format(replacement, '/') + result = result.replace('%{}%'.format(key), replacement) + return result + + +def _read_preferences(storage, scope_name, is_previous=False): + prefix = 'Software/BaseALT/Policies/Preferences/{}'.format(scope_name) + if is_previous: + prefix = 'Previous/{}'.format(prefix) + key = '{}/Systemds'.format(prefix) + value = storage.get_entry(key, preg=False) + if not value: + return [] + + items = string_to_literal_eval(value) + if not isinstance(items, list): + return [] + entries = [item for item in items if isinstance(item, dict)] + if len(entries) > MAX_RULES_PER_SCOPE: + log('W47', { + 'reason': 'Systemd preferences rule limit exceeded', + 'scope': scope_name, + 'count': len(entries), + 'limit': MAX_RULES_PER_SCOPE, + }) + return entries[:MAX_RULES_PER_SCOPE] + return entries + + +def _is_valid_dependency_path(path, policy_target): + if not isinstance(path, str): + return False + if not path or len(path) > MAX_DEPENDENCY_PATH_LEN: + return False + if '\x00' in path or _has_control_chars(path): + return False + + expanded = _expand_windows_var(path) + if not expanded or len(expanded) > MAX_DEPENDENCY_PATH_LEN: + return False + if '\x00' in expanded or _has_control_chars(expanded): + return False + + upper_path = path.upper() + if policy_target == 'user': + if '%HOME%' in upper_path or '%HOMEPATH%' in upper_path: + return os.path.isabs(expanded) + return os.path.isabs(path) and os.path.isabs(expanded) + return os.path.isabs(expanded) + + +def _derive_edit_mode(apply_mode): + if apply_mode == 'always': + return 'create_or_override' + elif apply_mode == 'if_exists': + return 'override' + elif apply_mode == 'if_missing': + return 'create' + return 'create_or_override' + + +def _normalize_rule(item): + unit = item.get('unit') + state = item.get('state') + apply_mode = item.get('apply_mode', item.get('applyMode', 'always')) + policy_target = item.get('policy_target', item.get('policyTarget', 'machine')) + uid = item.get('uid') + + if not unit or state not in VALID_STATES: + return None + if not is_valid_unit_name(str(unit)): + log('W47', {'reason': 'Invalid unit value', 'unit': unit}) + return None + if apply_mode not in VALID_APPLY_MODES: + return None + if policy_target not in VALID_POLICY_TARGETS: + return None + if not uid: + return None + + dependencies = item.get('file_dependencies', item.get('fileDependencies', [])) + if not isinstance(dependencies, list): + dependencies = [] + if len(dependencies) > MAX_DEPENDENCIES_PER_RULE: + log('W47', { + 'reason': 'Too many file dependencies, truncating', + 'unit': unit, + 'count': len(dependencies), + 'limit': MAX_DEPENDENCIES_PER_RULE, + }) + dependencies = dependencies[:MAX_DEPENDENCIES_PER_RULE] + + valid_dependencies = [] + for dep in dependencies: + if not isinstance(dep, dict): + continue + mode = dep.get('mode') + path = dep.get('path') + if mode not in VALID_DEP_MODES or not path: + continue + if not _is_valid_dependency_path(str(path), policy_target): + log('W47', { + 'reason': 'Invalid dependency path', + 'unit': unit, + 'path': str(path), + 'policy_target': policy_target, + }) + continue + valid_dependencies.append({'mode': mode, 'path': str(path)}) + + dropin_name = item.get('dropin_name', item.get('dropInName', DEFAULT_DROPIN_NAME)) or DEFAULT_DROPIN_NAME + if not _is_valid_dropin_name(str(dropin_name)): + log('W47', {'reason': 'Invalid dropInName', 'dropInName': dropin_name, 'unit': unit}) + return None + + unit_file = _decode_unit_file_b64(item, unit, uid) + if unit_file is None: + unit_file = _normalize_unit_file_content(item.get('unit_file', item.get('unitFile'))) + if unit_file is None and (item.get('unit_file') is not None or item.get('unitFile') is not None): + log('W47', { + 'reason': 'Invalid unit_file payload', + 'unit': unit, + 'uid': str(uid), + }) + + return { + 'uid': str(uid), + 'unit': str(unit), + 'state': state, + 'now': _as_bool(item.get('now', False)), + 'remove_policy': _as_bool(item.get('remove_policy', item.get('removePolicy', False))), + 'apply_mode': apply_mode, + 'policy_target': policy_target, + 'edit_mode': _derive_edit_mode(apply_mode), + 'dropin_name': str(dropin_name), + 'unit_file': unit_file, + 'file_dependencies': valid_dependencies, + 'element_type': item.get('element_type', item.get('elementType', 'service')), + } + + +def _rule_matches_apply_mode(rule, exists): + apply_mode = rule['apply_mode'] + if apply_mode == 'always': + return True + if apply_mode == 'if_exists': + return exists + return not exists + + +def _is_managed_by_uid(path, uid): + if not path.exists() or not path.is_file(): + return False + content = _safe_read_text(path) + if content is None: + return False + first_line = content.splitlines()[0] if content else '' + return first_line == MANAGED_HEADER.format(uid) + + +def _validate_existing_file(path): + fd = None + try: + flags = os.O_RDONLY + if hasattr(os, 'O_NOFOLLOW'): + flags |= os.O_NOFOLLOW + fd = os.open(str(path), flags) + st = os.fstat(fd) + if not stat.S_ISREG(st.st_mode): + return None, False + if st.st_nlink > 1: + return None, False + with os.fdopen(fd, 'r', encoding='utf-8') as file_obj: + content = file_obj.read() + fd = None + return content, True + except (OSError, UnicodeDecodeError): + return None, False + finally: + if fd is not None: + try: + os.close(fd) + except OSError: + pass + + +def _safe_read_text(path): + content, ok = _validate_existing_file(path) + if not ok: + return None + return content + + +def _safe_write_text(path, content): + parent = path.parent + parent.mkdir(parents=True, exist_ok=True) + if parent.is_symlink(): + return False, False + + existed = path.exists() + if existed: + _, ok = _validate_existing_file(path) + if not ok: + return False, existed + + data = content.encode('utf-8') + tmp_fd = None + tmp_path = None + try: + tmp_fd, tmp_path = tempfile.mkstemp(prefix='.gpupdate-', dir=str(parent)) + os.fchmod(tmp_fd, 0o644) + os.write(tmp_fd, data) + os.fsync(tmp_fd) + os.close(tmp_fd) + tmp_fd = None + os.replace(tmp_path, str(path)) + tmp_path = None # consumed by replace; prevent unlink in finally + except OSError: + return False, existed + finally: + if tmp_fd is not None: + try: + os.close(tmp_fd) + except OSError: + pass + if tmp_path and os.path.exists(tmp_path): + try: + os.unlink(tmp_path) + except OSError: + pass + + try: + dir_fd = os.open(str(parent), os.O_RDONLY) + try: + os.fsync(dir_fd) + finally: + os.close(dir_fd) + except OSError: + pass # fsync failure is non-fatal; file is already atomically written + + return True, existed + + +def _normalize_unit_file_content(unit_file): + if unit_file is None: + return None + + text = str(unit_file) + # Keep already multiline text as-is; only unescape policy-encoded newlines. + if '\n' in text or '\r' in text: + normalized = text.replace('\r\n', '\n').replace('\r', '\n') + elif '\\n' in text or '\\r' in text: + normalized = text.replace('\\r\\n', '\n').replace('\\n', '\n').replace('\\r', '\n') + else: + normalized = text + + if len(normalized.encode('utf-8')) > MAX_UNIT_FILE_SIZE: + return None + return normalized + + +def _decode_unit_file_b64(item, unit, uid): + payload = item.get('unit_file_b64', item.get('unitFileB64')) + if payload is None: + return None + + try: + data = base64.b64decode(str(payload), validate=True) + if len(data) > MAX_UNIT_FILE_SIZE: + log('W47', { + 'reason': 'unit_file_b64 exceeds size limit', + 'unit': unit, + 'uid': uid, + 'limit': MAX_UNIT_FILE_SIZE, + }) + return None + return data.decode('utf-8') + except (TypeError, ValueError, binascii.Error, UnicodeDecodeError): + log('W47', { + 'reason': 'Invalid unit_file_b64 payload', + 'unit': unit, + 'uid': uid, + }) + return None + + +class _systemd_preferences_runtime: + def __init__(self, storage, scope_name, context, systemd_manager=None): + self.storage = storage + self.scope_name = scope_name + self.context = context + self.systemd_manager = systemd_manager + self.daemon_reload_required = False + self.phase2_candidates = [] + + def _manager(self): + if self.systemd_manager is None: + try: + self.systemd_manager = SystemdManager(mode=self.context.mode) + except Exception as exc: + raise SystemdManagerError(str(exc), action='connect') + return self.systemd_manager + + def _exists(self, unit_name): + try: + return self._manager().exists(unit_name) + except SystemdManagerError as exc: + _syslog('W', 'Unable to query unit existence', {'unit': unit_name, 'error': str(exc)}) + return False + + def _daemon_reload(self): + log('D245', {'context': self.context.mode}) + try: + self._manager().reload() + except SystemdManagerError as exc: + error = str(exc) + log('W50', {'context': self.context.mode, 'error': error}) + _syslog('W', 'daemon-reload failed', {'context': self.context.mode, 'error': error}) + self.daemon_reload_required = False + return False + self.daemon_reload_required = False + return True + + def _active_state(self, unit_name): + try: + return self._manager().active_state(unit_name) + except SystemdManagerError: + return None + + def _stop(self, rule): + if rule.get('element_type') in NON_RESTARTABLE_TYPES: + return + if self.context.mode == 'global_user': + return + state = self._active_state(rule['unit']) + if state not in ('active', 'activating', 'deactivating'): + return + try: + self._manager().stop(rule['unit']) + except SystemdManagerError as exc: + _syslog('W', 'Stop failed', {'unit': rule['unit'], 'error': str(exc)}) + + def _restart(self, rule): + if rule.get('element_type') in NON_RESTARTABLE_TYPES: + log('W49', {'unit': rule['unit'], 'type': rule.get('element_type')}) + _syslog('D', 'Unit type is non-restartable', {'unit': rule['unit'], 'type': rule.get('element_type')}) + return + + if self.context.mode == 'global_user': + _syslog('D', 'Dependency restart skipped: not supported for global_user scope', + {'unit': rule['unit']}) + return + + state = self._active_state(rule['unit']) + if state not in ('active', 'activating'): + return + + try: + self._manager().restart(rule['unit']) + except SystemdManagerError as exc: + _syslog('W', 'Restart failed', {'unit': rule['unit'], 'error': str(exc)}) + + def _rule_managed_paths(self, rule): + unit_file_path = Path(self.context.systemd_dir).joinpath(rule['unit']) + dropin_path = Path(self.context.systemd_dir).joinpath( + '{}.d'.format(rule['unit']), rule['dropin_name']) + return unit_file_path, dropin_path + + def _write_rule_file(self, target_file, uid, unit_file): + marker = MANAGED_HEADER.format(uid) + body = unit_file if unit_file.endswith('\n') else '{}\n'.format(unit_file) + content = '{}\n{}'.format(marker, body) + old_content = _safe_read_text(target_file) if target_file.exists() else None + if old_content == content: + return + + written, existed = _safe_write_text(target_file, content) + if not written: + _syslog('W', 'Unable to safely write managed file', {'path': str(target_file)}) + return + + if existed: + record_changed(str(target_file)) + else: + record_presence_changed(str(target_file)) + self.daemon_reload_required = True + + def _apply_edit(self, rule, exists): + unit_file = rule.get('unit_file') + if not unit_file: + return + + unit_file_path, dropin_path = self._rule_managed_paths(rule) + edit_mode = rule['edit_mode'] + if edit_mode == 'create': + self._write_rule_file(unit_file_path, rule['uid'], unit_file) + return + if edit_mode == 'override': + self._write_rule_file(dropin_path, rule['uid'], unit_file) + return + if exists: + self._write_rule_file(dropin_path, rule['uid'], unit_file) + else: + self._write_rule_file(unit_file_path, rule['uid'], unit_file) + + def _run_state_action(self, rule): + state = rule['state'] + if state == 'as_is': + return + + try: + self._manager().apply_state(rule['unit'], state, rule['now']) + except (SystemdManagerError, ValueError) as exc: + _syslog('W', 'State apply failed', {'unit': rule['unit'], 'state': state, 'error': str(exc)}) + + def apply_rules(self, rules): + applicable_rules = [] + for rule in rules: + log('D244', {'unit': rule['unit'], 'state': rule['state']}) + exists = self._exists(rule['unit']) + if not _rule_matches_apply_mode(rule, exists): + # Rule does not qualify for edit/state this run, but should + # still be checked for dependency-triggered restarts. + self.phase2_candidates.append(rule) + continue + applicable_rules.append((rule, exists)) + + for rule, exists in applicable_rules: + self._apply_edit(rule, exists) + + if self.daemon_reload_required and not self._daemon_reload(): + _syslog('W', 'Skipping state apply due to daemon-reload failure', { + 'context': self.context.mode, + 'rules': len(applicable_rules), + }) + return + + for rule, _ in applicable_rules: + self._run_state_action(rule) + self.phase2_candidates.append(rule) + + def cleanup_removed_rules(self, removed_rules): + affected_units = set() + for rule in removed_rules: + log('D246', {'unit': rule['unit'], 'uid': rule['uid']}) + unit_file_path, dropin_path = self._rule_managed_paths(rule) + for target in (unit_file_path, dropin_path): + if not _is_managed_by_uid(target, rule['uid']): + continue + try: + target.unlink() + record_presence_changed(str(target)) + self.daemon_reload_required = True + affected_units.add((rule['unit'], rule.get('element_type', 'service'))) + except Exception as exc: + _syslog('W', 'Failed to cleanup managed file', {'path': str(target), 'error': str(exc)}) + dropin_dir = dropin_path.parent + if dropin_dir.exists(): + try: + dropin_dir.rmdir() + except OSError: + pass + + if self.daemon_reload_required: + if not self._daemon_reload(): + _syslog('W', 'Skipping cleanup restart due to daemon-reload failure', { + 'context': self.context.mode, + 'units': [unit_name for unit_name, _ in affected_units], + }) + return + for unit_name, element_type in affected_units: + cleanup_rule = { + 'unit': unit_name, + 'element_type': element_type, + } + self._stop(cleanup_rule) + + def _dependency_changed(self, dependency, username=None): + dep_path = _expand_windows_var(dependency['path'], username) + if not dep_path or not os.path.isabs(dep_path): + return False + mode = dependency['mode'] + return query(dep_path, mode=mode) + + def post_restart(self, username=None): + for rule in self.phase2_candidates: + dependencies = rule.get('file_dependencies', []) + if not dependencies: + continue + if any(self._dependency_changed(dep, username=username) for dep in dependencies): + log('D247', {'unit': rule['unit']}) + self._restart(rule) + + +def _get_removed_rules(storage, scope_name, target): + current_raw = _read_preferences(storage, scope_name, is_previous=False) + previous_raw = _read_preferences(storage, scope_name, is_previous=True) + current_map = {} + previous_map = {} + for item in current_raw: + normalized = _normalize_rule(item) + if normalized is not None and normalized['policy_target'] == target: + current_map[normalized['uid']] = normalized + for item in previous_raw: + normalized = _normalize_rule(item) + if normalized is not None and normalized['policy_target'] == target: + previous_map[normalized['uid']] = normalized + removed_uids = set(previous_map.keys()) - set(current_map.keys()) + return [previous_map[uid] for uid in removed_uids] + + +def _get_rules_for_scope(storage, scope_name, target): + current_raw = _read_preferences(storage, scope_name, is_previous=False) + rules = [] + for item in current_raw: + normalized = _normalize_rule(item) + if normalized is None: + continue + if normalized['policy_target'] != target: + continue + rules.append(normalized) + return rules + + +def _split_active_and_cleanup_rules(rules): + active_rules = [] + explicit_cleanup_rules = [] + for rule in rules: + if rule.get('remove_policy'): + explicit_cleanup_rules.append(rule) + continue + active_rules.append(rule) + return active_rules, explicit_cleanup_rules + + +def _merge_cleanup_rules(removed_by_diff, explicit_cleanup_rules): + merged = {} + for rule in removed_by_diff: + merged[rule['uid']] = rule + for rule in explicit_cleanup_rules: + merged[rule['uid']] = rule + return list(merged.values()) + + +def _get_rule_sets_for_scope(storage, scope_name, target): + current_rules = _get_rules_for_scope(storage, scope_name, target) + active_rules, explicit_cleanup_rules = _split_active_and_cleanup_rules(current_rules) + removed_by_diff = _get_removed_rules(storage, scope_name, target) + cleanup_rules = _merge_cleanup_rules(removed_by_diff, explicit_cleanup_rules) + return active_rules, cleanup_rules + + +def _collect_dependency_paths(storage, scope_name, target, username=None): + dependency_paths = [] + for rule in _get_rules_for_scope(storage, scope_name, target): + if rule.get('remove_policy'): + continue + for dependency in rule.get('file_dependencies', []): + dep_path = _expand_windows_var(dependency.get('path'), username) + if dep_path and os.path.isabs(dep_path): + dependency_paths.append(dep_path) + return dependency_paths + + +class systemd_preferences_applier(applier_frontend): + __module_name = 'SystemdPreferencesApplier' + __module_experimental = True + __module_enabled = False + __scope_name = 'Machine' + + def __init__(self, storage): + self.storage = storage + self.__module_enabled = check_enabled(self.storage, self.__module_name, self.__module_experimental) + + def prime_dependency_journal(self): + if not self.__module_enabled: + return + dependency_paths = [] + dependency_paths.extend(_collect_dependency_paths(self.storage, self.__scope_name, target='machine')) + dependency_paths.extend(_collect_dependency_paths(self.storage, self.__scope_name, target='user')) + watch_many(dependency_paths) + + def apply(self): + if not self.__module_enabled: + log('D243') + return + + log('D240') + runtime = _systemd_preferences_runtime(self.storage, self.__scope_name, _Context(mode='machine')) + active_rules, cleanup_rules = _get_rule_sets_for_scope( + self.storage, self.__scope_name, target='machine') + runtime.apply_rules(active_rules) + runtime.cleanup_removed_rules(cleanup_rules) + runtime.post_restart() + + global_user_runtime = _systemd_preferences_runtime( + self.storage, + self.__scope_name, + _Context(mode='global_user'), + ) + active_user_rules, cleanup_user_rules = _get_rule_sets_for_scope( + self.storage, + self.__scope_name, + target='user', + ) + global_user_runtime.apply_rules(active_user_rules) + global_user_runtime.cleanup_removed_rules(cleanup_user_rules) + global_user_runtime.post_restart() + + +class systemd_preferences_applier_user(applier_frontend): + __module_name = 'SystemdPreferencesApplierUser' + __module_experimental = True + __module_enabled = False + + def __init__(self, storage, username): + self.storage = storage + self.username = username + self.uid = get_uid_by_username(username) + self.user_bus_path = '/run/user/{}/bus'.format(self.uid) if self.uid is not None else None + self.__module_enabled = check_enabled(self.storage, self.__module_name, self.__module_experimental) + + def prime_dependency_journal(self): + if not self.__module_enabled: + return + + dependency_paths = [] + dependency_paths.extend(_collect_dependency_paths(self.storage, self.username, target='machine')) + dependency_paths.extend(_collect_dependency_paths( + self.storage, + self.username, + target='user', + username=self.username, + )) + watch_many(dependency_paths) + + def admin_context_apply(self): + if not self.__module_enabled: + log('D243') + return + + log('D241', {'username': self.username}) + runtime = _systemd_preferences_runtime(self.storage, self.username, _Context(mode='machine')) + active_rules, cleanup_rules = _get_rule_sets_for_scope( + self.storage, self.username, target='machine') + runtime.apply_rules(active_rules) + runtime.cleanup_removed_rules(cleanup_rules) + runtime.post_restart() + + def user_context_apply(self): + if not self.__module_enabled: + log('D243') + return + log('D242', {'username': self.username}) + if not self.user_bus_path or not os.path.exists(self.user_bus_path): + log('W48', {'username': self.username, 'path': self.user_bus_path}) + _syslog('W', 'systemd --user manager is unavailable', { + 'username': self.username, + 'path': self.user_bus_path, + }) + return + + runtime = _systemd_preferences_runtime( + self.storage, + self.username, + _Context(mode='user', username=self.username)) + active_rules, cleanup_rules = _get_rule_sets_for_scope( + self.storage, self.username, target='user') + runtime.apply_rules(active_rules) + runtime.cleanup_removed_rules(cleanup_rules) + runtime.post_restart(username=self.username) diff --git a/gpoa/gpt/gpt.py b/gpoa/gpt/gpt.py index 9678c3dd..d0e15752 100644 --- a/gpoa/gpt/gpt.py +++ b/gpoa/gpt/gpt.py @@ -39,6 +39,7 @@ from .scriptsini import merge_scripts, read_scripts from .services import merge_services, read_services from .shortcuts import merge_shortcuts, read_shortcuts +from .systemds import merge_systemds, read_systemds from .tasks import merge_tasks, read_tasks @@ -53,6 +54,7 @@ class FileType(Enum): ENVIRONMENTVARIABLES = 'environmentvariables.xml' INIFILES = 'inifiles.xml' SERVICES = 'services.xml' + SYSTEMDS = 'systemd.xml' PRINTERS = 'printers.xml' SCRIPTS = 'scripts.ini' NETWORKSHARES = 'networkshares.xml' @@ -80,6 +82,7 @@ def pref_parsers(): parsers[FileType.ENVIRONMENTVARIABLES] = read_envvars parsers[FileType.INIFILES] = read_inifiles parsers[FileType.SERVICES] = read_services + parsers[FileType.SYSTEMDS] = read_systemds parsers[FileType.PRINTERS] = read_printers parsers[FileType.SCRIPTS] = read_scripts parsers[FileType.NETWORKSHARES] = read_networkshares @@ -102,6 +105,7 @@ def pref_mergers(): mergers[FileType.ENVIRONMENTVARIABLES] = merge_envvars mergers[FileType.INIFILES] = merge_inifiles mergers[FileType.SERVICES] = merge_services + mergers[FileType.SYSTEMDS] = merge_systemds mergers[FileType.PRINTERS] = merge_printers mergers[FileType.SCRIPTS] = merge_scripts mergers[FileType.NETWORKSHARES] = merge_networkshares @@ -139,6 +143,7 @@ def __init__(self, gpt_path, username='Machine', gpo_info=None): , 'files' , 'inifiles' , 'services' + , 'systemd' , 'scheduledtasks' , 'scripts' , 'networkshares' @@ -285,13 +290,13 @@ def find_preffile(search_path, prefname): if not prefdir: return None - # Then search for preference directory pref_dir = find_dir(prefdir, prefname) - file_name = '{}.xml'.format(prefname) - # And then try to find the corresponding file. - pref_file = find_file(pref_dir, file_name) + if pref_dir: + pref_file = find_file(pref_dir, '{}.xml'.format(prefname)) + if pref_file: + return pref_file - return pref_file + return None def lp2gpt(): ''' diff --git a/gpoa/gpt/systemds.py b/gpoa/gpt/systemds.py new file mode 100644 index 00000000..19158a66 --- /dev/null +++ b/gpoa/gpt/systemds.py @@ -0,0 +1,357 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import base64 +import os +from xml.etree import ElementTree + +from util.logging import log + +from .dynamic_attributes import DynamicAttributes +from .systemds_constants import ( + DEFAULT_DROPIN_NAME, + DROPIN_NAME_RE, + MAX_DEPENDENCIES_PER_RULE, + MAX_DEPENDENCY_PATH_LEN, + MAX_UNIT_FILE_SIZE, + UNIT_NAME_RE, + VALID_APPLY_MODES, + VALID_DEP_MODES, + VALID_POLICY_TARGETS, + VALID_STATES, +) + + +VALID_POLICY_ELEMENTS = { + 'Service', + 'Socket', + 'Timer', + 'Path', + 'Mount', + 'Automount', + 'Swap', + 'Target', + 'Device', + 'Slice', + 'Scope', +} + +UNIT_SUFFIX = { + 'Service': '.service', + 'Socket': '.socket', + 'Timer': '.timer', + 'Path': '.path', + 'Mount': '.mount', + 'Automount': '.automount', + 'Swap': '.swap', + 'Target': '.target', + 'Device': '.device', + 'Slice': '.slice', + 'Scope': '.scope', +} + + +def _tag_name(element): + return str(element.tag).split('}')[-1] + + +def _as_bool(value, default=False): + if value is None: + return default + return str(value).lower() in ('1', 'true', 'yes') + + +def _normalize_unit_name(unit_name, element_name): + if not unit_name: + return None + + all_suffixes = set(UNIT_SUFFIX.values()) + if any(str(unit_name).endswith(suffix) for suffix in all_suffixes): + return unit_name + + suffix = UNIT_SUFFIX.get(element_name) + if not suffix: + return unit_name + + return '{}{}'.format(unit_name, suffix) + + +def _is_safe_component(value): + text = str(value) if value is not None else '' + if not text: + return False + if text in ('.', '..'): + return False + if text != text.strip(): + return False + if '/' in text or '\\' in text: + return False + if os.path.isabs(text): + return False + if len(text) >= 2 and text[1] == ':' and text[0].isalpha(): + return False + if '\x00' in text: + return False + return True + + +def _has_control_chars(value): + return any(ord(ch) < 32 or ord(ch) == 127 for ch in str(value)) + + +def _derive_edit_mode(apply_mode): + if apply_mode == 'always': + return 'create_or_override' + elif apply_mode == 'if_exists': + return 'override' + elif apply_mode == 'if_missing': + return 'create' + return 'create_or_override' + + +def _is_valid_unit_name(value): + if not _is_safe_component(value): + return False + if _has_control_chars(value): + return False + if not UNIT_NAME_RE.match(str(value)): + return False + name_part = str(value).rsplit('.', 1)[0] + if name_part.startswith('-') or name_part.endswith('-'): + return False + return True + + +def _is_valid_dropin_name(value): + if not _is_safe_component(value): + return False + if _has_control_chars(value): + return False + return bool(DROPIN_NAME_RE.match(str(value))) + + +def _expand_windows_var(path): + if not path: + return path + variables = { + 'HOME': '/etc/skel', + 'HOMEPATH': '/etc/skel', + 'HOMEDRIVE': '/', + 'SystemRoot': '/', + 'SystemDrive': '/', + 'USERNAME': '', + } + result = str(path) + for key, value in variables.items(): + replacement = str(value) + if key not in ('USERNAME',) and not replacement.endswith('/'): + replacement = '{}{}'.format(replacement, '/') + result = result.replace('%{}%'.format(key), replacement) + return result + + +def _is_valid_dependency_path(path, policy_target): + if not isinstance(path, str): + return False + if not path or len(path) > MAX_DEPENDENCY_PATH_LEN: + return False + if '\x00' in path or _has_control_chars(path): + return False + + expanded = _expand_windows_var(path) + if not expanded or len(expanded) > MAX_DEPENDENCY_PATH_LEN: + return False + if '\x00' in expanded or _has_control_chars(expanded): + return False + + upper_path = path.upper() + if policy_target == 'user': + if '%HOME%' in upper_path or '%HOMEPATH%' in upper_path: + return os.path.isabs(expanded) + return os.path.isabs(path) and os.path.isabs(expanded) + + return os.path.isabs(expanded) + + +def _get_systemds_root(systemds_file): + try: + from defusedxml import ElementTree as DefusedElementTree + xml_contents = DefusedElementTree.parse(systemds_file) + except ImportError: + log('W47', {'reason': 'defusedxml is unavailable, using xml.etree fallback'}) + xml_contents = ElementTree.parse(systemds_file) + return xml_contents.getroot() + + +def _invalid_entry(message, data=None): + payload = {'reason': message} + if data: + payload.update(data) + log('W47', payload) + + +def _parse_file_dependencies(properties, policy_target, unit): + file_dependencies = [] + dependencies = properties.find('FileDependencies') + if dependencies is None: + return file_dependencies + + dependency_items = list(dependencies.findall('Dependency')) + if len(dependency_items) > MAX_DEPENDENCIES_PER_RULE: + _invalid_entry('Too many dependency entries, truncating', { + 'unit': unit, + 'count': len(dependency_items), + 'limit': MAX_DEPENDENCIES_PER_RULE, + }) + dependency_items = dependency_items[:MAX_DEPENDENCIES_PER_RULE] + + for dependency in dependency_items: + mode = dependency.get('mode') + path = dependency.get('path') + if mode not in VALID_DEP_MODES or not path: + _invalid_entry('Invalid dependency entry', {'mode': mode, 'path': path}) + continue + if not _is_valid_dependency_path(str(path), policy_target): + _invalid_entry('Invalid dependency path', { + 'mode': mode, + 'path': path, + 'policy_target': policy_target, + 'unit': unit, + }) + continue + file_dependencies.append({'mode': mode, 'path': path}) + + return file_dependencies + + +def _parse_policy_element(policy_element): + element_name = _tag_name(policy_element) + if element_name not in VALID_POLICY_ELEMENTS: + return None + + properties = policy_element.find('Properties') + if properties is None: + _invalid_entry('Missing in Systemds element', {'element': element_name}) + return None + + unit = _normalize_unit_name(properties.get('unit'), element_name) + state = properties.get('state') + apply_mode = properties.get('applyMode', 'always') + policy_target = properties.get('policyTarget', 'machine') + + if not unit: + _invalid_entry('Missing unit attribute', {'element': element_name}) + return None + if not _is_valid_unit_name(unit): + _invalid_entry('Invalid unit value', {'element': element_name, 'unit': unit}) + return None + if state not in VALID_STATES: + _invalid_entry('Invalid state', {'element': element_name, 'state': state, 'unit': unit}) + return None + if apply_mode not in VALID_APPLY_MODES: + _invalid_entry('Invalid applyMode', {'element': element_name, 'apply_mode': apply_mode, 'unit': unit}) + return None + if policy_target not in VALID_POLICY_TARGETS: + _invalid_entry('Invalid policyTarget', {'element': element_name, 'policy_target': policy_target, 'unit': unit}) + return None + uid = policy_element.get('uid') + clsid = policy_element.get('clsid') + name = policy_element.get('name') + if not uid or not clsid or not name: + _invalid_entry('Missing required policy attributes', { + 'element': element_name, + 'uid': uid, + 'clsid': clsid, + 'name': name, + 'unit': unit, + }) + return None + + unit_file = properties.find('UnitFile') + unit_file_text = None + unit_file_b64 = None + if unit_file is not None and unit_file.text is not None: + # UnitFile mode=table is treated as plain text by design. + unit_file_text = str(unit_file.text) + if len(unit_file_text.encode('utf-8')) > MAX_UNIT_FILE_SIZE: + _invalid_entry('UnitFile exceeds size limit', { + 'element': element_name, + 'unit': unit, + 'limit': MAX_UNIT_FILE_SIZE, + }) + return None + unit_file_b64 = base64.b64encode(unit_file_text.encode('utf-8')).decode('ascii') + + policy = systemd_policy(unit) + policy.element_type = element_name.lower() + policy.clsid = clsid + policy.name = name + policy.status = policy_element.get('status') + policy.image = policy_element.get('image') + policy.changed = policy_element.get('changed') + policy.uid = uid + policy.desc = policy_element.get('desc') + policy.bypassErrors = policy_element.get('bypassErrors') + policy.userContext = policy_element.get('userContext') + policy.removePolicy = policy_element.get('removePolicy') + + policy.state = state + policy.now = _as_bool(properties.get('now'), default=False) + policy.apply_mode = apply_mode + policy.policy_target = policy_target + policy.edit_mode = _derive_edit_mode(apply_mode) + dropin_name = properties.get('dropInName', DEFAULT_DROPIN_NAME) or DEFAULT_DROPIN_NAME + if not _is_valid_dropin_name(dropin_name): + _invalid_entry('Invalid dropInName', {'element': element_name, 'dropInName': dropin_name, 'unit': unit}) + return None + + policy.dropin_name = dropin_name + policy.unit_file = unit_file_text + policy.unit_file_b64 = unit_file_b64 + policy.unit_file_mode = 'text' + policy.file_dependencies = _parse_file_dependencies(properties, policy_target, unit) + + return policy + + +def read_systemds(systemds_file): + """ + Read Systemds.xml from GPT. + """ + policies = [] + root = _get_systemds_root(systemds_file) + if _tag_name(root) != 'Systemds': + _invalid_entry('Unexpected root element in Systemds.xml', {'root': _tag_name(root)}) + return policies + + for policy_element in root: + parsed = _parse_policy_element(policy_element) + if parsed is not None: + policies.append(parsed) + + return policies + + +def merge_systemds(storage, systemd_objects, policy_name): + for systemd_object in systemd_objects: + storage.add_systemd(systemd_object, policy_name) + + +class systemd_policy(DynamicAttributes): + def __init__(self, unit): + self.unit = unit diff --git a/gpoa/gpt/systemds_constants.py b/gpoa/gpt/systemds_constants.py new file mode 100644 index 00000000..7381827e --- /dev/null +++ b/gpoa/gpt/systemds_constants.py @@ -0,0 +1,37 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import re + +VALID_STATES = {'as_is', 'enable', 'disable', 'mask', 'unmask', 'preset'} +VALID_APPLY_MODES = {'always', 'if_exists', 'if_missing'} +VALID_POLICY_TARGETS = {'machine', 'user'} +VALID_EDIT_MODES = {'create', 'override', 'create_or_override'} +VALID_DEP_MODES = {'changed', 'presence_changed'} +NON_RESTARTABLE_TYPES = {'device', 'scope'} + +DEFAULT_DROPIN_NAME = '50-gpo.conf' +DROPIN_NAME_RE = re.compile(r'^[A-Za-z0-9_.@-]{1,128}\.conf$') +UNIT_NAME_RE = re.compile( + r'^[A-Za-z0-9:_.@-]{1,255}\.(service|socket|timer|path|mount|automount|swap|target|device|slice|scope)$' +) + +MAX_RULES_PER_SCOPE = 512 +MAX_DEPENDENCIES_PER_RULE = 32 +MAX_DEPENDENCY_PATH_LEN = 4096 +MAX_UNIT_FILE_SIZE = 128 * 1024 diff --git a/gpoa/locale/ru_RU/LC_MESSAGES/gpoa.po b/gpoa/locale/ru_RU/LC_MESSAGES/gpoa.po index bb7a7186..7b20d829 100644 --- a/gpoa/locale/ru_RU/LC_MESSAGES/gpoa.po +++ b/gpoa/locale/ru_RU/LC_MESSAGES/gpoa.po @@ -1135,6 +1135,42 @@ msgstr "Ошибка загрузки плагина из файла" msgid "Plugin failed to apply with user privileges" msgstr "Плагин не смог примениться с правами пользователя" +msgid "Running systemd preferences applier for machine" +msgstr "Запуск применения systemd preferences для машины" + +msgid "Running systemd preferences applier for user in admin context" +msgstr "Запуск применения systemd preferences для пользователя в административном контексте" + +msgid "Running systemd preferences applier for user in user context" +msgstr "Запуск применения systemd preferences для пользователя в пользовательском контексте" + +msgid "Systemd preferences applier is disabled" +msgstr "Применение systemd preferences отключено" + +msgid "Applying systemd preference rule" +msgstr "Применение правила systemd preference" + +msgid "Running daemon-reload for systemd preferences" +msgstr "Выполняется daemon-reload для systemd preferences" + +msgid "Cleaning up removed systemd preferences rule" +msgstr "Очистка удалённого правила systemd preferences" + +msgid "Restarting unit due to changed file dependency" +msgstr "Перезапуск unit из-за изменённой файловой зависимости" + +msgid "Invalid Systemds preference entry" +msgstr "Некорректная запись Systemds preference" + +msgid "User systemd manager is unavailable" +msgstr "Пользовательский менеджер systemd недоступен" + +msgid "Restart skipped for non-restartable unit type" +msgstr "Перезапуск пропущен для нерестартуемого типа unit" + +msgid "daemon-reload for systemd preferences failed" +msgstr "Ошибка daemon-reload для systemd preferences" + # Warning_end # Fatal @@ -1151,4 +1187,3 @@ msgid "Unknown fatal code" msgstr "Неизвестный код фатальной ошибки" - diff --git a/gpoa/messages/__init__.py b/gpoa/messages/__init__.py index 754eecf4..1c3becf8 100644 --- a/gpoa/messages/__init__.py +++ b/gpoa/messages/__init__.py @@ -362,6 +362,14 @@ def debug_code(code): debug_ids[237] = 'Failed to load cached versions' debug_ids[238] = 'The trust attribute is not supported' debug_ids[239] = 'Setting the trust attribute for a shortcut' + debug_ids[240] = 'Running systemd preferences applier for machine' + debug_ids[241] = 'Running systemd preferences applier for user in admin context' + debug_ids[242] = 'Running systemd preferences applier for user in user context' + debug_ids[243] = 'Systemd preferences applier is disabled' + debug_ids[244] = 'Applying systemd preference rule' + debug_ids[245] = 'Running daemon-reload for systemd preferences' + debug_ids[246] = 'Cleaning up removed systemd preferences rule' + debug_ids[247] = 'Restarting unit due to changed file dependency' return debug_ids.get(code, 'Unknown debug code') @@ -419,6 +427,10 @@ def warning_code(code): warning_ids[44] = 'Plugin is not valid API object' warning_ids[45] = 'Error loading plugin from file' warning_ids[46] = 'Plugin failed to apply with user privileges' + warning_ids[47] = 'Invalid Systemds preference entry' + warning_ids[48] = 'User systemd manager is unavailable' + warning_ids[49] = 'Restart skipped for non-restartable unit type' + warning_ids[50] = 'daemon-reload for systemd preferences failed' return warning_ids.get(code, 'Unknown warning code') @@ -450,4 +462,3 @@ def message_with_code(code): retstr = 'core' + '[' + code[0:1] + code[1:].rjust(7, '0') + ']| ' + gettext.gettext(get_message(code)) return retstr - diff --git a/gpoa/storage/dconf_registry.py b/gpoa/storage/dconf_registry.py index 0a93e59e..e19dbf9d 100644 --- a/gpoa/storage/dconf_registry.py +++ b/gpoa/storage/dconf_registry.py @@ -97,6 +97,7 @@ class Dconf_registry(): environmentvariables = [] inifiles = [] services = [] + systemds = [] printers = [] scripts = [] networkshares = [] @@ -454,6 +455,11 @@ def add_networkshare(cls, networkshareobj, policy_name): networkshareobj.policy_name = policy_name cls.networkshares.append(networkshareobj) + @classmethod + def add_systemd(cls, systemdobj, policy_name): + systemdobj.policy_name = policy_name + cls.systemds.append(systemdobj) + @classmethod def get_shortcuts(cls): @@ -503,6 +509,10 @@ def get_files(cls): def get_networkshare(cls): return cls.networkshares + @classmethod + def get_systemds(cls): + return cls.systemds + @classmethod def get_ini(cls): @@ -821,8 +831,18 @@ def get_dconf_envprofile(): def convert_elements_to_list_dicts(elements): return list(map(lambda x: dict(x), elements)) +def _freeze_for_dedup(value): + if isinstance(value, dict): + return tuple((key, _freeze_for_dedup(val)) for key, val in sorted(value.items())) + if isinstance(value, list): + return tuple(_freeze_for_dedup(item) for item in value) + return value + def remove_duplicate_dicts_in_list(list_dict): - return convert_elements_to_list_dicts(list(OrderedDict((tuple(sorted(d.items())), d) for d in list_dict).values())) + result = OrderedDict() + for item in convert_elements_to_list_dicts(list_dict): + result.setdefault(_freeze_for_dedup(item), item) + return list(result.values()) def add_preferences_to_global_registry_dict(username, is_machine): if is_machine: @@ -838,6 +858,7 @@ def add_preferences_to_global_registry_dict(username, is_machine): ('Environmentvariables',remove_duplicate_dicts_in_list(Dconf_registry.environmentvariables)), ('Inifiles',remove_duplicate_dicts_in_list(Dconf_registry.inifiles)), ('Services',remove_duplicate_dicts_in_list(Dconf_registry.services)), + ('Systemds',remove_duplicate_dicts_in_list(Dconf_registry.systemds)), ('Printers',remove_duplicate_dicts_in_list(Dconf_registry.printers)), ('Scripts',remove_duplicate_dicts_in_list(Dconf_registry.scripts)), ('Networkshares',remove_duplicate_dicts_in_list(Dconf_registry.networkshares))] diff --git a/gpoa/test/frontend/test_change_journal.py b/gpoa/test/frontend/test_change_journal.py new file mode 100644 index 00000000..b88d8829 --- /dev/null +++ b/gpoa/test/frontend/test_change_journal.py @@ -0,0 +1,115 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest +import os +import sys +import types +import importlib +import tempfile +import unittest.mock + + +def _load_change_journal(): + if 'frontend' not in sys.modules: + frontend_pkg = types.ModuleType('frontend') + frontend_pkg.__path__ = [os.path.join(os.getcwd(), 'frontend')] + sys.modules['frontend'] = frontend_pkg + return importlib.import_module('frontend.change_journal') + + +class ChangeJournalTestCase(unittest.TestCase): + def test_changed_by_content_update(self): + change_journal = _load_change_journal() + change_journal.reset() + + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, 'a.txt') + with open(path, 'w', encoding='utf-8') as file_obj: + file_obj.write('first') + change_journal.watch(path) + with open(path, 'w', encoding='utf-8') as file_obj: + file_obj.write('second') + + self.assertTrue(change_journal.query(path, mode='changed')) + self.assertFalse(change_journal.query(path, mode='presence_changed')) + + def test_presence_changed_on_create_and_delete(self): + change_journal = _load_change_journal() + change_journal.reset() + with tempfile.TemporaryDirectory() as tmpdir: + created = os.path.join(tmpdir, 'create.txt') + change_journal.watch(created) + with open(created, 'w', encoding='utf-8') as file_obj: + file_obj.write('x') + + self.assertTrue(change_journal.query(created, mode='presence_changed')) + self.assertTrue(change_journal.query(created, mode='changed')) + + deleted = os.path.join(tmpdir, 'delete.txt') + with open(deleted, 'w', encoding='utf-8') as file_obj: + file_obj.write('x') + change_journal.watch(deleted) + os.unlink(deleted) + + self.assertTrue(change_journal.query(deleted, mode='presence_changed')) + self.assertTrue(change_journal.query(deleted, mode='changed')) + + def test_unchanged_and_unwatched_paths(self): + change_journal = _load_change_journal() + change_journal.reset() + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, 'same.txt') + with open(path, 'w', encoding='utf-8') as file_obj: + file_obj.write('same') + change_journal.watch(path) + + self.assertFalse(change_journal.query(path, mode='changed')) + self.assertFalse(change_journal.query(path, mode='presence_changed')) + self.assertFalse(change_journal.query(os.path.join(tmpdir, 'unwatched.txt'), mode='changed')) + + def test_record_compatibility_for_manual_override(self): + change_journal = _load_change_journal() + change_journal.reset() + + change_journal.record_changed('/tmp/a') + self.assertTrue(change_journal.query('/tmp/a', mode='changed')) + self.assertFalse(change_journal.query('/tmp/a', mode='presence_changed')) + + change_journal.record_presence_changed('/tmp/b') + self.assertTrue(change_journal.query('/tmp/b', mode='changed')) + self.assertTrue(change_journal.query('/tmp/b', mode='presence_changed')) + + def test_query_reuses_current_snapshot_cache(self): + change_journal = _load_change_journal() + change_journal.reset() + + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, 'cache.txt') + with open(path, 'w', encoding='utf-8') as file_obj: + file_obj.write('content') + change_journal.watch(path) + + with unittest.mock.patch.object(change_journal, '_sha256', wraps=change_journal._sha256) as hash_mock: + self.assertFalse(change_journal.query(path, mode='changed')) + self.assertFalse(change_journal.query(path, mode='changed')) + self.assertEqual(hash_mock.call_count, 1) + + +if __name__ == '__main__': + unittest.main() diff --git a/gpoa/test/frontend/test_frontend_manager.py b/gpoa/test/frontend/test_frontend_manager.py new file mode 100644 index 00000000..3129a5dd --- /dev/null +++ b/gpoa/test/frontend/test_frontend_manager.py @@ -0,0 +1,95 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import ast +import os +import unittest + + +def _read_frontend_manager_ast(): + source_path = os.path.join(os.getcwd(), 'frontend', 'frontend_manager.py') + with open(source_path, 'r', encoding='utf-8') as file_obj: + source = file_obj.read() + return ast.parse(source) + + +def _find_method(class_node, method_name): + for node in class_node.body: + if isinstance(node, ast.FunctionDef) and node.name == method_name: + return node + return None + + +def _has_call(node, function_name): + for child in ast.walk(node): + if not isinstance(child, ast.Call): + continue + if isinstance(child.func, ast.Name) and child.func.id == function_name: + return True + if isinstance(child.func, ast.Attribute) and child.func.attr == function_name: + return True + return False + + +def _find_stmt_index(statements, predicate): + for index, statement in enumerate(statements): + if predicate(statement): + return index + return None + + +class FrontendManagerOrderTestCase(unittest.TestCase): + def test_machine_apply_primes_journal_after_reset_before_apply_loop(self): + tree = _read_frontend_manager_ast() + manager_class = next(node for node in tree.body + if isinstance(node, ast.ClassDef) and node.name == 'frontend_manager') + method = _find_method(manager_class, 'machine_apply') + self.assertIsNotNone(method) + + reset_index = _find_stmt_index(method.body, lambda stmt: _has_call(stmt, 'reset_change_journal')) + prime_index = _find_stmt_index(method.body, lambda stmt: isinstance(stmt, ast.Try) + and _has_call(stmt, 'prime_dependency_journal')) + loop_index = _find_stmt_index(method.body, lambda stmt: isinstance(stmt, ast.For)) + + self.assertIsNotNone(reset_index) + self.assertIsNotNone(prime_index) + self.assertIsNotNone(loop_index) + self.assertLess(reset_index, prime_index) + self.assertLess(prime_index, loop_index) + + def test_user_apply_primes_journal_after_reset_before_apply_branches(self): + tree = _read_frontend_manager_ast() + manager_class = next(node for node in tree.body + if isinstance(node, ast.ClassDef) and node.name == 'frontend_manager') + method = _find_method(manager_class, 'user_apply') + self.assertIsNotNone(method) + + reset_index = _find_stmt_index(method.body, lambda stmt: _has_call(stmt, 'reset_change_journal')) + prime_index = _find_stmt_index(method.body, lambda stmt: isinstance(stmt, ast.Try) + and _has_call(stmt, 'prime_dependency_journal')) + branch_index = _find_stmt_index(method.body, lambda stmt: isinstance(stmt, ast.If)) + + self.assertIsNotNone(reset_index) + self.assertIsNotNone(prime_index) + self.assertIsNotNone(branch_index) + self.assertLess(reset_index, prime_index) + self.assertLess(prime_index, branch_index) + + +if __name__ == '__main__': + unittest.main() diff --git a/gpoa/test/frontend/test_systemd_applier.py b/gpoa/test/frontend/test_systemd_applier.py new file mode 100644 index 00000000..d6fd648c --- /dev/null +++ b/gpoa/test/frontend/test_systemd_applier.py @@ -0,0 +1,255 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import importlib +import os +import sys +import types +import unittest +import unittest.mock + + +class _entry: + def __init__(self, valuename, data): + self.valuename = valuename + self.data = data + + +class _storage_stub: + def __init__(self, entries): + self._entries = entries + + def filter_hklm_entries(self, _branch): + return self._entries + + def get_key_value(self, _path): + return None + + +def _load_systemd_applier_module(): + if 'frontend' not in sys.modules: + frontend_pkg = types.ModuleType('frontend') + frontend_pkg.__path__ = [os.path.join(os.getcwd(), 'frontend')] + sys.modules['frontend'] = frontend_pkg + return importlib.import_module('frontend.systemd_applier') + + +def _load_systemd_manager_module(): + if 'frontend' not in sys.modules: + frontend_pkg = types.ModuleType('frontend') + frontend_pkg.__path__ = [os.path.join(os.getcwd(), 'frontend')] + sys.modules['frontend'] = frontend_pkg + return importlib.import_module('frontend.appliers.systemd') + + +class _fake_dbus_exception(Exception): + def __init__(self, message, dbus_name): + super().__init__(message) + self._dbus_name = dbus_name + + def get_dbus_name(self): + return self._dbus_name + + +class _fake_manager_iface: + def __init__(self, load_exc=None): + self.unit_path = '/org/freedesktop/systemd1/unit/demo_2eservice' + self.load_exc = load_exc + + def LoadUnit(self, _unit_name): + if self.load_exc: + raise self.load_exc + return self.unit_path + + +class _fake_properties_iface: + def __init__(self, load_state='loaded', get_exc=None): + self.load_state = load_state + self.get_exc = get_exc + + def Get(self, _iface, _name): + if self.get_exc: + raise self.get_exc + return self.load_state + + +class _fake_proxy: + def __init__(self, ifaces): + self.ifaces = ifaces + + +class _fake_bus: + def __init__(self, objects): + self._objects = objects + + def get_object(self, _bus_name, object_path): + return self._objects[str(object_path)] + + +class _fake_dbus_module: + DBusException = _fake_dbus_exception + + def __init__(self, load_state='loaded', load_exc=None, get_exc=None): + manager_iface = _fake_manager_iface(load_exc=load_exc) + properties_iface = _fake_properties_iface(load_state=load_state, get_exc=get_exc) + self._objects = { + '/org/freedesktop/systemd1': _fake_proxy({ + 'org.freedesktop.systemd1.Manager': manager_iface, + }), + manager_iface.unit_path: _fake_proxy({ + 'org.freedesktop.DBus.Properties': properties_iface, + }), + } + + def SystemBus(self): + return _fake_bus(self._objects) + + def SessionBus(self): + return _fake_bus(self._objects) + + @staticmethod + def String(value): + return value + + @staticmethod + def Boolean(value): + return value + + @staticmethod + def Interface(proxy, interface_name=None, dbus_interface=None): + iface_name = dbus_interface if dbus_interface is not None else interface_name + return proxy.ifaces[iface_name] + + +class _fake_subprocess_result: + def __init__(self, returncode=0, stdout='', stderr=''): + self.returncode = returncode + self.stdout = stdout + self.stderr = stderr + + +class SystemdApplierTestCase(unittest.TestCase): + def test_run_skips_invalid_unit_name(self): + module = _load_systemd_applier_module() + storage = _storage_stub([ + _entry('/tmp/evil.service', '1'), + _entry('ok.service', '1'), + ]) + applier = module.systemd_applier(storage) + + good_unit = unittest.mock.Mock() + with unittest.mock.patch('frontend.systemd_applier.systemd_unit', return_value=good_unit) as ctor: + applier.run() + + ctor.assert_called_once_with('ok.service', 1) + + def test_run_handles_dbus_apply_error(self): + module = _load_systemd_applier_module() + from frontend.appliers.systemd import SystemdManagerError + + storage = _storage_stub([_entry('ok.service', '1')]) + applier = module.systemd_applier(storage) + + bad_unit = unittest.mock.Mock() + bad_unit.unit_name = 'ok.service' + bad_unit.apply.side_effect = SystemdManagerError('boom', action='start', unit='ok.service') + + with unittest.mock.patch('frontend.systemd_applier.systemd_unit', return_value=bad_unit): + applier.run() + + bad_unit.apply.assert_called_once() + + def test_manager_exists_returns_false_for_not_found_load_state(self): + module = _load_systemd_manager_module() + fake_dbus = _fake_dbus_module(load_state='not-found') + + with unittest.mock.patch('frontend.appliers.systemd._import_dbus', return_value=fake_dbus): + manager = module.SystemdManager(mode='machine') + + self.assertFalse(manager.exists('demo.service')) + + def test_manager_exists_returns_true_for_loaded_load_state(self): + module = _load_systemd_manager_module() + fake_dbus = _fake_dbus_module(load_state='loaded') + + with unittest.mock.patch('frontend.appliers.systemd._import_dbus', return_value=fake_dbus): + manager = module.SystemdManager(mode='machine') + + self.assertTrue(manager.exists('demo.service')) + + def test_manager_exists_handles_no_such_unit_error(self): + module = _load_systemd_manager_module() + exc = _fake_dbus_exception('missing', 'org.freedesktop.systemd1.NoSuchUnit') + fake_dbus = _fake_dbus_module(load_exc=exc) + + with unittest.mock.patch('frontend.appliers.systemd._import_dbus', return_value=fake_dbus): + manager = module.SystemdManager(mode='machine') + + self.assertFalse(manager.exists('demo.service')) + + def test_global_manager_exists_uses_systemctl_global_cat(self): + module = _load_systemd_manager_module() + + with unittest.mock.patch('frontend.appliers.systemd.subprocess.run') as run_mock: + run_mock.return_value = _fake_subprocess_result(returncode=0, stdout='# /etc/systemd/user/demo.service\n') + manager = module.SystemdManager(mode='global_user') + + self.assertTrue(manager.exists('demo.service')) + run_mock.assert_called_once_with( + ['systemctl', '--global', 'cat', 'demo.service'], + stdout=module.subprocess.PIPE, + stderr=module.subprocess.PIPE, + text=True, + check=False, + ) + + def test_global_manager_apply_state_ignores_now_runtime_actions(self): + module = _load_systemd_manager_module() + + with unittest.mock.patch('frontend.appliers.systemd.subprocess.run') as run_mock: + run_mock.side_effect = [ + _fake_subprocess_result(returncode=0), + _fake_subprocess_result(returncode=0), + ] + manager = module.SystemdManager(mode='global_user') + start_mock = unittest.mock.Mock(side_effect=AssertionError('start() must not be used for --global')) + manager.start = start_mock + + manager.apply_state('demo.service', 'enable', now=True) + + self.assertEqual(run_mock.call_args_list, [ + unittest.mock.call( + ['systemctl', '--global', 'unmask', 'demo.service'], + stdout=module.subprocess.PIPE, + stderr=module.subprocess.PIPE, + text=True, + check=False, + ), + unittest.mock.call( + ['systemctl', '--global', 'enable', 'demo.service'], + stdout=module.subprocess.PIPE, + stderr=module.subprocess.PIPE, + text=True, + check=False, + ), + ]) + start_mock.assert_not_called() + + +if __name__ == '__main__': + unittest.main() diff --git a/gpoa/test/frontend/test_systemd_preferences_applier.py b/gpoa/test/frontend/test_systemd_preferences_applier.py new file mode 100644 index 00000000..e0d8430d --- /dev/null +++ b/gpoa/test/frontend/test_systemd_preferences_applier.py @@ -0,0 +1,959 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import tempfile +import os +import sys +import types +import importlib +import base64 +import unittest +import unittest.mock +from pathlib import Path + + +class _storage_stub: + def __init__(self, values=None): + self.values = values or {} + + def get_entry(self, path, dictionary=None, preg=True): + return self.values.get(path) + + def get_key_value(self, path): + return None + + +class _manager_stub: + def __init__(self, exists_map=None, active_state_map=None, reload_exc=None): + self.exists_map = exists_map or {} + self.active_state_map = active_state_map or {} + self.reload_exc = reload_exc + self.exists_calls = [] + self.apply_state_calls = [] + self.restart_calls = [] + self.stop_calls = [] + self.reload_calls = 0 + self.call_order = [] + + def exists(self, unit_name): + self.exists_calls.append(unit_name) + return self.exists_map.get(unit_name, False) + + def reload(self): + self.call_order.append('reload') + self.reload_calls += 1 + if self.reload_exc is not None: + raise self.reload_exc + + def active_state(self, unit_name): + return self.active_state_map.get(unit_name, 'inactive') + + def restart(self, unit_name): + self.call_order.append('restart:{}'.format(unit_name)) + self.restart_calls.append(unit_name) + + def stop(self, unit_name): + self.call_order.append('stop:{}'.format(unit_name)) + self.stop_calls.append(unit_name) + + def apply_state(self, unit_name, state, now): + self.call_order.append('apply_state:{}'.format(unit_name)) + self.apply_state_calls.append((unit_name, state, now)) + + +def _stub_external_modules(): + """Stub heavy external dependencies not available in unit test environment.""" + if 'samba' not in sys.modules: + samba_stub = types.ModuleType('samba') + samba_stub.getopt = types.ModuleType('samba.getopt') + sys.modules['samba'] = samba_stub + sys.modules['samba.getopt'] = samba_stub.getopt + + if 'util.samba' not in sys.modules: + util_samba = types.ModuleType('util.samba') + + class _smbopts_stub: + def get_server_role(self): + return 'member server' + + util_samba.smbopts = _smbopts_stub + sys.modules['util.samba'] = util_samba + + if 'util' not in sys.modules: + util_pkg = types.ModuleType('util') + util_pkg.__path__ = [os.path.join(os.getcwd(), 'util')] + sys.modules['util'] = util_pkg + + if 'gpoa' not in sys.modules: + gpoa_stub = types.ModuleType('gpoa') + gpoa_stub.__path__ = [os.getcwd()] + sys.modules['gpoa'] = gpoa_stub + + if 'gpoa.messages' not in sys.modules: + msg_stub = types.ModuleType('gpoa.messages') + msg_stub.message_with_code = lambda code, *a, **kw: str(code) + sys.modules['gpoa.messages'] = msg_stub + + +def _load_spa(): + _stub_external_modules() + if 'frontend' not in sys.modules: + frontend_pkg = types.ModuleType('frontend') + frontend_pkg.__path__ = [os.path.join(os.getcwd(), 'frontend')] + sys.modules['frontend'] = frontend_pkg + return importlib.import_module('frontend.systemd_preferences_applier') + + +class SystemdPreferencesApplierTestCase(unittest.TestCase): + def test_apply_mode_skips_non_matching_rules(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + manager = _manager_stub(exists_map={ + 'exists.service': True, + 'missing.service': False, + }) + runtime.systemd_manager = manager + runtime.apply_rules([ + { + 'uid': '1', + 'unit': 'missing.service', + 'state': 'enable', + 'now': False, + 'apply_mode': 'if_exists', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file': None, + 'file_dependencies': [], + 'element_type': 'service', + }, + { + 'uid': '2', + 'unit': 'exists.service', + 'state': 'disable', + 'now': False, + 'apply_mode': 'if_missing', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file': None, + 'file_dependencies': [], + 'element_type': 'service', + }, + { + 'uid': '3', + 'unit': 'exists.service', + 'state': 'enable', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file': None, + 'file_dependencies': [], + 'element_type': 'service', + }, + ]) + + self.assertEqual(manager.apply_state_calls, [('exists.service', 'enable', False)]) + + def test_non_applicable_rules_still_reach_phase2_candidates(self): + # Rules that don't match apply_mode must still be checked for + # dependency-triggered restarts (phase2_candidates). + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub(exists_map={'exists.service': True}) + runtime.apply_rules([ + { + 'uid': '1', + 'unit': 'exists.service', + 'state': 'enable', + 'now': False, + 'apply_mode': 'if_missing', + 'policy_target': 'machine', + 'edit_mode': 'create', + 'dropin_name': '50-gpo.conf', + 'unit_file': None, + 'file_dependencies': [{'mode': 'changed', 'path': '/tmp/test.ini'}], + 'element_type': 'service', + }, + ]) + + self.assertEqual(len(runtime.phase2_candidates), 1) + self.assertEqual(runtime.phase2_candidates[0]['uid'], '1') + # State action must not have been applied (apply_mode didn't match) + self.assertEqual(runtime.systemd_manager.apply_state_calls, []) + + def test_non_applicable_rule_triggers_dependency_restart(self): + # Service exists, apply_mode='if_missing' → edit skipped, but + # dependency change must still trigger a restart. + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub( + exists_map={'exists.service': True}, + active_state_map={'exists.service': 'active'}, + ) + runtime.apply_rules([ + { + 'uid': '1', + 'unit': 'exists.service', + 'state': 'enable', + 'now': False, + 'apply_mode': 'if_missing', + 'policy_target': 'machine', + 'edit_mode': 'create', + 'dropin_name': '50-gpo.conf', + 'unit_file': None, + 'file_dependencies': [{'mode': 'changed', 'path': '/tmp/test.ini'}], + 'element_type': 'service', + }, + ]) + + with unittest.mock.patch('frontend.systemd_preferences_applier.query', return_value=True): + runtime.post_restart() + + self.assertIn('exists.service', runtime.systemd_manager.restart_calls) + + def test_edit_mode_create_or_override_writes_expected_paths(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub(exists_map={ + 'exists.service': True, + 'new.service': False, + }) + with tempfile.TemporaryDirectory() as tmpdir: + runtime.context.systemd_dir = tmpdir + runtime.apply_rules([ + { + 'uid': '10', + 'unit': 'exists.service', + 'state': 'as_is', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'create_or_override', + 'dropin_name': 'custom.conf', + 'unit_file': '[Service]\nRestart=always', + 'file_dependencies': [], + 'element_type': 'service', + }, + { + 'uid': '11', + 'unit': 'new.service', + 'state': 'as_is', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'create_or_override', + 'dropin_name': 'custom.conf', + 'unit_file': '[Service]\nRestart=no', + 'file_dependencies': [], + 'element_type': 'service', + }, + ]) + + dropin_path = '{}/exists.service.d/custom.conf'.format(tmpdir) + create_path = '{}/new.service'.format(tmpdir) + with open(dropin_path, 'r', encoding='utf-8') as fh: + self.assertIn('gpupdate-managed uid: 10', fh.read()) + with open(create_path, 'r', encoding='utf-8') as fh: + self.assertIn('gpupdate-managed uid: 11', fh.read()) + self.assertGreaterEqual(runtime.systemd_manager.reload_calls, 1) + + def test_apply_rules_uses_reload_barrier_before_state_actions(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + manager = _manager_stub(exists_map={'demo.service': False}) + runtime.systemd_manager = manager + + with tempfile.TemporaryDirectory() as tmpdir: + runtime.context.systemd_dir = tmpdir + runtime.apply_rules([{ + 'uid': 'reload-order', + 'unit': 'demo.service', + 'state': 'enable', + 'now': True, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'create', + 'dropin_name': '50-gpo.conf', + 'unit_file': '[Unit]\nDescription=Demo', + 'file_dependencies': [], + 'element_type': 'service', + }]) + + self.assertEqual(manager.call_order, ['reload', 'apply_state:demo.service']) + + def test_apply_rules_skips_state_actions_when_reload_fails(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + manager = _manager_stub( + exists_map={'demo.service': False, 'stateonly.service': True}, + reload_exc=spa.SystemdManagerError('reload failed', action='reload'), + ) + runtime.systemd_manager = manager + + with tempfile.TemporaryDirectory() as tmpdir: + runtime.context.systemd_dir = tmpdir + runtime.apply_rules([ + { + 'uid': 'reload-fail', + 'unit': 'demo.service', + 'state': 'enable', + 'now': True, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'create', + 'dropin_name': '50-gpo.conf', + 'unit_file': '[Unit]\nDescription=Demo', + 'file_dependencies': [], + 'element_type': 'service', + }, + { + 'uid': 'state-only', + 'unit': 'stateonly.service', + 'state': 'disable', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file': None, + 'file_dependencies': [], + 'element_type': 'service', + }, + ]) + + self.assertEqual(manager.reload_calls, 1) + self.assertEqual(manager.apply_state_calls, []) + self.assertEqual(runtime.phase2_candidates, []) + + def test_normalize_rule_unescapes_newline_sequences_in_unit_file(self): + spa = _load_spa() + + normalized = spa._normalize_rule({ + 'uid': '12', + 'unit': 'escaped.service', + 'state': 'as_is', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file': '[Service]\\nRestart=always', + 'file_dependencies': [], + 'element_type': 'service', + }) + + self.assertEqual(normalized['unit_file'], '[Service]\nRestart=always') + + def test_normalize_rule_maps_remove_policy_aliases(self): + spa = _load_spa() + + normalized_legacy = spa._normalize_rule({ + 'uid': 'rp-1', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'removePolicy': '1', + }) + normalized_snake = spa._normalize_rule({ + 'uid': 'rp-2', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'remove_policy': True, + }) + + self.assertTrue(normalized_legacy['remove_policy']) + self.assertTrue(normalized_snake['remove_policy']) + + def test_normalize_rule_decodes_unit_file_b64_with_priority(self): + spa = _load_spa() + + original = "[Service]\nExecStart=/bin/bash -c \"echo 'ok'\"\n" + encoded = base64.b64encode(original.encode('utf-8')).decode('ascii') + normalized = spa._normalize_rule({ + 'uid': '13', + 'unit': 'encoded.service', + 'state': 'as_is', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file_b64': encoded, + 'unit_file': '[Service]\\nExecStart=/bin/false', + 'file_dependencies': [], + 'element_type': 'service', + }) + + self.assertEqual(normalized['unit_file'], original) + + def test_normalize_rule_falls_back_to_legacy_when_unit_file_b64_invalid(self): + spa = _load_spa() + + with unittest.mock.patch('frontend.systemd_preferences_applier.log') as log_mock: + normalized = spa._normalize_rule({ + 'uid': '14', + 'unit': 'encoded.service', + 'state': 'as_is', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file_b64': 'invalid-%%%', + 'unit_file': '[Service]\\nRestart=always', + 'file_dependencies': [], + 'element_type': 'service', + }) + + self.assertEqual(normalized['unit_file'], '[Service]\nRestart=always') + log_mock.assert_any_call('W47', { + 'reason': 'Invalid unit_file_b64 payload', + 'unit': 'encoded.service', + 'uid': '14', + }) + + def test_normalize_rule_truncates_too_many_dependencies(self): + spa = _load_spa() + + too_many = [{'mode': 'changed', 'path': '/etc/demo{}'.format(idx)} for idx in range(64)] + normalized = spa._normalize_rule({ + 'uid': '15', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'dropin_name': '50-gpo.conf', + 'file_dependencies': too_many, + }) + self.assertIsNotNone(normalized) + self.assertEqual(len(normalized['file_dependencies']), 32) + + def test_normalize_rule_filters_invalid_dependency_paths(self): + spa = _load_spa() + + normalized = spa._normalize_rule({ + 'uid': '16', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'file_dependencies': [ + {'mode': 'changed', 'path': '/etc/demo.conf'}, + {'mode': 'changed', 'path': '../relative'}, + {'mode': 'changed', 'path': '/tmp/\ninvalid'}, + ], + }) + self.assertEqual(normalized['file_dependencies'], [{'mode': 'changed', 'path': '/etc/demo.conf'}]) + + def test_normalize_rule_rejects_oversized_unit_file(self): + spa = _load_spa() + + huge_payload = 'A' * (spa.MAX_UNIT_FILE_SIZE + 1) + encoded = base64.b64encode(huge_payload.encode('utf-8')).decode('ascii') + normalized = spa._normalize_rule({ + 'uid': '17', + 'unit': 'huge.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file_b64': encoded, + }) + self.assertIsNone(normalized['unit_file']) + + def test_post_restart_uses_dependency_modes(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub(active_state_map={'demo.service': 'active'}) + runtime.phase2_candidates = [{ + 'uid': '1', + 'unit': 'demo.service', + 'state': 'as_is', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file': None, + 'file_dependencies': [ + {'mode': 'changed', 'path': '/etc/demo.conf'}, + {'mode': 'presence_changed', 'path': '/etc/demo.presence'}, + ], + 'element_type': 'service', + }] + + with unittest.mock.patch('frontend.systemd_preferences_applier.query') as query_mock: + query_mock.side_effect = lambda path, mode='changed': mode == 'changed' + runtime.post_restart() + + self.assertIn('demo.service', runtime.systemd_manager.restart_calls) + + def test_post_restart_skips_when_dependency_unchanged(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub(active_state_map={'demo.service': 'active'}) + runtime.phase2_candidates = [{ + 'uid': '1', + 'unit': 'demo.service', + 'state': 'as_is', + 'now': False, + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropin_name': '50-gpo.conf', + 'unit_file': None, + 'file_dependencies': [ + {'mode': 'changed', 'path': '/etc/demo.conf'}, + ], + 'element_type': 'service', + }] + + with unittest.mock.patch('frontend.systemd_preferences_applier.query', return_value=False): + runtime.post_restart() + + self.assertNotIn('demo.service', runtime.systemd_manager.restart_calls) + + def test_removed_rules_detected_from_previous_snapshot(self): + spa = _load_spa() + + storage = _storage_stub({ + 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([{ + 'uid': 'keep', + 'unit': 'keep.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + }]), + 'Previous/Software/BaseALT/Policies/Preferences/Machine/Systemds': str([ + { + 'uid': 'keep', + 'unit': 'keep.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + }, + { + 'uid': 'drop', + 'unit': 'drop.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + }, + ]), + }) + removed = spa._get_removed_rules(storage, 'Machine', 'machine') + self.assertEqual(len(removed), 1) + self.assertEqual(removed[0]['uid'], 'drop') + + def test_get_rule_sets_for_scope_includes_remove_policy_cleanup(self): + spa = _load_spa() + + storage = _storage_stub({ + 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([ + { + 'uid': 'active', + 'unit': 'active.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'removePolicy': '0', + }, + { + 'uid': 'cleanup', + 'unit': 'cleanup.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'create', + 'removePolicy': '1', + }, + ]), + 'Previous/Software/BaseALT/Policies/Preferences/Machine/Systemds': str([]), + }) + + active_rules, cleanup_rules = spa._get_rule_sets_for_scope(storage, 'Machine', 'machine') + self.assertEqual([rule['uid'] for rule in active_rules], ['active']) + self.assertEqual([rule['uid'] for rule in cleanup_rules], ['cleanup']) + + def test_normalize_rule_rejects_unsafe_unit_and_dropin_paths(self): + spa = _load_spa() + + bad_unit = { + 'uid': 'bad-unit', + 'unit': '/tmp/evil.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + } + bad_dropin = { + 'uid': 'bad-dropin', + 'unit': 'safe.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'dropInName': '../../evil.conf', + } + self.assertIsNone(spa._normalize_rule(bad_unit)) + self.assertIsNone(spa._normalize_rule(bad_dropin)) + + def test_cleanup_removed_rules_keeps_non_restartable_types_skipped(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub(active_state_map={'usb.device': 'active'}) + + with tempfile.TemporaryDirectory() as tmpdir: + runtime.context.systemd_dir = tmpdir + managed = os.path.join(tmpdir, 'usb.device') + with open(managed, 'w', encoding='utf-8') as file_obj: + file_obj.write('# gpupdate-managed uid: deadbeef\n[Unit]\nDescription=test\n') + + removed_rule = { + 'uid': 'deadbeef', + 'unit': 'usb.device', + 'dropin_name': '50-gpo.conf', + 'element_type': 'device', + } + + runtime.cleanup_removed_rules([removed_rule]) + + self.assertFalse(os.path.exists(managed)) + self.assertEqual(runtime.systemd_manager.reload_calls, 1) + self.assertNotIn('usb.device', runtime.systemd_manager.stop_calls) + + def test_cleanup_removed_rules_requires_marker_on_first_line(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub(active_state_map={'demo.service': 'active'}) + + with tempfile.TemporaryDirectory() as tmpdir: + runtime.context.systemd_dir = tmpdir + managed = os.path.join(tmpdir, 'demo.service') + with open(managed, 'w', encoding='utf-8') as file_obj: + file_obj.write('[Unit]\n# gpupdate-managed uid: deadbeef\nDescription=test\n') + + removed_rule = { + 'uid': 'deadbeef', + 'unit': 'demo.service', + 'dropin_name': '50-gpo.conf', + 'element_type': 'service', + } + runtime.cleanup_removed_rules([removed_rule]) + + self.assertTrue(os.path.exists(managed)) + self.assertEqual(runtime.systemd_manager.reload_calls, 0) + + def test_cleanup_removed_rules_skips_restart_when_reload_fails(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub( + active_state_map={'demo.service': 'active'}, + reload_exc=spa.SystemdManagerError('reload failed', action='reload'), + ) + + with tempfile.TemporaryDirectory() as tmpdir: + runtime.context.systemd_dir = tmpdir + managed = os.path.join(tmpdir, 'demo.service') + with open(managed, 'w', encoding='utf-8') as file_obj: + file_obj.write('# gpupdate-managed uid: deadbeef\n[Unit]\nDescription=test\n') + + removed_rule = { + 'uid': 'deadbeef', + 'unit': 'demo.service', + 'dropin_name': '50-gpo.conf', + 'element_type': 'service', + } + runtime.cleanup_removed_rules([removed_rule]) + + self.assertFalse(os.path.exists(managed)) + self.assertEqual(runtime.systemd_manager.reload_calls, 1) + self.assertEqual(runtime.systemd_manager.stop_calls, []) + + def test_write_rule_file_skips_symlink_target(self): + spa = _load_spa() + + storage = _storage_stub() + runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine')) + runtime.systemd_manager = _manager_stub() + + with tempfile.TemporaryDirectory() as tmpdir: + runtime.context.systemd_dir = tmpdir + real_target = os.path.join(tmpdir, 'real.service') + with open(real_target, 'w', encoding='utf-8') as file_obj: + file_obj.write('real') + + symlink_target = os.path.join(tmpdir, 'evil.service') + os.symlink(real_target, symlink_target) + + runtime._write_rule_file(Path(symlink_target), 'uid-1', '[Unit]\nDescription=test') + with open(real_target, 'r', encoding='utf-8') as file_obj: + self.assertEqual(file_obj.read(), 'real') + + def test_user_context_skips_when_user_manager_unavailable(self): + spa = _load_spa() + + storage = _storage_stub() + with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True): + applier = spa.systemd_preferences_applier_user(storage, 'root') + with unittest.mock.patch('os.path.exists', return_value=False): + with unittest.mock.patch('frontend.systemd_preferences_applier._systemd_preferences_runtime.apply_rules') as apply_mock: + applier.user_context_apply() + self.assertFalse(apply_mock.called) + + def test_prime_dependency_journal_machine_watches_machine_dependencies(self): + spa = _load_spa() + + storage = _storage_stub({ + 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([{ + 'uid': 'rule-1', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'file_dependencies': [ + {'mode': 'changed', 'path': '/etc/demo.conf'}, + ], + }]), + }) + + with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True): + applier = spa.systemd_preferences_applier(storage) + with unittest.mock.patch('frontend.systemd_preferences_applier.watch_many') as watch_many_mock: + applier.prime_dependency_journal() + watch_many_mock.assert_called_once_with(['/etc/demo.conf']) + + def test_prime_dependency_journal_machine_watches_global_user_dependencies(self): + spa = _load_spa() + + storage = _storage_stub({ + 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([ + { + 'uid': 'rule-machine', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'file_dependencies': [ + {'mode': 'changed', 'path': '/etc/demo.conf'}, + ], + }, + { + 'uid': 'rule-global-user', + 'unit': 'demo-user.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'user', + 'edit_mode': 'override', + 'file_dependencies': [ + {'mode': 'changed', 'path': '/etc/demo-user.conf'}, + ], + }, + ]), + }) + + with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True): + applier = spa.systemd_preferences_applier(storage) + with unittest.mock.patch('frontend.systemd_preferences_applier.watch_many') as watch_many_mock: + applier.prime_dependency_journal() + watch_many_mock.assert_called_once_with(['/etc/demo.conf', '/etc/demo-user.conf']) + + def test_prime_dependency_journal_skips_remove_policy_rules(self): + spa = _load_spa() + + storage = _storage_stub({ + 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([{ + 'uid': 'rule-removed', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'removePolicy': '1', + 'file_dependencies': [ + {'mode': 'changed', 'path': '/etc/demo.conf'}, + ], + }]), + }) + + with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True): + applier = spa.systemd_preferences_applier(storage) + with unittest.mock.patch('frontend.systemd_preferences_applier.watch_many') as watch_many_mock: + applier.prime_dependency_journal() + watch_many_mock.assert_called_once_with([]) + + def test_prime_dependency_journal_user_watches_machine_and_user_dependencies(self): + spa = _load_spa() + + storage = _storage_stub({ + 'Software/BaseALT/Policies/Preferences/alice/Systemds': str([ + { + 'uid': 'rule-machine', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'override', + 'file_dependencies': [ + {'mode': 'changed', 'path': '/etc/demo.conf'}, + ], + }, + { + 'uid': 'rule-user', + 'unit': 'demo.service', + 'state': 'as_is', + 'apply_mode': 'always', + 'policy_target': 'user', + 'edit_mode': 'override', + 'file_dependencies': [ + {'mode': 'changed', 'path': '%HOME%/.config/demo.conf'}, + ], + }, + ]), + }) + + with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True): + with unittest.mock.patch('frontend.systemd_preferences_applier.get_uid_by_username', return_value=1000): + with unittest.mock.patch('frontend.systemd_preferences_applier.get_homedir', return_value='/home/alice'): + applier = spa.systemd_preferences_applier_user(storage, 'alice') + with unittest.mock.patch('frontend.systemd_preferences_applier.watch_many') as watch_many_mock: + applier.prime_dependency_journal() + expected_user_path = spa._expand_windows_var('%HOME%/.config/demo.conf', username='alice') + watch_many_mock.assert_called_once_with(['/etc/demo.conf', expected_user_path]) + + def test_apply_uses_cleanup_rules_for_remove_policy_items(self): + spa = _load_spa() + + storage = _storage_stub({ + 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([ + { + 'uid': 'cleanup', + 'unit': 'cleanup.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'create', + 'removePolicy': '1', + }, + ]), + 'Previous/Software/BaseALT/Policies/Preferences/Machine/Systemds': str([]), + }) + + with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True): + with unittest.mock.patch('frontend.systemd_preferences_applier._systemd_preferences_runtime') as runtime_ctor: + runtime = unittest.mock.Mock() + global_runtime = unittest.mock.Mock() + runtime_ctor.side_effect = [runtime, global_runtime] + + applier = spa.systemd_preferences_applier(storage) + applier.apply() + + runtime.apply_rules.assert_called_once_with([]) + cleanup_arg = runtime.cleanup_removed_rules.call_args[0][0] + self.assertEqual(len(cleanup_arg), 1) + self.assertEqual(cleanup_arg[0]['uid'], 'cleanup') + global_runtime.apply_rules.assert_called_once_with([]) + global_runtime.cleanup_removed_rules.assert_called_once_with([]) + + def test_machine_apply_routes_global_user_rules_to_global_context(self): + spa = _load_spa() + + storage = _storage_stub({ + 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([ + { + 'uid': 'machine-rule', + 'unit': 'machine.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'machine', + 'edit_mode': 'create', + }, + { + 'uid': 'global-user-rule', + 'unit': 'global-user.service', + 'state': 'enable', + 'apply_mode': 'always', + 'policy_target': 'user', + 'edit_mode': 'create', + }, + ]), + 'Previous/Software/BaseALT/Policies/Preferences/Machine/Systemds': str([]), + }) + + with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True): + with unittest.mock.patch('frontend.systemd_preferences_applier._systemd_preferences_runtime') as runtime_ctor: + runtime = unittest.mock.Mock() + global_runtime = unittest.mock.Mock() + runtime_ctor.side_effect = [runtime, global_runtime] + + applier = spa.systemd_preferences_applier(storage) + applier.apply() + + self.assertEqual(runtime_ctor.call_args_list[0][0][2].mode, 'machine') + self.assertEqual(runtime_ctor.call_args_list[1][0][2].mode, 'global_user') + runtime.apply_rules.assert_called_once_with([ + unittest.mock.ANY, + ]) + self.assertEqual(runtime.apply_rules.call_args[0][0][0]['uid'], 'machine-rule') + global_runtime.apply_rules.assert_called_once_with([ + unittest.mock.ANY, + ]) + self.assertEqual(global_runtime.apply_rules.call_args[0][0][0]['uid'], 'global-user-rule') + + +if __name__ == '__main__': + unittest.main() diff --git a/gpoa/test/gpt/data/Systemds.xml b/gpoa/test/gpt/data/Systemds.xml new file mode 100644 index 00000000..b2b4d60c --- /dev/null +++ b/gpoa/test/gpt/data/Systemds.xml @@ -0,0 +1,43 @@ + + + + + [Service] +Restart=always + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/gpoa/test/gpt/data/Systemds_invalid.xml b/gpoa/test/gpt/data/Systemds_invalid.xml new file mode 100644 index 00000000..6c9bfa74 --- /dev/null +++ b/gpoa/test/gpt/data/Systemds_invalid.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/gpoa/test/gpt/test_systemds.py b/gpoa/test/gpt/test_systemds.py new file mode 100644 index 00000000..82663882 --- /dev/null +++ b/gpoa/test/gpt/test_systemds.py @@ -0,0 +1,236 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +import tempfile +import unittest +import unittest.mock +import ast +import base64 +import types +from enum import Enum, unique +from pathlib import Path + + +class _storage_stub: + def __init__(self): + self.items = [] + + def add_systemd(self, item, policy_name): + item.policy_name = policy_name + self.items.append(item) + +def _load_gpt_discovery_helpers(): + source_path = os.path.join(os.getcwd(), 'gpt', 'gpt.py') + with open(source_path, 'r', encoding='utf-8') as file_obj: + tree = ast.parse(file_obj.read(), filename=source_path) + + needed_names = { + 'FileType', + 'get_preftype', + 'find_dir', + 'find_file', + 'find_preferences', + 'find_preffile', + } + selected_nodes = [] + for node in tree.body: + if isinstance(node, (ast.FunctionDef, ast.ClassDef)) and node.name in needed_names: + selected_nodes.append(node) + elif isinstance(node, ast.Assign): + targets = [target.id for target in node.targets if isinstance(target, ast.Name)] + if any(name in needed_names for name in targets): + selected_nodes.append(node) + + module = ast.Module(body=selected_nodes, type_ignores=[]) + namespace = { + 'Enum': Enum, + 'unique': unique, + 'os': os, + 'Path': Path, + } + exec(compile(module, source_path, 'exec'), namespace) + return types.SimpleNamespace(**namespace) + + +class GptSystemdsTestCase(unittest.TestCase): + def _path(self, filename): + return '{}/test/gpt/data/{}'.format(os.getcwd(), filename) + + def test_read_systemds_all_types(self): + import gpt.systemds + + items = gpt.systemds.read_systemds(self._path('Systemds.xml')) + self.assertEqual(len(items), 11) + self.assertEqual(items[0].unit, 'sshd.service') + self.assertEqual(items[0].state, 'enable') + self.assertEqual(items[0].apply_mode, 'always') + self.assertEqual(items[0].policy_target, 'machine') + self.assertEqual(items[0].edit_mode, 'create_or_override') + self.assertEqual(items[0].dropin_name, 'override.conf') + self.assertEqual(items[0].unit_file_mode, 'text') + self.assertEqual( + base64.b64decode(items[0].unit_file_b64.encode('ascii')).decode('utf-8'), + items[0].unit_file, + ) + self.assertEqual(len(items[0].file_dependencies), 2) + + # Ensure automatic suffix mapping works for all supported tags. + expected_suffixes = { + 'service': '.service', + 'socket': '.socket', + 'timer': '.timer', + 'path': '.path', + 'mount': '.mount', + 'automount': '.automount', + 'swap': '.swap', + 'target': '.target', + 'device': '.device', + 'slice': '.slice', + 'scope': '.scope', + } + for item in items: + self.assertTrue(item.unit.endswith(expected_suffixes[item.element_type])) + + def test_soft_validation_skips_invalid_entries(self): + import gpt.systemds + + items = gpt.systemds.read_systemds(self._path('Systemds_invalid.xml')) + # good + bad-dep (kept with filtered deps); invalid path values are skipped + self.assertEqual(len(items), 2) + self.assertEqual(items[0].unit, 'good.service') + self.assertEqual(items[1].unit, 'bad3.service') + self.assertEqual(items[1].file_dependencies, []) + units = {item.unit for item in items} + self.assertNotIn('../../tmp/evil.service', units) + self.assertNotIn('safe.service', units) + + def test_merge_systemds(self): + import gpt.systemds + + storage = _storage_stub() + items = gpt.systemds.read_systemds(self._path('Systemds.xml')) + gpt.systemds.merge_systemds(storage, items, 'policy-test') + self.assertEqual(len(storage.items), len(items)) + self.assertEqual(storage.items[0].policy_name, 'policy-test') + + def test_read_systemds_preserves_quotes_via_unit_file_b64(self): + import gpt.systemds + + unit_file_text = "[Service]\nExecStart=/bin/bash -c \"echo 'ok'\"\n" + xml_content = """ + + + + {} + + + +""".format(unit_file_text) + + with tempfile.NamedTemporaryFile('w', encoding='utf-8', suffix='.xml', delete=False) as file_obj: + file_obj.write(xml_content) + tmp_path = file_obj.name + + try: + items = gpt.systemds.read_systemds(tmp_path) + finally: + os.unlink(tmp_path) + + self.assertEqual(len(items), 1) + restored = base64.b64decode(items[0].unit_file_b64.encode('ascii')).decode('utf-8') + self.assertEqual(restored, unit_file_text) + + def test_read_systemds_rejects_invalid_dependency_path(self): + import gpt.systemds + + xml_content = """ + + + + + + + + + +""" + with tempfile.NamedTemporaryFile('w', encoding='utf-8', suffix='.xml', delete=False) as file_obj: + file_obj.write(xml_content) + tmp_path = file_obj.name + + try: + items = gpt.systemds.read_systemds(tmp_path) + finally: + os.unlink(tmp_path) + + self.assertEqual(len(items), 1) + self.assertEqual(items[0].file_dependencies, []) + + def test_read_systemds_rejects_oversized_unit_file(self): + import gpt.systemds + + unit_file_text = "A" * (gpt.systemds.MAX_UNIT_FILE_SIZE + 1) + xml_content = """ + + + + {} + + + +""".format(unit_file_text) + with tempfile.NamedTemporaryFile('w', encoding='utf-8', suffix='.xml', delete=False) as file_obj: + file_obj.write(xml_content) + tmp_path = file_obj.name + + try: + items = gpt.systemds.read_systemds(tmp_path) + finally: + os.unlink(tmp_path) + + self.assertEqual(items, []) + + def test_gpt_discovery_supports_windows_systemd_layout(self): + gpt_helpers = _load_gpt_discovery_helpers() + + with tempfile.TemporaryDirectory() as tmpdir: + machine_dir = os.path.join(tmpdir, 'MACHINE') + valid_dir = os.path.join(machine_dir, 'PREFERENCES', 'SYSTEMD') + os.makedirs(valid_dir, exist_ok=True) + valid_file = os.path.join(valid_dir, 'SYSTEMD.XML') + with open(valid_file, 'w', encoding='utf-8') as file_obj: + file_obj.write('') + + invalid_paths = [ + os.path.join(machine_dir, 'PREFERENCES', 'SYSTEMDS', 'SYSTEMDS.XML'), + os.path.join(machine_dir, 'PREFERENCES', 'SYSTEMDS', 'SYSTEMD.XML'), + os.path.join(machine_dir, 'PREFERENCES', 'SYSTEMD', 'SYSTEMDS.XML'), + ] + for path in invalid_paths: + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, 'w', encoding='utf-8') as file_obj: + file_obj.write('') + + found = gpt_helpers.find_preffile(machine_dir, 'systemd') + self.assertEqual(found, valid_file) + self.assertEqual(gpt_helpers.get_preftype(valid_file), gpt_helpers.FileType.SYSTEMDS) + + +if __name__ == '__main__': + unittest.main() diff --git a/gpoa/test/storage/test_systemds_storage.py b/gpoa/test/storage/test_systemds_storage.py new file mode 100644 index 00000000..25e3915b --- /dev/null +++ b/gpoa/test/storage/test_systemds_storage.py @@ -0,0 +1,129 @@ +# +# GPOA - GPO Applier for Linux +# +# Copyright (C) 2026 BaseALT Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest +import ast +import base64 + +from gpt.systemds import systemd_policy + + +class SystemdsStorageTestCase(unittest.TestCase): + def setUp(self): + try: + from storage.dconf_registry import Dconf_registry + except Exception as exc: + self.skipTest('storage.dconf_registry is unavailable: {}'.format(exc)) + self.Dconf_registry = Dconf_registry + self.add_preferences_to_global_registry_dict = __import__( + 'storage.dconf_registry', fromlist=['add_preferences_to_global_registry_dict'] + ).add_preferences_to_global_registry_dict + + self._saved_registry = self.Dconf_registry.global_registry_dict + self._saved_systemds = self.Dconf_registry.systemds + + self.Dconf_registry.global_registry_dict = {self.Dconf_registry._GpoPriority: {}} + self.Dconf_registry.systemds = [] + + def tearDown(self): + if hasattr(self, 'Dconf_registry'): + self.Dconf_registry.global_registry_dict = self._saved_registry + self.Dconf_registry.systemds = self._saved_systemds + + def test_add_get_and_serialize_systemds(self): + item = systemd_policy('sshd.service') + item.uid = 'uid-1' + item.clsid = 'clsid-1' + item.name = 'sshd' + item.state = 'enable' + item.policy_target = 'machine' + item.apply_mode = 'always' + item.edit_mode = 'override' + item.dropin_name = '50-gpo.conf' + item.file_dependencies = [] + self.Dconf_registry.add_systemd(item, 'Policy') + + self.assertEqual(len(self.Dconf_registry.get_systemds()), 1) + self.add_preferences_to_global_registry_dict('Machine', True) + + prefix = 'Software/BaseALT/Policies/Preferences/Machine' + data = self.Dconf_registry.global_registry_dict[prefix]['Systemds'] + self.assertIn('sshd.service', data) + self.assertIn('uid-1', data) + + def test_remove_duplicates_supports_nested_lists(self): + item = systemd_policy('nginx.service') + item.uid = 'uid-2' + item.clsid = 'clsid-2' + item.name = 'nginx' + item.state = 'enable' + item.policy_target = 'machine' + item.apply_mode = 'if_exists' + item.edit_mode = 'override' + item.dropin_name = '50-gpo.conf' + item.file_dependencies = [{'mode': 'changed', 'path': '/etc/nginx/nginx.conf'}] + + duplicate = systemd_policy('nginx.service') + duplicate.uid = 'uid-2' + duplicate.clsid = 'clsid-2' + duplicate.name = 'nginx' + duplicate.state = 'enable' + duplicate.policy_target = 'machine' + duplicate.apply_mode = 'if_exists' + duplicate.edit_mode = 'override' + duplicate.dropin_name = '50-gpo.conf' + duplicate.file_dependencies = [{'mode': 'changed', 'path': '/etc/nginx/nginx.conf'}] + + self.Dconf_registry.add_systemd(item, 'Policy') + self.Dconf_registry.add_systemd(duplicate, 'Policy') + + self.add_preferences_to_global_registry_dict('Machine', True) + + prefix = 'Software/BaseALT/Policies/Preferences/Machine' + data = self.Dconf_registry.global_registry_dict[prefix]['Systemds'] + self.assertIn('nginx.service', data) + self.assertEqual(data.count('uid-2'), 1) + + def test_serialize_preserves_unit_file_b64_payload(self): + unit_file_text = "[Service]\nExecStart=/bin/bash -c \"echo 'ok'\"\n" + item = systemd_policy('quoted.service') + item.uid = 'uid-3' + item.clsid = 'clsid-3' + item.name = 'quoted' + item.state = 'as_is' + item.policy_target = 'machine' + item.apply_mode = 'always' + item.edit_mode = 'override' + item.dropin_name = '50-gpo.conf' + item.file_dependencies = [] + item.unit_file_b64 = base64.b64encode(unit_file_text.encode('utf-8')).decode('ascii') + + self.Dconf_registry.add_systemd(item, 'Policy') + self.add_preferences_to_global_registry_dict('Machine', True) + + prefix = 'Software/BaseALT/Policies/Preferences/Machine' + data = self.Dconf_registry.global_registry_dict[prefix]['Systemds'] + parsed = ast.literal_eval(data) + encoded = parsed[0].get('unit_file_b64') + self.assertIsNotNone(encoded) + restored = base64.b64decode(encoded.encode('ascii')).decode('utf-8') + self.assertEqual(restored, unit_file_text) + + +if __name__ == '__main__': + unittest.main() diff --git a/gpoa/util/util.py b/gpoa/util/util.py index e8fb5ab4..712b880c 100644 --- a/gpoa/util/util.py +++ b/gpoa/util/util.py @@ -190,7 +190,7 @@ def get_policy_variants(): def string_to_literal_eval(string): try: literaleval = ast.literal_eval(string) - except: + except (ValueError, SyntaxError): literaleval = string return literaleval diff --git a/gpupdate.spec b/gpupdate.spec index 1cbe7fc4..3755baf2 100644 --- a/gpupdate.spec +++ b/gpupdate.spec @@ -2,8 +2,14 @@ #add_python3_self_prov_path %buildroot%python3_sitelibdir/gpoa %add_python3_req_skip applaer.systemd +%add_python3_req_skip frontend.appliers.systemd +%add_python3_req_skip frontend.change_journal +%add_python3_req_skip frontend.systemd_applier +%add_python3_req_skip frontend.systemd_preferences_applier %add_python3_req_skip backend %add_python3_req_skip frontend.frontend_manager +%add_python3_req_skip gpt.systemds +%add_python3_req_skip gpt.systemds_constants %add_python3_req_skip gpt.envvars %add_python3_req_skip gpt.folders %add_python3_req_skip gpt.gpt @@ -39,7 +45,7 @@ %add_python3_req_skip frontend.appliers.ini_file Name: gpupdate -Version: 0.14.2 +Version: 0.14.3 Release: alt1 Summary: GPT applier @@ -211,6 +217,10 @@ fi %exclude %python3_sitelibdir/gpoa/test %changelog +* Sun Apr 12 2026 Korney Gedert 0.14.3-alt1 +- Added Systemds preferences applier with file-dependency restart support +- Added change_journal module for dependency snapshot tracking + * Thu Feb 26 2026 Danila Skachedubov 0.14.2-alt1 - Fix username resolution for trusted domain users