Skip to content

feat: activity cancellation#133

Open
timl3136 wants to merge 6 commits into
cadence-workflow:mainfrom
timl3136:activity_cancellation
Open

feat: activity cancellation#133
timl3136 wants to merge 6 commits into
cadence-workflow:mainfrom
timl3136:activity_cancellation

Conversation

@timl3136

Copy link
Copy Markdown
Member

What changed?
Implement activity cancellation

Why?
When a cancellation request sent to the server, and the activity have heartbeat, the next heartbeat will inform the worker that the activity is pending for cancellation which allows custom logic to be run before cancelling the execution of activity.

How did you test it?

Potential risks

Release notes

Documentation Changes

Signed-off-by: Tim Li <ltim@uber.com>
Comment thread cadence/_internal/activity/_context.py
Comment thread cadence/error.py
@codecov

codecov Bot commented Jun 10, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 90.62500% with 6 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
cadence/_internal/activity/_context.py 86.66% 2 Missing and 2 partials ⚠️
cadence/_internal/activity/_activity_executor.py 88.23% 2 Missing ⚠️
Files with missing lines Coverage Δ
cadence/_internal/activity/_heartbeat.py 100.00% <100.00%> (ø)
cadence/activity.py 82.53% <100.00%> (+0.43%) ⬆️
cadence/error.py 95.00% <100.00%> (+0.20%) ⬆️
cadence/_internal/activity/_activity_executor.py 91.54% <88.23%> (-1.44%) ⬇️
cadence/_internal/activity/_context.py 95.34% <86.66%> (-4.66%) ⬇️

... and 1 file with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

timl3136 added 2 commits June 10, 2026 12:34
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>

@shijiesheng shijiesheng left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think _ActivityCancellation is needed. For async activities, asyncio already handles cancellation natively.

@shijiesheng shijiesheng left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def heartbeat_details(self, *types: Type) -> list[Any]:
return self._heartbeat_sender.get_details(*types)

def is_cancelled(self) -> bool:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might also need APIs like wait_for_cancelled(timeout: timedelta = None) for both sync and async

timl3136 added 2 commits June 11, 2026 10:10
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>
Comment on lines +125 to +132
def wait_for_cancelled(self, timeout: timedelta | float | None = None) -> bool:
if timeout is None:
sec: float | None = None
elif isinstance(timeout, timedelta):
sec = timeout.total_seconds()
else:
sec = float(timeout)
return self._heartbeat_sender.wait_for_cancellation(sec)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Edge Case: Sync wait_for_cancelled(None) can block a pool worker forever

_SyncContext.wait_for_cancelled (cadence/_internal/activity/_context.py:125-132) delegates to _HeartbeatSender.wait_for_cancellation, which calls threading.Event.wait(timeout). When timeout is None, this blocks the thread-pool worker indefinitely. The event is only ever set by a successful heartbeat RPC that returns cancel_requested=True (_heartbeat.py:49-50). If the activity stops heartbeating, heartbeats fail (the exception is swallowed and logged in send_heartbeat), or the server never reports cancellation, the worker thread can never be woken and is permanently consumed from the bounded ThreadPoolExecutor. There is no mechanism to interrupt the blocked thread on start_to_close timeout/worker shutdown. Consider documenting that callers should pass a finite timeout and poll, or internally capping the wait and re-checking, so a single stuck activity cannot starve the worker's thread pool.

Was this helpful? React with 👍 / 👎

Signed-off-by: Tim Li <ltim@uber.com>
@gitar-bot

gitar-bot Bot commented Jun 11, 2026

Copy link
Copy Markdown
Code Review 👍 Approved with suggestions 2 resolved / 3 findings

Implements activity cancellation through heartbeat signals and updated executor logic, resolving previous issues with swallowed errors and empty payloads. Ensure wait_for_cancelled(None) is configured to avoid blocking pool workers indefinitely.

💡 Edge Case: Sync wait_for_cancelled(None) can block a pool worker forever

📄 cadence/_internal/activity/_context.py:125-132 📄 cadence/_internal/activity/_heartbeat.py:32-33 📄 cadence/_internal/activity/_heartbeat.py:49-51

