Skip to content

Commit

Permalink
Merge branch 'main' into fetch-error-rework
Browse files Browse the repository at this point in the history
  • Loading branch information
crowlKats authored Oct 19, 2024
2 parents 80029fb + d1cd1fa commit 6c6a53d
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 134 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/ci.generate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ const Runners = {
macosArm: {
os: "macos",
arch: "aarch64",
runner:
`\${{ github.repository == 'denoland/deno' && 'self-hosted' || '${macosArmRunner}' }}`,
runner: macosArmRunner,
},
windowsX86: {
os: "windows",
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ jobs:
skip: '${{ !contains(github.event.pull_request.labels.*.name, ''ci-full'') && (github.event_name == ''pull_request'') }}'
- os: macos
arch: aarch64
runner: '${{ github.repository == ''denoland/deno'' && ''self-hosted'' || ''macos-14'' }}'
runner: macos-14
job: test
profile: debug
- os: macos
arch: aarch64
runner: '${{ (!contains(github.event.pull_request.labels.*.name, ''ci-full'') && (github.event_name == ''pull_request'')) && ''ubuntu-22.04'' || github.repository == ''denoland/deno'' && ''self-hosted'' || ''macos-14'' }}'
runner: '${{ (!contains(github.event.pull_request.labels.*.name, ''ci-full'') && (github.event_name == ''pull_request'')) && ''ubuntu-22.04'' || ''macos-14'' }}'
job: test
profile: release
skip: '${{ !contains(github.event.pull_request.labels.*.name, ''ci-full'') && (github.event_name == ''pull_request'') }}'
Expand Down
132 changes: 83 additions & 49 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::service::SignallingRc;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
use deno_core::error::AnyError;
use deno_core::external;
use deno_core::futures::future::poll_fn;
use deno_core::futures::TryFutureExt;
Expand Down Expand Up @@ -146,12 +145,32 @@ macro_rules! clone_external {
}};
}

#[derive(Debug, thiserror::Error)]
pub enum HttpNextError {
#[error(transparent)]
Resource(deno_core::error::AnyError),
#[error("{0}")]
Io(#[from] io::Error),
#[error(transparent)]
WebSocketUpgrade(crate::websocket_upgrade::WebSocketUpgradeError),
#[error("{0}")]
Hyper(#[from] hyper::Error),
#[error(transparent)]
JoinError(#[from] tokio::task::JoinError),
#[error(transparent)]
Canceled(#[from] deno_core::Canceled),
#[error(transparent)]
HttpPropertyExtractor(deno_core::error::AnyError),
#[error(transparent)]
UpgradeUnavailable(#[from] crate::service::UpgradeUnavailableError),
}

#[op2(fast)]
#[smi]
pub fn op_http_upgrade_raw(
state: &mut OpState,
external: *const c_void,
) -> Result<ResourceId, AnyError> {
) -> Result<ResourceId, HttpNextError> {
// SAFETY: external is deleted before calling this op.
let http = unsafe { take_external!(external, "op_http_upgrade_raw") };

Expand All @@ -177,7 +196,7 @@ pub fn op_http_upgrade_raw(
upgraded.write_all(&bytes).await?;
break upgraded;
}
Err(err) => return Err(err),
Err(err) => return Err(HttpNextError::WebSocketUpgrade(err)),
}
};

Expand All @@ -193,7 +212,7 @@ pub fn op_http_upgrade_raw(
}
read_tx.write_all(&buf[..read]).await?;
}
Ok::<_, AnyError>(())
Ok::<_, HttpNextError>(())
});
spawn(async move {
let mut buf = [0; 1024];
Expand All @@ -204,7 +223,7 @@ pub fn op_http_upgrade_raw(
}
upgraded_tx.write_all(&buf[..read]).await?;
}
Ok::<_, AnyError>(())
Ok::<_, HttpNextError>(())
});

