Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhance]support broker thread lazy creation #4763

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ socket.max.fails | * | 0 .. 1000000 | 1
broker.address.ttl | * | 0 .. 86400000 | 1000 | low | How long to cache the broker address resolving results (milliseconds). <br>*Type: integer*
broker.address.family | * | any, v4, v6 | any | low | Allowed broker IP address families: any, v4, v6 <br>*Type: enum value*
connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info). <br>*Type: integer*
enable.broker.lazy_creation | * | true, false | true | low | When enabled, the client only creates a broker main thread for a broker once it needs to communicate with that broker. When disabled, the client creates the broker main thread as soon as the broker is added. <br>*Type: boolean*
reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`. <br>*Type: integer*
reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately. <br>*Type: integer*
reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed. <br>*Type: integer*
Expand Down
72 changes: 72 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2004,6 +2004,74 @@ static int rd_kafka_init_wait (rd_kafka_t *rk, int timeout_ms) {
return ret;
}

/**
 * @brief Lazily start broker threads for brokers that do not have one yet.
 *
 * Walks \c rk->rk_brokers and, for every broker whose \c rkb_thread is
 * still unset, starts a thread running rd_kafka_broker_process() when
 * either:
 *  - the broker has left the INIT state, or
 *  - the client is neither terminating nor in a fatal error state and
 *    sparse connections are disabled, or the broker is internal, or it is
 *    flagged as a persistent coordinator connection, or it has ops queued.
 *
 * If thread creation fails, a CRIT log line and an ERR op are emitted and
 * the broker is left untouched in the list, so the next call will try
 * again. The broker object is NOT freed here: it is still linked in
 * \c rk->rk_brokers and still being iterated.
 *
 * @param rk Client instance whose broker list is served.
 *
 * NOTE(review): the broker list is walked without taking any client-level
 *               lock in this function - confirm the caller's locking
 *               guarantees are sufficient.
 * NOTE(review): thrd_create() expects a start routine of type
 *               int (*)(void *); verify rd_kafka_broker_process() matches.
 */
static void rd_kafka_broker_thread_create_serve(rd_kafka_t *rk) {
        rd_kafka_broker_t *rkb;

#ifndef _WIN32
        sigset_t newset, oldset;
        /* Block all signals in newly created threads.
         * To avoid a race condition we block all signals in the calling
         * thread, which the new thread inherits its sigmask from,
         * and then restore the original sigmask of the calling thread when
         * we're done creating threads.
         * NOTE: term_sig remains unblocked since we use it on termination
         *       to quickly interrupt system calls. */
        sigemptyset(&oldset);
        sigfillset(&newset);
        if (rk->rk_conf.term_sig)
                sigdelset(&newset, rk->rk_conf.term_sig);
        pthread_sigmask(SIG_SETMASK, &newset, &oldset);
#endif

        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                thrd_t thread;
                int has_thread;
                int want_thread;
                int create_failed = 0;

                rd_kafka_broker_lock(rkb);

                /* An unset thread handle is represented differently per
                 * platform: a NULL handle on Windows, 0 elsewhere. */
#if defined(_WIN32)
                has_thread = rkb->rkb_thread != NULL;
#else
                has_thread = rkb->rkb_thread != 0;
#endif
                if (has_thread) {
                        rd_kafka_broker_unlock(rkb);
                        continue;
                }

                want_thread =
                    rkb->rkb_state != RD_KAFKA_BROKER_STATE_INIT ||
                    (!rd_kafka_terminating(rkb->rkb_rk) &&
                     !rd_kafka_fatal_error_code(rkb->rkb_rk) &&
                     (!rkb->rkb_rk->rk_conf.sparse_connections ||
                      rkb->rkb_persistconn.internal ||
                      rd_atomic32_get(&rkb->rkb_persistconn.coord) ||
                      rkb->rkb_ops->rkq_qlen > 0));

                if (want_thread) {
                        /* Create into a local handle and only publish it on
                         * success: the handle contents are not defined
                         * after a failed thrd_create(), and a garbage
                         * non-zero value would make this broker look like
                         * it already has a thread. The broker lock is held
                         * until the handle has been stored. */
                        if (thrd_create(&thread, rd_kafka_broker_process,
                                        rkb) == thrd_success)
                                rkb->rkb_thread = thread;
                        else
                                create_failed = 1;
                }

                rd_kafka_broker_unlock(rkb);

                if (create_failed) {
                        rd_kafka_log(rk, LOG_CRIT, "THREAD",
                                     "Unable to create broker thread");

                        /* Send ERR op back to application for processing. */
                        rd_kafka_op_err(rk,
                                        RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
                                        "Unable to create broker thread");

                        /* Do not free rkb: it remains linked in
                         * rk->rk_brokers and is needed to advance the
                         * iteration. */
                }
        }

#ifndef _WIN32
        /* Restore sigmask of caller */
        pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif
}

/**
* Main loop for Kafka handler thread.
Expand Down Expand Up @@ -2059,6 +2127,10 @@ static int rd_kafka_thread_main (void *arg) {
RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
rd_kafka_cgrp_serve(rk->rk_cgrp);

if (rk->rk_conf.broker_thread_lazy_creation)
rd_kafka_broker_thread_create_serve(rk);

rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
}

Expand Down
46 changes: 26 additions & 20 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -5660,26 +5660,26 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
* the broker thread until we've finalized the rkb. */
rd_kafka_broker_lock(rkb);
rd_kafka_broker_keep(rkb); /* broker thread's refcnt */
if (thrd_create(&rkb->rkb_thread,
rd_kafka_broker_thread_main, rkb) != thrd_success) {
rd_kafka_broker_unlock(rkb);

rd_kafka_log(rk, LOG_CRIT, "THREAD",
"Unable to create broker thread");

/* Send ERR op back to application for processing. */
rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
"Unable to create broker thread");

rd_free(rkb);

if (!rkb->rkb_rk->rk_conf.broker_thread_lazy_creation ||
(rkb->rkb_rk->rk_conf.broker_thread_lazy_creation &&
source != RD_KAFKA_LEARNED)) {
if (thrd_create(&rkb->rkb_thread,
rd_kafka_broker_thread_main, rkb) != thrd_success) {
rd_kafka_broker_unlock(rkb);
rd_kafka_log(rk, LOG_CRIT, "THREAD",
"Unable to create broker thread");
/* Send ERR op back to application for processing. */
rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
"Unable to create broker thread");
rd_free(rkb);
#ifndef _WIN32
/* Restore sigmask of caller */
pthread_sigmask(SIG_SETMASK, &oldset, NULL);
/* Restore sigmask of caller */
pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif

return NULL;
}
return NULL;
}
}

if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
if (rk->rk_conf.security_protocol ==
Expand Down Expand Up @@ -5715,8 +5715,10 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,


#ifndef _WIN32
/* Restore sigmask of caller */
pthread_sigmask(SIG_SETMASK, &oldset, NULL);
if (!rkb->rkb_rk->rk_conf.broker_thread_lazy_creation) {
/* Restore sigmask of caller */
pthread_sigmask(SIG_SETMASK, &oldset, NULL);
}
#endif

return rkb;
Expand Down Expand Up @@ -6104,6 +6106,10 @@ int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) {
return rd_kafka_brokers_add0(rk, brokerlist);
}

/**
 * @brief Externally visible wrapper around rd_kafka_broker_thread_main().
 *
 * Forwards \p rbk unchanged to rd_kafka_broker_thread_main() and discards
 * any result.
 *
 * @param rbk Opaque argument passed straight through to
 *            rd_kafka_broker_thread_main() (presumably the broker handle;
 *            verify against callers).
 *
 * NOTE(review): if this function is used as a thrd_create() start routine,
 *               the expected type is int (*)(void *), which a void return
 *               type does not match - confirm and align the prototype.
 */
void rd_kafka_broker_process(void *rbk) {
        (void)rd_kafka_broker_thread_main(rbk);
}


/**
* @brief Adds a new broker or updates an existing one.
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ void rd_kafka_broker_conn_closed (rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
const char *errstr);

void rd_kafka_broker_process(void* rbk);

void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);

#define rd_kafka_broker_destroy(rkb) \
Expand Down
7 changes: 7 additions & 0 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,13 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"it needs to communicate with. When disabled the client "
"will maintain connections to all brokers in the cluster.",
0, 1, 1 },
{ _RK_GLOBAL, "enable.broker.lazy_creation",
_RK_C_BOOL,
_RK(broker_thread_lazy_creation),
"When enabled the client will only create broker main thread "
"it needs to communicate with. When disabled the client "
"will create the broker main thread once added.",
0, 1, 1},
{ _RK_GLOBAL|_RK_DEPRECATED, "reconnect.backoff.jitter.ms", _RK_C_INT,
_RK(reconnect_jitter_ms),
"No longer used. See `reconnect.backoff.ms` and "
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ struct rd_kafka_conf_s {
int reconnect_jitter_ms;
int connections_max_idle_ms;
int sparse_connections;
int broker_thread_lazy_creation;
int sparse_connect_intvl;
int api_version_request;
int api_version_request_timeout_ms;
Expand Down