diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 2f3fe952..28e51e20 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size"; const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms"; +const METRIC_BATCH_SUBMIT_DURATION_MS: &str = "streams.pipeline.batch.submit_duration_ms"; fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { @@ -217,6 +218,7 @@ pub struct BatchStep { /// This helps us capping the size of the pending queue during prolonged backpressure periods. pending_batch: bool, commit_request_carried_over: Option, + step_labels: Vec<(String, String)>, } impl BatchStep { @@ -227,6 +229,7 @@ impl BatchStep { step_name: String, next_step: Box>, ) -> Self { + let step_labels = vec![("step".to_string(), step_name.clone())]; Self { next_step, route, @@ -238,6 +241,7 @@ impl BatchStep { outbound: VecDeque::new(), pending_batch: false, commit_request_carried_over: None, + step_labels, } } @@ -249,10 +253,28 @@ impl BatchStep { /// on going work. fn drain_outbound(&mut self) -> Result<(), StrategyError> { while let Some(msg) = self.outbound.pop_front() { + let outbound_len = self.outbound.len(); let c = self.next_step.poll()?; self.commit_request_carried_over = merge_commit_request(self.commit_request_carried_over.take(), c); - match self.next_step.submit(msg) { + let submit_start = Instant::now(); + let submit_result = self.next_step.submit(msg); + let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; + metrics::gauge!(METRIC_BATCH_SUBMIT_DURATION_MS, &self.step_labels) + .set(submit_duration_ms); + let submit_outcome = match &submit_result { + Ok(()) => "ok", + Err(SubmitError::MessageRejected(_)) => "message_rejected", + Err(SubmitError::InvalidMessage(_)) => "invalid_message", + }; + log::debug!( + "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}", + self.step_name, + submit_duration_ms, + outbound_len, + submit_outcome + ); + match submit_result { Ok(()) => { if self.pending_batch { self.pending_batch = false; @@ -293,9 +315,15 @@ impl BatchStep { let flush_start = Instant::now(); let batch_msg = b.flush()?; get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64()); - let step_labels = vec![("step".to_string(), self.step_name.clone())]; - metrics::histogram!(METRIC_BATCH_SIZE, &step_labels).record(batch_elements); - metrics::histogram!(METRIC_BATCH_TIME_MS, &step_labels).record(batch_open_ms); + metrics::histogram!(METRIC_BATCH_SIZE, &self.step_labels).record(batch_elements); + metrics::histogram!(METRIC_BATCH_TIME_MS, &self.step_labels).record(batch_open_ms); + log::info!( + "Batch flushed. step: {:?}, batch_elements: {:?}, batch_open_ms: {:?} created_at: {:?}", + self.step_name, + batch_elements, + batch_open_ms, + b.created_at + ); self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index d7d094f6..b0a2de23 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -15,7 +15,16 @@ use sentry_arroyo::processing::strategies::{merge_commit_request, CommitRequest, use sentry_arroyo::types::{Message, Partition, Topic}; use sentry_arroyo::utils::timing::Deadline; use std::collections::VecDeque; -use std::time::Duration; +use std::time::{Duration, Instant}; + +const METRIC_PYTHON_ADAPTER_SUBMIT_DURATION_MS: &str = + "streams.pipeline.python_adapter.submit_duration_ms"; +const METRIC_PYTHON_ADAPTER_POLL_DURATION_MS: &str = + "streams.pipeline.python_adapter.poll_duration_ms"; +const METRIC_PYTHON_ADAPTER_POLL_HANDLE_DURATION_MS: &str = + "streams.pipeline.python_adapter.poll_handle_duration_ms"; +const METRIC_PYTHON_ADAPTER_NEXT_STEP_SUBMIT_DURATION_MS: &str = + "streams.pipeline.python_adapter.next_step_submit_duration_ms"; import_exception!(arroyo.processing.strategies, MessageRejected); import_exception!(arroyo.dlq, InvalidMessage); @@ -38,6 +47,7 @@ pub struct PythonAdapter { // TODO: Add a mutex here next_strategy: Box>, commit_request_carried_over: Option, + step_labels: Vec<(String, String)>, } impl PythonAdapter { @@ -48,6 +58,7 @@ impl PythonAdapter { ) -> Self { traced_with_gil!(|py| { let processing_step = delegate_factory.call_method0(py, "build").unwrap(); + let step_labels = vec![("route".to_string(), format!("{:?}", route))]; Self { route, @@ -55,6 +66,7 @@ impl PythonAdapter { next_strategy, transformed_messages: VecDeque::new(), commit_request_carried_over: None, + step_labels, } }) } @@ -137,9 +149,17 @@ impl ProcessingStrategy for PythonAdapter { let py_committable = convert_committable_to_py(py, committable) .expect("Unable to retrieve commitable from message"); + let submit_start = Instant::now(); let res = self.processing_step .call_method1(py, "submit", (python_payload, py_committable)); + let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; + metrics::gauge!(METRIC_PYTHON_ADAPTER_SUBMIT_DURATION_MS, &self.step_labels) + .set(submit_duration_ms); + log::debug!( + "PythonAdapter submit. duration_ms: {:?}", + submit_duration_ms + ); let Err(py_err) = res else { return Ok(()); @@ -176,23 +196,64 @@ impl ProcessingStrategy for PythonAdapter { /// /// This is the method that sends messages to the next ProcessingStrategy. fn poll(&mut self) -> Result, StrategyError> { - let out_messages = traced_with_gil!(|py| -> PyResult>> { + let poll_start = Instant::now(); + let poll_result = traced_with_gil!(|py| -> PyResult>> { let ret = self.processing_step.call_method0(py, "poll")?; - Ok(ret.extract(py).unwrap()) + let out_messages: Vec> = ret.extract(py).unwrap(); + Ok(out_messages) }); - match out_messages { + match poll_result { Ok(out_messages) => { + let message_count = out_messages.len(); + let poll_duration_ms = poll_start.elapsed().as_secs_f64() * 1000.0; + let handle_start = Instant::now(); traced_with_gil!(|py| { self.handle_py_return_value(py, out_messages); }); + let handle_duration_ms = handle_start.elapsed().as_secs_f64() * 1000.0; + if message_count > 0 { + metrics::gauge!(METRIC_PYTHON_ADAPTER_POLL_DURATION_MS, &self.step_labels) + .set(poll_duration_ms); + metrics::gauge!( + METRIC_PYTHON_ADAPTER_POLL_HANDLE_DURATION_MS, + &self.step_labels + ) + .set(handle_duration_ms); + log::debug!( + "PythonAdapter poll returned messages. poll_duration_ms: {:?}, handle_duration_ms: {:?}, message_count: {:?}", + poll_duration_ms, + handle_duration_ms, + message_count + ); + } while let Some(msg) = self.transformed_messages.pop_front() { + let pending_len = self.transformed_messages.len(); let commit_request = self.next_strategy.poll()?; self.commit_request_carried_over = merge_commit_request( self.commit_request_carried_over.take(), commit_request, ); - match self.next_strategy.submit(msg) { + let submit_start = Instant::now(); + let submit_result = self.next_strategy.submit(msg); + let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; + metrics::gauge!( + METRIC_PYTHON_ADAPTER_NEXT_STEP_SUBMIT_DURATION_MS, + &self.step_labels + ) + .set(submit_duration_ms); + let submit_outcome = match &submit_result { + Ok(()) => "ok", + Err(SubmitError::MessageRejected(_)) => "message_rejected", + Err(SubmitError::InvalidMessage(_)) => "invalid_message", + }; + log::debug!( + "PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}", + submit_duration_ms, + pending_len, + submit_outcome + ); + match submit_result { Err(SubmitError::MessageRejected( sentry_arroyo::processing::strategies::MessageRejected { message: transformed_message,