Ok(())
Expand All @@ -223,7 +242,7 @@ pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
external: *const c_void,
#[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> {
) -> Result<ResourceId, HttpNextError> {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_upgrade_websocket_next") };
Expand Down Expand Up @@ -690,7 +709,7 @@ pub async fn op_http_set_response_body_resource(
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
) -> Result<bool, AnyError> {
) -> Result<bool, HttpNextError> {
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_body_resource") };
Expand All @@ -705,9 +724,15 @@ pub async fn op_http_set_response_body_resource(
let resource = {
let mut state = state.borrow_mut();
if auto_close {
state.resource_table.take_any(stream_rid)?
state
.resource_table
.take_any(stream_rid)
.map_err(HttpNextError::Resource)?
} else {
state.resource_table.get_any(stream_rid)?
state
.resource_table
.get_any(stream_rid)
.map_err(HttpNextError::Resource)?
}
};

Expand Down Expand Up @@ -814,17 +839,17 @@ async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
) -> Result<(), HttpNextError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
serve_http2_unconditional(io, svc, cancel)
.await
.map_err(|e| e.into())
.map_err(HttpNextError::Hyper)
} else {
serve_http11_unconditional(io, svc, cancel)
.await
.map_err(|e| e.into())
.map_err(HttpNextError::Hyper)
}
}

