Skip to content

Commit

Permalink
chore: bump to zenoh 1.0.0 (#8)
Browse files Browse the repository at this point in the history
* chore: bump to zenoh 1.0.0-alpha.6

Signed-off-by: Gabriele Baldoni <[email protected]>

* chore: bump zenoh 1.0.0, updating macros

Signed-off-by: Gabriele Baldoni <[email protected]>

* style: cargo format

Signed-off-by: Gabriele Baldoni <[email protected]>

* ci: adding cleanup step

Signed-off-by: Gabriele Baldoni <[email protected]>

* deps: tracking zenoh main

Signed-off-by: Gabriele Baldoni <[email protected]>

* deps: zenoh

Signed-off-by: Gabriele Baldoni <[email protected]>

* deps: updated zenoh

Signed-off-by: Gabriele Baldoni <[email protected]>

* deps: sync zenoh

Signed-off-by: Gabriele Baldoni <[email protected]>

* chore: prepare beta release

Signed-off-by: Gabriele Baldoni <[email protected]>

* deps: zenoh 1.0.0

Signed-off-by: Gabriele Baldoni <[email protected]>

* chore: release 0.8.0

Signed-off-by: Gabriele Baldoni <[email protected]>

---------

Signed-off-by: Gabriele Baldoni <[email protected]>
  • Loading branch information
gabrik authored Oct 21, 2024
1 parent 23a5d17 commit 14a64b2
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 112 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,10 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --release --verbose
args: --release --verbose

- name: Clean up
if: always()
uses: actions-rs/cargo@v1
with:
command: clean
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ homepage = "https://github.com/ZettaScaleLabs/zenoh-rpc"
license = " EPL-2.0 OR Apache-2.0"
readme = "README.md"
repository = "https://github.com/ZettaScaleLabs/zenoh-rpc"
version = "0.8.0-alpha.1"
version = "0.8.0"

[profile.release]
codegen-units = 1
Expand Down Expand Up @@ -62,10 +62,10 @@ tokio = { version = "1.35.1", default-features = false, features = [
"rt",
"time",
] }
zenoh = { version = "0.11.0", default-features = false }
zenoh-codec = { version = "0.11.0" }
zenoh-core = { version = "0.11.0" }
zenoh-ext = { version = "0.11.0" }
zenoh-macros = { version = "0.11.0" }
zenoh-protocol = { version = "0.11.0" }
zenoh-util = { version = "0.11.0" }
zenoh = { version= "1.0.0", default-features = false }
zenoh-codec = { version= "1.0.0" }
zenoh-core = { version= "1.0.0" }
zenoh-ext = { version= "1.0.0" }
zenoh-macros = { version= "1.0.0" }
zenoh-protocol = { version= "1.0.0" }
zenoh-util = { version= "1.0.0" }
7 changes: 2 additions & 5 deletions zrpc-derive/examples/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,10 @@ impl Hello for MyServer {
#[tokio::main]
async fn main() {
env_logger::init();
use zenoh::prelude::r#async::*;

let mut config = zenoh::config::Config::default();
config
.set_mode(Some(zenoh::config::whatami::WhatAmI::Peer))
.unwrap();
let zsession = Arc::new(zenoh::open(config).res().await.unwrap());
config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap();
let zsession = zenoh::open(config).await.unwrap();

let z = zsession.clone();
let client = HelloClient::builder(zsession).build();
Expand Down
32 changes: 11 additions & 21 deletions zrpc-derive/examples/simplified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;
use zenoh::config::ZenohId;
use zenoh::key_expr::format::KeFormat;
use zenoh::prelude::ZenohId;

use zenoh::key_expr::KeyExpr;
use zrpc::prelude::*;

use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::ops::Deref;
use std::str::{self, FromStr};
use std::time::Duration;
use zenoh::{Session, SessionDeclarations};

use serde::{Deserialize, Serialize};
use zenoh::prelude::r#async::*;
use zenoh::Session;

// this is the user defined trait
#[async_trait::async_trait]
Expand Down Expand Up @@ -239,7 +238,7 @@ impl Deref for SubResponse {
}

pub struct HelloClientBuilder<'a> {
pub z: Arc<Session>,
pub z: Session,
pub labels: HashSet<String>,
ke_format: KeFormat<'a>,
pub tout: Duration,
Expand Down Expand Up @@ -284,15 +283,15 @@ impl<'a> HelloClientBuilder<'a> {
pub struct HelloClient<'a> {
pub(crate) ch: RPCClientChannel,
pub(crate) ke_format: KeFormat<'a>,
pub(crate) z: Arc<Session>,
pub(crate) z: Session,
pub(crate) tout: Duration,
pub(crate) labels: HashSet<String>,
}

// generated client code

impl<'a> HelloClient<'a> {
pub fn builder(z: Arc<zenoh::Session>) -> HelloClientBuilder<'a> {
pub fn builder(z: zenoh::Session) -> HelloClientBuilder<'a> {
HelloClientBuilder {
z,
labels: HashSet::new(),
Expand Down Expand Up @@ -349,21 +348,15 @@ impl<'a> HelloClient<'a> {
.z
.liveliness()
.get("@rpc/*/service/Hello")
.res()
.await
.map_err(|e| {
Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;

let ids = res
.into_iter()
.map(|e| {
self.extract_id_from_ke(
&e.sample
.map_err(|_| Status::unavailable("Cannot get value from sample"))?
.key_expr,
)
})
.filter_map(|e| e.into_result().ok())
.map(|e| self.extract_id_from_ke(e.key_expr()))
.collect::<Result<Vec<ZenohId>, Status>>()?;

// get server metadata
Expand Down Expand Up @@ -400,13 +393,10 @@ impl<'a> HelloClient<'a> {
async fn main() {
{
env_logger::init();
use zenoh::prelude::r#async::*;

let mut config = zenoh::config::Config::default();
config
.set_mode(Some(zenoh::config::whatami::WhatAmI::Peer))
.unwrap();
let zsession = Arc::new(zenoh::open(config).res().await.unwrap());
config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap();
let zsession = zenoh::open(config).await.unwrap();

let z = zsession.clone();

Expand Down
24 changes: 10 additions & 14 deletions zrpc-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ impl<'a> ServiceGenerator<'a> {
quote! {
#[automatically_derived]
#vis struct #client_builder_ident<'a> {
pub(crate) z: std::sync::Arc<zenoh::Session>,
pub(crate) z: zenoh::Session,
ke_format: zenoh::key_expr::format::KeFormat<'a>,
pub(crate) tout: std::time::Duration,
pub(crate) labels: std::collections::HashSet<std::string::String>,
Expand Down Expand Up @@ -513,7 +513,7 @@ impl<'a> ServiceGenerator<'a> {
#[derive(Clone, Debug)]
#vis struct #client_ident<'a> {
pub(crate) ch : zrpc::prelude::RPCClientChannel,
pub(crate) z: std::sync::Arc<zenoh::Session>,
pub(crate) z: zenoh::Session,
pub(crate) ke_format: zenoh::key_expr::format::KeFormat<'a>,
pub(crate) tout: std::time::Duration,
pub(crate) labels: std::collections::HashSet<std::string::String>,
Expand All @@ -523,7 +523,7 @@ impl<'a> ServiceGenerator<'a> {
impl<'a> #client_ident<'a> {


#vis fn builder(z: std::sync::Arc<zenoh::Session>) -> #client_builder_ident<'a> {
#vis fn builder(z: zenoh::Session) -> #client_builder_ident<'a> {
#client_builder_ident {
z,
labels: std::collections::HashSet::new(),
Expand All @@ -535,34 +535,30 @@ impl<'a> ServiceGenerator<'a> {

#(#fns)*

async fn find_server(&self) -> std::result::Result<zenoh::prelude::ZenohId, zrpc::prelude::Status> {
use zenoh::prelude::r#async::AsyncResolve;
use zenoh::SessionDeclarations;
async fn find_server(&self) -> std::result::Result<zenoh::config::ZenohId, zrpc::prelude::Status> {

let res = self
.z
.liveliness()
.get(#rpc_ke)
.res()
.await
.map_err(|e| {
zrpc::prelude::Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;();

let ids = res
.into_iter()
.filter_map(|e| e.into_result().ok())
.map(|e| {
self.extract_id_from_ke(
&e.sample
.map_err(|_| zrpc::prelude::Status::unavailable("Cannot get value from sample"))?
.key_expr,
e.key_expr()
)
})
.collect::<std::result::Result<std::vec::Vec<zenoh::prelude::ZenohId>, zrpc::prelude::Status>>()?;
.collect::<std::result::Result<std::vec::Vec<zenoh::config::ZenohId>, zrpc::prelude::Status>>()?;

let metadatas = self.ch.get_servers_metadata(&ids, self.tout).await?;

let mut ids: Vec<zenoh::prelude::ZenohId> = metadatas
let mut ids: Vec<zenoh::config::ZenohId> = metadatas
.into_iter()
.filter(|m| m.labels.is_superset(&self.labels))
.map(|m| m.id)
Expand All @@ -572,7 +568,7 @@ impl<'a> ServiceGenerator<'a> {
.ok_or(zrpc::prelude::Status::unavailable("No servers found"))
}

fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result<zenoh::prelude::ZenohId, zrpc::prelude::Status> {
fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result<zenoh::config::ZenohId, zrpc::prelude::Status> {
use std::str::FromStr;
let id_str = self
.ke_format
Expand All @@ -581,7 +577,7 @@ impl<'a> ServiceGenerator<'a> {
.get("zid")
.map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to get server id from key expression: {e:?}")))?;

zenoh::prelude::ZenohId::from_str(id_str)
zenoh::config::ZenohId::from_str(id_str)
.map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to convert str to ZenohId: {e:?}")))

}
Expand Down
49 changes: 25 additions & 24 deletions zrpc-derive/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
* ADLINK fog05 team, <[email protected]>
*********************************************************************************/

use std::sync::Arc;
use std::{str::FromStr, sync::Arc};
use tokio::sync::Mutex;

use async_trait::async_trait;
use zenoh::config::{EndPoint, ZenohId};
//importing the macros
use zenoh::prelude::r#async::*;
use zrpc::prelude::*;
use zrpc_derive::service;

Expand Down Expand Up @@ -55,34 +55,35 @@ impl Hello for MyServer {
fn configure_zenoh(id: ZenohId, listen: String, connect: String) -> zenoh::config::Config {
let mut config = zenoh::config::Config::default();
config.set_id(id).unwrap();
config
.set_mode(Some(zenoh::config::whatami::WhatAmI::Peer))
.unwrap();
config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap();
config.scouting.multicast.set_enabled(Some(false)).unwrap();
config.listen.endpoints.push(listen.parse().unwrap());
config.connect.endpoints.push(connect.parse().unwrap());

let listen: Vec<EndPoint> = vec![listen.parse().unwrap()];
let connect: Vec<EndPoint> = vec![connect.parse().unwrap()];
config.listen.endpoints.set(listen).unwrap();
config.connect.endpoints.set(connect).unwrap();

config
}

async fn wait_for_peer(session: &zenoh::Session, id: ZenohId) {
while !session.info().peers_zid().res().await.any(|e| e == id) {
while !session.info().peers_zid().await.any(|e| e == id) {
tokio::time::sleep(std::time::Duration::from_secs(1)).await
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn service_call() {
let server_zid = ZenohId::rand();
let client_zid = ZenohId::rand();
let server_zid = ZenohId::from_str("a0").unwrap();
let client_zid = ZenohId::from_str("a1").unwrap();

let client_config = configure_zenoh(
client_zid,
"tcp/127.0.0.1:9003".to_string(),
"tcp/127.0.0.1:9002".to_string(),
);

let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap());
let client_session = zenoh::open(client_config).await.unwrap();

let c_zid_server = server_zid;
tokio::task::spawn(async move {
Expand All @@ -91,7 +92,7 @@ async fn service_call() {
"tcp/127.0.0.1:9002".to_string(),
"tcp/127.0.0.1:9003".to_string(),
);
let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap());
let server_session = zenoh::open(server_config).await.unwrap();
wait_for_peer(&server_session, client_zid).await;

let service = MyServer {
Expand Down Expand Up @@ -138,8 +139,8 @@ async fn service_call() {

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn service_unavailable() {
let server_zid = ZenohId::rand();
let client_zid = ZenohId::rand();
let server_zid = ZenohId::from_str("a2").unwrap();
let client_zid = ZenohId::from_str("a3").unwrap();

let server_config = configure_zenoh(
server_zid,
Expand All @@ -153,8 +154,8 @@ async fn service_unavailable() {
"tcp/127.0.0.1:9004".to_string(),
);

let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap());
let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap());
let server_session = zenoh::open(server_config).await.unwrap();
let client_session = zenoh::open(client_config).await.unwrap();

// Check zenoh sessions are connected
wait_for_peer(&server_session, client_zid).await;
Expand All @@ -170,16 +171,16 @@ async fn service_unavailable() {

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn server_not_matching() {
let server_zid = ZenohId::rand();
let client_zid = ZenohId::rand();
let server_zid = ZenohId::from_str("a4").unwrap();
let client_zid = ZenohId::from_str("a5").unwrap();

let client_config = configure_zenoh(
client_zid,
"tcp/127.0.0.1:9007".to_string(),
"tcp/127.0.0.1:9006".to_string(),
);

let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap());
let client_session = zenoh::open(client_config).await.unwrap();

let c_zid_server = server_zid;
let st = tokio::task::spawn(async move {
Expand All @@ -188,7 +189,7 @@ async fn server_not_matching() {
"tcp/127.0.0.1:9006".to_string(),
"tcp/127.0.0.1:9007".to_string(),
);
let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap());
let server_session = zenoh::open(server_config).await.unwrap();
wait_for_peer(&server_session, client_zid).await;

let service = MyServer {
Expand Down Expand Up @@ -220,16 +221,16 @@ async fn server_not_matching() {

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn server_matching() {
let server_zid = ZenohId::rand();
let client_zid = ZenohId::rand();
let server_zid = ZenohId::from_str("a6").unwrap();
let client_zid = ZenohId::from_str("a7").unwrap();

let client_config = configure_zenoh(
client_zid,
"tcp/127.0.0.1:9009".to_string(),
"tcp/127.0.0.1:9008".to_string(),
);

let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap());
let client_session = zenoh::open(client_config).await.unwrap();

let c_zid_server = server_zid;
tokio::task::spawn(async move {
Expand All @@ -238,7 +239,7 @@ async fn server_matching() {
"tcp/127.0.0.1:9008".to_string(),
"tcp/127.0.0.1:9009".to_string(),
);
let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap());
let server_session = zenoh::open(server_config).await.unwrap();
wait_for_peer(&server_session, client_zid).await;

let service = MyServer {
Expand Down
Loading

0 comments on commit 14a64b2

Please sign in to comment.