diff --git a/ic-utils/src/interfaces/http_request.rs b/ic-utils/src/interfaces/http_request.rs index ad8ce0c9..20faf9ca 100644 --- a/ic-utils/src/interfaces/http_request.rs +++ b/ic-utils/src/interfaces/http_request.rs @@ -1,6 +1,7 @@ use crate::{call::SyncCall, canister::CanisterBuilder, Canister}; -use candid::{parser::value::IDLValue, CandidType, Deserialize}; +use candid::{parser::value::IDLValue, CandidType, Deserialize, Func, Nat}; use ic_agent::{export::Principal, Agent}; +use serde_bytes::ByteBuf; use std::fmt::Debug; #[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)] @@ -18,10 +19,20 @@ pub struct HttpRequest<'body> { pub body: &'body [u8], } +#[derive(CandidType, Deserialize)] +pub struct Token { + key: String, + content_encoding: String, + index: Nat, + // The sha ensures that a client doesn't stream part of one version of an asset + // followed by part of a different asset, even if not checking the certificate. + sha256: Option, +} + #[derive(CandidType, Deserialize)] pub struct CallbackStrategy { - pub callback: IDLValue, - pub token: IDLValue, + pub callback: Func, + pub token: Token, } #[derive(CandidType, Deserialize)] @@ -42,7 +53,7 @@ pub struct HttpResponse { pub struct StreamingCallbackHttpResponse { #[serde(with = "serde_bytes")] pub body: Vec, - pub token: Option, + pub token: Option, } impl HttpRequestCanister { @@ -87,8 +98,8 @@ impl<'agent> Canister<'agent, HttpRequestCanister> { pub fn http_request_stream_callback<'canister: 'agent, M: Into>( &'canister self, method: M, - token: IDLValue, + token: Token, ) -> impl 'agent + SyncCall<(StreamingCallbackHttpResponse,)> { - self.query_(&method.into()).with_value_arg(token).build() + self.query_(&method.into()).with_arg(token).build() } } diff --git a/icx-proxy/src/main.rs b/icx-proxy/src/main.rs index ddde9c04..7391fbb6 100644 --- a/icx-proxy/src/main.rs +++ b/icx-proxy/src/main.rs @@ -1,5 +1,4 @@ use crate::config::dns_canister_config::DnsCanisterConfig; -use candid::parser::value::IDLValue; use clap::{crate_authors, crate_version, AppSettings, Clap}; use hyper::{ body, @@ -256,58 +255,45 @@ async fn forward_request( match streaming_strategy { StreamingStrategy::Callback(callback) => { - match callback.callback { - IDLValue::Func(streaming_canister_id_id, method_name) => { - let mut callback_token = callback.token; - let logger = logger.clone(); - tokio::spawn(async move { - let canister = - HttpRequestCanister::create(&agent, streaming_canister_id_id); - // We have not yet called http_request_stream_callback. - let mut count = 0; - loop { - count += 1; - if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT { + let streaming_canister_id_id = callback.callback.principal; + let method_name = callback.callback.method; + let mut callback_token = callback.token; + let logger = logger.clone(); + tokio::spawn(async move { + let canister = HttpRequestCanister::create(&agent, streaming_canister_id_id); + // We have not yet called http_request_stream_callback. + let mut count = 0; + loop { + count += 1; + if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT { + sender.abort(); + break; + } + + match canister + .http_request_stream_callback(&method_name, callback_token) + .call() + .await + { + Ok((StreamingCallbackHttpResponse { body, token },)) => { + if sender.send_data(Bytes::from(body)).await.is_err() { sender.abort(); break; } - - match canister - .http_request_stream_callback(&method_name, callback_token) - .call() - .await - { - Ok((StreamingCallbackHttpResponse { body, token },)) => { - if sender.send_data(Bytes::from(body)).await.is_err() { - sender.abort(); - break; - } - if let Some(next_token) = token { - callback_token = next_token; - } else { - break; - } - } - Err(e) => { - slog::debug!( - logger, - "Error happened during streaming: {}", - e - ); - sender.abort(); - break; - } + if let Some(next_token) = token { + callback_token = next_token; + } else { + break; } } - }); - } - _ => { - return Ok(Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body("Streaming callback must be a function.".into()) - .unwrap()) + Err(e) => { + slog::debug!(logger, "Error happened during streaming: {}", e); + sender.abort(); + break; + } + } } - } + }); } }