Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lacrymology/messaging #139

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
64dd049
use common-config collection names instead of hardcoded values
Lacrymology Oct 15, 2013
e22102e
save new sticky messages in the db and delete them when clear
Lacrymology Oct 15, 2013
58b2d7d
use message model instead of hardcoded object
Lacrymology Oct 18, 2013
67de678
export error codes constants for ease of use
Lacrymology Oct 18, 2013
becec38
send message on outOfSync
Lacrymology Oct 23, 2013
9ce7782
have the mongo driver filter out inexisting clips, and notify them
Lacrymology Oct 26, 2013
66d736e
more natural arguments order
Lacrymology Oct 26, 2013
b31e274
started playing message code
Lacrymology Oct 26, 2013
d964696
send 'started playing' message to caspa
Lacrymology Oct 26, 2013
774818b
send 'file not found' message to caspa
Lacrymology Oct 26, 2013
9526d69
make target that runs a node shell with the mosto environment
Lacrymology Oct 29, 2013
e80cda0
fix the "sticky" message implementation
Lacrymology Oct 29, 2013
2d02176
better event name
Lacrymology Oct 30, 2013
7ed6aa1
all messages are saved now
Lacrymology Nov 1, 2013
7df8c59
save end-date for fixed errors
Lacrymology Nov 1, 2013
bd009b0
bugfix
Lacrymology Nov 4, 2013
a11af33
bugfix
Lacrymology Nov 6, 2013
283ab49
create a blank clip when a file is not found instead of just dropping it
Lacrymology Nov 4, 2013
5f732d5
replace broken medias for an equivalent amount of blank medias
Lacrymology Nov 11, 2013
67aec45
keep a list of "active" messages
Lacrymology Nov 16, 2013
9c7a595
add a reference to status messages. [code, reference] pairs are kept …
Lacrymology Nov 16, 2013
28325c5
remove messages based on code and reference
Lacrymology Nov 16, 2013
617ab57
don't delete the mostomessages collection
Lacrymology Nov 16, 2013
40353b7
send message on melted disconnect
Lacrymology Nov 16, 2013
8516a21
relay events from melted-node
Lacrymology Nov 16, 2013
a9406bd
bugfix
Lacrymology Nov 16, 2013
2239856
fix: fix the failing message for every broken media in a playlist
Lacrymology Nov 18, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,5 @@ debug-test: videos melted-check
clean-test:
rm ${TEST_VIDEOS} ${TEST_XMLS}

node:
${NODE}
15 changes: 14 additions & 1 deletion drivers/mvcp/melted-node-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,29 @@ var melted_node = require('melted-node')
, logger = mbc.logger().addLogger('MELTED-NODE-DRIVER')
, melted_log = mbc.logger().addLogger('MELTED-NODE')
, uuid = require('node-uuid')
, events = require('events')
, util = require('util')
;

function melted(host, port, timeout) {
this.uuid = uuid.v4();
logger.debug(this.uuid + " - Creating server instance [" + host + ":" + port + "]");
this.mlt = new melted_node(host, port, melted_log, timeout);
var self = this;
['response-timeout', 'command-error', 'command-response', 'start-connection', 'connected', 'disconnected', 'connection-error', 'reconnect', 'disconnect'].forEach(function(event) {
self.mlt.on(event, function() {
arguments = Array.prototype.slice.call(arguments, 0);
arguments.splice(0,0, event);
self.emit.apply(self, arguments);
});
});
this.commandQueue = Q.resolve();
logger.debug(this.uuid + " - Server instance created [" + this.mlt.host + ":" + this.mlt.port + "]");
events.EventEmitter.call(this);
}

util.inherits(melted, events.EventEmitter)

