Skip to content

Commit

Permalink
lts/CAC: added dynamic fetching of buisness configs.
Browse files Browse the repository at this point in the history
  • Loading branch information
ratnadeep.b committed Jan 4, 2024
1 parent 83ba14d commit 2226a9f
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use crate::common::cac::get_config;
use crate::environment::AppState;
use crate::redis::commands::*;
use crate::tools::error::AppError;
Expand Down Expand Up @@ -65,11 +66,14 @@ pub async fn ride_end(
data: Data<AppState>,
request_body: RideEndRequest,
) -> Result<RideEndResponse, AppError> {
let batch_sze = get_config(data.cac_tenant.clone(), "batch_size")
.await
.unwrap_or(data.business_configs.batch_size);
let mut on_ride_driver_locations = get_on_ride_driver_locations(
&data.persistent_redis,
&request_body.driver_id,
&request_body.merchant_id,
data.business_configs.batch_size,
batch_sze,
)
.await?;

Expand Down
59 changes: 44 additions & 15 deletions crates/location_tracking_service/src/domain/action/ui/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use std::pin::Pin;

use crate::common::cac::get_config;
use crate::common::utils::{distance_between_in_meters, get_city, is_blacklist_for_special_zone};
use crate::common::{sliding_window_rate_limiter::sliding_window_limiter, types::*};
use crate::domain::types::ui::location::*;
Expand Down Expand Up @@ -82,20 +83,25 @@ pub async fn update_driver_location(
mut locations: Vec<UpdateDriverLocationRequest>,
driver_mode: DriverMode,
) -> Result<APISuccess, AppError> {
let auth_token_exp: u32 = get_config(data.cac_tenant.clone(), "auth_token_expiry")
.await
.unwrap_or(data.business_configs.auth_token_expiry);
let driver_id = get_driver_id_from_authentication(
&data.persistent_redis,
&data.auth_url,
&data.auth_api_key,
&data.business_configs.auth_token_expiry,
&auth_token_exp,
&token,
&merchant_id,
)
.await?;

if locations.len() > data.business_configs.batch_size as usize {
let batch_sze = get_config(data.cac_tenant.clone(), "batch_size")
.await
.unwrap_or(data.business_configs.batch_size) as usize;
if locations.len() > batch_sze {
warn!(
"Way points more than {} points => {} points",
data.business_configs.batch_size,
batch_sze,
locations.len()
);
}
Expand Down Expand Up @@ -135,11 +141,18 @@ pub async fn update_driver_location(
&data.polygon,
)?;

let loc_update_limit = get_config(data.cac_tenant.clone(), "location_update_limit")
.await
.unwrap_or(data.business_configs.location_update_limit);
let loc_update_interval = (get_config(data.cac_tenant.clone(), "location_update_interval")
.await
.unwrap_or(data.business_configs.location_update_interval))
as u32;
sliding_window_limiter(
&data.persistent_redis,
&sliding_rate_limiter_key(&driver_id, &city, &merchant_id),
data.business_configs.location_update_limit,
data.business_configs.location_update_interval as u32,
loc_update_limit,
loc_update_interval,
)
.await?;

Expand Down Expand Up @@ -217,9 +230,13 @@ async fn process_driver_locations(
let mut all_tasks: Vec<Pin<Box<dyn Future<Output = Result<(), AppError>>>>> = Vec::new();

let set_driver_last_location_update = async {
let last_loc_timestamp_exp =
get_config(data.cac_tenant.clone(), "last_location_timstamp_expiry")
.await
.unwrap_or(data.business_configs.last_location_timstamp_expiry);
set_driver_last_location_update(
&data.persistent_redis,
&data.business_configs.last_location_timstamp_expiry,
&last_loc_timestamp_exp,
&driver_id,
&merchant_id,
&latest_driver_location.pt,
Expand Down Expand Up @@ -268,19 +285,33 @@ async fn process_driver_locations(
latest_driver_location.pt, driver_id, merchant_id
);
}

let batch_sze = get_config(data.cac_tenant.clone(), "batch_size")
.await
.unwrap_or(data.business_configs.batch_size) as usize;
let locations = if let Some(RideStatus::INPROGRESS) = driver_ride_status.as_ref() {
let min_loc_accuracy = Accuracy(
get_config(data.cac_tenant.clone(), "min_location_accuracy")
.await
.map_err(|err| {
log::error!("Error fetching min_location_accuracy: {}", err.message());
})
.unwrap_or(data.business_configs.min_location_accuracy),
);
let driver_loc_accuracy_buffer =
get_config(data.cac_tenant.clone(), "driver_location_accuracy_buffer")
.await
.unwrap_or(data.business_configs.driver_location_accuracy_buffer);
let locations = get_filtered_driver_locations(
last_known_location.as_ref(),
locations,
Accuracy(data.business_configs.min_location_accuracy),
data.business_configs.driver_location_accuracy_buffer,
min_loc_accuracy,
driver_loc_accuracy_buffer,
);
if !locations.is_empty() {
if locations.len() > data.business_configs.batch_size as usize {
if locations.len() > batch_sze {
warn!(
"On Ride Way points more than {} points after filtering => {} points",
data.business_configs.batch_size,
batch_sze.clone(),
locations.len()
);
}
Expand Down Expand Up @@ -310,9 +341,7 @@ async fn process_driver_locations(
)
.await?;

if on_ride_driver_locations_count + geo_entries.len() as i64
> data.business_configs.batch_size
{
if on_ride_driver_locations_count + geo_entries.len() as i64 > batch_sze as i64 {
let mut on_ride_driver_locations = get_on_ride_driver_locations(
&data.persistent_redis,
&driver_id,
Expand Down
135 changes: 68 additions & 67 deletions crates/location_tracking_service/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use std::{env::var, sync::Arc};

use crate::{
common::{cac::get_config, geo_polygon::read_geo_polygon, types::*},
common::{geo_polygon::read_geo_polygon, types::*},
tools::logger::LoggerConfig,
};
use rdkafka::{error::KafkaError, producer::FutureProducer, ClientConfig};
Expand Down Expand Up @@ -93,6 +93,7 @@ pub struct RedisConfig {
pub stream_read_count: u64,
}

#[derive(Clone)]
pub struct AppState {
pub non_persistent_redis: Arc<RedisConnectionPool>,
pub persistent_redis: Arc<RedisConnectionPool>,
Expand Down Expand Up @@ -272,69 +273,69 @@ impl AppState {
}
}

impl Clone for AppState {
fn clone(&self) -> Self {
info!("Cloning AppState");
let business_configs = futures::executor::block_on(async {
BusinessConfigs {
auth_token_expiry: get_config(self.cac_tenant.clone(), "auth_token_expiry")
.await
.unwrap_or(self.business_configs.auth_token_expiry),
min_location_accuracy: get_config(self.cac_tenant.clone(), "min_location_accuracy")
.await
.map_err(|err| {
log::error!("Error fetching min_location_accuracy: {}", err.message());
})
.unwrap_or(self.business_configs.min_location_accuracy),
last_location_timstamp_expiry: get_config(
self.cac_tenant.clone(),
"last_location_timstamp_expiry",
)
.await
.unwrap_or(self.business_configs.last_location_timstamp_expiry),
location_update_limit: get_config(self.cac_tenant.clone(), "location_update_limit")
.await
.unwrap_or(self.business_configs.location_update_limit),
location_update_interval: get_config(
self.cac_tenant.clone(),
"location_update_interval",
)
.await
.unwrap_or(self.business_configs.location_update_interval),
batch_size: get_config(self.cac_tenant.clone(), "batch_size")
.await
.unwrap_or(self.business_configs.batch_size),
driver_location_accuracy_buffer: get_config(
self.cac_tenant.clone(),
"driver_location_accuracy_buffer",
)
.await
.unwrap_or(self.business_configs.driver_location_accuracy_buffer),
}
});
AppState {
non_persistent_redis: self.non_persistent_redis.clone(),
persistent_redis: self.persistent_redis.clone(),
drainer_delay: self.drainer_delay,
drainer_size: self.drainer_size,
new_ride_drainer_delay: self.new_ride_drainer_delay,
sender: self.sender.clone(),
polygon: self.polygon.clone(),
blacklist_polygon: self.blacklist_polygon.clone(),
auth_url: self.auth_url.clone(),
auth_api_key: self.auth_api_key.clone(),
bulk_location_callback_url: self.bulk_location_callback_url.clone(),
redis_expiry: self.redis_expiry,
producer: self.producer.clone(),
driver_location_update_topic: self.driver_location_update_topic.clone(),
max_allowed_req_size: self.max_allowed_req_size,
log_unprocessible_req_body: self.log_unprocessible_req_body.clone(),
request_timeout: self.request_timeout,
blacklist_merchants: self.blacklist_merchants.clone(),
bucket_size: self.bucket_size,
nearby_bucket_threshold: self.nearby_bucket_threshold,
cac_tenant: self.cac_tenant.clone(),
business_configs,
}
}
}
// impl Clone for AppState {
// fn clone(&self) -> Self {
// info!("Cloning AppState");
// let business_configs = futures::executor::block_on(async {
// BusinessConfigs {
// auth_token_expiry: get_config(self.cac_tenant.clone(), "auth_token_expiry")
// .await
// .unwrap_or(self.business_configs.auth_token_expiry),
// min_location_accuracy: get_config(self.cac_tenant.clone(), "min_location_accuracy")
// .await
// .map_err(|err| {
// log::error!("Error fetching min_location_accuracy: {}", err.message());
// })
// .unwrap_or(self.business_configs.min_location_accuracy),
// last_location_timstamp_expiry: get_config(
// self.cac_tenant.clone(),
// "last_location_timstamp_expiry",
// )
// .await
// .unwrap_or(self.business_configs.last_location_timstamp_expiry),
// //D location_update_limit: get_config(self.cac_tenant.clone(), "location_update_limit")
// .await
// .unwrap_or(self.business_configs.location_update_limit),
// //D location_update_interval: get_config(
// self.cac_tenant.clone(),
// "location_update_interval",
// )
// .await
// .unwrap_or(self.business_configs.location_update_interval),
// //D batch_size: get_config(self.cac_tenant.clone(), "batch_size")
// .await
// .unwrap_or(self.business_configs.batch_size),
// //D driver_location_accuracy_buffer: get_config(
// self.cac_tenant.clone(),
// "driver_location_accuracy_buffer",
// )
// .await
// .unwrap_or(self.business_configs.driver_location_accuracy_buffer),
// }
// });
// AppState {
// non_persistent_redis: self.non_persistent_redis.clone(),
// persistent_redis: self.persistent_redis.clone(),
// drainer_delay: self.drainer_delay,
// drainer_size: self.drainer_size,
// new_ride_drainer_delay: self.new_ride_drainer_delay,
// sender: self.sender.clone(),
// polygon: self.polygon.clone(),
// blacklist_polygon: self.blacklist_polygon.clone(),
// auth_url: self.auth_url.clone(),
// auth_api_key: self.auth_api_key.clone(),
// bulk_location_callback_url: self.bulk_location_callback_url.clone(),
// redis_expiry: self.redis_expiry,
// producer: self.producer.clone(),
// driver_location_update_topic: self.driver_location_update_topic.clone(),
// max_allowed_req_size: self.max_allowed_req_size,
// log_unprocessible_req_body: self.log_unprocessible_req_body.clone(),
// request_timeout: self.request_timeout,
// blacklist_merchants: self.blacklist_merchants.clone(),
// bucket_size: self.bucket_size,
// nearby_bucket_threshold: self.nearby_bucket_threshold,
// cac_tenant: self.cac_tenant.clone(),
// business_configs,
// }
// }
// }

0 comments on commit 2226a9f

Please sign in to comment.