Skip to content

Commit

Permalink
repo: APIs to fetch keys, collections, and records from a repository
Browse files Browse the repository at this point in the history
  • Loading branch information
str4d committed May 12, 2024
1 parent a9dc6fa commit 149f4ae
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 0 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ atrium-api = { version = "0.21.0", path = "atrium-api" }
atrium-xrpc = { version = "0.10.5", path = "atrium-xrpc" }
atrium-xrpc-client = { version = "0.5.2", path = "atrium-xrpc-client" }

# async in streams
async-stream = "0.3"

# async in traits
# Can be removed once MSRV is at least 1.75.0.
async-trait = "0.1.68"
Expand Down
3 changes: 3 additions & 0 deletions atrium-repo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ license.workspace = true
keywords.workspace = true

[dependencies]
atrium-api.workspace = true

# Async
async-stream.workspace = true
futures = { workspace = true, features = ["std"] }
tokio.workspace = true
tokio-util = { workspace = true, features = ["compat"] }
Expand Down
1 change: 1 addition & 0 deletions atrium-repo/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
mod car;
mod mst;
pub mod repo;
229 changes: 229 additions & 0 deletions atrium-repo/src/repo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
use async_stream::try_stream;
use atrium_api::types::{
string::{Did, Nsid, RecordKey},
Collection,
};
use futures::Stream;
use ipld_core::cid::Cid;
use serde::Deserialize;
use tokio::io::{AsyncRead, AsyncSeek, BufReader};

use crate::{
car::{self, IndexedReader},
mst::{self, Located},
};

