From 6ee514d6d06a9db333bf59c42dd5a401e487fe39 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Wed, 21 Aug 2024 18:55:40 +0200 Subject: [PATCH] Integrate mqtt into trigger2 Signed-off-by: Ryan Levick --- Cargo.lock | 2 ++ crates/factor-outbound-mqtt/src/host.rs | 11 +------ crates/factor-outbound-mqtt/src/lib.rs | 42 ++++++++++++++++++++++++- crates/runtime-config/Cargo.toml | 1 + crates/runtime-config/src/lib.rs | 7 +++++ crates/trigger2/Cargo.toml | 7 +++-- crates/trigger2/src/factors.rs | 3 ++ 7 files changed, 59 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e09144361a..4d54df716d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8151,6 +8151,7 @@ dependencies = [ "spin-factor-key-value-redis", "spin-factor-key-value-spin", "spin-factor-outbound-http", + "spin-factor-outbound-mqtt", "spin-factor-outbound-networking", "spin-factor-outbound-redis", "spin-factor-sqlite", @@ -8462,6 +8463,7 @@ dependencies = [ "spin-core", "spin-factor-key-value", "spin-factor-outbound-http", + "spin-factor-outbound-mqtt", "spin-factor-outbound-networking", "spin-factor-outbound-redis", "spin-factor-sqlite", diff --git a/crates/factor-outbound-mqtt/src/host.rs b/crates/factor-outbound-mqtt/src/host.rs index 3cd22abbd0..460b888c16 100644 --- a/crates/factor-outbound-mqtt/src/host.rs +++ b/crates/factor-outbound-mqtt/src/host.rs @@ -6,16 +6,7 @@ use spin_factor_outbound_networking::OutboundAllowedHosts; use spin_world::v2::mqtt::{self as v2, Connection, Error, Qos}; use tracing::{instrument, Level}; -#[async_trait] -pub trait ClientCreator: Send + Sync { - fn create( - &self, - address: String, - username: String, - password: String, - keep_alive_interval: Duration, - ) -> Result, Error>; -} +use crate::ClientCreator; pub struct InstanceState { allowed_hosts: OutboundAllowedHosts, diff --git a/crates/factor-outbound-mqtt/src/lib.rs b/crates/factor-outbound-mqtt/src/lib.rs index 4816e12bec..21e4242f96 100644 --- a/crates/factor-outbound-mqtt/src/lib.rs +++ b/crates/factor-outbound-mqtt/src/lib.rs @@ -15,7 +15,7 @@ use spin_factors::{ use spin_world::v2::mqtt::{self as v2, Error, Qos}; use tokio::sync::Mutex; -pub use host::{ClientCreator, MqttClient}; +pub use host::MqttClient; pub struct OutboundMqttFactor { create_client: Arc, @@ -73,6 +73,19 @@ pub struct NetworkedMqttClient { const MQTT_CHANNEL_CAP: usize = 1000; impl NetworkedMqttClient { + /// Create a [`ClientCreator`] that creates a [`NetworkedMqttClient`]. + pub fn creator() -> Arc { + Arc::new(|address, username, password, keep_alive_interval| { + Ok(Arc::new(NetworkedMqttClient::create( + address, + username, + password, + keep_alive_interval, + )?) as _) + }) + } + + /// Create a new [`NetworkedMqttClient`] with the given address, username, password, and keep alive interval. pub fn create( address: String, username: String, @@ -127,3 +140,30 @@ impl MqttClient for NetworkedMqttClient { Ok(()) } } + +/// A trait for creating MQTT client. +#[async_trait] +pub trait ClientCreator: Send + Sync { + fn create( + &self, + address: String, + username: String, + password: String, + keep_alive_interval: Duration, + ) -> Result, Error>; +} + +impl ClientCreator for F +where + F: Fn(String, String, String, Duration) -> Result, Error> + Send + Sync, +{ + fn create( + &self, + address: String, + username: String, + password: String, + keep_alive_interval: Duration, + ) -> Result, Error> { + self(address, username, password, keep_alive_interval) + } +} diff --git a/crates/runtime-config/Cargo.toml b/crates/runtime-config/Cargo.toml index e67129a5e5..6954a6dac1 100644 --- a/crates/runtime-config/Cargo.toml +++ b/crates/runtime-config/Cargo.toml @@ -16,6 +16,7 @@ spin-factor-key-value-spin = { path = "../factor-key-value-spin" } spin-factor-key-value-redis = { path = "../factor-key-value-redis" } spin-factor-key-value-azure = { path = "../factor-key-value-azure" } spin-factor-outbound-http = { path = "../factor-outbound-http" } +spin-factor-outbound-mqtt = { path = "../factor-outbound-mqtt" } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factor-outbound-redis = { path = "../factor-outbound-redis" } spin-factor-sqlite = { path = "../factor-sqlite" } diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index d4df03c73f..b6dead06c4 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -4,6 +4,7 @@ use anyhow::Context as _; use spin_factor_key_value::runtime_config::spin::{self as key_value, MakeKeyValueStore}; use spin_factor_key_value::{DefaultLabelResolver as _, KeyValueFactor}; use spin_factor_outbound_http::OutboundHttpFactor; +use spin_factor_outbound_mqtt::OutboundMqttFactor; use spin_factor_outbound_networking::runtime_config::spin::SpinTlsRuntimeConfig; use spin_factor_outbound_networking::OutboundNetworkingFactor; use spin_factor_outbound_redis::OutboundRedisFactor; @@ -176,6 +177,12 @@ impl FactorRuntimeConfigSource for TomlRuntimeConfigSource<' } } +impl FactorRuntimeConfigSource for TomlRuntimeConfigSource<'_> { + fn get_runtime_config(&mut self) -> anyhow::Result> { + Ok(None) + } +} + impl FactorRuntimeConfigSource for TomlRuntimeConfigSource<'_> { fn get_runtime_config(&mut self) -> anyhow::Result> { self.sqlite.resolve_from_toml(self.table.as_ref()) diff --git a/crates/trigger2/Cargo.toml b/crates/trigger2/Cargo.toml index 70a7588ac0..d6246eda8c 100644 --- a/crates/trigger2/Cargo.toml +++ b/crates/trigger2/Cargo.toml @@ -21,13 +21,14 @@ spin-app = { path = "../app" } spin-common = { path = "../common" } spin-componentize = { path = "../componentize" } spin-core = { path = "../core" } +spin-factor-key-value = { path = "../factor-key-value" } spin-factor-outbound-http = { path = "../factor-outbound-http" } +spin-factor-outbound-mqtt = { path = "../factor-outbound-mqtt" } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } +spin-factor-outbound-redis = { path = "../factor-outbound-redis" } +spin-factor-sqlite = { path = "../factor-sqlite" } spin-factor-variables = { path = "../factor-variables" } spin-factor-wasi = { path = "../factor-wasi" } -spin-factor-key-value = { path = "../factor-key-value" } -spin-factor-sqlite = { path = "../factor-sqlite" } -spin-factor-outbound-redis = { path = "../factor-outbound-redis" } spin-factors = { path = "../factors" } spin-factors-executor = { path = "../factors-executor" } spin-telemetry = { path = "../telemetry" } diff --git a/crates/trigger2/src/factors.rs b/crates/trigger2/src/factors.rs index 897151863f..dd98bb474a 100644 --- a/crates/trigger2/src/factors.rs +++ b/crates/trigger2/src/factors.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use spin_factor_key_value::KeyValueFactor; use spin_factor_outbound_http::OutboundHttpFactor; +use spin_factor_outbound_mqtt::{NetworkedMqttClient, OutboundMqttFactor}; use spin_factor_outbound_networking::OutboundNetworkingFactor; use spin_factor_outbound_redis::OutboundRedisFactor; use spin_factor_sqlite::SqliteFactor; @@ -19,6 +20,7 @@ pub struct TriggerFactors { pub outbound_http: OutboundHttpFactor, pub sqlite: SqliteFactor, pub redis: OutboundRedisFactor, + pub mqtt: OutboundMqttFactor, } impl TriggerFactors { @@ -37,6 +39,7 @@ impl TriggerFactors { outbound_http: OutboundHttpFactor, sqlite: SqliteFactor::new(default_sqlite_label_resolver), redis: OutboundRedisFactor::new(), + mqtt: OutboundMqttFactor::new(NetworkedMqttClient::creator()), } } }