Skip to content

Enabling Distributed Processing#61

Open
markmac99 wants to merge 229 commits into
wmpg:masterfrom
markmac99:distrib_processing
Open

Enabling Distributed Processing#61
markmac99 wants to merge 229 commits into
wmpg:masterfrom
markmac99:distrib_processing

Conversation

@markmac99

@markmac99 markmac99 commented Feb 14, 2026

Copy link
Copy Markdown
Contributor

An upgrade to the RMS solver, CorrelateRMS, to enable distributed processing across multiple servers. To assist with this the first step has been split into two, one to create candidates and one to perform initial simple solutions.

This PR also replaces the JSON database with SQLite which is necessary to enable distributed processing and also brings performance benefits. The PR adds two new commandline arguments, one to control how much data to retain in the databases, and one to post-fix the log name with the phase name eg correlate_rms_20260101_12345_cands.log. This is to ensure each phase's logfile is uniquely named and to make it easier to monitor and debug.

@markmac99 markmac99 marked this pull request as ready for review June 2, 2026 17:21
@dvida

dvida commented Jun 2, 2026

Copy link
Copy Markdown
Contributor

Brilliant work! I'm excited to try it. Here is what I run with Codex CLI found, could you possibly take a peek a there and see if they have any merit?


I reviewed this PR in detail and found several issues that should be fixed before merge. The biggest risks are in the new remote/distributed processing paths, where transfer failures can currently be treated as success.

1. Remote upload retry helper returns success after all retries fail

wmpl/Utils/remoteDataHandling.py, RemoteDataHandler.putWithRetry()

Current code:

def putWithRetry(self, local_name, remname):
    for i in range(10):
        try:
            self.sftp_client.put(local_name, remname)
            break
        except Exception:
            time.sleep(1)
    if i == 10:
        log.warning(f'upload of {local_name} failed after 10 retries')
        return False
    return True

Because range(10) ends with i == 9, i == 10 is never true. If all ten upload attempts fail, this function still returns True.

That is dangerous because callers then move or delete local results as if upload succeeded, for example phase1 files and trajectory folders in uploadToMaster().

Suggested fix:

def putWithRetry(self, local_name, remname):
    for attempt in range(10):
        try:
            self.sftp_client.put(local_name, remname)
            return True
        except Exception as e:
            log.warning(f'upload attempt {attempt + 1}/10 failed for {local_name}: {e}')
            time.sleep(1)

    log.warning(f'upload of {local_name} failed after 10 retries')
    return False

2. Remote download can fail and still mark/remove the master copy

wmpl/Utils/remoteDataHandling.py, collectRemoteData()

Current code retries self.sftp_client.get(), but if all retries fail it still falls through to:

self.sftp_client.rename(fullname, f'{rem_dir}/processed/{trajfile}')

or even:

self.sftp_client.remove(fullname)

This can mark data as processed or delete it from the master even though the child did not successfully download it.

Suggested fix:

downloaded = False

for attempt in range(10):
    try:
        self.sftp_client.get(fullname, localname)
        downloaded = True
        break
    except Exception as e:
        log.warning(f'download attempt {attempt + 1}/10 failed for {fullname}: {e}')
        time.sleep(1)

if not downloaded:
    log.warning(f'download of {fullname} failed after 10 retries; leaving remote file in place')
    continue

try:
    self.sftp_client.rename(fullname, f'{rem_dir}/processed/{trajfile}')
except Exception:
    log.warning(f'unable to move {fullname} to processed after successful download')

I would avoid removing the remote file as a fallback unless there is a strong reason; keeping it is safer than losing work.

3. Child phase-2 mode crashes due to undefined mcmode

wmpl/Trajectory/CorrelateRMS.py, RMSDataHandle.getRemoteData()

Current code:

if self.mc_mode == MCMODE_PHASE1 or self.mc_mode == MCMODE_BOTH:
    status = self.RemoteDatahandler.collectRemoteData('candidates', self.output_dir, verbose=verbose)
elif mcmode == MCMODE_PHASE2:
    status = self.RemoteDatahandler.collectRemoteData('phase1', self.output_dir, verbose=verbose)

