RXRemote is a module that allows a client to subscribe to RxJs Observables on a remote server. Clients can be either node or browser instances.
On disconnect, RxRemote will attempt to reconnect and restart observables where they left off.
Installing with NPM
$ npm install rxremote
RxRemote provides functionality that is similar to a WebSocketSubject. However, RxRemote adds the concept of cursors to your Observables. This allows RxRemote to handle reconnections transparently to the client application.
server:
import http from 'http';
import ObservablesServer from 'rxremote/observables_server';
const httpServer = http.createServer();
httpServer.listen(5000);
const observablesServer = new ObservablesServer(httpServer, {
counter() {
return Rx.Observable.of(1,2,3);
}
});
client:
import ObservablesClient from 'rxremote/observables_client';
const client = new ObservablesClient('ws://localhost:5000');
const source = client.observable('counter')
const subscription = source.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
// => Next: 1
// => Next: 2
// => Next: 3
// => Completed
Usually when a disconnection event happens, an error will be emitted on all open observables and it will be up to the client application to resubscribe.
You can have the ObservableClient
handle this resubscription transparently
by structuring your observable to emit objects that look like:
{
cursor: 1 // Some value that can be used to resume your observable
value: 'hello' // The main value object that you are observing
}
A cursor
can be any number, string or JSON-serializable object that your
observable can use to resume where it left off.
For example:
server:
const observablesServer = new ObservablesServer(httpServer, {
counter(cursor) {
return Rx.Observable.interval(1000).map(x => ({
cursor: x,
value: cursor + x
}));
}
});
client:
const client = new ObservablesClient('ws://localhost:5000');
const source = client.observable('counter')
const subscription = source.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
// => Next: 1
// => Next: 2
// => Next: 3
// -- Network event causes a reconnection
// => Next: 4
// => Next: 5
// => Next: 6
This observable emits text strings suitable for sending to a log file
This observable emits an event object when a connection is open or closed. The objects look like:
{
type: 'string', // 'connection-closed' or 'connection-open',
connectionId: 'number', // a numberic value that is unique to this connection
sessionId: 'string', // a uuid that is generated on the client and reused to call connections
ipAddress: 'string' // The IP address of the remote connection
}
This is a reference to the internal WebSocketServer.
Returns an observable that will marshall subscriptions to the remote server.
If the client in a disconnected state, this will attempt to reconnect. This does nothing if the client already in a connected or connecting state.
This is an observable that emits a true
boolean value when the client is
connected and a false
boolean value when the client is disconnected.
If this client is in a disconnected state, this observable will emit a timestamp that represents when the client will try to make a new connection.
This is a UUID that is generated once per instance of the client VM. It will stay the same for each connection that is established. This is useful for generating "presence" lists of connected clients.
If you're building an RxJs based application in node, you might find these other modules handy:
- rxnotifier - Notification channels backed by redis and/or PostgreSQL
- rxeventstore - Persist and query your data using the Event Sourcing pattern