_SyncContext.wait_for_cancelled (cadence/_internal/activity/_context.py:125-132) delegates to _HeartbeatSender.wait_for_cancellation, which calls threading.Event.wait(timeout). When timeout is None, this blocks the thread-pool worker indefinitely. The event is only ever set by a successful heartbeat RPC that returns cancel_requested=True (_heartbeat.py:49-50). If the activity stops heartbeating, heartbeats fail (the exception is swallowed and logged in send_heartbeat), or the server never reports cancellation, the worker thread can never be woken and is permanently consumed from the bounded ThreadPoolExecutor. There is no mechanism to interrupt the blocked thread on start_to_close timeout/worker shutdown. Consider documenting that callers should pass a finite timeout and poll, or internally capping the wait and re-checking, so a single stuck activity cannot starve the worker's thread pool.

✅ 2 resolved
Edge Case: Async activity swallowing CancelledError is reported as success

📄 cadence/_internal/activity/_context.py:29-43 📄 cadence/_internal/activity/_activity_executor.py:45-55
In _Context.execute (cadence/_internal/activity/_context.py:29-44), when a cancellation is requested the registered callback calls activity_task.cancel(), injecting asyncio.CancelledError into the activity. If a user's async activity catches CancelledError and returns a value (or returns None) instead of re-raising, await activity_task returns normally, _Context.execute returns the result, and ActivityExecutor.execute (cadence/_internal/activity/_activity_executor.py:49-50) calls _report_success. The result: even though the server requested cancellation (context.is_cancelled() is True), the activity is reported as completed rather than cancelled. This may be intended (an activity that chooses to finish should complete), but it can be surprising. Consider documenting this contract, or after await activity_task returns, checking self._cancellation.is_requested() to decide whether to honor the completion. At minimum, document that async activities must re-raise CancelledError (or raise ActivityCancelledError) to report cancellation.

Quality: ActivityCancelledError() with no details sends empty payload, not None

📄 cadence/error.py:29-32 📄 cadence/_internal/activity/_activity_executor.py:114-119
ActivityCancelledError.__init__ stores self.details = list(details) (cadence/error.py:29-32), so raising ActivityCancelledError() with no arguments yields details == [], not None. In _report_cancelled (cadence/_internal/activity/_activity_executor.py:114-119) the branch details if details is not None else Payload() therefore takes the to_data([]) path for an empty list, which produces an empty Payload anyway — so behavior is currently equivalent. However the is not None guard is effectively dead for the ActivityCancelledError path (an instance always passes a list), while the async CancelledError path always passes None. This is harmless today but the dual representation (empty list vs None both meaning "no details") is easy to break later. Consider normalizing to a single representation.

🤖 Prompt for agents
Code Review: Implements activity cancellation through heartbeat signals and updated executor logic, resolving previous issues with swallowed errors and empty payloads. Ensure `wait_for_cancelled(None)` is configured to avoid blocking pool workers indefinitely.

1. 💡 Edge Case: Sync wait_for_cancelled(None) can block a pool worker forever
   Files: cadence/_internal/activity/_context.py:125-132, cadence/_internal/activity/_heartbeat.py:32-33, cadence/_internal/activity/_heartbeat.py:49-51

   `_SyncContext.wait_for_cancelled` (cadence/_internal/activity/_context.py:125-132) delegates to `_HeartbeatSender.wait_for_cancellation`, which calls `threading.Event.wait(timeout)`. When `timeout is None`, this blocks the thread-pool worker indefinitely. The event is only ever set by a successful heartbeat RPC that returns `cancel_requested=True` (`_heartbeat.py:49-50`). If the activity stops heartbeating, heartbeats fail (the exception is swallowed and logged in `send_heartbeat`), or the server never reports cancellation, the worker thread can never be woken and is permanently consumed from the bounded `ThreadPoolExecutor`. There is no mechanism to interrupt the blocked thread on start_to_close timeout/worker shutdown. Consider documenting that callers should pass a finite timeout and poll, or internally capping the wait and re-checking, so a single stuck activity cannot starve the worker's thread pool.

Options

Auto-apply is off → Gitar will not commit updates to this branch.
Display: compact → Showing less information.

Comment with these commands to change:

Auto-apply Compact
gitar auto-apply:on         
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants