Skip to content

Commit

Permalink
KafkaConsumer: back pressure + improved read speed (#139)
Browse files Browse the repository at this point in the history
* Add Back Pressure to `KafkaConsumer`

Motivation:

Closes #131.

Modifications:

* re-add `KafkaConsumerConfiguration.backPressureStrategy:
  BackPressureStrategy`, currently allowing users to add
  high-low-watermark backpressure to their `KafkaConsumer`s
* `KafkaConsumer`:
    * make `KafkaConsumerMessages` use `NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark`
      as backpressure strategy
    * remove `rd_kafka_poll_set_consumer` -> use two separate queues for
      consumer events and consumer messages so we can exert backpressure
      on the consumer message queue
    * remove idle polling mechanism where incoming messages were
      discarded when `KafkaConsumerMessages` was terminated -> we now
      have to independent queues
    * rename `.pollForAndYieldMessage` -> `.pollForEventsAndMessages`
    * refactor `State` and add `ConsumerMessagesSequenceState`
* `KafkaProducer`:
    * rename `.consumptionStopped` -> `.eventConsumptionFinished`
* `RDKafkaClient`:
    * bring back `consumerPoll()`
    * `eventPoll()`: only queue main queue for events since consumer messages are now handled on a different queue

* KafkaConsumer: two state machines

Modifications:

* have two state machines:
    1. consumer state itself
    2. state of consumer messages async sequence

* KafkaConsumer: merge both state machines

* Refactor + DocC

* Review Franz + Blindspot

Modifications:

* `KafkaConsumer`:
    * end consumer message poll loop when async sequence drops message
    * do not sleep if we picked up reading new messages again after we
      finished reading a partition
    * `messageRunLoop`:
        * fix `fatalError` where `newMessagesProduced()` is invoked after `stopProducing()`
    * add func `batchConsumerPoll` that reads a batch of messages to
      avoid acquiring the lock in `messageRunLoop` too often
  • Loading branch information
felixschlegel authored Nov 1, 2023
1 parent 73bc4b4 commit f1800c2
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 189 deletions.
33 changes: 32 additions & 1 deletion Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,37 @@ public struct KafkaConsumerConfiguration {
/// Default: `.milliseconds(100)`
public var pollInterval: Duration = .milliseconds(100)

/// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``.
public struct BackPressureStrategy: Sendable, Hashable {
enum _BackPressureStrategy: Sendable, Hashable {
case watermark(low: Int, high: Int)
}

let _internal: _BackPressureStrategy

private init(backPressureStrategy: _BackPressureStrategy) {
self._internal = backPressureStrategy
}

/// A back pressure strategy based on high and low watermarks.
///
/// The consumer maintains a buffer size between a low watermark and a high watermark
/// to control the flow of incoming messages.
///
/// - Parameter low: The lower threshold for the buffer size (low watermark).
/// - Parameter high: The upper threshold for the buffer size (high watermark).
public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
return .init(backPressureStrategy: .watermark(low: low, high: high))
}
}

/// The backpressure strategy to be used for message consumption.
/// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information.
public var backPressureStrategy: BackPressureStrategy = .watermark(
low: 10,
high: 50
)

/// A struct representing the different Kafka message consumption strategies.
public struct ConsumptionStrategy: Sendable, Hashable {
enum _ConsumptionStrategy: Sendable, Hashable {
Expand Down Expand Up @@ -63,7 +94,7 @@ public struct KafkaConsumerConfiguration {
}

/// The strategy used for consuming messages.
/// See ``KafkaConfiguration/ConsumptionStrategy`` for more information.
/// See ``KafkaConsumerConfiguration/ConsumptionStrategy-swift.struct-swift.struct`` for more information.
public var consumptionStrategy: ConsumptionStrategy

// MARK: - Consumer-specific Config Properties
Expand Down
Loading

0 comments on commit f1800c2

Please sign in to comment.