Skip to content

Commit

Permalink
fix: changed approach to metadata query
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Baldoni <[email protected]>
  • Loading branch information
gabrik committed Nov 14, 2024
1 parent bc78a71 commit 7bae670
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 47 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ darling = "0.20"
env_logger = "0.11.0"
flume = { version = "0.11" }
futures = "0.3.28"
log = "0.4"
tracing = "0.1"
proc-macro2 = "1.0"
quote = "1.0"
serde = { version = "1.0.160", features = ["derive"] }
Expand Down
8 changes: 6 additions & 2 deletions zrpc-derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ async-trait = { workspace = true }
base64 = { workspace = true }
darling = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
proc-macro2 = { workspace = true }
quote = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
syn = { workspace = true }
syn-serde = { workspace = true }
tokio = { workspace = true, features = ["io-std","io-util", "rt-multi-thread"]}
tokio = { workspace = true, features = [
"io-std",
"io-util",
"rt-multi-thread",
] }
tracing = { workspace = true }
zenoh = { workspace = true }
zenoh-util = { workspace = true }
zrpc = { version = "0.8.4-dev", path = "../zrpc" }
Expand Down
3 changes: 0 additions & 3 deletions zrpc-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ impl<'a> ServiceGenerator<'a> {
.map_err(|e| {
zrpc::prelude::Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;();

let ids = res
.into_iter()
.filter_map(|e| e.into_result().ok())
Expand All @@ -554,9 +553,7 @@ impl<'a> ServiceGenerator<'a> {
)
})
.collect::<std::result::Result<std::vec::Vec<zenoh::config::ZenohId>, zrpc::prelude::Status>>()?;

let metadatas = self.ch.get_servers_metadata(&ids, self.tout).await?;

let mut ids: Vec<zenoh::config::ZenohId> = metadatas
.into_iter()
.filter(|m| m.labels.is_superset(&self.labels))
Expand Down
2 changes: 1 addition & 1 deletion zrpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ base64 = { workspace = true }
bincode = { workspace = true, optional = true }
flume = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
serde_cbor = { workspace = true, optional = true }
serde_derive = { workspace = true }
serde_json = { workspace = true, optional = true }
serde_yaml = { workspace = true, optional = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
zenoh = { workspace = true }
zenoh-util = { workspace = true }

Expand Down
143 changes: 143 additions & 0 deletions zrpc/examples/check-servers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*********************************************************************************
* Copyright (c) 2022 ZettaScale Technology
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache Software License 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
* Contributors:
* ZettaScale Zenoh Team, <[email protected]>
*********************************************************************************/
use tokio::io::AsyncReadExt;
use zenoh::config::ZenohId;
use zenoh::key_expr::format::KeFormat;

use zenoh::key_expr::KeyExpr;
use zrpc::prelude::*;

use std::collections::HashSet;
use std::str::FromStr;
use std::time::Duration;
use zenoh::Session;

#[allow(unused)]
#[derive(Clone, Debug)]
pub struct ServerChecker {
pub(crate) ch: RPCClientChannel,
pub(crate) z: Session,
pub(crate) tout: Duration,
pub(crate) labels: HashSet<String>,
pub(crate) service_name: String,
kef: String,
}

// generated client code

impl ServerChecker {
pub fn new(session: Session, service_name: String) -> Self {
let kef = format!("@rpc/${{zid:*}}/service/{service_name}");
Self {
kef,
ch: RPCClientChannel::new(session.clone(), &service_name),
z: session,
tout: std::time::Duration::from_secs(60u16 as u64),
labels: HashSet::new(),
service_name: service_name.clone(),
}
}

async fn find_servers_livelines(&self) -> Result<Vec<ZenohId>, Status> {
let res = self
.z
.liveliness()
.get(format!("@rpc/*/service/{}", self.service_name))
.await
.map_err(|e| {
Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;

let ids = res
.into_iter()
.filter_map(|e| e.into_result().ok())
.map(|e| self.extract_id_from_ke(e.key_expr()))
.collect::<Result<Vec<ZenohId>, Status>>()?;

Ok(ids)
}

async fn get_servers_metadata(&self) -> Result<Vec<ServerMetadata>, Status> {
let ids = self.find_servers_livelines().await?;

// get server metadata
self.ch.get_servers_metadata(&ids, self.tout).await
}

// this could be improved by caching the server id
// maybe by using this https://github.com/moka-rs/moka
async fn find_servers(&self) -> Result<Vec<ZenohId>, Status> {
// get server metadata
let metadatas = self.get_servers_metadata().await?;

// filter the metadata based on labels
let ids: Vec<ZenohId> = metadatas
.into_iter()
.filter(|m| m.labels.is_superset(&self.labels))
.map(|m| m.id)
.collect();

Ok(ids)
}

fn extract_id_from_ke(&self, ke: &KeyExpr) -> Result<ZenohId, Status> {
let ke_format = KeFormat::new(&self.kef)
.map_err(|e| Status::internal_error(format!("Cannot create KE format: {e:?}")))?;
let id_str = ke_format
.parse(ke)
.map_err(|e| Status::internal_error(format!("Unable to parse key expression: {e:?}")))?
.get("zid")
.map_err(|e| {
Status::internal_error(format!(
"Unable to get server id from key expression: {e:?}"
))
})?;

ZenohId::from_str(id_str)
.map_err(|e| Status::internal_error(format!("Unable to convert str to ZenohId: {e:?}")))
}
}

#[tokio::main]
async fn main() {
{
let config = zenoh::config::Config::from_file("./zenoh.json").unwrap();
let zsession = zenoh::open(config).await.unwrap();

let z = zsession.clone();

let server_checker = ServerChecker::new(z, "K8sController".into());

println!("Checking server liveliness");
let res = server_checker.find_servers_livelines().await.unwrap();
println!("Res is: {:?}", res);

press_to_continue().await;
println!("Checking server metadata");
let res = server_checker.get_servers_metadata().await.unwrap();
println!("Res is: {:?}", res);

press_to_continue().await;
println!("Checking server ids");
let res = server_checker.find_servers().await.unwrap();
println!("Res is: {:?}", res);

press_to_continue().await;
}
}

async fn press_to_continue() {
println!("Press ENTER to continue...");
let buffer = &mut [0u8];
tokio::io::stdin().read_exact(buffer).await.unwrap();
}
71 changes: 39 additions & 32 deletions zrpc/src/rpcchannel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ extern crate serde;

use std::time::Duration;

use log::trace;
use serde::{Deserialize, Serialize};
use zenoh::config::ZenohId;
use zenoh::handlers::FifoChannelHandler;
Expand Down Expand Up @@ -69,7 +68,7 @@ impl RPCClientChannel {
"@rpc/{}/service/{}/{}",
server_id, self.service_name, method
);
trace!("Sending {:?} to {:?}", request, selector);
tracing::debug!("Sending {:?} to {:?}", request, selector);
Ok(self
.z
.get(&selector)
Expand All @@ -95,13 +94,14 @@ impl RPCClientChannel {
U: Serialize + Clone + std::fmt::Debug,
for<'de3> U: Deserialize<'de3>,
{
tracing::debug!("calling function: {request:?}");
let data_receiver = self
.send(server_id, &request, method, tout)
.await
.map_err(|e| Status::new(Code::InternalError, format!("communication error: {e:?}")))?;
//takes only one, eval goes to only one
let reply = data_receiver.recv_async().await;
log::trace!("Response from zenoh is {:?}", reply);
tracing::debug!("Response from zenoh is {:?}", reply);
if let Ok(reply) = reply {
match reply.result() {
Ok(sample) => {
Expand All @@ -125,15 +125,15 @@ impl RPCClientChannel {
}
}
Err(e) => {
log::error!("Unable to get sample from {e:?}");
tracing::error!("Unable to get sample from {e:?}");
Err(Status::new(
Code::InternalError,
format!("Unable to get sample from {e:?}"),
))
}
}
} else {
log::error!("No data from server");
tracing::error!("No data from server");
Err(Status::new(
Code::InternalError,
format!("No data from call_fun for Request {:?}", request),
Expand All @@ -146,34 +146,41 @@ impl RPCClientChannel {
ids: &[ZenohId],
tout: Duration,
) -> Result<Vec<ServerMetadata>, Status> {
let ke = "@rpc/*/metadata".to_string();
let data = self
.z
.get(ke)
.target(QueryTarget::All)
.consolidation(ConsolidationMode::None)
.timeout(tout)
.await
.map_err(|e| Status::new(Code::InternalError, format!("communication error: {e:?}")))?;
let mut tot_metadata = Vec::with_capacity(ids.len());

let metadata = data
.into_iter()
// getting only reply with Sample::Ok
.filter_map(|r| r.into_result().ok())
// getting only the ones we can deserialize
.filter_map(|s| {
// This is infallible
let raw_data = s.payload().to_bytes().to_vec();
deserialize::<WireMessage>(&raw_data).ok()
})
// get only the ones that do not have errors
.filter_map(|wmgs| wmgs.payload)
// get the ones where we can actually deserialize the payload
.filter_map(|pl| deserialize::<ServerMetadata>(&pl).ok())
// filter by the IDs providing the needed service
.filter(|m| ids.contains(&m.id))
.collect();
for id in ids {
//do one query per id... not efficient
let ke = format!("@rpc/{id}/metadata");
let data = self
.z
.get(ke)
.target(QueryTarget::All)
.consolidation(ConsolidationMode::None)
.timeout(tout)
.await
.map_err(|e| {
Status::new(Code::InternalError, format!("communication error: {e:?}"))
})?;
let metadata = data
.into_iter()
// getting only reply with Sample::Ok
.filter_map(|r| r.into_result().ok())
// getting only the ones we can deserialize
.filter_map(|s| {
// This is infallible
let raw_data = s.payload().to_bytes().to_vec();
deserialize::<WireMessage>(&raw_data).ok()
})
// get only the ones that do not have errors
.filter_map(|wmgs| wmgs.payload)
// get the ones where we can actually deserialize the payload
.filter_map(|pl| deserialize::<ServerMetadata>(&pl).ok())
// filter by the IDs providing the needed service
.filter(|m| ids.contains(&m.id))
.collect::<Vec<ServerMetadata>>();

Ok(metadata)
tot_metadata.extend_from_slice(&metadata);
}
Ok(tot_metadata)
}
}
16 changes: 8 additions & 8 deletions zrpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Server {
)
})?;

log::debug!("[Server] declared queryabled on: {ke}");
tracing::debug!("[Server] declared queryabled on: {ke}");

for k in self.services.keys() {
let ke = format!("@rpc/{}/service/{k}", self.instance_uuid());
Expand Down Expand Up @@ -147,11 +147,11 @@ impl Server {

let ke = query.key_expr().clone();

log::debug!("[Server] received query on: {ke}");
tracing::debug!("[Server] received query on: {ke}");

let fut: ServerTaskFuture = match Self::get_token(&ke, 2) {
Some("service") => {
log::debug!("[Server] call to service");
tracing::debug!("[Server] call to service");
let service_name = Self::get_service_name(&ke)?;
let svc = self
.services
Expand All @@ -173,14 +173,14 @@ impl Server {
Box::pin(Self::service_call(svc, ke.clone(), payload))
}
Some("metadata") => {
log::debug!("[Server] call to metadata");
tracing::debug!("[Server] call to metadata");
Box::pin(Self::server_metadata(
self.labels.clone(),
self.instance_uuid(),
))
}
Some(_) | None => {
log::warn!("[Server] unknown call");
tracing::error!("[Server] unknown call");
// this calls returns internal error
Box::pin(Self::create_error())
}
Expand All @@ -200,7 +200,7 @@ impl Server {
}
};
let res = query.reply(ke, sample).await;
log::trace!("Query Result is: {res:?}");
tracing::debug!("Query Result is: {res:?}");
});
}

Expand All @@ -223,7 +223,7 @@ impl Server {

match svc.call(msg).await {
Ok(msg) => {
log::trace!("Service response: {msg:?}");
tracing::debug!("Service response: {msg:?}");
let wmsg = WireMessage {
payload: Some(msg.body),
status: Status::ok(""),
Expand All @@ -233,7 +233,7 @@ impl Server {
.map_err(|e| Status::internal_error(format!("Serialization error: {e:?}")))
}
Err(e) => {
log::trace!("Service error is : {e:?}");
tracing::error!("Service error is : {e:?}");
let wmsg = WireMessage {
payload: None,
status: e,
Expand Down

0 comments on commit 7bae670

Please sign in to comment.