From f912a8732f536e1612390fcd20a1bed5b7adf7cf Mon Sep 17 00:00:00 2001 From: wimax-grapl <69007229+wimax-grapl@users.noreply.github.com> Date: Sat, 5 Sep 2020 13:22:00 -0700 Subject: [PATCH] Metric Forwarder lambda (doesn't forward yet; strictly a skeleton for me to bolt more stuff into next) (#245) --- src/rust/Cargo.lock | 56 +++++++++++++- src/rust/Cargo.toml | 1 + src/rust/Dockerfile | 8 +- src/rust/metric-forwarder/Cargo.toml | 25 +++++++ src/rust/metric-forwarder/Dockerfile | 5 ++ .../metric-forwarder/based_on_github_repo.md | 27 +++++++ .../metric-forwarder/src/deser_logs_data.rs | 41 +++++++++++ src/rust/metric-forwarder/src/error.rs | 13 ++++ src/rust/metric-forwarder/src/main.rs | 73 +++++++++++++++++++ 9 files changed, 247 insertions(+), 2 deletions(-) create mode 100644 src/rust/metric-forwarder/Cargo.toml create mode 100644 src/rust/metric-forwarder/Dockerfile create mode 100644 src/rust/metric-forwarder/based_on_github_repo.md create mode 100644 src/rust/metric-forwarder/src/deser_logs_data.rs create mode 100644 src/rust/metric-forwarder/src/error.rs create mode 100644 src/rust/metric-forwarder/src/main.rs diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 46fbecf78d..58e06023b3 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -9,6 +9,12 @@ dependencies = [ "gimli", ] +[[package]] +name = "adler" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" + [[package]] name = "adler32" version = "1.1.0" @@ -181,7 +187,7 @@ dependencies = [ "addr2line", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.3.7", "object", "rustc-demangle", ] @@ -435,6 +441,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" +[[package]] +name = "crc32fast" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-channel" version = "0.4.2" @@ -803,6 +818,18 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" +[[package]] +name = "flate2" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "766d0e77a2c1502169d4a93ff3b8c15a71fd946cd0126309752104e5f3c46d94" +dependencies = [ + "cfg-if", + "crc32fast", + "libc", + "miniz_oxide 0.4.1", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1721,6 +1748,24 @@ dependencies = [ "autocfg 1.0.0", ] +[[package]] +name = "metric-forwarder" +version = "0.0.1" +dependencies = [ + "async-trait", + "aws_lambda_events 0.2.7", + "base64 0.10.1", + "flate2", + "grapl-config", + "lambda_runtime", + "log 0.4.8", + "serde_json", + "thiserror", + "tokio 0.2.21", + "tokio-compat", + "uuid 0.8.1", +] + [[package]] name = "miniz_oxide" version = "0.3.7" @@ -1730,6 +1775,15 @@ dependencies = [ "adler32", ] +[[package]] +name = "miniz_oxide" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d7559a8a40d0f97e1edea3220f698f78b1c5ab67532e49f68fde3910323b722" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.6.22" diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index e047ff51b4..17be8134af 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -9,6 +9,7 @@ members = [ "./graph-merger", "./grapl-config", "./grapl-observe", + "./metric-forwarder", "./node-identifier", "./sysmon-subgraph-generator", ] diff --git a/src/rust/Dockerfile b/src/rust/Dockerfile index 2d71801c4b..0072432d87 100644 --- a/src/rust/Dockerfile +++ b/src/rust/Dockerfile @@ -67,7 +67,10 @@ COPY --chown=grapl ./grapl-config/Cargo.toml grapl-config/ RUN mkdir -p grapl-observe/src RUN touch grapl-observe/src/lib.rs COPY --chown=grapl ./grapl-observe/Cargo.toml grapl-observe/ - +# copy in Cargo.toml for metric-forwarder +RUN mkdir -p metric-forwarder/src +RUN touch metric-forwarder/src/lib.rs +COPY --chown=grapl ./metric-forwarder/Cargo.toml metric-forwarder/ # copy in Cargo.toml for graph-merger RUN mkdir -p graph-merger/src RUN echo 'fn main() {}' > graph-merger/src/main.rs @@ -114,6 +117,7 @@ RUN rm -rf target/*/*/.fingerprint/graph-merger* RUN rm -rf target/*/*/.fingerprint/grapl-config* RUN rm -rf target/*/*/.fingerprint/grapl-graph-descriptions* RUN rm -rf target/*/*/.fingerprint/grapl-observe* +RUN rm -rf target/*/*/.fingerprint/metric-forwarder* RUN rm -rf target/*/*/.fingerprint/node-identifier* RUN rm -rf target/*/*/.fingerprint/sysmon-subgraph-generator* # copy in the sources @@ -125,6 +129,7 @@ COPY --chown=grapl ./graph-generator-lib graph-generator-lib COPY --chown=grapl ./graph-merger graph-merger COPY --chown=grapl ./grapl-config grapl-config COPY --chown=grapl ./grapl-observe grapl-observe +COPY --chown=grapl ./metric-forwarder metric-forwarder COPY --chown=grapl ./node-identifier node-identifier COPY --chown=grapl ./sysmon-subgraph-generator sysmon-subgraph-generator # copy in the top-level Cargo.toml and Cargo.lock @@ -145,6 +150,7 @@ RUN mkdir -p /home/grapl/dist && \ cp target/x86_64-unknown-linux-musl/${release_target}/analyzer-dispatcher dist/analyzer-dispatcher && \ cp target/x86_64-unknown-linux-musl/${release_target}/generic-subgraph-generator dist/generic-subgraph-generator && \ cp target/x86_64-unknown-linux-musl/${release_target}/graph-merger dist/graph-merger && \ + cp target/x86_64-unknown-linux-musl/${release_target}/metric-forwarder dist/metric-forwarder && \ cp target/x86_64-unknown-linux-musl/${release_target}/node-identifier dist/node-identifier && \ cp target/x86_64-unknown-linux-musl/${release_target}/node-identifier-retry-handler dist/node-identifier-retry-handler && \ cp target/x86_64-unknown-linux-musl/${release_target}/sysmon-subgraph-generator dist/sysmon-subgraph-generator \ diff --git a/src/rust/metric-forwarder/Cargo.toml b/src/rust/metric-forwarder/Cargo.toml new file mode 100644 index 0000000000..7143dc0ea8 --- /dev/null +++ b/src/rust/metric-forwarder/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "metric-forwarder" +version = "0.0.1" +authors = ["Max Wittek "] +edition = "2018" + +[dependencies] +grapl-config = {path="../grapl-config", version="*"} + +async-trait = "0.1.22" +aws_lambda_events = "0.2.1" +base64 = "0.10.1" +flate2 = "1.0.6" +lambda_runtime = "0.2.*" +log = "0.4.6" +# dont need it yet + causes openssl issues +# rusoto_cloudwatch = "0.43.0" +serde_json = "1.0.53" +thiserror = "1.0.20" +tokio-compat = "0.1.*" +tokio = { version = "0.2.*", features = ["sync", "rt-core", "macros", "time", "rt-threaded"] } + +[dependencies.uuid] +version = "*" +features = ["v4"] diff --git a/src/rust/metric-forwarder/Dockerfile b/src/rust/metric-forwarder/Dockerfile new file mode 100644 index 0000000000..5349b97f4a --- /dev/null +++ b/src/rust/metric-forwarder/Dockerfile @@ -0,0 +1,5 @@ +FROM alpine AS grapl-analyzer-dispatcher +RUN apk add --no-cache libgcc +ARG release_target="debug" +COPY --from=grapl/grapl-rust-src-build /home/grapl/target/x86_64-unknown-linux-musl/${release_target}/metric-forwarder / +CMD ["./metric-forwarder"] diff --git a/src/rust/metric-forwarder/based_on_github_repo.md b/src/rust/metric-forwarder/based_on_github_repo.md new file mode 100644 index 0000000000..09d5c6c073 --- /dev/null +++ b/src/rust/metric-forwarder/based_on_github_repo.md @@ -0,0 +1,27 @@ +Pieces of this code are based on this repo: + +https://github.com/demvsystems/cloudwatch-to-slack-lambda + +As such, in following with the MIT license, I'm copy-pasting the below notice. + +MIT License + +Copyright (c) 2019 DEMV Systems + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/src/rust/metric-forwarder/src/deser_logs_data.rs b/src/rust/metric-forwarder/src/deser_logs_data.rs new file mode 100644 index 0000000000..1ebc99f671 --- /dev/null +++ b/src/rust/metric-forwarder/src/deser_logs_data.rs @@ -0,0 +1,41 @@ +use crate::error::MetricForwarderError; +use crate::error::MetricForwarderError::{ + DecodeBase64Error, GunzipToStringError, ParseStringToLogsdataError, PoorlyFormattedEventError, +}; +use aws_lambda_events::event::cloudwatch_logs::{CloudwatchLogsData, CloudwatchLogsEvent}; + +fn parse_string_to_logsdata(gunzipped: String) -> Result { + use serde_json::from_str; + + from_str(&gunzipped).map_err(|err| ParseStringToLogsdataError(err.to_string())) +} + +fn base64_decode_raw_log_to_gzip(data: &str) -> Result, MetricForwarderError> { + use base64::decode; + + decode(data).map_err(|err| DecodeBase64Error(err.to_string())) +} + +fn gunzip_to_string(gzipped: Vec) -> Result { + use flate2::read::GzDecoder; + use std::io::Read; + + let mut raw_data = String::new(); + match GzDecoder::new(gzipped.as_slice()).read_to_string(&mut raw_data) { + Ok(_) => Ok(raw_data), + Err(err) => Err(GunzipToStringError(err.to_string())), + } +} + +pub fn aws_event_to_cloudwatch_logs_data( + event: CloudwatchLogsEvent, +) -> Result { + if let Some(data) = event.aws_logs.data { + let result = base64_decode_raw_log_to_gzip(&data) + .and_then(gunzip_to_string) + .and_then(parse_string_to_logsdata); + result + } else { + Err(PoorlyFormattedEventError()) + } +} diff --git a/src/rust/metric-forwarder/src/error.rs b/src/rust/metric-forwarder/src/error.rs new file mode 100644 index 0000000000..f6e265c6c1 --- /dev/null +++ b/src/rust/metric-forwarder/src/error.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +#[derive(Error, Debug, Clone)] +pub enum MetricForwarderError { + #[error("Couldn't create CloudwatchLogsData from gunzipped json: {0}")] + ParseStringToLogsdataError(String), + #[error("Couldn't base64 decode aws log data: {0}")] + DecodeBase64Error(String), + #[error("Couldn't gunzip decoded aws log data: {0}")] + GunzipToStringError(String), + #[error("Poorly formatted CloudwatchLogEvent")] + PoorlyFormattedEventError(), +} diff --git a/src/rust/metric-forwarder/src/main.rs b/src/rust/metric-forwarder/src/main.rs new file mode 100644 index 0000000000..a80b0a920d --- /dev/null +++ b/src/rust/metric-forwarder/src/main.rs @@ -0,0 +1,73 @@ +#![type_length_limit = "1214269"] +// Our types are simply too powerful + +mod deser_logs_data; +mod error; + +use aws_lambda_events::event::cloudwatch_logs::{CloudwatchLogsData, CloudwatchLogsEvent}; +use lambda_runtime::error::HandlerError; +use lambda_runtime::lambda; +use lambda_runtime::Context; +use log::info; + +use tokio::runtime::Runtime; + +// not yet, gotta solve openssl issue +// use rusoto_cloudwatch::{CloudWatch, CloudWatchClient}; + +fn handler(event: CloudwatchLogsEvent, ctx: Context) -> Result<(), HandlerError> { + info!("Handling event"); + + let logs = deser_logs_data::aws_event_to_cloudwatch_logs_data(event); + match logs { + Ok(logs) => { + // Now we have the actual logs. + // Parse the statsd format, + // then forward them to CloudWatch + Ok(()) + } + Err(e) => Err(HandlerError::from(e.to_string().as_str())), + } +} + +/* +// TODO wimax Sep'20 - need to figure out how to do a local cloudwatch... +fn init_cloudwatch_client() -> CloudwatchClient { + info!("Connecting to local http://s3:9000"); + CloudWatchClient::new_with( + HttpClient::new().expect("failed to create request dispatcher"), + rusoto_credential::StaticProvider::new_minimal( + "minioadmin".to_owned(), + "minioadmin".to_owned(), + ), + Region::Custom { + name: "locals3".to_string(), + endpoint: "http://s3:9000".to_string(), + }, + ) +} +*/ + +#[tokio::main] +async fn main() -> Result<(), Box> { + grapl_config::init_grapl_log!(); + + let is_local = std::env::var("IS_LOCAL").is_ok(); + + if is_local { + info!("Running locally"); + + /* + loop { + if let Err(e) = local_handler().await { + error!("local_handler: {}", e); + }; + } + */ + } else { + info!("Running in AWS"); + lambda!(handler); + } + + Ok(()) +}