Oracle Service Bus Transport for Apache Kafka (Part 2)


The first part of this article briefly discussed the motivations that leads to the usage of Kafka in software architectures, and also focused on how the Kafka transport can be installed and how to create Proxy and Business services to read and write from/to Kafka topics.

As expected, as more people use the Kafka transport more they will want to know about how to extract the best performance from it. After all, one of the key reasons to adopt Kafka is high throughput and low latency messaging, reason why it would not make any sense to have a layer in the architecture incapable to keep up with that kind of performance.

The Kafka transport is provided for free to use “AS-IS” but without any official support from Oracle. Bugs, feedback and enhancement requests are welcome but need to be performed on the transport repository on Oracle’s GitHub. The A-Team reserves the right of help in the best-effort capacity.

This second part of the article will cover advanced details related to the Kafka transport, specifically how it can be configured to increase performance and better leverage the hardware resources. Most of the tips shown in this article come from Kafka best practices, reason why this article will assume that you possess a minimal knowledge about how Kafka works. A detailed discussion about the Kafka design is available here.

If achieving better performance is your main objective, please consider other references rather than only this article. Software architectures have many layers that need to be optimized, and the Service Bus transport layer is just one of them. Service Bus/WebLogic provides several tools and resources for fine tuning, and it is strongly recommended to explore each one of them carefully. Also, the design of your Pipelines has a considerable amount of responsibility in how fast your services perform. Excessive use of Service Callouts during the message processing, the usage of blocking threads in Java Callouts, too much payload transformations and JVM garbage collection are some of the factors to consider. Keep these factors in mind while developing your services.

Finally, this article will also show how to capture common issues with few troubleshooting techniques. These techniques can be interesting to double-check if something is working as expected or to find out why something is not working.

Implementing Fault-Tolerance in the Kafka Transport

Software-based systems must have the fault-tolerance attribute measured not only by how much it can handle failures by itself, but also in terms of how fault-tolerant its dependencies are. Typical dependencies that must be considered are the sustaining infrastructure such as the hardware, the network, the operating system, the hypervisor (when using virtualization) and any other stack above those layers.

Equally important are the third-part systems that the system relies on. If to perform a specific task the system needs to interface with other systems, those other systems heavily influence its availability. For instance, a system that depends on other six systems cannot score 99% of fault-tolerance; it should score 93% in the best case scenario, because the other six systems must be continuously up-and-running in order to provide the remaining 6%. The Kafka transport has dependencies that need to be properly understood in order to provide fault-tolerance.

Let’s start discussing Zookeeper. Since the very beginning, Kafka relies on Zookeeper to perform some important tasks, such as using it to perform new leader election for a partition when some of the broker dies. Zookeeper is also used as a repository to maintain offsets, so instead of delegate to the consumers the responsibility of managing the offsets (which would lead to the use of custom coordination algorithms) it can be stored in Zookeeper and the consumers just go there and pick it up.

Recent versions of Kafka allows that the brokers can be used as repository to maintain the offsets. In fact, this is considered the best practice since the brokers provides much better write performance compared to Zookeeper. This change of behavior can be configured in the Kafka transport using the ‘Offsets Storage’ and ‘Dual Commit Enable’ fields, and it is only available for the Proxy Services.

Zookeeper provides built-in clustering features and you should leverage these features, along with other infrastructure details, in your deployment. The official Kafka documentation has a section to discuss that. Good highly available Zookeeper deployments have at least two and preferably three instances of it up-and-running. From the Kafka transport perspective, the Zookeeper instances need to be configured in the Proxy Services via the Endpoint URI field.


The second important dependency is the Kafka brokers itself. While the information about the brokers is not relevant for the Proxy Services, it is a requirement for the Business Services. The Proxy Services acts as consumers to Kafka and consumers interact to Zookeeper to discover which brokers are available. But in the case of Business Services they act as producers, and because that they need to directly interact with the brokers to transmit the messages. Therefore, when configuring the endpoints of your Business Services, you need to specify at least one, preferably two, Kafka brokers:


