-
-
Notifications
You must be signed in to change notification settings - Fork 119
Expand file tree
/
Copy pathtaskqueue.py
More file actions
93 lines (78 loc) · 3.67 KB
/
Copy pathtaskqueue.py
File metadata and controls
93 lines (78 loc) · 3.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""Centralized task-queue / Redis access, dispatched on ``config.QUEUE_TYPE``.
Companion to :mod:`database`, in the same spirit as ``tasks/mediaserver.py``: one
module owns the Redis connection and the two RQ queues so the rest of the code
never builds them itself. ``app_helper`` re-exports the handles below, so every
module that does ``from app_helper import redis_conn, rq_queue_high`` is untouched.
``redis`` (default) and ``embedded`` both talk to a *real* Redis via
``config.REDIS_URL`` -- RQ relies on Lua ``EVAL``, pub/sub and job registries, so a
genuine server is required. The only difference is who starts it: with ``embedded``
the macOS supervisor launches the bundled ``redis-server`` binary (see
:func:`build_embedded_redis_argv`) and exports its socket URL as ``REDIS_URL``
before the app and workers boot.
``rq`` hardcodes ``UnixSignalDeathPenalty`` as every job registry's default, so
on Windows (no ``signal.SIGALRM``) registry cleanup -- the janitor loop and the
workers' periodic maintenance -- raises ``AttributeError`` whenever an abandoned
job carries an ``on_failure`` callback, and the job is never removed.
``BaseRegistry`` is therefore re-pointed at RQ's own platform dispatcher and
the queues below are built with the same class: a no-op on POSIX, the
timer-based penalty (what RQ's workers already use) on Windows.
"""
from redis import Redis
from rq import Queue, get_current_job
from rq.job import Job
from rq.exceptions import NoSuchJobError
from rq.command import send_stop_job_command
from rq.registry import BaseRegistry
from rq.timeouts import get_default_death_penalty_class
import config
_death_penalty_class = get_default_death_penalty_class()
BaseRegistry.death_penalty_class = _death_penalty_class
__all__ = [
"redis_conn",
"rq_queue_high",
"rq_queue_default",
"Job",
"NoSuchJobError",
"send_stop_job_command",
"get_current_job",
"build_embedded_redis_argv",
"redis_socket_options",
]
def redis_socket_options(url):
"""Connection kwargs that depend on the transport.
``socket_keepalive`` is a TCP-only option; redis-py's
``UnixDomainSocketConnection`` raises ``TypeError`` if it is passed. The
embedded (macOS) queue connects over a unix socket (``unix://``), so the
option is omitted there and kept for the TCP ``redis://`` URLs used by the
container/cloud deployments.
"""
return {} if str(url).startswith("unix://") else {"socket_keepalive": True}
redis_conn = Redis.from_url(
config.REDIS_URL,
socket_connect_timeout=30,
socket_timeout=60,
health_check_interval=30,
retry_on_timeout=True,
**redis_socket_options(config.REDIS_URL),
)
rq_queue_high = Queue('high', connection=redis_conn, default_timeout=-1, death_penalty_class=_death_penalty_class)
rq_queue_default = Queue('default', connection=redis_conn, default_timeout=-1, death_penalty_class=_death_penalty_class)
def build_embedded_redis_argv(server_binary, socket_path, data_dir):
"""Return ``(argv, redis_url)`` for launching a private embedded Redis.
Used only by the standalone (macOS) supervisor when ``QUEUE_TYPE`` is
``embedded``. The supervisor owns the resulting process (spawn, log capture,
group shutdown). TCP is disabled (``--port 0``) so the instance is reachable
only over the unix socket and never collides with a Redis the user already
runs on 6379; persistence is off because RQ state is transient queue data.
"""
argv = [
server_binary,
"--unixsocket", socket_path,
"--unixsocketperm", "700",
"--port", "0",
"--save", "",
"--appendonly", "no",
"--dir", data_dir,
]
redis_url = f"unix://{socket_path}?db=0"
return argv, redis_url