mcmode is not defined in this method. This will raise NameError when a child node starts in mode 2.

Suggested fix:

if self.mc_mode == MCMODE_PHASE1 or self.mc_mode == MCMODE_BOTH:
    status = self.RemoteDatahandler.collectRemoteData('candidates', self.output_dir, verbose=verbose)
elif self.mc_mode == MCMODE_PHASE2:
    status = self.RemoteDatahandler.collectRemoteData('phase1', self.output_dir, verbose=verbose)
else:
    status = False

4. Phase1-only child upload can crash when no trajectories directory exists

wmpl/Utils/remoteDataHandling.py, uploadToMaster()

traj_dir is assigned only inside this block:

if os.path.isdir(os.path.join(source_dir, 'trajectories')):
    traj_dir = f'{source_dir}/trajectories'
    ...

but later it is used unconditionally:

if success_flag:
    shutil.rmtree(traj_dir, ignore_errors=True)

A child that only produced phase1 files and databases can crash with UnboundLocalError.

Suggested fix:

traj_dir = os.path.join(source_dir, 'trajectories')

if os.path.isdir(traj_dir):
    for dirpath, dirnames, filenames in os.walk(traj_dir):
        ...
    if success_flag:
        shutil.rmtree(traj_dir, ignore_errors=True)

or at minimum:

if success_flag and os.path.isdir(os.path.join(source_dir, 'trajectories')):
    shutil.rmtree(os.path.join(source_dir, 'trajectories'), ignore_errors=True)

5. AutoGUI no longer handles solver failure gracefully

wmpl/Trajectory/AutoGUI.py

This PR removes the previous try/except and None guard around solveTrajectoryGeneric(). If the solver raises or returns None, the GUI now crashes or dereferences self.traj.output_dir.

Suggested fix: restore the previous guard.

try:
    self.traj = solveTrajectoryGeneric(
        jdt_ref, meteor_list, self.dir_path, solver=kwargs.solver,
        max_toffset=max_toffset, monte_carlo=False, save_results=False,
        geometric_uncert=kwargs.uncertgeom,
        gravity_correction=(not kwargs.disablegravity),
        gravity_factor=kwargs.gfact,
        plot_all_spatial_residuals=False,
        plot_file_type=kwargs.imgformat,
        show_plots=False,
        v_init_part=kwargs.velpart,
        v_init_ht=vinitht,
        show_jacchia=kwargs.jacchia,
        estimate_timing_vel=(False if kwargs.notimefit is None else kwargs.notimefit),
        fixed_times=kwargs.fixedtimes,
        mc_noise_std=kwargs.mcstd
    )
except Exception as e:
    print(f"Trajectory solution raised exception: {e}")
    self.worker.update_status.emit('red', f"Trajectory solution failed: {e}")
    return False

if self.traj is None:
    print("Trajectory solution failed (solver returned None).")
    self.worker.update_status.emit('red', "Trajectory solution failed!")
    return False

6. CandidateDatabase.getCandidate() reads the wrong column

wmpl/Trajectory/CorrelateDB.py

The candidates table is:

CREATE TABLE candidates(cand_id VARCHAR UNIQUE, ref_dt REAL, obs_ids VARCHAR, status INTEGER)

but getCandidate() does:

obs_ids = json.loads(rw[1])

rw[1] is ref_dt, not obs_ids.

Suggested fix:

obs_ids = json.loads(rw[2])

or better, select only the needed column:

cur = self.dbhandle.execute(
    "SELECT obs_ids FROM candidates WHERE cand_id=? and status=1",
    (cand_id,)
)
rw = cur.fetchone()
if rw is not None:
    obs_ids = json.loads(rw[0])

Additional recommendation

Several SQLite queries are built with f-strings. Most inputs are internally generated, but using parameterized queries would make this much safer and less brittle, especially for observation IDs, candidate IDs, and file paths.

Example:

cur = self.dbhandle.execute(
    "SELECT obs_id FROM paired_obs WHERE obs_id=? and status=1",
    (obs_id,)
)