It is important to note that there is no need to configure all the Kafka instances in your Business Service. The instances described in the Endpoint URIs field are used only to perform the initial connection to the Kafka cluster, and then the full set of instances is automatically discovered. For this reason, set at least two instances; so if one instance is down during the endpoint activation, the other one can take over the discovery process.

This discovery process does not happen only during endpoint activation. Instead, the Kafka transport periodically asks for the current state of the cluster about running instances. This is extremely important for the Business Services to effectively load balance across partitions while transmitting the messages. It would make no sense keep a static state of the cluster while the brokers are insert/removed from the cluster. This could seriously affect the system availability and scalability. To control the frequency of this discovery process you can use the Metadata Fetch Timeout and Maximum Metadata Age fields.


Fine Tuning for High Throughput Message Consumption

In Kafka, when a message is sent to a topic, the message is broadcasted to any consumer that subscribes to that topic. In this context, a consumer is a logical entity that is associated to a unique group. For this to work, each consumer must carry a group identifier. If two or more processes subscribe to a topic and share the same group identifier, then they will be handled as one single consumer. By using this design, Kafka can have the guarantee that each consumer will be able to perform its task in a fault-tolerant and scalable fashion, due the use of multiple processes instead of one.

The concept of groups in Kafka is very powerful, by simplifying the way to implement messaging styles like P2P (Point-To-Point) and Publish/Subscribe. Unlike other messaging systems like JMS, Kafka does not use different types of destinations. There is one single abstraction called topics. The differentiation of how a message is going to be delivered, whether using P2P or Publish/Subscribe, is a simple question of having processes belonging to different groups.


In the Kafka transport, you can use the Group Identifier field to configure which group the Proxy Service will belong. The Group Identifier field is available in the Transport Details tab of the Proxy Service.


When a new Proxy Service is created, the Kafka transport take care of copying the name you set for it to the Group Identifier field, in order to make sure that the Proxy Service will always belong to a group. But the Group Identifier field allows the value to be changed, reason why you should be extremely careful about the value you set. For instance, if you create two Proxy Services – in the same or different Service Bus projects – that subscribes to a topic to perform distinct tasks when a message arrives, it can happen that only one of them receives the message if they use the same group identifier. In order to work, you will have to set different group identifiers so then each Proxy Service processes a copy of the message.

If you are concerned about scalability, the concept of groups can be quite interesting to scale the system using the Service Bus cluster. In runtime, Service Bus deploys the Proxy Service on each managed server of the cluster. Because each copy of the Proxy Service uses the same Group Identifier value set in design time, for the Kafka perspective they will be seem as one single consumer, causing a natural load-balance between them to process the messages. That creates an interesting way to scale message consumption both vertically and horizontally. It can scale in (vertical scalability) because you can have a machine with multiple managed server JVMs, and it can scale out (horizontal scalability) because you can spread the managed servers across different machines in a cluster. Either way adding more hardware resources will positively impact the message consumption throughput.


The concept of groups can also be applied in case of multiple clusters. For instance, consider a Service Bus cluster running a Proxy Service that subscribes to a topic using the group identifier “X”. If another Service Bus cluster runs a Proxy Service that subscribes to the same topic and also uses “X” as the group identifier, then those Service Bus clusters will load-balance in an active-active style to process messages from the topic. Alternatively, if in this case different group identifiers are used, both of them will receive a copy of the messages, and by having only one Service Bus cluster actually processing the messages and having the other one in stand-by mode, it is possible to implement an active-passive style of processing. This can be particularly interesting if those Service Bus clusters are spread in different data centers.

Kafka also provides support for data centers synchronization via a feature called Data Mirroring. That feature can be useful to maintain different data centers synchronized, having data from one or more clusters from one data center copied to a remote destination cluster in another data center. You can read more information about this here.

