Skip to content

Commit

Permalink
Merge pull request #50 from bgpkit/feature/new-collectors
Browse files Browse the repository at this point in the history
add new route-views collectors
  • Loading branch information
digizeph authored Aug 6, 2024
2 parents 7a4d2ee + b8c098c commit bbfeee0
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 114 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1.0"
tracing = "0.1"
lazy_static = "1"
dotenvy = "0.15"

#############################################
# Optional dependencies
Expand All @@ -44,18 +46,16 @@ tracing-subscriber = { version = "0.3", optional = true }
indicatif = { version = "0.17.7", optional = true }
futures-util = { version = "0.3.28", optional = true }
itertools = { version = "0.12.0", optional = true }
dotenvy = { version = "0.15", optional = true }
tempfile = { version = "3.8", optional = true }
which = { version = "5.0", optional = true }
bgpkit-commons = { version = "0.5", optional = true }

# crawler dependencies
futures = { version = "0.3", optional = true }
oneio = { version = "0.16.0", features = ["s3"], optional = true }
oneio = { version = "0.17.0", features = ["s3"], optional = true }
regex = { version = "1", optional = true }
scraper = { version = "0.17", optional = true }
tokio = { version = "1", optional = true, features = ["full"] }
lazy_static = { version = "1", optional = true }

