TcpClientTransport: Suppress ClosedReceiveChannelException when server disconnects #226

Open
yuriykulikov opened this issue Apr 19, 2022 · 2 comments
@yuriykulikov
Contributor

Motivation

As of 0.15.4, when the client loses the connection to the server, a ClosedReceiveChannelException is thrown in one of the client coroutines. It is propagated to the CoroutineExceptionHandler and is printed. It looks like this (using TcpClientTransport):

Exception in thread "DefaultDispatcher-worker-1 @rSocket-tcp-client#73" kotlinx.coroutines.channels.ClosedReceiveChannelException: Unexpected EOF: expected 3 more bytes
	at io.ktor.utils.io.ByteBufferChannel.readFullySuspend(ByteBufferChannel.kt:573)
	at io.ktor.utils.io.ByteBufferChannel.access$readFullySuspend(ByteBufferChannel.kt:24)
	at io.ktor.utils.io.ByteBufferChannel$readFullySuspend$1.invokeSuspend(ByteBufferChannel.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
	Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [CoroutineName(rSocket-tcp-client), CoroutineId(73), "rSocket-tcp-client#73":StandaloneCoroutine{Cancelling}@61931601, Dispatchers.IO]

Desired solution

In my opinion, this exception can be suppressed: if the channel is closed, the client cannot do anything about it and should just quit or reconnect. This can be achieved by attaching a CoroutineExceptionHandler to the TcpConnection coroutineContext:

@TransportApi
internal class TcpConnection(
        socket: Socket,
        coroutineContext: CoroutineContext,
        override val pool: ObjectPool<ChunkBuffer>
) : Connection {
    override val coroutineContext: CoroutineContext = coroutineContext + CoroutineExceptionHandler { _, throwable ->
        when (throwable) {
            is ClosedReceiveChannelException -> Unit // NOP
            else -> throw throwable
        }
    }
// ...
}
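
To illustrate the filtering idea in isolation, here is a minimal sketch that uses only kotlinx.coroutines. It is not the rsocket-kotlin implementation: `DemoResult` and `runWithFilteringHandler` are hypothetical names, and collecting other exceptions in a list stands in for whatever logging or forwarding a real handler would do.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical result holder for this demo only.
data class DemoResult(val reported: List<Throwable>, val scopeActive: Boolean)

// Launches a coroutine that fails with `failure` in a scope whose handler
// ignores ClosedReceiveChannelException and records everything else.
fun runWithFilteringHandler(failure: Throwable): DemoResult = runBlocking {
    val reported = CopyOnWriteArrayList<Throwable>()
    val handler = CoroutineExceptionHandler { _, throwable ->
        when (throwable) {
            is ClosedReceiveChannelException -> Unit // peer went away: stay silent
            else -> { reported.add(throwable) }      // assumption: a real handler would log here
        }
    }
    val scope = CoroutineScope(Job() + handler)
    scope.launch { throw failure }.join()
    // The failure still cancels the scope; only the reporting is filtered.
    DemoResult(reported.toList(), scope.isActive)
}

fun main() {
    println(runWithFilteringHandler(ClosedReceiveChannelException("Unexpected EOF")))
    println(runWithFilteringHandler(IllegalStateException("boom")))
}
```

With a `ClosedReceiveChannelException` the `reported` list should stay empty while the scope is still cancelled, which is the property the proposal relies on.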

Considered alternatives

An alternative would be to catch the exception close to the readPacket invocation, but doing so breaks the behavior of the responder RSocket's coroutineScope. For example, if client code relies on the scope being cancelled to log disconnects, that cancellation won't happen.
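
The difference between swallowing and rethrowing can be reproduced with plain kotlinx.coroutines. The sketch below is an assumption-laden stand-in for the receive loop, not the library code: `parentActiveAfterEof` is a hypothetical name, the disconnect is simulated by throwing directly, and the empty handler only keeps the output quiet.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns whether the parent scope is still active after the reader hits EOF.
fun parentActiveAfterEof(swallow: Boolean): Boolean = runBlocking {
    val quiet = CoroutineExceptionHandler { _, _ -> } // demo only: do not print anything
    val parent = CoroutineScope(Job() + quiet)
    val reader = parent.launch {
        while (isActive) {
            try {
                // Simulated disconnect while waiting for the next frame.
                throw ClosedReceiveChannelException("Unexpected EOF")
            } catch (e: ClosedReceiveChannelException) {
                // Leaving the loop lets the reader complete normally;
                // rethrowing fails the reader and cancels the parent job.
                if (swallow) break else throw e
            }
        }
    }
    reader.join()
    parent.isActive
}

fun main() {
    println("swallowed: parent active = ${parentActiveAfterEof(swallow = true)}")
    println("rethrown:  parent active = ${parentActiveAfterEof(swallow = false)}")
}
```

When the exception is swallowed the reader just completes and the parent stays active, so anything waiting on the scope's cancellation never learns about the disconnect.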

Additional context

The exception is usually thrown while reading the packet length:

/*
 * Copyright 2015-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.rsocket.kotlin.transport.ktor.tcp

import io.ktor.network.sockets.*
import io.ktor.util.cio.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.Connection
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlin.coroutines.*

@TransportApi
internal class TcpConnection(
    socket: Socket,
    override val coroutineContext: CoroutineContext,
    override val pool: ObjectPool<ChunkBuffer>
) : Connection {
    private val socketConnection = socket.connection()

    private val sendChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel<ByteReadPacket>(8)
    private val receiveChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel<ByteReadPacket>(8)

    init {
        launch {
            socketConnection.output.use {
                while (isActive) {
                    val packet = sendChannel.receive()
                    val length = packet.remaining.toInt()
                    try {
                        writePacket {
                            @Suppress("INVISIBLE_MEMBER") writeLength(length)
                            writePacket(packet)
                        }
                        flush()
                    } catch (e: Throwable) {
                        packet.close()
                        throw e
                    }
                }
            }
        }
        launch {
            socketConnection.input.apply {
                while (isActive) {
                    // original code
                    // val length = @Suppress("INVISIBLE_MEMBER") readPacket(3).readLength()
                    // val packet = readPacket(length)
                    // end of original code
                    // new code
                    val packet = try {
                        val length = @Suppress("INVISIBLE_MEMBER") readPacket(3).readLength()
                        readPacket(length)
                    } catch (closedReceiveChannelException: ClosedReceiveChannelException) {
                        println("Receive channel was closed!")
                        // this is the exception
                        // if replaced with break or cancel, then responder rSocket scope won't be cancelled
                        throw closedReceiveChannelException
                    }
                    // end of new code
                    try {
                        receiveChannel.send(packet)
                    } catch (cause: Throwable) {
                        packet.close()
                        throw cause
                    }
                }
            }
        }
        coroutineContext.job.invokeOnCompletion {
            @Suppress("INVISIBLE_MEMBER") sendChannel.fullClose(it)
            @Suppress("INVISIBLE_MEMBER") receiveChannel.fullClose(it)
            socketConnection.input.cancel(it)
            socketConnection.output.close(it)
            socketConnection.socket.close()
        }
    }

    override suspend fun send(packet: ByteReadPacket): Unit = sendChannel.send(packet)
    override suspend fun receive(): ByteReadPacket = receiveChannel.receive()
}
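
The receive loop above reads a 3-byte length before each packet, so an EOF at that point usually means the peer closed the connection between frames rather than in the middle of one. As a rough illustration of that distinction, here is a blocking `java.io` sketch; it does not use the ktor channel API, and `readFrameOrNull` is a hypothetical helper that assumes a big-endian 3-byte length prefix.

```kotlin
import java.io.ByteArrayInputStream
import java.io.EOFException
import java.io.InputStream

// Reads one length-prefixed frame (assumed 3-byte big-endian length).
// Returns null when the stream ends before the first length byte,
// i.e. the peer closed the connection on a frame boundary.
fun readFrameOrNull(input: InputStream): ByteArray? {
    val first = input.read()
    if (first == -1) return null // clean disconnect between frames
    var length = first
    repeat(2) {
        val next = input.read()
        if (next == -1) throw EOFException("Unexpected EOF inside the length prefix")
        length = (length shl 8) or next
    }
    val payload = ByteArray(length)
    var offset = 0
    while (offset < length) {
        val read = input.read(payload, offset, length - offset)
        if (read == -1) {
            throw EOFException("Unexpected EOF: expected ${length - offset} more bytes")
        }
        offset += read
    }
    return payload
}

fun main() {
    // One frame with a 2-byte payload, followed by end of stream.
    val input = ByteArrayInputStream(byteArrayOf(0, 0, 2, 10, 20))
    println(readFrameOrNull(input)?.toList())
    println(readFrameOrNull(input))
}
```

A reader built this way can treat `null` as a normal disconnect and still fail loudly on a truncated frame; whether that split fits the transport here is a design decision for the maintainers.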
@ghost

ghost commented Apr 26, 2022

Hey, thanks for raising this. There is already a similar open issue, #148.
I'm currently rewriting the transport API to support QUIC, and I will try to handle this unexpected error in some way so that users are not flooded with such errors.

@rocketraman

Same thing happens on the server-side when a client disconnects.

@whyoleg whyoleg added this to the 0.17.0 milestone Oct 5, 2022
@whyoleg whyoleg modified the milestone: 0.16.0 - Transport API/Internals rework, QUIC, Netty Nov 24, 2022
@whyoleg whyoleg added the bug label May 3, 2023
@whyoleg whyoleg removed this from the 0.16.0 - Transport API/Internals rework, QUIC, Netty milestone May 3, 2023
@whyoleg whyoleg added this to the 0.17.0 milestone Nov 11, 2024