Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search improvement #59

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ name = "monocle"
path = "src/bin/monocle.rs"

[dependencies]
bgpkit-broker = "0.7.0-beta.5"
bgpkit-parser = { version = "0.10.5", features = ["serde"] }
bgpkit-broker = "0.7.0"
bgpkit-parser = { version = "0.10.9", features = ["serde"] }
oneio = { version = "0.16.7", default-features = false, features = ["remote", "gz", "bz"] }

clap = { version = "4.1", features = ["derive"] }
itertools = "0.12"
itertools = "0.13.0"
rayon = "1.8"
tracing = "0.1"
tracing-subscriber = "0.3"
Expand All @@ -35,7 +35,7 @@ anyhow = "1.0"
tabled = "0.14"
config = { version = "0.13", features = ["toml"] }
dirs = "5"
rusqlite = { version = "0.30", features = ["bundled"] }
rusqlite = { version = "0.31", features = ["bundled"] }
ureq = { version = "2.9", features = ["json"] }
regex = "1.10"
rpki = { version = "0.16.1", features = ["repository"] }
Expand Down
53 changes: 31 additions & 22 deletions src/bin/monocle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(clippy::type_complexity)]
use std::io::Write;
use std::net::IpAddr;
use std::path::PathBuf;
Expand Down Expand Up @@ -363,16 +364,19 @@ enum RadarCommands {
},
}

