From ac656b79711ad34b5bda1a77f2173c91483fd689 Mon Sep 17 00:00:00 2001 From: khuzema786 Date: Wed, 3 Jan 2024 03:55:51 +0530 Subject: [PATCH] LTS : Refactoring for CAC Integration --- crates/location_tracking_service/Cargo.toml | 4 +- .../src/common/cac.rs | 105 ++++-- .../src/common/utils.rs | 87 +---- .../src/domain/action/internal/ride.rs | 2 +- .../src/domain/action/ui/location.rs | 24 +- .../src/environment.rs | 331 +++++------------- crates/location_tracking_service/src/main.rs | 27 +- .../src/tools/error.rs | 10 +- .../dev/location_tracking_service.dhall | 10 +- 9 files changed, 198 insertions(+), 402 deletions(-) diff --git a/crates/location_tracking_service/Cargo.toml b/crates/location_tracking_service/Cargo.toml index 869b5d5..70cc89b 100644 --- a/crates/location_tracking_service/Cargo.toml +++ b/crates/location_tracking_service/Cargo.toml @@ -44,8 +44,8 @@ prometheus = { version = "0.13.3", features = ["process"] } shared = { git = "https://github.com/nammayatri/shared-kernel-rs", branch = "main" } macros = { git = "https://github.com/nammayatri/shared-kernel-rs", branch = "main" } -cac_client = { git = "ssh://git@ssh.bitbucket.juspay.net/picaf/context-aware-config.git", rev = "555ed8b" } -superposition_client = { git = "ssh://git@ssh.bitbucket.juspay.net/picaf/context-aware-config.git", rev = "555ed8b" } +cac_client = { git = "https://github.com/khuzema786/context-aware-config.git", rev = "7dceb4c" } +superposition_client = { git = "https://github.com/khuzema786/context-aware-config.git", rev = "7dceb4c" } [dev-dependencies] pprof = { version = "0.12", features = ["flamegraph"] } diff --git a/crates/location_tracking_service/src/common/cac.rs b/crates/location_tracking_service/src/common/cac.rs index c7ede96..e96b388 100644 --- a/crates/location_tracking_service/src/common/cac.rs +++ b/crates/location_tracking_service/src/common/cac.rs @@ -7,34 +7,35 @@ */ use crate::environment::{CacConfig, SuperpositionClientConfig}; +use crate::tools::error::AppError; use actix_web::rt; use cac_client as cac; -use shared::tools::error::AppError; +use rand::Rng; +use serde::de::DeserializeOwned; +use serde_json::json; use std::time::Duration; use superposition_client as spclient; pub async fn init_cac_clients(cac_conf: CacConfig) -> Result<(), AppError> { - let cac_hostname: String = cac_conf.cac_hostname; + let hostname = cac_conf.cac_hostname; let polling_interval: Duration = Duration::from_secs(cac_conf.cac_polling_interval); let update_cac_periodically = cac_conf.update_cac_periodically; - let cac_tenants: Vec = cac_conf.cac_tenants; + let tenant = cac_conf.cac_tenant; - for tenant in cac_tenants { - cac::CLIENT_FACTORY - .create_client( - tenant.to_string(), - update_cac_periodically, - polling_interval, - cac_hostname.to_string(), - ) - .await - .map_err(|err| { - AppError::CacConfigFailed(format!( - "{}: Failed to acquire cac_client : {}", - tenant, err - )) - })?; - } + cac::CLIENT_FACTORY + .create_client( + tenant.to_owned(), + update_cac_periodically, + polling_interval, + hostname, + ) + .await + .map_err(|err| { + AppError::CacClientInitFailed(format!( + "{}: Failed to acquire cac_client : {}", + tenant, err + )) + })?; Ok(()) } @@ -45,23 +46,57 @@ pub async fn init_superposition_clients( ) -> Result<(), AppError> { let hostname: String = superposition_client_config.superposition_hostname; let poll_frequency = superposition_client_config.superposition_poll_frequency; - let cac_tenants: Vec = cac_conf.cac_tenants; + let tenant = cac_conf.cac_tenant; - for tenant in cac_tenants { - rt::spawn( - spclient::CLIENT_FACTORY - .create_client(tenant.to_string(), poll_frequency, hostname.to_string()) - .await - .map_err(|err| { - AppError::CacConfigFailed(format!( - "{}: Failed to acquire superposition_client : {}", - tenant, err - )) - })? - .clone() - .run_polling_updates(), - ); - } + rt::spawn( + spclient::CLIENT_FACTORY + .create_client(tenant.to_owned(), poll_frequency, hostname) + .await + .map_err(|err| { + AppError::CacClientInitFailed(format!( + "Failed to acquire superposition_client for tenent {} : {}", + tenant, err + )) + })? + .clone() + .run_polling_updates(), + ); Ok(()) } + +pub async fn get_config(tenant_name: String, key: &str) -> Result +where + T: DeserializeOwned, +{ + let cac_client = cac::CLIENT_FACTORY.get_client(tenant_name.to_owned()); + let superpostion_client = spclient::CLIENT_FACTORY + .get_client(tenant_name.to_owned()) + .await; + + match (cac_client, superpostion_client) { + (Ok(cac_client), Ok(superpostion_client)) => { + let mut ctx = serde_json::Map::new(); + let variant_ids = superpostion_client + .get_applicable_variant(&json!(ctx.clone()), rand::thread_rng().gen_range(1..100)) + .await; + ctx.insert(String::from("variantIds"), variant_ids.into()); + let res = cac_client + .eval(ctx) + .map_err(|err| AppError::CacConfigFailed(err.to_string()))?; + + match res.get(key) { + Some(val) => Ok(serde_json::from_value(val.clone()) + .map_err(|err| AppError::CacConfigFailed(err.to_string()))?), + None => Err(AppError::CacConfigFailed(format!( + "Key does not exist in cac client's response for tenant {}", + tenant_name + ))), + } + } + _ => Err(AppError::CacConfigFailed(format!( + "Failed to fetch instance of cac client or superposition client for tenant {}", + tenant_name + ))), + } +} diff --git a/crates/location_tracking_service/src/common/utils.rs b/crates/location_tracking_service/src/common/utils.rs index 7d524b9..c6136e4 100644 --- a/crates/location_tracking_service/src/common/utils.rs +++ b/crates/location_tracking_service/src/common/utils.rs @@ -1,5 +1,3 @@ -use crate::environment::BuisnessConfigs; - /* Copyright 2022-23, Juspay India Pvt Ltd This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program @@ -11,11 +9,7 @@ use super::types::*; use crate::tools::error::AppError; use cac_client as cac; use geo::{point, Intersects}; -use rand::Rng; -use serde_json::{json, Map, Value}; -use shared::tools::error::AppError; -use std::{f64::consts::PI, sync::Arc}; -use superposition_client as spclient; +use std::f64::consts::PI; /// Retrieves the name of the city based on latitude and longitude coordinates. /// @@ -228,82 +222,3 @@ pub fn distance_between_in_meters(latlong1: &Point, latlong2: &Point) -> f64 { pub fn cat_maybes(options: Vec>) -> Vec { options.into_iter().flatten().collect() } - -pub fn get_cac_client(tenant_name: String) -> Result, String> { - cac::CLIENT_FACTORY.get_client(tenant_name) -} - -pub async fn get_superposition_client( - tenant_name: String, -) -> Result, String> { - spclient::CLIENT_FACTORY.get_client(tenant_name).await -} - -pub async fn get_config_from_cac_client( - tenant_name: String, - key: String, - mut ctx: Map, - buisness_cfgs: BuisnessConfigs, - toss: i8, -) -> Result { - let cacclient: Result, String> = get_cac_client(tenant_name.clone()); - let superclient: Result, String> = - get_superposition_client(tenant_name.clone()).await; - match (cacclient, superclient) { - (Ok(cacclient), Ok(superclient)) => { - let variant_ids = superclient - .get_applicable_variant(&json!(ctx.clone()), toss) - .await; - ctx.insert(String::from("variantIds"), variant_ids.clone().into()); - let res = cacclient.eval(ctx.clone()); - match res { - Ok(res) => match res.get(&key) { - Some(val) => Ok(val.clone()), - _ => { - log::error!("Key does not exist in cac client's response for tenant {}, trying fetch the same key from default config", tenant_name); - get_default_config(tenant_name, key, buisness_cfgs).await - } - }, - _ => { - log::error!("Failed to fetch config from cac client for tenant {}, trying fetch default config", tenant_name); - get_default_config(tenant_name, key, buisness_cfgs).await - } - } - } - (Err(_), Ok(_)) => { - log::error!( - "Failed to fetch instance of cac client for tenant {}", - tenant_name - ); - get_default_config(tenant_name, key, buisness_cfgs).await - } - (Ok(_), Err(_)) => { - log::error!( - "Failed to fetch instance of superposition client for tenant {}", - tenant_name - ); - get_default_config(tenant_name, key, buisness_cfgs).await - } - _ => { - log::error!( - "Failed to fetch instance of cac client and superposition client for tenant {}", - tenant_name - ); - get_default_config(tenant_name, key, buisness_cfgs).await - } - } -} - -pub async fn get_default_config( - tenant_name: String, - key: String, - def_buisness_cfgs: BuisnessConfigs, -) -> Result { - println!("Fetching default config for tenant {}", tenant_name); - def_buisness_cfgs.get_field(&key) -} - -pub fn get_random_number() -> i8 { - let mut rng = rand::thread_rng(); - rng.gen_range(1..100) -} diff --git a/crates/location_tracking_service/src/domain/action/internal/ride.rs b/crates/location_tracking_service/src/domain/action/internal/ride.rs index 0645fd6..7eb1957 100644 --- a/crates/location_tracking_service/src/domain/action/internal/ride.rs +++ b/crates/location_tracking_service/src/domain/action/internal/ride.rs @@ -69,7 +69,7 @@ pub async fn ride_end( &data.persistent_redis, &request_body.driver_id, &request_body.merchant_id, - data.batch_size, + data.business_configs.batch_size, ) .await?; diff --git a/crates/location_tracking_service/src/domain/action/ui/location.rs b/crates/location_tracking_service/src/domain/action/ui/location.rs index 589f808..7b0479c 100644 --- a/crates/location_tracking_service/src/domain/action/ui/location.rs +++ b/crates/location_tracking_service/src/domain/action/ui/location.rs @@ -86,16 +86,16 @@ pub async fn update_driver_location( &data.persistent_redis, &data.auth_url, &data.auth_api_key, - &data.auth_token_expiry, + &data.business_configs.auth_token_expiry, &token, &merchant_id, ) .await?; - if locations.len() > data.batch_size as usize { + if locations.len() > data.business_configs.batch_size as usize { warn!( "Way points more than {} points => {} points", - data.batch_size, + data.business_configs.batch_size, locations.len() ); } @@ -138,8 +138,8 @@ pub async fn update_driver_location( sliding_window_limiter( &data.persistent_redis, &sliding_rate_limiter_key(&driver_id, &city, &merchant_id), - data.location_update_limit, - data.location_update_interval as u32, + data.business_configs.location_update_limit, + data.business_configs.location_update_interval as u32, ) .await?; @@ -219,7 +219,7 @@ async fn process_driver_locations( let set_driver_last_location_update = async { set_driver_last_location_update( &data.persistent_redis, - &data.last_location_timstamp_expiry, + &data.business_configs.last_location_timstamp_expiry, &driver_id, &merchant_id, &latest_driver_location.pt, @@ -273,14 +273,14 @@ async fn process_driver_locations( let locations = get_filtered_driver_locations( last_known_location.as_ref(), locations, - data.min_location_accuracy, - data.driver_location_accuracy_buffer, + data.business_configs.min_location_accuracy, + data.business_configs.driver_location_accuracy_buffer, ); if !locations.is_empty() { - if locations.len() > data.batch_size as usize { + if locations.len() > data.business_configs.batch_size as usize { warn!( "On Ride Way points more than {} points after filtering => {} points", - data.batch_size, + data.business_configs.batch_size, locations.len() ); } @@ -310,7 +310,9 @@ async fn process_driver_locations( ) .await?; - if on_ride_driver_locations_count + geo_entries.len() as i64 > data.batch_size { + if on_ride_driver_locations_count + geo_entries.len() as i64 + > data.business_configs.batch_size + { let mut on_ride_driver_locations = get_on_ride_driver_locations( &data.persistent_redis, &driver_id, diff --git a/crates/location_tracking_service/src/environment.rs b/crates/location_tracking_service/src/environment.rs index 2ee860b..0a2e070 100644 --- a/crates/location_tracking_service/src/environment.rs +++ b/crates/location_tracking_service/src/environment.rs @@ -11,23 +11,15 @@ use std::{env::var, sync::Arc}; use crate::{ - common::{geo_polygon::read_geo_polygon, types::*}, + common::{cac::get_config, geo_polygon::read_geo_polygon, types::*}, tools::logger::LoggerConfig, }; use rdkafka::{error::KafkaError, producer::FutureProducer, ClientConfig}; use reqwest::Url; use serde::{Deserialize, Serialize}; -use serde_json::Value; -use shared::tools::error::AppError; -use shared::{ - redis::types::{RedisConnectionPool, RedisSettings}, - utils::logger::*, -}; -use std::time::Duration; -use tokio::sync::mpsc::Sender; -use tracing::info; - +use shared::redis::types::{RedisConnectionPool, RedisSettings}; use tokio::sync::mpsc::Sender; +use tracing::{error, info}; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct AppConfig { @@ -52,23 +44,24 @@ pub struct AppConfig { pub request_timeout: u64, pub log_unprocessible_req_body: Vec, pub max_allowed_req_size: usize, + pub bucket_size: u64, + pub nearby_bucket_threshold: u64, pub cac_config: CacConfig, pub superposition_client_config: SuperpositionClientConfig, - pub buisness_configs: BuisnessConfigs, + pub business_configs: BusinessConfigs, } #[derive(Debug, Deserialize, Serialize, Clone)] -pub struct BuisnessConfigs { +pub struct BusinessConfigs { pub auth_token_expiry: u32, - pub min_location_accuracy: f64, + pub min_location_accuracy: Accuracy, pub driver_location_accuracy_buffer: f64, pub last_location_timstamp_expiry: u32, pub location_update_limit: usize, pub location_update_interval: u64, pub batch_size: i64, - pub bucket_size: u64, - pub nearby_bucket_threshold: u64, } + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct KafkaConfig { pub kafka_key: String, @@ -80,7 +73,7 @@ pub struct CacConfig { pub cac_hostname: String, pub cac_polling_interval: u64, pub update_cac_periodically: bool, - pub cac_tenants: Vec, + pub cac_tenant: String, } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -101,54 +94,6 @@ pub struct RedisConfig { pub stream_read_count: u64, } -impl BuisnessConfigs { - pub fn get_field(&self, key: &str) -> Result { - match key { - "auth_token_expiry" => Ok(Value::Number(serde_json::Number::from( - self.auth_token_expiry, - ))), - "min_location_accuracy" => { - let res = serde_json::Number::from_f64(self.min_location_accuracy); - match res { - Some(res) => Ok(Value::Number(res)), - _ => Err(AppError::DefaultConfigsNotFound( - "Failed to extract default config due to failure in decoding f64 to Number" - .to_string(), - )), - } - } - "last_location_timstamp_expiry" => Ok(Value::Number(serde_json::Number::from( - self.last_location_timstamp_expiry, - ))), - "location_update_limit" => Ok(Value::Number(serde_json::Number::from( - self.location_update_limit, - ))), - "location_update_interval" => Ok(Value::Number(serde_json::Number::from( - self.location_update_interval, - ))), - "batch_size" => Ok(Value::Number(serde_json::Number::from(self.batch_size))), - "bucket_size" => Ok(Value::Number(serde_json::Number::from(self.bucket_size))), - "nearby_bucket_threshold" => Ok(Value::Number(serde_json::Number::from( - self.nearby_bucket_threshold, - ))), - "driver_location_accuracy_buffer" => { - let res = serde_json::Number::from_f64(self.driver_location_accuracy_buffer); - match res { - Some(res) => Ok(Value::Number(res)), - _ => Err(AppError::DefaultConfigsNotFound( - "Failed to extract default config due to failure in decoding f64 to Number" - .to_string(), - )), - } - } - _ => Err(AppError::DefaultConfigsNotFound( - "Failed to extract default config, given key did not match any default config's field".to_string(), - )), - } - } -} - -#[derive(Clone)] pub struct AppState { pub non_persistent_redis: Arc, pub persistent_redis: Arc, @@ -161,22 +106,17 @@ pub struct AppState { pub auth_url: Url, pub auth_api_key: String, pub bulk_location_callback_url: Url, - pub auth_token_expiry: u32, pub redis_expiry: u32, - pub min_location_accuracy: Accuracy, - pub last_location_timstamp_expiry: u32, - pub location_update_limit: usize, - pub location_update_interval: u64, pub producer: Option, pub driver_location_update_topic: String, - pub batch_size: i64, - pub bucket_size: u64, - pub nearby_bucket_threshold: u64, - pub driver_location_accuracy_buffer: f64, pub blacklist_merchants: Vec, pub max_allowed_req_size: usize, pub log_unprocessible_req_body: Vec, pub request_timeout: u64, + pub bucket_size: u64, + pub nearby_bucket_threshold: u64, + pub cac_tenant: String, + pub business_configs: BusinessConfigs, } impl AppState { @@ -318,176 +258,93 @@ impl AppState { auth_api_key: app_config.auth_api_key, bulk_location_callback_url: Url::parse(app_config.bulk_location_callback_url.as_str()) .expect("Failed to parse bulk_location_callback_url."), - auth_token_expiry: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "auth_token_expiry".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => val - .as_u64() - .expect("Failed to convert auth token expiry to u32") - as u32, - Err(err) => { - panic!("{err}") - } - } - }, - min_location_accuracy: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "min_location_accuracy".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => { - Accuracy(val.as_f64().expect( - "Failed to min location accuracy to f64 and then Accuracy type", - )) - } - Err(err) => { - panic!("{err}") - } - } - }, redis_expiry: app_config.redis_expiry, - last_location_timstamp_expiry: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "last_location_timstamp_expiry".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => val - .as_u64() - .expect("Failed to convert last location timestamp expiary to u32") - as u32, - Err(err) => { - panic!("{err}") - } - } - }, - location_update_limit: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "location_update_limit".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => val - .as_u64() - .expect("Failed to convert location update limit to usize") - as usize, - Err(err) => { - panic!("{err}") - } - } - }, - location_update_interval: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "location_update_interval".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => val - .as_u64() - .expect("Failed to convert location update interval to u64"), - Err(err) => { - panic!("{err}") - } - } - }, producer, driver_location_update_topic: app_config.driver_location_update_topic, - batch_size: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "batch_size".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => val.as_i64().expect("Failed to convert batch size to i64"), - Err(err) => { - panic!("{err}") - } - } - }, - bucket_size: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "bucket_size".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => val.as_u64().expect("Failed to convert bucket size to u64"), - Err(err) => { - panic!("{err}") - } - } - }, - nearby_bucket_threshold: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "nearby_bucket_threshold".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => val - .as_u64() - .expect("Failed to convert nearby bucket threshold to u64"), - Err(err) => { - panic!("{err}") - } - } - }, - driver_location_accuracy_buffer: { - let cfgs = get_config_from_cac_client( - app_config.cac_config.cac_tenants[0].to_string(), - "driver_location_accuracy_buffer".to_string(), - serde_json::Map::new(), - app_config.buisness_configs.clone(), - get_random_number(), - ) - .await; - match cfgs { - Ok(val) => val - .as_f64() - .expect("Failed to convert driver location accuracy buffer to f64"), - Err(err) => { - panic!("{err}") - } - } - }, max_allowed_req_size: app_config.max_allowed_req_size, log_unprocessible_req_body: app_config.log_unprocessible_req_body, request_timeout: app_config.request_timeout, blacklist_merchants, + bucket_size: app_config.bucket_size, + nearby_bucket_threshold: app_config.nearby_bucket_threshold, + cac_tenant: app_config.cac_config.cac_tenant, + business_configs: app_config.business_configs, + } + } +} + +impl Clone for AppState { + fn clone(&self) -> Self { + let tokio_runtime = tokio::runtime::Runtime::new(); + let business_configs = match tokio_runtime { + Ok(tokio_runtime) => tokio_runtime.block_on(async { + BusinessConfigs { + auth_token_expiry: get_config(self.cac_tenant.clone(), "auth_token_expiry") + .await + .unwrap_or(self.business_configs.auth_token_expiry), + min_location_accuracy: get_config( + self.cac_tenant.clone(), + "min_location_accuracy", + ) + .await + .unwrap_or(self.business_configs.min_location_accuracy), + last_location_timstamp_expiry: get_config( + self.cac_tenant.clone(), + "last_location_timstamp_expiry", + ) + .await + .unwrap_or(self.business_configs.last_location_timstamp_expiry), + location_update_limit: get_config( + self.cac_tenant.clone(), + "location_update_limit", + ) + .await + .unwrap_or(self.business_configs.location_update_limit), + location_update_interval: get_config( + self.cac_tenant.clone(), + "location_update_interval", + ) + .await + .unwrap_or(self.business_configs.location_update_interval), + batch_size: get_config(self.cac_tenant.clone(), "batch_size") + .await + .unwrap_or(self.business_configs.batch_size), + driver_location_accuracy_buffer: get_config( + self.cac_tenant.clone(), + "driver_location_accuracy_buffer", + ) + .await + .unwrap_or(self.business_configs.driver_location_accuracy_buffer), + } + }), + Err(err) => { + error!("Error in tokio runtime while cloning CAC configs. {err}"); + self.business_configs.clone() + } + }; + AppState { + non_persistent_redis: self.non_persistent_redis.clone(), + persistent_redis: self.persistent_redis.clone(), + drainer_delay: self.drainer_delay, + drainer_size: self.drainer_size, + new_ride_drainer_delay: self.new_ride_drainer_delay, + sender: self.sender.clone(), + polygon: self.polygon.clone(), + blacklist_polygon: self.blacklist_polygon.clone(), + auth_url: self.auth_url.clone(), + auth_api_key: self.auth_api_key.clone(), + bulk_location_callback_url: self.bulk_location_callback_url.clone(), + redis_expiry: self.redis_expiry, + producer: self.producer.clone(), + driver_location_update_topic: self.driver_location_update_topic.clone(), + max_allowed_req_size: self.max_allowed_req_size, + log_unprocessible_req_body: self.log_unprocessible_req_body.clone(), + request_timeout: self.request_timeout, + blacklist_merchants: self.blacklist_merchants.clone(), + bucket_size: self.bucket_size, + nearby_bucket_threshold: self.nearby_bucket_threshold, + cac_tenant: self.cac_tenant.clone(), + business_configs, } } } diff --git a/crates/location_tracking_service/src/main.rs b/crates/location_tracking_service/src/main.rs index 717d483..05e4f5f 100644 --- a/crates/location_tracking_service/src/main.rs +++ b/crates/location_tracking_service/src/main.rs @@ -18,7 +18,6 @@ use location_tracking_service::{ use std::{ env::var, sync::atomic::{AtomicBool, Ordering}, - time::Duration, }; use std::{net::Ipv4Addr, sync::Arc}; use tokio::signal::unix::SignalKind; @@ -80,28 +79,16 @@ async fn start_server() -> std::io::Result<()> { error!("Panic Occured : {payload}"); })); - let cac_resp = init_cac_clients(app_config.cac_config.clone()).await; - let superposition_response: Result<(), AppError> = init_superposition_clients( + let _ = init_cac_clients(app_config.cac_config.clone()) + .await + .map_err(|err| error!("Init CAC Client Error : {err}")); + let _ = init_superposition_clients( app_config.superposition_client_config.clone(), app_config.cac_config.clone(), ) - .await; - - tokio::time::sleep(Duration::from_secs(5)).await; - - match (cac_resp, superposition_response) { - (Ok(_), Ok(_)) => (), - (Err(err), Ok(_)) => { - error!("Failed to instantiate CAC :{}", err); - } - (Ok(_), Err(err)) => { - error!("Failed to instantiate Superposition Client :{}", err); - } - (Err(err1), Err(err2)) => { - error!("Failed to instantiate CAC :{}", err1); - error!("Failed to instantiate Superposition Client :{}", err2); - } - }; + .await + .map_err(|err| error!("Init Superposition Client Error : {err}")); + let port = app_config.port; let workers = app_config.workers; let max_allowed_req_size = app_config.max_allowed_req_size; diff --git a/crates/location_tracking_service/src/tools/error.rs b/crates/location_tracking_service/src/tools/error.rs index f464393..7eb1a2e 100644 --- a/crates/location_tracking_service/src/tools/error.rs +++ b/crates/location_tracking_service/src/tools/error.rs @@ -43,7 +43,7 @@ pub enum AppError { KafkaPushFailed(String), DrainerPushFailed(String), CacConfigFailed(String), - DefaultConfigsNotFound(String), + CacClientInitFailed(String), } impl AppError { @@ -81,8 +81,8 @@ impl AppError { AppError::CacConfigFailed(reason) => { format!("CAC Config Failed : {reason}") } - AppError::DefaultConfigsNotFound(reason) => { - format!("Default Configs Not Found : {reason}") + AppError::CacClientInitFailed(reason) => { + format!("CAC Client Init Failed : {reason}") } _ => "Some Error Occured".to_string(), } @@ -114,7 +114,7 @@ impl AppError { AppError::KafkaPushFailed(_) => "KAFKA_PUSH_FAILED", AppError::DrainerPushFailed(_) => "DRAINER_PUSH_FAILED", AppError::CacConfigFailed(_) => "CAC_CONFIG_FAILED", - AppError::DefaultConfigsNotFound(_) => "DEFAULT_CONFIGS_NOT_FOUND", + AppError::CacClientInitFailed(_) => "CAC_INIT_FAILED", } .to_string() } @@ -149,7 +149,7 @@ impl ResponseError for AppError { AppError::KafkaPushFailed(_) => StatusCode::INTERNAL_SERVER_ERROR, AppError::DrainerPushFailed(_) => StatusCode::INTERNAL_SERVER_ERROR, AppError::CacConfigFailed(_) => StatusCode::INTERNAL_SERVER_ERROR, - AppError::DefaultConfigsNotFound(_) => StatusCode::INTERNAL_SERVER_ERROR, + AppError::CacClientInitFailed(_) => StatusCode::INTERNAL_SERVER_ERROR, AppError::InvalidConfiguration(_) => StatusCode::INTERNAL_SERVER_ERROR, AppError::RequestTimeout => StatusCode::REQUEST_TIMEOUT, } diff --git a/dhall-configs/dev/location_tracking_service.dhall b/dhall-configs/dev/location_tracking_service.dhall index 74ccb9a..e2d7540 100644 --- a/dhall-configs/dev/location_tracking_service.dhall +++ b/dhall-configs/dev/location_tracking_service.dhall @@ -37,7 +37,7 @@ let cac_config = { cac_hostname = "http://localhost:8080", cac_polling_interval = 60, update_cac_periodically = True, - cac_tenants = ["LTS-default"], + cac_tenant = "LTS-default", } let superposition_client_config = { @@ -45,7 +45,7 @@ let superposition_client_config = { superposition_poll_frequency = 1, } -let buisness_configs = { +let business_configs = { auth_token_expiry = 86400, min_location_accuracy = 50.0, driver_location_accuracy_buffer = 25.0, @@ -53,8 +53,6 @@ let buisness_configs = { location_update_limit = 6000000000, location_update_interval = 60, batch_size = 100, - bucket_size = 30, - nearby_bucket_threshold = 4, } -- drainer_delay :: 4 * 1024KB * 1024MB * 1024GB / 100 Bytes = 41943040 in { @@ -79,7 +77,9 @@ in { request_timeout = 9000, log_unprocessible_req_body = ["UNPROCESSIBLE_REQUEST", "REQUEST_TIMEOUT", "LARGE_PAYLOAD_SIZE", "HITS_LIMIT_EXCEEDED"], max_allowed_req_size = 512000, -- 500 KB + bucket_size = 30, + nearby_bucket_threshold = 4, cac_config = cac_config, superposition_client_config = superposition_client_config, - buisness_configs = buisness_configs, + business_configs = business_configs, } \ No newline at end of file