From dc29cac17f5cb29af12c75b3da94a6357970cb3e Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 4 Jun 2026 07:55:47 -0700 Subject: [PATCH] Add submit and poll timing metrics for batch and Python adapter steps. Expose gauge metrics and debug logs to diagnose backpressure and slow downstream submits during batch drain and Python delegate poll/submit. Co-authored-by: Cursor --- sentry_streams/src/batch_step.rs | 36 ++++++++++++-- sentry_streams/src/python_operator.rs | 71 +++++++++++++++++++++++++-- 2 files changed, 98 insertions(+), 9 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 2f3fe952..28e51e20 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size"; const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms"; +const METRIC_BATCH_SUBMIT_DURATION_MS: &str = "streams.pipeline.batch.submit_duration_ms"; fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { @@ -217,6 +218,7 @@ pub struct BatchStep { /// This helps us capping the size of the pending queue during prolonged backpressure periods. pending_batch: bool, commit_request_carried_over: Option, + step_labels: Vec<(String, String)>, } impl BatchStep { @@ -227,6 +229,7 @@ impl BatchStep { step_name: String, next_step: Box>, ) -> Self { + let step_labels = vec![("step".to_string(), step_name.clone())]; Self { next_step, route, @@ -238,6 +241,7 @@ impl BatchStep { outbound: VecDeque::new(), pending_batch: false, commit_request_carried_over: None, + step_labels, } } @@ -249,10 +253,28 @@ impl BatchStep { /// on going work. fn drain_outbound(&mut self) -> Result<(), StrategyError> { while let Some(msg) = self.outbound.pop_front() { + let outbound_len = self.outbound.len(); let c = self.next_step.poll()?; self.commit_request_carried_over = merge_commit_request(self.commit_request_carried_over.take(), c); - match self.next_step.submit(msg) { + let submit_start = Instant::now(); + let submit_result = self.next_step.submit(msg); + let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; + metrics::gauge!(METRIC_BATCH_SUBMIT_DURATION_MS, &self.step_labels) + .set(submit_duration_ms); + let submit_outcome = match &submit_result { + Ok(()) => "ok", + Err(SubmitError::MessageRejected(_)) => "message_rejected", + Err(SubmitError::InvalidMessage(_)) => "invalid_message", + }; + log::debug!( + "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}", + self.step_name, + submit_duration_ms, + outbound_len, + submit_outcome + ); + match submit_result { Ok(()) => { if self.pending_batch { self.pending_batch = false; @@ -293,9 +315,15 @@ impl BatchStep { let flush_start = Instant::now(); let batch_msg = b.flush()?; get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); - let step_labels = vec![("step".to_string(), self.step_name.clone())]; - metrics::histogram!(METRIC_BATCH_SIZE, &step_labels).record(batch_elements); - metrics::histogram!(METRIC_BATCH_TIME_MS, &step_labels).record(batch_open_ms); + metrics::histogram!(METRIC_BATCH_SIZE, &self.step_labels).record(batch_elements); + metrics::histogram!(METRIC_BATCH_TIME_MS, &self.step_labels).record(batch_open_ms); + log::info!( + "Batch flushed. step: {:?}, batch_elements: {:?}, batch_open_ms: {:?} created_at: {:?}", + self.step_name, + batch_elements, + batch_open_ms, + b.created_at + ); self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index d7d094f6..b0a2de23 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -15,7 +15,16 @@ use sentry_arroyo::processing::strategies::{merge_commit_request, CommitRequest, use sentry_arroyo::types::{Message, Partition, Topic}; use sentry_arroyo::utils::timing::Deadline; use std::collections::VecDeque; -use std::time::Duration; +use std::time::{Duration, Instant}; + +const METRIC_PYTHON_ADAPTER_SUBMIT_DURATION_MS: &str = + "streams.pipeline.python_adapter.submit_duration_ms"; +const METRIC_PYTHON_ADAPTER_POLL_DURATION_MS: &str = + "streams.pipeline.python_adapter.poll_duration_ms"; +const METRIC_PYTHON_ADAPTER_POLL_HANDLE_DURATION_MS: &str = + "streams.pipeline.python_adapter.poll_handle_duration_ms"; +const METRIC_PYTHON_ADAPTER_NEXT_STEP_SUBMIT_DURATION_MS: &str = + "streams.pipeline.python_adapter.next_step_submit_duration_ms"; import_exception!(arroyo.processing.strategies, MessageRejected); import_exception!(arroyo.dlq, InvalidMessage); @@ -38,6 +47,7 @@ pub struct PythonAdapter { // TODO: Add a mutex here next_strategy: Box>, commit_request_carried_over: Option, + step_labels: Vec<(String, String)>, } impl PythonAdapter { @@ -48,6 +58,7 @@ impl PythonAdapter { ) -> Self { traced_with_gil!(|py| { let processing_step = delegate_factory.call_method0(py, "build").unwrap(); + let step_labels = vec![("route".to_string(), format!("{:?}", route))]; Self { route, @@ -55,6 +66,7 @@ impl PythonAdapter { next_strategy, transformed_messages: VecDeque::new(), commit_request_carried_over: None, + step_labels, } }) } @@ -137,9 +149,17 @@ impl ProcessingStrategy for PythonAdapter { let py_committable = convert_committable_to_py(py, committable) .expect("Unable to retrieve commitable from message"); + let submit_start = Instant::now(); let res = self.processing_step .call_method1(py, "submit", (python_payload, py_committable)); + let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; + metrics::gauge!(METRIC_PYTHON_ADAPTER_SUBMIT_DURATION_MS, &self.step_labels) + .set(submit_duration_ms); + log::debug!( + "PythonAdapter submit. duration_ms: {:?}", + submit_duration_ms + ); let Err(py_err) = res else { return Ok(()); @@ -176,23 +196,64 @@ impl ProcessingStrategy for PythonAdapter { /// /// This is the method that sends messages to the next ProcessingStrategy. fn poll(&mut self) -> Result, StrategyError> { - let out_messages = traced_with_gil!(|py| -> PyResult>> { + let poll_start = Instant::now(); + let poll_result = traced_with_gil!(|py| -> PyResult>> { let ret = self.processing_step.call_method0(py, "poll")?; - Ok(ret.extract(py).unwrap()) + let out_messages: Vec> = ret.extract(py).unwrap(); + Ok(out_messages) }); - match out_messages { + match poll_result { Ok(out_messages) => { + let message_count = out_messages.len(); + let poll_duration_ms = poll_start.elapsed().as_secs_f64() * 1000.0; + let handle_start = Instant::now(); traced_with_gil!(|py| { self.handle_py_return_value(py, out_messages); }); + let handle_duration_ms = handle_start.elapsed().as_secs_f64() * 1000.0; + if message_count > 0 { + metrics::gauge!(METRIC_PYTHON_ADAPTER_POLL_DURATION_MS, &self.step_labels) + .set(poll_duration_ms); + metrics::gauge!( + METRIC_PYTHON_ADAPTER_POLL_HANDLE_DURATION_MS, + &self.step_labels + ) + .set(handle_duration_ms); + log::debug!( + "PythonAdapter poll returned messages. poll_duration_ms: {:?}, handle_duration_ms: {:?}, message_count: {:?}", + poll_duration_ms, + handle_duration_ms, + message_count + ); + } while let Some(msg) = self.transformed_messages.pop_front() { + let pending_len = self.transformed_messages.len(); let commit_request = self.next_strategy.poll()?; self.commit_request_carried_over = merge_commit_request( self.commit_request_carried_over.take(), commit_request, ); - match self.next_strategy.submit(msg) { + let submit_start = Instant::now(); + let submit_result = self.next_strategy.submit(msg); + let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; + metrics::gauge!( + METRIC_PYTHON_ADAPTER_NEXT_STEP_SUBMIT_DURATION_MS, + &self.step_labels + ) + .set(submit_duration_ms); + let submit_outcome = match &submit_result { + Ok(()) => "ok", + Err(SubmitError::MessageRejected(_)) => "message_rejected", + Err(SubmitError::InvalidMessage(_)) => "invalid_message", + }; + log::debug!( + "PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}", + submit_duration_ms, + pending_len, + submit_outcome + ); + match submit_result { Err(SubmitError::MessageRejected( sentry_arroyo::processing::strategies::MessageRejected { message: transformed_message,