Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): replace callbacks in RpcModule and Methods #1387

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
114 changes: 114 additions & 0 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,19 @@ impl Methods {
}
}

/// Remove a method by name.
pub fn remove(&mut self, name: &'static str) -> Option<MethodCallback> {
self.mut_callbacks().remove(name)
}

/// Inserts the method callback for a given name, replacing any existing
/// method with the same name.
pub fn insert_replacing(&mut self, name: &'static str, callback: MethodCallback) -> Option<MethodCallback> {
prestwich marked this conversation as resolved.
Show resolved Hide resolved
prestwich marked this conversation as resolved.
Show resolved Hide resolved
let prev = self.remove(name);
let _ = self.verify_and_insert(name, callback);
prev
}

/// Helper for obtaining a mut ref to the callbacks HashMap.
fn mut_callbacks(&mut self) -> &mut FxHashMap<&'static str, MethodCallback> {
Arc::make_mut(&mut self.callbacks)
Expand All @@ -266,6 +279,23 @@ impl Methods {
Ok(())
}

/// Merge two [`Methods`]'s by adding all [`MethodCallback`]s from `other`
/// into `self`, removing and returning any existing methods with the same
/// name.
pub fn merge_replacing(&mut self, other: impl Into<Methods>) -> Vec<(&'static str, MethodCallback)> {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
prestwich marked this conversation as resolved.
Show resolved Hide resolved
let mut other = other.into();
let callbacks = self.mut_callbacks();

let mut removed = Vec::with_capacity(other.callbacks.len());
for (name, callback) in other.mut_callbacks().drain() {
if let Some(prev) = callbacks.remove(name) {
removed.push((name, prev));
}
callbacks.insert(name, callback);
}
removed
}

/// Returns the method callback.
pub fn method(&self, method_name: &str) -> Option<&MethodCallback> {
self.callbacks.get(method_name)
Expand Down Expand Up @@ -521,6 +551,16 @@ impl<Context> From<RpcModule<Context>> for Methods {
}

impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// Return a mutable reference to the currently registered methods.
pub fn methods_mut(&mut self) -> &mut Methods {
Copy link
Member

@niklasad1 niklasad1 Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to expose this, can you remove it or make it private?

It's not used currently.

&mut self.methods
}

/// Remove a method by name.
pub fn remove_method(&mut self, method_name: &'static str) -> Option<MethodCallback> {
self.methods.remove(method_name)
}

/// Register a new synchronous RPC method, which computes the response with the given callback.
///
/// ## Examples
Expand Down Expand Up @@ -551,6 +591,20 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
)
}

/// As [`Self::register_method`], but replaces and returns the method if
/// it already exists.
pub fn replace_method<R, F>(&mut self, method_name: &'static str, callback: F) -> Option<MethodCallback>
where
R: IntoResponse + 'static,
F: Fn(Params, &Context, &Extensions) -> R + Send + Sync + 'static,
{
let prev = self.remove_method(method_name);
// Errors here can be ignored, as we know the method is not already
// registered (we just removed it).
let _ = self.register_method(method_name, callback);
prev
}

/// Register a new asynchronous RPC method, which computes the response with the given callback.
///
/// ## Examples
Expand Down Expand Up @@ -589,6 +643,25 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
)
}

/// As [`Self::register_async_method`], but replaces and returns the method
/// if it already exists.
pub fn replace_async_method<R, Fun, Fut>(
&mut self,
method_name: &'static str,
callback: Fun,
) -> Option<MethodCallback>
where
R: IntoResponse + 'static,
Fut: Future<Output = R> + Send,
Fun: (Fn(Params<'static>, Arc<Context>, Extensions) -> Fut) + Clone + Send + Sync + 'static,
{
let prev = self.remove_method(method_name);
// Errors here can be ignored, as we know the method is not already
// registered (we just removed it).
let _ = self.register_async_method(method_name, callback);
prev
}

/// Register a new **blocking** synchronous RPC method, which computes the response with the given callback.
/// Unlike the regular [`register_method`](RpcModule::register_method), this method can block its thread and perform
/// expensive computations.
Expand Down Expand Up @@ -632,6 +705,19 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Ok(callback)
}

/// As [`Self::register_blocking_method`], but replaces and returns the method if it already exists.
pub fn replace_blocking_method<R, F>(&mut self, method_name: &'static str, callback: F) -> Option<MethodCallback>
where
R: IntoResponse + 'static,
F: Fn(Params, Arc<Context>, &Extensions) -> R + Clone + Send + Sync + 'static,
{
let prev = self.remove_method(method_name);
// Errors here can be ignored, as we know the method is not already
// registered (we just removed it).
let _ = self.register_blocking_method(method_name, callback);
prev
}

/// Register a new publish/subscribe interface using JSON-RPC notifications.
///
/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
Expand Down Expand Up @@ -820,6 +906,34 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Ok(callback)
}

/// As [`Self::register_subscription`] but replaces and returns the method
/// if it already exists.
pub fn replace_subscription<R, F, Fut>(
&mut self,
subscribe_method_name: &'static str,
notif_method_name: &'static str,
unsubscribe_method_name: &'static str,
callback: F,
) -> Option<MethodCallback>
where
Context: Send + Sync + 'static,
F: (Fn(Params<'static>, PendingSubscriptionSink, Arc<Context>, Extensions) -> Fut)
+ Send
+ Sync
+ Clone
+ 'static,
Fut: Future<Output = R> + Send + 'static,
R: IntoSubscriptionCloseResponse + Send,
{
let prev = self.methods.remove(subscribe_method_name);
self.methods.remove(unsubscribe_method_name);
prestwich marked this conversation as resolved.
Show resolved Hide resolved

// Errors here can be ignored, as we know the method is not already
// registered (we just removed it).
let _ = self.register_subscription(subscribe_method_name, notif_method_name, unsubscribe_method_name, callback);
prev
}

/// Similar to [`RpcModule::register_subscription`] but a little lower-level API
/// where handling the subscription is managed the user i.e, polling the subscription
/// such as spawning a separate task to do so.
Expand Down