From 4cf6fabce20eb3050c5b543d249e931ea3d3cad5 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Sat, 23 Nov 2024 16:26:35 -0800 Subject: [PATCH] quic: update more QUIC implementation Signed-off-by: James M Snell PR-URL: https://github.com/nodejs/node/pull/55986 Reviewed-By: Yagiz Nizipli Reviewed-By: Matteo Collina Reviewed-By: Stephen Belanger --- doc/api/errors.md | 36 + lib/internal/errors.js | 3 + lib/internal/quic/quic.js | 1470 ++++++++++++----- lib/internal/quic/symbols.js | 4 + ...-quic-internal-endpoint-listen-defaults.js | 4 +- .../test-quic-internal-endpoint-options.js | 24 +- ...test-quic-internal-endpoint-stats-state.js | 36 +- 7 files changed, 1088 insertions(+), 489 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index 18240e08c6f141..f6bf2dc4814c88 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -2436,6 +2436,18 @@ Accessing `Object.prototype.__proto__` has been forbidden using [`Object.setPrototypeOf`][] should be used to get and set the prototype of an object. + + +### `ERR_QUIC_APPLICATION_ERROR` + + + +> Stability: 1 - Experimental + +A QUIC application error occurred. + ### `ERR_QUIC_CONNECTION_FAILED` @@ -2478,6 +2490,30 @@ added: Opening a QUIC stream failed. + + +### `ERR_QUIC_TRANSPORT_ERROR` + + + +> Stability: 1 - Experimental + +A QUIC transport error occurred. + + + +### `ERR_QUIC_VERSION_NEGOTIATION_ERROR` + + + +> Stability: 1 - Experimental + +A QUIC session failed because version negotiation is required. + ### `ERR_REQUIRE_ASYNC_MODULE` diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 9045046c100308..214bed12e62d19 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1647,9 +1647,12 @@ E('ERR_PARSE_ARGS_UNKNOWN_OPTION', (option, allowPositionals) => { E('ERR_PERFORMANCE_INVALID_TIMESTAMP', '%d is not a valid timestamp', TypeError); E('ERR_PERFORMANCE_MEASURE_INVALID_OPTIONS', '%s', TypeError); +E('ERR_QUIC_APPLICATION_ERROR', 'A QUIC application error occurred. %d [%s]', Error); E('ERR_QUIC_CONNECTION_FAILED', 'QUIC connection failed', Error); E('ERR_QUIC_ENDPOINT_CLOSED', 'QUIC endpoint closed: %s (%d)', Error); E('ERR_QUIC_OPEN_STREAM_FAILED', 'Failed to open QUIC stream', Error); +E('ERR_QUIC_TRANSPORT_ERROR', 'A QUIC transport error occurred. %d [%s]', Error); +E('ERR_QUIC_VERSION_NEGOTIATION_ERROR', 'The QUIC session requires version negotiation', Error); E('ERR_REQUIRE_ASYNC_MODULE', 'require() cannot be used on an ESM ' + 'graph with top-level await. Use import() instead. To see where the' + ' top-level await comes from, use --experimental-print-required-tla.', Error); diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index a4b53e2407072c..a76708a37ec1d2 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -5,10 +5,13 @@ /* c8 ignore start */ const { + ArrayBufferPrototypeTransfer, ArrayIsArray, ArrayPrototypePush, ObjectDefineProperties, + SafeSet, SymbolAsyncDispose, + SymbolIterator, Uint8Array, } = primordials; @@ -60,12 +63,16 @@ const { const { codes: { + ERR_ILLEGAL_CONSTRUCTOR, ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, ERR_INVALID_STATE, + ERR_QUIC_APPLICATION_ERROR, ERR_QUIC_CONNECTION_FAILED, ERR_QUIC_ENDPOINT_CLOSED, ERR_QUIC_OPEN_STREAM_FAILED, + ERR_QUIC_TRANSPORT_ERROR, + ERR_QUIC_VERSION_NEGOTIATION_ERROR, }, } = require('internal/errors'); @@ -98,7 +105,9 @@ const { kHandshake, kHeaders, kOwner, + kRemoveSession, kNewSession, + kRemoveStream, kNewStream, kPathValidation, kReset, @@ -123,6 +132,30 @@ const { QuicStreamState, } = require('internal/quic/state'); +const { assert } = require('internal/assert'); + +const dc = require('diagnostics_channel'); +const onEndpointCreatedChannel = dc.channel('quic.endpoint.created'); +const onEndpointListeningChannel = dc.channel('quic.endpoint.listen'); +const onEndpointClosingChannel = dc.channel('quic.endpoint.closing'); +const onEndpointClosedChannel = dc.channel('quic.endpoint.closed'); +const onEndpointErrorChannel = dc.channel('quic.endpoint.error'); +const onEndpointBusyChangeChannel = dc.channel('quic.endpoint.busy.change'); +const onEndpointClientSessionChannel = dc.channel('quic.session.created.client'); +const onEndpointServerSessionChannel = dc.channel('quic.session.created.server'); +const onSessionOpenStreamChannel = dc.channel('quic.session.open.stream'); +const onSessionReceivedStreamChannel = dc.channel('quic.session.received.stream'); +const onSessionSendDatagramChannel = dc.channel('quic.session.send.datagram'); +const onSessionUpdateKeyChannel = dc.channel('quic.session.update.key'); +const onSessionClosingChannel = dc.channel('quic.session.closing'); +const onSessionClosedChannel = dc.channel('quic.session.closed'); +const onSessionReceiveDatagramChannel = dc.channel('quic.session.receive.datagram'); +const onSessionReceiveDatagramStatusChannel = dc.channel('quic.session.receive.datagram.status'); +const onSessionPathValidationChannel = dc.channel('quic.session.path.validation'); +const onSessionTicketChannel = dc.channel('quic.session.ticket'); +const onSessionVersionNegotiationChannel = dc.channel('quic.session.version.negotiation'); +const onSessionHandshakeChannel = dc.channel('quic.session.handshake'); + /** * @typedef {import('../socketaddress.js').SocketAddress} SocketAddress * @typedef {import('../crypto/keys.js').KeyObject} KeyObject @@ -144,8 +177,8 @@ const { * @property {bigint|number} [handshakeTimeout] The handshake timeout * @property {bigint|number} [maxStreamWindow] The maximum stream window * @property {bigint|number} [maxWindow] The maximum window - * @property {number} [rxDiagnosticLoss] The receive diagnostic loss (range 0.0-1.0) - * @property {number} [txDiagnosticLoss] The transmit diagnostic loss (range 0.0-1.0) + * @property {number} [rxDiagnosticLoss] The receive diagnostic loss probability (range 0.0-1.0) + * @property {number} [txDiagnosticLoss] The transmit diagnostic loss probability (range 0.0-1.0) * @property {number} [udpReceiveBufferSize] The UDP receive buffer size * @property {number} [udpSendBufferSize] The UDP send buffer size * @property {number} [udpTTL] The UDP TTL @@ -231,64 +264,65 @@ const { /** * Called when the Endpoint receives a new server-side Session. * @callback OnSessionCallback + * @this {QuicEndpoint} * @param {QuicSession} session - * @param {QuicEndpoint} endpoint * @returns {void} */ /** * @callback OnStreamCallback + * @this {QuicSession} * @param {QuicStream} stream - * @param {QuicSession} session * @returns {void} */ /** * @callback OnDatagramCallback + * @this {QuicSession} * @param {Uint8Array} datagram - * @param {QuicSession} session - * @param {boolean} early + * @param {boolean} early A datagram is early if it was received before the TLS handshake completed * @returns {void} */ /** * @callback OnDatagramStatusCallback + * @this {QuicSession} * @param {bigint} id * @param {'lost'|'acknowledged'} status - * @param {QuicSession} session * @returns {void} */ /** * @callback OnPathValidationCallback + * @this {QuicSession} * @param {'aborted'|'failure'|'success'} result * @param {SocketAddress} newLocalAddress * @param {SocketAddress} newRemoteAddress * @param {SocketAddress} oldLocalAddress * @param {SocketAddress} oldRemoteAddress * @param {boolean} preferredAddress - * @param {QuicSession} session * @returns {void} */ /** * @callback OnSessionTicketCallback + * @this {QuicSession} * @param {object} ticket - * @param {QuicSession} session * @returns {void} */ /** * @callback OnVersionNegotiationCallback + * @this {QuicSession} * @param {number} version * @param {number[]} requestedVersions * @param {number[]} supportedVersions - * @param {QuicSession} session * @returns {void} */ /** * @callback OnHandshakeCallback + * @this {QuicSession} * @param {string} sni * @param {string} alpn * @param {string} cipher @@ -296,7 +330,6 @@ const { * @param {string} validationErrorReason * @param {number} validationErrorCode * @param {boolean} earlyDataAccepted - * @param {QuicSession} session * @returns {void} */ @@ -327,6 +360,14 @@ const { * @returns {void} */ +/** + * @typedef {object} StreamCallbackConfiguration + * @property {OnBlockedCallback} [onblocked] The blocked callback + * @property {OnStreamErrorCallback} [onreset] The reset callback + * @property {OnHeadersCallback} [onheaders] The headers callback + * @property {OnTrailersCallback} [ontrailers] The trailers callback + */ + /** * Provdes the callback configuration for Sessions. * @typedef {object} SessionCallbackConfiguration @@ -340,38 +381,126 @@ const { */ /** - * @typedef {object} StreamCallbackConfiguration - * @property {OnStreamErrorCallback} onerror The error callback + * @typedef {object} ProcessedSessionCallbackConfiguration + * @property {OnStreamCallback} onstream The stream callback + * @property {OnDatagramCallback} [ondatagram] The datagram callback + * @property {OnDatagramStatusCallback} [ondatagramstatus] The datagram status callback + * @property {OnPathValidationCallback} [onpathvalidation] The path validation callback + * @property {OnSessionTicketCallback} [onsessionticket] The session ticket callback + * @property {OnVersionNegotiationCallback} [onversionnegotiation] The version negotation callback + * @property {OnHandshakeCallback} [onhandshake] The handshake callback + * @property {StreamCallbackConfiguration} stream The processed stream callbacks + */ + +/** + * Provides the callback configuration for the Endpoint. + * @typedef {object} EndpointCallbackConfiguration + * @property {OnSessionCallback} onsession The session callback + * @property {OnStreamCallback} onstream The stream callback + * @property {OnDatagramCallback} [ondatagram] The datagram callback + * @property {OnDatagramStatusCallback} [ondatagramstatus] The datagram status callback + * @property {OnPathValidationCallback} [onpathvalidation] The path validation callback + * @property {OnSessionTicketCallback} [onsessionticket] The session ticket callback + * @property {OnVersionNegotiationCallback} [onversionnegotiation] The version negotiation callback + * @property {OnHandshakeCallback} [onhandshake] The handshake callback * @property {OnBlockedCallback} [onblocked] The blocked callback * @property {OnStreamErrorCallback} [onreset] The reset callback * @property {OnHeadersCallback} [onheaders] The headers callback * @property {OnTrailersCallback} [ontrailers] The trailers callback + * @property {SocketAddress} [address] The local address to bind to + * @property {bigint|number} [retryTokenExpiration] The retry token expiration + * @property {bigint|number} [tokenExpiration] The token expiration + * @property {bigint|number} [maxConnectionsPerHost] The maximum number of connections per host + * @property {bigint|number} [maxConnectionsTotal] The maximum number of total connections + * @property {bigint|number} [maxStatelessResetsPerHost] The maximum number of stateless resets per host + * @property {bigint|number} [addressLRUSize] The size of the address LRU cache + * @property {bigint|number} [maxRetries] The maximum number of retries + * @property {bigint|number} [maxPayloadSize] The maximum payload size + * @property {bigint|number} [unacknowledgedPacketThreshold] The unacknowledged packet threshold + * @property {bigint|number} [handshakeTimeout] The handshake timeout + * @property {bigint|number} [maxStreamWindow] The maximum stream window + * @property {bigint|number} [maxWindow] The maximum window + * @property {number} [rxDiagnosticLoss] The receive diagnostic loss probability (range 0.0-1.0) + * @property {number} [txDiagnosticLoss] The transmit diagnostic loss probability (range 0.0-1.0) + * @property {number} [udpReceiveBufferSize] The UDP receive buffer size + * @property {number} [udpSendBufferSize] The UDP send buffer size + * @property {number} [udpTTL] The UDP TTL + * @property {boolean} [noUdpPayloadSizeShaping] Disable UDP payload size shaping + * @property {boolean} [validateAddress] Validate the address + * @property {boolean} [disableActiveMigration] Disable active migration + * @property {boolean} [ipv6Only] Use IPv6 only + * @property {'reno'|'cubic'|'bbr'|number} [cc] The congestion control algorithm + * @property {ArrayBufferView} [resetTokenSecret] The reset token secret + * @property {ArrayBufferView} [tokenSecret] The token secret */ /** - * Provides the callback configuration for the Endpoint. - * @typedef {object} EndpointCallbackConfiguration + * @typedef {object} ProcessedEndpointCallbackConfiguration * @property {OnSessionCallback} onsession The session callback - * @property {SessionCallbackConfiguration} session The session callback configuration - * @property {StreamCallbackConfiguration} stream The stream callback configuration + * @property {SessionCallbackConfiguration} session The processesd session callbacks */ setCallbacks({ + // QuicEndpoint callbacks + + /** + * Called when the QuicEndpoint C++ handle has closed and we need to finish + * cleaning up the JS side. + * @param {number} context Identifies the reason the endpoint was closed. + * @param {number} status If context indicates an error, provides the error code. + */ onEndpointClose(context, status) { this[kOwner][kFinishClose](context, status); }, + /** + * Called when the QuicEndpoint C++ handle receives a new server-side session + * @param {*} session The QuicSession C++ handle + */ onSessionNew(session) { this[kOwner][kNewSession](session); }, + + // QuicSession callbacks + + /** + * Called when the underlying session C++ handle is closed either normally + * or with an error. + * @param {number} errorType + * @param {number} code + * @param {string} [reason] + */ onSessionClose(errorType, code, reason) { this[kOwner][kFinishClose](errorType, code, reason); }, + + /** + * Called when a datagram is received on this session. + * @param {Uint8Array} uint8Array + * @param {boolean} early + */ onSessionDatagram(uint8Array, early) { this[kOwner][kDatagram](uint8Array, early); }, + + /** + * Called when the status of a datagram is received. + * @param {bigint} id + * @param {'lost' | 'acknowledged'} status + */ onSessionDatagramStatus(id, status) { this[kOwner][kDatagramStatus](id, status); }, + + /** + * Called when the session handshake completes. + * @param {string} sni + * @param {string} alpn + * @param {string} cipher + * @param {string} cipherVersion + * @param {string} validationErrorReason + * @param {number} validationErrorCode + * @param {boolean} earlyDataAccepted + */ onSessionHandshake(sni, alpn, cipher, cipherVersion, validationErrorReason, validationErrorCode, @@ -380,12 +509,37 @@ setCallbacks({ validationErrorReason, validationErrorCode, earlyDataAccepted); }, - onSessionPathValidation(...args) { - this[kOwner][kPathValidation](...args); + + /** + * Called when the session path validation completes. + * @param {'aborted'|'failure'|'success'} result + * @param {SocketAddress} newLocalAddress + * @param {SocketAddress} newRemoteAddress + * @param {SocketAddress} oldLocalAddress + * @param {SocketAddress} oldRemoteAddress + * @param {boolean} preferredAddress + */ + onSessionPathValidation(result, newLocalAddress, newRemoteAddress, + oldLocalAddress, oldRemoteAddress, preferredAddress) { + this[kOwner][kPathValidation](result, newLocalAddress, newRemoteAddress, + oldLocalAddress, oldRemoteAddress, + preferredAddress); }, + + /** + * Called when the session generates a new TLS session ticket + * @param {object} ticket An opaque session ticket + */ onSessionTicket(ticket) { this[kOwner][kSessionTicket](ticket); }, + + /** + * Called when the session receives a session version negotiation request + * @param {*} version + * @param {*} requestedVersions + * @param {*} supportedVersions + */ onSessionVersionNegotiation(version, requestedVersions, supportedVersions) { @@ -393,148 +547,45 @@ setCallbacks({ // Note that immediately following a version negotiation event, the // session will be destroyed. }, + + /** + * Called when a new stream has been received for the session + * @param {object} stream The QuicStream C++ handle + * @param {number} direction The stream direction (0 == bidi, 1 == uni) + */ onStreamCreated(stream, direction) { const session = this[kOwner]; - // The event is ignored if the session has been destroyed. + // The event is ignored and the stream destroyed if the session has been destroyed. if (session.destroyed) { stream.destroy(); return; }; session[kNewStream](stream, direction); }, + + // QuicStream callbacks onStreamBlocked() { + // Called when the stream C++ handle has been blocked by flow control. this[kOwner][kBlocked](); }, onStreamClose(error) { + // Called when the stream C++ handle has been closed. this[kOwner][kError](error); }, onStreamReset(error) { + // Called when the stream C++ handle has received a stream reset. this[kOwner][kReset](error); }, onStreamHeaders(headers, kind) { + // Called when the stream C++ handle has received a full block of headers. this[kOwner][kHeaders](headers, kind); }, onStreamTrailers() { + // Called when the stream C++ handle is ready to receive trailing headers. this[kOwner][kTrailers](); }, }); -/** - * @param {'use'|'ignore'|'default'} policy - * @returns {number} - */ -function getPreferredAddressPolicy(policy = 'default') { - switch (policy) { - case 'use': return PREFERRED_ADDRESS_USE; - case 'ignore': return PREFERRED_ADDRESS_IGNORE; - case 'default': return DEFAULT_PREFERRED_ADDRESS_POLICY; - } - throw new ERR_INVALID_ARG_VALUE('options.preferredAddressPolicy', policy); -} - -/** - * @param {TlsOptions} tls - */ -function processTlsOptions(tls) { - validateObject(tls, 'options.tls'); - const { - sni, - alpn, - ciphers = DEFAULT_CIPHERS, - groups = DEFAULT_GROUPS, - keylog = false, - verifyClient = false, - tlsTrace = false, - verifyPrivateKey = false, - keys, - certs, - ca, - crl, - } = tls; - - if (sni !== undefined) { - validateString(sni, 'options.tls.sni'); - } - if (alpn !== undefined) { - validateString(alpn, 'options.tls.alpn'); - } - if (ciphers !== undefined) { - validateString(ciphers, 'options.tls.ciphers'); - } - if (groups !== undefined) { - validateString(groups, 'options.tls.groups'); - } - validateBoolean(keylog, 'options.tls.keylog'); - validateBoolean(verifyClient, 'options.tls.verifyClient'); - validateBoolean(tlsTrace, 'options.tls.tlsTrace'); - validateBoolean(verifyPrivateKey, 'options.tls.verifyPrivateKey'); - - if (certs !== undefined) { - const certInputs = ArrayIsArray(certs) ? certs : [certs]; - for (const cert of certInputs) { - if (!isArrayBufferView(cert) && !isArrayBuffer(cert)) { - throw new ERR_INVALID_ARG_TYPE('options.tls.certs', - ['ArrayBufferView', 'ArrayBuffer'], cert); - } - } - } - - if (ca !== undefined) { - const caInputs = ArrayIsArray(ca) ? ca : [ca]; - for (const caCert of caInputs) { - if (!isArrayBufferView(caCert) && !isArrayBuffer(caCert)) { - throw new ERR_INVALID_ARG_TYPE('options.tls.ca', - ['ArrayBufferView', 'ArrayBuffer'], caCert); - } - } - } - - if (crl !== undefined) { - const crlInputs = ArrayIsArray(crl) ? crl : [crl]; - for (const crlCert of crlInputs) { - if (!isArrayBufferView(crlCert) && !isArrayBuffer(crlCert)) { - throw new ERR_INVALID_ARG_TYPE('options.tls.crl', - ['ArrayBufferView', 'ArrayBuffer'], crlCert); - } - } - } - - const keyHandles = []; - if (keys !== undefined) { - const keyInputs = ArrayIsArray(keys) ? keys : [keys]; - for (const key of keyInputs) { - if (isKeyObject(key)) { - if (key.type !== 'private') { - throw new ERR_INVALID_ARG_VALUE('options.tls.keys', key, 'must be a private key'); - } - ArrayPrototypePush(keyHandles, key[kKeyObjectHandle]); - } else if (isCryptoKey(key)) { - if (key.type !== 'private') { - throw new ERR_INVALID_ARG_VALUE('options.tls.keys', key, 'must be a private key'); - } - ArrayPrototypePush(keyHandles, key[kKeyObjectInner][kKeyObjectHandle]); - } else { - throw new ERR_INVALID_ARG_TYPE('options.tls.keys', ['KeyObject', 'CryptoKey'], key); - } - } - } - - return { - sni, - alpn, - ciphers, - groups, - keylog, - verifyClient, - tlsTrace, - verifyPrivateKey, - keys: keyHandles, - certs, - ca, - crl, - }; -} - class QuicStream { /** @type {object} */ #handle; @@ -546,8 +597,6 @@ class QuicStream { #state; /** @type {number} */ #direction; - /** @type {OnStreamErrorCallback} */ - #onerror; /** @type {OnBlockedCallback|undefined} */ #onblocked; /** @type {OnStreamErrorCallback|undefined} */ @@ -558,47 +607,48 @@ class QuicStream { #ontrailers; /** + * @param {symbol} privateSymbol * @param {StreamCallbackConfiguration} config * @param {object} handle * @param {QuicSession} session */ - constructor(config, handle, session, direction) { - validateObject(config, 'config'); - this.#stats = new QuicStreamStats(kPrivateConstructor, handle.stats); - this.#state = new QuicStreamState(kPrivateConstructor, handle.stats); + constructor(privateSymbol, config, handle, session, direction) { + if (privateSymbol !== kPrivateConstructor) { + throw new ERR_ILLEGAL_CONSTRUCTOR(); + } + const { onblocked, - onerror, onreset, onheaders, ontrailers, } = config; + if (onblocked !== undefined) { - validateFunction(onblocked, 'config.onblocked'); - this.#state.wantsBlock = true; - this.#onblocked = onblocked; + this.#onblocked = onblocked.bind(this); } if (onreset !== undefined) { - validateFunction(onreset, 'config.onreset'); - this.#state.wantsReset = true; - this.#onreset = onreset; + this.#onreset = onreset.bind(this); } if (onheaders !== undefined) { - validateFunction(onheaders, 'config.onheaders'); - this.#state.wantsHeaders = true; - this.#onheaders = onheaders; + this.#onheaders = onheaders.bind(this); } if (ontrailers !== undefined) { - validateFunction(ontrailers, 'config.ontrailers'); - this.#state.wantsTrailers = true; - this.#ontrailers = ontrailers; + this.#ontrailers = ontrailers.bind(this); } - validateFunction(onerror, 'config.onerror'); - this.#onerror = onerror; this.#handle = handle; + this.#handle[kOwner] = true; + this.#session = session; this.#direction = direction; - this.#handle[kOwner] = true; + + this.#stats = new QuicStreamStats(kPrivateConstructor, this.#handle.stats); + + this.#state = new QuicStreamState(kPrivateConstructor, this.#handle.stats); + this.#state.wantsBlock = !!this.#onblocked; + this.#state.wantsReset = !!this.#onreset; + this.#state.wantsHeaders = !!this.#onheaders; + this.#state.wantsTrailers = !!this.#ontrailers; } /** @type {QuicStreamStats} */ @@ -618,23 +668,55 @@ class QuicStream { return this.#direction === 0 ? 'bidi' : 'uni'; } + /** @returns {boolean} */ + get destroyed() { + return this.#handle === undefined; + } + + destroy(error) { + if (this.destroyed) return; + // TODO(@jasnell): pass an error code + this.#stats[kFinishClose](); + this.#state[kFinishClose](); + this.#onblocked = undefined; + this.#onreset = undefined; + this.#onheaders = undefined; + this.#ontrailers = undefined; + this.#session[kRemoveStream](this); + this.#session = undefined; + this.#handle.destroy(); + this.#handle = undefined; + } + [kBlocked]() { + // The blocked event should only be called if the stream was created with + // an onblocked callback. The callback should always exist here. + assert(this.#onblocked, 'Unexpected stream blocked event'); this.#onblocked(this); } [kError](error) { - this.#onerror(error, this); + this.destroy(error); } [kReset](error) { + // The reset event should only be called if the stream was created with + // an onreset callback. The callback should always exist here. + assert(this.#onreset, 'Unexpected stream reset event'); this.#onreset(error, this); } [kHeaders](headers, kind) { + // The headers event should only be called if the stream was created with + // an onheaders callback. The callback should always exist here. + assert(this.#onheaders, 'Unexpected stream headers event'); this.#onheaders(headers, kind, this); } [kTrailers]() { + // The trailers event should only be called if the stream was created with + // an ontrailers callback. The callback should always exist here. + assert(this.#ontrailers, 'Unexpected stream trailers event'); this.#ontrailers(this); } @@ -665,15 +747,15 @@ class QuicSession { /** @type {object|undefined} */ #handle; /** @type {PromiseWithResolvers} */ - #pendingClose; + #pendingClose = Promise.withResolvers(); // eslint-disable-line node-core/prefer-primordials /** @type {SocketAddress|undefined} */ #remoteAddress = undefined; /** @type {QuicSessionState} */ #state; /** @type {QuicSessionStats} */ #stats; - /** @type {QuicStream[]} */ - #streams = []; + /** @type {Set} */ + #streams = new SafeSet(); /** @type {OnStreamCallback} */ #onstream; /** @type {OnDatagramCallback|undefined} */ @@ -692,15 +774,17 @@ class QuicSession { #streamConfig; /** - * @param {SessionCallbackConfiguration} config - * @param {StreamCallbackConfiguration} streamConfig - * @param {object} [handle] - * @param {QuicEndpoint} [endpoint] + * @param {symbol} privateSymbol + * @param {ProcessedSessionCallbackConfiguration} config + * @param {object} handle + * @param {QuicEndpoint} endpoint */ - constructor(config, streamConfig, handle, endpoint) { - validateObject(config, 'config'); - this.#stats = new QuicSessionStats(kPrivateConstructor, handle.stats); - this.#state = new QuicSessionState(kPrivateConstructor, handle.state); + constructor(privateSymbol, config, handle, endpoint) { + // Instances of QuicSession can only be created internally. + if (privateSymbol !== kPrivateConstructor) { + throw new ERR_ILLEGAL_CONSTRUCTOR(); + } + // The config should have already been validated by the QuicEndpoing const { ondatagram, ondatagramstatus, @@ -709,37 +793,47 @@ class QuicSession { onsessionticket, onstream, onversionnegotiation, + stream, } = config; + if (ondatagram !== undefined) { - validateFunction(ondatagram, 'config.ondatagram'); - validateFunction(ondatagramstatus, 'config.ondatagramstatus'); - this.#state.hasDatagramListener = true; - this.#ondatagram = ondatagram; - this.#ondatagramstatus = ondatagramstatus; + this.#ondatagram = ondatagram.bind(this); + } + if (ondatagramstatus !== undefined) { + this.#ondatagramstatus = ondatagramstatus.bind(this); } - validateFunction(onhandshake, 'config.onhandshake'); if (onpathvalidation !== undefined) { - validateFunction(onpathvalidation, 'config.onpathvalidation'); - this.#state.hasPathValidationListener = true; - this.#onpathvalidation = onpathvalidation; + this.#onpathvalidation = onpathvalidation.bind(this); } if (onsessionticket !== undefined) { - validateFunction(onsessionticket, 'config.onsessionticket'); - this.#state.hasSessionTicketListener = true; - this.#onsessionticket = onsessionticket; + this.#onsessionticket = onsessionticket.bind(this); } - validateFunction(onstream, 'config.onstream'); if (onversionnegotiation !== undefined) { - validateFunction(onversionnegotiation, 'config.onversionnegotiation'); - this.#state.hasVersionNegotiationListener = true; - this.#onversionnegotiation = onversionnegotiation; + this.#onversionnegotiation = onversionnegotiation.bind(this); + } + if (onhandshake !== undefined) { + this.#onhandshake = onhandshake.bind(this); + } + + // It is ok for onstream to be undefined if the session is not expecting + // or wanting to receive incoming streams. If a stream is received and + // no onstream callback is specified, a warning will be emitted and the + // stream will just be immediately destroyed. + if (onstream !== undefined) { + this.#onstream = onstream.bind(this); } - this.#onhandshake = onhandshake; - this.#onstream = onstream; - this.#handle = handle; this.#endpoint = endpoint; - this.#pendingClose = Promise.withResolvers(); // eslint-disable-line node-core/prefer-primordials + this.#streamConfig = stream; + + this.#handle = handle; this.#handle[kOwner] = this; + this.#stats = new QuicSessionStats(kPrivateConstructor, handle.stats); + + this.#state = new QuicSessionState(kPrivateConstructor, handle.state); + this.#state.hasDatagramListener = !!ondatagram; + this.#state.hasPathValidationListener = !!onpathvalidation; + this.#state.hasSessionTicketListener = !!onsessionticket; + this.#state.hasVersionNegotiationListener = !!onversionnegotiation; } /** @type {boolean} */ @@ -756,9 +850,12 @@ class QuicSession { /** @type {QuicEndpoint} */ get endpoint() { return this.#endpoint; } - /** @type {Path} */ + /** + * The path is the local and remote addresses of the session. + * @type {Path} + */ get path() { - if (this.#isClosedOrClosing) return undefined; + if (this.destroyed) return undefined; if (this.#remoteAddress === undefined) { const addr = this.#handle.getRemoteAddress(); if (addr !== undefined) { @@ -785,8 +882,16 @@ class QuicSession { if (handle === undefined) { throw new ERR_QUIC_OPEN_STREAM_FAILED(); } - const stream = new QuicStream(this.#streamConfig, handle, this, 0); - ArrayPrototypePush(this.#streams, stream); + const stream = new QuicStream(kPrivateConstructor, this.#streamConfig, handle, + this, 0 /* Bidirectional */); + this.#streams.add(stream); + + if (onSessionOpenStreamChannel.hasSubscribers) { + onSessionOpenStreamChannel.publish({ + stream, + session: this, + }); + } return stream; } @@ -804,11 +909,60 @@ class QuicSession { if (handle === undefined) { throw new ERR_QUIC_OPEN_STREAM_FAILED(); } - const stream = new QuicStream(this.#streamConfig, handle, this, 1); - ArrayPrototypePush(this.#streams, stream); + const stream = new QuicStream(kPrivateConstructor, this.#streamConfig, handle, + this, 1 /* Unidirectional */); + this.#streams.add(stream); + + if (onSessionOpenStreamChannel.hasSubscribers) { + onSessionOpenStreamChannel.publish({ + stream, + session: this, + }); + } + return stream; } + /** + * Send a datagram. The id of the sent datagram will be returned. The status + * of the sent datagram will be reported via the datagram-status event if + * possible. + * + * If a string is given it will be encoded as UTF-8. + * + * If an ArrayBufferView is given, the view will be copied. + * @param {ArrayBufferView|string} datagram The datagram payload + * @returns {bigint} The datagram ID + */ + sendDatagram(datagram) { + if (this.#isClosedOrClosing) { + throw new ERR_INVALID_STATE('Session is closed'); + } + if (typeof datagram === 'string') { + datagram = Buffer.from(datagram, 'utf8'); + } else { + if (!isArrayBufferView(datagram)) { + throw new ERR_INVALID_ARG_TYPE('datagram', + ['ArrayBufferView', 'string'], + datagram); + } + datagram = new Uint8Array(ArrayBufferPrototypeTransfer(datagram.buffer), + datagram.byteOffset, + datagram.byteLength); + } + const id = this.#handle.sendDatagram(datagram); + + if (onSessionSendDatagramChannel.hasSubscribers) { + onSessionSendDatagramChannel.publish({ + id, + length: datagram.byteLength, + session: this, + }); + } + + return id; + } + /** * Initiate a key update. */ @@ -817,6 +971,11 @@ class QuicSession { throw new ERR_INVALID_STATE('Session is closed'); } this.#handle.updateKey(); + if (onSessionUpdateKeyChannel.hasSubscribers) { + onSessionUpdateKeyChannel.publish({ + session: this, + }); + } } /** @@ -833,6 +992,11 @@ class QuicSession { if (!this.#isClosedOrClosing) { this.#isPendingClose = true; this.#handle?.gracefulClose(); + if (onSessionClosingChannel.hasSubscribers) { + onSessionClosingChannel.publish({ + session: this, + }); + } } return this.closed; } @@ -854,39 +1018,98 @@ class QuicSession { * the closed promise will be rejected with that error. If no error is given, * the closed promise will be resolved. * @param {any} error + * @return {Promise} Returns this.closed */ destroy(error) { - // TODO(@jasnell): Implement. + if (this.destroyed) return; + // First, forcefully and immediately destroy all open streams, if any. + for (const stream of this.#streams) { + stream.destroy(error); + } + // The streams should remove themselves when they are destroyed but let's + // be doubly sure. + if (this.#streams.size) { + process.emitWarning( + `The session is destroyed with ${this.#streams.size} active streams. ` + + 'This should not happen and indicates a bug in Node.js. Please open an ' + + 'issue in the Node.js GitHub repository at https://github.com/nodejs/node ' + + 'to report the problem.', + ); + } + this.#streams.clear(); + + // Remove this session immediately from the endpoint + this.#endpoint[kRemoveSession](this); + this.#endpoint = undefined; + this.#isPendingClose = false; + + if (error) { + // If the session is still waiting to be closed, and error + // is specified, reject the closed promise. + this.#pendingClose.reject?.(error); + } else { + this.#pendingClose.resolve?.(); + } + this.#pendingClose.reject = undefined; + this.#pendingClose.resolve = undefined; + + this.#remoteAddress = undefined; + this.#state[kFinishClose](); + this.#stats[kFinishClose](); + + this.#onstream = undefined; + this.#ondatagram = undefined; + this.#ondatagramstatus = undefined; + this.#onpathvalidation = undefined; + this.#onsessionticket = undefined; + this.#onversionnegotiation = undefined; + this.#onhandshake = undefined; + this.#streamConfig = undefined; + + // Destroy the underlying C++ handle + this.#handle.destroy(); + this.#handle = undefined; + + if (onSessionClosedChannel.hasSubscribers) { + onSessionClosedChannel.publish({ + session: this, + }); + } + + return this.closed; } /** - * Send a datagram. The id of the sent datagram will be returned. The status - * of the sent datagram will be reported via the datagram-status event if - * possible. - * - * If a string is given it will be encoded as UTF-8. - * - * If an ArrayBufferView is given, the view will be copied. - * @param {ArrayBufferView|string} datagram The datagram payload - * @returns {bigint} The datagram ID + * @param {number} errorType + * @param {number} code + * @param {string} [reason] */ - sendDatagram(datagram) { - if (this.#isClosedOrClosing) { - throw new ERR_INVALID_STATE('Session is closed'); + [kFinishClose](errorType, code, reason) { + // If code is zero, then we closed without an error. Yay! We can destroy + // safely without specifying an error. + if (code === 0) { + this.destroy(); + return; } - if (typeof datagram === 'string') { - datagram = Buffer.from(datagram, 'utf8'); - } else { - if (!isArrayBufferView(datagram)) { - throw new ERR_INVALID_ARG_TYPE('datagram', - ['ArrayBufferView', 'string'], - datagram); + + // Otherwise, errorType indicates the type of error that occurred, code indicates + // the specific error, and reason is an optional string describing the error. + switch (errorType) { + case 0: /* Transport Error */ + this.destroy(new ERR_QUIC_TRANSPORT_ERROR(code, reason)); + break; + case 1: /* Application Error */ + this.destroy(new ERR_QUIC_APPLICATION_ERROR(code, reason)); + break; + case 2: /* Version Negotiation Error */ + this.destroy(new ERR_QUIC_VERSION_NEGOTIATION_ERROR()); + break; + case 3: /* Idle close */ { + // An idle close is not really an error. We can just destroy. + this.destroy(); + break; } - datagram = new Uint8Array(datagram.buffer.transfer(), - datagram.byteOffset, - datagram.byteLength); } - return this.#handle.sendDatagram(datagram); } /** @@ -894,8 +1117,19 @@ class QuicSession { * @param {boolean} early */ [kDatagram](u8, early) { + // The datagram event should only be called if the session was created with + // an ondatagram callback. The callback should always exist here. + assert(this.#ondatagram, 'Unexpected datagram event'); if (this.destroyed) return; - this.#ondatagram(u8, this, early); + this.#ondatagram(u8, early); + + if (onSessionReceiveDatagramChannel.hasSubscribers) { + onSessionReceiveDatagramChannel.publish({ + length: u8.byteLength, + early, + session: this, + }); + } } /** @@ -904,7 +1138,17 @@ class QuicSession { */ [kDatagramStatus](id, status) { if (this.destroyed) return; - this.#ondatagramstatus(id, status, this); + // The ondatagramstatus callback may not have been specified. That's ok. + // We'll just ignore the event in that case. + this.#ondatagramstatus?.(id, status); + + if (onSessionReceiveDatagramStatusChannel.hasSubscribers) { + onSessionReceiveDatagramStatusChannel.publish({ + id, + status, + session: this, + }); + } } /** @@ -917,18 +1161,41 @@ class QuicSession { */ [kPathValidation](result, newLocalAddress, newRemoteAddress, oldLocalAddress, oldRemoteAddress, preferredAddress) { + // The path validation event should only be called if the session was created + // with an onpathvalidation callback. The callback should always exist here. + assert(this.#onpathvalidation, 'Unexpected path validation event'); if (this.destroyed) return; this.#onpathvalidation(result, newLocalAddress, newRemoteAddress, - oldLocalAddress, oldRemoteAddress, preferredAddress, - this); + oldLocalAddress, oldRemoteAddress, preferredAddress); + + if (onSessionPathValidationChannel.hasSubscribers) { + onSessionPathValidationChannel.publish({ + result, + newLocalAddress, + newRemoteAddress, + oldLocalAddress, + oldRemoteAddress, + preferredAddress, + session: this, + }); + } } /** * @param {object} ticket */ [kSessionTicket](ticket) { + // The session ticket event should only be called if the session was created + // with an onsessionticket callback. The callback should always exist here. + assert(this.#onsessionticket, 'Unexpected session ticket event'); if (this.destroyed) return; - this.#onsessionticket(ticket, this); + this.#onsessionticket(ticket); + if (onSessionTicketChannel.hasSubscribers) { + onSessionTicketChannel.publish({ + ticket, + session: this, + }); + } } /** @@ -937,8 +1204,21 @@ class QuicSession { * @param {number[]} supportedVersions */ [kVersionNegotiation](version, requestedVersions, supportedVersions) { + // The version negotiation event should only be called if the session was + // created with an onversionnegotiation callback. The callback should always + // exist here. if (this.destroyed) return; - this.#onversionnegotiation(version, requestedVersions, supportedVersions, this); + this.#onversionnegotiation(version, requestedVersions, supportedVersions); + this.destroy(new ERR_QUIC_VERSION_NEGOTIATION_ERROR()); + + if (onSessionVersionNegotiationChannel.hasSubscribers) { + onSessionVersionNegotiationChannel.publish({ + version, + requestedVersions, + supportedVersions, + session: this, + }); + } } /** @@ -953,8 +1233,23 @@ class QuicSession { [kHandshake](sni, alpn, cipher, cipherVersion, validationErrorReason, validationErrorCode, earlyDataAccepted) { if (this.destroyed) return; - this.#onhandshake(sni, alpn, cipher, cipherVersion, validationErrorReason, - validationErrorCode, earlyDataAccepted, this); + // The onhandshake callback may not have been specified. That's ok. + // We'll just ignore the event in that case. + this.#onhandshake?.(sni, alpn, cipher, cipherVersion, validationErrorReason, + validationErrorCode, earlyDataAccepted); + + if (onSessionHandshakeChannel.hasSubscribers) { + onSessionHandshakeChannel.publish({ + sni, + alpn, + cipher, + cipherVersion, + validationErrorReason, + validationErrorCode, + earlyDataAccepted, + session: this, + }); + } } /** @@ -962,9 +1257,30 @@ class QuicSession { * @param {number} direction */ [kNewStream](handle, direction) { - const stream = new QuicStream(this.#streamConfig, handle, this, direction); - ArrayPrototypePush(this.#streams, stream); - this.#onstream(stream, this); + const stream = new QuicStream(kPrivateConstructor, this.#streamConfig, handle, + this, direction); + + // A new stream was received. If we don't have an onstream callback, then + // there's nothing we can do about it. Destroy the stream in this case. + if (this.#onstream === undefined) { + process.emitWarning('A new stream was received but no onstream callback was provided'); + stream.destroy(); + return; + } + this.#streams.add(stream); + + this.#onstream(stream); + + if (onSessionReceivedStreamChannel.hasSubscribers) { + onSessionReceivedStreamChannel.publish({ + stream, + session: this, + }); + } + } + + [kRemoveStream](stream) { + this.#streams.delete(stream); } [kInspect](depth, options) { @@ -988,95 +1304,189 @@ class QuicSession { }, opts)}`; } - [kFinishClose](errorType, code, reason) { - // TODO(@jasnell): Finish the implementation - } - async [SymbolAsyncDispose]() { await this.close(); } } -function validateStreamConfig(config, name = 'config') { - validateObject(config, name); - if (config.onerror !== undefined) - validateFunction(config.onerror, `${name}.onerror`); - if (config.onblocked !== undefined) - validateFunction(config.onblocked, `${name}.onblocked`); - if (config.onreset !== undefined) - validateFunction(config.onreset, `${name}.onreset`); - if (config.onheaders !== undefined) - validateFunction(config.onheaders, `${name}.onheaders`); - if (config.ontrailers !== undefined) - validateFunction(config.ontrailers, `${name}.ontrailers`); - return config; -} - -function validateSessionConfig(config, name = 'config') { - validateObject(config, name); - if (config.onstream !== undefined) - validateFunction(config.onstream, `${name}.onstream`); - if (config.ondatagram !== undefined) - validateFunction(config.ondatagram, `${name}.ondatagram`); - if (config.ondatagramstatus !== undefined) - validateFunction(config.ondatagramstatus, `${name}.ondatagramstatus`); - if (config.onpathvalidation !== undefined) - validateFunction(config.onpathvalidation, `${name}.onpathvalidation`); - if (config.onsessionticket !== undefined) - validateFunction(config.onsessionticket, `${name}.onsessionticket`); - if (config.onversionnegotiation !== undefined) - validateFunction(config.onversionnegotiation, `${name}.onversionnegotiation`); - if (config.onhandshake !== undefined) - validateFunction(config.onhandshake, `${name}.onhandshake`); - return config; -} - -function validateEndpointConfig(config) { - validateObject(config, 'config'); - validateFunction(config.onsession, 'config.onsession'); - if (config.session !== undefined) - validateSessionConfig(config.session, 'config.session'); - if (config.stream !== undefined) - validateStreamConfig(config.stream, 'config.stream'); - return config; -} - class QuicEndpoint { - /** @type {SocketAddress|undefined} */ + /** + * The local socket address on which the endpoint is listening (lazily created) + * @type {SocketAddress|undefined} + */ #address = undefined; - /** @type {boolean} */ + /** + * When true, the endpoint has been marked busy and is temporarily not accepting + * new sessions (only used when the Endpoint is acting as a server) + * @type {boolean} + */ #busy = false; - /** @type {object} */ + /** + * The underlying C++ handle for the endpoint. When undefined the endpoint is + * considered to be closed. + * @type {object} + */ #handle; - /** @type {boolean} */ + /** + * True if endpoint.close() has been called and the [kFinishClose] method has + * not yet been called. + * @type {boolean} + */ #isPendingClose = false; - /** @type {boolean} */ + /** + * True if the endpoint is acting as a server and actively listening for connections. + * @type {boolean} + */ #listening = false; - /** @type {PromiseWithResolvers} */ - #pendingClose; - /** @type {any} */ + /** + * A promise that is resolved when the endpoint has been closed (or rejected if + * the endpoint closes abruptly due to an error). + * @type {PromiseWithResolvers} + */ + #pendingClose = Promise.withResolvers(); // eslint-disable-line node-core/prefer-primordials + /** + * If destroy() is called with an error, the error is stored here and used to reject + * the pendingClose promise when [kFinishClose] is called. + * @type {any} + */ #pendingError = undefined; - /** @type {QuicSession[]} */ - #sessions = []; - /** @type {QuicEndpointState} */ + /** + * The collection of active sessions. + * @type {Set} + */ + #sessions = new SafeSet(); + /** + * The internal state of the endpoint. Used to efficiently track and update the + * state of the underlying c++ endpoint handle. + * @type {QuicEndpointState} + */ #state; - /** @type {QuicEndpointStats} */ + /** + * The collected statistics for the endpoint. + * @type {QuicEndpointStats} + */ #stats; - /** @type {OnSessionCallback} */ + /** + * The user provided callback that is invoked when a new session is received. + * (used only when the endpoint is acting as a server) + * @type {OnSessionCallback} + */ #onsession; - /** @type {SessionCallbackConfiguration} */ + /** + * The callback configuration used for new sessions (client or server) + * @type {ProcessedSessionCallbackConfiguration} + */ #sessionConfig; - /** @type {StreamCallbackConfiguration */ - #streamConfig; /** * @param {EndpointCallbackConfiguration} config - * @param {EndpointOptions} [options] + * @returns {StreamCallbackConfiguration} + */ + #processStreamConfig(config) { + validateObject(config, 'config'); + const { + onblocked, + onreset, + onheaders, + ontrailers, + } = config; + + if (onblocked !== undefined) { + validateFunction(onblocked, 'config.onblocked'); + } + if (onreset !== undefined) { + validateFunction(onreset, 'config.onreset'); + } + if (onheaders !== undefined) { + validateFunction(onheaders, 'config.onheaders'); + } + if (ontrailers !== undefined) { + validateFunction(ontrailers, 'ontrailers'); + } + + return { + __proto__: null, + onblocked, + onreset, + onheaders, + ontrailers, + }; + } + + /** + * + * @param {EndpointCallbackConfiguration} config + * @returns {ProcessedSessionCallbackConfiguration} */ - constructor(config, options = kEmptyObject) { - validateEndpointConfig(config); - this.#onsession = config.onsession; - this.#sessionConfig = config.session; - this.#streamConfig = config.stream; + #processSessionConfig(config) { + validateObject(config, 'config'); + const { + onstream, + ondatagram, + ondatagramstatus, + onpathvalidation, + onsessionticket, + onversionnegotiation, + onhandshake, + } = config; + if (onstream !== undefined) { + validateFunction(onstream, 'config.onstream'); + } + if (ondatagram !== undefined) { + validateFunction(ondatagram, 'config.ondatagram'); + } + if (ondatagramstatus !== undefined) { + validateFunction(ondatagramstatus, 'config.ondatagramstatus'); + } + if (onpathvalidation !== undefined) { + validateFunction(onpathvalidation, 'config.onpathvalidation'); + } + if (onsessionticket !== undefined) { + validateFunction(onsessionticket, 'config.onsessionticket'); + } + if (onversionnegotiation !== undefined) { + validateFunction(onversionnegotiation, 'config.onversionnegotiation'); + } + if (onhandshake !== undefined) { + validateFunction(onhandshake, 'config.onhandshake'); + } + return { + __proto__: null, + onstream, + ondatagram, + ondatagramstatus, + onpathvalidation, + onsessionticket, + onversionnegotiation, + onhandshake, + stream: this.#processStreamConfig(config), + }; + } + /** + * @param {EndpointCallbackConfiguration} config + * @returns {ProcessedEndpointCallbackConfiguration} + */ + #processEndpointConfig(config) { + validateObject(config, 'config'); + const { + onsession, + } = config; + + if (onsession !== undefined) { + validateFunction(config.onsession, 'config.onsession'); + } + + return { + __proto__: null, + onsession, + session: this.#processSessionConfig(config), + }; + } + + /** + * @param {EndpointCallbackConfiguration} options + * @returns {EndpointOptions} + */ + #processEndpointOptions(options) { validateObject(options, 'options'); let { address } = options; const { @@ -1115,8 +1525,7 @@ class QuicEndpoint { } } - this.#pendingClose = Promise.withResolvers(); // eslint-disable-line node-core/prefer-primordials - this.#handle = new Endpoint_({ + return { __proto__: null, address: address?.[kSocketAddressHandle], retryTokenExpiration, @@ -1143,10 +1552,43 @@ class QuicEndpoint { cc, resetTokenSecret, tokenSecret, - }); + }; + } + + #newSession(handle) { + const session = new QuicSession(kPrivateConstructor, this.#sessionConfig, handle, this); + this.#sessions.add(session); + return session; + } + + /** + * @param {EndpointCallbackConfiguration} config + */ + constructor(config = kEmptyObject) { + const { + onsession, + session, + } = this.#processEndpointConfig(config); + + // Note that the onsession callback is only used for server sessions. + // If the callback is not specified, calling listen() will fail but + // connect() can still be called. + if (onsession !== undefined) { + this.#onsession = onsession.bind(this); + } + this.#sessionConfig = session; + + this.#handle = new Endpoint_(this.#processEndpointOptions(config)); this.#handle[kOwner] = this; this.#stats = new QuicEndpointStats(kPrivateConstructor, this.#handle.stats); this.#state = new QuicEndpointState(kPrivateConstructor, this.#handle.state); + + if (onEndpointCreatedChannel.hasSubscribers) { + onEndpointCreatedChannel.publish({ + endpoint: this, + config, + }); + } } /** @type {QuicEndpointStats} */ @@ -1174,11 +1616,16 @@ class QuicEndpoint { throw new ERR_INVALID_STATE('Endpoint is closed'); } // The val is allowed to be any truthy value - val = !!val; // Non-op if there is no change - if (val !== this.#busy) { - this.#busy = val; + if (!!val !== this.#busy) { + this.#busy = !this.#busy; this.#handle.markBusy(this.#busy); + if (onEndpointBusyChangeChannel.hasSubscribers) { + onEndpointBusyChangeChannel.publish({ + endpoint: this, + busy: this.#busy, + }); + } } } @@ -1196,19 +1643,127 @@ class QuicEndpoint { } /** - * Configures the endpoint to listen for incoming connections. - * @param {SessionOptions} [options] + * @param {TlsOptions} tls */ - listen(options = kEmptyObject) { - if (this.#isClosedOrClosing) { - throw new ERR_INVALID_STATE('Endpoint is closed'); + #processTlsOptions(tls) { + validateObject(tls, 'options.tls'); + const { + sni, + alpn, + ciphers = DEFAULT_CIPHERS, + groups = DEFAULT_GROUPS, + keylog = false, + verifyClient = false, + tlsTrace = false, + verifyPrivateKey = false, + keys, + certs, + ca, + crl, + } = tls; + + if (sni !== undefined) { + validateString(sni, 'options.tls.sni'); } - if (this.#listening) { - throw new ERR_INVALID_STATE('Endpoint is already listening'); + if (alpn !== undefined) { + validateString(alpn, 'options.tls.alpn'); + } + if (ciphers !== undefined) { + validateString(ciphers, 'options.tls.ciphers'); + } + if (groups !== undefined) { + validateString(groups, 'options.tls.groups'); + } + validateBoolean(keylog, 'options.tls.keylog'); + validateBoolean(verifyClient, 'options.tls.verifyClient'); + validateBoolean(tlsTrace, 'options.tls.tlsTrace'); + validateBoolean(verifyPrivateKey, 'options.tls.verifyPrivateKey'); + + if (certs !== undefined) { + const certInputs = ArrayIsArray(certs) ? certs : [certs]; + for (const cert of certInputs) { + if (!isArrayBufferView(cert) && !isArrayBuffer(cert)) { + throw new ERR_INVALID_ARG_TYPE('options.tls.certs', + ['ArrayBufferView', 'ArrayBuffer'], cert); + } + } } - validateObject(options, 'options'); + if (ca !== undefined) { + const caInputs = ArrayIsArray(ca) ? ca : [ca]; + for (const caCert of caInputs) { + if (!isArrayBufferView(caCert) && !isArrayBuffer(caCert)) { + throw new ERR_INVALID_ARG_TYPE('options.tls.ca', + ['ArrayBufferView', 'ArrayBuffer'], caCert); + } + } + } + + if (crl !== undefined) { + const crlInputs = ArrayIsArray(crl) ? crl : [crl]; + for (const crlCert of crlInputs) { + if (!isArrayBufferView(crlCert) && !isArrayBuffer(crlCert)) { + throw new ERR_INVALID_ARG_TYPE('options.tls.crl', + ['ArrayBufferView', 'ArrayBuffer'], crlCert); + } + } + } + const keyHandles = []; + if (keys !== undefined) { + const keyInputs = ArrayIsArray(keys) ? keys : [keys]; + for (const key of keyInputs) { + if (isKeyObject(key)) { + if (key.type !== 'private') { + throw new ERR_INVALID_ARG_VALUE('options.tls.keys', key, 'must be a private key'); + } + ArrayPrototypePush(keyHandles, key[kKeyObjectHandle]); + } else if (isCryptoKey(key)) { + if (key.type !== 'private') { + throw new ERR_INVALID_ARG_VALUE('options.tls.keys', key, 'must be a private key'); + } + ArrayPrototypePush(keyHandles, key[kKeyObjectInner][kKeyObjectHandle]); + } else { + throw new ERR_INVALID_ARG_TYPE('options.tls.keys', ['KeyObject', 'CryptoKey'], key); + } + } + } + + return { + __proto__: null, + sni, + alpn, + ciphers, + groups, + keylog, + verifyClient, + tlsTrace, + verifyPrivateKey, + keys: keyHandles, + certs, + ca, + crl, + }; + } + + /** + * @param {'use'|'ignore'|'default'} policy + * @returns {number} + */ + #getPreferredAddressPolicy(policy = 'default') { + switch (policy) { + case 'use': return PREFERRED_ADDRESS_USE; + case 'ignore': return PREFERRED_ADDRESS_IGNORE; + case 'default': return DEFAULT_PREFERRED_ADDRESS_POLICY; + } + throw new ERR_INVALID_ARG_VALUE('options.preferredAddressPolicy', policy); + } + + /** + * @param {SessionOptions} options + */ + #processSessionOptions(options) { + validateObject(options, 'options'); const { version, minVersion, @@ -1217,18 +1772,48 @@ class QuicEndpoint { transportParams = kEmptyObject, tls = kEmptyObject, qlog = false, + sessionTicket, } = options; - this.#handle.listen({ + return { + __proto__: null, version, minVersion, - preferredAddressPolicy: getPreferredAddressPolicy(preferredAddressPolicy), + preferredAddressPolicy: this.#getPreferredAddressPolicy(preferredAddressPolicy), application, transportParams, - tls: processTlsOptions(tls), + tls: this.#processTlsOptions(tls), qlog, - }); + sessionTicket, + }; + } + + /** + * Configures the endpoint to listen for incoming connections. + * @param {SessionOptions} [options] + */ + listen(options = kEmptyObject) { + if (this.#isClosedOrClosing) { + throw new ERR_INVALID_STATE('Endpoint is closed'); + } + if (this.#onsession === undefined) { + throw new ERR_INVALID_STATE( + 'Endpoint is not configured to accept sessions. Specify an onsession ' + + 'callback when creating the endpoint', + ); + } + if (this.#listening) { + throw new ERR_INVALID_STATE('Endpoint is already listening'); + } + this.#handle.listen(this.#processSessionOptions(options)); this.#listening = true; + + if (onEndpointListeningChannel.hasSubscribers) { + onEndpointListeningChannel.publish({ + endpoint: this, + options, + }); + } } /** @@ -1241,45 +1826,34 @@ class QuicEndpoint { if (this.#isClosedOrClosing) { throw new ERR_INVALID_STATE('Endpoint is closed'); } - if (this.#busy) { - throw new ERR_INVALID_STATE('Endpoint is busy'); - } - if (address === undefined || !SocketAddress.isSocketAddress(address)) { - if (typeof address === 'object' && address !== null) { - address = new SocketAddress(address); - } else { + if (!SocketAddress.isSocketAddress(address)) { + if (address == null || typeof address !== 'object') { throw new ERR_INVALID_ARG_TYPE('address', 'SocketAddress', address); } + address = new SocketAddress(address); } - validateObject(options, 'options'); - const { - version, - minVersion, - preferredAddressPolicy = 'default', - application = kEmptyObject, - transportParams = kEmptyObject, - tls = kEmptyObject, - qlog = false, - sessionTicket, - } = options; + const processedOptions = this.#processSessionOptions(options); + const { sessionTicket } = processedOptions; - const handle = this.#handle.connect(address[kSocketAddressHandle], { - version, - minVersion, - preferredAddressPolicy: getPreferredAddressPolicy(preferredAddressPolicy), - application, - transportParams, - tls: processTlsOptions(tls), - qlog, - }, sessionTicket); + const handle = this.#handle.connect(address[kSocketAddressHandle], + processedOptions, sessionTicket); if (handle === undefined) { throw new ERR_QUIC_CONNECTION_FAILED(); } - const session = new QuicSession(this.#sessionConfig, this.#streamConfig, handle, this); - ArrayPrototypePush(this.#sessions, session); + const session = this.#newSession(handle); + + if (onEndpointClientSessionChannel.hasSubscribers) { + onEndpointClientSessionChannel.publish({ + endpoint: this, + session, + address, + options, + }); + } + return session; } @@ -1289,10 +1863,16 @@ class QuicEndpoint { * not be accepted or created. The returned promise will be resolved when * closing is complete, or will be rejected if the endpoint is closed abruptly * due to an error. - * @returns {Promise} + * @returns {Promise} Returns this.closed */ close() { if (!this.#isClosedOrClosing) { + if (onEndpointClosingChannel.hasSubscribers) { + onEndpointClosingChannel.publish({ + endpoint: this, + hasPendingError: this.#pendingError !== undefined, + }); + } this.#isPendingClose = true; this.#handle?.closeGracefully(); } @@ -1310,12 +1890,22 @@ class QuicEndpoint { /** @type {boolean} */ get destroyed() { return this.#handle === undefined; } + /** + * Return an iterator over all currently active sessions associated + * with this endpoint. + * @type {SetIterator} + */ + get sessions() { + return this.#sessions[SymbolIterator](); + } + /** * Forcefully terminates the endpoint by immediately destroying all sessions * after calling close. If an error is given, the closed promise will be * rejected with that error. If no error is given, the closed promise will * be resolved. - * @param {any} error + * @param {any} [error] + * @returns {Promise} Returns this.closed */ destroy(error) { if (!this.#isClosedOrClosing) { @@ -1329,6 +1919,7 @@ class QuicEndpoint { for (const session of this.#sessions) { session.destroy(error); } + return this.closed; } ref() { @@ -1339,63 +1930,100 @@ class QuicEndpoint { if (this.#handle !== undefined) this.#handle.ref(false); } - [kFinishClose](context, status) { - if (this.#handle === undefined) return; - this.#isPendingClose = false; - this.#address = undefined; - this.#busy = false; - this.#listening = false; - this.#isPendingClose = false; - this.#stats[kFinishClose](); - this.#state[kFinishClose](); - this.#sessions = []; - + #maybeGetCloseError(context, status) { switch (context) { case kCloseContextClose: { - if (this.#pendingError !== undefined) { - this.#pendingClose.reject(this.#pendingError); - } else { - this.#pendingClose.resolve(); - } - break; + return this.#pendingError; } case kCloseContextBindFailure: { - this.#pendingClose.reject( - new ERR_QUIC_ENDPOINT_CLOSED('Bind failure', status)); - break; + return new ERR_QUIC_ENDPOINT_CLOSED('Bind failure', status); } case kCloseContextListenFailure: { - this.#pendingClose.reject( - new ERR_QUIC_ENDPOINT_CLOSED('Listen failure', status)); - break; + return new ERR_QUIC_ENDPOINT_CLOSED('Listen failure', status); } case kCloseContextReceiveFailure: { - this.#pendingClose.reject( - new ERR_QUIC_ENDPOINT_CLOSED('Receive failure', status)); - break; + return new ERR_QUIC_ENDPOINT_CLOSED('Receive failure', status); } case kCloseContextSendFailure: { - this.#pendingClose.reject( - new ERR_QUIC_ENDPOINT_CLOSED('Send failure', status)); - break; + return new ERR_QUIC_ENDPOINT_CLOSED('Send failure', status); } case kCloseContextStartFailure: { - this.#pendingClose.reject( - new ERR_QUIC_ENDPOINT_CLOSED('Start failure', status)); - break; + return new ERR_QUIC_ENDPOINT_CLOSED('Start failure', status); } } + // Otherwise return undefined. + } - this.#pendingError = undefined; + [kFinishClose](context, status) { + if (this.#handle === undefined) return; + this.#handle = undefined; + this.#stats[kFinishClose](); + this.#state[kFinishClose](); + this.#address = undefined; + this.#busy = false; + this.#listening = false; + this.#isPendingClose = false; + + // As QuicSessions are closed they are expected to remove themselves + // from the sessions collection. Just in case they don't, let's force + // it by resetting the set so we don't leak memory. Let's emit a warning, + // tho, if the set is not empty at this point as that would indicate a + // bug in Node.js that should be fixed. + if (this.#sessions.size > 0) { + process.emitWarning( + `The endpoint is closed with ${this.#sessions.size} active sessions. ` + + 'This should not happen and indicates a bug in Node.js. Please open an ' + + 'issue in the Node.js GitHub repository at https://github.com/nodejs/node ' + + 'to report the problem.', + ); + } + this.#sessions.clear(); + + // If destroy was called with an error, then the this.#pendingError will be + // set. Or, if context indicates an error condition that caused the endpoint + // to be closed, the status will indicate the error code. In either case, + // we will reject the pending close promise at this point. + const maybeCloseError = this.#maybeGetCloseError(context, status); + if (maybeCloseError !== undefined) { + if (onEndpointErrorChannel.hasSubscribers) { + onEndpointErrorChannel.publish({ + endpoint: this, + error: maybeCloseError, + }); + } + this.#pendingClose.reject(maybeCloseError); + } else { + // Otherwise we are good to resolve the pending close promise! + this.#pendingClose.resolve(); + } + if (onEndpointClosedChannel.hasSubscribers) { + onEndpointClosedChannel.publish({ + endpoint: this, + }); + } + + // Note that we are intentionally not clearing the + // this.#pendingClose.promise here. this.#pendingClose.resolve = undefined; this.#pendingClose.reject = undefined; - this.#handle = undefined; + this.#pendingError = undefined; } [kNewSession](handle) { - const session = new QuicSession(this.#sessionConfig, this.#streamConfig, handle, this); - ArrayPrototypePush(this.#sessions, session); - this.#onsession(session, this); + const session = this.#newSession(handle); + if (onEndpointServerSessionChannel.hasSubscribers) { + onEndpointServerSessionChannel.publish({ + endpoint: this, + session, + }); + } + this.#onsession(session); + } + + // Called by the QuicSession when it closes to remove itself from + // the active sessions tracked by the QuicEndpoint. + [kRemoveSession](session) { + this.#sessions.delete(session); } async [SymbolAsyncDispose]() { await this.close(); } @@ -1423,65 +2051,27 @@ class QuicEndpoint { } }; -ObjectDefineProperties(QuicEndpoint, { - CC_ALGO_RENO: { - __proto__: null, - value: CC_ALGO_RENO, - writable: false, - configurable: false, - enumerable: true, - }, - CC_ALGO_CUBIC: { - __proto__: null, - value: CC_ALGO_CUBIC, - writable: false, - configurable: false, - enumerable: true, - }, - CC_ALGO_BBR: { - __proto__: null, - value: CC_ALGO_BBR, - writable: false, - configurable: false, - enumerable: true, - }, - CC_ALGO_RENO_STR: { - __proto__: null, - value: CC_ALGO_RENO_STR, - writable: false, - configurable: false, - enumerable: true, - }, - CC_ALGO_CUBIC_STR: { - __proto__: null, - value: CC_ALGO_CUBIC_STR, - writable: false, - configurable: false, - enumerable: true, - }, - CC_ALGO_BBR_STR: { +function readOnlyConstant(value) { + return { __proto__: null, - value: CC_ALGO_BBR_STR, + value, writable: false, configurable: false, enumerable: true, - }, + }; +} + +ObjectDefineProperties(QuicEndpoint, { + CC_ALGO_RENO: readOnlyConstant(CC_ALGO_RENO), + CC_ALGO_CUBIC: readOnlyConstant(CC_ALGO_CUBIC), + CC_ALGO_BBR: readOnlyConstant(CC_ALGO_BBR), + CC_ALGP_RENO_STR: readOnlyConstant(CC_ALGO_RENO_STR), + CC_ALGO_CUBIC_STR: readOnlyConstant(CC_ALGO_CUBIC_STR), + CC_ALGO_BBR_STR: readOnlyConstant(CC_ALGO_BBR_STR), }); ObjectDefineProperties(QuicSession, { - DEFAULT_CIPHERS: { - __proto__: null, - value: DEFAULT_CIPHERS, - writable: false, - configurable: false, - enumerable: true, - }, - DEFAULT_GROUPS: { - __proto__: null, - value: DEFAULT_GROUPS, - writable: false, - configurable: false, - enumerable: true, - }, + DEFAULT_CIPHERS: readOnlyConstant(DEFAULT_CIPHERS), + DEFAULT_GROUPS: readOnlyConstant(DEFAULT_GROUPS), }); module.exports = { diff --git a/lib/internal/quic/symbols.js b/lib/internal/quic/symbols.js index fa2c98320b860e..c436b5c4b787ff 100644 --- a/lib/internal/quic/symbols.js +++ b/lib/internal/quic/symbols.js @@ -24,7 +24,9 @@ const kFinishClose = Symbol('kFinishClose'); const kHandshake = Symbol('kHandshake'); const kHeaders = Symbol('kHeaders'); const kOwner = Symbol('kOwner'); +const kRemoveSession = Symbol('kRemoveSession'); const kNewSession = Symbol('kNewSession'); +const kRemoveStream = Symbol('kRemoveStream'); const kNewStream = Symbol('kNewStream'); const kPathValidation = Symbol('kPathValidation'); const kReset = Symbol('kReset'); @@ -42,7 +44,9 @@ module.exports = { kHandshake, kHeaders, kOwner, + kRemoveSession, kNewSession, + kRemoveStream, kNewStream, kPathValidation, kReset, diff --git a/test/parallel/test-quic-internal-endpoint-listen-defaults.js b/test/parallel/test-quic-internal-endpoint-listen-defaults.js index 987e191b759cdd..598eac7693aa1a 100644 --- a/test/parallel/test-quic-internal-endpoint-listen-defaults.js +++ b/test/parallel/test-quic-internal-endpoint-listen-defaults.js @@ -26,9 +26,7 @@ describe('quic internal endpoint listen defaults', { skip: !hasQuic }, async () it('are reasonable and work as expected', async () => { const endpoint = new QuicEndpoint({ onsession() {}, - session: {}, - stream: {}, - }, {}); + }); ok(!endpoint.state.isBound); ok(!endpoint.state.isReceiving); diff --git a/test/parallel/test-quic-internal-endpoint-options.js b/test/parallel/test-quic-internal-endpoint-options.js index 91c1e6d5b2d312..b9ebaa0ffef2d3 100644 --- a/test/parallel/test-quic-internal-endpoint-options.js +++ b/test/parallel/test-quic-internal-endpoint-options.js @@ -22,15 +22,9 @@ describe('quic internal endpoint options', { skip: !hasQuic }, async () => { inspect, } = require('util'); - const callbackConfig = { - onsession() {}, - session: {}, - stream: {}, - }; - it('invalid options', async () => { ['a', null, false, NaN].forEach((i) => { - throws(() => new QuicEndpoint(callbackConfig, i), { + throws(() => new QuicEndpoint(i), { code: 'ERR_INVALID_ARG_TYPE', }); }); @@ -38,9 +32,7 @@ describe('quic internal endpoint options', { skip: !hasQuic }, async () => { it('valid options', async () => { // Just Works... using all defaults - new QuicEndpoint(callbackConfig, {}); - new QuicEndpoint(callbackConfig); - new QuicEndpoint(callbackConfig, undefined); + new QuicEndpoint(); }); it('various cases', async () => { @@ -190,13 +182,13 @@ describe('quic internal endpoint options', { skip: !hasQuic }, async () => { for (const value of valid) { const options = {}; options[key] = value; - new QuicEndpoint(callbackConfig, options); + new QuicEndpoint(options); } for (const value of invalid) { const options = {}; options[key] = value; - throws(() => new QuicEndpoint(callbackConfig, options), { + throws(() => new QuicEndpoint(options), { code: 'ERR_INVALID_ARG_VALUE', }); } @@ -204,7 +196,7 @@ describe('quic internal endpoint options', { skip: !hasQuic }, async () => { }); it('endpoint can be ref/unrefed without error', async () => { - const endpoint = new QuicEndpoint(callbackConfig, {}); + const endpoint = new QuicEndpoint(); endpoint.unref(); endpoint.ref(); endpoint.close(); @@ -212,17 +204,17 @@ describe('quic internal endpoint options', { skip: !hasQuic }, async () => { }); it('endpoint can be inspected', async () => { - const endpoint = new QuicEndpoint(callbackConfig, {}); + const endpoint = new QuicEndpoint({}); strictEqual(typeof inspect(endpoint), 'string'); endpoint.close(); await endpoint.closed; }); it('endpoint with object address', () => { - new QuicEndpoint(callbackConfig, { + new QuicEndpoint({ address: { host: '127.0.0.1:0' }, }); - throws(() => new QuicEndpoint(callbackConfig, { address: '127.0.0.1:0' }), { + throws(() => new QuicEndpoint({ address: '127.0.0.1:0' }), { code: 'ERR_INVALID_ARG_TYPE', }); }); diff --git a/test/parallel/test-quic-internal-endpoint-stats-state.js b/test/parallel/test-quic-internal-endpoint-stats-state.js index 6992e4ac09df08..f0302d2791e2b3 100644 --- a/test/parallel/test-quic-internal-endpoint-stats-state.js +++ b/test/parallel/test-quic-internal-endpoint-stats-state.js @@ -33,11 +33,7 @@ describe('quic internal endpoint stats and state', { skip: !hasQuic }, () => { } = require('node:assert'); it('endpoint state', () => { - const endpoint = new QuicEndpoint({ - onsession() {}, - session: {}, - stream: {}, - }); + const endpoint = new QuicEndpoint(); strictEqual(endpoint.state.isBound, false); strictEqual(endpoint.state.isReceiving, false); @@ -66,11 +62,7 @@ describe('quic internal endpoint stats and state', { skip: !hasQuic }, () => { }); it('state is not readable after close', () => { - const endpoint = new QuicEndpoint({ - onsession() {}, - session: {}, - stream: {}, - }, {}); + const endpoint = new QuicEndpoint(); endpoint.state[kFinishClose](); throws(() => endpoint.state.isBound, { name: 'Error', @@ -78,11 +70,7 @@ describe('quic internal endpoint stats and state', { skip: !hasQuic }, () => { }); it('state constructor argument is ArrayBuffer', () => { - const endpoint = new QuicEndpoint({ - onsession() {}, - session: {}, - stream: {}, - }, {}); + const endpoint = new QuicEndpoint(); const Cons = endpoint.state.constructor; throws(() => new Cons(kPrivateConstructor, 1), { code: 'ERR_INVALID_ARG_TYPE' @@ -90,11 +78,7 @@ describe('quic internal endpoint stats and state', { skip: !hasQuic }, () => { }); it('endpoint stats', () => { - const endpoint = new QuicEndpoint({ - onsession() {}, - session: {}, - stream: {}, - }); + const endpoint = new QuicEndpoint(); strictEqual(typeof endpoint.stats.isConnected, 'boolean'); strictEqual(typeof endpoint.stats.createdAt, 'bigint'); @@ -134,11 +118,7 @@ describe('quic internal endpoint stats and state', { skip: !hasQuic }, () => { }); it('stats are still readble after close', () => { - const endpoint = new QuicEndpoint({ - onsession() {}, - session: {}, - stream: {}, - }, {}); + const endpoint = new QuicEndpoint(); strictEqual(typeof endpoint.stats.toJSON(), 'object'); endpoint.stats[kFinishClose](); strictEqual(endpoint.stats.isConnected, false); @@ -147,11 +127,7 @@ describe('quic internal endpoint stats and state', { skip: !hasQuic }, () => { }); it('stats constructor argument is ArrayBuffer', () => { - const endpoint = new QuicEndpoint({ - onsession() {}, - session: {}, - stream: {}, - }, {}); + const endpoint = new QuicEndpoint(); const Cons = endpoint.stats.constructor; throws(() => new Cons(kPrivateConstructor, 1), { code: 'ERR_INVALID_ARG_TYPE',