Minimal tests I would add

  • putWithRetry() returns False after repeated SFTP failures and does not move/delete local files.
  • collectRemoteData() leaves remote files in place when download fails.
  • child mode 2 initialization calls collectRemoteData('phase1', ...) without raising NameError.
  • uploadToMaster() succeeds when only phase1/ and database files exist, with no trajectories/ directory.
  • AutoGUI handles solveTrajectoryGeneric() returning None.
  • CandidateDatabase.getCandidate() returns the saved observation IDs.

@markmac99

Copy link
Copy Markdown
Contributor Author

Thanks Denis,will take a fuller look at a couple of them but

1 is a bug, should be if i==9. Will fix it.

2 If we've failed ten times to download a file, then its extremely unlikely we'll be able to rename or delete it. I concede its possible, but ten failures in a row probably indicates some serious network issues. I'll review options.

3 is indeed a bug, though i suspect the condition is unreachable. Will fix it.

4 Saving a phase 1 solution creates both the phase1 version AND the trajectories folder tree and so the trajectories folder must exist if there's a phase1 solution. So this situation could only arise if a set of candidates resulted in zero solutions. This is possible of course though unlikely. I think the underlying problem here is that lines 314-319 should be intented one more tab.

5 - this is interesting as i have not touched this code. I'm guessing some change was made post my forking the master branch. I'll check it out and integrate any subsequent changes.

6 - this function is actually unused, so although the bug is real, it has no impact! i'll fix it or remove the unused function.

@markmac99

Copy link
Copy Markdown
Contributor Author

Done some work to close points 1,2,3, and 4, and I've fixed the unused function mentioned in 6 above. I'll run some tests today and advise.

Issue 5 is a false alarm as this PR doesn't introduce any changes to Trajectory/AutoGUI.py.

I can see however that a change was made a week ago to this file to add the try/except code mentioned in the Codex comment. These changes should not be overwritten by this PR.

@dvida

dvida commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

Agreed that issue 5 is a false alarm, I fixed this recently. Not sure why it picked it up. Let me know once things are ready to rock and roll.

@markmac99 markmac99 marked this pull request as draft June 3, 2026 14:39
@markmac99

Copy link
Copy Markdown
Contributor Author

Ha. During testing i have remembered why i was removing the remote copy even if the download failed....

When a child node downloads a file from the server, it then attempts to move it to the 'processed' folder on the server. This is to avoid reprocessing it again on the next pass. If the downloaded file can't be moved to the processed folder it is because a copy ALREADY exists in the "processed" folder, ie it was already processed. So, it should indeed be deleted from the server as otherwise the next pass will again download and process it, and so on ad infinitum.

On the other hand if i can't download the file in the first place. its virtually certain to be because of a network glitch. If after 10s of retries it still can't be downloaded, then we do probably want to leave it, though the rename / delete would also almost certainly fail.

I'll update the code to remove the file from the server if the move/rename operation fails.

@markmac99

Copy link
Copy Markdown
Contributor Author

okay, all done and tested. During testing i spotted and corrected a couple of other small issues, and improved logging so its possible to more easily track which trajectories got assigned to child nodes.

@markmac99 markmac99 marked this pull request as ready for review June 5, 2026 21:45
@markmac99

markmac99 commented Jun 10, 2026

Copy link
Copy Markdown
Contributor Author

I think this is all good to go, pace the changes in documentation you asked for however i found one interesting problem.

When a monte-carlo solution is run on a phase1 solution, sometimes the reference timestamp changes by a few milliseconds.

When run on a single machine, the software can detect this because each improved solution has the original trajectory path stored in the pre_mc_longname field and can therefore remove the old folder.

However, if phase2 solutions are being distributed to child nodes, each node has its own copy of the files. The software will correctly delete the "old" copy on the child node, but when the data are uploaded to the server, the server simply moves the uploaded files to the trajectories folder. It does not check if the solution is an improvement to an existing solution and so sometimes, we can end up with two solutions on disk.

Logic already exists to delete these when doing the MC phase on the master node, so i just need to extend this to run when merging in data from child nodes.

If we prefer to wait till its resolved i am fine with that. Should only take a few days.

Meanwhile the workaround is simple - don't distribute the MCPhase to child nodes :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants