Enabling Distributed Processing#61
Conversation
remove observations from json db and move to sqlite start adding support for distributed processing
add support to archive old records
|
Brilliant work! I'm excited to try it. Here is what I run with Codex CLI found, could you possibly take a peek a there and see if they have any merit? I reviewed this PR in detail and found several issues that should be fixed before merge. The biggest risks are in the new remote/distributed processing paths, where transfer failures can currently be treated as success. 1. Remote upload retry helper returns success after all retries fail
Current code: def putWithRetry(self, local_name, remname):
for i in range(10):
try:
self.sftp_client.put(local_name, remname)
break
except Exception:
time.sleep(1)
if i == 10:
log.warning(f'upload of {local_name} failed after 10 retries')
return False
return TrueBecause That is dangerous because callers then move or delete local results as if upload succeeded, for example phase1 files and trajectory folders in Suggested fix: def putWithRetry(self, local_name, remname):
for attempt in range(10):
try:
self.sftp_client.put(local_name, remname)
return True
except Exception as e:
log.warning(f'upload attempt {attempt + 1}/10 failed for {local_name}: {e}')
time.sleep(1)
log.warning(f'upload of {local_name} failed after 10 retries')
return False2. Remote download can fail and still mark/remove the master copy
Current code retries self.sftp_client.rename(fullname, f'{rem_dir}/processed/{trajfile}')or even: self.sftp_client.remove(fullname)This can mark data as processed or delete it from the master even though the child did not successfully download it. Suggested fix: downloaded = False
for attempt in range(10):
try:
self.sftp_client.get(fullname, localname)
downloaded = True
break
except Exception as e:
log.warning(f'download attempt {attempt + 1}/10 failed for {fullname}: {e}')
time.sleep(1)
if not downloaded:
log.warning(f'download of {fullname} failed after 10 retries; leaving remote file in place')
continue
try:
self.sftp_client.rename(fullname, f'{rem_dir}/processed/{trajfile}')
except Exception:
log.warning(f'unable to move {fullname} to processed after successful download')I would avoid removing the remote file as a fallback unless there is a strong reason; keeping it is safer than losing work. 3. Child phase-2 mode crashes due to undefined
|
|
Thanks Denis,will take a fuller look at a couple of them but 1 is a bug, should be if i==9. Will fix it. 2 If we've failed ten times to download a file, then its extremely unlikely we'll be able to rename or delete it. I concede its possible, but ten failures in a row probably indicates some serious network issues. I'll review options. 3 is indeed a bug, though i suspect the condition is unreachable. Will fix it. 4 Saving a phase 1 solution creates both the phase1 version AND the trajectories folder tree and so the trajectories folder must exist if there's a phase1 solution. So this situation could only arise if a set of candidates resulted in zero solutions. This is possible of course though unlikely. I think the underlying problem here is that lines 314-319 should be intented one more tab. 5 - this is interesting as i have not touched this code. I'm guessing some change was made post my forking the master branch. I'll check it out and integrate any subsequent changes. 6 - this function is actually unused, so although the bug is real, it has no impact! i'll fix it or remove the unused function. |
…ed the function to make its purpose clearer.
|
Done some work to close points 1,2,3, and 4, and I've fixed the unused function mentioned in 6 above. I'll run some tests today and advise. Issue 5 is a false alarm as this PR doesn't introduce any changes to I can see however that a change was made a week ago to this file to add the try/except code mentioned in the Codex comment. These changes should not be overwritten by this PR. |
|
Agreed that issue 5 is a false alarm, I fixed this recently. Not sure why it picked it up. Let me know once things are ready to rock and roll. |
…global :) Fixed properly now
|
Ha. During testing i have remembered why i was removing the remote copy even if the download failed.... When a child node downloads a file from the server, it then attempts to move it to the 'processed' folder on the server. This is to avoid reprocessing it again on the next pass. If the downloaded file can't be moved to the processed folder it is because a copy ALREADY exists in the "processed" folder, ie it was already processed. So, it should indeed be deleted from the server as otherwise the next pass will again download and process it, and so on ad infinitum. On the other hand if i can't download the file in the first place. its virtually certain to be because of a network glitch. If after 10s of retries it still can't be downloaded, then we do probably want to leave it, though the rename / delete would also almost certainly fail. I'll update the code to remove the file from the server if the move/rename operation fails. |
|
okay, all done and tested. During testing i spotted and corrected a couple of other small issues, and improved logging so its possible to more easily track which trajectories got assigned to child nodes. |
merge upstream fixes
|
I think this is all good to go, pace the changes in documentation you asked for however i found one interesting problem. When a monte-carlo solution is run on a phase1 solution, sometimes the reference timestamp changes by a few milliseconds. When run on a single machine, the software can detect this because each improved solution has the original trajectory path stored in the However, if phase2 solutions are being distributed to child nodes, each node has its own copy of the files. The software will correctly delete the "old" copy on the child node, but when the data are uploaded to the server, the server simply moves the uploaded files to the trajectories folder. It does not check if the solution is an improvement to an existing solution and so sometimes, we can end up with two solutions on disk. Logic already exists to delete these when doing the MC phase on the master node, so i just need to extend this to run when merging in data from child nodes. If we prefer to wait till its resolved i am fine with that. Should only take a few days. Meanwhile the workaround is simple - don't distribute the MCPhase to child nodes :) |
An upgrade to the RMS solver, CorrelateRMS, to enable distributed processing across multiple servers. To assist with this the first step has been split into two, one to create candidates and one to perform initial simple solutions.
This PR also replaces the JSON database with SQLite which is necessary to enable distributed processing and also brings performance benefits. The PR adds two new commandline arguments, one to control how much data to retain in the databases, and one to post-fix the log name with the phase name eg correlate_rms_20260101_12345_cands.log. This is to ensure each phase's logfile is uniquely named and to make it easier to monitor and debug.