From 5b76e92f82bb12c7c91ac283c4436934a612512c Mon Sep 17 00:00:00 2001 From: "ratnadeep.b" Date: Thu, 4 Jan 2024 14:22:57 +0530 Subject: [PATCH] lts/CAC: added dynamic fetching of buisness configs. --- .../src/domain/action/internal/ride.rs | 6 +- .../src/domain/action/ui/location.rs | 59 ++++++++++++---- .../src/environment.rs | 70 +------------------ 3 files changed, 51 insertions(+), 84 deletions(-) diff --git a/crates/location_tracking_service/src/domain/action/internal/ride.rs b/crates/location_tracking_service/src/domain/action/internal/ride.rs index 7eb1957..d4694c6 100644 --- a/crates/location_tracking_service/src/domain/action/internal/ride.rs +++ b/crates/location_tracking_service/src/domain/action/internal/ride.rs @@ -6,6 +6,7 @@ the GNU Affero General Public License along with this program. If not, see . */ +use crate::common::cac::get_config; use crate::environment::AppState; use crate::redis::commands::*; use crate::tools::error::AppError; @@ -65,11 +66,14 @@ pub async fn ride_end( data: Data, request_body: RideEndRequest, ) -> Result { + let batch_sze = get_config(data.cac_tenant.clone(), "batch_size") + .await + .unwrap_or(data.business_configs.batch_size); let mut on_ride_driver_locations = get_on_ride_driver_locations( &data.persistent_redis, &request_body.driver_id, &request_body.merchant_id, - data.business_configs.batch_size, + batch_sze, ) .await?; diff --git a/crates/location_tracking_service/src/domain/action/ui/location.rs b/crates/location_tracking_service/src/domain/action/ui/location.rs index 8ea3692..c347a63 100644 --- a/crates/location_tracking_service/src/domain/action/ui/location.rs +++ b/crates/location_tracking_service/src/domain/action/ui/location.rs @@ -8,6 +8,7 @@ use std::pin::Pin; +use crate::common::cac::get_config; use crate::common::utils::{distance_between_in_meters, get_city, is_blacklist_for_special_zone}; use crate::common::{sliding_window_rate_limiter::sliding_window_limiter, types::*}; use crate::domain::types::ui::location::*; @@ -82,20 +83,25 @@ pub async fn update_driver_location( mut locations: Vec, driver_mode: DriverMode, ) -> Result { + let auth_token_exp: u32 = get_config(data.cac_tenant.clone(), "auth_token_expiry") + .await + .unwrap_or(data.business_configs.auth_token_expiry); let driver_id = get_driver_id_from_authentication( &data.persistent_redis, &data.auth_url, &data.auth_api_key, - &data.business_configs.auth_token_expiry, + &auth_token_exp, &token, &merchant_id, ) .await?; - - if locations.len() > data.business_configs.batch_size as usize { + let batch_sze = get_config(data.cac_tenant.clone(), "batch_size") + .await + .unwrap_or(data.business_configs.batch_size) as usize; + if locations.len() > batch_sze { warn!( "Way points more than {} points => {} points", - data.business_configs.batch_size, + batch_sze, locations.len() ); } @@ -135,11 +141,18 @@ pub async fn update_driver_location( &data.polygon, )?; + let loc_update_limit = get_config(data.cac_tenant.clone(), "location_update_limit") + .await + .unwrap_or(data.business_configs.location_update_limit); + let loc_update_interval = (get_config(data.cac_tenant.clone(), "location_update_interval") + .await + .unwrap_or(data.business_configs.location_update_interval)) + as u32; sliding_window_limiter( &data.persistent_redis, &sliding_rate_limiter_key(&driver_id, &city, &merchant_id), - data.business_configs.location_update_limit, - data.business_configs.location_update_interval as u32, + loc_update_limit, + loc_update_interval, ) .await?; @@ -217,9 +230,13 @@ async fn process_driver_locations( let mut all_tasks: Vec>>>> = Vec::new(); let set_driver_last_location_update = async { + let last_loc_timestamp_exp = + get_config(data.cac_tenant.clone(), "last_location_timstamp_expiry") + .await + .unwrap_or(data.business_configs.last_location_timstamp_expiry); set_driver_last_location_update( &data.persistent_redis, - &data.business_configs.last_location_timstamp_expiry, + &last_loc_timestamp_exp, &driver_id, &merchant_id, &latest_driver_location.pt, @@ -268,19 +285,33 @@ async fn process_driver_locations( latest_driver_location.pt, driver_id, merchant_id ); } - + let batch_sze = get_config(data.cac_tenant.clone(), "batch_size") + .await + .unwrap_or(data.business_configs.batch_size) as usize; let locations = if let Some(RideStatus::INPROGRESS) = driver_ride_status.as_ref() { + let min_loc_accuracy = Accuracy( + get_config(data.cac_tenant.clone(), "min_location_accuracy") + .await + .map_err(|err| { + log::error!("Error fetching min_location_accuracy: {}", err.message()); + }) + .unwrap_or(data.business_configs.min_location_accuracy), + ); + let driver_loc_accuracy_buffer = + get_config(data.cac_tenant.clone(), "driver_location_accuracy_buffer") + .await + .unwrap_or(data.business_configs.driver_location_accuracy_buffer); let locations = get_filtered_driver_locations( last_known_location.as_ref(), locations, - Accuracy(data.business_configs.min_location_accuracy), - data.business_configs.driver_location_accuracy_buffer, + min_loc_accuracy, + driver_loc_accuracy_buffer, ); if !locations.is_empty() { - if locations.len() > data.business_configs.batch_size as usize { + if locations.len() > batch_sze { warn!( "On Ride Way points more than {} points after filtering => {} points", - data.business_configs.batch_size, + batch_sze.clone(), locations.len() ); } @@ -310,9 +341,7 @@ async fn process_driver_locations( ) .await?; - if on_ride_driver_locations_count + geo_entries.len() as i64 - > data.business_configs.batch_size - { + if on_ride_driver_locations_count + geo_entries.len() as i64 > batch_sze as i64 { let mut on_ride_driver_locations = get_on_ride_driver_locations( &data.persistent_redis, &driver_id, diff --git a/crates/location_tracking_service/src/environment.rs b/crates/location_tracking_service/src/environment.rs index 3054c3b..d7d1980 100644 --- a/crates/location_tracking_service/src/environment.rs +++ b/crates/location_tracking_service/src/environment.rs @@ -10,7 +10,7 @@ use std::{env::var, sync::Arc}; use crate::{ - common::{cac::get_config, geo_polygon::read_geo_polygon, types::*}, + common::{geo_polygon::read_geo_polygon, types::*}, tools::logger::LoggerConfig, }; use rdkafka::{error::KafkaError, producer::FutureProducer, ClientConfig}; @@ -93,6 +93,7 @@ pub struct RedisConfig { pub stream_read_count: u64, } +#[derive(Clone)] pub struct AppState { pub non_persistent_redis: Arc, pub persistent_redis: Arc, @@ -271,70 +272,3 @@ impl AppState { } } } - -impl Clone for AppState { - fn clone(&self) -> Self { - info!("Cloning AppState"); - let business_configs = futures::executor::block_on(async { - BusinessConfigs { - auth_token_expiry: get_config(self.cac_tenant.clone(), "auth_token_expiry") - .await - .unwrap_or(self.business_configs.auth_token_expiry), - min_location_accuracy: get_config(self.cac_tenant.clone(), "min_location_accuracy") - .await - .map_err(|err| { - log::error!("Error fetching min_location_accuracy: {}", err.message()); - }) - .unwrap_or(self.business_configs.min_location_accuracy), - last_location_timstamp_expiry: get_config( - self.cac_tenant.clone(), - "last_location_timstamp_expiry", - ) - .await - .unwrap_or(self.business_configs.last_location_timstamp_expiry), - location_update_limit: get_config(self.cac_tenant.clone(), "location_update_limit") - .await - .unwrap_or(self.business_configs.location_update_limit), - location_update_interval: get_config( - self.cac_tenant.clone(), - "location_update_interval", - ) - .await - .unwrap_or(self.business_configs.location_update_interval), - batch_size: get_config(self.cac_tenant.clone(), "batch_size") - .await - .unwrap_or(self.business_configs.batch_size), - driver_location_accuracy_buffer: get_config( - self.cac_tenant.clone(), - "driver_location_accuracy_buffer", - ) - .await - .unwrap_or(self.business_configs.driver_location_accuracy_buffer), - } - }); - AppState { - non_persistent_redis: self.non_persistent_redis.clone(), - persistent_redis: self.persistent_redis.clone(), - drainer_delay: self.drainer_delay, - drainer_size: self.drainer_size, - new_ride_drainer_delay: self.new_ride_drainer_delay, - sender: self.sender.clone(), - polygon: self.polygon.clone(), - blacklist_polygon: self.blacklist_polygon.clone(), - auth_url: self.auth_url.clone(), - auth_api_key: self.auth_api_key.clone(), - bulk_location_callback_url: self.bulk_location_callback_url.clone(), - redis_expiry: self.redis_expiry, - producer: self.producer.clone(), - driver_location_update_topic: self.driver_location_update_topic.clone(), - max_allowed_req_size: self.max_allowed_req_size, - log_unprocessible_req_body: self.log_unprocessible_req_body.clone(), - request_timeout: self.request_timeout, - blacklist_merchants: self.blacklist_merchants.clone(), - bucket_size: self.bucket_size, - nearby_bucket_threshold: self.nearby_bucket_threshold, - cac_tenant: self.cac_tenant.clone(), - business_configs, - } - } -}