how to use tokio::spawn in rust client? #598

Open
sternezsl opened this issue May 6, 2024 · 10 comments

@sternezsl

sternezsl commented May 6, 2024

I would like to send requests to an fbthrift server concurrently, so I created a client with the following code:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
    let transport = TcpTransport::new(stream);
    let conn = <dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport);
    let client1 = conn.clone();

    let mut tasks = vec![];
    tasks.push(tokio::spawn(async move {
        let result = client1.method1(10).await;
        println!("result: {}", result.unwrap());
    }));

    let client2 = conn.clone();
    tasks.push(tokio::spawn(async move {
        let result = client2.method1(11).await;
        println!("result: {}", result.unwrap());
    }));
    for task in tasks {
        task.await?;
    }
    Ok(())
}

Here TestService::method1 is very simple: it just adds one to the number it receives.

service TestService {
  i32 method1(1: i32 req);
}

The client hangs after receiving the first request's result:

> cargo run -r
result: 11

But if I change the code to use futures::future::try_join_all:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
    let transport = TcpTransport::new(stream);
    let conn = <dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport);
    let response = try_join_all(vec![conn.method1(10), conn.method1(11)]).await;
    println!("{response:?}");
    Ok(())
}

it works and I get the expected result:

> cargo run -r
Ok([11, 12])

My question is: what is wrong with the tokio::spawn version?

@slawlor
Contributor

slawlor commented May 7, 2024

Hello and thank you for opening this issue!

A few questions to follow up with

  1. Do you have a complete example you can share?
  2. Can you try some of the other tokio primitives and see if they work, e.g. JoinSet or tokio::join!?
  3. With your "tweak" can you try to clone the connection for both queries and see if the issue reproduces? It may be that the clone of the connection is improper which might be causing the problem. Or it needs to be wrapped into an Arc in order to work. I have a guess that dropping the conn will close the tcp connection, or cloning it won't properly open multiple connections.

Thanks!

@sternezsl
Author

sternezsl commented May 7, 2024

Thanks for your reply

Hello and thank you for opening this issue!

A few questions to follow up with

  1. Do you have a complete example you can share?
  • fbthrift git version: 5140b62e0c
  • rust-shed git version: 0fb315837c

client source code

  • Cargo.toml
[package]
name = "fbthrift-client"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[[bin]]
name = "client"
path = "src/main.rs"

[dependencies]
fbthrift = { path = "../fbthrift/thrift/lib/rust" }
fbthrift_demo_if = { path = "../fbthrift-demo" }
fbthrift_tcp = { path = "../rust-shed/shed/fbthrift_ext/tcp" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3.30"
anyhow = "1.0.75"


[build-dependencies]
thrift_compiler = { path = "../rust-shed/shed/thrift_compiler" }
  • main.rs
use fbthrift_demo_if::TestService;
use fbthrift_tcp::TcpTransport;
use fbthrift::CompactProtocol;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
    let transport = TcpTransport::new(stream);
    let conn = <dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport);
    let client1 = conn.clone();

    let mut tasks = vec![];
    tasks.push(tokio::spawn(async move {
        let result = client1.method1(10).await;
        println!("result: {}", result.unwrap());
    }));

    let client2 = conn.clone();
    tasks.push(tokio::spawn(async move {
        let result = client2.method1(11).await;
        println!("result: {}", result.unwrap());
    }));
    for task in tasks {
        task.await?;
    }
    Ok(())
}

server source code

  • demo.thrift
namespace cpp2 demo

service TestService {
  i32 method1(1: i32 req);
}

  • DemoHandler.h
#pragma once

#include "gen-cpp2/TestService.h"
#include <folly/logging/xlog.h>

namespace demo {
namespace cpp2 {
class ExampleHandler : public TestServiceSvIf {
public:
  folly::SemiFuture<int> semifuture_method1(int req) override {
    XLOG(INFO) << "server: receive " << req;
    return folly::makeSemiFuture(req + 1);
  }
};
} // namespace cpp2
} // namespace demo
  • Cargo.toml
[package]
name = "fbthrift_demo_if"
version = "0.0.1+unstable"
authors = ["Daniel Xu <[email protected]>", "Facebook"]
edition = "2021"
license = "Apache-2.0"
build = "src/thrift_build.rs"

[lib]
path = "src/thrift_lib.rs"
test = false
doctest = false

[[test]]
name = "fbthrift_test"
path = "tests/lib.rs"

[dependencies]
anyhow = "1.0.71"
async-trait = "0.1.71"
codegen_includer_proc_macro = { path = "../rust-shed/shed/codegen_includer_proc_macro" }
const-cstr = "0.3.0"
fbthrift = { path = "../fbthrift/thrift/lib/rust" }
fbthrift_demo_if__types = { package = "fbthrift_demo_if_types", path = "src/types" }
futures = { version = "0.3.28", features = ["async-await", "compat"] }
ref-cast = "1.0.18"
thiserror = "1.0.43"
tracing = "0.1.35"
tracing-futures = { version = "0.2.5", features = ["futures-03"] }

[dev-dependencies]
proptest = "1.0"
serde_json = { version = "1.0.100", features = ["float_roundtrip", "unbounded_depth"] }

[build-dependencies]
thrift_compiler = { path = "../rust-shed/shed/thrift_compiler" }

[features]
default = ["thrift_library_unittests_disabled"]
thrift_library_unittests_disabled = []
  2. Can you try some of the other tokio primitives and see if they work, e.g. JoinSet or tokio::join!?

Yes, tokio::join! works

  3. With your "tweak" can you try to clone the connection for both queries and see if the issue reproduces? It may be that the clone of the connection is improper which might be causing the problem. Or it needs to be wrapped into an Arc in order to work. I have a guess that dropping the conn will close the tcp connection, or cloning it won't properly open multiple connections.

After cloning the connection for both queries, it still works.

use fbthrift_demo_if::TestService;
use fbthrift_tcp::TcpTransport;
use fbthrift::CompactProtocol;
use futures::future::try_join_all;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
    let transport = TcpTransport::new(stream);
    let conn = <dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport);
    let response = try_join_all(vec![conn.clone().method1(10), conn.clone().method1(11)]).await;
    println!("{response:?}");
    Ok(())
}

Thanks!

If necessary, I can email both the client and the server to you. Thank you very much.

@slawlor
Contributor

slawlor commented May 7, 2024

Can you attach an archive of what you're trying to run here? I think I'm still missing some of the logic to set up your server and verify this. The client code looks potentially complete, but I don't see how you're running the thrift server here.

@sternezsl
Author

Can you attach an archive of what you're trying to run here? I think I'm still missing some of the logic to set up your server and verify this. The client code looks potentially complete, but I don't see how you're running the thrift server here.

Sorry about that, I wrote the server in C++.
code.tar.gz

@slawlor
Contributor

slawlor commented May 9, 2024

  • fbthrift git version: 5140b62e0c

Are you sure about this revision? Can you drop the full SHA hash here? I can't pull this revision (I'm trying to set up a repro of your problem now).

@sternezsl
Author

  • fbthrift git version: 5140b62e0c

Are you sure about this revision? Can you drop the full SHA hash here? I can't pull this revision (I'm trying to set up a repro of your problem now).

The one above is my local commit hash. I'm sorry for the trouble caused by my carelessness.

commit 0fb315837c6cc5a823f759332f11d8a4885e826f (HEAD)
Author: Open Source Bot [email protected]
Date: Sat Feb 24 09:34:25 2024 -0800

@slawlor
Contributor

slawlor commented May 16, 2024

Hello again, I just wanted to let you know I'm still working on setting up a repro sample. We've hit some hiccups building the thrift compiler, but we're still progressing. It may just take some time to investigate.

As a last option, can you try wrapping the original client in the tokio sample in an Arc and using that to make your requests? I have a suspicion about where the problem is, but unfortunately I am still trying to set up the repro to confirm it. It would look something like:

use std::sync::Arc;

let stream = tokio::net::TcpStream::connect("127.0.0.1:6060").await?;
let transport = TcpTransport::new(stream);
let conn = Arc::new(<dyn fbthrift_demo_if::TestService>::new(CompactProtocol, transport));
// Clone the Arc handle (not the client itself) for each spawned task.
let client = conn.clone();
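
For readers following the Arc suggestion, here is a minimal std-only sketch of what sharing one handle through Arc::clone means. FakeConn and run_shared are hypothetical names invented for illustration; they are not fbthrift or rust-shed APIs, and OS threads stand in for tokio tasks. It only shows the ownership pattern and does not reproduce or explain the hang reported in this issue.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a client connection; not an fbthrift type.
struct FakeConn {
    requests_sent: AtomicUsize,
}

impl FakeConn {
    fn new() -> Self {
        FakeConn {
            requests_sent: AtomicUsize::new(0),
        }
    }

    /// Mimics the demo service's method1: returns req + 1.
    fn method1(&self, req: i32) -> i32 {
        self.requests_sent.fetch_add(1, Ordering::SeqCst);
        req + 1
    }
}

/// Sends each request from its own thread, all through clones of one Arc handle.
/// Returns the replies in request order and the number of calls the shared
/// connection observed.
fn run_shared(reqs: Vec<i32>) -> (Vec<i32>, usize) {
    let conn = Arc::new(FakeConn::new());

    let handles: Vec<_> = reqs
        .into_iter()
        .map(|req| {
            // Arc::clone copies the pointer, not the FakeConn behind it.
            let client = Arc::clone(&conn);
            thread::spawn(move || client.method1(req))
        })
        .collect();

    // Joining in spawn order keeps the replies in request order.
    let results: Vec<i32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let sent = conn.requests_sent.load(Ordering::SeqCst);
    (results, sent)
}
```

Calling run_shared(vec![10, 11]) from a main function returns the two replies together with a call count of 2, which confirms that both threads went through the same underlying FakeConn rather than two independent copies.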

@sternezsl
Author

sternezsl commented May 17, 2024

Thanks for your effort, but it is still not working.

# cargo run -r
     Running `target/release/client`
result: 11
^C

@sternezsl
Author

@slawlor AFAIK, within Meta you use the srserver/srclient framework. Do you have any plans to open source these components?

@yfeldblum
Contributor

@slawlor AFAIK, within Meta you use the srserver/srclient framework. Do you have any plans to open source these components?

Think of Thrift itself as generic and general-purpose, not tied to Meta-specific internal implementation details. Think of SR as the opposite: it depends on the generic Thrift but also connects it to internal implementation details such as service discovery, logging, and configuration.
