From e1e26746030557993ef5658ac5bdff00eb818b5a Mon Sep 17 00:00:00 2001 From: Leonardo Arcari Date: Thu, 14 Dec 2023 20:31:34 +0100 Subject: [PATCH] Refactor `MarketDataService` logic for better latency and scalability (#208) * Bump Rust dependencies * Refactor `dcapal-backend` folder structure * Refactor MarketDataService logic for better latency and scalability * Fix typo * Remove deadcode * Remove unused imports --- .github/workflows/build-test.yml | 2 +- Cargo.lock | 149 +++--- dcapal-backend/Cargo.toml | 3 +- .../docs/rest/public/import/post.md | 2 +- dcapal-backend/src/{ => app}/domain/entity.rs | 11 +- .../src/app/domain/market_data_utils.rs | 37 ++ dcapal-backend/src/app/domain/mod.rs | 2 + dcapal-backend/src/app/infra/mod.rs | 2 + dcapal-backend/src/{ => app/infra}/stats.rs | 3 +- .../src/{domain => app/infra}/utils.rs | 12 +- dcapal-backend/src/app/mod.rs | 4 + .../src/{domain => app/services}/command.rs | 5 +- .../{domain => app/services}/ip2location.rs | 0 .../src/app/services/market_data.rs | 348 +++++++++++++ .../src/{domain => app/services}/mod.rs | 2 - .../src/app/workers/market_discovery.rs | 148 ++++++ dcapal-backend/src/app/workers/mod.rs | 2 + .../src/app/workers/price_updater.rs | 85 +++ dcapal-backend/src/config.rs | 2 +- dcapal-backend/src/domain/market_data.rs | 487 ------------------ dcapal-backend/src/error.rs | 2 +- dcapal-backend/src/lib.rs | 102 ++-- dcapal-backend/src/maintenance.rs | 71 --- dcapal-backend/src/ports/inbound/mod.rs | 4 + .../src/{api => ports/inbound/rest}/mod.rs | 15 +- dcapal-backend/src/ports/mod.rs | 5 + .../src/{ => ports/outbound}/adapter/cw.rs | 6 +- .../src/{ => ports/outbound}/adapter/ipapi.rs | 0 .../{ => ports/outbound}/adapter/kraken.rs | 6 +- .../src/{ => ports/outbound}/adapter/mod.rs | 12 + .../src/{ => ports/outbound}/adapter/yahoo.rs | 2 +- dcapal-backend/src/ports/outbound/mod.rs | 5 + .../{ => ports/outbound}/repository/dto.rs | 2 +- .../outbound}/repository/market_data/mod.rs | 2 +- .../repository/market_data/redis_asset.rs | 4 +- .../repository/market_data/redis_market.rs | 6 +- .../{ => ports/outbound}/repository/mod.rs | 4 +- dcapal-optimizer-wasm/Cargo.toml | 2 +- 38 files changed, 836 insertions(+), 720 deletions(-) rename dcapal-backend/src/{ => app}/domain/entity.rs (95%) create mode 100644 dcapal-backend/src/app/domain/market_data_utils.rs create mode 100644 dcapal-backend/src/app/domain/mod.rs create mode 100644 dcapal-backend/src/app/infra/mod.rs rename dcapal-backend/src/{ => app/infra}/stats.rs (96%) rename dcapal-backend/src/{domain => app/infra}/utils.rs (92%) create mode 100644 dcapal-backend/src/app/mod.rs rename dcapal-backend/src/{domain => app/services}/command.rs (92%) rename dcapal-backend/src/{domain => app/services}/ip2location.rs (100%) create mode 100644 dcapal-backend/src/app/services/market_data.rs rename dcapal-backend/src/{domain => app/services}/mod.rs (65%) create mode 100644 dcapal-backend/src/app/workers/market_discovery.rs create mode 100644 dcapal-backend/src/app/workers/mod.rs create mode 100644 dcapal-backend/src/app/workers/price_updater.rs delete mode 100644 dcapal-backend/src/domain/market_data.rs delete mode 100644 dcapal-backend/src/maintenance.rs create mode 100644 dcapal-backend/src/ports/inbound/mod.rs rename dcapal-backend/src/{api => ports/inbound/rest}/mod.rs (89%) create mode 100644 dcapal-backend/src/ports/mod.rs rename dcapal-backend/src/{ => ports/outbound}/adapter/cw.rs (97%) rename dcapal-backend/src/{ => ports/outbound}/adapter/ipapi.rs (100%) rename dcapal-backend/src/{ => ports/outbound}/adapter/kraken.rs (98%) rename dcapal-backend/src/{ => ports/outbound}/adapter/mod.rs (58%) rename dcapal-backend/src/{ => ports/outbound}/adapter/yahoo.rs (99%) create mode 100644 dcapal-backend/src/ports/outbound/mod.rs rename dcapal-backend/src/{ => ports/outbound}/repository/dto.rs (89%) rename dcapal-backend/src/{ => ports/outbound}/repository/market_data/mod.rs (96%) rename dcapal-backend/src/{ => ports/outbound}/repository/market_data/redis_asset.rs (96%) rename dcapal-backend/src/{ => ports/outbound}/repository/market_data/redis_market.rs (97%) rename dcapal-backend/src/{ => ports/outbound}/repository/mod.rs (96%) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 385416fc..e534ea53 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -30,7 +30,7 @@ jobs: - name: Lint run: | cd dcapal-backend - cargo fmt -- --check + cargo fmt --all -- --check cargo clippy -- -D warnings - name: Build and Test diff --git a/Cargo.lock b/Cargo.lock index e1af045f..4774371a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,9 +68,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" +checksum = "d664a92ecae85fd0a7392615844904654d1d5f5514837f471ddef4a057aba1b6" dependencies = [ "anstyle", "anstyle-parse", @@ -88,30 +88,30 @@ checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.0.0" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" dependencies = [ "anstyle", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -321,9 +321,9 @@ dependencies = [ [[package]] name = "borsh" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf617fabf5cdbdc92f774bfe5062d870f228b80056d41180797abf48bed4056e" +checksum = "9897ef0f1bd2362169de6d7e436ea2237dc1085d7d1e4db75f4be34d86f309d1" dependencies = [ "borsh-derive", "cfg_aliases", @@ -331,9 +331,9 @@ dependencies = [ [[package]] name = "borsh-derive" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f404657a7ea7b5249e36808dff544bc88a28f26e0ac40009f674b7a009d14be3" +checksum = "478b41ff04256c5c8330f3dfdaaae2a5cc976a8e75088bafa4625b0d0208de8c" dependencies = [ "once_cell", "proc-macro-crate", @@ -421,9 +421,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.10" +version = "4.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41fffed7514f420abec6d183b1d3acfd9099c79c3a10a06ade4f8203f1411272" +checksum = "bfaff671f6b22ca62406885ece523383b9b64022e341e53e009a62ebc47a45f2" dependencies = [ "clap_builder", "clap_derive", @@ -431,9 +431,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.9" +version = "4.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63361bae7eef3771745f02d8d892bec2fee5f6e34af316ba556e7f97a7069ff1" +checksum = "a216b506622bb1d316cd51328dce24e07bdff4a6128a47c7e7fad11878d5adbb" dependencies = [ "anstream", "anstyle", @@ -639,19 +639,6 @@ dependencies = [ "syn 2.0.39", ] -[[package]] -name = "dashmap" -version = "5.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" -dependencies = [ - "cfg-if", - "hashbrown 0.14.3", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "dcapal-backend" version = "0.5.0" @@ -662,7 +649,6 @@ dependencies = [ "chrono", "config", "const_format", - "dashmap", "deadpool-redis", "failsafe", "futures", @@ -755,9 +741,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" dependencies = [ "powerfmt", "serde", @@ -1159,9 +1145,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", "http 0.2.11", @@ -1221,7 +1207,7 @@ dependencies = [ "futures-util", "h2 0.3.22", "http 0.2.11", - "http-body 0.4.5", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1396,9 +1382,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" @@ -1636,9 +1622,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", @@ -1818,15 +1804,15 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.60" +version = "0.10.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800" +checksum = "6b8419dc8cc6d866deb801274bba2e6f8f6108c1bb7fcc10ee5ab864931dbb45" dependencies = [ "bitflags 2.4.1", "cfg-if", @@ -1856,9 +1842,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.96" +version = "0.9.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f" +checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" dependencies = [ "cc", "libc", @@ -2002,9 +1988,9 @@ checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" [[package]] name = "portable-atomic" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bccab0e7fd7cc19f820a1c8c91720af652d0c88dc9664dd72aef2614f04af3b" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" [[package]] name = "powerfmt" @@ -2020,10 +2006,11 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro-crate" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e8366a6159044a37876a2b9817124296703c586a5c92e2c53751fa06d8d43e8" +checksum = "97dc5fea232fc28d2f597b37c4876b348a40e33f3b02cc975c8d006d78d94b1a" dependencies = [ + "toml_datetime", "toml_edit", ] @@ -2252,7 +2239,7 @@ dependencies = [ "futures-util", "h2 0.3.22", "http 0.2.11", - "http-body 0.4.5", + "http-body 0.4.6", "hyper 0.14.27", "hyper-tls", "ipnet", @@ -2361,9 +2348,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.26" +version = "0.38.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" +checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" dependencies = [ "bitflags 2.4.1", "errno", @@ -2380,9 +2367,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "ryu" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" [[package]] name = "schannel" @@ -2445,9 +2432,9 @@ dependencies = [ [[package]] name = "serde-wasm-bindgen" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ba92964781421b6cef36bf0d7da26d201e96d84e1b10e7ae6ed416e516906d" +checksum = "67d27afff48127b3edfe6ad379a2bd0795d2cfa56730ca2745e5b89386126404" dependencies = [ "js-sys", "serde", @@ -2747,9 +2734,19 @@ dependencies = [ [[package]] name = "test-log" -version = "0.2.13" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6159ab4116165c99fc88cce31f99fa2c9dbe08d3691cb38da02fc3b45f357d2b" +dependencies = [ + "env_logger", + "test-log-macros", +] + +[[package]] +name = "test-log-macros" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f66edd6b6cd810743c0c71e1d085e92b01ce6a72782032e3f794c8284fe4bcdd" +checksum = "7ba277e77219e9eea169e8508942db1bf5d8a41ff2db9b20aab5a5aadc9fa25d" dependencies = [ "proc-macro2", "quote", @@ -2832,9 +2829,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" dependencies = [ "backtrace", "bytes", @@ -2895,15 +2892,15 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.5" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" [[package]] name = "toml_edit" -version = "0.20.7" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81" +checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" dependencies = [ "indexmap 2.1.0", "toml_datetime", @@ -3045,9 +3042,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" @@ -3063,9 +3060,9 @@ checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" [[package]] name = "unicode-ident" @@ -3435,9 +3432,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.19" +version = "0.5.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "829846f3e3db426d4cee4510841b71a8e58aa2a76b1132579487ae430ccd9c7b" +checksum = "b67b5f0a4e7a27a64c651977932b9dc5667ca7fc31ac44b03ed37a0cf42fdfff" dependencies = [ "memchr", ] @@ -3472,18 +3469,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.28" +version = "0.7.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d6f15f7ade05d2a4935e34a457b936c23dc70a05cc1d97133dc99e7a3fe0f0e" +checksum = "306dca4455518f1f31635ec308b6b3e4eb1b11758cefafc782827d0aa7acb5c7" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.28" +version = "0.7.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbbad221e3f78500350ecbd7dfa4e63ef945c05f4c61cb7f4d3f84cd0bba649b" +checksum = "be912bf68235a88fbefd1b73415cb218405958d1655b2ece9035a19920bdf6ba" dependencies = [ "proc-macro2", "quote", diff --git a/dcapal-backend/Cargo.toml b/dcapal-backend/Cargo.toml index 317d8af9..f61dd529 100644 --- a/dcapal-backend/Cargo.toml +++ b/dcapal-backend/Cargo.toml @@ -18,7 +18,7 @@ reqwest = { version = "0.11.22", features = ["gzip", "json"] } serde = "1.0.193" serde_json = "1.0.108" thiserror = "1.0.50" -tokio = { version = "1.33.0", features = ["full"] } +tokio = { version = "1.35.0", features = ["full"] } tower = "0.4.13" tower-http = { version = "0.5.0", features = ["trace"] } tracing = "0.1.40" @@ -32,7 +32,6 @@ strum_macros = "0.25.3" config = "0.13.4" lazy_static = "1.4.0" parking_lot = "0.12.1" -dashmap = "5.5.3" metrics = "0.21.1" metrics-exporter-prometheus = "0.12.1" tracing-appender = "0.2.3" diff --git a/dcapal-backend/docs/rest/public/import/post.md b/dcapal-backend/docs/rest/public/import/post.md index 39624311..154046e3 100644 --- a/dcapal-backend/docs/rest/public/import/post.md +++ b/dcapal-backend/docs/rest/public/import/post.md @@ -1,6 +1,6 @@ # Import portfolio -Temporarely upload DcaPal portfolio definition to later import it in DcaPal client +Temporarily upload DcaPal portfolio definition to later import it in DcaPal client **URL** : `/import/portfolio` diff --git a/dcapal-backend/src/domain/entity.rs b/dcapal-backend/src/app/domain/entity.rs similarity index 95% rename from dcapal-backend/src/domain/entity.rs rename to dcapal-backend/src/app/domain/entity.rs index cc1dc342..899e0fa8 100644 --- a/dcapal-backend/src/domain/entity.rs +++ b/dcapal-backend/src/app/domain/entity.rs @@ -1,10 +1,9 @@ use chrono::{Duration, Timelike, Utc}; use serde::{Deserialize, Serialize}; +use crate::app::infra::utils::Expiring; use crate::DateTime; -use super::utils::Expiring; - pub type AssetId = String; pub type MarketId = String; @@ -164,13 +163,13 @@ pub struct Market { } impl Market { - pub fn new(id: MarketId, base: Asset, quote: Asset) -> Self { + pub fn new(id: MarketId, base: Asset, quote: Asset, price: Option) -> Self { Self { id, pair: format!("{}/{}", base.id().to_uppercase(), quote.id().to_uppercase()), base, quote, - price: None, + price, } } @@ -178,8 +177,8 @@ impl Market { &self.price } - pub fn set_price(&mut self, price: f64, ts: DateTime) { - self.price.replace(Price::new(price, ts)); + pub fn set_price(&mut self, price: Price) { + self.price.replace(price); } pub fn is_fiat(&self) -> bool { diff --git a/dcapal-backend/src/app/domain/market_data_utils.rs b/dcapal-backend/src/app/domain/market_data_utils.rs new file mode 100644 index 00000000..05000b67 --- /dev/null +++ b/dcapal-backend/src/app/domain/market_data_utils.rs @@ -0,0 +1,37 @@ +use chrono::Utc; +use tracing::{error, warn}; + +use crate::{config::PriceProvider, ports::outbound::adapter::PriceProviders}; + +use super::entity::{Market, Price}; + +pub async fn fetch_market_price( + market: &Market, + providers: &PriceProviders, + provider: PriceProvider, +) -> Option { + let now = Utc::now(); + let price = match provider { + PriceProvider::CryptoWatch => providers.cw.fetch_market_price(market, now).await, + PriceProvider::Kraken => providers.kraken.fetch_market_price(market, now).await, + PriceProvider::Yahoo => providers.yahoo.fetch_market_price(market, now).await, + }; + + match price { + Ok(Some(px)) => Some(Price::new(px, now)), + Ok(None) => { + warn!( + "Cannot fetch {} price for any frequency (ts={now})", + market.id + ); + None + } + Err(e) => { + error!( + "Cannot fetch {} price for any frequency (ts={now}): {e:?}", + market.id + ); + None + } + } +} diff --git a/dcapal-backend/src/app/domain/mod.rs b/dcapal-backend/src/app/domain/mod.rs new file mode 100644 index 00000000..ea79fcdf --- /dev/null +++ b/dcapal-backend/src/app/domain/mod.rs @@ -0,0 +1,2 @@ +pub mod entity; +pub mod market_data_utils; diff --git a/dcapal-backend/src/app/infra/mod.rs b/dcapal-backend/src/app/infra/mod.rs new file mode 100644 index 00000000..24f205c9 --- /dev/null +++ b/dcapal-backend/src/app/infra/mod.rs @@ -0,0 +1,2 @@ +pub mod stats; +pub mod utils; diff --git a/dcapal-backend/src/stats.rs b/dcapal-backend/src/app/infra/stats.rs similarity index 96% rename from dcapal-backend/src/stats.rs rename to dcapal-backend/src/app/infra/stats.rs index 64b1304a..cbb14157 100644 --- a/dcapal-backend/src/stats.rs +++ b/dcapal-backend/src/app/infra/stats.rs @@ -14,7 +14,8 @@ use metrics::{histogram, increment_counter}; use tracing::{info, log::error}; use crate::{ - domain::ip2location::Ip2LocationService, error::Result, repository::StatsRepository, AppContext, + app::services::ip2location::Ip2LocationService, error::Result, + ports::outbound::repository::StatsRepository, AppContext, }; const BASE: &str = "dcapal"; diff --git a/dcapal-backend/src/domain/utils.rs b/dcapal-backend/src/app/infra/utils.rs similarity index 92% rename from dcapal-backend/src/domain/utils.rs rename to dcapal-backend/src/app/infra/utils.rs index f994f6e6..ba29fb49 100644 --- a/dcapal-backend/src/domain/utils.rs +++ b/dcapal-backend/src/app/infra/utils.rs @@ -4,7 +4,7 @@ use std::{ }; use futures::Future; -use tokio::sync::{OnceCell, RwLock}; +use tokio::sync::{watch, OnceCell, RwLock}; pub struct ExpiringOnceCellValue { pub value: T, @@ -129,3 +129,13 @@ impl Expiring for ExpiringOption { } } } + +pub type StopToken = watch::Receiver; + +pub async fn should_stop(stop_rx: &mut StopToken) { + while stop_rx.changed().await.is_ok() { + if *stop_rx.borrow_and_update() { + return; + } + } +} diff --git a/dcapal-backend/src/app/mod.rs b/dcapal-backend/src/app/mod.rs new file mode 100644 index 00000000..c63ae601 --- /dev/null +++ b/dcapal-backend/src/app/mod.rs @@ -0,0 +1,4 @@ +pub mod domain; +pub mod infra; +pub mod services; +pub mod workers; diff --git a/dcapal-backend/src/domain/command.rs b/dcapal-backend/src/app/services/command.rs similarity index 92% rename from dcapal-backend/src/domain/command.rs rename to dcapal-backend/src/app/services/command.rs index d9a6aff6..08baef93 100644 --- a/dcapal-backend/src/domain/command.rs +++ b/dcapal-backend/src/app/services/command.rs @@ -1,12 +1,11 @@ use jsonschema::JSONSchema; use crate::{ + app::domain::entity::{Asset, AssetId}, error::{DcaError, Result}, - repository::market_data::MarketDataRepository, + ports::outbound::repository::market_data::MarketDataRepository, }; -use super::entity::{Asset, AssetId}; - pub struct ConversionRateQuery { pub base: Asset, pub quote: Asset, diff --git a/dcapal-backend/src/domain/ip2location.rs b/dcapal-backend/src/app/services/ip2location.rs similarity index 100% rename from dcapal-backend/src/domain/ip2location.rs rename to dcapal-backend/src/app/services/ip2location.rs diff --git a/dcapal-backend/src/app/services/market_data.rs b/dcapal-backend/src/app/services/market_data.rs new file mode 100644 index 00000000..8127eea8 --- /dev/null +++ b/dcapal-backend/src/app/services/market_data.rs @@ -0,0 +1,348 @@ +use std::{collections::HashMap, sync::Arc}; + +use chrono::Utc; +use parking_lot::RwLock; +use tracing::{error, info, warn}; + +use crate::{ + app::{ + domain::entity::{Asset, AssetId, AssetKind, Market, MarketId, Price}, + services::command::ConversionRateQuery, + }, + error::{DcaError, Result}, + ports::outbound::repository::market_data::MarketDataRepository, +}; + +pub struct MarketDataService { + repo: Arc, + markets: RwLock>>, + pricers: RwLock>>, + price_deps: RwLock>>, + assets_cache: RwLock, +} + +impl MarketDataService { + pub fn new(repo: Arc) -> Self { + let markets = RwLock::new(HashMap::new()); + let pricers = RwLock::new(HashMap::new()); + let assets_cache = RwLock::new(AssetsCache::new()); + let price_deps = RwLock::new(HashMap::new()); + + Self { + repo, + markets, + pricers, + price_deps, + assets_cache, + } + } + + pub async fn get_assets_by_type(&self, kind: AssetKind) -> Arc> { + { + let cache = self.assets_cache.read(); + match kind { + AssetKind::Crypto => { + if let Some(assets) = &cache.crypto { + return assets.clone(); + } + } + AssetKind::Fiat => { + if let Some(assets) = &cache.fiats { + return assets.clone(); + } + } + } + } + + let assets = self + .repo + .load_assets_by_type(kind) + .await + .unwrap_or_else(|e| { + error!("{:?}", e); + vec![] + }); + + let mut cache = self.assets_cache.write(); + + match kind { + AssetKind::Crypto => { + cache.crypto = Some(Arc::new(assets)); + cache.crypto.as_ref().unwrap().clone() + } + AssetKind::Fiat => { + cache.fiats = Some(Arc::new(assets)); + cache.fiats.as_ref().unwrap().clone() + } + } + } + + pub fn invalidate_asset_cache(&self) { + let mut cache = self.assets_cache.write(); + cache.crypto = None; + cache.fiats = None; + } + + /// Lookup a [`Market`] by [`MarketId`] + pub async fn get_market(&self, id: &MarketId) -> Result>> { + { + let markets = self.markets.read(); + if let Some(market) = markets.get(id) { + return Ok(Some(market.clone())); + } + } + + let market = match self.load_market(id).await { + Ok(m) => m, + Err(e) => { + error!("{:?}", e); + None + } + }; + + let Some(market) = market else { + return Ok(None); + }; + + let mut markets = self.markets.write(); + markets.insert(id.clone(), market.clone()); + Ok(Some(market)) + } + + async fn load_market(&self, id: &MarketId) -> Result>> { + let mkt = self.repo.find_market(id).await.map_err(|e| { + error!(mkt = id, "Error occured in loading market: {}", e); + DcaError::MarketNotFound(id.clone()) + })?; + + match mkt { + Some(mkt) => Ok(Some(Arc::new(mkt))), + None => { + info!("Cannot find market '{}'", id); + Ok(None) + } + } + } + + pub fn set_price(&self, id: &MarketId, price: Price) -> bool { + // Get a Market copy + let mut updated = { + let markets = self.markets.read(); + let Some(market) = markets.get(id) else { + return false; + }; + market.as_ref().clone() + }; + + // Update Market price + { + let mut markets = self.markets.write(); + updated.set_price(price); + markets.insert(id.clone(), Arc::new(updated)); + } + + // Invalidate dependent syntetic rates + let mut pricers = self.pricers.write(); + let mut price_deps = self.price_deps.write(); + if let Some(deps) = price_deps.get(id) { + for pair in deps { + pricers.remove(pair); + } + } + + // Clear dependency list + price_deps.remove(id); + + true + } + + pub async fn get_conversion_rate(&self, cmd: ConversionRateQuery) -> Result> { + let (base, quote) = (cmd.base.id(), cmd.quote.id()); + let pair = (base.clone(), quote.clone()); + + { + let pricers = self.pricers.read(); + if let Some(price) = pricers.get(&pair) { + return Ok(*price); + } + } + + if let Some((price, deps)) = self.compute_conversion_rate(base, quote).await? { + { + // Track market dependencies to this syntetic rate + let mut price_deps = self.price_deps.write(); + for dep in deps { + price_deps.entry(dep).or_default().push(pair.clone()); + } + } + + // Update cached rate + let mut pricers = self.pricers.write(); + pricers.insert(pair, Some(price)); + Ok(Some(price)) + } else { + Ok(None) + } + } + + async fn compute_conversion_rate( + &self, + base: &AssetId, + quote: &AssetId, + ) -> Result)>> { + // Base/base => 1. + if base == quote { + return Ok(Some((Price::new(1., Utc::now()), vec![]))); + } + + let base = normalized_asset(base); + let quote = normalized_asset(quote); + + // Find base/quote market + let id = format!("{}{}", base, quote); + let mkt = self.get_market(&id).await?; + if let Some(m) = mkt { + if let Some(px) = m.price() { + info!("Computed conversion rate for market {}", m.id); + return Ok(Some((*px, vec![id]))); + } + } + + // Find quote/base market + let id = format!("{}{}", quote, base); + let mkt = self.get_market(&id).await?; + if let Some(m) = mkt { + if let Some(px) = m.price() { + let rate = Price::new(1. / px.price, px.ts); + info!("Computed conversion rate for market {}", m.id); + return Ok(Some((rate, vec![id]))); + } + } + + // Find alternative markets + let Some((base_usd_id, base_usd_px)) = self.get_base_usd_price(&base, "e).await? else { + return Ok(None); + }; + + let base_quote_id = format!("{}{}", base, quote); + let usd_quote_id = format!("{}{}", "usd", quote); + let quote_usd_id = format!("{}{}", quote, "usd"); + + let usd_quote = self.get_market(&usd_quote_id).await?; + if let Some(usd_quote) = usd_quote { + if let Some(usd_quote_px) = usd_quote.price() { + let price = base_usd_px.price * usd_quote_px.price; + let ts = std::cmp::min(base_usd_px.ts, usd_quote_px.ts); + let rate = Price::new(price, ts); + info!( + "Computed conversion rate for market {} triangulating between markets", + base_quote_id + ); + return Ok(Some((rate, vec![base_usd_id, usd_quote_id]))); + } + } + + let quote_usd = self.get_market("e_usd_id).await?; + if let Some(quote_usd) = quote_usd { + if let Some(quote_usd_px) = quote_usd.price() { + let price = base_usd_px.price / quote_usd_px.price; + let ts = std::cmp::min(base_usd_px.ts, quote_usd_px.ts); + let rate = Price::new(price, ts); + info!( + "Computed conversion rate for market {} triangulating between markets", + base_quote_id + ); + return Ok(Some((rate, vec![base_quote_id, usd_quote_id]))); + } + } + + warn!( + base = base, + quote = quote, + "Price not available for markets '{}' and '{}'", + usd_quote_id, + quote_usd_id + ); + + Ok(None) + } + + async fn get_base_usd_price( + &self, + base: &AssetId, + quote: &AssetId, + ) -> Result> { + let base_usd_id = format!("{}{}", base, "usd"); + let usd_base_id = format!("{}{}", "usd", base); + + let base_usd = self.get_market(&base_usd_id).await?; + if let Some(ref m) = base_usd { + if let Some(px) = m.price() { + return Ok(Some((base_usd_id, *px))); + } + } + + let usd_base = self.get_market(&usd_base_id).await?; + if let Some(ref m) = usd_base { + if let Some(px) = m.price() { + return Ok(Some(( + usd_base_id, + Price { + price: 1. / px.price, + ts: px.ts, + }, + ))); + } + } + + match (base_usd, usd_base) { + (None, None) => { + warn!( + base = base, + quote = quote, + "Cannot find any of markets: '{}', '{}'", + base_usd_id, + usd_base_id + ); + Ok(None) + } + (_, _) => { + warn!( + base = base, + quote = quote, + "Price not available for any of markets: '{}', '{}'", + base_usd_id, + usd_base_id + ); + Ok(None) + } + } + } +} + +fn normalized_asset(id: &AssetId) -> AssetId { + lazy_static::lazy_static! { + static ref NORMALIZED: HashMap<&'static str, &'static str> = { + [("eth2", "eth"), ("eth2.s", "eth")].into_iter().collect() + }; + } + + NORMALIZED + .get(id.as_str()) + .unwrap_or(&id.as_str()) + .to_string() +} + +struct AssetsCache { + fiats: Option>>, + crypto: Option>>, +} + +impl AssetsCache { + fn new() -> Self { + Self { + fiats: None, + crypto: None, + } + } +} diff --git a/dcapal-backend/src/domain/mod.rs b/dcapal-backend/src/app/services/mod.rs similarity index 65% rename from dcapal-backend/src/domain/mod.rs rename to dcapal-backend/src/app/services/mod.rs index 034282f0..05b825e6 100644 --- a/dcapal-backend/src/domain/mod.rs +++ b/dcapal-backend/src/app/services/mod.rs @@ -1,5 +1,3 @@ pub mod command; -pub mod entity; pub mod ip2location; pub mod market_data; -pub mod utils; diff --git a/dcapal-backend/src/app/workers/market_discovery.rs b/dcapal-backend/src/app/workers/market_discovery.rs new file mode 100644 index 00000000..4f01a040 --- /dev/null +++ b/dcapal-backend/src/app/workers/market_discovery.rs @@ -0,0 +1,148 @@ +use chrono::{TimeZone, Utc}; +use std::{sync::Arc, time::Duration}; +use tracing::{debug, error, info}; + +use crate::{ + app::{ + domain::market_data_utils::fetch_market_price, + infra::utils::{should_stop, StopToken}, + services::market_data::MarketDataService, + }, + config::PriceProvider, + error::Result, + ports::outbound::{ + adapter::PriceProviders, + repository::{market_data::MarketDataRepository, MiscRepository}, + }, + AppContext, DateTime, +}; + +/// Worker to periodically discover new crypto assets and markets. As of today, new markets are +/// checked every 24 hours. +pub struct MarketDiscoveryWorker { + market_data_service: Arc, + misc_repo: Arc, + market_data_repo: Arc, + price_provider: PriceProvider, + providers: Arc, +} + +impl MarketDiscoveryWorker { + pub fn new(ctx: &AppContext) -> Self { + let market_data_service = ctx.services.mkt_data.clone(); + let misc_repo = ctx.repos.misc.clone(); + let market_data_repo = ctx.repos.mkt_data.clone(); + let price_provider = ctx.config.app.providers.price_provider; + let providers = ctx.providers.clone(); + + Self { + market_data_service, + misc_repo, + market_data_repo, + price_provider, + providers, + } + } + + pub async fn run(&self, mut stop_token: StopToken) { + let mut sleep = tokio::time::sleep(Duration::from_millis(50)); + + loop { + tokio::select! { + _ = sleep => {} + _ = should_stop(&mut stop_token) => break, + } + + // Reset next check timeout + sleep = tokio::time::sleep(Duration::from_secs(60)); + + let res = is_outdated(&self.misc_repo).await; + if let Err(e) = res { + error!("Failed to fetch last update time: {:?}", e); + continue; + } + + let (is_outdated, last_fetched_ts) = res.unwrap(); + if !is_outdated { + debug!( + "Kraken assets already fetched today ({})", + last_fetched_ts.map(|t| t.to_string()).unwrap_or_default() + ); + continue; + } + + if let Err(e) = self.discover_new_markets().await { + error!("Failed to update Kraken Assets and Markets data: {:?}", e); + } + + let now = Utc::now(); + if let Err(e) = self.misc_repo.set_cw_last_fetched(now).await { + error!("Failed to update last update time: {:?}", e); + } + + // Reset next check timeout + sleep = tokio::time::sleep(Duration::from_secs(60)); + } + } + + async fn discover_new_markets(&self) -> Result<()> { + // Collect assets and markets from Kraken + let (assets, markets) = self + .providers + .kraken + .fetch_assets(&self.market_data_repo) + .await?; + + // Store assets in repository + for a in assets { + info!("Storing asset '{}'", a.id()); + self.market_data_repo + .store_asset(&a) + .await + .unwrap_or_else(|e| { + error!( + "Failed to store asset '{}': {} ({})", + a.id(), + e, + serde_json::to_string(&a).unwrap() + ); + }) + } + + // Store markets in repository + for mut m in markets { + info!("Fetching price for market '{}'", m.id); + let Some(price) = fetch_market_price(&m, &self.providers, self.price_provider).await + else { + continue; + }; + + m.set_price(price); + + info!("Storing market '{}'", m.id); + if let Err(e) = self.market_data_repo.store_market(&m).await { + error!( + "Failed to store market '{}': {} ({})", + m.id, + e, + serde_json::to_string(&m).unwrap() + ); + } + } + + self.market_data_service.invalidate_asset_cache(); + + Ok(()) + } +} + +async fn is_outdated(misc: &MiscRepository) -> Result<(bool, Option)> { + let last_fetched = misc.get_cw_last_fetched().await?; + if let Some(ts) = last_fetched { + let ts_day = Utc.from_utc_datetime(&ts.naive_utc()).date_naive(); + let today = Utc::now().date_naive(); + return Ok((ts_day < today, Some(ts))); + } + + Ok((true, None)) +} diff --git a/dcapal-backend/src/app/workers/mod.rs b/dcapal-backend/src/app/workers/mod.rs new file mode 100644 index 00000000..057ba5e6 --- /dev/null +++ b/dcapal-backend/src/app/workers/mod.rs @@ -0,0 +1,2 @@ +pub mod market_discovery; +pub mod price_updater; diff --git a/dcapal-backend/src/app/workers/price_updater.rs b/dcapal-backend/src/app/workers/price_updater.rs new file mode 100644 index 00000000..c7417b34 --- /dev/null +++ b/dcapal-backend/src/app/workers/price_updater.rs @@ -0,0 +1,85 @@ +use std::{sync::Arc, time::Duration}; + +use chrono::Utc; +use tracing::{error, info, warn}; + +use crate::{ + app::{ + domain::market_data_utils::fetch_market_price, + infra::utils::{should_stop, StopToken}, + services::market_data::MarketDataService, + }, + config::PriceProvider, + error::Result, + ports::outbound::{adapter::PriceProviders, repository::market_data::MarketDataRepository}, + AppContext, +}; + +/// Worker periodically updating market prices. As of today, prices are refreshed every 5 minutes. +pub struct PriceUpdaterWorker { + period: Duration, + market_data_service: Arc, + market_data_repo: Arc, + price_provider: PriceProvider, + providers: Arc, +} + +impl PriceUpdaterWorker { + pub fn new(ctx: &AppContext, period: Duration) -> Self { + let market_data_service = ctx.services.mkt_data.clone(); + let market_data_repo = ctx.repos.mkt_data.clone(); + let price_provider = ctx.config.app.providers.price_provider; + let providers = ctx.providers.clone(); + + Self { + period, + market_data_service, + market_data_repo, + price_provider, + providers, + } + } + + pub async fn run(&self, mut stop_token: StopToken) { + let mut sleep = tokio::time::sleep(Duration::from_millis(50)); + loop { + tokio::select! { + _ = sleep => {} + _ = should_stop(&mut stop_token) => break, + } + + if let Err(e) = self.update_prices().await { + error!("Error occurred while updating prices: {e:?}"); + } + + sleep = tokio::time::sleep(self.period); + let next = Utc::now() + chrono::Duration::from_std(self.period).unwrap(); + info!("Next PriceUpdaterWorker execution: {next}"); + } + } + + async fn update_prices(&self) -> Result<()> { + // Get all known markets + let markets = self.market_data_repo.load_markets().await?; + + for mut m in markets { + let Some(price) = fetch_market_price(&m, &self.providers, self.price_provider).await + else { + warn!("Failed to fetch price update for market {}", m.id); + continue; + }; + + m.set_price(price); + if let Err(e) = self.market_data_repo.update_mkt_price(&m).await { + error!("Failed to store market price update {m:?}: {e:?}"); + } + + self.market_data_service.set_price(&m.id, price); + + // Please the rate limiter + tokio::time::sleep(Duration::from_millis(100)).await; + } + + Ok(()) + } +} diff --git a/dcapal-backend/src/config.rs b/dcapal-backend/src/config.rs index cdfb00d2..93c7cd68 100644 --- a/dcapal-backend/src/config.rs +++ b/dcapal-backend/src/config.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::error::Result; -#[derive(Clone, Debug, Serialize, Deserialize, strum_macros::Display)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, strum_macros::Display)] #[serde(rename_all = "lowercase")] pub enum PriceProvider { CryptoWatch, diff --git a/dcapal-backend/src/domain/market_data.rs b/dcapal-backend/src/domain/market_data.rs deleted file mode 100644 index ec62153b..00000000 --- a/dcapal-backend/src/domain/market_data.rs +++ /dev/null @@ -1,487 +0,0 @@ -use std::{ - collections::HashMap, - sync::Arc, - time::{Duration, Instant}, -}; - -use chrono::Utc; -use dashmap::DashMap; -use parking_lot::RwLock; -use tracing::{error, info, warn}; - -use crate::{ - config::Config, - error::{DcaError, Result}, - repository::market_data::MarketDataRepository, - Provider, -}; - -use super::{ - command::ConversionRateQuery, - entity::{Asset, AssetId, AssetKind, Market, MarketId, Price}, - utils::{Expiring, ExpiringOnceCell, ExpiringOption}, -}; - -pub struct MarketDataService { - config: Arc, - repo: Arc, - providers: Arc, - mkt_loaders: DashMap>>>>, - pricers: DashMap<(AssetId, AssetId), Arc>>>, - assets_cache: RwLock, -} - -impl MarketDataService { - const DEFAULT_TTL: Duration = Duration::from_secs(5 * 60); - - pub fn new( - config: Arc, - repo: Arc, - providers: Arc, - ) -> Self { - let mkt_loaders = DashMap::new(); - let pricers = DashMap::new(); - let assets_cache = RwLock::new(AssetsCache::new()); - - Self { - config, - repo, - providers, - mkt_loaders, - pricers, - assets_cache, - } - } - - pub async fn get_assets_by_type(&self, kind: AssetKind) -> Arc> { - { - let cache = self.assets_cache.read(); - match kind { - AssetKind::Crypto => { - if let Some(assets) = &cache.crypto { - return assets.clone(); - } - } - AssetKind::Fiat => { - if let Some(assets) = &cache.fiats { - return assets.clone(); - } - } - } - } - - let assets = self - .repo - .load_assets_by_type(kind) - .await - .unwrap_or_else(|e| { - error!("{:?}", e); - vec![] - }); - - let mut cache = self.assets_cache.write(); - - match kind { - AssetKind::Crypto => { - cache.crypto = Some(Arc::new(assets)); - cache.crypto.as_ref().unwrap().clone() - } - AssetKind::Fiat => { - cache.fiats = Some(Arc::new(assets)); - cache.fiats.as_ref().unwrap().clone() - } - } - } - - fn invalidate_asset_cache(&self) { - let mut cache = self.assets_cache.write(); - cache.crypto = None; - cache.fiats = None; - } - - pub async fn get_market(&self, id: MarketId) -> Result>> { - let loader = self - .mkt_loaders - .entry(id.clone()) - .or_insert_with(|| { - Arc::new(ExpiringOnceCell::new(|market: &Option>| { - if let Some(ref m) = market { - m.is_price_outdated() - } else { - false - } - })) - }) - .clone(); - - let market = loader - .get_or_try_init(|| async { self.load_market(&id).await }) - .await; - - if market.is_ok() { - return market; - } - - let e = market.unwrap_err(); - error!("{:?}", e); - if matches!(e, DcaError::PriceNotAvailableId(_)) { - Err(e) - } else { - Ok(None) - } - } - - async fn load_market(&self, id: &MarketId) -> Result>> { - let mkt = self.repo.find_market(id).await.map_err(|e| { - error!(mkt = id, "Error occured in loading market: {}", e); - DcaError::MarketNotFound(id.clone()) - })?; - - if mkt.is_none() { - info!("Cannot find market '{}'", id); - return Ok(None); - } - - let mkt = self - .refresh_mkt_price(mkt.unwrap()) - .await - .map(Arc::new) - .map_err(|e| { - error!(mkt = id, "Error occured in fetching price: {:?}", e); - DcaError::PriceNotAvailableId(id.clone()) - })?; - - if let Err(e) = self.repo.update_mkt_price(&mkt).await { - error!(mkt = id, "Failed to update price: {}", e); - } - - Ok(Some(mkt)) - } - - async fn refresh_mkt_price(&self, mut mkt: Market) -> Result { - use crate::config::PriceProvider; - - info!( - mkt = mkt.id, - "Fetching price from {}", self.config.app.providers.price_provider - ); - - let now = Utc::now(); - let price = match self.config.app.providers.price_provider { - PriceProvider::CryptoWatch => self.providers.cw.fetch_market_price(&mkt, now).await?, - PriceProvider::Kraken => self.providers.kraken.fetch_market_price(&mkt, now).await?, - PriceProvider::Yahoo => self.providers.yahoo.fetch_market_price(&mkt, now).await?, - }; - if let Some(px) = price { - mkt.set_price(px, now); - Ok(mkt) - } else { - error!( - mkt = mkt.id, - "Cannot fetch price for any frequency (ts={})", now - ); - Ok(mkt) - } - } - - pub async fn get_conversion_rate(&self, cmd: ConversionRateQuery) -> Result> { - let (base, quote) = (cmd.base.id(), cmd.quote.id()); - let pair = (base.clone(), quote.clone()); - - let pricer = self - .pricers - .entry(pair.clone()) - .or_insert_with(|| { - Arc::new(ExpiringOnceCell::new(|p: &ExpiringOption| { - p.is_outdated() - })) - }) - .clone(); - - let price = pricer - .get_or_try_init(|| async { self.compute_conversion_rate(base, quote).await }) - .await; - - let Err(e) = price else { - return price.map(|p| p.into()); - }; - - if let DcaError::PriceNotAvailableId(ref id) = e { - if let Some(p) = pricer.get().await { - if p.is_expired { - warn!("Serving outdated price for Market '{id}'"); - } - - return Ok(p.value.into()); - } - } - - Err(e) - } - - async fn compute_conversion_rate( - &self, - base: &AssetId, - quote: &AssetId, - ) -> Result> { - // Base/base => 1. - if base == quote { - return Ok(ExpiringOption::Some(Price::new(1., Utc::now()))); - } - - let base = normalized_asset(base); - let quote = normalized_asset(quote); - - // Find base/quote market - let id = format!("{}{}", base, quote); - let mkt = self.get_market(id).await?; - if let Some(m) = mkt { - if let Some(px) = m.price() { - let rate = ExpiringOption::Some(*px); - info!( - "Computed conversion rate for market {}. Expiring at {}", - m.id, - Utc::now() + rate.time_to_live_chrono() - ); - return Ok(rate); - } - } - - // Find quote/base market - let id = format!("{}{}", quote, base); - let mkt = self.get_market(id).await?; - if let Some(m) = mkt { - if let Some(px) = m.price() { - let rate = ExpiringOption::Some(Price::new(1. / px.price, px.ts)); - info!( - "Computed conversion rate for market {}. Expiring at {}", - m.id, - Utc::now() + rate.time_to_live_chrono() - ); - return Ok(rate); - } - } - - // Find alternative markets - let Some(base_usd_px) = self.get_base_usd_price(&base, "e).await? else { - return Ok(ExpiringOption::None(Instant::now(), Self::DEFAULT_TTL)); - }; - - let base_quote_id = format!("{}{}", base, quote); - let usd_quote_id = format!("{}{}", "usd", quote); - let quote_usd_id = format!("{}{}", quote, "usd"); - - let usd_quote = self.get_market(usd_quote_id.clone()).await?; - if let Some(usd_quote) = usd_quote { - if let Some(usd_quote_px) = usd_quote.price() { - let price = base_usd_px.price * usd_quote_px.price; - let ts = std::cmp::min(base_usd_px.ts, usd_quote_px.ts); - let rate = ExpiringOption::Some(Price::new(price, ts)); - info!( - "Computed conversion rate for market {} triangulating between markets. Expiring at {} (min of {} and {})", - base_quote_id, - Utc::now() + rate.time_to_live_chrono(), - base_usd_px.ts, - usd_quote_px.ts - ); - return Ok(rate); - } - } - - let quote_usd = self.get_market(quote_usd_id.clone()).await?; - if let Some(quote_usd) = quote_usd { - if let Some(quote_usd_px) = quote_usd.price() { - let price = base_usd_px.price / quote_usd_px.price; - let ts = std::cmp::min(base_usd_px.ts, quote_usd_px.ts); - let rate = ExpiringOption::Some(Price::new(price, ts)); - info!( - "Computed conversion rate for market {} triangulating between markets. Expiring at {} (min of {} and {})", - base_quote_id, - Utc::now() + rate.time_to_live_chrono(), - base_usd_px.ts, - quote_usd_px.ts - ); - return Ok(rate); - } - } - - warn!( - base = base, - quote = quote, - "Price not available for markets '{}' and '{}'", - usd_quote_id, - quote_usd_id - ); - - Ok(ExpiringOption::None(Instant::now(), Self::DEFAULT_TTL)) - } - - async fn get_base_usd_price(&self, base: &AssetId, quote: &AssetId) -> Result> { - let base_usd_id = format!("{}{}", base, "usd"); - let usd_base_id = format!("{}{}", "usd", base); - - let base_usd = self.get_market(base_usd_id.clone()).await?; - if let Some(ref m) = base_usd { - if let Some(px) = m.price() { - return Ok(Some(*px)); - } - } - - let usd_base = self.get_market(usd_base_id.clone()).await?; - if let Some(ref m) = usd_base { - if let Some(px) = m.price() { - return Ok(Some(Price { - price: 1. / px.price, - ts: px.ts, - })); - } - } - - match (base_usd, usd_base) { - (None, None) => { - warn!( - base = base, - quote = quote, - "Cannot find any of markets: '{}', '{}'", - base_usd_id, - usd_base_id - ); - Ok(None) - } - (_, _) => { - warn!( - base = base, - quote = quote, - "Price not available for any of markets: '{}', '{}'", - base_usd_id, - usd_base_id - ); - Ok(None) - } - } - } - - pub async fn update_cw_data(&self) -> Result<()> { - // Collect assets and markets from CW - let (assets, markets) = self.providers.cw.fetch_assets(&self.repo).await?; - - // Store assets in repository - for a in assets { - info!("Storing asset '{}'", a.id()); - self.repo.store_asset(&a).await.unwrap_or_else(|e| { - error!( - "Failed to store asset '{}': {} ({})", - a.id(), - e, - serde_json::to_string(&a).unwrap() - ); - }) - } - - // Store markets in repository - for m in &markets { - info!("Storing market '{}'", m.id); - self.repo.store_market(m).await.unwrap_or_else(|e| { - error!( - "Failed to store market '{}': {} ({})", - m.id, - e, - serde_json::to_string(m).unwrap() - ); - }) - } - - // Invalidate caches - self.invalidate_asset_cache(); - self.mkt_loaders.clear(); - self.pricers.clear(); - - Ok(()) - } - - pub async fn update_kraken_data(&self) -> Result<()> { - // Collect assets and markets from Kraken - let (assets, markets) = self.providers.kraken.fetch_assets(&self.repo).await?; - - // Store assets in repository - for a in assets { - info!("Storing asset '{}'", a.id()); - self.repo.store_asset(&a).await.unwrap_or_else(|e| { - error!( - "Failed to store asset '{}': {} ({})", - a.id(), - e, - serde_json::to_string(&a).unwrap() - ); - }) - } - - // Store markets in repository - for m in &markets { - info!("Storing market '{}'", m.id); - self.repo.store_market(m).await.unwrap_or_else(|e| { - error!( - "Failed to store market '{}': {} ({})", - m.id, - e, - serde_json::to_string(m).unwrap() - ); - }) - } - - // Invalidate caches - self.invalidate_asset_cache(); - self.mkt_loaders.clear(); - self.pricers.clear(); - - Ok(()) - } - - pub async fn update_market_prices(&self) -> Result<()> { - let markets = self.repo.load_markets().await?; - - let mut failed_id = None; - for m in &markets { - // Trigger market refresh - if let Err(e) = self.get_market(m.id.clone()).await { - warn!("{:?}", e); - failed_id = Some(m.id.clone()); - } - // Please the rate limiter - tokio::time::sleep(Duration::from_millis(50)).await; - } - - if let Some(id) = failed_id { - Err(DcaError::PriceNotAvailableId(id)) - } else { - Ok(()) - } - } -} - -fn normalized_asset(id: &AssetId) -> AssetId { - lazy_static::lazy_static! { - static ref NORMALIZED: HashMap<&'static str, &'static str> = { - [("eth2", "eth"), ("eth2.s", "eth")].into_iter().collect() - }; - } - - NORMALIZED - .get(id.as_str()) - .unwrap_or(&id.as_str()) - .to_string() -} - -struct AssetsCache { - fiats: Option>>, - crypto: Option>>, -} - -impl AssetsCache { - fn new() -> Self { - Self { - fiats: None, - crypto: None, - } - } -} diff --git a/dcapal-backend/src/error.rs b/dcapal-backend/src/error.rs index f5869920..1555398d 100644 --- a/dcapal-backend/src/error.rs +++ b/dcapal-backend/src/error.rs @@ -7,7 +7,7 @@ use hyper::StatusCode; use redis::RedisError; use tracing::error; -use crate::domain::entity::{AssetId, MarketId}; +use crate::app::domain::entity::{AssetId, MarketId}; #[derive(thiserror::Error)] pub enum DcaError { diff --git a/dcapal-backend/src/lib.rs b/dcapal-backend/src/lib.rs index d68b15b4..55099b5a 100644 --- a/dcapal-backend/src/lib.rs +++ b/dcapal-backend/src/lib.rs @@ -1,18 +1,28 @@ -pub mod adapter; -pub mod api; +pub mod app; pub mod config; -pub mod domain; pub mod error; -pub mod maintenance; -pub mod repository; -pub mod stats; +pub mod ports; use crate::{ + app::{ + infra, + services::{ip2location::Ip2LocationService, market_data::MarketDataService}, + workers::{market_discovery::MarketDiscoveryWorker, price_updater::PriceUpdaterWorker}, + }, config::Config, error::{DcaError, Result}, + ports::{ + inbound::rest, + outbound::{ + adapter::{CryptoWatchProvider, IpApi, KrakenProvider, PriceProviders, YahooProvider}, + repository::{ + market_data::MarketDataRepository, ImportedRepository, MiscRepository, + StatsRepository, + }, + }, + }, }; -use adapter::{CryptoWatchProvider, IpApi, KrakenProvider, YahooProvider}; use axum::{ extract::connect_info::IntoMakeServiceWithConnectInfo, middleware, @@ -21,12 +31,8 @@ use axum::{ }; use chrono::prelude::*; use deadpool_redis::{Pool, Runtime}; -use domain::{ip2location::Ip2LocationService, market_data::MarketDataService}; use futures::future::BoxFuture; use metrics::{counter, describe_counter, describe_histogram, Unit}; -use repository::{ - market_data::MarketDataRepository, ImportedRepository, MiscRepository, StatsRepository, -}; use std::{ net::{AddrParseError, SocketAddr}, sync::Arc, @@ -54,7 +60,7 @@ pub struct AppContextInner { redis: Pool, services: Services, repos: Arc, - providers: Arc, + providers: Arc, } pub type AppContext = Arc; @@ -73,19 +79,11 @@ struct Repository { pub imported: Arc, } -#[derive(Clone)] -pub struct Provider { - pub cw: Arc, - pub kraken: Arc, - pub yahoo: Arc, - pub ipapi: Arc, -} - pub struct DcaServer { addr: SocketAddr, app: IntoMakeServiceWithConnectInfo, SocketAddr>, ctx: AppContext, - maintenance_handle: Option>, + worker_handlers: Vec>, stop_tx: tokio::sync::watch::Sender, } @@ -108,7 +106,7 @@ impl DcaServer { imported: Arc::new(ImportedRepository::new(redis.clone())), }); - let providers = Arc::new(Provider { + let providers = Arc::new(PriceProviders { cw: Arc::new(CryptoWatchProvider::new( http.clone(), &config.app.providers, @@ -131,11 +129,7 @@ impl DcaServer { }; let services = Services { - mkt_data: Arc::new(MarketDataService::new( - config.clone(), - repos.mkt_data.clone(), - providers.clone(), - )), + mkt_data: Arc::new(MarketDataService::new(repos.mkt_data.clone())), ip2location, }; @@ -150,19 +144,19 @@ impl DcaServer { let app = Router::new() .route("/", get(|| async { "Greetings from DCA-Pal APIs!" })) - .route("/assets/fiat", get(api::get_assets_fiat)) - .route("/assets/crypto", get(api::get_assets_crypto)) - .route("/price/:asset", get(api::get_price)) - .route("/import/portfolio", post(api::import_portfolio)) - .route("/import/portfolio/:id", get(api::get_imported_portfolio)) + .route("/assets/fiat", get(rest::get_assets_fiat)) + .route("/assets/crypto", get(rest::get_assets_crypto)) + .route("/price/:asset", get(rest::get_price)) + .route("/import/portfolio", post(rest::import_portfolio)) + .route("/import/portfolio/:id", get(rest::get_imported_portfolio)) .route_layer( ServiceBuilder::new() .layer(TraceLayer::new_for_http()) .layer(middleware::from_fn_with_state( ctx.clone(), - stats::requests_stats, + infra::stats::requests_stats, )) - .layer(middleware::from_fn(stats::latency_stats)), + .layer(middleware::from_fn(infra::stats::latency_stats)), ) .with_state(ctx.clone()) .into_make_service_with_connect_info(); @@ -182,7 +176,7 @@ impl DcaServer { addr, app, ctx, - maintenance_handle: None, + worker_handlers: Vec::new(), stop_tx, }) } @@ -191,14 +185,26 @@ impl DcaServer { info!("Initializing metrics"); self.init_metrics().await; - info!("Starting Maintenance task"); + info!("Starting MarketDiscovery worker"); { let ctx = self.ctx.clone(); let stop_rx = self.stop_tx.subscribe(); let handle = tokio::spawn(async move { - maintenance::run(ctx, stop_rx).await; + let worker = MarketDiscoveryWorker::new(&ctx); + worker.run(stop_rx).await; }); - self.maintenance_handle.replace(handle); + self.worker_handlers.push(handle); + } + + info!("Starting PriceUpdater worker"); + { + let ctx = self.ctx.clone(); + let stop_rx = self.stop_tx.subscribe(); + let handle = tokio::spawn(async move { + let worker = PriceUpdaterWorker::new(&ctx, Duration::from_secs(5 * 60)); + worker.run(stop_rx).await; + }); + self.worker_handlers.push(handle); } info!("Starting DcaServer at {}", &self.addr); @@ -214,19 +220,23 @@ impl DcaServer { } pub async fn init_metrics(&self) { - describe_counter!(stats::VISITORS_TOTAL, Unit::Count, "Number of API visitors"); describe_counter!( - stats::REQUESTS_TOTAL, + infra::stats::VISITORS_TOTAL, + Unit::Count, + "Number of API visitors" + ); + describe_counter!( + infra::stats::REQUESTS_TOTAL, Unit::Count, "Number of requests processed" ); describe_histogram!( - stats::LATENCY_SUMMARY, + infra::stats::LATENCY_SUMMARY, Unit::Microseconds, "Summary of endpoint response time" ); describe_counter!( - stats::IMPORTED_PORTFOLIOS_TOTAL, + infra::stats::IMPORTED_PORTFOLIOS_TOTAL, Unit::Count, "Number of portfolios imported" ); @@ -235,13 +245,13 @@ impl DcaServer { if let Err(e) = refresh_total_visitors_stats(&self.ctx.repos.stats).await { error!( "Failed to refresh Prometheus {} metric: {e:?}", - stats::VISITORS_TOTAL + infra::stats::VISITORS_TOTAL ); } if let Err(e) = refresh_imported_portfolios_stats(&self.ctx.repos.stats).await { error!( "Failed to refresh Prometheus {} metric: {e:?}", - stats::IMPORTED_PORTFOLIOS_TOTAL + infra::stats::IMPORTED_PORTFOLIOS_TOTAL ); } } @@ -276,7 +286,7 @@ async fn refresh_total_visitors_stats(stats_repo: &StatsRepository) -> Result<() let geo = stats_repo.find_visitor_ip(&ip).await?; if let Some(geo) = geo { counter!( - stats::VISITORS_TOTAL, + infra::stats::VISITORS_TOTAL, count as u64, &[ ("ip", geo.ip), @@ -292,7 +302,7 @@ async fn refresh_total_visitors_stats(stats_repo: &StatsRepository) -> Result<() async fn refresh_imported_portfolios_stats(stats_repo: &StatsRepository) -> Result<()> { let count = stats_repo.get_imported_portfolio_count().await?; - counter!(stats::IMPORTED_PORTFOLIOS_TOTAL, count as u64); + counter!(infra::stats::IMPORTED_PORTFOLIOS_TOTAL, count as u64); Ok(()) } diff --git a/dcapal-backend/src/maintenance.rs b/dcapal-backend/src/maintenance.rs deleted file mode 100644 index eca3ce21..00000000 --- a/dcapal-backend/src/maintenance.rs +++ /dev/null @@ -1,71 +0,0 @@ -use chrono::{TimeZone, Utc}; -use std::time::Duration; -use tokio::sync::watch; -use tracing::{debug, error, info}; - -use crate::{error::Result, repository::MiscRepository, AppContext, DateTime}; - -async fn should_stop(stop_rx: &mut watch::Receiver) { - while stop_rx.changed().await.is_ok() { - if *stop_rx.borrow_and_update() { - return; - } - } -} - -pub async fn run(ctx: AppContext, mut stop_rx: watch::Receiver) { - let mkt_data = &ctx.services.mkt_data; - - let mut is_running = true; - while is_running { - // Wait for time - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(1)) => {} - _ = should_stop(&mut stop_rx) => { is_running = false; } - } - - let res = is_outdated(&ctx.repos.misc).await; - if let Err(e) = res { - error!("Failed to fetch last update time: {:?}", e); - continue; - } - - let (is_outdated, last_fetched_ts) = res.unwrap(); - if !is_outdated { - debug!( - "CW assets already fetched today ({})", - last_fetched_ts.map(|t| t.to_string()).unwrap_or_default() - ); - continue; - } - - let mut _is_err = false; - - info!("Updating Kraken market data"); - if let Err(e) = mkt_data.update_kraken_data().await { - error!("Failed to update Kraken Assets and Markets data: {:?}", e); - _is_err = true; - } - - if let Err(e) = mkt_data.update_market_prices().await { - error!("Failed to update Market prices: {:?}", e); - _is_err = true; - } - - let now = Utc::now(); - if let Err(e) = ctx.repos.misc.set_cw_last_fetched(now).await { - error!("Failed to update last update time: {:?}", e); - } - } -} - -async fn is_outdated(misc: &MiscRepository) -> Result<(bool, Option)> { - let last_fetched = misc.get_cw_last_fetched().await?; - if let Some(ts) = last_fetched { - let ts_day = Utc.from_utc_datetime(&ts.naive_utc()).date_naive(); - let today = Utc::now().date_naive(); - return Ok((ts_day < today, Some(ts))); - } - - Ok((true, None)) -} diff --git a/dcapal-backend/src/ports/inbound/mod.rs b/dcapal-backend/src/ports/inbound/mod.rs new file mode 100644 index 00000000..8ed8e2e5 --- /dev/null +++ b/dcapal-backend/src/ports/inbound/mod.rs @@ -0,0 +1,4 @@ +//! [`ports::inbound`](self) module contains components to interact with the +//! system *from the outside* + +pub mod rest; diff --git a/dcapal-backend/src/api/mod.rs b/dcapal-backend/src/ports/inbound/rest/mod.rs similarity index 89% rename from dcapal-backend/src/api/mod.rs rename to dcapal-backend/src/ports/inbound/rest/mod.rs index 08ca564a..4f5940ea 100644 --- a/dcapal-backend/src/api/mod.rs +++ b/dcapal-backend/src/ports/inbound/rest/mod.rs @@ -1,11 +1,13 @@ +//! The [`rest`](self) module implements the REST API of the system + use std::time::Duration; -use crate::domain::command::{ConversionRateQuery, ImportPortfolioCmd}; -use crate::domain::entity::AssetKind; -use crate::domain::utils::Expiring; +use crate::app::domain::entity::AssetKind; +use crate::app::infra::utils::Expiring; +use crate::app::services::command::{ConversionRateQuery, ImportPortfolioCmd}; use crate::error::{DcaError, Result}; -use crate::repository::ImportedPortfolio; -use crate::{stats, AppContext}; +use crate::ports::outbound::repository::ImportedPortfolio; +use crate::{infra::stats, AppContext}; use axum::extract::{Path, Query, State}; use axum::response::{IntoResponse, Response}; @@ -17,7 +19,8 @@ use lazy_static::lazy_static; use metrics::increment_counter; use serde::{Deserialize, Serialize}; -static PORTFOLIO_SCHEMA_STR: &str = include_str!("../../docs/schema/portfolio/v1/schema.json"); +static PORTFOLIO_SCHEMA_STR: &str = + include_str!("../../../../docs/schema/portfolio/v1/schema.json"); lazy_static! { static ref ASSETS_CACHE_CONTROL: CacheControl = CacheControl::new() diff --git a/dcapal-backend/src/ports/mod.rs b/dcapal-backend/src/ports/mod.rs new file mode 100644 index 00000000..61228dc6 --- /dev/null +++ b/dcapal-backend/src/ports/mod.rs @@ -0,0 +1,5 @@ +//! [`ports`](self) module contains components dealing with the outer world. +//! Rest APIs, Redis interfaces and adapters to third-party services are all examples of a `Port`. + +pub mod inbound; +pub mod outbound; diff --git a/dcapal-backend/src/adapter/cw.rs b/dcapal-backend/src/ports/outbound/adapter/cw.rs similarity index 97% rename from dcapal-backend/src/adapter/cw.rs rename to dcapal-backend/src/ports/outbound/adapter/cw.rs index d76f32e1..8e0b243f 100644 --- a/dcapal-backend/src/adapter/cw.rs +++ b/dcapal-backend/src/ports/outbound/adapter/cw.rs @@ -8,10 +8,10 @@ use std::{ use tracing::{debug, error, info}; use crate::{ + app::domain::entity::{Asset, Crypto, Fiat, Market, MarketId, OHLCFrequency}, config, - domain::entity::{Asset, Crypto, Fiat, Market, MarketId, OHLCFrequency}, error::{DcaError, Result}, - repository::market_data::MarketDataRepository, + ports::outbound::repository::market_data::MarketDataRepository, DateTime, }; @@ -78,7 +78,7 @@ impl CryptoWatchProvider { .get(&a.quote.symbol) .expect("Quote asset not found"); - Market::new(pair, base.clone(), quote.clone()) + Market::new(pair, base.clone(), quote.clone(), None) }) .collect::>(); diff --git a/dcapal-backend/src/adapter/ipapi.rs b/dcapal-backend/src/ports/outbound/adapter/ipapi.rs similarity index 100% rename from dcapal-backend/src/adapter/ipapi.rs rename to dcapal-backend/src/ports/outbound/adapter/ipapi.rs diff --git a/dcapal-backend/src/adapter/kraken.rs b/dcapal-backend/src/ports/outbound/adapter/kraken.rs similarity index 98% rename from dcapal-backend/src/adapter/kraken.rs rename to dcapal-backend/src/ports/outbound/adapter/kraken.rs index 70345996..5f53b647 100644 --- a/dcapal-backend/src/adapter/kraken.rs +++ b/dcapal-backend/src/ports/outbound/adapter/kraken.rs @@ -11,10 +11,10 @@ use std::{ use tracing::{debug, error, warn}; use crate::{ + app::domain::entity::{Asset, AssetId, Crypto, Fiat, Market, MarketId, OHLCFrequency}, config, - domain::entity::{Asset, AssetId, Crypto, Fiat, Market, MarketId, OHLCFrequency}, error::{DcaError, Result}, - repository::market_data::MarketDataRepository, + ports::outbound::repository::market_data::MarketDataRepository, DateTime, }; @@ -185,6 +185,7 @@ impl KrakenProvider { format!("{base_id}{quote_id}"), base.clone(), quote.clone(), + None, )) }) .collect::>(); @@ -402,6 +403,7 @@ async fn resolve_assets_data_kraken_only( format!("{base_id}{quote_id}"), base.clone(), quote.clone(), + None, )) }) .collect::>(); diff --git a/dcapal-backend/src/adapter/mod.rs b/dcapal-backend/src/ports/outbound/adapter/mod.rs similarity index 58% rename from dcapal-backend/src/adapter/mod.rs rename to dcapal-backend/src/ports/outbound/adapter/mod.rs index eade8ee6..54025516 100644 --- a/dcapal-backend/src/adapter/mod.rs +++ b/dcapal-backend/src/ports/outbound/adapter/mod.rs @@ -1,8 +1,12 @@ +//! The [`adapter`](self) module contains adapters to third-party services + mod cw; mod ipapi; mod kraken; mod yahoo; +use std::sync::Arc; + pub use cw::*; pub use ipapi::*; pub use kraken::*; @@ -18,3 +22,11 @@ type DefaultCircuitBreaker = StateMachine< OrElse, ConsecutiveFailures>, (), >; + +#[derive(Clone)] +pub struct PriceProviders { + pub cw: Arc, + pub kraken: Arc, + pub yahoo: Arc, + pub ipapi: Arc, +} diff --git a/dcapal-backend/src/adapter/yahoo.rs b/dcapal-backend/src/ports/outbound/adapter/yahoo.rs similarity index 99% rename from dcapal-backend/src/adapter/yahoo.rs rename to dcapal-backend/src/ports/outbound/adapter/yahoo.rs index 10b49772..dbb09251 100644 --- a/dcapal-backend/src/adapter/yahoo.rs +++ b/dcapal-backend/src/ports/outbound/adapter/yahoo.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use tracing::{debug, warn}; use crate::{ - domain::entity::{Market, OHLCFrequency}, + app::domain::entity::{Market, OHLCFrequency}, error::{DcaError, Result}, DateTime, }; diff --git a/dcapal-backend/src/ports/outbound/mod.rs b/dcapal-backend/src/ports/outbound/mod.rs new file mode 100644 index 00000000..6eda6621 --- /dev/null +++ b/dcapal-backend/src/ports/outbound/mod.rs @@ -0,0 +1,5 @@ +//! [`ports::outbound`](self) module contains components allowing the system to +//! interact *with the outside* + +pub mod adapter; +pub mod repository; diff --git a/dcapal-backend/src/repository/dto.rs b/dcapal-backend/src/ports/outbound/repository/dto.rs similarity index 89% rename from dcapal-backend/src/repository/dto.rs rename to dcapal-backend/src/ports/outbound/repository/dto.rs index 2a4b32c2..38805071 100644 --- a/dcapal-backend/src/repository/dto.rs +++ b/dcapal-backend/src/ports/outbound/repository/dto.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use crate::domain::entity::{AssetId, Market, MarketId, Price}; +use crate::app::domain::entity::{AssetId, Market, MarketId, Price}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct MarketDto { diff --git a/dcapal-backend/src/repository/market_data/mod.rs b/dcapal-backend/src/ports/outbound/repository/market_data/mod.rs similarity index 96% rename from dcapal-backend/src/repository/market_data/mod.rs rename to dcapal-backend/src/ports/outbound/repository/market_data/mod.rs index 04265480..d0a8e788 100644 --- a/dcapal-backend/src/repository/market_data/mod.rs +++ b/dcapal-backend/src/ports/outbound/repository/market_data/mod.rs @@ -2,7 +2,7 @@ mod redis_asset; mod redis_market; use crate::{ - domain::entity::{Asset, AssetId, AssetKind, Market, MarketId}, + app::domain::entity::{Asset, AssetId, AssetKind, Market, MarketId}, error::{DcaError, Result}, }; diff --git a/dcapal-backend/src/repository/market_data/redis_asset.rs b/dcapal-backend/src/ports/outbound/repository/market_data/redis_asset.rs similarity index 96% rename from dcapal-backend/src/repository/market_data/redis_asset.rs rename to dcapal-backend/src/ports/outbound/repository/market_data/redis_asset.rs index 540bd810..e87cdbf3 100644 --- a/dcapal-backend/src/repository/market_data/redis_asset.rs +++ b/dcapal-backend/src/ports/outbound/repository/market_data/redis_asset.rs @@ -2,9 +2,9 @@ use axum::async_trait; use tracing::{debug, error}; use crate::{ - domain::entity::{Asset, AssetId, AssetKind}, + app::domain::entity::{Asset, AssetId, AssetKind}, error::{DcaError, Result}, - repository::REDIS_BASE, + ports::outbound::repository::REDIS_BASE, }; const ASSET_KEY: &str = concatcp!(REDIS_BASE, ':', "asset"); diff --git a/dcapal-backend/src/repository/market_data/redis_market.rs b/dcapal-backend/src/ports/outbound/repository/market_data/redis_market.rs similarity index 97% rename from dcapal-backend/src/repository/market_data/redis_market.rs rename to dcapal-backend/src/ports/outbound/repository/market_data/redis_market.rs index ef908dcf..92e023f6 100644 --- a/dcapal-backend/src/repository/market_data/redis_market.rs +++ b/dcapal-backend/src/ports/outbound/repository/market_data/redis_market.rs @@ -3,9 +3,9 @@ use futures::{StreamExt, TryStreamExt}; use tracing::{debug, error}; use crate::{ - domain::entity::{Market, MarketId}, + app::domain::entity::{Market, MarketId}, error::{DcaError, Result}, - repository::{dto::MarketDto, REDIS_BASE}, + ports::outbound::repository::{dto::MarketDto, REDIS_BASE}, }; use super::MarketDataRepository; @@ -164,6 +164,6 @@ async fn resolve_market(market: MarketDto, repo: &MarketDataRepository) -> Resul error!(mkt = market.id, "Quote asset not found: {}", &market.quote); Ok(None) } - (Some(b), Some(q)) => Ok(Some(Market::new(market.id, b, q))), + (Some(b), Some(q)) => Ok(Some(Market::new(market.id, b, q, market.price))), } } diff --git a/dcapal-backend/src/repository/mod.rs b/dcapal-backend/src/ports/outbound/repository/mod.rs similarity index 96% rename from dcapal-backend/src/repository/mod.rs rename to dcapal-backend/src/ports/outbound/repository/mod.rs index e56d389b..a6b303e8 100644 --- a/dcapal-backend/src/repository/mod.rs +++ b/dcapal-backend/src/ports/outbound/repository/mod.rs @@ -1,3 +1,5 @@ +//! The [`repository`](self) module contains interfaces to persistent storage services, like Redis. + pub mod dto; pub mod market_data; @@ -8,7 +10,7 @@ use chrono::{TimeZone, Utc}; use redis::AsyncCommands; use uuid::Uuid; -use crate::{domain::ip2location::GeoData, error::Result, DateTime}; +use crate::{app::services::ip2location::GeoData, error::Result, DateTime}; const REDIS_BASE: &str = "dcapal:be"; diff --git a/dcapal-optimizer-wasm/Cargo.toml b/dcapal-optimizer-wasm/Cargo.toml index 47913f92..293e3df9 100644 --- a/dcapal-optimizer-wasm/Cargo.toml +++ b/dcapal-optimizer-wasm/Cargo.toml @@ -22,7 +22,7 @@ console_error_panic_hook = { version = "0.1.7", optional = true } log = "0.4.20" minilp = "0.2.2" serde = { version = "1.0.193", features = ["derive"] } -serde-wasm-bindgen = "0.6.1" +serde-wasm-bindgen = "0.6.2" futures = "0.3.29" anyhow = "1.0.62" thiserror = "1"