diff --git a/gpoa/frontend/appliers/systemd.py b/gpoa/frontend/appliers/systemd.py
index 3659ebd1..fe5a804c 100644
--- a/gpoa/frontend/appliers/systemd.py
+++ b/gpoa/frontend/appliers/systemd.py
@@ -16,32 +16,306 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-import dbus
+import re
+import subprocess
from util.logging import log
-class systemd_unit:
- def __init__(self, unit_name, state):
- self.system_bus = dbus.SystemBus()
- self.systemd_dbus = self.system_bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
- self.manager = dbus.Interface(self.systemd_dbus, 'org.freedesktop.systemd1.Manager')
+SYSTEMD_BUS_NAME = 'org.freedesktop.systemd1'
+SYSTEMD_OBJECT_PATH = '/org/freedesktop/systemd1'
+SYSTEMD_MANAGER_IFACE = 'org.freedesktop.systemd1.Manager'
+SYSTEMD_UNIT_IFACE = 'org.freedesktop.systemd1.Unit'
+DBUS_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties'
+NO_SUCH_UNIT_ERRORS = {
+ 'org.freedesktop.systemd1.NoSuchUnit',
+ 'org.freedesktop.systemd1.LoadFailed',
+}
+UNIT_NAME_RE = re.compile(
+ r'^[A-Za-z0-9:_.@-]{1,255}\.(service|socket|timer|path|mount|automount|swap|target|device|slice|scope)$'
+)
+
+
+class SystemdManagerError(Exception):
+ def __init__(self, message, action=None, unit=None, dbus_name=None):
+ super().__init__(message)
+ self.action = action
+ self.unit = unit
+ self.dbus_name = dbus_name
+
+
+def is_valid_unit_name(unit_name):
+ if not isinstance(unit_name, str):
+ return False
+ if not unit_name:
+ return False
+ if any(ord(ch) < 32 or ord(ch) == 127 for ch in unit_name):
+ return False
+ if not UNIT_NAME_RE.match(unit_name):
+ return False
+ name_part = unit_name.rsplit('.', 1)[0]
+ if name_part.startswith('-') or name_part.endswith('-'):
+ return False
+ return True
+
+
+def _import_dbus():
+ import dbus
+ return dbus
+
+
+class SystemdManager:
+ def __init__(self, mode='machine'):
+ self.mode = mode
+ self.dbus = None
+ self.bus = None
+ self.systemd = None
+ self.manager = None
+
+ if mode == 'global_user':
+ return
+
+ self.dbus = _import_dbus()
+ if mode == 'user':
+ self.bus = self.dbus.SessionBus()
+ else:
+ self.bus = self.dbus.SystemBus()
+
+ self.systemd = self.bus.get_object(SYSTEMD_BUS_NAME, SYSTEMD_OBJECT_PATH)
+ self.manager = self.dbus.Interface(self.systemd, SYSTEMD_MANAGER_IFACE)
+
+ def _fail(self, action, exc, unit=None):
+ dbus_name = None
+ if hasattr(exc, 'get_dbus_name'):
+ dbus_name = exc.get_dbus_name()
+ raise SystemdManagerError(str(exc), action=action, unit=unit, dbus_name=dbus_name)
+
+ def _run_global(self, args):
+ return subprocess.run(
+ ['systemctl', '--global'] + list(args),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ check=False,
+ )
+
+ def _global_output(self, result):
+ output = '{}\n{}'.format(result.stdout or '', result.stderr or '').strip()
+ return output or 'systemctl --global command failed'
+
+ def _global_not_found(self, result):
+ output = self._global_output(result).lower()
+ return 'no files found' in output or 'not-found' in output or 'not found' in output
+
+ def _fail_global(self, action, result, unit=None):
+ raise SystemdManagerError(self._global_output(result), action=action, unit=unit)
+
+ def _unsupported_global_action(self, action, unit=None):
+ log('W48', {
+ 'reason': 'systemctl --global does not support runtime action {}'.format(action),
+ 'action': action,
+ 'unit': unit,
+ })
+ return
+
+ def _load_unit(self, unit_name):
+ try:
+ return self.manager.LoadUnit(self.dbus.String(unit_name))
+ except self.dbus.DBusException as exc:
+ self._fail('load_unit', exc, unit=unit_name)
+
+ def exists(self, unit_name):
+ if not is_valid_unit_name(unit_name):
+ return False
+
+ if self.mode == 'global_user':
+ result = self._run_global(['cat', unit_name])
+ if result.returncode == 0:
+ return True
+ if self._global_not_found(result):
+ return False
+ self._fail_global('exists', result, unit=unit_name)
+
+ try:
+ unit_path = self.manager.LoadUnit(self.dbus.String(unit_name))
+ proxy = self.bus.get_object(SYSTEMD_BUS_NAME, str(unit_path))
+ properties = self.dbus.Interface(proxy, dbus_interface=DBUS_PROPERTIES_IFACE)
+ load_state = str(properties.Get(SYSTEMD_UNIT_IFACE, 'LoadState'))
+ return load_state != 'not-found'
+ except self.dbus.DBusException as exc:
+ if exc.get_dbus_name() in NO_SUCH_UNIT_ERRORS:
+ return False
+ self._fail('exists', exc, unit=unit_name)
+
+ def _unit_properties(self, unit_name):
+ unit_path = self._load_unit(unit_name)
+ proxy = self.bus.get_object(SYSTEMD_BUS_NAME, str(unit_path))
+ return self.dbus.Interface(proxy, dbus_interface=DBUS_PROPERTIES_IFACE)
+
+ def active_state(self, unit_name):
+ if not is_valid_unit_name(unit_name):
+ return None
+ if self.mode == 'global_user':
+ return None
+ try:
+ properties = self._unit_properties(unit_name)
+ return str(properties.Get(SYSTEMD_UNIT_IFACE, 'ActiveState'))
+ except self.dbus.DBusException as exc:
+ self._fail('active_state', exc, unit=unit_name)
+
+ def reload(self):
+ if self.mode == 'global_user':
+ return
+ try:
+ self.manager.Reload()
+ except self.dbus.DBusException as exc:
+ self._fail('reload', exc)
+
+ def start(self, unit_name):
+ if self.mode == 'global_user':
+ self._unsupported_global_action('start', unit=unit_name)
+ return
+ try:
+ self.manager.StartUnit(unit_name, 'replace')
+ except self.dbus.DBusException as exc:
+ self._fail('start', exc, unit=unit_name)
+
+ def stop(self, unit_name):
+ if self.mode == 'global_user':
+ self._unsupported_global_action('stop', unit=unit_name)
+ return
+ try:
+ self.manager.StopUnit(unit_name, 'replace')
+ except self.dbus.DBusException as exc:
+ self._fail('stop', exc, unit=unit_name)
+
+ def restart(self, unit_name):
+ if self.mode == 'global_user':
+ self._unsupported_global_action('restart', unit=unit_name)
+ return
+ try:
+ self.manager.RestartUnit(unit_name, 'replace')
+ except self.dbus.DBusException as exc:
+ self._fail('restart', exc, unit=unit_name)
+
+ def enable(self, unit_name):
+ if self.mode == 'global_user':
+ result = self._run_global(['enable', unit_name])
+ if result.returncode != 0:
+ self._fail_global('enable', result, unit=unit_name)
+ return
+ try:
+ self.manager.EnableUnitFiles([unit_name], self.dbus.Boolean(False), self.dbus.Boolean(True))
+ except self.dbus.DBusException as exc:
+ self._fail('enable', exc, unit=unit_name)
+
+ def disable(self, unit_name):
+ if self.mode == 'global_user':
+ result = self._run_global(['disable', unit_name])
+ if result.returncode != 0:
+ self._fail_global('disable', result, unit=unit_name)
+ return
+ try:
+ self.manager.DisableUnitFiles([unit_name], self.dbus.Boolean(False))
+ except self.dbus.DBusException as exc:
+ self._fail('disable', exc, unit=unit_name)
+
+ def mask(self, unit_name):
+ if self.mode == 'global_user':
+ result = self._run_global(['mask', unit_name])
+ if result.returncode != 0:
+ self._fail_global('mask', result, unit=unit_name)
+ return
+ try:
+ self.manager.MaskUnitFiles([unit_name], self.dbus.Boolean(False), self.dbus.Boolean(True))
+ except self.dbus.DBusException as exc:
+ self._fail('mask', exc, unit=unit_name)
+
+ def unmask(self, unit_name):
+ if self.mode == 'global_user':
+ result = self._run_global(['unmask', unit_name])
+ if result.returncode != 0:
+ self._fail_global('unmask', result, unit=unit_name)
+ return
+ try:
+ self.manager.UnmaskUnitFiles([unit_name], self.dbus.Boolean(False))
+ except self.dbus.DBusException as exc:
+ self._fail('unmask', exc, unit=unit_name)
+
+ def preset(self, unit_name):
+ if self.mode == 'global_user':
+ result = self._run_global(['preset', unit_name])
+ if result.returncode != 0:
+ self._fail_global('preset', result, unit=unit_name)
+ return
+ try:
+ self.manager.PresetUnitFiles([unit_name], self.dbus.Boolean(False), self.dbus.Boolean(True))
+ except self.dbus.DBusException as exc:
+ self._fail('preset', exc, unit=unit_name)
+
+ def get_unit_file_state(self, unit_name):
+ if self.mode == 'global_user':
+ result = self._run_global(['is-enabled', unit_name])
+ state = (result.stdout or result.stderr or '').strip()
+ if state:
+ return state.splitlines()[-1].strip()
+ self._fail_global('get_unit_file_state', result, unit=unit_name)
+ try:
+ return str(self.manager.GetUnitFileState(self.dbus.String(unit_name)))
+ except self.dbus.DBusException as exc:
+ self._fail('get_unit_file_state', exc, unit=unit_name)
+
+ def apply_state(self, unit_name, state, now):
+ if state == 'as_is':
+ return
+ if state == 'enable':
+ self.unmask(unit_name)
+ self.enable(unit_name)
+ if now and self.mode != 'global_user':
+ self.start(unit_name)
+ return
+ if state == 'disable':
+ if now and self.mode != 'global_user':
+ self.stop(unit_name)
+ self.disable(unit_name)
+ return
+ if state == 'mask':
+ if now and self.mode != 'global_user':
+ self.stop(unit_name)
+ self.mask(unit_name)
+ return
+ if state == 'unmask':
+ self.unmask(unit_name)
+ if now and self.mode != 'global_user':
+ self.start(unit_name)
+ return
+ if state == 'preset':
+ self.preset(unit_name)
+ if now and self.mode != 'global_user':
+ self.start(unit_name)
+ return
+ raise ValueError('Unsupported state: {}'.format(state))
+
+
+class systemd_unit:
+ def __init__(self, unit_name, state, manager=None):
+ if not is_valid_unit_name(unit_name):
+ raise ValueError('Invalid unit name: {}'.format(unit_name))
self.unit_name = unit_name
- self.desired_state = state
- self.unit = self.manager.LoadUnit(dbus.String(self.unit_name))
- self.unit_proxy = self.system_bus.get_object('org.freedesktop.systemd1', str(self.unit))
- self.unit_interface = dbus.Interface(self.unit_proxy, dbus_interface='org.freedesktop.systemd1.Unit')
- self.unit_properties = dbus.Interface(self.unit_proxy, dbus_interface='org.freedesktop.DBus.Properties')
+ self.desired_state = int(state)
+ if self.desired_state not in (0, 1):
+ raise ValueError('Invalid desired state for {}: {}'.format(unit_name, state))
+ self.manager = manager if manager is not None else SystemdManager(mode='machine')
def apply(self):
logdata = {'unit': self.unit_name}
if self.desired_state == 1:
- self.manager.UnmaskUnitFiles([self.unit_name], dbus.Boolean(False))
- self.manager.EnableUnitFiles([self.unit_name], dbus.Boolean(False), dbus.Boolean(True))
+ self.manager.unmask(self.unit_name)
+ self.manager.enable(self.unit_name)
if self.unit_name == 'gpupdate.service':
- if self.manager.GetUnitFileState(dbus.String(self.unit_name)) == 'enabled':
+ if self.manager.get_unit_file_state(self.unit_name) == 'enabled':
return
- self.manager.StartUnit(self.unit_name, 'replace')
+ self.manager.start(self.unit_name)
log('I6', logdata)
# In case the service has 'RestartSec' property set it
@@ -51,14 +325,16 @@ def apply(self):
if service_state not in ('active', 'activating'):
service_timer_name = self.unit_name.replace(".service", ".timer")
- self.unit = self.manager.LoadUnit(dbus.String(service_timer_name))
- service_state = self._get_state()
- if service_state not in ('active', 'activating'):
+ if not is_valid_unit_name(service_timer_name) or not self.manager.exists(service_timer_name):
+ log('E46', logdata)
+ return
+ service_state = self.manager.active_state(service_timer_name)
+ if str(service_state) not in ('active', 'activating'):
log('E46', logdata)
else:
- self.manager.StopUnit(self.unit_name, 'replace')
- self.manager.DisableUnitFiles([self.unit_name], dbus.Boolean(False))
- self.manager.MaskUnitFiles([self.unit_name], dbus.Boolean(False), dbus.Boolean(True))
+ self.manager.stop(self.unit_name)
+ self.manager.disable(self.unit_name)
+ self.manager.mask(self.unit_name)
log('I6', logdata)
service_state = self._get_state()
@@ -70,7 +346,7 @@ def _get_state(self):
'''
Get the string describing service state.
'''
- return self.unit_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
+ return self.manager.active_state(self.unit_name)
def restart(self):
"""
@@ -78,13 +354,11 @@ def restart(self):
"""
logdata = {'unit': self.unit_name, 'action': 'restart'}
try:
- self.unit = self.manager.LoadUnit(dbus.String(self.unit_name))
- self.manager.RestartUnit(self.unit_name, 'replace')
+ self.manager.restart(self.unit_name)
log('I13', logdata)
service_state = self._get_state()
if service_state not in ('active', 'activating'):
log('E77', logdata)
- except dbus.DBusException as exc:
+ except SystemdManagerError as exc:
log('E77', {**logdata, 'error': str(exc)})
-
diff --git a/gpoa/frontend/change_journal.py b/gpoa/frontend/change_journal.py
new file mode 100644
index 00000000..281c9033
--- /dev/null
+++ b/gpoa/frontend/change_journal.py
@@ -0,0 +1,218 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import hashlib
+from pathlib import Path
+import stat
+
+
+_watched_paths = {}
+_current_snapshots = {}
+_current_snapshot_keys = {}
+_changed_paths = set()
+_presence_changed_paths = set()
+
+
+def _normalize(path):
+ if not path:
+ return None
+ try:
+ return str(Path(path).resolve(strict=False))
+ except Exception:
+ return str(path)
+
+
+def _kind_from_mode(st_mode):
+ if stat.S_ISREG(st_mode):
+ return 'file'
+ if stat.S_ISDIR(st_mode):
+ return 'dir'
+ if stat.S_ISLNK(st_mode):
+ return 'symlink'
+ return 'other'
+
+
+def _sha256(path_obj):
+ digest = hashlib.sha256()
+ with path_obj.open('rb') as file_obj:
+ while True:
+ chunk = file_obj.read(1024 * 1024)
+ if not chunk:
+ break
+ digest.update(chunk)
+ return digest.hexdigest()
+
+
+def _present_snapshot_key(stat_result):
+ return (
+ 'present',
+ stat_result.st_mode,
+ stat_result.st_uid,
+ stat_result.st_gid,
+ stat_result.st_size,
+ stat_result.st_mtime_ns,
+ stat_result.st_ctime_ns,
+ stat_result.st_ino,
+ )
+
+
+def _missing_snapshot(snapshot_key):
+ return {
+ 'exists': False,
+ 'kind': None,
+ 'stat': None,
+ 'sha256': None,
+ '_snapshot_key': snapshot_key,
+ }
+
+
+def _present_snapshot(path_obj, stat_result, snapshot_key):
+ sha256 = None
+ if stat.S_ISREG(stat_result.st_mode):
+ try:
+ sha256 = _sha256(path_obj)
+ except Exception:
+ sha256 = '__error__'
+
+ return {
+ 'exists': True,
+ 'kind': _kind_from_mode(stat_result.st_mode),
+ 'stat': (
+ stat_result.st_mode,
+ stat_result.st_uid,
+ stat_result.st_gid,
+ stat_result.st_size,
+ stat_result.st_mtime_ns,
+ stat_result.st_ctime_ns,
+ stat_result.st_ino,
+ ),
+ 'sha256': sha256,
+ '_snapshot_key': snapshot_key,
+ }
+
+
+def _snapshot(path):
+ path_obj = Path(path)
+ try:
+ stat_result = path_obj.lstat()
+ snapshot_key = _present_snapshot_key(stat_result)
+ except FileNotFoundError:
+ return _missing_snapshot(('absent',))
+ except Exception:
+ return _missing_snapshot(('error',))
+
+ return _present_snapshot(path_obj, stat_result, snapshot_key)
+
+
+def reset():
+ _watched_paths.clear()
+ _current_snapshots.clear()
+ _current_snapshot_keys.clear()
+ _changed_paths.clear()
+ _presence_changed_paths.clear()
+
+
+def watch(path):
+ normalized = _normalize(path)
+ if not normalized:
+ return
+ if normalized not in _watched_paths:
+ _watched_paths[normalized] = _snapshot(normalized)
+ _current_snapshots.pop(normalized, None)
+ _current_snapshot_keys.pop(normalized, None)
+
+
+def watch_many(paths):
+ if not paths:
+ return
+ for path in paths:
+ watch(path)
+
+
+def record_changed(path):
+ normalized = _normalize(path)
+ if normalized:
+ _changed_paths.add(normalized)
+
+
+def record_presence_changed(path):
+ normalized = _normalize(path)
+ if normalized:
+ _presence_changed_paths.add(normalized)
+ _changed_paths.add(normalized)
+
+
+def _snapshot_current(path):
+ path_obj = Path(path)
+ try:
+ stat_result = path_obj.lstat()
+ key = _present_snapshot_key(stat_result)
+ snapshot_factory = lambda: _present_snapshot(path_obj, stat_result, key)
+ except FileNotFoundError:
+ key = ('absent',)
+ snapshot_factory = lambda: _missing_snapshot(key)
+ except Exception:
+ key = ('error',)
+ snapshot_factory = lambda: _missing_snapshot(key)
+
+ if _current_snapshot_keys.get(path) == key:
+ return _current_snapshots[path]
+ current = snapshot_factory()
+ _current_snapshot_keys[path] = key
+ _current_snapshots[path] = current
+ return current
+
+
+def _presence_changed(path):
+ baseline = _watched_paths.get(path)
+ if baseline is None:
+ return False
+ current = _snapshot_current(path)
+ return baseline['exists'] != current['exists']
+
+
+def _changed(path):
+ baseline = _watched_paths.get(path)
+ if baseline is None:
+ return False
+ current = _snapshot_current(path)
+
+ if baseline['exists'] != current['exists']:
+ return True
+
+ if not baseline['exists'] and not current['exists']:
+ return False
+
+ return (
+ baseline['kind'] != current['kind']
+ or baseline['stat'] != current['stat']
+ or baseline['sha256'] != current['sha256']
+ )
+
+
+def query(path, mode='changed'):
+ normalized = _normalize(path)
+ if not normalized:
+ return False
+ if mode == 'presence_changed':
+ if normalized in _presence_changed_paths:
+ return True
+ return _presence_changed(normalized)
+ if normalized in _changed_paths:
+ return True
+ return _changed(normalized)
diff --git a/gpoa/frontend/frontend_manager.py b/gpoa/frontend/frontend_manager.py
index af9de300..8b013122 100644
--- a/gpoa/frontend/frontend_manager.py
+++ b/gpoa/frontend/frontend_manager.py
@@ -27,6 +27,7 @@
)
from .chromium_applier import chromium_applier
+from .change_journal import reset as reset_change_journal
from .cifs_applier import cifs_applier, cifs_applier_user
from .control_applier import control_applier
from .cups_applier import cups_applier
@@ -46,10 +47,24 @@
from .scripts_applier import scripts_applier, scripts_applier_user
from .shortcut_applier import shortcut_applier, shortcut_applier_user
from .systemd_applier import systemd_applier
+from .systemd_preferences_applier import (
+ systemd_preferences_applier,
+ systemd_preferences_applier_user,
+)
from .thunderbird_applier import thunderbird_applier
from .yandex_browser_applier import yandex_browser_applier
+def prime_dependency_journal(appliers):
+ applier = appliers.get('systemd_preferences')
+ if not applier:
+ return
+
+ prime = getattr(applier, 'prime_dependency_journal', None)
+ if callable(prime):
+ prime()
+
+
def determine_username(username=None):
'''
Checks if the specified username is valid in order to prevent
@@ -129,6 +144,11 @@ def _init_machine_appliers(self):
self.machine_appliers['ini'] = ini_applier(self.storage)
self.machine_appliers['kde'] = kde_applier(self.storage)
self.machine_appliers['package'] = package_applier(self.storage)
+ # systemd_preferences must be registered last: its post_restart() checks
+ # dependency paths that other appliers (e.g. ini) may have modified during
+ # the same apply cycle. New appliers that write files must be added before
+ # this entry to ensure their changes are visible to the dependency journal.
+ self.machine_appliers['systemd_preferences'] = systemd_preferences_applier(self.storage)
def _init_user_appliers(self):
# User appliers are expected to work with user-writable
@@ -149,6 +169,7 @@ def _init_user_appliers(self):
self.user_appliers['ini'] = ini_applier_user(self.storage, self.username)
self.user_appliers['kde'] = kde_applier_user(self.storage, self.username, self.file_cache)
self.user_appliers['package'] = package_applier_user(self.storage, self.username)
+ self.user_appliers['systemd_preferences'] = systemd_preferences_applier_user(self.storage, self.username)
def machine_apply(self):
'''
@@ -158,6 +179,12 @@ def machine_apply(self):
log('E13')
return
log('D16')
+ reset_change_journal()
+ try:
+ prime_dependency_journal(self.machine_appliers)
+ except Exception as exc:
+ logdata = {'applier_name': 'systemd_preferences', 'msg': str(exc)}
+ log('E24', logdata)
for applier_name, applier_object in self.machine_appliers.items():
try:
@@ -170,6 +197,12 @@ def user_apply(self):
'''
Run appliers for users.
'''
+ reset_change_journal()
+ try:
+ prime_dependency_journal(self.user_appliers)
+ except Exception as exc:
+ logdata = {'applier': 'systemd_preferences', 'exception': str(exc)}
+ log('E19', logdata)
if is_root():
for applier_name, applier_object in self.user_appliers.items():
try:
@@ -199,4 +232,3 @@ def apply_parameters(self):
self.machine_apply()
else:
self.user_apply()
-
diff --git a/gpoa/frontend/systemd_applier.py b/gpoa/frontend/systemd_applier.py
index 7916d680..bd52cacf 100644
--- a/gpoa/frontend/systemd_applier.py
+++ b/gpoa/frontend/systemd_applier.py
@@ -19,7 +19,11 @@
from util.logging import log
from .applier_frontend import applier_frontend, check_enabled
-from .appliers.systemd import systemd_unit
+from .appliers.systemd import (
+ SystemdManagerError,
+ is_valid_unit_name,
+ systemd_unit,
+)
class systemd_applier(applier_frontend):
@@ -40,18 +44,22 @@ def __init__(self, storage):
def run(self):
for setting in self.systemd_unit_settings:
+ unit_name = str(setting.valuename)
try:
- self.units.append(systemd_unit(setting.valuename, int(setting.data)))
- logdata = {'unit': format(setting.valuename)}
+ desired_state = int(setting.data)
+ if not is_valid_unit_name(unit_name):
+ raise ValueError('Invalid unit name')
+ self.units.append(systemd_unit(unit_name, desired_state))
+ logdata = {'unit': unit_name}
log('I4', logdata)
- except Exception as exc:
- logdata = {'unit': format(setting.valuename), 'exc': exc}
+ except (TypeError, ValueError, SystemdManagerError) as exc:
+ logdata = {'unit': unit_name, 'exc': str(exc)}
log('I5', logdata)
for unit in self.units:
try:
unit.apply()
- except:
- logdata = {'unit': unit.unit_name}
+ except SystemdManagerError as exc:
+ logdata = {'unit': unit.unit_name, 'error': str(exc)}
log('E45', logdata)
def apply(self):
@@ -78,4 +86,3 @@ def user_context_apply(self):
def admin_context_apply(self):
pass
-
diff --git a/gpoa/frontend/systemd_preferences_applier.py b/gpoa/frontend/systemd_preferences_applier.py
new file mode 100644
index 00000000..ff082a5d
--- /dev/null
+++ b/gpoa/frontend/systemd_preferences_applier.py
@@ -0,0 +1,783 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import base64
+import binascii
+import os
+from pathlib import Path
+import stat
+import tempfile
+
+from util.logging import log
+from util.util import get_homedir, get_uid_by_username, string_to_literal_eval
+
+from .applier_frontend import applier_frontend, check_enabled
+from .appliers.systemd import (
+ SystemdManager,
+ SystemdManagerError,
+ is_valid_unit_name,
+)
+from .change_journal import query, record_changed, record_presence_changed, watch_many
+from gpt.systemds_constants import (
+ DEFAULT_DROPIN_NAME,
+ DROPIN_NAME_RE,
+ MAX_DEPENDENCIES_PER_RULE,
+ MAX_DEPENDENCY_PATH_LEN,
+ MAX_UNIT_FILE_SIZE,
+ NON_RESTARTABLE_TYPES,
+ VALID_APPLY_MODES,
+ VALID_DEP_MODES,
+ VALID_POLICY_TARGETS,
+ VALID_STATES,
+)
+
+MANAGED_HEADER = '# gpupdate-managed uid: {}'
+MAX_RULES_PER_SCOPE = 512
+
+
+class _Context:
+ def __init__(self, mode='machine', username=None):
+ self.mode = mode
+ self.username = username
+ self.systemd_dir = '/etc/systemd/system'
+ if mode == 'user':
+ self.systemd_dir = os.path.join(get_homedir(username), '.config/systemd/user')
+ elif mode == 'global_user':
+ self.systemd_dir = '/etc/systemd/user'
+
+
+def _syslog(level, message, data=None):
+ payload = {'plugin': 'SystemdPreferencesApplier', 'message': message}
+ if data:
+ payload['data'] = data
+ log(level, payload)
+
+
+def _as_bool(value):
+ if isinstance(value, bool):
+ return value
+ if value is None:
+ return False
+ return str(value).lower() in ('1', 'true', 'yes')
+
+
+def _has_control_chars(value):
+ return any(ord(ch) < 32 or ord(ch) == 127 for ch in str(value))
+
+
+def _is_valid_dropin_name(value):
+ if not isinstance(value, str):
+ return False
+ if not value or value != value.strip():
+ return False
+ if '\x00' in value or '/' in value or '\\' in value:
+ return False
+ if _has_control_chars(value):
+ return False
+ return bool(DROPIN_NAME_RE.match(value))
+
+
+def _expand_windows_var(path, username=None):
+ if not path:
+ return path
+ variables = {
+ 'HOME': '/etc/skel',
+ 'HOMEPATH': '/etc/skel',
+ 'HOMEDRIVE': '/',
+ 'SystemRoot': '/',
+ 'SystemDrive': '/',
+ 'USERNAME': username if username else '',
+ }
+ if username:
+ variables['HOME'] = get_homedir(username)
+ variables['HOMEPATH'] = variables['HOME']
+ result = path
+ for key, value in variables.items():
+ replacement = str(value)
+ if key not in ('USERNAME',) and not replacement.endswith('/'):
+ replacement = '{}{}'.format(replacement, '/')
+ result = result.replace('%{}%'.format(key), replacement)
+ return result
+
+
+def _read_preferences(storage, scope_name, is_previous=False):
+ prefix = 'Software/BaseALT/Policies/Preferences/{}'.format(scope_name)
+ if is_previous:
+ prefix = 'Previous/{}'.format(prefix)
+ key = '{}/Systemds'.format(prefix)
+ value = storage.get_entry(key, preg=False)
+ if not value:
+ return []
+
+ items = string_to_literal_eval(value)
+ if not isinstance(items, list):
+ return []
+ entries = [item for item in items if isinstance(item, dict)]
+ if len(entries) > MAX_RULES_PER_SCOPE:
+ log('W47', {
+ 'reason': 'Systemd preferences rule limit exceeded',
+ 'scope': scope_name,
+ 'count': len(entries),
+ 'limit': MAX_RULES_PER_SCOPE,
+ })
+ return entries[:MAX_RULES_PER_SCOPE]
+ return entries
+
+
+def _is_valid_dependency_path(path, policy_target):
+ if not isinstance(path, str):
+ return False
+ if not path or len(path) > MAX_DEPENDENCY_PATH_LEN:
+ return False
+ if '\x00' in path or _has_control_chars(path):
+ return False
+
+ expanded = _expand_windows_var(path)
+ if not expanded or len(expanded) > MAX_DEPENDENCY_PATH_LEN:
+ return False
+ if '\x00' in expanded or _has_control_chars(expanded):
+ return False
+
+ upper_path = path.upper()
+ if policy_target == 'user':
+ if '%HOME%' in upper_path or '%HOMEPATH%' in upper_path:
+ return os.path.isabs(expanded)
+ return os.path.isabs(path) and os.path.isabs(expanded)
+ return os.path.isabs(expanded)
+
+
+def _derive_edit_mode(apply_mode):
+ if apply_mode == 'always':
+ return 'create_or_override'
+ elif apply_mode == 'if_exists':
+ return 'override'
+ elif apply_mode == 'if_missing':
+ return 'create'
+ return 'create_or_override'
+
+
+def _normalize_rule(item):
+ unit = item.get('unit')
+ state = item.get('state')
+ apply_mode = item.get('apply_mode', item.get('applyMode', 'always'))
+ policy_target = item.get('policy_target', item.get('policyTarget', 'machine'))
+ uid = item.get('uid')
+
+ if not unit or state not in VALID_STATES:
+ return None
+ if not is_valid_unit_name(str(unit)):
+ log('W47', {'reason': 'Invalid unit value', 'unit': unit})
+ return None
+ if apply_mode not in VALID_APPLY_MODES:
+ return None
+ if policy_target not in VALID_POLICY_TARGETS:
+ return None
+ if not uid:
+ return None
+
+ dependencies = item.get('file_dependencies', item.get('fileDependencies', []))
+ if not isinstance(dependencies, list):
+ dependencies = []
+ if len(dependencies) > MAX_DEPENDENCIES_PER_RULE:
+ log('W47', {
+ 'reason': 'Too many file dependencies, truncating',
+ 'unit': unit,
+ 'count': len(dependencies),
+ 'limit': MAX_DEPENDENCIES_PER_RULE,
+ })
+ dependencies = dependencies[:MAX_DEPENDENCIES_PER_RULE]
+
+ valid_dependencies = []
+ for dep in dependencies:
+ if not isinstance(dep, dict):
+ continue
+ mode = dep.get('mode')
+ path = dep.get('path')
+ if mode not in VALID_DEP_MODES or not path:
+ continue
+ if not _is_valid_dependency_path(str(path), policy_target):
+ log('W47', {
+ 'reason': 'Invalid dependency path',
+ 'unit': unit,
+ 'path': str(path),
+ 'policy_target': policy_target,
+ })
+ continue
+ valid_dependencies.append({'mode': mode, 'path': str(path)})
+
+ dropin_name = item.get('dropin_name', item.get('dropInName', DEFAULT_DROPIN_NAME)) or DEFAULT_DROPIN_NAME
+ if not _is_valid_dropin_name(str(dropin_name)):
+ log('W47', {'reason': 'Invalid dropInName', 'dropInName': dropin_name, 'unit': unit})
+ return None
+
+ unit_file = _decode_unit_file_b64(item, unit, uid)
+ if unit_file is None:
+ unit_file = _normalize_unit_file_content(item.get('unit_file', item.get('unitFile')))
+ if unit_file is None and (item.get('unit_file') is not None or item.get('unitFile') is not None):
+ log('W47', {
+ 'reason': 'Invalid unit_file payload',
+ 'unit': unit,
+ 'uid': str(uid),
+ })
+
+ return {
+ 'uid': str(uid),
+ 'unit': str(unit),
+ 'state': state,
+ 'now': _as_bool(item.get('now', False)),
+ 'remove_policy': _as_bool(item.get('remove_policy', item.get('removePolicy', False))),
+ 'apply_mode': apply_mode,
+ 'policy_target': policy_target,
+ 'edit_mode': _derive_edit_mode(apply_mode),
+ 'dropin_name': str(dropin_name),
+ 'unit_file': unit_file,
+ 'file_dependencies': valid_dependencies,
+ 'element_type': item.get('element_type', item.get('elementType', 'service')),
+ }
+
+
+def _rule_matches_apply_mode(rule, exists):
+ apply_mode = rule['apply_mode']
+ if apply_mode == 'always':
+ return True
+ if apply_mode == 'if_exists':
+ return exists
+ return not exists
+
+
+def _is_managed_by_uid(path, uid):
+ if not path.exists() or not path.is_file():
+ return False
+ content = _safe_read_text(path)
+ if content is None:
+ return False
+ first_line = content.splitlines()[0] if content else ''
+ return first_line == MANAGED_HEADER.format(uid)
+
+
+def _validate_existing_file(path):
+ fd = None
+ try:
+ flags = os.O_RDONLY
+ if hasattr(os, 'O_NOFOLLOW'):
+ flags |= os.O_NOFOLLOW
+ fd = os.open(str(path), flags)
+ st = os.fstat(fd)
+ if not stat.S_ISREG(st.st_mode):
+ return None, False
+ if st.st_nlink > 1:
+ return None, False
+ with os.fdopen(fd, 'r', encoding='utf-8') as file_obj:
+ content = file_obj.read()
+ fd = None
+ return content, True
+ except (OSError, UnicodeDecodeError):
+ return None, False
+ finally:
+ if fd is not None:
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+
+
+def _safe_read_text(path):
+ content, ok = _validate_existing_file(path)
+ if not ok:
+ return None
+ return content
+
+
+def _safe_write_text(path, content):
+ parent = path.parent
+ parent.mkdir(parents=True, exist_ok=True)
+ if parent.is_symlink():
+ return False, False
+
+ existed = path.exists()
+ if existed:
+ _, ok = _validate_existing_file(path)
+ if not ok:
+ return False, existed
+
+ data = content.encode('utf-8')
+ tmp_fd = None
+ tmp_path = None
+ try:
+ tmp_fd, tmp_path = tempfile.mkstemp(prefix='.gpupdate-', dir=str(parent))
+ os.fchmod(tmp_fd, 0o644)
+ os.write(tmp_fd, data)
+ os.fsync(tmp_fd)
+ os.close(tmp_fd)
+ tmp_fd = None
+ os.replace(tmp_path, str(path))
+ tmp_path = None # consumed by replace; prevent unlink in finally
+ except OSError:
+ return False, existed
+ finally:
+ if tmp_fd is not None:
+ try:
+ os.close(tmp_fd)
+ except OSError:
+ pass
+ if tmp_path and os.path.exists(tmp_path):
+ try:
+ os.unlink(tmp_path)
+ except OSError:
+ pass
+
+ try:
+ dir_fd = os.open(str(parent), os.O_RDONLY)
+ try:
+ os.fsync(dir_fd)
+ finally:
+ os.close(dir_fd)
+ except OSError:
+ pass # fsync failure is non-fatal; file is already atomically written
+
+ return True, existed
+
+
+def _normalize_unit_file_content(unit_file):
+ if unit_file is None:
+ return None
+
+ text = str(unit_file)
+ # Keep already multiline text as-is; only unescape policy-encoded newlines.
+ if '\n' in text or '\r' in text:
+ normalized = text.replace('\r\n', '\n').replace('\r', '\n')
+ elif '\\n' in text or '\\r' in text:
+ normalized = text.replace('\\r\\n', '\n').replace('\\n', '\n').replace('\\r', '\n')
+ else:
+ normalized = text
+
+ if len(normalized.encode('utf-8')) > MAX_UNIT_FILE_SIZE:
+ return None
+ return normalized
+
+
+def _decode_unit_file_b64(item, unit, uid):
+ payload = item.get('unit_file_b64', item.get('unitFileB64'))
+ if payload is None:
+ return None
+
+ try:
+ data = base64.b64decode(str(payload), validate=True)
+ if len(data) > MAX_UNIT_FILE_SIZE:
+ log('W47', {
+ 'reason': 'unit_file_b64 exceeds size limit',
+ 'unit': unit,
+ 'uid': uid,
+ 'limit': MAX_UNIT_FILE_SIZE,
+ })
+ return None
+ return data.decode('utf-8')
+ except (TypeError, ValueError, binascii.Error, UnicodeDecodeError):
+ log('W47', {
+ 'reason': 'Invalid unit_file_b64 payload',
+ 'unit': unit,
+ 'uid': uid,
+ })
+ return None
+
+
+class _systemd_preferences_runtime:
+ def __init__(self, storage, scope_name, context, systemd_manager=None):
+ self.storage = storage
+ self.scope_name = scope_name
+ self.context = context
+ self.systemd_manager = systemd_manager
+ self.daemon_reload_required = False
+ self.phase2_candidates = []
+
+ def _manager(self):
+ if self.systemd_manager is None:
+ try:
+ self.systemd_manager = SystemdManager(mode=self.context.mode)
+ except Exception as exc:
+ raise SystemdManagerError(str(exc), action='connect')
+ return self.systemd_manager
+
+ def _exists(self, unit_name):
+ try:
+ return self._manager().exists(unit_name)
+ except SystemdManagerError as exc:
+ _syslog('W', 'Unable to query unit existence', {'unit': unit_name, 'error': str(exc)})
+ return False
+
+ def _daemon_reload(self):
+ log('D245', {'context': self.context.mode})
+ try:
+ self._manager().reload()
+ except SystemdManagerError as exc:
+ error = str(exc)
+ log('W50', {'context': self.context.mode, 'error': error})
+ _syslog('W', 'daemon-reload failed', {'context': self.context.mode, 'error': error})
+ self.daemon_reload_required = False
+ return False
+ self.daemon_reload_required = False
+ return True
+
+ def _active_state(self, unit_name):
+ try:
+ return self._manager().active_state(unit_name)
+ except SystemdManagerError:
+ return None
+
+ def _stop(self, rule):
+ if rule.get('element_type') in NON_RESTARTABLE_TYPES:
+ return
+ if self.context.mode == 'global_user':
+ return
+ state = self._active_state(rule['unit'])
+ if state not in ('active', 'activating', 'deactivating'):
+ return
+ try:
+ self._manager().stop(rule['unit'])
+ except SystemdManagerError as exc:
+ _syslog('W', 'Stop failed', {'unit': rule['unit'], 'error': str(exc)})
+
+ def _restart(self, rule):
+ if rule.get('element_type') in NON_RESTARTABLE_TYPES:
+ log('W49', {'unit': rule['unit'], 'type': rule.get('element_type')})
+ _syslog('D', 'Unit type is non-restartable', {'unit': rule['unit'], 'type': rule.get('element_type')})
+ return
+
+ if self.context.mode == 'global_user':
+ _syslog('D', 'Dependency restart skipped: not supported for global_user scope',
+ {'unit': rule['unit']})
+ return
+
+ state = self._active_state(rule['unit'])
+ if state not in ('active', 'activating'):
+ return
+
+ try:
+ self._manager().restart(rule['unit'])
+ except SystemdManagerError as exc:
+ _syslog('W', 'Restart failed', {'unit': rule['unit'], 'error': str(exc)})
+
+ def _rule_managed_paths(self, rule):
+ unit_file_path = Path(self.context.systemd_dir).joinpath(rule['unit'])
+ dropin_path = Path(self.context.systemd_dir).joinpath(
+ '{}.d'.format(rule['unit']), rule['dropin_name'])
+ return unit_file_path, dropin_path
+
+ def _write_rule_file(self, target_file, uid, unit_file):
+ marker = MANAGED_HEADER.format(uid)
+ body = unit_file if unit_file.endswith('\n') else '{}\n'.format(unit_file)
+ content = '{}\n{}'.format(marker, body)
+ old_content = _safe_read_text(target_file) if target_file.exists() else None
+ if old_content == content:
+ return
+
+ written, existed = _safe_write_text(target_file, content)
+ if not written:
+ _syslog('W', 'Unable to safely write managed file', {'path': str(target_file)})
+ return
+
+ if existed:
+ record_changed(str(target_file))
+ else:
+ record_presence_changed(str(target_file))
+ self.daemon_reload_required = True
+
+ def _apply_edit(self, rule, exists):
+ unit_file = rule.get('unit_file')
+ if not unit_file:
+ return
+
+ unit_file_path, dropin_path = self._rule_managed_paths(rule)
+ edit_mode = rule['edit_mode']
+ if edit_mode == 'create':
+ self._write_rule_file(unit_file_path, rule['uid'], unit_file)
+ return
+ if edit_mode == 'override':
+ self._write_rule_file(dropin_path, rule['uid'], unit_file)
+ return
+ if exists:
+ self._write_rule_file(dropin_path, rule['uid'], unit_file)
+ else:
+ self._write_rule_file(unit_file_path, rule['uid'], unit_file)
+
+ def _run_state_action(self, rule):
+ state = rule['state']
+ if state == 'as_is':
+ return
+
+ try:
+ self._manager().apply_state(rule['unit'], state, rule['now'])
+ except (SystemdManagerError, ValueError) as exc:
+ _syslog('W', 'State apply failed', {'unit': rule['unit'], 'state': state, 'error': str(exc)})
+
+ def apply_rules(self, rules):
+ applicable_rules = []
+ for rule in rules:
+ log('D244', {'unit': rule['unit'], 'state': rule['state']})
+ exists = self._exists(rule['unit'])
+ if not _rule_matches_apply_mode(rule, exists):
+ # Rule does not qualify for edit/state this run, but should
+ # still be checked for dependency-triggered restarts.
+ self.phase2_candidates.append(rule)
+ continue
+ applicable_rules.append((rule, exists))
+
+ for rule, exists in applicable_rules:
+ self._apply_edit(rule, exists)
+
+ if self.daemon_reload_required and not self._daemon_reload():
+ _syslog('W', 'Skipping state apply due to daemon-reload failure', {
+ 'context': self.context.mode,
+ 'rules': len(applicable_rules),
+ })
+ return
+
+ for rule, _ in applicable_rules:
+ self._run_state_action(rule)
+ self.phase2_candidates.append(rule)
+
+ def cleanup_removed_rules(self, removed_rules):
+ affected_units = set()
+ for rule in removed_rules:
+ log('D246', {'unit': rule['unit'], 'uid': rule['uid']})
+ unit_file_path, dropin_path = self._rule_managed_paths(rule)
+ for target in (unit_file_path, dropin_path):
+ if not _is_managed_by_uid(target, rule['uid']):
+ continue
+ try:
+ target.unlink()
+ record_presence_changed(str(target))
+ self.daemon_reload_required = True
+ affected_units.add((rule['unit'], rule.get('element_type', 'service')))
+ except Exception as exc:
+ _syslog('W', 'Failed to cleanup managed file', {'path': str(target), 'error': str(exc)})
+ dropin_dir = dropin_path.parent
+ if dropin_dir.exists():
+ try:
+ dropin_dir.rmdir()
+ except OSError:
+ pass
+
+ if self.daemon_reload_required:
+ if not self._daemon_reload():
+ _syslog('W', 'Skipping cleanup restart due to daemon-reload failure', {
+ 'context': self.context.mode,
+ 'units': [unit_name for unit_name, _ in affected_units],
+ })
+ return
+ for unit_name, element_type in affected_units:
+ cleanup_rule = {
+ 'unit': unit_name,
+ 'element_type': element_type,
+ }
+ self._stop(cleanup_rule)
+
+ def _dependency_changed(self, dependency, username=None):
+ dep_path = _expand_windows_var(dependency['path'], username)
+ if not dep_path or not os.path.isabs(dep_path):
+ return False
+ mode = dependency['mode']
+ return query(dep_path, mode=mode)
+
+ def post_restart(self, username=None):
+ for rule in self.phase2_candidates:
+ dependencies = rule.get('file_dependencies', [])
+ if not dependencies:
+ continue
+ if any(self._dependency_changed(dep, username=username) for dep in dependencies):
+ log('D247', {'unit': rule['unit']})
+ self._restart(rule)
+
+
+def _get_removed_rules(storage, scope_name, target):
+ current_raw = _read_preferences(storage, scope_name, is_previous=False)
+ previous_raw = _read_preferences(storage, scope_name, is_previous=True)
+ current_map = {}
+ previous_map = {}
+ for item in current_raw:
+ normalized = _normalize_rule(item)
+ if normalized is not None and normalized['policy_target'] == target:
+ current_map[normalized['uid']] = normalized
+ for item in previous_raw:
+ normalized = _normalize_rule(item)
+ if normalized is not None and normalized['policy_target'] == target:
+ previous_map[normalized['uid']] = normalized
+ removed_uids = set(previous_map.keys()) - set(current_map.keys())
+ return [previous_map[uid] for uid in removed_uids]
+
+
+def _get_rules_for_scope(storage, scope_name, target):
+ current_raw = _read_preferences(storage, scope_name, is_previous=False)
+ rules = []
+ for item in current_raw:
+ normalized = _normalize_rule(item)
+ if normalized is None:
+ continue
+ if normalized['policy_target'] != target:
+ continue
+ rules.append(normalized)
+ return rules
+
+
+def _split_active_and_cleanup_rules(rules):
+ active_rules = []
+ explicit_cleanup_rules = []
+ for rule in rules:
+ if rule.get('remove_policy'):
+ explicit_cleanup_rules.append(rule)
+ continue
+ active_rules.append(rule)
+ return active_rules, explicit_cleanup_rules
+
+
+def _merge_cleanup_rules(removed_by_diff, explicit_cleanup_rules):
+ merged = {}
+ for rule in removed_by_diff:
+ merged[rule['uid']] = rule
+ for rule in explicit_cleanup_rules:
+ merged[rule['uid']] = rule
+ return list(merged.values())
+
+
+def _get_rule_sets_for_scope(storage, scope_name, target):
+ current_rules = _get_rules_for_scope(storage, scope_name, target)
+ active_rules, explicit_cleanup_rules = _split_active_and_cleanup_rules(current_rules)
+ removed_by_diff = _get_removed_rules(storage, scope_name, target)
+ cleanup_rules = _merge_cleanup_rules(removed_by_diff, explicit_cleanup_rules)
+ return active_rules, cleanup_rules
+
+
+def _collect_dependency_paths(storage, scope_name, target, username=None):
+ dependency_paths = []
+ for rule in _get_rules_for_scope(storage, scope_name, target):
+ if rule.get('remove_policy'):
+ continue
+ for dependency in rule.get('file_dependencies', []):
+ dep_path = _expand_windows_var(dependency.get('path'), username)
+ if dep_path and os.path.isabs(dep_path):
+ dependency_paths.append(dep_path)
+ return dependency_paths
+
+
+class systemd_preferences_applier(applier_frontend):
+ __module_name = 'SystemdPreferencesApplier'
+ __module_experimental = True
+ __module_enabled = False
+ __scope_name = 'Machine'
+
+ def __init__(self, storage):
+ self.storage = storage
+ self.__module_enabled = check_enabled(self.storage, self.__module_name, self.__module_experimental)
+
+ def prime_dependency_journal(self):
+ if not self.__module_enabled:
+ return
+ dependency_paths = []
+ dependency_paths.extend(_collect_dependency_paths(self.storage, self.__scope_name, target='machine'))
+ dependency_paths.extend(_collect_dependency_paths(self.storage, self.__scope_name, target='user'))
+ watch_many(dependency_paths)
+
+ def apply(self):
+ if not self.__module_enabled:
+ log('D243')
+ return
+
+ log('D240')
+ runtime = _systemd_preferences_runtime(self.storage, self.__scope_name, _Context(mode='machine'))
+ active_rules, cleanup_rules = _get_rule_sets_for_scope(
+ self.storage, self.__scope_name, target='machine')
+ runtime.apply_rules(active_rules)
+ runtime.cleanup_removed_rules(cleanup_rules)
+ runtime.post_restart()
+
+ global_user_runtime = _systemd_preferences_runtime(
+ self.storage,
+ self.__scope_name,
+ _Context(mode='global_user'),
+ )
+ active_user_rules, cleanup_user_rules = _get_rule_sets_for_scope(
+ self.storage,
+ self.__scope_name,
+ target='user',
+ )
+ global_user_runtime.apply_rules(active_user_rules)
+ global_user_runtime.cleanup_removed_rules(cleanup_user_rules)
+ global_user_runtime.post_restart()
+
+
+class systemd_preferences_applier_user(applier_frontend):
+ __module_name = 'SystemdPreferencesApplierUser'
+ __module_experimental = True
+ __module_enabled = False
+
+ def __init__(self, storage, username):
+ self.storage = storage
+ self.username = username
+ self.uid = get_uid_by_username(username)
+ self.user_bus_path = '/run/user/{}/bus'.format(self.uid) if self.uid is not None else None
+ self.__module_enabled = check_enabled(self.storage, self.__module_name, self.__module_experimental)
+
+ def prime_dependency_journal(self):
+ if not self.__module_enabled:
+ return
+
+ dependency_paths = []
+ dependency_paths.extend(_collect_dependency_paths(self.storage, self.username, target='machine'))
+ dependency_paths.extend(_collect_dependency_paths(
+ self.storage,
+ self.username,
+ target='user',
+ username=self.username,
+ ))
+ watch_many(dependency_paths)
+
+ def admin_context_apply(self):
+ if not self.__module_enabled:
+ log('D243')
+ return
+
+ log('D241', {'username': self.username})
+ runtime = _systemd_preferences_runtime(self.storage, self.username, _Context(mode='machine'))
+ active_rules, cleanup_rules = _get_rule_sets_for_scope(
+ self.storage, self.username, target='machine')
+ runtime.apply_rules(active_rules)
+ runtime.cleanup_removed_rules(cleanup_rules)
+ runtime.post_restart()
+
+ def user_context_apply(self):
+ if not self.__module_enabled:
+ log('D243')
+ return
+ log('D242', {'username': self.username})
+ if not self.user_bus_path or not os.path.exists(self.user_bus_path):
+ log('W48', {'username': self.username, 'path': self.user_bus_path})
+ _syslog('W', 'systemd --user manager is unavailable', {
+ 'username': self.username,
+ 'path': self.user_bus_path,
+ })
+ return
+
+ runtime = _systemd_preferences_runtime(
+ self.storage,
+ self.username,
+ _Context(mode='user', username=self.username))
+ active_rules, cleanup_rules = _get_rule_sets_for_scope(
+ self.storage, self.username, target='user')
+ runtime.apply_rules(active_rules)
+ runtime.cleanup_removed_rules(cleanup_rules)
+ runtime.post_restart(username=self.username)
diff --git a/gpoa/gpt/gpt.py b/gpoa/gpt/gpt.py
index 9678c3dd..d0e15752 100644
--- a/gpoa/gpt/gpt.py
+++ b/gpoa/gpt/gpt.py
@@ -39,6 +39,7 @@
from .scriptsini import merge_scripts, read_scripts
from .services import merge_services, read_services
from .shortcuts import merge_shortcuts, read_shortcuts
+from .systemds import merge_systemds, read_systemds
from .tasks import merge_tasks, read_tasks
@@ -53,6 +54,7 @@ class FileType(Enum):
ENVIRONMENTVARIABLES = 'environmentvariables.xml'
INIFILES = 'inifiles.xml'
SERVICES = 'services.xml'
+ SYSTEMDS = 'systemd.xml'
PRINTERS = 'printers.xml'
SCRIPTS = 'scripts.ini'
NETWORKSHARES = 'networkshares.xml'
@@ -80,6 +82,7 @@ def pref_parsers():
parsers[FileType.ENVIRONMENTVARIABLES] = read_envvars
parsers[FileType.INIFILES] = read_inifiles
parsers[FileType.SERVICES] = read_services
+ parsers[FileType.SYSTEMDS] = read_systemds
parsers[FileType.PRINTERS] = read_printers
parsers[FileType.SCRIPTS] = read_scripts
parsers[FileType.NETWORKSHARES] = read_networkshares
@@ -102,6 +105,7 @@ def pref_mergers():
mergers[FileType.ENVIRONMENTVARIABLES] = merge_envvars
mergers[FileType.INIFILES] = merge_inifiles
mergers[FileType.SERVICES] = merge_services
+ mergers[FileType.SYSTEMDS] = merge_systemds
mergers[FileType.PRINTERS] = merge_printers
mergers[FileType.SCRIPTS] = merge_scripts
mergers[FileType.NETWORKSHARES] = merge_networkshares
@@ -139,6 +143,7 @@ def __init__(self, gpt_path, username='Machine', gpo_info=None):
, 'files'
, 'inifiles'
, 'services'
+ , 'systemd'
, 'scheduledtasks'
, 'scripts'
, 'networkshares'
@@ -285,13 +290,13 @@ def find_preffile(search_path, prefname):
if not prefdir:
return None
- # Then search for preference directory
pref_dir = find_dir(prefdir, prefname)
- file_name = '{}.xml'.format(prefname)
- # And then try to find the corresponding file.
- pref_file = find_file(pref_dir, file_name)
+ if pref_dir:
+ pref_file = find_file(pref_dir, '{}.xml'.format(prefname))
+ if pref_file:
+ return pref_file
- return pref_file
+ return None
def lp2gpt():
'''
diff --git a/gpoa/gpt/systemds.py b/gpoa/gpt/systemds.py
new file mode 100644
index 00000000..19158a66
--- /dev/null
+++ b/gpoa/gpt/systemds.py
@@ -0,0 +1,357 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import base64
+import os
+from xml.etree import ElementTree
+
+from util.logging import log
+
+from .dynamic_attributes import DynamicAttributes
+from .systemds_constants import (
+ DEFAULT_DROPIN_NAME,
+ DROPIN_NAME_RE,
+ MAX_DEPENDENCIES_PER_RULE,
+ MAX_DEPENDENCY_PATH_LEN,
+ MAX_UNIT_FILE_SIZE,
+ UNIT_NAME_RE,
+ VALID_APPLY_MODES,
+ VALID_DEP_MODES,
+ VALID_POLICY_TARGETS,
+ VALID_STATES,
+)
+
+
+VALID_POLICY_ELEMENTS = {
+ 'Service',
+ 'Socket',
+ 'Timer',
+ 'Path',
+ 'Mount',
+ 'Automount',
+ 'Swap',
+ 'Target',
+ 'Device',
+ 'Slice',
+ 'Scope',
+}
+
+UNIT_SUFFIX = {
+ 'Service': '.service',
+ 'Socket': '.socket',
+ 'Timer': '.timer',
+ 'Path': '.path',
+ 'Mount': '.mount',
+ 'Automount': '.automount',
+ 'Swap': '.swap',
+ 'Target': '.target',
+ 'Device': '.device',
+ 'Slice': '.slice',
+ 'Scope': '.scope',
+}
+
+
+def _tag_name(element):
+ return str(element.tag).split('}')[-1]
+
+
+def _as_bool(value, default=False):
+ if value is None:
+ return default
+ return str(value).lower() in ('1', 'true', 'yes')
+
+
+def _normalize_unit_name(unit_name, element_name):
+ if not unit_name:
+ return None
+
+ all_suffixes = set(UNIT_SUFFIX.values())
+ if any(str(unit_name).endswith(suffix) for suffix in all_suffixes):
+ return unit_name
+
+ suffix = UNIT_SUFFIX.get(element_name)
+ if not suffix:
+ return unit_name
+
+ return '{}{}'.format(unit_name, suffix)
+
+
+def _is_safe_component(value):
+ text = str(value) if value is not None else ''
+ if not text:
+ return False
+ if text in ('.', '..'):
+ return False
+ if text != text.strip():
+ return False
+ if '/' in text or '\\' in text:
+ return False
+ if os.path.isabs(text):
+ return False
+ if len(text) >= 2 and text[1] == ':' and text[0].isalpha():
+ return False
+ if '\x00' in text:
+ return False
+ return True
+
+
+def _has_control_chars(value):
+ return any(ord(ch) < 32 or ord(ch) == 127 for ch in str(value))
+
+
+def _derive_edit_mode(apply_mode):
+ if apply_mode == 'always':
+ return 'create_or_override'
+ elif apply_mode == 'if_exists':
+ return 'override'
+ elif apply_mode == 'if_missing':
+ return 'create'
+ return 'create_or_override'
+
+
+def _is_valid_unit_name(value):
+ if not _is_safe_component(value):
+ return False
+ if _has_control_chars(value):
+ return False
+ if not UNIT_NAME_RE.match(str(value)):
+ return False
+ name_part = str(value).rsplit('.', 1)[0]
+ if name_part.startswith('-') or name_part.endswith('-'):
+ return False
+ return True
+
+
+def _is_valid_dropin_name(value):
+ if not _is_safe_component(value):
+ return False
+ if _has_control_chars(value):
+ return False
+ return bool(DROPIN_NAME_RE.match(str(value)))
+
+
+def _expand_windows_var(path):
+ if not path:
+ return path
+ variables = {
+ 'HOME': '/etc/skel',
+ 'HOMEPATH': '/etc/skel',
+ 'HOMEDRIVE': '/',
+ 'SystemRoot': '/',
+ 'SystemDrive': '/',
+ 'USERNAME': '',
+ }
+ result = str(path)
+ for key, value in variables.items():
+ replacement = str(value)
+ if key not in ('USERNAME',) and not replacement.endswith('/'):
+ replacement = '{}{}'.format(replacement, '/')
+ result = result.replace('%{}%'.format(key), replacement)
+ return result
+
+
+def _is_valid_dependency_path(path, policy_target):
+ if not isinstance(path, str):
+ return False
+ if not path or len(path) > MAX_DEPENDENCY_PATH_LEN:
+ return False
+ if '\x00' in path or _has_control_chars(path):
+ return False
+
+ expanded = _expand_windows_var(path)
+ if not expanded or len(expanded) > MAX_DEPENDENCY_PATH_LEN:
+ return False
+ if '\x00' in expanded or _has_control_chars(expanded):
+ return False
+
+ upper_path = path.upper()
+ if policy_target == 'user':
+ if '%HOME%' in upper_path or '%HOMEPATH%' in upper_path:
+ return os.path.isabs(expanded)
+ return os.path.isabs(path) and os.path.isabs(expanded)
+
+ return os.path.isabs(expanded)
+
+
+def _get_systemds_root(systemds_file):
+ try:
+ from defusedxml import ElementTree as DefusedElementTree
+ xml_contents = DefusedElementTree.parse(systemds_file)
+ except ImportError:
+ log('W47', {'reason': 'defusedxml is unavailable, using xml.etree fallback'})
+ xml_contents = ElementTree.parse(systemds_file)
+ return xml_contents.getroot()
+
+
+def _invalid_entry(message, data=None):
+ payload = {'reason': message}
+ if data:
+ payload.update(data)
+ log('W47', payload)
+
+
+def _parse_file_dependencies(properties, policy_target, unit):
+ file_dependencies = []
+ dependencies = properties.find('FileDependencies')
+ if dependencies is None:
+ return file_dependencies
+
+ dependency_items = list(dependencies.findall('Dependency'))
+ if len(dependency_items) > MAX_DEPENDENCIES_PER_RULE:
+ _invalid_entry('Too many dependency entries, truncating', {
+ 'unit': unit,
+ 'count': len(dependency_items),
+ 'limit': MAX_DEPENDENCIES_PER_RULE,
+ })
+ dependency_items = dependency_items[:MAX_DEPENDENCIES_PER_RULE]
+
+ for dependency in dependency_items:
+ mode = dependency.get('mode')
+ path = dependency.get('path')
+ if mode not in VALID_DEP_MODES or not path:
+ _invalid_entry('Invalid dependency entry', {'mode': mode, 'path': path})
+ continue
+ if not _is_valid_dependency_path(str(path), policy_target):
+ _invalid_entry('Invalid dependency path', {
+ 'mode': mode,
+ 'path': path,
+ 'policy_target': policy_target,
+ 'unit': unit,
+ })
+ continue
+ file_dependencies.append({'mode': mode, 'path': path})
+
+ return file_dependencies
+
+
+def _parse_policy_element(policy_element):
+ element_name = _tag_name(policy_element)
+ if element_name not in VALID_POLICY_ELEMENTS:
+ return None
+
+ properties = policy_element.find('Properties')
+ if properties is None:
+ _invalid_entry('Missing in Systemds element', {'element': element_name})
+ return None
+
+ unit = _normalize_unit_name(properties.get('unit'), element_name)
+ state = properties.get('state')
+ apply_mode = properties.get('applyMode', 'always')
+ policy_target = properties.get('policyTarget', 'machine')
+
+ if not unit:
+ _invalid_entry('Missing unit attribute', {'element': element_name})
+ return None
+ if not _is_valid_unit_name(unit):
+ _invalid_entry('Invalid unit value', {'element': element_name, 'unit': unit})
+ return None
+ if state not in VALID_STATES:
+ _invalid_entry('Invalid state', {'element': element_name, 'state': state, 'unit': unit})
+ return None
+ if apply_mode not in VALID_APPLY_MODES:
+ _invalid_entry('Invalid applyMode', {'element': element_name, 'apply_mode': apply_mode, 'unit': unit})
+ return None
+ if policy_target not in VALID_POLICY_TARGETS:
+ _invalid_entry('Invalid policyTarget', {'element': element_name, 'policy_target': policy_target, 'unit': unit})
+ return None
+ uid = policy_element.get('uid')
+ clsid = policy_element.get('clsid')
+ name = policy_element.get('name')
+ if not uid or not clsid or not name:
+ _invalid_entry('Missing required policy attributes', {
+ 'element': element_name,
+ 'uid': uid,
+ 'clsid': clsid,
+ 'name': name,
+ 'unit': unit,
+ })
+ return None
+
+ unit_file = properties.find('UnitFile')
+ unit_file_text = None
+ unit_file_b64 = None
+ if unit_file is not None and unit_file.text is not None:
+ # UnitFile mode=table is treated as plain text by design.
+ unit_file_text = str(unit_file.text)
+ if len(unit_file_text.encode('utf-8')) > MAX_UNIT_FILE_SIZE:
+ _invalid_entry('UnitFile exceeds size limit', {
+ 'element': element_name,
+ 'unit': unit,
+ 'limit': MAX_UNIT_FILE_SIZE,
+ })
+ return None
+ unit_file_b64 = base64.b64encode(unit_file_text.encode('utf-8')).decode('ascii')
+
+ policy = systemd_policy(unit)
+ policy.element_type = element_name.lower()
+ policy.clsid = clsid
+ policy.name = name
+ policy.status = policy_element.get('status')
+ policy.image = policy_element.get('image')
+ policy.changed = policy_element.get('changed')
+ policy.uid = uid
+ policy.desc = policy_element.get('desc')
+ policy.bypassErrors = policy_element.get('bypassErrors')
+ policy.userContext = policy_element.get('userContext')
+ policy.removePolicy = policy_element.get('removePolicy')
+
+ policy.state = state
+ policy.now = _as_bool(properties.get('now'), default=False)
+ policy.apply_mode = apply_mode
+ policy.policy_target = policy_target
+ policy.edit_mode = _derive_edit_mode(apply_mode)
+ dropin_name = properties.get('dropInName', DEFAULT_DROPIN_NAME) or DEFAULT_DROPIN_NAME
+ if not _is_valid_dropin_name(dropin_name):
+ _invalid_entry('Invalid dropInName', {'element': element_name, 'dropInName': dropin_name, 'unit': unit})
+ return None
+
+ policy.dropin_name = dropin_name
+ policy.unit_file = unit_file_text
+ policy.unit_file_b64 = unit_file_b64
+ policy.unit_file_mode = 'text'
+ policy.file_dependencies = _parse_file_dependencies(properties, policy_target, unit)
+
+ return policy
+
+
+def read_systemds(systemds_file):
+ """
+ Read Systemds.xml from GPT.
+ """
+ policies = []
+ root = _get_systemds_root(systemds_file)
+ if _tag_name(root) != 'Systemds':
+ _invalid_entry('Unexpected root element in Systemds.xml', {'root': _tag_name(root)})
+ return policies
+
+ for policy_element in root:
+ parsed = _parse_policy_element(policy_element)
+ if parsed is not None:
+ policies.append(parsed)
+
+ return policies
+
+
+def merge_systemds(storage, systemd_objects, policy_name):
+ for systemd_object in systemd_objects:
+ storage.add_systemd(systemd_object, policy_name)
+
+
+class systemd_policy(DynamicAttributes):
+ def __init__(self, unit):
+ self.unit = unit
diff --git a/gpoa/gpt/systemds_constants.py b/gpoa/gpt/systemds_constants.py
new file mode 100644
index 00000000..7381827e
--- /dev/null
+++ b/gpoa/gpt/systemds_constants.py
@@ -0,0 +1,37 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import re
+
+VALID_STATES = {'as_is', 'enable', 'disable', 'mask', 'unmask', 'preset'}
+VALID_APPLY_MODES = {'always', 'if_exists', 'if_missing'}
+VALID_POLICY_TARGETS = {'machine', 'user'}
+VALID_EDIT_MODES = {'create', 'override', 'create_or_override'}
+VALID_DEP_MODES = {'changed', 'presence_changed'}
+NON_RESTARTABLE_TYPES = {'device', 'scope'}
+
+DEFAULT_DROPIN_NAME = '50-gpo.conf'
+DROPIN_NAME_RE = re.compile(r'^[A-Za-z0-9_.@-]{1,128}\.conf$')
+UNIT_NAME_RE = re.compile(
+ r'^[A-Za-z0-9:_.@-]{1,255}\.(service|socket|timer|path|mount|automount|swap|target|device|slice|scope)$'
+)
+
+MAX_RULES_PER_SCOPE = 512
+MAX_DEPENDENCIES_PER_RULE = 32
+MAX_DEPENDENCY_PATH_LEN = 4096
+MAX_UNIT_FILE_SIZE = 128 * 1024
diff --git a/gpoa/locale/ru_RU/LC_MESSAGES/gpoa.po b/gpoa/locale/ru_RU/LC_MESSAGES/gpoa.po
index bb7a7186..7b20d829 100644
--- a/gpoa/locale/ru_RU/LC_MESSAGES/gpoa.po
+++ b/gpoa/locale/ru_RU/LC_MESSAGES/gpoa.po
@@ -1135,6 +1135,42 @@ msgstr "Ошибка загрузки плагина из файла"
msgid "Plugin failed to apply with user privileges"
msgstr "Плагин не смог примениться с правами пользователя"
+msgid "Running systemd preferences applier for machine"
+msgstr "Запуск применения systemd preferences для машины"
+
+msgid "Running systemd preferences applier for user in admin context"
+msgstr "Запуск применения systemd preferences для пользователя в административном контексте"
+
+msgid "Running systemd preferences applier for user in user context"
+msgstr "Запуск применения systemd preferences для пользователя в пользовательском контексте"
+
+msgid "Systemd preferences applier is disabled"
+msgstr "Применение systemd preferences отключено"
+
+msgid "Applying systemd preference rule"
+msgstr "Применение правила systemd preference"
+
+msgid "Running daemon-reload for systemd preferences"
+msgstr "Выполняется daemon-reload для systemd preferences"
+
+msgid "Cleaning up removed systemd preferences rule"
+msgstr "Очистка удалённого правила systemd preferences"
+
+msgid "Restarting unit due to changed file dependency"
+msgstr "Перезапуск unit из-за изменённой файловой зависимости"
+
+msgid "Invalid Systemds preference entry"
+msgstr "Некорректная запись Systemds preference"
+
+msgid "User systemd manager is unavailable"
+msgstr "Пользовательский менеджер systemd недоступен"
+
+msgid "Restart skipped for non-restartable unit type"
+msgstr "Перезапуск пропущен для нерестартуемого типа unit"
+
+msgid "daemon-reload for systemd preferences failed"
+msgstr "Ошибка daemon-reload для systemd preferences"
+
# Warning_end
# Fatal
@@ -1151,4 +1187,3 @@ msgid "Unknown fatal code"
msgstr "Неизвестный код фатальной ошибки"
-
diff --git a/gpoa/messages/__init__.py b/gpoa/messages/__init__.py
index 754eecf4..1c3becf8 100644
--- a/gpoa/messages/__init__.py
+++ b/gpoa/messages/__init__.py
@@ -362,6 +362,14 @@ def debug_code(code):
debug_ids[237] = 'Failed to load cached versions'
debug_ids[238] = 'The trust attribute is not supported'
debug_ids[239] = 'Setting the trust attribute for a shortcut'
+ debug_ids[240] = 'Running systemd preferences applier for machine'
+ debug_ids[241] = 'Running systemd preferences applier for user in admin context'
+ debug_ids[242] = 'Running systemd preferences applier for user in user context'
+ debug_ids[243] = 'Systemd preferences applier is disabled'
+ debug_ids[244] = 'Applying systemd preference rule'
+ debug_ids[245] = 'Running daemon-reload for systemd preferences'
+ debug_ids[246] = 'Cleaning up removed systemd preferences rule'
+ debug_ids[247] = 'Restarting unit due to changed file dependency'
return debug_ids.get(code, 'Unknown debug code')
@@ -419,6 +427,10 @@ def warning_code(code):
warning_ids[44] = 'Plugin is not valid API object'
warning_ids[45] = 'Error loading plugin from file'
warning_ids[46] = 'Plugin failed to apply with user privileges'
+ warning_ids[47] = 'Invalid Systemds preference entry'
+ warning_ids[48] = 'User systemd manager is unavailable'
+ warning_ids[49] = 'Restart skipped for non-restartable unit type'
+ warning_ids[50] = 'daemon-reload for systemd preferences failed'
return warning_ids.get(code, 'Unknown warning code')
@@ -450,4 +462,3 @@ def message_with_code(code):
retstr = 'core' + '[' + code[0:1] + code[1:].rjust(7, '0') + ']| ' + gettext.gettext(get_message(code))
return retstr
-
diff --git a/gpoa/storage/dconf_registry.py b/gpoa/storage/dconf_registry.py
index 0a93e59e..e19dbf9d 100644
--- a/gpoa/storage/dconf_registry.py
+++ b/gpoa/storage/dconf_registry.py
@@ -97,6 +97,7 @@ class Dconf_registry():
environmentvariables = []
inifiles = []
services = []
+ systemds = []
printers = []
scripts = []
networkshares = []
@@ -454,6 +455,11 @@ def add_networkshare(cls, networkshareobj, policy_name):
networkshareobj.policy_name = policy_name
cls.networkshares.append(networkshareobj)
+ @classmethod
+ def add_systemd(cls, systemdobj, policy_name):
+ systemdobj.policy_name = policy_name
+ cls.systemds.append(systemdobj)
+
@classmethod
def get_shortcuts(cls):
@@ -503,6 +509,10 @@ def get_files(cls):
def get_networkshare(cls):
return cls.networkshares
+ @classmethod
+ def get_systemds(cls):
+ return cls.systemds
+
@classmethod
def get_ini(cls):
@@ -821,8 +831,18 @@ def get_dconf_envprofile():
def convert_elements_to_list_dicts(elements):
return list(map(lambda x: dict(x), elements))
+def _freeze_for_dedup(value):
+ if isinstance(value, dict):
+ return tuple((key, _freeze_for_dedup(val)) for key, val in sorted(value.items()))
+ if isinstance(value, list):
+ return tuple(_freeze_for_dedup(item) for item in value)
+ return value
+
def remove_duplicate_dicts_in_list(list_dict):
- return convert_elements_to_list_dicts(list(OrderedDict((tuple(sorted(d.items())), d) for d in list_dict).values()))
+ result = OrderedDict()
+ for item in convert_elements_to_list_dicts(list_dict):
+ result.setdefault(_freeze_for_dedup(item), item)
+ return list(result.values())
def add_preferences_to_global_registry_dict(username, is_machine):
if is_machine:
@@ -838,6 +858,7 @@ def add_preferences_to_global_registry_dict(username, is_machine):
('Environmentvariables',remove_duplicate_dicts_in_list(Dconf_registry.environmentvariables)),
('Inifiles',remove_duplicate_dicts_in_list(Dconf_registry.inifiles)),
('Services',remove_duplicate_dicts_in_list(Dconf_registry.services)),
+ ('Systemds',remove_duplicate_dicts_in_list(Dconf_registry.systemds)),
('Printers',remove_duplicate_dicts_in_list(Dconf_registry.printers)),
('Scripts',remove_duplicate_dicts_in_list(Dconf_registry.scripts)),
('Networkshares',remove_duplicate_dicts_in_list(Dconf_registry.networkshares))]
diff --git a/gpoa/test/frontend/test_change_journal.py b/gpoa/test/frontend/test_change_journal.py
new file mode 100644
index 00000000..b88d8829
--- /dev/null
+++ b/gpoa/test/frontend/test_change_journal.py
@@ -0,0 +1,115 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import unittest
+import os
+import sys
+import types
+import importlib
+import tempfile
+import unittest.mock
+
+
+def _load_change_journal():
+ if 'frontend' not in sys.modules:
+ frontend_pkg = types.ModuleType('frontend')
+ frontend_pkg.__path__ = [os.path.join(os.getcwd(), 'frontend')]
+ sys.modules['frontend'] = frontend_pkg
+ return importlib.import_module('frontend.change_journal')
+
+
+class ChangeJournalTestCase(unittest.TestCase):
+ def test_changed_by_content_update(self):
+ change_journal = _load_change_journal()
+ change_journal.reset()
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ path = os.path.join(tmpdir, 'a.txt')
+ with open(path, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('first')
+ change_journal.watch(path)
+ with open(path, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('second')
+
+ self.assertTrue(change_journal.query(path, mode='changed'))
+ self.assertFalse(change_journal.query(path, mode='presence_changed'))
+
+ def test_presence_changed_on_create_and_delete(self):
+ change_journal = _load_change_journal()
+ change_journal.reset()
+ with tempfile.TemporaryDirectory() as tmpdir:
+ created = os.path.join(tmpdir, 'create.txt')
+ change_journal.watch(created)
+ with open(created, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('x')
+
+ self.assertTrue(change_journal.query(created, mode='presence_changed'))
+ self.assertTrue(change_journal.query(created, mode='changed'))
+
+ deleted = os.path.join(tmpdir, 'delete.txt')
+ with open(deleted, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('x')
+ change_journal.watch(deleted)
+ os.unlink(deleted)
+
+ self.assertTrue(change_journal.query(deleted, mode='presence_changed'))
+ self.assertTrue(change_journal.query(deleted, mode='changed'))
+
+ def test_unchanged_and_unwatched_paths(self):
+ change_journal = _load_change_journal()
+ change_journal.reset()
+ with tempfile.TemporaryDirectory() as tmpdir:
+ path = os.path.join(tmpdir, 'same.txt')
+ with open(path, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('same')
+ change_journal.watch(path)
+
+ self.assertFalse(change_journal.query(path, mode='changed'))
+ self.assertFalse(change_journal.query(path, mode='presence_changed'))
+ self.assertFalse(change_journal.query(os.path.join(tmpdir, 'unwatched.txt'), mode='changed'))
+
+ def test_record_compatibility_for_manual_override(self):
+ change_journal = _load_change_journal()
+ change_journal.reset()
+
+ change_journal.record_changed('/tmp/a')
+ self.assertTrue(change_journal.query('/tmp/a', mode='changed'))
+ self.assertFalse(change_journal.query('/tmp/a', mode='presence_changed'))
+
+ change_journal.record_presence_changed('/tmp/b')
+ self.assertTrue(change_journal.query('/tmp/b', mode='changed'))
+ self.assertTrue(change_journal.query('/tmp/b', mode='presence_changed'))
+
+ def test_query_reuses_current_snapshot_cache(self):
+ change_journal = _load_change_journal()
+ change_journal.reset()
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ path = os.path.join(tmpdir, 'cache.txt')
+ with open(path, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('content')
+ change_journal.watch(path)
+
+ with unittest.mock.patch.object(change_journal, '_sha256', wraps=change_journal._sha256) as hash_mock:
+ self.assertFalse(change_journal.query(path, mode='changed'))
+ self.assertFalse(change_journal.query(path, mode='changed'))
+ self.assertEqual(hash_mock.call_count, 1)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/gpoa/test/frontend/test_frontend_manager.py b/gpoa/test/frontend/test_frontend_manager.py
new file mode 100644
index 00000000..3129a5dd
--- /dev/null
+++ b/gpoa/test/frontend/test_frontend_manager.py
@@ -0,0 +1,95 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import ast
+import os
+import unittest
+
+
+def _read_frontend_manager_ast():
+ source_path = os.path.join(os.getcwd(), 'frontend', 'frontend_manager.py')
+ with open(source_path, 'r', encoding='utf-8') as file_obj:
+ source = file_obj.read()
+ return ast.parse(source)
+
+
+def _find_method(class_node, method_name):
+ for node in class_node.body:
+ if isinstance(node, ast.FunctionDef) and node.name == method_name:
+ return node
+ return None
+
+
+def _has_call(node, function_name):
+ for child in ast.walk(node):
+ if not isinstance(child, ast.Call):
+ continue
+ if isinstance(child.func, ast.Name) and child.func.id == function_name:
+ return True
+ if isinstance(child.func, ast.Attribute) and child.func.attr == function_name:
+ return True
+ return False
+
+
+def _find_stmt_index(statements, predicate):
+ for index, statement in enumerate(statements):
+ if predicate(statement):
+ return index
+ return None
+
+
+class FrontendManagerOrderTestCase(unittest.TestCase):
+ def test_machine_apply_primes_journal_after_reset_before_apply_loop(self):
+ tree = _read_frontend_manager_ast()
+ manager_class = next(node for node in tree.body
+ if isinstance(node, ast.ClassDef) and node.name == 'frontend_manager')
+ method = _find_method(manager_class, 'machine_apply')
+ self.assertIsNotNone(method)
+
+ reset_index = _find_stmt_index(method.body, lambda stmt: _has_call(stmt, 'reset_change_journal'))
+ prime_index = _find_stmt_index(method.body, lambda stmt: isinstance(stmt, ast.Try)
+ and _has_call(stmt, 'prime_dependency_journal'))
+ loop_index = _find_stmt_index(method.body, lambda stmt: isinstance(stmt, ast.For))
+
+ self.assertIsNotNone(reset_index)
+ self.assertIsNotNone(prime_index)
+ self.assertIsNotNone(loop_index)
+ self.assertLess(reset_index, prime_index)
+ self.assertLess(prime_index, loop_index)
+
+ def test_user_apply_primes_journal_after_reset_before_apply_branches(self):
+ tree = _read_frontend_manager_ast()
+ manager_class = next(node for node in tree.body
+ if isinstance(node, ast.ClassDef) and node.name == 'frontend_manager')
+ method = _find_method(manager_class, 'user_apply')
+ self.assertIsNotNone(method)
+
+ reset_index = _find_stmt_index(method.body, lambda stmt: _has_call(stmt, 'reset_change_journal'))
+ prime_index = _find_stmt_index(method.body, lambda stmt: isinstance(stmt, ast.Try)
+ and _has_call(stmt, 'prime_dependency_journal'))
+ branch_index = _find_stmt_index(method.body, lambda stmt: isinstance(stmt, ast.If))
+
+ self.assertIsNotNone(reset_index)
+ self.assertIsNotNone(prime_index)
+ self.assertIsNotNone(branch_index)
+ self.assertLess(reset_index, prime_index)
+ self.assertLess(prime_index, branch_index)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/gpoa/test/frontend/test_systemd_applier.py b/gpoa/test/frontend/test_systemd_applier.py
new file mode 100644
index 00000000..d6fd648c
--- /dev/null
+++ b/gpoa/test/frontend/test_systemd_applier.py
@@ -0,0 +1,255 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import importlib
+import os
+import sys
+import types
+import unittest
+import unittest.mock
+
+
+class _entry:
+ def __init__(self, valuename, data):
+ self.valuename = valuename
+ self.data = data
+
+
+class _storage_stub:
+ def __init__(self, entries):
+ self._entries = entries
+
+ def filter_hklm_entries(self, _branch):
+ return self._entries
+
+ def get_key_value(self, _path):
+ return None
+
+
+def _load_systemd_applier_module():
+ if 'frontend' not in sys.modules:
+ frontend_pkg = types.ModuleType('frontend')
+ frontend_pkg.__path__ = [os.path.join(os.getcwd(), 'frontend')]
+ sys.modules['frontend'] = frontend_pkg
+ return importlib.import_module('frontend.systemd_applier')
+
+
+def _load_systemd_manager_module():
+ if 'frontend' not in sys.modules:
+ frontend_pkg = types.ModuleType('frontend')
+ frontend_pkg.__path__ = [os.path.join(os.getcwd(), 'frontend')]
+ sys.modules['frontend'] = frontend_pkg
+ return importlib.import_module('frontend.appliers.systemd')
+
+
+class _fake_dbus_exception(Exception):
+ def __init__(self, message, dbus_name):
+ super().__init__(message)
+ self._dbus_name = dbus_name
+
+ def get_dbus_name(self):
+ return self._dbus_name
+
+
+class _fake_manager_iface:
+ def __init__(self, load_exc=None):
+ self.unit_path = '/org/freedesktop/systemd1/unit/demo_2eservice'
+ self.load_exc = load_exc
+
+ def LoadUnit(self, _unit_name):
+ if self.load_exc:
+ raise self.load_exc
+ return self.unit_path
+
+
+class _fake_properties_iface:
+ def __init__(self, load_state='loaded', get_exc=None):
+ self.load_state = load_state
+ self.get_exc = get_exc
+
+ def Get(self, _iface, _name):
+ if self.get_exc:
+ raise self.get_exc
+ return self.load_state
+
+
+class _fake_proxy:
+ def __init__(self, ifaces):
+ self.ifaces = ifaces
+
+
+class _fake_bus:
+ def __init__(self, objects):
+ self._objects = objects
+
+ def get_object(self, _bus_name, object_path):
+ return self._objects[str(object_path)]
+
+
+class _fake_dbus_module:
+ DBusException = _fake_dbus_exception
+
+ def __init__(self, load_state='loaded', load_exc=None, get_exc=None):
+ manager_iface = _fake_manager_iface(load_exc=load_exc)
+ properties_iface = _fake_properties_iface(load_state=load_state, get_exc=get_exc)
+ self._objects = {
+ '/org/freedesktop/systemd1': _fake_proxy({
+ 'org.freedesktop.systemd1.Manager': manager_iface,
+ }),
+ manager_iface.unit_path: _fake_proxy({
+ 'org.freedesktop.DBus.Properties': properties_iface,
+ }),
+ }
+
+ def SystemBus(self):
+ return _fake_bus(self._objects)
+
+ def SessionBus(self):
+ return _fake_bus(self._objects)
+
+ @staticmethod
+ def String(value):
+ return value
+
+ @staticmethod
+ def Boolean(value):
+ return value
+
+ @staticmethod
+ def Interface(proxy, interface_name=None, dbus_interface=None):
+ iface_name = dbus_interface if dbus_interface is not None else interface_name
+ return proxy.ifaces[iface_name]
+
+
+class _fake_subprocess_result:
+ def __init__(self, returncode=0, stdout='', stderr=''):
+ self.returncode = returncode
+ self.stdout = stdout
+ self.stderr = stderr
+
+
+class SystemdApplierTestCase(unittest.TestCase):
+ def test_run_skips_invalid_unit_name(self):
+ module = _load_systemd_applier_module()
+ storage = _storage_stub([
+ _entry('/tmp/evil.service', '1'),
+ _entry('ok.service', '1'),
+ ])
+ applier = module.systemd_applier(storage)
+
+ good_unit = unittest.mock.Mock()
+ with unittest.mock.patch('frontend.systemd_applier.systemd_unit', return_value=good_unit) as ctor:
+ applier.run()
+
+ ctor.assert_called_once_with('ok.service', 1)
+
+ def test_run_handles_dbus_apply_error(self):
+ module = _load_systemd_applier_module()
+ from frontend.appliers.systemd import SystemdManagerError
+
+ storage = _storage_stub([_entry('ok.service', '1')])
+ applier = module.systemd_applier(storage)
+
+ bad_unit = unittest.mock.Mock()
+ bad_unit.unit_name = 'ok.service'
+ bad_unit.apply.side_effect = SystemdManagerError('boom', action='start', unit='ok.service')
+
+ with unittest.mock.patch('frontend.systemd_applier.systemd_unit', return_value=bad_unit):
+ applier.run()
+
+ bad_unit.apply.assert_called_once()
+
+ def test_manager_exists_returns_false_for_not_found_load_state(self):
+ module = _load_systemd_manager_module()
+ fake_dbus = _fake_dbus_module(load_state='not-found')
+
+ with unittest.mock.patch('frontend.appliers.systemd._import_dbus', return_value=fake_dbus):
+ manager = module.SystemdManager(mode='machine')
+
+ self.assertFalse(manager.exists('demo.service'))
+
+ def test_manager_exists_returns_true_for_loaded_load_state(self):
+ module = _load_systemd_manager_module()
+ fake_dbus = _fake_dbus_module(load_state='loaded')
+
+ with unittest.mock.patch('frontend.appliers.systemd._import_dbus', return_value=fake_dbus):
+ manager = module.SystemdManager(mode='machine')
+
+ self.assertTrue(manager.exists('demo.service'))
+
+ def test_manager_exists_handles_no_such_unit_error(self):
+ module = _load_systemd_manager_module()
+ exc = _fake_dbus_exception('missing', 'org.freedesktop.systemd1.NoSuchUnit')
+ fake_dbus = _fake_dbus_module(load_exc=exc)
+
+ with unittest.mock.patch('frontend.appliers.systemd._import_dbus', return_value=fake_dbus):
+ manager = module.SystemdManager(mode='machine')
+
+ self.assertFalse(manager.exists('demo.service'))
+
+ def test_global_manager_exists_uses_systemctl_global_cat(self):
+ module = _load_systemd_manager_module()
+
+ with unittest.mock.patch('frontend.appliers.systemd.subprocess.run') as run_mock:
+ run_mock.return_value = _fake_subprocess_result(returncode=0, stdout='# /etc/systemd/user/demo.service\n')
+ manager = module.SystemdManager(mode='global_user')
+
+ self.assertTrue(manager.exists('demo.service'))
+ run_mock.assert_called_once_with(
+ ['systemctl', '--global', 'cat', 'demo.service'],
+ stdout=module.subprocess.PIPE,
+ stderr=module.subprocess.PIPE,
+ text=True,
+ check=False,
+ )
+
+ def test_global_manager_apply_state_ignores_now_runtime_actions(self):
+ module = _load_systemd_manager_module()
+
+ with unittest.mock.patch('frontend.appliers.systemd.subprocess.run') as run_mock:
+ run_mock.side_effect = [
+ _fake_subprocess_result(returncode=0),
+ _fake_subprocess_result(returncode=0),
+ ]
+ manager = module.SystemdManager(mode='global_user')
+ start_mock = unittest.mock.Mock(side_effect=AssertionError('start() must not be used for --global'))
+ manager.start = start_mock
+
+ manager.apply_state('demo.service', 'enable', now=True)
+
+ self.assertEqual(run_mock.call_args_list, [
+ unittest.mock.call(
+ ['systemctl', '--global', 'unmask', 'demo.service'],
+ stdout=module.subprocess.PIPE,
+ stderr=module.subprocess.PIPE,
+ text=True,
+ check=False,
+ ),
+ unittest.mock.call(
+ ['systemctl', '--global', 'enable', 'demo.service'],
+ stdout=module.subprocess.PIPE,
+ stderr=module.subprocess.PIPE,
+ text=True,
+ check=False,
+ ),
+ ])
+ start_mock.assert_not_called()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/gpoa/test/frontend/test_systemd_preferences_applier.py b/gpoa/test/frontend/test_systemd_preferences_applier.py
new file mode 100644
index 00000000..e0d8430d
--- /dev/null
+++ b/gpoa/test/frontend/test_systemd_preferences_applier.py
@@ -0,0 +1,959 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import tempfile
+import os
+import sys
+import types
+import importlib
+import base64
+import unittest
+import unittest.mock
+from pathlib import Path
+
+
+class _storage_stub:
+ def __init__(self, values=None):
+ self.values = values or {}
+
+ def get_entry(self, path, dictionary=None, preg=True):
+ return self.values.get(path)
+
+ def get_key_value(self, path):
+ return None
+
+
+class _manager_stub:
+ def __init__(self, exists_map=None, active_state_map=None, reload_exc=None):
+ self.exists_map = exists_map or {}
+ self.active_state_map = active_state_map or {}
+ self.reload_exc = reload_exc
+ self.exists_calls = []
+ self.apply_state_calls = []
+ self.restart_calls = []
+ self.stop_calls = []
+ self.reload_calls = 0
+ self.call_order = []
+
+ def exists(self, unit_name):
+ self.exists_calls.append(unit_name)
+ return self.exists_map.get(unit_name, False)
+
+ def reload(self):
+ self.call_order.append('reload')
+ self.reload_calls += 1
+ if self.reload_exc is not None:
+ raise self.reload_exc
+
+ def active_state(self, unit_name):
+ return self.active_state_map.get(unit_name, 'inactive')
+
+ def restart(self, unit_name):
+ self.call_order.append('restart:{}'.format(unit_name))
+ self.restart_calls.append(unit_name)
+
+ def stop(self, unit_name):
+ self.call_order.append('stop:{}'.format(unit_name))
+ self.stop_calls.append(unit_name)
+
+ def apply_state(self, unit_name, state, now):
+ self.call_order.append('apply_state:{}'.format(unit_name))
+ self.apply_state_calls.append((unit_name, state, now))
+
+
+def _stub_external_modules():
+ """Stub heavy external dependencies not available in unit test environment."""
+ if 'samba' not in sys.modules:
+ samba_stub = types.ModuleType('samba')
+ samba_stub.getopt = types.ModuleType('samba.getopt')
+ sys.modules['samba'] = samba_stub
+ sys.modules['samba.getopt'] = samba_stub.getopt
+
+ if 'util.samba' not in sys.modules:
+ util_samba = types.ModuleType('util.samba')
+
+ class _smbopts_stub:
+ def get_server_role(self):
+ return 'member server'
+
+ util_samba.smbopts = _smbopts_stub
+ sys.modules['util.samba'] = util_samba
+
+ if 'util' not in sys.modules:
+ util_pkg = types.ModuleType('util')
+ util_pkg.__path__ = [os.path.join(os.getcwd(), 'util')]
+ sys.modules['util'] = util_pkg
+
+ if 'gpoa' not in sys.modules:
+ gpoa_stub = types.ModuleType('gpoa')
+ gpoa_stub.__path__ = [os.getcwd()]
+ sys.modules['gpoa'] = gpoa_stub
+
+ if 'gpoa.messages' not in sys.modules:
+ msg_stub = types.ModuleType('gpoa.messages')
+ msg_stub.message_with_code = lambda code, *a, **kw: str(code)
+ sys.modules['gpoa.messages'] = msg_stub
+
+
+def _load_spa():
+ _stub_external_modules()
+ if 'frontend' not in sys.modules:
+ frontend_pkg = types.ModuleType('frontend')
+ frontend_pkg.__path__ = [os.path.join(os.getcwd(), 'frontend')]
+ sys.modules['frontend'] = frontend_pkg
+ return importlib.import_module('frontend.systemd_preferences_applier')
+
+
+class SystemdPreferencesApplierTestCase(unittest.TestCase):
+ def test_apply_mode_skips_non_matching_rules(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ manager = _manager_stub(exists_map={
+ 'exists.service': True,
+ 'missing.service': False,
+ })
+ runtime.systemd_manager = manager
+ runtime.apply_rules([
+ {
+ 'uid': '1',
+ 'unit': 'missing.service',
+ 'state': 'enable',
+ 'now': False,
+ 'apply_mode': 'if_exists',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': None,
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ },
+ {
+ 'uid': '2',
+ 'unit': 'exists.service',
+ 'state': 'disable',
+ 'now': False,
+ 'apply_mode': 'if_missing',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': None,
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ },
+ {
+ 'uid': '3',
+ 'unit': 'exists.service',
+ 'state': 'enable',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': None,
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ },
+ ])
+
+ self.assertEqual(manager.apply_state_calls, [('exists.service', 'enable', False)])
+
+ def test_non_applicable_rules_still_reach_phase2_candidates(self):
+ # Rules that don't match apply_mode must still be checked for
+ # dependency-triggered restarts (phase2_candidates).
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub(exists_map={'exists.service': True})
+ runtime.apply_rules([
+ {
+ 'uid': '1',
+ 'unit': 'exists.service',
+ 'state': 'enable',
+ 'now': False,
+ 'apply_mode': 'if_missing',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': None,
+ 'file_dependencies': [{'mode': 'changed', 'path': '/tmp/test.ini'}],
+ 'element_type': 'service',
+ },
+ ])
+
+ self.assertEqual(len(runtime.phase2_candidates), 1)
+ self.assertEqual(runtime.phase2_candidates[0]['uid'], '1')
+ # State action must not have been applied (apply_mode didn't match)
+ self.assertEqual(runtime.systemd_manager.apply_state_calls, [])
+
+ def test_non_applicable_rule_triggers_dependency_restart(self):
+ # Service exists, apply_mode='if_missing' → edit skipped, but
+ # dependency change must still trigger a restart.
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub(
+ exists_map={'exists.service': True},
+ active_state_map={'exists.service': 'active'},
+ )
+ runtime.apply_rules([
+ {
+ 'uid': '1',
+ 'unit': 'exists.service',
+ 'state': 'enable',
+ 'now': False,
+ 'apply_mode': 'if_missing',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': None,
+ 'file_dependencies': [{'mode': 'changed', 'path': '/tmp/test.ini'}],
+ 'element_type': 'service',
+ },
+ ])
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.query', return_value=True):
+ runtime.post_restart()
+
+ self.assertIn('exists.service', runtime.systemd_manager.restart_calls)
+
+ def test_edit_mode_create_or_override_writes_expected_paths(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub(exists_map={
+ 'exists.service': True,
+ 'new.service': False,
+ })
+ with tempfile.TemporaryDirectory() as tmpdir:
+ runtime.context.systemd_dir = tmpdir
+ runtime.apply_rules([
+ {
+ 'uid': '10',
+ 'unit': 'exists.service',
+ 'state': 'as_is',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create_or_override',
+ 'dropin_name': 'custom.conf',
+ 'unit_file': '[Service]\nRestart=always',
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ },
+ {
+ 'uid': '11',
+ 'unit': 'new.service',
+ 'state': 'as_is',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create_or_override',
+ 'dropin_name': 'custom.conf',
+ 'unit_file': '[Service]\nRestart=no',
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ },
+ ])
+
+ dropin_path = '{}/exists.service.d/custom.conf'.format(tmpdir)
+ create_path = '{}/new.service'.format(tmpdir)
+ with open(dropin_path, 'r', encoding='utf-8') as fh:
+ self.assertIn('gpupdate-managed uid: 10', fh.read())
+ with open(create_path, 'r', encoding='utf-8') as fh:
+ self.assertIn('gpupdate-managed uid: 11', fh.read())
+ self.assertGreaterEqual(runtime.systemd_manager.reload_calls, 1)
+
+ def test_apply_rules_uses_reload_barrier_before_state_actions(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ manager = _manager_stub(exists_map={'demo.service': False})
+ runtime.systemd_manager = manager
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ runtime.context.systemd_dir = tmpdir
+ runtime.apply_rules([{
+ 'uid': 'reload-order',
+ 'unit': 'demo.service',
+ 'state': 'enable',
+ 'now': True,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': '[Unit]\nDescription=Demo',
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ }])
+
+ self.assertEqual(manager.call_order, ['reload', 'apply_state:demo.service'])
+
+ def test_apply_rules_skips_state_actions_when_reload_fails(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ manager = _manager_stub(
+ exists_map={'demo.service': False, 'stateonly.service': True},
+ reload_exc=spa.SystemdManagerError('reload failed', action='reload'),
+ )
+ runtime.systemd_manager = manager
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ runtime.context.systemd_dir = tmpdir
+ runtime.apply_rules([
+ {
+ 'uid': 'reload-fail',
+ 'unit': 'demo.service',
+ 'state': 'enable',
+ 'now': True,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': '[Unit]\nDescription=Demo',
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ },
+ {
+ 'uid': 'state-only',
+ 'unit': 'stateonly.service',
+ 'state': 'disable',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': None,
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ },
+ ])
+
+ self.assertEqual(manager.reload_calls, 1)
+ self.assertEqual(manager.apply_state_calls, [])
+ self.assertEqual(runtime.phase2_candidates, [])
+
+ def test_normalize_rule_unescapes_newline_sequences_in_unit_file(self):
+ spa = _load_spa()
+
+ normalized = spa._normalize_rule({
+ 'uid': '12',
+ 'unit': 'escaped.service',
+ 'state': 'as_is',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': '[Service]\\nRestart=always',
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ })
+
+ self.assertEqual(normalized['unit_file'], '[Service]\nRestart=always')
+
+ def test_normalize_rule_maps_remove_policy_aliases(self):
+ spa = _load_spa()
+
+ normalized_legacy = spa._normalize_rule({
+ 'uid': 'rp-1',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'removePolicy': '1',
+ })
+ normalized_snake = spa._normalize_rule({
+ 'uid': 'rp-2',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'remove_policy': True,
+ })
+
+ self.assertTrue(normalized_legacy['remove_policy'])
+ self.assertTrue(normalized_snake['remove_policy'])
+
+ def test_normalize_rule_decodes_unit_file_b64_with_priority(self):
+ spa = _load_spa()
+
+ original = "[Service]\nExecStart=/bin/bash -c \"echo 'ok'\"\n"
+ encoded = base64.b64encode(original.encode('utf-8')).decode('ascii')
+ normalized = spa._normalize_rule({
+ 'uid': '13',
+ 'unit': 'encoded.service',
+ 'state': 'as_is',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file_b64': encoded,
+ 'unit_file': '[Service]\\nExecStart=/bin/false',
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ })
+
+ self.assertEqual(normalized['unit_file'], original)
+
+ def test_normalize_rule_falls_back_to_legacy_when_unit_file_b64_invalid(self):
+ spa = _load_spa()
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.log') as log_mock:
+ normalized = spa._normalize_rule({
+ 'uid': '14',
+ 'unit': 'encoded.service',
+ 'state': 'as_is',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file_b64': 'invalid-%%%',
+ 'unit_file': '[Service]\\nRestart=always',
+ 'file_dependencies': [],
+ 'element_type': 'service',
+ })
+
+ self.assertEqual(normalized['unit_file'], '[Service]\nRestart=always')
+ log_mock.assert_any_call('W47', {
+ 'reason': 'Invalid unit_file_b64 payload',
+ 'unit': 'encoded.service',
+ 'uid': '14',
+ })
+
+ def test_normalize_rule_truncates_too_many_dependencies(self):
+ spa = _load_spa()
+
+ too_many = [{'mode': 'changed', 'path': '/etc/demo{}'.format(idx)} for idx in range(64)]
+ normalized = spa._normalize_rule({
+ 'uid': '15',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'dropin_name': '50-gpo.conf',
+ 'file_dependencies': too_many,
+ })
+ self.assertIsNotNone(normalized)
+ self.assertEqual(len(normalized['file_dependencies']), 32)
+
+ def test_normalize_rule_filters_invalid_dependency_paths(self):
+ spa = _load_spa()
+
+ normalized = spa._normalize_rule({
+ 'uid': '16',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '/etc/demo.conf'},
+ {'mode': 'changed', 'path': '../relative'},
+ {'mode': 'changed', 'path': '/tmp/\ninvalid'},
+ ],
+ })
+ self.assertEqual(normalized['file_dependencies'], [{'mode': 'changed', 'path': '/etc/demo.conf'}])
+
+ def test_normalize_rule_rejects_oversized_unit_file(self):
+ spa = _load_spa()
+
+ huge_payload = 'A' * (spa.MAX_UNIT_FILE_SIZE + 1)
+ encoded = base64.b64encode(huge_payload.encode('utf-8')).decode('ascii')
+ normalized = spa._normalize_rule({
+ 'uid': '17',
+ 'unit': 'huge.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file_b64': encoded,
+ })
+ self.assertIsNone(normalized['unit_file'])
+
+ def test_post_restart_uses_dependency_modes(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub(active_state_map={'demo.service': 'active'})
+ runtime.phase2_candidates = [{
+ 'uid': '1',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': None,
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '/etc/demo.conf'},
+ {'mode': 'presence_changed', 'path': '/etc/demo.presence'},
+ ],
+ 'element_type': 'service',
+ }]
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.query') as query_mock:
+ query_mock.side_effect = lambda path, mode='changed': mode == 'changed'
+ runtime.post_restart()
+
+ self.assertIn('demo.service', runtime.systemd_manager.restart_calls)
+
+ def test_post_restart_skips_when_dependency_unchanged(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub(active_state_map={'demo.service': 'active'})
+ runtime.phase2_candidates = [{
+ 'uid': '1',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'now': False,
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropin_name': '50-gpo.conf',
+ 'unit_file': None,
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '/etc/demo.conf'},
+ ],
+ 'element_type': 'service',
+ }]
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.query', return_value=False):
+ runtime.post_restart()
+
+ self.assertNotIn('demo.service', runtime.systemd_manager.restart_calls)
+
+ def test_removed_rules_detected_from_previous_snapshot(self):
+ spa = _load_spa()
+
+ storage = _storage_stub({
+ 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([{
+ 'uid': 'keep',
+ 'unit': 'keep.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ }]),
+ 'Previous/Software/BaseALT/Policies/Preferences/Machine/Systemds': str([
+ {
+ 'uid': 'keep',
+ 'unit': 'keep.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ },
+ {
+ 'uid': 'drop',
+ 'unit': 'drop.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ },
+ ]),
+ })
+ removed = spa._get_removed_rules(storage, 'Machine', 'machine')
+ self.assertEqual(len(removed), 1)
+ self.assertEqual(removed[0]['uid'], 'drop')
+
+ def test_get_rule_sets_for_scope_includes_remove_policy_cleanup(self):
+ spa = _load_spa()
+
+ storage = _storage_stub({
+ 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([
+ {
+ 'uid': 'active',
+ 'unit': 'active.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'removePolicy': '0',
+ },
+ {
+ 'uid': 'cleanup',
+ 'unit': 'cleanup.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create',
+ 'removePolicy': '1',
+ },
+ ]),
+ 'Previous/Software/BaseALT/Policies/Preferences/Machine/Systemds': str([]),
+ })
+
+ active_rules, cleanup_rules = spa._get_rule_sets_for_scope(storage, 'Machine', 'machine')
+ self.assertEqual([rule['uid'] for rule in active_rules], ['active'])
+ self.assertEqual([rule['uid'] for rule in cleanup_rules], ['cleanup'])
+
+ def test_normalize_rule_rejects_unsafe_unit_and_dropin_paths(self):
+ spa = _load_spa()
+
+ bad_unit = {
+ 'uid': 'bad-unit',
+ 'unit': '/tmp/evil.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ }
+ bad_dropin = {
+ 'uid': 'bad-dropin',
+ 'unit': 'safe.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'dropInName': '../../evil.conf',
+ }
+ self.assertIsNone(spa._normalize_rule(bad_unit))
+ self.assertIsNone(spa._normalize_rule(bad_dropin))
+
+ def test_cleanup_removed_rules_keeps_non_restartable_types_skipped(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub(active_state_map={'usb.device': 'active'})
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ runtime.context.systemd_dir = tmpdir
+ managed = os.path.join(tmpdir, 'usb.device')
+ with open(managed, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('# gpupdate-managed uid: deadbeef\n[Unit]\nDescription=test\n')
+
+ removed_rule = {
+ 'uid': 'deadbeef',
+ 'unit': 'usb.device',
+ 'dropin_name': '50-gpo.conf',
+ 'element_type': 'device',
+ }
+
+ runtime.cleanup_removed_rules([removed_rule])
+
+ self.assertFalse(os.path.exists(managed))
+ self.assertEqual(runtime.systemd_manager.reload_calls, 1)
+ self.assertNotIn('usb.device', runtime.systemd_manager.stop_calls)
+
+ def test_cleanup_removed_rules_requires_marker_on_first_line(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub(active_state_map={'demo.service': 'active'})
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ runtime.context.systemd_dir = tmpdir
+ managed = os.path.join(tmpdir, 'demo.service')
+ with open(managed, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('[Unit]\n# gpupdate-managed uid: deadbeef\nDescription=test\n')
+
+ removed_rule = {
+ 'uid': 'deadbeef',
+ 'unit': 'demo.service',
+ 'dropin_name': '50-gpo.conf',
+ 'element_type': 'service',
+ }
+ runtime.cleanup_removed_rules([removed_rule])
+
+ self.assertTrue(os.path.exists(managed))
+ self.assertEqual(runtime.systemd_manager.reload_calls, 0)
+
+ def test_cleanup_removed_rules_skips_restart_when_reload_fails(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub(
+ active_state_map={'demo.service': 'active'},
+ reload_exc=spa.SystemdManagerError('reload failed', action='reload'),
+ )
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ runtime.context.systemd_dir = tmpdir
+ managed = os.path.join(tmpdir, 'demo.service')
+ with open(managed, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('# gpupdate-managed uid: deadbeef\n[Unit]\nDescription=test\n')
+
+ removed_rule = {
+ 'uid': 'deadbeef',
+ 'unit': 'demo.service',
+ 'dropin_name': '50-gpo.conf',
+ 'element_type': 'service',
+ }
+ runtime.cleanup_removed_rules([removed_rule])
+
+ self.assertFalse(os.path.exists(managed))
+ self.assertEqual(runtime.systemd_manager.reload_calls, 1)
+ self.assertEqual(runtime.systemd_manager.stop_calls, [])
+
+ def test_write_rule_file_skips_symlink_target(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ runtime = spa._systemd_preferences_runtime(storage, 'Machine', spa._Context(mode='machine'))
+ runtime.systemd_manager = _manager_stub()
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ runtime.context.systemd_dir = tmpdir
+ real_target = os.path.join(tmpdir, 'real.service')
+ with open(real_target, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('real')
+
+ symlink_target = os.path.join(tmpdir, 'evil.service')
+ os.symlink(real_target, symlink_target)
+
+ runtime._write_rule_file(Path(symlink_target), 'uid-1', '[Unit]\nDescription=test')
+ with open(real_target, 'r', encoding='utf-8') as file_obj:
+ self.assertEqual(file_obj.read(), 'real')
+
+ def test_user_context_skips_when_user_manager_unavailable(self):
+ spa = _load_spa()
+
+ storage = _storage_stub()
+ with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True):
+ applier = spa.systemd_preferences_applier_user(storage, 'root')
+ with unittest.mock.patch('os.path.exists', return_value=False):
+ with unittest.mock.patch('frontend.systemd_preferences_applier._systemd_preferences_runtime.apply_rules') as apply_mock:
+ applier.user_context_apply()
+ self.assertFalse(apply_mock.called)
+
+ def test_prime_dependency_journal_machine_watches_machine_dependencies(self):
+ spa = _load_spa()
+
+ storage = _storage_stub({
+ 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([{
+ 'uid': 'rule-1',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '/etc/demo.conf'},
+ ],
+ }]),
+ })
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True):
+ applier = spa.systemd_preferences_applier(storage)
+ with unittest.mock.patch('frontend.systemd_preferences_applier.watch_many') as watch_many_mock:
+ applier.prime_dependency_journal()
+ watch_many_mock.assert_called_once_with(['/etc/demo.conf'])
+
+ def test_prime_dependency_journal_machine_watches_global_user_dependencies(self):
+ spa = _load_spa()
+
+ storage = _storage_stub({
+ 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([
+ {
+ 'uid': 'rule-machine',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '/etc/demo.conf'},
+ ],
+ },
+ {
+ 'uid': 'rule-global-user',
+ 'unit': 'demo-user.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'user',
+ 'edit_mode': 'override',
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '/etc/demo-user.conf'},
+ ],
+ },
+ ]),
+ })
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True):
+ applier = spa.systemd_preferences_applier(storage)
+ with unittest.mock.patch('frontend.systemd_preferences_applier.watch_many') as watch_many_mock:
+ applier.prime_dependency_journal()
+ watch_many_mock.assert_called_once_with(['/etc/demo.conf', '/etc/demo-user.conf'])
+
+ def test_prime_dependency_journal_skips_remove_policy_rules(self):
+ spa = _load_spa()
+
+ storage = _storage_stub({
+ 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([{
+ 'uid': 'rule-removed',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'removePolicy': '1',
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '/etc/demo.conf'},
+ ],
+ }]),
+ })
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True):
+ applier = spa.systemd_preferences_applier(storage)
+ with unittest.mock.patch('frontend.systemd_preferences_applier.watch_many') as watch_many_mock:
+ applier.prime_dependency_journal()
+ watch_many_mock.assert_called_once_with([])
+
+ def test_prime_dependency_journal_user_watches_machine_and_user_dependencies(self):
+ spa = _load_spa()
+
+ storage = _storage_stub({
+ 'Software/BaseALT/Policies/Preferences/alice/Systemds': str([
+ {
+ 'uid': 'rule-machine',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'override',
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '/etc/demo.conf'},
+ ],
+ },
+ {
+ 'uid': 'rule-user',
+ 'unit': 'demo.service',
+ 'state': 'as_is',
+ 'apply_mode': 'always',
+ 'policy_target': 'user',
+ 'edit_mode': 'override',
+ 'file_dependencies': [
+ {'mode': 'changed', 'path': '%HOME%/.config/demo.conf'},
+ ],
+ },
+ ]),
+ })
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True):
+ with unittest.mock.patch('frontend.systemd_preferences_applier.get_uid_by_username', return_value=1000):
+ with unittest.mock.patch('frontend.systemd_preferences_applier.get_homedir', return_value='/home/alice'):
+ applier = spa.systemd_preferences_applier_user(storage, 'alice')
+ with unittest.mock.patch('frontend.systemd_preferences_applier.watch_many') as watch_many_mock:
+ applier.prime_dependency_journal()
+ expected_user_path = spa._expand_windows_var('%HOME%/.config/demo.conf', username='alice')
+ watch_many_mock.assert_called_once_with(['/etc/demo.conf', expected_user_path])
+
+ def test_apply_uses_cleanup_rules_for_remove_policy_items(self):
+ spa = _load_spa()
+
+ storage = _storage_stub({
+ 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([
+ {
+ 'uid': 'cleanup',
+ 'unit': 'cleanup.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create',
+ 'removePolicy': '1',
+ },
+ ]),
+ 'Previous/Software/BaseALT/Policies/Preferences/Machine/Systemds': str([]),
+ })
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True):
+ with unittest.mock.patch('frontend.systemd_preferences_applier._systemd_preferences_runtime') as runtime_ctor:
+ runtime = unittest.mock.Mock()
+ global_runtime = unittest.mock.Mock()
+ runtime_ctor.side_effect = [runtime, global_runtime]
+
+ applier = spa.systemd_preferences_applier(storage)
+ applier.apply()
+
+ runtime.apply_rules.assert_called_once_with([])
+ cleanup_arg = runtime.cleanup_removed_rules.call_args[0][0]
+ self.assertEqual(len(cleanup_arg), 1)
+ self.assertEqual(cleanup_arg[0]['uid'], 'cleanup')
+ global_runtime.apply_rules.assert_called_once_with([])
+ global_runtime.cleanup_removed_rules.assert_called_once_with([])
+
+ def test_machine_apply_routes_global_user_rules_to_global_context(self):
+ spa = _load_spa()
+
+ storage = _storage_stub({
+ 'Software/BaseALT/Policies/Preferences/Machine/Systemds': str([
+ {
+ 'uid': 'machine-rule',
+ 'unit': 'machine.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'machine',
+ 'edit_mode': 'create',
+ },
+ {
+ 'uid': 'global-user-rule',
+ 'unit': 'global-user.service',
+ 'state': 'enable',
+ 'apply_mode': 'always',
+ 'policy_target': 'user',
+ 'edit_mode': 'create',
+ },
+ ]),
+ 'Previous/Software/BaseALT/Policies/Preferences/Machine/Systemds': str([]),
+ })
+
+ with unittest.mock.patch('frontend.systemd_preferences_applier.check_enabled', return_value=True):
+ with unittest.mock.patch('frontend.systemd_preferences_applier._systemd_preferences_runtime') as runtime_ctor:
+ runtime = unittest.mock.Mock()
+ global_runtime = unittest.mock.Mock()
+ runtime_ctor.side_effect = [runtime, global_runtime]
+
+ applier = spa.systemd_preferences_applier(storage)
+ applier.apply()
+
+ self.assertEqual(runtime_ctor.call_args_list[0][0][2].mode, 'machine')
+ self.assertEqual(runtime_ctor.call_args_list[1][0][2].mode, 'global_user')
+ runtime.apply_rules.assert_called_once_with([
+ unittest.mock.ANY,
+ ])
+ self.assertEqual(runtime.apply_rules.call_args[0][0][0]['uid'], 'machine-rule')
+ global_runtime.apply_rules.assert_called_once_with([
+ unittest.mock.ANY,
+ ])
+ self.assertEqual(global_runtime.apply_rules.call_args[0][0][0]['uid'], 'global-user-rule')
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/gpoa/test/gpt/data/Systemds.xml b/gpoa/test/gpt/data/Systemds.xml
new file mode 100644
index 00000000..b2b4d60c
--- /dev/null
+++ b/gpoa/test/gpt/data/Systemds.xml
@@ -0,0 +1,43 @@
+
+
+
+
+ [Service]
+Restart=always
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/gpoa/test/gpt/data/Systemds_invalid.xml b/gpoa/test/gpt/data/Systemds_invalid.xml
new file mode 100644
index 00000000..6c9bfa74
--- /dev/null
+++ b/gpoa/test/gpt/data/Systemds_invalid.xml
@@ -0,0 +1,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/gpoa/test/gpt/test_systemds.py b/gpoa/test/gpt/test_systemds.py
new file mode 100644
index 00000000..82663882
--- /dev/null
+++ b/gpoa/test/gpt/test_systemds.py
@@ -0,0 +1,236 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import os
+import tempfile
+import unittest
+import unittest.mock
+import ast
+import base64
+import types
+from enum import Enum, unique
+from pathlib import Path
+
+
+class _storage_stub:
+ def __init__(self):
+ self.items = []
+
+ def add_systemd(self, item, policy_name):
+ item.policy_name = policy_name
+ self.items.append(item)
+
+def _load_gpt_discovery_helpers():
+ source_path = os.path.join(os.getcwd(), 'gpt', 'gpt.py')
+ with open(source_path, 'r', encoding='utf-8') as file_obj:
+ tree = ast.parse(file_obj.read(), filename=source_path)
+
+ needed_names = {
+ 'FileType',
+ 'get_preftype',
+ 'find_dir',
+ 'find_file',
+ 'find_preferences',
+ 'find_preffile',
+ }
+ selected_nodes = []
+ for node in tree.body:
+ if isinstance(node, (ast.FunctionDef, ast.ClassDef)) and node.name in needed_names:
+ selected_nodes.append(node)
+ elif isinstance(node, ast.Assign):
+ targets = [target.id for target in node.targets if isinstance(target, ast.Name)]
+ if any(name in needed_names for name in targets):
+ selected_nodes.append(node)
+
+ module = ast.Module(body=selected_nodes, type_ignores=[])
+ namespace = {
+ 'Enum': Enum,
+ 'unique': unique,
+ 'os': os,
+ 'Path': Path,
+ }
+ exec(compile(module, source_path, 'exec'), namespace)
+ return types.SimpleNamespace(**namespace)
+
+
+class GptSystemdsTestCase(unittest.TestCase):
+ def _path(self, filename):
+ return '{}/test/gpt/data/{}'.format(os.getcwd(), filename)
+
+ def test_read_systemds_all_types(self):
+ import gpt.systemds
+
+ items = gpt.systemds.read_systemds(self._path('Systemds.xml'))
+ self.assertEqual(len(items), 11)
+ self.assertEqual(items[0].unit, 'sshd.service')
+ self.assertEqual(items[0].state, 'enable')
+ self.assertEqual(items[0].apply_mode, 'always')
+ self.assertEqual(items[0].policy_target, 'machine')
+ self.assertEqual(items[0].edit_mode, 'create_or_override')
+ self.assertEqual(items[0].dropin_name, 'override.conf')
+ self.assertEqual(items[0].unit_file_mode, 'text')
+ self.assertEqual(
+ base64.b64decode(items[0].unit_file_b64.encode('ascii')).decode('utf-8'),
+ items[0].unit_file,
+ )
+ self.assertEqual(len(items[0].file_dependencies), 2)
+
+ # Ensure automatic suffix mapping works for all supported tags.
+ expected_suffixes = {
+ 'service': '.service',
+ 'socket': '.socket',
+ 'timer': '.timer',
+ 'path': '.path',
+ 'mount': '.mount',
+ 'automount': '.automount',
+ 'swap': '.swap',
+ 'target': '.target',
+ 'device': '.device',
+ 'slice': '.slice',
+ 'scope': '.scope',
+ }
+ for item in items:
+ self.assertTrue(item.unit.endswith(expected_suffixes[item.element_type]))
+
+ def test_soft_validation_skips_invalid_entries(self):
+ import gpt.systemds
+
+ items = gpt.systemds.read_systemds(self._path('Systemds_invalid.xml'))
+ # good + bad-dep (kept with filtered deps); invalid path values are skipped
+ self.assertEqual(len(items), 2)
+ self.assertEqual(items[0].unit, 'good.service')
+ self.assertEqual(items[1].unit, 'bad3.service')
+ self.assertEqual(items[1].file_dependencies, [])
+ units = {item.unit for item in items}
+ self.assertNotIn('../../tmp/evil.service', units)
+ self.assertNotIn('safe.service', units)
+
+ def test_merge_systemds(self):
+ import gpt.systemds
+
+ storage = _storage_stub()
+ items = gpt.systemds.read_systemds(self._path('Systemds.xml'))
+ gpt.systemds.merge_systemds(storage, items, 'policy-test')
+ self.assertEqual(len(storage.items), len(items))
+ self.assertEqual(storage.items[0].policy_name, 'policy-test')
+
+ def test_read_systemds_preserves_quotes_via_unit_file_b64(self):
+ import gpt.systemds
+
+ unit_file_text = "[Service]\nExecStart=/bin/bash -c \"echo 'ok'\"\n"
+ xml_content = """
+
+
+
+ {}
+
+
+
+""".format(unit_file_text)
+
+ with tempfile.NamedTemporaryFile('w', encoding='utf-8', suffix='.xml', delete=False) as file_obj:
+ file_obj.write(xml_content)
+ tmp_path = file_obj.name
+
+ try:
+ items = gpt.systemds.read_systemds(tmp_path)
+ finally:
+ os.unlink(tmp_path)
+
+ self.assertEqual(len(items), 1)
+ restored = base64.b64decode(items[0].unit_file_b64.encode('ascii')).decode('utf-8')
+ self.assertEqual(restored, unit_file_text)
+
+ def test_read_systemds_rejects_invalid_dependency_path(self):
+ import gpt.systemds
+
+ xml_content = """
+
+
+
+
+
+
+
+
+
+"""
+ with tempfile.NamedTemporaryFile('w', encoding='utf-8', suffix='.xml', delete=False) as file_obj:
+ file_obj.write(xml_content)
+ tmp_path = file_obj.name
+
+ try:
+ items = gpt.systemds.read_systemds(tmp_path)
+ finally:
+ os.unlink(tmp_path)
+
+ self.assertEqual(len(items), 1)
+ self.assertEqual(items[0].file_dependencies, [])
+
+ def test_read_systemds_rejects_oversized_unit_file(self):
+ import gpt.systemds
+
+ unit_file_text = "A" * (gpt.systemds.MAX_UNIT_FILE_SIZE + 1)
+ xml_content = """
+
+
+
+ {}
+
+
+
+""".format(unit_file_text)
+ with tempfile.NamedTemporaryFile('w', encoding='utf-8', suffix='.xml', delete=False) as file_obj:
+ file_obj.write(xml_content)
+ tmp_path = file_obj.name
+
+ try:
+ items = gpt.systemds.read_systemds(tmp_path)
+ finally:
+ os.unlink(tmp_path)
+
+ self.assertEqual(items, [])
+
+ def test_gpt_discovery_supports_windows_systemd_layout(self):
+ gpt_helpers = _load_gpt_discovery_helpers()
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ machine_dir = os.path.join(tmpdir, 'MACHINE')
+ valid_dir = os.path.join(machine_dir, 'PREFERENCES', 'SYSTEMD')
+ os.makedirs(valid_dir, exist_ok=True)
+ valid_file = os.path.join(valid_dir, 'SYSTEMD.XML')
+ with open(valid_file, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('')
+
+ invalid_paths = [
+ os.path.join(machine_dir, 'PREFERENCES', 'SYSTEMDS', 'SYSTEMDS.XML'),
+ os.path.join(machine_dir, 'PREFERENCES', 'SYSTEMDS', 'SYSTEMD.XML'),
+ os.path.join(machine_dir, 'PREFERENCES', 'SYSTEMD', 'SYSTEMDS.XML'),
+ ]
+ for path in invalid_paths:
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with open(path, 'w', encoding='utf-8') as file_obj:
+ file_obj.write('')
+
+ found = gpt_helpers.find_preffile(machine_dir, 'systemd')
+ self.assertEqual(found, valid_file)
+ self.assertEqual(gpt_helpers.get_preftype(valid_file), gpt_helpers.FileType.SYSTEMDS)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/gpoa/test/storage/test_systemds_storage.py b/gpoa/test/storage/test_systemds_storage.py
new file mode 100644
index 00000000..25e3915b
--- /dev/null
+++ b/gpoa/test/storage/test_systemds_storage.py
@@ -0,0 +1,129 @@
+#
+# GPOA - GPO Applier for Linux
+#
+# Copyright (C) 2026 BaseALT Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import unittest
+import ast
+import base64
+
+from gpt.systemds import systemd_policy
+
+
+class SystemdsStorageTestCase(unittest.TestCase):
+ def setUp(self):
+ try:
+ from storage.dconf_registry import Dconf_registry
+ except Exception as exc:
+ self.skipTest('storage.dconf_registry is unavailable: {}'.format(exc))
+ self.Dconf_registry = Dconf_registry
+ self.add_preferences_to_global_registry_dict = __import__(
+ 'storage.dconf_registry', fromlist=['add_preferences_to_global_registry_dict']
+ ).add_preferences_to_global_registry_dict
+
+ self._saved_registry = self.Dconf_registry.global_registry_dict
+ self._saved_systemds = self.Dconf_registry.systemds
+
+ self.Dconf_registry.global_registry_dict = {self.Dconf_registry._GpoPriority: {}}
+ self.Dconf_registry.systemds = []
+
+ def tearDown(self):
+ if hasattr(self, 'Dconf_registry'):
+ self.Dconf_registry.global_registry_dict = self._saved_registry
+ self.Dconf_registry.systemds = self._saved_systemds
+
+ def test_add_get_and_serialize_systemds(self):
+ item = systemd_policy('sshd.service')
+ item.uid = 'uid-1'
+ item.clsid = 'clsid-1'
+ item.name = 'sshd'
+ item.state = 'enable'
+ item.policy_target = 'machine'
+ item.apply_mode = 'always'
+ item.edit_mode = 'override'
+ item.dropin_name = '50-gpo.conf'
+ item.file_dependencies = []
+ self.Dconf_registry.add_systemd(item, 'Policy')
+
+ self.assertEqual(len(self.Dconf_registry.get_systemds()), 1)
+ self.add_preferences_to_global_registry_dict('Machine', True)
+
+ prefix = 'Software/BaseALT/Policies/Preferences/Machine'
+ data = self.Dconf_registry.global_registry_dict[prefix]['Systemds']
+ self.assertIn('sshd.service', data)
+ self.assertIn('uid-1', data)
+
+ def test_remove_duplicates_supports_nested_lists(self):
+ item = systemd_policy('nginx.service')
+ item.uid = 'uid-2'
+ item.clsid = 'clsid-2'
+ item.name = 'nginx'
+ item.state = 'enable'
+ item.policy_target = 'machine'
+ item.apply_mode = 'if_exists'
+ item.edit_mode = 'override'
+ item.dropin_name = '50-gpo.conf'
+ item.file_dependencies = [{'mode': 'changed', 'path': '/etc/nginx/nginx.conf'}]
+
+ duplicate = systemd_policy('nginx.service')
+ duplicate.uid = 'uid-2'
+ duplicate.clsid = 'clsid-2'
+ duplicate.name = 'nginx'
+ duplicate.state = 'enable'
+ duplicate.policy_target = 'machine'
+ duplicate.apply_mode = 'if_exists'
+ duplicate.edit_mode = 'override'
+ duplicate.dropin_name = '50-gpo.conf'
+ duplicate.file_dependencies = [{'mode': 'changed', 'path': '/etc/nginx/nginx.conf'}]
+
+ self.Dconf_registry.add_systemd(item, 'Policy')
+ self.Dconf_registry.add_systemd(duplicate, 'Policy')
+
+ self.add_preferences_to_global_registry_dict('Machine', True)
+
+ prefix = 'Software/BaseALT/Policies/Preferences/Machine'
+ data = self.Dconf_registry.global_registry_dict[prefix]['Systemds']
+ self.assertIn('nginx.service', data)
+ self.assertEqual(data.count('uid-2'), 1)
+
+ def test_serialize_preserves_unit_file_b64_payload(self):
+ unit_file_text = "[Service]\nExecStart=/bin/bash -c \"echo 'ok'\"\n"
+ item = systemd_policy('quoted.service')
+ item.uid = 'uid-3'
+ item.clsid = 'clsid-3'
+ item.name = 'quoted'
+ item.state = 'as_is'
+ item.policy_target = 'machine'
+ item.apply_mode = 'always'
+ item.edit_mode = 'override'
+ item.dropin_name = '50-gpo.conf'
+ item.file_dependencies = []
+ item.unit_file_b64 = base64.b64encode(unit_file_text.encode('utf-8')).decode('ascii')
+
+ self.Dconf_registry.add_systemd(item, 'Policy')
+ self.add_preferences_to_global_registry_dict('Machine', True)
+
+ prefix = 'Software/BaseALT/Policies/Preferences/Machine'
+ data = self.Dconf_registry.global_registry_dict[prefix]['Systemds']
+ parsed = ast.literal_eval(data)
+ encoded = parsed[0].get('unit_file_b64')
+ self.assertIsNotNone(encoded)
+ restored = base64.b64decode(encoded.encode('ascii')).decode('utf-8')
+ self.assertEqual(restored, unit_file_text)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/gpoa/util/util.py b/gpoa/util/util.py
index e8fb5ab4..712b880c 100644
--- a/gpoa/util/util.py
+++ b/gpoa/util/util.py
@@ -190,7 +190,7 @@ def get_policy_variants():
def string_to_literal_eval(string):
try:
literaleval = ast.literal_eval(string)
- except:
+ except (ValueError, SyntaxError):
literaleval = string
return literaleval
diff --git a/gpupdate.spec b/gpupdate.spec
index 1cbe7fc4..3755baf2 100644
--- a/gpupdate.spec
+++ b/gpupdate.spec
@@ -2,8 +2,14 @@
#add_python3_self_prov_path %buildroot%python3_sitelibdir/gpoa
%add_python3_req_skip applaer.systemd
+%add_python3_req_skip frontend.appliers.systemd
+%add_python3_req_skip frontend.change_journal
+%add_python3_req_skip frontend.systemd_applier
+%add_python3_req_skip frontend.systemd_preferences_applier
%add_python3_req_skip backend
%add_python3_req_skip frontend.frontend_manager
+%add_python3_req_skip gpt.systemds
+%add_python3_req_skip gpt.systemds_constants
%add_python3_req_skip gpt.envvars
%add_python3_req_skip gpt.folders
%add_python3_req_skip gpt.gpt
@@ -39,7 +45,7 @@
%add_python3_req_skip frontend.appliers.ini_file
Name: gpupdate
-Version: 0.14.2
+Version: 0.14.3
Release: alt1
Summary: GPT applier
@@ -211,6 +217,10 @@ fi
%exclude %python3_sitelibdir/gpoa/test
%changelog
+* Sun Apr 12 2026 Korney Gedert 0.14.3-alt1
+- Added Systemds preferences applier with file-dependency restart support
+- Added change_journal module for dependency snapshot tracking
+
* Thu Feb 26 2026 Danila Skachedubov 0.14.2-alt1
- Fix username resolution for trusted domain users