How to Optimize MirrorMaker 2 for High-Performance Apache Kafka Replication

There are two things that have really contributed to Apache Kafka’s success as a messaging and streaming platform. First, it is extremely scalable and supports dynamic growth of data distribution. This has been at the core of Kafka’s design from the very beginning: the ability to scale up a cluster of nodes to allow for increased message throughput on demand. The second is that Apache Kafka has been built by and for developers. This developer-first approach allows Kafka to be tweaked, tuned and used in ways that would not have been originally imagined.

One of the areas that Apache Kafka has grown into is data replication between clusters. This is a natural progression for any messaging or streaming platform that moves from application-specific use cases into enterprise-wide adoption: data needs to be available and replicated outside of a single cluster and into a global ecosystem. Early versions of Kafka, using MirrorMaker 1, tried to solve the simplest use case by focusing on moving the data but not the context. This allowed for data recovery in the event of failure but not contextual recovery of the data stream; applications had to track metadata about which clients had received data, what messages had been consumed and where they left off in the data stream. Because of this, a number of third parties built solutions to solve the enterprise-class data replication challenge. Some of these solutions were open source but not open license. Of the truly open source solutions, many were designed for specific application paradigms, leading to a very confusing picture of how cluster data replication should be done in Apache Kafka.

Thankfully, the Apache Kafka community prioritized improvements to the native replication support provided by MirrorMaker 1, and MirrorMaker 2 was introduced in Apache Kafka 2.4 as a new approach to data replication. In early versions of Kafka, MirrorMaker 1 acted like a simple client application. It used the Producer and Consumer APIs, which meant that it was configured, managed and operated just like any other consumer or producer application. It was essentially a purpose-built data pump that forwarded data from one cluster to the other. There was no native fault tolerance or high availability; all operational aspects were left to the administrator. In MirrorMaker 2, data replication has been moved into Kafka’s managed layer by using the Apache Kafka Connect framework. Data replication now acts like a connector, which allows MirrorMaker 2 to inherit all of the fault-tolerance and high-availability aspects of the connector framework.

The move to leveraging the connector framework for data replication makes perfect sense and provides significant improvements in management and performance, but changes like this are never easy. By moving the operational model to the connector framework, configuration, optimization, tuning and management have all changed.

The migration from MirrorMaker 1 to MirrorMaker 2 highlights one huge benefit and one huge drawback that come with Apache Kafka being built by and for developers. The benefit is that tools built for developers tend to have many options that can be tweaked and tuned so that the tool can be adjusted to just the right settings for any developer’s needs. But this benefit is also the biggest drawback: Apache Kafka has many options that have to be tweaked and tuned so that the tool can be adjusted to just the right settings to meet an application’s specific needs. From a development perspective, the more options the better, but from an operational perspective, too many options can be a nightmare.

This is the inherent challenge in leveraging tools like Apache Kafka, and specifically MirrorMaker 2, for cluster replication. Out of the box, the configuration looks simple and easy, with very few options that need to be addressed.

clusters = source, destination
# connection information for each cluster
source.bootstrap.servers = tibco-kafka-source:9092
destination.bootstrap.servers = tibco-kafka-destination:9092
# enable and configure individual replication flows
source->destination.enabled = true
source->destination.topics = TIBCO.STREAMING.*
destination->source.enabled = false
# mirror maker configuration
tasks.max = 1
replication.factor = 1
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 5

The above example configuration quickly sets up replication from a source cluster to a destination cluster, enabling unidirectional replication so that all data published to topics matching TIBCO.STREAMING.* in the source cluster is mirrored into the destination cluster. The setup can be scaled up and hardened by increasing the number of nodes in each cluster, increasing the replication factor, and/or enabling acks = all, as sketched below. This upfront simplicity makes it very easy to get replication up and running, but tuning and optimizing replication to actually meet the performance characteristics that are needed can be a real challenge.
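A minimal sketch of what that hardening might look like, assuming three-node clusters; the replication factor of 3 is illustrative, and the acks override follows the same cluster-prefixed pattern used for the overrides later in this article:

# illustrative: with three brokers in each cluster, topics created by MirrorMaker can carry three replicas
replication.factor = 3
# require acknowledgement from all in-sync replicas when producing into the destination cluster
destination.acks = all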

There are many options that can, and need to, be tweaked depending on your company’s goals for replication and performance. In addition, how you leverage Kafka plays a big role in how these options affect your replication optimization. A great example: tuning and optimization will have different effects depending on how you manage ordering, for instance by using a single partition versus multiple partitions on a topic. Increasing an option like tasks.max, for example, seems like a natural way to increase performance because it increases the number of tasks available to the MirrorMaker connector. In our setup above, tasks.max is set to 1, so all partitions matching TIBCO.STREAMING.* will be handled by a single task in the MirrorMaker connector. Logically, it might make sense to increase tasks.max to provide parallelism so that the number of tasks matches the number of physical cores available across the MirrorMaker cluster. If all the topics matching TIBCO.STREAMING.* have many partitions, this makes perfect sense; however, if TIBCO.STREAMING.* only matches two topics and each topic has only one partition, then increasing tasks.max beyond 2 will not have the desired effect of increasing threading and parallelism in this setup. A quick way to check is shown below.
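As a quick sanity check before raising tasks.max, the partition counts of the replicated topics can be inspected on the source cluster and tasks.max kept at or below the total number of partitions. The grep filter and the task count of 8 below are illustrative assumptions, not recommendations:

# describe topics on the source cluster to see how many partitions TIBCO.STREAMING.* actually exposes
bin/kafka-topics.sh --bootstrap-server tibco-kafka-source:9092 --describe | grep TIBCO.STREAMING
# if the matching topics expose, say, 8 partitions in total, a matching task count is reasonable
tasks.max = 8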

While there are many options that can be tuned, a few of them have the biggest impact on improving performance in MirrorMaker 2. Several of the default server parameters anticipate large numbers of client connections, but this is typically not the case with MirrorMaker 2 using the connector interface. Therefore, the default settings must be overwritten to achieve adequate performance with MirrorMaker 2. The following table provides recommended settings to improve throughput and performance when using MirrorMaker 2.

Parameter                     Recommended value
max.poll.records              20000
receive.buffer.bytes          33554432
send.buffer.bytes             33554432
max.partition.fetch.bytes     33554432
message.max.bytes             37755000
compression.type              gzip
max.request.size              26214400
buffer.memory                 524288000
batch.size                    524288

While there are many options in Kafka that will increase performance, the parameters above have the largest impact in opening up throughput for replication using MirrorMaker 2. The next question is how to set these parameters. The easiest approach is to set them directly in the broker configuration for both the source and destination clusters; however, in many cases this is not feasible because those clusters also have client connections that should not inherit these larger values. Therefore, these parameters need to be overwritten as part of the MirrorMaker 2 connector configuration.

Overwriting server properties in MirrorMaker 2 is a little more complex than in MirrorMaker 1, because MirrorMaker 2 now uses the connector interface. Thankfully, there is a method that allows specific control over these parameters in either a standalone or a dedicated cluster when using MirrorMaker 2. Overrides take the syntax cluster.parameter = value and can be passed to a dedicated MirrorMaker 2 cluster when using the bin/connect-mirror-maker.sh script. This script requires an mm2.properties file, so adding these parameters to our original configuration above allows MirrorMaker 2 to overwrite the default server parameters.

clusters = source, destination
# connection information for each cluster
source.bootstrap.servers = tibco-kafka-source:9092
destination.bootstrap.servers = tibco-kafka-destination:9092
# enable and configure individual replication flows
source->destination.enabled = true
source->destination.topics = TIBCO.STREAMING.*
destination->source.enabled = false
# mirror maker configuration
tasks.max = 1
replication.factor = 1
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 5
# source cluster overwrites
source.max.poll.records = 20000
source.receive.buffer.bytes = 33554432
source.send.buffer.bytes = 33554432
source.max.partition.fetch.bytes = 33554432
source.message.max.bytes = 37755000
source.compression.type = gzip
source.max.request.size = 26214400
source.buffer.memory = 524288000
source.batch.size = 524288
# destination cluster overwrites
destination.max.poll.records = 20000
destination.receive.buffer.bytes = 33554432
destination.send.buffer.bytes = 33554432
destination.max.partition.fetch.bytes = 33554432
destination.message.max.bytes = 37755000
destination.compression.type = gzip
destination.max.request.size = 26214400
destination.buffer.memory = 524288000
destination.batch.size = 524288
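With these overrides saved to a properties file, the dedicated MirrorMaker 2 cluster can be started with the script mentioned above; mm2.properties below is simply a conventional file name:

bin/connect-mirror-maker.sh mm2.properties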

Lastly, it is also possible to overwrite these values for specific replication flows. For example, if I wanted to set max.partition.fetch.bytes to one value for replication from source->destination and a different value for destination->source, I could do that by setting the parameters as shown below.

clusters = source, destination
# connection information for each cluster
source.bootstrap.servers = tibco-kafka-source:9092
destination.bootstrap.servers = tibco-kafka-destination:9092
# enable and configure individual replication flows
source->destination.enabled = true
source->destination.topics = TIBCO.STREAMING.*
destination->source.enabled = true
destination->source.topics = TIBCO.STREAMING.RESPONSE.*
# mirror maker configuration
tasks.max = 1
replication.factor = 1
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 5
# direction specific settings
source->destination.max.partition.fetch.bytes = 33554432
destination->source.max.partition.fetch.bytes = 16777216

For more detailed information on how to deploy and use MirrorMaker 2 with Apache Kafka, or on how to leverage Apache Kafka in enterprise communication infrastructure, please visit TIBCO’s community site (https://community.tibco.com/wiki/tibco-messaging-and-tibco-activespaces-article-links-quick-access), which includes a special area just for enterprise messaging, data streaming and enabling event-driven architectures using messaging communications.

Messaging Evangelist, with a background in Computer Science and Communications, working on messaging and streaming communication for over 20 years.