#[derive(Debug, Deserialize)]
struct Commit {

Check warning on line 17 in atrium-repo/src/repo.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] reported by reviewdog 🐶 warning: fields `version`, `rev`, and `prev` are never read --> atrium-repo/src/repo.rs:19:5 | 17 | struct Commit { | ------ fields in this struct 18 | did: Did, 19 | version: u32, | ^^^^^^^ 20 | data: Cid, 21 | rev: String, | ^^^ 22 | prev: Option<Cid>, | ^^^^ | = note: `Commit` has a derived impl for the trait `Debug`, but this is intentionally ignored during dead code analysis = note: `#[warn(dead_code)]` on by default Raw Output: atrium-repo/src/repo.rs:17:8:w:warning: fields `version`, `rev`, and `prev` are never read --> atrium-repo/src/repo.rs:19:5 | 17 | struct Commit { | ------ fields in this struct 18 | did: Did, 19 | version: u32, | ^^^^^^^ 20 | data: Cid, 21 | rev: String, | ^^^ 22 | prev: Option<Cid>, | ^^^^ | = note: `Commit` has a derived impl for the trait `Debug`, but this is intentionally ignored during dead code analysis = note: `#[warn(dead_code)]` on by default __END__
did: Did,
version: u32,
data: Cid,
rev: String,
prev: Option<Cid>,
}

/// A repository that is read on demand from an indexed CAR reader.
///
/// Constructed with `Repository::load`; all record and key lookups fetch
/// blocks through the wrapped reader.
#[derive(Debug)]
pub struct Repository<R: AsyncRead + AsyncSeek> {
/// Indexed, buffered reader that every block lookup goes through.
db: IndexedReader<BufReader<R>>,
/// Commit decoded in `load` from the block named by the header's first root.
latest_commit: Commit,
}

impl<R: AsyncRead + AsyncSeek + Unpin + Send> Repository<R> {
/// Opens a repository by indexing the CAR data in `reader` and decoding its commit.
///
/// The block referenced by the first root of the CAR header is parsed as the
/// repository's latest commit.
///
/// # Errors
///
/// Fails if the reader cannot be indexed, if the root block cannot be fetched,
/// or if that block does not decode as a commit.
///
/// # Panics
///
/// The first root is taken with `roots[0]`, so a header that lists no roots
/// panics instead of returning an error.
pub async fn load(reader: R) -> Result<Self, Error> {
    let buffered = BufReader::new(reader);
    let mut db = IndexedReader::new(buffered).await?;

    // Only the first root is consulted; any further roots are ignored.
    let root_cid = db.header().roots[0];
    let commit_bytes = db.get_block(&root_cid).await?;
    let latest_commit: Commit = serde_ipld_dagcbor::from_reader(&commit_bytes[..])?;

    Ok(Self { db, latest_commit })
}

/// Returns the DID for the repository's user.
pub fn did(&self) -> &Did {
&self.latest_commit.did
}

/// Fetches the block stored under `cid` and parses it as an MST node.
///
/// # Errors
///
/// Fails if the block cannot be read or if `mst::Node::parse` rejects it.
async fn read_mst_node(&mut self, cid: Cid) -> Result<mst::Node, Error> {
    let raw = self.db.get_block(&cid).await?;
    Ok(mst::Node::parse(&raw)?)
}

/// Fetches the block stored under `cid` and decodes it as a record of
/// collection `C`.
///
/// # Errors
///
/// Fails if the block cannot be read or does not decode as `C::Record`.
async fn read_record<C: Collection>(&mut self, cid: Cid) -> Result<C::Record, Error> {
    let block = self.db.get_block(&cid).await?;
    let record: C::Record = serde_ipld_dagcbor::from_reader(&block[..])?;
    Ok(record)
}

/// Expands the MST node at `link` onto `stack` for a forward traversal.
///
/// Entries come from `Node::reversed_entries_with_prefix(prefix)`. Leaf
/// entries are converted with `key_fn`; subtree links are pushed unchanged so
/// the caller can expand them later.
///
/// # Errors
///
/// Fails if the node cannot be read or parsed, or if `key_fn` rejects an
/// entry. Entries pushed before the failing one remain on `stack`.
async fn resolve_subtree<K>(
    &mut self,
    link: Cid,
    prefix: &[u8],
    key_fn: impl Fn(&[u8], Cid) -> Result<K, Error>,
    stack: &mut Vec<Located<K>>,
) -> Result<(), Error> {
    let node = self.read_mst_node(link).await?;

    // The node is walked back-to-front: since the stack is LIFO, popping
    // then hands the entries back in their original order.
    for entry in node.reversed_entries_with_prefix(prefix) {
        let resolved = match entry {
            Located::InSubtree(child) => Located::InSubtree(child),
            Located::Entry((raw_key, value)) => Located::Entry(key_fn(raw_key, value)?),
        };
        stack.push(resolved);
    }

    Ok(())
}

/// Expands the MST node at `link` onto `stack` for a reverse traversal.
///
/// Entries come from `Node::entries_with_prefix(prefix)`. Leaf entries are
/// converted with `key_fn`; subtree links are pushed unchanged so the caller
/// can expand them later.
///
/// # Errors
///
/// Fails if the node cannot be read or parsed, or if `key_fn` rejects an
/// entry. Entries pushed before the failing one remain on `stack`.
async fn resolve_subtree_reversed<K>(
    &mut self,
    link: Cid,
    prefix: &[u8],
    key_fn: impl Fn(&[u8], Cid) -> Result<K, Error>,
    stack: &mut Vec<Located<K>>,
) -> Result<(), Error> {
    let node = self.read_mst_node(link).await?;

    // The node is walked front-to-back: since the stack is LIFO, popping
    // then hands the entries back in reverse order.
    for entry in node.entries_with_prefix(prefix) {
        let resolved = match entry {
            Located::InSubtree(child) => Located::InSubtree(child),
            Located::Entry((raw_key, value)) => Located::Entry(key_fn(raw_key, value)?),
        };
        stack.push(resolved);
    }

    Ok(())
}

/// Returns a stream of all keys in this repository.
pub fn keys<'a>(&'a mut self) -> impl Stream<Item = Result<String, Error>> + 'a {

Check warning on line 105 in atrium-repo/src/repo.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] reported by reviewdog 🐶 warning: the following explicit lifetimes could be elided: 'a --> atrium-repo/src/repo.rs:105:5 | 105 | pub fn keys<'a>(&'a mut self) -> impl Stream<Item = Result<String, Error>> + 'a { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes = note: `#[warn(clippy::needless_lifetimes)]` on by default help: elide the lifetimes | 105 - pub fn keys<'a>(&'a mut self) -> impl Stream<Item = Result<String, Error>> + 'a { 105 + pub fn keys(&mut self) -> impl Stream<Item = Result<String, Error>> + '_ { | Raw Output: atrium-repo/src/repo.rs:105:5:w:warning: the following explicit lifetimes could be elided: 'a --> atrium-repo/src/repo.rs:105:5 | 105 | pub fn keys<'a>(&'a mut self) -> impl Stream<Item = Result<String, Error>> + 'a { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes = note: `#[warn(clippy::needless_lifetimes)]` on by default help: elide the lifetimes | 105 - pub fn keys<'a>(&'a mut self) -> impl Stream<Item = Result<String, Error>> + 'a { 105 + pub fn keys(&mut self) -> impl Stream<Item = Result<String, Error>> + '_ { | __END__
// Start from the root of the tree.
let mut stack = vec![Located::InSubtree(self.latest_commit.data)];

try_stream! {
while let Some(located) = stack.pop() {
match located {
Located::Entry(key) => yield key,
Located::InSubtree(link) => {
self.resolve_subtree(
link,
&[],
|key, _| Ok(std::str::from_utf8(key)?.to_string()),
&mut stack,
)
.await?
}
}
}
}
}

/// Streams every record of collection `C` together with its record key.
///
/// Traversal starts at the CID in the latest commit's `data` field and only
/// asks each node for entries under the prefix `"<nsid>/"`. Record blocks are
/// fetched and decoded lazily, one per yielded item.
///
/// # Errors
///
/// The stream yields an `Err` item if a node or record cannot be read or
/// decoded, or if the part of a key after the prefix is not a valid
/// `RecordKey`.
pub fn get_collection<'a, C: Collection + 'a>(
    &'a mut self,
) -> impl Stream<Item = Result<(RecordKey, C::Record), Error>> + 'a {
    let prefix = fmt_prefix(C::nsid());

    // Seed the work stack with the root of the tree.
    let mut pending = vec![Located::InSubtree(self.latest_commit.data)];

    try_stream! {
        while let Some(next) = pending.pop() {
            match next {
                Located::InSubtree(link) => {
                    self.resolve_subtree(
                        link,
                        prefix.as_bytes(),
                        // Drop the "<nsid>/" prefix; the remainder is the record key.
                        |key, cid| parse_recordkey(&key[prefix.len()..]).map(|rkey| (rkey, cid)),
                        &mut pending,
                    )
                    .await?;
                }
                Located::Entry((rkey, cid)) => {
                    let record = self.read_record::<C>(cid).await?;
                    yield (rkey, record);
                }
            }
        }
    }
}

/// Streams every record of collection `C` together with its record key, in
/// the reverse of the order produced by `get_collection`.
///
/// Traversal starts at the CID in the latest commit's `data` field and only
/// asks each node for entries under the prefix `"<nsid>/"`. Record blocks are
/// fetched and decoded lazily, one per yielded item.
///
/// # Errors
///
/// The stream yields an `Err` item if a node or record cannot be read or
/// decoded, or if the part of a key after the prefix is not a valid
/// `RecordKey`.
pub fn get_collection_reversed<'a, C: Collection + 'a>(
    &'a mut self,
) -> impl Stream<Item = Result<(RecordKey, C::Record), Error>> + 'a {
    let prefix = fmt_prefix(C::nsid());

    // Seed the work stack with the root of the tree.
    let mut pending = vec![Located::InSubtree(self.latest_commit.data)];

    try_stream! {
        while let Some(next) = pending.pop() {
            match next {
                Located::InSubtree(link) => {
                    self.resolve_subtree_reversed(
                        link,
                        prefix.as_bytes(),
                        // Drop the "<nsid>/" prefix; the remainder is the record key.
                        |key, cid| parse_recordkey(&key[prefix.len()..]).map(|rkey| (rkey, cid)),
                        &mut pending,
                    )
                    .await?;
                }
                Located::Entry((rkey, cid)) => {
                    let record = self.read_record::<C>(cid).await?;
                    yield (rkey, record);
                }
            }
        }
    }
}

/// Looks up the record of collection `C` stored under `rkey`.
///
/// Descends from the CID in the latest commit's `data` field, following
/// subtree links until a node either holds the key or reports it absent.
///
/// Returns `Ok(None)` when the key is not present.
///
/// # Errors
///
/// Fails if a node or the record block cannot be read or decoded.
pub async fn get<C: Collection>(
    &mut self,
    rkey: &RecordKey,
) -> Result<Option<C::Record>, Error> {
    let path = C::repo_path(rkey);

    // Begin the descent at the root of the tree.
    let mut current = self.latest_commit.data;

    loop {
        let node = self.read_mst_node(current).await?;
        match node.get(path.as_bytes()) {
            // The key lives further down: continue in the child node.
            Some(Located::InSubtree(child)) => current = child,
            Some(Located::Entry(record_cid)) => {
                let record = self.read_record::<C>(record_cid).await?;
                return Ok(Some(record));
            }
            None => return Ok(None),
        }
    }
}
}

