Skip to content

Commit

Permalink
test: registry store (#297)
Browse files Browse the repository at this point in the history
* test: jwt secure

* test: service registry

* test: more edge case with service registry

* test: gateway store
  • Loading branch information
giangndm authored May 28, 2024
1 parent 465e4a0 commit e856cb8
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 17 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ We are actively refactoring entire media server and network stack with [sans-io-
| MoQ | Media-over-Quic ||
| Monitoring | Dashboard for monitoring ||
| Recording | Record stream ||
| Gateway | External gateway [RFC-0003](https://github.com/8xFF/rfcs/pull/3) | |
| Gateway | External gateway [RFC-0003](https://github.com/8xFF/rfcs/pull/3) | 🚧 |
| Connector | External event handling ||

Status:
Expand Down
131 changes: 122 additions & 9 deletions packages/media_gateway/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use self::service::ServiceStore;

mod service;

#[derive(Debug)]
const MAX_MEMORY_USAGE: u8 = 80;
const MAX_DISK_USAGE: u8 = 90;

#[derive(Debug, PartialEq)]
pub struct PingEvent {
pub cpu: u8,
pub memory: u8,
Expand Down Expand Up @@ -52,8 +55,8 @@ impl GatewayStore {

pub fn on_ping(&mut self, now: u64, from: u32, ping: PingEvent) {
log::debug!("[GatewayStore] on ping from {from} data {:?}", ping);
let node_usage = node_usage(&ping, 80, 90);
let webrtc_usage = webrtc_usage(&ping, 80, 90);
let node_usage = node_usage(&ping);
let webrtc_usage = webrtc_usage(&ping);
match ping.origin {
Origin::Media(_) => match (node_usage, webrtc_usage, ping.webrtc) {
(Some(_node), Some(webrtc), Some(stats)) => self.webrtc.on_node_ping(now, from, webrtc, stats),
Expand Down Expand Up @@ -85,27 +88,137 @@ impl GatewayStore {
}
}

fn node_usage(ping: &PingEvent, max_memory: u8, max_disk: u8) -> Option<u8> {
if ping.memory >= max_memory {
fn node_usage(ping: &PingEvent) -> Option<u8> {
if ping.memory >= MAX_MEMORY_USAGE {
return None;
}

if ping.disk >= max_disk {
if ping.disk >= MAX_DISK_USAGE {
return None;
}

Some(ping.cpu)
}

fn webrtc_usage(ping: &PingEvent, max_memory: u8, max_disk: u8) -> Option<u8> {
if ping.memory >= max_memory {
fn webrtc_usage(ping: &PingEvent) -> Option<u8> {
if ping.memory >= MAX_MEMORY_USAGE {
return None;
}

if ping.disk >= max_disk {
if ping.disk >= MAX_DISK_USAGE {
return None;
}

let webrtc = ping.webrtc.as_ref()?;
webrtc.active.then(|| ping.cpu.max(((webrtc.live * 100) / webrtc.max) as u8))
}

#[cfg(test)]
mod tests {
use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, GatewayOrigin, MediaOrigin, Origin, ServiceStats};

use crate::ServiceKind;

use super::{GatewayStore, PingEvent};

#[test]
fn local_ping() {
let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 });
store.on_ping(
0,
1,
PingEvent {
cpu: 0,
memory: 0,
disk: 0,
origin: Origin::Media(MediaOrigin {}),
webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }),
},
);

assert_eq!(store.best_for(ServiceKind::Webrtc, None), Some(1));

assert_eq!(store.pop_output(), None);
store.on_tick(100);
assert_eq!(
store.pop_output(),
Some(PingEvent {
cpu: 0,
memory: 0,
disk: 0,
origin: Origin::Gateway(GatewayOrigin {
location: Some(Location { lat: 1.0, lon: 1.0 }),
zone: 0,
}),
webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }),
})
);
}

#[test]
fn local_reject_max_usage() {
let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 });
store.on_ping(
0,
1,
PingEvent {
cpu: 10,
memory: 80,
disk: 20,
origin: Origin::Media(MediaOrigin {}),
webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }),
},
);

store.on_ping(
0,
2,
PingEvent {
cpu: 10,
memory: 20,
disk: 90,
origin: Origin::Media(MediaOrigin {}),
webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }),
},
);

assert_eq!(store.best_for(ServiceKind::Webrtc, None), None);
}

#[test]
fn remote_ping() {
let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 });
store.on_ping(
0,
257,
PingEvent {
cpu: 0,
memory: 0,
disk: 0,
origin: Origin::Gateway(GatewayOrigin {
location: Some(Location { lat: 2.0, lon: 2.0 }),
zone: 256,
}),
webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }),
},
);

