Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,33 +94,37 @@ impl EventFormat for Event {
})?
};

// Detect schema conflicts using raw inferred schema vs existing stream schema
// Pass the actual values and schema_version to check if values can be coerced to existing types
let conflicts = detect_schema_conflicts(
&raw_inferred_schema,
stream_schema,
&value_arr,
schema_version,
);

// If there are conflicts, rename the fields in JSON values
let value_arr = if !conflicts.is_empty() {
rename_conflicting_fields_in_json(value_arr, &conflicts)
} else {
let value_arr = if static_schema_flag {
value_arr
};
} else {
// Detect schema conflicts using raw inferred schema vs existing stream schema
// Pass the actual values and schema_version to check if values can be coerced to existing types
let conflicts = detect_schema_conflicts(
&raw_inferred_schema,
stream_schema,
&value_arr,
schema_version,
);

// Per-record fallback: catches batches with mixed JSON types for the
// same field, which the batch-level detect_schema_conflicts misses
// because arrow's inference picks one winning type (string over bool).
// Internally short-circuits when this can't apply (single-record
// batches, or no field-name collision at the same type).
let value_arr = super::rename_per_record_type_mismatches(
value_arr,
&raw_inferred_schema,
stream_schema,
schema_version,
);
// If there are conflicts, rename the fields in JSON values
let value_arr = if !conflicts.is_empty() {
rename_conflicting_fields_in_json(value_arr, &conflicts)
} else {
value_arr
};

// Per-record fallback: catches batches with mixed JSON types for the
// same field, which the batch-level detect_schema_conflicts misses
// because arrow's inference picks one winning type (string over bool).
// Internally short-circuits when this can't apply (single-record
// batches, or no field-name collision at the same type).
super::rename_per_record_type_mismatches(
value_arr,
&raw_inferred_schema,
stream_schema,
schema_version,
)
};

