Skip to content

Commit 20c95e0

Browse files
committed
tests unitaires
1 parent e89b8bf commit 20c95e0

3 files changed

Lines changed: 207 additions & 46 deletions

File tree

src/bin/lifecycle.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ def main():
4141
specific_name = before + '_' + after if before and after else None
4242
except (json.JSONDecodeError, Exception):
4343
specific_name = None
44-
44+
if u.is_backend_concerned(data) == False:
45+
print(u.returncode(0, "not concerned"))
46+
exit(0)
4547
script = None
4648
if specific_name:
4749
script = find_script(lifecycle_dir, [specific_name])

src/lib/backend_utils.py

Lines changed: 41 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@ def returncode(code,message):
4141
data={}
4242
data['status']=code
4343
data['message']=message
44+
4445
return json.dumps(data)
4546

4647
def is_backend_concerned(entity):
4748
br=config('branchAttr')
48-
entry=make_entry_array(entity);
49+
entry=make_entry_array(entity)
4950
if config('branchAttr') in entry:
5051
peopleType=entry[config('branchAttr')]
5152
else:
@@ -59,41 +60,8 @@ def is_backend_concerned(entity):
5960
for v in peopleType:
6061
if v in listBackend :
6162
return True
62-
else:
63-
if peopleType in listBackend:
64-
return True
65-
66-
return False
67-
68-
def find_key(element, key):
69-
'''
70-
Check if *keys (nested) exists in `element` (dict).
71-
'''
72-
r=_finditem(element,key)
73-
if r is None:
74-
return ""
75-
else:
76-
return r
77-
78-
def _finditem(obj, key):
79-
if key in obj: return obj[key]
80-
for k, v in obj.items():
81-
if isinstance(v,dict):
82-
item = _finditem(v, key)
83-
if item is not None:
84-
return item
85-
86-
def first_non_empty_value(value):
87-
if isinstance(value, list):
88-
for item in value:
89-
s = str(item).strip()
90-
if s != "":
91-
return s
92-
return ""
93-
s = str(value).strip()
94-
return s if s != "" else ""
9563

96-
def make_entry_array(entity):
64+
def make_entry_array(entity,key='before'):
9765
data = {}
9866
if "identity" in entity['payload']:
9967
objectclasses = entity['payload']['identity']['identity']['additionalFields']['objectClasses']
@@ -103,7 +71,17 @@ def make_entry_array(entity):
10371
additionalFields = entity['payload']['identity']['identity']['additionalFields']['attributes']
10472
else:
10573
additionalFields = {}
106-
74+
elif key in entity['payload']:
75+
# cas cycle de vie
76+
objectclasses = entity['payload'][key]['additionalFields']['objectClasses']
77+
inetOrgPerson = entity['payload'][key]['inetOrgPerson']
78+
# ajout lifecyle car en dehors du tableau
79+
inetOrgPerson['lifecycle'] = entity['payload'][key]['lifecycle']
80+
addFieldsDict = entity['payload'][key]['additionalFields']
81+
if 'attributes' in addFieldsDict:
82+
additionalFields = entity['payload'][key]['additionalFields']['attributes']
83+
else:
84+
additionalFields = {}
10785
else:
10886
objectclasses = entity['payload']['additionalFields']['objectClasses']
10987
inetOrgPerson = entity['payload']['inetOrgPerson']
@@ -123,17 +101,35 @@ def make_entry_array(entity):
123101
if type(v) is int:
124102
v = str(v)
125103
data[k] = v
126-
127-
# employeeNumber is usually SINGLE-VALUE in LDAP schema.
128-
# Prefer primaryEmployeeNumber when provided by upstream payload.
129-
if 'employeeNumber' in data:
130-
primary_employee_number = first_non_empty_value(find_key(entity, 'primaryEmployeeNumber'))
131-
if primary_employee_number != "":
132-
data['employeeNumber'] = primary_employee_number
133-
else:
134-
data['employeeNumber'] = first_non_empty_value(data['employeeNumber'])
135104
return data
136105