Now let’s discuss another type of scalability, which is a variation of the vertical scalability. Having one machine with multiple managed server JVMs can help to leverage a system running with multiple CPU cores, because the tasks can be executed in parallel. For instance, considering a machine equipped with sixteen CPU cores. We could have four of these cores dedicated to operating system tasks and use the other twelve cores to process messages from Kafka, using twelve managed server JVMs. The problem with this approach is that we do not have any guarantees from the operating system that each core will be solely dedicated to one process, because from the operating system perspective; a task is executed in a CPU core by the usage of threads, and the operating system schedules the threads with priorities that we cannot control. While the operating system works to provide a fair chance to each process to run its tasks, it may happen that processes with larger number of threads get more priority, leaving other processes waiting.

But this can be achieved using multiple threads per managed server JVM, breaking down the messages coming from topics into multiple streams and process each stream with a thread. In Kafka, a stream is a high-speed data structure that continuously fetches data from a partition in the cluster. By default, each Proxy Service created is configured to handle one thread, consequentially being able to process only one stream. To increase the number of threads you can use the Consumer Threads field available in the Transport Detail tab of the Proxy Service.


In the example above, the Proxy Service is configured to create sixteen threads. During the endpoint activation the Kafka transport will request sixteen streams to the cluster and each one of these streams will be handled using a dedicated thread, increasing the message consumption throughput considerably.

Make sure to set enough partitions for the topic, otherwise this feature will not work correctly. The best practice is that the number of partitions of a topic should be equals to the number of consumer threads available. Partitions are configured in a per-topic basis, in the Kafka cluster.

Regardless of how many threads are configured for the Proxy Service, all the work related to process streams is scheduled by the Work Manager specified in the Dispatch Policy field. The default WebLogic Work Manager is selected when a Proxy Service is created, but it is strongly recommended to use a custom one, for two reasons. First, because the threads are kept in a waiting state to continuously listen for new messages, WebLogic can eventually presume that they become stucked and react accordingly. This situation can be avoided by using a custom Work Manager capable of ignoring stuck threads.

Secondly, because WebLogic uses a single-thread pool to execute any type of work, there is no way to guarantee that a Proxy Service will have a fair share from this pool unless of using a Work Manager with minimum and maximum thread constraint set. This is particularly important for scenarios where Service Bus manages different types of services.


If multiple Proxy Services using the Kafka transport are deployed in the same Service Bus domain, do not share the same Work Manager between them. This may cause thread starvation if the maximum thread constraint of the Work Manager is not high enough to handle the sum of all threads. As a best practice, create one Work Manager for each Proxy Service within your Service Bus domain.

Implementing Partitioning During Message Transmission

In runtime, each topic in Kafka is implemented using a cluster-aware partitioned log. This design is very scalable because a topic can handle a number of messages that goes beyond the limits of a single server; because the topic data is spread in the cluster using multiple partitions. In this context, a partition is a sequence of messages continuously append, in which each message is uniquely identified within the partition through a sequence identifier called offset.


By using multiple partitions, Kafka not only can handle more data but also scale out the message consumption by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. With this approach Kafka ensures that the consumer is the only reader of that partition, being able to consume the data in an ordered fashion.

The partition assignment across the consumers is dynamic, even when new brokers/consumers join the cluster, or when they fail and leave it. This is accomplished by a process called Rebalancing that assigns the partitions across the consumer processes. While there is no way to control how the partitions are assigned to the consumer processes, there is a way to control to which partitions the messages are sent. In fact, the absent of control about how the messages are spread across the partitions leads to a huge chance of overhead in the message consumption, if messages end up always in the same partition.

Hopefully, the Kafka transport provides ways to implement message partitioning across the brokers. This can be accomplished by using special headers in the message that is sent downstream to Kafka. During the transmission of a message to Kafka, the following algorithm is used:

1) If a partition is specified in the message header, then it its used.

2) Otherwise, if a key is specified, it is used to figure the partition.

3) Otherwise, a randomly selected partition will be used instead.

The rationale of this algorithm is that if no additional information is provided in the message header, the message will be sent anyway but with no control about which partition will be selected. Providing the appropriated information in the message header, either assigning a partition or a key, provides this fine control over partition assignment. In order to work, the header must have the information written just before the message is sent downstream, and the best way to implement this is by using the Transport Headers action.