# api dependencies
axum = { version = "0.7", optional = true }
Expand All @@ -73,10 +73,10 @@ async-nats = { version = "0.34.0", optional = true }
default = []
cli = [
# command-line interface
"clap", "dirs", "humantime", "num_cpus", "tracing-subscriber", "tabled", "itertools", "dotenvy", "tempfile", "which",
"clap", "dirs", "humantime", "num_cpus", "tracing-subscriber", "tabled", "itertools", "tempfile", "which",
"bgpkit-commons",
# crawler
"futures", "oneio", "regex", "scraper", "tokio", "lazy_static",
"futures", "oneio", "regex", "scraper", "tokio",
# notification
"nats",
# database
Expand Down
61 changes: 33 additions & 28 deletions src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async fn try_send_heartbeat(url: Option<String>) -> Result<(), BrokerError> {

/// update the database with data crawled from the given collectors
async fn update_database(
db: LocalBrokerDb,
mut db: LocalBrokerDb,
collectors: Vec<Collector>,
days: Option<u32>,
send_heartbeat: bool,
Expand All @@ -258,31 +258,28 @@ async fn update_database(
};

let now = Utc::now();
let latest_date;
if let Some(d) = days {
// if days is specified, we crawl data from d days ago
latest_date = Some(Utc::now().date_naive() - Duration::days(d as i64));
} else {
// otherwise, we crawl data from the latest timestamp in the database
latest_date = match db.get_latest_timestamp().await.unwrap().map(|t| t.date()) {
Some(t) => {
let start_date = t - Duration::days(1);
info!(
"update broker db from the latest date - 1 in db: {}",
start_date
);
Some(start_date)
}
None => {
// if bootstrap is false and we have an empty database we crawl data from 30 days ago
let date = Utc::now().date_naive() - Duration::days(30);
info!(
"empty database, bootstrapping data from {} days ago ({})",
30, date
);
Some(date)
}
};

let latest_ts_map: HashMap<String, NaiveDateTime> = db
.get_latest_files()
.await
.into_iter()
.map(|f| (f.collector_id.clone(), f.ts_start))
.collect();

let mut collector_updated = false;
for c in &collectors {
if !latest_ts_map.contains_key(&c.id) {
info!(
"collector {} not found in database, inserting collector meta information first...",
&c.id
);
db.insert_collector(c).await.unwrap();
collector_updated = true;
}
}
if collector_updated {
info!("collector list updated, reload collectors list into memory");
db.reload_collectors().await;
}

// crawl all collectors in parallel, 5 collectors in parallel by default, unordered.
Expand All @@ -292,7 +289,15 @@ async fn update_database(
debug!("unordered buffer size is {}", BUFFER_SIZE);

let mut stream = futures::stream::iter(&collectors)
.map(|c| crawl_collector(c, latest_date))
.map(|c| {
let latest_date;
if let Some(d) = days {
latest_date = Some(Utc::now().date_naive() - Duration::days(d as i64));
} else {
latest_date = latest_ts_map.get(&c.id).cloned().map(|ts| ts.date());
}
crawl_collector(c, latest_date)
})
.buffer_unordered(BUFFER_SIZE);

info!(
Expand Down Expand Up @@ -715,7 +720,7 @@ fn main() {
data_url: collector.data_url.clone(),
}
})
.sorted_by(|a, b| a.activated_on.cmp(&b.activated_on))
.sorted_by(|a, b| a.name.cmp(&b.name))
.collect();

if missing_collectors.is_empty() {
Expand Down
122 changes: 84 additions & 38 deletions src/crawler/collector.rs → src/collector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::BrokerError;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::info;

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -12,7 +13,7 @@ pub struct Collector {

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Config {
projects: Vec<ConfProject>,
pub projects: Vec<ConfProject>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ConfProject {
Expand All @@ -25,6 +26,19 @@ pub struct ConfCollector {
url: String,
}

impl Config {
    /// Builds a lookup table from collector ID to the name of the project
    /// that lists the collector.
    ///
    /// Every collector of every configured project contributes one entry.
    /// If the same collector ID is listed more than once, the entry that
    /// comes last in iteration order replaces the earlier ones.
    pub fn to_project_map(&self) -> HashMap<String, String> {
        self.projects
            .iter()
            .flat_map(|project| {
                // Pair each collector ID with the name of its owning project.
                project
                    .collectors
                    .iter()
                    .map(move |collector| (collector.id.clone(), project.name.clone()))
            })
            .collect()
    }
}

pub fn load_collectors() -> Result<Vec<Collector>, BrokerError> {
// load config
info!("loading default collectors config");
Expand Down Expand Up @@ -166,153 +180,185 @@ lazy_static! {
{
"name": "routeviews",
"collectors": [
{
"id": "amsix.ams",
"url": "https://archive.routeviews.org/amsix.ams/bgpdata"
},
{
"id": "cix.atl",
"url": "https://archive.routeviews.org/cix.atl/bgpdata"
},
{
"id": "decix.jhb",
"url": "https://archive.routeviews.org/decix.jhb/bgpdata"
},
{
"id": "iraq-ixp.bgw",
"url": "https://archive.routeviews.org/iraq-ixp.bgw/bgpdata"
},
{
"id": "pacwave.lax",
"url": "https://archive.routeviews.org/pacwave.lax/bgpdata"
},
{
"id": "pit.scl",
"url": "https://archive.routeviews.org/pit.scl/bgpdata"
},
{
"id": "pitmx.qro",
"url": "https://archive.routeviews.org/pitmx.qro/bgpdata"
},
{
"id": "route-views2",
"url": "http://archive.routeviews.org/bgpdata"
"url": "https://archive.routeviews.org/bgpdata"
},
{
"id": "route-views3",
"url": "http://archive.routeviews.org/route-views3/bgpdata"
"url": "https://archive.routeviews.org/route-views3/bgpdata"
},
{
"id": "route-views4",
"url": "http://archive.routeviews.org/route-views4/bgpdata"
"url": "https://archive.routeviews.org/route-views4/bgpdata"
},
{
"id": "route-views5",
"url": "http://archive.routeviews.org/route-views5/bgpdata"
"url": "https://archive.routeviews.org/route-views5/bgpdata"
},
{
"id": "route-views6",
"url": "http://archive.routeviews.org/route-views6/bgpdata"
"url": "https://archive.routeviews.org/route-views6/bgpdata"
},
{
"id": "route-views7",
"url": "https://archive.routeviews.org/route-views7/bgpdata"
},
{
"id":"route-views.amsix",
"url": "http://archive.routeviews.org/route-views.amsix/bgpdata"
"url": "https://archive.routeviews.org/route-views.amsix/bgpdata"
},
{
"id":"route-views.chicago",
"url": "http://archive.routeviews.org/route-views.chicago/bgpdata"
"url": "https://archive.routeviews.org/route-views.chicago/bgpdata"
},
{
"id":"route-views.chile",
"url": "http://archive.routeviews.org/route-views.chile/bgpdata"
"url": "https://archive.routeviews.org/route-views.chile/bgpdata"
},
{
"id":"route-views.eqix",
"url": "http://archive.routeviews.org/route-views.eqix/bgpdata"
"url": "https://archive.routeviews.org/route-views.eqix/bgpdata"
},
{
"id":"route-views.flix",
"url": "http://archive.routeviews.org/route-views.flix/bgpdata"
"url": "https://archive.routeviews.org/route-views.flix/bgpdata"
},
{
"id":"route-views.gorex",
"url": "http://archive.routeviews.org/route-views.gorex/bgpdata"
"url": "https://archive.routeviews.org/route-views.gorex/bgpdata"
},
{
"id":"route-views.isc",
"url": "http://archive.routeviews.org/route-views.isc/bgpdata"
"url": "https://archive.routeviews.org/route-views.isc/bgpdata"
},
{
"id":"route-views.kixp",
"url": "http://archive.routeviews.org/route-views.kixp/bgpdata"
"url": "https://archive.routeviews.org/route-views.kixp/bgpdata"
},
{
"id":"route-views.jinx",
"url": "http://archive.routeviews.org/route-views.jinx/bgpdata"
"url": "https://archive.routeviews.org/route-views.jinx/bgpdata"
},
{
"id":"route-views.linx",
"url": "http://archive.routeviews.org/route-views.linx/bgpdata"
"url": "https://archive.routeviews.org/route-views.linx/bgpdata"
},
{
"id":"route-views.napafrica",
"url": "http://archive.routeviews.org/route-views.napafrica/bgpdata"
"url": "https://archive.routeviews.org/route-views.napafrica/bgpdata"
},
{
"id":"route-views.nwax",
"url": "http://archive.routeviews.org/route-views.nwax/bgpdata"
"url": "https://archive.routeviews.org/route-views.nwax/bgpdata"
},
{
"id":"route-views.phoix",
"url": "http://archive.routeviews.org/route-views.phoix/bgpdata"
"url": "https://archive.routeviews.org/route-views.phoix/bgpdata"
},
{
"id":"route-views.telxatl",
"url": "http://archive.routeviews.org/route-views.telxatl/bgpdata"
"url": "https://archive.routeviews.org/route-views.telxatl/bgpdata"
},
{
"id":"route-views.wide",
"url": "http://archive.routeviews.org/route-views.wide/bgpdata"
"url": "https://archive.routeviews.org/route-views.wide/bgpdata"
},
{
"id":"route-views.sydney",
"url": "http://archive.routeviews.org/route-views.sydney/bgpdata"
"url": "https://archive.routeviews.org/route-views.sydney/bgpdata"
},
{
"id":"route-views.saopaulo",
"url": "http://archive.routeviews.org/route-views.saopaulo/bgpdata"
"url": "https://archive.routeviews.org/route-views.saopaulo/bgpdata"
},
{
"id":"route-views2.saopaulo",
"url": "http://archive.routeviews.org/route-views2.saopaulo/bgpdata"
"url": "https://archive.routeviews.org/route-views2.saopaulo/bgpdata"
},
{
"id":"route-views.sg",
"url": "http://archive.routeviews.org/route-views.sg/bgpdata"
"url": "https://archive.routeviews.org/route-views.sg/bgpdata"
},
{
"id":"route-views.perth",
"url": "http://archive.routeviews.org/route-views.perth/bgpdata"
"url": "https://archive.routeviews.org/route-views.perth/bgpdata"
},
{
"id":"route-views.peru",
"url": "http://archive.routeviews.org/route-views.peru/bgpdata"
"url": "https://archive.routeviews.org/route-views.peru/bgpdata"
},
{
"id":"route-views.sfmix",
"url": "http://archive.routeviews.org/route-views.sfmix/bgpdata"
"url": "https://archive.routeviews.org/route-views.sfmix/bgpdata"
},
{
"id":"route-views.siex",
"url": "http://archive.routeviews.org/route-views.siex/bgpdata"
"url": "https://archive.routeviews.org/route-views.siex/bgpdata"
},
{
"id":"route-views.soxrs",
"url": "http://archive.routeviews.org/route-views.soxrs/bgpdata"
"url": "https://archive.routeviews.org/route-views.soxrs/bgpdata"
},
{
"id":"route-views.mwix",
"url": "http://archive.routeviews.org/route-views.mwix/bgpdata"
"url": "https://archive.routeviews.org/route-views.mwix/bgpdata"
},
{
"id":"route-views.rio",
"url": "http://archive.routeviews.org/route-views.rio/bgpdata"
"url": "https://archive.routeviews.org/route-views.rio/bgpdata"
},
{
"id":"route-views.fortaleza",
"url": "http://archive.routeviews.org/route-views.fortaleza/bgpdata"
"url": "https://archive.routeviews.org/route-views.fortaleza/bgpdata"
},
{
"id":"route-views.gixa",
"url": "http://archive.routeviews.org/route-views.gixa/bgpdata"
"url": "https://archive.routeviews.org/route-views.gixa/bgpdata"
},
{
"id":"route-views.bdix",
"url": "http://archive.routeviews.org/route-views.bdix/bgpdata"
"url": "https://archive.routeviews.org/route-views.bdix/bgpdata"
},
{
"id":"route-views.bknix",
"url": "http://archive.routeviews.org/route-views.bknix/bgpdata"
"url": "https://archive.routeviews.org/route-views.bknix/bgpdata"
},
{
"id":"route-views.ny",
"url": "http://archive.routeviews.org/route-views.ny/bgpdata"
"url": "https://archive.routeviews.org/route-views.ny/bgpdata"
},
{
"id":"route-views.uaeix",
"url": "http://archive.routeviews.org/route-views.uaeix/bgpdata"
"url": "https://archive.routeviews.org/route-views.uaeix/bgpdata"
}
]
}
Expand Down
Loading

0 comments on commit bbfeee0

Please sign in to comment.