diff --git a/config/app.example.php b/config/app.example.php index 9c0cdbf4..ce4a0395 100644 --- a/config/app.example.php +++ b/config/app.example.php @@ -13,6 +13,9 @@ 'workerLifetime' => 60, // 1 minutes // Legacy: 'workermaxruntime' is deprecated but still supported + // optional random offset (0-N seconds) added to each worker's lifetime to stagger shutdowns in a fleet (0 = disabled) + 'workerLifetimeJitter' => 0, + // seconds of running time after which the PHP process will terminate, null uses workerLifetime * 2 'workerPhpTimeout' => null, // Legacy: 'workertimeout' is deprecated but still supported diff --git a/docs/sections/configuration.md b/docs/sections/configuration.md index 7573d5c4..312d6b9b 100644 --- a/docs/sections/configuration.md +++ b/docs/sections/configuration.md @@ -49,6 +49,14 @@ You may create a file called `app_queue.php` inside your `config` folder (NOT th bin/cake queue run --max-runtime 300 # Run for 5 minutes ``` +- Optional per-worker jitter (in seconds) added to the worker lifetime: + + ```php + $config['Queue']['workerLifetimeJitter'] = 30; // up to +30s random offset per worker + ``` + + Each worker picks a random whole-second offset in `[0, workerLifetimeJitter]` (inclusive) at startup and adds it to its effective lifetime. Useful when many workers are spawned simultaneously (e.g. ECS/Kubernetes) to stagger shutdowns and avoid a thundering herd of concurrent restarts. Defaults to `0` (no jitter); negative values are treated as `0`. If the effective lifetime (the `--max-runtime` override when given, otherwise `workerLifetime`) is `0` (unlimited), this setting has no effect. 
+ - Seconds of running time after which the PHP process of the worker will terminate: ```php diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index ea022074..d816a078 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -197,9 +197,11 @@ public function run(array $args): int { $this->exit = false; $startTime = time(); + $jitterOffset = $this->computeLifetimeJitterOffset(); + $maxRuntime = $this->resolveMaxRuntime($config['maxruntime'], $jitterOffset); while (!$this->exit) { - $this->setPhpTimeout($config['maxruntime']); + $this->setPhpTimeout($maxRuntime); try { $this->updatePid($pid); @@ -232,11 +234,6 @@ public function run(array $args): int { sleep(Config::sleeptime()); } - $workerLifetime = Configure::read('Queue.workerLifetime') ?? Configure::read('Queue.workermaxruntime'); - if ($workerLifetime === null && $config['maxruntime'] === null) { - throw new RuntimeException('Queue.workerLifetime (or deprecated workermaxruntime) config is required'); - } - $maxRuntime = $config['maxruntime'] ?? (int)$workerLifetime; // check if we are over the maximum runtime and end processing if so. if ($maxRuntime > 0 && (time() - $startTime) >= $maxRuntime) { $this->exit = true; @@ -626,6 +623,50 @@ protected function setPhpTimeout(?int $maxruntime): void { set_time_limit($timeLimit); } + /** + * Compute the per-worker lifetime jitter offset in seconds. + * + * Returns a random integer in [0, Queue.workerLifetimeJitter] (inclusive), or 0 when the setting is missing, zero or negative. Used to stagger + * worker shutdowns so a fleet spawned at the same moment does not all exit + * on the same tick (thundering herd). + * + * @return int + */ + protected function computeLifetimeJitterOffset(): int { + $jitter = (int)Configure::read('Queue.workerLifetimeJitter', 0); + if ($jitter <= 0) { + return 0; + } + + return mt_rand(0, $jitter); + } + + /** + * Resolve the effective worker runtime, applying jitter only to bounded workers. + * + * @param int|null $maxruntime Max runtime in seconds if set via CLI option. 
+ * @param int $jitterOffset Per-worker random offset in seconds. + * + * @throws \RuntimeException If $maxruntime is null and neither Queue.workerLifetime nor the deprecated Queue.workermaxruntime is configured. + * + * @return int Effective runtime in seconds; a value of 0 or less is returned unchanged, without jitter. + */ + protected function resolveMaxRuntime(?int $maxruntime, int $jitterOffset): int { + $workerLifetime = Configure::read('Queue.workerLifetime') ?? Configure::read('Queue.workermaxruntime'); + if ($workerLifetime === null && $maxruntime === null) { + throw new RuntimeException('Queue.workerLifetime (or deprecated workermaxruntime) config is required'); + } + + $resolvedMaxRuntime = $maxruntime ?? (int)$workerLifetime; + if ($resolvedMaxRuntime <= 0 || $jitterOffset <= 0) { + return (int)$resolvedMaxRuntime; + } + + $this->io->out('Applying worker lifetime jitter: +' . $jitterOffset . ' seconds'); + + return (int)$resolvedMaxRuntime + $jitterOffset; + } + /** * @param array $args * diff --git a/tests/TestCase/Queue/ProcessorTest.php b/tests/TestCase/Queue/ProcessorTest.php index 4e3766d7..c02155bf 100644 --- a/tests/TestCase/Queue/ProcessorTest.php +++ b/tests/TestCase/Queue/ProcessorTest.php @@ -91,6 +91,28 @@ public function testMemoryUsage() { $this->assertMatchesRegularExpression('/^\d+MB/', $result, 'Should be e.g. 
`17MB` or `17MB/1GB` etc.'); } + /** + * @return void + */ + public function testResolveMaxRuntimeAppliesJitterToBoundedWorkers() { + $this->Processor = new Processor(new Io(new ConsoleIo()), new NullLogger()); + + $result = $this->invokeMethod($this->Processor, 'resolveMaxRuntime', [30, 7]); + + $this->assertSame(37, $result); + } + + /** + * @return void + */ + public function testResolveMaxRuntimeDoesNotApplyJitterToUnlimitedWorkers() { + $this->Processor = new Processor(new Io(new ConsoleIo()), new NullLogger()); + + $result = $this->invokeMethod($this->Processor, 'resolveMaxRuntime', [0, 7]); + + $this->assertSame(0, $result); + } + /** * @return void */ @@ -442,4 +464,51 @@ public function testSetPhpTimeoutWithDeprecatedConfig() { Configure::delete('Queue.workertimeout'); } + /** + * @return void + */ + public function testComputeLifetimeJitterOffsetDefaultsToZero() { + $processor = new Processor(new Io(new ConsoleIo()), new NullLogger()); + + Configure::delete('Queue.workerLifetimeJitter'); + $result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset'); + $this->assertSame(0, $result); + + Configure::write('Queue.workerLifetimeJitter', 0); + $result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset'); + $this->assertSame(0, $result); + + Configure::delete('Queue.workerLifetimeJitter'); + } + + /** + * @return void + */ + public function testComputeLifetimeJitterOffsetWithinBounds() { + $processor = new Processor(new Io(new ConsoleIo()), new NullLogger()); + + Configure::write('Queue.workerLifetimeJitter', 15); + for ($i = 0; $i < 50; $i++) { + $result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset'); + $this->assertIsInt($result); + $this->assertGreaterThanOrEqual(0, $result); + $this->assertLessThanOrEqual(15, $result); + } + + Configure::delete('Queue.workerLifetimeJitter'); + } + + /** + * @return void + */ + public function testComputeLifetimeJitterOffsetIgnoresNegative() { + $processor = new Processor(new Io(new 
ConsoleIo()), new NullLogger()); + + Configure::write('Queue.workerLifetimeJitter', -10); + $result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset'); + $this->assertSame(0, $result); + + Configure::delete('Queue.workerLifetimeJitter'); + } + }