Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

[v2.0] Branch Announcement Message #262

Merged
merged 11 commits into from
Jul 13, 2022
60 changes: 31 additions & 29 deletions streams/examples/full-example/scenarios/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use crate::GenericTransport;
const PUBLIC_PAYLOAD: &[u8] = b"PUBLICPAYLOAD";
const MASKED_PAYLOAD: &[u8] = b"MASKEDPAYLOAD";

const BASE_BRANCH: &str = "BASE_BRANCH";
const BRANCH1: &str = "BRANCH1";

pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str) -> Result<()> {
let psk = Psk::from_seed("A pre shared key");
let branch1_topic = "BRANCH1";

let mut author = User::builder()
.with_identity(Ed25519::from_seed(author_seed))
Expand All @@ -43,14 +45,14 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
.build()?;

// Confirm that users have id's
let author_id = author.identifier().expect("author should have identifier");
let _author_id = author.identifier().expect("author should have identifier");
let subscriber_a_id = subscriber_a.identifier().expect("subscriber A should have identifier");
let subscriber_b_id = subscriber_b.identifier().expect("subscriber B should have identifier");
assert!(subscriber_c.identifier().is_none());

println!("> Author creates stream and sends its announcement");
// Start at index 1, because we can. Will error if its already in use
let announcement = author.create_stream("BASE_BRANCH").await?;
let announcement = author.create_stream(BASE_BRANCH).await?;
print_send_result(&announcement);
print_user("Author", &author);

Expand All @@ -72,13 +74,13 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
print_user("Author", &author);

println!("> Author creates a new branch");
println!("Branch topic: {}", branch1_topic);
let branch_announcement = author.new_branch(branch1_topic).await?;
println!("Branch topic: {}", BRANCH1);
let branch_announcement = author.new_branch(BASE_BRANCH, BRANCH1).await?;
print_send_result(&branch_announcement);
print_user("Author", &author);

println!("> Author issues keyload for every user subscribed so far [SubscriberA, PSK] in Branch 1");
let keyload_as_author = author.send_keyload_for_all(branch1_topic).await?;
let keyload_as_author = author.send_keyload_for_all(BRANCH1).await?;
print_send_result(&keyload_as_author);
print_user("Author", &author);

Expand All @@ -90,10 +92,10 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
.expect("Subscriber A did not receive the expected branch announcement");
assert!(
branch_1_ann_as_a
.as_announcement()
.expect("expected announcement, found something else")
.author_identifier
.eq(&author_id)
.as_branch_announcement()
.expect("expected branch announcement, found something else")
.topic
.eq(&BRANCH1.into())
);
print_user("Subscriber A", &subscriber_a);
let branch_1_ann_as_b = subscriber_b
Expand All @@ -103,10 +105,10 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
.expect("Subscriber B did not receive the expected branch announcement");
assert!(
branch_1_ann_as_b
.as_announcement()
.expect("expected announcement, found something else")
.author_identifier
.eq(&author_id)
.as_branch_announcement()
.expect("expected branch announcement, found something else")
.topic
.eq(&BRANCH1.into())
);
print_user("Subscriber B", &subscriber_b);
let branch_1_ann_as_c = subscriber_c
Expand All @@ -116,10 +118,10 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
.expect("Subscriber C did not receive the expected branch announcement");
assert!(
branch_1_ann_as_c
.as_announcement()
.expect("expected announcement, found something else")
.author_identifier
.eq(&author_id)
.as_branch_announcement()
.expect("expected branch announcement, found something else")
.topic
.eq(&BRANCH1.into())
);
print_user("Subscriber C", &subscriber_c);

Expand Down Expand Up @@ -163,7 +165,7 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str

println!("> Author sends a tagged packet linked to the keyload");
let tagged_packet_as_author = author
.send_tagged_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_tagged_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await?;
print_send_result(&tagged_packet_as_author);
print_user("Author", &author);
Expand Down Expand Up @@ -218,13 +220,13 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
print_user("Author", &author);

println!("> Author issues new keyload in the same branch to incorporate SubscriberB");
let new_keyload_as_author = author.send_keyload_for_all(branch1_topic).await?;
let new_keyload_as_author = author.send_keyload_for_all(BRANCH1).await?;
print_send_result(&new_keyload_as_author);
print_user("Author", &author);

println!("> Author sends a signed packet");
let signed_packet_as_author = author
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await?;
print_send_result(&signed_packet_as_author);
print_user("Author", &author);
Expand Down Expand Up @@ -269,7 +271,7 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
.last()
.expect("Subscriber C hasn't received any of the new messages");
let result = subscriber_c
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await;
assert!(
result.is_err(),
Expand All @@ -279,7 +281,7 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str

println!("> Subscriber A attempts to send a signed packet (but he has readonly permission over the branch!)");
let result = subscriber_a
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await;
assert!(
result.is_err(),
Expand All @@ -293,7 +295,7 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
println!("> Author gives Subscriber A write permission");
let new_keyload_as_author = author
.send_keyload(
branch1_topic,
BRANCH1,
author
.subscribers()
.map(|s| {
Expand All @@ -313,7 +315,7 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
println!("> Subscriber A publishes signed packet");
assert_eq!(subscriber_a.sync().await?, 1);
let signed_packet_as_a = subscriber_a
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await?;
print_send_result(&signed_packet_as_a);
print_user("Subscriber A", &subscriber_a);
Expand Down Expand Up @@ -447,12 +449,12 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
print_user("Author", &author);

println!("> Author issues a new keyload to remove all subscribers from the branch");
let last_keyload = author.send_keyload_for_all(branch1_topic).await?;
let last_keyload = author.send_keyload_for_all(BRANCH1).await?;
print_send_result(&last_keyload);
print_user("Author", &author);
println!("> Author sends a new signed packet");
let last_signed_packet = author
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await?;
print_send_result(&last_signed_packet);
print_user("Author", &author);
Expand Down Expand Up @@ -496,12 +498,12 @@ pub(crate) async fn example<T: GenericTransport>(transport: T, author_seed: &str
println!("> Subscribers A and B try to send a signed packet");
// TODO: THIS SHOULD FAIL ONCE PUBLISHERS ARE TRACKED BY BRANCH AND WE CAN "DEMOTE" SUBSCRIBERS
let a_signed_packet = subscriber_a
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await?;
print_send_result(&a_signed_packet);
print_user("Subscriber A", &subscriber_a);
let result = subscriber_b
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await;
print_user("Subscriber B", &subscriber_b);
assert!(result.is_err());
Expand Down
18 changes: 10 additions & 8 deletions streams/examples/full-example/scenarios/did.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use super::utils::{print_send_result, print_user};
const PUBLIC_PAYLOAD: &[u8] = b"PUBLICPAYLOAD";
const MASKED_PAYLOAD: &[u8] = b"MASKEDPAYLOAD";

const BASE_BRANCH: &str = "BASE_BRANCH";
const BRANCH1: &str = "BRANCH1";

pub async fn example(transport: Rc<RefCell<tangle::Client>>) -> Result<()> {
let did_client = DIDClient::new().await?;
println!("> Making DID with method for the Author");
Expand All @@ -35,7 +38,6 @@ pub async fn example(transport: Rc<RefCell<tangle::Client>>) -> Result<()> {

// Generate a simple PSK for storage by users
let psk = Psk::from_seed("A pre shared key");
let branch1_topic = "BRANCH1";

let mut author = User::builder()
.with_identity(DID::PrivateKey(author_did_info))
Expand All @@ -57,7 +59,7 @@ pub async fn example(transport: Rc<RefCell<tangle::Client>>) -> Result<()> {

println!("> Author creates stream and sends its announcement");
// Start at index 1, because we can. Will error if its already in use
let announcement = author.create_stream("BASE_BRANCH").await?;
let announcement = author.create_stream(BASE_BRANCH).await?;
print_send_result(&announcement);
print_user("Author", &author);

Expand Down Expand Up @@ -85,19 +87,19 @@ pub async fn example(transport: Rc<RefCell<tangle::Client>>) -> Result<()> {
print_user("Author", &author);

println!("> Author creates new branch");
let branch_announcement = author.new_branch(branch1_topic).await?;
let branch_announcement = author.new_branch(BASE_BRANCH, BRANCH1).await?;
print_send_result(&branch_announcement);
print_user("Author", &author);

println!("> Author issues keyload for everybody [Subscriber A, Subscriber B, PSK]");
let first_keyload_as_author = author.send_keyload_for_all(branch1_topic).await?;
let first_keyload_as_author = author.send_keyload_for_all(BRANCH1).await?;
print_send_result(&first_keyload_as_author);
print_user("Author", &author);

println!("> Author sends 3 signed packets linked to the keyload");
for _ in 0..3 {
let last_msg = author
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await?;
print_send_result(&last_msg);
}
Expand All @@ -106,7 +108,7 @@ pub async fn example(transport: Rc<RefCell<tangle::Client>>) -> Result<()> {
println!("> Author issues new keyload for only Subscriber B and PSK");
let second_keyload_as_author = author
.send_keyload(
branch1_topic,
BRANCH1,
[Permissioned::Read(subscription_b_as_author.header().publisher())],
[psk.to_pskid()],
)
Expand All @@ -117,15 +119,15 @@ pub async fn example(transport: Rc<RefCell<tangle::Client>>) -> Result<()> {
println!("> Author sends 2 more signed packets linked to the latest keyload");
for _ in 0..2 {
let last_msg = author
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await?;
print_send_result(&last_msg);
}
print_user("Author", &author);

println!("> Author sends 1 more signed packet linked to the first keyload");
let last_msg = author
.send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD)
.await?;
print_send_result(&last_msg);
print_user("Author", &author);
Expand Down
26 changes: 4 additions & 22 deletions streams/src/api/cursor_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,17 @@ impl CursorStore {
.flat_map(|(topic, branch)| branch.cursors.iter().map(move |(id, cursor)| (topic, id, *cursor)))
}

pub(crate) fn cursors_by_topic(&self, topic: &Topic) -> Option<impl Iterator<Item = (&Identifier, &usize)>> {
self.0.get(topic).map(|inner| inner.cursors.iter())
}

pub(crate) fn insert_cursor(&mut self, topic: &Topic, id: Identifier, cursor: usize) -> Option<usize> {
if let Some(branch) = self.0.get_mut(topic) {
return branch.cursors.insert(id, cursor);
}
None
}

pub(crate) fn set_anchor(&mut self, topic: &Topic, anchor: MsgId) -> Option<InnerCursorStore> {
match self.0.get_mut(topic) {
Some(branch) => {
branch.anchor = anchor;
None
}
None => {
let branch = InnerCursorStore {
anchor,
..Default::default()
};
self.0.insert(topic.clone(), branch)
}
}
}

pub(crate) fn set_latest_link(&mut self, topic: &Topic, latest_link: MsgId) -> Option<InnerCursorStore> {
match self.0.get_mut(topic) {
Some(branch) => {
Expand All @@ -87,10 +75,6 @@ impl CursorStore {
}
}

pub(crate) fn get_anchor(&self, topic: &Topic) -> Option<MsgId> {
self.0.get(topic).map(|branch| branch.anchor)
}

pub(crate) fn get_latest_link(&self, topic: &Topic) -> Option<MsgId> {
self.0.get(topic).map(|branch| branch.latest_link)
}
Expand All @@ -99,13 +83,11 @@ impl CursorStore {
#[derive(Clone, PartialEq, Eq, Default)]
pub(crate) struct InnerCursorStore {
cursors: HashMap<Identifier, usize>,
anchor: MsgId,
latest_link: MsgId,
}

impl fmt::Debug for InnerCursorStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "\t* anchor: {}", self.anchor)?;
writeln!(f, "\t* latest link: {}", self.latest_link)?;
writeln!(f, "\t* cursors:")?;
for (id, cursor) in self.cursors.iter() {
Expand Down
32 changes: 30 additions & 2 deletions streams/src/api/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use alloc::vec::Vec;
use lets::{
address::Address,
id::{Identifier, Permissioned, PskId},
message::{Message as LetsMessage, PreparsedMessage, TransportMessage, HDF},
message::{Message as LetsMessage, PreparsedMessage, Topic, TransportMessage, HDF},
};

// Local
use crate::message::{announcement, keyload, signed_packet, subscription, tagged_packet, unsubscription};
use crate::message::{
announcement, branch_announcement, keyload, signed_packet, subscription, tagged_packet, unsubscription,
};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Message {
Expand Down Expand Up @@ -63,6 +65,10 @@ impl Message {
matches!(self.content, MessageContent::Announcement { .. })
}

pub fn is_branch_announcement(&self) -> bool {
matches!(self.content, MessageContent::BranchAnnouncement { .. })
}

pub fn is_keyload(&self) -> bool {
matches!(self.content, MessageContent::Keyload { .. })
}
Expand Down Expand Up @@ -95,6 +101,14 @@ impl Message {
}
}

pub fn as_branch_announcement(&self) -> Option<&BranchAnnouncement> {
if let MessageContent::BranchAnnouncement(branch_announcement) = &self.content {
Some(branch_announcement)
} else {
None
}
}

pub fn as_keyload(&self) -> Option<&Keyload> {
if let MessageContent::Keyload(keyload) = &self.content {
Some(keyload)
Expand Down Expand Up @@ -171,6 +185,7 @@ impl Message {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum MessageContent {
Announcement(Announcement),
BranchAnnouncement(BranchAnnouncement),
Keyload(Keyload),
SignedPacket(SignedPacket),
TaggedPacket(TaggedPacket),
Expand All @@ -184,6 +199,11 @@ pub struct Announcement {
pub author_identifier: Identifier,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct BranchAnnouncement {
pub topic: Topic,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Keyload {
pub subscribers: Vec<Permissioned<Identifier>>,
Expand Down Expand Up @@ -249,6 +269,14 @@ impl From<announcement::Unwrap> for MessageContent {
}
}

impl<'a> From<branch_announcement::Unwrap<'a>> for MessageContent {
fn from(branch_announcement: branch_announcement::Unwrap<'a>) -> Self {
Self::BranchAnnouncement(BranchAnnouncement {
topic: branch_announcement.into_new_topic(),
})
}
}

impl<'a> From<subscription::Unwrap<'a>> for MessageContent {
fn from(subscription: subscription::Unwrap<'a>) -> Self {
Self::Subscription(Subscription {
Expand Down
Loading