Expand All @@ -833,7 +858,7 @@ fn serve_https(
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime {
server_state,
connection_cancel_handle,
Expand All @@ -852,11 +877,11 @@ fn serve_https(
if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() {
serve_http2_unconditional(io, svc, listen_cancel_handle)
.await
.map_err(|e| e.into())
.map_err(HttpNextError::Hyper)
} else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() {
serve_http11_unconditional(io, svc, listen_cancel_handle)
.await
.map_err(|e| e.into())
.map_err(HttpNextError::Hyper)
} else {
serve_http2_autodetect(io, svc, listen_cancel_handle).await
}
Expand All @@ -870,7 +895,7 @@ fn serve_http(
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime {
server_state,
connection_cancel_handle,
Expand All @@ -891,7 +916,7 @@ fn serve_http_on<HTTP>(
listen_properties: &HttpListenProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>>
) -> JoinHandle<Result<(), HttpNextError>>
where
HTTP: HttpPropertyExtractor,
{
Expand Down Expand Up @@ -922,7 +947,7 @@ struct HttpLifetime {
}

struct HttpJoinHandle {
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), HttpNextError>>>>,
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
Expand Down Expand Up @@ -982,12 +1007,13 @@ impl Drop for HttpJoinHandle {
pub fn op_http_serve<HTTP>(
state: Rc<RefCell<OpState>>,
#[smi] listener_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError>
) -> Result<(ResourceId, &'static str, String), HttpNextError>
where
HTTP: HttpPropertyExtractor,
{
let listener =
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)
.map_err(HttpNextError::Resource)?;

let listen_properties = HTTP::listen_properties_from_listener(&listener)?;

Expand All @@ -1002,7 +1028,8 @@ where
loop {
let conn = HTTP::accept_connection_from_listener(&listener)
.try_or_cancel(listen_cancel_clone.clone())
.await?;
.await
.map_err(HttpNextError::HttpPropertyExtractor)?;
serve_http_on::<HTTP>(
conn,
&listen_properties_clone,
Expand All @@ -1011,7 +1038,7 @@ where
);
}
#[allow(unreachable_code)]
Ok::<_, AnyError>(())
Ok::<_, HttpNextError>(())
});

// Set the handle after we start the future
Expand All @@ -1031,25 +1058,25 @@ where
pub fn op_http_serve_on<HTTP>(
state: Rc<RefCell<OpState>>,
#[smi] connection_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError>
) -> Result<(ResourceId, &'static str, String), HttpNextError>
where
HTTP: HttpPropertyExtractor,
{
let connection =
HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)
.map_err(HttpNextError::Resource)?;

let listen_properties = HTTP::listen_properties_from_connection(&connection)?;

let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));

let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
serve_http_on::<HTTP>(
connection,
&listen_properties,
resource.lifetime(),
tx,
);
let handle = serve_http_on::<HTTP>(
connection,
&listen_properties,
resource.lifetime(),
tx,
);

// Set the handle after we start the future
*RcRef::map(&resource, |this| &this.join_handle)
Expand Down Expand Up @@ -1095,12 +1122,13 @@ pub fn op_http_try_wait(
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<*const c_void, AnyError> {
) -> Result<*const c_void, HttpNextError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
.borrow_mut()
.resource_table
.get::<HttpJoinHandle>(rid)?;
.get::<HttpJoinHandle>(rid)
.map_err(HttpNextError::Resource)?;

let cancel = join_handle.listen_cancel_handle();
let next = async {
Expand All @@ -1127,13 +1155,12 @@ pub async fn op_http_wait(

// Filter out shutdown (ENOTCONN) errors
if let Err(err) = res {
if let Some(err) = err.source() {
if let Some(err) = err.downcast_ref::<io::Error>() {
if err.kind() == io::ErrorKind::NotConnected {
return Ok(null());
}
if let HttpNextError::Io(err) = &err {
if err.kind() == io::ErrorKind::NotConnected {
return Ok(null());
}
}

return Err(err);
}

Expand All @@ -1146,7 +1173,7 @@ pub fn op_http_cancel(
state: &mut OpState,
#[smi] rid: ResourceId,
graceful: bool,
) -> Result<(), AnyError> {
) -> Result<(), deno_core::error::AnyError> {
let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;

if graceful {
Expand All @@ -1166,11 +1193,12 @@ pub async fn op_http_close(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
graceful: bool,
) -> Result<(), AnyError> {
) -> Result<(), HttpNextError> {
let join_handle = state
.borrow_mut()
.resource_table
.take::<HttpJoinHandle>(rid)?;
.take::<HttpJoinHandle>(rid)
.map_err(HttpNextError::Resource)?;

if graceful {
http_general_trace!("graceful shutdown");
Expand Down Expand Up @@ -1216,23 +1244,26 @@ impl UpgradeStream {
}
}

async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
async fn read(
self: Rc<Self>,
buf: &mut [u8],
) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let read = RcRef::map(self, |this| &this.read);
let mut read = read.borrow_mut().await;
Ok(Pin::new(&mut *read).read(buf).await?)
Pin::new(&mut *read).read(buf).await
}
.try_or_cancel(cancel_handle)
.await
}

async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let write = RcRef::map(self, |this| &this.write);
let mut write = write.borrow_mut().await;
Ok(Pin::new(&mut *write).write(buf).await?)
Pin::new(&mut *write).write(buf).await
}
.try_or_cancel(cancel_handle)
.await
Expand All @@ -1242,7 +1273,7 @@ impl UpgradeStream {
self: Rc<Self>,
buf1: &[u8],
buf2: &[u8],
) -> Result<usize, AnyError> {
) -> Result<usize, std::io::Error> {
let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;

let total = buf1.len() + buf2.len();
Expand Down Expand Up @@ -1295,9 +1326,12 @@ pub async fn op_raw_write_vectored(
#[smi] rid: ResourceId,
#[buffer] buf1: JsBuffer,
#[buffer] buf2: JsBuffer,
) -> Result<usize, AnyError> {
let resource: Rc<UpgradeStream> =
state.borrow().resource_table.get::<UpgradeStream>(rid)?;
) -> Result<usize, HttpNextError> {
let resource: Rc<UpgradeStream> = state
.borrow()
.resource_table
.get::<UpgradeStream>(rid)
.map_err(HttpNextError::Resource)?;
let nwritten = resource.write_vectored(&buf1, &buf2).await?;
Ok(nwritten)
}
Loading

0 comments on commit 6c6a53d

Please sign in to comment.