Skip to content

Commit

Permalink
LTS : Refactoring for CAC Integration
Browse files Browse the repository at this point in the history
  • Loading branch information
khuzema786 committed Jan 2, 2024
1 parent 9f3fc96 commit ac656b7
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 402 deletions.
4 changes: 2 additions & 2 deletions crates/location_tracking_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ prometheus = { version = "0.13.3", features = ["process"] }

shared = { git = "https://github.com/nammayatri/shared-kernel-rs", branch = "main" }
macros = { git = "https://github.com/nammayatri/shared-kernel-rs", branch = "main" }
cac_client = { git = "ssh://[email protected]/picaf/context-aware-config.git", rev = "555ed8b" }
superposition_client = { git = "ssh://[email protected]/picaf/context-aware-config.git", rev = "555ed8b" }
cac_client = { git = "https://github.com/khuzema786/context-aware-config.git", rev = "7dceb4c" }
superposition_client = { git = "https://github.com/khuzema786/context-aware-config.git", rev = "7dceb4c" }

[dev-dependencies]
pprof = { version = "0.12", features = ["flamegraph"] }
105 changes: 70 additions & 35 deletions crates/location_tracking_service/src/common/cac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,35 @@
*/

use crate::environment::{CacConfig, SuperpositionClientConfig};
use crate::tools::error::AppError;
use actix_web::rt;
use cac_client as cac;
use shared::tools::error::AppError;
use rand::Rng;
use serde::de::DeserializeOwned;
use serde_json::json;
use std::time::Duration;
use superposition_client as spclient;

pub async fn init_cac_clients(cac_conf: CacConfig) -> Result<(), AppError> {
let cac_hostname: String = cac_conf.cac_hostname;
let hostname = cac_conf.cac_hostname;
let polling_interval: Duration = Duration::from_secs(cac_conf.cac_polling_interval);
let update_cac_periodically = cac_conf.update_cac_periodically;
let cac_tenants: Vec<String> = cac_conf.cac_tenants;
let tenant = cac_conf.cac_tenant;

for tenant in cac_tenants {
cac::CLIENT_FACTORY
.create_client(
tenant.to_string(),
update_cac_periodically,
polling_interval,
cac_hostname.to_string(),
)
.await
.map_err(|err| {
AppError::CacConfigFailed(format!(
"{}: Failed to acquire cac_client : {}",
tenant, err
))
})?;
}
cac::CLIENT_FACTORY
.create_client(
tenant.to_owned(),
update_cac_periodically,
polling_interval,
hostname,
)
.await
.map_err(|err| {
AppError::CacClientInitFailed(format!(
"{}: Failed to acquire cac_client : {}",
tenant, err
))
})?;

Ok(())
}
Expand All @@ -45,23 +46,57 @@ pub async fn init_superposition_clients(
) -> Result<(), AppError> {
let hostname: String = superposition_client_config.superposition_hostname;
let poll_frequency = superposition_client_config.superposition_poll_frequency;
let cac_tenants: Vec<String> = cac_conf.cac_tenants;
let tenant = cac_conf.cac_tenant;

for tenant in cac_tenants {
rt::spawn(
spclient::CLIENT_FACTORY
.create_client(tenant.to_string(), poll_frequency, hostname.to_string())
.await
.map_err(|err| {
AppError::CacConfigFailed(format!(
"{}: Failed to acquire superposition_client : {}",
tenant, err
))
})?
.clone()
.run_polling_updates(),
);
}
rt::spawn(
spclient::CLIENT_FACTORY
.create_client(tenant.to_owned(), poll_frequency, hostname)
.await
.map_err(|err| {
AppError::CacClientInitFailed(format!(
"Failed to acquire superposition_client for tenent {} : {}",
tenant, err
))
})?
.clone()
.run_polling_updates(),
);

Ok(())
}

pub async fn get_config<T>(tenant_name: String, key: &str) -> Result<T, AppError>
where
T: DeserializeOwned,
{
let cac_client = cac::CLIENT_FACTORY.get_client(tenant_name.to_owned());
let superpostion_client = spclient::CLIENT_FACTORY
.get_client(tenant_name.to_owned())
.await;

match (cac_client, superpostion_client) {
(Ok(cac_client), Ok(superpostion_client)) => {
let mut ctx = serde_json::Map::new();
let variant_ids = superpostion_client
.get_applicable_variant(&json!(ctx.clone()), rand::thread_rng().gen_range(1..100))
.await;
ctx.insert(String::from("variantIds"), variant_ids.into());
let res = cac_client
.eval(ctx)
.map_err(|err| AppError::CacConfigFailed(err.to_string()))?;

match res.get(key) {
Some(val) => Ok(serde_json::from_value(val.clone())
.map_err(|err| AppError::CacConfigFailed(err.to_string()))?),
None => Err(AppError::CacConfigFailed(format!(
"Key does not exist in cac client's response for tenant {}",
tenant_name
))),
}
}
_ => Err(AppError::CacConfigFailed(format!(
"Failed to fetch instance of cac client or superposition client for tenant {}",
tenant_name
))),
}
}
87 changes: 1 addition & 86 deletions crates/location_tracking_service/src/common/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::environment::BuisnessConfigs;