assert_eq!(store.best_for(ServiceKind::Webrtc, None), Some(257));

assert_eq!(store.pop_output(), None);
store.on_tick(100);
assert_eq!(
store.pop_output(),
Some(PingEvent {
cpu: 0,
memory: 0,
disk: 0,
origin: Origin::Gateway(GatewayOrigin {
location: Some(Location { lat: 1.0, lon: 1.0 }),
zone: 0,
}),
webrtc: None,
})
);
}
}
113 changes: 111 additions & 2 deletions packages/media_gateway/src/store/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ impl ServiceStore {
if n.stats.active {
stats.active = true;
}
stats.live = n.stats.live;
stats.max = n.stats.max;
stats.live += n.stats.live;
stats.max += n.stats.max;
}

Some(stats)
Expand Down Expand Up @@ -214,3 +214,112 @@ fn distance(node1: &Location, node2: &Location) -> f32 {
//TODO make it more accuracy
((node1.lat - node2.lat).powi(2) + (node1.lon - node2.lon).powi(2)).sqrt()
}

#[cfg(test)]
mod tests {
use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, ServiceStats};

use crate::{store::service::PING_TIMEOUT, ServiceKind};

use super::ServiceStore;

#[test]
fn empty_store() {
let store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 });
assert_eq!(store.best_for(None), None);
assert_eq!(store.best_for(Some(Location { lat: 1.0, lon: 1.0 })), None);

assert_eq!(store.local_stats(), None);
}

#[test]
fn local_store() {
let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 });

store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true });
store.on_node_ping(0, 2, 50, ServiceStats { live: 60, max: 1000, active: true });

//should got lowest usage
assert_eq!(store.best_for(None), Some(2));
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(2));
assert_eq!(store.local_stats(), Some(ServiceStats { live: 160, max: 2000, active: true }));

//after node2 increase usage should fallback to node1
store.on_node_ping(0, 2, 61, ServiceStats { live: 120, max: 1000, active: true });

assert_eq!(store.best_for(None), Some(1));
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(1));

//after remove should fallback to remain
store.remove_node(1);

assert_eq!(store.best_for(None), Some(2));
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(2));
}

#[test]
fn remote_zones_store() {
let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 });

store.on_gateway_ping(0, 256, 256, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true });
store.on_gateway_ping(0, 256, 257, 50, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true });

//should got lowest usage gateway node
assert_eq!(store.best_for(None), Some(257));
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(257));

//after gateway 257 increase usage should switch to 256
store.on_gateway_ping(0, 256, 257, 65, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true });

assert_eq!(store.best_for(None), Some(256));
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(256));

//should fallback to remain gateway
store.remove_gateway(256, 256);

assert_eq!(store.best_for(None), Some(257));
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(257));
}

#[test]
fn local_and_remote_zones() {
let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 });

store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true });
store.on_gateway_ping(0, 256, 257, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true });

//should got local zone if don't provide location
assert_eq!(store.best_for(None), Some(1));

//should got closest zone gaetway
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(257));

//after remove local should fallback to other zone
store.remove_node(1);

assert_eq!(store.best_for(None), Some(257));
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), Some(257));

//after remove other zone should return None
store.remove_gateway(256, 257);

assert_eq!(store.best_for(None), None);
assert_eq!(store.best_for(Some(Location { lat: 2.0, lon: 2.0 })), None);
}

#[test]
fn clear_timeout() {
let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 });

store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true });
store.on_gateway_ping(0, 256, 257, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true });

assert_eq!(store.local_sources.len(), 1);
assert_eq!(store.zone_sources.len(), 1);

store.on_tick(PING_TIMEOUT);

assert_eq!(store.local_sources.len(), 0);
assert_eq!(store.zone_sources.len(), 0);
}
}
Loading

0 comments on commit e856cb8

Please sign in to comment.