Skip to content

Commit

Permalink
fix: icx-proxy panic when decoding a response to http_request (#221)
Browse files Browse the repository at this point in the history
* fix: icx-proxy panic when decoding a response to http_request

Candid 0.7.1 does not support decoding a Record into an IDLValue.  We were using this in an unsupported way with Candid 0.6.x, but it doesn't work anymore.

So, this commit specifies the exact type of the streaming http response token.  Unfortunately, this means that we will no longer support arbitrary token types, but this is the best that we can do until Candid supports generic types: dfinity/candid#245
  • Loading branch information
ericswanson-dfinity authored Jul 6, 2021
1 parent 88606ec commit c6543eb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 53 deletions.
23 changes: 17 additions & 6 deletions ic-utils/src/interfaces/http_request.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{call::SyncCall, canister::CanisterBuilder, Canister};
use candid::{parser::value::IDLValue, CandidType, Deserialize};
use candid::{parser::value::IDLValue, CandidType, Deserialize, Func, Nat};
use ic_agent::{export::Principal, Agent};
use serde_bytes::ByteBuf;
use std::fmt::Debug;

#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
Expand All @@ -18,10 +19,20 @@ pub struct HttpRequest<'body> {
pub body: &'body [u8],
}

#[derive(CandidType, Deserialize)]
pub struct Token {
key: String,
content_encoding: String,
index: Nat,
// The sha ensures that a client doesn't stream part of one version of an asset
// followed by part of a different asset, even if not checking the certificate.
sha256: Option<ByteBuf>,
}

#[derive(CandidType, Deserialize)]
pub struct CallbackStrategy {
pub callback: IDLValue,
pub token: IDLValue,
pub callback: Func,
pub token: Token,
}

#[derive(CandidType, Deserialize)]
Expand All @@ -42,7 +53,7 @@ pub struct HttpResponse {
pub struct StreamingCallbackHttpResponse {
#[serde(with = "serde_bytes")]
pub body: Vec<u8>,
pub token: Option<IDLValue>,
pub token: Option<Token>,
}

impl HttpRequestCanister {
Expand Down Expand Up @@ -87,8 +98,8 @@ impl<'agent> Canister<'agent, HttpRequestCanister> {
pub fn http_request_stream_callback<'canister: 'agent, M: Into<String>>(
&'canister self,
method: M,
token: IDLValue,
token: Token,
) -> impl 'agent + SyncCall<(StreamingCallbackHttpResponse,)> {
self.query_(&method.into()).with_value_arg(token).build()
self.query_(&method.into()).with_arg(token).build()
}
}
80 changes: 33 additions & 47 deletions icx-proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::config::dns_canister_config::DnsCanisterConfig;
use candid::parser::value::IDLValue;
use clap::{crate_authors, crate_version, AppSettings, Clap};
use hyper::{
body,
Expand Down Expand Up @@ -256,58 +255,45 @@ async fn forward_request(

match streaming_strategy {
StreamingStrategy::Callback(callback) => {
match callback.callback {
IDLValue::Func(streaming_canister_id_id, method_name) => {
let mut callback_token = callback.token;
let logger = logger.clone();
tokio::spawn(async move {
let canister =
HttpRequestCanister::create(&agent, streaming_canister_id_id);
// We have not yet called http_request_stream_callback.
let mut count = 0;
loop {
count += 1;
if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT {
let streaming_canister_id_id = callback.callback.principal;
let method_name = callback.callback.method;
let mut callback_token = callback.token;
let logger = logger.clone();
tokio::spawn(async move {
let canister = HttpRequestCanister::create(&agent, streaming_canister_id_id);
// We have not yet called http_request_stream_callback.
let mut count = 0;
loop {
count += 1;
if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT {
sender.abort();
break;
}

match canister
.http_request_stream_callback(&method_name, callback_token)
.call()
.await
{
Ok((StreamingCallbackHttpResponse { body, token },)) => {
if sender.send_data(Bytes::from(body)).await.is_err() {
sender.abort();
break;
}

match canister
.http_request_stream_callback(&method_name, callback_token)
.call()
.await
{
Ok((StreamingCallbackHttpResponse { body, token },)) => {
if sender.send_data(Bytes::from(body)).await.is_err() {
sender.abort();
break;
}
if let Some(next_token) = token {
callback_token = next_token;
} else {
break;
}
}
Err(e) => {
slog::debug!(
logger,
"Error happened during streaming: {}",
e
);
sender.abort();
break;
}
if let Some(next_token) = token {
callback_token = next_token;
} else {
break;
}
}
});
}
_ => {
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body("Streaming callback must be a function.".into())
.unwrap())
Err(e) => {
slog::debug!(logger, "Error happened during streaming: {}", e);
sender.abort();
break;
}
}
}
}
});
}
}

Expand Down

0 comments on commit c6543eb

Please sign in to comment.