Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
Add TopicHash, store cursors by hash, hash topic in hdf
Browse files Browse the repository at this point in the history
  • Loading branch information
DyrellC committed Aug 5, 2022
1 parent 4a0d83e commit d0d268b
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 101 deletions.
20 changes: 10 additions & 10 deletions lets/src/message/hdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
id::Identifier,
message::{
content::{ContentSizeof, ContentUnwrap, ContentWrap},
topic::Topic,
topic::{Topic, TopicHash},
version::{HDF_ID, STREAMS_1_VER, UTF8},
},
};
Expand All @@ -40,7 +40,7 @@ pub struct HDF {
pub linked_msg_address: Option<MsgId>,
pub sequence: usize,
pub publisher: Identifier,
pub topic: Topic,
pub topic_hash: TopicHash,
}

impl Default for HDF {
Expand All @@ -55,13 +55,13 @@ impl Default for HDF {
linked_msg_address: Default::default(),
sequence: 0,
publisher: Default::default(),
topic: Default::default(),
topic_hash: Default::default(),
}
}
}

impl HDF {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: Topic) -> Result<Self> {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: &Topic) -> Result<Self> {
ensure!(
message_type >> 4 == 0,
anyhow!(
Expand All @@ -79,7 +79,7 @@ impl HDF {
linked_msg_address: None,
sequence,
publisher,
topic,
topic_hash: topic.into(),
})
}

Expand Down Expand Up @@ -124,8 +124,8 @@ impl HDF {
self.linked_msg_address
}

pub fn topic(&self) -> &Topic {
&self.topic
pub fn topic_hash(&self) -> &TopicHash {
&self.topic_hash
}
}

Expand All @@ -141,7 +141,7 @@ impl ContentSizeof<HDF> for sizeof::Context {
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.topic_hash)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?
.commit()?
Expand Down Expand Up @@ -180,7 +180,7 @@ where
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.topic_hash)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?
.commit()?
Expand Down Expand Up @@ -232,7 +232,7 @@ where
anyhow!("first 2 bits of payload-frame-count are reserved"),
)?
.absorb(Maybe::new(&mut hdf.linked_msg_address))?
.mask(&mut hdf.topic)?
.mask(&mut hdf.topic_hash)?
.mask(&mut hdf.publisher)?
.skip(&mut seq_num)?
.commit()?
Expand Down
2 changes: 1 addition & 1 deletion lets/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pub use hdf::HDF;
pub use message::Message;
pub use pcf::PCF;
pub use preparsed::PreparsedMessage;
pub use topic::Topic;
pub use topic::{Topic, TopicHash};
pub use transport::TransportMessage;
47 changes: 45 additions & 2 deletions lets/src/message/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use spongos::{
ddml::{
commands::{sizeof, unwrap, wrap, Mask},
io,
types::Bytes,
types::{Bytes, NBytes},
},
PRP,
KeccakF1600, Spongos, PRP,
};

#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
Expand Down Expand Up @@ -105,3 +105,46 @@ where
Ok(self)
}
}

#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Debug, Default, Hash)]
pub struct TopicHash([u8; 16]);

impl From<&Topic> for TopicHash {
fn from(topic: &Topic) -> Self {
let topic_hash: [u8; 16] = Spongos::<KeccakF1600>::init().sponge(topic.as_ref());
Self(topic_hash)
}
}

