Motivation
As of 0.15.4, when the client loses the connection to the server, a ClosedReceiveChannelException is thrown in one of the client coroutines. It is propagated to the CoroutineExceptionHandler and printed. It looks like this (using TcpClientTransport):
Exception in thread "DefaultDispatcher-worker-1 @rSocket-tcp-client#73" kotlinx.coroutines.channels.ClosedReceiveChannelException: Unexpected EOF: expected 3 more bytes
at io.ktor.utils.io.ByteBufferChannel.readFullySuspend(ByteBufferChannel.kt:573)
at io.ktor.utils.io.ByteBufferChannel.access$readFullySuspend(ByteBufferChannel.kt:24)
at io.ktor.utils.io.ByteBufferChannel$readFullySuspend$1.invokeSuspend(ByteBufferChannel.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)
at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [CoroutineName(rSocket-tcp-client), CoroutineId(73), "rSocket-tcp-client#73":StandaloneCoroutine{Cancelling}@61931601, Dispatchers.IO]
Desired solution
In my opinion, this exception can be suppressed, because if the channel is closed, the client cannot do anything about it and should just quit or reconnect. This can be achieved by attaching a CoroutineExceptionHandler to the TcpConnection coroutineContext.
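A minimal sketch of such a handler, using only kotlinx.coroutines. The helper names `disconnectSuppressingHandler` and `failInScope` are hypothetical and not part of rsocket-kotlin, and the scope built here only stands in for the context that would be passed to TcpConnection; how that context is actually assembled inside the transport is an assumption.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper (not part of rsocket-kotlin): ignores the exception raised
// when the peer closes the connection and forwards everything else to `onOther`.
fun disconnectSuppressingHandler(
    onOther: (Throwable) -> Unit = { it.printStackTrace() }
): CoroutineExceptionHandler = CoroutineExceptionHandler { _, throwable ->
    if (throwable !is ClosedReceiveChannelException) onOther(throwable)
}

// Stand-in for a connection scope: launches one coroutine that fails with `failure`
// and reports whether the scope was cancelled and which exceptions were reported.
suspend fun failInScope(failure: Throwable): Pair<Boolean, List<Throwable>> {
    val reported = CopyOnWriteArrayList<Throwable>()
    val connectionJob = Job()
    val scope = CoroutineScope(connectionJob + disconnectSuppressingHandler { reported += it })
    scope.launch { throw failure }.join()
    return connectionJob.isCancelled to reported
}

fun main() = runBlocking {
    val (cancelled, reported) =
        failInScope(ClosedReceiveChannelException("Unexpected EOF: expected 3 more bytes"))
    println("scope cancelled: $cancelled, reported exceptions: ${reported.size}")
}
```

In this sketch the failure still cancels the enclosing job, so code that watches the scope for disconnects keeps working; only the printing of the stack trace is skipped.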
Considered alternatives
An alternative would be to catch the exception close to the readPacket invocation, but doing so breaks the responder rSocket coroutineScope behavior. For example, if client code relies on the scope cancellation to log disconnects, this cancellation won't happen.
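The difference between swallowing and rethrowing can be reproduced with plain kotlinx.coroutines. This is a sketch under assumptions: `runReadLoop` is a hypothetical stand-in for the receive loop, and the bare `Job()` stands in for the connection job that the responder scope depends on.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException

// Hypothetical stand-in for the receive loop. When `swallow` is true the exception
// is caught and dropped (the "break" variant); otherwise it is rethrown.
// Returns whether the enclosing connection job ended up cancelled.
suspend fun runReadLoop(swallow: Boolean): Boolean {
    val silentHandler = CoroutineExceptionHandler { _, _ -> } // keep the demo output quiet
    val connectionJob = Job()
    val scope = CoroutineScope(connectionJob + silentHandler)
    scope.launch {
        try {
            // Simulates readPacket(3) hitting EOF after the server went away.
            throw ClosedReceiveChannelException("Unexpected EOF")
        } catch (e: ClosedReceiveChannelException) {
            if (!swallow) throw e
        }
    }.join()
    return connectionJob.isCancelled
}

fun main() = runBlocking {
    println("swallowed: scope cancelled = ${runReadLoop(swallow = true)}")
    println("rethrown:  scope cancelled = ${runReadLoop(swallow = false)}")
}
```

When the exception is swallowed, the child coroutine completes normally and the parent job stays active, which is why anything waiting on scope cancellation never hears about the disconnect.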
Additional context
The exception is usually thrown when reading the packet length:
/*
 * Copyright 2015-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.rsocket.kotlin.transport.ktor.tcp

import io.ktor.network.sockets.*
import io.ktor.util.cio.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.Connection
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlin.coroutines.*

@TransportApi
internal class TcpConnection(
    socket: Socket,
    override val coroutineContext: CoroutineContext,
    override val pool: ObjectPool<ChunkBuffer>
) : Connection {
    private val socketConnection = socket.connection()
    private val sendChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel<ByteReadPacket>(8)
    private val receiveChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel<ByteReadPacket>(8)

    init {
        launch {
            socketConnection.output.use {
                while (isActive) {
                    val packet = sendChannel.receive()
                    val length = packet.remaining.toInt()
                    try {
                        writePacket {
                            @Suppress("INVISIBLE_MEMBER") writeLength(length)
                            writePacket(packet)
                        }
                        flush()
                    } catch (e: Throwable) {
                        packet.close()
                        throw e
                    }
                }
            }
        }
        launch {
            socketConnection.input.apply {
                while (isActive) {
                    // original code
                    // val length = @Suppress("INVISIBLE_MEMBER") readPacket(3).readLength()
                    // val packet = readPacket(length)
                    // end of original code
                    // new code
                    val packet = try {
                        val length = @Suppress("INVISIBLE_MEMBER") readPacket(3).readLength()
                        readPacket(length)
                    } catch (closedReceiveChannelException: ClosedReceiveChannelException) {
                        println("Receive channel was closed!")
                        // this is the exception
                        // if replaced with break or cancel, then responder rSocket scope won't be cancelled
                        throw closedReceiveChannelException
                    }
                    // end of new code
                    try {
                        receiveChannel.send(packet)
                    } catch (cause: Throwable) {
                        packet.close()
                        throw cause
                    }
                }
            }
        }
        coroutineContext.job.invokeOnCompletion {
            @Suppress("INVISIBLE_MEMBER") sendChannel.fullClose(it)
            @Suppress("INVISIBLE_MEMBER") receiveChannel.fullClose(it)
            socketConnection.input.cancel(it)
            socketConnection.output.close(it)
            socketConnection.socket.close()
        }
    }

    override suspend fun send(packet: ByteReadPacket): Unit = sendChannel.send(packet)

    override suspend fun receive(): ByteReadPacket = receiveChannel.receive()
}
Hey, thanks for raising this. There is already a similar open issue, #148.
I'm currently rewriting the transport API to support QUIC, and I will try to handle this unexpected error in some way so that users are not cluttered with such errors.