diff --git a/src/flow_definition/feature/mod.rs b/src/flow_definition/feature/mod.rs index faec114..f37a250 100644 --- a/src/flow_definition/feature/mod.rs +++ b/src/flow_definition/feature/mod.rs @@ -1,12 +1,13 @@ pub mod version; use serde::Deserialize; -use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition}; +use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition}; #[derive(Deserialize, Debug, Clone)] pub struct Feature { pub name: String, pub data_types: Vec, pub flow_types: Vec, - pub functions: Vec, + pub runtime_functions: Vec, + pub functions: Vec, } diff --git a/src/flow_definition/feature/version.rs b/src/flow_definition/feature/version.rs index f079301..22fa106 100644 --- a/src/flow_definition/feature/version.rs +++ b/src/flow_definition/feature/version.rs @@ -1,4 +1,4 @@ -use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition}; +use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition}; pub trait HasVersion { fn version(&self) -> &String; @@ -22,6 +22,11 @@ impl HasVersion for FlowType { } } +impl HasVersion for FunctionDefinition { + fn version(&self) -> &String { + &self.version + } +} impl HasVersion for RuntimeFunctionDefinition { fn version(&self) -> &String { &self.version diff --git a/src/flow_definition/mod.rs b/src/flow_definition/mod.rs index 9ace181..ab473ac 100644 --- a/src/flow_definition/mod.rs +++ b/src/flow_definition/mod.rs @@ -3,12 +3,12 @@ mod feature; use crate::flow_definition::error::ReaderError; use crate::flow_definition::feature::Feature; +use crate::flow_definition::feature::version::HasVersion; use serde::de::DeserializeOwned; use std::fs; use std::path::Path; -use tucana::shared::{DefinitionDataType, FlowType, RuntimeFunctionDefinition}; +use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition}; use walkdir::WalkDir; -use crate::flow_definition::feature::version::HasVersion; pub struct Reader { should_break: bool, @@ -62,10 +62,10 @@ impl Reader { ); log::debug!( - "Found Functions {:?}", + "Found RuntimeFunctions {:?}", &features .iter() - .flat_map(|f| f.functions.iter().map(|t| t.runtime_name.clone())) + .flat_map(|f| f.runtime_functions.iter().map(|t| t.runtime_name.clone())) .collect::>() ); @@ -120,22 +120,30 @@ impl Reader { } let data_types = match self - .load_definitions_for_feature::(&path, "data_type")? + .load_definitions_for_feature::(&path, "data_types")? { Some(v) => v, None => continue, }; let flow_types = - match self.load_definitions_for_feature::(&path, "flow_type")? { + match self.load_definitions_for_feature::(&path, "flow_types")? { Some(v) => v, None => continue, }; - let functions = match self.load_definitions_for_feature::( - &path, - "runtime_definition", - )? { + let runtime_functions = match self + .load_definitions_for_feature::( + &path, + "runtime_functions", + )? { + Some(v) => v, + None => continue, + }; + + let functions = match self + .load_definitions_for_feature::(&path, "functions")? + { Some(v) => v, None => continue, }; @@ -144,6 +152,7 @@ impl Reader { name: feature_name, data_types, flow_types, + runtime_functions, functions, }; diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index ee4a603..2202ea0 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -5,12 +5,15 @@ use crate::{ use tonic::{Extensions, Request, transport::Channel}; use tucana::{ aquila::{ - DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest, - data_type_service_client::DataTypeServiceClient, + DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest, + RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient, flow_type_service_client::FlowTypeServiceClient, + function_definition_service_client::FunctionDefinitionServiceClient, runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient, }, - shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition}, + shared::{ + DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition, + }, }; pub mod auth; @@ -18,7 +21,8 @@ pub mod retry; pub struct FlowUpdateService { data_types: Vec, - runtime_definitions: Vec, + runtime_functions: Vec, + functions: Vec, flow_types: Vec, channel: Channel, aquila_token: String, @@ -27,10 +31,11 @@ pub struct FlowUpdateService { impl FlowUpdateService { /// Create a new FlowUpdateService instance from an Aquila URL and a definition path. /// - /// This will read the definition files from the given path and initialize the service with the data types, runtime definitions, and flow types. + /// This will read the definition files from the given path and initialize the service with the data types, runtime function definitions, function definitions, and flow types. pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self { let mut data_types = Vec::new(); - let mut runtime_definitions = Vec::new(); + let mut runtime_functions = Vec::new(); + let mut functions = Vec::new(); let mut flow_types = Vec::new(); let reader = Reader::configure(definition_path.to_string(), true, vec![], None); @@ -46,14 +51,16 @@ impl FlowUpdateService { for feature in features { data_types.append(&mut feature.data_types.clone()); flow_types.append(&mut feature.flow_types.clone()); - runtime_definitions.append(&mut feature.functions.clone()); + runtime_functions.append(&mut feature.runtime_functions.clone()); + functions.append(&mut feature.functions.clone()); } let channel = create_channel_with_retry("Aquila", aquila_url).await; Self { data_types, - runtime_definitions, + runtime_functions, + functions, flow_types, channel, aquila_token, @@ -70,27 +77,33 @@ impl FlowUpdateService { self } - pub fn with_runtime_definitions( + pub fn with_runtime_functions( mut self, - runtime_definitions: Vec, + runtime_functions: Vec, ) -> Self { - self.runtime_definitions = runtime_definitions; + self.runtime_functions = runtime_functions; + self + } + + pub fn with_functions(mut self, functions: Vec) -> Self { + self.functions = functions; self } pub async fn send(&self) { self.update_data_types().await; - self.update_runtime_definitions().await; + self.update_runtime_functions().await; + self.update_functions().await; self.update_flow_types().await; } async fn update_data_types(&self) { if self.data_types.is_empty() { - log::info!("No data types to update"); + log::info!("No DataTypes present."); return; } - log::info!("Updating the current DataTypes!"); + log::info!("Updating {} DataTypes.", self.data_types.len()); let mut client = DataTypeServiceClient::new(self.channel.clone()); let request = Request::from_parts( get_authorization_metadata(&self.aquila_token), @@ -113,19 +126,51 @@ impl FlowUpdateService { } } - async fn update_runtime_definitions(&self) { - if self.runtime_definitions.is_empty() { - log::info!("No runtime definitions to update"); + async fn update_functions(&self) { + if self.functions.is_empty() { + log::info!("No FunctionDefinitions present."); return; } - log::info!("Updating the current RuntimeDefinitions!"); + log::info!("Updating {} FunctionDefinitions.", self.functions.len()); + let mut client = FunctionDefinitionServiceClient::new(self.channel.clone()); + let request = Request::from_parts( + get_authorization_metadata(&self.aquila_token), + Extensions::new(), + FunctionDefinitionUpdateRequest { + functions: self.functions.clone(), + }, + ); + + match client.update(request).await { + Ok(response) => { + log::info!( + "Was the update of the FunctionDefinitions accepted by Sagittarius? {}", + response.into_inner().success + ); + } + Err(err) => { + log::error!("Failed to update function definitions: {:?}", err); + } + } + } + + async fn update_runtime_functions(&self) { + if self.runtime_functions.is_empty() { + log::info!("No RuntimeFunctionDefinitions present."); + return; + } + + log::info!( + "Updating {} RuntimeFunctionDefinitions.", + self.runtime_functions.len() + ); let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone()); let request = Request::from_parts( get_authorization_metadata(&self.aquila_token), Extensions::new(), RuntimeFunctionDefinitionUpdateRequest { - runtime_functions: self.runtime_definitions.clone(), + runtime_functions: self.runtime_functions.clone(), }, ); @@ -144,11 +189,11 @@ impl FlowUpdateService { async fn update_flow_types(&self) { if self.flow_types.is_empty() { - log::info!("No FlowTypes to update!"); + log::info!("No FlowTypes present."); return; } - log::info!("Updating the current FlowTypes!"); + log::info!("Updating {} FlowTypes.", self.flow_types.len()); let mut client = FlowTypeServiceClient::new(self.channel.clone()); let request = Request::from_parts( get_authorization_metadata(&self.aquila_token),