diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs index 51a0a071..15399b01 100644 --- a/crates/fluss/src/client/table/lookup.rs +++ b/crates/fluss/src/client/table/lookup.rs @@ -303,14 +303,18 @@ impl TableLookup { let lookup_row_type = row_type.project_with_field_names(primary_keys)?; let physical_primary_keys = self.table_info.get_physical_primary_keys().to_vec(); - let primary_key_encoder = - KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys, &data_lake_format)?; + let primary_key_encoder = KeyEncoderFactory::of_primary_key( + &lookup_row_type, + &physical_primary_keys, + self.table_info.get_table_config(), + self.table_info.is_default_bucket_key(), + )?; let bucket_key_encoder = if self.table_info.is_default_bucket_key() { None } else { let bucket_keys = self.table_info.get_bucket_keys().to_vec(); - Some(KeyEncoderFactory::of( + Some(KeyEncoderFactory::of_bucket_key( &lookup_row_type, &bucket_keys, &data_lake_format, @@ -452,7 +456,7 @@ impl TablePrefixLookup { let bucket_keys = self.table_info.get_bucket_keys().to_vec(); let prefix_key_encoder = - KeyEncoderFactory::of(&lookup_row_type, &bucket_keys, &data_lake_format)?; + KeyEncoderFactory::of_bucket_key(&lookup_row_type, &bucket_keys, &data_lake_format)?; let partition_getter = if self.table_info.is_partitioned() { Some(PartitionGetter::new( diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs index 52ec37b3..74b661d6 100644 --- a/crates/fluss/src/client/table/upsert.rs +++ b/crates/fluss/src/client/table/upsert.rs @@ -143,9 +143,14 @@ impl UpsertWriterFactory { &partial_update_columns, )?; - let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, data_lake_format)?; + let primary_key_encoder = KeyEncoderFactory::of_primary_key( + row_type, + physical_pks, + table_info.get_table_config(), + table_info.is_default_bucket_key(), + )?; let bucket_key_encoder = if !table_info.is_default_bucket_key() 
{ - Some(KeyEncoderFactory::of( + Some(KeyEncoderFactory::of_bucket_key( row_type, table_info.get_bucket_keys(), data_lake_format, diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 390bdbfc..6564d12d 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -905,6 +905,42 @@ impl KvFormat { } } +/// KV layout version from `table.kv.format-version` (Java `ConfigOptions.TABLE_KV_FORMAT_VERSION`). +/// +/// Unset or unparseable values resolve to [`KvFormatVersion::V1`], matching Java +/// `getKvFormatVersion().orElse(1)`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] +pub enum KvFormatVersion { + #[default] + V1, + V2, +} + +impl KvFormatVersion { + /// Parse the `table.kv.format-version` property value. + /// + /// Whitespace-only and non-numeric garbage fall back to [`Default::default`] ([`V1`](Self::V1)). + /// Only `1` and `2` are recognized as explicit versions; other integers also fall back to `V1`. + pub fn parse_property(raw: Option<&str>) -> Self { + let Some(s) = raw.map(str::trim).filter(|s| !s.is_empty()) else { + return Self::default(); + }; + match s { + "1" => Self::V1, + "2" => Self::V2, + _ => s + .parse::<u32>() + .ok() + .and_then(|n| match n { + 1 => Some(Self::V1), + 2 => Some(Self::V2), + _ => None, + }) + .unwrap_or_default(), + } + } +} + #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] pub struct TablePath { database: String, @@ -1172,6 +1208,17 @@ impl TableConfig { kv_format.parse().map_err(Into::into) } + /// KV layout version from `table.kv.format-version` (Java `ConfigOptions.TABLE_KV_FORMAT_VERSION`). + /// + /// When unset or invalid, returns [`KvFormatVersion::V1`] (same as Java `getKvFormatVersion().orElse(1)`).
+ pub fn get_kv_format_version(&self) -> KvFormatVersion { + KvFormatVersion::parse_property( + self.properties + .get("table.kv.format-version") + .map(String::as_str), + ) + } + pub fn get_log_format(&self) -> Result<LogFormat> { // TODO: Consolidate configurations logic, constants, defaults in a single place const DEFAULT_LOG_FORMAT: &str = "ARROW"; @@ -1643,4 +1690,46 @@ mod tests { ); assert!(table_info.is_auto_partitioned()); } + + #[test] + fn kv_format_version_parse_property_defaults() { + assert_eq!(KvFormatVersion::parse_property(None), KvFormatVersion::V1); + assert_eq!( + KvFormatVersion::parse_property(Some("")), + KvFormatVersion::V1 + ); + assert_eq!( + KvFormatVersion::parse_property(Some(" ")), + KvFormatVersion::V1 + ); + assert_eq!( + KvFormatVersion::parse_property(Some("1")), + KvFormatVersion::V1 + ); + assert_eq!( + KvFormatVersion::parse_property(Some(" 2 ")), + KvFormatVersion::V2 + ); + assert_eq!( + KvFormatVersion::parse_property(Some("99")), + KvFormatVersion::V1 + ); + assert_eq!( + KvFormatVersion::parse_property(Some("nope")), + KvFormatVersion::V1 + ); + } + + #[test] + fn table_config_get_kv_format_version() { + let empty = TableConfig::from_properties(HashMap::new()); + assert_eq!(empty.get_kv_format_version(), KvFormatVersion::V1); + + let mut m = HashMap::new(); + m.insert("table.kv.format-version".to_string(), "2".to_string()); + assert_eq!( + TableConfig::from_properties(m).get_kv_format_version(), + KvFormatVersion::V2 + ); + } } diff --git a/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs b/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs index b0e8434d..d0100581 100644 --- a/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs +++ b/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs @@ -60,11 +60,6 @@ impl IcebergBinaryRowWriter { } } - // Dependency order note: - // 1) Keep this PR scoped to writer-level Java parity. - // 2) Wire the writer through IcebergKeyEncoder in follow-up #308.
- // TODO(#308): add end-to-end key-encoding tests via IcebergKeyEncoder - // (similar to CompactedKeyEncoder tests for CompactedKeyWriter). pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> { match field_type { // Match Java IcebergBinaryRowWriter.createFieldWriter() supported types exactly. diff --git a/crates/fluss/src/row/encode/iceberg_key_encoder.rs b/crates/fluss/src/row/encode/iceberg_key_encoder.rs new file mode 100644 index 00000000..7e65661a --- /dev/null +++ b/crates/fluss/src/row/encode/iceberg_key_encoder.rs @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Primary key encoder for Iceberg lake tiering (single key column). +//! +//!
Reference: `org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder` + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::RowType; +use crate::row::binary::{BinaryWriter, IcebergBinaryRowWriter, ValueWriter}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_encoder: ValueWriter, + writer: IcebergBinaryRowWriter, +} + +impl IcebergKeyEncoder { + /// Create an Iceberg-format key encoder for the given primary key field names. + /// + /// Iceberg tiering requires exactly one primary key column (FIP / Java `checkArgument`). + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<Self> { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key_name = &keys[0]; + let key_index = match row_type.get_field_index(key_name) { + Some(idx) => idx, + None => { + return Err(IllegalArgument { + message: format!("Field {key_name:?} not found in input row type {row_type:?}"), + }); + } + }; + + let data_type = row_type.fields().get(key_index).unwrap().data_type(); + let field_encoder = IcebergBinaryRowWriter::create_value_writer(data_type)?; + let field_getter = FieldGetter::create(data_type, key_index); + + Ok(IcebergKeyEncoder { + field_getter, + field_encoder, + writer: IcebergBinaryRowWriter::new(), + }) + } +} + +impl KeyEncoder for IcebergKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> { + self.writer.reset(); + match self.field_getter.get_field(row)?
{ + Datum::Null => Err(IllegalArgument { + message: "Cannot encode Iceberg key with null value".to_string(), + }), + value => { + self.field_encoder + .write_value(&mut self.writer, 0, &value)?; + self.writer.complete(); + Ok(self.writer.to_bytes()) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::{DataType, DataTypes, TimestampType}; + use crate::row::datum::{Time, TimestampNtz}; + use crate::row::{Decimal, GenericRow}; + use bigdecimal::{BigDecimal, num_bigint::BigInt}; + + #[test] + fn single_key_field_requirement() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "name"], + ); + + assert!(IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).is_ok()); + + let err = IcebergKeyEncoder::create_key_encoder( + &row_type, + &["id".to_string(), "name".to_string()], + ) + .err() + .expect("expected error for multiple keys"); + assert!( + err.to_string().contains("exactly one field"), + "unexpected: {err}" + ); + } + + #[test] + fn test_integer_encoding() { + let row_type = RowType::with_data_types(vec![DataTypes::int()]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap(); + let row = GenericRow::from_data(vec![Datum::from(42i32)]); + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (42i64).to_le_bytes()); + } + + #[test] + fn test_long_encoding() { + let row_type = RowType::with_data_types(vec![DataTypes::bigint()]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap(); + let v: i64 = 1234567890123456789; + let row = GenericRow::from_data(vec![Datum::from(v)]); + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), v.to_le_bytes()); + } + + #[test] + fn test_string_encoding() { + let row_type = RowType::with_data_types(vec![DataTypes::string()]); + let mut encoder = + 
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap(); + let s = "Hello Iceberg, Fluss this side!"; + let row = GenericRow::from_data(vec![Datum::from(s)]); + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), s.as_bytes()); + } + + #[test] + fn test_decimal_encoding() { + let row_type = RowType::with_data_types(vec![DataTypes::decimal(10, 2)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap(); + let dec = + Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2), 10, 2).unwrap(); + let row = GenericRow::from_data(vec![Datum::Decimal(dec)]); + let encoded = encoder.encode_key(&row).unwrap(); + let expected = BigDecimal::new(BigInt::from(12345), 2) + .into_bigint_and_exponent() + .0 + .to_signed_bytes_be(); + assert_eq!(encoded.as_ref(), expected.as_slice()); + } + + #[test] + fn test_timestamp_encoding() { + let row_type = RowType::with_data_types(vec![DataType::Timestamp( + TimestampType::with_nullable(false, 6).unwrap(), + )]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap(); + let millis = 1698235273182i64; + let nanos = 123000; + let micros = millis * 1000 + (nanos as i64 / 1000); + let ts = TimestampNtz::from_millis_nanos(millis, nanos).unwrap(); + let row = GenericRow::from_data(vec![Datum::TimestampNtz(ts)]); + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes()); + } + + #[test] + fn test_date_encoding() { + let row_type = RowType::with_data_types(vec![DataTypes::date()]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap(); + let date_value = 19655i32; + let row = GenericRow::from_data(vec![Datum::from(date_value)]); + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (date_value as i64).to_le_bytes()); + } + + #[test] + fn test_time_encoding() { + let row_type = 
RowType::with_data_types(vec![DataTypes::time()]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap(); + let time_millis = 34200000i32; + let row = GenericRow::from_data(vec![Datum::Time(Time::new(time_millis))]); + let encoded = encoder.encode_key(&row).unwrap(); + let micros = (time_millis as i64) * 1000; + assert_eq!(encoded.as_ref(), micros.to_le_bytes()); + } + + #[test] + fn test_binary_encoding() { + let row_type = RowType::with_data_types(vec![DataTypes::bytes()]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap(); + let data: &[u8] = b"Hello i only understand binary data"; + let row = GenericRow::from_data(vec![Datum::from(data)]); + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), data); + } +} diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs index 16a540eb..325a4387 100644 --- a/crates/fluss/src/row/encode/mod.rs +++ b/crates/fluss/src/row/encode/mod.rs @@ -17,14 +17,17 @@ mod compacted_key_encoder; mod compacted_row_encoder; +mod iceberg_key_encoder; use crate::error::{Error, Result}; -use crate::metadata::{DataLakeFormat, KvFormat, RowType}; +use crate::metadata::{DataLakeFormat, KvFormat, KvFormatVersion, RowType, TableConfig}; use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; use crate::row::encode::compacted_row_encoder::CompactedRowEncoder; use crate::row::{Datum, InternalRow}; use bytes::Bytes; +pub use iceberg_key_encoder::IcebergKeyEncoder; + /// An interface for encoding key of row into bytes. #[allow(dead_code)] pub trait KeyEncoder: Send + Sync { @@ -34,15 +37,7 @@ pub trait KeyEncoder: Send + Sync { pub struct KeyEncoderFactory; impl KeyEncoderFactory { - /// Create a key encoder to encode the key bytes of the input row. 
- /// # Arguments - /// * `row_type` - the row type of the input row - /// * `key_fields` - the key fields to encode - /// * `lake_format` - the data lake format - /// - /// # Returns - /// key encoder - pub fn of( + fn of( row_type: &RowType, key_fields: &[String], data_lake_format: &Option<DataLakeFormat>, ) -> Result<Box<dyn KeyEncoder>> { @@ -54,14 +49,45 @@ impl KeyEncoderFactory { Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( row_type, key_fields, )?)), - Some(DataLakeFormat::Iceberg) => Err(Error::UnsupportedOperation { - message: "KeyEncoder for Iceberg format is not yet implemented".to_string(), - }), + Some(DataLakeFormat::Iceberg) => Ok(Box::new(IcebergKeyEncoder::create_key_encoder( + row_type, key_fields, + )?)), None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( row_type, key_fields, )?)), } } + + pub fn of_bucket_key( + row_type: &RowType, + key_fields: &[String], + data_lake_format: &Option<DataLakeFormat>, + ) -> Result<Box<dyn KeyEncoder>> { + Self::of(row_type, key_fields, data_lake_format) + } + + /// When `table.kv.format-version` is **2** and the table uses a **custom** bucket key + /// (`is_default_bucket_key == false`), the primary key is encoded with + /// [`CompactedKeyEncoder`] so prefix lookups work, even for Iceberg-tiered tables. + /// Otherwise encoding follows [`Self::of`] (v1 tables, or v2 with default bucket key). + pub fn of_primary_key( + row_type: &RowType, + key_fields: &[String], + table_config: &TableConfig, + is_default_bucket_key: bool, + ) -> Result<Box<dyn KeyEncoder>> { + let kv_version = table_config.get_kv_format_version(); + let data_lake_format = &table_config.get_datalake_format()?; + + match (kv_version, is_default_bucket_key) { + (KvFormatVersion::V1, _) | (KvFormatVersion::V2, true) => { + Self::of(row_type, key_fields, data_lake_format) + } + (KvFormatVersion::V2, false) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( + row_type, key_fields, + )?)), + } + } } /// An encoder to write binary row data.
It's used to write rows @@ -125,3 +151,66 @@ impl RowEncoderFactory { } } } + +#[cfg(test)] +mod key_encoder_factory_tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::{Datum, GenericRow}; + use std::collections::HashMap; + + fn table_config(pairs: &[(&str, &str)]) -> TableConfig { + TableConfig::from_properties( + pairs + .iter() + .map(|(k, v)| ((*k).to_string(), (*v).to_string())) + .collect::<HashMap<String, String>>(), + ) + } + + #[test] + fn primary_key_kv2_default_bucket_iceberg_uses_iceberg_encoding() { + let cfg = table_config(&[ + ("table.kv.format-version", "2"), + ("table.datalake.format", "iceberg"), + ]); + let row_type = RowType::with_data_types(vec![DataTypes::int()]); + let mut enc = + KeyEncoderFactory::of_primary_key(&row_type, &["f0".to_string()], &cfg, true).unwrap(); + let row = GenericRow::from_data(vec![Datum::from(7i32)]); + let bytes = enc.encode_key(&row).unwrap(); + assert_eq!(bytes.as_ref(), (7i64).to_le_bytes()); + } + + #[test] + fn primary_key_kv2_custom_bucket_uses_compacted_even_with_iceberg() { + let cfg = table_config(&[ + ("table.kv.format-version", "2"), + ("table.datalake.format", "iceberg"), + ]); + let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]); + let mut enc = KeyEncoderFactory::of_primary_key( + &row_type, + &["f0".to_string(), "f1".to_string()], + &cfg, + false, + ) + .unwrap(); + let row = GenericRow::from_data(vec![Datum::from(1i32), Datum::from("x")]); + let bytes = enc.encode_key(&row).unwrap(); + assert_ne!(bytes.as_ref(), (1i64).to_le_bytes()); + } + + #[test] + fn primary_key_invalid_kv_format_version_defaults_to_v1() { + let cfg = table_config(&[ + ("table.kv.format-version", "99"), + ("table.datalake.format", "iceberg"), + ]); + let row_type = RowType::with_data_types(vec![DataTypes::int()]); + let mut enc = + KeyEncoderFactory::of_primary_key(&row_type, &["f0".to_string()], &cfg, false).unwrap(); + let row = GenericRow::from_data(vec![Datum::from(7i32)]);
assert_eq!(enc.encode_key(&row).unwrap().as_ref(), (7i64).to_le_bytes()); + } +} diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index 36f9a1c2..52744459 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -38,7 +38,7 @@ pub use column::*; pub use compacted::CompactedRow; pub use datum::*; pub use decimal::{Decimal, MAX_COMPACT_PRECISION}; -pub use encode::KeyEncoder; +pub use encode::{IcebergKeyEncoder, KeyEncoder}; pub(crate) use fixed_schema_decoder::FixedSchemaDecoder; pub use lookup_row::LookupRow; pub(crate) use projected_row::ProjectedRow;