Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #273 from DyrellC/v2.0/signature-in-hdf
Browse files Browse the repository at this point in the history
[V2.0] HDF Update and TopicHash Introduction
  • Loading branch information
Brord van Wierst authored Sep 20, 2022
2 parents af0716c + ba8c8db commit c8d2e30
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 108 deletions.
38 changes: 23 additions & 15 deletions lets/src/message/hdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use async_trait::async_trait;

use spongos::{
ddml::{
commands::{sizeof, unwrap, wrap, Absorb, Guard, Mask, Skip},
commands::{sizeof, unwrap, wrap, Absorb, Commit, Guard, Mask, Skip, Squeeze},
io,
modifiers::External,
types::{Maybe, NBytes, Size, Uint8},
types::{Mac, Maybe, NBytes, Size, Uint8},
},
PRP,
};
Expand All @@ -17,11 +17,13 @@ use crate::{
id::Identifier,
message::{
content::{ContentSizeof, ContentUnwrap, ContentWrap},
topic::Topic,
topic::{Topic, TopicHash},
version::{HDF_ID, STREAMS_1_VER, UTF8},
},
};

const MAC: Mac = Mac::new(32);

#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[allow(clippy::upper_case_acronyms)]
Expand All @@ -38,7 +40,7 @@ pub struct HDF {
pub linked_msg_address: Option<MsgId>,
pub sequence: usize,
pub publisher: Identifier,
pub topic: Topic,
pub topic_hash: TopicHash,
}

impl Default for HDF {
Expand All @@ -53,13 +55,13 @@ impl Default for HDF {
linked_msg_address: Default::default(),
sequence: 0,
publisher: Default::default(),
topic: Default::default(),
topic_hash: Default::default(),
}
}
}

impl HDF {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: Topic) -> Result<Self> {
pub fn new(message_type: u8, sequence: usize, publisher: Identifier, topic: &Topic) -> Result<Self> {
ensure!(
message_type >> 4 == 0,
anyhow!(
Expand All @@ -77,7 +79,7 @@ impl HDF {
linked_msg_address: None,
sequence,
publisher,
topic,
topic_hash: topic.into(),
})
}

Expand Down Expand Up @@ -122,8 +124,8 @@ impl HDF {
self.linked_msg_address
}

pub fn topic(&self) -> &Topic {
&self.topic
pub fn topic_hash(&self) -> &TopicHash {
&self.topic_hash
}
}

Expand All @@ -139,9 +141,11 @@ impl ContentSizeof<HDF> for sizeof::Context {
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.topic_hash)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?;
.skip(Size::new(hdf.sequence))?
.commit()?
.squeeze(&MAC)?;

Ok(self)
}
Expand Down Expand Up @@ -176,9 +180,11 @@ where
.absorb(Uint8::new(hdf.frame_type))?
.skip(payload_frame_count)?
.absorb(Maybe::new(hdf.linked_msg_address.as_ref()))?
.mask(&hdf.topic)?
.mask(&hdf.topic_hash)?
.mask(&hdf.publisher)?
.skip(Size::new(hdf.sequence))?;
.skip(Size::new(hdf.sequence))?
.commit()?
.squeeze(&MAC)?;

Ok(self)
}
Expand Down Expand Up @@ -226,9 +232,11 @@ where
anyhow!("first 2 bits of payload-frame-count are reserved"),
)?
.absorb(Maybe::new(&mut hdf.linked_msg_address))?
.mask(&mut hdf.topic)?
.mask(&mut hdf.topic_hash)?
.mask(&mut hdf.publisher)?
.skip(&mut seq_num)?;
.skip(&mut seq_num)?
.commit()?
.squeeze(&MAC)?;

hdf.encoding = encoding.inner();
hdf.version = version.inner();
Expand Down
2 changes: 1 addition & 1 deletion lets/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pub use hdf::HDF;
pub use message::Message;
pub use pcf::PCF;
pub use preparsed::PreparsedMessage;
pub use topic::Topic;
pub use topic::{Topic, TopicHash};
pub use transport::TransportMessage;
47 changes: 45 additions & 2 deletions lets/src/message/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use spongos::{
ddml::{
commands::{sizeof, unwrap, wrap, Mask},
io,
types::Bytes,
types::{Bytes, NBytes},
},
PRP,
KeccakF1600, Spongos, PRP,
};

#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
Expand Down Expand Up @@ -105,3 +105,46 @@ where
Ok(self)
}
}

#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Debug, Default, Hash)]
pub struct TopicHash([u8; 16]);

impl From<&Topic> for TopicHash {
fn from(topic: &Topic) -> Self {
let topic_hash: [u8; 16] = Spongos::<KeccakF1600>::init().sponge(topic.as_ref());
Self(topic_hash)
}
}

impl AsRef<[u8]> for TopicHash {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

impl Mask<&TopicHash> for sizeof::Context {
fn mask(&mut self, topic_hash: &TopicHash) -> anyhow::Result<&mut Self> {
self.mask(NBytes::<[u8; 16]>::new(topic_hash.0))
}
}

impl<OS, F> Mask<&TopicHash> for wrap::Context<OS, F>
where
F: PRP,
OS: io::OStream,
{
fn mask(&mut self, topic_hash: &TopicHash) -> anyhow::Result<&mut Self> {
self.mask(NBytes::<[u8; 16]>::new(topic_hash.0))
}
}

impl<IS, F> Mask<&mut TopicHash> for unwrap::Context<IS, F>
where
F: PRP,
IS: io::IStream,
{
fn mask(&mut self, topic_hash: &mut TopicHash) -> anyhow::Result<&mut Self> {
self.mask(NBytes::<&mut [u8; 16]>::new(&mut topic_hash.0))?;
Ok(self)
}
}
10 changes: 3 additions & 7 deletions streams/src/api/cursor_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ impl CursorStore {
self.0.insert(topic, InnerCursorStore::default()).is_none()
}

pub(crate) fn topics(&self) -> impl Iterator<Item = &Topic> + ExactSizeIterator {
self.0.keys()
}

pub(crate) fn remove(&mut self, id: &Identifier) -> bool {
let removals = self.0.values_mut().flat_map(|branch| {
branch
Expand Down Expand Up @@ -103,8 +99,8 @@ impl CursorStore {
.and_then(|branch| branch.cursors.insert(id, cursor))
}

pub(crate) fn set_latest_link(&mut self, topic: &Topic, latest_link: MsgId) -> Option<InnerCursorStore> {
match self.0.get_mut(topic) {
pub(crate) fn set_latest_link(&mut self, topic: Topic, latest_link: MsgId) -> Option<InnerCursorStore> {
match self.0.get_mut(&topic) {
Some(branch) => {
branch.latest_link = latest_link;
None
Expand All @@ -114,7 +110,7 @@ impl CursorStore {
latest_link,
..Default::default()
};
self.0.insert(topic.clone(), branch)
self.0.insert(topic, branch)
}
}
}
Expand Down
Loading

0 comments on commit c8d2e30

Please sign in to comment.