Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

[Stardust] Transport WIP #284

Open
wants to merge 5 commits into
base: stardust
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions lets/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ version = "0.1.2"
[features]
default = ["utangle-client"]
# Enable the IOTA-Tangle transport client (implies `std` features)
tangle-client = ["iota-client/async", "futures", "iota-crypto/blake2b"]
tangle-client = ["iota-client", "futures", "iota-crypto/blake2b"]
# Enable the wasm-compatible IOTA-Tangle transport client (incompatile with `tangle-client` feature due to `iota-client/async` using `tokio`. Implies `std` feature)
tangle-client-wasm = ["iota-client/wasm", "futures"]
tangle-client-wasm = ["futures"]
# Enable the Streams-specific uTangle Client
utangle-client = ["reqwest", "bee-ternary", "serde", "rayon", "iota-crypto/curl-p"]
# Enable Iota Identity for use with Streams
Expand All @@ -36,12 +36,14 @@ iota-crypto = {version = "0.9.1", default-features = false, features = ["x25519"
anyhow = {version = "1.0", default-features = false}
async-trait = {version = "0.1", default-features = false}
hex = {version = "0.4", default-features = false}
prefix-hex = { version = "0.5", default-features = false }

# Optional dependencies
bee-ternary = {version = "0.5.2", default-features = false, optional = true}
futures = {version = "0.3.8", default-features = false, optional = true}
futures = {version = "0.3.24", default-features = false, optional = true}
identity_iota = {git = "https://github.com/iotaledger/identity.rs", rev = "d3920c2", default-features = false, optional = true}
iota-client = {version = "1.1.1", default-features = false, optional = true}
#iota-client = {version = "1.1.1", default-features = false, optional = true}
iota-client = {git = "https://github.com/iotaledger/iota.rs", rev = "dd4fcc689d39174b65e605c84b81aa002aaa408d", default-features = false, optional = true}
parking_lot = {version = "0.11.2", default-features = false, optional = true}
reqwest = {version = "0.11.11", optional = true, default-features = false, features = ["json", "rustls-tls"]}
serde = {version = "1.0", default-features = false, features = ["derive"], optional = true}
Expand Down
10 changes: 5 additions & 5 deletions lets/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use core::{
use serde_big_array::BigArray;

// IOTA
use crypto::hashes::{blake2b::Blake2b256, Digest};
use crypto::hashes::{blake2b::Blake2b160, Digest};

// Streams
use spongos::{
Expand Down Expand Up @@ -103,14 +103,14 @@ impl Address {
self.appaddr
}

/// Hash the content of the [`Address`] using `Blake2b256`
pub fn to_blake2b(self) -> [u8; 32] {
let hasher = Blake2b256::new();
/// Hash the content of the [`Address`] using `Blake2b160`
pub fn to_blake2b(self) -> [u8; 20] {
let hasher = Blake2b160::new();
hasher.chain(self.base()).chain(self.relative()).finalize().into()
}

/// Hash the content of the [`Address`] using `Blake2b256`
pub fn to_msg_index(self) -> [u8; 32] {
pub fn to_msg_index(self) -> [u8; 20] {
self.to_blake2b()
}
}
Expand Down
65 changes: 42 additions & 23 deletions lets/src/transport/tangle.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Rust
use alloc::{boxed::Box, vec::Vec};
use alloc::{boxed::Box, string::ToString, vec::Vec};
use core::{
convert::{TryFrom, TryInto},
marker::PhantomData,
str::FromStr,
};

// 3rd-party
Expand All @@ -13,7 +14,10 @@ use futures::{
};

// IOTA
use iota_client::bee_message::{payload::Payload, Message as IotaMessage};
use iota_client::{
block::{payload::Payload, Block, BlockId},
node_api::indexer::query_parameters::QueryParameter,
};

// Streams

Expand Down Expand Up @@ -50,7 +54,6 @@ impl<Message, SendResponse> Client<Message, SendResponse> {
.map_err(|e| Error::IotaClient("building client", e))?
.with_local_pow(true)
.finish()
.await
.map_err(|e| Error::External(e.into()))?,
PhantomData,
))
Expand All @@ -70,8 +73,8 @@ impl<Message, SendResponse> Client<Message, SendResponse> {
#[async_trait(?Send)]
impl<Message, SendResponse> Transport<'_> for Client<Message, SendResponse>
where
Message: Into<Vec<u8>> + TryFrom<IotaMessage, Error = crate::error::Error>,
SendResponse: TryFrom<IotaMessage, Error = crate::error::Error>,
Message: Into<Vec<u8>> + TryFrom<Block, Error = crate::error::Error>,
SendResponse: TryFrom<Block, Error = crate::error::Error>,
{
type Msg = Message;
type SendResponse = SendResponse;
Expand All @@ -85,14 +88,18 @@ where
where
Message: 'async_trait,
{
self.client()
.message()
.with_index(address.to_msg_index())
let tag = prefix_hex::encode(address.to_msg_index());
let block = self
.client()
.block()
.with_tag(tag.as_bytes().to_vec())
.with_data(msg.into())
.finish()
.await
.map_err(|e| Error::IotaClient("sending message", e))?
.try_into()
.map_err(|e| Error::IotaClient("sending message", e))?;
let id = block.id();
let _id_str = id.to_string();
block.try_into()
}

/// Retrieves a message indexed at the provided [`Address`] from the tangle. Errors if no
Expand All @@ -101,39 +108,51 @@ where
/// # Arguments
/// * `address`: The address of the message to retrieve.
async fn recv_messages(&mut self, address: Address) -> Result<Vec<Message>> {
let msg_ids = self
let tag = prefix_hex::encode(address.to_msg_index());
let output_ids = self
.client()
.get_message()
.index(address.to_msg_index())
.basic_output_ids(vec![QueryParameter::Tag(tag)])
.await
.map_err(|e| Error::IotaClient("get messages by index", e))?;

if msg_ids.is_empty() {
if output_ids.is_empty() {
return Err(Error::MessageMissing(address, "transport"));
}

let msgs = try_join_all(msg_ids.iter().map(|msg| {
let outputs = try_join_all(output_ids.iter().map(|output| {
self.client()
.get_message()
.data(msg)
.get_output(output)
.map_err(|e| Error::IotaClient("receiving message", e))
.and_then(|iota_message| ready(iota_message.try_into()))
.and_then(|output| {
ready(
BlockId::from_str(&output.metadata.block_id)
.map_err(|e| Error::IotaClient("creating BlockId", e.into())),
)
})
}))
.await?;

let msgs = try_join_all(outputs.iter().map(|blockid| {
self.client()
.get_block(blockid)
.map_err(|e| Error::IotaClient("get iota block by id", e))
.and_then(|msg| ready(msg.try_into()))
}))
.await?;
Ok(msgs)
}
}

impl TryFrom<IotaMessage> for TransportMessage {
impl TryFrom<Block> for TransportMessage {
type Error = crate::error::Error;
fn try_from(message: IotaMessage) -> Result<Self> {
if let Some(Payload::Indexation(indexation)) = message.payload() {
fn try_from(message: Block) -> Result<Self> {
if let Some(Payload::TaggedData(indexation)) = message.payload() {
Ok(Self::new(indexation.data().into()))
} else {
Err(Error::Malformed(
"payload from the Tangle",
"IndexationPayload",
alloc::string::ToString::to_string(&message.id().0),
"TaggedDataPayload",
alloc::string::ToString::to_string(&message.id()),
))
}
}
Expand Down
2 changes: 1 addition & 1 deletion streams/src/api/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ where
.transport
.recv_message(address)
.await
.map_err(|e| Error::Transport(address, "recv_message", e))?;
.map_err(|e| Error::Transport(address, "receive message", e))?;
self.handle_message(address, msg).await
}

Expand Down
2 changes: 1 addition & 1 deletion streams/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Any {0} message must be linked to a previous message by including the address of
#[error("Topic {0} not found in store")]
TopicNotFound(Topic),

#[error("Transport error while trying to {0} for address {1}; Error: {2}")]
#[error("Transport error while trying to {1} for address {0}; Error: {2}")]
Transport(Address, &'static str, LetsError),

#[error("PSK by id {0} is not known")]
Expand Down