This document features multiple integrations of the Echo library, and explains how to replicate data over websockets.
Up until now, all the interactions we had with a site occurred on a single machine. Also, we didn't really look at the API surface that lets sites communicate with each other, and how we may use it to replicate content across multiple physical machines.
Additionally, while sites do, indeed, offer replication, they're not the lowest level abstraction that does so. Let's look at the inheritance hierarchy of MutableSite
:
classDiagram
direction BT
class ReceiveExchange
class SendExchange
class Exchange
class Site
class MutableSite
Exchange --|> ReceiveExchange
Exchange --|> SendExchange
Site --|> Exchange
MutableSite --|> Site
In fact, the ReceiveExchange
and SendExchange
interfaces are the lowest-level components that can sync content. The replication protocol is asymmetric : a ReceiveExchange
will receive requests from other sites, and respond with the events it has. A SendExchange
starts by sending some requests, and will then receive the events from a ReceiveExchange
.
Both ReceiveExchange
and SendExchange
offer a Link
that lets them emit and send some messages :
// Link.kt
fun interface Link<in I, out O> {
fun talk(incoming: Flow<I>): Flow<O>
}
// Exchange.kt
fun interface SendExchange<in I, out O> {
fun outgoing(): Link<I, O>
}
fun interface ReceiveExchange<out I, in O> {
fun incoming(): Link<O, I>
}
A Link
exposes a cold asymmetric communication channel, based on flows. The protocol messages will then flow on the links. The two sync()
methods then simply "glue" two opposite Link
together, or two compatible Exchange<I, O>
:
// The first sync method makes use of some rendez-vous channels to establish
// communication between two opposing Link.
suspend fun <I, O> sync(
first: Link<I, O>,
second: Link<O, I>,
): Unit = coroutineScope {
val fToS = Channel<O>()
val sToF = Channel<I>()
launch {
first
.talk(sToF.consumeAsFlow())
.onEach { fToS.send(it) }
.onCompletion { fToS.close() }
.collect()
}
launch {
second
.talk(fToS.consumeAsFlow())
.onEach { sToF.send(it) }
.onCompletion { sToF.close() }
.collect()
}
}
// The second sync method groups exchanges by pairs, and establishes the sync
// process for each pair <incoming(), outgoing()>.
suspend fun <I, O> sync(
vararg exchanges: Exchange<I, O>,
): Unit = coroutineScope {
exchanges.asSequence().windowed(2).forEach { (left, right) ->
launch { sync(left.incoming(), right.outgoing()) }
launch { sync(left.outgoing(), right.incoming()) }
}
}
The complete source is available here.
As we've seen, an Exchange
simply implements both SendExchange
and ReceiveExchange
. What's the difference with a Site
then ?
Sites have two additional properties : they provide access to an observable aggregated model from the events, and they do not manage generic messages (instead, they use the echo protocol messages, which are necessary to replicate events). The aggregated value is the Site.value
flow that we have been using throughout the examples. On the other hand, exchanges are a great abstraction for the replication protocol, since they do not care about the type of the underlying events, as they do not try to aggregate them.
Now that we've seen that the communication protocol is implemented with flows of messages, and that the sync()
function can work with generic messages, it becomes clear that it will be easy to replicate sites across different machines, assuming we can transmit the protocol messages. For example, to send messages over websockets, two steps are needed :
- Transform the protocol messages to websocket frames
- Send the websocket frames through a dedicated websocket library
The echo-ktor-websockets library solves the first problem by offering some encodeToFrame()
and decodeToFrame()
methods on Exchange
:
val remote: Exchange<Frame, Frame> = remote() // obtained through the client or server integrations
val local = mutableSite(identifier, initial, projection)
sync(remote, local.encodeToFrame()) // sync over the network
Internally, the encodeToFrame()
and decodeFromFrame()
functions make use of Protobufs to efficiently store the messages in binary format, which are then mapped to binary websocket frames.
The echo-ktor-server module provides some server-side integration to serve a websocket-based Exchange
over a websocket connection. In fact, building a generic replication server can be done in exactly 10 lines of code:
fun main() {
val site = exchange()
embeddedServer(CIO, port = 1234) {
install(WebSockets)
routing {
route("snd") { sender { site.encodeToFrame() } }
route("rcv") { receiver { site.encodeToFrame() } }
}
}
.start(wait = true)
}
Check the sample out here.
In this example, a generic Exchange
, which handles encoded events, is served through the /snd
and /rcv
endpoints. Note that you could also decide to specify a different SyncStrategy
on the exchange, for instance if you'd like to enforce one-shot sync rather than continuous sync.
The sender { }
and receiver { }
are similar to a webSocket { }
block, and let you use the request parameters to decide which instance of Exchange
the caller should interact with. This is particularly useful when you want to feature multiple independent collaborative sessions.
The echo-ktor-client modules integrates well with websockets managed by the server library :
private val Client = HttpClient(Js) {
install(WebSockets)
}
val remote = Client.wssExchange(receiver = { url("/snd") }, sender = { url("/rcv") })
val local = mutableSite(identifier, initial, projection)
sync(remote, local.encodeToFrame()) // sync over the network
Check the sample out here.
In this example, an exchange is generated over a secure websocket connection, using an HttpClient
. It's also possible to create exchanges over insecure connections using the HttpClient.wsExchange(...)
builder.