Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: atrium-repo #168

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"atrium-api",
"atrium-cli",
"atrium-repo",
"atrium-xrpc",
"atrium-xrpc-client",
]
Expand All @@ -25,6 +26,9 @@ atrium-api = { version = "0.21.0", path = "atrium-api" }
atrium-xrpc = { version = "0.10.5", path = "atrium-xrpc" }
atrium-xrpc-client = { version = "0.5.2", path = "atrium-xrpc-client" }

# async in streams
async-stream = "0.3"

# async in traits
# Can be removed once MSRV is at least 1.75.0.
async-trait = "0.1.68"
Expand All @@ -41,11 +45,13 @@ serde = "1.0.160"
serde_bytes = "0.11.9"
serde_json = "1.0.96"
serde_html_form = "0.2.6"
unsigned-varint = "0.8"

# Networking
futures = { version = "0.3.30", default-features = false, features = ["alloc"] }
http = "1.1.0"
tokio = { version = "1.36", default-features = false }
tokio-util = "0.7"

# HTTP client integrations
isahc = "1.7.2"
Expand Down
9 changes: 9 additions & 0 deletions atrium-repo/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

Initial release.
30 changes: 30 additions & 0 deletions atrium-repo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "atrium-repo"
version = "0.0.0"
authors = ["Jack Grigg <[email protected]>"]
edition.workspace = true
rust-version.workspace = true
description = "Library for handling AT Protocol (Bluesky) repositories and MSTs"
documentation = "https://docs.rs/atrium-repo"
readme = "README.md"
repository.workspace = true
license.workspace = true
keywords.workspace = true

[dependencies]
atrium-api.workspace = true

# Async
async-stream.workspace = true
futures = { workspace = true, features = ["std"] }
tokio.workspace = true
tokio-util = { workspace = true, features = ["compat"] }

# Encodings
ipld-core = { workspace = true, features = ["serde"] }
serde = { workspace = true, features = ["derive"] }
serde_ipld_dagcbor.workspace = true
unsigned-varint = { workspace = true, features = ["futures"] }

# Errors
thiserror.workspace = true
10 changes: 10 additions & 0 deletions atrium-repo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ATrium Repo

[![](https://img.shields.io/crates/v/atrium-repo)](https://crates.io/crates/atrium-repo)
[![](https://img.shields.io/docsrs/atrium-repo)](https://docs.rs/atrium-repo)
[![](https://img.shields.io/crates/l/atrium-repo)](https://github.com/sugyan/atrium/blob/main/LICENSE)
[![Rust](https://github.com/sugyan/atrium/actions/workflows/api.yml/badge.svg?branch=main)](https://github.com/sugyan/atrium/actions/workflows/api.yml)

ATrium Repo is a Rust library for ATProto repositories, and in particular the Merkle Search Tree (MST) data structure.

TBD.
121 changes: 121 additions & 0 deletions atrium-repo/src/car.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::{collections::HashMap, convert::Infallible};

use futures::{AsyncReadExt as _, AsyncSeekExt as _};
use ipld_core::cid::{multihash::Multihash, Cid, Version};
use serde::Deserialize;
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, SeekFrom};
use tokio_util::compat::TokioAsyncReadCompatExt;
use unsigned_varint::io::ReadError;

#[derive(Debug, Deserialize)]
pub struct V1Header {
pub version: u64,
pub roots: Vec<Cid>,
}

async fn read_cid<R: futures::AsyncRead + futures::AsyncSeek + Unpin>(
mut reader: R,
) -> Result<Cid, Error> {
let version = unsigned_varint::aio::read_u64(&mut reader).await?;
let codec = unsigned_varint::aio::read_u64(&mut reader).await?;

// CIDv0 has the fixed `0x12 0x20` prefix
if [version, codec] == [0x12, 0x20] {
let mut digest = [0u8; 32];
reader.read_exact(&mut digest).await?;
let mh = Multihash::wrap(version, &digest).expect("Digest is always 32 bytes.");
return Ok(Cid::new_v0(mh)?);
}

let version = Version::try_from(version)?;
match version {
Version::V0 => Err(Error::InvalidCidV0),
Version::V1 => {
let start = reader.stream_position().await?;
let _code = unsigned_varint::aio::read_u64(&mut reader).await?;
let size = unsigned_varint::aio::read_u64(&mut reader).await?;
let len = (reader.stream_position().await? - start) + size;

let mut mh_bytes = vec![0; len as usize];
reader.seek(SeekFrom::Start(start)).await?;
reader.read_exact(&mut mh_bytes).await?;

let mh = Multihash::from_bytes(&mh_bytes)?;
Ok(Cid::new(version, codec, mh)?)
}
}
}

/// An indexed reader for CAR files.
#[derive(Debug)]
pub struct IndexedReader<R: AsyncRead + AsyncSeek> {
reader: R,
header: V1Header,
index: HashMap<Cid, (u64, usize)>,
}

impl<R: AsyncRead + AsyncSeek + Unpin> IndexedReader<R> {
pub async fn new(mut reader: R) -> Result<Self, Error> {
// Read the header.
let header_len = unsigned_varint::aio::read_usize((&mut reader).compat()).await?;
let mut header_bytes = vec![0; header_len];
reader.read_exact(&mut header_bytes).await?;
let header: V1Header = serde_ipld_dagcbor::from_slice(&header_bytes)?;

// Build the index.
let mut index = HashMap::new();
loop {
match unsigned_varint::aio::read_u64((&mut reader).compat()).await {
Ok(data_len) => {
let start = reader.stream_position().await?;
let cid = read_cid((&mut reader).compat()).await?;
let offset = reader.stream_position().await?;
let len = data_len - (offset - start);
reader.seek(SeekFrom::Start(offset + len)).await?;
index.insert(cid, (offset, len as usize));
}
Err(ReadError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => Err(e)?,
}
}

Ok(Self {
reader,
header,
index,
})
}

pub fn header(&self) -> &V1Header {
&self.header
}

pub async fn get_block(&mut self, cid: &Cid) -> Result<Vec<u8>, Error> {
let (offset, len) = self.index.get(cid).ok_or_else(|| Error::CidNotFound)?;
let mut buf = vec![0; *len];

self.reader.seek(SeekFrom::Start(*offset)).await?;
self.reader.read_exact(&mut buf).await?;

Ok(buf)
}
}

/// Errors that can occur while interacting with a CAR.
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Invalid CID: {0}")]
Cid(#[from] ipld_core::cid::Error),
#[error("CID does not exist in CAR")]
CidNotFound,
#[error("Invalid explicit CID v0")]
InvalidCidV0,
#[error("Invalid varint: {0}")]
InvalidVarint(#[from] unsigned_varint::io::ReadError),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Invalid Multihash: {0}")]
Multihash(#[from] ipld_core::cid::multihash::Error),
#[error("serde_ipld_dagcbor decoding error: {0}")]
Parse(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
}
3 changes: 3 additions & 0 deletions atrium-repo/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod car;
mod mst;
pub mod repo;
Loading
Loading