Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions sentry_streams/src/batch_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Metric name for the histogram that records `batch_elements` right after a batch is flushed.
const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size";
/// Metric name for the histogram that records `batch_open_ms` right after a batch is flushed.
const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms";
/// Metric name for the gauge set to the elapsed time, in milliseconds, of each
/// `next_step.submit` call made while draining the outbound queue.
const METRIC_BATCH_SUBMIT_DURATION_MS: &str = "streams.pipeline.batch.submit_duration_ms";

fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option<String> {
match first {
Expand Down Expand Up @@ -217,6 +218,7 @@ pub struct BatchStep {
/// This helps us cap the size of the pending queue during prolonged backpressure periods.
pending_batch: bool,
commit_request_carried_over: Option<CommitRequest>,
step_labels: Vec<(String, String)>,
}

impl BatchStep {
Expand All @@ -227,6 +229,7 @@ impl BatchStep {
step_name: String,
next_step: Box<dyn ProcessingStrategy<RoutedValue>>,
) -> Self {
let step_labels = vec![("step".to_string(), step_name.clone())];
Self {
next_step,
route,
Expand All @@ -238,6 +241,7 @@ impl BatchStep {
outbound: VecDeque::new(),
pending_batch: false,
commit_request_carried_over: None,
step_labels,
}
}

Expand All @@ -249,10 +253,28 @@ impl BatchStep {
/// ongoing work.
fn drain_outbound(&mut self) -> Result<(), StrategyError> {
while let Some(msg) = self.outbound.pop_front() {
let outbound_len = self.outbound.len();
let c = self.next_step.poll()?;
self.commit_request_carried_over =
merge_commit_request(self.commit_request_carried_over.take(), c);
match self.next_step.submit(msg) {
let submit_start = Instant::now();
let submit_result = self.next_step.submit(msg);
let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0;
metrics::gauge!(METRIC_BATCH_SUBMIT_DURATION_MS, &self.step_labels)
.set(submit_duration_ms);
let submit_outcome = match &submit_result {
Ok(()) => "ok",
Err(SubmitError::MessageRejected(_)) => "message_rejected",
Err(SubmitError::InvalidMessage(_)) => "invalid_message",
};
log::debug!(
"BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}",
self.step_name,
submit_duration_ms,
outbound_len,
submit_outcome
);
match submit_result {
Ok(()) => {
if self.pending_batch {
self.pending_batch = false;
Expand Down Expand Up @@ -293,9 +315,15 @@ impl BatchStep {
let flush_start = Instant::now();
let batch_msg = b.flush()?;
get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64());
let step_labels = vec![("step".to_string(), self.step_name.clone())];
metrics::histogram!(METRIC_BATCH_SIZE, &step_labels).record(batch_elements);
metrics::histogram!(METRIC_BATCH_TIME_MS, &step_labels).record(batch_open_ms);
metrics::histogram!(METRIC_BATCH_SIZE, &self.step_labels).record(batch_elements);
metrics::histogram!(METRIC_BATCH_TIME_MS, &self.step_labels).record(batch_open_ms);
log::info!(
"Batch flushed. step: {:?}, batch_elements: {:?}, batch_open_ms: {:?} created_at: {:?}",
self.step_name,
batch_elements,
batch_open_ms,
b.created_at
);
self.batch = None;
let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer);

Expand Down
71 changes: 66 additions & 5 deletions sentry_streams/src/python_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ use sentry_arroyo::processing::strategies::{merge_commit_request, CommitRequest,
use sentry_arroyo::types::{Message, Partition, Topic};
use sentry_arroyo::utils::timing::Deadline;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::{Duration, Instant};

/// Metric name for the gauge set to the elapsed time, in milliseconds, of the
/// `submit` call made on the Python processing step.
const METRIC_PYTHON_ADAPTER_SUBMIT_DURATION_MS: &str =
"streams.pipeline.python_adapter.submit_duration_ms";
/// Metric name for the gauge set to the elapsed time, in milliseconds, measured from
/// the start of `poll` until the Python `poll` result has been extracted.
/// Only emitted when the poll returned at least one message.
const METRIC_PYTHON_ADAPTER_POLL_DURATION_MS: &str =
"streams.pipeline.python_adapter.poll_duration_ms";
/// Metric name for the gauge set to the elapsed time, in milliseconds, spent in
/// `handle_py_return_value` for the polled messages.
/// Only emitted when the poll returned at least one message.
const METRIC_PYTHON_ADAPTER_POLL_HANDLE_DURATION_MS: &str =
"streams.pipeline.python_adapter.poll_handle_duration_ms";
/// Metric name for the gauge set to the elapsed time, in milliseconds, of each
/// `next_strategy.submit` call made while forwarding transformed messages.
const METRIC_PYTHON_ADAPTER_NEXT_STEP_SUBMIT_DURATION_MS: &str =
"streams.pipeline.python_adapter.next_step_submit_duration_ms";

import_exception!(arroyo.processing.strategies, MessageRejected);
import_exception!(arroyo.dlq, InvalidMessage);
Expand All @@ -38,6 +47,7 @@ pub struct PythonAdapter {
// TODO: Add a mutex here
next_strategy: Box<dyn ProcessingStrategy<RoutedValue>>,
commit_request_carried_over: Option<CommitRequest>,
step_labels: Vec<(String, String)>,
}

impl PythonAdapter {
Expand All @@ -48,13 +58,15 @@ impl PythonAdapter {
) -> Self {
traced_with_gil!(|py| {
let processing_step = delegate_factory.call_method0(py, "build").unwrap();
let step_labels = vec![("route".to_string(), format!("{:?}", route))];

Self {
route,
processing_step,
next_strategy,
transformed_messages: VecDeque::new(),
commit_request_carried_over: None,
step_labels,
}
})
}
Expand Down Expand Up @@ -137,9 +149,17 @@ impl ProcessingStrategy<RoutedValue> for PythonAdapter {
let py_committable = convert_committable_to_py(py, committable)
.expect("Unable to retrieve commitable from message");

let submit_start = Instant::now();
let res =
self.processing_step
.call_method1(py, "submit", (python_payload, py_committable));
let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0;
metrics::gauge!(METRIC_PYTHON_ADAPTER_SUBMIT_DURATION_MS, &self.step_labels)
.set(submit_duration_ms);
log::debug!(
"PythonAdapter submit. duration_ms: {:?}",
submit_duration_ms
);

let Err(py_err) = res else {
return Ok(());
Expand Down Expand Up @@ -176,23 +196,64 @@ impl ProcessingStrategy<RoutedValue> for PythonAdapter {
///
/// This is the method that sends messages to the next ProcessingStrategy.
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
let out_messages = traced_with_gil!(|py| -> PyResult<Vec<Py<PyAny>>> {
let poll_start = Instant::now();
let poll_result = traced_with_gil!(|py| -> PyResult<Vec<Py<PyAny>>> {
let ret = self.processing_step.call_method0(py, "poll")?;
Ok(ret.extract(py).unwrap())
let out_messages: Vec<Py<PyAny>> = ret.extract(py).unwrap();
Ok(out_messages)
});

match out_messages {
match poll_result {
Ok(out_messages) => {
let message_count = out_messages.len();
let poll_duration_ms = poll_start.elapsed().as_secs_f64() * 1000.0;
let handle_start = Instant::now();
traced_with_gil!(|py| {
self.handle_py_return_value(py, out_messages);
});
let handle_duration_ms = handle_start.elapsed().as_secs_f64() * 1000.0;
if message_count > 0 {
metrics::gauge!(METRIC_PYTHON_ADAPTER_POLL_DURATION_MS, &self.step_labels)
.set(poll_duration_ms);
metrics::gauge!(
METRIC_PYTHON_ADAPTER_POLL_HANDLE_DURATION_MS,
&self.step_labels
)
.set(handle_duration_ms);
log::debug!(
"PythonAdapter poll returned messages. poll_duration_ms: {:?}, handle_duration_ms: {:?}, message_count: {:?}",
poll_duration_ms,
handle_duration_ms,
message_count
);
}
while let Some(msg) = self.transformed_messages.pop_front() {
let pending_len = self.transformed_messages.len();
let commit_request = self.next_strategy.poll()?;
self.commit_request_carried_over = merge_commit_request(
self.commit_request_carried_over.take(),
commit_request,
);
match self.next_strategy.submit(msg) {
let submit_start = Instant::now();
let submit_result = self.next_strategy.submit(msg);
let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0;
metrics::gauge!(
METRIC_PYTHON_ADAPTER_NEXT_STEP_SUBMIT_DURATION_MS,
&self.step_labels
)
.set(submit_duration_ms);
let submit_outcome = match &submit_result {
Ok(()) => "ok",
Err(SubmitError::MessageRejected(_)) => "message_rejected",
Err(SubmitError::InvalidMessage(_)) => "invalid_message",
};
log::debug!(
"PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}",
submit_duration_ms,
pending_len,
submit_outcome
);
match submit_result {
Err(SubmitError::MessageRejected(
sentry_arroyo::processing::strategies::MessageRejected {
message: transformed_message,
Expand Down
Loading