diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index aea225340a..489bd3fbca 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -30,6 +30,7 @@ socket.max.fails | * | 0 .. 1000000 | 1
broker.address.ttl | * | 0 .. 86400000 | 1000 | low | How long to cache the broker address resolving results (milliseconds).
*Type: integer*
broker.address.family | * | any, v4, v6 | any | low | Allowed broker IP address families: any, v4, v6
*Type: enum value*
connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info).
*Type: integer*
+enable.broker.lazy_creation | * | true, false | true | low | When enabled the client will only create broker main thread it needs to communicate with. When disabled the client will create the broker main thread once added.
*Type: boolean*
reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`.
*Type: integer*
reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately.
*Type: integer*
reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed.
*Type: integer*
diff --git a/src/rdkafka.c b/src/rdkafka.c
index 8ffd91b643..d0b907735c 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2004,6 +2004,74 @@ static int rd_kafka_init_wait (rd_kafka_t *rk, int timeout_ms) {
return ret;
}
+/**
+ * @brief serve to create broker thread once need to
+ *
+ */
+static void rd_kafka_broker_thread_create_serve(rd_kafka_t *rk) {
+ rd_kafka_broker_t *rkb = NULL;
+
+#ifndef _WIN32
+ sigset_t newset, oldset;
+ /* Block all signals in newly created thread.
+ * To avoid race condition we block all signals in the calling
+ * thread, which the new thread will inherit its sigmask from,
+ * and then restore the original sigmask of the calling thread when
+ * we're done creating the thread.
+ * NOTE: term_sig remains unblocked since we use it on termination
+ * to quickly interrupt system calls. */
+ sigemptyset(&oldset);
+ sigfillset(&newset);
+ if (rk->rk_conf.term_sig)
+ sigdelset(&newset, rk->rk_conf.term_sig);
+ pthread_sigmask(SIG_SETMASK, &newset, &oldset);
+
+#endif
+
+ TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+ rd_kafka_broker_lock(rkb);
+
+ /* For linux, type of rkb_thread is unsigned int
+ * For windows, type of rkb_thread is void */
+#if defined(_WIN32)
+ if (rkb->rkb_thread != NULL) {
+#else
+ if (rkb->rkb_thread != 0) {
+#endif
+ rd_kafka_broker_unlock(rkb);
+ continue;
+ }
+
+ if (rkb->rkb_state != RD_KAFKA_BROKER_STATE_INIT ||
+ (rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT &&
+ !rd_kafka_terminating(rkb->rkb_rk) &&
+ !rd_kafka_fatal_error_code(rkb->rkb_rk) &&
+ (!rkb->rkb_rk->rk_conf.sparse_connections ||
+ rkb->rkb_persistconn.internal ||
+ rd_atomic32_get(&rkb->rkb_persistconn.coord) ||
+ rkb->rkb_ops->rkq_qlen > 0))) {
+
+ if (thrd_create(&rkb->rkb_thread,
+ rd_kafka_broker_process, rkb) != thrd_success) {
+
+ rd_kafka_log(rk, LOG_CRIT, "THREAD",
+ "Unable to create broker thread");
+
+ /* Send ERR op back to application for processing. */
+ rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
+ "Unable to create broker thread");
+
+ rd_free(rkb);
+ }
+ }
+ rd_kafka_broker_unlock(rkb);
+ }
+
+#ifndef _WIN32
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+}
/**
* Main loop for Kafka handler thread.
@@ -2059,6 +2127,10 @@ static int rd_kafka_thread_main (void *arg) {
RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
rd_kafka_cgrp_serve(rk->rk_cgrp);
+
+ if (rk->rk_conf.broker_thread_lazy_creation)
+ rd_kafka_broker_thread_create_serve(rk);
+
rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
}
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index 588f1a5583..218c58c44c 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -5660,26 +5660,26 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
* the broker thread until we've finalized the rkb. */
rd_kafka_broker_lock(rkb);
rd_kafka_broker_keep(rkb); /* broker thread's refcnt */
- if (thrd_create(&rkb->rkb_thread,
- rd_kafka_broker_thread_main, rkb) != thrd_success) {
- rd_kafka_broker_unlock(rkb);
-
- rd_kafka_log(rk, LOG_CRIT, "THREAD",
- "Unable to create broker thread");
-
- /* Send ERR op back to application for processing. */
- rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
- "Unable to create broker thread");
-
- rd_free(rkb);
-
+ if (!rkb->rkb_rk->rk_conf.broker_thread_lazy_creation ||
+ (rkb->rkb_rk->rk_conf.broker_thread_lazy_creation &&
+ source != RD_KAFKA_LEARNED)) {
+ if (thrd_create(&rkb->rkb_thread,
+ rd_kafka_broker_thread_main, rkb) != thrd_success) {
+ rd_kafka_broker_unlock(rkb);
+
+ rd_kafka_log(rk, LOG_CRIT, "THREAD",
+ "Unable to create broker thread");
+ /* Send ERR op back to application for processing. */
+ rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
+ "Unable to create broker thread");
+ rd_free(rkb);
#ifndef _WIN32
- /* Restore sigmask of caller */
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif
-
- return NULL;
- }
+ return NULL;
+ }
+ }
if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
if (rk->rk_conf.security_protocol ==
@@ -5715,8 +5715,10 @@ rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
#ifndef _WIN32
- /* Restore sigmask of caller */
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ if (!rkb->rkb_rk->rk_conf.broker_thread_lazy_creation) {
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+ }
#endif
return rkb;
@@ -6104,6 +6106,10 @@ int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) {
return rd_kafka_brokers_add0(rk, brokerlist);
}
+void rd_kafka_broker_process(void* rbk) {
+ rd_kafka_broker_thread_main(rbk);
+}
+
/**
* @brief Adds a new broker or updates an existing one.
diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h
index 02c08bc961..7c5573acaa 100644
--- a/src/rdkafka_broker.h
+++ b/src/rdkafka_broker.h
@@ -460,6 +460,8 @@ void rd_kafka_broker_conn_closed (rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
const char *errstr);
+void rd_kafka_broker_process(void* rbk);
+
void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);
#define rd_kafka_broker_destroy(rkb) \
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index ed1787fbd8..de51387517 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -562,6 +562,13 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"it needs to communicate with. When disabled the client "
"will maintain connections to all brokers in the cluster.",
0, 1, 1 },
+ { _RK_GLOBAL, "enable.broker.lazy_creation",
+ _RK_C_BOOL,
+ _RK(broker_thread_lazy_creation),
+ "When enabled the client will only create broker main thread "
+ "it needs to communicate with. When disabled the client "
+ "will create the broker main thread once added.",
+ 0, 1, 1},
{ _RK_GLOBAL|_RK_DEPRECATED, "reconnect.backoff.jitter.ms", _RK_C_INT,
_RK(reconnect_jitter_ms),
"No longer used. See `reconnect.backoff.ms` and "
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index ac08651d83..884062d4c1 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -216,6 +216,7 @@ struct rd_kafka_conf_s {
int reconnect_jitter_ms;
int connections_max_idle_ms;
int sparse_connections;
+ int broker_thread_lazy_creation;
int sparse_connect_intvl;
int api_version_request;
int api_version_request_timeout_ms;