From af41a78063c579da5f6f7098f0b773265bb43df7 Mon Sep 17 00:00:00 2001 From: Raphael Date: Sun, 12 Apr 2026 01:33:19 +0200 Subject: [PATCH 1/6] ref: renamed functions into runtime functions --- src/flow_definition/feature/mod.rs | 4 ++-- src/flow_definition/mod.rs | 8 ++++---- src/flow_service/mod.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/flow_definition/feature/mod.rs b/src/flow_definition/feature/mod.rs index faec114..26d645f 100644 --- a/src/flow_definition/feature/mod.rs +++ b/src/flow_definition/feature/mod.rs @@ -1,12 +1,12 @@ pub mod version; use serde::Deserialize; -use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition}; +use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition}; #[derive(Deserialize, Debug, Clone)] pub struct Feature { pub name: String, pub data_types: Vec, pub flow_types: Vec, - pub functions: Vec, + pub runtime_functions: Vec, } diff --git a/src/flow_definition/mod.rs b/src/flow_definition/mod.rs index 9ace181..b713896 100644 --- a/src/flow_definition/mod.rs +++ b/src/flow_definition/mod.rs @@ -62,10 +62,10 @@ impl Reader { ); log::debug!( - "Found Functions {:?}", + "Found RuntimeFunctions {:?}", &features .iter() - .flat_map(|f| f.functions.iter().map(|t| t.runtime_name.clone())) + .flat_map(|f| f.runtime_functions.iter().map(|t| t.runtime_name.clone())) .collect::>() ); @@ -132,7 +132,7 @@ impl Reader { None => continue, }; - let functions = match self.load_definitions_for_feature::( + let runtime_functions = match self.load_definitions_for_feature::( &path, "runtime_definition", )? { @@ -144,7 +144,7 @@ impl Reader { name: feature_name, data_types, flow_types, - functions, + runtime_functions, }; features.push(feature); diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index ee4a603..3a6873c 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -46,7 +46,7 @@ impl FlowUpdateService { for feature in features { data_types.append(&mut feature.data_types.clone()); flow_types.append(&mut feature.flow_types.clone()); - runtime_definitions.append(&mut feature.functions.clone()); + runtime_definitions.append(&mut feature.runtime_functions.clone()); } let channel = create_channel_with_retry("Aquila", aquila_url).await; From c4ec7d5cbc66592ddc415d2e52d1697cebc5f9e7 Mon Sep 17 00:00:00 2001 From: Raphael Date: Sun, 12 Apr 2026 01:40:35 +0200 Subject: [PATCH 2/6] feat: flow-definition will read function definitions --- src/flow_definition/feature/mod.rs | 1 + src/flow_definition/feature/version.rs | 7 ++++++- src/flow_definition/mod.rs | 21 +++++++++++++++------ 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/flow_definition/feature/mod.rs b/src/flow_definition/feature/mod.rs index 26d645f..f37a250 100644 --- a/src/flow_definition/feature/mod.rs +++ b/src/flow_definition/feature/mod.rs @@ -9,4 +9,5 @@ pub struct Feature { pub data_types: Vec, pub flow_types: Vec, pub runtime_functions: Vec, + pub functions: Vec, } diff --git a/src/flow_definition/feature/version.rs b/src/flow_definition/feature/version.rs index f079301..22fa106 100644 --- a/src/flow_definition/feature/version.rs +++ b/src/flow_definition/feature/version.rs @@ -1,4 +1,4 @@ -use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition}; +use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition}; pub trait HasVersion { fn version(&self) -> &String; @@ -22,6 +22,11 @@ impl HasVersion for FlowType { } } +impl HasVersion for FunctionDefinition { + fn version(&self) -> &String { + &self.version + } +} impl HasVersion for RuntimeFunctionDefinition { fn version(&self) -> &String { &self.version diff --git a/src/flow_definition/mod.rs b/src/flow_definition/mod.rs index b713896..f0b12ca 100644 --- a/src/flow_definition/mod.rs +++ b/src/flow_definition/mod.rs @@ -3,12 +3,12 @@ mod feature; use crate::flow_definition::error::ReaderError; use crate::flow_definition::feature::Feature; +use crate::flow_definition::feature::version::HasVersion; use serde::de::DeserializeOwned; use std::fs; use std::path::Path; -use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition}; +use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition}; use walkdir::WalkDir; -use crate::flow_definition::feature::version::HasVersion; pub struct Reader { should_break: bool, @@ -132,10 +132,18 @@ impl Reader { None => continue, }; - let runtime_functions = match self.load_definitions_for_feature::( - &path, - "runtime_definition", - )? { + let runtime_functions = match self + .load_definitions_for_feature::( + &path, + "runtime_functions", + )? { + Some(v) => v, + None => continue, + }; + + let functions = match self + .load_definitions_for_feature::(&path, "functions")? + { Some(v) => v, None => continue, }; @@ -145,6 +153,7 @@ impl Reader { data_types, flow_types, runtime_functions, + functions, }; features.push(feature); From eb49beb5a855ddb2208455d68e3fc2edf0c97345 Mon Sep 17 00:00:00 2001 From: Raphael Date: Sun, 12 Apr 2026 01:49:21 +0200 Subject: [PATCH 3/6] feat: added function definition service to flow serivce --- src/flow_service/mod.rs | 85 +++++++++++++++++++++++++++++++---------- 1 file changed, 65 insertions(+), 20 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 3a6873c..9fbcab8 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -5,12 +5,15 @@ use crate::{ use tonic::{Extensions, Request, transport::Channel}; use tucana::{ aquila::{ - DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest, - data_type_service_client::DataTypeServiceClient, + DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest, + RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient, flow_type_service_client::FlowTypeServiceClient, + function_definition_service_client::FunctionDefinitionServiceClient, runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient, }, - shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition}, + shared::{ + DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition, + }, }; pub mod auth; @@ -18,7 +21,8 @@ pub mod retry; pub struct FlowUpdateService { data_types: Vec, - runtime_definitions: Vec, + runtime_functions: Vec, + functions: Vec, flow_types: Vec, channel: Channel, aquila_token: String, @@ -30,7 +34,8 @@ impl FlowUpdateService { /// This will read the definition files from the given path and initialize the service with the data types, runtime definitions, and flow types. pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self { let mut data_types = Vec::new(); - let mut runtime_definitions = Vec::new(); + let mut runtime_functions = Vec::new(); + let mut functions = Vec::new(); let mut flow_types = Vec::new(); let reader = Reader::configure(definition_path.to_string(), true, vec![], None); @@ -46,14 +51,16 @@ impl FlowUpdateService { for feature in features { data_types.append(&mut feature.data_types.clone()); flow_types.append(&mut feature.flow_types.clone()); - runtime_definitions.append(&mut feature.runtime_functions.clone()); + runtime_functions.append(&mut feature.runtime_functions.clone()); + functions.append(&mut feature.functions.clone()); } let channel = create_channel_with_retry("Aquila", aquila_url).await; Self { data_types, - runtime_definitions, + runtime_functions, + functions, flow_types, channel, aquila_token, @@ -70,27 +77,33 @@ impl FlowUpdateService { self } - pub fn with_runtime_definitions( + pub fn with_runtime_functions( mut self, - runtime_definitions: Vec, + runtime_functions: Vec, ) -> Self { - self.runtime_definitions = runtime_definitions; + self.runtime_functions = runtime_functions; + self + } + + pub fn with_functions(mut self, functions: Vec) -> Self { + self.functions = functions; self } pub async fn send(&self) { self.update_data_types().await; - self.update_runtime_definitions().await; + self.update_runtime_functions().await; + self.update_functions().await; self.update_flow_types().await; } async fn update_data_types(&self) { if self.data_types.is_empty() { - log::info!("No data types to update"); + log::info!("No DataTypes present."); return; } - log::info!("Updating the current DataTypes!"); + log::info!("Updating {} DataTypes.", self.data_types.len()); let mut client = DataTypeServiceClient::new(self.channel.clone()); let request = Request::from_parts( get_authorization_metadata(&self.aquila_token), @@ -113,19 +126,51 @@ impl FlowUpdateService { } } - async fn update_runtime_definitions(&self) { - if self.runtime_definitions.is_empty() { - log::info!("No runtime definitions to update"); + async fn update_functions(&self) { + if self.functions.is_empty() { + log::info!("No FunctionDefinitions present."); return; } - log::info!("Updating the current RuntimeDefinitions!"); + log::info!("Updating {} FunctionDefinitions.", self.functions.len()); + let mut client = FunctionDefinitionServiceClient::new(self.channel.clone()); + let request = Request::from_parts( + get_authorization_metadata(&self.aquila_token), + Extensions::new(), + FunctionDefinitionUpdateRequest { + functions: self.functions.clone(), + }, + ); + + match client.update(request).await { + Ok(response) => { + log::info!( + "Was the update of the FunctionDefinitions accepted by Sagittarius? {}", + response.into_inner().success + ); + } + Err(err) => { + log::error!("Failed to update function definitions: {:?}", err); + } + } + } + + async fn update_runtime_functions(&self) { + if self.runtime_functions.is_empty() { + log::info!("No RuntimeFunctionDefintions present."); + return; + } + + log::info!( + "Updating {} RuntimeFunctionDefinitions.", + self.runtime_functions.len() + ); let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone()); let request = Request::from_parts( get_authorization_metadata(&self.aquila_token), Extensions::new(), RuntimeFunctionDefinitionUpdateRequest { - runtime_functions: self.runtime_definitions.clone(), + runtime_functions: self.runtime_functions.clone(), }, ); @@ -144,11 +189,11 @@ impl FlowUpdateService { async fn update_flow_types(&self) { if self.flow_types.is_empty() { - log::info!("No FlowTypes to update!"); + log::info!("No FlowTypes present."); return; } - log::info!("Updating the current FlowTypes!"); + log::info!("Updating {} FlowTypes.", self.flow_types.len()); let mut client = FlowTypeServiceClient::new(self.channel.clone()); let request = Request::from_parts( get_authorization_metadata(&self.aquila_token), From 44131921de4cb07ccd6c2ab80df170e9f8bb4c69 Mon Sep 17 00:00:00 2001 From: Raphael Date: Sun, 12 Apr 2026 01:59:48 +0200 Subject: [PATCH 4/6] ref: correct names for folders --- src/flow_definition/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flow_definition/mod.rs b/src/flow_definition/mod.rs index f0b12ca..ab473ac 100644 --- a/src/flow_definition/mod.rs +++ b/src/flow_definition/mod.rs @@ -120,14 +120,14 @@ impl Reader { } let data_types = match self - .load_definitions_for_feature::(&path, "data_type")? + .load_definitions_for_feature::(&path, "data_types")? { Some(v) => v, None => continue, }; let flow_types = - match self.load_definitions_for_feature::(&path, "flow_type")? { + match self.load_definitions_for_feature::(&path, "flow_types")? { Some(v) => v, None => continue, }; From 37ff4ca238ea6c9009306254e3ee3b39b934b608 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Sun, 12 Apr 2026 02:26:39 +0200 Subject: [PATCH 5/6] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 9fbcab8..b5df9f1 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -31,7 +31,7 @@ pub struct FlowUpdateService { impl FlowUpdateService { /// Create a new FlowUpdateService instance from an Aquila URL and a definition path. /// - /// This will read the definition files from the given path and initialize the service with the data types, runtime definitions, and flow types. + /// This will read the definition files from the given path and initialize the service with the data types, runtime function definitions, function definitions, and flow types. pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self { let mut data_types = Vec::new(); let mut runtime_functions = Vec::new(); From 9a6986c87f47eeb021628660ec54e2241c452412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Sun, 12 Apr 2026 02:26:47 +0200 Subject: [PATCH 6/6] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index b5df9f1..2202ea0 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -157,7 +157,7 @@ impl FlowUpdateService { async fn update_runtime_functions(&self) { if self.runtime_functions.is_empty() { - log::info!("No RuntimeFunctionDefintions present."); + log::info!("No RuntimeFunctionDefinitions present."); return; }