diff --git a/Makefile b/Makefile index 014e83a..87c6438 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ test: pcov-disable static-analysis: pcov-disable mkdir -p build/logs/phpstan - ${PHPSTAN} analyse --no-progress --memory-limit=64 + ${PHPSTAN} analyse --no-progress --memory-limit=128M update-dependencies: composer update diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index 2985f39..f813037 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -51,7 +51,7 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface /** * @var callable */ - private $consumeCallback; + private $oauthBearerCallback; /** * @var callable @@ -228,6 +228,20 @@ public function withOffsetCommitCallback(callable $offsetCommitCallback): KafkaC return $that; } + /** + * Set callback that is being called on offset commits + * + * @param callable $oauthBearerCallback + * @return KafkaConsumerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaConsumerBuilderInterface + { + $that = clone $this; + $that->oauthBearerCallback = $oauthBearerCallback; + + return $that; + } + /** * Lets you set a custom decoder for the consumed message * @@ -292,5 +306,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void if (null !== $this->offsetCommitCallback) { $conf->setOffsetCommitCb($this->offsetCommitCallback); } + + if (null !== $this->oauthBearerCallback) { + $conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback); + } } } diff --git a/src/Consumer/KafkaConsumerBuilderInterface.php b/src/Consumer/KafkaConsumerBuilderInterface.php index e828696..8f60205 100644 --- a/src/Consumer/KafkaConsumerBuilderInterface.php +++ b/src/Consumer/KafkaConsumerBuilderInterface.php @@ -105,7 +105,15 @@ public function withDecoder(DecoderInterface $decoder): self; * @param callable $logCallback * @return KafkaConsumerBuilderInterface */ - public function withLogCallback(callable $logCallback): KafkaConsumerBuilderInterface; + public function withLogCallback(callable $logCallback): self; + + /** + * Set callback that is being called on offset commits + * + * @param callable $oauthBearerCallback + * @return KafkaConsumerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self; /** * Returns your consumer instance diff --git a/src/Producer/KafkaProducerBuilder.php b/src/Producer/KafkaProducerBuilder.php index bf564f3..d158a8d 100644 --- a/src/Producer/KafkaProducerBuilder.php +++ b/src/Producer/KafkaProducerBuilder.php @@ -39,6 +39,11 @@ final class KafkaProducerBuilder implements KafkaProducerBuilderInterface */ private $logCallback; + /** + * @var callable + */ + private $oauthBearerCallback; + /** * @var EncoderInterface */ @@ -131,6 +136,19 @@ public function withLogCallback(callable $logCallback): KafkaProducerBuilderInte return $this; } + /** + * Callback for OAuth Bearer Token refresh + * + * @param callable $oauthBearerCallback + * @return KafkaProducerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaProducerBuilderInterface + { + $this->oauthBearerCallback = $oauthBearerCallback; + + return $this; + } + /** * Lets you set a custom encoder for produce message * @@ -188,5 +206,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void if (null !== $this->logCallback) { $conf->setLogCb($this->logCallback); } + + if (null !== $this->oauthBearerCallback) { + $conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback); + } } } diff --git a/src/Producer/KafkaProducerBuilderInterface.php b/src/Producer/KafkaProducerBuilderInterface.php index c14a0bb..c8e6b05 100644 --- a/src/Producer/KafkaProducerBuilderInterface.php +++ b/src/Producer/KafkaProducerBuilderInterface.php @@ -50,6 +50,14 @@ public function withErrorCallback(callable $errorCallback): self; */ public function withLogCallback(callable $logCallback): self; + /** + * Callback for OAuth Bearer Token refresh + * + * @param callable $oauthBearerCallback + * @return KafkaProducerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self; + /** * Lets you set a custom encoder for produce message * diff --git a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php index 0519da4..936c574 100644 --- a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php +++ b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php @@ -203,6 +203,33 @@ public function testSetRebalanceCallback(): void self::assertArrayHasKey('rebalance_cb', $conf); } + /** + * @return void + * @throws \ReflectionException + */ + public function testSetOAuthBearerTokenRefreshCallback(): void + { + $callback = function () { + // Anonymous test method, no logic required + }; + + $clone = $this->kafkaConsumerBuilder->withOAuthBearerTokenRefreshCallback($callback); + + $reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback'); + $reflectionProperty->setAccessible(true); + + self::assertSame($callback, $reflectionProperty->getValue($clone)); + self::assertNotSame($clone, $this->kafkaConsumerBuilder); + + $consumer = $clone + ->withAdditionalBroker('localhost') + ->withSubscription('test') + ->withOAuthBearerTokenRefreshCallback($callback) + ->build(); + $conf = $consumer->getConfiguration(); + self::assertArrayHasKey('oauthbearer_token_refresh_cb', $conf); + } + /** * @return void * @throws \ReflectionException diff --git a/tests/Unit/Producer/KafkaProducerBuilderTest.php b/tests/Unit/Producer/KafkaProducerBuilderTest.php index 9df09db..13f6d15 100644 --- a/tests/Unit/Producer/KafkaProducerBuilderTest.php +++ b/tests/Unit/Producer/KafkaProducerBuilderTest.php @@ -108,6 +108,24 @@ public function testSetErrorCallback(): void self::assertSame($callback, $reflectionProperty->getValue($clone)); } + /** + * @return void + * @throws \ReflectionException + */ + public function testSetOAuthBearerTokenRefreshCallback(): void + { + $callback = function () { + // Anonymous test method, no logic required + }; + + $clone = $this->kafkaProducerBuilder->withOAuthBearerTokenRefreshCallback($callback); + + $reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback'); + $reflectionProperty->setAccessible(true); + + self::assertSame($callback, $reflectionProperty->getValue($clone)); + } + /** * @throws KafkaProducerException */ @@ -132,6 +150,7 @@ public function testBuild(): void ->withDeliveryReportCallback($callback) ->withErrorCallback($callback) ->withLogCallback($callback) + ->withOAuthBearerTokenRefreshCallback($callback) ->build(); self::assertInstanceOf(KafkaProducerInterface::class, $producer);