From 7818ddfa1a90261847b8fd77bab60fcec39bc75a Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Sat, 23 May 2026 12:26:39 +0100 Subject: [PATCH 01/31] Initial DAG for LINC+delay --- flocs_processing/dags/linc.py | 246 ++++++++++++++++++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 flocs_processing/dags/linc.py diff --git a/flocs_processing/dags/linc.py b/flocs_processing/dags/linc.py new file mode 100644 index 0000000..264cf70 --- /dev/null +++ b/flocs_processing/dags/linc.py @@ -0,0 +1,246 @@ +from enum import Enum +import functools +import os +import pathlib +import sqlite3 +import subprocess + +from airflow.exceptions import AirflowFailException, AirflowSkipException +from airflow.sdk import dag, task +from airflow.task.trigger_rule import TriggerRule + +# Need to replace this with a config file +TABLE_NAME = "" +DATABASE = "" +SLURM_ACCOUNT = "" +SLURM_QUEUE = "" +DATA_DIR = "" + + +@functools.total_ordering +class PIPELINE_STATUS(Enum): + nothing = 0 + downloaded = 1 + finished = 2 + running = 3 + processing = 98 + error = 99 + + def __eq__(self, other): + if other.__class__ is int: + return self.value == other + elif other.__class__ is self.__class__: + return self.value == other.value + else: + raise NotImplementedError + + def __lt__(self, other): + if self.__class__ is not other.__class__: + raise NotImplementedError + return self.value < other.value + + +def get_db_columns(): + with sqlite3.connect(DATABASE) as db: + db.row_factory = sqlite3.Row + cursor = db.cursor() + columns = "target_name,priority,finished,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay" + field = cursor.execute( + f"select {columns} from {TABLE_NAME} where finished==0 order by priority" + ).fetchall() + print(field) + return field + + +def set_status_processing(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + 
cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.processing.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_finished(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.finished.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_final_calibrator(name, target, final_cal): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set sas_id_calibrator_final={final_cal} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def get_most_recent_run(searchpath: str, sas_id: str) -> pathlib.Path: + rundirs = pathlib.Path(searchpath) + rundirs_sorted = sorted(rundirs.iterdir(), key=os.path.getctime) + rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] + rundir_final = rundirs_sorted_filtered[-1].absolute() + return rundir_final + + +@dag +def linc(): + @task + def get_unprocessed_target(): + field = dict(get_db_columns()[0]) + print(field["target_name"]) + return field + + @task.short_circuit + def check_fields(): + fields = get_db_columns() + return bool(fields) + + @task + def run_linc_calibrator1(field): + if (field["status_calibrator1"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator1"] == PIPELINE_STATUS.running + ): + print( + f"Flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']} already processed." 
+ ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator1']}" + set_status_processing( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + outdir = os.path.join( + "/snap8/scratch/do011/dc-swei1/airflow/output", field["target_name"] + ) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir /snap8/scratch/do011/dc-swei1/airflow/rundir --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", + "w", + ) as f_out, open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + set_status_finished( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_linc_calibrator2(field): + raise AirflowSkipException() + + @task(trigger_rule=TriggerRule.ONE_DONE) + def select_best_calibrator(result1, result2): + if result1 and result2: + print("Selecting between cal1 and cal2") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif result1 and (not result2): + print("Only cal 1 succeeded, continuing with that") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif (not result1) and result2: + print("Only cal 2 succeeded, continuing with that") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + 
result1["sas_id_calibrator2"], + ) + return result2 + else: + raise AirflowFailException("No calibrators succeeded; stopping processing.") + + @task + def run_linc_target(field): + if (field["status_target"] == PIPELINE_STATUS.finished) or ( + field["status_target"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing target observation {field['target_name']} {field['sas_id_target']} with calibrator {field['sas_id_calibrator_final']}" + ) + ms_folder = f"L{field['sas_id_target']}" + outdir = os.path.join( + "/snap8/scratch/do011/dc-swei1/airflow/output", field["target_name"] + ) + calibrator_path = get_most_recent_run( + outdir, field["sas_id_calibrator_final"] + ) + calibrator_solutions = ( + calibrator_path / "results_LINC_calibrator" / "cal_solutions.h5" + ) + set_status_processing( + field["target_name"], "target", field["sas_id_target"] + ) + cmd = f"flocs-run linc target --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir /snap8/scratch/do011/dc-swei1/airflow/rundir --outdir {outdir} --cal-solutions {calibrator_solutions} {os.path.join(DATA_DIR, field['target_name'], 'target', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", + "w", + ) as f_out, open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + return True + set_status_finished( + field["target_name"], "target", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def validate_linc_target(field): + return True + + @task + def run_vlbi_delay(field): + return True + + @task + def run_ddf_pipeline(field): + return True + + proceed = check_fields() + field = get_unprocessed_target() + result_cal1 = run_linc_calibrator1(field) + 
result_cal2 = run_linc_calibrator2(field) + best_cal = select_best_calibrator(result_cal1, result_cal2) + result_targ = run_linc_target(best_cal) + linc_is_valid = validate_linc_target(result_targ) + result_vlbi_delay = run_vlbi_delay(linc_is_valid) + # run_ddf_pipeline(vlbi_delay_is_valid) + + proceed >> field + + +linc() From da28933b772839630453e7489e7b2e1e2fd5a34a Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Thu, 28 May 2026 15:18:51 +0100 Subject: [PATCH 02/31] Add download step --- flocs_processing/dags/linc.py | 132 ++++++++++++++++++++++++++- flocs_processing/flocs_processing.py | 32 +++++-- 2 files changed, 153 insertions(+), 11 deletions(-) diff --git a/flocs_processing/dags/linc.py b/flocs_processing/dags/linc.py index 264cf70..bd56493 100644 --- a/flocs_processing/dags/linc.py +++ b/flocs_processing/dags/linc.py @@ -4,10 +4,13 @@ import pathlib import sqlite3 import subprocess +import time from airflow.exceptions import AirflowFailException, AirflowSkipException from airflow.sdk import dag, task from airflow.task.trigger_rule import TriggerRule +from flocs_lta.lta_search import ObservationStager +from stager_access import get_surls_requested, get_surls_online # Need to replace this with a config file TABLE_NAME = "" @@ -44,7 +47,7 @@ def get_db_columns(): with sqlite3.connect(DATABASE) as db: db.row_factory = sqlite3.Row cursor = db.cursor() - columns = "target_name,priority,finished,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay" + columns = "target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay" field = cursor.execute( f"select {columns} from {TABLE_NAME} where finished==0 order by priority" ).fetchall() @@ -68,6 +71,14 @@ def set_status_finished(name, identifier, target): ) +def set_status_downloaded(name, target): + 
with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set downloaded=1 where target_name=='{name}' and sas_id_target=='{target}'" + ) + + def set_final_calibrator(name, target, final_cal): with sqlite3.connect(DATABASE) as db: cursor = db.cursor() @@ -97,6 +108,120 @@ def check_fields(): fields = get_db_columns() return bool(fields) + @task + def download_field(field): + if field["downloaded"]: + return field + else: + has_cal1 = False + stage_calibrators = False + if field["sas_id_calibrator1"]: + ms_folder = f"L{field['sas_id_calibrator1']}" + cal1_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal1_full_path): + has_cal1 = True + else: + stage_calibrators = True + + has_cal2 = False + if field["sas_id_calibrator2"]: + ms_folder = f"L{field['sas_id_calibrator2']}" + cal2_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal2_full_path): + has_cal2 = True + else: + stage_calibrators = True + if field["sas_id_target"]: + ms_folder = f"L{field['sas_id_target']}" + target_full_path = os.path.join( + DATA_DIR, field["target_name"], "target", ms_folder + ) + if os.path.exists(target_full_path): + stage_target = False + else: + stage_target = True + else: + raise AirflowFailException( + f"No target SAS ID in database for field {field['target_name']}" + ) + + print(f"Field {field['sas_id_target']} is not downloaded.") + stager = ObservationStager(get_surls=True) + stager.find_observation_by_sasid( + "ALL", + field["sas_id_target"], + None, + 120e6, + 168e6, + ) + if stage_calibrators: + stager.find_nearest_calibrators(2, 120e6, 168e6) + stage_id_calibrators = stager.stage_calibrators() + if stage_target: + stage_id_target = stager.stage_target() + + calibrator_staged = False + target_staged = False + calibrator_downloaded = has_cal1 or has_cal2 + target_downloaded = not stage_target + while True: + if 
len(get_surls_online(stage_id_calibrators)) == len( + get_surls_requested(stage_id_calibrators) + ): + calibrator_staged = True + if calibrator_staged and not calibrator_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "calibrator") + cmd = ( + f"flocs-lta download --outdir {dl_path} {stage_id_calibrators}" + ) + with open( + f"log_download_calibrators_{field['target_name']}.txt", + "w", + ) as f_out, open( + f"log_download_calibrators_{field['target_name']}.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + calibrator_downloaded = True + else: + raise RuntimeError + + if len(get_surls_online(stage_id_target)) == len( + get_surls_requested(stage_id_target) + ): + calibrator_staged = True + if target_staged and not target_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "target") + cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" + with open( + f"log_download_calibrators_{field['target_name']}.txt", + "w", + ) as f_out, open( + f"log_download_calibrators_{field['target_name']}.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + set_status_downloaded( + field["target_name"], + field["sas_id_target"], + ) + target_downloaded = True + else: + raise RuntimeError + if calibrator_downloaded and target_downloaded: + break + time.sleep(60) + @task def run_linc_calibrator1(field): if (field["status_calibrator1"] == PIPELINE_STATUS.finished) or ( @@ -231,7 +356,8 @@ def run_ddf_pipeline(field): return True proceed = check_fields() - field = get_unprocessed_target() + get_field = get_unprocessed_target() + field = download_field(get_field) result_cal1 = run_linc_calibrator1(field) result_cal2 = run_linc_calibrator2(field) best_cal = select_best_calibrator(result_cal1, result_cal2) @@ -240,7 +366,7 @@ def run_ddf_pipeline(field): result_vlbi_delay 
= run_vlbi_delay(linc_is_valid) # run_ddf_pipeline(vlbi_delay_is_valid) - proceed >> field + proceed >> get_field linc() diff --git a/flocs_processing/flocs_processing.py b/flocs_processing/flocs_processing.py index f23ef2e..517d3f2 100644 --- a/flocs_processing/flocs_processing.py +++ b/flocs_processing/flocs_processing.py @@ -98,18 +98,25 @@ def create_database( "linc" ], ): + pipeline_str = ",".join(pipelines) pipelines = list(map(str.lower, pipelines)) - dbstr = f"create table {table_name}(source_name text default NULL" + dbstr = f"create table {table_name}(target_name text default NULL, pipelines text default '{pipeline_str}', priority int default 0, finished bit default 0, downloaded bit default 0" if "linc" in pipelines: dbstr += ", sas_id_calibrator1 text default NULL, sas_id_calibrator2 text default NULL, sas_id_calibrator_final text default NULL, sas_id_target text primary key default NULL, status_calibrator1 smallint default 0, status_calibrator2 smallint default 0, status_target smallint default 0" if "ddf-pipeline" in pipelines: - dbstr += ", status_ddf smallint default 0" + dbstr += f", status_ddf smallint default {PIPELINE_STATUS.nothing.value}" if "vlbi-delay-widefield" in pipelines: - dbstr += ", status_ddf smallint default 0" - dbstr += ", status_delay smallint default 0" + dbstr += f", status_vlbi_delay smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += f", status_vlbi_dd smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += f", status_vlbi_intermediate_img smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += f", status_vlbi_facet_subtract smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += ( + f", status_vlbi_facet_img smallint default {PIPELINE_STATUS.nothing.value}" + ) if "vlbi-delay-single-target" in pipelines: - dbstr += ", status_delay smallint default 0" + dbstr += f", status_vlbi_delay smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += f", status_vlbi_dd smallint default 
{PIPELINE_STATUS.nothing.value}" dbstr += ");" cmd = ["sqlite3", dbname, dbstr] @@ -137,8 +144,11 @@ def add_field( table_name: Annotated[ str, Parameter(help="Database table that will be processed.") ] = "processing_flocs", + pipelines: Annotated[ + str, Parameter(help="Pipelines this field needs to be processed with.") + ] = "", ): - dbstr = f"insert into {table_name} (source_name" + dbstr = f"insert into {table_name} (target_name" if len(sas_id_calibrators) == 1: dbstr += ", sas_id_calibrator1" if len(sas_id_calibrators) == 2: @@ -316,7 +326,10 @@ def launch_vlbi_delay(self, field_name, sas_id, restart: bool = False): rundirs_sorted_filtered = [ d for d in rundirs_sorted - if ((sas_id in d.parts[-1]) and (d.parts[-1].startswith(("LINC_target")))) + if ( + (sas_id in d.parts[-1]) + and (d.parts[-1].startswith(("LINC_target"))) + ) ] # Last LINC target reduction for this source linc_target_dir = rundirs_sorted_filtered[-1] @@ -373,7 +386,10 @@ def launch_vlbi_delay(self, field_name, sas_id, restart: bool = False): rundirs_sorted_filtered = [ d for d in rundirs_sorted - if ((sas_id in d.parts[-1]) and (d.parts[-1].startswith(("LINC_target")))) + if ( + (sas_id in d.parts[-1]) + and (d.parts[-1].startswith(("LINC_target"))) + ) ] # Last LINC target reduction for this source linc_target_dir = rundirs_sorted_filtered[-1] From e3c5ae1c21a1a7778aac560ce71fc14909460130 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Thu, 28 May 2026 15:33:36 +0100 Subject: [PATCH 03/31] Fix small errors --- flocs_processing/dags/linc.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/flocs_processing/dags/linc.py b/flocs_processing/dags/linc.py index bd56493..1d89fa0 100644 --- a/flocs_processing/dags/linc.py +++ b/flocs_processing/dags/linc.py @@ -149,20 +149,23 @@ def download_field(field): f"No target SAS ID in database for field {field['target_name']}" ) - print(f"Field {field['sas_id_target']} is not downloaded.") - stager = 
ObservationStager(get_surls=True) - stager.find_observation_by_sasid( - "ALL", - field["sas_id_target"], - None, - 120e6, - 168e6, - ) - if stage_calibrators: - stager.find_nearest_calibrators(2, 120e6, 168e6) - stage_id_calibrators = stager.stage_calibrators() - if stage_target: - stage_id_target = stager.stage_target() + if stage_calibrators or stage_target: + print(f"Field {field['sas_id_target']} is not downloaded.") + stager = ObservationStager(get_surls=True) + stager.find_observation_by_sasid( + "ALL", + field["sas_id_target"], + None, + 120e6, + 168e6, + ) + if stage_calibrators: + stager.find_nearest_calibrators(2, 120e6, 168e6) + stage_id_calibrators = stager.stage_calibrators() + if stage_target: + stage_id_target = stager.stage_target() + else: + return field calibrator_staged = False target_staged = False From e9612d6cbea0a3d5ed640b60167bef4a553b4294 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Sun, 31 May 2026 13:56:13 +0100 Subject: [PATCH 04/31] delay cal, rundir and outdir --- flocs_processing/dags/linc.py | 67 ++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/flocs_processing/dags/linc.py b/flocs_processing/dags/linc.py index 1d89fa0..1238f17 100644 --- a/flocs_processing/dags/linc.py +++ b/flocs_processing/dags/linc.py @@ -18,6 +18,8 @@ SLURM_ACCOUNT = "" SLURM_QUEUE = "" DATA_DIR = "" +OUTPUT_DIR = "" +PROCESSING_DIR = "" @functools.total_ordering @@ -87,10 +89,17 @@ def set_final_calibrator(name, target, final_cal): ) -def get_most_recent_run(searchpath: str, sas_id: str) -> pathlib.Path: +def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib.Path: rundirs = pathlib.Path(searchpath) rundirs_sorted = sorted(rundirs.iterdir(), key=os.path.getctime) - rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] + if pipeline: + rundirs_sorted_filtered = [ + d + for d in rundirs_sorted + if ((sas_id in d.parts[-1]) and (pipeline in d.parts[-1])) + ] + 
else: + rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] rundir_final = rundirs_sorted_filtered[-1].absolute() return rundir_final @@ -242,10 +251,8 @@ def run_linc_calibrator1(field): set_status_processing( field["target_name"], "calibrator1", field["sas_id_target"] ) - outdir = os.path.join( - "/snap8/scratch/do011/dc-swei1/airflow/output", field["target_name"] - ) - cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir /snap8/scratch/do011/dc-swei1/airflow/rundir --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) @@ -311,11 +318,9 @@ def run_linc_target(field): f"Processing target observation {field['target_name']} {field['sas_id_target']} with calibrator {field['sas_id_calibrator_final']}" ) ms_folder = f"L{field['sas_id_target']}" - outdir = os.path.join( - "/snap8/scratch/do011/dc-swei1/airflow/output", field["target_name"] - ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) calibrator_path = get_most_recent_run( - outdir, field["sas_id_calibrator_final"] + outdir, field["sas_id_calibrator_final"], "LINC_calibrator" ) calibrator_solutions = ( calibrator_path / "results_LINC_calibrator" / "cal_solutions.h5" @@ -323,7 +328,7 @@ def run_linc_target(field): set_status_processing( field["target_name"], "target", field["sas_id_target"] ) - cmd = f"flocs-run linc target --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir /snap8/scratch/do011/dc-swei1/airflow/rundir --outdir {outdir} --cal-solutions 
{calibrator_solutions} {os.path.join(DATA_DIR, field['target_name'], 'target', ms_folder)}" + cmd = f"flocs-run linc target --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --cal-solutions {calibrator_solutions} {os.path.join(DATA_DIR, field['target_name'], 'target', ms_folder)}" if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) @@ -352,7 +357,45 @@ def validate_linc_target(field): @task def run_vlbi_delay(field): - return True + if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_delay"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing delay calibration for {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_target']}" + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_calibrator_final"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + set_status_processing( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w", + ) as f_out, open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + return True + set_status_finished( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + else: + raise RuntimeError + return field @task def run_ddf_pipeline(field): From 5bd78a3b4355b0f8611044b69856b03d51850348 Mon Sep 17 00:00:00 2001 From: 
Frits Sweijen Date: Fri, 5 Jun 2026 16:06:35 +0100 Subject: [PATCH 05/31] dd cal dag entry --- flocs_processing/dags/vlbi_single_target.py | 471 ++++++++++++++++++++ 1 file changed, 471 insertions(+) create mode 100644 flocs_processing/dags/vlbi_single_target.py diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py new file mode 100644 index 0000000..d611a2c --- /dev/null +++ b/flocs_processing/dags/vlbi_single_target.py @@ -0,0 +1,471 @@ +from enum import Enum +import functools +import os +import pathlib +import sqlite3 +import subprocess +import time + +from airflow.exceptions import AirflowFailException, AirflowSkipException +from airflow.sdk import dag, task +from airflow.task.trigger_rule import TriggerRule +from flocs_lta.lta_search import ObservationStager +from stager_access import get_surls_requested, get_surls_online + +# Need to replace this with a config file +TABLE_NAME = "" +DATABASE = "" +SLURM_ACCOUNT = "" +SLURM_QUEUE = "" +DATA_DIR = "" +OUTPUT_DIR = "" +PROCESSING_DIR = "" + + +@functools.total_ordering +class PIPELINE_STATUS(Enum): + nothing = 0 + downloaded = 1 + finished = 2 + running = 3 + processing = 98 + error = 99 + + def __eq__(self, other): + if other.__class__ is int: + return self.value == other + elif other.__class__ is self.__class__: + return self.value == other.value + else: + raise NotImplementedError + + def __lt__(self, other): + if self.__class__ is not other.__class__: + raise NotImplementedError + return self.value < other.value + + +def get_db_columns(): + with sqlite3.connect(DATABASE) as db: + db.row_factory = sqlite3.Row + cursor = db.cursor() + columns = "target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay" + field = cursor.execute( + f"select {columns} from {TABLE_NAME} where finished==0 order by priority" + ).fetchall() + print(field) + 
return field + + +def set_status_processing(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.processing.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_finished(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.finished.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_downloaded(name, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set downloaded=1 where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_final_calibrator(name, target, final_cal): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set sas_id_calibrator_final={final_cal} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib.Path: + rundirs = pathlib.Path(searchpath) + rundirs_sorted = sorted(rundirs.iterdir(), key=os.path.getctime) + if pipeline: + rundirs_sorted_filtered = [ + d + for d in rundirs_sorted + if ((sas_id in d.parts[-1]) and (pipeline in d.parts[-1])) + ] + else: + rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] + rundir_final = rundirs_sorted_filtered[-1].absolute() + return rundir_final + + +@dag +def linc(): + @task + def get_unprocessed_target(): + field = dict(get_db_columns()[0]) + print(field["target_name"]) + return field + + @task.short_circuit + def check_fields(): + fields = get_db_columns() + return bool(fields) + + @task + def download_field(field): + if field["downloaded"]: + return field + else: + has_cal1 = False + stage_calibrators = False + if field["sas_id_calibrator1"]: + ms_folder = 
f"L{field['sas_id_calibrator1']}" + cal1_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal1_full_path): + has_cal1 = True + else: + stage_calibrators = True + + has_cal2 = False + if field["sas_id_calibrator2"]: + ms_folder = f"L{field['sas_id_calibrator2']}" + cal2_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal2_full_path): + has_cal2 = True + else: + stage_calibrators = True + if field["sas_id_target"]: + ms_folder = f"L{field['sas_id_target']}" + target_full_path = os.path.join( + DATA_DIR, field["target_name"], "target", ms_folder + ) + if os.path.exists(target_full_path): + stage_target = False + else: + stage_target = True + else: + raise AirflowFailException( + f"No target SAS ID in database for field {field['target_name']}" + ) + + if stage_calibrators or stage_target: + print(f"Field {field['sas_id_target']} is not downloaded.") + stager = ObservationStager(get_surls=True) + stager.find_observation_by_sasid( + "ALL", + field["sas_id_target"], + None, + 120e6, + 168e6, + ) + if stage_calibrators: + stager.find_nearest_calibrators(2, 120e6, 168e6) + stage_id_calibrators = stager.stage_calibrators() + if stage_target: + stage_id_target = stager.stage_target() + else: + return field + + calibrator_staged = False + target_staged = False + calibrator_downloaded = has_cal1 or has_cal2 + target_downloaded = not stage_target + while True: + if len(get_surls_online(stage_id_calibrators)) == len( + get_surls_requested(stage_id_calibrators) + ): + calibrator_staged = True + if calibrator_staged and not calibrator_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "calibrator") + cmd = ( + f"flocs-lta download --outdir {dl_path} {stage_id_calibrators}" + ) + with open( + f"log_download_calibrators_{field['target_name']}.txt", + "w", + ) as f_out, open( + f"log_download_calibrators_{field['target_name']}.txt", + "w", + ) as 
f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + calibrator_downloaded = True + else: + raise RuntimeError + + if len(get_surls_online(stage_id_target)) == len( + get_surls_requested(stage_id_target) + ): + calibrator_staged = True + if target_staged and not target_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "target") + cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" + with open( + f"log_download_calibrators_{field['target_name']}.txt", + "w", + ) as f_out, open( + f"log_download_calibrators_{field['target_name']}.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + set_status_downloaded( + field["target_name"], + field["sas_id_target"], + ) + target_downloaded = True + else: + raise RuntimeError + if calibrator_downloaded and target_downloaded: + break + time.sleep(60) + + @task + def run_linc_calibrator1(field): + if (field["status_calibrator1"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator1"] == PIPELINE_STATUS.running + ): + print( + f"Flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']} already processed." 
+ ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator1']}" + set_status_processing( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", + "w", + ) as f_out, open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + set_status_finished( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_linc_calibrator2(field): + raise AirflowSkipException() + + @task(trigger_rule=TriggerRule.ONE_DONE) + def select_best_calibrator(result1, result2): + if result1 and result2: + print("Selecting between cal1 and cal2") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif result1 and (not result2): + print("Only cal 1 succeeded, continuing with that") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif (not result1) and result2: + print("Only cal 2 succeeded, continuing with that") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator2"], + ) + return result2 + else: + raise 
AirflowFailException("No calibrators succeeded; stopping processing.") + + @task + def run_linc_target(field): + if (field["status_target"] == PIPELINE_STATUS.finished) or ( + field["status_target"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing target observation {field['target_name']} {field['sas_id_target']} with calibrator {field['sas_id_calibrator_final']}" + ) + ms_folder = f"L{field['sas_id_target']}" + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + calibrator_path = get_most_recent_run( + outdir, field["sas_id_calibrator_final"], "LINC_calibrator" + ) + calibrator_solutions = ( + calibrator_path / "results_LINC_calibrator" / "cal_solutions.h5" + ) + set_status_processing( + field["target_name"], "target", field["sas_id_target"] + ) + cmd = f"flocs-run linc target --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --cal-solutions {calibrator_solutions} {os.path.join(DATA_DIR, field['target_name'], 'target', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", + "w", + ) as f_out, open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + return True + set_status_finished( + field["target_name"], "target", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def validate_linc_target(field): + return True + + @task + def run_vlbi_delay(field): + if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_delay"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing delay calibration for {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_target']}" + outdir 
= os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_calibrator_final"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + set_status_processing( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w", + ) as f_out, open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + return True + set_status_finished( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_vlbi_ddcal(field): + if (field["status_vlbi_dd"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_dd"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing ILT dd calibration for {field['target_name']} {field['sas_id_target']}" + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_calibrator_final"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + + sols_path = get_most_recent_run( + outdir, field["sas_id_target"], "VLBI_delay" + ) + sols_path = sols_path / "results_VLBI_delay-calibration" / "results" + sols = sols_path.glob("merged_*_selfcalcycle???_linearfulljones*.h5") + + source_cat = os.path.join( + DATA_DIR, field["target_name"], "target", "vlbi_target.csv" + ) + + set_status_processing( + field["target_name"], "vlbi_dd", field["sas_id_target"] + ) + cmd = 
f"flocs-run vlbi dd-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --delay-solset {sols} --phasediff-score 10.0 --source-catalogue {source_cat} {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w", + ) as f_out, open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + return True + set_status_finished( + field["target_name"], "vlbi_dd", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_ddf_pipeline(field): + return True + + proceed = check_fields() + get_field = get_unprocessed_target() + field = download_field(get_field) + result_cal1 = run_linc_calibrator1(field) + result_cal2 = run_linc_calibrator2(field) + best_cal = select_best_calibrator(result_cal1, result_cal2) + result_targ = run_linc_target(best_cal) + linc_is_valid = validate_linc_target(result_targ) + result_vlbi_delay = run_vlbi_delay(linc_is_valid) + result_vlbi_dd = run_vlbi_ddcal(result_vlbi_delay) + # run_ddf_pipeline(vlbi_delay_is_valid) + + proceed >> get_field + + +linc() From 9910c17b6c2ea96289346897ed9b7c07c92a23df Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Fri, 5 Jun 2026 16:15:43 +0100 Subject: [PATCH 06/31] Tweak dd cal --- flocs_processing/dags/vlbi_single_target.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index d611a2c..ca4958c 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -422,6 +422,8 @@ def run_vlbi_ddcal(field): source_cat = os.path.join( DATA_DIR, field["target_name"], "target", 
"vlbi_target.csv" ) + if not os.path.isfile(source_cat): + raise AirflowFailException(f"{source_cat} not found.") set_status_processing( field["target_name"], "vlbi_dd", field["sas_id_target"] From 834d888e936bf649f67b422549e4550f8f27fe18 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Tue, 9 Jun 2026 15:15:50 +0100 Subject: [PATCH 07/31] Return field from linc target validation --- flocs_processing/dags/vlbi_single_target.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index ca4958c..f07b743 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -353,7 +353,7 @@ def run_linc_target(field): @task def validate_linc_target(field): - return True + return field @task def run_vlbi_delay(field): From 0b698a582dffdbe1b0c5e0c028fb181aac5b9c07 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Tue, 9 Jun 2026 16:07:38 +0100 Subject: [PATCH 08/31] Get dd status column --- flocs_processing/dags/vlbi_single_target.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index f07b743..288e66c 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -49,7 +49,7 @@ def get_db_columns(): with sqlite3.connect(DATABASE) as db: db.row_factory = sqlite3.Row cursor = db.cursor() - columns = "target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay" + columns = "target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay,status_vlbi_dd" field = cursor.execute( f"select {columns} from {TABLE_NAME} where finished==0 
order by priority" ).fetchall() From 125de55acc6659c1e92fffbddeee48d9ae6382a6 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Tue, 9 Jun 2026 16:15:31 +0100 Subject: [PATCH 09/31] Fix target retrieval in ddcal --- flocs_processing/dags/vlbi_single_target.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 288e66c..6a1704b 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -409,7 +409,7 @@ def run_vlbi_ddcal(field): ) outdir = os.path.join(OUTPUT_DIR, field["target_name"]) target_path = get_most_recent_run( - outdir, field["sas_id_calibrator_final"], "LINC_target" + outdir, field["sas_id_target"], "LINC_target" ) target_ms_path = target_path / "results_LINC_target" / "results" From abd97e8b289085ecb296159d5f071e3dc6437e39 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Tue, 9 Jun 2026 16:44:06 +0100 Subject: [PATCH 10/31] Set whole field to finished once ddcal finishes successfully --- flocs_processing/dags/vlbi_single_target.py | 30 ++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 6a1704b..11e21a7 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -13,13 +13,13 @@ from stager_access import get_surls_requested, get_surls_online # Need to replace this with a config file -TABLE_NAME = "" -DATABASE = "" -SLURM_ACCOUNT = "" -SLURM_QUEUE = "" -DATA_DIR = "" -OUTPUT_DIR = "" -PROCESSING_DIR = "" +TABLE_NAME = "processing_banados" +DATABASE = "/project/lofarvlbi/Data/fsweijen/banados-high-z/banados_airflow.sqlite" +SLURM_ACCOUNT = "lofarvlbi-fsweijen" +SLURM_QUEUE = "normal,long" +DATA_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" +OUTPUT_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" 
+PROCESSING_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z/processing" @functools.total_ordering @@ -81,6 +81,15 @@ def set_status_downloaded(name, target): ) +def set_field_finished(name, target): + # name = str(field_dict["target_name"]) + # target = str(field_dict["sas_id_target"]) + query = f"update {TABLE_NAME} set downloaded=1 where target_name=='{name}' and sas_id_target=='{target}'" + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute(query) + + def set_final_calibrator(name, target, final_cal): with sqlite3.connect(DATABASE) as db: cursor = db.cursor() @@ -105,7 +114,7 @@ def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib. @dag -def linc(): +def single_target_vlbi(max_active_runs=1): @task def get_unprocessed_target(): field = dict(get_db_columns()[0]) @@ -412,12 +421,14 @@ def run_vlbi_ddcal(field): outdir, field["sas_id_target"], "LINC_target" ) target_ms_path = target_path / "results_LINC_target" / "results" + print(f"Using LINC target run: {target_path}") sols_path = get_most_recent_run( outdir, field["sas_id_target"], "VLBI_delay" ) sols_path = sols_path / "results_VLBI_delay-calibration" / "results" sols = sols_path.glob("merged_*_selfcalcycle???_linearfulljones*.h5") + print(f"Using PILOT delay calibration run: {sols_path}") source_cat = os.path.join( DATA_DIR, field["target_name"], "target", "vlbi_target.csv" @@ -447,6 +458,7 @@ def run_vlbi_ddcal(field): set_status_finished( field["target_name"], "vlbi_dd", field["sas_id_target"] ) + set_field_finished(field["target_name"], field["sas_id_target"]) else: raise RuntimeError return field @@ -470,4 +482,4 @@ def run_ddf_pipeline(field): proceed >> get_field -linc() +single_target_vlbi() From 245b360507dfb5f0b3f3c0d916995f64ac1f7f93 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Tue, 9 Jun 2026 16:49:13 +0100 Subject: [PATCH 11/31] One target list per field --- flocs_processing/dags/vlbi_single_target.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 11e21a7..e789230 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -431,7 +431,7 @@ def run_vlbi_ddcal(field): print(f"Using PILOT delay calibration run: {sols_path}") source_cat = os.path.join( - DATA_DIR, field["target_name"], "target", "vlbi_target.csv" + DATA_DIR, field["target_name"], "vlbi_target.csv" ) if not os.path.isfile(source_cat): raise AirflowFailException(f"{source_cat} not found.") From 137c4560257f2b04ec7f586dcfb26770959a9e6e Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Tue, 9 Jun 2026 16:55:16 +0100 Subject: [PATCH 12/31] Sort priority descending --- flocs_processing/dags/vlbi_single_target.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index e789230..437790e 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -51,7 +51,7 @@ def get_db_columns(): cursor = db.cursor() columns = "target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay,status_vlbi_dd" field = cursor.execute( - f"select {columns} from {TABLE_NAME} where finished==0 order by priority" + f"select {columns} from {TABLE_NAME} where finished==0 order by priority desc" ).fetchall() print(field) return field From f1966cab2ed2e4eece787f4adf38ef016fb496dc Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 10 Jun 2026 09:07:27 +0100 Subject: [PATCH 13/31] Empty explicit paths --- flocs_processing/dags/vlbi_single_target.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py 
b/flocs_processing/dags/vlbi_single_target.py index 437790e..6976cd0 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -13,13 +13,13 @@ from stager_access import get_surls_requested, get_surls_online # Need to replace this with a config file -TABLE_NAME = "processing_banados" -DATABASE = "/project/lofarvlbi/Data/fsweijen/banados-high-z/banados_airflow.sqlite" -SLURM_ACCOUNT = "lofarvlbi-fsweijen" -SLURM_QUEUE = "normal,long" -DATA_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" -OUTPUT_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" -PROCESSING_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z/processing" +TABLE_NAME = "" +DATABASE = "" +SLURM_ACCOUNT = "" +SLURM_QUEUE = "" +DATA_DIR = "" +OUTPUT_DIR = "" +PROCESSING_DIR = "" @functools.total_ordering @@ -428,11 +428,9 @@ def run_vlbi_ddcal(field): ) sols_path = sols_path / "results_VLBI_delay-calibration" / "results" sols = sols_path.glob("merged_*_selfcalcycle???_linearfulljones*.h5") - print(f"Using PILOT delay calibration run: {sols_path}") + print(f"Using PILOT delay calibration solutions: {sols}") - source_cat = os.path.join( - DATA_DIR, field["target_name"], "vlbi_target.csv" - ) + source_cat = os.path.join(DATA_DIR, field["target_name"], "vlbi_target.csv") if not os.path.isfile(source_cat): raise AirflowFailException(f"{source_cat} not found.") From cc5421b241670d26913bde5bd1b10c185f548944 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 10 Jun 2026 09:14:24 +0100 Subject: [PATCH 14/31] Fix delay sols search path --- flocs_processing/dags/vlbi_single_target.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 6976cd0..72810cd 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -426,7 +426,7 @@ def run_vlbi_ddcal(field): sols_path = get_most_recent_run( 
outdir, field["sas_id_target"], "VLBI_delay" ) - sols_path = sols_path / "results_VLBI_delay-calibration" / "results" + sols_path = sols_path / "results_VLBI_delay-calibration" sols = sols_path.glob("merged_*_selfcalcycle???_linearfulljones*.h5") print(f"Using PILOT delay calibration solutions: {sols}") From b72eec108577a5b935e778fc34a808b97e9b1c8c Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 10 Jun 2026 09:20:43 +0100 Subject: [PATCH 15/31] Fix suffix search --- flocs_processing/dags/vlbi_single_target.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 72810cd..907d59b 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -437,7 +437,7 @@ def run_vlbi_ddcal(field): set_status_processing( field["target_name"], "vlbi_dd", field["sas_id_target"] ) - cmd = f"flocs-run vlbi dd-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --delay-solset {sols} --phasediff-score 10.0 --source-catalogue {source_cat} {target_ms_path}" + cmd = f"flocs-run vlbi dd-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --delay-solset {sols} --phasediff-score 10.0 --source-catalogue {source_cat} --ms-suffix .dp3concat {target_ms_path}" if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) From ee81117dc58529aaab1f2b7725c8ddf4be41f667 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 10 Jun 2026 15:01:18 +0100 Subject: [PATCH 16/31] Tweak most recent dir --- flocs_processing/dags/vlbi_single_target.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 907d59b..493ab5e 100644 --- 
a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -100,12 +100,12 @@ def set_final_calibrator(name, target, final_cal): def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib.Path: rundirs = pathlib.Path(searchpath) - rundirs_sorted = sorted(rundirs.iterdir(), key=os.path.getctime) + rundirs_sorted = sorted(rundirs.iterdir()) if pipeline: rundirs_sorted_filtered = [ d for d in rundirs_sorted - if ((sas_id in d.parts[-1]) and (pipeline in d.parts[-1])) + if ((sas_id in d.parts[-1]) and (pipeline in d.parts[-1])) and d.is_dir() ] else: rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] From e36d32734ead5d4b547e218cfdfb65b590eb2516 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Mon, 15 Jun 2026 13:55:06 +0100 Subject: [PATCH 17/31] Fix target search path in delay --- flocs_processing/dags/vlbi_single_target.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 493ab5e..31f967d 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -113,8 +113,8 @@ def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib. 
return rundir_final -@dag -def single_target_vlbi(max_active_runs=1): +@dag(max_active_runs=1) +def single_target_vlbi(): @task def get_unprocessed_target(): field = dict(get_db_columns()[0]) @@ -377,7 +377,7 @@ def run_vlbi_delay(field): ms_folder = f"L{field['sas_id_target']}" outdir = os.path.join(OUTPUT_DIR, field["target_name"]) target_path = get_most_recent_run( - outdir, field["sas_id_calibrator_final"], "LINC_target" + outdir, field["sas_id_target"], "LINC_target" ) target_ms_path = target_path / "results_LINC_target" / "results" set_status_processing( From 28037f255f0ca1f4b0dd230531e0835d7438f25a Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Mon, 15 Jun 2026 14:13:25 +0100 Subject: [PATCH 18/31] Fix suffix for delay calibration --- flocs_processing/dags/vlbi_single_target.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 31f967d..9831325 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -383,7 +383,7 @@ def run_vlbi_delay(field): set_status_processing( field["target_name"], "vlbi_delay", field["sas_id_target"] ) - cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {target_ms_path}" + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --ms-suffix dp3concat {target_ms_path}" if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) From 16781d25d76f498fca987a04478ea67d932f54f3 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Mon, 15 Jun 2026 17:24:50 +0100 Subject: [PATCH 19/31] More power: restarts and bad nodes --- flocs_processing/dags/vlbi_single_target.py | 50 ++++++++++++++++++--- 1 file changed, 43 insertions(+), 7 
deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 9831325..d60512e 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -1,13 +1,15 @@ from enum import Enum +import datetime import functools import os import pathlib +import re import sqlite3 import subprocess import time from airflow.exceptions import AirflowFailException, AirflowSkipException -from airflow.sdk import dag, task +from airflow.sdk import dag, get_current_context, task from airflow.task.trigger_rule import TriggerRule from flocs_lta.lta_search import ObservationStager from stager_access import get_surls_requested, get_surls_online @@ -20,6 +22,7 @@ DATA_DIR = "" OUTPUT_DIR = "" PROCESSING_DIR = "" +NN_MODEL_CACHE = "" @functools.total_ordering @@ -364,7 +367,7 @@ def run_linc_target(field): def validate_linc_target(field): return field - @task + @task(retries=2, retry_delay=datetime.timedelta(seconds=5)) def run_vlbi_delay(field): if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( field["status_vlbi_delay"] == PIPELINE_STATUS.running @@ -374,7 +377,6 @@ def run_vlbi_delay(field): print( f"Processing delay calibration for {field['target_name']} {field['sas_id_target']}" ) - ms_folder = f"L{field['sas_id_target']}" outdir = os.path.join(OUTPUT_DIR, field["target_name"]) target_path = get_most_recent_run( outdir, field["sas_id_target"], "LINC_target" @@ -383,26 +385,60 @@ def run_vlbi_delay(field): set_status_processing( field["target_name"], "vlbi_delay", field["sas_id_target"] ) - cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --ms-suffix dp3concat {target_ms_path}" + + delay_cat = os.path.join(outdir, "delay_calibrators.csv") + + proc = subprocess.run("detect_bad_slurm_nodes.sh", shell=True, text=True) + bad_nodes = proc.stdout + 
os.environ["TOIL_SLURM_ARGS"] = f"--exclude={bad_nodes}" + + context = get_current_context() + if context["ti"].try_number == 1: + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} {target_ms_path}" + else: + flocs_workdir = context["ti"].xcom_pull( + task_ids=context["ti"].task_id, key="flocs_workdir" + ) + print(f"Resuming failed PILOT run in {flocs_workdir}") + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrators {delay_cat} {target_ms_path}" if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) with open( f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", - "w", + "w+", ) as f_out, open( f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", - "w", + "w+", ) as f_err: proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: False") if not proc.returncode: - return True + if not pattern.search(proc.stderr): + success = True + + if success: set_status_finished( field["target_name"], "vlbi_delay", field["sas_id_target"] ) else: + f_out.seek(0) + flocs_workdir = "" + for line in f_out.readlines(): + print(line) + if "Running workflow with" in line: + flocs_workdir = line.split(" ")[-1] + break + if not flocs_workdir: + raise RuntimeError("Could not retrieve PILOT workdir. 
Flocs probably crashed before launching.") + if context["ti"].try_number == 1: + print(f"PILOT failed, storing rundir {flocs_workdir} for retries in `flocs_workdir`") + context["ti"].xcom_push( + key="flocs_workdir", value=flocs_workdir + ) raise RuntimeError return field From 2135952b0e4c7a7bbc7c396bd0025bf05ebad51d Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Mon, 15 Jun 2026 17:25:13 +0100 Subject: [PATCH 20/31] Remove outdated LINC dag --- flocs_processing/dags/linc.py | 418 ---------------------------------- 1 file changed, 418 deletions(-) delete mode 100644 flocs_processing/dags/linc.py diff --git a/flocs_processing/dags/linc.py b/flocs_processing/dags/linc.py deleted file mode 100644 index 1238f17..0000000 --- a/flocs_processing/dags/linc.py +++ /dev/null @@ -1,418 +0,0 @@ -from enum import Enum -import functools -import os -import pathlib -import sqlite3 -import subprocess -import time - -from airflow.exceptions import AirflowFailException, AirflowSkipException -from airflow.sdk import dag, task -from airflow.task.trigger_rule import TriggerRule -from flocs_lta.lta_search import ObservationStager -from stager_access import get_surls_requested, get_surls_online - -# Need to replace this with a config file -TABLE_NAME = "" -DATABASE = "" -SLURM_ACCOUNT = "" -SLURM_QUEUE = "" -DATA_DIR = "" -OUTPUT_DIR = "" -PROCESSING_DIR = "" - - -@functools.total_ordering -class PIPELINE_STATUS(Enum): - nothing = 0 - downloaded = 1 - finished = 2 - running = 3 - processing = 98 - error = 99 - - def __eq__(self, other): - if other.__class__ is int: - return self.value == other - elif other.__class__ is self.__class__: - return self.value == other.value - else: - raise NotImplementedError - - def __lt__(self, other): - if self.__class__ is not other.__class__: - raise NotImplementedError - return self.value < other.value - - -def get_db_columns(): - with sqlite3.connect(DATABASE) as db: - db.row_factory = sqlite3.Row - cursor = db.cursor() - columns = 
"target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay" - field = cursor.execute( - f"select {columns} from {TABLE_NAME} where finished==0 order by priority" - ).fetchall() - print(field) - return field - - -def set_status_processing(name, identifier, target): - with sqlite3.connect(DATABASE) as db: - cursor = db.cursor() - cursor.execute( - f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.processing.value} where target_name=='{name}' and sas_id_target=='{target}'" - ) - - -def set_status_finished(name, identifier, target): - with sqlite3.connect(DATABASE) as db: - cursor = db.cursor() - cursor.execute( - f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.finished.value} where target_name=='{name}' and sas_id_target=='{target}'" - ) - - -def set_status_downloaded(name, target): - with sqlite3.connect(DATABASE) as db: - cursor = db.cursor() - cursor.execute( - f"update {TABLE_NAME} set downloaded=1 where target_name=='{name}' and sas_id_target=='{target}'" - ) - - -def set_final_calibrator(name, target, final_cal): - with sqlite3.connect(DATABASE) as db: - cursor = db.cursor() - cursor.execute( - f"update {TABLE_NAME} set sas_id_calibrator_final={final_cal} where target_name=='{name}' and sas_id_target=='{target}'" - ) - - -def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib.Path: - rundirs = pathlib.Path(searchpath) - rundirs_sorted = sorted(rundirs.iterdir(), key=os.path.getctime) - if pipeline: - rundirs_sorted_filtered = [ - d - for d in rundirs_sorted - if ((sas_id in d.parts[-1]) and (pipeline in d.parts[-1])) - ] - else: - rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] - rundir_final = rundirs_sorted_filtered[-1].absolute() - return rundir_final - - -@dag -def linc(): - @task - def get_unprocessed_target(): - field = dict(get_db_columns()[0]) - 
print(field["target_name"]) - return field - - @task.short_circuit - def check_fields(): - fields = get_db_columns() - return bool(fields) - - @task - def download_field(field): - if field["downloaded"]: - return field - else: - has_cal1 = False - stage_calibrators = False - if field["sas_id_calibrator1"]: - ms_folder = f"L{field['sas_id_calibrator1']}" - cal1_full_path = os.path.join( - DATA_DIR, field["target_name"], "calibrator", ms_folder - ) - if os.path.exists(cal1_full_path): - has_cal1 = True - else: - stage_calibrators = True - - has_cal2 = False - if field["sas_id_calibrator2"]: - ms_folder = f"L{field['sas_id_calibrator2']}" - cal2_full_path = os.path.join( - DATA_DIR, field["target_name"], "calibrator", ms_folder - ) - if os.path.exists(cal2_full_path): - has_cal2 = True - else: - stage_calibrators = True - if field["sas_id_target"]: - ms_folder = f"L{field['sas_id_target']}" - target_full_path = os.path.join( - DATA_DIR, field["target_name"], "target", ms_folder - ) - if os.path.exists(target_full_path): - stage_target = False - else: - stage_target = True - else: - raise AirflowFailException( - f"No target SAS ID in database for field {field['target_name']}" - ) - - if stage_calibrators or stage_target: - print(f"Field {field['sas_id_target']} is not downloaded.") - stager = ObservationStager(get_surls=True) - stager.find_observation_by_sasid( - "ALL", - field["sas_id_target"], - None, - 120e6, - 168e6, - ) - if stage_calibrators: - stager.find_nearest_calibrators(2, 120e6, 168e6) - stage_id_calibrators = stager.stage_calibrators() - if stage_target: - stage_id_target = stager.stage_target() - else: - return field - - calibrator_staged = False - target_staged = False - calibrator_downloaded = has_cal1 or has_cal2 - target_downloaded = not stage_target - while True: - if len(get_surls_online(stage_id_calibrators)) == len( - get_surls_requested(stage_id_calibrators) - ): - calibrator_staged = True - if calibrator_staged and not calibrator_downloaded: - 
dl_path = os.path.join(DATA_DIR, field["target_name"], "calibrator") - cmd = ( - f"flocs-lta download --outdir {dl_path} {stage_id_calibrators}" - ) - with open( - f"log_download_calibrators_{field['target_name']}.txt", - "w", - ) as f_out, open( - f"log_download_calibrators_{field['target_name']}.txt", - "w", - ) as f_err: - proc = subprocess.run( - cmd, shell=True, text=True, stdout=f_out, stderr=f_err - ) - if not proc.returncode: - calibrator_downloaded = True - else: - raise RuntimeError - - if len(get_surls_online(stage_id_target)) == len( - get_surls_requested(stage_id_target) - ): - calibrator_staged = True - if target_staged and not target_downloaded: - dl_path = os.path.join(DATA_DIR, field["target_name"], "target") - cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" - with open( - f"log_download_calibrators_{field['target_name']}.txt", - "w", - ) as f_out, open( - f"log_download_calibrators_{field['target_name']}.txt", - "w", - ) as f_err: - proc = subprocess.run( - cmd, shell=True, text=True, stdout=f_out, stderr=f_err - ) - if not proc.returncode: - set_status_downloaded( - field["target_name"], - field["sas_id_target"], - ) - target_downloaded = True - else: - raise RuntimeError - if calibrator_downloaded and target_downloaded: - break - time.sleep(60) - - @task - def run_linc_calibrator1(field): - if (field["status_calibrator1"] == PIPELINE_STATUS.finished) or ( - field["status_calibrator1"] == PIPELINE_STATUS.running - ): - print( - f"Flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']} already processed." 
- ) - return field - else: - print( - f"Processing flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']}" - ) - ms_folder = f"L{field['sas_id_calibrator1']}" - set_status_processing( - field["target_name"], "calibrator1", field["sas_id_target"] - ) - outdir = os.path.join(OUTPUT_DIR, field["target_name"]) - cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" - if not os.path.isdir(outdir): - os.mkdir(outdir) - print(cmd) - with open( - f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", - "w", - ) as f_out, open( - f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", - "w", - ) as f_err: - proc = subprocess.run( - cmd, shell=True, text=True, stdout=f_out, stderr=f_err - ) - if not proc.returncode: - set_status_finished( - field["target_name"], "calibrator1", field["sas_id_target"] - ) - else: - raise RuntimeError - return field - - @task - def run_linc_calibrator2(field): - raise AirflowSkipException() - - @task(trigger_rule=TriggerRule.ONE_DONE) - def select_best_calibrator(result1, result2): - if result1 and result2: - print("Selecting between cal1 and cal2") - set_final_calibrator( - result1["target_name"], - result1["sas_id_target"], - result1["sas_id_calibrator1"], - ) - return result1 - elif result1 and (not result2): - print("Only cal 1 succeeded, continuing with that") - set_final_calibrator( - result1["target_name"], - result1["sas_id_target"], - result1["sas_id_calibrator1"], - ) - return result1 - elif (not result1) and result2: - print("Only cal 2 succeeded, continuing with that") - set_final_calibrator( - result1["target_name"], - result1["sas_id_target"], - result1["sas_id_calibrator2"], - ) - return result2 - else: - raise 
AirflowFailException("No calibrators succeeded; stopping processing.") - - @task - def run_linc_target(field): - if (field["status_target"] == PIPELINE_STATUS.finished) or ( - field["status_target"] == PIPELINE_STATUS.running - ): - return field - else: - print( - f"Processing target observation {field['target_name']} {field['sas_id_target']} with calibrator {field['sas_id_calibrator_final']}" - ) - ms_folder = f"L{field['sas_id_target']}" - outdir = os.path.join(OUTPUT_DIR, field["target_name"]) - calibrator_path = get_most_recent_run( - outdir, field["sas_id_calibrator_final"], "LINC_calibrator" - ) - calibrator_solutions = ( - calibrator_path / "results_LINC_calibrator" / "cal_solutions.h5" - ) - set_status_processing( - field["target_name"], "target", field["sas_id_target"] - ) - cmd = f"flocs-run linc target --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --cal-solutions {calibrator_solutions} {os.path.join(DATA_DIR, field['target_name'], 'target', ms_folder)}" - if not os.path.isdir(outdir): - os.mkdir(outdir) - print(cmd) - with open( - f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", - "w", - ) as f_out, open( - f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", - "w", - ) as f_err: - proc = subprocess.run( - cmd, shell=True, text=True, stdout=f_out, stderr=f_err - ) - if not proc.returncode: - return True - set_status_finished( - field["target_name"], "target", field["sas_id_target"] - ) - else: - raise RuntimeError - return field - - @task - def validate_linc_target(field): - return True - - @task - def run_vlbi_delay(field): - if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( - field["status_vlbi_delay"] == PIPELINE_STATUS.running - ): - return field - else: - print( - f"Processing delay calibration for {field['target_name']} {field['sas_id_target']}" - ) - ms_folder = f"L{field['sas_id_target']}" - outdir 
= os.path.join(OUTPUT_DIR, field["target_name"]) - target_path = get_most_recent_run( - outdir, field["sas_id_calibrator_final"], "LINC_target" - ) - target_ms_path = target_path / "results_LINC_target" / "results" - set_status_processing( - field["target_name"], "vlbi_delay", field["sas_id_target"] - ) - cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {target_ms_path}" - if not os.path.isdir(outdir): - os.mkdir(outdir) - print(cmd) - with open( - f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", - "w", - ) as f_out, open( - f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", - "w", - ) as f_err: - proc = subprocess.run( - cmd, shell=True, text=True, stdout=f_out, stderr=f_err - ) - if not proc.returncode: - return True - set_status_finished( - field["target_name"], "vlbi_delay", field["sas_id_target"] - ) - else: - raise RuntimeError - return field - - @task - def run_ddf_pipeline(field): - return True - - proceed = check_fields() - get_field = get_unprocessed_target() - field = download_field(get_field) - result_cal1 = run_linc_calibrator1(field) - result_cal2 = run_linc_calibrator2(field) - best_cal = select_best_calibrator(result_cal1, result_cal2) - result_targ = run_linc_target(best_cal) - linc_is_valid = validate_linc_target(result_targ) - result_vlbi_delay = run_vlbi_delay(linc_is_valid) - # run_ddf_pipeline(vlbi_delay_is_valid) - - proceed >> get_field - - -linc() From e9741f49dd2b19db5f5831fbaf2dd2671eb1a00e Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Tue, 16 Jun 2026 09:32:28 +0100 Subject: [PATCH 21/31] Attempt to retrieve rundir on the fly --- flocs_processing/dags/vlbi_single_target.py | 55 ++++++++++----------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py 
b/flocs_processing/dags/vlbi_single_target.py index d60512e..06840dd 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -15,14 +15,14 @@ from stager_access import get_surls_requested, get_surls_online # Need to replace this with a config file -TABLE_NAME = "" -DATABASE = "" -SLURM_ACCOUNT = "" -SLURM_QUEUE = "" -DATA_DIR = "" -OUTPUT_DIR = "" -PROCESSING_DIR = "" -NN_MODEL_CACHE = "" +TABLE_NAME = "processing_banados" +DATABASE = "/project/lofarvlbi/Data/fsweijen/banados-high-z/banados_airflow.sqlite" +SLURM_ACCOUNT = "lofarvlbi" +SLURM_QUEUE = "normal" +DATA_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" +OUTPUT_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" +PROCESSING_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z/processing" +NN_MODEL_CACHE = "/project/lofarvlbi/Software/fsweijen/nn_cache" @functools.total_ordering @@ -367,7 +367,7 @@ def run_linc_target(field): def validate_linc_target(field): return field - @task(retries=2, retry_delay=datetime.timedelta(seconds=5)) + @task(retries=0, retry_delay=datetime.timedelta(seconds=5)) def run_vlbi_delay(field): if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( field["status_vlbi_delay"] == PIPELINE_STATUS.running @@ -388,19 +388,28 @@ def run_vlbi_delay(field): delay_cat = os.path.join(outdir, "delay_calibrators.csv") - proc = subprocess.run("detect_bad_slurm_nodes.sh", shell=True, text=True) - bad_nodes = proc.stdout - os.environ["TOIL_SLURM_ARGS"] = f"--exclude={bad_nodes}" + proc = subprocess.run("detect_bad_slurm_nodes.sh", shell=True, text=True, stdout=subprocess.PIPE) + bad_nodes = proc.stdout.strip() + if bad_nodes: + os.environ["TOIL_SLURM_ARGS"] = f"--exclude={bad_nodes}" context = get_current_context() if context["ti"].try_number == 1: cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} 
--ms-suffix dp3concat --delay-calibrator {delay_cat} {target_ms_path}" else: - flocs_workdir = context["ti"].xcom_pull( - task_ids=context["ti"].task_id, key="flocs_workdir" - ) + # Extract the previous working directory + flocs_workdir = "" + print(f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir.") + with open(f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt") as f_out: + for line in f_out.readlines(): + print(line) + if "Running workflow with" in line: + flocs_workdir = line.split(" ")[-1] + break + if not flocs_workdir: + raise RuntimeError("Could not retrieve PILOT workdir. Flocs probably crashed before launching.") print(f"Resuming failed PILOT run in {flocs_workdir}") - cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrators {delay_cat} {target_ms_path}" + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} {target_ms_path}" if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) @@ -425,20 +434,6 @@ def run_vlbi_delay(field): field["target_name"], "vlbi_delay", field["sas_id_target"] ) else: - f_out.seek(0) - flocs_workdir = "" - for line in f_out.readlines(): - print(line) - if "Running workflow with" in line: - flocs_workdir = line.split(" ")[-1] - break - if not flocs_workdir: - raise RuntimeError("Could not retrieve PILOT workdir. 
Flocs probably crashed before launching.") - if context["ti"].try_number == 1: - print(f"PILOT failed, storing rundir {flocs_workdir} for retries in `flocs_workdir`") - context["ti"].xcom_push( - key="flocs_workdir", value=flocs_workdir - ) raise RuntimeError return field From 0644f6eddd8623ee40634bd117dd351e2e79cbc3 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Tue, 16 Jun 2026 13:55:11 +0100 Subject: [PATCH 22/31] Small updates --- flocs_processing/dags/vlbi_single_target.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 06840dd..04948e6 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -220,7 +220,7 @@ def download_field(field): if len(get_surls_online(stage_id_target)) == len( get_surls_requested(stage_id_target) ): - calibrator_staged = True + target_staged = True if target_staged and not target_downloaded: dl_path = os.path.join(DATA_DIR, field["target_name"], "target") cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" @@ -388,7 +388,12 @@ def run_vlbi_delay(field): delay_cat = os.path.join(outdir, "delay_calibrators.csv") - proc = subprocess.run("detect_bad_slurm_nodes.sh", shell=True, text=True, stdout=subprocess.PIPE) + proc = subprocess.run( + "detect_bad_slurm_nodes.sh", + shell=True, + text=True, + stdout=subprocess.PIPE, + ) bad_nodes = proc.stdout.strip() if bad_nodes: os.environ["TOIL_SLURM_ARGS"] = f"--exclude={bad_nodes}" @@ -399,15 +404,21 @@ def run_vlbi_delay(field): else: # Extract the previous working directory flocs_workdir = "" - print(f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir.") - with open(f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt") as f_out: + print( + f"Scanning 
log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir." + ) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt" + ) as f_out: for line in f_out.readlines(): print(line) if "Running workflow with" in line: flocs_workdir = line.split(" ")[-1] break if not flocs_workdir: - raise RuntimeError("Could not retrieve PILOT workdir. Flocs probably crashed before launching.") + raise RuntimeError( + "Could not retrieve PILOT workdir. Flocs probably crashed before launching." + ) print(f"Resuming failed PILOT run in {flocs_workdir}") cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} {target_ms_path}" if not os.path.isdir(outdir): From 74dd7ef0b07b113bfbe04f33831500b15accf669 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Thu, 18 Jun 2026 13:47:32 +0100 Subject: [PATCH 23/31] Remove explicit paths --- flocs_processing/dags/vlbi_single_target.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 04948e6..4c645ef 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -15,14 +15,14 @@ from stager_access import get_surls_requested, get_surls_online # Need to replace this with a config file -TABLE_NAME = "processing_banados" -DATABASE = "/project/lofarvlbi/Data/fsweijen/banados-high-z/banados_airflow.sqlite" -SLURM_ACCOUNT = "lofarvlbi" -SLURM_QUEUE = "normal" -DATA_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" -OUTPUT_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" -PROCESSING_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z/processing" -NN_MODEL_CACHE = "/project/lofarvlbi/Software/fsweijen/nn_cache" 
+TABLE_NAME = "" +DATABASE = "" +SLURM_ACCOUNT = "" +SLURM_QUEUE = "" +DATA_DIR = "" +OUTPUT_DIR = "" +PROCESSING_DIR = "" +NN_MODEL_CACHE = "" @functools.total_ordering From b761683906a90a59f8a6f90198e1e1c6aa879f79 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Sun, 21 Jun 2026 14:04:27 +0100 Subject: [PATCH 24/31] Fix dd cal and update success check --- flocs_processing/dags/vlbi_single_target.py | 49 ++++++++++----------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 4c645ef..97b8ff4 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -220,7 +220,7 @@ def download_field(field): if len(get_surls_online(stage_id_target)) == len( get_surls_requested(stage_id_target) ): - target_staged = True + calibrator_staged = True if target_staged and not target_downloaded: dl_path = os.path.join(DATA_DIR, field["target_name"], "target") cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" @@ -387,40 +387,31 @@ def run_vlbi_delay(field): ) delay_cat = os.path.join(outdir, "delay_calibrators.csv") + image_cat = os.path.join(outdir, "image_catalogue.csv") - proc = subprocess.run( - "detect_bad_slurm_nodes.sh", - shell=True, - text=True, - stdout=subprocess.PIPE, - ) + proc = subprocess.run("detect_bad_slurm_nodes.sh", shell=True, text=True, stdout=subprocess.PIPE) bad_nodes = proc.stdout.strip() if bad_nodes: + print(f"Excluding the following bad nodes from scheduling: {bad_nodes}") os.environ["TOIL_SLURM_ARGS"] = f"--exclude={bad_nodes}" context = get_current_context() if context["ti"].try_number == 1: - cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} {target_ms_path}" + cmd = f"flocs-run vlbi delay-calibration 
--runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" else: # Extract the previous working directory flocs_workdir = "" - print( - f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir." - ) - with open( - f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt" - ) as f_out: + print(f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir.") + with open(f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt") as f_out: for line in f_out.readlines(): print(line) if "Running workflow with" in line: - flocs_workdir = line.split(" ")[-1] + flocs_workdir = line.split(" ")[-1].strip() break if not flocs_workdir: - raise RuntimeError( - "Could not retrieve PILOT workdir. Flocs probably crashed before launching." - ) + raise RuntimeError("Could not retrieve PILOT workdir. Flocs probably crashed before launching.") print(f"Resuming failed PILOT run in {flocs_workdir}") - cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} {target_ms_path}" + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) @@ -435,9 +426,10 @@ def run_vlbi_delay(field): cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) success = False - pattern = re.compile(r"Workflow.* stopped. 
Success: False") + pattern = re.compile(r"Workflow.* stopped. Success: True") if not proc.returncode: - if not pattern.search(proc.stderr): + f_err.seek(0) + if pattern.search(f_err.read()): success = True if success: @@ -469,7 +461,7 @@ def run_vlbi_ddcal(field): outdir, field["sas_id_target"], "VLBI_delay" ) sols_path = sols_path / "results_VLBI_delay-calibration" - sols = sols_path.glob("merged_*_selfcalcycle???_linearfulljones*.h5") + sols = list(sols_path.glob("merged*selfcalcycle???_linearfulljones*.h5"))[0] print(f"Using PILOT delay calibration solutions: {sols}") source_cat = os.path.join(DATA_DIR, field["target_name"], "vlbi_target.csv") @@ -479,22 +471,27 @@ def run_vlbi_ddcal(field): set_status_processing( field["target_name"], "vlbi_dd", field["sas_id_target"] ) - cmd = f"flocs-run vlbi dd-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --delay-solset {sols} --phasediff-score 10.0 --source-catalogue {source_cat} --ms-suffix .dp3concat {target_ms_path}" + cmd = f"flocs-run vlbi dd-calibration --runner toil --scheduler slurm --slurm-time 24:00:00 --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --delay-solset {sols} --phasediff-score 10.0 --source-catalogue {source_cat} --model-cache {NN_MODEL_CACHE} --ms-suffix .dp3concat {target_ms_path}" if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) with open( f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}.txt", - "w", + "w+", ) as f_out, open( f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", - "w", + "w+", ) as f_err: proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) + success = False + pattern = re.compile(r"Workflow.* stopped. 
Success: True") if not proc.returncode: - return True + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: set_status_finished( field["target_name"], "vlbi_dd", field["sas_id_target"] ) From f59f56fc0dcfba536c313104e7e7a8cd38150113 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Sun, 21 Jun 2026 15:14:56 +0100 Subject: [PATCH 25/31] Add some documentation --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..af403d5 --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +# End-to-end processing of ILT HBA data with flocs + +This package aims to provide relatively simple end-to-end automatic processing of ILT HBA data. Where `flocs-runners` provides the interface to running pipelines, `flocs-processing` is the scaffolding to tie it together. Data reduction is coordinated via a dedicated SQLite database that holds information on which observations to process, which pipelines to run for them and all of the related statuses. Orchestration of all the pipelines is handled via Airflow through a DAG. + +## Database setup +A database for processing is created via `flocs-processing create-database`. This will create an empty database with the necessary columns. Datasets to process can be added via `flocs-processing add-field`. + +## Processing data +To start processing data, Airflow needs to be running. This will be delegated to `flocs-processing process-from-database` in the future, but for now requires running Airflow manually. For setup do the following: + +1. Set up a folder that wil contain all of Airflow's own stuff and assign it to the `AIRFLOW_HOME` environment variable. +2. Run `airflow config list --defaults > "${AIRFLOW_HOME}/airflow.cfg"` +3. Define `AIRFLOW__CORE__DAGS_FOLDER` as `${AIRFLOW_HOME}/dags` and create the folder. Copy the DAGs inside `flocs_processing/dags` to this folder. +4. 
Define `AIRFLOW__CORE__LOAD_EXAMPLES` as `False` + +Next start a screen or tmux session and run `airflow standalone` to start the Airflow instance. It will echo a user name and password on first start, but will also store the credentials in `${AIRFLOW_HOME}/simple_auth_manager_passwords.json.generated`. The Airflow instance will start on port 8080. You can access it via `localhost:8080` in your browser. If it is running on a remote cluster, you can set up a tunnel via e.g. `ssh -N -L 8080:localhost:8080 ` to forward it to your local machine. + +Once `flocs-processing` is complete the processing loop will be automatic, but for now the user must trigger the DAG manually. On the "Dags" tab you should now see the flocs DAGs available. To manually trigger one, click on the name and on the subsequent page use the "Trigger" button in the top right. From 062e8bc64078af4cd3a77ee65c36c41952c2fb2f Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Sun, 21 Jun 2026 17:03:52 +0100 Subject: [PATCH 26/31] Implement calibrator2 and homogenise success triggers --- flocs_processing/dags/vlbi_single_target.py | 106 +++++++++++++++----- 1 file changed, 81 insertions(+), 25 deletions(-) diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/vlbi_single_target.py index 97b8ff4..827c06d 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/vlbi_single_target.py @@ -15,14 +15,14 @@ from stager_access import get_surls_requested, get_surls_online # Need to replace this with a config file -TABLE_NAME = "" -DATABASE = "" -SLURM_ACCOUNT = "" -SLURM_QUEUE = "" -DATA_DIR = "" -OUTPUT_DIR = "" -PROCESSING_DIR = "" -NN_MODEL_CACHE = "" +TABLE_NAME = "processing_banados" +DATABASE = "/project/lofarvlbi/Data/fsweijen/banados-high-z/banados_airflow.sqlite" +SLURM_ACCOUNT = "lofarvlbi" +SLURM_QUEUE = "normal" +DATA_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" +OUTPUT_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" 
+PROCESSING_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z/processing" +NN_MODEL_CACHE = "/project/lofarvlbi/Software/fsweijen/nn_cache" @functools.total_ordering @@ -85,9 +85,7 @@ def set_status_downloaded(name, target): def set_field_finished(name, target): - # name = str(field_dict["target_name"]) - # target = str(field_dict["sas_id_target"]) - query = f"update {TABLE_NAME} set downloaded=1 where target_name=='{name}' and sas_id_target=='{target}'" + query = f"update {TABLE_NAME} set finished=1 where target_name=='{name}' and sas_id_target=='{target}'" with sqlite3.connect(DATABASE) as db: cursor = db.cursor() cursor.execute(query) @@ -204,10 +202,10 @@ def download_field(field): ) with open( f"log_download_calibrators_{field['target_name']}.txt", - "w", + "w+", ) as f_out, open( f"log_download_calibrators_{field['target_name']}.txt", - "w", + "w+", ) as f_err: proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err @@ -226,10 +224,10 @@ def download_field(field): cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" with open( f"log_download_calibrators_{field['target_name']}.txt", - "w", + "w+", ) as f_out, open( f"log_download_calibrators_{field['target_name']}.txt", - "w", + "w+", ) as f_err: proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err @@ -270,15 +268,21 @@ def run_linc_calibrator1(field): print(cmd) with open( f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", - "w", + "w+", ) as f_out, open( f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", - "w", + "w+", ) as f_err: proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) + success = False + pattern = re.compile(r"Workflow.* stopped. 
Success: True") if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: set_status_finished( field["target_name"], "calibrator1", field["sas_id_target"] ) @@ -288,12 +292,59 @@ def run_linc_calibrator1(field): @task def run_linc_calibrator2(field): - raise AirflowSkipException() + if (field["status_calibrator2"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator2"] == PIPELINE_STATUS.running + ): + print( + f"Flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']} already processed." + ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator2']}" + set_status_processing( + field["target_name"], "calibrator2", field["sas_id_target"] + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}.txt", + "w+", + ) as f_out, open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. 
Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "calibrator2", field["sas_id_target"] + ) + else: + raise RuntimeError + return field @task(trigger_rule=TriggerRule.ONE_DONE) def select_best_calibrator(result1, result2): - if result1 and result2: + if result1["sas_id_calibrator_final"]: + return result1 + elif result2["sas_id_calibrator_final"]: + return result2 + elif result1 and result2: print("Selecting between cal1 and cal2") + # Need actual selection logic here set_final_calibrator( result1["target_name"], result1["sas_id_target"], @@ -311,9 +362,9 @@ def select_best_calibrator(result1, result2): elif (not result1) and result2: print("Only cal 2 succeeded, continuing with that") set_final_calibrator( - result1["target_name"], - result1["sas_id_target"], - result1["sas_id_calibrator2"], + result2["target_name"], + result2["sas_id_target"], + result2["sas_id_calibrator2"], ) return result2 else: @@ -346,16 +397,21 @@ def run_linc_target(field): print(cmd) with open( f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", - "w", + "w+", ) as f_out, open( f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", - "w", + "w+", ) as f_err: proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) + success = False + pattern = re.compile(r"Workflow.* stopped. 
Success: True") if not proc.returncode: - return True + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: set_status_finished( field["target_name"], "target", field["sas_id_target"] ) From 3e181cabea0ef129b53f3cf8acbafa4c574a9a0f Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 24 Jun 2026 14:54:03 +0100 Subject: [PATCH 27/31] Fix adding a field --- flocs_processing/flocs_processing.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flocs_processing/flocs_processing.py b/flocs_processing/flocs_processing.py index 517d3f2..a603739 100644 --- a/flocs_processing/flocs_processing.py +++ b/flocs_processing/flocs_processing.py @@ -155,10 +155,10 @@ def add_field( dbstr += ", sas_id_calibrator1, sas_id_calibrator2" dbstr += f", sas_id_target) values ('{field_name}', " if len(sas_id_calibrators) == 1: - dbstr += f"{sas_id_calibrators[0]}" + dbstr += f"'{sas_id_calibrators[0]}'" if len(sas_id_calibrators) == 2: - dbstr += f"{sas_id_calibrators[0]}, {sas_id_calibrators[1]}, " - dbstr += f"{sas_id_target})" + dbstr += f"'{sas_id_calibrators[0]}', '{sas_id_calibrators[1]}', " + dbstr += f"'{sas_id_target}')" cmd = ["sqlite3", dbname, dbstr] print(f"Adding field {field_name} to {table_name} via: {" ".join(cmd)}") From f99d1928c5f0fa628a15f8d72aab9cab1d1af7b9 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 24 Jun 2026 15:38:38 +0100 Subject: [PATCH 28/31] Draft widefield dag --- ...ingle_target.py => pilot_single_target.py} | 48 +- flocs_processing/dags/pilot_widefield.py | 604 ++++++++++++++++++ 2 files changed, 633 insertions(+), 19 deletions(-) rename flocs_processing/dags/{vlbi_single_target.py => pilot_single_target.py} (95%) create mode 100644 flocs_processing/dags/pilot_widefield.py diff --git a/flocs_processing/dags/vlbi_single_target.py b/flocs_processing/dags/pilot_single_target.py similarity index 95% rename from flocs_processing/dags/vlbi_single_target.py rename to 
flocs_processing/dags/pilot_single_target.py index 827c06d..a3e045f 100644 --- a/flocs_processing/dags/vlbi_single_target.py +++ b/flocs_processing/dags/pilot_single_target.py @@ -15,14 +15,14 @@ from stager_access import get_surls_requested, get_surls_online # Need to replace this with a config file -TABLE_NAME = "processing_banados" -DATABASE = "/project/lofarvlbi/Data/fsweijen/banados-high-z/banados_airflow.sqlite" -SLURM_ACCOUNT = "lofarvlbi" -SLURM_QUEUE = "normal" -DATA_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" -OUTPUT_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z" -PROCESSING_DIR = "/project/lofarvlbi/Data/fsweijen/banados-high-z/processing" -NN_MODEL_CACHE = "/project/lofarvlbi/Software/fsweijen/nn_cache" +TABLE_NAME = "" +DATABASE = "" +SLURM_ACCOUNT = "" +SLURM_QUEUE = "" +DATA_DIR = "" +OUTPUT_DIR = "" +PROCESSING_DIR = "" +NN_MODEL_CACHE = "" @functools.total_ordering @@ -115,7 +115,7 @@ def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib. 
@dag(max_active_runs=1) -def single_target_vlbi(): +def pilot_single_target(): @task def get_unprocessed_target(): field = dict(get_db_columns()[0]) @@ -445,7 +445,12 @@ def run_vlbi_delay(field): delay_cat = os.path.join(outdir, "delay_calibrators.csv") image_cat = os.path.join(outdir, "image_catalogue.csv") - proc = subprocess.run("detect_bad_slurm_nodes.sh", shell=True, text=True, stdout=subprocess.PIPE) + proc = subprocess.run( + "detect_bad_slurm_nodes.sh", + shell=True, + text=True, + stdout=subprocess.PIPE, + ) bad_nodes = proc.stdout.strip() if bad_nodes: print(f"Excluding the following bad nodes from scheduling: {bad_nodes}") @@ -457,15 +462,21 @@ def run_vlbi_delay(field): else: # Extract the previous working directory flocs_workdir = "" - print(f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir.") - with open(f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt") as f_out: + print( + f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir." + ) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt" + ) as f_out: for line in f_out.readlines(): print(line) if "Running workflow with" in line: flocs_workdir = line.split(" ")[-1].strip() break if not flocs_workdir: - raise RuntimeError("Could not retrieve PILOT workdir. Flocs probably crashed before launching.") + raise RuntimeError( + "Could not retrieve PILOT workdir. Flocs probably crashed before launching." 
+ ) print(f"Resuming failed PILOT run in {flocs_workdir}") cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" if not os.path.isdir(outdir): @@ -496,6 +507,10 @@ def run_vlbi_delay(field): raise RuntimeError return field + @task + def run_ddf_subtract(field): + return field + @task def run_vlbi_ddcal(field): if (field["status_vlbi_dd"] == PIPELINE_STATUS.finished) or ( @@ -556,10 +571,6 @@ def run_vlbi_ddcal(field): raise RuntimeError return field - @task - def run_ddf_pipeline(field): - return True - proceed = check_fields() get_field = get_unprocessed_target() field = download_field(get_field) @@ -570,9 +581,8 @@ def run_ddf_pipeline(field): linc_is_valid = validate_linc_target(result_targ) result_vlbi_delay = run_vlbi_delay(linc_is_valid) result_vlbi_dd = run_vlbi_ddcal(result_vlbi_delay) - # run_ddf_pipeline(vlbi_delay_is_valid) proceed >> get_field -single_target_vlbi() +pilot_single_target() diff --git a/flocs_processing/dags/pilot_widefield.py b/flocs_processing/dags/pilot_widefield.py new file mode 100644 index 0000000..c735f5b --- /dev/null +++ b/flocs_processing/dags/pilot_widefield.py @@ -0,0 +1,604 @@ +from enum import Enum +import datetime +import functools +import os +import pathlib +import re +import sqlite3 +import subprocess +import time + +from airflow.exceptions import AirflowFailException, AirflowSkipException +from airflow.sdk import dag, get_current_context, task +from airflow.task.trigger_rule import TriggerRule +from flocs_lta.lta_search import ObservationStager +from stager_access import get_surls_requested, get_surls_online + +# Need to replace this with a config file +TABLE_NAME = "" +DATABASE = "" +SLURM_ACCOUNT = "" +SLURM_QUEUE = "" +DATA_DIR = "" +OUTPUT_DIR = "" +PROCESSING_DIR = "" +NN_MODEL_CACHE = "" 
+ + +@functools.total_ordering +class PIPELINE_STATUS(Enum): + nothing = 0 + downloaded = 1 + finished = 2 + running = 3 + processing = 98 + error = 99 + + def __eq__(self, other): + if other.__class__ is int: + return self.value == other + elif other.__class__ is self.__class__: + return self.value == other.value + else: + raise NotImplementedError + + def __lt__(self, other): + if self.__class__ is not other.__class__: + raise NotImplementedError + return self.value < other.value + + +def get_db_columns(): + with sqlite3.connect(DATABASE) as db: + db.row_factory = sqlite3.Row + cursor = db.cursor() + columns = "target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay,status_vlbi_dd" + field = cursor.execute( + f"select {columns} from {TABLE_NAME} where finished==0 order by priority desc" + ).fetchall() + print(field) + return field + + +def set_status_processing(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.processing.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_finished(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.finished.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_downloaded(name, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set downloaded=1 where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_field_finished(name, target): + query = f"update {TABLE_NAME} set finished=1 where target_name=='{name}' and sas_id_target=='{target}'" + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + 
cursor.execute(query) + + +def set_final_calibrator(name, target, final_cal): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set sas_id_calibrator_final={final_cal} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib.Path: + rundirs = pathlib.Path(searchpath) + rundirs_sorted = sorted(rundirs.iterdir()) + if pipeline: + rundirs_sorted_filtered = [ + d + for d in rundirs_sorted + if ((sas_id in d.parts[-1]) and (pipeline in d.parts[-1])) and d.is_dir() + ] + else: + rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] + rundir_final = rundirs_sorted_filtered[-1].absolute() + return rundir_final + + +@dag(max_active_runs=1) +def pilot_widefield(): + @task + def get_unprocessed_target(): + field = dict(get_db_columns()[0]) + print(field["target_name"]) + return field + + @task.short_circuit + def check_fields(): + fields = get_db_columns() + return bool(fields) + + @task + def download_field(field): + if field["downloaded"]: + return field + else: + has_cal1 = False + stage_calibrators = False + if field["sas_id_calibrator1"]: + ms_folder = f"L{field['sas_id_calibrator1']}" + cal1_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal1_full_path): + has_cal1 = True + else: + stage_calibrators = True + + has_cal2 = False + if field["sas_id_calibrator2"]: + ms_folder = f"L{field['sas_id_calibrator2']}" + cal2_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal2_full_path): + has_cal2 = True + else: + stage_calibrators = True + if field["sas_id_target"]: + ms_folder = f"L{field['sas_id_target']}" + target_full_path = os.path.join( + DATA_DIR, field["target_name"], "target", ms_folder + ) + if os.path.exists(target_full_path): + stage_target = False + else: + stage_target = True + 
else: + raise AirflowFailException( + f"No target SAS ID in database for field {field['target_name']}" + ) + + if stage_calibrators or stage_target: + print(f"Field {field['sas_id_target']} is not downloaded.") + stager = ObservationStager(get_surls=True) + stager.find_observation_by_sasid( + "ALL", + field["sas_id_target"], + None, + 120e6, + 168e6, + ) + if stage_calibrators: + stager.find_nearest_calibrators(2, 120e6, 168e6) + stage_id_calibrators = stager.stage_calibrators() + if stage_target: + stage_id_target = stager.stage_target() + else: + return field + + calibrator_staged = False + target_staged = False + calibrator_downloaded = has_cal1 or has_cal2 + target_downloaded = not stage_target + while True: + if len(get_surls_online(stage_id_calibrators)) == len( + get_surls_requested(stage_id_calibrators) + ): + calibrator_staged = True + if calibrator_staged and not calibrator_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "calibrator") + cmd = ( + f"flocs-lta download --outdir {dl_path} {stage_id_calibrators}" + ) + with open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_out, open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + calibrator_downloaded = True + else: + raise RuntimeError + + if len(get_surls_online(stage_id_target)) == len( + get_surls_requested(stage_id_target) + ): + calibrator_staged = True + if target_staged and not target_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "target") + cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" + with open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_out, open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not 
proc.returncode: + set_status_downloaded( + field["target_name"], + field["sas_id_target"], + ) + target_downloaded = True + else: + raise RuntimeError + if calibrator_downloaded and target_downloaded: + break + time.sleep(60) + + @task + def run_linc_calibrator1(field): + if (field["status_calibrator1"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator1"] == PIPELINE_STATUS.running + ): + print( + f"Flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']} already processed." + ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator1']}" + set_status_processing( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", + "w+", + ) as f_out, open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. 
Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_linc_calibrator2(field): + if (field["status_calibrator2"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator2"] == PIPELINE_STATUS.running + ): + print( + f"Flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']} already processed." + ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator2']}" + set_status_processing( + field["target_name"], "calibrator2", field["sas_id_target"] + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}.txt", + "w+", + ) as f_out, open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. 
Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "calibrator2", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task(trigger_rule=TriggerRule.ONE_DONE) + def select_best_calibrator(result1, result2): + if result1["sas_id_calibrator_final"]: + return result1 + elif result2["sas_id_calibrator_final"]: + return result2 + elif result1 and result2: + print("Selecting between cal1 and cal2") + # Need actual selection logic here + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif result1 and (not result2): + print("Only cal 1 succeeded, continuing with that") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif (not result1) and result2: + print("Only cal 2 succeeded, continuing with that") + set_final_calibrator( + result2["target_name"], + result2["sas_id_target"], + result2["sas_id_calibrator2"], + ) + return result2 + else: + raise AirflowFailException("No calibrators succeeded; stopping processing.") + + @task + def run_linc_target(field): + if (field["status_target"] == PIPELINE_STATUS.finished) or ( + field["status_target"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing target observation {field['target_name']} {field['sas_id_target']} with calibrator {field['sas_id_calibrator_final']}" + ) + ms_folder = f"L{field['sas_id_target']}" + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + calibrator_path = get_most_recent_run( + outdir, field["sas_id_calibrator_final"], "LINC_calibrator" + ) + calibrator_solutions = ( + calibrator_path / "results_LINC_calibrator" / "cal_solutions.h5" + ) + set_status_processing( + field["target_name"], "target", field["sas_id_target"] + ) + cmd = f"flocs-run linc target --runner 
toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --cal-solutions {calibrator_solutions} {os.path.join(DATA_DIR, field['target_name'], 'target', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "target", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def validate_linc_target(field): + return field + + @task(retries=0, retry_delay=datetime.timedelta(seconds=5)) + def run_vlbi_delay(field): + if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_delay"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing delay calibration for {field['target_name']} {field['sas_id_target']}" + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_target"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + set_status_processing( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + + delay_cat = os.path.join(outdir, "delay_calibrators.csv") + image_cat = os.path.join(outdir, "image_catalogue.csv") + + proc = subprocess.run( + "detect_bad_slurm_nodes.sh", + shell=True, + text=True, + stdout=subprocess.PIPE, + ) + bad_nodes = proc.stdout.strip() + if bad_nodes: + print(f"Excluding the following bad nodes from scheduling: {bad_nodes}") + 
os.environ["TOIL_SLURM_ARGS"] = f"--exclude={bad_nodes}" + + context = get_current_context() + if context["ti"].try_number == 1: + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" + else: + # Extract the previous working directory + flocs_workdir = "" + print( + f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir." + ) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt" + ) as f_out: + for line in f_out.readlines(): + print(line) + if "Running workflow with" in line: + flocs_workdir = line.split(" ")[-1].strip() + break + if not flocs_workdir: + raise RuntimeError( + "Could not retrieve PILOT workdir. Flocs probably crashed before launching." + ) + print(f"Resuming failed PILOT run in {flocs_workdir}") + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. 
Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + + if success: + set_status_finished( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_ddf_pipeline(field): + if (field["status_ddf"] == PIPELINE_STATUS.finished) or ( + field["status_ddf"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Starting ddf-pipeline for {field['target_name']} {field['sas_id_target']}" + ) + return field + + + @task + def run_ddf_subtract(field): + return field + + @task + def run_vlbi_ddcal(field): + if (field["status_vlbi_dd"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_dd"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing ILT dd calibration for {field['target_name']} {field['sas_id_target']}" + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_target"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + print(f"Using LINC target run: {target_path}") + + sols_path = get_most_recent_run( + outdir, field["sas_id_target"], "VLBI_delay" + ) + sols_path = sols_path / "results_VLBI_delay-calibration" + sols = list(sols_path.glob("merged*selfcalcycle???_linearfulljones*.h5"))[0] + print(f"Using PILOT delay calibration solutions: {sols}") + + source_cat = os.path.join(DATA_DIR, field["target_name"], "vlbi_target.csv") + if not os.path.isfile(source_cat): + raise AirflowFailException(f"{source_cat} not found.") + + set_status_processing( + field["target_name"], "vlbi_dd", field["sas_id_target"] + ) + cmd = f"flocs-run vlbi dd-calibration --runner toil --scheduler slurm --slurm-time 24:00:00 --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --delay-solset {sols} --phasediff-score 10.0 --source-catalogue {source_cat} --model-cache 
{NN_MODEL_CACHE} --ms-suffix .dp3concat {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "vlbi_dd", field["sas_id_target"] + ) + set_field_finished(field["target_name"], field["sas_id_target"]) + else: + raise RuntimeError + return field + + proceed = check_fields() + get_field = get_unprocessed_target() + field = download_field(get_field) + result_cal1 = run_linc_calibrator1(field) + result_cal2 = run_linc_calibrator2(field) + best_cal = select_best_calibrator(result_cal1, result_cal2) + result_targ = run_linc_target(best_cal) + linc_is_valid = validate_linc_target(result_targ) + result_vlbi_delay = run_vlbi_delay(linc_is_valid) + result_ddf = run_ddf_pipeline(result_vlbi_delay) + result_ddf_subtract = run_ddf_subtract(result_ddf) + result_vlbi_dd = run_vlbi_ddcal(result_vlbi_delay) + + proceed >> get_field + result_ddf >> result_ddf_subtract >> result_vlbi_dd + + +pilot_widefield() From 52a36d462e80bfe393268908c62495c267580ab0 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 24 Jun 2026 16:38:38 +0100 Subject: [PATCH 29/31] Update readme --- README.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index af403d5..cd88973 100644 --- a/README.md +++ b/README.md @@ -8,10 +8,11 @@ A database for processing is created via `flocs-processing create-database`. Thi ## Processing data To start processing data, Airflow needs to be running. 
This will be delegated to `flocs-processing process-from-database` in the future, but for now requires running Airflow manually. For setup do the following: -1. Set up a folder that wil contain all of Airflow's own stuff and assign it to the `AIRFLOW_HOME` environment variable. -2. Run `airflow config list --defaults > "${AIRFLOW_HOME}/airflow.cfg"` -3. Define `AIRFLOW__CORE__DAGS_FOLDER` as `${AIRFLOW_HOME}/dags` and create the folder. Copy the DAGs inside `flocs_processing/dags` to this folder. -4. Define `AIRFLOW__CORE__LOAD_EXAMPLES` as `False` +1. Install Airflow: `uv pip install apache-airflow` +2. Set up a folder that will contain all of Airflow's own stuff and assign it to the `AIRFLOW_HOME` environment variable. +3. Run `airflow config list --defaults > "${AIRFLOW_HOME}/airflow.cfg"` +4. Define `AIRFLOW__CORE__DAGS_FOLDER` as `${AIRFLOW_HOME}/dags` and create the folder. Copy the DAGs inside `flocs_processing/dags` to this folder. +5. Define `AIRFLOW__CORE__LOAD_EXAMPLES` as `False` Next start a screen or tmux session and run `airflow standalone` to start the Airflow instance. It will echo a user name and password on first start, but will also store the credentials in `${AIRFLOW_HOME}/simple_auth_manager_passwords.json.generated`. The Airflow instance will start on port 8080. You can access it via `localhost:8080` in your browser. If it is running on a remote cluster, you can set up a tunnel via e.g. `ssh -N -L 8080:localhost:8080 ` to forward it to your local machine. From 354b1544ef4330b2a8770007695a79e9cfc2e683 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 24 Jun 2026 16:43:18 +0100 Subject: [PATCH 30/31] Expand readme with expected folder setup --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index cd88973..7594ded 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,15 @@ This package aims to provide relatively simple end-to-end automatic processing of ILT HBA data.
Where `flocs-runners` provides the interface to running pipelines, `flocs-processing` is the scaffolding to tie it together. Data reduction is coordinated via a dedicated SQLite database that holds information on which observations to process, which pipelines to run for them and all of the related statuses. Orchestration of all the pipelines is handled via Airflow through a DAG. +## Folder setup +Flocs-processing requires three folders to be set up: + +* A processing folder -- this is where data is stored while processing +* A data folder -- this is where the input data is found +* An output folder -- this is where finished pipeline outputs are copied to, and searched for by steps that depend on them. + +The expected directory structure for input data is `<data folder>/<target name>/{calibrator,target}`. Inside the calibrator and target folders, the observations should follow the usual `LXXXXXX` naming scheme. These **must** match the SAS IDs in the database for flocs to be able to find them. + ## Database setup A database for processing is created via `flocs-processing create-database`. This will create an empty database with the necessary columns. Datasets to process can be added via `flocs-processing add-field`. @@ -17,3 +26,4 @@ To start processing data, Airflow needs to be running. This will be delegated to Next start a screen or tmux session and run `airflow standalone` to start the Airflow instance. It will echo a user name and password on first start, but will also store the credentials in `${AIRFLOW_HOME}/simple_auth_manager_passwords.json.generated`. The Airflow instance will start on port 8080. You can access it via `localhost:8080` in your browser. If it is running on a remote cluster, you can set up a tunnel via e.g. `ssh -N -L 8080:localhost:8080 ` to forward it to your local machine. Once `flocs-processing` is complete the processing loop will be automatic, but for now the user must trigger the DAG manually. On the "Dags" tab you should now see the flocs DAGs available.
To manually trigger one, click on the name and on the subsequent page use the "Trigger" button in the top right. + From e50a28de2c65816dd4f2c7c637baa1e7d418c0c5 Mon Sep 17 00:00:00 2001 From: Frits Sweijen Date: Wed, 24 Jun 2026 21:35:26 +0100 Subject: [PATCH 31/31] Allow manual approval of delay solutions for widefield --- flocs_processing/dags/pilot_widefield.py | 236 ++++++++++++++++------- 1 file changed, 171 insertions(+), 65 deletions(-) diff --git a/flocs_processing/dags/pilot_widefield.py b/flocs_processing/dags/pilot_widefield.py index c735f5b..6675ada 100644 --- a/flocs_processing/dags/pilot_widefield.py +++ b/flocs_processing/dags/pilot_widefield.py @@ -8,8 +8,9 @@ import subprocess import time -from airflow.exceptions import AirflowFailException, AirflowSkipException +from airflow.exceptions import AirflowFailException from airflow.sdk import dag, get_current_context, task +from airflow.providers.standard.sensors.python import PythonSensor from airflow.task.trigger_rule import TriggerRule from flocs_lta.lta_search import ObservationStager from stager_access import get_surls_requested, get_surls_online @@ -23,6 +24,23 @@ OUTPUT_DIR = "" PROCESSING_DIR = "" NN_MODEL_CACHE = "" +DDF_CONFIG = "" + +NEEDS_MANUAL_APPROVAL_DELAY = True + + +def get_approval(field, identifier, needs_approval): + if not needs_approval: + return True + with sqlite3.connect(DATABASE) as db: + db.row_factory = sqlite3.Row + cursor = db.cursor() + columns = f"sas_id_target,status_{identifier}" + field = cursor.execute( + f"select {columns} from {TABLE_NAME} where sas_id_target=='{field['sas_id_target']}'" + ).fetchall() + status = field[0][f"status_{identifier}"] + return status == PIPELINE_STATUS.finished.value @functools.total_ordering @@ -30,7 +48,7 @@ class PIPELINE_STATUS(Enum): nothing = 0 downloaded = 1 finished = 2 - running = 3 + await_approval = 3 processing = 98 error = 99 @@ -68,6 +86,14 @@ def set_status_processing(name, identifier, target): ) +def 
set_status_await_approval(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.await_approval.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + def set_status_finished(name, identifier, target): with sqlite3.connect(DATABASE) as db: cursor = db.cursor() @@ -200,13 +226,16 @@ def download_field(field): cmd = ( f"flocs-lta download --outdir {dl_path} {stage_id_calibrators}" ) - with open( - f"log_download_calibrators_{field['target_name']}.txt", - "w+", - ) as f_out, open( - f"log_download_calibrators_{field['target_name']}.txt", - "w+", - ) as f_err: + with ( + open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_out, + open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_err, + ): proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) @@ -222,13 +251,16 @@ def download_field(field): if target_staged and not target_downloaded: dl_path = os.path.join(DATA_DIR, field["target_name"], "target") cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" - with open( - f"log_download_calibrators_{field['target_name']}.txt", - "w+", - ) as f_out, open( - f"log_download_calibrators_{field['target_name']}.txt", - "w+", - ) as f_err: + with ( + open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_out, + open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_err, + ): proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) @@ -247,7 +279,7 @@ def download_field(field): @task def run_linc_calibrator1(field): if (field["status_calibrator1"] == PIPELINE_STATUS.finished) or ( - field["status_calibrator1"] == PIPELINE_STATUS.running + field["status_calibrator1"] == PIPELINE_STATUS.processing ): print( f"Flux density calibrator {field['sas_id_calibrator1']} for observation 
{field['target_name']} {field['sas_id_target']} already processed." @@ -266,13 +298,16 @@ def run_linc_calibrator1(field): if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) - with open( - f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", - "w+", - ) as f_out, open( - f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", - "w+", - ) as f_err: + with ( + open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", + "w+", + ) as f_out, + open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", + "w+", + ) as f_err, + ): proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) @@ -293,7 +328,7 @@ def run_linc_calibrator1(field): @task def run_linc_calibrator2(field): if (field["status_calibrator2"] == PIPELINE_STATUS.finished) or ( - field["status_calibrator2"] == PIPELINE_STATUS.running + field["status_calibrator2"] == PIPELINE_STATUS.processing ): print( f"Flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']} already processed." 
@@ -312,13 +347,16 @@ def run_linc_calibrator2(field): if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) - with open( - f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}.txt", - "w+", - ) as f_out, open( - f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}_err.txt", - "w+", - ) as f_err: + with ( + open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}.txt", + "w+", + ) as f_out, + open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}_err.txt", + "w+", + ) as f_err, + ): proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) @@ -373,7 +411,7 @@ def select_best_calibrator(result1, result2): @task def run_linc_target(field): if (field["status_target"] == PIPELINE_STATUS.finished) or ( - field["status_target"] == PIPELINE_STATUS.running + field["status_target"] == PIPELINE_STATUS.processing ): return field else: @@ -395,13 +433,16 @@ def run_linc_target(field): if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) - with open( - f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", - "w+", - ) as f_out, open( - f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", - "w+", - ) as f_err: + with ( + open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, + open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err, + ): proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) @@ -426,7 +467,7 @@ def validate_linc_target(field): @task(retries=0, retry_delay=datetime.timedelta(seconds=5)) def run_vlbi_delay(field): if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( - field["status_vlbi_delay"] == PIPELINE_STATUS.running + field["status_vlbi_delay"] == PIPELINE_STATUS.processing ): return field else: @@ -482,13 +523,16 @@ def run_vlbi_delay(field): if not 
os.path.isdir(outdir): os.mkdir(outdir) print(cmd) - with open( - f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", - "w+", - ) as f_out, open( - f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", - "w+", - ) as f_err: + with ( + open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, + open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err, + ): proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) @@ -500,9 +544,14 @@ def run_vlbi_delay(field): success = True if success: - set_status_finished( - field["target_name"], "vlbi_delay", field["sas_id_target"] - ) + if NEEDS_MANUAL_APPROVAL_DELAY: + set_status_await_approval( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + else: + set_status_finished( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) else: raise RuntimeError return field @@ -510,15 +559,59 @@ def run_vlbi_delay(field): @task def run_ddf_pipeline(field): if (field["status_ddf"] == PIPELINE_STATUS.finished) or ( - field["status_ddf"] == PIPELINE_STATUS.running + field["status_ddf"] == PIPELINE_STATUS.processing ): return field else: print( f"Starting ddf-pipeline for {field['target_name']} {field['sas_id_target']}" ) - return field + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_target"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + + cmd = f"flocs-run ddf-pipeline --scheduler slurm --slurm-time 72:00:00 --slurm-cores 32 --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {OUTPUT_DIR} --config-file {DDF_CONFIG} {target_ms_path}" + with ( + open( + f"log_DDF-pipeline_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, + open( + 
f"log_DDF-pipeline_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + jobid = None + if not proc.returncode: + f_out.seek(0) + for line in f_out.readlines(): + if "Submitted batch job" in line: + jobid = line.strip().split()[-1] + else: + raise RuntimeError("Failed to submit job.") + if not jobid: + raise RuntimeError("Failed to retrieve job id") + else: + while True: + poll_cmd = f"sacct -X -j {jobid} --format=State --noheader" + status = subprocess.run( + poll_cmd, shell=True, text=True, capture_output=True + ).stdout.strip() + if status == "RUNNING": + time.sleep(60) + elif status == "COMPLETED": + break + elif status == "FAILED": + raise RuntimeError( + f"DDF-pipeline for {field['target_name']} {field['sas_id_target']} failed." + ) + return field @task def run_ddf_subtract(field): @@ -527,7 +620,7 @@ def run_ddf_subtract(field): @task def run_vlbi_ddcal(field): if (field["status_vlbi_dd"] == PIPELINE_STATUS.finished) or ( - field["status_vlbi_dd"] == PIPELINE_STATUS.running + field["status_vlbi_dd"] == PIPELINE_STATUS.processing ): return field else: @@ -559,13 +652,16 @@ def run_vlbi_ddcal(field): if not os.path.isdir(outdir): os.mkdir(outdir) print(cmd) - with open( - f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}.txt", - "w+", - ) as f_out, open( - f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", - "w+", - ) as f_err: + with ( + open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, + open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err, + ): proc = subprocess.run( cmd, shell=True, text=True, stdout=f_out, stderr=f_err ) @@ -593,12 +689,22 @@ def run_vlbi_ddcal(field): result_targ = run_linc_target(best_cal) linc_is_valid = validate_linc_target(result_targ) 
result_vlbi_delay = run_vlbi_delay(linc_is_valid) - result_ddf = run_ddf_pipeline(result_vlbi_delay) - result_ddf_subtract = run_ddf_subtract(result_ddf) - result_vlbi_dd = run_vlbi_ddcal(result_vlbi_delay) proceed >> get_field - result_ddf >> result_ddf_subtract >> result_vlbi_dd + await_approval_delay = PythonSensor( + task_id="approve_delay", + python_callable=get_approval, + poke_interval=60, + timeout=86400 * 7, + mode="poke", + op_args=[result_vlbi_delay, "vlbi_delay", NEEDS_MANUAL_APPROVAL_DELAY], + ) + # result_ddf = run_ddf_pipeline(await_approval_delay) + result_ddf = run_ddf_pipeline(result_vlbi_delay) + + result_ddf_subtract = run_ddf_subtract(result_ddf) + result_vlbi_dd = run_vlbi_ddcal(result_ddf_subtract) + await_approval_delay >> result_ddf >> result_ddf_subtract >> result_vlbi_dd pilot_widefield()