Skip to content

Commit

Permalink
Use common for api
Browse files Browse the repository at this point in the history
  • Loading branch information
sugyan committed Nov 26, 2024
1 parent a395d90 commit 5bad0a1
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 43 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ hickory-resolver = "0.24.1"
http = "1.1.0"
lru = "0.12.4"
moka = "0.12.8"
tokio = { version = "1.39", default-features = false }
tokio = { version = "1.41", default-features = false }

# HTTP client integrations
isahc = "1.7.2"
Expand Down
1 change: 1 addition & 0 deletions atrium-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license.workspace = true
keywords.workspace = true

[dependencies]
atrium-common.workspace = true
atrium-xrpc.workspace = true
chrono = { workspace = true, features = ["serde"] }
http.workspace = true
Expand Down
12 changes: 10 additions & 2 deletions atrium-api/src/agent/atp_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ pub type AtpSession = crate::com::atproto::server::create_session::Output;
pub struct CredentialSession<S, T>
where
S: AtpSessionStore + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
T: XrpcClient + Send + Sync,
{
store: Arc<inner::Store<S>>,
store: Arc<inner::InnerStore<S>>,
inner: Arc<inner::Client<S, T>>,
atproto_service: AtprotoService<inner::Client<S, T>>,
}
Expand All @@ -33,9 +34,10 @@ impl<S, T> CredentialSession<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
pub fn new(xrpc: T, store: S) -> Self {
let store = Arc::new(inner::Store::new(store, xrpc.base_uri()));
let store = Arc::new(inner::InnerStore::new(store, xrpc.base_uri()));
let inner = Arc::new(inner::Client::new(Arc::clone(&store), xrpc));
Self {
store: Arc::clone(&store),
Expand Down Expand Up @@ -148,6 +150,7 @@ impl<S, T> HttpClient for CredentialSession<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
async fn send_http(
&self,
Expand All @@ -161,6 +164,7 @@ impl<S, T> XrpcClient for CredentialSession<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
fn base_uri(&self) -> String {
self.inner.base_uri()
Expand All @@ -183,6 +187,7 @@ impl<S, T> SessionManager for CredentialSession<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
async fn did(&self) -> Option<Did> {
self.store.get_session().await.map(|session| session.data.did)
Expand All @@ -209,6 +214,7 @@ pub struct AtpAgent<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
session_manager: Wrapper<CredentialSession<S, T>>,
inner: Agent<Wrapper<CredentialSession<S, T>>>,
Expand All @@ -218,6 +224,7 @@ impl<S, T> AtpAgent<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
/// Create a new agent.
pub fn new(xrpc: T, store: S) -> Self {
Expand Down Expand Up @@ -284,6 +291,7 @@ impl<S, T> Deref for AtpAgent<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
type Target = Agent<Wrapper<CredentialSession<S, T>>>;

Expand Down
38 changes: 32 additions & 6 deletions atrium-api/src/agent/atp_agent/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{AtpSession, AtpSessionStore};
use crate::did_doc::DidDocument;
use crate::types::string::Did;
use crate::types::TryFromUnknown;
use atrium_common::store::Store;
use atrium_xrpc::error::{Error, Result, XrpcErrorKind};
use atrium_xrpc::types::AuthorizationToken;
use atrium_xrpc::{HttpClient, OutputDataOrBytes, XrpcClient, XrpcRequest};
Expand All @@ -12,7 +13,7 @@ use std::sync::{Arc, RwLock};
use tokio::sync::{Mutex, Notify};

struct WrapperClient<S, T> {
store: Arc<Store<S>>,
store: Arc<InnerStore<S>>,
proxy_header: RwLock<Option<String>>,
labelers_header: Arc<RwLock<Option<Vec<String>>>>,
inner: Arc<T>,
Expand Down Expand Up @@ -69,6 +70,7 @@ impl<S, T> XrpcClient for WrapperClient<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
fn base_uri(&self) -> String {
self.store.get_endpoint()
Expand All @@ -91,7 +93,7 @@ where
}

pub struct Client<S, T> {
store: Arc<Store<S>>,
store: Arc<InnerStore<S>>,
inner: WrapperClient<S, T>,
is_refreshing: Arc<Mutex<bool>>,
notify: Arc<Notify>,
Expand All @@ -101,8 +103,9 @@ impl<S, T> Client<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
pub fn new(store: Arc<Store<S>>, xrpc: T) -> Self {
pub fn new(store: Arc<InnerStore<S>>, xrpc: T) -> Self {
let inner = WrapperClient {
store: Arc::clone(&store),
labelers_header: Arc::new(RwLock::new(None)),
Expand Down Expand Up @@ -245,6 +248,7 @@ impl<S, T> XrpcClient for Client<S, T>
where
S: AtpSessionStore + Send + Sync,
T: XrpcClient + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
fn base_uri(&self) -> String {
self.inner.base_uri()
Expand All @@ -270,12 +274,12 @@ where
}
}

pub struct Store<S> {
pub struct InnerStore<S> {
inner: S,
endpoint: RwLock<String>,
}

impl<S> Store<S> {
impl<S> InnerStore<S> {
pub fn new(inner: S, initial_endpoint: String) -> Self {
Self { inner, endpoint: RwLock::new(initial_endpoint) }
}
Expand All @@ -289,9 +293,31 @@ impl<S> Store<S> {
}
}

impl<S> AtpSessionStore for Store<S>
impl<S> Store<(), AtpSession> for InnerStore<S>
where
S: AtpSessionStore + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
type Error = S::Error;

async fn get(&self, key: &()) -> core::result::Result<Option<AtpSession>, Self::Error> {
self.inner.get(key).await
}
async fn set(&self, key: (), session: AtpSession) -> core::result::Result<(), Self::Error> {
self.inner.set(key, session).await
}
async fn del(&self, key: &()) -> core::result::Result<(), Self::Error> {
self.inner.del(key).await
}
async fn clear(&self) -> core::result::Result<(), Self::Error> {
self.inner.clear().await
}
}

impl<S> AtpSessionStore for InnerStore<S>
where
S: AtpSessionStore + Send + Sync,
S::Error: std::error::Error + Send + Sync + 'static,
{
async fn get_session(&self) -> Option<AtpSession> {
self.inner.get_session().await
Expand Down
35 changes: 23 additions & 12 deletions atrium-api/src/agent/atp_agent/store.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
mod memory;

use super::AtpSession;
use atrium_common::store::{memory::MemoryStore, Store};
use std::future::Future;

pub use self::memory::MemorySessionStore;
pub(crate) use super::AtpSession;

#[cfg_attr(not(target_arch = "wasm32"), trait_variant::make(Send))]
pub trait AtpSessionStore {
#[must_use]
fn get_session(&self) -> impl Future<Output = Option<AtpSession>>;
#[must_use]
fn set_session(&self, session: AtpSession) -> impl Future<Output = ()>;
#[must_use]
fn clear_session(&self) -> impl Future<Output = ()>;
pub trait AtpSessionStore: Store<(), AtpSession>
where
Self: Send + Sync,
{
fn get_session(&self) -> impl Future<Output = Option<AtpSession>> {
async { self.get(&()).await.ok().flatten() }
}
fn set_session(&self, session: AtpSession) -> impl Future<Output = ()> {
async {
self.set((), session).await.ok();
}
}
fn clear_session(&self) -> impl Future<Output = ()> {
async {
self.clear().await.ok();
}
}
}

pub type MemorySessionStore = MemoryStore<(), AtpSession>;

impl AtpSessionStore for MemorySessionStore {}
20 changes: 0 additions & 20 deletions atrium-api/src/agent/atp_agent/store/memory.rs

This file was deleted.

0 comments on commit 5bad0a1

Please sign in to comment.