Skip to content

Commit

Permalink
feat: integrate RpcMiddleware in IPC (paradigmxyz#7790)
Browse files Browse the repository at this point in the history
  • Loading branch information
AbnerZheng authored Apr 23, 2024
1 parent 00ca9cd commit ee1c811
Show file tree
Hide file tree
Showing 12 changed files with 483 additions and 197 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/ethereum/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::EthEvmConfig;
use reth_primitives::{
bytes,
constants::{BEACON_ROOTS_ADDRESS, SYSTEM_ADDRESS},
Expand Down
2 changes: 1 addition & 1 deletion crates/node-core/src/args/rpc_server_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ impl RethRpcConfig for RpcServerArgs {
.max_subscriptions_per_connection(self.rpc_max_subscriptions_per_connection.get())
}

fn ipc_server_builder(&self) -> IpcServerBuilder {
fn ipc_server_builder(&self) -> IpcServerBuilder<Identity, Identity> {
IpcServerBuilder::default()
.max_subscriptions_per_connection(self.rpc_max_subscriptions_per_connection.get())
.max_request_body_size(self.rpc_max_request_size_bytes())
Expand Down
2 changes: 1 addition & 1 deletion crates/node-core/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub trait RethRpcConfig {
fn http_ws_server_builder(&self) -> ServerBuilder<Identity, Identity>;

/// Returns the default ipc server builder
fn ipc_server_builder(&self) -> IpcServerBuilder;
fn ipc_server_builder(&self) -> IpcServerBuilder<Identity, Identity>;

/// Creates the [RpcServerConfig] from cli args.
fn rpc_server_config(&self) -> RpcServerConfig;
Expand Down
1 change: 0 additions & 1 deletion crates/optimism/node/src/evm/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,6 @@ mod tests {
use revm::L1_BLOCK_CONTRACT;
use std::{collections::HashMap, str::FromStr};

use crate::OptimismEvmConfig;
use reth_revm::test_utils::StateProviderTest;

fn create_op_state_provider() -> StateProviderTest {
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ serde_json.workspace = true
tracing.workspace = true
bytes.workspace = true
thiserror.workspace = true
futures-util = "0.3.30"

[dev-dependencies]
tokio-stream = { workspace = true, features = ["sync"] }
reth-tracing.workspace = true
2 changes: 1 addition & 1 deletion crates/rpc/ipc/src/server/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ where

while i < self.futures.len() {
if self.futures[i].poll_unpin(cx).is_ready() {
// Using `swap_remove` since we don't care about ordering
// Using `swap_remove` since we don't care about ordering,
// but we do care about removing being `O(1)`.
//
// We don't increment `i` in this branch, since we now
Expand Down
196 changes: 50 additions & 146 deletions crates/rpc/ipc/src/server/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! IPC request handling adapted from [`jsonrpsee`] http request handling
use std::sync::Arc;

use futures::{stream::FuturesOrdered, StreamExt};
use jsonrpsee::{
batch_response_error,
Expand All @@ -8,58 +10,44 @@ use jsonrpsee::{
tracing::server::{rx_log_from_json, tx_log_from_str},
JsonRawValue,
},
server::IdProvider,
types::{
error::{reject_too_many_subscriptions, ErrorCode},
ErrorObject, Id, InvalidRequest, Notification, Params, Request,
},
BatchResponseBuilder, BoundedSubscriptions, CallOrSubscription, MethodCallback, MethodResponse,
MethodSink, Methods, ResponsePayload, SubscriptionState,
server::middleware::rpc::RpcServiceT,
types::{error::ErrorCode, ErrorObject, Id, InvalidRequest, Notification, Request},
BatchResponseBuilder, MethodResponse, ResponsePayload,
};
use std::{sync::Arc, time::Instant};
use tokio::sync::OwnedSemaphorePermit;
use tokio_util::either::Either;
use tracing::instrument;

type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>;

#[derive(Debug, Clone)]
pub(crate) struct Batch<'a> {
pub(crate) struct Batch<S> {
data: Vec<u8>,
call: CallData<'a>,
}

#[derive(Debug, Clone)]
pub(crate) struct CallData<'a> {
conn_id: usize,
methods: &'a Methods,
id_provider: &'a dyn IdProvider,
sink: &'a MethodSink,
max_response_body_size: u32,
max_log_length: u32,
request_start: Instant,
bounded_subscriptions: BoundedSubscriptions,
rpc_service: S,
}

// Batch responses must be sent back as a single message so we read the results from each
// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
// complete batch response back to the client over `tx`.
#[instrument(name = "batch", skip(b), level = "TRACE")]
pub(crate) async fn process_batch_request(b: Batch<'_>) -> Option<String> {
let Batch { data, call } = b;
pub(crate) async fn process_batch_request<S>(
b: Batch<S>,
max_response_body_size: usize,
) -> Option<String>
where
for<'a> S: RpcServiceT<'a> + Send,
{
let Batch { data, rpc_service } = b;

if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
let mut got_notif = false;
let mut batch_response =
BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);
let mut batch_response = BatchResponseBuilder::new_with_limit(max_response_body_size);

let mut pending_calls: FuturesOrdered<_> = batch
.into_iter()
.filter_map(|v| {
if let Ok(req) = serde_json::from_str::<Request<'_>>(v.get()) {
Some(Either::Right(async {
execute_call(req, call.clone()).await.into_response()
}))
Some(Either::Right(rpc_service.call(req)))
} else if let Ok(_notif) = serde_json::from_str::<Notif<'_>>(v.get()) {
// notifications should not be answered.
got_notif = true;
Expand Down Expand Up @@ -95,92 +83,32 @@ pub(crate) async fn process_batch_request(b: Batch<'_>) -> Option<String> {
}
}

pub(crate) async fn process_single_request(
pub(crate) async fn process_single_request<S>(
data: Vec<u8>,
call: CallData<'_>,
) -> Option<CallOrSubscription> {
rpc_service: &S,
) -> Option<MethodResponse>
where
for<'a> S: RpcServiceT<'a> + Send,
{
if let Ok(req) = serde_json::from_slice::<Request<'_>>(&data) {
Some(execute_call_with_tracing(req, call).await)
Some(execute_call_with_tracing(req, rpc_service).await)
} else if serde_json::from_slice::<Notif<'_>>(&data).is_ok() {
None
} else {
let (id, code) = prepare_error(&data);
Some(CallOrSubscription::Call(MethodResponse::error(id, ErrorObject::from(code))))
Some(MethodResponse::error(id, ErrorObject::from(code)))
}
}

#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")]
pub(crate) async fn execute_call_with_tracing<'a>(
#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(req, rpc_service), level = "TRACE")]
pub(crate) async fn execute_call_with_tracing<'a, S>(
req: Request<'a>,
call: CallData<'_>,
) -> CallOrSubscription {
execute_call(req, call).await
}

pub(crate) async fn execute_call(req: Request<'_>, call: CallData<'_>) -> CallOrSubscription {
let CallData {
methods,
max_response_body_size,
max_log_length,
conn_id,
id_provider,
sink,
request_start,
bounded_subscriptions,
} = call;

rx_log_from_json(&req, call.max_log_length);

let params = Params::new(req.params.as_ref().map(|params| params.get()));
let name = &req.method;
let id = req.id;

let response = match methods.method_with_name(name) {
None => {
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
CallOrSubscription::Call(response)
}
Some((_name, method)) => match method {
MethodCallback::Sync(callback) => {
let response = (callback)(id, params, max_response_body_size as usize);
CallOrSubscription::Call(response)
}
MethodCallback::Async(callback) => {
let id = id.into_owned();
let params = params.into_owned();
let response =
(callback)(id, params, conn_id, max_response_body_size as usize).await;
CallOrSubscription::Call(response)
}
MethodCallback::AsyncWithDetails(_callback) => {
unimplemented!()
}
MethodCallback::Subscription(callback) => {
if let Some(p) = bounded_subscriptions.acquire() {
let conn_state =
SubscriptionState { conn_id, id_provider, subscription_permit: p };
let response = callback(id, params, sink.clone(), conn_state).await;
CallOrSubscription::Subscription(response)
} else {
let response = MethodResponse::error(
id,
reject_too_many_subscriptions(bounded_subscriptions.max()),
);
CallOrSubscription::Call(response)
}
}
MethodCallback::Unsubscription(callback) => {
// Don't adhere to any resource or subscription limits; always let unsubscribing
// happen!
let result = callback(id, params, conn_id, max_response_body_size as usize);
CallOrSubscription::Call(result)
}
},
};

tx_log_from_str(response.as_response().as_result(), max_log_length);
let _ = request_start;
response
rpc_service: &S,
) -> MethodResponse
where
for<'b> S: RpcServiceT<'b> + Send,
{
rpc_service.call(req).await
}

#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")]
Expand All @@ -192,31 +120,15 @@ fn execute_notification(notif: &Notif<'_>, max_log_length: u32) -> MethodRespons
response
}

#[allow(dead_code)]
pub(crate) struct HandleRequest {
pub(crate) methods: Methods,
pub(crate) max_request_body_size: u32,
pub(crate) max_response_body_size: u32,
pub(crate) max_log_length: u32,
pub(crate) batch_requests_supported: bool,
pub(crate) conn: Arc<OwnedSemaphorePermit>,
pub(crate) bounded_subscriptions: BoundedSubscriptions,
pub(crate) method_sink: MethodSink,
pub(crate) id_provider: Arc<dyn IdProvider>,
}

pub(crate) async fn handle_request(request: String, input: HandleRequest) -> Option<String> {
let HandleRequest {
methods,
max_response_body_size,
max_log_length,
conn,
bounded_subscriptions,
method_sink,
id_provider,
..
} = input;

pub(crate) async fn call_with_service<S>(
request: String,
rpc_service: S,
max_response_body_size: usize,
conn: Arc<OwnedSemaphorePermit>,
) -> Option<String>
where
for<'a> S: RpcServiceT<'a> + Send,
{
enum Kind {
Single,
Batch,
Expand All @@ -231,31 +143,23 @@ pub(crate) async fn handle_request(request: String, input: HandleRequest) -> Opt
})
.unwrap_or(Kind::Single);

let call = CallData {
conn_id: 0,
methods: &methods,
id_provider: &*id_provider,
sink: &method_sink,
max_response_body_size,
max_log_length,
request_start: Instant::now(),
bounded_subscriptions,
};

// Single request or notification
let res = if matches!(request_kind, Kind::Single) {
let response = process_single_request(request.into_bytes(), call).await;
let response = process_single_request(request.into_bytes(), &rpc_service).await;
match response {
Some(CallOrSubscription::Call(response)) => Some(response.to_result()),
Some(CallOrSubscription::Subscription(_)) => {
Some(response) if response.is_method_call() => Some(response.to_result()),
_ => {
// subscription responses are sent directly over the sink, return a response here
// would lead to duplicate responses for the subscription response
None
}
None => None,
}
} else {
process_batch_request(Batch { data: request.into_bytes(), call }).await
process_batch_request(
Batch { data: request.into_bytes(), rpc_service },
max_response_body_size,
)
.await
};

drop(conn);
Expand Down
Loading

0 comments on commit ee1c811

Please sign in to comment.