From 463072fcb5c3796c858b9b1cf4feb61317590afb Mon Sep 17 00:00:00 2001 From: DyrellC Date: Tue, 2 Aug 2022 09:53:15 -0600 Subject: [PATCH 1/5] Add MAC verification to HDF --- lets/src/message/hdf.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/lets/src/message/hdf.rs b/lets/src/message/hdf.rs index f9c273dd..56dc6732 100644 --- a/lets/src/message/hdf.rs +++ b/lets/src/message/hdf.rs @@ -4,10 +4,10 @@ use async_trait::async_trait; use spongos::{ ddml::{ - commands::{sizeof, unwrap, wrap, Absorb, Guard, Mask, Skip}, + commands::{sizeof, unwrap, wrap, Absorb, Commit, Guard, Mask, Skip, Squeeze}, io, modifiers::External, - types::{Maybe, NBytes, Size, Uint8}, + types::{Mac, Maybe, NBytes, Size, Uint8}, }, PRP, }; @@ -22,6 +22,8 @@ use crate::{ }, }; +const MAC: Mac = Mac::new(32); + #[non_exhaustive] #[derive(Clone, Debug, PartialEq, Eq, Hash)] #[allow(clippy::upper_case_acronyms)] @@ -141,7 +143,9 @@ impl ContentSizeof for sizeof::Context { .absorb(Maybe::new(hdf.linked_msg_address.as_ref()))? .mask(&hdf.topic)? .mask(&hdf.publisher)? - .skip(Size::new(hdf.sequence))?; + .skip(Size::new(hdf.sequence))? + .commit()? + .squeeze(&MAC)?; Ok(self) } @@ -178,7 +182,9 @@ where .absorb(Maybe::new(hdf.linked_msg_address.as_ref()))? .mask(&hdf.topic)? .mask(&hdf.publisher)? - .skip(Size::new(hdf.sequence))?; + .skip(Size::new(hdf.sequence))? + .commit()? + .squeeze(&MAC)?; Ok(self) } @@ -228,7 +234,9 @@ where .absorb(Maybe::new(&mut hdf.linked_msg_address))? .mask(&mut hdf.topic)? .mask(&mut hdf.publisher)? - .skip(&mut seq_num)?; + .skip(&mut seq_num)? + .commit()? + .squeeze(&MAC)?; hdf.encoding = encoding.inner(); hdf.version = version.inner(); From 5c2772fe464a19338b59debe05dadb890a3f9081 Mon Sep 17 00:00:00 2001 From: DyrellC Date: Fri, 5 Aug 2022 01:16:45 -0600 Subject: [PATCH 2/5] Add TopicHash, store cursors by hash, hash topic in hdf --- lets/src/message/hdf.rs | 20 ++-- lets/src/message/mod.rs | 2 +- lets/src/message/topic.rs | 47 ++++++++- streams/src/api/cursor_store.rs | 32 +++--- streams/src/api/messages.rs | 7 +- streams/src/api/user.rs | 151 ++++++++++++++++------------ streams/src/message/announcement.rs | 17 +++- 7 files changed, 173 insertions(+), 103 deletions(-) diff --git a/lets/src/message/hdf.rs b/lets/src/message/hdf.rs index 56dc6732..a6a02bdc 100644 --- a/lets/src/message/hdf.rs +++ b/lets/src/message/hdf.rs @@ -17,7 +17,7 @@ use crate::{ id::Identifier, message::{ content::{ContentSizeof, ContentUnwrap, ContentWrap}, - topic::Topic, + topic::{Topic, TopicHash}, version::{HDF_ID, STREAMS_1_VER, UTF8}, }, }; @@ -40,7 +40,7 @@ pub struct HDF { pub linked_msg_address: Option, pub sequence: usize, pub publisher: Identifier, - pub topic: Topic, + pub topic_hash: TopicHash, } impl Default for HDF { @@ -55,13 +55,13 @@ impl Default for HDF { linked_msg_address: Default::default(), sequence: 0, publisher: Default::default(), - topic: Default::default(), + topic_hash: Default::default(), } } } impl HDF { - pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: Topic) -> Result { + pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: &Topic) -> Result { ensure!( message_type >> 4 == 0, anyhow!( @@ -79,7 +79,7 @@ impl HDF { linked_msg_address: None, sequence, publisher, - topic, + topic_hash: topic.into(), }) } @@ -124,8 +124,8 @@ impl HDF { self.linked_msg_address } - pub fn topic(&self) -> &Topic { - &self.topic + pub fn topic_hash(&self) -> &TopicHash { + &self.topic_hash } } @@ -141,7 +141,7 @@ impl ContentSizeof for sizeof::Context { .absorb(Uint8::new(hdf.frame_type))? .skip(payload_frame_count)? .absorb(Maybe::new(hdf.linked_msg_address.as_ref()))? - .mask(&hdf.topic)? + .mask(&hdf.topic_hash)? .mask(&hdf.publisher)? .skip(Size::new(hdf.sequence))? .commit()? @@ -180,7 +180,7 @@ where .absorb(Uint8::new(hdf.frame_type))? .skip(payload_frame_count)? .absorb(Maybe::new(hdf.linked_msg_address.as_ref()))? - .mask(&hdf.topic)? + .mask(&hdf.topic_hash)? .mask(&hdf.publisher)? .skip(Size::new(hdf.sequence))? .commit()? @@ -232,7 +232,7 @@ where anyhow!("first 2 bits of payload-frame-count are reserved"), )? .absorb(Maybe::new(&mut hdf.linked_msg_address))? - .mask(&mut hdf.topic)? + .mask(&mut hdf.topic_hash)? .mask(&mut hdf.publisher)? .skip(&mut seq_num)? .commit()? diff --git a/lets/src/message/mod.rs b/lets/src/message/mod.rs index a5f1e052..36fb0b9d 100644 --- a/lets/src/message/mod.rs +++ b/lets/src/message/mod.rs @@ -24,5 +24,5 @@ pub use hdf::HDF; pub use message::Message; pub use pcf::PCF; pub use preparsed::PreparsedMessage; -pub use topic::Topic; +pub use topic::{Topic, TopicHash}; pub use transport::TransportMessage; diff --git a/lets/src/message/topic.rs b/lets/src/message/topic.rs index fe5529d0..bb38ce34 100644 --- a/lets/src/message/topic.rs +++ b/lets/src/message/topic.rs @@ -11,9 +11,9 @@ use spongos::{ ddml::{ commands::{sizeof, unwrap, wrap, Mask}, io, - types::Bytes, + types::{Bytes, NBytes}, }, - PRP, + KeccakF1600, Spongos, PRP, }; #[derive(Clone, PartialEq, Eq, Debug, Default, Hash)] @@ -105,3 +105,46 @@ where Ok(self) } } + +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Debug, Default, Hash)] +pub struct TopicHash([u8; 16]); + +impl From<&Topic> for TopicHash { + fn from(topic: &Topic) -> Self { + let topic_hash: [u8; 16] = Spongos::::init().sponge(topic.as_ref()); + Self(topic_hash) + } +} + +impl AsRef<[u8]> for TopicHash { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +impl Mask<&TopicHash> for sizeof::Context { + fn mask(&mut self, topic_hash: &TopicHash) -> anyhow::Result<&mut Self> { + self.mask(NBytes::<[u8; 16]>::new(topic_hash.0)) + } +} + +impl Mask<&TopicHash> for wrap::Context +where + F: PRP, + OS: io::OStream, +{ + fn mask(&mut self, topic_hash: &TopicHash) -> anyhow::Result<&mut Self> { + self.mask(NBytes::<[u8; 16]>::new(topic_hash.0)) + } +} + +impl Mask<&mut TopicHash> for unwrap::Context +where + F: PRP, + IS: io::IStream, +{ + fn mask(&mut self, topic_hash: &mut TopicHash) -> anyhow::Result<&mut Self> { + self.mask(NBytes::<&mut [u8; 16]>::new(&mut topic_hash.0))?; + Ok(self) + } +} diff --git a/streams/src/api/cursor_store.rs b/streams/src/api/cursor_store.rs index 4285e1bb..c3e17da3 100644 --- a/streams/src/api/cursor_store.rs +++ b/streams/src/api/cursor_store.rs @@ -10,25 +10,21 @@ use hashbrown::HashMap; use lets::{ address::MsgId, id::{Identifier, Permissioned}, - message::Topic, + message::{Topic, TopicHash}, }; // Local #[derive(Default, Clone, PartialEq, Eq)] -pub(crate) struct CursorStore(HashMap); +pub(crate) struct CursorStore(HashMap); impl CursorStore { pub(crate) fn new() -> Self { Default::default() } - pub(crate) fn new_branch(&mut self, topic: Topic) -> bool { - self.0.insert(topic, InnerCursorStore::default()).is_none() - } - - pub(crate) fn topics(&self) -> impl Iterator + ExactSizeIterator { - self.0.keys() + pub(crate) fn new_branch(&mut self, topic: &Topic) -> bool { + self.0.insert(topic.into(), InnerCursorStore::default()).is_none() } pub(crate) fn remove(&mut self, id: &Identifier) -> bool { @@ -44,7 +40,7 @@ impl CursorStore { } pub(crate) fn get_permission(&self, topic: &Topic, id: &Identifier) -> Option<&Permissioned> { - self.0.get(topic).and_then(|branch| { + self.0.get(&topic.into()).and_then(|branch| { branch .cursors .iter() @@ -54,7 +50,7 @@ impl CursorStore { } pub(crate) fn get_cursor(&self, topic: &Topic, id: &Identifier) -> Option { - self.0.get(topic).and_then(|branch| { + self.0.get(&topic.into()).and_then(|branch| { branch .cursors .iter() @@ -64,7 +60,7 @@ impl CursorStore { }) } - pub(crate) fn cursors(&self) -> impl Iterator, usize)> + Clone + '_ { + pub(crate) fn cursors(&self) -> impl Iterator, usize)> + Clone + '_ { self.0 .iter() .flat_map(|(topic, branch)| branch.cursors.iter().map(move |(id, cursor)| (topic, id, *cursor))) @@ -74,7 +70,7 @@ impl CursorStore { &self, topic: &Topic, ) -> Option, &usize)>> { - self.0.get(topic).map(|inner| inner.cursors.iter()) + self.0.get(&topic.into()).map(|inner| inner.cursors.iter()) } pub(crate) fn insert_cursor( @@ -99,12 +95,12 @@ impl CursorStore { }; self.0 - .get_mut(topic) + .get_mut(&topic.into()) .and_then(|branch| branch.cursors.insert(id, cursor)) } pub(crate) fn set_latest_link(&mut self, topic: &Topic, latest_link: MsgId) -> Option { - match self.0.get_mut(topic) { + match self.0.get_mut(&topic.into()) { Some(branch) => { branch.latest_link = latest_link; None @@ -114,13 +110,13 @@ impl CursorStore { latest_link, ..Default::default() }; - self.0.insert(topic.clone(), branch) + self.0.insert(topic.into(), branch) } } } pub(crate) fn get_latest_link(&self, topic: &Topic) -> Option { - self.0.get(topic).map(|branch| branch.latest_link) + self.0.get(&topic.into()).map(|branch| branch.latest_link) } } @@ -168,8 +164,8 @@ mod tests { let topic_1 = Topic::new("topic 1".to_string()); let topic_2 = Topic::new("topic 2".to_string()); - branch_store.new_branch(topic_1.clone()); - branch_store.new_branch(topic_2.clone()); + branch_store.new_branch(&topic_1); + branch_store.new_branch(&topic_2); branch_store.insert_cursor(&topic_1, permission.clone(), 10); branch_store.insert_cursor(&topic_2, permission.clone(), 20); diff --git a/streams/src/api/messages.rs b/streams/src/api/messages.rs index a94e076c..6080a904 100644 --- a/streams/src/api/messages.rs +++ b/streams/src/api/messages.rs @@ -18,7 +18,7 @@ use hashbrown::HashMap; use lets::{ address::{Address, MsgId}, id::{Identifier, Permissioned}, - message::{Topic, TransportMessage, HDF}, + message::{Topic, TopicHash, TransportMessage, HDF}, transport::Transport, }; @@ -132,7 +132,7 @@ type PinBoxFut<'a, T> = Pin + 'a>>; struct MessagesState<'a, T> { user: &'a mut User, - ids_stack: Vec<(Topic, Permissioned, usize)>, + ids_stack: Vec<(TopicHash, Permissioned, usize)>, msg_queue: HashMap>, stage: VecDeque<(MsgId, TransportMessage)>, successful_round: bool, @@ -202,7 +202,7 @@ impl<'a, T> MessagesState<'a, T> { } } else { // Stage is empty, populate it with some more messages - let (topic, publisher, cursor) = match self.ids_stack.pop() { + let (topic_hash, publisher, cursor) = match self.ids_stack.pop() { Some(id_cursor) => id_cursor, None => { // new round @@ -217,6 +217,7 @@ impl<'a, T> MessagesState<'a, T> { } }; let base_address = self.user.stream_address()?.base(); + let topic = self.user.topic_by_hash(&topic_hash)?; let rel_address = MsgId::gen(base_address, publisher.identifier(), &topic, cursor + 1); let address = Address::new(base_address, rel_address); diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index bf686073..229c6bc9 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -6,7 +6,7 @@ use core::fmt::{Debug, Formatter, Result as FormatResult}; use anyhow::{anyhow, bail, ensure, Result}; use async_trait::async_trait; use futures::{future, TryStreamExt}; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use rand::{rngs::StdRng, Rng, SeedableRng}; // IOTA @@ -17,8 +17,8 @@ use lets::{ address::{Address, AppAddr, MsgId}, id::{Identifier, Identity, PermissionDuration, Permissioned, Psk, PskId}, message::{ - ContentSizeof, ContentUnwrap, ContentWrap, Message as LetsMessage, PreparsedMessage, Topic, TransportMessage, - HDF, PCF, + ContentSizeof, ContentUnwrap, ContentWrap, Message as LetsMessage, PreparsedMessage, Topic, TopicHash, + TransportMessage, HDF, PCF, }, transport::Transport, }; @@ -75,6 +75,8 @@ struct State { spongos_store: HashMap, base_branch: Topic, + + topics: HashSet, } pub struct User { @@ -117,6 +119,7 @@ impl User { stream_address: None, author_identifier: None, base_branch: Default::default(), + topics: Default::default(), }, } } @@ -167,10 +170,14 @@ impl User { } pub fn topics(&self) -> impl Iterator + ExactSizeIterator { - self.state.cursor_store.topics() + self.state.topics.iter() + } + + pub(crate) fn topic_by_hash(&self, hash: &TopicHash) -> Option { + self.topics().find(|t| &TopicHash::from(*t) == hash).cloned() } - pub(crate) fn cursors(&self) -> impl Iterator, usize)> + '_ { + pub(crate) fn cursors(&self) -> impl Iterator, usize)> + '_ { self.state.cursor_store.cursors() } @@ -238,22 +245,23 @@ impl User { /// in the message. async fn handle_announcement(&mut self, address: Address, preparsed: PreparsedMessage) -> Result { // Check Topic - let topic = preparsed.header().topic().clone(); let publisher = preparsed.header().publisher().clone(); - // Insert new branch into store - self.state.cursor_store.new_branch(topic.clone()); - // From the point of view of cursor tracking, the message exists, regardless of the validity or - // accessibility to its content. Therefore we must update the cursor of the publisher before - // handling the message - self.state - .cursor_store - .insert_cursor(&topic, Permissioned::Admin(publisher), INIT_MESSAGE_NUM); - // Unwrap message let announcement = announcement::Unwrap::default(); let (message, spongos) = preparsed.unwrap(announcement).await?; + let topic = message.payload().content().topic(); + // Insert new branch into store + self.state.cursor_store.new_branch(topic); + self.state.topics.insert(topic.clone()); + + // When handling an announcement it means that no cursors have been stored, as no topics are + // known yet. The message must be unwrapped to retrieve the initial topic before storing cursors + self.state + .cursor_store + .insert_cursor(topic, Permissioned::Admin(publisher), INIT_MESSAGE_NUM); + // Store spongos self.state.spongos_store.insert(address.relative(), spongos); @@ -262,10 +270,10 @@ impl User { let author_ke_pk = *message.payload().content().author_ke_pk(); // Update branch links - self.set_latest_link(&topic, address.relative()); + self.set_latest_link(topic, address.relative()); self.state.author_identifier = Some(author_id.clone()); self.state.exchange_keys.insert(author_id, author_ke_pk); - self.state.base_branch = topic; + self.state.base_branch = topic.clone(); self.state.stream_address = Some(address); Ok(Message::from_lets_message(address, message)) @@ -273,7 +281,9 @@ impl User { async fn handle_branch_announcement(&mut self, address: Address, preparsed: PreparsedMessage) -> Result { // Retrieve header values - let prev_topic = preparsed.header().topic().clone(); + let prev_topic = self + .topic_by_hash(preparsed.header().topic_hash()) + .ok_or_else(|| anyhow!("No known topic that matches header topic"))?; let publisher = preparsed.header().publisher().clone(); let cursor = preparsed.header().sequence(); @@ -309,7 +319,8 @@ impl User { // Store spongos self.state.spongos_store.insert(address.relative(), spongos); // Insert new branch into store - self.state.cursor_store.new_branch(new_topic.clone()); + self.state.cursor_store.new_branch(new_topic); + self.state.topics.insert(new_topic.clone()); // Collect permissions from previous branch and clone them into new branch let prev_permissions = self .cursors_by_topic(&prev_topic)? @@ -389,8 +400,9 @@ impl User { let stream_address = self .stream_address() .ok_or_else(|| anyhow!("before handling a keyload one must have received a stream announcement first"))?; - - let topic = preparsed.header().topic().clone(); + let topic = self + .topic_by_hash(preparsed.header().topic_hash()) + .ok_or_else(|| anyhow!("No known topic that matches header topic"))?; let publisher = preparsed.header().publisher().clone(); // Confirm keyload came from administrator if !self @@ -406,7 +418,7 @@ impl User { // accessibility to its content. Therefore we must update the cursor of the publisher before // handling the message self.state.cursor_store.insert_cursor( - preparsed.header().topic(), + &topic, Permissioned::Admin(publisher), preparsed.header().sequence(), ); @@ -458,32 +470,36 @@ impl User { if self.should_store_cursor(&topic, subscriber.as_ref()) { self.state .cursor_store - .insert_cursor(message.header().topic(), subscriber.clone(), INIT_MESSAGE_NUM); + .insert_cursor(&topic, subscriber.clone(), INIT_MESSAGE_NUM); } } // Have to make message before setting branch links due to immutable borrow in keyload::unwrap let final_message = Message::from_lets_message(address, message); // Update branch links - self.set_latest_link(&final_message.header().topic, address.relative()); + self.set_latest_link(&topic, address.relative()); Ok(final_message) } async fn handle_signed_packet(&mut self, address: Address, preparsed: PreparsedMessage) -> Result { - let topic = preparsed.header().topic(); + let topic = self + .topic_by_hash(preparsed.header().topic_hash()) + .ok_or_else(|| anyhow!("No known topic that matches header topic"))?; let publisher = preparsed.header().publisher(); let permission = self .state .cursor_store - .get_permission(topic, publisher) + .get_permission(&topic, publisher) .expect("Publisher does not have a stored cursor on the provided branch") .clone(); // From the point of view of cursor tracking, the message exists, regardless of the validity or // accessibility to its content. Therefore we must update the cursor of the publisher before // handling the message - self.state - .cursor_store - .insert_cursor(topic, permission, preparsed.header().sequence()); + self.state.cursor_store.insert_cursor( + &topic, + permission, + preparsed.header().sequence(), + ); // Unwrap message let linked_msg_address = preparsed.header().linked_msg_address().ok_or_else(|| { @@ -504,17 +520,19 @@ impl User { self.state.spongos_store.insert(address.relative(), spongos); // Store message content into stores - self.set_latest_link(message.header().topic(), address.relative()); + self.set_latest_link(&topic, address.relative()); Ok(Message::from_lets_message(address, message)) } async fn handle_tagged_packet(&mut self, address: Address, preparsed: PreparsedMessage) -> Result { - let topic = preparsed.header().topic(); + let topic = self + .topic_by_hash(preparsed.header().topic_hash()) + .ok_or_else(|| anyhow!("No known topic that matches header topic"))?; let publisher = preparsed.header().publisher(); let permission = self .state .cursor_store - .get_permission(topic, publisher) + .get_permission(&topic, publisher) .expect("Publisher does not have a stored cursor on the provided branch") .clone(); // From the point of view of cursor tracking, the message exists, regardless of the validity or @@ -522,7 +540,7 @@ impl User { // handling the message self.state .cursor_store - .insert_cursor(topic, permission, preparsed.header().sequence()); + .insert_cursor(&topic, permission, preparsed.header().sequence()); // Unwrap message let linked_msg_address = preparsed.header().linked_msg_address().ok_or_else(|| { @@ -543,7 +561,7 @@ impl User { self.state.spongos_store.insert(address.relative(), spongos); // Store message content into stores - self.set_latest_link(message.header().topic(), address.relative()); + self.set_latest_link(&topic, address.relative()); Ok(Message::from_lets_message(address, message)) } @@ -654,9 +672,9 @@ where message_types::ANNOUNCEMENT, ANN_MESSAGE_NUM, identity.to_identifier(), - topic.clone(), + &topic, )?; - let content = PCF::new_final_frame().with_content(announcement::Wrap::new(identity)); + let content = PCF::new_final_frame().with_content(announcement::Wrap::new(identity, &topic)); // Wrap message let (transport_msg, spongos) = LetsMessage::new(header, content).wrap().await?; @@ -669,7 +687,8 @@ where let send_response = self.transport.send_message(stream_address, transport_msg).await?; // If a message has been sent successfully, insert the base branch into store - self.state.cursor_store.new_branch(topic.clone()); + self.state.cursor_store.new_branch(&topic); + self.state.topics.insert(topic.clone()); // Commit message to stores self.state .cursor_store @@ -735,7 +754,7 @@ where message_types::BRANCH_ANNOUNCEMENT, user_cursor, identifier.clone(), - prev_topic.clone(), + &prev_topic, )? .with_linked_msg_address(link_to); let content = PCF::new_final_frame().with_content(branch_announcement::Wrap::new( @@ -749,7 +768,8 @@ where let send_response = self.transport.send_message(address, transport_msg).await?; // If message has been sent successfully, create the new branch in store - self.state.cursor_store.new_branch(topic.clone()); + self.state.cursor_store.new_branch(&topic); + self.state.topics.insert(topic.clone()); // Commit message to stores and update cursors self.state.cursor_store.insert_cursor( &prev_topic, @@ -760,7 +780,7 @@ where // Collect permissions from previous branch and clone them into new branch let prev_permissions = self .cursors() - .filter(|cursor| cursor.0 == &prev_topic) + .filter(|cursor| cursor.0 == &TopicHash::from(&prev_topic)) .map(|(_, id, _)| id.clone()) .collect::>>(); for id in prev_permissions { @@ -812,7 +832,7 @@ where message_types::SUBSCRIPTION, SUB_MESSAGE_NUM, identifier.clone(), - base_branch.clone(), + base_branch, )? .with_linked_msg_address(link_to); @@ -866,7 +886,7 @@ where message_types::UNSUBSCRIPTION, new_cursor, identifier.clone(), - base_branch.clone(), + base_branch, )? .with_linked_msg_address(link_to); @@ -969,8 +989,8 @@ where nonce, user_id, )); - let header = HDF::new(message_types::KEYLOAD, new_cursor, identifier.clone(), topic.clone())? - .with_linked_msg_address(link_to); + let header = + HDF::new(message_types::KEYLOAD, new_cursor, identifier.clone(), &topic)?.with_linked_msg_address(link_to); // Wrap message let (transport_msg, spongos) = LetsMessage::new(header, content).wrap().await?; @@ -1113,13 +1133,8 @@ where public_payload.as_ref(), masked_payload.as_ref(), )); - let header = HDF::new( - message_types::SIGNED_PACKET, - new_cursor, - identifier.clone(), - topic.clone(), - )? - .with_linked_msg_address(link_to); + let header = HDF::new(message_types::SIGNED_PACKET, new_cursor, identifier.clone(), &topic)? + .with_linked_msg_address(link_to); // Wrap message let (transport_msg, spongos) = LetsMessage::new(header, content).wrap().await?; @@ -1193,13 +1208,8 @@ where public_payload.as_ref(), masked_payload.as_ref(), )); - let header = HDF::new( - message_types::TAGGED_PACKET, - new_cursor, - identifier.clone(), - topic.clone(), - )? - .with_linked_msg_address(link_to); + let header = HDF::new(message_types::TAGGED_PACKET, new_cursor, identifier.clone(), &topic)? + .with_linked_msg_address(link_to); // Wrap message let (transport_msg, spongos) = LetsMessage::new(header, content).wrap().await?; @@ -1237,8 +1247,12 @@ impl ContentSizeof for sizeof::Context { self.mask(address)?.mask(spongos)?; } - let topics = user_state.cursor_store.topics(); - let amount_topics = topics.len(); + // Only keep topics that exist in cursor store, any others serve no purpose + let topics = user_state + .topics + .iter() + .filter(|t| user_state.cursor_store.get_latest_link(*t).is_some()); + let amount_topics = topics.clone().count(); self.mask(Size::new(amount_topics))?; for topic in topics { @@ -1249,10 +1263,10 @@ impl ContentSizeof for sizeof::Context { .ok_or_else(|| anyhow!("No latest link found in branch <{}>", topic))?; self.mask(&latest_link)?; - let cursors: Vec<(&Topic, &Permissioned, usize)> = user_state + let cursors: Vec<(&TopicHash, &Permissioned, usize)> = user_state .cursor_store .cursors() - .filter(|(t, _, _)| *t == topic) + .filter(|(t, _, _)| *t == &topic.into()) .collect(); let amount_cursors = cursors.len(); self.mask(Size::new(amount_cursors))?; @@ -1294,8 +1308,12 @@ impl<'a> ContentWrap for wrap::Context<&'a mut [u8]> { self.mask(address)?.mask(spongos)?; } - let topics = user_state.cursor_store.topics(); - let amount_topics = topics.len(); + // Only keep topics that exist in cursor store, any others serve no purpose + let topics = user_state + .topics + .iter() + .filter(|t| user_state.cursor_store.get_latest_link(*t).is_some()); + let amount_topics = topics.clone().count(); self.mask(Size::new(amount_topics))?; for topic in topics { @@ -1306,10 +1324,10 @@ impl<'a> ContentWrap for wrap::Context<&'a mut [u8]> { .ok_or_else(|| anyhow!("No latest link found in branch <{}>", topic))?; self.mask(&latest_link)?; - let cursors: Vec<(&Topic, &Permissioned, usize)> = user_state + let cursors: Vec<(&TopicHash, &Permissioned, usize)> = user_state .cursor_store .cursors() - .filter(|(t, _, _)| *t == topic) + .filter(|(t, _, _)| *t == &topic.into()) .collect(); let amount_cursors = cursors.len(); self.mask(Size::new(amount_cursors))?; @@ -1363,6 +1381,7 @@ impl<'a> ContentUnwrap for unwrap::Context<&'a [u8]> { let mut latest_link = MsgId::default(); self.mask(&mut latest_link)?; + user_state.topics.insert(topic.clone()); user_state.cursor_store.set_latest_link(&topic, latest_link); let mut amount_cursors = Size::default(); diff --git a/streams/src/message/announcement.rs b/streams/src/message/announcement.rs index 445f9fed..62b70299 100644 --- a/streams/src/message/announcement.rs +++ b/streams/src/message/announcement.rs @@ -28,7 +28,7 @@ use crypto::keys::x25519; // Streams use lets::{ id::{Identifier, Identity}, - message::{ContentSign, ContentSignSizeof, ContentSizeof, ContentUnwrap, ContentVerify, ContentWrap}, + message::{ContentSign, ContentSignSizeof, ContentSizeof, ContentUnwrap, ContentVerify, ContentWrap, Topic}, }; use spongos::{ ddml::{ @@ -42,11 +42,12 @@ use spongos::{ pub(crate) struct Wrap<'a> { user_id: &'a Identity, + topic: &'a Topic, } impl<'a> Wrap<'a> { - pub(crate) fn new(user_id: &'a Identity) -> Self { - Self { user_id } + pub(crate) fn new(user_id: &'a Identity, topic: &'a Topic) -> Self { + Self { user_id, topic } } } @@ -54,6 +55,7 @@ impl<'a> Wrap<'a> { impl<'a> ContentSizeof> for sizeof::Context { async fn sizeof(&mut self, announcement: &Wrap<'a>) -> Result<&mut Self> { self.mask(&announcement.user_id.to_identifier())? + .mask(announcement.topic)? // TODO: REMOVE ONCE KE IS ENCAPSULATED WITHIN IDENTITY .absorb(&announcement.user_id._ke_sk().public_key())? .sign_sizeof(announcement.user_id) @@ -70,6 +72,7 @@ where { async fn wrap(&mut self, announcement: &mut Wrap<'a>) -> Result<&mut Self> { self.mask(&announcement.user_id.to_identifier())? + .mask(announcement.topic)? // TODO: REMOVE ONCE KE IS ENCAPSULATED WITHIN IDENTITY .absorb(&announcement.user_id._ke_sk().public_key())? .sign(announcement.user_id) @@ -84,15 +87,18 @@ pub(crate) struct Unwrap { author_id: Identifier, // TODO: REMOVE ONCE KE IS ENCAPSULATED WITHIN IDENTITY author_ke_pk: x25519::PublicKey, + topic: Topic, } impl Default for Unwrap { fn default() -> Self { let author_id = Default::default(); let author_ke_pk = x25519::PublicKey::from_bytes([0; x25519::PUBLIC_KEY_LENGTH]); + let topic = Default::default(); Self { author_id, author_ke_pk, + topic, } } } @@ -102,6 +108,10 @@ impl Unwrap { &self.author_id } + pub(crate) fn topic(&self) -> &Topic { + &self.topic + } + pub(crate) fn into_author_id(self) -> Identifier { self.author_id } @@ -120,6 +130,7 @@ where { async fn unwrap(&mut self, announcement: &mut Unwrap) -> Result<&mut Self> { self.mask(&mut announcement.author_id)? + .mask(&mut announcement.topic)? .absorb(&mut announcement.author_ke_pk)? .verify(&announcement.author_id) .await? From 959b0db910961ba002e37040cc38454813abad79 Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 22 Aug 2022 11:51:57 -0600 Subject: [PATCH 3/5] reverse TopicHash as cursor_store key --- streams/src/api/cursor_store.rs | 30 ++++++++-------- streams/src/api/messages.rs | 7 ++-- streams/src/api/user.rs | 61 ++++++++++++++++----------------- 3 files changed, 48 insertions(+), 50 deletions(-) diff --git a/streams/src/api/cursor_store.rs b/streams/src/api/cursor_store.rs index c3e17da3..92634027 100644 --- a/streams/src/api/cursor_store.rs +++ b/streams/src/api/cursor_store.rs @@ -10,21 +10,21 @@ use hashbrown::HashMap; use lets::{ address::MsgId, id::{Identifier, Permissioned}, - message::{Topic, TopicHash}, + message::Topic, }; // Local #[derive(Default, Clone, PartialEq, Eq)] -pub(crate) struct CursorStore(HashMap); +pub(crate) struct CursorStore(HashMap); impl CursorStore { pub(crate) fn new() -> Self { Default::default() } - pub(crate) fn new_branch(&mut self, topic: &Topic) -> bool { - self.0.insert(topic.into(), InnerCursorStore::default()).is_none() + pub(crate) fn new_branch(&mut self, topic: Topic) -> bool { + self.0.insert(topic, InnerCursorStore::default()).is_none() } pub(crate) fn remove(&mut self, id: &Identifier) -> bool { @@ -40,7 +40,7 @@ impl CursorStore { } pub(crate) fn get_permission(&self, topic: &Topic, id: &Identifier) -> Option<&Permissioned> { - self.0.get(&topic.into()).and_then(|branch| { + self.0.get(topic).and_then(|branch| { branch .cursors .iter() @@ -50,7 +50,7 @@ impl CursorStore { } pub(crate) fn get_cursor(&self, topic: &Topic, id: &Identifier) -> Option { - self.0.get(&topic.into()).and_then(|branch| { + self.0.get(topic).and_then(|branch| { branch .cursors .iter() @@ -60,7 +60,7 @@ impl CursorStore { }) } - pub(crate) fn cursors(&self) -> impl Iterator, usize)> + Clone + '_ { + pub(crate) fn cursors(&self) -> impl Iterator, usize)> + Clone + '_ { self.0 .iter() .flat_map(|(topic, branch)| branch.cursors.iter().map(move |(id, cursor)| (topic, id, *cursor))) @@ -70,7 +70,7 @@ impl CursorStore { &self, topic: &Topic, ) -> Option, &usize)>> { - self.0.get(&topic.into()).map(|inner| inner.cursors.iter()) + self.0.get(topic).map(|inner| inner.cursors.iter()) } pub(crate) fn insert_cursor( @@ -95,12 +95,12 @@ impl CursorStore { }; self.0 - .get_mut(&topic.into()) + .get_mut(topic) .and_then(|branch| branch.cursors.insert(id, cursor)) } - pub(crate) fn set_latest_link(&mut self, topic: &Topic, latest_link: MsgId) -> Option { - match self.0.get_mut(&topic.into()) { + pub(crate) fn set_latest_link(&mut self, topic: Topic, latest_link: MsgId) -> Option { + match self.0.get_mut(&topic) { Some(branch) => { branch.latest_link = latest_link; None @@ -110,13 +110,13 @@ impl CursorStore { latest_link, ..Default::default() }; - self.0.insert(topic.into(), branch) + self.0.insert(topic, branch) } } } pub(crate) fn get_latest_link(&self, topic: &Topic) -> Option { - self.0.get(&topic.into()).map(|branch| branch.latest_link) + self.0.get(topic).map(|branch| branch.latest_link) } } @@ -164,8 +164,8 @@ mod tests { let topic_1 = Topic::new("topic 1".to_string()); let topic_2 = Topic::new("topic 2".to_string()); - branch_store.new_branch(&topic_1); - branch_store.new_branch(&topic_2); + branch_store.new_branch(topic_1.clone()); + branch_store.new_branch(topic_2.clone()); branch_store.insert_cursor(&topic_1, permission.clone(), 10); branch_store.insert_cursor(&topic_2, permission.clone(), 20); diff --git a/streams/src/api/messages.rs b/streams/src/api/messages.rs index 6080a904..a94e076c 100644 --- a/streams/src/api/messages.rs +++ b/streams/src/api/messages.rs @@ -18,7 +18,7 @@ use hashbrown::HashMap; use lets::{ address::{Address, MsgId}, id::{Identifier, Permissioned}, - message::{Topic, TopicHash, TransportMessage, HDF}, + message::{Topic, TransportMessage, HDF}, transport::Transport, }; @@ -132,7 +132,7 @@ type PinBoxFut<'a, T> = Pin + 'a>>; struct MessagesState<'a, T> { user: &'a mut User, - ids_stack: Vec<(TopicHash, Permissioned, usize)>, + ids_stack: Vec<(Topic, Permissioned, usize)>, msg_queue: HashMap>, stage: VecDeque<(MsgId, TransportMessage)>, successful_round: bool, @@ -202,7 +202,7 @@ impl<'a, T> MessagesState<'a, T> { } } else { // Stage is empty, populate it with some more messages - let (topic_hash, publisher, cursor) = match self.ids_stack.pop() { + let (topic, publisher, cursor) = match self.ids_stack.pop() { Some(id_cursor) => id_cursor, None => { // new round @@ -217,7 +217,6 @@ impl<'a, T> MessagesState<'a, T> { } }; let base_address = self.user.stream_address()?.base(); - let topic = self.user.topic_by_hash(&topic_hash)?; let rel_address = MsgId::gen(base_address, publisher.identifier(), &topic, cursor + 1); let address = Address::new(base_address, rel_address); diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index 229c6bc9..de98c16e 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -177,7 +177,7 @@ impl User { self.topics().find(|t| &TopicHash::from(*t) == hash).cloned() } - pub(crate) fn cursors(&self) -> impl Iterator, usize)> + '_ { + pub(crate) fn cursors(&self) -> impl Iterator, usize)> + '_ { self.state.cursor_store.cursors() } @@ -219,7 +219,7 @@ impl User { /// Sets the latest message link for a specified branch. If the branch does not exist, it is /// created - fn set_latest_link(&mut self, topic: &Topic, latest_link: MsgId) -> Option { + fn set_latest_link(&mut self, topic: Topic, latest_link: MsgId) -> Option { self.state.cursor_store.set_latest_link(topic, latest_link) } @@ -253,7 +253,7 @@ impl User { let topic = message.payload().content().topic(); // Insert new branch into store - self.state.cursor_store.new_branch(topic); + self.state.cursor_store.new_branch(topic.clone()); self.state.topics.insert(topic.clone()); // When handling an announcement it means that no cursors have been stored, as no topics are @@ -270,7 +270,7 @@ impl User { let author_ke_pk = *message.payload().content().author_ke_pk(); // Update branch links - self.set_latest_link(topic, address.relative()); + self.set_latest_link(topic.clone(), address.relative()); self.state.author_identifier = Some(author_id.clone()); self.state.exchange_keys.insert(author_id, author_ke_pk); self.state.base_branch = topic.clone(); @@ -319,7 +319,7 @@ impl User { // Store spongos self.state.spongos_store.insert(address.relative(), spongos); // Insert new branch into store - self.state.cursor_store.new_branch(new_topic); + self.state.cursor_store.new_branch(new_topic.clone()); self.state.topics.insert(new_topic.clone()); // Collect permissions from previous branch and clone them into new branch let prev_permissions = self @@ -331,7 +331,7 @@ impl User { } // Update branch links - self.set_latest_link(new_topic, address.relative()); + self.set_latest_link(new_topic.clone(), address.relative()); Ok(Message::from_lets_message(address, message)) } @@ -477,7 +477,7 @@ impl User { // Have to make message before setting branch links due to immutable borrow in keyload::unwrap let final_message = Message::from_lets_message(address, message); // Update branch links - self.set_latest_link(&topic, address.relative()); + self.set_latest_link(topic, address.relative()); Ok(final_message) } @@ -520,7 +520,7 @@ impl User { self.state.spongos_store.insert(address.relative(), spongos); // Store message content into stores - self.set_latest_link(&topic, address.relative()); + self.set_latest_link(topic, address.relative()); Ok(Message::from_lets_message(address, message)) } @@ -561,7 +561,7 @@ impl User { self.state.spongos_store.insert(address.relative(), spongos); // Store message content into stores - self.set_latest_link(&topic, address.relative()); + self.set_latest_link(topic, address.relative()); Ok(Message::from_lets_message(address, message)) } @@ -687,7 +687,7 @@ where let send_response = self.transport.send_message(stream_address, transport_msg).await?; // If a message has been sent successfully, insert the base branch into store - self.state.cursor_store.new_branch(&topic); + self.state.cursor_store.new_branch(topic.clone()); self.state.topics.insert(topic.clone()); // Commit message to stores self.state @@ -696,12 +696,12 @@ where self.state.spongos_store.insert(stream_address.relative(), spongos); // Update branch links - self.set_latest_link(&topic, stream_address.relative()); + self.set_latest_link(topic.clone(), stream_address.relative()); // Commit Author Identifier and Stream Address to store self.state.stream_address = Some(stream_address); self.state.author_identifier = Some(identifier); - self.state.base_branch = topic.clone(); + self.state.base_branch = topic; Ok(SendResponse::new(stream_address, send_response)) } @@ -768,7 +768,7 @@ where let send_response = self.transport.send_message(address, transport_msg).await?; // If message has been sent successfully, create the new branch in store - self.state.cursor_store.new_branch(&topic); + self.state.cursor_store.new_branch(topic.clone()); self.state.topics.insert(topic.clone()); // Commit message to stores and update cursors self.state.cursor_store.insert_cursor( @@ -779,16 +779,15 @@ where self.state.spongos_store.insert(address.relative(), spongos); // Collect permissions from previous branch and clone them into new branch let prev_permissions = self - .cursors() - .filter(|cursor| cursor.0 == &TopicHash::from(&prev_topic)) - .map(|(_, id, _)| id.clone()) + .cursors_by_topic(&prev_topic)? + .map(|(id, _)| id.clone()) .collect::>>(); for id in prev_permissions { self.state.cursor_store.insert_cursor(&topic, id, INIT_MESSAGE_NUM); } // Update branch links - self.state.cursor_store.set_latest_link(&topic, address.relative()); + self.state.cursor_store.set_latest_link(topic, address.relative()); Ok(SendResponse::new(address, send_response)) } @@ -1016,7 +1015,7 @@ where .insert_cursor(&topic, Permissioned::Admin(identifier), new_cursor); self.state.spongos_store.insert(rel_address, spongos); // Update Branch Links - self.set_latest_link(&topic, message_address.relative()); + self.set_latest_link(topic, message_address.relative()); Ok(SendResponse::new(message_address, send_response)) } @@ -1153,7 +1152,7 @@ where .insert_cursor(&topic, permission.clone(), new_cursor); self.state.spongos_store.insert(rel_address, spongos); // Update Branch Links - self.set_latest_link(&topic, message_address.relative()); + self.set_latest_link(topic, message_address.relative()); Ok(SendResponse::new(message_address, send_response)) } @@ -1228,7 +1227,7 @@ where .insert_cursor(&topic, permission.clone(), new_cursor); self.state.spongos_store.insert(rel_address, spongos); // Update Branch Links - self.set_latest_link(&topic, rel_address); + self.set_latest_link(topic, rel_address); Ok(SendResponse::new(message_address, send_response)) } } @@ -1263,15 +1262,15 @@ impl ContentSizeof for sizeof::Context { .ok_or_else(|| anyhow!("No latest link found in branch <{}>", topic))?; self.mask(&latest_link)?; - let cursors: Vec<(&TopicHash, &Permissioned, usize)> = user_state + let cursors: Vec<(&Permissioned, &usize)> = user_state .cursor_store - .cursors() - .filter(|(t, _, _)| *t == &topic.into()) + .cursors_by_topic(topic) + .ok_or_else(|| anyhow!("No cursors found with topic <{}>", topic))? .collect(); let amount_cursors = cursors.len(); self.mask(Size::new(amount_cursors))?; - for (_, subscriber, cursor) in cursors { - self.mask(subscriber)?.mask(Size::new(cursor))?; + for (subscriber, cursor) in cursors { + self.mask(subscriber)?.mask(Size::new(*cursor))?; } } @@ -1324,15 +1323,15 @@ impl<'a> ContentWrap for wrap::Context<&'a mut [u8]> { .ok_or_else(|| anyhow!("No latest link found in branch <{}>", topic))?; self.mask(&latest_link)?; - let cursors: Vec<(&TopicHash, &Permissioned, usize)> = user_state + let cursors: Vec<(&Permissioned, &usize)> = user_state .cursor_store - .cursors() - .filter(|(t, _, _)| *t == &topic.into()) + .cursors_by_topic(topic) + .ok_or_else(|| anyhow!("No curosrs found with topic <{}>", topic))? .collect(); let amount_cursors = cursors.len(); self.mask(Size::new(amount_cursors))?; - for (_, subscriber, cursor) in cursors { - self.mask(subscriber)?.mask(Size::new(cursor))?; + for (subscriber, cursor) in cursors { + self.mask(subscriber)?.mask(Size::new(*cursor))?; } } @@ -1382,7 +1381,7 @@ impl<'a> ContentUnwrap for unwrap::Context<&'a [u8]> { self.mask(&mut latest_link)?; user_state.topics.insert(topic.clone()); - user_state.cursor_store.set_latest_link(&topic, latest_link); + user_state.cursor_store.set_latest_link(topic.clone(), latest_link); let mut amount_cursors = Size::default(); self.mask(&mut amount_cursors)?; From 3b43db8d8e7753d324b0f8999d15a5f9e596b1be Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 22 Aug 2022 15:26:40 -0600 Subject: [PATCH 4/5] fmt --- streams/src/api/user.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/streams/src/api/user.rs b/streams/src/api/user.rs index de98c16e..9ee7aafb 100644 --- a/streams/src/api/user.rs +++ b/streams/src/api/user.rs @@ -417,11 +417,9 @@ impl User { // From the point of view of cursor tracking, the message exists, regardless of the validity or // accessibility to its content. Therefore we must update the cursor of the publisher before // handling the message - self.state.cursor_store.insert_cursor( - &topic, - Permissioned::Admin(publisher), - preparsed.header().sequence(), - ); + self.state + .cursor_store + .insert_cursor(&topic, Permissioned::Admin(publisher), preparsed.header().sequence()); // Unwrap message // Ok to unwrap since an author identifier is set at the same time as the stream address @@ -495,11 +493,9 @@ impl User { // From the point of view of cursor tracking, the message exists, regardless of the validity or // accessibility to its content. Therefore we must update the cursor of the publisher before // handling the message - self.state.cursor_store.insert_cursor( - &topic, - permission, - preparsed.header().sequence(), - ); + self.state + .cursor_store + .insert_cursor(&topic, permission, preparsed.header().sequence()); // Unwrap message let linked_msg_address = preparsed.header().linked_msg_address().ok_or_else(|| { From ba8c8dbd7cd43bd368b958b9ea5cf4905064fd32 Mon Sep 17 00:00:00 2001 From: DyrellC Date: Mon, 29 Aug 2022 11:57:41 -0600 Subject: [PATCH 5/5] fmt --- streams/src/message/announcement.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/streams/src/message/announcement.rs b/streams/src/message/announcement.rs index 8f945c44..58d22ccb 100644 --- a/streams/src/message/announcement.rs +++ b/streams/src/message/announcement.rs @@ -87,10 +87,7 @@ impl Default for Unwrap { fn default() -> Self { let author_id = Default::default(); let topic = Default::default(); - Self { - author_id, - topic, - } + Self { author_id, topic } } }