From 7aa3d0af3b1033c576f56d64bfeb590ec6fe2322 Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 20 Jun 2022 16:57:28 -0600 Subject: [PATCH 1/9] Add BranchAnnouncement message type --- .../examples/full-example/scenarios/basic.rs | 58 +++--- .../examples/full-example/scenarios/did.rs | 18 +- streams/src/api/message.rs | 34 +++- streams/src/api/user.rs | 178 ++++++++++++------ streams/src/message/branch_announcement.rs | 137 ++++++++++++++ streams/src/message/message_types.rs | 12 +- streams/src/message/mod.rs | 2 + 7 files changed, 339 insertions(+), 100 deletions(-) create mode 100644 streams/src/message/branch_announcement.rs diff --git a/streams/examples/full-example/scenarios/basic.rs b/streams/examples/full-example/scenarios/basic.rs index 64492775..6c3cbf71 100644 --- a/streams/examples/full-example/scenarios/basic.rs +++ b/streams/examples/full-example/scenarios/basic.rs @@ -19,9 +19,11 @@ use crate::GenericTransport; const PUBLIC_PAYLOAD: &[u8] = b"PUBLICPAYLOAD"; const MASKED_PAYLOAD: &[u8] = b"MASKEDPAYLOAD"; +const BASE_BRANCH: &str = "BASE_BRANCH"; +const BRANCH1: &str = "BRANCH1"; + pub(crate) async fn example(transport: T, author_seed: &str) -> Result<()> { let psk = Psk::from_seed("A pre shared key"); - let branch1_topic = "BRANCH1"; let mut author = User::builder() .with_identity(Ed25519::from_seed(author_seed)) @@ -44,7 +46,7 @@ pub(crate) async fn example(transport: T, author_seed: &str println!("> Author creates stream and sends its announcement"); // Start at index 1, because we can. Will error if its already in use - let announcement = author.create_stream("BASE_BRANCH").await?; + let announcement = author.create_stream(BASE_BRANCH).await?; print_send_result(&announcement); print_user("Author", &author); @@ -69,13 +71,13 @@ pub(crate) async fn example(transport: T, author_seed: &str print_user("Author", &author); println!("> Author creates a new branch"); - println!("Branch topic: {}", branch1_topic); - let branch_announcement = author.new_branch(branch1_topic).await?; + println!("Branch topic: {}", BRANCH1); + let branch_announcement = author.new_branch(BASE_BRANCH, BRANCH1).await?; print_send_result(&branch_announcement); print_user("Author", &author); println!("> Author issues keyload for every user subscribed so far [SubscriberA, PSK] in Branch 1"); - let keyload_as_author = author.send_keyload_for_all(branch1_topic).await?; + let keyload_as_author = author.send_keyload_for_all(BRANCH1).await?; print_send_result(&keyload_as_author); print_user("Author", &author); @@ -87,10 +89,10 @@ pub(crate) async fn example(transport: T, author_seed: &str .expect("Subscriber A did not receive the expected branch announcement"); assert!( branch_1_ann_as_a - .as_announcement() - .expect("expected announcement, found something else") - .author_identifier - .eq(&author.identifier()) + .as_branch_announcement() + .expect("expected branch announcement, found something else") + .topic + .eq(&BRANCH1.into()) ); print_user("Subscriber A", &subscriber_a); let branch_1_ann_as_b = subscriber_b @@ -100,10 +102,10 @@ pub(crate) async fn example(transport: T, author_seed: &str .expect("Subscriber B did not receive the expected branch announcement"); assert!( branch_1_ann_as_b - .as_announcement() - .expect("expected announcement, found something else") - .author_identifier - .eq(&author.identifier()) + .as_branch_announcement() + .expect("expected branch announcement, found something else") + .topic + .eq(&BRANCH1.into()) ); print_user("Subscriber B", &subscriber_b); let branch_1_ann_as_c = subscriber_c @@ -113,10 +115,10 @@ pub(crate) async fn example(transport: T, author_seed: &str .expect("Subscriber C did not receive the expected branch announcement"); assert!( branch_1_ann_as_c - .as_announcement() - .expect("expected announcement, found something else") - .author_identifier - .eq(&author.identifier()) + .as_branch_announcement() + .expect("expected branch announcement, found something else") + .topic + .eq(&BRANCH1.into()) ); print_user("Subscriber C", &subscriber_c); @@ -160,7 +162,7 @@ pub(crate) async fn example(transport: T, author_seed: &str println!("> Author sends a tagged packet linked to the keyload"); let tagged_packet_as_author = author - .send_tagged_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_tagged_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await?; print_send_result(&tagged_packet_as_author); print_user("Author", &author); @@ -215,13 +217,13 @@ pub(crate) async fn example(transport: T, author_seed: &str print_user("Author", &author); println!("> Author issues new keyload in the same branch to incorporate SubscriberB"); - let new_keyload_as_author = author.send_keyload_for_all(branch1_topic).await?; + let new_keyload_as_author = author.send_keyload_for_all(BRANCH1).await?; print_send_result(&new_keyload_as_author); print_user("Author", &author); println!("> Author sends a signed packet"); let signed_packet_as_author = author - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await?; print_send_result(&signed_packet_as_author); print_user("Author", &author); @@ -266,7 +268,7 @@ pub(crate) async fn example(transport: T, author_seed: &str .last() .expect("Subscriber C hasn't received any of the new messages"); let result = subscriber_c - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await; assert!( result.is_err(), @@ -276,7 +278,7 @@ pub(crate) async fn example(transport: T, author_seed: &str println!("> Subscriber A attempts to send a signed packet (but he has readonly permission over the branch!)"); let result = subscriber_a - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await; assert!( result.is_err(), @@ -290,7 +292,7 @@ pub(crate) async fn example(transport: T, author_seed: &str println!("> Author gives Subscriber A write permission"); let new_keyload_as_author = author .send_keyload( - branch1_topic, + BRANCH1, author .subscribers() .map(|s| { @@ -308,7 +310,7 @@ pub(crate) async fn example(transport: T, author_seed: &str println!("> Subscriber A publishes signed packet"); assert_eq!(subscriber_a.sync().await?, 1); let signed_packet_as_a = subscriber_a - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await?; print_send_result(&signed_packet_as_a); print_user("Subscriber A", &subscriber_a); @@ -443,12 +445,12 @@ pub(crate) async fn example(transport: T, author_seed: &str print_user("Author", &author); println!("> Author issues a new keyload to remove all subscribers from the branch"); - let last_keyload = author.send_keyload_for_all(branch1_topic).await?; + let last_keyload = author.send_keyload_for_all(BRANCH1).await?; print_send_result(&last_keyload); print_user("Author", &author); println!("> Author sends a new signed packet"); let last_signed_packet = author - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await?; print_send_result(&last_signed_packet); print_user("Author", &author); @@ -492,12 +494,12 @@ pub(crate) async fn example(transport: T, author_seed: &str println!("> Subscribers A and B try to send a signed packet"); // TODO: THIS SHOULD FAIL ONCE PUBLISHERS ARE TRACKED BY BRANCH AND WE CAN "DEMOTE" SUBSCRIBERS let a_signed_packet = subscriber_a - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await?; print_send_result(&a_signed_packet); print_user("Subscriber A", &subscriber_a); let result = subscriber_b - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await; print_user("Subscriber B", &subscriber_b); assert!(result.is_err()); diff --git a/streams/examples/full-example/scenarios/did.rs b/streams/examples/full-example/scenarios/did.rs index 448be4d3..d3c1a810 100644 --- a/streams/examples/full-example/scenarios/did.rs +++ b/streams/examples/full-example/scenarios/did.rs @@ -26,6 +26,9 @@ use super::utils::{print_send_result, print_user}; const PUBLIC_PAYLOAD: &[u8] = b"PUBLICPAYLOAD"; const MASKED_PAYLOAD: &[u8] = b"MASKEDPAYLOAD"; +const BASE_BRANCH: &str = "BASE_BRANCH"; +const BRANCH1: &str = "BRANCH1"; + pub async fn example(transport: Rc>) -> Result<()> { let did_client = DIDClient::new().await?; println!("> Making DID with method for the Author"); @@ -35,7 +38,6 @@ pub async fn example(transport: Rc>) -> Result<()> { // Generate a simple PSK for storage by users let psk = Psk::from_seed("A pre shared key"); - let branch1_topic = "BRANCH1"; let mut author = User::builder() .with_identity(DID::PrivateKey(author_did_info)) @@ -57,7 +59,7 @@ pub async fn example(transport: Rc>) -> Result<()> { println!("> Author creates stream and sends its announcement"); // Start at index 1, because we can. Will error if its already in use - let announcement = author.create_stream("BASE_BRANCH").await?; + let announcement = author.create_stream(BASE_BRANCH).await?; print_send_result(&announcement); print_user("Author", &author); @@ -88,19 +90,19 @@ pub async fn example(transport: Rc>) -> Result<()> { print_user("Author", &author); println!("> Author creates new branch"); - let branch_announcement = author.new_branch(branch1_topic).await?; + let branch_announcement = author.new_branch(BASE_BRANCH, BRANCH1).await?; print_send_result(&branch_announcement); print_user("Author", &author); println!("> Author issues keyload for everybody [Subscriber A, Subscriber B, PSK]"); - let first_keyload_as_author = author.send_keyload_for_all(branch1_topic).await?; + let first_keyload_as_author = author.send_keyload_for_all(BRANCH1).await?; print_send_result(&first_keyload_as_author); print_user("Author", &author); println!("> Author sends 3 signed packets linked to the keyload"); for _ in 0..3 { let last_msg = author - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await?; print_send_result(&last_msg); } @@ -109,7 +111,7 @@ pub async fn example(transport: Rc>) -> Result<()> { println!("> Author issues new keyload for only Subscriber B and PSK"); let second_keyload_as_author = author .send_keyload( - branch1_topic, + BRANCH1, [Permissioned::Read(subscription_b_as_author.header().publisher())], [psk.to_pskid()], ) @@ -120,7 +122,7 @@ pub async fn example(transport: Rc>) -> Result<()> { println!("> Author sends 2 more signed packets linked to the latest keyload"); for _ in 0..2 { let last_msg = author - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await?; print_send_result(&last_msg); } @@ -128,7 +130,7 @@ pub async fn example(transport: Rc>) -> Result<()> { println!("> Author sends 1 more signed packet linked to the first keyload"); let last_msg = author - .send_signed_packet(branch1_topic, PUBLIC_PAYLOAD, MASKED_PAYLOAD) + .send_signed_packet(BRANCH1, PUBLIC_PAYLOAD, MASKED_PAYLOAD) .await?; print_send_result(&last_msg); print_user("Author", &author); diff --git a/streams/src/api/message.rs b/streams/src/api/message.rs index ef5a1df6..5ef02d81 100644 --- a/streams/src/api/message.rs +++ b/streams/src/api/message.rs @@ -9,11 +9,13 @@ use alloc::vec::Vec; use lets::{ address::Address, id::{Identifier, Permissioned, PskId}, - message::{Message as LetsMessage, PreparsedMessage, TransportMessage, HDF}, + message::{Message as LetsMessage, PreparsedMessage, Topic, TransportMessage, HDF}, }; // Local -use crate::message::{announcement, keyload, signed_packet, subscription, tagged_packet, unsubscription}; +use crate::message::{ + announcement, branch_announcement, keyload, signed_packet, subscription, tagged_packet, unsubscription, +}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Message { @@ -61,6 +63,10 @@ impl Message { matches!(self.content, MessageContent::Announcement { .. }) } + pub fn is_branch_announcement(&self) -> bool { + matches!(self.content, MessageContent::BranchAnnouncement { .. }) + } + pub fn is_keyload(&self) -> bool { matches!(self.content, MessageContent::Keyload { .. }) } @@ -93,6 +99,14 @@ impl Message { } } + pub fn as_branch_announcement(&self) -> Option<&BranchAnnouncement> { + if let MessageContent::BranchAnnouncement(branch_announcement) = &self.content { + Some(branch_announcement) + } else { + None + } + } + pub fn as_keyload(&self) -> Option<&Keyload> { if let MessageContent::Keyload(keyload) = &self.content { Some(keyload) @@ -169,6 +183,7 @@ impl Message { #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum MessageContent { Announcement(Announcement), + BranchAnnouncement(BranchAnnouncement), Keyload(Keyload), SignedPacket(SignedPacket), TaggedPacket(TaggedPacket), @@ -182,6 +197,12 @@ pub struct Announcement { pub author_identifier: Identifier, } +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct BranchAnnouncement { + pub topic: Topic, + pub previous_topic: Topic, +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Keyload { pub subscribers: Vec>, @@ -247,6 +268,15 @@ impl From for MessageContent { } } +impl<'a> From> for MessageContent { + fn from(branch_announcement: branch_announcement::Unwrap<'a>) -> Self { + Self::BranchAnnouncement(BranchAnnouncement { + topic: branch_announcement.new_topic().clone(), + previous_topic: branch_announcement.previous_topic().clone(), + }) + } +} + impl<'a> From> for MessageContent { fn from(subscription: subscription::Unwrap<'a>) -> Self { Self::Subscription(Subscription { diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index 25643079..4e99aa70 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -40,7 +40,10 @@ use crate::{ send_response::SendResponse, user_builder::UserBuilder, }, - message::{announcement, keyload, message_types, signed_packet, subscription, tagged_packet, unsubscription}, + message::{ + announcement, branch_announcement, keyload, message_types, signed_packet, subscription, tagged_packet, + unsubscription, + }, }; const ANN_MESSAGE_NUM: usize = 0; // Announcement is always the first message of authors @@ -215,6 +218,7 @@ impl User { let preparsed = msg.parse_header().await?; match preparsed.header().message_type() { message_types::ANNOUNCEMENT => self.handle_announcement(address, preparsed).await, + message_types::BRANCH_ANNOUNCEMENT => self.handle_branch_announcement(address, preparsed).await, message_types::SUBSCRIPTION => self.handle_subscription(address, preparsed).await, message_types::UNSUBSCRIPTION => self.handle_unsubscription(address, preparsed).await, message_types::KEYLOAD => self.handle_keyload(address, preparsed).await, @@ -230,25 +234,15 @@ impl User { // Check Topic let topic = preparsed.header().topic().clone(); let publisher = preparsed.header().publisher(); - let is_base_branch = self.state.stream_address.is_none(); // Insert new branch into store self.state.cursor_store.new_branch(topic.clone()); - - // If the topic of the announcement is not base branch the author cursor needs to be iterated - // on the base branch - if !is_base_branch { - self.state - .cursor_store - .insert_cursor(&self.state.base_branch, publisher, preparsed.header().sequence()); - } - // From the point of view of cursor tracking, the message exists, regardless of the validity or // accessibility to its content. Therefore we must update the cursor of the publisher before // handling the message self.state .cursor_store - .insert_cursor(&topic, preparsed.header().publisher(), INIT_MESSAGE_NUM); + .insert_cursor(&topic, publisher, INIT_MESSAGE_NUM); // Unwrap message let announcement = announcement::Unwrap::default(); @@ -265,12 +259,54 @@ impl User { self.set_anchor(&topic, address.relative()); self.set_latest_link(&topic, address.relative()); self.state.author_identifier = Some(author_id); + self.state.exchange_keys.insert(author_id, author_ke_pk); + self.state.base_branch = topic; + self.state.stream_address = Some(address); - if is_base_branch { - self.state.exchange_keys.insert(author_id, author_ke_pk); - self.state.base_branch = topic; - self.state.stream_address = Some(address); - } + Ok(Message::from_lets_message(address, message)) + } + + async fn handle_branch_announcement(&mut self, address: Address, preparsed: PreparsedMessage) -> Result { + // Retrieve header values + let topic = preparsed.header().topic().clone(); + let publisher = preparsed.header().publisher(); + let cursor = preparsed.header().sequence(); + + // Insert new branch into store + self.state.cursor_store.new_branch(topic.clone()); + // From the point of view of cursor tracking, the message exists, regardless of the validity or + // accessibility to its content. Therefore we must update the cursor of the publisher before + // handling the message + self.state + .cursor_store + .insert_cursor(&topic, publisher, INIT_MESSAGE_NUM); + + // Unwrap message + let linked_msg_address = preparsed.header().linked_msg_address().ok_or_else(|| { + anyhow!( + "branch announcement messages must contain the address of the message they are linked to in the header" + ) + })?; + let mut linked_msg_spongos = { + if let Some(spongos) = self.state.spongos_store.get(&linked_msg_address).copied() { + // Spongos must be copied because wrapping mutates it + spongos + } else { + return Ok(Message::orphan(address, preparsed)); + } + }; + let branch_announcement = branch_announcement::Unwrap::new(&mut linked_msg_spongos); + let (message, spongos) = preparsed.unwrap(branch_announcement).await?; + + let previous_topic = message.payload().content().previous_topic(); + // Store spongos + self.state.spongos_store.insert(address.relative(), spongos); + // Store updated cursor for publisher on previous branch + self.state.cursor_store.insert_cursor(previous_topic, publisher, cursor); + + // Update branch links + self.set_anchor(&topic, address.relative()); + self.set_latest_link(&topic, address.relative()); Ok(Message::from_lets_message(address, message)) } @@ -563,70 +599,98 @@ where let stream_rel_address = MsgId::gen(stream_base_address, self.identifier(), &topic, INIT_MESSAGE_NUM); let stream_address = Address::new(stream_base_address, stream_rel_address); + // Prepare HDF and PCF + let header = HDF::new( + message_types::ANNOUNCEMENT, + ANN_MESSAGE_NUM, + self.identifier(), + topic.clone(), + )?; + let content = PCF::new_final_frame().with_content(announcement::Wrap::new(&self.state.user_id)); + + // Wrap message + let (transport_msg, spongos) = LetsMessage::new(header, content).wrap().await?; + + // Attempt to send message + ensure!( + self.transport.recv_message(stream_address).await.is_err(), + anyhow!("stream with address '{}' already exists", stream_address) + ); + let send_response = self.transport.send_message(stream_address, transport_msg).await?; + + // If a message has been sent successfully, insert the base branch into store + self.state.cursor_store.new_branch(topic.clone()); + // Commit message to stores + self.state + .cursor_store + .insert_cursor(&topic, self.identifier(), INIT_MESSAGE_NUM); + self.state.spongos_store.insert(stream_address.relative(), spongos); + + // Update branch links + self.set_anchor(&topic, stream_address.relative()); + self.set_latest_link(&topic, stream_address.relative()); + // Commit Author Identifier and Stream Address to store self.state.stream_address = Some(stream_address); self.state.author_identifier = Some(self.identifier()); self.state.base_branch = topic.clone(); - // Insert the base branch into store - self.state.cursor_store.new_branch(topic.clone()); - - // Create Base Branch - self.new_branch(topic).await + Ok(SendResponse::new(stream_address, send_response)) } /// Prepare new branch Announcement message - pub async fn new_branch>(&mut self, topic: Top) -> Result> { + pub async fn new_branch>(&mut self, from_topic: Top, to_topic: Top) -> Result> { // Check conditions - let stream_address = self - .stream_address() - .ok_or_else(|| anyhow!("before starting a new branch, the stream must be created"))?; + let stream_address = self.stream_address().ok_or_else(|| { + anyhow!("before starting a new branch one must receive the announcement of a stream first") + })?; // Check Topic - let topic: Topic = topic.into(); - let base_branch = self.base_branch().clone(); - let is_base_branch = topic.eq(&base_branch); + let topic: Topic = to_topic.into(); + let prev_topic: Topic = from_topic.into(); + let link_to = self + .get_latest_link(&prev_topic) + .ok_or_else(|| anyhow!("No latest link found in branch <{}>", prev_topic))?; // Update own's cursor - let (user_cursor, address, topic) = if is_base_branch { - (ANN_MESSAGE_NUM, stream_address, base_branch.clone()) - } else { - let cursor = self - .next_cursor(&base_branch) - .map_err(|_| anyhow!("No cursor found in base branch"))?; - let msgid = MsgId::gen(stream_address.base(), self.identifier(), &base_branch, cursor); - let address = Address::new(stream_address.base(), msgid); - (cursor, address, topic) - }; + let user_cursor = self + .next_cursor(&prev_topic) + .map_err(|_| anyhow!("No cursor found in base branch"))?; + let msgid = MsgId::gen(stream_address.base(), self.identifier(), &prev_topic, user_cursor); + let address = Address::new(stream_address.base(), msgid); // Prepare HDF and PCF + // Spongos must be copied because wrapping mutates it + let mut linked_msg_spongos = self + .state + .spongos_store + .get(&link_to) + .copied() + .ok_or_else(|| anyhow!("message '{}' not found in spongos store", link_to))?; let header = HDF::new( - message_types::ANNOUNCEMENT, + message_types::BRANCH_ANNOUNCEMENT, user_cursor, self.identifier(), topic.clone(), - )?; - let content = PCF::new_final_frame().with_content(announcement::Wrap::new(&self.state.user_id)); + )? + .with_linked_msg_address(link_to); + let content = PCF::new_final_frame().with_content(branch_announcement::Wrap::new( + &mut linked_msg_spongos, + &self.state.user_id, + &topic, + &prev_topic, + )); // Wrap message let (transport_msg, spongos) = LetsMessage::new(header, content).wrap().await?; - - // Attempt to send message - ensure!( - self.transport.recv_message(address).await.is_err(), - anyhow!("stream with address '{}' already exists", address) - ); let send_response = self.transport.send_message(address, transport_msg).await?; - // If the branch has not been created yet, create it - if !is_base_branch { - self.state.cursor_store.new_branch(topic.clone()); - self.state - .cursor_store - .insert_cursor(&base_branch, self.identifier(), self.next_cursor(&base_branch)?); - } - - // If message has been sent successfully, commit message to stores + // If message has been sent successfully, create the new branch in store + self.state.cursor_store.new_branch(topic.clone()); + // Commit message to stores and update cursors + self.state + .cursor_store + .insert_cursor(&prev_topic, self.identifier(), self.next_cursor(&prev_topic)?); self.state .cursor_store .insert_cursor(&topic, self.identifier(), INIT_MESSAGE_NUM); diff --git a/streams/src/message/branch_announcement.rs b/streams/src/message/branch_announcement.rs new file mode 100644 index 00000000..7ea4c47c --- /dev/null +++ b/streams/src/message/branch_announcement.rs @@ -0,0 +1,137 @@ +//! `BranchAnnounce` message _wrapping_ and _unwrapping_. +//! +//! The `BranchAnnounce` message creates a new branch in a Stream. +//! +//! It announces the topic for the new branch, as well as informs of the previous branch topic the +//! new branch is being generated from. +//! +//! ```ddml +//! message BranchAnnounce { +//! join(Spongos); +//! mask u8 identifier[32]; +//! mask u8 topic; +//! mask u8 previous_topic; +//! commit; +//! squeeze u8 hash[64]; +//! ed25519(hash) sig; +//! } +//! ``` + +// Rust +use alloc::boxed::Box; + +// 3rd-party +use anyhow::Result; +use async_trait::async_trait; + +// IOTA + +// Streams +use lets::{ + id::{Identifier, Identity}, + message::{ContentSign, ContentSignSizeof, ContentSizeof, ContentUnwrap, ContentVerify, ContentWrap, Topic}, +}; +use spongos::{ + ddml::{ + commands::{sizeof, unwrap, wrap, Commit, Join, Mask}, + io, + }, + Spongos, +}; + +// Local + +pub(crate) struct Wrap<'a> { + initial_state: &'a mut Spongos, + user_id: &'a Identity, + new_topic: &'a Topic, + previous_topic: &'a Topic, +} + +impl<'a> Wrap<'a> { + pub(crate) fn new( + initial_state: &'a mut Spongos, + user_id: &'a Identity, + new_topic: &'a Topic, + previous_topic: &'a Topic, + ) -> Self { + Self { + initial_state, + user_id, + new_topic, + previous_topic, + } + } +} + +#[async_trait(?Send)] +impl<'a> ContentSizeof> for sizeof::Context { + async fn sizeof(&mut self, announcement: &Wrap<'a>) -> Result<&mut Self> { + self.mask(&announcement.user_id.to_identifier())? + .mask(announcement.new_topic)? + .mask(announcement.previous_topic)? + .sign_sizeof(announcement.user_id) + .await? + .commit()?; + Ok(self) + } +} + +#[async_trait(?Send)] +impl<'a, OS> ContentWrap> for wrap::Context +where + OS: io::OStream, +{ + async fn wrap(&mut self, announcement: &mut Wrap<'a>) -> Result<&mut Self> { + self.join(announcement.initial_state)? + .mask(&announcement.user_id.to_identifier())? + .mask(announcement.new_topic)? + .mask(announcement.previous_topic)? + .sign(announcement.user_id) + .await? + .commit()?; + Ok(self) + } +} + +pub(crate) struct Unwrap<'a> { + initial_state: &'a mut Spongos, + new_topic: Topic, + previous_topic: Topic, +} + +impl<'a> Unwrap<'a> { + pub(crate) fn new(initial_state: &'a mut Spongos) -> Self { + Self { + initial_state, + new_topic: Topic::default(), + previous_topic: Topic::default(), + } + } + + pub(crate) fn new_topic(&self) -> &Topic { + &self.new_topic + } + + pub(crate) fn previous_topic(&self) -> &Topic { + &self.previous_topic + } +} + +#[async_trait(?Send)] +impl<'a, IS> ContentUnwrap> for unwrap::Context +where + IS: io::IStream, +{ + async fn unwrap(&mut self, announcement: &mut Unwrap) -> Result<&mut Self> { + let mut author_id = Identifier::default(); + self.join(announcement.initial_state)? + .mask(&mut author_id)? + .mask(&mut announcement.new_topic)? + .mask(&mut announcement.previous_topic)? + .verify(&author_id) + .await? + .commit()?; + Ok(self) + } +} diff --git a/streams/src/message/message_types.rs b/streams/src/message/message_types.rs index bcabd1bc..16ddab46 100644 --- a/streams/src/message/message_types.rs +++ b/streams/src/message/message_types.rs @@ -1,12 +1,14 @@ /// Announcement Message Type pub(crate) const ANNOUNCEMENT: u8 = 0; +/// Branch Announcement Message Type +pub(crate) const BRANCH_ANNOUNCEMENT: u8 = 1; /// Keyload Message Type -pub(crate) const KEYLOAD: u8 = 1; +pub(crate) const KEYLOAD: u8 = 2; /// Signed Packet Message Type -pub(crate) const SIGNED_PACKET: u8 = 2; +pub(crate) const SIGNED_PACKET: u8 = 3; /// Tagged Packet Message Type -pub(crate) const TAGGED_PACKET: u8 = 3; +pub(crate) const TAGGED_PACKET: u8 = 4; /// Subscribe Message Type -pub(crate) const SUBSCRIPTION: u8 = 4; +pub(crate) const SUBSCRIPTION: u8 = 5; /// Unsubscribe Message Type -pub(crate) const UNSUBSCRIPTION: u8 = 5; +pub(crate) const UNSUBSCRIPTION: u8 = 6; diff --git a/streams/src/message/mod.rs b/streams/src/message/mod.rs index d936723a..d0a75792 100644 --- a/streams/src/message/mod.rs +++ b/streams/src/message/mod.rs @@ -19,3 +19,5 @@ pub(crate) mod subscription; pub(crate) mod unsubscription; pub(crate) mod message_types; + +pub(crate) mod branch_announcement; From f72344dc91499484fe32c76bdba288928356468e Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 27 Jun 2022 11:19:30 -0600 Subject: [PATCH 2/9] fmt --- lets/src/transport/tangle.rs | 4 ++-- streams/src/api/messages.rs | 12 +++++++++--- streams/src/api/user_builder.rs | 3 ++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/lets/src/transport/tangle.rs b/lets/src/transport/tangle.rs index 22afdc96..523a7f91 100644 --- a/lets/src/transport/tangle.rs +++ b/lets/src/transport/tangle.rs @@ -124,8 +124,8 @@ impl Address { /// assert_eq!( /// address.to_msg_index().as_ref(), /// &[ - /// 44, 181, 155, 1, 109, 141, 169, 177, 209, 70, 226, 18, 190, 121, 40, 44, 90, 108, 159, 109, 241, 37, 30, 0, - /// 185, 80, 245, 59, 235, 75, 128, 97 + /// 44, 181, 155, 1, 109, 141, 169, 177, 209, 70, 226, 18, 190, 121, 40, 44, 90, 108, 159, + /// 109, 241, 37, 30, 0, 185, 80, 245, 59, 235, 75, 128, 97 /// ], /// ); /// assert_eq!( diff --git a/streams/src/api/messages.rs b/streams/src/api/messages.rs index 5d34269c..aae6b282 100644 --- a/streams/src/api/messages.rs +++ b/streams/src/api/messages.rs @@ -53,7 +53,8 @@ use crate::api::{ /// # let test_transport = Rc::new(RefCell::new(bucket::Client::new())); /// # /// let author_seed = "cryptographically-secure-random-author-seed"; -/// let author_transport: tangle::Client = tangle::Client::for_node("https://chrysalis-nodes.iota.org").await?; +/// let author_transport: tangle::Client = +/// tangle::Client::for_node("https://chrysalis-nodes.iota.org").await?; /// # /// # let test_author_transport = test_transport.clone(); /// # @@ -64,7 +65,8 @@ use crate::api::{ /// .build()?; /// /// let subscriber_seed = "cryptographically-secure-random-subscriber-seed"; -/// let subscriber_transport: tangle::Client = tangle::Client::for_node("https://chrysalis-nodes.iota.org").await?; +/// let subscriber_transport: tangle::Client = +/// tangle::Client::for_node("https://chrysalis-nodes.iota.org").await?; /// # /// # let subscriber_transport = test_transport.clone(); /// # @@ -79,7 +81,11 @@ use crate::api::{ /// .send_signed_packet("BASE_BRANCH", b"public payload", b"masked payload") /// .await?; /// let second_packet = author -/// .send_signed_packet("BASE_BRANCH", b"another public payload", b"another masked payload") +/// .send_signed_packet( +/// "BASE_BRANCH", +/// b"another public payload", +/// b"another masked payload", +/// ) /// .await?; /// /// # diff --git a/streams/src/api/user_builder.rs b/streams/src/api/user_builder.rs index 6eb58239..6206678e 100644 --- a/streams/src/api/user_builder.rs +++ b/streams/src/api/user_builder.rs @@ -186,7 +186,8 @@ impl UserBuilder { /// # async fn main() -> Result<()> { /// # let test_transport = Rc::new(RefCell::new(bucket::Client::new())); /// let author_seed = "author_secure_seed"; - /// let transport: tangle::Client = tangle::Client::for_node("https://chrysalis-nodes.iota.org").await?; + /// let transport: tangle::Client = + /// tangle::Client::for_node("https://chrysalis-nodes.iota.org").await?; /// # /// # let transport = test_transport.clone(); /// # let mut author = User::builder() From abea060653398993ea5599eb3f598ad258ecaf7d Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 27 Jun 2022 11:53:34 -0600 Subject: [PATCH 3/9] updated messages tests --- streams/src/api/messages.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/streams/src/api/messages.rs b/streams/src/api/messages.rs index aae6b282..3ec5c162 100644 --- a/streams/src/api/messages.rs +++ b/streams/src/api/messages.rs @@ -322,7 +322,7 @@ mod tests { use anyhow::Result; - use lets::{address::Address, id::Ed25519, message::Topic, transport::bucket}; + use lets::{address::Address, id::Ed25519, transport::bucket}; use crate::api::{ message::{ @@ -331,6 +331,7 @@ mod tests { }, user::User, }; + use crate::api::message::MessageContent::BranchAnnouncement; type Transport = Rc>; @@ -339,21 +340,21 @@ mod tests { let p = b"payload"; let (mut author, mut subscriber1, announcement_link, transport) = author_subscriber_fixture().await?; - let branch_1 = Topic::from("Branch 1"); - let branch_announcement = author.new_branch(branch_1.clone()).await?; - let keyload_1 = author.send_keyload_for_all_rw(branch_1.clone()).await?; + let branch_1 = "BRANCH_1"; + let branch_announcement = author.new_branch("BASE_BRANCH", branch_1).await?; + let keyload_1 = author.send_keyload_for_all_rw(branch_1).await?; subscriber1.sync().await?; - let _packet_1 = subscriber1.send_signed_packet(branch_1.clone(), &p, &p).await?; + let _packet_1 = subscriber1.send_signed_packet(branch_1, &p, &p).await?; // This packet will never be readable by subscriber2. However, she will still be able to progress // through the next messages - let _packet_2 = subscriber1.send_signed_packet(branch_1.clone(), &p, &p).await?; + let _packet_2 = subscriber1.send_signed_packet(branch_1, &p, &p).await?; let mut subscriber2 = subscriber_fixture("subscriber2", &mut author, announcement_link, transport).await?; author.sync().await?; // This packet has to wait in the `Messages::msg_queue` until `packet` is processed - let keyload_2 = author.send_keyload_for_all_rw(branch_1.clone()).await?; + let keyload_2 = author.send_keyload_for_all_rw(branch_1).await?; subscriber1.sync().await?; let last_signed_packet = subscriber1.send_signed_packet(branch_1, &p, &p).await?; @@ -365,7 +366,7 @@ mod tests { &[ Message { address: address_0, - content: Announcement(..), + content: BranchAnnouncement(..), .. }, Message { From d21dfcec01530f9264c42252a7427ec8a5f9eba3 Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 27 Jun 2022 14:37:33 -0600 Subject: [PATCH 4/9] Messages imports for tests --- streams/src/api/messages.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streams/src/api/messages.rs b/streams/src/api/messages.rs index 3ec5c162..acbb8614 100644 --- a/streams/src/api/messages.rs +++ b/streams/src/api/messages.rs @@ -327,11 +327,10 @@ mod tests { use crate::api::{ message::{ Message, - MessageContent::{Announcement, Keyload, SignedPacket}, + MessageContent::{Announcement, BranchAnnouncement, Keyload, SignedPacket}, }, user::User, }; - use crate::api::message::MessageContent::BranchAnnouncement; type Transport = Rc>; From 2a007aeb0f56616ab4a00734fb2c0a08cd26459e Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 4 Jul 2022 12:12:01 -0600 Subject: [PATCH 5/9] remove prev_topic from branch_announcement, carry permissions forward --- streams/src/api/message.rs | 4 +-- streams/src/api/user.rs | 40 ++++++++++++++-------- streams/src/message/branch_announcement.rs | 12 ++----- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/streams/src/api/message.rs b/streams/src/api/message.rs index 18c3c169..46195564 100644 --- a/streams/src/api/message.rs +++ b/streams/src/api/message.rs @@ -202,7 +202,6 @@ pub struct Announcement { #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct BranchAnnouncement { pub topic: Topic, - pub previous_topic: Topic, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -273,8 +272,7 @@ impl From for MessageContent { impl<'a> From> for MessageContent { fn from(branch_announcement: branch_announcement::Unwrap<'a>) -> Self { Self::BranchAnnouncement(BranchAnnouncement { - topic: branch_announcement.new_topic().clone(), - previous_topic: branch_announcement.previous_topic().clone(), + topic: branch_announcement.into_new_topic(), }) } } diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index ec3ae303..a5115dba 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -274,18 +274,16 @@ impl User { async fn handle_branch_announcement(&mut self, address: Address, preparsed: PreparsedMessage) -> Result { // Retrieve header values - let topic = preparsed.header().topic().clone(); + let prev_topic = preparsed.header().topic().clone(); let publisher = preparsed.header().publisher().clone(); let cursor = preparsed.header().sequence(); - // Insert new branch into store - self.state.cursor_store.new_branch(topic.clone()); // From the point of view of cursor tracking, the message exists, regardless of the validity or // accessibility to its content. Therefore we must update the cursor of the publisher before // handling the message self.state .cursor_store - .insert_cursor(&topic, publisher.clone(), INIT_MESSAGE_NUM); + .insert_cursor(&prev_topic, publisher.clone(), cursor); // Unwrap message let linked_msg_address = preparsed.header().linked_msg_address().ok_or_else(|| { @@ -304,15 +302,23 @@ impl User { let branch_announcement = branch_announcement::Unwrap::new(&mut linked_msg_spongos); let (message, spongos) = preparsed.unwrap(branch_announcement).await?; - let previous_topic = message.payload().content().previous_topic(); + let new_topic = message.payload().content().new_topic(); // Store spongos self.state.spongos_store.insert(address.relative(), spongos); - // Store updated cursor for publisher on previous branch - self.state.cursor_store.insert_cursor(previous_topic, publisher, cursor); + // Insert new branch into store + self.state.cursor_store.new_branch(new_topic.clone()); + // Collect permissions from previous branch and clone them into new branch + let prev_permissions = self.state.cursor_store.cursors() + .filter(|cursor| cursor.0 == &prev_topic) + .map(|(_, id, _)| id.clone()) + .collect::>(); + prev_permissions.into_iter().for_each(|id| { + self.state.cursor_store.insert_cursor(new_topic, id, INIT_MESSAGE_NUM); + }); // Update branch links - self.set_anchor(&topic, address.relative()); - self.set_latest_link(&topic, address.relative()); + self.set_anchor(new_topic, address.relative()); + self.set_latest_link(new_topic, address.relative()); Ok(Message::from_lets_message(address, message)) } @@ -642,7 +648,7 @@ where } /// Prepare new branch Announcement message - pub async fn new_branch>(&mut self, from_topic: Top, to_topic: Top) -> Result> { + pub async fn new_branch(&mut self, from_topic: impl Into, to_topic: impl Into) -> Result> { // Check conditions let stream_address = self .stream_address() @@ -676,14 +682,13 @@ where message_types::BRANCH_ANNOUNCEMENT, user_cursor, identifier.clone(), - topic.clone(), + prev_topic.clone(), )? .with_linked_msg_address(link_to); let content = PCF::new_final_frame().with_content(branch_announcement::Wrap::new( &mut linked_msg_spongos, user_id, &topic, - &prev_topic, )); // Wrap message @@ -696,10 +701,15 @@ where self.state .cursor_store .insert_cursor(&prev_topic, identifier.clone(), self.next_cursor(&prev_topic)?); - self.state - .cursor_store - .insert_cursor(&topic, identifier.clone(), INIT_MESSAGE_NUM); self.state.spongos_store.insert(address.relative(), spongos); + // Collect permissions from previous branch and clone them into new branch + let prev_permissions = self.state.cursor_store.cursors() + .filter(|cursor| cursor.0 == &prev_topic) + .map(|(_, id, _)| (id.clone())) + .collect::>(); + prev_permissions.into_iter().for_each(|id| { + self.state.cursor_store.insert_cursor(&topic, id, INIT_MESSAGE_NUM); + }); // Update branch links self.state.cursor_store.set_anchor(&topic, address.relative()); diff --git a/streams/src/message/branch_announcement.rs b/streams/src/message/branch_announcement.rs index 7ea4c47c..7d7de96c 100644 --- a/streams/src/message/branch_announcement.rs +++ b/streams/src/message/branch_announcement.rs @@ -45,7 +45,6 @@ pub(crate) struct Wrap<'a> { initial_state: &'a mut Spongos, user_id: &'a Identity, new_topic: &'a Topic, - previous_topic: &'a Topic, } impl<'a> Wrap<'a> { @@ -53,13 +52,11 @@ impl<'a> Wrap<'a> { initial_state: &'a mut Spongos, user_id: &'a Identity, new_topic: &'a Topic, - previous_topic: &'a Topic, ) -> Self { Self { initial_state, user_id, new_topic, - previous_topic, } } } @@ -69,7 +66,6 @@ impl<'a> ContentSizeof> for sizeof::Context { async fn sizeof(&mut self, announcement: &Wrap<'a>) -> Result<&mut Self> { self.mask(&announcement.user_id.to_identifier())? .mask(announcement.new_topic)? - .mask(announcement.previous_topic)? .sign_sizeof(announcement.user_id) .await? .commit()?; @@ -86,7 +82,6 @@ where self.join(announcement.initial_state)? .mask(&announcement.user_id.to_identifier())? .mask(announcement.new_topic)? - .mask(announcement.previous_topic)? .sign(announcement.user_id) .await? .commit()?; @@ -97,7 +92,6 @@ where pub(crate) struct Unwrap<'a> { initial_state: &'a mut Spongos, new_topic: Topic, - previous_topic: Topic, } impl<'a> Unwrap<'a> { @@ -105,7 +99,6 @@ impl<'a> Unwrap<'a> { Self { initial_state, new_topic: Topic::default(), - previous_topic: Topic::default(), } } @@ -113,8 +106,8 @@ impl<'a> Unwrap<'a> { &self.new_topic } - pub(crate) fn previous_topic(&self) -> &Topic { - &self.previous_topic + pub(crate) fn into_new_topic(self) -> Topic { + self.new_topic } } @@ -128,7 +121,6 @@ where self.join(announcement.initial_state)? .mask(&mut author_id)? .mask(&mut announcement.new_topic)? - .mask(&mut announcement.previous_topic)? .verify(&author_id) .await? .commit()?; From abdc6bc2d266246baf2f9cab82f2c9ec794ca632 Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 4 Jul 2022 12:58:51 -0600 Subject: [PATCH 6/9] Remove anchor from CursorStore --- streams/src/api/cursor_store.rs | 22 ------------ streams/src/api/user.rs | 63 ++++++++++----------------------- 2 files changed, 19 insertions(+), 66 deletions(-) diff --git a/streams/src/api/cursor_store.rs b/streams/src/api/cursor_store.rs index 84de92a5..18db152c 100644 --- a/streams/src/api/cursor_store.rs +++ b/streams/src/api/cursor_store.rs @@ -55,22 +55,6 @@ impl CursorStore { None } - pub(crate) fn set_anchor(&mut self, topic: &Topic, anchor: MsgId) -> Option { - match self.0.get_mut(topic) { - Some(branch) => { - branch.anchor = anchor; - None - } - None => { - let branch = InnerCursorStore { - anchor, - ..Default::default() - }; - self.0.insert(topic.clone(), branch) - } - } - } - pub(crate) fn set_latest_link(&mut self, topic: &Topic, latest_link: MsgId) -> Option { match self.0.get_mut(topic) { Some(branch) => { @@ -87,10 +71,6 @@ impl CursorStore { } } - pub(crate) fn get_anchor(&self, topic: &Topic) -> Option { - self.0.get(topic).map(|branch| branch.anchor) - } - pub(crate) fn get_latest_link(&self, topic: &Topic) -> Option { self.0.get(topic).map(|branch| branch.latest_link) } @@ -99,13 +79,11 @@ impl CursorStore { #[derive(Clone, PartialEq, Eq, Default)] pub(crate) struct InnerCursorStore { cursors: HashMap, - anchor: MsgId, latest_link: MsgId, } impl fmt::Debug for InnerCursorStore { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - writeln!(f, "\t* anchor: {}", self.anchor)?; writeln!(f, "\t* latest link: {}", self.latest_link)?; writeln!(f, "\t* cursors:")?; for (id, cursor) in self.cursors.iter() { diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index a5115dba..1a6bacec 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -200,21 +200,15 @@ impl User { self.state.psk_store.remove(&pskid).is_some() } - /// Sets the anchor message link for a specified branch. If the branch does not exist, it is - /// created - fn set_anchor(&mut self, topic: &Topic, anchor: MsgId) -> Option { - self.state.cursor_store.set_anchor(topic, anchor) - } - /// Sets the latest message link for a specified branch. If the branch does not exist, it is /// created fn set_latest_link(&mut self, topic: &Topic, latest_link: MsgId) -> Option { self.state.cursor_store.set_latest_link(topic, latest_link) } - fn get_anchor(&self, topic: &Topic) -> Option { + /*fn get_anchor(&self, topic: &Topic) -> Option { self.state.cursor_store.get_anchor(topic) - } + }*/ fn get_latest_link(&self, topic: &Topic) -> Option { self.state.cursor_store.get_latest_link(topic) @@ -262,7 +256,6 @@ impl User { let author_ke_pk = *message.payload().content().author_ke_pk(); // Update branch links - self.set_anchor(&topic, address.relative()); self.set_latest_link(&topic, address.relative()); self.state.author_identifier = Some(author_id.clone()); self.state.exchange_keys.insert(author_id, author_ke_pk); @@ -317,7 +310,6 @@ impl User { }); // Update branch links - self.set_anchor(new_topic, address.relative()); self.set_latest_link(new_topic, address.relative()); Ok(Message::from_lets_message(address, message)) @@ -384,6 +376,9 @@ impl User { } async fn handle_keyload(&mut self, address: Address, preparsed: PreparsedMessage) -> Result { + let stream_address = self.stream_address() + .ok_or_else(|| anyhow!("before handling a keyload one must have received a stream announcement first"))?; + // From the point of view of cursor tracking, the message exists, regardless of the validity or // accessibility to its content. Therefore we must update the cursor of the publisher before // handling the message @@ -397,16 +392,10 @@ impl User { let author_identifier = self.state.author_identifier.as_ref().ok_or_else(|| { anyhow!("before receiving keyloads one must have received the announcement of a stream first") })?; - self.stream_address() - .ok_or_else(|| anyhow!("before handling a keyload one must have received a stream announcement first"))?; - let prev_msg = preparsed - .header() - .linked_msg_address() - .ok_or_else(|| anyhow!("a keyload must have a previously linked message"))?; let mut announcement_spongos = self .state .spongos_store - .get(&prev_msg) + .get(&stream_address.relative()) .copied() .expect("a subscriber that has received an stream announcement must keep its spongos in store"); @@ -636,7 +625,6 @@ where self.state.spongos_store.insert(stream_address.relative(), spongos); // Update branch links - self.set_anchor(&topic, stream_address.relative()); self.set_latest_link(&topic, stream_address.relative()); // Commit Author Identifier and Stream Address to store @@ -712,7 +700,6 @@ where }); // Update branch links - self.state.cursor_store.set_anchor(&topic, address.relative()); self.state.cursor_store.set_latest_link(&topic, address.relative()); Ok(SendResponse::new(address, send_response)) } @@ -729,10 +716,7 @@ where // Get base branch topic let base_branch = &self.state.base_branch; // Link message to channel announcement - let link_to = self - .get_anchor(base_branch) - .ok_or_else(|| anyhow!("No anchor found in branch <{}>", base_branch))?; - + let link_to = stream_address.relative(); let rel_address = MsgId::gen(stream_address.base(), &identifier, base_branch, SUB_MESSAGE_NUM); // Prepare HDF and PCF @@ -794,8 +778,8 @@ where let base_branch = &self.state.base_branch; // Link message to channel announcement let link_to = self - .get_anchor(base_branch) - .ok_or_else(|| anyhow!("No anchor found in branch <{}>", base_branch))?; + .get_latest_link(base_branch) + .ok_or_else(|| anyhow!("No latest link found in branch <{}>", base_branch))?; // Update own's cursor let new_cursor = self.next_cursor(base_branch)?; @@ -857,19 +841,20 @@ where let identifier = user_id.to_identifier(); // Check Topic let topic = topic.into(); - // Link message to anchor in branch + // Link message to edge of branch let link_to = self - .get_anchor(&topic) - .ok_or_else(|| anyhow!("No anchor found in branch <{}>", topic))?; + .get_latest_link(&topic) + .ok_or_else(|| anyhow!("No latest message found in branch <{}>", topic))?; // Update own's cursor let new_cursor = self.next_cursor(&topic)?; let rel_address = MsgId::gen(stream_address.base(), &identifier, &topic, new_cursor); // Prepare HDF and PCF - let mut linked_msg_spongos = self + // All Keyload messages will attach to stream Announcement message spongos + let mut announcement_msg_spongos = self .state .spongos_store - .get(&link_to) + .get(&stream_address.relative()) .copied() .expect("a subscriber that has received an stream announcement must keep its spongos in store"); @@ -901,7 +886,7 @@ where }) .collect::>>()?; // collect to handle possible error let content = PCF::new_final_frame().with_content(keyload::Wrap::new( - &mut linked_msg_spongos, + &mut announcement_msg_spongos, subscribers_with_keys.iter().copied(), &psk_ids_with_psks, encryption_key, @@ -1130,15 +1115,11 @@ impl ContentSizeof for sizeof::Context { for topic in topics { self.mask(topic)?; - let anchor = user_state - .cursor_store - .get_anchor(topic) - .ok_or_else(|| anyhow!("No anchor found in branch <{}>", topic))?; let latest_link = user_state .cursor_store .get_latest_link(topic) .ok_or_else(|| anyhow!("No latest link found in branch <{}>", topic))?; - self.mask(&anchor)?.mask(&latest_link)?; + self.mask(&latest_link)?; let cursors: Vec<(&Topic, &Identifier, usize)> = user_state .cursor_store @@ -1191,15 +1172,11 @@ impl<'a> ContentWrap for wrap::Context<&'a mut [u8]> { for topic in topics { self.mask(topic)?; - let anchor = user_state - .cursor_store - .get_anchor(topic) - .ok_or_else(|| anyhow!("No anchor found in branch <{}>", topic))?; let latest_link = user_state .cursor_store .get_latest_link(topic) .ok_or_else(|| anyhow!("No latest link found in branch <{}>", topic))?; - self.mask(&anchor)?.mask(&latest_link)?; + self.mask(&latest_link)?; let cursors: Vec<(&Topic, &Identifier, usize)> = user_state .cursor_store @@ -1255,11 +1232,9 @@ impl<'a> ContentUnwrap for unwrap::Context<&'a [u8]> { for _ in 0..amount_topics.inner() { let mut topic = Topic::default(); self.mask(&mut topic)?; - let mut anchor = MsgId::default(); let mut latest_link = MsgId::default(); - self.mask(&mut anchor)?.mask(&mut latest_link)?; + self.mask(&mut latest_link)?; - user_state.cursor_store.set_anchor(&topic, anchor); user_state.cursor_store.set_latest_link(&topic, latest_link); let mut amount_cursors = Size::default(); From 3d5bad594b4a9d9dc8208a184f8849c01fde64e9 Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 4 Jul 2022 15:29:26 -0600 Subject: [PATCH 7/9] fmt/clippy --- streams/src/api/user.rs | 31 +++++++++++++--------- streams/src/message/branch_announcement.rs | 6 +---- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index 1a6bacec..2223525e 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -206,9 +206,9 @@ impl User { self.state.cursor_store.set_latest_link(topic, latest_link) } - /*fn get_anchor(&self, topic: &Topic) -> Option { - self.state.cursor_store.get_anchor(topic) - }*/ + // fn get_anchor(&self, topic: &Topic) -> Option { + // self.state.cursor_store.get_anchor(topic) + // } fn get_latest_link(&self, topic: &Topic) -> Option { self.state.cursor_store.get_latest_link(topic) @@ -301,13 +301,14 @@ impl User { // Insert new branch into store self.state.cursor_store.new_branch(new_topic.clone()); // Collect permissions from previous branch and clone them into new branch - let prev_permissions = self.state.cursor_store.cursors() + let prev_permissions = self + .cursors() .filter(|cursor| cursor.0 == &prev_topic) .map(|(_, id, _)| id.clone()) .collect::>(); - prev_permissions.into_iter().for_each(|id| { + for id in prev_permissions { self.state.cursor_store.insert_cursor(new_topic, id, INIT_MESSAGE_NUM); - }); + }; // Update branch links self.set_latest_link(new_topic, address.relative()); @@ -376,7 +377,8 @@ impl User { } async fn handle_keyload(&mut self, address: Address, preparsed: PreparsedMessage) -> Result { - let stream_address = self.stream_address() + let stream_address = self + .stream_address() .ok_or_else(|| anyhow!("before handling a keyload one must have received a stream announcement first"))?; // From the point of view of cursor tracking, the message exists, regardless of the validity or @@ -636,7 +638,11 @@ where } /// Prepare new branch Announcement message - pub async fn new_branch(&mut self, from_topic: impl Into, to_topic: impl Into) -> Result> { + pub async fn new_branch( + &mut self, + from_topic: impl Into, + to_topic: impl Into, + ) -> Result> { // Check conditions let stream_address = self .stream_address() @@ -691,13 +697,14 @@ where .insert_cursor(&prev_topic, identifier.clone(), self.next_cursor(&prev_topic)?); self.state.spongos_store.insert(address.relative(), spongos); // Collect permissions from previous branch and clone them into new branch - let prev_permissions = self.state.cursor_store.cursors() + let prev_permissions = self + .cursors() .filter(|cursor| cursor.0 == &prev_topic) - .map(|(_, id, _)| (id.clone())) + .map(|(_, id, _)| id.clone()) .collect::>(); - prev_permissions.into_iter().for_each(|id| { + for id in prev_permissions { self.state.cursor_store.insert_cursor(&topic, id, INIT_MESSAGE_NUM); - }); + }; // Update branch links self.state.cursor_store.set_latest_link(&topic, address.relative()); diff --git a/streams/src/message/branch_announcement.rs b/streams/src/message/branch_announcement.rs index 7d7de96c..7e1e9085 100644 --- a/streams/src/message/branch_announcement.rs +++ b/streams/src/message/branch_announcement.rs @@ -48,11 +48,7 @@ pub(crate) struct Wrap<'a> { } impl<'a> Wrap<'a> { - pub(crate) fn new( - initial_state: &'a mut Spongos, - user_id: &'a Identity, - new_topic: &'a Topic, - ) -> Self { + pub(crate) fn new(initial_state: &'a mut Spongos, user_id: &'a Identity, new_topic: &'a Topic) -> Self { Self { initial_state, user_id, From 783383555172a4647dd1a75ceed7b8353ec5cf7e Mon Sep 17 00:00:00 2001 From: DyrellC Date: Tue, 5 Jul 2022 16:40:29 -0600 Subject: [PATCH 8/9] clean up --- streams/src/api/messages.rs | 2 +- streams/src/api/user.rs | 13 ++++--------- streams/src/message/announcement.rs | 4 ---- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/streams/src/api/messages.rs b/streams/src/api/messages.rs index f1ef416d..66d43fa3 100644 --- a/streams/src/api/messages.rs +++ b/streams/src/api/messages.rs @@ -328,7 +328,7 @@ mod tests { use crate::api::{ message::{ Message, - MessageContent::{Announcement, BranchAnnouncement, Keyload, SignedPacket}, + MessageContent::{BranchAnnouncement, Keyload, SignedPacket}, }, user::User, }; diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index 2223525e..817288c4 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -206,10 +206,6 @@ impl User { self.state.cursor_store.set_latest_link(topic, latest_link) } - // fn get_anchor(&self, topic: &Topic) -> Option { - // self.state.cursor_store.get_anchor(topic) - // } - fn get_latest_link(&self, topic: &Topic) -> Option { self.state.cursor_store.get_latest_link(topic) } @@ -308,7 +304,7 @@ impl User { .collect::>(); for id in prev_permissions { self.state.cursor_store.insert_cursor(new_topic, id, INIT_MESSAGE_NUM); - }; + } // Update branch links self.set_latest_link(new_topic, address.relative()); @@ -391,9 +387,8 @@ impl User { ); // Unwrap message - let author_identifier = self.state.author_identifier.as_ref().ok_or_else(|| { - anyhow!("before receiving keyloads one must have received the announcement of a stream first") - })?; + // Ok to unwrap since an author identifier is set at the same time as the stream address + let author_identifier = self.state.author_identifier.as_ref().unwrap(); let mut announcement_spongos = self .state .spongos_store @@ -704,7 +699,7 @@ where .collect::>(); for id in prev_permissions { self.state.cursor_store.insert_cursor(&topic, id, INIT_MESSAGE_NUM); - }; + } // Update branch links self.state.cursor_store.set_latest_link(&topic, address.relative()); diff --git a/streams/src/message/announcement.rs b/streams/src/message/announcement.rs index 13c1b09f..445f9fed 100644 --- a/streams/src/message/announcement.rs +++ b/streams/src/message/announcement.rs @@ -110,10 +110,6 @@ impl Unwrap { pub(crate) fn author_ke_pk(&self) -> &x25519::PublicKey { &self.author_ke_pk } - - pub(crate) fn into_parts(self) -> (Identifier, x25519::PublicKey) { - (self.author_id, self.author_ke_pk) - } } #[async_trait(?Send)] From 18aa0d8fa99ee664713221a9581b855c90352786 Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 11 Jul 2022 22:10:34 -0600 Subject: [PATCH 9/9] add cursors_by_topic --- streams/src/api/cursor_store.rs | 4 ++++ streams/src/api/user.rs | 12 +++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/streams/src/api/cursor_store.rs b/streams/src/api/cursor_store.rs index 18db152c..d4b70ff4 100644 --- a/streams/src/api/cursor_store.rs +++ b/streams/src/api/cursor_store.rs @@ -48,6 +48,10 @@ impl CursorStore { .flat_map(|(topic, branch)| branch.cursors.iter().map(move |(id, cursor)| (topic, id, *cursor))) } + pub(crate) fn cursors_by_topic(&self, topic: &Topic) -> Option> { + self.0.get(topic).map(|inner| inner.cursors.iter()) + } + pub(crate) fn insert_cursor(&mut self, topic: &Topic, id: Identifier, cursor: usize) -> Option { if let Some(branch) = self.0.get_mut(topic) { return branch.cursors.insert(id, cursor); diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index 817288c4..b7f56e6d 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -169,6 +169,13 @@ impl User { self.state.cursor_store.cursors() } + fn cursors_by_topic(&self, topic: &Topic) -> Result> { + self.state + .cursor_store + .cursors_by_topic(topic) + .ok_or_else(|| anyhow!("previous topic {} not found in store", topic)) + } + pub fn subscribers(&self) -> impl Iterator + Clone + '_ { self.state.exchange_keys.keys() } @@ -298,9 +305,8 @@ impl User { self.state.cursor_store.new_branch(new_topic.clone()); // Collect permissions from previous branch and clone them into new branch let prev_permissions = self - .cursors() - .filter(|cursor| cursor.0 == &prev_topic) - .map(|(_, id, _)| id.clone()) + .cursors_by_topic(&prev_topic)? + .map(|(id, _)| id.clone()) .collect::>(); for id in prev_permissions { self.state.cursor_store.insert_cursor(new_topic, id, INIT_MESSAGE_NUM);