diff --git a/Cargo.lock b/Cargo.lock index be2a9fa..7ad5c66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,7 +84,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-health", - "tucana", + "tucana 0.0.70", "uuid", ] @@ -274,7 +274,7 @@ dependencies = [ "tokio", "tonic", "tonic-health", - "tucana", + "tucana 0.0.68", "walkdir", ] @@ -2036,6 +2036,26 @@ dependencies = [ "tonic-prost-build", ] +[[package]] +name = "tucana" +version = "0.0.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ececdc8eeccd39a9ba82a4ee78e9663d9871a9ca9da65013477536360322392" +dependencies = [ + "pbjson", + "pbjson-build", + "pbjson-types", + "prost", + "prost-build", + "prost-types", + "serde", + "serde_json", + "tonic", + "tonic-build", + "tonic-prost", + "tonic-prost-build", +] + [[package]] name = "typenum" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index 394a15a..96292a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ log = "0.4.26" env_logger = "0.11.8" prost = "0.14.1" tonic = "0.14.1" -tucana = { version = "0.0.68", features = ["all"] } +tucana = { version = "0.0.70", features = ["all"] } code0-flow = { version = "0.0.32", features = ["flow_health"] } serde_json = "1.0.140" async-nats = "0.47.0" diff --git a/src/configuration/service.rs b/src/configuration/service.rs index cf08742..94a97a8 100644 --- a/src/configuration/service.rs +++ b/src/configuration/service.rs @@ -1,19 +1,19 @@ use serde::{Deserialize, Serialize}; use serde_json::from_str; use std::{fs::File, io::Read}; -use tucana::shared::{ActionConfigurations, helper::value::from_json_value}; +use tucana::shared::{ModuleConfigurations, helper::value::from_json_value}; #[derive(Serialize, Deserialize, Clone)] -struct SerializableActionConfiguration { +struct SerializableModuleConfiguration { identifier: String, value: serde_json::Value, } #[derive(Serialize, Deserialize, Clone)] -struct SerializableActionProjectConfiguration { +struct SerializableModuleProjectConfiguration { project_id: i64, #[serde(default)] - configs: Vec, + configs: Vec, } #[derive(Serialize, Deserialize, Clone)] @@ -21,7 +21,7 @@ pub struct SerializableActionServiceConfiguration { token: String, identifier: String, #[serde(default)] - configs: Vec, + configs: Vec, } #[derive(Serialize, Deserialize, Clone, Default)] @@ -36,7 +36,7 @@ pub struct SerializableServiceConfiguration { pub struct ActionServiceConfiguration { token: String, service_name: String, - config: Vec, + config: Vec, } #[derive(Serialize, Deserialize, Clone)] @@ -51,8 +51,8 @@ pub struct ServiceConfiguration { runtimes: Vec, } -impl From for tucana::shared::ActionConfiguration { - fn from(value: SerializableActionConfiguration) -> Self { +impl From for tucana::shared::ModuleConfiguration { + fn from(value: SerializableModuleConfiguration) -> Self { Self { identifier: value.identifier, value: Some(from_json_value(value.value)), @@ -60,25 +60,25 @@ impl From for tucana::shared::ActionConfigurati } } -impl From for tucana::shared::ActionProjectConfiguration { - fn from(value: SerializableActionProjectConfiguration) -> Self { +impl From for tucana::shared::ModuleProjectConfigurations { + fn from(value: SerializableModuleProjectConfiguration) -> Self { Self { project_id: value.project_id, - action_configurations: value.configs.into_iter().map(Into::into).collect(), + module_configurations: value.configs.into_iter().map(Into::into).collect(), } } } impl From for ActionServiceConfiguration { fn from(value: SerializableActionServiceConfiguration) -> Self { - let action_identifier = value.identifier.clone(); + let module_identifier = value.identifier.clone(); Self { token: value.token, service_name: value.identifier, - config: vec![ActionConfigurations { - action_identifier, - action_configurations: value.configs.into_iter().map(Into::into).collect(), + config: vec![ModuleConfigurations { + module_identifier, + module_configurations: value.configs.into_iter().map(Into::into).collect(), }], } } @@ -114,7 +114,7 @@ impl ServiceConfiguration { pub fn get_action_configuration( &self, action_identifier: &String, - ) -> Vec { + ) -> Vec { match self .actions .iter() diff --git a/src/sagittarius/action_configuration_service_client_impl.rs b/src/sagittarius/action_configuration_service_client_impl.rs deleted file mode 100644 index 0b19f94..0000000 --- a/src/sagittarius/action_configuration_service_client_impl.rs +++ /dev/null @@ -1,53 +0,0 @@ -use crate::authorization::authorization::get_authorization_metadata; -use tonic::transport::Channel; -use tonic::{Extensions, Request}; -use tucana::shared::ActionConfigurationDefinition; - -pub struct SagittariusActionConfigurationServiceClient { - client: - tucana::sagittarius::action_configuration_service_client::ActionConfigurationServiceClient< - Channel, - >, - token: String, -} - -impl SagittariusActionConfigurationServiceClient { - pub fn new(channel: Channel, token: String) -> Self { - let client = tucana::sagittarius::action_configuration_service_client::ActionConfigurationServiceClient::new(channel); - - Self { client, token } - } - - pub async fn update_action_configuration( - &mut self, - action_identifier: String, - configs: Vec, - ) -> bool { - let request = Request::from_parts( - get_authorization_metadata(&self.token), - Extensions::new(), - tucana::sagittarius::ActionConfigurationUpdateRequest { - action_identifier: action_identifier, - action_configurations: configs, - }, - ); - - let response = match self.client.update(request).await { - Ok(response) => { - log::info!("Successfully transferred action configuration update.",); - response.into_inner() - } - Err(err) => { - log::error!("Failed to update action configurations: {:?}", err); - return true; - } - }; - - match response.success { - true => log::info!("Sagittarius successfully updated ActionConfiguration."), - false => log::error!("Sagittarius didn't update any ActionConfiguration."), - }; - - response.success - } -} diff --git a/src/sagittarius/data_type_service_client_impl.rs b/src/sagittarius/data_type_service_client_impl.rs deleted file mode 100644 index e6520e4..0000000 --- a/src/sagittarius/data_type_service_client_impl.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::authorization::authorization::get_authorization_metadata; -use tonic::transport::Channel; -use tonic::{Extensions, Request}; -use tucana::sagittarius::{ - DataTypeUpdateRequest as SagittariusDataTypeUpdateRequest, - data_type_service_client::DataTypeServiceClient, -}; -use tucana::{ - aquila::DataTypeUpdateRequest as AquilaDataTypeUpdateRequest, - aquila::DataTypeUpdateResponse as AquilaDataTypeUpdateResponse, -}; -pub struct SagittariusDataTypeServiceClient { - client: DataTypeServiceClient, - token: String, -} - -impl SagittariusDataTypeServiceClient { - pub fn new(channel: Channel, token: String) -> Self { - let client = DataTypeServiceClient::new(channel); - - Self { client, token } - } - - pub async fn update_data_types( - &mut self, - data_type_update_request: AquilaDataTypeUpdateRequest, - ) -> AquilaDataTypeUpdateResponse { - let request = Request::from_parts( - get_authorization_metadata(&self.token), - Extensions::new(), - SagittariusDataTypeUpdateRequest { - data_types: data_type_update_request.data_types, - }, - ); - - let response = match self.client.update(request).await { - Ok(response) => { - log::info!("Successfully transferred data types.",); - response.into_inner() - } - Err(err) => { - log::error!("Failed to update DataTypes: {:?}", err); - return AquilaDataTypeUpdateResponse { success: false }; - } - }; - - match response.success { - true => log::info!("Sagittarius successfully updated DataTypes."), - false => log::error!("Sagittarius didn't update any DataTypes."), - }; - - AquilaDataTypeUpdateResponse { - success: response.success, - } - } -} diff --git a/src/sagittarius/flow_service_client_impl.rs b/src/sagittarius/flow_service_client_impl.rs index fa68c6d..8689892 100644 --- a/src/sagittarius/flow_service_client_impl.rs +++ b/src/sagittarius/flow_service_client_impl.rs @@ -21,7 +21,7 @@ pub struct SagittariusFlowClient { env: String, token: String, sagittarius_ready: Arc, - action_config_tx: broadcast::Sender, + action_config_tx: broadcast::Sender, } impl SagittariusFlowClient { @@ -31,7 +31,7 @@ impl SagittariusFlowClient { token: String, channel: Channel, sagittarius_ready: Arc, - action_config_tx: broadcast::Sender, + action_config_tx: broadcast::Sender, ) -> SagittariusFlowClient { let client = FlowServiceClient::new(channel); @@ -151,7 +151,7 @@ impl SagittariusFlowClient { }; } } - Data::ActionConfigurations(action_configurations) => { + Data::ModuleConfigurations(action_configurations) => { if let Err(err) = self.action_config_tx.send(action_configurations) { log::warn!("No action configuration receivers available: {:?}", err); } diff --git a/src/sagittarius/flow_type_service_client_impl.rs b/src/sagittarius/flow_type_service_client_impl.rs deleted file mode 100644 index 033d85b..0000000 --- a/src/sagittarius/flow_type_service_client_impl.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::authorization::authorization::get_authorization_metadata; -use tonic::Extensions; -use tonic::Request; -use tonic::transport::Channel; -use tucana::aquila::FlowTypeUpdateRequest as AquilaFlowTypeUpdateRequest; -use tucana::aquila::FlowTypeUpdateResponse as AquilaFlowTypeUpdateResponse; -use tucana::sagittarius::FlowTypeUpdateRequest as SagittariusFlowTypeUpdateRequest; -use tucana::sagittarius::flow_type_service_client::FlowTypeServiceClient; - -pub struct SagittariusFlowTypeServiceClient { - client: FlowTypeServiceClient, - token: String, -} - -impl SagittariusFlowTypeServiceClient { - pub fn new(channel: Channel, token: String) -> Self { - let client = FlowTypeServiceClient::new(channel); - - Self { client, token } - } - - pub async fn update_flow_types( - &mut self, - flow_type_update_request: AquilaFlowTypeUpdateRequest, - ) -> AquilaFlowTypeUpdateResponse { - let request = Request::from_parts( - get_authorization_metadata(&self.token), - Extensions::new(), - SagittariusFlowTypeUpdateRequest { - flow_types: flow_type_update_request.flow_types, - }, - ); - - let response = match self.client.update(request).await { - Ok(response) => { - log::info!("Successfully transferred FlowTypes.",); - response.into_inner() - } - Err(err) => { - log::error!("Failed to update FlowTypes: {:?}", err); - return AquilaFlowTypeUpdateResponse { success: false }; - } - }; - - match response.success { - true => log::info!("Sagittarius successfully updated FlowTypes."), - false => log::error!("Sagittarius didn't update any FlowTypes."), - }; - - AquilaFlowTypeUpdateResponse { - success: response.success, - } - } -} diff --git a/src/sagittarius/function_service_client_impl.rs b/src/sagittarius/function_service_client_impl.rs deleted file mode 100644 index 4ac468d..0000000 --- a/src/sagittarius/function_service_client_impl.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::authorization::authorization::get_authorization_metadata; -use tonic::Extensions; -use tonic::Request; -use tonic::transport::Channel; -use tucana::sagittarius::function_definition_service_client::FunctionDefinitionServiceClient; - -pub struct SagittariusFunctionDefinitionServiceClient { - client: FunctionDefinitionServiceClient, - token: String, -} - -impl SagittariusFunctionDefinitionServiceClient { - pub fn new(channel: Channel, token: String) -> Self { - let client = FunctionDefinitionServiceClient::new(channel); - Self { client, token } - } - - pub async fn update_function_definitions( - &mut self, - function_update_request: tucana::aquila::FunctionDefinitionUpdateRequest, - ) -> tucana::aquila::FunctionDefinitionUpdateResponse { - let request = Request::from_parts( - get_authorization_metadata(&self.token), - Extensions::new(), - tucana::sagittarius::FunctionDefinitionUpdateRequest { - functions: function_update_request.functions, - }, - ); - - let response = match self.client.update(request).await { - Ok(response) => { - log::info!("Successfully transferred Functions.",); - response.into_inner() - } - Err(err) => { - log::error!("Failed to update Functions: {:?}", err); - return tucana::aquila::FunctionDefinitionUpdateResponse { success: false }; - } - }; - - match response.success { - true => log::info!("Sagittarius successfully updated Functions."), - false => log::error!("Sagittarius didn't update any Functions."), - }; - - tucana::aquila::FunctionDefinitionUpdateResponse { - success: response.success, - } - } -} diff --git a/src/sagittarius/mod.rs b/src/sagittarius/mod.rs index 7433acf..b2b351c 100644 --- a/src/sagittarius/mod.rs +++ b/src/sagittarius/mod.rs @@ -1,10 +1,6 @@ -pub mod action_configuration_service_client_impl; -pub mod data_type_service_client_impl; pub mod flow_service_client_impl; -pub mod flow_type_service_client_impl; +pub mod module_service_client_impl; pub mod retry; -pub mod runtime_function_service_client_impl; -pub mod function_service_client_impl; pub mod runtime_status_service_client_impl; pub mod runtime_usage_client_impl; pub mod test_execution_client_impl; diff --git a/src/sagittarius/module_service_client_impl.rs b/src/sagittarius/module_service_client_impl.rs new file mode 100644 index 0000000..c124185 --- /dev/null +++ b/src/sagittarius/module_service_client_impl.rs @@ -0,0 +1,50 @@ +use crate::authorization::authorization::get_authorization_metadata; +use tonic::transport::Channel; +use tonic::{Extensions, Request}; + +pub struct SagittariusModuleServiceClient { + client: tucana::sagittarius::module_service_client::ModuleServiceClient, + token: String, +} + +impl SagittariusModuleServiceClient { + pub fn new(channel: Channel, token: String) -> Self { + let client = tucana::sagittarius::module_service_client::ModuleServiceClient::new(channel); + + Self { client, token } + } + + pub async fn update_modules( + &mut self, + modules_update_request: tucana::aquila::ModuleUpdateRequest, + ) -> tucana::aquila::ModuleUpdateResponse { + let request = Request::from_parts( + get_authorization_metadata(&self.token), + Extensions::new(), + tucana::sagittarius::ModuleUpdateRequest { + modules: modules_update_request.modules, + }, + ); + + match self.client.update(request).await { + Ok(response) => { + let res = response.into_inner(); + match res.success { + true => log::info!("Sagittarius successfully updated Modules."), + false => log::error!( + "Sagittarius didn't update any Modules. Reason: {:?}", + res.error + ), + }; + + tucana::aquila::ModuleUpdateResponse { + success: res.success, + } + } + Err(err) => { + log::error!("Failed to update Modules via Sagittarius RPC transport: {:?}", err); + tucana::aquila::ModuleUpdateResponse { success: false } + } + } + } +} diff --git a/src/sagittarius/runtime_function_service_client_impl.rs b/src/sagittarius/runtime_function_service_client_impl.rs deleted file mode 100644 index 7326a1c..0000000 --- a/src/sagittarius/runtime_function_service_client_impl.rs +++ /dev/null @@ -1,53 +0,0 @@ -use crate::authorization::authorization::get_authorization_metadata; -use tonic::Extensions; -use tonic::Request; -use tonic::transport::Channel; -use tucana::aquila::RuntimeFunctionDefinitionUpdateRequest as AquilaRuntimeFunctionUpdateRequest; -use tucana::aquila::RuntimeFunctionDefinitionUpdateResponse as AquilaRuntimeFunctionUpdateResponse; -use tucana::sagittarius::RuntimeFunctionDefinitionUpdateRequest as SagittariusRuntimeFunctionUpdateRequest; -use tucana::sagittarius::runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient; - -pub struct SagittariusRuntimeFunctionServiceClient { - client: RuntimeFunctionDefinitionServiceClient, - token: String, -} - -impl SagittariusRuntimeFunctionServiceClient { - pub fn new(channel: Channel, token: String) -> Self { - let client = RuntimeFunctionDefinitionServiceClient::new(channel); - Self { client, token } - } - - pub async fn update_runtime_function_definitions( - &mut self, - runtime_function_update_request: AquilaRuntimeFunctionUpdateRequest, - ) -> AquilaRuntimeFunctionUpdateResponse { - let request = Request::from_parts( - get_authorization_metadata(&self.token), - Extensions::new(), - SagittariusRuntimeFunctionUpdateRequest { - runtime_functions: runtime_function_update_request.runtime_functions, - }, - ); - - let response = match self.client.update(request).await { - Ok(response) => { - log::info!("Successfully transferred RuntimeFunctions.",); - response.into_inner() - } - Err(err) => { - log::error!("Failed to update RuntimeFunctions: {:?}", err); - return AquilaRuntimeFunctionUpdateResponse { success: false }; - } - }; - - match response.success { - true => log::info!("Sagittarius successfully updated RuntimeFunctions."), - false => log::error!("Sagittarius didn't update any RuntimeFunctionRuntimeFunctions."), - }; - - AquilaRuntimeFunctionUpdateResponse { - success: response.success, - } - } -} diff --git a/src/sagittarius/runtime_status_service_client_impl.rs b/src/sagittarius/runtime_status_service_client_impl.rs index af891dc..f625d9a 100644 --- a/src/sagittarius/runtime_status_service_client_impl.rs +++ b/src/sagittarius/runtime_status_service_client_impl.rs @@ -25,6 +25,9 @@ impl SagittariusRuntimeStatusServiceClient { tucana::aquila::runtime_status_update_request::Status::ExecutionRuntimeStatus(execution_runtime_status) => { Some(tucana::sagittarius::runtime_status_update_request::Status::ExecutionRuntimeStatus(execution_runtime_status)) }, + tucana::aquila::runtime_status_update_request::Status::ActionStatus(action_status) => { + Some(tucana::sagittarius::runtime_status_update_request::Status::ActionStatus(action_status)) + }, }, None => None, }; diff --git a/src/sagittarius/test_execution_client_impl.rs b/src/sagittarius/test_execution_client_impl.rs index 1fcd619..8af053b 100644 --- a/src/sagittarius/test_execution_client_impl.rs +++ b/src/sagittarius/test_execution_client_impl.rs @@ -12,8 +12,8 @@ use tonic::Request; use tonic::transport::Channel; use tucana::sagittarius::execution_logon_request::Data; use tucana::sagittarius::execution_service_client::ExecutionServiceClient; -use tucana::sagittarius::{ExecutionLogonRequest, Logon, TestExecutionResponse}; -use tucana::shared::{ExecutionFlow, ValidationFlow, Value}; +use tucana::sagittarius::{ExecutionLogonRequest, Logon}; +use tucana::shared::{ExecutionFlow, ExecutionResult, ValidationFlow}; pub struct SagittariusTestExecutionServiceClient { nats_client: async_nats::Client, @@ -111,16 +111,12 @@ impl SagittariusTestExecutionServiceClient { let topic = format!("test_execution.{}", uuid); let result = self.nats_client.request(topic, bytes.into()).await; + // Aquila will expect a `Execution Result` back from Taurus match result { - Ok(message) => match Value::decode(message.payload) { + Ok(message) => match ExecutionResult::decode(message.payload) { Ok(value) => { let execution_result = ExecutionLogonRequest { - data: Some(Data::Response(TestExecutionResponse { - flow_id: request.flow_id, - execution_uuid: uuid, - result: Some(value), - logs: vec![], - })), + data: Some(Data::Response(value)), }; if let Err(err) = tx.send(execution_result).await { diff --git a/src/server/action_transfer_service_server_impl.rs b/src/server/action_transfer_service_server_impl.rs index 317cab0..f82277a 100644 --- a/src/server/action_transfer_service_server_impl.rs +++ b/src/server/action_transfer_service_server_impl.rs @@ -9,8 +9,9 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tucana::{ aquila::{ - ActionLogon, Event, ExecutionRequest, ExecutionResult, TransferRequest, TransferResponse, - action_transfer_service_server::ActionTransferService, transfer_response, + ActionEvent, ActionExecutionRequest, ActionExecutionResponse, ActionLogon, + ActionTransferRequest, ActionTransferResponse, + action_transfer_service_server::ActionTransferService, }, shared::{ExecutionFlow, Flows, ValidationFlow, Value}, }; @@ -21,7 +22,7 @@ pub struct AquilaActionTransferServiceServer { client: async_nats::Client, kv: async_nats::jetstream::kv::Store, actions: ServiceConfiguration, - action_config_tx: tokio::sync::broadcast::Sender, + action_config_tx: tokio::sync::broadcast::Sender, is_static: bool, } @@ -30,7 +31,7 @@ impl AquilaActionTransferServiceServer { client: async_nats::Client, kv: async_nats::jetstream::kv::Store, actions: ServiceConfiguration, - action_config_tx: tokio::sync::broadcast::Sender, + action_config_tx: tokio::sync::broadcast::Sender, is_static: bool, ) -> Self { Self { @@ -119,12 +120,12 @@ fn convert_validation_flow(flow: ValidationFlow, input_value: Option) -> } fn applies_to_action( - configs: &tucana::shared::ActionConfigurations, + configs: &tucana::shared::ModuleConfigurations, action_identifier: &str, ) -> bool { - configs.action_configurations.iter().any(|project_cfg| { + configs.module_configurations.iter().any(|project_cfg| { project_cfg - .action_configurations + .module_configurations .iter() .any(|cfg| cfg.identifier == action_identifier) }) @@ -132,7 +133,7 @@ fn applies_to_action( /// Extracts the bearer token from gRPC metadata. fn extract_token( - request: &tonic::Request>, + request: &tonic::Request>, ) -> Result { log::debug!("Extracting authorization token from metadata"); match request.metadata().get("authorization") { @@ -156,18 +157,24 @@ async fn handle_logon( action_logon: ActionLogon, actions: Arc>, client: async_nats::Client, - cfg_tx: tokio::sync::broadcast::Sender, - tx: tokio::sync::mpsc::Sender>, + cfg_tx: tokio::sync::broadcast::Sender, + tx: tokio::sync::mpsc::Sender>, pending_replies: PendingReplies, cfg_forwarder_started: &mut bool, ) -> Result { log::info!("Action successfull logged on: {:?}", action_logon); + let identifier = match action_logon.module { + Some(ref m) => m.identifier.clone(), + None => { + return Err(Status::aborted("Please provide a module configuration.")); + } + }; let lock = actions.lock().await; if !lock.has_action(&token.to_string()) { log::debug!( "Rejected action with identifer: {}, becuase its not registered", - action_logon.action_identifier + identifier ); return Err(Status::unauthenticated( "token not matching to action identifier", @@ -176,18 +183,15 @@ async fn handle_logon( log::debug!( "Action with identifer: {}, connected successfully", - action_logon.action_identifier + identifier ); - let sub = match client - .subscribe(format!("action.{}.*", action_logon.action_identifier)) - .await - { + let sub = match client.subscribe(format!("action.{}.*", identifier)).await { Ok(s) => s, Err(err) => { log::error!( "Could not subscribe to action: {}. Reason: {:?}", - action_logon.action_identifier, + identifier, err ); return Err(Status::internal( @@ -196,10 +200,7 @@ async fn handle_logon( } }; - log::debug!( - "Subscribed to action subject action.{}.*", - action_logon.action_identifier - ); + log::debug!("Subscribed to action subject action.{}.*", identifier); let tx_clone = tx.clone(); let pending_replies_clone = pending_replies.clone(); @@ -209,11 +210,8 @@ async fn handle_logon( if !*cfg_forwarder_started { *cfg_forwarder_started = true; - log::debug!( - "Starting config forwarder for action {}", - action_logon.action_identifier - ); - spawn_cfg_forwarder(action_logon.action_identifier.clone(), cfg_tx, tx.clone()); + log::debug!("Starting config forwarder for action {}", identifier); + spawn_cfg_forwarder(identifier.clone(), cfg_tx, tx.clone()); } Ok(action_logon) @@ -222,8 +220,8 @@ async fn handle_logon( /// Forwards config updates for the given action identifier to the gRPC stream. fn spawn_cfg_forwarder( action_identifier: String, - cfg_tx: tokio::sync::broadcast::Sender, - tx: tokio::sync::mpsc::Sender>, + cfg_tx: tokio::sync::broadcast::Sender, + tx: tokio::sync::mpsc::Sender>, ) { let mut cfg_rx = cfg_tx.subscribe(); tokio::spawn(async move { @@ -237,8 +235,10 @@ fn spawn_cfg_forwarder( } log::debug!("Forwarding config update to action {}", action_identifier); - let resp = TransferResponse { - data: Some(transfer_response::Data::ActionConfigurations(cfgs)), + let resp = ActionTransferResponse { + data: Some( + tucana::aquila::action_transfer_response::Data::ModuleConfigurations(cfgs), + ), }; if tx.send(Ok(resp)).await.is_err() { @@ -253,7 +253,7 @@ fn spawn_cfg_forwarder( /// Looks up matching flows for an event and requests their execution. async fn handle_event( - event: Event, + event: ActionEvent, kv: async_nats::jetstream::kv::Store, client: async_nats::Client, ) { @@ -297,7 +297,7 @@ async fn handle_event( /// Publishes execution results back to the original NATS reply subject. async fn handle_result( - execution_result: ExecutionResult, + execution_result: ActionExecutionResponse, client: async_nats::Client, pending_replies: PendingReplies, ) { @@ -335,11 +335,11 @@ async fn handle_result( #[tonic::async_trait] impl ActionTransferService for AquilaActionTransferServiceServer { type TransferStream = - Pin> + Send + 'static>>; + Pin> + Send + 'static>>; async fn transfer( &self, - request: tonic::Request>, + request: tonic::Request>, ) -> std::result::Result, tonic::Status> { let token = extract_token(&request)?; @@ -354,7 +354,8 @@ impl ActionTransferService for AquilaActionTransferServiceServer { let is_static = self.is_static; let pending_replies: PendingReplies = Arc::new(Mutex::new(HashMap::new())); - let (tx, rx) = tokio::sync::mpsc::channel::>(32); + let (tx, rx) = + tokio::sync::mpsc::channel::>(32); tokio::spawn(async move { let mut cfg_forwarder_started = false; @@ -381,11 +382,16 @@ impl ActionTransferService for AquilaActionTransferServiceServer { first_request = false; match data { - tucana::aquila::transfer_request::Data::Logon(action_logon) => { - log::debug!( - "Received logon for action {}", - action_logon.action_identifier - ); + tucana::aquila::action_transfer_request::Data::Logon(action_logon) => { + let identifier = match action_logon.module { + Some(ref m) => m.identifier.clone(), + None => { + log::error!("Logon failed (no module present)"); + break; + } + }; + + log::debug!("Received logon for action {}", identifier); let accepted = match handle_logon( &token, @@ -427,9 +433,17 @@ impl ActionTransferService for AquilaActionTransferServiceServer { } }; + let identifier = match props.module { + Some(ref m) => m.identifier.clone(), + None => { + log::error!("Logon failed (no module present)"); + break; + } + }; + if is_static { let lock = actions.lock().await; - let configs = lock.get_action_configuration(&props.action_identifier); + let configs = lock.get_action_configuration(&identifier); for conf in configs { if let Err(err) = cfg_tx.send(conf) { log::warn!("No action configuration receivers available: {:?}", err); @@ -438,19 +452,19 @@ impl ActionTransferService for AquilaActionTransferServiceServer { }; match data { - tucana::aquila::transfer_request::Data::Logon(_) => { + tucana::aquila::action_transfer_request::Data::Logon(_) => { log::error!("Received duplicate logon after initial logon"); break; } - tucana::aquila::transfer_request::Data::Event(event) => { - log::debug!("Received event from action {}", props.action_identifier); + tucana::aquila::action_transfer_request::Data::Event(event) => { + log::debug!("Received event from action {}", identifier); handle_event(event, kv.clone(), client.clone()).await; } - tucana::aquila::transfer_request::Data::Result(execution_result) => { + tucana::aquila::action_transfer_request::Data::Result(execution_result) => { log::debug!( "Received execution result {} from action {}", execution_result.execution_identifier, - props.action_identifier + identifier, ); handle_result(execution_result, client.clone(), pending_replies.clone()) @@ -469,7 +483,7 @@ impl ActionTransferService for AquilaActionTransferServiceServer { /// Forwards NATS execution requests to the connected action via gRPC and stores reply subjects. async fn forward_nats_to_action( mut sub: Subscriber, - tx: tokio::sync::mpsc::Sender>, + tx: tokio::sync::mpsc::Sender>, pending_replies: PendingReplies, ) { log::debug!("Waiting for incoming request"); @@ -477,7 +491,7 @@ async fn forward_nats_to_action( while let Some(msg) = sub.next().await { log::debug!("Received RemoteRuntime execution request"); - let execution = match ExecutionRequest::decode(msg.payload.as_ref()) { + let execution = match ActionExecutionRequest::decode(msg.payload.as_ref()) { Ok(req) => req, Err(err) => { log::error!("Invalid execution request payload: {:?}", err); @@ -506,8 +520,10 @@ async fn forward_nats_to_action( execution_id ); - let resp = TransferResponse { - data: Some(transfer_response::Data::Execution(execution)), + let resp = ActionTransferResponse { + data: Some(tucana::aquila::action_transfer_response::Data::Execution( + execution, + )), }; if tx.send(Ok(resp)).await.is_err() { diff --git a/src/server/data_type_service_server_impl.rs b/src/server/data_type_service_server_impl.rs deleted file mode 100644 index ef4b222..0000000 --- a/src/server/data_type_service_server_impl.rs +++ /dev/null @@ -1,62 +0,0 @@ -use crate::{ - authorization::authorization::extract_token, configuration::service::ServiceConfiguration, - sagittarius::data_type_service_client_impl::SagittariusDataTypeServiceClient, -}; -use std::sync::Arc; -use tokio::sync::Mutex; -use tonic::Status; -use tucana::aquila::data_type_service_server::DataTypeService; - -pub struct AquilaDataTypeServiceServer { - service_configuration: ServiceConfiguration, - client: Arc>, -} - -impl AquilaDataTypeServiceServer { - pub fn new( - client: Arc>, - service_configuration: ServiceConfiguration, - ) -> Self { - Self { - client, - service_configuration, - } - } -} - -#[tonic::async_trait] -impl DataTypeService for AquilaDataTypeServiceServer { - async fn update( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let token = match extract_token(&request) { - Ok(t) => t, - Err(status) => return Err(status), - }; - - if !self.service_configuration.has_service(&token.to_string()) { - return Err(Status::unauthenticated("token is not valid")); - } - - let data_type_update_request = request.into_inner(); - - log::debug!( - "Received DataTypes: {:?}", - data_type_update_request - .data_types - .iter() - .map(|d| d.identifier.clone()) - .collect::>() - ); - - let mut client = self.client.lock().await; - let response = client.update_data_types(data_type_update_request).await; - - Ok(tonic::Response::new( - tucana::aquila::DataTypeUpdateResponse { - success: response.success, - }, - )) - } -} diff --git a/src/server/flow_type_service_server_impl.rs b/src/server/flow_type_service_server_impl.rs deleted file mode 100644 index c8163ad..0000000 --- a/src/server/flow_type_service_server_impl.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::sync::Arc; -use tokio::sync::Mutex; -use tonic::Status; -use tucana::aquila::flow_type_service_server::FlowTypeService; - -use crate::{ - authorization::authorization::extract_token, configuration::service::ServiceConfiguration, sagittarius::flow_type_service_client_impl::SagittariusFlowTypeServiceClient -}; - -pub struct AquilaFlowTypeServiceServer { - client: Arc>, - service_configuration: ServiceConfiguration, -} - -impl AquilaFlowTypeServiceServer { - pub fn new( - client: Arc>, - service_configuration: ServiceConfiguration, - ) -> Self { - Self { - client, - service_configuration, - } - } -} - -#[tonic::async_trait] -impl FlowTypeService for AquilaFlowTypeServiceServer { - async fn update( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let token = match extract_token(&request) { - Ok(t) => t, - Err(status) => return Err(status), - }; - - if !self.service_configuration.has_service(&token.to_string()) { - return Err(Status::unauthenticated("token is not valid")); - } - - let flow_type_update_request = request.into_inner(); - - log::debug!( - "Received FlowTypes: {:?}", - flow_type_update_request - .flow_types - .iter() - .map(|f| f.identifier.clone()) - .collect::>() - ); - - let mut client = self.client.lock().await; - let response = client.update_flow_types(flow_type_update_request).await; - - Ok(tonic::Response::new( - tucana::aquila::FlowTypeUpdateResponse { - success: response.success, - }, - )) - } -} diff --git a/src/server/function_service_server_impl.rs b/src/server/function_service_server_impl.rs deleted file mode 100644 index 303afa1..0000000 --- a/src/server/function_service_server_impl.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::Mutex; -use tonic::Status; -use tucana::aquila::function_definition_service_server::FunctionDefinitionService; - -use crate::{ - authorization::authorization::extract_token, configuration::service::ServiceConfiguration, sagittarius::function_service_client_impl::SagittariusFunctionDefinitionServiceClient -}; - -pub struct AquilaFunctionDefinitionServiceServer { - client: Arc>, - service_configuration: ServiceConfiguration, -} - -impl AquilaFunctionDefinitionServiceServer { - pub fn new( - client: Arc>, - service_configuration: ServiceConfiguration, - ) -> Self { - Self { - client, - service_configuration, - } - } -} - -#[tonic::async_trait] -impl FunctionDefinitionService for AquilaFunctionDefinitionServiceServer { - async fn update( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { - let token = match extract_token(&request) { - Ok(t) => t, - Err(status) => return Err(status), - }; - - if !self.service_configuration.has_service(&token.to_string()) { - return Err(Status::unauthenticated("token is not valid")); - } - - let function_definition_update_request = request.into_inner(); - - log::debug!( - "Received Functions: {:?}", - function_definition_update_request - .functions - .iter() - .map(|f| f.runtime_name.clone()) - .collect::>() - ); - - let mut client = self.client.lock().await; - let response = client - .update_function_definitions(function_definition_update_request) - .await; - - Ok(tonic::Response::new( - tucana::aquila::FunctionDefinitionUpdateResponse { - success: response.success, - }, - )) - } -} diff --git a/src/server/mod.rs b/src/server/mod.rs index 7787cb5..f3eb778 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,25 +1,19 @@ use crate::{ configuration::{config::Config, service::ServiceConfiguration, state::AppReadiness}, sagittarius::{ - data_type_service_client_impl::SagittariusDataTypeServiceClient, - flow_type_service_client_impl::SagittariusFlowTypeServiceClient, - function_service_client_impl::SagittariusFunctionDefinitionServiceClient, - runtime_function_service_client_impl::SagittariusRuntimeFunctionServiceClient, + module_service_client_impl::SagittariusModuleServiceClient, runtime_status_service_client_impl::SagittariusRuntimeStatusServiceClient, runtime_usage_client_impl::SagittariusRuntimeUsageClient, }, server::{ action_transfer_service_server_impl::AquilaActionTransferServiceServer, - function_service_server_impl::AquilaFunctionDefinitionServiceServer, + module_service_server_impl::AquilaModuleServiceServer, runtime_status_service_server_impl::AquilaRuntimeStatusServiceServer, runtime_usage_service_server_impl::AquilaRuntimeUsageServiceServer, }, }; use async_nats::jetstream::kv::Store; -use data_type_service_server_impl::AquilaDataTypeServiceServer; -use flow_type_service_server_impl::AquilaFlowTypeServiceServer; use log::info; -use runtime_function_service_server_impl::AquilaRuntimeFunctionServiceServer; use std::{net::SocketAddr, sync::Arc}; use tokio::sync::Mutex; use tonic::{ @@ -28,19 +22,13 @@ use tonic::{ }; use tucana::aquila::{ action_transfer_service_server::ActionTransferServiceServer, - data_type_service_server::DataTypeServiceServer, - flow_type_service_server::FlowTypeServiceServer, - function_definition_service_server::FunctionDefinitionServiceServer, - runtime_function_definition_service_server::RuntimeFunctionDefinitionServiceServer, + module_service_server::ModuleServiceServer, runtime_status_service_server::RuntimeStatusServiceServer, runtime_usage_service_server::RuntimeUsageServiceServer, }; mod action_transfer_service_server_impl; -mod data_type_service_server_impl; -mod flow_type_service_server_impl; -mod function_service_server_impl; -mod runtime_function_service_server_impl; +mod module_service_server_impl; mod runtime_status_service_server_impl; mod runtime_usage_service_server_impl; @@ -54,7 +42,7 @@ pub struct AquilaGRPCServer { service_configuration: ServiceConfiguration, nats_client: async_nats::Client, kv_store: Arc, - action_config_tx: tokio::sync::broadcast::Sender, + action_config_tx: tokio::sync::broadcast::Sender, is_static: bool, } @@ -66,7 +54,7 @@ impl AquilaGRPCServer { service_configuration: ServiceConfiguration, nats_client: async_nats::Client, kv_store: Arc, - action_config_tx: tokio::sync::broadcast::Sender, + action_config_tx: tokio::sync::broadcast::Sender, ) -> Self { let address = match format!("{}:{}", config.grpc_host, config.grpc_port).parse() { Ok(addr) => { @@ -92,32 +80,12 @@ impl AquilaGRPCServer { } pub async fn start(&self) -> Result<(), tonic::transport::Error> { - let data_type_service = Arc::new(Mutex::new(SagittariusDataTypeServiceClient::new( + let module_service = Arc::new(Mutex::new(SagittariusModuleServiceClient::new( self.channel.clone(), self.token.clone(), ))); - info!("DataTypeService started"); - - let flow_type_service = Arc::new(Mutex::new(SagittariusFlowTypeServiceClient::new( - self.channel.clone(), - self.token.clone(), - ))); - info!("FlowTypeService started"); - - let runtime_function_service = Arc::new(Mutex::new( - SagittariusRuntimeFunctionServiceClient::new(self.channel.clone(), self.token.clone()), - )); - - info!("RuntimeFunctionService started"); - - let function_service = - Arc::new(Mutex::new(SagittariusFunctionDefinitionServiceClient::new( - self.channel.clone(), - self.token.clone(), - ))); - - info!("FunctionService started"); + info!("ModuleService started"); let runtime_usage_service = Arc::new(Mutex::new(SagittariusRuntimeUsageClient::new( self.channel.clone(), @@ -132,20 +100,8 @@ impl AquilaGRPCServer { info!("RuntimeStatusService started"); - let data_type_server = AquilaDataTypeServiceServer::new( - data_type_service.clone(), - self.service_configuration.clone(), - ); - let flow_type_server = AquilaFlowTypeServiceServer::new( - flow_type_service.clone(), - self.service_configuration.clone(), - ); - let function_server = AquilaFunctionDefinitionServiceServer::new( - function_service.clone(), - self.service_configuration.clone(), - ); - let runtime_function_server = AquilaRuntimeFunctionServiceServer::new( - runtime_function_service.clone(), + let module_server = AquilaModuleServiceServer::new( + module_service.clone(), self.service_configuration.clone(), ); let runtime_usage_server = AquilaRuntimeUsageServiceServer::new( @@ -191,20 +147,8 @@ impl AquilaGRPCServer { .add_service(tonic_health::pb::health_server::HealthServer::new( health_service, )) - .add_service(DataTypeServiceServer::with_interceptor( - data_type_server, - intercept.clone(), - )) - .add_service(FlowTypeServiceServer::with_interceptor( - flow_type_server, - intercept.clone(), - )) - .add_service(RuntimeFunctionDefinitionServiceServer::with_interceptor( - runtime_function_server, - intercept.clone(), - )) - .add_service(FunctionDefinitionServiceServer::with_interceptor( - function_server, + .add_service(ModuleServiceServer::with_interceptor( + module_server, intercept.clone(), )) .add_service(RuntimeUsageServiceServer::with_interceptor( @@ -223,20 +167,8 @@ impl AquilaGRPCServer { .await } else { Server::builder() - .add_service(DataTypeServiceServer::with_interceptor( - data_type_server, - intercept.clone(), - )) - .add_service(FlowTypeServiceServer::with_interceptor( - flow_type_server, - intercept.clone(), - )) - .add_service(RuntimeFunctionDefinitionServiceServer::with_interceptor( - runtime_function_server, - intercept.clone(), - )) - .add_service(FunctionDefinitionServiceServer::with_interceptor( - function_server, + .add_service(ModuleServiceServer::with_interceptor( + module_server, intercept.clone(), )) .add_service(RuntimeUsageServiceServer::with_interceptor( diff --git a/src/server/module_service_server_impl.rs b/src/server/module_service_server_impl.rs new file mode 100644 index 0000000..e463463 --- /dev/null +++ b/src/server/module_service_server_impl.rs @@ -0,0 +1,60 @@ +use crate::{ + authorization::authorization::extract_token, configuration::service::ServiceConfiguration, + sagittarius::module_service_client_impl::SagittariusModuleServiceClient, +}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tonic::Status; +use tucana::aquila::module_service_server::ModuleService; + +pub struct AquilaModuleServiceServer { + service_configuration: ServiceConfiguration, + client: Arc>, +} + +impl AquilaModuleServiceServer { + pub fn new( + client: Arc>, + service_configuration: ServiceConfiguration, + ) -> Self { + Self { + client, + service_configuration, + } + } +} + +#[tonic::async_trait] +impl ModuleService for AquilaModuleServiceServer { + async fn update( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let token = match extract_token(&request) { + Ok(t) => t, + Err(status) => return Err(status), + }; + + if !self.service_configuration.has_service(&token.to_string()) { + return Err(Status::unauthenticated("token is not valid")); + } + + let modules_update_request = request.into_inner(); + + log::debug!( + "Received Modules: {:?}", + modules_update_request + .modules + .iter() + .map(|d| d.identifier.clone()) + .collect::>() + ); + + let mut client = self.client.lock().await; + let response = client.update_modules(modules_update_request).await; + + Ok(tonic::Response::new(tucana::aquila::ModuleUpdateResponse { + success: response.success, + })) + } +} diff --git a/src/server/runtime_function_service_server_impl.rs b/src/server/runtime_function_service_server_impl.rs deleted file mode 100644 index 52bcf17..0000000 --- a/src/server/runtime_function_service_server_impl.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::Mutex; -use tonic::Status; -use tucana::aquila::runtime_function_definition_service_server::RuntimeFunctionDefinitionService; - -use crate::{ - authorization::authorization::extract_token, configuration::service::ServiceConfiguration, sagittarius::runtime_function_service_client_impl::SagittariusRuntimeFunctionServiceClient -}; - -pub struct AquilaRuntimeFunctionServiceServer { - client: Arc>, - service_configuration: ServiceConfiguration, -} - -impl AquilaRuntimeFunctionServiceServer { - pub fn new( - client: Arc>, - service_configuration: ServiceConfiguration, - ) -> Self { - Self { - client, - service_configuration, - } - } -} - -#[tonic::async_trait] -impl RuntimeFunctionDefinitionService for AquilaRuntimeFunctionServiceServer { - async fn update( - &self, - request: tonic::Request, - ) -> Result< - tonic::Response, - tonic::Status, - > { - let token = match extract_token(&request) { - Ok(t) => t, - Err(status) => return Err(status), - }; - - if !self.service_configuration.has_service(&token.to_string()) { - return Err(Status::unauthenticated("token is not valid")); - } - - let runtime_function_definition_update_request = request.into_inner(); - - log::debug!( - "Received RuntimeFunctions: {:?}", - runtime_function_definition_update_request - .runtime_functions - .iter() - .map(|f| f.runtime_name.clone()) - .collect::>() - ); - - let mut client = self.client.lock().await; - let response = client - .update_runtime_function_definitions(runtime_function_definition_update_request) - .await; - - Ok(tonic::Response::new( - tucana::aquila::RuntimeFunctionDefinitionUpdateResponse { - success: response.success, - }, - )) - } -} diff --git a/src/startup/dynamic_mode.rs b/src/startup/dynamic_mode.rs index bff3c38..76daa51 100644 --- a/src/startup/dynamic_mode.rs +++ b/src/startup/dynamic_mode.rs @@ -10,7 +10,7 @@ pub async fn run( config: AquilaConfig, app_readiness: AppReadiness, kv_store: Arc, - action_config_tx: tokio::sync::broadcast::Sender, + action_config_tx: tokio::sync::broadcast::Sender, mut server_task: tokio::task::JoinHandle<()>, ) { let kv_for_flow = kv_store.clone(); diff --git a/src/startup/mod.rs b/src/startup/mod.rs index c4e5a54..f152e02 100644 --- a/src/startup/mod.rs +++ b/src/startup/mod.rs @@ -57,7 +57,7 @@ pub async fn run( .await; let (action_config_tx, _) = - tokio::sync::broadcast::channel::(64); + tokio::sync::broadcast::channel::(64); let server = AquilaGRPCServer::new( &config,