feat: activity cancellation#133
Conversation
Signed-off-by: Tim Li <ltim@uber.com>
Codecov Report❌ Patch coverage is
... and 1 file with indirect coverage changes 🚀 New features to boost your workflow:
|
Signed-off-by: Tim Li <ltim@uber.com>
shijiesheng
left a comment
There was a problem hiding this comment.
I don't think _ActivityCancellation is needed. For async activities, asyncio already handles cancellation natively.
| def heartbeat_details(self, *types: Type) -> list[Any]: | ||
| return self._heartbeat_sender.get_details(*types) | ||
|
|
||
| def is_cancelled(self) -> bool: |
There was a problem hiding this comment.
We might also need APIs like wait_for_cancelled(timeout: timedelta = None) for both sync and async
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>
| def wait_for_cancelled(self, timeout: timedelta | float | None = None) -> bool: | ||
| if timeout is None: | ||
| sec: float | None = None | ||
| elif isinstance(timeout, timedelta): | ||
| sec = timeout.total_seconds() | ||
| else: | ||
| sec = float(timeout) | ||
| return self._heartbeat_sender.wait_for_cancellation(sec) |
There was a problem hiding this comment.
💡 Edge Case: Sync wait_for_cancelled(None) can block a pool worker forever
_SyncContext.wait_for_cancelled (cadence/_internal/activity/_context.py:125-132) delegates to _HeartbeatSender.wait_for_cancellation, which calls threading.Event.wait(timeout). When timeout is None, this blocks the thread-pool worker indefinitely. The event is only ever set by a successful heartbeat RPC that returns cancel_requested=True (_heartbeat.py:49-50). If the activity stops heartbeating, heartbeats fail (the exception is swallowed and logged in send_heartbeat), or the server never reports cancellation, the worker thread can never be woken and is permanently consumed from the bounded ThreadPoolExecutor. There is no mechanism to interrupt the blocked thread on start_to_close timeout/worker shutdown. Consider documenting that callers should pass a finite timeout and poll, or internally capping the wait and re-checking, so a single stuck activity cannot starve the worker's thread pool.
Was this helpful? React with 👍 / 👎
Code Review 👍 Approved with suggestions 2 resolved / 3 findingsImplements activity cancellation through heartbeat signals and updated executor logic, resolving previous issues with swallowed errors and empty payloads. Ensure 💡 Edge Case: Sync wait_for_cancelled(None) can block a pool worker forever📄 cadence/_internal/activity/_context.py:125-132 📄 cadence/_internal/activity/_heartbeat.py:32-33 📄 cadence/_internal/activity/_heartbeat.py:49-51
✅ 2 resolved✅ Edge Case: Async activity swallowing CancelledError is reported as success
✅ Quality: ActivityCancelledError() with no details sends empty payload, not None
🤖 Prompt for agentsOptionsAuto-apply is off → Gitar will not commit updates to this branch. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
What changed?
Implement activity cancellation
Why?
When a cancellation request sent to the server, and the activity have heartbeat, the next heartbeat will inform the worker that the activity is pending for cancellation which allows custom logic to be run before cancelling the execution of activity.
How did you test it?
Potential risks
Release notes
Documentation Changes