To illustrate this scenario, consider a Business Service called “DispatchOrders”, created using the Kafka transport. Before routing the message to this Business Service, use a Transport Headers action in the Route node to set the header values. The header options available are: ‘partition’ and ‘message-key’. Here is an example of setting the message-key.


The approach to set the partition header is the same. The Transport Headers action can also be used when messages are published in the middle of the Pipeline, so it is not restricted to be used with routing nodes. Assigning both the partition and the message-key value in the header it is pointless because if provided, the partition always takes precedence during the message transmission.

Improving the Batching of the Kafka Transport

The Kafka transport was designed from the ground to leverage the new producer API, to obtain the best possible results in runtime in terms of transmission throughput. Because the producer API sends all the messages asynchronously there is no blocking on the Business Services when messages are dispatched; and the threads used to perform this dispatch immediately return to the WebLogic thread pool. An internal listener is created for every message dispatched, to receive the confirmation of the message transmission. When this confirmation arrives (with or without an error) the response thread is scheduled to execute the response Pipeline, if any.

Because messages are sent asynchronously, they are not immediately written into the partitions, which causes the messages to be locally buffered waiting for a single dispatch. A background thread created by the producer API manages to accumulate the messages and send in a batch through the TCP connection established with each of the brokers it needs to communicate with. This batching approach considerably improves the transmission throughput by minimizing the overhead that the network may cause.

The Kafka transport provides options to configure how this batching is performed. Most scenarios will be fine using the standard configuration, but for high volume scenarios where Service Bus must write to Kafka, changing these options can positively affect the performance.


The most important fields in this context are the Buffer Memory and the Batch Size, and both fields are measured in terms of bytes. The Buffer Memory field controls the amount of bytes to be buffered and it defaults to 32MB. Changing this value affects the message dispatch and the JVM heap footprint. You can also control what happens when this buffer becomes full with the Block On Buffer Full field. The Batch Size controls the size of each batch and it defaults to 16KB. Changing this value will directly affect how frequent are the writes over the network.

In scenarios of heavy load, the value set in the Batch Size field will correct control the frequency of the writes because the batch will quickly get full. However, under moderated load the Batch Size can take a while to fill up and you can use the Linger field as an alternative to control the write frequency. This field is measured in milliseconds and any positive value set will create a delay that waits for new messages to be batched together.

Delivery Semantics during Message Transmission

In a distributed architecture, any type of failure can happen during the transmission of messages, and prevents the consumers to receive them. Therefore, it is up to the producer of the message describe the appropriate behavior to handle failures, otherwise data might be lost. In Kafka is no different. When a message is sent, the producer needs to have some degree of guarantee in regards its delivery, in order to decide if the transaction should be considered complete or, assume that it failed and try it again.

This section will describe some options available in the Kafka transport to customize the behavior of the message transmission regarding durability. But to use these options correctly, you need to understand how replication works in Kafka.

The concept of partitions was briefly discussed before, but one important detail about it was not mentioned: In Kafka, each partition can have multiple copies across the cluster. This is primarily done to provide high availability to the partitions, so the message consumption from a partition can continue even when the broker that hosts that partition fails. The number of replicas of the partitions can be set administratively in a per-topic basis so you can better control how available are the partitions depending of the cluster size.

For instance, if you set the replication factor of a topic to three, each partition within that topic will have three replicas in the cluster. One of these replicas is selected as leader, and the other replicas became followers. The difference between leaders and followers is that only the leader is responsible for reading and writing into the partition, and the followers are used for backup purposes. Because the followers maintain a synchronized copy of the partition with the leader, they are known as ISR’s, acronym for In-Sync Replicas. The more ISR’s exist for a partition then more highly available it will be. But this high availability does not come for free; the message transmission can be heavily impacted in which regards to latency. For instance, when using the highest durability, the message transmitted needs to wait until all ISR’s acknowledges to the leader to be considered completed.

