Skip to content

Commit

Permalink
add oauth callback (#7)
Browse files Browse the repository at this point in the history
* add oauth callback

* fix typos

* fix test

* adjust interfaces
  • Loading branch information
nick-zh authored Sep 7, 2021
1 parent d2f1403 commit c0c7b80
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ test: pcov-disable

static-analysis: pcov-disable
mkdir -p build/logs/phpstan
${PHPSTAN} analyse --no-progress --memory-limit=64
${PHPSTAN} analyse --no-progress --memory-limit=128M

update-dependencies:
composer update
Expand Down
20 changes: 19 additions & 1 deletion src/Consumer/KafkaConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
/**
* @var callable
*/
private $consumeCallback;
private $oauthBearerCallback;

/**
* @var callable
Expand Down Expand Up @@ -228,6 +228,20 @@ public function withOffsetCommitCallback(callable $offsetCommitCallback): KafkaC
return $that;
}

/**
* Set callback that is being called on offset commits
*
* @param callable $oauthBearerCallback
* @return KafkaConsumerBuilderInterface
*/
public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaConsumerBuilderInterface
{
$that = clone $this;
$that->oauthBearerCallback = $oauthBearerCallback;

return $that;
}

/**
* Lets you set a custom decoder for the consumed message
*
Expand Down Expand Up @@ -292,5 +306,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void
if (null !== $this->offsetCommitCallback) {
$conf->setOffsetCommitCb($this->offsetCommitCallback);
}

if (null !== $this->oauthBearerCallback) {
$conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback);
}
}
}
10 changes: 9 additions & 1 deletion src/Consumer/KafkaConsumerBuilderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,15 @@ public function withDecoder(DecoderInterface $decoder): self;
* @param callable $logCallback
* @return KafkaConsumerBuilderInterface
*/
public function withLogCallback(callable $logCallback): KafkaConsumerBuilderInterface;
public function withLogCallback(callable $logCallback): self;

/**
* Set callback that is being called on offset commits
*
* @param callable $oauthBearerCallback
* @return KafkaConsumerBuilderInterface
*/
public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self;

/**
* Returns your consumer instance
Expand Down
22 changes: 22 additions & 0 deletions src/Producer/KafkaProducerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ final class KafkaProducerBuilder implements KafkaProducerBuilderInterface
*/
private $logCallback;

/**
* @var callable
*/
private $oauthBearerCallback;

/**
* @var EncoderInterface
*/
Expand Down Expand Up @@ -131,6 +136,19 @@ public function withLogCallback(callable $logCallback): KafkaProducerBuilderInte
return $this;
}

/**
* Callback for OAuth Bearer Token refresh
*
* @param callable $oauthBearerCallback
* @return KafkaProducerBuilderInterface
*/
public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaProducerBuilderInterface
{
$this->oauthBearerCallback = $oauthBearerCallback;

return $this;
}

/**
* Lets you set a custom encoder for produce message
*
Expand Down Expand Up @@ -188,5 +206,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void
if (null !== $this->logCallback) {
$conf->setLogCb($this->logCallback);
}

if (null !== $this->oauthBearerCallback) {
$conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback);
}
}
}
8 changes: 8 additions & 0 deletions src/Producer/KafkaProducerBuilderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ public function withErrorCallback(callable $errorCallback): self;
*/
public function withLogCallback(callable $logCallback): self;

/**
* Callback for OAuth Bearer Token refresh
*
* @param callable $oauthBearerCallback
* @return KafkaProducerBuilderInterface
*/
public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self;

/**
* Lets you set a custom encoder for produce message
*
Expand Down
27 changes: 27 additions & 0 deletions tests/Unit/Consumer/KafkaConsumerBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,33 @@ public function testSetRebalanceCallback(): void
self::assertArrayHasKey('rebalance_cb', $conf);
}

/**
* @return void
* @throws \ReflectionException
*/
public function testSetOAuthBearerTokenRefreshCallback(): void
{
$callback = function () {
// Anonymous test method, no logic required
};

$clone = $this->kafkaConsumerBuilder->withOAuthBearerTokenRefreshCallback($callback);

$reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback');
$reflectionProperty->setAccessible(true);

self::assertSame($callback, $reflectionProperty->getValue($clone));
self::assertNotSame($clone, $this->kafkaConsumerBuilder);

$consumer = $clone
->withAdditionalBroker('localhost')
->withSubscription('test')
->withOAuthBearerTokenRefreshCallback($callback)
->build();
$conf = $consumer->getConfiguration();
self::assertArrayHasKey('oauthbearer_token_refresh_cb', $conf);
}

/**
* @return void
* @throws \ReflectionException
Expand Down
19 changes: 19 additions & 0 deletions tests/Unit/Producer/KafkaProducerBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ public function testSetErrorCallback(): void
self::assertSame($callback, $reflectionProperty->getValue($clone));
}

/**
* @return void
* @throws \ReflectionException
*/
public function testSetOAuthBearerTokenRefreshCallback(): void
{
$callback = function () {
// Anonymous test method, no logic required
};

$clone = $this->kafkaProducerBuilder->withOAuthBearerTokenRefreshCallback($callback);

$reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback');
$reflectionProperty->setAccessible(true);

self::assertSame($callback, $reflectionProperty->getValue($clone));
}

/**
* @throws KafkaProducerException
*/
Expand All @@ -132,6 +150,7 @@ public function testBuild(): void
->withDeliveryReportCallback($callback)
->withErrorCallback($callback)
->withLogCallback($callback)
->withOAuthBearerTokenRefreshCallback($callback)
->build();

self::assertInstanceOf(KafkaProducerInterface::class, $producer);
Expand Down

0 comments on commit c0c7b80

Please sign in to comment.