impl AsRef<[u8]> for TopicHash {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

impl Mask<&TopicHash> for sizeof::Context {
fn mask(&mut self, topic_hash: &TopicHash) -> anyhow::Result<&mut Self> {
self.mask(NBytes::<[u8; 16]>::new(topic_hash.0))
}
}

impl<OS, F> Mask<&TopicHash> for wrap::Context<OS, F>
where
F: PRP,
OS: io::OStream,
{
fn mask(&mut self, topic_hash: &TopicHash) -> anyhow::Result<&mut Self> {
self.mask(NBytes::<[u8; 16]>::new(topic_hash.0))
}
}

impl<IS, F> Mask<&mut TopicHash> for unwrap::Context<IS, F>
where
F: PRP,
IS: io::IStream,
{
fn mask(&mut self, topic_hash: &mut TopicHash) -> anyhow::Result<&mut Self> {
self.mask(NBytes::<&mut [u8; 16]>::new(&mut topic_hash.0))?;
Ok(self)
}
}
38 changes: 20 additions & 18 deletions streams/src/api/cursor_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,29 @@ use hashbrown::HashMap;
// IOTA

// Streams
use lets::{address::MsgId, id::Identifier, message::Topic};
use lets::{
address::MsgId,
id::Identifier,
message::{Topic, TopicHash},
};

// Local

#[derive(Default, Clone, PartialEq, Eq)]
pub(crate) struct CursorStore(HashMap<Topic, InnerCursorStore>);
pub(crate) struct CursorStore(HashMap<TopicHash, InnerCursorStore>);

impl CursorStore {
pub(crate) fn new() -> Self {
Default::default()
}

pub(crate) fn new_branch(&mut self, topic: Topic) -> bool {
self.0.insert(topic, InnerCursorStore::default()).is_none()
}

pub(crate) fn topics(&self) -> impl Iterator<Item = &Topic> + ExactSizeIterator {
self.0.keys()
pub(crate) fn new_branch(&mut self, topic: &Topic) -> bool {
self.0.insert(topic.into(), InnerCursorStore::default()).is_none()
}

pub(crate) fn is_cursor_tracked(&self, topic: &Topic, id: &Identifier) -> bool {
self.0
.get(topic)
.get(&topic.into())
.map_or(false, |branch| branch.cursors.contains_key(id))
}

Expand All @@ -39,28 +39,30 @@ impl CursorStore {
}

pub(crate) fn get_cursor(&self, topic: &Topic, id: &Identifier) -> Option<usize> {
self.0.get(topic).and_then(|branch| branch.cursors.get(id).copied())
self.0
.get(&topic.into())
.and_then(|branch| branch.cursors.get(id).copied())
}

pub(crate) fn cursors(&self) -> impl Iterator<Item = (&Topic, &Identifier, usize)> + Clone + '_ {
pub(crate) fn cursors(&self) -> impl Iterator<Item = (&TopicHash, &Identifier, usize)> + Clone + '_ {
self.0
.iter()
.flat_map(|(topic, branch)| branch.cursors.iter().map(move |(id, cursor)| (topic, id, *cursor)))
}

pub(crate) fn cursors_by_topic(&self, topic: &Topic) -> Option<impl Iterator<Item = (&Identifier, &usize)>> {
self.0.get(topic).map(|inner| inner.cursors.iter())
self.0.get(&topic.into()).map(|inner| inner.cursors.iter())
}

pub(crate) fn insert_cursor(&mut self, topic: &Topic, id: Identifier, cursor: usize) -> Option<usize> {
if let Some(branch) = self.0.get_mut(topic) {
if let Some(branch) = self.0.get_mut(&topic.into()) {
return branch.cursors.insert(id, cursor);
}
None
}

pub(crate) fn set_latest_link(&mut self, topic: &Topic, latest_link: MsgId) -> Option<InnerCursorStore> {
match self.0.get_mut(topic) {
match self.0.get_mut(&topic.into()) {
Some(branch) => {
branch.latest_link = latest_link;
None
Expand All @@ -70,13 +72,13 @@ impl CursorStore {
latest_link,
..Default::default()
};
self.0.insert(topic.clone(), branch)
self.0.insert(topic.into(), branch)
}
}
}

pub(crate) fn get_latest_link(&self, topic: &Topic) -> Option<MsgId> {
self.0.get(topic).map(|branch| branch.latest_link)
self.0.get(&topic.into()).map(|branch| branch.latest_link)
}
}

Expand Down Expand Up @@ -123,8 +125,8 @@ mod tests {
let topic_1 = Topic::new("topic 1".to_string());
let topic_2 = Topic::new("topic 2".to_string());

branch_store.new_branch(topic_1.clone());
branch_store.new_branch(topic_2.clone());
branch_store.new_branch(&topic_1);
branch_store.new_branch(&topic_2);

branch_store.insert_cursor(&topic_1, identifier.clone(), 10);
branch_store.insert_cursor(&topic_2, identifier.clone(), 20);
Expand Down
9 changes: 5 additions & 4 deletions streams/src/api/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use hashbrown::HashMap;
use lets::{
address::{Address, MsgId},
id::Identifier,
message::{Topic, TransportMessage, HDF},
message::{TopicHash, TransportMessage, HDF},
transport::Transport,
};

Expand Down Expand Up @@ -132,7 +132,7 @@ type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

struct MessagesState<'a, T> {
user: &'a mut User<T>,
ids_stack: Vec<(Topic, Identifier, usize)>,
ids_stack: Vec<(TopicHash, Identifier, usize)>,
msg_queue: HashMap<MsgId, VecDeque<(MsgId, TransportMessage)>>,
stage: VecDeque<(MsgId, TransportMessage)>,
successful_round: bool,
Expand Down Expand Up @@ -202,16 +202,17 @@ impl<'a, T> MessagesState<'a, T> {
}
} else {
// Stage is empty, populate it with some more messages
let (topic, publisher, cursor) = match self.ids_stack.pop() {
let (topic_hash, publisher, cursor) = match self.ids_stack.pop() {
Some(id_cursor) => id_cursor,
None => {
// new round
self.successful_round = false;
self.ids_stack = self.user.cursors().map(|(t, p, c)| (t.clone(), p.clone(), c)).collect();
self.ids_stack = self.user.cursors().map(|(t, p, c)| (*t, p.clone(), c)).collect();
self.ids_stack.pop()?
}
};
let base_address = self.user.stream_address()?.base();
let topic = self.user.topic_by_hash(&topic_hash)?;
let rel_address = MsgId::gen(base_address, &publisher, &topic, cursor + 1);
let address = Address::new(base_address, rel_address);

Expand Down
Loading

0 comments on commit d0d268b

Please sign in to comment.