https://bugs.gentoo.org/915433 https://github.com/confluentinc/librdkafka/pull/4449 From 8b311b8a850805f4ec9bb068c0edb31492ad03fe Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 27 Sep 2023 11:08:33 +0200 Subject: [PATCH 1/3] tmpabuf refactor and fix for insufficient buffer allocation --- CHANGELOG.md | 10 ++++++ src/rdkafka_buf.h | 25 ++++++++++++--- src/rdkafka_metadata.c | 59 +++++++++++++++++++----------------- src/rdkafka_metadata_cache.c | 38 +++++++++++------------ src/rdkafka_topic.c | 36 +++++++++++++--------- 5 files changed, 100 insertions(+), 68 deletions(-) diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h index ccd563cc6..623ec49ae 100644 --- a/src/rdkafka_buf.h +++ b/src/rdkafka_buf.h @@ -49,21 +49,36 @@ typedef struct rd_tmpabuf_s { size_t of; char *buf; int failed; - int assert_on_fail; + rd_bool_t assert_on_fail; } rd_tmpabuf_t; /** - * @brief Allocate new tmpabuf with \p size bytes pre-allocated. + * @brief Initialize new tmpabuf of non-final \p size bytes. */ static RD_UNUSED void -rd_tmpabuf_new(rd_tmpabuf_t *tab, size_t size, int assert_on_fail) { - tab->buf = rd_malloc(size); - tab->size = size; +rd_tmpabuf_new(rd_tmpabuf_t *tab, size_t size, rd_bool_t assert_on_fail) { + tab->buf = NULL; + tab->size = RD_ROUNDUP(size, 8); tab->of = 0; tab->failed = 0; tab->assert_on_fail = assert_on_fail; } +/** + * @brief Add a new allocation of \p _size bytes, + * rounded up to maximum word size, + * for \p _times times. + */ +#define rd_tmpabuf_add_alloc_times(_tab, _size, _times) \ + (_tab)->size += RD_ROUNDUP(_size, 8) * _times + +#define rd_tmpabuf_add_alloc(_tab, _size) \ + rd_tmpabuf_add_alloc_times(_tab, _size, 1) +/** + * @brief Finalize tmpabuf pre-allocating tab->size bytes. + */ +#define rd_tmpabuf_finalize(_tab) (_tab)->buf = rd_malloc((_tab)->size) + /** * @brief Free memory allocated by tmpabuf */ diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index f96edf658..6c2f60ae3 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -164,7 +164,8 @@ static rd_kafka_metadata_internal_t *rd_kafka_metadata_copy_internal( * Because of this we copy all the structs verbatim but * any pointer fields needs to be copied explicitly to update * the pointer address. */ - rd_tmpabuf_new(&tbuf, size, 1 /*assert on fail*/); + rd_tmpabuf_new(&tbuf, size, rd_true /*assert on fail*/); + rd_tmpabuf_finalize(&tbuf); mdi = rd_tmpabuf_write(&tbuf, src, sizeof(*mdi)); md = &mdi->metadata; @@ -506,11 +507,13 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, * no more than 4 times larger than the wire representation. * This is increased to 5 times in case if we want to compute partition * to rack mapping. */ - rd_tmpabuf_new(&tbuf, - sizeof(*mdi) + rkb_namelen + - (rkbuf->rkbuf_totlen * 4 + - (compute_racks ? rkbuf->rkbuf_totlen : 0)), - 0 /*dont assert on fail*/); + rd_tmpabuf_new(&tbuf, 0, rd_false /*dont assert on fail*/); + rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi)); + rd_tmpabuf_add_alloc(&tbuf, rkb_namelen); + rd_tmpabuf_add_alloc(&tbuf, rkbuf->rkbuf_totlen * + (4 + (compute_racks ? 1 : 0))); + + rd_tmpabuf_finalize(&tbuf); if (!(mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)))) { rd_kafka_broker_unlock(rkb); @@ -1603,35 +1606,37 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, rd_kafka_metadata_internal_t *mdi; rd_kafka_metadata_t *md; rd_tmpabuf_t tbuf; - size_t topic_names_size = 0; - int total_partition_cnt = 0; size_t i; int curr_broker = 0; - /* Calculate total partition count and topic names size before - * allocating memory. */ - for (i = 0; i < topic_cnt; i++) { - topic_names_size += 1 + strlen(topics[i].topic); - total_partition_cnt += topics[i].partition_cnt; - } - /* If the replication factor is given, num_brokers must also be given */ rd_assert(replication_factor <= 0 || num_brokers > 0); /* Allocate contiguous buffer which will back all the memory * needed by the final metadata_t object */ - rd_tmpabuf_new( - &tbuf, - sizeof(*mdi) + (sizeof(*md->topics) * topic_cnt) + - topic_names_size + (64 /*topic name size..*/ * topic_cnt) + - (sizeof(*md->topics[0].partitions) * total_partition_cnt) + - (sizeof(*mdi->topics) * topic_cnt) + - (sizeof(*mdi->topics[0].partitions) * total_partition_cnt) + - (sizeof(*mdi->brokers) * RD_ROUNDUP(num_brokers, 8)) + - (replication_factor > 0 ? RD_ROUNDUP(replication_factor, 8) * - total_partition_cnt * sizeof(int) - : 0), - 1 /*assert on fail*/); + rd_tmpabuf_new(&tbuf, sizeof(*mdi), rd_true /*assert on fail*/); + + rd_tmpabuf_add_alloc(&tbuf, topic_cnt * sizeof(*md->topics)); + rd_tmpabuf_add_alloc(&tbuf, topic_cnt * sizeof(*mdi->topics)); + rd_tmpabuf_add_alloc(&tbuf, num_brokers * sizeof(*md->brokers)); + + /* Calculate total partition count and topic names size before + * allocating memory. */ + for (i = 0; i < topic_cnt; i++) { + rd_tmpabuf_add_alloc(&tbuf, 1 + strlen(topics[i].topic)); + rd_tmpabuf_add_alloc(&tbuf, + topics[i].partition_cnt * + sizeof(*md->topics[i].partitions)); + rd_tmpabuf_add_alloc(&tbuf, + topics[i].partition_cnt * + sizeof(*mdi->topics[i].partitions)); + if (replication_factor > 0) + rd_tmpabuf_add_alloc_times( + &tbuf, replication_factor * sizeof(int), + topics[i].partition_cnt); + } + + rd_tmpabuf_finalize(&tbuf); mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)); memset(mdi, 0, sizeof(*mdi)); diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c index 18f19a4d0..1530e699e 100644 --- a/src/rdkafka_metadata_cache.c +++ b/src/rdkafka_metadata_cache.c @@ -249,8 +249,6 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert( rd_kafka_metadata_broker_internal_t *brokers_internal, size_t broker_cnt) { struct rd_kafka_metadata_cache_entry *rkmce, *old; - size_t topic_len; - size_t racks_size = 0; rd_tmpabuf_t tbuf; int i; @@ -261,34 +259,32 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert( * any pointer fields needs to be copied explicitly to update * the pointer address. * See also rd_kafka_metadata_cache_delete which frees this. */ - topic_len = strlen(mtopic->topic) + 1; + rd_tmpabuf_new(&tbuf, 0, rd_true /*assert on fail*/); + + rd_tmpabuf_add_alloc(&tbuf, sizeof(*rkmce)); + rd_tmpabuf_add_alloc(&tbuf, strlen(mtopic->topic) + 1); + rd_tmpabuf_add_alloc(&tbuf, mtopic->partition_cnt * + sizeof(*mtopic->partitions)); + rd_tmpabuf_add_alloc(&tbuf, + mtopic->partition_cnt * + sizeof(*metadata_internal_topic->partitions)); for (i = 0; include_racks && i < mtopic->partition_cnt; i++) { size_t j; - racks_size += RD_ROUNDUP( - metadata_internal_topic->partitions[i].racks_cnt * - sizeof(char *), - 8); + rd_tmpabuf_add_alloc( + &tbuf, metadata_internal_topic->partitions[i].racks_cnt * + sizeof(char *)); for (j = 0; j < metadata_internal_topic->partitions[i].racks_cnt; j++) { - racks_size += RD_ROUNDUP( - strlen(metadata_internal_topic->partitions[i] - .racks[j]) + - 1, - 8); + rd_tmpabuf_add_alloc( + &tbuf, strlen(metadata_internal_topic->partitions[i] + .racks[j]) + + 1); } } - rd_tmpabuf_new( - &tbuf, - RD_ROUNDUP(sizeof(*rkmce), 8) + RD_ROUNDUP(topic_len, 8) + - (mtopic->partition_cnt * - RD_ROUNDUP(sizeof(*mtopic->partitions), 8)) + - (mtopic->partition_cnt * - RD_ROUNDUP(sizeof(*metadata_internal_topic->partitions), 8)) + - racks_size, - 1 /*assert on fail*/); + rd_tmpabuf_finalize(&tbuf); rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce)); diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index 3b3986d43..b63a0bbea 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -1831,38 +1831,44 @@ rd_kafka_topic_info_t *rd_kafka_topic_info_new_with_rack( const rd_kafka_metadata_partition_internal_t *mdpi) { rd_kafka_topic_info_t *ti; rd_tmpabuf_t tbuf; - size_t tlen = RD_ROUNDUP(strlen(topic) + 1, 8); - size_t total_racks_size = 0; int i; + rd_bool_t has_racks = rd_false; + rd_tmpabuf_new(&tbuf, 0, rd_true /* assert on fail */); + + rd_tmpabuf_add_alloc(&tbuf, sizeof(*ti)); + rd_tmpabuf_add_alloc(&tbuf, strlen(topic) + 1); for (i = 0; i < partition_cnt; i++) { size_t j; if (!mdpi[i].racks) continue; + if (unlikely(!has_racks)) + has_racks = rd_true; + for (j = 0; j < mdpi[i].racks_cnt; j++) { - total_racks_size += - RD_ROUNDUP(strlen(mdpi[i].racks[j]) + 1, 8); + rd_tmpabuf_add_alloc(&tbuf, + strlen(mdpi[i].racks[j]) + 1); } - total_racks_size += - RD_ROUNDUP(sizeof(char *) * mdpi[i].racks_cnt, 8); + rd_tmpabuf_add_alloc(&tbuf, sizeof(char *) * mdpi[i].racks_cnt); + } + + /* Only bother allocating this if at least one + * rack is there. */ + if (has_racks) { + rd_tmpabuf_add_alloc( + &tbuf, sizeof(rd_kafka_metadata_partition_internal_t) * + partition_cnt); } - if (total_racks_size) /* Only bother allocating this if at least one - rack is there. */ - total_racks_size += - RD_ROUNDUP(sizeof(rd_kafka_metadata_partition_internal_t) * - partition_cnt, - 8); + rd_tmpabuf_finalize(&tbuf); - rd_tmpabuf_new(&tbuf, sizeof(*ti) + tlen + total_racks_size, - 1 /* assert on fail */); ti = rd_tmpabuf_alloc(&tbuf, sizeof(*ti)); ti->topic = rd_tmpabuf_write_str(&tbuf, topic); ti->partition_cnt = partition_cnt; ti->partitions_internal = NULL; - if (total_racks_size) { + if (has_racks) { ti->partitions_internal = rd_tmpabuf_alloc( &tbuf, sizeof(*ti->partitions_internal) * partition_cnt);