Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pyfixtures import fixture
from virtool.quality.fastqc import parse_fastqc
from virtool.utils import compress_file, is_gzipped
from virtool.workflow import hooks, step, RunSubprocess
from virtool.workflow.analysis import ReadPaths
from virtool.workflow.data.samples import WFNewSample
Expand Down Expand Up @@ -51,17 +52,25 @@ async def run_fastqc(

@step
async def finalize(
intermediate: SimpleNamespace, new_sample: WFNewSample, read_paths: ReadPaths
intermediate: SimpleNamespace,
new_sample: WFNewSample,
proc: int,
read_paths: ReadPaths,
):
"""
Save the sample data in Virtool.

* Compress any uncompressed read files.
* Upload the read files to the sample file endpoints.
* POST the JSON quality data to sample endpoint.
"""
for i, path in enumerate(read_paths):
new_path = await asyncio.to_thread(path.rename, f"reads_{i + 1}.fq.gz")
await new_sample.upload(new_path)
target = path.with_name(f"reads_{i + 1}.fq.gz")
if is_gzipped(path):
await asyncio.to_thread(path.rename, target)
else:
await asyncio.to_thread(compress_file, path, target, proc)
await new_sample.upload(target)
Comment on lines 67 to +73
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation processes each read file (compression and upload) sequentially. For paired-end samples, this means the second file's compression only starts after the first file has finished its upload. Since compression is CPU-bound and uploading is network-bound, these operations can be overlapped to improve performance. Using asyncio.gather would allow the workflow to process both files in parallel, which is more efficient and safe given that Virtool samples typically have at most two read files. Additionally, adding a check to ensure path != target before renaming prevents potential issues on systems where renaming a file to itself might cause errors or unnecessary I/O.

Suggested change
for i, path in enumerate(read_paths):
new_path = await asyncio.to_thread(path.rename, f"reads_{i + 1}.fq.gz")
await new_sample.upload(new_path)
target = path.with_name(f"reads_{i + 1}.fq.gz")
if is_gzipped(path):
await asyncio.to_thread(path.rename, target)
else:
await asyncio.to_thread(compress_file, path, target, proc)
await new_sample.upload(target)
async def process_path(i, path):
target = path.with_name(f"reads_{i + 1}.fq.gz")
if is_gzipped(path):
if path != target:
await asyncio.to_thread(path.rename, target)
else:
await asyncio.to_thread(compress_file, path, target, proc)
await new_sample.upload(target)
await asyncio.gather(
*(process_path(i, path) for i, path in enumerate(read_paths))
)

Copy link
Copy Markdown
Member Author

@ReeceHoffmann ReeceHoffmann May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. re gather: Technically there is a core of a good idea here given that these steps are rate limited by different resources. As written though it will just try to spin up to compression instances using all cores for both. If we want the performance benefit we can introduce a pipelined approach that actaully separates the compression and upload steps, but imo the performance is probably not worth the complexity here. esp given that most samples are only a single file
  2. re path!= target guard. In linux I think think this just prevents a no-op. I'm okay with this change, just not sure it really does anything


await new_sample.finalize(intermediate.quality.dict())

Expand Down
Loading