Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msgq: optimize queue size #570

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint, bool check_en
}

q = new msgq_queue_t;
int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE, true);
if (r != 0){
return r;
}
Expand Down
14 changes: 13 additions & 1 deletion messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "cereal/messaging/msgq.h"


void sigusr2_handler(int signal) {
assert(signal == SIGUSR2);
}
Expand Down Expand Up @@ -83,7 +84,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
return;
}

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preallocate) {
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
std::signal(SIGUSR2, sigusr2_handler);

Expand All @@ -105,6 +106,17 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
close(fd);
return -1;
}

if (preallocate && (std::getenv("MSGQ_PREALLOCATE") != nullptr)) {
do {
rc = fallocate(fd, 0, 0, size + sizeof(msgq_header_t));
} while (rc == EINTR);
if (rc < 0){
close(fd);
return -1;
}
}

char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);

Expand Down
2 changes: 1 addition & 1 deletion messaging/msgq.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ int msgq_msg_init_size(msgq_msg_t *msg, size_t size);
int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size);
int msgq_msg_close(msgq_msg_t *msg);

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size);
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size, bool preallocate = false);
void msgq_close_queue(msgq_queue_t *q);
void msgq_init_publisher(msgq_queue_t * q);
void msgq_init_subscriber(msgq_queue_t * q);
Expand Down
16 changes: 15 additions & 1 deletion services.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,18 @@ def build_header():


if __name__ == "__main__":
print(build_header())
#print(build_header())

# get ms
import capnp
from cereal import log
for k, v in SERVICE_LIST.items():
sz = None
dat = log.Event.new_message()
try:
dat.init(k)
sz = dat.total_size.word_count*8
except capnp.lib.capnp.KjException:
# TODO: lists
pass
print(k.ljust(30), sz)
Loading