Kafka provides ways to customize this behavior and that was captured as options in the Kafka transport, so you can change it to afford your specific requirements. When you create a Business Service using the Kafka transport, you can use the Acknowledge field to configure the appropriated behavior regarding durability. The available options are:

Without Acknowledge: The fastest option in terms of latency, but data loss might happen.

Leader Acknowledge: Waits for the leader to acknowledge, with a modest latency increase.

ISRs Acknowledge: Highest durability. All ISR’s need to acknowledge, so the latency is higher.

Also, the Timeout field can be modified to customize the amount of time to wait until declare that the durability option chosen was not meet. It defaults to thirty seconds, but this value can be changed to accommodate specific scenarios such as slow network deployments.


Regardless of which durability option is configured, the Kafka transport provides a way to follow up if the message was sent successfully, by making available special headers in the response metadata. As explained before, for each message dispatched an internal listener is created to receive the confirmation of the message transmission. This confirmation is sent by the leader of the partition chosen, and contains the message offset in the partition. The response metadata created is based on this confirmation, which means that if no response metadata is available after the message is sent, you can be positive that a failure happened during the transmission, and the details of this failure will come in a form of an exception.


So far has been discussed how durability can be configured, but what happens if a message transmission is considered failed? How many retries Service Bus must perform? How long between each retry should Service Bus wait? Those are some of the questions that may rise during the implementation of services that send messages to Kafka. In the Kafka transport, this is configured in the Transport tab of your Business Service.


These options are self-explanatory and well detailed in the official Service Bus documentation. However, there is one detail that you must be aware of. In Kafka, there are two specific properties that control how retries are performed, the ‘retries’ and the ‘’. In runtime, the Kafka transport copy the value set in the Retry Count field to the retries property directly, without any extra processing. But in the case of the Retry Iteration Interval field, instead of copying the value set to the property directly, a calculation is performed to discover the appropriated value.

The reason why this is done this way is due the difference of unit precision. While the Retry Iteration Interval field has second precision, the has millisecond precision. Thus, the first step of the calculation is bringing the Retry Iteration Interval value to the millisecond precision, to then get an approximated value for the property, by dividing the obtained value with the value set in the Retry Count field. For instance, if the value set in the Retry Count field is three and the value set in the Retry Iteration Interval field is one, then the value set in the property is 333ms.

If you need that the property work with specific values, you can set it using the Custom Properties field, that will be explained in the next section.

Guidelines to Perform General Troubleshooting

This section will focus on how to identify and troubleshoot some issues that might happen with the Kafka transport. The first thing to learn is that the Kafka transport logs most internal things in the server output. During a troubleshooting section, make sure to capture the server output and read the logs for additional insight.

For instance, if during server startup the Kafka transport is not able to load the main Kafka libraries from the Service Bus classpath, it will log in the server output the following message:

<Jun 22, 2015 6:09:20 PM EDT> <Warning> <> <OSB-000000> <Kafka transport could not be registered due to missing libraries. For using the Kafka transport, its libraries must be available in the classpath.>

This might explain why the Kafka transport seems not to be available in Service Bus. Similarly, if a Proxy Service is not able to connect to Zookeeper, it will print the following log in the server output:

