diff --git a/agent/Cargo.lock b/agent/Cargo.lock new file mode 100644 index 00000000000..c528f4177d8 --- /dev/null +++ b/agent/Cargo.lock @@ -0,0 +1,926 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "agent" +version = "0.1.0" +dependencies = [ + "anyhow", + "openssl", + "serde", + "serde-inline-default", + "tokio", + "tokio-childstream", + "tokio-openssl", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "anyhow" +version = "1.0.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bitflags" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" + +[[package]] +name = "bytelinebuf" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dac1814fba5e4c02dd39c16fb4af791e4475986f73169ad5ad0b6422fe05b2" +dependencies = [ + "bytes", + "futures", + "pin-project", +] + +[[package]] +name = "bytes" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" + +[[package]] +name = "cc" +version = "1.0.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + +[[package]] +name = "hashbrown" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" + +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "indexmap" +version = "2.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +dependencies = [ + "equivalent", + "hashbrown", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.153" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" + +[[package]] +name = "lock_api" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" + +[[package]] +name = "memchr" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" + +[[package]] +name = "miniz_oxide" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +dependencies = [ + "adler", +] + +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.48.0", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "openssl" +version = "0.10.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +dependencies = [ + "bitflags 2.4.2", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-sys" +version = "0.9.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dda2b0f344e78efc2facf7d195d098df0dd72151b26ab98da807afc26c198dff" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.48.5", +] + +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + +[[package]] +name = "proc-macro2" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.197" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde-inline-default" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9980133dc534d02ab08df3b384295223a45090c40a4c46240e3eaa982b495910" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_derive" +version = "1.0.197" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_spanned" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" +dependencies = [ + "serde", +] + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" + +[[package]] +name = "socket2" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "syn" +version = "2.0.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + +[[package]] +name = "tokio" +version = "1.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "tracing", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-childstream" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7de07a7f1b663a15c553464bba34de7bd98847cfd905b768dda2ef922f046a12" +dependencies = [ + "bytelinebuf", + "bytes", + "futures", + "pin-project", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-openssl" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ffab79df67727f6acf57f1ff743091873c24c579b1e2ce4d8f53e47ded4d63d" +dependencies = [ + "futures-util", + "openssl", + "openssl-sys", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "toml" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e40bb779c5187258fd7aad0eb68cb8706a0a81fa712fbea808ab43c4b8374c4" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.4", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +dependencies = [ + "windows_aarch64_gnullvm 0.52.4", + "windows_aarch64_msvc 0.52.4", + "windows_i686_gnu 0.52.4", + "windows_i686_msvc 0.52.4", + "windows_x86_64_gnu 0.52.4", + "windows_x86_64_gnullvm 0.52.4", + "windows_x86_64_msvc 0.52.4", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" + +[[package]] +name = "winnow" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dffa400e67ed5a4dd237983829e66475f0a4a26938c4b04c21baede6262215b8" +dependencies = [ + "memchr", +] diff --git a/agent/Cargo.toml b/agent/Cargo.toml new file mode 100644 index 00000000000..19ab1781561 --- /dev/null +++ b/agent/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "agent" +version = "0.1.0" +edition = "2021" + +[dependencies] +#may or may not be used on old distro +#openssl = { version = "0.10", features = ["vendored"] } +openssl = { version = "*" } +tokio = { version = "*", features = ["full", "tracing"] } +tokio-childstream = "*" +tokio-util = "*" +tokio-openssl = "*" +tracing = "*" +tracing-subscriber = "*" +toml = "*" +serde = { version = "*", features = ["derive"] } +serde-inline-default = "*" +anyhow = "*" \ No newline at end of file diff --git a/agent/src/bin/client.rs b/agent/src/bin/client.rs new file mode 100644 index 00000000000..39de87da15b --- /dev/null +++ b/agent/src/bin/client.rs @@ -0,0 +1,335 @@ +///! This binary will : +///! - have its stdin/stdout/stderr used by rsync client +///! - open a http/connect socket to the daemon +///! - eventually use a TLS authentication with the daemon +///! - transmit data with bidirectional connection + +use std::io; +use std::io::prelude::*; +use std::net::TcpStream; +use std::io::BufReader; +use std::fmt::Debug; +use std::thread; +use std::env; +use std::ffi::OsString; +use std::fs; +use std::path::PathBuf; +use std::process::exit; +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, TryRecvError}; + +use openssl::ssl::{Ssl, SslAcceptor, SslConnector, SslFiletype, SslMethod, SslStream, SslVerifyMode}; +use openssl::x509::store::X509StoreBuilder; +use openssl::x509::X509; +use tracing::{Level, event}; +use serde::Deserialize; +use serde_inline_default::serde_inline_default; +use anyhow::{anyhow, bail, Context, Result}; + +use agent::{BUFFER_SIZE, get_config, init_tracing, LogLevel, TL_SIZE}; + +const CONFIG_PATH: &str = "/tmp/original.toml"; +const DEBUG_CONFIG_PATH: &str = "/tmp/test.toml"; + +// TODO review defaults +#[serde_inline_default] +#[derive(Deserialize, Debug)] +struct Config { + // port to connect to remote server in HTTPS + #[serde_inline_default("443".into())] + https_port: String, + + // TLS verify cert + #[serde_inline_default(false)] + verify_https_cert: bool, + + // TLS certificate trust (None = system truststore) + #[serde_inline_default(None)] + https_ca_path: Option, + + // local port on the remote server (should not be changed) + #[serde_inline_default("873".into())] + target_port: String, + + // Check authentication server certificate + #[serde_inline_default(false)] + authentication_verify: bool, + + // Specify authentication CA (used to trust server cert) + #[serde_inline_default(Some(PathBuf::from("/root/ca.pem")))] + authentication_ca: Option, + + // Specify authentication cert + #[serde_inline_default(Some(PathBuf::from("/root/client-cert.pem")))] + authentication_certificate: Option, + + // Specify authentication cert key + #[serde_inline_default(Some(PathBuf::from("/root/client-key.pem")))] + authentication_key: Option, + + // Log level (default debug in debug build) + #[cfg(debug_assertions)] + #[serde_inline_default(LogLevel(Level::TRACE))] + log_level: LogLevel, + + // Log level (default info in release build) + #[cfg(not(debug_assertions))] + #[serde_inline_default(LogLevel(Level::INFO))] + log_level: LogLevel, +} + +/// main with auto error printing +#[tracing::instrument] +fn main() -> Result<()> { + // startup + let config: Config = get_config(CONFIG_PATH, DEBUG_CONFIG_PATH)?; + init_tracing(&config.log_level); + event!(Level::TRACE, "Configuration {:?}", config); + + // initiate connection + let (host, command) = get_command()?; + let stream = connect(&config, host)?; + + // Line based, HTTP part of the protocol (proxy through apache to daemon) + let mut read_stream = BufReader::new(stream); + let target = String::from("localhost:") + &config.target_port; + write_proxy_header(read_stream.get_mut(), &target)?; + read_proxy_response(&mut read_stream)?; + + // Connect with ssl to daemon + let stream = read_stream.into_inner(); + let mut read_stream = BufReader::new(ssl_connect(stream, &config)?); + + // Line based, daemon part of the protocol (run more rsync) + read_stream.get_mut().write_all(command.as_encoded_bytes())?; + read_command_response(&mut read_stream) + .with_context(|| format!("Remote rsync run failed '{}'", command.to_string_lossy()))?; + + // Asynchronous part of the protocol (once rsync can start communicating) + read_stream.get_mut().get_mut().get_mut().set_nonblocking(true)?; + + // Since stdin cannot be put into non blocking mode, we must read it in a separate thread + let stdin = spawn_stdin_channel(); + + // loop over communication channel to to transmit bytes when needed + let mut stderr = io::stderr(); + let mut stdout = io::stdout(); + io::stderr().write_all("Starting\n".as_bytes())?; + loop { + let mut processed_data = false; + processed_data |= proxy_stdin(&stdin, read_stream.get_mut())?; + processed_data |= proxy_stderrout(&mut read_stream, &mut stdout, &mut stderr)?; + if !processed_data { + // if we didn't process data, we let the CPU do other things before coming back + thread::yield_now(); + } + } + // exit will be called from inside the loop when needed, so this is expected +} + +/// Get command line arguments as passed by rsync +#[tracing::instrument] +fn get_command() -> Result<(OsString, OsString)> { + let mut args = env::args_os(); + args.next(); // skip self path + let host = args.next().ok_or_else(|| anyhow!("Must be run by rsync (host argument missing)"))?; + let command = args.next().ok_or_else(|| anyhow!("Must be run by rsync (command arguments missing)"))?; + let mut command = args.fold(command, |mut a1,a2| { a1.push(" "); a1.push(a2); a1 }); + command.push("\n"); + Ok((host, command)) +} + +/// Connect with TLS to HTTPS endpoint +#[tracing::instrument] +fn connect(config: &Config, host: OsString) -> Result> { + // use a string to allow for evolution (like ipv6 ...) + let connect_string = String::from(host.to_string_lossy()) + ":" + &config.https_port; + let stream = TcpStream::connect(&connect_string) + .with_context(|| format!("Unable to connect to {}", connect_string))?; + + // Configure TLS connection + let mut builder = SslConnector::builder(SslMethod::tls_client())?; + builder.set_verify(if config.verify_https_cert { SslVerifyMode::PEER } + else { SslVerifyMode::NONE }); + if let Some(cert_path) = &config.https_ca_path { + let pem = fs::read(cert_path)?; + let cert_list = X509::stack_from_pem(&pem)?; + let mut store = X509StoreBuilder::new()?; + for cert in cert_list { + store.add_cert(cert)?; + } + builder.set_verify_cert_store(store.build())?; + } + let connector = builder.build(); + + // TLS part of the protocol + connector.connect(&host.to_string_lossy(), stream) + .with_context(|| format!("Unable open TLS connection to {}", connect_string)) +} + +/// Initiate HTTP CONNECT over an existing connection +#[tracing::instrument] +fn write_proxy_header(stream: &mut W, dest: &str) -> Result<()> { + let connect_string = format!( + "CONNECT {0} HTTP/1.1\r\n\ + Host: {0}\r\n\ + \r\n", + dest + ); + stream.write_all(connect_string.as_bytes()) + .with_context(|| "Cannot write to HTTP socket") +} + +/// Read HTTP response after a CONNECT +#[tracing::instrument] +fn read_proxy_response(stream: &mut T) -> Result<()> { + let mut line = String::new(); + stream.read_line(&mut line)?; + if line.starts_with("HTTP/1.0 200 ") { + while line != "\r\n" { + line = String::new(); + stream.read_line(&mut line)?; + } + return Ok(()) + } + Err(anyhow!("Proxy CONNECT to rudder server failed: {}", line)) +} + +/// Initiate TLS connexion over an the HTTP tunnel +#[tracing::instrument] +fn ssl_connect(stream: SslStream, config: &Config) -> Result>> { + // TThis is called acceptor but it works for clients too + let mut builder = SslAcceptor::mozilla_intermediate_v5(SslMethod::tls_client())?; + if config.authentication_verify { + builder.set_verify(SslVerifyMode::PEER); + } else { + builder.set_verify(SslVerifyMode::NONE); + } + if let Some(file) = &config.authentication_ca { + builder.set_ca_file(file)?; + } + if let Some(file) = &config.authentication_certificate { + builder.set_certificate_file(file, SslFiletype::PEM)?; + } + if let Some(file) = &config.authentication_key { + builder.set_private_key_file(file, SslFiletype::PEM)?; + } + let context = builder.build().into_context(); + let ssl = Ssl::new(&context)?; + let stream = ssl.connect(stream)?; + Ok(stream) +} + +/// Read daemon response after sending remote rsync command +#[tracing::instrument] +fn read_command_response(stream: &mut T) -> Result<()> { + let mut line = String::new(); + stream.read_line(&mut line)?; + if line.starts_with("OK:") { + return Ok(()) + } + Err(anyhow!(line)) +} + +/// Loop to wait for stdin and write any data to the channel +/// The only goal of this is to avoid blocking on stdin (a thread can afford to block) +/// We redirect to a channel and not tow the target socket directly because this would lock the socket for read AND write +#[tracing::instrument] +fn spawn_stdin_channel() -> Receiver> { + let (tx, rx) = mpsc::channel(); + event!(Level::DEBUG, "Starting stdin channel"); + thread::spawn(move || { + let mut stdin = io::stdin(); + let mut buffer = [0_u8; BUFFER_SIZE]; + loop { + let n = match stdin.read(&mut buffer) { + Ok(n) => n, + // any error here is fatal + Err(e) => { + let stderr = io::stderr(); + let mut handle = stderr.lock(); + // handling error in error is too hard, just unwrap here + write!(handle, "Error reading stdin: {}.\nTerminating!\n", e).unwrap(); + exit(2); + } + }; + let vec = buffer[0..n].to_vec(); + if let Err(e) = tx.send(vec) { + // any error here is fatal + let stderr = io::stderr(); + let mut handle = stderr.lock(); + // handling error in error is too hard, just unwrap here + write!(handle, "Error writing stdin channel: {}.\nTerminating!\n", e).unwrap(); + exit(2); + } + } + }); + rx +} + +/// Proxy stdin (from channel) into the socket +fn proxy_stdin(stdin: &Receiver>, stream: &mut W) -> Result { + match stdin.try_recv() { + Ok(data) => { + event!(Level::DEBUG, "Read from stdin ({})='{:?}'", data.len(), data); + stream.write_all(&data)?; + Ok(true) + }, + Err(TryRecvError::Empty) => Ok(false), + Err(e) => Err(e).with_context(|| "Unable read from stdin thread"), + } +} + +/// Utility method to extract int from TLV +fn load_u24(value: &[u8]) -> usize { + let mut buf = [0_u8; 8]; + buf[5] = value[0]; + buf[6] = value[1]; + buf[7] = value[2]; + usize::from_be_bytes(buf) +} + + +/// Redirect data from socket to proper target (stdout or stderr) +/// Also handles exit code from remore rsync +fn proxy_stderrout(stream: &mut R, stdout: &mut W1, stderr: &mut W2) -> Result { + let mut buffer: [u8; 2048] = [0; BUFFER_SIZE]; + match stream.read_exact(&mut buffer[0..TL_SIZE]) { + Ok(()) => {}, + Err(e) => return match e.kind() { + io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => Ok(false), + _ => Err(e).with_context(|| "Protocol error, no TLV header"), + }, + } + let len = load_u24(&buffer[1..TL_SIZE]); + event!(Level::TRACE, "Receiving packet of {:?}/{:?}", buffer[0], len); + if buffer[0] == 0 { + // remote termination with exit code, no need to terminate thread, everything is handled by exit + event!(Level::INFO, "Exit with value {}", len); + exit(len as i32); + } else { + let code = buffer[0]; + loop { + match stream.read_exact(&mut buffer[0..len]) { + Ok(()) => break, // done + Err(e) => match e.kind() { + io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => { }, // keep looping because we know there are more data in flight + _ => return Err(e).context("Protocol error, length does not match TLV"), + }, + } + } + event!(Level::TRACE, "Write to stdout ({})='{:?}'", len, &buffer[..len]); + if code == 1 { + stdout.write_all(&buffer[..len])?; + stdout.flush()?; // may be useful if there is a buffer + } else if code== 2 { + stderr.write_all(&buffer[..len])?; + stderr.flush()?; // may be useful if there is a buffer + } else { + bail!("Protocol error, unknown type in TLV! {}", code); + } + } + Ok(true) +} + + diff --git a/agent/src/bin/daemon.rs b/agent/src/bin/daemon.rs new file mode 100644 index 00000000000..9ea0d89e1ab --- /dev/null +++ b/agent/src/bin/daemon.rs @@ -0,0 +1,309 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, AsyncBufReadExt, AsyncRead, AsyncBufRead, AsyncWrite}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; +use tokio::sync::Mutex; +use tokio::select; +use tokio_util::sync::CancellationToken; +use tokio_openssl::SslStream; + +use std::process::Stdio; +use std::sync::Arc; +use std::fmt::Debug; +use std::path::PathBuf; +use std::pin::Pin; + +use tracing::{Level, event}; +use serde::Deserialize; +use serde_inline_default::serde_inline_default; +use agent::{BUFFER_SIZE, TL_SIZE, get_config, init_tracing, LogLevel, ReadStreamPoller}; +use anyhow::{Context, Result}; +use openssl::ssl::{Ssl, SslAcceptor, SslContext, SslFiletype, SslMethod, SslVerifyMode, SslVersion}; + +const CONFIG_PATH: &str = "/tmp/original.toml"; +const DEBUG_CONFIG_PATH: &str = "/tmp/test.toml"; + +// TODO review defaults +#[serde_inline_default] +#[derive(Deserialize, Debug)] +struct Config { + // where to listen on + #[serde_inline_default("localhost:873".into())] + listen: String, + + // Allow no authentication + #[serde_inline_default(true)] + authentication: bool, + + // Specify authentication CA + #[serde_inline_default(Some(PathBuf::from("/root/client-cert.pem")))] + authentication_ca: Option, + + // Specify authentication cert + #[serde_inline_default(PathBuf::from("/root/server-cert.pem"))] + authentication_certificate: PathBuf, + + // Specify authentication cert key + #[serde_inline_default(PathBuf::from("/root/server-key.pem"))] + authentication_key: PathBuf, + + // Log level (default debug in debug build) + #[cfg(debug_assertions)] + #[serde_inline_default(LogLevel(Level::DEBUG))] + log_level: LogLevel, + + // Log level (default info in release build) + #[cfg(not(debug_assertions))] + #[serde_inline_default(LogLevel(Level::INFO))] + log_level: LogLevel, +} + +/// main with auto error printing +#[tokio::main] +async fn main() -> Result<()> { + // startup + let config: Config = get_config(CONFIG_PATH, DEBUG_CONFIG_PATH)?; + init_tracing(&config.log_level); + event!(Level::TRACE, "Configuration {:?}", config); + + // tcp for tests, we'll switch to openssl easily + let listener = TcpListener::bind(&config.listen).await?; + event!(Level::DEBUG, "Listening on: {}", config.listen); + + // Prepare ssl context only once + let ssl_context = Arc::new(create_ssl_context(&config)?); + + loop { + // Asynchronously wait for an inbound socket. + let (socket, _) = listener.accept().await?; + + // Handle connection in dedicated task + let ssl = ssl_context.clone(); + tokio::spawn(async move { + match handle_connection(socket, ssl).await { + Ok(()) => event!(Level::INFO, "socket closed"), + Err(e) => event!(Level::DEBUG, "Socket handling error {:?}", e), + } + }); + } +} + +fn create_ssl_context(config: &Config) -> Result { + // mozilla TLS https://wiki.mozilla.org/Security/Server_Side_TLS + let mut acceptor = SslAcceptor::mozilla_intermediate_v5(SslMethod::tls_server())?; + acceptor.set_private_key_file(&config.authentication_key, SslFiletype::PEM)?; + acceptor.set_certificate_file(&config.authentication_certificate, SslFiletype::PEM)?; + if config.authentication { + acceptor.set_verify(SslVerifyMode::PEER); + } else { + acceptor.set_verify(SslVerifyMode::NONE); + } + // no need for default trust store since we are communicating with our agents + if let Some(file) = &config.authentication_ca { + acceptor.set_ca_file(file)?; + } + // TODO should we set_client_ca_list() ? + // TODO shoud we set_session_id_context() ? + let context_builder = acceptor.build(); + Ok(context_builder.into_context()) +} + +/// Handle a single incoming connection +#[tracing::instrument] +async fn handle_connection(socket: TcpStream, ssl_context: Arc) -> Result<()>{ + event!(Level::INFO, "Servicing one connection!"); + let socket = accept_ssl(socket, ssl_context).await?; + // Line based, synchronous part of the protocol + let mut stream = BufReader::new(socket); + // 1/ Client -> One line containing the rsync command + let command = read_command(&mut stream).await.context("Missing rsync command")?; + // 2/ Server -> One line starting with "OK: " or "ERR: " depending on the command run + let mut process = run_process(command, &mut stream).await; + + let stdout = process.stdout.take().context("Missing stdout access")?; + let stdin = process.stdin.take().context("Missing stdin access")?; + let stderr = process.stderr.take().context("Missing stderr access")?; + + // Asynchronous part of the protocol + // - Client -> any data that go to stdin of the rsync command + // - Server -> TLV with either: + // T=stdout -> data that go to stdout of the client + // T=stderr -> data that go to stderr of the client + // T=exit -> exit code just before closing the connexion + let stream_ref = Arc::new(Mutex::new(stream)); + let stream_ref1 = stream_ref.clone(); + let stream_ref2 = stream_ref.clone(); + let stream_ref3 = stream_ref.clone(); + let token1 = CancellationToken::new(); + let token2 = token1.clone(); + let token3 = token1.clone(); + let mut handles = Vec::new(); + // can't user io::copy because of above split stream mutex + handles.push(tokio::spawn( + async move { select! { + res = proxy_stdin(stream_ref1, stdin) => { token1.cancel(); res } + _ = token1.cancelled() => Ok(()) + }} + )); + handles.push(tokio::spawn( + async move { select! { + res = proxy_stderr(stderr, stream_ref2) => { token2.cancel(); res } + _ = token2.cancelled() => Ok(()) + }} + )); + handles.push(tokio::spawn( + async move { select! { + res = proxy_stdout(stdout, stream_ref3) => { token3.cancel(); res } + _ = token3.cancelled() => Ok(()) + }} + )); + // Only process exit code after all proxy have finished + for h in handles { + match h.await { + Ok(Ok(())) => {} + Ok(Err(e)) => event!(Level::ERROR, "Service Error {:?}", e), + Err(e) => event!(Level::ERROR, "Join Error {:?}", e), + } + } + event!(Level::INFO, "Ended serving connection!"); + + let exit = process.wait().await.expect("failed try wait"); + // 16 bit exit code + let code = exit.code().unwrap_or(0) & 0xFFFF; + let mut buf = vec![0; 4]; + store_u24(&mut buf[1..4], code as usize); + stream_ref.lock() + .await + .write_all(&buf[0..4]) + .await?; + event!(Level::TRACE, "Stdout closed"); Ok(()) +} + +#[tracing::instrument] +async fn accept_ssl(socket: TcpStream, ssl_context: Arc) -> Result> { + let ssl = Ssl::new(&ssl_context)?; + let mut ssl_stream = SslStream::new(ssl, socket)?; + let pinned = Pin::new(&mut ssl_stream); + // server side handshake + pinned.accept().await?; + Ok(ssl_stream) +} + +#[tracing::instrument] +/// Read one line of command from a buffered stream +async fn read_command(stream: &mut ABR) -> Option { + let mut command: String = String::new(); + let n = stream + .read_line(&mut command) + .await + .expect("failed to read data from socket"); + event!(Level::TRACE, "Got data from socket ({})='{}'", n, command); + if n == 0 { + return None; + } + if command.ends_with('\n') { + command.pop(); + if command.ends_with('\r') { + command.pop(); + } + } + Some(command) +} + +#[tracing::instrument] +async fn run_process(command: String, stream: &mut AW) -> Child { + let mut split = command.split(' '); + let bin = split.next().unwrap(); + let args = split.collect::>(); + let process = + Command::new(bin) + .args(args) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap_or_else(|_| panic!("Could not spawn process :{}:\n", command)); + + stream + .write_all("OK: running\n".as_bytes()) + .await + .expect("failed to write data to socket"); + event!(Level::TRACE, "Process started '{}'", command); + process +} + +// copy length to 3 next bytes in the buffer +fn store_u24(buf: &mut [u8], value: usize) { + // Big endian is the usual network encoding + let len = value.to_be_bytes(); + // that's why size is limited to 2^24 + buf[0] = len[5]; + buf[1] = len[6]; + buf[2] = len[7]; +} +#[tracing::instrument] +// rsync command's stdout on the server -> client socket +// also handle process termination +async fn proxy_stdout(mut stdout: ChildStdout, stream_ref: Arc>) -> Result<()> { + let mut buf = vec![0; BUFFER_SIZE]; + buf[0] = 1; // 1 = stdout + loop { + // read from rsync process and write to connexion + let n = stdout + .read(&mut buf[TL_SIZE..]) + .await?; + event!(Level::TRACE, "Data from stdout ({})='{:?}'", n, &buf[TL_SIZE..n+TL_SIZE]); + if n == 0 { + return Ok(()); + } else { + store_u24(&mut buf[1..4], n); + stream_ref.lock() + .await + .write_all(&buf[0..n+TL_SIZE]) // + .await?; + } + } +} + +#[tracing::instrument] +// rsync command's stderr on the server -> client socket +async fn proxy_stderr(mut stderr: ChildStderr, stream_ref: Arc>) -> Result<()> { + let mut buf = vec![0; BUFFER_SIZE]; + buf[0] = 2; // 2 = stderr + loop { + // read from rsync process and write to connexion + let n = stderr + .read(&mut buf[TL_SIZE..]) + .await?; + event!(Level::TRACE, "Data from stderr ({})='{:?}'", n, &buf[TL_SIZE..n+TL_SIZE]); + if n == 0 { + // no need for specific process, stdout will be closed too + event!(Level::TRACE, "Stderr closed"); + return Ok(()); + } else { + store_u24(&mut buf[1..4], n); + stream_ref.lock() + .await + .write_all(&buf[0..n+TL_SIZE]) + .await?; + } + } +} + +#[tracing::instrument] +// client socket -> rsync command's stdin on the server +async fn proxy_stdin(stream_ref: Arc>, mut stdin: ChildStdin) -> Result<()> { + let mut buf = vec![0; BUFFER_SIZE]; + // read from connexion and write to rsync process + loop { + let poller = ReadStreamPoller::new(stream_ref.clone(), &mut buf); + let n = poller.await?; + event!(Level::TRACE, "Data from socket ({})='{:?}'", n, &buf[0..n]); + if n == 0 { + event!(Level::TRACE, "Socket closed"); + } else { + stdin.write_all(&buf[0..n]) + .await?; + } + } + +} diff --git a/agent/src/lib.rs b/agent/src/lib.rs new file mode 100644 index 00000000000..927495cb436 --- /dev/null +++ b/agent/src/lib.rs @@ -0,0 +1,139 @@ +///! Here we put things that serve both the daemon and the client + +use std::cell::RefCell; +use std::fmt::{Debug, Formatter}; +use std::{fs, io, task}; +use std::future::Future; +use std::path::Path; +use std::pin::{Pin, pin}; +use std::str::FromStr; +use std::sync::Arc; +use std::task::Poll; +use std::ops::DerefMut; +use serde::{Deserialize, Deserializer}; +use serde::de::Visitor; +use tokio::io::{AsyncRead, ReadBuf}; +use tokio::sync::Mutex; +use tracing::Level; +use tracing_subscriber::fmt::format::FmtSpan; +use anyhow::{Context, Result}; + +/// Size of the send/receive buffer +// must be < 2^24 +// > classic ip packet size, but maybe tcp window (65k or more) would be better +pub const BUFFER_SIZE: usize = 2048; +/// Size of TL in TLV frames +pub const TL_SIZE: usize = 4; + +/// Get configuration from config file +pub fn get_config Deserialize<'a> + Debug>(normal_path: &str, debug_path: &str) -> Result { + let mut config_path = normal_path; + // Allow for an alternate config path in debug builds + #[cfg(debug_assertions)] + if Path::new(debug_path).exists() { + config_path = debug_path; + } + + // look for configuration + let config: C = + if let Ok(data) = fs::read_to_string(config_path) { + toml::from_str(&data) + .with_context(|| format!("Unable to parse configuration file {}", config_path))? + } else { + // Config implement default for all fields so "" is always valid + toml::from_str("").unwrap() + }; + Ok(config) +} + +/// Log level newtype to allow deserializing it from configuration +#[derive(Debug)] +pub struct LogLevel(pub Level); +impl<'de> Deserialize<'de> for LogLevel { + fn deserialize(deserializer: D) -> Result where D: Deserializer<'de> { + deserializer.deserialize_str(LogLevelVisitor) + } +} +struct LogLevelVisitor; +impl<'de> Visitor<'de> for LogLevelVisitor { + type Value = LogLevel; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("one of : ERROR, WARN, INFO, DEBUG, TRACE") + } + fn visit_str(self, v: &str) -> Result { + match Level::from_str(v) { + Ok(l) => Ok(LogLevel(l)), + Err(e) => Err(E::custom(format!("Invalid log_level {}: {}", v, e))), + } + } +} + +/// Initialize tokio tracing subscriber +pub fn init_tracing(log_level: &LogLevel) { + // initiate logger + let mut subscriber_builder = tracing_subscriber::fmt() + .compact() + .with_writer(io::stderr) + .with_file(true) + .with_line_number(true) + .with_thread_ids(false) + .with_max_level(log_level.0); + if log_level.0 == Level::TRACE { + // add function tracing if trace is enabled + subscriber_builder = subscriber_builder.with_span_events(FmtSpan::ENTER); + } + subscriber_builder.init(); +} + +/// Implement future over a stream held behind a mutex +/// This is needed because in a proxy the stream can be both waiting for read and being written to +/// So we need +/// 1/ a mutex to allow both reading and writing by separate tasks +/// 2/ something that waits for reading while holding the mutex +/// 3/ something that releases the mutex when there is nothing to read to allow for writing when necessary +/// +/// The current solution releases the mutex each time the poll in pending, which is enough currently +/// - we may need to add a read_timeout to the socket to make is more reactive +/// - we may want to add a signal mechanism to wake up the task exactly when a write is required +pub struct ReadStreamPoller<'a, AR: AsyncRead> { + // We have to use a tokio mutex because the writing task tasks awaits with the mutex on hold + inner: Arc>, + // we need to be able to mutably access to the buffer while pinned and inner in locked, so RefCell + buffer: RefCell>, +} + +impl<'a, AR: AsyncRead> ReadStreamPoller<'a, AR> { + // Buffer is better to be handled by the caller + pub fn new(inner: Arc>, buf: &'a mut [u8]) -> Self { + ReadStreamPoller { + inner, + buffer: RefCell::new(ReadBuf::new(buf)), + } + } +} +impl<'a, AR: AsyncRead + Unpin> Future for ReadStreamPoller<'a, AR> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match self.inner.try_lock() { + Ok(mut guard) => { + let pinned = pin!(guard.deref_mut()); + let mut buf = self.buffer.borrow_mut(); + match pinned.poll_read(cx, &mut buf) { + Poll::Ready(Ok(())) => { + Poll::Ready(Ok(buf.filled().len())) + }, + Poll::Ready(Err(e)) => { + match e.kind() { + io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => Poll::Pending, + _ => Poll::Ready(Err(e)), + } + }, + Poll::Pending => Poll::Pending, + } + } + Err(_) => Poll::Pending, + } + } +}