Skip to content

Commit

Permalink
Wrap rd_kafka_consumer_poll into iterator (use librdkafka embedded ba…
Browse files Browse the repository at this point in the history
…ckpressure) (#158)

* remove message sequence

* test consumer with implicit rebalance

* misc + format

* remove artefact

* don't check a lot of messages

* fix typo

* slow down first consumer to lower message to fit CI timeout

* remove helpers

* use exact benchmark version to avoid missing thresholds error (as no thresholds so far)

* add deprecated marks for backpressure, change comment for future dev

* address comments
  • Loading branch information
blindspotbounty authored Apr 26, 2024
1 parent 8be7e2f commit 298067a
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 391 deletions.
32 changes: 0 additions & 32 deletions Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
//
//===----------------------------------------------------------------------===//

import Crdkafka
import struct Foundation.UUID

public struct KafkaConsumerConfiguration {
Expand All @@ -23,37 +22,6 @@ public struct KafkaConsumerConfiguration {
/// Default: `.milliseconds(100)`
public var pollInterval: Duration = .milliseconds(100)

/// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``.
public struct BackPressureStrategy: Sendable, Hashable {
enum _BackPressureStrategy: Sendable, Hashable {
case watermark(low: Int, high: Int)
}

let _internal: _BackPressureStrategy

private init(backPressureStrategy: _BackPressureStrategy) {
self._internal = backPressureStrategy
}

/// A back pressure strategy based on high and low watermarks.
///
/// The consumer maintains a buffer size between a low watermark and a high watermark
/// to control the flow of incoming messages.
///
/// - Parameter low: The lower threshold for the buffer size (low watermark).
/// - Parameter high: The upper threshold for the buffer size (high watermark).
public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
return .init(backPressureStrategy: .watermark(low: low, high: high))
}
}

/// The backpressure strategy to be used for message consumption.
/// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information.
public var backPressureStrategy: BackPressureStrategy = .watermark(
low: 10,
high: 50
)

/// A struct representing the different Kafka message consumption strategies.
public struct ConsumptionStrategy: Sendable, Hashable {
enum _ConsumptionStrategy: Sendable, Hashable {
Expand Down
Loading

0 comments on commit 298067a

Please sign in to comment.