Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions native-engine/auron-jni-bridge/src/jni_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ pub struct JavaClasses<'a> {
pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>,
pub cAuronRssPartitionWriterBase: AuronRssPartitionWriterBase<'a>,
pub cAuronOnHeapSpillManager: AuronOnHeapSpillManager<'a>,
pub cAuronNativeOrcSinkUtils: AuronNativeOrcSinkUtils<'a>,
pub cAuronNativeParquetSinkUtils: AuronNativeParquetSinkUtils<'a>,
pub cAuronBlockObject: AuronBlockObject<'a>,
pub cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper<'a>,
Expand Down Expand Up @@ -504,6 +505,7 @@ impl JavaClasses<'static> {
c_spark_udaf_mem_tracker,
c_auron_rss_partition_writer_base,
c_auron_on_heap_spill_manager,
c_auron_native_orc_sink_utils,
c_auron_native_parquet_sink_utils,
c_auron_block_object,
c_auron_json_fallback_wrapper,
Expand All @@ -517,6 +519,7 @@ impl JavaClasses<'static> {
SparkUDAFMemTracker::new(env)?,
AuronRssPartitionWriterBase::new(env)?,
AuronOnHeapSpillManager::new(env)?,
AuronNativeOrcSinkUtils::new(env)?,
AuronNativeParquetSinkUtils::new(env)?,
AuronBlockObject::new(env)?,
AuronJsonFallbackWrapper::new(env)?,
Expand All @@ -530,6 +533,7 @@ impl JavaClasses<'static> {
SparkUDAFMemTracker::default(),
AuronRssPartitionWriterBase::default(),
AuronOnHeapSpillManager::default(),
AuronNativeOrcSinkUtils::default(),
AuronNativeParquetSinkUtils::default(),
AuronBlockObject::default(),
AuronJsonFallbackWrapper::default(),
Expand Down Expand Up @@ -568,6 +572,7 @@ impl JavaClasses<'static> {
cSparkUDAFMemTracker: c_spark_udaf_mem_tracker,
cAuronRssPartitionWriterBase: c_auron_rss_partition_writer_base,
cAuronOnHeapSpillManager: c_auron_on_heap_spill_manager,
cAuronNativeOrcSinkUtils: c_auron_native_orc_sink_utils,
cAuronNativeParquetSinkUtils: c_auron_native_parquet_sink_utils,
cAuronBlockObject: c_auron_block_object,
cAuronJsonFallbackWrapper: c_auron_json_fallback_wrapper,
Expand Down Expand Up @@ -1603,6 +1608,42 @@ impl<'a> AuronNativeParquetSinkUtils<'a> {
}
}

#[allow(non_snake_case)]
pub struct AuronNativeOrcSinkUtils<'a> {
pub class: JClass<'a>,
pub method_getTaskOutputPath: JStaticMethodID,
pub method_getTaskOutputPath_ret: ReturnType,
pub method_completeOutput: JStaticMethodID,
pub method_completeOutput_ret: ReturnType,
}
impl<'a> AuronNativeOrcSinkUtils<'a> {
pub const SIG_TYPE: &'static str =
"org/apache/spark/sql/execution/auron/plan/NativeOrcSinkUtils";

pub fn new(env: &JNIEnv<'a>) -> JniResult<AuronNativeOrcSinkUtils<'a>> {
let class = get_global_jclass(env, Self::SIG_TYPE)?;
Ok(AuronNativeOrcSinkUtils {
class,
method_getTaskOutputPath: env.get_static_method_id(
class,
"getTaskOutputPath",
"()Ljava/lang/String;",
)?,
method_getTaskOutputPath_ret: ReturnType::Object,
method_completeOutput: env.get_static_method_id(
class,
"completeOutput",
"(Ljava/lang/String;JJ)V",
)?,
method_completeOutput_ret: ReturnType::Primitive(Primitive::Void),
})
}

fn default() -> Self {
unsafe { std::mem::zeroed() }
}
}

#[allow(non_snake_case)]
pub struct AuronBlockObject<'a> {
pub class: JClass<'a>,
Expand Down
14 changes: 14 additions & 0 deletions native-engine/auron-planner/proto/auron.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ message PhysicalPlanNode {
ParquetSinkExecNode parquet_sink = 24;
OrcScanExecNode orc_scan = 25;
KafkaScanExecNode kafka_scan = 26;
OrcSinkExecNode orc_sink = 27;
}
}

Expand Down Expand Up @@ -622,6 +623,19 @@ message ParquetProp {
string value = 2;
}

message OrcSinkExecNode {
PhysicalPlanNode input = 1;
string fs_resource_id = 2;
int32 num_dyn_parts = 3;
Schema schema = 4;
repeated OrcProp prop = 5;
}

message OrcProp {
string key = 1;
string value = 2;
}

message IpcWriterExecNode {
PhysicalPlanNode input = 1;
string ipc_consumer_resource_id = 2;
Expand Down
14 changes: 14 additions & 0 deletions native-engine/auron-planner/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use datafusion_ext_plans::{
ipc_writer_exec::IpcWriterExec,
limit_exec::LimitExec,
orc_exec::OrcExec,
orc_sink_exec::OrcSinkExec,
parquet_exec::ParquetExec,
parquet_sink_exec::ParquetSinkExec,
project_exec::ProjectExec,
Expand Down Expand Up @@ -802,6 +803,19 @@ impl PhysicalPlanner {
props,
)))
}
PhysicalPlanType::OrcSink(orc_sink) => {
let mut props: Vec<(String, String)> = vec![];
for prop in &orc_sink.prop {
props.push((prop.key.clone(), prop.value.clone()));
}
Ok(Arc::new(OrcSinkExec::new(
convert_box_required!(self, orc_sink.input)?,
orc_sink.fs_resource_id.clone(),
orc_sink.num_dyn_parts as usize,
Arc::new(convert_required!(orc_sink.schema)?),
props,
)))
}
PhysicalPlanType::KafkaScan(kafka_scan) => {
let schema = Arc::new(convert_required!(kafka_scan.schema)?);
if !kafka_scan.mock_data_json_array.is_empty() {
Expand Down
2 changes: 2 additions & 0 deletions native-engine/auron/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use datafusion_ext_commons::{df_execution_err, downcast_any};
use datafusion_ext_plans::{
common::execution_context::{ExecutionContext, cancel_all_tasks},
ipc_writer_exec::IpcWriterExec,
orc_sink_exec::OrcSinkExec,
parquet_sink_exec::ParquetSinkExec,
shuffle_writer_exec::ShuffleWriterExec,
};
Expand Down Expand Up @@ -156,6 +157,7 @@ impl NativeExecutionRuntime {

// coalesce output stream if necessary
if downcast_any!(execution_plan_cloned, EmptyExec).is_err()
&& downcast_any!(execution_plan_cloned, OrcSinkExec).is_err()
&& downcast_any!(execution_plan_cloned, ParquetSinkExec).is_err()
&& downcast_any!(execution_plan_cloned, IpcWriterExec).is_err()
&& downcast_any!(execution_plan_cloned, ShuffleWriterExec).is_err()
Expand Down
1 change: 1 addition & 0 deletions native-engine/datafusion-ext-plans/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod ipc_writer_exec;
pub mod joins;
pub mod limit_exec;
pub mod orc_exec;
pub mod orc_sink_exec;
pub mod parquet_exec;
pub mod parquet_sink_exec;
pub mod project_exec;
Expand Down
Loading
Loading