From 07bfca1c97a18ef6fad3ea3e1582effb5e4971ca Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Thu, 11 Jan 2024 19:52:24 -0800 Subject: [PATCH 1/5] msgq: pre-allocate full queue size --- messaging/msgq.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index af93bbf60..abbefe73f 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -105,6 +105,13 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ close(fd); return -1; } + + rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t)); + if (rc < 0){ + close(fd); + return -1; + } + char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); close(fd); From bb72bd408918baec139422e37232361fa6de63e3 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Mon, 15 Jan 2024 15:49:59 -0800 Subject: [PATCH 2/5] optional --- messaging/msgq.cc | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index abbefe73f..0570ed4f4 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -25,6 +25,10 @@ #include "cereal/messaging/msgq.h" + +const bool preallocate = std::getenv("MSGQ_PREALLOCATE") != nullptr; + + void sigusr2_handler(int signal) { assert(signal == SIGUSR2); } @@ -106,10 +110,12 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ return -1; } - rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t)); - if (rc < 0){ - close(fd); - return -1; + if (preallocate) { + rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t)); + if (rc < 0){ + close(fd); + return -1; + } } char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); From 18aca47b44bbb348a6e2ca21c84bb54e77c64a11 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Mon, 15 Jan 2024 16:11:25 -0800 Subject: [PATCH 3/5] try this --- messaging/impl_msgq.cc | 2 +- messaging/msgq.cc | 7 ++----- messaging/msgq.h | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index 8f2c10a08..c27f073a1 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -155,7 +155,7 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en } q = new msgq_queue_t; - int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); + int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE, true); if (r != 0){ return r; } diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 0570ed4f4..f7fa09215 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -26,9 +26,6 @@ #include "cereal/messaging/msgq.h" -const bool preallocate = std::getenv("MSGQ_PREALLOCATE") != nullptr; - - void sigusr2_handler(int signal) { assert(signal == SIGUSR2); } @@ -87,7 +84,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ return; } -int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ +int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preallocate) { assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes std::signal(SIGUSR2, sigusr2_handler); @@ -110,7 +107,7 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ return -1; } - if (preallocate) { + if (preallocate && (std::getenv("MSGQ_PREALLOCATE") != nullptr)) { rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t)); if (rc < 0){ close(fd); diff --git a/messaging/msgq.h b/messaging/msgq.h index 0a72a3864..c9e407170 100644 --- a/messaging/msgq.h +++ b/messaging/msgq.h @@ -57,7 +57,7 @@ int msgq_msg_init_size(msgq_msg_t *msg, size_t size); int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size); int msgq_msg_close(msgq_msg_t *msg); -int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size); +int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preallocate = false); void msgq_close_queue(msgq_queue_t *q); void msgq_init_publisher(msgq_queue_t * q); void msgq_init_subscriber(msgq_queue_t * q); From e65f59609eca74d3c1af67f26ca85140175b1a37 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Mon, 15 Jan 2024 16:22:29 -0800 Subject: [PATCH 4/5] try this --- messaging/msgq.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/messaging/msgq.cc b/messaging/msgq.cc index f7fa09215..dc87cb808 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -108,7 +108,9 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preall } if (preallocate && (std::getenv("MSGQ_PREALLOCATE") != nullptr)) { - rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t)); + do { + rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t)); + } while (rc == EINTR); if (rc < 0){ close(fd); return -1; From 441715205b61c488357baac294404e71e0f78345 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Wed, 17 Jan 2024 13:10:39 -0800 Subject: [PATCH 5/5] msgq: optimize queue size --- services.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/services.py b/services.py index 6436e3181..5a24c0125 100755 --- a/services.py +++ b/services.py @@ -126,4 +126,18 @@ def build_header(): if __name__ == "__main__": - print(build_header()) + #print(build_header()) + + # get ms + import capnp + from cereal import log + for k, v in SERVICE_LIST.items(): + sz = None + dat = log.Event.new_message() + try: + dat.init(k) + sz = dat.total_size.word_count*8 + except capnp.lib.capnp.KjException: + # TODO: lists + pass + print(k.ljust(30), sz)