/// Builds the key prefix for a collection: the NSID's string form followed
/// by a `/`.
#[inline(always)]
fn fmt_prefix(nsid: Nsid) -> String {
    let collection: String = nsid.into();
    collection + "/"
}

fn parse_recordkey(key: &[u8]) -> Result<RecordKey, Error> {
std::str::from_utf8(key)?
.parse::<RecordKey>()
.map_err(Error::InvalidRecordKey)
}

/// Errors that can occur while interacting with a repository.
///
/// Variants marked `#[from]` are produced automatically when the wrapped
/// error is propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// An error reported by the `car` module.
#[error("CAR error: {0}")]
Car(#[from] car::Error),
/// Key bytes were not valid UTF-8.
#[error("Invalid key: {0}")]
InvalidKey(#[from] std::str::Utf8Error),
/// Key text could not be parsed as a `RecordKey`; carries the parser's message.
#[error("Invalid RecordKey: {0}")]
InvalidRecordKey(&'static str),
/// An error reported by the `mst` module.
#[error("MST error: {0}")]
Mst(#[from] mst::Error),
/// A block could not be decoded by `serde_ipld_dagcbor`.
#[error("serde_ipld_dagcbor decoding error: {0}")]
Parse(#[from] serde_ipld_dagcbor::DecodeError<std::io::Error>),
}

0 comments on commit 149f4ae

Please sign in to comment.