fn elem_to_string(elem: &BgpElem, json: bool, pretty: bool) -> String {
fn elem_to_string(elem: &BgpElem, json: bool, pretty: bool, collector: &str) -> String {
if json {
let val = json!(elem);
let mut val = json!(elem);
val.as_object_mut()
.unwrap()
.insert("collector".to_string(), collector.into());
if pretty {
serde_json::to_string_pretty(&val).unwrap()
} else {
val.to_string()
}
} else {
elem.to_string()
format!("{}|{}", elem, collector)
}
}

Expand Down Expand Up @@ -402,8 +406,9 @@ fn main() {
return;
}

let file_path = file_path.to_str().unwrap();
let parser = parser_with_filters(
file_path.to_str().unwrap(),
file_path,
&filters.origin_asn,
&filters.prefix,
&filters.include_super,
Expand All @@ -419,7 +424,7 @@ fn main() {

let mut stdout = std::io::stdout();
for elem in parser {
let output_str = elem_to_string(&elem, json, pretty);
let output_str = elem_to_string(&elem, json, pretty, "");
if let Err(e) = writeln!(stdout, "{}", &output_str) {
if e.kind() != std::io::ErrorKind::BrokenPipe {
eprintln!("{e}");
Expand Down Expand Up @@ -503,18 +508,19 @@ fn main() {
return;
}

let (sender, receiver): (Sender<BgpElem>, Receiver<BgpElem>) = channel();
let (sender, receiver): (Sender<(BgpElem, String)>, Receiver<(BgpElem, String)>) =
channel();
// progress bar
let (pb_sender, pb_receiver): (Sender<u8>, Receiver<u8>) = channel();
let (pb_sender, pb_receiver): (Sender<u32>, Receiver<u32>) = channel();

// dedicated thread for handling output of results
let writer_thread = thread::spawn(move || match sqlite_db {
Some(db) => {
let mut msg_cache = vec![];
let mut msg_count = 0;
for elem in receiver {
for (elem, collector) in receiver {
msg_count += 1;
msg_cache.push(elem);
msg_cache.push((elem, collector));
if msg_cache.len() >= 100000 {
db.insert_elems(&msg_cache);
msg_cache.clear();
Expand All @@ -527,8 +533,8 @@ fn main() {
println!("processed {total_items} files, found {msg_count} messages, written into file {sqlite_path_str}");
}
None => {
for elem in receiver {
let output_str = elem_to_string(&elem, json, pretty);
for (elem, collector) in receiver {
let output_str = elem_to_string(&elem, json, pretty, collector.as_str());
println!("{output_str}");
}
}
Expand All @@ -541,24 +547,25 @@ fn main() {
}

let sty = indicatif::ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {eta}",
"[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {eta} left; {msg}",
)
.unwrap()
.progress_chars("##-");
let pb = indicatif::ProgressBar::new(total_items as u64);
pb.set_style(sty);
for _ in pb_receiver.iter() {
let mut total_count: u64 = 0;
for count in pb_receiver.iter() {
total_count += count as u64;
pb.set_message(format!("found {total_count} messages"));
pb.inc(1);
}
});

let urls = items
.iter()
.map(|x| x.url.to_string())
.collect::<Vec<String>>();

urls.into_par_iter()
.for_each_with((sender, pb_sender), |(s, pb_sender), url| {
items
.into_par_iter()
.for_each_with((sender, pb_sender), |(s, pb_sender), item| {
let url = item.url;
let collector = item.collector_id;
info!("start parsing {}", url.as_str());
let parser = parser_with_filters(
url.as_str(),
Expand All @@ -575,12 +582,14 @@ fn main() {
)
.unwrap();

let mut elems_count = 0;
for elem in parser {
s.send(elem).unwrap()
s.send((elem, collector.clone())).unwrap();
elems_count += 1;
}

if show_progress {
pb_sender.send(0).unwrap();
pb_sender.send(elems_count).unwrap();
}
info!("finished parsing {}", url.as_str());
});
Expand Down
31 changes: 9 additions & 22 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ impl MsgStore {
}

fn initialize_msgs_db(db: &mut MonocleDatabase, reset: bool) {
if reset {
db.conn.execute("drop table if exists elems", []).unwrap();
}
db.conn
.execute(
r#"
create table if not exists elems (
timestamp INTEGER,
elem_type TEXT,
collector TEXT,
peer_ip TEXT,
peer_asn INTEGER,
prefix TEXT,
Expand All @@ -63,10 +67,6 @@ impl MsgStore {
[],
)
.unwrap();

if reset {
db.conn.execute("delete from elems", []).unwrap();
}
}

#[inline(always)]
Expand All @@ -78,11 +78,11 @@ impl MsgStore {
}
}

pub fn insert_elems(&self, elems: &[BgpElem]) {
pub fn insert_elems(&self, elems: &[(BgpElem, String)]) {
for elems in elems.chunks(10000) {
let values = elems
.iter()
.map(|elem| {
.map(|(elem, collector)| {
let t = match elem.elem_type {
// bgpkit_parser::ElemType::ANNOUNCE => "A",
// bgpkit_parser::ElemType::WITHDRAW => "W",
Expand All @@ -91,9 +91,10 @@ impl MsgStore {
};
let origin_string = elem.origin_asns.as_ref().map(|asns| asns.first().unwrap());
format!(
"('{}','{}','{}','{}','{}', {},{},{},{},{},{},{},'{}',{},{})",
"('{}','{}','{}', '{}','{}','{}', {},{},{},{},{},{},{},'{}',{},{})",
elem.timestamp as u32,
t,
collector,
elem.peer_ip,
elem.peer_asn,
elem.prefix,
Expand All @@ -116,7 +117,7 @@ impl MsgStore {
.to_string();
let query = format!(
"INSERT INTO elems (\
timestamp, elem_type, peer_ip, peer_asn, prefix, next_hop, \
timestamp, elem_type, collector, peer_ip, peer_asn, prefix, next_hop, \
as_path, origin_asns, origin, local_pref, med, communities,\
atomic, aggr_asn, aggr_ip)\
VALUES {values};"
Expand All @@ -125,17 +126,3 @@ impl MsgStore {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use bgpkit_parser::BgpkitParser;

#[test]
fn test_insert() {
let store = MsgStore::new(&Some("test.sqlite3".to_string()), false);
let url = "https://spaces.bgpkit.org/parser/update-example.gz";
let elems: Vec<BgpElem> = BgpkitParser::new(url).unwrap().into_elem_iter().collect();
store.insert_elems(&elems);
}
}