melted.prototype._sendCommand = function(command) {
logger.debug(this.uuid + " - Sending command: " + command);
return this.mlt.sendCommand(command);
Expand Down Expand Up @@ -115,7 +128,7 @@ melted.prototype.getServerStatus = function() {
}
var err = new Error(self.uuid + " - Error getting server status in response object: " + response)
throw (err);
}).fail(function() {
}).fail(function(error) {
var err = new Error(self.uuid + " - Error getting server status: " + error);
logger.error(err.message);
throw err;
Expand Down
46 changes: 31 additions & 15 deletions drivers/playlists/mongo-driver.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var mbc = require('mbc-common')
, g_config = mbc.config.Mosto.General
, config = mbc.config.Mosto.Mongo
, mubsub = require("mubsub")
, moment = require("moment")
Expand All @@ -7,7 +8,9 @@ var mbc = require('mbc-common')
, util = require ('util')
, logger = mbc.logger().addLogger('MONGO-DRIVER')
, models = require('../../models/Mosto')
, _ = require('underscore');
, _ = require('underscore')
, fs = require('fs')
;

function drop_err(callback, err_handler) {
return function(err,v) {
Expand Down Expand Up @@ -158,22 +161,35 @@ mongo_driver.prototype.createPlaylist = function(sched, callback) {
var playlist_id = (sched._id.toHexString && sched._id.toHexString()) || sched._id;

var medias = [];
var get_length = function(l) {
var length = moment(l, "HH:mm:ss.SSS");
return moment.duration({
hours: length.hours(),
minutes: length.minutes(),
seconds: length.seconds(),
milliseconds: length.milliseconds(),
})
};
pieces.forEach(function(block, order) {
var block_id = (block._id.toHexString && block._id.toHexString()) || block._id;
var orig_order = order;
var clip_name = block.name;
var media = {};
media.id = (block._id.toHexString && block._id.toHexString()) || block._id;
media.orig_order = order;
media.name = block.name;
// TODO: don't know what goes in type
var type = "default";
var file = block.file;
var length = moment(block.durationraw, "HH:mm:ss.SSS");
var fps = block.video.fps;
medias.push({"id": block_id, "orig_order": orig_order, "playlist_id": playlist_id, "name": clip_name, "type": type, "file": file,
"length": moment.duration({
hours: length.hours(),
minutes: length.minutes(),
seconds: length.seconds(),
milliseconds: length.milliseconds(),
}), "fps": parseFloat(fps)});
media.type = "default";
if(!fs.existsSync(block.file)) {
media.file = g_config.blank;
media.broken = block.file;
}
else {
media.file = block.file;
}
media.length = get_length(block.durationraw);
media.fps = block.video.fps;
medias.push(media);
if(media.broken) {
self.emit('file-not-found', media);
}
});

var playlist = new models.Playlist({"id": playlist_id, "name": name, "start": startDate, "end": endDate, "mode": "snap", "medias": medias});
Expand Down
98 changes: 79 additions & 19 deletions drivers/status/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ var events = require('events');
var util = require('util');
var _ = require('underscore');
var mbc = require('mbc-common');
var Models = require('mbc-common/models/App')
var Collections = mbc.config.Common.Collections;
var logger = mbc.logger().addLogger('PUBSUB-DRIVER');
var uuid = require('node-uuid');
var Q = require('q');
var assert = require('assert');

var defaults = { // copied from caspa/models.App.Status
_id: 2,
Expand All @@ -27,17 +32,12 @@ var defaults = { // copied from caspa/models.App.Status
on_air: false,
};

function MostoMessage(value, description, message) {
this.value = value;
this.description = description;
this.message = message;
}

function CaspaDriver() {
events.EventEmitter.call(this);
var self = this;
this.status = _.clone(defaults);
this.db = mbc.db();
this.messagesCollection = this.db.collection(Collections.Mostomessages);
this.publisher = mbc.pubsub();
}
util.inherits(CaspaDriver, events.EventEmitter);
Expand All @@ -58,7 +58,7 @@ CaspaDriver.prototype.setupAll = function() {

CaspaDriver.prototype.setupStatus = function(callback) {
var self = this;
var col = this.db.collection('status');
var col = this.db.collection(Collections.Status);
col.findOne({_id: 2}, function(err, res) {
if( err )
// err.. do something?
Expand All @@ -76,8 +76,27 @@ CaspaDriver.prototype.setupStatus = function(callback) {
};

CaspaDriver.prototype.setupMessages = function(callback) {

// I think we should assume at init there's no sticky errors?
this.db.collection('mostomessages').remove(callback);
Q.denodeify(this.messagesCollection.findItems)({status: "failing"}).then(function(messages) {
messages.forEach(function(message) {
message.status = 'fixed';
message.end = moment().valueOf();
});
return Q.denodeify(this.messagesCollection.save)(messages)
}).then(callback);

this.activeMessages = new Models.MessagesCollection();
this.activeMessages.on('add', function(message) {
if(message.get('status') != 'failing') {
this.remove(message);
}
});
this.activeMessages.on('change:status', function(message, value) {
if(value != 'failing') {
this.remove(message);
}
});
};

CaspaDriver.prototype.setStatus = function(meltedStatus) {
Expand Down Expand Up @@ -177,21 +196,62 @@ CaspaDriver.prototype.publish = function(channel, status) {
this.publisher.publishJSON(channel, status);
};

CaspaDriver.prototype.publishMessage = function(code, description, message, sticky) {
message = new MostoMessage(code, description, message);
var method = 'emit';
if( sticky ) {
// I create an id with the timestamp to be able to cancel the error afterwards
message.stickId = (new moment()).valueOf();
method = 'create';
/*
Publishes a message through redis. If the message code is considered an
ongoing error (such as mosto connectivity errors), it's saved to the database
*/
CaspaDriver.prototype.publishMessage = function(code, message, description, reference) {
var status = {};
(code !== undefined) && (status.code = code);
description && (status.description = description);
message && (status.message = message);
reference && (status.reference = reference);

var existing = this.activeMessages.findWhere(status);
if(existing) {
// don't publish the same message twice
return existing;
}

status = new Models.MostoMessage(status);
var method = 'create';
status.set('_id', uuid());
this.messagesCollection.save(status.toJSON(), {safe:false});
this.activeMessages.add(status);
this.publisher.publishJSON(["mostoMessage", method].join('.'),
{ model: message });
return message;
{ model: status.toJSON() });
return status;
};

CaspaDriver.prototype.dropMessage = function(message) {
this.publisher.publish("mostoMessage.delete", { model: message });
CaspaDriver.prototype.CODES = {
BLANK: 201,
SYNC: 202,
PLAY: 203,
MELTED_CONN: 501,
FILE_NOT_FOUND: 502,
};

/*
updates the model in the database setting status='fixed' and returns a
promise that resolves once the object is updated in the database, and the
signal is published through redis
*/
CaspaDriver.prototype.dropMessage = function(code, reference) {
var self = this;
var message = this.activeMessages.findWhere({ code: code, reference: reference });
if(!message)
return Q.resolve(false);
message.set('status', 'fixed');
assert(!this.activeMessages.get(message));
message.set('end', moment().valueOf());
var mobj = message.toJSON();
return Q.ninvoke(this.messagesCollection, 'findById', mobj._id).then(function() {
return Q.ninvoke(self.messagesCollection, 'update', {_id: mobj._id}, mobj);
}).then(function() {
self.publisher.publishJSON("mostoMessage.update", { model: mobj });
}).then(function() {
return true;
});
};

exports = module.exports = function() {
Expand Down
6 changes: 6 additions & 0 deletions heartbeats.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ heartbeats.prototype.startMvcpServer = function(callback) {
var result = self.server.initServer();
result.then(function() {
logger.info("MVCP server started");
self.server.on('reconnect', function(had_error) {
self.emit('melted-disconnected');
});
self.server.on('connected', function() {
self.emit('melted-connected');
});
if (callback !== undefined) {
callback();
}
Expand Down
14 changes: 13 additions & 1 deletion models/Mosto.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ Mosto.MeltedCollection = Backbone.Collection.extend({
},

initMvcpServer: function() {
var self = this;
this.driver.on('reconnect', function(had_error) {
self.trigger('melted-disconnected');
});
this.driver.on('connected', function() {
self.trigger('melted-connected');
});
return this.driver.initServer();
},

Expand Down Expand Up @@ -229,6 +236,11 @@ Mosto.MeltedCollection = Backbone.Collection.extend({
m.push.apply(m, self.getBlankMedias(get(media, 'end'), options.until));
}
}
if( get(media, 'broken') ) {
/* finally, if this is a "file not found" clip, replace it by blanks */
logger.warn("Clip is missing. Replacing by blanks");
m.splice.apply(m, [i, 1].concat(self.getBlankMedias(get(media, 'start'), get(media, 'end'))));
}
});
}
}
Expand Down Expand Up @@ -259,7 +271,7 @@ Mosto.MeltedCollection = Backbone.Collection.extend({
logger.debug("Playlist cleaned");});
});
// });

var expected = self.getExpectedMedia();

var addClip = function(media) {
Expand Down
35 changes: 35 additions & 0 deletions mosto.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ mosto.prototype.initDriver = function() {
self.playlists.removePlaylist(id);
});

this.pl_driver.on('file-not-found', function(media) {
var error = self.status_driver.publishMessage(self.status_driver.CODES.FILE_NOT_FOUND, JSON.stringify(media), undefined, media.file);
});

this.playlists.on('remove:playlists', function(playlist) {
var broken = playlist.get('medias').filter(function(m) { return m.get("broken") });
broken.forEach(function(model) {
if(model.get('broken')) {
// a broken file was removed, drop messages regarding it
self.status_driver.dropMessage(self.status_driver.CODES.FILE_NOT_FOUND, model.get('broken'));
}
});
});

this.playlists.on('melted-disconnected:melted_medias', function() {
self.status_driver.publishMessage(self.status_driver.CODES.MELTED_CONN, "Connection with melted lost", undefined, "playlists");
});
this.playlists.on('melted-connected:melted_medias', function() {
self.status_driver.dropMessage(self.status_driver.CODES.MELTED_CONN, 'playlists');
});

self.pl_driver.start();
};

Expand Down Expand Up @@ -181,6 +202,20 @@ mosto.prototype.initHeartbeats = function() {
self.emit('playing');
});

self.on('playing', function() {
self.status_driver.publishMessage(self.status_driver.CODES.PLAY);
});
self.heartbeats.on('outOfSync', function() {
self.status_driver.publishMessage(self.status_driver.CODES.SYNC);
});

self.heartbeats.on('melted-disconnected', function() {
self.status_driver.publishMessage(self.status_driver.CODES.MELTED_CONN, "Connection with melted lost", undefined, "heartbeats");
});
self.heartbeats.on('melted-connected', function() {
self.status_driver.dropMessage(self.status_driver.CODES.MELTED_CONN, 'heartbeats');
});

self.heartbeats.init();
};

Expand Down