Background workers #83
Open
Yannicked wants to merge 39 commits into iterorganization:develop from Yannicked:feature/celery-tasks
Commits
fc08d1f Async Simulation endpoint (Yannicked)
1eb037c Add migration (Yannicked)
aba96f5 Fix ty (Yannicked)
55d8e53 Remove automatic table creation (Yannicked)
795de15 Add Celery (Yannicked)
e6989d9 Copy files (Yannicked)
d6c654d Cleanup tasks (Yannicked)
7f9678f Add tests (Yannicked)
22f35ee Handle exceptions (Yannicked)
1fbe188 Sanitize path (Yannicked)
dc05598 Cleanup simulations post endpoint (Yannicked)
3fbfea6 Merge branch 'develop' into feature/celery-tasks (Yannicked)
d4fbb52 Add celery documentation (Yannicked)
2fbd598 Update ingestion status migration (Yannicked)
d4e6ac4 Fix typing (Yannicked)
68938a1 Cleanup tests (Yannicked)
03d2abb Fix issues (Yannicked)
af44d84 Test script (Yannicked)
a9ed16c Fix issues with imas data (Yannicked)
b60af62 Update test script for IMAS (Yannicked)
36b4bfa Update tests (Yannicked)
a7c992e Merge branch 'develop' into feature/celery-tasks (Yannicked)
287d0e1 Update docker files (Yannicked)
015242c Check for master.h5 in hdf5 imas (Yannicked)
14967aa Fix imas backend detection (Yannicked)
c21c06c Fix test (Yannicked)
05cf791 Temporarily comment celery worker and beat (Yannicked)
fc7c3ef Add uv lockfile (Yannicked)
95c100b Update revision string (Yannicked)
5b5eb9f Make mdsplus check superset (Yannicked)
c748208 Remove unused model (Yannicked)
c7719ff fix: mock Config.load in task_environment test fixture to fix failing… (Yannicked)
e31729c fix: close DB session in complete_ingestion and run validation in v1.… (Yannicked)
6edca35 fix: use the URI parser, not the SQLAlchemy type, in to_model_with_path (Yannicked)
1bdefe2 Revert "fix: close DB session in complete_ingestion and run validatio… (Yannicked)
50efed8 Suggestions from alexandra (Yannicked)
f016eae Merge branch 'develop' into feature/celery-tasks (Yannicked)
5a9aa2c Fix get_db sqlite fallback, .nc detection, and db close in finally (Yannicked)
e47cf62 Fix typo in SimulationStatusResponse docstring (Yannicked)
@@ -0,0 +1,71 @@
"""Add ingestion status

Revision ID: b2c52ee8ff12
Revises: 28bee3aa2429
Create Date: 2026-05-11 16:16:03.768893

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "b2c52ee8ff12"
down_revision: Union[str, Sequence[str], None] = "28bee3aa2429"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema."""
    conn = op.get_bind()
    dialect = conn.dialect.name
    if dialect == "postgresql":
        op.execute(
            "CREATE TYPE ingestionstatus AS ENUM ('QUEUED', 'COPYING', 'COPIED', "
            "'VALIDATING', 'VALIDATED', 'COMPLETED', 'COPY_FAILED', "
            "'VALIDATION_FAILED')"
        )
    with op.batch_alter_table("simulations", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "ingestion_status",
                sa.Enum(
                    "QUEUED",
                    "COPYING",
                    "COPIED",
                    "VALIDATING",
                    "VALIDATED",
                    "COMPLETED",
                    "COPY_FAILED",
                    "VALIDATION_FAILED",
                    name="ingestionstatus",
                ),
                nullable=True,
            )
        )
        batch_op.add_column(sa.Column("ingestion_version", sa.Integer(), nullable=True))
    op.execute(
        "UPDATE simulations SET ingestion_status = 'COMPLETED' WHERE ingestion_status "
        "IS NULL"
    )
    op.execute(
        "UPDATE simulations SET ingestion_version = 0 WHERE ingestion_version IS NULL"
    )
    with op.batch_alter_table("simulations", schema=None) as batch_op:
        batch_op.alter_column("ingestion_status", nullable=False)
        batch_op.alter_column("ingestion_version", nullable=False)


def downgrade() -> None:
    """Downgrade schema."""
    with op.batch_alter_table("simulations", schema=None) as batch_op:
        batch_op.drop_column("ingestion_version")
        batch_op.drop_column("ingestion_status")
    conn = op.get_bind()
    dialect = conn.dialect.name
    if dialect == "postgresql":
        op.execute("DROP TYPE ingestionstatus")
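
The upgrade path above follows a three-step pattern: add the columns as nullable, backfill existing rows, then tighten to NOT NULL. A minimal sketch of the first two steps using the standard-library `sqlite3` module may make the ordering easier to see. The simplified `simulations` schema and its `alias` column are assumptions made for illustration, not the project's real table, and plain `TEXT` stands in for the enum type.

```python
import sqlite3

# In-memory database with a simplified, hypothetical "simulations" table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE simulations (id INTEGER PRIMARY KEY, alias TEXT)")
conn.executemany("INSERT INTO simulations (alias) VALUES (?)", [("a",), ("b",)])

# Step 1: add the new columns as nullable so existing rows stay valid.
conn.execute("ALTER TABLE simulations ADD COLUMN ingestion_status TEXT")
conn.execute("ALTER TABLE simulations ADD COLUMN ingestion_version INTEGER")

# Step 2: backfill pre-existing rows with the same values the migration uses.
conn.execute(
    "UPDATE simulations SET ingestion_status = 'COMPLETED' "
    "WHERE ingestion_status IS NULL"
)
conn.execute(
    "UPDATE simulations SET ingestion_version = 0 WHERE ingestion_version IS NULL"
)

# Step 3 precondition: no NULLs may remain before a NOT NULL constraint is applied.
remaining_nulls = conn.execute(
    "SELECT COUNT(*) FROM simulations "
    "WHERE ingestion_status IS NULL OR ingestion_version IS NULL"
).fetchone()[0]
rows = conn.execute(
    "SELECT alias, ingestion_status, ingestion_version FROM simulations ORDER BY id"
).fetchall()
```

SQLite cannot alter an existing column to NOT NULL in place, which is presumably why the migration wraps the final step in `op.batch_alter_table`; the sketch stops at the check that makes that step safe.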
@@ -0,0 +1,77 @@
# Celery async task processing

SimDB uses [Celery](https://docs.celeryproject.org/) to run asynchronous background
tasks such as copying simulation files and completing the ingestion pipeline.

## Overview

When simulations are uploaded via the REST API, the server offloads heavy operations
to Celery workers instead of blocking the HTTP request. Tasks are defined in
`src/simdb/workers/tasks.py`:

- `copy_files_task`: copies input/output files from source locations to the server's
  upload folder and updates the simulation's ingestion status.
- `complete_ingestion_task`: marks a simulation as fully ingested.
- `validate_imas_task`: runs validation checks on IMAS data (placeholder).
- `send_email_task`: sends email notifications.

Tasks can be chained in the API endpoint:

```python
copy_files = copy_files_task.si(simulation.uuid, ...)
complete = complete_ingestion_task.si(simulation.uuid)
_ = (copy_files | complete).apply_async()
```

## Configuration

Celery is configured via `app.cfg`:

| Section | Option         | Required | Description                                                              |
|---------|----------------|----------|--------------------------------------------------------------------------|
| celery  | broker_url     | no       | Redis URL for the message broker. Defaults to `redis://localhost:6379/0` |
| celery  | result_backend | no       | Redis URL for results storage. Defaults to `redis://localhost:6379/0`    |

Example:

```ini
[celery]
broker_url = redis://localhost:6379/0
result_backend = redis://localhost:6379/0
```
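
As a rough sketch of how the two optional settings and their documented defaults could be resolved, the snippet below reads a `[celery]` section with the standard-library `configparser`. The helper name `load_celery_settings` is hypothetical and this is not SimDB's actual configuration loader; only the option names and default URL come from the table above.

```python
import configparser

# Default documented for both options in the configuration table.
DEFAULT_REDIS_URL = "redis://localhost:6379/0"


def load_celery_settings(cfg_text: str) -> dict[str, str]:
    """Hypothetical helper: resolve Celery options from app.cfg-style text."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # `fallback` covers both a missing option and a missing [celery] section.
    return {
        "broker_url": parser.get("celery", "broker_url", fallback=DEFAULT_REDIS_URL),
        "result_backend": parser.get(
            "celery", "result_backend", fallback=DEFAULT_REDIS_URL
        ),
    }


# Only the broker is overridden here; the result backend falls back to the default.
partial = load_celery_settings("[celery]\nbroker_url = redis://redis:6379/1\n")
# No [celery] section at all: both options fall back.
empty = load_celery_settings("")
```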

## Running workers

### Standalone worker

Start a Celery worker using the built-in CLI:

```bash
simdb_worker
```

### Worker with beat scheduler

For periodic tasks (e.g. cleanup, reports), run both the worker and beat:

```bash
# Terminal 1: worker
simdb_worker

# Terminal 2: beat scheduler
simdb_beat
```

### Flower monitoring

[Flower](https://flower.readthedocs.io/) provides a web UI for monitoring Celery
workers and tasks:

```bash
celery -A simdb.workers.celery flower --port=5555
```

## Testing with eager mode

In tests, set `task_always_eager = True` to run tasks synchronously without a
broker.