From 0550863179ff70c9e63bb20c89bf0ca871b14d26 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Thu, 3 Oct 2024 20:30:03 +0200 Subject: [PATCH] [KIP-848] Tests for ListGroups filter to list only given group types Co-authored-by: mahajanadhitya --- tests/0080-admin_ut.c | 30 +++- tests/0081-admin.c | 312 +++++++++++++++++++++++++----------------- 2 files changed, 213 insertions(+), 129 deletions(-) diff --git a/tests/0080-admin_ut.c b/tests/0080-admin_ut.c index 4dbed3a672..31886c25c7 100644 --- a/tests/0080-admin_ut.c +++ b/tests/0080-admin_ut.c @@ -528,22 +528,44 @@ static void do_test_ListConsumerGroups(const char *what, q = useq ? useq : rd_kafka_queue_new(rk); if (with_options) { - rd_kafka_consumer_group_state_t duplicate[2] = { + rd_kafka_error_t *error; + rd_kafka_consumer_group_state_t duplicate_states[2] = { RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY, RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY}; + rd_kafka_consumer_group_type_t duplicate_types[2] = { + RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC, + RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC}; + rd_kafka_consumer_group_type_t unknown_type[1] = { + RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN}; options = rd_kafka_AdminOptions_new( rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS); /* Test duplicate error on match states */ - rd_kafka_error_t *error = - rd_kafka_AdminOptions_set_match_consumer_group_states( - options, duplicate, 2); + error = rd_kafka_AdminOptions_set_match_consumer_group_states( + options, duplicate_states, 2); TEST_ASSERT(error && rd_kafka_error_code(error), "%s", "Expected error on duplicate states," " got no error"); rd_kafka_error_destroy(error); + /* Test duplicate error on match group types */ + error = rd_kafka_AdminOptions_set_match_consumer_group_types( + options, duplicate_types, 2); + TEST_ASSERT(error && rd_kafka_error_code(error), "%s", + "Expected error on duplicate group types," + " got no error"); + rd_kafka_error_destroy(error); + + /* Test invalid args error on setting UNKNOWN group type in + * match group types */ + error = rd_kafka_AdminOptions_set_match_consumer_group_types( + options, unknown_type, 1); + TEST_ASSERT(error && rd_kafka_error_code(error), "%s", + "Expected error on Unknown group type," + " got no error"); + rd_kafka_error_destroy(error); + exp_timeout = MY_SOCKET_TIMEOUT_MS * 2; TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( options, exp_timeout, errstr, sizeof(errstr))); diff --git a/tests/0081-admin.c b/tests/0081-admin.c index d3d06b2de8..891deb7670 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -2650,137 +2650,59 @@ static void do_test_DeleteGroups(const char *what, } /** - * @brief Test list groups, creating consumers for a set of groups, - * listing and deleting them at the end. + * @brief Helper for do_test_ListConsumerGroups, makes ListConsumerGroups call + * and checks returned group count equals to \p exp_found. + * Parameter \p exp_type, if not UNKNOWN, needs to match + * returned groups type. */ -static void do_test_ListConsumerGroups(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int request_timeout, - rd_bool_t match_states) { -#define TEST_LIST_CONSUMER_GROUPS_CNT 4 - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - size_t valid_cnt, error_cnt; - rd_bool_t is_simple_consumer_group; - rd_kafka_consumer_group_state_t state; - char errstr[512]; - const char *errstr2, *group_id; - char *list_consumer_groups[TEST_LIST_CONSUMER_GROUPS_CNT]; - const int partitions_cnt = 1; - const int msgs_cnt = 100; - size_t i, found; - char *topic; - rd_kafka_metadata_topic_t exp_mdtopic = {0}; - int64_t testid = test_id_generate(); - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - const rd_kafka_ListConsumerGroups_result_t *res; - const rd_kafka_ConsumerGroupListing_t **groups; - rd_bool_t has_match_states = - test_broker_version >= TEST_BRKVER(2, 7, 0, 0); - - SUB_TEST_QUICK( - "%s ListConsumerGroups with %s, request_timeout %d" - ", match_states %s", - rd_kafka_name(rk), what, request_timeout, RD_STR_ToF(match_states)); - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (request_timeout != -1) { - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS); - - if (match_states) { - rd_kafka_consumer_group_state_t empty = - RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY; - - TEST_CALL_ERROR__( - rd_kafka_AdminOptions_set_match_consumer_group_states( - options, &empty, 1)); - } - - TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( - options, request_timeout, errstr, sizeof(errstr))); - } - - - topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - exp_mdtopic.topic = topic; - - /* Create the topics first. */ - test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); - - rd_sleep(1); /* Additional wait time for cluster propagation */ - - /* Produce 100 msgs */ - test_produce_msgs_easy(topic, testid, 0, msgs_cnt); - - for (i = 0; i < TEST_LIST_CONSUMER_GROUPS_CNT; i++) { - char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - test_consume_msgs_easy(group, topic, testid, -1, msgs_cnt, - NULL); - list_consumer_groups[i] = group; - } +static void +test_ListConsumerGroups_helper(rd_kafka_t *rk, + rd_kafka_AdminOptions_t *option, + rd_kafka_queue_t *q, + char **list_consumer_groups, + size_t list_consumer_groups_cnt, + size_t exp_found, + rd_kafka_consumer_group_type_t exp_type, + rd_bool_t match_states) { + rd_kafka_event_t *rkev = NULL; + const rd_kafka_ListConsumerGroups_result_t *result = NULL; + size_t group_cnt; + size_t error_cnt; + size_t found; + size_t i, j; + const rd_kafka_ConsumerGroupListing_t **groups = NULL; - TIMING_START(&timing, "ListConsumerGroups"); TEST_SAY("Call ListConsumerGroups\n"); - rd_kafka_ListConsumerGroups(rk, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - TIMING_START(&timing, "ListConsumerGroups.queue_poll"); - + rd_kafka_ListConsumerGroups(rk, option, q); /* Poll result queue for ListConsumerGroups result. * Print but otherwise ignore other event types * (typically generic Error events). */ while (1) { rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); - TEST_SAY("ListConsumerGroups: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); if (rkev == NULL) continue; + if (rd_kafka_event_error(rkev)) TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), rd_kafka_event_error_string(rkev)); if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) { + RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) break; - } rd_kafka_event_destroy(rkev); } /* Convert event to proper result */ - res = rd_kafka_event_ListConsumerGroups_result(rkev); - TEST_ASSERT(res, "expected ListConsumerGroups_result, got %s", + result = rd_kafka_event_ListConsumerGroups_result(rkev); + TEST_ASSERT(result, "expected ListConsumerGroups_result, got %s", rd_kafka_event_name(rkev)); - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(err == exp_err, - "expected ListConsumerGroups to return %s, got %s (%s)", - rd_kafka_err2str(exp_err), rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - TEST_SAY("ListConsumerGroups: returned %s (%s)\n", - rd_kafka_err2str(err), err ? errstr2 : "n/a"); - - groups = rd_kafka_ListConsumerGroups_result_valid(res, &valid_cnt); - rd_kafka_ListConsumerGroups_result_errors(res, &error_cnt); + if (rd_kafka_event_error(rkev)) + TEST_FAIL("ListConsumerGroups failed with %s", + rd_kafka_event_error_string(rkev)); - /* Other tests could be running */ - TEST_ASSERT(valid_cnt >= TEST_LIST_CONSUMER_GROUPS_CNT, - "expected ListConsumerGroups to return at least %" PRId32 - " valid groups," - " got %zu", - TEST_LIST_CONSUMER_GROUPS_CNT, valid_cnt); + groups = rd_kafka_ListConsumerGroups_result_valid(result, &group_cnt); + rd_kafka_ListConsumerGroups_result_errors(result, &error_cnt); TEST_ASSERT(error_cnt == 0, "expected ListConsumerGroups to return 0 errors," @@ -2788,23 +2710,32 @@ static void do_test_ListConsumerGroups(const char *what, error_cnt); found = 0; - for (i = 0; i < valid_cnt; i++) { - int j; - const rd_kafka_ConsumerGroupListing_t *group; - group = groups[i]; - group_id = rd_kafka_ConsumerGroupListing_group_id(group); - is_simple_consumer_group = + for (i = 0; i < group_cnt; i++) { + const rd_kafka_ConsumerGroupListing_t *group = groups[i]; + const char *group_id = + rd_kafka_ConsumerGroupListing_group_id(group); + int is_simple_consumer_group = rd_kafka_ConsumerGroupListing_is_simple_consumer_group( group); - state = rd_kafka_ConsumerGroupListing_state(group); - for (j = 0; j < TEST_LIST_CONSUMER_GROUPS_CNT; j++) { + rd_kafka_consumer_group_state_t state = + rd_kafka_ConsumerGroupListing_state(group); + rd_kafka_consumer_group_type_t type = + rd_kafka_ConsumerGroupListing_type(group); + for (j = 0; j < list_consumer_groups_cnt; j++) { if (!strcmp(list_consumer_groups[j], group_id)) { found++; TEST_ASSERT(!is_simple_consumer_group, "expected a normal group," " got a simple group"); + if (exp_type != + RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN) { + TEST_ASSERT( + type == exp_type, + "Expected the Consumer Group Type " + "to be set by the Broker."); + } - if (!has_match_states) + if (!match_states) break; TEST_ASSERT( @@ -2817,13 +2748,139 @@ static void do_test_ListConsumerGroups(const char *what, } } } - TEST_ASSERT(found == TEST_LIST_CONSUMER_GROUPS_CNT, - "expected to find %d" - " started groups," + + TEST_ASSERT(found == exp_found, + "expected to find %" PRIusz + " consumer groups," " got %" PRIusz, - TEST_LIST_CONSUMER_GROUPS_CNT, found); + exp_found, found); rd_kafka_event_destroy(rkev); +} + +/** + * @brief Test list groups, creating consumers for a set of groups, + * listing and deleting them at the end. + */ +static void do_test_ListConsumerGroups(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int request_timeout, + rd_bool_t match_states, + rd_bool_t match_types) { +#define TEST_LIST_CONSUMER_GROUPS_CNT 4 + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options; + char errstr[512]; + char *list_consumer_groups[TEST_LIST_CONSUMER_GROUPS_CNT]; + const int partitions_cnt = 1; + const int msgs_cnt = 100; + char *topic; + size_t i; + rd_kafka_metadata_topic_t exp_mdtopic = {0}; + int64_t testid = test_id_generate(); + rd_bool_t has_match_states = + test_broker_version >= TEST_BRKVER(2, 7, 0, 0); + rd_bool_t has_match_types = + test_broker_version >= TEST_BRKVER(3, 8, 0, 0); + + rd_kafka_consumer_group_type_t group_protocol_in_use = + RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN; + rd_kafka_consumer_group_type_t group_protocol_not_in_use = + RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN; + + SUB_TEST_QUICK( + "%s ListConsumerGroups with %s, request_timeout %d" + ", match_states %s, match_types %s", + rd_kafka_name(rk), what, request_timeout, RD_STR_ToF(match_states), + RD_STR_ToF(match_types)); + + /* match_states would not work if broker version is below 2.7.0 */ + if (!has_match_states) + match_states = rd_false; + + /* match_types would not work if broker version is below 3.8.0 */ + if (!has_match_types) + match_types = rd_false; + + q = useq ? useq : rd_kafka_queue_new(rk); + + options = + rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS); + + if (request_timeout != -1) { + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, request_timeout, errstr, sizeof(errstr))); + } + + if (match_states) { + rd_kafka_consumer_group_state_t empty = + RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY; + + TEST_CALL_ERROR__( + rd_kafka_AdminOptions_set_match_consumer_group_states( + options, &empty, 1)); + } + + if (match_types) { + if (test_consumer_group_protocol_classic()) { + group_protocol_in_use = + RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC; + group_protocol_not_in_use = + RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER; + } else { + group_protocol_in_use = + RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER; + group_protocol_not_in_use = + RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC; + } + TEST_CALL_ERROR__( + rd_kafka_AdminOptions_set_match_consumer_group_types( + options, &group_protocol_in_use, 1)); + } + + topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + exp_mdtopic.topic = topic; + + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); + + rd_sleep(1); /* Additional wait time for cluster propagation */ + + /* Produce 100 msgs */ + test_produce_msgs_easy(topic, testid, 0, msgs_cnt); + + for (i = 0; i < TEST_LIST_CONSUMER_GROUPS_CNT; i++) { + char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + test_consume_msgs_easy(group, topic, testid, -1, msgs_cnt, + NULL); + list_consumer_groups[i] = group; + } + + test_ListConsumerGroups_helper(rk, options, q, list_consumer_groups, + TEST_LIST_CONSUMER_GROUPS_CNT, + TEST_LIST_CONSUMER_GROUPS_CNT, + group_protocol_in_use, has_match_states); + + if (match_types) { + /* Simply test with the option of protocol not in use */ + rd_kafka_AdminOptions_t *option_group_protocol_not_in_use = + rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS); + rd_kafka_AdminOptions_set_match_consumer_group_types( + option_group_protocol_not_in_use, + &group_protocol_not_in_use, 1); + + test_ListConsumerGroups_helper( + rk, option_group_protocol_not_in_use, q, + list_consumer_groups, TEST_LIST_CONSUMER_GROUPS_CNT, 0, + group_protocol_not_in_use, rd_false); + + rd_kafka_AdminOptions_destroy(option_group_protocol_not_in_use); + } test_DeleteGroups_simple(rk, NULL, (char **)list_consumer_groups, TEST_LIST_CONSUMER_GROUPS_CNT, NULL); @@ -2834,8 +2891,7 @@ static void do_test_ListConsumerGroups(const char *what, rd_free(topic); - if (options) - rd_kafka_AdminOptions_destroy(options); + rd_kafka_AdminOptions_destroy(options); if (!useq) rd_kafka_queue_destroy(q); @@ -5263,8 +5319,14 @@ static void do_test_apis(rd_kafka_type_t cltype) { do_test_DeleteRecords("main queue, op timeout 1500", rk, mainq, 1500); /* List groups */ - do_test_ListConsumerGroups("temp queue", rk, NULL, -1, rd_false); - do_test_ListConsumerGroups("main queue", rk, mainq, 1500, rd_true); + do_test_ListConsumerGroups("temp queue", rk, NULL, -1, rd_false, + rd_true); + do_test_ListConsumerGroups("temp queue", rk, NULL, -1, rd_false, + rd_false); + do_test_ListConsumerGroups("main queue", rk, mainq, 1500, rd_true, + rd_true); + do_test_ListConsumerGroups("main queue", rk, mainq, 1500, rd_true, + rd_false); /* TODO: check this test after KIP-848 admin operation * implementation */