-
Notifications
You must be signed in to change notification settings - Fork 353
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
provides reworked AeronTransport implementation #972
base: master
Are you sure you want to change the base?
Conversation
@@ -11,7 +11,7 @@ | |||
protected UnboundedProcessor sender = new UnboundedProcessor(); | |||
|
|||
public BaseDuplexConnection() { | |||
onClose.doFinally(s -> doOnClose()).subscribe(); | |||
onClose.subscribe(null, t -> doOnClose(), this::doOnClose); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unrelated, non-blocking. What's the plan with the deprecation? reactor/reactor-core#2431 (comment)
Should onClose be updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I will do that in a separate PR
@@ -11,7 +11,7 @@ | |||
protected UnboundedProcessor sender = new UnboundedProcessor(); | |||
|
|||
public BaseDuplexConnection() { | |||
onClose.doFinally(s -> doOnClose()).subscribe(); | |||
onClose.subscribe(null, t -> doOnClose(), this::doOnClose); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this still run for cancellation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think it should at all. In this case, we wait for onError or onComplete
rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronChannelAddress.java
Show resolved
Hide resolved
rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronClientTransport.java
Outdated
Show resolved
Hide resolved
rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronClientTransport.java
Outdated
Show resolved
Hide resolved
.subscribe( | ||
null, | ||
__ -> { | ||
synchronized (activeConnections) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this duplication be improved by using doOnTerminate or doFinally?
rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronServerTransport.java
Outdated
Show resolved
Hide resolved
Low value drive by review of a draft PR. Mainly me trying to learn from the PR. |
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
…on/AeronClientTransport.java Co-authored-by: Yuri Schimke <[email protected]>
…on/AeronServerTransport.java Co-authored-by: Yuri Schimke <[email protected]>
uncomments largePayload tests Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
0753b6e
to
6aa3d8a
Compare
Signed-off-by: Oleh Dokuka [email protected]