<Jun 22, 2015 6:30:18 PM EDT> <Error> <> <OSB-000000> <Error while starting a Kafka endpoint.> org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000



        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(

        at java.lang.reflect.Method.invoke(


        at com.sun.proxy.$Proxy158.activationComplete(Unknown Source)

The Kafka transport provides some JVM properties to customize specific behaviors. During endpoints activation, the Kafka transport builds native connections to Kafka using the producer and consumer APIs. For such, it needs to create Kafka specific properties and fills it with the values set in design-time. To display these specific properties in the server output, you can use the following JVM property:

This JVM property defaults to false. If it is set to true, the following log is printed in the server output during the endpoint activation:

<Jun 22, 2015 6:44:59 PM EDT> <Info> <> <OSB-000000> <The endpoint ‘DispatchOrders’ is being created using the following properties>













The Kafka transport was designed to delay the activation of Proxy Service-based endpoints until the server reaches the RUNNING state. This is important to ensure that the server does not start processing messages during startup, scenario that can significantly slow down the server and delay the startup if the number of messages to consume is too high. For this reason, when the Kafka transport is initialized, it creates an internal timer that keeps polling the server about its state. Each check happens every five seconds, but you can set small or higher values using the following JVM property:<NEW_VALUE>

This JVM property is measured in milliseconds. Keep in mind that there is a maximum possible value for this property, which is one minute. Any value set that is greater than one minute will be automatically adjusted to one minute to prevent any tentative of damaging the Service Bus startup.

When the Service Bus server reaches the RUNNING state, all the endpoints are started and the internal timer is destroyed automatically. But reaching the RUNNING state is not the only criteria to destroy the internal timer. As expected, it might happen that the server never reaches the RUNNING state for some reason. In this case, the internal timer will be destroyed if the startup takes more than five minutes. For most Service Bus deployments five minutes may be considered more than enough, but in case of servers with longer startup times, the endpoints may never be started because the internal timer responsible for this was already destroyed when the server finally got into the RUNNING state. To prevent this from happen, you can change the timeout value by using the following JVM property:<NEW_VALUE>

This JVM property is measured in milliseconds. Adjust it to make sure that the internal timer responsible for starting the Proxy Services waits time enough to do it.

While working with Proxy Services created with the Kafka transport, there will be situations where you want to make sure if the number of threads set in the Consumer Threads field is actually working. This can be accomplished by using the standard WebLogic monitoring tools available in the administration console. For instance, by accessing the Monitoring tab of the Kafka transport deployment, you can use the workload sub-tab to check if a specific Work Manager scheduled the appropriated work.


The example above shows a Work Manager with sixteen pending requests, value that matches with the number of threads set in the Consumer Threads field. Of course, In order to work, the Work Manager needs to be explicitly associated with the Proxy Service.

Another way to verify if the value set in the Consumer Threads field is working is by taking thread dumps from the JVM. In the thread dump report, you will have entries that look like this:

“[ACTIVE] ExecuteThread: ’21’ for queue: ‘weblogic.kernel.Default (self-tuning)'” Id=161 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3c0fcd49

               at sun.misc.Unsafe.park(Native Method)

               –  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3c0fcd49

               at java.util.concurrent.locks.LockSupport.park(

               at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(

               at java.util.concurrent.LinkedBlockingQueue.take(

               at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)

               at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)

               at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)

               at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)




               at com.bea.alsb.platform.weblogic.WlsWorkManagerServiceImpl$




The rationale here is looking for the following string:


The number of times that this string appears in the thread dump report need to match with the value set in the Consumer Threads field.

Another important feature included in the Kafka transport is the Custom Properties field, available in all types of services created. The primary purpose of this field is to provide a way to directly set any specific Kafka property. This can be quite useful if some property introduced in a particular release of Kafka has no equivalent in the user interface; situation that may occur often due the Open Source nature of Kafka, where new releases are provided by the community in a much faster pace.


Any property set in this field is used by the Kafka transport to establish native connections to Kafka. However, there is an important aspect about precedence that needs to be carefully considered. If some property is set in this field, and this property has an equivalent in the user interface, the value used will be the one set in the Custom Properties field, overriding any value set in the user interface.


This article is the part two of a series that intend to show how Service Bus can be leveraged to connect and exchange data with Kafka, allowing developers and architects to gain from both worlds. It was explained advanced techniques that can be applied to the Kafka transport, and also how to identify and troubleshoot common issues.


  1. Not that I am aware of. Most clients that interact with RabbitMQ does not have any support for AMQP. Because RabbitMQ has support for JMS extensions, you can try connect to it using the Service Bus JMS transport and/or the JCS JMS adapter.


    Ricardo Ferreira

  2. Hi ,
    Is there a custom transport to integrate with RabbitMQ ?

Add Your Comment