/* Copyright 2022-23, Juspay India Pvt Ltd
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License
as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program
Expand All @@ -11,11 +9,7 @@ use super::types::*;
use crate::tools::error::AppError;
use cac_client as cac;
use geo::{point, Intersects};
use rand::Rng;
use serde_json::{json, Map, Value};
use shared::tools::error::AppError;
use std::{f64::consts::PI, sync::Arc};
use superposition_client as spclient;
use std::f64::consts::PI;

/// Retrieves the name of the city based on latitude and longitude coordinates.
///
Expand Down Expand Up @@ -228,82 +222,3 @@ pub fn distance_between_in_meters(latlong1: &Point, latlong2: &Point) -> f64 {
pub fn cat_maybes<T>(options: Vec<Option<T>>) -> Vec<T> {
options.into_iter().flatten().collect()
}

pub fn get_cac_client(tenant_name: String) -> Result<Arc<cac::Client>, String> {
cac::CLIENT_FACTORY.get_client(tenant_name)
}

pub async fn get_superposition_client(
tenant_name: String,
) -> Result<Arc<spclient::Client>, String> {
spclient::CLIENT_FACTORY.get_client(tenant_name).await
}

pub async fn get_config_from_cac_client(
tenant_name: String,
key: String,
mut ctx: Map<String, Value>,
buisness_cfgs: BuisnessConfigs,
toss: i8,
) -> Result<Value, AppError> {
let cacclient: Result<Arc<cac::Client>, String> = get_cac_client(tenant_name.clone());
let superclient: Result<Arc<spclient::Client>, String> =
get_superposition_client(tenant_name.clone()).await;
match (cacclient, superclient) {
(Ok(cacclient), Ok(superclient)) => {
let variant_ids = superclient
.get_applicable_variant(&json!(ctx.clone()), toss)
.await;
ctx.insert(String::from("variantIds"), variant_ids.clone().into());
let res = cacclient.eval(ctx.clone());
match res {
Ok(res) => match res.get(&key) {
Some(val) => Ok(val.clone()),
_ => {
log::error!("Key does not exist in cac client's response for tenant {}, trying fetch the same key from default config", tenant_name);
get_default_config(tenant_name, key, buisness_cfgs).await
}
},
_ => {
log::error!("Failed to fetch config from cac client for tenant {}, trying fetch default config", tenant_name);
get_default_config(tenant_name, key, buisness_cfgs).await
}
}
}
(Err(_), Ok(_)) => {
log::error!(
"Failed to fetch instance of cac client for tenant {}",
tenant_name
);
get_default_config(tenant_name, key, buisness_cfgs).await
}
(Ok(_), Err(_)) => {
log::error!(
"Failed to fetch instance of superposition client for tenant {}",
tenant_name
);
get_default_config(tenant_name, key, buisness_cfgs).await
}
_ => {
log::error!(
"Failed to fetch instance of cac client and superposition client for tenant {}",
tenant_name
);
get_default_config(tenant_name, key, buisness_cfgs).await
}
}
}

pub async fn get_default_config(
tenant_name: String,
key: String,
def_buisness_cfgs: BuisnessConfigs,
) -> Result<Value, AppError> {
println!("Fetching default config for tenant {}", tenant_name);
def_buisness_cfgs.get_field(&key)
}

pub fn get_random_number() -> i8 {
let mut rng = rand::thread_rng();
rng.gen_range(1..100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn ride_end(
&data.persistent_redis,
&request_body.driver_id,
&request_body.merchant_id,
data.batch_size,
data.business_configs.batch_size,
)
.await?;

Expand Down
24 changes: 13 additions & 11 deletions crates/location_tracking_service/src/domain/action/ui/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,16 @@ pub async fn update_driver_location(
&data.persistent_redis,
&data.auth_url,
&data.auth_api_key,
&data.auth_token_expiry,
&data.business_configs.auth_token_expiry,
&token,
&merchant_id,
)
.await?;

if locations.len() > data.batch_size as usize {
if locations.len() > data.business_configs.batch_size as usize {
warn!(
"Way points more than {} points => {} points",
data.batch_size,
data.business_configs.batch_size,
locations.len()
);
}
Expand Down Expand Up @@ -138,8 +138,8 @@ pub async fn update_driver_location(
sliding_window_limiter(
&data.persistent_redis,
&sliding_rate_limiter_key(&driver_id, &city, &merchant_id),
data.location_update_limit,
data.location_update_interval as u32,
data.business_configs.location_update_limit,
data.business_configs.location_update_interval as u32,
)
.await?;

Expand Down Expand Up @@ -219,7 +219,7 @@ async fn process_driver_locations(
let set_driver_last_location_update = async {
set_driver_last_location_update(
&data.persistent_redis,
&data.last_location_timstamp_expiry,
&data.business_configs.last_location_timstamp_expiry,
&driver_id,
&merchant_id,
&latest_driver_location.pt,
Expand Down Expand Up @@ -273,14 +273,14 @@ async fn process_driver_locations(
let locations = get_filtered_driver_locations(
last_known_location.as_ref(),
locations,
data.min_location_accuracy,
data.driver_location_accuracy_buffer,
data.business_configs.min_location_accuracy,
data.business_configs.driver_location_accuracy_buffer,
);
if !locations.is_empty() {
if locations.len() > data.batch_size as usize {
if locations.len() > data.business_configs.batch_size as usize {
warn!(
"On Ride Way points more than {} points after filtering => {} points",
data.batch_size,
data.business_configs.batch_size,
locations.len()
);
}
Expand Down Expand Up @@ -310,7 +310,9 @@ async fn process_driver_locations(
)
.await?;

if on_ride_driver_locations_count + geo_entries.len() as i64 > data.batch_size {
if on_ride_driver_locations_count + geo_entries.len() as i64
> data.business_configs.batch_size
{
let mut on_ride_driver_locations = get_on_ride_driver_locations(
&data.persistent_redis,
&driver_id,
Expand Down
Loading

0 comments on commit ac656b7

Please sign in to comment.