diff --git a/README.md b/README.md new file mode 100644 index 0000000..7594ded --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +# End-to-end processing of ILT HBA data with flocs + +This package aims to provide relatively simple end-to-end automatic processing of ILT HBA data. Where `flocs-runners` provides the interface to running pipelines, `flocs-processing` is the scaffolding to tie it together. Data reduction is coordinated via a dedicated SQLite database that holds information on which observations to process, which pipelines to run for them and all of the related statuses. Orchestration of all the pipelines is handled via Airflow through a DAG. + +## Folder setup +Flocs-processing requires three folders to be setup: + +* A processing folder -- this is where data is stored while processing +* A data folder -- this is where the input data is found +* An output folder -- this is where finished pipeline outputs are copied to, and searched for in steps that depend on it. + +The expected naming directory structure for input data is `//{calibrator,target}`. Inside the calibrator and target folders, the observations should follow the usual `LXXXXXX` naming scheme. These **must** match the SAS IDs in the database for flocs to be able to find them. + +## Database setup +A database for processing is created via `flocs-processing create-database`. This will create an empty database with the necessary columns. Datasets to process can be added via `flocs-processing add-field`. + +## Processing data +To start processing data, Airflow needs to be running. This will be delegated to `flocs-processing process-from-database` in the future, but for now requires running Airflow manually. For setup do the following: + +1. Install airflow: `uv pip install apache-airflow` +2. Set up a folder that wil contain all of Airflow's own stuff and assign it to the `AIRFLOW_HOME` environment variable. +3. Run `airflow config list --defaults > "${AIRFLOW_HOME}/airflow.cfg"` +4. Define `AIRFLOW__CORE__DAGS_FOLDER` as `${AIRFLOW_HOME}/dags` and create the folder. Copy the DAGs inside `flocs_processing/dags` to this folder. +5. Define `AIRFLOW__CORE__LOAD_EXAMPLES` as `False` + +Next start a screen or tmux session and run `airflow standalone` to start the Airflow instance. It will echo a user name and password on first start, but will also store the credentials in `${AIRFLOW_HOME}/simple_auth_manager_passwords.json.generated`. The Airflow instance will start on port 8080. You can access it via `localhost:8080` in your browser. If it is running on a remote cluster, you can set up a tunnel via e.g. `ssh -N -L 8080:localhost:8080 ` to forward it to your local machine. + +Once `flocs-processing` is complete the processing loop will be automatic, but for now the user must trigger the DAG manually. On the "Dags" tab you should now see the flocs DAGs available. To manually trigger one, click on the name and on the subsequent page use the "Trigger" button in the top right. + diff --git a/flocs_processing/dags/pilot_single_target.py b/flocs_processing/dags/pilot_single_target.py new file mode 100644 index 0000000..a3e045f --- /dev/null +++ b/flocs_processing/dags/pilot_single_target.py @@ -0,0 +1,588 @@ +from enum import Enum +import datetime +import functools +import os +import pathlib +import re +import sqlite3 +import subprocess +import time + +from airflow.exceptions import AirflowFailException, AirflowSkipException +from airflow.sdk import dag, get_current_context, task +from airflow.task.trigger_rule import TriggerRule +from flocs_lta.lta_search import ObservationStager +from stager_access import get_surls_requested, get_surls_online + +# Need to replace this with a config file +TABLE_NAME = "" +DATABASE = "" +SLURM_ACCOUNT = "" +SLURM_QUEUE = "" +DATA_DIR = "" +OUTPUT_DIR = "" +PROCESSING_DIR = "" +NN_MODEL_CACHE = "" + + +@functools.total_ordering +class PIPELINE_STATUS(Enum): + nothing = 0 + downloaded = 1 + finished = 2 + running = 3 + processing = 98 + error = 99 + + def __eq__(self, other): + if other.__class__ is int: + return self.value == other + elif other.__class__ is self.__class__: + return self.value == other.value + else: + raise NotImplementedError + + def __lt__(self, other): + if self.__class__ is not other.__class__: + raise NotImplementedError + return self.value < other.value + + +def get_db_columns(): + with sqlite3.connect(DATABASE) as db: + db.row_factory = sqlite3.Row + cursor = db.cursor() + columns = "target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay,status_vlbi_dd" + field = cursor.execute( + f"select {columns} from {TABLE_NAME} where finished==0 order by priority desc" + ).fetchall() + print(field) + return field + + +def set_status_processing(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.processing.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_finished(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.finished.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_downloaded(name, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set downloaded=1 where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_field_finished(name, target): + query = f"update {TABLE_NAME} set finished=1 where target_name=='{name}' and sas_id_target=='{target}'" + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute(query) + + +def set_final_calibrator(name, target, final_cal): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set sas_id_calibrator_final={final_cal} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib.Path: + rundirs = pathlib.Path(searchpath) + rundirs_sorted = sorted(rundirs.iterdir()) + if pipeline: + rundirs_sorted_filtered = [ + d + for d in rundirs_sorted + if ((sas_id in d.parts[-1]) and (pipeline in d.parts[-1])) and d.is_dir() + ] + else: + rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] + rundir_final = rundirs_sorted_filtered[-1].absolute() + return rundir_final + + +@dag(max_active_runs=1) +def pilot_single_target(): + @task + def get_unprocessed_target(): + field = dict(get_db_columns()[0]) + print(field["target_name"]) + return field + + @task.short_circuit + def check_fields(): + fields = get_db_columns() + return bool(fields) + + @task + def download_field(field): + if field["downloaded"]: + return field + else: + has_cal1 = False + stage_calibrators = False + if field["sas_id_calibrator1"]: + ms_folder = f"L{field['sas_id_calibrator1']}" + cal1_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal1_full_path): + has_cal1 = True + else: + stage_calibrators = True + + has_cal2 = False + if field["sas_id_calibrator2"]: + ms_folder = f"L{field['sas_id_calibrator2']}" + cal2_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal2_full_path): + has_cal2 = True + else: + stage_calibrators = True + if field["sas_id_target"]: + ms_folder = f"L{field['sas_id_target']}" + target_full_path = os.path.join( + DATA_DIR, field["target_name"], "target", ms_folder + ) + if os.path.exists(target_full_path): + stage_target = False + else: + stage_target = True + else: + raise AirflowFailException( + f"No target SAS ID in database for field {field['target_name']}" + ) + + if stage_calibrators or stage_target: + print(f"Field {field['sas_id_target']} is not downloaded.") + stager = ObservationStager(get_surls=True) + stager.find_observation_by_sasid( + "ALL", + field["sas_id_target"], + None, + 120e6, + 168e6, + ) + if stage_calibrators: + stager.find_nearest_calibrators(2, 120e6, 168e6) + stage_id_calibrators = stager.stage_calibrators() + if stage_target: + stage_id_target = stager.stage_target() + else: + return field + + calibrator_staged = False + target_staged = False + calibrator_downloaded = has_cal1 or has_cal2 + target_downloaded = not stage_target + while True: + if len(get_surls_online(stage_id_calibrators)) == len( + get_surls_requested(stage_id_calibrators) + ): + calibrator_staged = True + if calibrator_staged and not calibrator_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "calibrator") + cmd = ( + f"flocs-lta download --outdir {dl_path} {stage_id_calibrators}" + ) + with open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_out, open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + calibrator_downloaded = True + else: + raise RuntimeError + + if len(get_surls_online(stage_id_target)) == len( + get_surls_requested(stage_id_target) + ): + calibrator_staged = True + if target_staged and not target_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "target") + cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" + with open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_out, open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + set_status_downloaded( + field["target_name"], + field["sas_id_target"], + ) + target_downloaded = True + else: + raise RuntimeError + if calibrator_downloaded and target_downloaded: + break + time.sleep(60) + + @task + def run_linc_calibrator1(field): + if (field["status_calibrator1"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator1"] == PIPELINE_STATUS.running + ): + print( + f"Flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']} already processed." + ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator1']}" + set_status_processing( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", + "w+", + ) as f_out, open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_linc_calibrator2(field): + if (field["status_calibrator2"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator2"] == PIPELINE_STATUS.running + ): + print( + f"Flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']} already processed." + ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator2']}" + set_status_processing( + field["target_name"], "calibrator2", field["sas_id_target"] + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}.txt", + "w+", + ) as f_out, open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "calibrator2", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task(trigger_rule=TriggerRule.ONE_DONE) + def select_best_calibrator(result1, result2): + if result1["sas_id_calibrator_final"]: + return result1 + elif result2["sas_id_calibrator_final"]: + return result2 + elif result1 and result2: + print("Selecting between cal1 and cal2") + # Need actual selection logic here + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif result1 and (not result2): + print("Only cal 1 succeeded, continuing with that") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif (not result1) and result2: + print("Only cal 2 succeeded, continuing with that") + set_final_calibrator( + result2["target_name"], + result2["sas_id_target"], + result2["sas_id_calibrator2"], + ) + return result2 + else: + raise AirflowFailException("No calibrators succeeded; stopping processing.") + + @task + def run_linc_target(field): + if (field["status_target"] == PIPELINE_STATUS.finished) or ( + field["status_target"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing target observation {field['target_name']} {field['sas_id_target']} with calibrator {field['sas_id_calibrator_final']}" + ) + ms_folder = f"L{field['sas_id_target']}" + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + calibrator_path = get_most_recent_run( + outdir, field["sas_id_calibrator_final"], "LINC_calibrator" + ) + calibrator_solutions = ( + calibrator_path / "results_LINC_calibrator" / "cal_solutions.h5" + ) + set_status_processing( + field["target_name"], "target", field["sas_id_target"] + ) + cmd = f"flocs-run linc target --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --cal-solutions {calibrator_solutions} {os.path.join(DATA_DIR, field['target_name'], 'target', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "target", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def validate_linc_target(field): + return field + + @task(retries=0, retry_delay=datetime.timedelta(seconds=5)) + def run_vlbi_delay(field): + if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_delay"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing delay calibration for {field['target_name']} {field['sas_id_target']}" + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_target"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + set_status_processing( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + + delay_cat = os.path.join(outdir, "delay_calibrators.csv") + image_cat = os.path.join(outdir, "image_catalogue.csv") + + proc = subprocess.run( + "detect_bad_slurm_nodes.sh", + shell=True, + text=True, + stdout=subprocess.PIPE, + ) + bad_nodes = proc.stdout.strip() + if bad_nodes: + print(f"Excluding the following bad nodes from scheduling: {bad_nodes}") + os.environ["TOIL_SLURM_ARGS"] = f"--exclude={bad_nodes}" + + context = get_current_context() + if context["ti"].try_number == 1: + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" + else: + # Extract the previous working directory + flocs_workdir = "" + print( + f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir." + ) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt" + ) as f_out: + for line in f_out.readlines(): + print(line) + if "Running workflow with" in line: + flocs_workdir = line.split(" ")[-1].strip() + break + if not flocs_workdir: + raise RuntimeError( + "Could not retrieve PILOT workdir. Flocs probably crashed before launching." + ) + print(f"Resuming failed PILOT run in {flocs_workdir}") + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + + if success: + set_status_finished( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_ddf_subtract(field): + return field + + @task + def run_vlbi_ddcal(field): + if (field["status_vlbi_dd"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_dd"] == PIPELINE_STATUS.running + ): + return field + else: + print( + f"Processing ILT dd calibration for {field['target_name']} {field['sas_id_target']}" + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_target"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + print(f"Using LINC target run: {target_path}") + + sols_path = get_most_recent_run( + outdir, field["sas_id_target"], "VLBI_delay" + ) + sols_path = sols_path / "results_VLBI_delay-calibration" + sols = list(sols_path.glob("merged*selfcalcycle???_linearfulljones*.h5"))[0] + print(f"Using PILOT delay calibration solutions: {sols}") + + source_cat = os.path.join(DATA_DIR, field["target_name"], "vlbi_target.csv") + if not os.path.isfile(source_cat): + raise AirflowFailException(f"{source_cat} not found.") + + set_status_processing( + field["target_name"], "vlbi_dd", field["sas_id_target"] + ) + cmd = f"flocs-run vlbi dd-calibration --runner toil --scheduler slurm --slurm-time 24:00:00 --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --delay-solset {sols} --phasediff-score 10.0 --source-catalogue {source_cat} --model-cache {NN_MODEL_CACHE} --ms-suffix .dp3concat {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err: + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "vlbi_dd", field["sas_id_target"] + ) + set_field_finished(field["target_name"], field["sas_id_target"]) + else: + raise RuntimeError + return field + + proceed = check_fields() + get_field = get_unprocessed_target() + field = download_field(get_field) + result_cal1 = run_linc_calibrator1(field) + result_cal2 = run_linc_calibrator2(field) + best_cal = select_best_calibrator(result_cal1, result_cal2) + result_targ = run_linc_target(best_cal) + linc_is_valid = validate_linc_target(result_targ) + result_vlbi_delay = run_vlbi_delay(linc_is_valid) + result_vlbi_dd = run_vlbi_ddcal(result_vlbi_delay) + + proceed >> get_field + + +pilot_single_target() diff --git a/flocs_processing/dags/pilot_widefield.py b/flocs_processing/dags/pilot_widefield.py new file mode 100644 index 0000000..6675ada --- /dev/null +++ b/flocs_processing/dags/pilot_widefield.py @@ -0,0 +1,710 @@ +from enum import Enum +import datetime +import functools +import os +import pathlib +import re +import sqlite3 +import subprocess +import time + +from airflow.exceptions import AirflowFailException +from airflow.sdk import dag, get_current_context, task +from airflow.providers.standard.sensors.python import PythonSensor +from airflow.task.trigger_rule import TriggerRule +from flocs_lta.lta_search import ObservationStager +from stager_access import get_surls_requested, get_surls_online + +# Need to replace this with a config file +TABLE_NAME = "" +DATABASE = "" +SLURM_ACCOUNT = "" +SLURM_QUEUE = "" +DATA_DIR = "" +OUTPUT_DIR = "" +PROCESSING_DIR = "" +NN_MODEL_CACHE = "" +DDF_CONFIG = "" + +NEEDS_MANUAL_APPROVAL_DELAY = True + + +def get_approval(field, identifier, needs_approval): + if not needs_approval: + return True + with sqlite3.connect(DATABASE) as db: + db.row_factory = sqlite3.Row + cursor = db.cursor() + columns = f"sas_id_target,status_{identifier}" + field = cursor.execute( + f"select {columns} from {TABLE_NAME} where sas_id_target=='{field['sas_id_target']}'" + ).fetchall() + status = field[0][f"status_{identifier}"] + return status == PIPELINE_STATUS.finished.value + + +@functools.total_ordering +class PIPELINE_STATUS(Enum): + nothing = 0 + downloaded = 1 + finished = 2 + await_approval = 3 + processing = 98 + error = 99 + + def __eq__(self, other): + if other.__class__ is int: + return self.value == other + elif other.__class__ is self.__class__: + return self.value == other.value + else: + raise NotImplementedError + + def __lt__(self, other): + if self.__class__ is not other.__class__: + raise NotImplementedError + return self.value < other.value + + +def get_db_columns(): + with sqlite3.connect(DATABASE) as db: + db.row_factory = sqlite3.Row + cursor = db.cursor() + columns = "target_name,priority,finished,downloaded,sas_id_calibrator1,sas_id_calibrator2,sas_id_calibrator_final,sas_id_target,status_calibrator1,status_calibrator2,status_target,status_vlbi_delay,status_vlbi_dd" + field = cursor.execute( + f"select {columns} from {TABLE_NAME} where finished==0 order by priority desc" + ).fetchall() + print(field) + return field + + +def set_status_processing(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.processing.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_await_approval(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.await_approval.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_finished(name, identifier, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set status_{identifier}={PIPELINE_STATUS.finished.value} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_status_downloaded(name, target): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set downloaded=1 where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def set_field_finished(name, target): + query = f"update {TABLE_NAME} set finished=1 where target_name=='{name}' and sas_id_target=='{target}'" + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute(query) + + +def set_final_calibrator(name, target, final_cal): + with sqlite3.connect(DATABASE) as db: + cursor = db.cursor() + cursor.execute( + f"update {TABLE_NAME} set sas_id_calibrator_final={final_cal} where target_name=='{name}' and sas_id_target=='{target}'" + ) + + +def get_most_recent_run(searchpath: str, sas_id: str, pipeline: str) -> pathlib.Path: + rundirs = pathlib.Path(searchpath) + rundirs_sorted = sorted(rundirs.iterdir()) + if pipeline: + rundirs_sorted_filtered = [ + d + for d in rundirs_sorted + if ((sas_id in d.parts[-1]) and (pipeline in d.parts[-1])) and d.is_dir() + ] + else: + rundirs_sorted_filtered = [d for d in rundirs_sorted if sas_id in d.parts[-1]] + rundir_final = rundirs_sorted_filtered[-1].absolute() + return rundir_final + + +@dag(max_active_runs=1) +def pilot_widefield(): + @task + def get_unprocessed_target(): + field = dict(get_db_columns()[0]) + print(field["target_name"]) + return field + + @task.short_circuit + def check_fields(): + fields = get_db_columns() + return bool(fields) + + @task + def download_field(field): + if field["downloaded"]: + return field + else: + has_cal1 = False + stage_calibrators = False + if field["sas_id_calibrator1"]: + ms_folder = f"L{field['sas_id_calibrator1']}" + cal1_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal1_full_path): + has_cal1 = True + else: + stage_calibrators = True + + has_cal2 = False + if field["sas_id_calibrator2"]: + ms_folder = f"L{field['sas_id_calibrator2']}" + cal2_full_path = os.path.join( + DATA_DIR, field["target_name"], "calibrator", ms_folder + ) + if os.path.exists(cal2_full_path): + has_cal2 = True + else: + stage_calibrators = True + if field["sas_id_target"]: + ms_folder = f"L{field['sas_id_target']}" + target_full_path = os.path.join( + DATA_DIR, field["target_name"], "target", ms_folder + ) + if os.path.exists(target_full_path): + stage_target = False + else: + stage_target = True + else: + raise AirflowFailException( + f"No target SAS ID in database for field {field['target_name']}" + ) + + if stage_calibrators or stage_target: + print(f"Field {field['sas_id_target']} is not downloaded.") + stager = ObservationStager(get_surls=True) + stager.find_observation_by_sasid( + "ALL", + field["sas_id_target"], + None, + 120e6, + 168e6, + ) + if stage_calibrators: + stager.find_nearest_calibrators(2, 120e6, 168e6) + stage_id_calibrators = stager.stage_calibrators() + if stage_target: + stage_id_target = stager.stage_target() + else: + return field + + calibrator_staged = False + target_staged = False + calibrator_downloaded = has_cal1 or has_cal2 + target_downloaded = not stage_target + while True: + if len(get_surls_online(stage_id_calibrators)) == len( + get_surls_requested(stage_id_calibrators) + ): + calibrator_staged = True + if calibrator_staged and not calibrator_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "calibrator") + cmd = ( + f"flocs-lta download --outdir {dl_path} {stage_id_calibrators}" + ) + with ( + open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_out, + open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + calibrator_downloaded = True + else: + raise RuntimeError + + if len(get_surls_online(stage_id_target)) == len( + get_surls_requested(stage_id_target) + ): + calibrator_staged = True + if target_staged and not target_downloaded: + dl_path = os.path.join(DATA_DIR, field["target_name"], "target") + cmd = f"flocs-lta download --outdir {dl_path} {stage_id_target}" + with ( + open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_out, + open( + f"log_download_calibrators_{field['target_name']}.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + if not proc.returncode: + set_status_downloaded( + field["target_name"], + field["sas_id_target"], + ) + target_downloaded = True + else: + raise RuntimeError + if calibrator_downloaded and target_downloaded: + break + time.sleep(60) + + @task + def run_linc_calibrator1(field): + if (field["status_calibrator1"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator1"] == PIPELINE_STATUS.processing + ): + print( + f"Flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']} already processed." + ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator1']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator1']}" + set_status_processing( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with ( + open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}.txt", + "w+", + ) as f_out, + open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator1']}_err.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "calibrator1", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_linc_calibrator2(field): + if (field["status_calibrator2"] == PIPELINE_STATUS.finished) or ( + field["status_calibrator2"] == PIPELINE_STATUS.processing + ): + print( + f"Flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']} already processed." + ) + return field + else: + print( + f"Processing flux density calibrator {field['sas_id_calibrator2']} for observation {field['target_name']} {field['sas_id_target']}" + ) + ms_folder = f"L{field['sas_id_calibrator2']}" + set_status_processing( + field["target_name"], "calibrator2", field["sas_id_target"] + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + cmd = f"flocs-run linc calibrator --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} {os.path.join(DATA_DIR, field['target_name'], 'calibrator', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with ( + open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}.txt", + "w+", + ) as f_out, + open( + f"log_LINC_calibrator_{field['target_name']}_{field['sas_id_calibrator2']}_err.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "calibrator2", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task(trigger_rule=TriggerRule.ONE_DONE) + def select_best_calibrator(result1, result2): + if result1["sas_id_calibrator_final"]: + return result1 + elif result2["sas_id_calibrator_final"]: + return result2 + elif result1 and result2: + print("Selecting between cal1 and cal2") + # Need actual selection logic here + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif result1 and (not result2): + print("Only cal 1 succeeded, continuing with that") + set_final_calibrator( + result1["target_name"], + result1["sas_id_target"], + result1["sas_id_calibrator1"], + ) + return result1 + elif (not result1) and result2: + print("Only cal 2 succeeded, continuing with that") + set_final_calibrator( + result2["target_name"], + result2["sas_id_target"], + result2["sas_id_calibrator2"], + ) + return result2 + else: + raise AirflowFailException("No calibrators succeeded; stopping processing.") + + @task + def run_linc_target(field): + if (field["status_target"] == PIPELINE_STATUS.finished) or ( + field["status_target"] == PIPELINE_STATUS.processing + ): + return field + else: + print( + f"Processing target observation {field['target_name']} {field['sas_id_target']} with calibrator {field['sas_id_calibrator_final']}" + ) + ms_folder = f"L{field['sas_id_target']}" + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + calibrator_path = get_most_recent_run( + outdir, field["sas_id_calibrator_final"], "LINC_calibrator" + ) + calibrator_solutions = ( + calibrator_path / "results_LINC_calibrator" / "cal_solutions.h5" + ) + set_status_processing( + field["target_name"], "target", field["sas_id_target"] + ) + cmd = f"flocs-run linc target --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --cal-solutions {calibrator_solutions} {os.path.join(DATA_DIR, field['target_name'], 'target', ms_folder)}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with ( + open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, + open( + f"log_LINC_target_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "target", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def validate_linc_target(field): + return field + + @task(retries=0, retry_delay=datetime.timedelta(seconds=5)) + def run_vlbi_delay(field): + if (field["status_vlbi_delay"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_delay"] == PIPELINE_STATUS.processing + ): + return field + else: + print( + f"Processing delay calibration for {field['target_name']} {field['sas_id_target']}" + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_target"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + set_status_processing( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + + delay_cat = os.path.join(outdir, "delay_calibrators.csv") + image_cat = os.path.join(outdir, "image_catalogue.csv") + + proc = subprocess.run( + "detect_bad_slurm_nodes.sh", + shell=True, + text=True, + stdout=subprocess.PIPE, + ) + bad_nodes = proc.stdout.strip() + if bad_nodes: + print(f"Excluding the following bad nodes from scheduling: {bad_nodes}") + os.environ["TOIL_SLURM_ARGS"] = f"--exclude={bad_nodes}" + + context = get_current_context() + if context["ti"].try_number == 1: + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" + else: + # Extract the previous working directory + flocs_workdir = "" + print( + f"Scanning log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt for workdir." + ) + with open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt" + ) as f_out: + for line in f_out.readlines(): + print(line) + if "Running workflow with" in line: + flocs_workdir = line.split(" ")[-1].strip() + break + if not flocs_workdir: + raise RuntimeError( + "Could not retrieve PILOT workdir. Flocs probably crashed before launching." + ) + print(f"Resuming failed PILOT run in {flocs_workdir}") + cmd = f"flocs-run vlbi delay-calibration --runner toil --scheduler slurm --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {flocs_workdir} --restart --outdir {outdir} --ms-suffix dp3concat --delay-calibrator {delay_cat} --image-catalogue {image_cat} {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with ( + open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, + open( + f"log_VLBI_delay-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + + if success: + if NEEDS_MANUAL_APPROVAL_DELAY: + set_status_await_approval( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + else: + set_status_finished( + field["target_name"], "vlbi_delay", field["sas_id_target"] + ) + else: + raise RuntimeError + return field + + @task + def run_ddf_pipeline(field): + if (field["status_ddf"] == PIPELINE_STATUS.finished) or ( + field["status_ddf"] == PIPELINE_STATUS.processing + ): + return field + else: + print( + f"Starting ddf-pipeline for {field['target_name']} {field['sas_id_target']}" + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_target"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + + cmd = f"flocs-run ddf-pipeline --scheduler slurm --slurm-time 72:00:00 --slurm-cores 32 --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {OUTPUT_DIR} --config-file {DDF_CONFIG} {target_ms_path}" + with ( + open( + f"log_DDF-pipeline_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, + open( + f"log_DDF-pipeline_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + jobid = None + if not proc.returncode: + f_out.seek(0) + for line in f_out.readlines(): + if "Submitted batch job" in line: + jobid = line.strip().split()[-1] + else: + raise RuntimeError("Failed to submit job.") + + if not jobid: + raise RuntimeError("Failed to retrieve job id") + else: + while True: + poll_cmd = f"sacct -X -j {jobid} --format=State --noheader" + status = subprocess.run( + poll_cmd, shell=True, text=True, capture_output=True + ).stdout.strip() + if status == "RUNNING": + time.sleep(60) + elif status == "COMPLETED": + break + elif status == "FAILED": + raise RuntimeError( + f"DDF-pipeline for {field['target_name']} {field['sas_id_target']} failed." + ) + return field + + @task + def run_ddf_subtract(field): + return field + + @task + def run_vlbi_ddcal(field): + if (field["status_vlbi_dd"] == PIPELINE_STATUS.finished) or ( + field["status_vlbi_dd"] == PIPELINE_STATUS.processing + ): + return field + else: + print( + f"Processing ILT dd calibration for {field['target_name']} {field['sas_id_target']}" + ) + outdir = os.path.join(OUTPUT_DIR, field["target_name"]) + target_path = get_most_recent_run( + outdir, field["sas_id_target"], "LINC_target" + ) + target_ms_path = target_path / "results_LINC_target" / "results" + print(f"Using LINC target run: {target_path}") + + sols_path = get_most_recent_run( + outdir, field["sas_id_target"], "VLBI_delay" + ) + sols_path = sols_path / "results_VLBI_delay-calibration" + sols = list(sols_path.glob("merged*selfcalcycle???_linearfulljones*.h5"))[0] + print(f"Using PILOT delay calibration solutions: {sols}") + + source_cat = os.path.join(DATA_DIR, field["target_name"], "vlbi_target.csv") + if not os.path.isfile(source_cat): + raise AirflowFailException(f"{source_cat} not found.") + + set_status_processing( + field["target_name"], "vlbi_dd", field["sas_id_target"] + ) + cmd = f"flocs-run vlbi dd-calibration --runner toil --scheduler slurm --slurm-time 24:00:00 --slurm-account {SLURM_ACCOUNT} --slurm-queue {SLURM_QUEUE} --rundir {PROCESSING_DIR} --outdir {outdir} --delay-solset {sols} --phasediff-score 10.0 --source-catalogue {source_cat} --model-cache {NN_MODEL_CACHE} --ms-suffix .dp3concat {target_ms_path}" + if not os.path.isdir(outdir): + os.mkdir(outdir) + print(cmd) + with ( + open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}.txt", + "w+", + ) as f_out, + open( + f"log_VLBI_dd-calibration_{field['target_name']}_{field['sas_id_target']}_err.txt", + "w+", + ) as f_err, + ): + proc = subprocess.run( + cmd, shell=True, text=True, stdout=f_out, stderr=f_err + ) + success = False + pattern = re.compile(r"Workflow.* stopped. Success: True") + if not proc.returncode: + f_err.seek(0) + if pattern.search(f_err.read()): + success = True + if success: + set_status_finished( + field["target_name"], "vlbi_dd", field["sas_id_target"] + ) + set_field_finished(field["target_name"], field["sas_id_target"]) + else: + raise RuntimeError + return field + + proceed = check_fields() + get_field = get_unprocessed_target() + field = download_field(get_field) + result_cal1 = run_linc_calibrator1(field) + result_cal2 = run_linc_calibrator2(field) + best_cal = select_best_calibrator(result_cal1, result_cal2) + result_targ = run_linc_target(best_cal) + linc_is_valid = validate_linc_target(result_targ) + result_vlbi_delay = run_vlbi_delay(linc_is_valid) + + proceed >> get_field + await_approval_delay = PythonSensor( + task_id="approve_delay", + python_callable=get_approval, + poke_interval=60, + timeout=86400 * 7, + mode="poke", + op_args=[result_vlbi_delay, "vlbi_delay", NEEDS_MANUAL_APPROVAL_DELAY], + ) + # result_ddf = run_ddf_pipeline(await_approval_delay) + result_ddf = run_ddf_pipeline(result_vlbi_delay) + + result_ddf_subtract = run_ddf_subtract(result_ddf) + result_vlbi_dd = run_vlbi_ddcal(result_ddf_subtract) + await_approval_delay >> result_ddf >> result_ddf_subtract >> result_vlbi_dd + + +pilot_widefield() diff --git a/flocs_processing/flocs_processing.py b/flocs_processing/flocs_processing.py index f23ef2e..a603739 100644 --- a/flocs_processing/flocs_processing.py +++ b/flocs_processing/flocs_processing.py @@ -98,18 +98,25 @@ def create_database( "linc" ], ): + pipeline_str = ",".join(pipelines) pipelines = list(map(str.lower, pipelines)) - dbstr = f"create table {table_name}(source_name text default NULL" + dbstr = f"create table {table_name}(target_name text default NULL, pipelines text default '{pipeline_str}', priority int default 0, finished bit default 0, downloaded bit default 0" if "linc" in pipelines: dbstr += ", sas_id_calibrator1 text default NULL, sas_id_calibrator2 text default NULL, sas_id_calibrator_final text default NULL, sas_id_target text primary key default NULL, status_calibrator1 smallint default 0, status_calibrator2 smallint default 0, status_target smallint default 0" if "ddf-pipeline" in pipelines: - dbstr += ", status_ddf smallint default 0" + dbstr += f", status_ddf smallint default {PIPELINE_STATUS.nothing.value}" if "vlbi-delay-widefield" in pipelines: - dbstr += ", status_ddf smallint default 0" - dbstr += ", status_delay smallint default 0" + dbstr += f", status_vlbi_delay smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += f", status_vlbi_dd smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += f", status_vlbi_intermediate_img smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += f", status_vlbi_facet_subtract smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += ( + f", status_vlbi_facet_img smallint default {PIPELINE_STATUS.nothing.value}" + ) if "vlbi-delay-single-target" in pipelines: - dbstr += ", status_delay smallint default 0" + dbstr += f", status_vlbi_delay smallint default {PIPELINE_STATUS.nothing.value}" + dbstr += f", status_vlbi_dd smallint default {PIPELINE_STATUS.nothing.value}" dbstr += ");" cmd = ["sqlite3", dbname, dbstr] @@ -137,18 +144,21 @@ def add_field( table_name: Annotated[ str, Parameter(help="Database table that will be processed.") ] = "processing_flocs", + pipelines: Annotated[ + str, Parameter(help="Pipelines this field needs to be processed with.") + ] = "", ): - dbstr = f"insert into {table_name} (source_name" + dbstr = f"insert into {table_name} (target_name" if len(sas_id_calibrators) == 1: dbstr += ", sas_id_calibrator1" if len(sas_id_calibrators) == 2: dbstr += ", sas_id_calibrator1, sas_id_calibrator2" dbstr += f", sas_id_target) values ('{field_name}', " if len(sas_id_calibrators) == 1: - dbstr += f"{sas_id_calibrators[0]}" + dbstr += f"'{sas_id_calibrators[0]}'" if len(sas_id_calibrators) == 2: - dbstr += f"{sas_id_calibrators[0]}, {sas_id_calibrators[1]}, " - dbstr += f"{sas_id_target})" + dbstr += f"'{sas_id_calibrators[0]}', '{sas_id_calibrators[1]}', " + dbstr += f"'{sas_id_target}')" cmd = ["sqlite3", dbname, dbstr] print(f"Adding field {field_name} to {table_name} via: {" ".join(cmd)}") @@ -316,7 +326,10 @@ def launch_vlbi_delay(self, field_name, sas_id, restart: bool = False): rundirs_sorted_filtered = [ d for d in rundirs_sorted - if ((sas_id in d.parts[-1]) and (d.parts[-1].startswith(("LINC_target")))) + if ( + (sas_id in d.parts[-1]) + and (d.parts[-1].startswith(("LINC_target"))) + ) ] # Last LINC target reduction for this source linc_target_dir = rundirs_sorted_filtered[-1] @@ -373,7 +386,10 @@ def launch_vlbi_delay(self, field_name, sas_id, restart: bool = False): rundirs_sorted_filtered = [ d for d in rundirs_sorted - if ((sas_id in d.parts[-1]) and (d.parts[-1].startswith(("LINC_target")))) + if ( + (sas_id in d.parts[-1]) + and (d.parts[-1].startswith(("LINC_target"))) + ) ] # Last LINC target reduction for this source linc_target_dir = rundirs_sorted_filtered[-1]