106+
def find_key(element, key):
107+
'''
108+
Check if *keys (nested) exists in `element` (dict).
109+
'''
110+
r=_finditem(element,key)
111+
if r is None:
112+
return ""
113+
else:
114+
return r
115+
116+
def _finditem(obj, key):
117+
if key in obj: return obj[key]
118+
for k, v in obj.items():
119+
if isinstance(v,dict):
120+
item = _finditem(v, key)
121+
if item is not None:
122+
return item
123+
124+
def first_non_empty_value(value):
125+
if isinstance(value, list):
126+
for item in value:
127+
s = str(item).strip()
128+
if s != "":
129+
return s
130+
return ""
131+
s = str(value).strip()
132+
return s if s != "" else ""
137133

138134
def make_objectclass(entity,entry):
139135
data = {}

unittest/lifecycleTest.py

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import importlib.util
2+
import json
3+
import os
4+
import shutil
5+
import subprocess
6+
import sys
7+
import tempfile
8+
import unittest
9+
10+
__DIR__ = os.path.dirname(os.path.abspath(__file__))
11+
__PYTHONENV__ = '/../.venv/bin/python'
12+
13+
# Preload src/lib so backend_utils is resolvable when importing lifecycle module
14+
sys.path.insert(0, os.path.join(__DIR__, '..', 'src', 'lib'))
15+
16+
_spec = importlib.util.spec_from_file_location(
17+
'lifecycle_bin',
18+
os.path.join(__DIR__, '..', 'src', 'bin', 'lifecycle.py'),
19+
)
20+
_lc = importlib.util.module_from_spec(_spec)
21+
_spec.loader.exec_module(_lc)
22+
23+
24+
class TestFindScript(unittest.TestCase):
25+
26+
def setUp(self):
27+
self.tmpdir = tempfile.mkdtemp()
28+
29+
def tearDown(self):
30+
shutil.rmtree(self.tmpdir)
31+
32+
def _touch(self, name):
33+
path = os.path.join(self.tmpdir, name)
34+
open(path, 'w').close()
35+
return path
36+
37+
def test_finds_py_extension(self):
38+
self._touch('lifecycle.py')
39+
self.assertEqual(
40+
_lc.find_script(self.tmpdir, ['lifecycle']),
41+
os.path.join(self.tmpdir, 'lifecycle.py'),
42+
)
43+
44+
def test_finds_sh_extension(self):
45+
self._touch('O_I.sh')
46+
self.assertEqual(
47+
_lc.find_script(self.tmpdir, ['O_I']),
48+
os.path.join(self.tmpdir, 'O_I.sh'),
49+
)
50+
51+
def test_finds_pl_extension(self):
52+
self._touch('lifecycle.pl')
53+
self.assertEqual(
54+
_lc.find_script(self.tmpdir, ['lifecycle']),
55+
os.path.join(self.tmpdir, 'lifecycle.pl'),
56+
)
57+
58+
def test_finds_no_extension(self):
59+
self._touch('lifecycle')
60+
self.assertEqual(
61+
_lc.find_script(self.tmpdir, ['lifecycle']),
62+
os.path.join(self.tmpdir, 'lifecycle'),
63+
)
64+
65+
def test_specific_name_preferred_over_fallback(self):
66+
self._touch('O_I.py')
67+
self._touch('lifecycle.py')
68+
self.assertEqual(
69+
_lc.find_script(self.tmpdir, ['O_I', 'lifecycle']),
70+
os.path.join(self.tmpdir, 'O_I.py'),
71+
)
72+
73+
def test_falls_back_to_second_name(self):
74+
self._touch('lifecycle.py')
75+
self.assertEqual(
76+
_lc.find_script(self.tmpdir, ['O_I', 'lifecycle']),
77+
os.path.join(self.tmpdir, 'lifecycle.py'),
78+
)
79+
80+
def test_returns_none_when_not_found(self):
81+
self.assertIsNone(_lc.find_script(self.tmpdir, ['nonexistent']))
82+
83+
def test_returns_none_on_empty_dir(self):
84+
self.assertIsNone(_lc.find_script(self.tmpdir, ['lifecycle']))
85+
86+
def test_returns_none_on_empty_names(self):
87+
self._touch('lifecycle.py')
88+
self.assertIsNone(_lc.find_script(self.tmpdir, []))
89+
90+
91+
class TestRunBackend(unittest.TestCase):
92+
93+
def setUp(self):
94+
self._scripts = []
95+
96+
def tearDown(self):
97+
for s in self._scripts:
98+
try:
99+
os.unlink(s)
100+
except OSError:
101+
pass
102+
103+
def _make_script(self, body):
104+
fd, path = tempfile.mkstemp(suffix='.py', prefix='lc_test_')
105+
os.write(fd, ('#!/usr/bin/python3\n' + body).encode())
106+
os.close(fd)
107+
os.chmod(path, 0o755)
108+
self._scripts.append(path)
109+
return path
110+
111+
def test_returncode_zero(self):
112+
script = self._make_script('print("ok")\n')
113+
self.assertEqual(_lc.run_backend(script, '')['returncode'], 0)
114+
115+
def test_returncode_nonzero(self):
116+
script = self._make_script('import sys\nsys.exit(3)\n')
117+
self.assertEqual(_lc.run_backend(script, '')['returncode'], 3)
118+
119+
def test_content_passed_via_stdin(self):
120+
script = self._make_script('import sys\nprint(sys.stdin.read().strip())\n')
121+
ret = _lc.run_backend(script, 'ping')
122+
self.assertEqual(ret['returncode'], 0)
123+
self.assertIn('ping', ret['stdout'])
124+
125+
def test_stdout_captured_as_string(self):
126+
script = self._make_script('print("hello lifecycle")\n')
127+
ret = _lc.run_backend(script, '')
128+
self.assertIsInstance(ret['stdout'], str)
129+
self.assertIn('hello lifecycle', ret['stdout'])
130+
131+
132+
class TestLifecycleBin(unittest.TestCase):
133+
"""Integration tests: invoke lifecycle.py as a subprocess."""
134+
135+
def run_backend(self, script, file):
136+
dir_ = os.getcwd()
137+
exe = dir_ + __PYTHONENV__
138+
with open(file, 'r') as fic:
139+
content = fic.read().replace('\n', '')
140+
os.chdir('../src/bin')
141+
ret = subprocess.run([exe, script], input=content.encode(), capture_output=True)
142+
os.chdir(dir_)
143+
return {'returncode': ret.returncode, 'stdout': ret.stdout.decode()}
144+
145+
def test_lifecycle_concerned(self):
146+
"""lifecycle.json: O→I transition, concerned backend → lifecycle script runs."""
147+
ret = self.run_backend('lifecycle.py', './files_ad_utils/lifecycle.json')
148+
self.assertEqual(ret['returncode'], 0)
149+
result = json.loads(ret['stdout'])
150+
self.assertEqual(result['status'], 0)
151+
self.assertEqual(result['message'], 'lifecycle.py')
152+
153+
def test_lifecycle_notconcerned(self):
154+
"""lifecycle-notconcerned.json: backend not concerned → not concerned."""
155+
ret = self.run_backend('lifecycle.py', './files_ad_utils/lifecycle-notconcerned.json')
156+
self.assertEqual(ret['returncode'], 0)
157+
result = json.loads(ret['stdout'])
158+
self.assertEqual(result['status'], 0)
159+
self.assertEqual(result['message'], 'not concerned')
160+
161+
162+
if __name__ == '__main__':
163+
unittest.main()

0 commit comments

Comments
 (0)