From c6543eb3a2a9df0c706092de75f8888331ae81b1 Mon Sep 17 00:00:00 2001 From: Eric Swanson <64809312+ericswanson-dfinity@users.noreply.github.com> Date: Tue, 6 Jul 2021 13:36:09 -0700 Subject: [PATCH] fix: icx-proxy panic when decoding a response to http_request (#221) * fix: icx-proxy panic when decoding a response to http_request Candid 0.7.1 does not support decoding a Record into an IDLValue. We were using this in an unsupported way with Candid 0.6.x, but it doesn't work anymore. So, this commit specifies the exact type of the streaming http response token. Unfortunately, this means that we will no longer support arbitrary token types, but this is the best that we can do until Candid supports generic types: https://github.com/dfinity/candid/issues/245 --- ic-utils/src/interfaces/http_request.rs | 23 +++++-- icx-proxy/src/main.rs | 80 ++++++++++--------------- 2 files changed, 50 insertions(+), 53 deletions(-) diff --git a/ic-utils/src/interfaces/http_request.rs b/ic-utils/src/interfaces/http_request.rs index ad8ce0c9..20faf9ca 100644 --- a/ic-utils/src/interfaces/http_request.rs +++ b/ic-utils/src/interfaces/http_request.rs @@ -1,6 +1,7 @@ use crate::{call::SyncCall, canister::CanisterBuilder, Canister}; -use candid::{parser::value::IDLValue, CandidType, Deserialize}; +use candid::{parser::value::IDLValue, CandidType, Deserialize, Func, Nat}; use ic_agent::{export::Principal, Agent}; +use serde_bytes::ByteBuf; use std::fmt::Debug; #[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)] @@ -18,10 +19,20 @@ pub struct HttpRequest<'body> { pub body: &'body [u8], } +#[derive(CandidType, Deserialize)] +pub struct Token { + key: String, + content_encoding: String, + index: Nat, + // The sha ensures that a client doesn't stream part of one version of an asset + // followed by part of a different asset, even if not checking the certificate. + sha256: Option, +} + #[derive(CandidType, Deserialize)] pub struct CallbackStrategy { - pub callback: IDLValue, - pub token: IDLValue, + pub callback: Func, + pub token: Token, } #[derive(CandidType, Deserialize)] @@ -42,7 +53,7 @@ pub struct HttpResponse { pub struct StreamingCallbackHttpResponse { #[serde(with = "serde_bytes")] pub body: Vec, - pub token: Option, + pub token: Option, } impl HttpRequestCanister { @@ -87,8 +98,8 @@ impl<'agent> Canister<'agent, HttpRequestCanister> { pub fn http_request_stream_callback<'canister: 'agent, M: Into>( &'canister self, method: M, - token: IDLValue, + token: Token, ) -> impl 'agent + SyncCall<(StreamingCallbackHttpResponse,)> { - self.query_(&method.into()).with_value_arg(token).build() + self.query_(&method.into()).with_arg(token).build() } } diff --git a/icx-proxy/src/main.rs b/icx-proxy/src/main.rs index ddde9c04..7391fbb6 100644 --- a/icx-proxy/src/main.rs +++ b/icx-proxy/src/main.rs @@ -1,5 +1,4 @@ use crate::config::dns_canister_config::DnsCanisterConfig; -use candid::parser::value::IDLValue; use clap::{crate_authors, crate_version, AppSettings, Clap}; use hyper::{ body, @@ -256,58 +255,45 @@ async fn forward_request( match streaming_strategy { StreamingStrategy::Callback(callback) => { - match callback.callback { - IDLValue::Func(streaming_canister_id_id, method_name) => { - let mut callback_token = callback.token; - let logger = logger.clone(); - tokio::spawn(async move { - let canister = - HttpRequestCanister::create(&agent, streaming_canister_id_id); - // We have not yet called http_request_stream_callback. - let mut count = 0; - loop { - count += 1; - if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT { + let streaming_canister_id_id = callback.callback.principal; + let method_name = callback.callback.method; + let mut callback_token = callback.token; + let logger = logger.clone(); + tokio::spawn(async move { + let canister = HttpRequestCanister::create(&agent, streaming_canister_id_id); + // We have not yet called http_request_stream_callback. + let mut count = 0; + loop { + count += 1; + if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT { + sender.abort(); + break; + } + + match canister + .http_request_stream_callback(&method_name, callback_token) + .call() + .await + { + Ok((StreamingCallbackHttpResponse { body, token },)) => { + if sender.send_data(Bytes::from(body)).await.is_err() { sender.abort(); break; } - - match canister - .http_request_stream_callback(&method_name, callback_token) - .call() - .await - { - Ok((StreamingCallbackHttpResponse { body, token },)) => { - if sender.send_data(Bytes::from(body)).await.is_err() { - sender.abort(); - break; - } - if let Some(next_token) = token { - callback_token = next_token; - } else { - break; - } - } - Err(e) => { - slog::debug!( - logger, - "Error happened during streaming: {}", - e - ); - sender.abort(); - break; - } + if let Some(next_token) = token { + callback_token = next_token; + } else { + break; } } - }); - } - _ => { - return Ok(Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body("Streaming callback must be a function.".into()) - .unwrap()) + Err(e) => { + slog::debug!(logger, "Error happened during streaming: {}", e); + sender.abort(); + break; + } + } } - } + }); } }