12 changes: 8 additions & 4 deletions crates/fluss/src/client/table/lookup.rs
@@ -303,14 +303,18 @@ impl TableLookup {
let lookup_row_type = row_type.project_with_field_names(primary_keys)?;

let physical_primary_keys = self.table_info.get_physical_primary_keys().to_vec();
let primary_key_encoder =
KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys, &data_lake_format)?;
let primary_key_encoder = KeyEncoderFactory::of_primary_key(
&lookup_row_type,
&physical_primary_keys,
self.table_info.get_table_config(),
self.table_info.is_default_bucket_key(),
)?;

let bucket_key_encoder = if self.table_info.is_default_bucket_key() {
None
} else {
let bucket_keys = self.table_info.get_bucket_keys().to_vec();
Some(KeyEncoderFactory::of(
Some(KeyEncoderFactory::of_bucket_key(
&lookup_row_type,
&bucket_keys,
&data_lake_format,
@@ -452,7 +456,7 @@ impl TablePrefixLookup {

let bucket_keys = self.table_info.get_bucket_keys().to_vec();
let prefix_key_encoder =
KeyEncoderFactory::of(&lookup_row_type, &bucket_keys, &data_lake_format)?;
KeyEncoderFactory::of_bucket_key(&lookup_row_type, &bucket_keys, &data_lake_format)?;

let partition_getter = if self.table_info.is_partitioned() {
Some(PartitionGetter::new(
9 changes: 7 additions & 2 deletions crates/fluss/src/client/table/upsert.rs
@@ -143,9 +143,14 @@ impl UpsertWriterFactory {
&partial_update_columns,
)?;

let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, data_lake_format)?;
let primary_key_encoder = KeyEncoderFactory::of_primary_key(
row_type,
physical_pks,
table_info.get_table_config(),
table_info.is_default_bucket_key(),
)?;
let bucket_key_encoder = if !table_info.is_default_bucket_key() {
Some(KeyEncoderFactory::of(
Some(KeyEncoderFactory::of_bucket_key(
row_type,
table_info.get_bucket_keys(),
data_lake_format,
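Both call sites above (lookup.rs and upsert.rs) split the generic `KeyEncoderFactory::of` into `of_primary_key` and `of_bucket_key`, with the primary-key path additionally receiving the table config and the default-bucket-key flag. The factory internals are not part of this diff; the following is only a hedged sketch, assuming `of_primary_key` switches to the new `IcebergKeyEncoder` when `table.kv.format-version` resolves to V2 and otherwise keeps the existing compacted encoding. `CompactedKeyEncoder::new` is a hypothetical constructor here, and import paths are assumed.

// Sketch only: dispatch a primary-key encoder by KV format version.
// Not the actual KeyEncoderFactory implementation; names and paths are assumed.
use crate::error::Result;
use crate::metadata::{KvFormatVersion, RowType, TableConfig};
use crate::row::encode::{CompactedKeyEncoder, IcebergKeyEncoder, KeyEncoder};

fn primary_key_encoder_sketch(
    row_type: &RowType,
    physical_primary_keys: &[String],
    table_config: &TableConfig,
) -> Result<Box<dyn KeyEncoder>> {
    match table_config.get_kv_format_version() {
        // Assumption: V2 tables use the single-column Iceberg key layout.
        KvFormatVersion::V2 => Ok(Box::new(IcebergKeyEncoder::create_key_encoder(
            row_type,
            physical_primary_keys,
        )?)),
        // V1 (the default) keeps the existing compacted key layout.
        KvFormatVersion::V1 => Ok(Box::new(CompactedKeyEncoder::new(
            row_type,
            physical_primary_keys,
        )?)),
    }
}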
89 changes: 89 additions & 0 deletions crates/fluss/src/metadata/table.rs
@@ -905,6 +905,42 @@ impl KvFormat {
}
}

/// KV layout version from `table.kv.format-version` (Java `ConfigOptions.TABLE_KV_FORMAT_VERSION`).
///
/// Unset or unparseable values resolve to [`KvFormatVersion::V1`], matching Java
/// `getKvFormatVersion().orElse(1)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum KvFormatVersion {
#[default]
V1,
V2,
}

impl KvFormatVersion {
/// Parse the `table.kv.format-version` property value.
///
/// Whitespace-only and non-numeric values fall back to [`Default::default`] ([`V1`](Self::V1)).
/// Only `1` and `2` are recognized as explicit versions; other integers also fall back to `V1`.
pub fn parse_property(raw: Option<&str>) -> Self {
let Some(s) = raw.map(str::trim).filter(|s| !s.is_empty()) else {
return Self::default();
};
match s {
"1" => Self::V1,
"2" => Self::V2,
_ => s
.parse::<i32>()
.ok()
.and_then(|n| match n {
1 => Some(Self::V1),
2 => Some(Self::V2),
_ => None,
})
.unwrap_or_default(),
}
}
}

#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct TablePath {
database: String,
@@ -1172,6 +1208,17 @@ impl TableConfig {
kv_format.parse().map_err(Into::into)
}

/// KV layout version from `table.kv.format-version` (Java `ConfigOptions.TABLE_KV_FORMAT_VERSION`).
///
/// When unset or invalid, returns [`KvFormatVersion::V1`] (same as Java `getKvFormatVersion().orElse(1)`).
pub fn get_kv_format_version(&self) -> KvFormatVersion {
KvFormatVersion::parse_property(
self.properties
.get("table.kv.format-version")
.map(String::as_str),
)
}

pub fn get_log_format(&self) -> Result<LogFormat> {
// TODO: Consolidate configurations logic, constants, defaults in a single place
const DEFAULT_LOG_FORMAT: &str = "ARROW";
@@ -1643,4 +1690,46 @@ mod tests {
);
assert!(table_info.is_auto_partitioned());
}

#[test]
fn kv_format_version_parse_property_defaults() {
assert_eq!(KvFormatVersion::parse_property(None), KvFormatVersion::V1);
assert_eq!(
KvFormatVersion::parse_property(Some("")),
KvFormatVersion::V1
);
assert_eq!(
KvFormatVersion::parse_property(Some(" ")),
KvFormatVersion::V1
);
assert_eq!(
KvFormatVersion::parse_property(Some("1")),
KvFormatVersion::V1
);
assert_eq!(
KvFormatVersion::parse_property(Some(" 2 ")),
KvFormatVersion::V2
);
assert_eq!(
KvFormatVersion::parse_property(Some("99")),
KvFormatVersion::V1
);
assert_eq!(
KvFormatVersion::parse_property(Some("nope")),
KvFormatVersion::V1
);
}

#[test]
fn table_config_get_kv_format_version() {
let empty = TableConfig::from_properties(HashMap::new());
assert_eq!(empty.get_kv_format_version(), KvFormatVersion::V1);

let mut m = HashMap::new();
m.insert("table.kv.format-version".to_string(), "2".to_string());
assert_eq!(
TableConfig::from_properties(m).get_kv_format_version(),
KvFormatVersion::V2
);
}
}
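For consumers of the new option, the version is just another table property. A minimal usage sketch built from the pieces shown in the tests above (`TableConfig::from_properties`, `get_kv_format_version`, and the literal key `table.kv.format-version`); import paths are assumed:

// Sketch: decide whether a table's properties request the V2 key layout.
// Unset, empty, or unparseable values resolve to V1, so this defaults to false.
use std::collections::HashMap;
use crate::metadata::{KvFormatVersion, TableConfig};

fn wants_v2_key_layout(properties: HashMap<String, String>) -> bool {
    TableConfig::from_properties(properties).get_kv_format_version() == KvFormatVersion::V2
}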
5 changes: 0 additions & 5 deletions crates/fluss/src/row/binary/iceberg_binary_row_writer.rs
@@ -60,11 +60,6 @@ impl IcebergBinaryRowWriter {
}
}

// Dependency order note:
// 1) Keep this PR scoped to writer-level Java parity.
// 2) Wire the writer through IcebergKeyEncoder in follow-up #308.
// TODO(#308): add end-to-end key-encoding tests via IcebergKeyEncoder
// (similar to CompactedKeyEncoder tests for CompactedKeyWriter).
pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> {
match field_type {
// Match Java IcebergBinaryRowWriter.createFieldWriter() supported types exactly.
215 changes: 215 additions & 0 deletions crates/fluss/src/row/encode/iceberg_key_encoder.rs
@@ -0,0 +1,215 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Primary key encoder for Iceberg lake tiering (single key column).
//!
//! Reference: `org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder`

use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::RowType;
use crate::row::binary::{BinaryWriter, IcebergBinaryRowWriter, ValueWriter};
use crate::row::encode::KeyEncoder;
use crate::row::field_getter::FieldGetter;
use crate::row::{Datum, InternalRow};
use bytes::Bytes;

pub struct IcebergKeyEncoder {
field_getter: FieldGetter,
field_encoder: ValueWriter,
writer: IcebergBinaryRowWriter,
}

impl IcebergKeyEncoder {
/// Create an Iceberg-format key encoder for the given primary key field names.
///
/// Iceberg tiering requires exactly one primary key column (FIP / Java `checkArgument`).
pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<Self> {
if keys.len() != 1 {
return Err(IllegalArgument {
message: format!(
"Key fields must have exactly one field for iceberg format, but got: {keys:?}"
),
});
}

let key_name = &keys[0];
let key_index = match row_type.get_field_index(key_name) {
Some(idx) => idx,
None => {
return Err(IllegalArgument {
message: format!("Field {key_name:?} not found in input row type {row_type:?}"),
});
}
};

let data_type = row_type.fields().get(key_index).unwrap().data_type();
let field_encoder = IcebergBinaryRowWriter::create_value_writer(data_type)?;
let field_getter = FieldGetter::create(data_type, key_index);

Ok(IcebergKeyEncoder {
field_getter,
field_encoder,
writer: IcebergBinaryRowWriter::new(),
})
}
}

impl KeyEncoder for IcebergKeyEncoder {
fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> {
self.writer.reset();
match self.field_getter.get_field(row)? {
Datum::Null => Err(IllegalArgument {
message: "Cannot encode Iceberg key with null value".to_string(),
}),
value => {
self.field_encoder
.write_value(&mut self.writer, 0, &value)?;
self.writer.complete();
Ok(self.writer.to_bytes())
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::{DataType, DataTypes, TimestampType};
use crate::row::datum::{Time, TimestampNtz};
use crate::row::{Decimal, GenericRow};
use bigdecimal::{BigDecimal, num_bigint::BigInt};

#[test]
fn single_key_field_requirement() {
let row_type = RowType::with_data_types_and_field_names(
vec![DataTypes::int(), DataTypes::string()],
vec!["id", "name"],
);

assert!(IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).is_ok());

let err = IcebergKeyEncoder::create_key_encoder(
&row_type,
&["id".to_string(), "name".to_string()],
)
.err()
.expect("expected error for multiple keys");
assert!(
err.to_string().contains("exactly one field"),
"unexpected: {err}"
);
}

#[test]
fn test_integer_encoding() {
let row_type = RowType::with_data_types(vec![DataTypes::int()]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap();
let row = GenericRow::from_data(vec![Datum::from(42i32)]);
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), (42i64).to_le_bytes());
}

#[test]
fn test_long_encoding() {
let row_type = RowType::with_data_types(vec![DataTypes::bigint()]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap();
let v: i64 = 1234567890123456789;
let row = GenericRow::from_data(vec![Datum::from(v)]);
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), v.to_le_bytes());
}

#[test]
fn test_string_encoding() {
let row_type = RowType::with_data_types(vec![DataTypes::string()]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap();
let s = "Hello Iceberg, Fluss this side!";
let row = GenericRow::from_data(vec![Datum::from(s)]);
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), s.as_bytes());
}

#[test]
fn test_decimal_encoding() {
let row_type = RowType::with_data_types(vec![DataTypes::decimal(10, 2)]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap();
let dec =
Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2), 10, 2).unwrap();
let row = GenericRow::from_data(vec![Datum::Decimal(dec)]);
let encoded = encoder.encode_key(&row).unwrap();
let expected = BigDecimal::new(BigInt::from(12345), 2)
.into_bigint_and_exponent()
.0
.to_signed_bytes_be();
assert_eq!(encoded.as_ref(), expected.as_slice());
}

#[test]
fn test_timestamp_encoding() {
let row_type = RowType::with_data_types(vec![DataType::Timestamp(
TimestampType::with_nullable(false, 6).unwrap(),
)]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap();
let millis = 1698235273182i64;
let nanos = 123000;
let micros = millis * 1000 + (nanos as i64 / 1000);
let ts = TimestampNtz::from_millis_nanos(millis, nanos).unwrap();
let row = GenericRow::from_data(vec![Datum::TimestampNtz(ts)]);
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), micros.to_le_bytes());
}

#[test]
fn test_date_encoding() {
let row_type = RowType::with_data_types(vec![DataTypes::date()]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap();
let date_value = 19655i32;
let row = GenericRow::from_data(vec![Datum::from(date_value)]);
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), (date_value as i64).to_le_bytes());
}

#[test]
fn test_time_encoding() {
let row_type = RowType::with_data_types(vec![DataTypes::time()]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap();
let time_millis = 34200000i32;
let row = GenericRow::from_data(vec![Datum::Time(Time::new(time_millis))]);
let encoded = encoder.encode_key(&row).unwrap();
let micros = (time_millis as i64) * 1000;
assert_eq!(encoded.as_ref(), micros.to_le_bytes());
}

#[test]
fn test_binary_encoding() {
let row_type = RowType::with_data_types(vec![DataTypes::bytes()]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()]).unwrap();
let data: &[u8] = b"Hello i only understand binary data";
let row = GenericRow::from_data(vec![Datum::from(data)]);
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), data);
}
}
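Taken together, the tests pin down the byte layout per supported key type: INT and DATE widen to an 8-byte little-endian i64, BIGINT is written as-is in little-endian, TIME and TIMESTAMP become microseconds as a little-endian i64, STRING and BYTES are the raw bytes, and DECIMAL is the unscaled value in big-endian two's-complement. A short end-to-end usage sketch for a single BIGINT key, reusing only names that appear in the tests (import paths assumed):

// Sketch: encode one BIGINT primary key with the Iceberg key layout.
use crate::error::Result;
use crate::metadata::{DataTypes, RowType};
use crate::row::encode::{IcebergKeyEncoder, KeyEncoder};
use crate::row::{Datum, GenericRow};
use bytes::Bytes;

fn encode_bigint_key(id: i64) -> Result<Bytes> {
    // Exactly one key column is required; create_key_encoder rejects anything else.
    let row_type = RowType::with_data_types(vec![DataTypes::bigint()]);
    let mut encoder = IcebergKeyEncoder::create_key_encoder(&row_type, &["f0".to_string()])?;
    let row = GenericRow::from_data(vec![Datum::from(id)]);
    // For BIGINT the result is the value's 8-byte little-endian representation.
    encoder.encode_key(&row)
}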