diff --git a/Cargo.lock b/Cargo.lock index 3994ee46..cd64587b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -247,6 +247,8 @@ version = "0.1.0" dependencies = [ "prost", "prost-types", + "serde", + "serde_json", "time", "tonic", "tonic-build", @@ -393,6 +395,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -570,6 +573,7 @@ dependencies = [ "serde", "serde_json", "strum_macros", + "time", "tokio", ] @@ -1116,6 +1120,22 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mock-cloud-connector" +version = "0.1.0" +dependencies = [ + "async-trait", + "cloud-connector-proto", + "env_logger", + "freyja-build-common", + "freyja-common", + "log", + "serde", + "serde_json", + "tokio", + "tonic", +] + [[package]] name = "mock-digital-twin" version = "0.1.0" @@ -1878,10 +1898,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde", "time-core", + "time-macros", ] [[package]] @@ -1890,6 +1912,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +[[package]] +name = "time-macros" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-keccak" version = "2.0.2" diff --git a/Cargo.toml b/Cargo.toml index bcb3d09e..d82e02f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "build_common", "common", "freyja", + "mocks/mock_cloud_connector", "mocks/mock_digital_twin", "mocks/mock_mapping_service", "proc_macros", diff --git a/adapters/cloud/grpc_cloud_adapter/src/grpc_cloud_adapter.rs b/adapters/cloud/grpc_cloud_adapter/src/grpc_cloud_adapter.rs index 97e47a1c..6794382c 100644 --- a/adapters/cloud/grpc_cloud_adapter/src/grpc_cloud_adapter.rs +++ b/adapters/cloud/grpc_cloud_adapter/src/grpc_cloud_adapter.rs @@ -2,17 +2,16 @@ // Licensed under the MIT license. // SPDX-License-Identifier: MIT +use std::sync::Arc; use std::time::Duration; -use std::{str::FromStr, sync::Arc}; use async_trait::async_trait; use log::debug; use tokio::sync::Mutex; use tonic::transport::Channel; -use cloud_connector_proto::{ - prost_types::Timestamp, - v1::{cloud_connector_client::CloudConnectorClient, UpdateDigitalTwinRequestBuilder}, +use cloud_connector_proto::v1::{ + cloud_connector_client::CloudConnectorClient, UpdateDigitalTwinRequestBuilder, }; use freyja_build_common::config_file_stem; use freyja_common::{ @@ -81,12 +80,9 @@ impl CloudAdapter for GRPCCloudAdapter { ) -> Result { debug!("Received a request to send to the cloud"); - let timestamp = Timestamp::from_str(cloud_message.signal_timestamp.as_str()) - .map_err(CloudAdapterError::deserialize)?; - let request = UpdateDigitalTwinRequestBuilder::new() .string_value(cloud_message.signal_value) - .timestamp(timestamp) + .timestamp_offset(cloud_message.signal_timestamp) .metadata(cloud_message.metadata) .build(); diff --git a/adapters/cloud/in_memory_mock_cloud_adapter/src/in_memory_mock_cloud_adapter.rs b/adapters/cloud/in_memory_mock_cloud_adapter/src/in_memory_mock_cloud_adapter.rs index 945bdba8..463220d7 100644 --- a/adapters/cloud/in_memory_mock_cloud_adapter/src/in_memory_mock_cloud_adapter.rs +++ b/adapters/cloud/in_memory_mock_cloud_adapter/src/in_memory_mock_cloud_adapter.rs @@ -75,7 +75,7 @@ mod in_memory_mock_cloud_adapter_tests { let cloud_message = CloudMessageRequest { metadata: HashMap::new(), signal_value: String::from("72"), - signal_timestamp: OffsetDateTime::now_utc().to_string(), + signal_timestamp: OffsetDateTime::now_utc(), }; assert!(cloud_adapter.send_to_cloud(cloud_message).await.is_ok()); diff --git a/adapters/data/in_memory_mock_data_adapter/res/in_memory_mock_data_adapter_config.default.json b/adapters/data/in_memory_mock_data_adapter/res/in_memory_mock_data_adapter_config.default.json index 3a9891ad..971add68 100644 --- a/adapters/data/in_memory_mock_data_adapter/res/in_memory_mock_data_adapter_config.default.json +++ b/adapters/data/in_memory_mock_data_adapter/res/in_memory_mock_data_adapter_config.default.json @@ -2,19 +2,19 @@ "signal_update_frequency_ms": 1000, "entities": [ { - "entity_id": "dtmi:sdv:Vehicle:Cabin:HVAC:AmbientAirTemperature;1", + "entity_id": "dtmi:sdv:HVAC:AmbientAirTemperature;1", "values": { "Static": 42.0 } }, { - "entity_id": "dtmi:sdv:Vehicle:Cabin:HVAC:IsAirConditioningActive;1", + "entity_id": "dtmi:sdv:HVAC:IsAirConditioningActive;1", "values": { "Static": 0.0 } }, { - "entity_id": "dtmi:sdv:Vehicle:OBD:HybridBatteryRemaining;1", + "entity_id": "dtmi:sdv:OBD:HybridBatteryRemaining;1", "values": { "Stepwise": { "start": 77.7, diff --git a/adapters/data/in_memory_mock_data_adapter/src/in_memory_mock_data_adapter.rs b/adapters/data/in_memory_mock_data_adapter/src/in_memory_mock_data_adapter.rs index df78002b..3fca99fe 100644 --- a/adapters/data/in_memory_mock_data_adapter/src/in_memory_mock_data_adapter.rs +++ b/adapters/data/in_memory_mock_data_adapter/src/in_memory_mock_data_adapter.rs @@ -138,9 +138,10 @@ impl DataAdapter for InMemoryMockDataAdapter { { let data = data.lock().await; for entity_id in entities_with_subscribe { - if Self::generate_signal_value(&entity_id, signals.clone(), &data).is_err() + if let Err(e) = + Self::generate_signal_value(&entity_id, signals.clone(), &data) { - warn!("Attempt to set value for non-existent entity {entity_id}"); + warn!("Attempt to set value for non-existent entity {entity_id}: {e}"); } } } diff --git a/common/Cargo.toml b/common/Cargo.toml index 237b4708..32c6112b 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -17,4 +17,5 @@ proc-macros = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } strum_macros = { workspace = true } +time = { workspace = true } tokio = { workspace = true } \ No newline at end of file diff --git a/common/src/cloud_adapter.rs b/common/src/cloud_adapter.rs index 8d84353d..41415b2f 100644 --- a/common/src/cloud_adapter.rs +++ b/common/src/cloud_adapter.rs @@ -6,6 +6,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; use tokio::sync::Mutex; use crate::service_discovery_adapter_selector::ServiceDiscoveryAdapterSelector; @@ -42,7 +43,7 @@ pub struct CloudMessageRequest { pub signal_value: String, // Timestamp of when the signal was emitted - pub signal_timestamp: String, + pub signal_timestamp: OffsetDateTime, } /// Represents a response to a message sent to the cloud digital twin diff --git a/freyja/src/emitter.rs b/freyja/src/emitter.rs index 62e3b977..cd2bb2ee 100644 --- a/freyja/src/emitter.rs +++ b/freyja/src/emitter.rs @@ -173,7 +173,7 @@ impl let cloud_message = CloudMessageRequest { metadata: signal.target.metadata.clone(), signal_value: converted, - signal_timestamp: OffsetDateTime::now_utc().to_string(), + signal_timestamp: OffsetDateTime::now_utc(), }; let response = self diff --git a/mocks/mock_cloud_connector/Cargo.toml b/mocks/mock_cloud_connector/Cargo.toml new file mode 100644 index 00000000..e3515ce9 --- /dev/null +++ b/mocks/mock_cloud_connector/Cargo.toml @@ -0,0 +1,24 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. +# SPDX-License-Identifier: MIT + +[package] +name = "mock-cloud-connector" +version = "0.1.0" +edition = "2021" +license = "MIT" + +[dependencies] +async-trait = { workspace = true } +cloud-connector-proto = { workspace = true } +env_logger = { workspace = true } +freyja-build-common = { workspace = true } +freyja-common = { workspace = true } +log = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } + +[build-dependencies] +freyja-build-common = { workspace = true } \ No newline at end of file diff --git a/mocks/mock_cloud_connector/README.md b/mocks/mock_cloud_connector/README.md new file mode 100644 index 00000000..9dd099bb --- /dev/null +++ b/mocks/mock_cloud_connector/README.md @@ -0,0 +1,25 @@ +# Mock Cloud Connector + +The Mock Cloud Connector mocks the behavior of a Cloud Connector. This enables functionality similar to the [In-Memory Mock Cloud Adapter](../../adapters/cloud/in_memory_mock_cloud_adapter/README.md) while also utilizing the standard Freyja application. + +The Mock Cloud Connector implements the [Cloud Connector API](../../interfaces/cloud_connector/v1/cloud_connector.proto), making it compatible with the [gRPC Cloud Adapter](../../adapters/cloud/grpc_cloud_adapter/README.md). + +## Configuration + +This mock supports the following configuration: + +- `server_authority`: The authority that will be used for hosting the mock cloud connector service. + +This mock supports [config overrides](../../docs/tutorials/config-overrides.md). The override filename is `mock_cloud_connector_config.json`, and the default config is located at `res/mock_cloud_connector_config.default.json`. + +## Behavior + +This cloud connector prints the requests that it receives to the console, enabling users to verify that data is flowing from Freyja to the cloud connector. As a mock, this cloud connector does not have any cloud connectivity. + +## Build and Run + +To build and run the Mock Cloud Connector, run the following command: + +```shell +cargo run -p mock-cloud-connector +``` diff --git a/mocks/mock_cloud_connector/build.rs b/mocks/mock_cloud_connector/build.rs new file mode 100644 index 00000000..1c2b6171 --- /dev/null +++ b/mocks/mock_cloud_connector/build.rs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use freyja_build_common::copy_config; + +const CONFIG_FILE_STEM: &str = "mock_cloud_connector_config"; + +fn main() { + copy_config(CONFIG_FILE_STEM); +} diff --git a/mocks/mock_cloud_connector/res/mock_cloud_connector_config.default.json b/mocks/mock_cloud_connector/res/mock_cloud_connector_config.default.json new file mode 100644 index 00000000..f16ca973 --- /dev/null +++ b/mocks/mock_cloud_connector/res/mock_cloud_connector_config.default.json @@ -0,0 +1,3 @@ +{ + "server_authority": "0.0.0.0:5176" +} diff --git a/mocks/mock_cloud_connector/src/config.rs b/mocks/mock_cloud_connector/src/config.rs new file mode 100644 index 00000000..50e5a975 --- /dev/null +++ b/mocks/mock_cloud_connector/src/config.rs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use serde::{Deserialize, Serialize}; + +/// Config for the mock cloud connector +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Config { + /// The server authority for hosting a gRPC server + pub server_authority: String, +} diff --git a/mocks/mock_cloud_connector/src/main.rs b/mocks/mock_cloud_connector/src/main.rs new file mode 100644 index 00000000..70060169 --- /dev/null +++ b/mocks/mock_cloud_connector/src/main.rs @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +mod config; +mod mock_cloud_connector_impl; + +use std::env; + +use cloud_connector_proto::v1::cloud_connector_server::CloudConnectorServer; +use env_logger::Target; +use log::{info, LevelFilter}; +use tonic::transport::Server; + +use crate::{config::Config, mock_cloud_connector_impl::MockCloudConnectorImpl}; +use freyja_build_common::config_file_stem; +use freyja_common::{ + cmd_utils::{get_log_level, parse_args}, + config_utils, out_dir, +}; + +/// Starts the following threads and tasks: +/// - A thread which listens for input from the command window +/// - A task which handles async get responses +/// - A task which handles publishing to subscribers +/// - A gRPC server to accept incoming requests +#[tokio::main] +async fn main() { + let args = parse_args(env::args()).expect("Failed to parse args"); + + // Setup logging + let log_level = get_log_level(&args, LevelFilter::Info).expect("Could not parse log level"); + env_logger::Builder::new() + .filter(None, log_level) + .target(Target::Stdout) + .init(); + + let config: Config = config_utils::read_from_files( + config_file_stem!(), + config_utils::JSON_EXT, + out_dir!(), + |e| log::error!("{}", e), + |e| log::error!("{}", e), + ) + .unwrap(); + + // Server setup + info!( + "Mock Cloud Connector Server starting at {}", + config.server_authority + ); + + let addr = config + .server_authority + .parse() + .expect("Unable to parse server address"); + + let mock_cloud_connector = MockCloudConnectorImpl {}; + + Server::builder() + .add_service(CloudConnectorServer::new(mock_cloud_connector)) + .serve(addr) + .await + .unwrap(); +} diff --git a/mocks/mock_cloud_connector/src/mock_cloud_connector_impl.rs b/mocks/mock_cloud_connector/src/mock_cloud_connector_impl.rs new file mode 100644 index 00000000..0d3edba3 --- /dev/null +++ b/mocks/mock_cloud_connector/src/mock_cloud_connector_impl.rs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use async_trait::async_trait; +use cloud_connector_proto::v1::{ + cloud_connector_server::CloudConnector, UpdateDigitalTwinRequest, UpdateDigitalTwinResponse, +}; +use log::info; +use tonic::{Request, Response, Status}; + +/// Implements a Mock Cloud Connector +pub struct MockCloudConnectorImpl {} + +#[async_trait] +impl CloudConnector for MockCloudConnectorImpl { + /// Update the digital twin + /// + /// # Arguments + /// - `request`: the update request+ + async fn update_digital_twin( + &self, + request: Request, + ) -> Result, Status> { + let message_json = serde_json::to_string_pretty(&request.into_inner()) + .map_err(|_| Status::invalid_argument("Could not parse request"))?; + + info!("Mock Cloud Connector received a message!\n{message_json}"); + + Ok(Response::new(UpdateDigitalTwinResponse {})) + } +} diff --git a/proto/cloud_connector/Cargo.toml b/proto/cloud_connector/Cargo.toml index d456ad81..2a98e84c 100644 --- a/proto/cloud_connector/Cargo.toml +++ b/proto/cloud_connector/Cargo.toml @@ -11,7 +11,9 @@ license = "MIT" [dependencies] prost = { workspace = true } prost-types = { workspace = true } -time = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +time = { workspace = true, features = ["serde-human-readable"] } tonic = { workspace = true } [build-dependencies] diff --git a/proto/cloud_connector/src/lib.rs b/proto/cloud_connector/src/lib.rs index 2f95e779..c69cede8 100644 --- a/proto/cloud_connector/src/lib.rs +++ b/proto/cloud_connector/src/lib.rs @@ -9,10 +9,60 @@ pub mod v1 { use std::collections::HashMap; use prost_types::{value::Kind, Timestamp, Value}; + use serde::ser::{Serialize, SerializeStruct, Serializer}; use time::OffsetDateTime; tonic::include_proto!("cloud_connector"); + // Because the members of UpdateDigitalTwinRequest do not implement serialize + // and are not owned by this project, implementing Serialize has to be done manually. + impl Serialize for UpdateDigitalTwinRequest { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let serialize_null_field = |state: &mut ::SerializeStruct, + key: &'static str| { + state.serialize_field(key, &None::<()>) + }; + + let mut state = serializer.serialize_struct("UpdateDigitalTwinRequest", 3)?; + + // Serialize value + const VALUE_FIELD: &str = "value"; + match self.value.as_ref() { + None => serialize_null_field(&mut state, VALUE_FIELD)?, + Some(v) => match v.kind.as_ref() { + None => serialize_null_field(&mut state, VALUE_FIELD)?, + Some(k) => match k { + Kind::NullValue(_) => serialize_null_field(&mut state, VALUE_FIELD)?, + Kind::NumberValue(n) => state.serialize_field(VALUE_FIELD, &n)?, + Kind::StringValue(s) => state.serialize_field(VALUE_FIELD, &s)?, + Kind::BoolValue(b) => state.serialize_field(VALUE_FIELD, &b)?, + _ => serialize_null_field(&mut state, VALUE_FIELD)?, + }, + }, + } + + // Serialize timestamp + // Note that the nanos are discarded for simplicity + const TIMESTAMP_FIELD: &str = "timestamp"; + match self.timestamp { + None => serialize_null_field(&mut state, TIMESTAMP_FIELD)?, + Some(Timestamp { seconds, .. }) => { + let timestamp = OffsetDateTime::from_unix_timestamp(seconds).unwrap(); + state.serialize_field(TIMESTAMP_FIELD, ×tamp)?; + } + } + + // Serialize metadata + state.serialize_field("metadata", &self.metadata)?; + + // End serialization + state.end() + } + } + /// Helper class to deal with the verbose contracts that tonic generates #[derive(Default)] pub struct UpdateDigitalTwinRequestBuilder { @@ -85,9 +135,11 @@ pub mod v1 { self } - /// Set the request timestamp to the current time - pub fn timestamp_now(mut self) -> Self { - let timestamp = OffsetDateTime::now_utc(); + /// Set the request timestamp to an `OffsetDateTime` + /// + /// # Arguments + /// - `timestamp`: the timestamp to set + pub fn timestamp_offset(mut self, timestamp: OffsetDateTime) -> Self { self.request.timestamp = Some( Timestamp::date_time( timestamp.year().into(), @@ -103,6 +155,14 @@ pub mod v1 { self } + /// Set the request timestamp to the current time + pub fn timestamp_now(mut self) -> Self { + let timestamp = OffsetDateTime::now_utc(); + self = self.timestamp_offset(timestamp); + + self + } + /// Set the request metadata. This overwrites any previously set metadata /// /// # Arguments @@ -123,3 +183,115 @@ pub mod v1 { } } } + +#[cfg(test)] +mod cloud_connector_tests { + use serde_json::{json, Map, Value}; + + use crate::v1::{UpdateDigitalTwinRequest, UpdateDigitalTwinRequestBuilder}; + + fn serialize_round_trip(request: &UpdateDigitalTwinRequest) -> Value { + let serialize_result = serde_json::to_string(&request); + assert!(serialize_result.is_ok()); + let serialized = serialize_result.unwrap(); + + let deserialize_result = serde_json::from_str::(&serialized); + assert!(deserialize_result.is_ok()); + deserialize_result.unwrap() + } + + #[test] + fn test_serialize_no_value() { + let request = UpdateDigitalTwinRequestBuilder::new().build(); + + let result = serialize_round_trip(&request); + + assert_eq!(result["value"], Value::Null); + } + + #[test] + fn test_serialize_null() { + let request = UpdateDigitalTwinRequestBuilder::new().null_value().build(); + + let result = serialize_round_trip(&request); + + assert_eq!(result["value"], Value::Null); + } + + #[test] + fn test_serialize_bool() { + let b = true; + let request = UpdateDigitalTwinRequestBuilder::new().bool_value(b).build(); + + let result = serialize_round_trip(&request); + + assert_eq!(result["value"], Value::Bool(b)); + } + + #[test] + fn test_serialize_number() { + let n = 42.0; + let request = UpdateDigitalTwinRequestBuilder::new() + .number_value(n) + .build(); + + let result = serialize_round_trip(&request); + + assert_eq!(result["value"], json!(n)); + } + + #[test] + fn test_serialize_string() { + let s = "foo"; + let request = UpdateDigitalTwinRequestBuilder::new() + .string_value(s.into()) + .build(); + + let result = serialize_round_trip(&request); + + assert_eq!(result["value"], Value::String(s.into())); + } + + #[test] + fn test_serialize_no_timestamp() { + let request = UpdateDigitalTwinRequestBuilder::new().build(); + + let result = serialize_round_trip(&request); + + assert_eq!(result["timestamp"], Value::Null); + } + + #[test] + fn test_serialize_timestamp() { + let request = UpdateDigitalTwinRequestBuilder::new() + .timestamp_now() + .build(); + + let result = serialize_round_trip(&request); + + assert_ne!(result["timestamp"], Value::Null); + } + + #[test] + fn test_serialize_no_metadata() { + let request = UpdateDigitalTwinRequestBuilder::new().build(); + + let result = serialize_round_trip(&request); + + assert_ne!(result["metadata"], Value::Null); + } + + #[test] + fn test_serialize_metadata() { + let metadata = ("foo", "bar"); + let request = UpdateDigitalTwinRequestBuilder::new() + .add_metadata(metadata.0.into(), metadata.1.into()) + .build(); + + let result = serialize_round_trip(&request); + + let mut map = Map::new(); + map.insert(metadata.0.into(), Value::String(metadata.1.into())); + assert_eq!(result["metadata"], Value::Object(map)); + } +}