From abc8a2d33daf8d47cff815a915971effa4411574 Mon Sep 17 00:00:00 2001 From: Sergey Prokhorov Date: Fri, 22 Nov 2024 19:18:56 +0100 Subject: [PATCH] Use public JSON api instead of private msgpack one See https://github.com/svpcom/wfb-ng/issues/382 --- .github/workflows/build.yml | 2 +- CMakeLists.txt | 5 +- config_osd.json | 50 ++++- src/main.cpp | 8 +- src/wfbcli.cpp | 361 +++++++++++++----------------------- 5 files changed, 181 insertions(+), 245 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bed8293..75c1806 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -56,7 +56,7 @@ jobs: rm -r /boot/* #save space apt-get update apt clean - apt-get install -y cmake g++ git pkg-config librockchip-mpp-dev libcairo-dev libdrm-dev libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev libspdlog-dev nlohmann-json3-dev libmsgpack-dev + apt-get install -y cmake g++ git pkg-config librockchip-mpp-dev libcairo-dev libdrm-dev libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev libspdlog-dev nlohmann-json3-dev apt clean cmake -B build diff --git a/CMakeLists.txt b/CMakeLists.txt index 058d4b7..60f4cf5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,16 +29,15 @@ set(SOURCE_FILES file(GLOB ICONS src/icons/*.png) file(GLOB OSD_CONFIGS *_osd.json) -include_directories("/usr/include/libdrm" "/usr/include/cairo" "/usr/include/spdlog" "/usr/include/msgpack") +include_directories("/usr/include/libdrm" "/usr/include/cairo" "/usr/include/spdlog") configure_file("${PROJECT_NAME}_config.h.in" "${PROJECT_NAME}_config.h") find_package(spdlog REQUIRED) find_package(nlohmann_json 3 REQUIRED) -#find_package(msgpack REQUIRED) # called msgpack-c in Ubunty add_executable(${PROJECT_NAME} ${SOURCE_FILES}) -target_link_libraries(${PROJECT_NAME} rockchip_mpp pthread drm m cairo spdlog::spdlog nlohmann_json::nlohmann_json) # msgpack) +target_link_libraries(${PROJECT_NAME} rockchip_mpp pthread drm m cairo spdlog::spdlog nlohmann_json::nlohmann_json) # Embed gstreamer find_package(PkgConfig REQUIRED) diff --git a/config_osd.json b/config_osd.json index 205da2e..aa5278d 100644 --- a/config_osd.json +++ b/config_osd.json @@ -297,32 +297,64 @@ "name": "Dump raw facts to the scren (remove `--` from `type` to enable)", "type": "---DebugWidget", "x": 10, - "y": -100, + "y": -200, "facts": [ { "name": "mavlink.heartbeet.base_mode.armed" }, { - "name": "mavlink.radio_status.rssi", + "name": "wfbcli.rx.packets.all", "tags": { - "sysid": "3", - "compid": "68" + "id": "video rx" } }, { - "name": "mavlink.gps_raw.lat" + "name": "wfbcli.rx.packets.lost", + "tags": { + "id": "video rx" + } }, { - "name": "mavlink.gps_raw.lon" + "name": "wfbcli.rx.ant_stats.freq", + "tags": { + "id": "video rx", + "ant_id": "0" + } }, { - "name": "mavlink.gps_raw.fix_type" + "name": "wfbcli.rx.ant_stats.mcs", + "tags": { + "id": "video rx", + "ant_id": "0" + } }, { - "name": "mavlink.global_position_int.lat" + "name": "wfbcli.rx.ant_stats.rssi_avg", + "tags": { + "id": "video rx", + "ant_id": "0" + } + }, + { + "name": "wfbcli.rx.ant_stats.freq", + "tags": { + "id": "video rx", + "ant_id": "1" + } }, { - "name": "mavlink.global_position_int.lon" + "name": "wfbcli.rx.ant_stats.mcs", + "tags": { + "id": "video rx", + "ant_id": "1" + } + }, + { + "name": "wfbcli.rx.ant_stats.rssi_avg", + "tags": { + "id": "video rx", + "ant_id": "1" + } } ] }, diff --git a/src/main.cpp b/src/main.cpp index 3860e58..3ed0fc5 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -543,6 +543,7 @@ int main(int argc, char **argv) int mp4_fragmentation_mode = 0; uint16_t listen_port = 5600; uint16_t mavlink_port = 14550; + uint16_t wfb_port = 8103; uint16_t mode_width = 0; uint16_t mode_height = 0; uint32_t mode_vrefresh = 0; @@ -693,6 +694,11 @@ int main(int argc, char **argv) continue; } + __OnArgument("--wfb-api-port") { + wfb_port = atoi(__ArgValue); + continue; + } + __OnArgument("--version") { printf("PixelPilot Rockchip %d.%d\n", APP_VERSION_MAJOR, APP_VERSION_MINOR); return 0; @@ -806,7 +812,7 @@ int main(int argc, char **argv) assert(!ret); } wfb_thread_params *wfb_args = (wfb_thread_params *)malloc(sizeof *wfb_args); - wfb_args->port = 8003; // TODO: configurable + wfb_args->port = wfb_port; ret = pthread_create(&tid_wfbcli, NULL, __WFB_CLI_THREAD__, wfb_args); assert(!ret); diff --git a/src/wfbcli.cpp b/src/wfbcli.cpp index e7a3e09..5b16c8c 100644 --- a/src/wfbcli.cpp +++ b/src/wfbcli.cpp @@ -1,11 +1,16 @@ - +/** + * Client for WFB-ng stats API + */ #include #include #include +#include #include #include #include // for close +#include +#include #include #include @@ -16,122 +21,93 @@ #include "wfbcli.hpp" extern "C" { #include "osd.h" -#include } using json = nlohmann::json; -int wfb_thread_signal = 0; +#define SERVER_IP "127.0.0.1" +#define BUFFER_SIZE 10 * 1024 -// Helper function to receive `len` bytes from the socket -bool recv_all(int socket, char* buffer, size_t len) { - size_t total_received = 0; - while (total_received < len) { - ssize_t bytes_received = recv(socket, buffer + total_received, len - total_received, 0); - if (bytes_received <= 0) { - return false; // Connection closed or error - } - total_received += bytes_received; - } - return true; -} - -/* -{'type': 'rx', 'timestamp': 1728342615.1328719, 'id': 'video rx', 'tx_wlan': 0, - 'packets': {'all': (661, 3967), - 'all_bytes': (783901, 4704911), - 'dec_ok': (661, 3967), - 'fec_rec': (28, 271), - 'lost': (0, 32), - 'dec_err': (0, 0), - 'bad': (0, 0), - 'out': (466, 2904), - 'out_bytes': (528304, 3286747)}, - 'rx_ant_stats': { - ((5805, 1, 20), 1): (661, -38, -37, -37, 8, 31, 37), - ((5805, 1, 20), 0): (661, -42, -40, -40, 5, 30, 35)}, - 'session': {'fec_type': 'VDM_RS', - 'fec_k': 8, - 'fec_n': 12, - 'epoch': 0}} -{'type': 'rx', 'timestamp': 1728342614.2870364, 'id': 'tunnel rx', 'tx_wlan': 0, -'packets': {'all': (0, 0), - 'all_bytes': (0, 0), - 'dec_ok': (0, 0), - 'fec_rec': (0, 0), - 'lost': (0, 0), - 'dec_err': (0, 0), - 'bad': (0, 0), - 'out': (0, 0), - 'out_bytes': (0, 0)}, -'rx_ant_stats': {}, -'session': None} - */ +int wfb_thread_signal = 0; -/* -{'type': 'rx', 'timestamp': 1728342615.1976957, 'id': 'mavlink rx', 'tx_wlan': 0, - 'packets': {'all': (24, 104), - 'all_bytes': (11902, 51476), - 'dec_ok': (24, 104), - 'fec_rec': (0, 3), - 'lost': (0, 0), - 'dec_err': (0, 0), - 'bad': (0, 0), - 'out': (12, 55), - 'out_bytes': (5820, 26798)}, - 'rx_ant_stats': {((5805, 1, 20), 1): (24, -38, -38, -38, 20, 28, 36), - ((5805, 1, 20), 0): (24, -42, -40, -40, 18, 27, 34)}, - 'session': {'fec_type': 'VDM_RS', - 'fec_k': 1, - 'fec_n': 2, - 'epoch': 0}} - */ int process_rx(json packet) { void *batch = osd_batch_init(2); auto id = packet.at("id").template get(); - json packets = packet.at("packets"); - osd_tag tags[3]; + osd_tag tags[2]; strcpy(tags[0].key, "id"); strcpy(tags[0].val, id.c_str()); + + json packets = packet.at("packets"); for (auto &[key, val] : packets.items()) { auto total = val.at(1).template get(); osd_add_uint_fact(batch, (std::string("wfbcli.rx.packets.") + key).c_str(), tags, 1, total); } - //json rx_ant_stats = packets.at("rx_ant_stats"); - // for (auto &[key, val] : rx_ant_stats) { ... } - // json session = packets.at("session"); + json rx_ant_stats = packet.at("rx_ant_stats"); + for (auto &obj : rx_ant_stats) { + auto ant_id = obj.at("ant").template get(); + strcpy(tags[1].key, "ant_id"); + strncpy(tags[1].val, std::to_string(ant_id).c_str(), sizeof(tags[1].val)); + + osd_add_uint_fact(batch, "wfbcli.rx.ant_stats.freq", tags, 2, + obj.at("freq").template get()); + osd_add_uint_fact(batch, "wfbcli.rx.ant_stats.mcs", tags, 2, + obj.at("mcs").template get()); + osd_add_uint_fact(batch, "wfbcli.rx.ant_stats.bw", tags, 2, + obj.at("bw").template get()); + + osd_add_uint_fact(batch, "wfbcli.rx.ant_stats.pkt_recv", tags, 2, + obj.at("pkt_recv").template get()); + osd_add_int_fact(batch, "wfbcli.rx.ant_stats.rssi_avg", tags, 2, + obj.at("rssi_avg").template get()); + osd_add_double_fact(batch, "wfbcli.rx.ant_stats.snr_avg", tags, 2, + obj.at("snr_avg").template get()); + } + //json session = packets.at("session"); osd_publish_batch(batch); return 0; } -/* -{'type': 'tx', 'timestamp': 1728342616.1666563, 'id': 'tunnel tx', - 'packets': {'fec_timeouts': (0, 0), - 'incoming': (2, 1055), - 'incoming_bytes': (0, 261), - 'injected': (5, 2576), - 'injected_bytes': (200, 100610), - 'dropped': (0, 0), - 'truncated': (0, 0)}, - 'latency': {255: (5, 0, 28, 74, 134)}, - 'rf_temperature': {0: 42, 1: 42}} - */ int process_tx(json packet) { + void *batch = osd_batch_init(2); + auto id = packet.at("id").template get(); + osd_tag tags[2]; + strcpy(tags[0].key, "id"); + strcpy(tags[0].val, id.c_str()); + + json packets = packet.at("packets"); + for (auto &[key, val] : packets.items()) { + auto total = val.at(1).template get(); + osd_add_uint_fact(batch, (std::string("wfbcli.tx.packets.") + key).c_str(), tags, 1, total); + } + json tx_ant_stats = packet.at("tx_ant_stats"); + for (auto &obj : tx_ant_stats) { + auto ant_id = obj.at("ant").template get(); + strcpy(tags[1].key, "ant_id"); + strncpy(tags[1].val, std::to_string(ant_id).c_str(), sizeof(tags[1].val)); + + osd_add_uint_fact(batch, "wfbcli.tx.ant_stats.pkt_sent", tags, 2, + obj.at("pkt_sent").template get()); + osd_add_uint_fact(batch, "wfbcli.tx.ant_stats.pkt_drop", tags, 2, + obj.at("pkt_drop").template get()); + osd_add_uint_fact(batch, "wfbcli.tx.ant_stats.lat_avg", tags, 2, + obj.at("lat_avg").template get()); + } + //json temp = packets.at("rf_temperature"); + osd_publish_batch(batch); return 0; } -/* -{'type': 'cli_title', - 'cli_title': 'WFB-ng_24.9.26.72534 @gs wlx782288192c76 [default]', - 'is_cluster': False} - */ int process_title(json packet) { - auto title = packet.at("cli_title").template get(); + auto profile = packet.at("profile").template get(); auto is_cluster = packet.at("is_cluster").template get(); + auto channel = packet.at("settings").at("common").at("wifi_channel").template get(); + auto version = packet.at("settings").at("common").at("version").template get(); void *batch = osd_batch_init(2); - osd_add_str_fact(batch, "wfbcli.cli_title", nullptr, 0, title.c_str()); + osd_add_str_fact(batch, "wfbcli.profile", nullptr, 0, profile.c_str()); + osd_add_str_fact(batch, "wfbcli.version", nullptr, 0, version.c_str()); osd_add_bool_fact(batch, "wfbcli.is_cluster", nullptr, 0, is_cluster); + osd_add_uint_fact(batch, "wfbcli.wifi_channel", nullptr, 0, channel); osd_publish_batch(batch); return 0; } @@ -142,168 +118,91 @@ int process_packet(json packet) { process_rx(packet); } else if (type == "tx") { process_tx(packet); - } else if (type == "cli_title") { + } else if (type == "settings") { process_title(packet); } else { SPDLOG_ERROR("Unknown wfbcli packet type {}", type); return -1; } - return 0; + return 0; } -size_t msgpack_object_print(const msgpack_object o) -{ - size_t ret; - - switch (o.type) { - case MSGPACK_OBJECT_NIL: - SPDLOG_DEBUG("nil"); - break; +// Code below is partially generated by chatgpt - case MSGPACK_OBJECT_BOOLEAN: - SPDLOG_DEBUG("Bool {}", (o.via.boolean ? "true" : "false")); - break; +void handle_server_connection(int sock) { + std::string partial_data; // To accumulate incomplete data across reads + char buffer[BUFFER_SIZE] = {0}; - case MSGPACK_OBJECT_POSITIVE_INTEGER: - SPDLOG_DEBUG("Pos int {}", o.via.u64); - break; - - case MSGPACK_OBJECT_NEGATIVE_INTEGER: - SPDLOG_DEBUG("Neg int {}", o.via.i64); - break; - - case MSGPACK_OBJECT_FLOAT32: - case MSGPACK_OBJECT_FLOAT64: - SPDLOG_DEBUG("Float {}", o.via.f64); - break; - - case MSGPACK_OBJECT_STR: - SPDLOG_DEBUG("Str {}, {}", o.via.str.size, o.via.str.ptr); - break; - - case MSGPACK_OBJECT_BIN: - /*if (bytes_contain_zero(&o.via.bin)) { - SPDLOG_DEBUG("the value contains zero"); - return -1; - }*/ - SPDLOG_DEBUG("Bin {} {}", (int)o.via.bin.size, o.via.bin.ptr); - break; - - case MSGPACK_OBJECT_EXT: - SPDLOG_DEBUG("not support type: MSGPACK_OBJECT_EXT"); - return -1; - - case MSGPACK_OBJECT_ARRAY: - SPDLOG_DEBUG("ARRAY"); - /*PRINT_JSONSTR_CALL(ret, snprintf, aux_buffer, aux_buffer_size, "["); - if (o.via.array.size != 0) { - msgpack_object *p = o.via.array.ptr; - msgpack_object *const pend = o.via.array.ptr + o.via.array.size; - PRINT_JSONSTR_CALL(ret, msgpack_object_print_jsonstr, aux_buffer, aux_buffer_size, *p); - ++p; - for (; p < pend; ++p) { - PRINT_JSONSTR_CALL(ret, snprintf, aux_buffer, aux_buffer_size, ","); - PRINT_JSONSTR_CALL(ret, msgpack_object_print_jsonstr, aux_buffer, aux_buffer_size, *p); - } - } - PRINT_JSONSTR_CALL(ret, snprintf, aux_buffer, aux_buffer_size, "]");*/ - break; - - case MSGPACK_OBJECT_MAP: - SPDLOG_DEBUG("map"); - /*PRINT_JSONSTR_CALL(ret, snprintf, aux_buffer, aux_buffer_size, "{"); - if (o.via.map.size != 0) { - msgpack_object_kv *p = o.via.map.ptr; - msgpack_object_kv *const pend = o.via.map.ptr + o.via.map.size; - - for (; p < pend; ++p) { - if (p->key.type != MSGPACK_OBJECT_STR) { - DEBUG("the key of in a map must be string.\n"); - return -1; - } - if (p != o.via.map.ptr) { - PRINT_JSONSTR_CALL(ret, snprintf, aux_buffer, aux_buffer_size, ","); - } - PRINT_JSONSTR_CALL(ret, msgpack_object_print_jsonstr, aux_buffer, aux_buffer_size, p->key); - PRINT_JSONSTR_CALL(ret, snprintf, aux_buffer, aux_buffer_size, ":"); - PRINT_JSONSTR_CALL(ret, msgpack_object_print_jsonstr, aux_buffer, aux_buffer_size, p->val); - } - } - PRINT_JSONSTR_CALL(ret, snprintf, aux_buffer, aux_buffer_size, "}"); - break;*/ - - default: - SPDLOG_DEBUG("unknown type"); - return -1; - } + while (!wfb_thread_signal) { + ssize_t bytes_read = recv(sock, buffer, BUFFER_SIZE - 1, 0); + if (bytes_read <= 0) { + SPDLOG_ERROR("Server disconnected or error occurred"); + close(sock); + return; + } - return 0; + buffer[bytes_read] = '\0'; // Null-terminate the received data + partial_data += buffer; // Append to the accumulated data + + // Process each complete line + size_t newline_pos; + while ((newline_pos = partial_data.find('\n')) != std::string::npos) { + std::string line = partial_data.substr(0, newline_pos); + partial_data.erase(0, newline_pos + 1); // Remove the processed line + + try { + // Parse the line as JSON + nlohmann::json parsed_json = nlohmann::json::parse(line); + process_packet(parsed_json); + } catch (const nlohmann::json::parse_error &e) { + SPDLOG_ERROR("Failed to parse JSON: {}", e.what()); + } + } + } } -void parse_msg(char *buf, uint size) { - msgpack_zone z; - msgpack_object obj; +// Function to reconnect to the server in a loop with retries +int reconnect_to_server(int port) { + while (!wfb_thread_signal) { + SPDLOG_DEBUG("Attempting to connect to WFB API server..."); + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + perror("Socket creation failed"); + } else { + struct sockaddr_in server_address; + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port); + + if (inet_pton(AF_INET, SERVER_IP, &server_address.sin_addr) > 0) { + if (connect(sock, (struct sockaddr *)&server_address, sizeof(server_address)) == 0) { + SPDLOG_DEBUG("Successfully connected to WFB API server."); + return sock; + } else { + SPDLOG_ERROR("Connection failed"); + } + } else { + SPDLOG_ERROR("Invalid address/Address not supported"); + } - msgpack_zone_init(&z, size); - msgpack_unpack(buf, size, NULL, &z, &obj); - msgpack_object_print(obj); - msgpack_zone_destroy(&z); + close(sock); // Clean up the socket if connection fails + } + + SPDLOG_WARN("Reconnection failed. Retrying in 1 second"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } } void *__WFB_CLI_THREAD__(void *param) { wfb_thread_params *p = (wfb_thread_params *)param; pthread_setname_np(pthread_self(), "__WFB_CLI"); - int client_socket; - struct sockaddr_in server_address; - uint32_t size; - char buf[128 * 1024]; - - if ((client_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - spdlog::error("Socket creation failed {}", strerror(errno)); - return 0; - } - - server_address.sin_family = AF_INET; - server_address.sin_port = htons(p->port); - - // Convert IPv4 and IPv6 addresses from text to binary form - if (inet_pton(AF_INET, "127.0.0.1", &server_address.sin_addr) <= 0) { - spdlog::error("Invalid address or Address not supported {}", strerror(errno)); - close(client_socket); - return 0; - } - - // Connect to the server - if (connect(client_socket, reinterpret_cast(&server_address), sizeof(server_address)) < 0) { - spdlog::error("Connection failed", strerror(errno)); - close(client_socket); - return 0; - } - try { - while (!wfb_thread_signal) { - if (!recv_all(client_socket, reinterpret_cast(&size), sizeof(size))) { - throw std::runtime_error("Failed to read size prefix or connection closed"); - } - - // Convert the size from network byte order to host byte order - size = ntohl(size); + while (!wfb_thread_signal) { + int sock = reconnect_to_server(p->port); + handle_server_connection(sock); + // If we return from handle_server_connection, the server is disconnected + } - if (size > sizeof(buf)) { - throw std::runtime_error("WFB stats packet is too large"); - } - - if (!recv_all(client_socket, buf, size)) { - throw std::runtime_error("Failed to read size prefix or connection closed"); - } - parse_msg(buf, size); - json as_json = json::from_msgpack(buf, buf + size, false, true); - process_packet(as_json); - } - } catch (const std::exception &e) { - spdlog::error("WFB stats error: {}", e.what()); - } - close(client_socket); spdlog::info("WFB_CLI thread done."); return nullptr; }