From 18fbc4d926531d589856836d4e8f30a26e972a3a Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 17 Apr 2026 20:01:26 +0200 Subject: [PATCH 01/10] feat: added definition source and mapping on send --- src/flow_service/mod.rs | 65 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 1dc35c0..d27bc8e 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -26,6 +26,7 @@ pub struct FlowUpdateService { flow_types: Vec, channel: Channel, aquila_token: String, + definition_source: Option, } impl FlowUpdateService { @@ -64,9 +65,15 @@ impl FlowUpdateService { flow_types, channel, aquila_token, + definition_source: None, } } + pub fn with_definiton_source(mut self, source: String) -> Self { + self.definition_source = Some(source); + self + } + pub fn with_flow_types(mut self, flow_types: Vec) -> Self { self.flow_types = flow_types; self @@ -90,11 +97,11 @@ impl FlowUpdateService { self } - pub async fn send(&self) { + pub async fn send(&mut self) { let _ = self.send_with_status().await; } - pub async fn send_with_status(&self) -> bool { + pub async fn send_with_status(&mut self) -> bool { let data_types_success = self.update_data_types().await; let runtime_functions_success = self.update_runtime_functions().await; let functions_success = self.update_functions().await; @@ -102,12 +109,24 @@ impl FlowUpdateService { data_types_success && runtime_functions_success && functions_success && flow_types_success } - async fn update_data_types(&self) -> bool { + async fn update_data_types(&mut self) -> bool { if self.data_types.is_empty() { log::info!("No DataTypes present."); return true; } + if let Some(source) = &self.definition_source { + self.data_types = self + .data_types + .clone() + .into_iter() + .map(|mut x| { + x.definition_source = source.to_string(); + x + }) + .collect() + }; + log::info!("Updating {} DataTypes.", self.data_types.len()); let mut client = DataTypeServiceClient::new(self.channel.clone()); let request = Request::from_parts( @@ -135,12 +154,24 @@ impl FlowUpdateService { } } - async fn update_functions(&self) -> bool { + async fn update_functions(&mut self) -> bool { if self.functions.is_empty() { log::info!("No FunctionDefinitions present."); return true; } + if let Some(source) = &self.definition_source { + self.functions = self + .functions + .clone() + .into_iter() + .map(|mut x| { + x.definition_source = source.to_string(); + x + }) + .collect() + }; + log::info!("Updating {} FunctionDefinitions.", self.functions.len()); let mut client = FunctionDefinitionServiceClient::new(self.channel.clone()); let request = Request::from_parts( @@ -167,12 +198,24 @@ impl FlowUpdateService { } } - async fn update_runtime_functions(&self) -> bool { + async fn update_runtime_functions(&mut self) -> bool { if self.runtime_functions.is_empty() { log::info!("No RuntimeFunctionDefinitions present."); return true; } + if let Some(source) = &self.definition_source { + self.runtime_functions = self + .runtime_functions + .clone() + .into_iter() + .map(|mut x| { + x.definition_source = source.to_string(); + x + }) + .collect() + }; + log::info!( "Updating {} RuntimeFunctionDefinitions.", self.runtime_functions.len() @@ -202,12 +245,22 @@ impl FlowUpdateService { } } - async fn update_flow_types(&self) -> bool { + async fn update_flow_types(&mut self) -> bool { if self.flow_types.is_empty() { log::info!("No FlowTypes present."); return true; } + self.flow_types = self + .flow_types + .clone() + .into_iter() + .map(|mut x| { + x.definition_source = self.definition_source.clone(); + x + }) + .collect(); + log::info!("Updating {} FlowTypes.", self.flow_types.len()); let mut client = FlowTypeServiceClient::new(self.channel.clone()); let request = Request::from_parts( From 4e6d8c4eb611f29b29d93120f1b15fb8a8b06eab Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 17 Apr 2026 20:19:27 +0200 Subject: [PATCH 02/10] ref: fixed typo --- src/flow_service/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index d27bc8e..b0f00b1 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -69,7 +69,7 @@ impl FlowUpdateService { } } - pub fn with_definiton_source(mut self, source: String) -> Self { + pub fn with_definition_source(mut self, source: String) -> Self { self.definition_source = Some(source); self } From 478f7cd1557922d91197888d65a4d417de01dd03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 17 Apr 2026 20:20:03 +0200 Subject: [PATCH 03/10] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index b0f00b1..35de1d6 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -116,15 +116,9 @@ impl FlowUpdateService { } if let Some(source) = &self.definition_source { - self.data_types = self - .data_types - .clone() - .into_iter() - .map(|mut x| { - x.definition_source = source.to_string(); - x - }) - .collect() + for data_type in self.data_types.iter_mut() { + data_type.definition_source = source.to_string(); + } }; log::info!("Updating {} DataTypes.", self.data_types.len()); From 1315652cd1c8c9439b6961daafca2fa04c0e49d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 17 Apr 2026 20:20:13 +0200 Subject: [PATCH 04/10] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 35de1d6..0eb6c27 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -245,15 +245,10 @@ impl FlowUpdateService { return true; } - self.flow_types = self - .flow_types - .clone() - .into_iter() - .map(|mut x| { - x.definition_source = self.definition_source.clone(); - x - }) - .collect(); + let definition_source = self.definition_source.clone(); + for flow_type in self.flow_types.iter_mut() { + flow_type.definition_source = definition_source.clone(); + } log::info!("Updating {} FlowTypes.", self.flow_types.len()); let mut client = FlowTypeServiceClient::new(self.channel.clone()); From de3b9c1a546610e30c6fce020b9f929e7853caad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 17 Apr 2026 20:57:12 +0200 Subject: [PATCH 05/10] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 0eb6c27..f6e1ac2 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -245,9 +245,10 @@ impl FlowUpdateService { return true; } - let definition_source = self.definition_source.clone(); - for flow_type in self.flow_types.iter_mut() { - flow_type.definition_source = definition_source.clone(); + if let Some(source) = &self.definition_source { + for flow_type in self.flow_types.iter_mut() { + flow_type.definition_source = Some(source.to_string()); + } } log::info!("Updating {} FlowTypes.", self.flow_types.len()); From 37893da71de6b44f705dee6c29999a794e45e326 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 17 Apr 2026 20:57:21 +0200 Subject: [PATCH 06/10] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index f6e1ac2..02a31c3 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -198,17 +198,10 @@ impl FlowUpdateService { return true; } - if let Some(source) = &self.definition_source { - self.runtime_functions = self - .runtime_functions - .clone() - .into_iter() - .map(|mut x| { - x.definition_source = source.to_string(); - x - }) - .collect() - }; + let definition_source = self.definition_source.clone(); + for runtime_function in self.runtime_functions.iter_mut() { + runtime_function.definition_source = definition_source.clone(); + } log::info!( "Updating {} RuntimeFunctionDefinitions.", From 4ccb083d9bee6dd7cc5502dcc0f80b58d0d8e52b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 17 Apr 2026 20:57:29 +0200 Subject: [PATCH 07/10] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 02a31c3..8f45546 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -155,15 +155,9 @@ impl FlowUpdateService { } if let Some(source) = &self.definition_source { - self.functions = self - .functions - .clone() - .into_iter() - .map(|mut x| { - x.definition_source = source.to_string(); - x - }) - .collect() + for function in self.functions.iter_mut() { + function.definition_source = source.to_string(); + } }; log::info!("Updating {} FunctionDefinitions.", self.functions.len()); From 4450d429ad3e36370a115bea04cbc5d4ef42a333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 17 Apr 2026 20:57:37 +0200 Subject: [PATCH 08/10] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 8f45546..008f023 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -119,7 +119,7 @@ impl FlowUpdateService { for data_type in self.data_types.iter_mut() { data_type.definition_source = source.to_string(); } - }; + } log::info!("Updating {} DataTypes.", self.data_types.len()); let mut client = DataTypeServiceClient::new(self.channel.clone()); From 3231fcc6f48700db2c85cd798932bd87843165cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 17 Apr 2026 22:06:10 +0200 Subject: [PATCH 09/10] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 008f023..79a69a3 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -192,9 +192,10 @@ impl FlowUpdateService { return true; } - let definition_source = self.definition_source.clone(); - for runtime_function in self.runtime_functions.iter_mut() { - runtime_function.definition_source = definition_source.clone(); + if let Some(source) = &self.definition_source { + for runtime_function in self.runtime_functions.iter_mut() { + runtime_function.definition_source = Some(source.to_string()); + } } log::info!( From 5d433c3a42c6caa220e61259399682d160c1448e Mon Sep 17 00:00:00 2001 From: Raphael Date: Tue, 21 Apr 2026 18:30:01 +0200 Subject: [PATCH 10/10] fix: correct type assignment --- src/flow_service/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 79a69a3..8bd43f7 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -194,7 +194,7 @@ impl FlowUpdateService { if let Some(source) = &self.definition_source { for runtime_function in self.runtime_functions.iter_mut() { - runtime_function.definition_source = Some(source.to_string()); + runtime_function.definition_source = source.to_string(); } }