If only there was a way to glue all this together… There is a way: Alpakkas are here to save us!

What is Alpakka?

The Alpakka project is an open source initiative to implement stream-aware and reactive integration pipelines for Java and Scala.
It is built on top of Akka Streams, and has been designed from the ground up to understand streaming natively and provide a DSL for reactive and stream-oriented programming, with built-in support for backpressure.
We will be talking about integration with Kafka; you can find the alpakka-kafka repository here, and, like in the rest of the Akka community, you can find excellent documentation here.
Ok, but… what is Akka Streams?

Akka Streams is an implementation of the Reactive Streams project based on Akka actors; there are many others, like RxJava, Monix and Spring Reactor.
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
Akka Streams documentation explains it as follows:

The purpose is to offer an intuitive and safe way to formulate stream processing setups such that we can then execute them efficiently and with bounded resource usage — no more OutOfMemoryErrors.
In order to achieve this our streams need to be able to limit the buffering that they employ, they need to be able to slow down producers if the consumers cannot keep up.
This feature is called back-pressure and is at the core of the Reactive Streams initiative of which Akka is a founding member.
For you this means that the hard problem of propagating and reacting to back-pressure has been incorporated in the design of Akka Streams already.

The back-pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand.
The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.
It’s just what we need to prevent the fast producer – slow consumer problem. @impurepics explains it in a more graphic way.

Let’s see an example.

The important thing to notice here is that the zip stage consumes at a fixed rate of 1 element per second, but its source could generate elements much faster than that (in fact, it could generate all elements at once if there was demand downstream). However, because Akka Streams is demand based, the rates of both stages are restricted to the slowest one: in other words, the zip demands 1 element per second from the source, hence the source only generates one element per second.
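The zip example could be sketched like this (a minimal sketch, assuming Akka 2.6+, where the implicit ActorSystem provides the materializer; names and rates are illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object ZipRateExample extends App {
  implicit val system: ActorSystem = ActorSystem("zip-rate")

  // A source that could emit as fast as downstream demand allows...
  val numbers = Source(1 to 10)
  // ...zipped with a source that emits one tick per second.
  val ticks = Source.tick(1.second, 1.second, ())

  // zip only demands an element from `numbers` when `ticks` also has one,
  // so the whole stream is throttled to one element per second.
  numbers.zip(ticks).map(_._1).runForeach(println)
}
```

Even though `numbers` could complete instantly, back-pressure propagates from the zip stage upstream, so it is pulled only once per second.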
We need a source of retry records that we can backpressure when we need to delay processing, and that source should translate the backpressure to the KafkaConsumer, pausing it.
Here is where alpakka-kafka comes into play. It provides us with a Source of ConsumerRecord[K, V] that comes in various flavors depending on how we want to deal with commits: maybe we want to use external storage, or it is OK for us to use auto commit and get at-most-once delivery semantics.
We want at-least-once delivery semantics and to store offsets in Kafka, so we will choose Consumer.committableSource. The Source we get from Consumer.committableSource emits elements of type CommittableMessage[K, V]; these elements contain the Kafka ConsumerRecord[K, V] but also a CommittableOffset that has a commitScaladsl method and holds a reference to the KafkaConsumer that is created by the Source.
Strictly speaking, it holds a reference to an actor, KafkaAsyncConsumerCommitterRef, that is created within the Source and sends messages to another actor, KafkaConsumerActor. This is where all the logic is implemented: it is the one that creates the KafkaConsumer, and also the one that will pause and resume it depending on downstream demand.
So the Committer.sink is extremely simple: it just builds batches of CommittableOffset and calls commit.
Reimplementing retries with Alpakka

We now have a Source of records that we can backpressure, but we still must solve the non-blocking delay.
I’ve implemented a very simple custom GraphStage named DelayFlow to achieve this.
It uses the akka.actor.Scheduler provided by the ActorSystem to schedule the record processing when its due date has not arrived yet. This is a “Light-weight scheduler for running asynchronous tasks after some deadline in the future. Not terribly precise but cheap”, and it is based on a Hashed Wheel Timer.
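The post’s actual DelayFlow is not reproduced here, but a simplified timer-based GraphStage along the same lines might look like this (a sketch: the `dueTime` extractor and all names are assumptions, and the scheduling goes through the stage’s built-in timer support rather than a hand-rolled Scheduler call):

```scala
import akka.stream._
import akka.stream.stage._
import scala.concurrent.duration._

// Holds each element until its due time, back-pressuring upstream meanwhile.
// `dueTime` extracts the element's deadline as epoch millis.
class DelayFlow[A](dueTime: A => Long) extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet("DelayFlow.in")
  val out: Outlet[A] = Outlet("DelayFlow.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      private var delayed: Option[A] = None

      override def onPush(): Unit = {
        val elem = grab(in)
        val remaining = dueTime(elem) - System.currentTimeMillis()
        if (remaining <= 0) push(out, elem) // already due: emit immediately
        else {                              // not due yet: park the element
          delayed = Some(elem)              // and schedule a single timer
          scheduleOnce(None, remaining.millis)
        }
      }

      override def onTimer(key: Any): Unit = {
        // downstream demand is still outstanding, so pushing here is legal
        delayed.foreach(push(out, _))
        delayed = None
      }

      override def onPull(): Unit =
        if (delayed.isEmpty && !hasBeenPulled(in)) pull(in)

      setHandlers(in, out, this)
    }
}
```

While an element is parked, the stage neither pulls upstream nor pushes downstream, so the delay is expressed purely as back-pressure, with no blocked threads.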
That’s it: now we have a Flow that can be attached to the Alpakka source to delay record processing.
Using a single KafkaConsumer to fetch from all topics

There is one subtle detail that I wish to add.
Because we will be using N topics to achieve different delays (thanks Lukasz), we could start N streams; that means N KafkaConsumerActors, N KafkaConsumers, N HeartbeatThreads: you get the idea.
Luckily for us, a KafkaConsumer can subscribe to multiple topics, and Alpakka sources don’t prevent us from using this feature: we can pass a list of topic names to Subscriptions.topics. This brings better resource usage, but not without cost.
Now we are mixing different delays again, because the Source will emit a single stream of merged records from the different topics; the DelayFlow may now receive an element from topic 30_min, which must be delayed, before an element from topic 5_min that could be processed right away. (2)

Alpakka saves us again: besides the committableSource that we already talked about, it provides a Consumer.commitWithMetadataPartitionedSource. It’s a Source of Sources, and its type looks like this: Source[(TopicPartition, Source[CommittableMessage[K, V], NotUsed]), Control]. commitWithMetadataPartitionedSource supports tracking the automatic partition assignment from Kafka.
When a topic-partition is assigned to a consumer, this source will emit a tuple with the assigned topic-partition and a corresponding source.
When a topic-partition is revoked, the corresponding source completes.
With this we can attach a different instance of DelayFlow to each partition and avoid the mixed delays problem.
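The per-partition wiring could be sketched like this (the topic names, settings and parallelism are assumptions, and a fixed `.delay` per partition stands in for the post’s per-record DelayFlow, purely for illustration):

```scala
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.DelayOverflowStrategy
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

object PartitionedDelays extends App {
  implicit val system: ActorSystem = ActorSystem("partitioned-delays")

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // assumption
      .withGroupId("retry-consumer")          // assumption

  Consumer
    .commitWithMetadataPartitionedSource(
      consumerSettings,
      Subscriptions.topics("retry_5_min", "retry_30_min"), // assumed names
      _ => "") // no extra commit metadata
    .flatMapMerge(16, { case (topicPartition, partitionSource) =>
      // A fresh delaying stage per assigned partition, so delays from
      // different topics are never mixed in the same buffer.
      val delay =
        if (topicPartition.topic.endsWith("30_min")) 30.minutes else 5.minutes
      partitionSource
        .delay(delay, DelayOverflowStrategy.backpressure)
        .map(_.committableOffset)
    })
    .runWith(Committer.sink(CommitterSettings(system)))
}
```

flatMapMerge is what performs the final merge: each inner per-partition Source runs through its own delaying stage, and their outputs are merged back into the single stream of offsets that feeds the committer sink.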
Hooray! The last step is to merge the per-partition Flows of CommittableMessage back into a single stream. Full code can be found here.
I hope you have enjoyed this post.
(1) Kafka guarantees that “messages sent by a producer to a particular topic partition will be appended in the order they are sent”, but that doesn’t mean that messages from different producers are appended in the order they are sent: there is no global order between different processes, and due to network delays and unsynchronized clocks you could see cases where offset X has a slightly greater timestamp than offset X + 1.

(2) After my first review of this post I noticed that this is true even if we only subscribe to a single topic, because a consumer may be assigned more than one partition and there are no order guarantees between partitions. However, this has a greater impact when we subscribe to topics with different delays, e.g. 5 min and 30 min.