diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 1dc35c0..8bd43f7 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -26,6 +26,7 @@ pub struct FlowUpdateService { flow_types: Vec, channel: Channel, aquila_token: String, + definition_source: Option, } impl FlowUpdateService { @@ -64,9 +65,15 @@ impl FlowUpdateService { flow_types, channel, aquila_token, + definition_source: None, } } + pub fn with_definition_source(mut self, source: String) -> Self { + self.definition_source = Some(source); + self + } + pub fn with_flow_types(mut self, flow_types: Vec) -> Self { self.flow_types = flow_types; self @@ -90,11 +97,11 @@ impl FlowUpdateService { self } - pub async fn send(&self) { + pub async fn send(&mut self) { let _ = self.send_with_status().await; } - pub async fn send_with_status(&self) -> bool { + pub async fn send_with_status(&mut self) -> bool { let data_types_success = self.update_data_types().await; let runtime_functions_success = self.update_runtime_functions().await; let functions_success = self.update_functions().await; @@ -102,12 +109,18 @@ impl FlowUpdateService { data_types_success && runtime_functions_success && functions_success && flow_types_success } - async fn update_data_types(&self) -> bool { + async fn update_data_types(&mut self) -> bool { if self.data_types.is_empty() { log::info!("No DataTypes present."); return true; } + if let Some(source) = &self.definition_source { + for data_type in self.data_types.iter_mut() { + data_type.definition_source = source.to_string(); + } + } + log::info!("Updating {} DataTypes.", self.data_types.len()); let mut client = DataTypeServiceClient::new(self.channel.clone()); let request = Request::from_parts( @@ -135,12 +148,18 @@ impl FlowUpdateService { } } - async fn update_functions(&self) -> bool { + async fn update_functions(&mut self) -> bool { if self.functions.is_empty() { log::info!("No FunctionDefinitions present."); return true; } + if let Some(source) = &self.definition_source { + for function in self.functions.iter_mut() { + function.definition_source = source.to_string(); + } + }; + log::info!("Updating {} FunctionDefinitions.", self.functions.len()); let mut client = FunctionDefinitionServiceClient::new(self.channel.clone()); let request = Request::from_parts( @@ -167,12 +186,18 @@ impl FlowUpdateService { } } - async fn update_runtime_functions(&self) -> bool { + async fn update_runtime_functions(&mut self) -> bool { if self.runtime_functions.is_empty() { log::info!("No RuntimeFunctionDefinitions present."); return true; } + if let Some(source) = &self.definition_source { + for runtime_function in self.runtime_functions.iter_mut() { + runtime_function.definition_source = source.to_string(); + } + } + log::info!( "Updating {} RuntimeFunctionDefinitions.", self.runtime_functions.len() @@ -202,12 +227,18 @@ impl FlowUpdateService { } } - async fn update_flow_types(&self) -> bool { + async fn update_flow_types(&mut self) -> bool { if self.flow_types.is_empty() { log::info!("No FlowTypes present."); return true; } + if let Some(source) = &self.definition_source { + for flow_type in self.flow_types.iter_mut() { + flow_type.definition_source = Some(source.to_string()); + } + } + log::info!("Updating {} FlowTypes.", self.flow_types.len()); let mut client = FlowTypeServiceClient::new(self.channel.clone()); let request = Request::from_parts(