diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index 8f2c10a08..c27f073a1 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -155,7 +155,7 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en } q = new msgq_queue_t; - int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); + int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE, true); if (r != 0){ return r; } diff --git a/messaging/msgq.cc b/messaging/msgq.cc index af93bbf60..dc87cb808 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -25,6 +25,7 @@ #include "cereal/messaging/msgq.h" + void sigusr2_handler(int signal) { assert(signal == SIGUSR2); } @@ -83,7 +84,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ return; } -int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ +int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preallocate) { assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes std::signal(SIGUSR2, sigusr2_handler); @@ -105,6 +106,17 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ close(fd); return -1; } + + if (preallocate && (std::getenv("MSGQ_PREALLOCATE") != nullptr)) { + do { + rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t)); + } while (rc == EINTR); + if (rc < 0){ + close(fd); + return -1; + } + } + char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); close(fd); diff --git a/messaging/msgq.h b/messaging/msgq.h index 0a72a3864..c9e407170 100644 --- a/messaging/msgq.h +++ b/messaging/msgq.h @@ -57,7 +57,7 @@ int msgq_msg_init_size(msgq_msg_t *msg, size_t size); int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size); int msgq_msg_close(msgq_msg_t *msg); -int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size); +int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preallocate = false); void msgq_close_queue(msgq_queue_t *q); void msgq_init_publisher(msgq_queue_t * q); void msgq_init_subscriber(msgq_queue_t * q); diff --git a/services.py b/services.py index 6436e3181..5a24c0125 100755 --- a/services.py +++ b/services.py @@ -126,4 +126,18 @@ def build_header(): if __name__ == "__main__": - print(build_header()) + #print(build_header()) + + # get ms + import capnp + from cereal import log + for k, v in SERVICE_LIST.items(): + sz = None + dat = log.Event.new_message() + try: + dat.init(k) + sz = dat.total_size.word_count*8 + except capnp.lib.capnp.KjException: + # TODO: lists + pass + print(k.ljust(30), sz)