Finally a Google Cloud Pub/Sub Java 8 driver that you can wrap your head around its internals and put the fun (i.e., backpressure-aware, reactive, efficient, batteries-included, and simple!) back into messaging.
-
It is backpressure-aware, because it only pulls batch of messages whenever you need one and tell it to pull one. It doesn’t operate a covert thread (pool) to pull whenever it sees a fit and ram it through the application.
-
It is reactive, because every request is non-blocking, asynchronous, and wired up with the rest of the application using Reactive Streams.
-
It is efficient, because it works in batches, the basic unit of message exchange between your driver and Pub/Sub. You
pull()
some, youack()
some. One-message-at-a-time is an illusion created by drivers and incurs a significant performance penalty along with operational complexity under the hood. -
It is batteries-included, because it provides goodies (out of the box metrics integration, an adaptive rate limiter to help you avoid burning money by continuously pulling and nack’ing messages when something is wrong with your consumer, a
ScheduledExecutorService
implementation with a bounded task queue to mitigate backpressure violating consumption) that assist real-world production deployments. -
It is simple, because there are 2 dozens of classes where half is used to represent JSON models transmitted over the wire and the rest is just reactive streams gluing. There are no smart retry mechanisms (though thanks to reactive streams, you can introduce yours), there are no message lease time extending background tasks (hence, you better consume your batch before it times out), etc. Spend a minute on the source code and you are ready to write your own Pub/Sub driver!
Due to the constraints on the development resources (read as, "it is just me"), I needed to take some shortcuts to get this project going:
-
Reactor Core for Reactive Streams
-
Reactor Netty for HTTP requests
-
Google Auth Library[1] for authentication
-
Micrometer for metrics (optional)
You first need to add the reactor-pubsub
artifact into your list of
dependencies:
<dependency>
<groupId>com.vlkan</groupId>
<artifactId>reactor-pubsub</artifactId>
<version>${reactor-pubsub.version}</version>
</dependency>
(Note that the Java 9 module name is com.vlkan.pubsub
.)
You can create a publisher and publish a message as follows:
// Create the publisher.
String projectName = "awesome-project";
PubsubPublisherConfig publisherConfig = PubsubPublisherConfig
.builder()
.setProjectName(projectName)
.setTopicName("awesome-topic")
.build();
PubsubPublisher publisher = PubsubPublisher
.builder()
.setConfig(publisherConfig)
.build();
// Publish a message.
publisher
.publishMessage(
new PubsubDraftedMessage(
"Yolo like nobody's watching!"
.getBytes(StandardCharsets.UTF_8)))))
.doOnSuccess(publishResponse ->
System.out.format(
"Published awesome message ids: %s%n",
publishResponse.getMessageIds()))
.subscribe();
Note that PubsubDraftedMessage
constructor has two variants:
-
PubsubDraftedMessage(byte[] payload)
-
PubsubDraftedMessage(byte[] payload, Map<String, String> attributes)
PubsubPublisher
provides the following auxiliary methods:
-
publishMessages(List<PubsubDraftedMessage> messages)
-
publishMessage(List<PubsubDraftedMessage> message)
-
publish(PubsubPublishRequest publishRequest)
You can create a subscriber and start receiving messages from a subscription as follows:
// Create a puller.
String subscriptionName = "awesome-subscription";
PubsubPullerConfig pullerConfig = PubsubPullerConfig
.builder()
.setProjectName(projectName)
.setSubscriptionName(subscriptionName)
.build();
PubsubPuller puller = PubsubPuller
.builder()
.setConfig(pullerConfig)
.build();
// Create an acker.
PubsubAckerConfig ackerConfig = PubsubAckerConfig
.builder()
.setProjectName(projectName)
.setSubscriptionName(subscriptionName)
.build();
PubsubAcker acker = PubsubAcker
.builder()
.setConfig(ackerConfig)
.build();
// Pull and consume continuously.
puller
.pullAll()
.concatMap(pullResponse -> {
int messageCount = pullResponse.getReceivedMessages().size();
System.out.format("Just got awesome %d messages!%n", messageCount);
return acker.ackPullResponse(pullResponse);
})
.subscribe();
Note that PubsubAcker
provides the following auxiliary methods:
-
ackPullResponse(PubsubPullResponse pullResponse)
-
ackMessages(List<PubsubReceivedMessage> messages)
-
ackMessage(PubsubReceivedMessage message)
-
ackIds(List<String> ackIds)
-
ackId(String ackId)
-
ack(PubsubAckRequest ackRequest)
The project ships a couple of utilities where you might find them handy in assembling your messaging pipeline. Even though they are optional, we strongly recommend their usage.
We strongly encourage everyone to employ the provided rate limiter while consuming messages. The rationale is simple: In order to avoid burning GCP bills for nothing, you better cut down the consumption rate if the rest of the system is indicating a failure.
reactor-pubsub
provides the following utilities for rate limiting purposes:
-
RateLimiter
is a simple (package local) rate limiter. -
StagedRateLimiter
is a rate limiter with multiple stages. Each stage is composed of a success rate and failure rate pair. In the absence of failure acknowledgements, excessive permit claims replace the active stage with the next faster one, if there is any. Likewise, excessive failure acknowledgements replace the active stage with the next slower one, if there is any.
One can employ the StagedRateLimiter
for a PubsubPuller
as follows:
// Create the staged rate limiter and its reactor decorator.
String stagedRateLimiterName = projectName + '/' + subscriptionName;
StagedRateLimiter stagedRateLimiter = StagedRateLimiter
.builder()
.setName(stagedRateLimiterName)
.setSpec("1/1m:, 1/30s:1/1m, 1/1s:2/1m, :1/3m") // (default)
.build();
StagedRateLimiterReactorDecoratorFactory stagedRateLimiterReactorDecoratorFactory =
StagedRateLimiterReactorDecoratorFactory
.builder()
.setStagedRateLimiter(stagedRateLimiter)
.build();
Function<Flux<PubsubPullResponse>, Flux<PubsubPullResponse>> stagedRateLimiterFluxDecorator =
stagedRateLimiterReactorDecoratorFactory.ofFlux();
// Employ the staged rate limiter.
puller
.pullAll()
.concatMap(pullResponse -> {
// ...
return acker.ackPullResponse(pullResponse);
})
.transform(stagedRateLimiterFluxDecorator)
.subscribe();
The stages are described in increasing success rate limit order using a
specification format as follows: 1/1m:, 1/30s:1/1m, 1/1s:2/1m, :1/3m
. The
specification is a comma-separated list of [success rate limit]:[failure rate
limit] pairs where, e.g., 1/1h
is used to denote a rate limit of a single
permit per 1 hour. Temporal unit must be one of h(ours), m(inutes), or
s(econds). The initial failure rate limit and the last success rate limit can be
omitted to indicate no rate limits.) This example will result in the following
stages.
StagedRateLimiter
stages for specification 1/1m:, 1/30s:1/1m, 1/1s:2/1m, :1/3m
.
stage | success rate limit | failure rate limit |
---|---|---|
1 |
1/1m (once per minute) |
infinite |
2 |
1/30s (once per 30 second) |
1/1m (once per minute) |
3 |
1/1s (once per second) |
2/1m (twice per minute) |
4 |
infinite |
1/3m (once per 3 minute) |
By contract, initially the active stage is set to the one with the slowest success rate limit.
PubsubPuller
, PubsubAccessTokenCache
, and
StagedRateLimiterReactorDecoratorFactory
optionally receive either a
ScheduledExecutorService
or a Reactor Scheduler
in their builders for timed
invocations. One can explicitly change the implicit scheduler used by any
Reactor Mono<T>
or Flux<T>
as well. (See
Threading and
Schedulers in Reactor reference manual.) We strongly suggest employing a common
dedicated scheduler for all these cases with a bounded task queue. That said,
unfortunately neither the default Reactor Scheduler
s nor the
ScheduledExecutorService
implementations provided by the Java Standard library
allow one to put a bound on the task queue size. This shortcoming is severely
prone to hiding backpressure problems. (See the
the
relevant concurrency-interest discussion.) To mitigate this, we provide
BoundedScheduledThreadPoolExecutor
wrapper and strongly recommend to employ it
in your Reactor assembly line. Even though this will incur an extra thread
context switching cost, this is almost negligible for a majority of the use
cases and the benefit will overweight this minor expense. The usage is as simple
as follows:
// Create the executor.
ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(
Runtime.getRuntime().availableProcessors());
BoundedScheduledThreadPoolExecutor boundedExecutor =
new BoundedScheduledThreadPoolExecutor(100, executor);
Scheduler scheduler = Schedulers.fromExecutorService(boundedExecutor);
// Set the access token cache executor.
PubsubAccessTokenCache
.builder()
.setExecutorService(executor)
// ...
.build();
// Set the puller scheduler.
PubsubPuller puller = PubsubPuller
.builder()
.setScheduler(scheduler)
// ...
.build();
// Employ the scheduler in the Reactor pipeline.
puller
.pullAll()
.concatMap(pullResponse -> {
// ...
return acker.ackPullResponse(pullResponse);
})
.flatMap(this::doSomeOtherAsyncIO)
.subscribeOn(scheduler)
.subscribe();
It is a common pitfall to build a message consumption pipeline as follows:
puller
.pullAll()
.concatMap(pullResponse -> businessLogic
.execute(pullResponse)
.then(acker.ackPullResponse(pullResponse)))
.subscribe();
Here the Flux<PubsubPullResponse>
returned by pullAll()
will be terminated
if any of the methods along the reactive chain (pullAll()
,
businessLogic.execute()
, ack()
, etc.) throws an exception. No matter how
many doOnError()
, onErrorResume()
you plaster there, the damage has been
done, the subscription has been cancelled, and pullAll()
will not continue
pulling anymore. Note that this applies to any
Flux
and nothing
new to the way we leverage it here. To prevent such premature stream
termination, you need to retry subscribing. While this can be done as simple as
calling retry()
, you might also want to check out more fancy options like
retryBackoff()
. As one final remark, make sure you deal (log?) with the error
prior to retrying.
See
How
to use retryWhen
for exponential backoff? in Reactor reference manual.
Unless one provided, all PubsubPublisher
, PubsubPuller
and PubsubAcker
classes use the PubsubAccessTokenCache.getDefaultInstance()
and
PubsubClient.getDefaultInstance()
defaults. By default,
PubsubAccessTokenCache
leverages GoogleCredentials.getApplicationDefault()
provided by the google-auth-library-oauth2-http
artifact. This function
determines the credentials by trying out the following steps in order:
-
Credentials file pointed to by the
GOOGLE_APPLICATION_CREDENTIALS
environment variable -
Credentials provided by the Google Cloud SDK
gcloud auth application-default login
command -
Google App Engine built-in credentials
-
Google Cloud Shell built-in credentials
-
Google Compute Engine built-in credentials
Rather than relying on this mechanism, one can explicitly set the credentials as follows:
// Create the access token cache.
PubsubAccessTokenCache accessTokenCache = PubsubAccessTokenCache
.builder()
.setCredentials("awesome-password") // null falls back to the defaults
.build();
// Create the client.
PubsubClient client = PubsubClient
.builder()
.setAccessTokenCache(accessTokenCache)
.build();
// Create the puller.
PubsubPuller puller = PubsubPuller
.builder()
.setClient(client)
// ...
.build();
// Create the ack'er.
PubsubAcker acker = PubsubAcker
.builder()
.setClient(client)
// ...
.build();
// Create the publisher.
PubsubPublisher publisher = PubsubPublisher
.builder()
.setClient(client)
// ...
.build();
Given Micrometer is used for metrics, you first need to have it in your list of dependencies:
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
Both PubsubClient
and StagedRateLimiterReactorDecoratorFactory
provide
means to configure metrics. Each can be simply configured as follows:
// Create a meter registry.
MeterRegistry meterRegistry = ...;
// Pass the meter registry to the Pub/Sub client.
PubsubClient
.builder()
.setMeterRegistry(meterRegistry)
.setMeterNamePrefix("pubsub.client") // default
.setMeterTags(Collections.emptyMap()) // default
// ...
.build();
// Pass the meter registry to the rate limiter factory.
StagedRateLimiterReactorDecoratorFactory
.builder()
.setMeterRegistry(meterRegistry)
.setMeterNamePrefix("pubsub.stagedRateLimiter") // default
.setMeterTags(Collections.emptyMap()) // default
// ...
.build();
Above will publish metrics with the following footprints:
Name | Tags | Description |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
permit wait period distribution summary |
There are a couple of details that need further elaboration here:
-
When
PubsubPullerConfig#pullPeriod
is set to zero (default),pull
requests will only get completed when there are messages. Hence, one might experience high latencies in queues that frequently become empty. -
When
PubsubPullerConfig#pullPeriod
is set to a value greater than zero, repeatedly executedpull
requests byPubsubPuller#pullAll()
will get followed by apullPeriod
delay after an empty response. Hence the publishedpubsub.client.pull.latency
metrics are a combination of both the full and the empty responses. -
As of this writing, Pub/Sub blocks every
pull
requests at least ~1.5 seconds before returning an empty response.
Pub/Sub provides an emulator
to test your applications locally. In order to use it in combination with
reactor-pubsub
, you need to configure the baseUrl
of the PubsubClient
as
follows:
// Create a custom client.
PubsubClientConfig clientConfig = PubsubClientConfig
.builder()
.setBaseUrl("http://localhost:8085")
.build();
PubsubClient client = PubsubClient
.builder()
.setConfig(clientConfig)
.build();
// Create a publisher.
PubsubPublisher publisher = PubsubPublisher
.builder()
.setClient(client)
// ...
.build();
// Create a puller.
PubsubPuller puller = PubsubPuller
.builder()
.setClient(client)
// ...
.build();
// Create an acker.
PubsubAcker acker = PubsubAcker
.builder()
.setClient(client)
// ...
.build();
One of the most frequent questions reactor-pubsub
is challenged with is how
does it perform given the official Pub/Sub client uses Protobuf over HTTP/2
(gRPC), whereas reactor-pubsub
uses JSON over HTTP/1?
Before going into convincing figures to elaborate on reactor-pubsub
s
performance characteristics, there is one thing that deserves attention in
particular: JSON over HTTP/1 is a deliberate design decision for simplicity
rather than a fallback due to technical limitations. Even though it is
opinionated, reactor-pubsub
strives to serve as a Pub/Sub client that
leverages frequently used tools (e.g., JSON) and idioms Java developers are
accustomed to. Further, in case of failures, it should be trivial to spot the
smoking gun using a decent IDE debugger.
reactor-pubsub
source code ships a reproducible benchmark
along with its results. As shared there, one can
retrieve (i.e., pull
& ack
) a payload of 781 MiB in 2,083 ms using two
2.70GHz CPU cores, pull batch size 50, pull concurrency 5, and message payload
length 16 KiB. That is, 11,998 messages per second on a single core!* Do you
still need more juice? 🙇 Go ahead and create a ticket with your use case,
observed performance, and implementation details.
I (Volkan Yazıcı) would like to take this opportunity to share the historical account from my perspective to justify the effort and defend it against any potential NIH syndrome accusations.
Why did I feel a need to implement a Pub/Sub Java driver from scratch? At
bol.com, we heavily use Pub/Sub. There we started our pursuit
like the rest of the Pub/Sub users with
the official
Java drivers provided by Google. Later on we started bumping into backpressure
problems: tasks on the shared ScheduledExecutorService
were somehow awkwardly
dating back and constantly piling up. That was the point I introduced a
BoundedScheduledThreadPoolExecutor
and shit hit the fan. I figured the official Pub/Sub driver was ramming the
fetched batch of messages through the shared executor. My first reaction was to
cut down the pull buffer size and the concurrent pull count. That solved a
majority of our backpressure-related problems, though created a new one:
efficiency. Then I started examining the source code and wasted quite a lot of
time trying to make forsaken
FlowControlSettings
work. This disappointing inquiry resulted in something remarkable: I understood
how Pub/Sub works and amazed by the extent of complexity for such a simple task.
I have already been using Reactive Streams (RxJava and Reactor) every single
work day in the last five years and compiled a thick collection of lessons and
recipes out of it. The more I examined the official Pub/Sub Java driver source
code, the more I was convinced that I could very well engineer this into
something way more simple. I know how to pump JSON payloads over HTTP via
Reactor Netty and enjoy a backpressure-aware, reactive comfort out of the box.
But that wasn’t the tipping point I had decided to implement my own Pub/Sub Java
driver. I made my mind when I witnessed that
Google
engineers are clueless about these problems.
Why all the fuss about the rate limiting? One morning I came to the office and read an e-mail from one of the platform teams asking how come we managed to burn hundreds of dollars worth of Pub/Sub messaging in the middle of the night. One of the application (non-critical) databases happened to go down for a couple of hours and during that period nodes constantly sucked up messages and nack’ed them due to the database failure. This is an opinionated Pub/Sub driver and in my opinion you should not relentlessly burn Pub/Sub bills if the rest of the application is shouting out there is something going on wrong. Hence, please configure and use the god damn rate limiter.
If you have encountered an unlisted security vulnerability or other unexpected behaviour that has security impact, please report them privately to the [email protected] email address.
Copyright © 2019-2020 Volkan Yazıcı
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.