diff --git a/CONFIGURATION.md b/CONFIGURATION.md index aea225340a..489bd3fbca 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -30,6 +30,7 @@ socket.max.fails | * | 0 .. 1000000 | 1 broker.address.ttl | * | 0 .. 86400000 | 1000 | low | How long to cache the broker address resolving results (milliseconds).
*Type: integer* broker.address.family | * | any, v4, v6 | any | low | Allowed broker IP address families: any, v4, v6
*Type: enum value* connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info).
*Type: integer* +enable.broker.lazy_creation | * | true, false | true | low | When enabled the client will only create broker main thread it needs to communicate with. When disabled the client will create the broker main thread once added.
*Type: boolean* reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`.
*Type: integer* reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately.
*Type: integer* reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed.
*Type: integer* diff --git a/src/rdkafka.c b/src/rdkafka.c index 8ffd91b643..d0b907735c 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2004,6 +2004,74 @@ static int rd_kafka_init_wait (rd_kafka_t *rk, int timeout_ms) { return ret; } +/** + * @brief serve to create broker thread once need to + * + */ +static void rd_kafka_broker_thread_create_serve(rd_kafka_t *rk) { + rd_kafka_broker_t *rkb = NULL; + +#ifndef _WIN32 + sigset_t newset, oldset; + /* Block all signals in newly created thread. + * To avoid race condition we block all signals in the calling + * thread, which the new thread will inherit its sigmask from, + * and then restore the original sigmask of the calling thread when + * we're done creating the thread. + * NOTE: term_sig remains unblocked since we use it on termination + * to quickly interrupt system calls. */ + sigemptyset(&oldset); + sigfillset(&newset); + if (rk->rk_conf.term_sig) + sigdelset(&newset, rk->rk_conf.term_sig); + pthread_sigmask(SIG_SETMASK, &newset, &oldset); + +#endif + + TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { + rd_kafka_broker_lock(rkb); + + /* For linux, type of rkb_thread is unsigned int + * For windows, type of rkb_thread is void */ +#if defined(_WIN32) + if (rkb->rkb_thread != NULL) { +#else + if (rkb->rkb_thread != 0) { +#endif + rd_kafka_broker_unlock(rkb); + continue; + } + + if (rkb->rkb_state != RD_KAFKA_BROKER_STATE_INIT || + (rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT && + !rd_kafka_terminating(rkb->rkb_rk) && + !rd_kafka_fatal_error_code(rkb->rkb_rk) && + (!rkb->rkb_rk->rk_conf.sparse_connections || + rkb->rkb_persistconn.internal || + rd_atomic32_get(&rkb->rkb_persistconn.coord) || + rkb->rkb_ops->rkq_qlen > 0))) { + + if (thrd_create(&rkb->rkb_thread, + rd_kafka_broker_process, rkb) != thrd_success) { + + rd_kafka_log(rk, LOG_CRIT, "THREAD", + "Unable to create broker thread"); + + /* Send ERR op back to application for processing. */ + rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE, + "Unable to create broker thread"); + + rd_free(rkb); + } + } + rd_kafka_broker_unlock(rkb); + } + +#ifndef _WIN32 + /* Restore sigmask of caller */ + pthread_sigmask(SIG_SETMASK, &oldset, NULL); +#endif +} /** * Main loop for Kafka handler thread. @@ -2059,6 +2127,10 @@ static int rd_kafka_thread_main (void *arg) { RD_KAFKA_Q_CB_CALLBACK, NULL, NULL); if (rk->rk_cgrp) /* FIXME: move to timer-triggered */ rd_kafka_cgrp_serve(rk->rk_cgrp); + + if (rk->rk_conf.broker_thread_lazy_creation) + rd_kafka_broker_thread_create_serve(rk); + rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT); } diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 588f1a5583..218c58c44c 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -5660,26 +5660,26 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, * the broker thread until we've finalized the rkb. */ rd_kafka_broker_lock(rkb); rd_kafka_broker_keep(rkb); /* broker thread's refcnt */ - if (thrd_create(&rkb->rkb_thread, - rd_kafka_broker_thread_main, rkb) != thrd_success) { - rd_kafka_broker_unlock(rkb); - - rd_kafka_log(rk, LOG_CRIT, "THREAD", - "Unable to create broker thread"); - - /* Send ERR op back to application for processing. */ - rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE, - "Unable to create broker thread"); - - rd_free(rkb); - + if (!rkb->rkb_rk->rk_conf.broker_thread_lazy_creation || + (rkb->rkb_rk->rk_conf.broker_thread_lazy_creation && + source != RD_KAFKA_LEARNED)) { + if (thrd_create(&rkb->rkb_thread, + rd_kafka_broker_thread_main, rkb) != thrd_success) { + rd_kafka_broker_unlock(rkb); + + rd_kafka_log(rk, LOG_CRIT, "THREAD", + "Unable to create broker thread"); + /* Send ERR op back to application for processing. */ + rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE, + "Unable to create broker thread"); + rd_free(rkb); #ifndef _WIN32 - /* Restore sigmask of caller */ - pthread_sigmask(SIG_SETMASK, &oldset, NULL); + /* Restore sigmask of caller */ + pthread_sigmask(SIG_SETMASK, &oldset, NULL); #endif - - return NULL; - } + return NULL; + } + } if (rkb->rkb_source != RD_KAFKA_INTERNAL) { if (rk->rk_conf.security_protocol == @@ -5715,8 +5715,10 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, #ifndef _WIN32 - /* Restore sigmask of caller */ - pthread_sigmask(SIG_SETMASK, &oldset, NULL); + if (!rkb->rkb_rk->rk_conf.broker_thread_lazy_creation) { + /* Restore sigmask of caller */ + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + } #endif return rkb; @@ -6104,6 +6106,10 @@ int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) { return rd_kafka_brokers_add0(rk, brokerlist); } +void rd_kafka_broker_process(void* rbk) { + rd_kafka_broker_thread_main(rbk); +} + /** * @brief Adds a new broker or updates an existing one. diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index 02c08bc961..7c5573acaa 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -460,6 +460,8 @@ void rd_kafka_broker_conn_closed (rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, const char *errstr); +void rd_kafka_broker_process(void* rbk); + void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb); #define rd_kafka_broker_destroy(rkb) \ diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index ed1787fbd8..de51387517 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -562,6 +562,13 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "it needs to communicate with. When disabled the client " "will maintain connections to all brokers in the cluster.", 0, 1, 1 }, + { _RK_GLOBAL, "enable.broker.lazy_creation", + _RK_C_BOOL, + _RK(broker_thread_lazy_creation), + "When enabled the client will only create broker main thread " + "it needs to communicate with. When disabled the client " + "will create the broker main thread once added.", + 0, 1, 1}, { _RK_GLOBAL|_RK_DEPRECATED, "reconnect.backoff.jitter.ms", _RK_C_INT, _RK(reconnect_jitter_ms), "No longer used. See `reconnect.backoff.ms` and " diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index ac08651d83..884062d4c1 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -216,6 +216,7 @@ struct rd_kafka_conf_s { int reconnect_jitter_ms; int connections_max_idle_ms; int sparse_connections; + int broker_thread_lazy_creation; int sparse_connect_intvl; int api_version_request; int api_version_request_timeout_ms;