This document features multiple integrations of the Echo library, and explains how to replicate data over websockets.

Integrations

Replication basics

Up until now, all our interactions with a site occurred on a single machine. We haven't yet looked at the API surface that lets sites communicate with each other, or at how to use it to replicate content across multiple physical machines.

While sites do offer replication, they are not the lowest-level abstraction that does so. Let's look at the inheritance hierarchy of MutableSite:

classDiagram
  direction BT
  class ReceiveExchange
  class SendExchange
  class Exchange
  class Site
  class MutableSite
  Exchange    --|> ReceiveExchange
  Exchange    --|> SendExchange
  Site        --|> Exchange
  MutableSite --|> Site

In fact, the ReceiveExchange and SendExchange interfaces are the lowest-level components that can sync content. The replication protocol is asymmetric: a ReceiveExchange receives requests from other sites and responds with the events it has. A SendExchange starts by sending requests, and then receives the events from a ReceiveExchange.

Both ReceiveExchange and SendExchange offer a Link that lets them receive and send messages:

// Link.kt
fun interface Link<in I, out O> {
  fun talk(incoming: Flow<I>): Flow<O>
}

// Exchange.kt
fun interface SendExchange<in I, out O> {
  fun outgoing(): Link<I, O>
}
fun interface ReceiveExchange<out I, in O> {
  fun incoming(): Link<O, I>
}
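
To make the shape of a Link more concrete, here is a minimal sketch of a hand-written one. The `describe` link and its message types are hypothetical and only serve as an illustration; the Link interface is redeclared so that the snippet is self-contained, and kotlinx.coroutines is assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same shape as the Link interface above, redeclared to keep the sketch self-contained.
fun interface Link<in I, out O> {
  fun talk(incoming: Flow<I>): Flow<O>
}

// Hypothetical link: answers each incoming number with a short description.
val describe = Link<Int, String> { incoming -> incoming.map { "got $it" } }

fun main(): Unit = runBlocking {
  // The returned flow is cold: nothing happens until it is collected.
  val replies = describe.talk(flowOf(1, 2, 3)).toList()
  println(replies) // [got 1, got 2, got 3]
}
```

Because Link is a fun interface, a lambda is enough to implement it, and the in/out variance annotations keep the direction of each message type explicit.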

A Link exposes a cold, asymmetric communication channel based on flows, and the protocol messages flow over these links. The two sync() methods simply "glue" together two opposite Link instances, or two or more compatible Exchange<I, O> instances:

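import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch

// The first sync method uses two rendezvous channels to establish
// communication between two opposing Link instances.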
suspend fun <I, O> sync(
    first: Link<I, O>,
    second: Link<O, I>,
): Unit = coroutineScope {
  val fToS = Channel<O>()
  val sToF = Channel<I>()
  launch {
    first
        .talk(sToF.consumeAsFlow())
        .onEach { fToS.send(it) }
        .onCompletion { fToS.close() }
        .collect()
  }
  launch {
    second
        .talk(fToS.consumeAsFlow())
        .onEach { sToF.send(it) }
        .onCompletion { sToF.close() }
        .collect()
  }
}

// The second sync method pairs up adjacent exchanges (a sliding window of
// size 2), and establishes the sync process in both directions for each pair.
suspend fun <I, O> sync(
    vararg exchanges: Exchange<I, O>,
): Unit = coroutineScope {
  exchanges.asSequence().windowed(2).forEach { (left, right) ->
    launch { sync(left.incoming(), right.outgoing()) }
    launch { sync(left.outgoing(), right.incoming()) }
  }
}

The complete source is available here.
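
As a rough illustration of how the first sync() method behaves, the sketch below glues a requester to a responder. The two links and their message types (an Int request answered by String events) are hypothetical stand-ins rather than the actual echo protocol messages; Link and sync() are copied from the snippets above so that the example is self-contained.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun interface Link<in I, out O> {
  fun talk(incoming: Flow<I>): Flow<O>
}

// Copy of the first sync() method shown above.
suspend fun <I, O> sync(first: Link<I, O>, second: Link<O, I>): Unit = coroutineScope {
  val fToS = Channel<O>()
  val sToF = Channel<I>()
  launch {
    first.talk(sToF.consumeAsFlow())
        .onEach { fToS.send(it) }
        .onCompletion { fToS.close() }
        .collect()
  }
  launch {
    second.talk(fToS.consumeAsFlow())
        .onEach { sToF.send(it) }
        .onCompletion { sToF.close() }
        .collect()
  }
}

// Hypothetical exchange of messages: one request, then the requested events.
suspend fun demo(): List<String> {
  val received = mutableListOf<String>()
  // Sends a single request asking for two events, then stores every reply.
  val requester = Link<String, Int> { events ->
    flow {
      emit(2)
      events.collect { received += it }
    }
  }
  // Waits for the first request, answers with that many events, then completes.
  val responder = Link<Int, String> { requests ->
    flow {
      val count = requests.first()
      repeat(count) { emit("event $it") }
    }
  }
  sync(requester, responder) // returns once both links have completed
  return received
}

fun main(): Unit = runBlocking {
  println(demo()) // [event 0, event 1]
}
```

Since the channels are rendezvous channels, each send suspends until the other side is ready to receive. In this sketch the two links therefore take turns, and sync() returns only after the responder completes and the requester has drained its incoming flow.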

As we've seen, an Exchange simply implements both SendExchange and ReceiveExchange. How does a Site differ, then?

Sites have two additional properties: they provide access to an observable model aggregated from the events, and they do not handle generic messages (instead, they use the echo protocol messages, which are necessary to replicate events). The aggregated value is the Site.value flow that we have been using throughout the examples. Exchanges, on the other hand, are a great abstraction for the replication protocol: they do not try to aggregate the underlying events, so they do not care about their type.

Websocket replication

Now that we've seen that the communication protocol is implemented with flows of messages, and that the sync() function works with generic messages, it becomes clear that replicating sites across different machines is easy, as long as we can transmit the protocol messages. For example, sending messages over websockets takes two steps:

  1. Transform the protocol messages to websocket frames
  2. Send the websocket frames through a dedicated websocket library

The echo-ktor-websockets library handles the first step by offering encodeToFrame() and decodeFromFrame() methods on Exchange:

val remote: Exchange<Frame, Frame> = remote() // obtained through the client or server integrations
val local = mutableSite(identifier, initial, projection)

sync(remote, local.encodeToFrame()) // sync over the network

Internally, the encodeToFrame() and decodeFromFrame() functions make use of Protobufs to efficiently store the messages in binary format, which are then mapped to binary websocket frames.
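
The idea behind such an encoding layer can be sketched as an adapter around a Link. This is an illustration under simplifying assumptions, not the library's implementation: `asBytes` is a hypothetical helper, ByteArray stands in for a binary websocket frame, and a plain-text encoding replaces Protobuf.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun interface Link<in I, out O> {
  fun talk(incoming: Flow<I>): Flow<O>
}

// Hypothetical adapter: wraps a typed link so that it speaks raw byte payloads.
// Incoming payloads are decoded before reaching the typed link, and its
// outgoing messages are encoded on the way out.
fun <I, O> Link<I, O>.asBytes(
    decode: (ByteArray) -> I,
    encode: (O) -> ByteArray,
): Link<ByteArray, ByteArray> {
  val typed = this
  return Link<ByteArray, ByteArray> { frames ->
    typed.talk(frames.map { decode(it) }).map { encode(it) }
  }
}

fun main(): Unit = runBlocking {
  // A typed link that doubles every number it receives.
  val doubler = Link<Int, Int> { numbers -> numbers.map { it * 2 } }
  // Plain-text encoding, chosen only to keep the sketch short.
  val wire = doubler.asBytes(
      decode = { it.decodeToString().toInt() },
      encode = { it.toString().encodeToByteArray() },
  )
  val replies = wire.talk(flowOf("1".encodeToByteArray(), "21".encodeToByteArray())).toList()
  println(replies.map { it.decodeToString() }) // [2, 42]
}
```

Since the adapter only maps the two flows, the wrapped link stays unaware of the wire format, which is what allows the same sync() function to work locally and over the network.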

Server

The echo-ktor-server module provides server-side integration to serve an Exchange over a websocket connection. In fact, a generic replication server fits in about ten lines of code:

fun main() {
  val site = exchange()
  embeddedServer(CIO, port = 1234) {
        install(WebSockets)
        routing {
          route("snd") { sender { site.encodeToFrame() } }
          route("rcv") { receiver { site.encodeToFrame() } }
        }
      }
      .start(wait = true)
}

Check the sample out here.

In this example, a generic Exchange, which handles encoded events, is served through the /snd and /rcv endpoints. Note that you could also decide to specify a different SyncStrategy on the exchange, for instance if you'd like to enforce one-shot sync rather than continuous sync.

The sender { } and receiver { } blocks are similar to a webSocket { } block, and let you use the request parameters to decide which instance of Exchange the caller should interact with. This is particularly useful when you want to host multiple independent collaborative sessions.
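
One possible way to keep independent sessions apart is a small registry keyed by a session identifier. The sketch below is hypothetical (`Sessions` and `forSession` are not part of the library) and deliberately leaves out how the identifier is read from the request inside sender { } or receiver { }, since that depends on your routing setup. It targets the JVM, as it relies on ConcurrentHashMap.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry: hands out one shared instance per session identifier.
class Sessions<T : Any>(private val create: (String) -> T) {
  private val instances = ConcurrentHashMap<String, T>()

  // Returns the instance for this session, creating it atomically on first access.
  fun forSession(id: String): T = instances.computeIfAbsent(id) { create(it) }
}

fun main() {
  // A StringBuilder stands in for an exchange, to keep the sketch self-contained.
  val sessions = Sessions { id -> StringBuilder("exchange for $id") }
  val first = sessions.forSession("doc-1")
  val again = sessions.forSession("doc-1")
  val other = sessions.forSession("doc-2")
  println(first === again) // true
  println(first === other) // false
}
```

With such a registry, callers that present the same identifier would end up talking to the same exchange, while other sessions stay isolated.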

Client

The echo-ktor-client module integrates with websockets served by the server library:

private val Client = HttpClient(Js) {
    install(WebSockets)
}

val remote = Client.wssExchange(receiver = { url("/snd") }, sender = { url("/rcv") })
val local = mutableSite(identifier, initial, projection)

sync(remote, local.encodeToFrame()) // sync over the network

Check the sample out here.

In this example, an exchange is generated over a secure websocket connection, using an HttpClient. It's also possible to create exchanges over insecure connections using the HttpClient.wsExchange(...) builder.