Job Traceability, Management, and Audibility Overhaul#645
Open
bencap wants to merge 140 commits intorelease-2026.2.0from
Open
Job Traceability, Management, and Audibility Overhaul#645bencap wants to merge 140 commits intorelease-2026.2.0from
bencap wants to merge 140 commits intorelease-2026.2.0from
Conversation
This was
linked to
issues
Jan 29, 2026
da26721 to
f3ea5ce
Compare
Base automatically changed from
feature/bencap/derived-gene-name-from-mapped-output
to
release-2025.6.0
February 4, 2026 23:58
1aa5409 to
1fb9fdd
Compare
…cture Break down 1767-line jobs.py into domain-driven modules, improving maintainability and developer experience. - variant_processing/: Variant creation and VRS mapping - external_services/: ClinGen, UniProt, gnomAD integrations - data_management/: Database and view operations - utils/: Shared utilities (state, retry, constants) - registry.py: Centralized ARQ job configuration - constants.py: Environment configuration - redis.py: Redis connection settings - lifecycle.py: Worker lifecycle hooks - worker.py: Main ArqWorkerSettings class - All job functions maintain identical behavior - Registry provides BACKGROUND_FUNCTIONS/BACKGROUND_CRONJOBS lists for ARQ initialization - Test structure mirrors source organization This refactor ensures ARQ worker initialization is backwards compatible. The modular architecture establishes a more maintainable foundation for MaveDB's automated processing workflows while preserving all existing functionality.
Implement complete database foundation for pipeline-based job tracking and monitoring: Database Tables: • pipelines - High-level workflow grouping with correlation IDs for end-to-end tracing • job_runs - Individual job execution tracking with full lifecycle management • job_dependencies - Workflow orchestration with success/completion dependency types • job_metrics - Detailed performance metrics (CPU, memory, execution time, business metrics) • variant_annotation_status - Granular variant-level annotation tracking with success data Key Features: • Pipeline workflow management with dependency resolution • Comprehensive job lifecycle tracking (pending → running → completed/failed) • Retry logic with configurable limits and backoff strategies • Resource usage and performance metrics collection • Variant-level annotation status for debugging failures • Correlation ID support for request tracing across system • JSONB metadata fields for flexible job-specific data • Optimized indexes for common query patterns Schema Design: • Foreign key relationships maintain data integrity • Check constraints ensure valid enum values and positive numbers • Strategic indexes optimize dependency resolution and metrics queries • Cascade deletes prevent orphaned records • Version tracking for audit and debugging Models & Enums: • SQLAlchemy models with proper relationships and hybrid properties • Comprehensive enum definitions for job/pipeline status and failure categories
Add comprehensive job lifecycle management with status-based completion: * Implement convenience methods for common job outcomes: - succeed_job() for successful completion - fail_job() for error handling with exception details - cancel_job() for user/system cancellation - skip_job() for conditional job skipping * Enhance progress tracking with increment_progress() and set_progress_total() * Add comprehensive error handling with specific exception types * Improve job state validation and atomic transaction handling * Implement extensive test coverage for all job operations
- Created PipelineManager capable of coordinating jobs within a pipeline context - Introduced `construct_bulk_cancellation_result` to standardize cancellation result structures. - Added `job_dependency_is_met` to check job dependencies based on their types and statuses. - Created comprehensive tests for PipelineManager covering initialization, job coordination, status transitions, and error handling. - Implemented mocks for database and Redis dependencies to isolate tests. - Added tests for job enqueuing, cancellation, pausing, unpausing, and retrying functionalities.
Adds decorators for managed jobs and pipelines. These can be applied to async ARQ functions to automatically persist their state as they execute
…n history and current annotations
…te related models and logic
… with ARQ support Removes dedicated per-job scripts in favor of a generic handler for arbitrary named jobs/pipelines.
recovery, and concurrency limit - Wrap send_slack_error in try/except so Slack outages never break job lifecycle management or error recovery in decorators - Add failure_category field to JobExecutionOutcome with explicit categories on all job failure returns - Classify unhandled exceptions automatically via classify_exception() (ConnectionError → NETWORK_ERROR, TimeoutError → TIMEOUT, etc.) - Trim unused FailureCategory enum values (21 → 11) - Strip redundant Slack try/except wrappers from decorators - Implement UniProt polling retry via SERVICE_UNAVAILABLE category instead of silently skipping unfinished jobs - Add RUNNING to STARTABLE_JOB_STATUSES so start_job() handles crash recovery directly with a warning log - Set max_jobs=2 in ArqWorkerSettings to prevent event loop starvation from concurrent sync psycopg2 DB calls - Update worker instructions and README
… exception classification
- Removed assertions for progress updates in various test cases across multiple test files. - Simplified tests by eliminating unnecessary mocking of the JobManager's update_progress method. - Ensured that tests still validate the expected outcomes without relying on progress update calls.
- Removes keys from S3 if uploaded and raises to the user directly
…ent retry delay logic
- Introduced new FastAPI routers for job runs and pipelines, providing endpoints for listing and showing details with filtering options. - Implemented CLI scripts for job runs and pipelines, allowing operators to inspect state and progress via command line. - Enhanced view models for job runs and pipelines to include necessary fields and serialization. - Updated server main to include new routers and ensure proper routing. - Added tests for the new endpoints and CLI commands to ensure functionality and access control. - Updated versioning to indicate a development release.
… dependency states
…eueing
ARQ's enqueue_job silently returns None if arq:job:{id} or arq:result:{id}
already exists. This left the DB in QUEUED state while ARQ had no record
of the job — a worker would never pick it up. Two scenarios trigger it:
- Crashed RUNNING job: arq:job: survives because finish_job never ran
- Prior run within TTL: arq:result: lingers for up to 1 hour
Fix: make prepare_retry async and delete both keys before the caller
re-enqueues. All retry paths (cleanup, pipeline manager, job management
decorator) now benefit automatically.
- prepare_retry: made async, deletes arq:job: and arq:result: via
redis.delete before returning
- cleanup.py: await prepare_retry; treat enqueue_job returning None as
a hard error (RuntimeError) rather than silently succeeding
- job_management.py, pipeline_manager.py: await prepare_retry
- tests: convert prepare_retry tests to async; add unit tests for key
deletion and redis=None path; add regression integration tests that
seed stale keys and assert the job is present in ARQ's queue after
cleanup runs
…erially Replace the serial allele fetch loop in `warm_clingen_cache` with `asyncio.as_completed` gated by an `asyncio.Semaphore`, keeping up to `CLINGEN_CACHE_WARMING_CONCURRENCY` (5) requests in-flight at a time. - Adds `CLINGEN_CACHE_WARMING_CONCURRENCY = 5` constant to `lib/clingen/constants.py` - Switches serial `for` loop to semaphore + `as_completed` pattern - Passes captured exception to `exc_info=` for richer warning logs
Previously every attempt of a JobRun was enqueued under `_job_id=urn`,
so ARQ's deduplication keys (arq:job:, arq:in-progress:, arq:result:)
collided across retries. When a decorator tried to prepare and enqueue
a retry from inside the still-running slot, the in-flight attempt's
teardown could clobber the new enqueue or silently block it.
Introduce `arq_job_id(job) -> f"{urn}#{retry_count}"` and use it at
every enqueue site (pipeline manager, stalled-job cleanup, routers,
standalone scripts). Each attempt now occupies a disjoint Redis key
namespace while remaining deterministic from JobRun state.
- add `arq_job_id` helper in worker/lib/managers/utils.py
- route all `_job_id=` enqueues through the helper
- drop the redis.delete() stale-key cleanup from
`JobManager.prepare_retry`; no longer needed for correctness
- update regression tests to plant stale keys at `urn#0` and assert
retries land at `urn#1`
- refresh pipeline_management.md example
The job management decorator already calls send_slack_error for both FAILED and ERRORED outcomes. Calling it again inside the job's own exception handlers caused every mapping failure to produce two Slack messages for the same error.
aa819ae to
ab18b69
Compare
- QUEUED stall detection now checks ARQ's Redis queue directly rather than relying on a time threshold. A job present in Redis (queued, in_progress, or deferred) is legitimately waiting and is skipped; only jobs absent from Redis had their enqueue crash and need recovery. This eliminates false positives where healthy jobs waiting for a worker slot were incorrectly treated as stalled. - Remove QUEUED_TIMEOUT_MINUTES — no longer needed since the Redis check is exact regardless of job age. - Reduce PENDING_TIMEOUT_MINUTES from 30 to 5. The threshold only needs to clear the normal pipeline coordination race window; 30 minutes was far too conservative.
ab18b69 to
39c2b16
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR introduces a robust, auditable, and maintainable background job system for MaveDB, supporting both standalone jobs and complex pipelines. It provides a strong foundation for future workflow automation, error recovery, and developer onboarding.
Features include:
1. Worker Job System Refactor & Enhancements
Refactored the monolithic worker job system into a modular architecture:
2. Job & Pipeline Management Infrastructure
Implemented JobManager and PipelineManager classes for robust job and pipeline lifecycle management: (05fc52b, ae18eeb, 3799d84, 1e447a7, 7b44346)
3. Decorator System for Jobs and Pipelines
Introduced decorators for job and pipeline management:
(c2100a2, 155e549, 4a4055d, 3c4e6b9, 010f15c)
Improved test mode support and simplified stacking/usage patterns.
4. Comprehensive Test Suite
Added and refactored unit and integration tests for all job modules, managers, and decorators.
(05fc52b, ae18eeb, a701d53, 806f8ed, 011522c, 010f15c, 8a22306, a716cc9, b0397b4, 8c5e225, 3c4e6b9, 4a4055d, 1fe076a, 1abe4c6)
Enhanced test coverage for error handling, state transitions, and job orchestration.
Introduced fixtures and utilities for easier test setup and mocking.
Categorized tests with markers for unit, integration, and network tests.
(16a5a50, f34939c)
5. Developer Documentation
Added detailed markdown documentation in the worker/jobs/] directory:
(1abe4c6)
6. Database & Model Changes
(1db6b68)
Alembic migration for pipeline and job tracking schema.
Updated models and enums to support new job/pipeline features.
7. Miscellaneous Improvements
Dependency updates (e.g., added asyncclick).
(a3f36d1)