// collect all the keys from all the json objects in the request body
let fields =
Expand Down
71 changes: 43 additions & 28 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,21 +584,22 @@ pub fn rename_per_record_type_mismatches(
existing_schema: &HashMap<String, Arc<Field>>,
schema_version: SchemaVersion,
) -> Vec<Value> {
if values.len() <= 1 || existing_schema.is_empty() {
return values;
}
// Bail out unless at least one inferred field collides with storage at
// the same type. Without that, arrow's inference can't have hidden a
// mixed-type batch behind a matching aggregate type.
let needs_check = inferred_schema.fields().iter().any(|f| {
existing_schema
.get(f.name())
.is_some_and(|s| s.data_type() == f.data_type())
});
if !needs_check {
if values.len() <= 1 {
return values;
}

// Lookup of inferred field types — used as the reference type when a field
// isn't yet in storage (e.g. first batch for the stream, or a new column).
// Without this, mixed-type batches for new fields slip through: arrow picks
// a single aggregate type (Utf8 wins over Bool, etc.), the batch-level
// conflict check sees nothing to compare against in empty storage, and
// records carrying the loser type later fail `fields_mismatch`.
let inferred_types: HashMap<&str, &DataType> = inferred_schema
.fields()
.iter()
.map(|f| (f.name().as_str(), f.data_type()))
.collect();

values
.into_iter()
.map(|value| {
Expand All @@ -611,13 +612,31 @@ pub fn rename_per_record_type_mismatches(
if val.is_null() {
return (key, val);
}
let Some(existing_field) = existing_schema.get(&key) else {
// Prefer storage's declared type; fall back to the inferred
// type so within-batch mismatches on new fields are caught.
let target_type = existing_schema
.get(&key)
.map(|f| f.data_type())
.or_else(|| inferred_types.get(key.as_str()).copied());
let Some(target_type) = target_type else {
return (key, val);
};
if value_compatible_with_type(&val, existing_field.data_type(), schema_version)
// When the resolved target is a structural arrow type
// (list/struct/map), arrow validates conformance at decode
// time and value_compatible_with_type can't reliably judge
// arrays/objects — skip to avoid spurious renames of valid
// nested values. Scalars still flow through the check, so
// an array landing on e.g. a Utf8 column is still routed
// to a typed sibling rather than crashing decode.
if (val.is_array() || val.is_object())
&& (target_type.is_list()
|| matches!(target_type, DataType::Struct(_) | DataType::Map(_, _)))
{
return (key, val);
}
if value_compatible_with_type(&val, target_type, schema_version) {
return (key, val);
}
let suffix = get_datatype_suffix(&datatype_for_value(&val));
let new_key = format!("{key}_{suffix}");
(new_key, val)
Expand Down Expand Up @@ -1162,26 +1181,22 @@ mod tests {
}

#[test]
fn rename_per_record_short_circuits_when_no_field_overlap_at_same_type() {
// No inferred field shares both name AND type with storage —
// arrow can't have absorbed a mixed-type batch, so we skip the loop.
let mut storage: HashMap<String, Arc<Field>> = HashMap::new();
storage.insert(
"escaped".to_string(),
Arc::new(Field::new("escaped", DataType::Utf8, true)),
);
// Inferred has a DIFFERENT type for the shared field — handled by
// detect_schema_conflicts as a batch-level rename, not per-record.
let inferred = Schema::new(vec![Field::new("escaped", DataType::Boolean, true)]);
fn rename_per_record_renames_against_inferred_when_field_absent_from_storage() {
// First-batch case: storage is empty, so the reference type for each
// field comes from arrow's batch-level inference. With mixed bool+string
// for `escaped`, arrow picks Utf8 (string wins), and the bool record
// must be rewritten so it routes to a typed sibling column rather than
// failing fields_mismatch later.
let storage: HashMap<String, Arc<Field>> = HashMap::new();
let inferred = Schema::new(vec![Field::new("escaped", DataType::Utf8, true)]);
let renamed = rename_per_record_type_mismatches(
vec![json!({"escaped": true}), json!({"escaped": false})],
vec![json!({"escaped": "true"}), json!({"escaped": false})],
&inferred,
&storage,
SchemaVersion::V1,
);
// Per-record loop skipped; values pass through unchanged.
assert!(renamed[0].as_object().unwrap().contains_key("escaped"));
assert!(renamed[1].as_object().unwrap().contains_key("escaped"));
assert!(renamed[1].as_object().unwrap().contains_key("escaped_bool"));
}

#[test]
Expand Down
10 changes: 2 additions & 8 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ use crate::{
handlers::{
CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
INFER_TIMESTAMP_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType,
UPDATE_STREAM_KEY, parse_dataset_labels, parse_dataset_tags,
TELEMETRY_TYPE_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
parse_dataset_labels, parse_dataset_tags,
},
storage::StreamType,
};

#[derive(Debug)]
pub struct PutStreamHeaders {
pub time_partition: String,
pub time_partition_limit: String,
pub custom_partition: Option<String>,
pub static_schema_flag: bool,
Expand All @@ -49,7 +48,6 @@ pub struct PutStreamHeaders {
impl Default for PutStreamHeaders {
fn default() -> Self {
Self {
time_partition: String::default(),
time_partition_limit: String::default(),
custom_partition: None,
static_schema_flag: false,
Expand Down Expand Up @@ -80,10 +78,6 @@ impl From<&HeaderMap> for PutStreamHeaders {
.get(TELEMETRY_TYPE_KEY)
.and_then(|v| v.to_str().ok());
PutStreamHeaders {
time_partition: headers
.get(TIME_PARTITION_KEY)
.map_or("", |v| v.to_str().unwrap())
.to_string(),
time_partition_limit: headers
.get(TIME_PARTITION_LIMIT_KEY)
.map_or("", |v| v.to_str().unwrap())
Expand Down
21 changes: 2 additions & 19 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,6 @@ impl Parseable {
tenant_id: &Option<String>,
) -> Result<HeaderMap, StreamError> {
let PutStreamHeaders {
time_partition,
time_partition_limit,
custom_partition,
static_schema_flag,
Expand Down Expand Up @@ -763,7 +762,6 @@ impl Parseable {
.update_stream(
headers,
stream_name,
&time_partition,
static_schema_flag,
&time_partition_limit,
custom_partition.as_ref(),
Expand All @@ -782,25 +780,18 @@ impl Parseable {
validate_custom_partition(custom_partition)?;
}

if !time_partition.is_empty() && custom_partition.is_some() {
return Err(StreamError::Custom {
msg: "Cannot set both time partition and custom partition".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

let schema = validate_static_schema(
body,
stream_name,
&time_partition,
"",
custom_partition.as_ref(),
static_schema_flag,
)?;

let log_source_entry = LogSourceEntry::new(log_source, HashSet::new());
self.create_stream(
stream_name.to_string(),
&time_partition,
"",
time_partition_in_days,
custom_partition.as_ref(),
static_schema_flag,
Expand All @@ -818,12 +809,10 @@ impl Parseable {
Ok(headers.clone())
}

#[allow(clippy::too_many_arguments)]
async fn update_stream(
&self,
headers: &HeaderMap,
stream_name: &str,
time_partition: &str,
static_schema_flag: bool,
time_partition_limit: &str,
custom_partition: Option<&String>,
Expand All @@ -832,12 +821,6 @@ impl Parseable {
if !self.streams.contains(stream_name, tenant_id) {
return Err(StreamNotFound(stream_name.to_string()).into());
}
if !time_partition.is_empty() {
return Err(StreamError::Custom {
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
if static_schema_flag {
return Err(StreamError::Custom {
msg: "Altering the schema of an existing stream is restricted.".to_string(),
Expand Down
Loading