Oracle Service Bus Transport for Apache Kafka (Part 1)

Introduction

Few of us realize it, but the heart of OOP (Object-Oriented Programming) has more to do with messaging than with objects and classes, or with related design principles like inheritance and polymorphism. From the very beginning, OOP was designed to help developers implement data exchange between objects, so that those objects could coordinate to execute a business process. Dividing the problem into smaller pieces this way has a key benefit: code that is easier to maintain. Another original OOP design concept dictates that all objects in the system are accountable for data sharing; no object holds more responsibility than another regarding messaging. This design principle helps OO applications scale as a whole, because objects can scale individually.

Considering that the first object-oriented languages (e.g., LISP, MIT ALGOL, Simula 67) were created in the 1960s, we can all agree that messaging has been around for at least four decades. However, messaging looks quite different today from these original OO concepts, and this discrepancy is a direct result of industry demands and technology evolution. Let’s do a quick review:

– To increase performance, objects were expected to run concurrently using threads;

– To increase scalability, objects were spread across different machines;

– To increase availability, messages were stored in a central repository;

– To increase manageability, this repository took over most of the message handling;

– To increase adoption, standards were created with all these characteristics in mind.

Most of these characteristics define what we refer to today as MOM (Message-Oriented Middleware). This type of middleware is commonly used for exchanging messages in a reliable, fault-tolerant and high-performance way. Numerous standards were created to define APIs and protocols for MOMs, with JMS (Java Message Service) and AMQP (Advanced Message Queuing Protocol) perhaps the most popular ones, each with many great implementations available, both commercial and open-source.

But industry demands never slow down, and like any other technology, MOMs have to keep up with them to avoid obsolescence. One major demand currently threatens most implementations, and that is data volume: the need to handle not hundreds or thousands but millions of messages per second. The volume of data generated has increased exponentially in the last decade and keeps growing every second. Most implementations follow a MOM design concept, dictated by an API specification, in which consumers and producers need to be as lightweight as possible, so the MOM ends up handling most of the messaging work. While this is good from an ease-of-use standpoint, it creates a serious bottleneck in the architecture that eventually prevents it from scaling up to meet these ever-increasing data volumes.

Of course, not all applications need to handle millions of messages simultaneously, and for most of them these implementations might seem “good enough”. However, what happens if one of these applications needs to be modified to handle a high data volume scenario? What are the consequences? Common responses include application adjustments and fine-tuning that bring their own set of complexities to the architecture. For instance:

– Disabling message persistence to improve performance, at the risk of losing messages;

– For JVM-based implementations, increasing the heap size, and possibly causing long GC pauses;

– Trimming the Java EE application server down to JMS only, perhaps losing flexibility and supportability;

– Giving producers and consumers more responsibility, but breaking the API specification;

– Using high-speed appliances to improve performance, but also increasing the TCO.

It seems that industry demands over the years have caused us to deviate considerably from the original OOP design principle that a system scales as a whole when each object scales individually. This principle is more critical than ever in an era where high data volumes make scalability the most important driver. Finally, we are also in the Cloud Computing era, where a significant percentage of enterprise applications and computing infrastructure is either running in, or moving to, the Cloud.

Some years ago, a few LinkedIn enterprise architects responsible for the company’s data infrastructure faced these same challenges while building data pipelines throughout the company. They were trying to capture and manage relational database changes, NoSQL database changes, ETL tasks, messaging and metrics, and to integrate tools like Apache Hadoop. After ending up with a different approach for each scenario, they realized that a single approach would be more maintainable, as long as it addressed the scalability challenges of every scenario. That, in simplified form, is how Apache Kafka was created. Apache Kafka (Kafka for short) is a distributed, partitioned and replicated commit log service that provides features similar to other messaging systems, but with a unique design that allows it to handle terabytes of data with a single node. Most importantly, the Kafka architecture allows it to scale as a whole because each layer (including producers and consumers) is designed from the ground up to be scalable.
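To make “partitioned commit log” a little more concrete, here is a toy, in-memory Python sketch. The class name ToyTopic is hypothetical, and this is not how Kafka is implemented or how it stores data on disk. It only illustrates the addressing scheme: each partition is an append-only sequence, each record is identified by its offset within its partition, and reading does not remove anything, so independent consumers can read the same partition at their own pace. These partition and offset values are what the Service Bus transport reports for each message in the log output later in this article.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

# A record is a (key, value) pair; Kafka message keys are optional.
Record = Tuple[Optional[bytes], bytes]


@dataclass
class ToyTopic:
    """Toy model of a topic: a fixed number of append-only partitions."""
    num_partitions: int
    partitions: List[List[Record]] = field(init=False)

    def __post_init__(self) -> None:
        if self.num_partitions < 1:
            raise ValueError("a topic needs at least one partition")
        self.partitions = [[] for _ in range(self.num_partitions)]

    def append(self, partition: int, value: bytes, key: Optional[bytes] = None) -> int:
        """Append a record to one partition and return its offset in that partition."""
        log = self.partitions[partition]
        log.append((key, value))
        return len(log) - 1

    def read(self, partition: int, offset: int, max_records: int = 10) -> List[Record]:
        """Read from an offset onward; nothing is removed, so other readers are unaffected."""
        return self.partitions[partition][offset:offset + max_records]


topic = ToyTopic(num_partitions=2)
first = topic.append(1, b"Hello World from Apache Kafka")
second = topic.append(1, b"Hello World from Oracle Service Bus")
print(first, second)     # 0 1
print(topic.read(1, 1))  # [(None, b'Hello World from Oracle Service Bus')]
```

Offsets are only meaningful within a partition: appending to partition 0 would start again at offset 0.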

After seeing numerous requests from customers and partners to integrate with Kafka, the A-Team decided to write a native transport for Oracle Service Bus (Service Bus for short) that connects to and exchanges data with Kafka, supporting both message consumption from and message production to Kafka topics. This is done in a way that allows Service Bus to scale jointly with Kafka, both vertically and horizontally.

The Kafka transport is provided for free, to use “AS-IS”, without any official support from Oracle. Bugs, feedback and enhancement requests are welcome, but should be filed on the transport repository on Oracle’s GitHub. The A-Team will help only on a best-effort basis.

To detail how this transport works, this article is divided into two parts. This first part provides a basic introduction, covering how to install the transport and how to create Proxy and Business Services that read from and write to Kafka. The second part covers more advanced topics, such as how to implement message partitioning and how to scale the architecture as a whole. These articles do not cover Kafka in detail but instead focus on the Service Bus aspects of the solution. If you need a Kafka overview, it is strongly recommended that you read its official documentation and spend some time trying out its quick start. Jay Kreps, one of the Kafka co-creators, wrote a very good in-depth article about Kafka that covers all the details.

Installing the Transport in Service Bus

The first thing you need to do is make sure that some of the Kafka libraries are available on the Service Bus classpath. At runtime, the transport implementation uses classes from these libraries to establish connections to Kafka. As a best practice, put these libraries in the following folder: $OSB_DOMAIN/lib. JAR files within this folder are picked up and added dynamically to the end of the server classpath at server startup. Which libraries to copy depends on which Kafka version you plan to use, because Kafka’s client API has changed across releases; in particular, the consumer API changed significantly in versions 0.8.X, 0.9.X and 0.10.X.

If you are using Kafka 0.9.X or later, copy the following libraries:

– $KAFKA_HOME/libs/slf4j-api-X.X.X.jar

– $KAFKA_HOME/libs/kafka-clients-X.X.X.X.jar

If you are using a Kafka 0.8.X release, copy the following libraries:

– $KAFKA_HOME/libs/kafka_2.10-0.8.2.1.jar

– $KAFKA_HOME/libs/kafka-clients-0.8.2.1.jar

– $KAFKA_HOME/libs/log4j-1.2.16.jar

– $KAFKA_HOME/libs/metrics-core-2.2.0.jar

– $KAFKA_HOME/libs/scala-library-2.10.4.jar

– $KAFKA_HOME/libs/zkclient-0.3.jar

– $KAFKA_HOME/libs/zookeeper-3.4.6.jar

Here, $KAFKA_HOME should point to the exact location where Kafka was installed. Keep in mind that the actual file names may differ depending on which Kafka version you are using.
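The library selection above can be scripted. The following Python sketch assumes KAFKA_HOME and OSB_DOMAIN are plain directory paths and uses hypothetical helper names (select_kafka_jars, copy_to_domain_lib). It encodes the two lists as wildcard patterns because the version numbers in file names change between releases, and it skips javadoc, sources and test JARs, which some Kafka distributions ship in the same libs folder. Treat it as a starting point and double-check its result against the lists above.

```python
import fnmatch
import os
import shutil
from typing import List

# File-name patterns derived from the two lists above (versions vary, hence wildcards).
PATTERNS_0_9_AND_LATER = ["slf4j-api-*.jar", "kafka-clients-*.jar"]
PATTERNS_0_8 = [
    "kafka_2.*.jar",  # e.g. kafka_2.10-0.8.2.1.jar
    "kafka-clients-*.jar",
    "log4j-*.jar",
    "metrics-core-*.jar",
    "scala-library-*.jar",
    "zkclient-*.jar",
    "zookeeper-*.jar",
]
# Documentation and test artifacts are not needed on the server classpath.
EXCLUDED_SUFFIXES = ("-javadoc.jar", "-scaladoc.jar", "-sources.jar", "-test.jar")


def select_kafka_jars(kafka_home: str, kafka_version: str) -> List[str]:
    """Return the JARs from $KAFKA_HOME/libs needed for this Kafka version (hypothetical helper)."""
    major, minor = (int(part) for part in kafka_version.split(".")[:2])
    patterns = PATTERNS_0_8 if (major, minor) < (0, 9) else PATTERNS_0_9_AND_LATER
    libs_dir = os.path.join(kafka_home, "libs")
    selected = []
    for name in sorted(os.listdir(libs_dir)):
        if name.endswith(EXCLUDED_SUFFIXES):
            continue
        if any(fnmatch.fnmatch(name, pattern) for pattern in patterns):
            selected.append(os.path.join(libs_dir, name))
    return selected


def copy_to_domain_lib(jars: List[str], osb_domain: str) -> None:
    """Copy the selected JARs into $OSB_DOMAIN/lib (hypothetical helper)."""
    lib_dir = os.path.join(osb_domain, "lib")
    os.makedirs(lib_dir, exist_ok=True)
    for jar in jars:
        shutil.copy2(jar, lib_dir)
```

For example, select_kafka_jars(kafka_home, "0.10.0.0") picks the 0.9.X-and-later list, while "0.8.2.1" picks the older one. The JARs only reach the classpath after the servers are restarted.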

The second step is deploying the transport in Service Bus. Currently, there are two versions of the transport:

* Transport for Kafka 0.8.X: This version should be used if you are using Kafka 0.8.X releases. Download it here.

* Transport for Kafka 0.9.X: This version should be used if you are using Kafka 0.9.X or later releases. Download it here.

The source code for the Kafka Transport is available on Oracle’s GitHub, so you can compile it yourself and create your own binaries instead of using the ones provided in this blog. Here is the GitHub repository: https://github.com/oracle/osb-kafka-transport

Download the zip file and extract its contents into a staging folder.

Copy the implementation files (kafka-transport.ear and kafka-transport.jar) to the same folder where the other transports are kept:

$MW_HOME/osb/lib/transports

And copy the JDeveloper plugin descriptor (transport-kafka.xml) to the plugins folder:

$MW_HOME/osb/config/plugins
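If you prefer to script the copy step, the following Python sketch copies the three files from the staging folder into $MW_HOME. The function name install_transport_files is hypothetical, and it assumes the zip was extracted flat into the staging folder. It checks that all three files exist before copying anything, so a missing file does not leave a half-installed transport behind.

```python
import os
import shutil
from typing import List

# Destination of each file, relative to $MW_HOME, as described in the steps above.
DESTINATIONS = {
    "kafka-transport.ear": os.path.join("osb", "lib", "transports"),
    "kafka-transport.jar": os.path.join("osb", "lib", "transports"),
    "transport-kafka.xml": os.path.join("osb", "config", "plugins"),
}


def install_transport_files(staging_dir: str, mw_home: str) -> List[str]:
    """Copy the transport files from the staging folder into $MW_HOME (hypothetical helper)."""
    # Check every source first so a missing file does not leave a partial install.
    missing = [name for name in DESTINATIONS
               if not os.path.isfile(os.path.join(staging_dir, name))]
    if missing:
        raise FileNotFoundError(f"missing from {staging_dir}: {', '.join(missing)}")

    installed = []
    for name, relative_dir in DESTINATIONS.items():
        target_dir = os.path.join(mw_home, relative_dir)
        os.makedirs(target_dir, exist_ok=True)
        # copy2 preserves file metadata and returns the path of the new file.
        installed.append(shutil.copy2(os.path.join(staging_dir, name), target_dir))
    return installed
```

The returned paths make it easy to log what was installed before moving on to the deployment step.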

Finally, you need to deploy the implementation files into your Service Bus domain. You can perform this deployment manually or by using the WLST script (install.py) that ships with the transport. If you use the WLST script and run it from a location different from where the files were copied, it will ask for the location of the implementation files; make sure to provide their exact location. When installing the implementation files in the domain, the script makes a best effort to identify all possible targets on which the transport should run, including the Admin Server and clusters. However, if your domain also hosts other components besides Service Bus (e.g., SOA, MFT, BAM), manually retarget the implementation files to only the Service Bus targets and the Admin Server after the script finishes.

If you choose to perform the deployment manually, execute the following steps:

1) Connect to your Service Bus domain.

2) Deploy the kafka-transport.jar file as a library, targeting all Service Bus servers/clusters and the Admin Server.

3) Deploy the kafka-transport.ear file as an application, targeting all Service Bus servers/clusters and the Admin Server.

3.1) Name the application “Service Bus Kafka Transport Provider”.
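On domains that host more than Service Bus, the targets chosen by install.py may include clusters that should not run the transport; one reader’s install.py output in the comments, for instance, lists AdminServer, BAM_CLUSTER, SOA_CLUSTER and OSB_CLUSTER. The Python sketch below narrows such a list down to the Admin Server plus the Service Bus targets, under the loud assumption that Service Bus servers and clusters can be recognized by having “OSB” in their names. service_bus_targets is a hypothetical helper, and your domain’s naming convention may well differ. The joined, comma-separated string is the form WLST’s deploy command expects for its targets argument.

```python
from typing import Iterable, List


def service_bus_targets(candidates: Iterable[str],
                        admin_server: str = "AdminServer",
                        marker: str = "OSB") -> List[str]:
    """Keep the Admin Server plus targets whose name contains `marker`.

    Hypothetical helper: it assumes Service Bus servers/clusters can be
    recognized by name, which depends entirely on how the domain was built.
    """
    return [name for name in candidates
            if name == admin_server or marker.lower() in name.lower()]


# Targets reported by install.py in one reader's domain (see the comments).
found = "AdminServer,BAM_CLUSTER,SOA_CLUSTER,OSB_CLUSTER".split(",")
targets = ",".join(service_bus_targets(found))
print(targets)  # AdminServer,OSB_CLUSTER
```

A name-based filter is only a convenience; always confirm the final targets in the WebLogic console.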

Receiving Data from Apache Kafka

This section will show how to create a Service Bus Proxy Service that fetches messages from a Kafka topic and processes them using a Pipeline. The following steps will assume that you have an up-and-running Kafka deployment, and that a topic named “orders” was previously created.

Creating the Service Bus Artifacts

1) In the Service Bus Console, create a session to modify the projects.

2) Create a new project called “article” under the projects folder.

3) Right-click the newly created project and choose Create > Pipeline.

4) Name this Pipeline as “OrdersListenerImpl”.

5) In the Service Type field, choose “Messaging”.

6) Set “Text” as the Request Type and “None” as the Response Type.

7) Leave the “Expose as a Proxy Service” check box unchecked.

8) Click the Create button to finish.

9) Open the Message Flow editor.

10) Implement the Message Flow with a Pipeline Pair, creating a single stage in the request pipeline that logs the headers and the body of the incoming message. Here is an example of this implementation.

figure-1

11) Right-click the article project and choose Create > Proxy Service.

12) Name this Proxy Service as “OrdersListener”.

13) In the Protocol field, choose “kafka”, then click Next.

figure-2

14) Set “Text” as the Request Type and “None” as the Response Type.

15) Click Next to continue.

16) In the Endpoint URI field, enter the server details. If you are using Kafka 0.8.X, this should point to ZooKeeper; if you are using Kafka 0.9.X or newer, it should point to the brokers.

figure-3

17) Click the Create button to finish.

18) In the General page, associate this Proxy Service with the previously created Pipeline.

figure-4

19) Click the Transport Details tab.

20) In the Topic Name field, enter “orders”.

F1

21) Save the entire configuration and activate the changes.
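Step 16 boils down to a version check. The Python sketch below, with hypothetical helper names, suggests a default Endpoint URI: ZooKeeper’s default client port 2181 for 0.8.X, and the broker’s default port 9092 for 0.9.X and later. These match the localhost:2181 URI in the log output and the localhost:9092 broker used with the console producer in this article. It also validates a comma-separated host:port list; whether the transport’s Endpoint URI field accepts several entries written this way is an assumption, not something this article confirms.

```python
from typing import List, Tuple


def default_endpoint(kafka_version: str, host: str = "localhost") -> str:
    """Suggest an Endpoint URI for a Kafka Proxy Service (hypothetical helper)."""
    major, minor = (int(part) for part in kafka_version.split(".")[:2])
    if (major, minor) < (0, 9):
        # 0.8.X: the transport connects to ZooKeeper (default client port 2181).
        return f"{host}:2181"
    # 0.9.X and later: the transport connects to the brokers (default port 9092).
    return f"{host}:9092"


def parse_endpoint(uri: str) -> List[Tuple[str, int]]:
    """Split an assumed comma-separated host:port list and check every entry."""
    hosts = []
    for entry in uri.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        hosts.append((host, int(port)))
    return hosts


print(default_endpoint("0.8.2.1"))                   # localhost:2181
print(default_endpoint("0.10.0.0"))                  # localhost:9092
print(parse_endpoint("broker1:9092, broker2:9092"))  # [('broker1', 9092), ('broker2', 9092)]
```

A typo such as a missing port is caught here before the service is activated, instead of surfacing later as a connection error.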

Testing the Scenario

To test this scenario, you can use the console producer utility that comes with Kafka, located in the “bin” folder of your Apache Kafka installation. Type:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders <ENTER>

This opens the producer console for sending messages. Each line you type is sent to the Kafka topic as soon as you press the Enter key. For this simple test, type:

Hello World from Apache Kafka <ENTER>

You should see output similar to the following on Service Bus:

<Jun 16, 2015 7:02:41 PM EDT> <Warning> <oracle.osb.logging.pipeline> <BEA-000000> < [HandleIncomingPayload, HandleIncomingPayload_request, Logging, REQUEST] <con:endpoint name="ProxyService$article$OrdersListener" xmlns:con="http://www.bea.com/wli/sb/context">
  <con:service/>
  <con:transport>
    <con:uri>localhost:2181</con:uri>
    <con:mode>request</con:mode>
    <con:qualityOfService>best-effort</con:qualityOfService>
    <con:request xsi:type="kaf:KafkaRequestMetaDataXML" xmlns:kaf="http://oracle/ateam/sb/transports/kafka" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
      <tran:headers xsi:type="kaf:KafkaRequestHeadersXML" xmlns:tran="http://www.bea.com/wli/sb/transports">
        <kaf:message-key/>
        <kaf:partition>1</kaf:partition>
        <kaf:offset>5</kaf:offset>
      </tran:headers>
    </con:request>
  </con:transport>
  <con:security>
    <con:transportClient>
      <con:username>weblogic</con:username>
      <con:principals>
        <con:group>AdminChannelUsers</con:group>
        <con:group>Administrators</con:group>
        <con:group>IntegrationAdministrators</con:group>
        <con:group>Monitors</con:group>
      </con:principals>
    </con:transportClient>
  </con:security>
</con:endpoint>> 

<Jun 16, 2015 7:02:41 PM EDT> <Warning> <oracle.osb.logging.pipeline> <BEA-000000> < [HandleIncomingPayload, HandleIncomingPayload_request, Logging, REQUEST] Hello World from Apache Kafka>
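The logged endpoint element shows that the transport passes the Kafka message key, partition and offset to the pipeline as request headers in the http://oracle/ateam/sb/transports/kafka namespace. If you capture such log entries and want to inspect them offline, the following Python sketch extracts those values with the standard library’s ElementTree. The function name kafka_headers is hypothetical, and SAMPLE is the endpoint element from the log above, trimmed to the relevant parts.

```python
import xml.etree.ElementTree as ET
from typing import Optional

# Namespace URIs copied from the logged endpoint element.
NS = {
    "con": "http://www.bea.com/wli/sb/context",
    "tran": "http://www.bea.com/wli/sb/transports",
    "kaf": "http://oracle/ateam/sb/transports/kafka",
}


def kafka_headers(endpoint_xml: str) -> dict:
    """Extract the Kafka request headers from a logged endpoint element (hypothetical helper)."""
    root = ET.fromstring(endpoint_xml)
    headers = root.find("con:transport/con:request/tran:headers", NS)
    if headers is None:
        raise ValueError("no Kafka request headers in this endpoint element")
    key: Optional[str] = headers.findtext("kaf:message-key", default="", namespaces=NS)
    return {
        "uri": root.findtext("con:transport/con:uri", namespaces=NS),
        "message_key": key or None,  # <kaf:message-key/> is empty when no key was sent
        "partition": int(headers.findtext("kaf:partition", namespaces=NS)),
        "offset": int(headers.findtext("kaf:offset", namespaces=NS)),
    }


SAMPLE = """<con:endpoint name="ProxyService$article$OrdersListener"
    xmlns:con="http://www.bea.com/wli/sb/context">
  <con:transport>
    <con:uri>localhost:2181</con:uri>
    <con:request xmlns:kaf="http://oracle/ateam/sb/transports/kafka">
      <tran:headers xmlns:tran="http://www.bea.com/wli/sb/transports">
        <kaf:message-key/>
        <kaf:partition>1</kaf:partition>
        <kaf:offset>5</kaf:offset>
      </tran:headers>
    </con:request>
  </con:transport>
</con:endpoint>"""

print(kafka_headers(SAMPLE))
# {'uri': 'localhost:2181', 'message_key': None, 'partition': 1, 'offset': 5}
```

Applied to the endpoint element logged in the next section’s test, the same function should report offset 6 on partition 1, since the message sent from the Business Service was appended right after the one sent from the console.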

Sending Data to Apache Kafka

This section will show how to create a Service Bus Business Service that sends messages to a Kafka topic. The following steps will assume that you have an up-and-running Kafka deployment, and that a topic named “orders” was previously created.

Creating the Service Bus Artifacts

1) In the Service Bus Console, create a session to modify the projects.

2) Right-click the article project and choose Create > Business Service.

3) Name this Business Service as “OrdersProducer”.

4) In the Transport field, choose “kafka”, then click Next.

figure-6

5) Set “Text” as the Request Type and “None” as the Response Type.

6) Click Next to continue.

7) In the Endpoint URIs section, enter the Kafka broker details.

figure-7

8) Click the Create button to finish.

9) Click the Transport Details tab.

10) In the Topic Name field, enter “orders”.

F2

11) Save the entire configuration and activate the changes.

Testing the Scenario

In order to test this scenario, you can use the Test Console that comes with Service Bus.

figure-9

Type “Hello World from Oracle Service Bus” and click the Execute button to perform the test. Since we have already deployed a Proxy Service that fetches messages from the Kafka topic, you should see output similar to the following on Service Bus:

<Jun 16, 2015 7:27:48 PM EDT> <Warning> <oracle.osb.logging.pipeline> <BEA-000000> < [HandleIncomingPayload, HandleIncomingPayload_request, Logging, REQUEST] <con:endpoint name="ProxyService$article$OrdersListener" xmlns:con="http://www.bea.com/wli/sb/context">
  <con:service/>
  <con:transport>
    <con:uri>localhost:2181</con:uri>
    <con:mode>request</con:mode>
    <con:qualityOfService>best-effort</con:qualityOfService>
    <con:request xsi:type="kaf:KafkaRequestMetaDataXML" xmlns:kaf="http://oracle/ateam/sb/transports/kafka" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
      <tran:headers xsi:type="kaf:KafkaRequestHeadersXML" xmlns:tran="http://www.bea.com/wli/sb/transports">
        <kaf:message-key/>
        <kaf:partition>1</kaf:partition>
        <kaf:offset>6</kaf:offset>
      </tran:headers>
    </con:request>
  </con:transport>
  <con:security>
    <con:transportClient>
      <con:username>weblogic</con:username>
      <con:principals>
        <con:group>AdminChannelUsers</con:group>
        <con:group>Administrators</con:group>
        <con:group>IntegrationAdministrators</con:group>
        <con:group>Monitors</con:group>
      </con:principals>
    </con:transportClient>
  </con:security>
</con:endpoint>> 

<Jun 16, 2015 7:27:48 PM EDT> <Warning> <oracle.osb.logging.pipeline> <BEA-000000> < [HandleIncomingPayload, HandleIncomingPayload_request, Logging, REQUEST] Hello World from Oracle Service Bus>

Development using FMW JDeveloper 12c

Alternatively, you can use Fusion Middleware JDeveloper 12c to create Service Bus projects and implement integration scenarios using the Kafka transport. This is probably a better approach than the Service Bus Console when designing end-to-end integration flows, since you can leverage other IDE features such as the debugger, mappers, XPath editors and drag-and-drop components.

F3

Depending on which version of Fusion Middleware JDeveloper you use, the log may show warnings stating “Failed to invoke edit for <ENDPOINT>: SCA endpoint not available”. This is a known bug related to custom transports that Oracle engineering is already working to solve. Apart from these warnings, everything works fine and you will be able to deploy the Service Bus project from the IDE normally. You will also notice that custom transports are not available in the component palette, but you can still create them using File > New.

Short Interview with Jay Kreps

While writing this article, I had the pleasure and honor of talking with Jay Kreps, one of the Kafka co-creators and CEO of Confluent, the company founded by the team who built Kafka, which provides an enterprise-ready version of Kafka called Stream Data Platform. Here’s a transcript of that brief interview.

– Tell us a little bit about yourself.

“I’m the CEO at Confluent. I’m also one of the co-creators of Apache Kafka. I was previously one of the lead architects for data infrastructure at LinkedIn where Kafka was developed.”

– How popular is Apache Kafka today?

“It’s really popular. You can see some of the users who have added themselves publicly here.”

– What do you think about the Service Bus Kafka transport?

“I’m really excited to see the integration. We want to be able to plug in everything!”

– Would you like to see Oracle supporting Kafka in its cloud offerings?

“Sure!”

On behalf of the Oracle community, I would like to thank Jay for his time and support on this project.

Conclusion

Apache Kafka is gaining considerable traction in the industry thanks to its ability to handle high volumes of data (terabyte scale) with minimal hardware resources, allowing Cloud-based architectures to achieve high-throughput, low-latency messaging. Providing a native transport that supports Kafka in Service Bus opens up some interesting possibilities, and we hope customers consider this option when using these technologies to solve challenging integration and messaging problems.

This article is the first part of a series that shows how Service Bus can be leveraged to connect and exchange data with Kafka, allowing developers and architects to get the best of both worlds. You can read part two of this article here.

Comments

  1. Guys I know that is out of topic but how can I get your help? We got an OSB-381502 on a large mission critical OSB cluster and the local oracle support is saying is normal, we have proofs that is not normal of course

  2. Can we integrate Oracle AQ and Apache Kafka.?

    • Yes. By using the Apache Kafka transport, you can create applications in Service Bus that connect Oracle AQ with Kafka. As you may know, Service Bus has a JCA adapter for AQ (the “Oracle JCA Adapter for AQ”) that can be used to listen for messages from AQ. The Kafka transport would do the rest of the job, sending the message out to Kafka.

      Thanks,

      Ricardo

  3. Hi,
    We have weblogic 12.1.3.
    Did the following steps
    1. copied the kafka jars to odb_domain/lib. we have a different version so the below jars were copied
    kafka_2.11-0.10.0.0-cp1
    kafka-clients-0.10.0.0-cp1
    log4j-1.2.17
    metrics-core-2.2.0
    scala-library-2.11.8
    zkclient-0.8
    zookeeper-3.4.6

    2. copied the transport files to the transports folder
    3. copied the xml to the plugins folder
    4. deployed the jar and ear from the admin console
    5. restarted the server

    However, i do not see the kafka transport listed.
    could you please help?

    • It might be an incompatibility with the new version of Apache Kafka. When I built this transport, I tested it with Kafka 0.8.2.1 and its dependencies. I wouldn’t be surprised if the logs are telling you that some library version is mismatched. A friend tried this recently and found out that the log4j version was causing the incompatibility.

      Try to match the dependencies to the versions that the transport supports (Kafka 0.8.2.1), starting with the dependencies and working up to the main library.

      Thanks,

      Ricardo

  4. Many thanks for this great article.

    I’d like to evaluate the kafka transport for OSB 12.2.1. Therefore I did a quick start installation of SOA Suite 12.2.1 on SLES11, JDK 1.8.0_66.

    Then I followed exactly the steps described:

    # Copying kafka libraries, to be added to the classpath:
    tar zxvf ${HOME_INSTALL_FILES_KAFKA}kafka_2.11-0.10.0.0.tgz \
    --strip-components=2 \
    -C ${DOMAIN_HOME}lib/ \
    kafka_2.11-0.10.0.0/libs/slf4j-api-1.7.21.jar \
    kafka_2.11-0.10.0.0/libs/kafka-clients-0.10.0.0.jar \
    kafka_2.11-0.10.0.0/libs/kafka_2.11-0.10.0.0.jar \
    kafka_2.11-0.10.0.0/libs/metrics-core-2.2.0.jar \
    kafka_2.11-0.10.0.0/libs/scala-library-2.11.8.jar \
    kafka_2.11-0.10.0.0/libs/zkclient-0.8.jar \
    kafka_2.11-0.10.0.0/libs/zookeeper-3.4.6.jar \

    # Copying kafka transports
    unzip ${HOME_INSTALL_FILES_ORACLE}kafka-transport-0.3.1.zip kafka-transport.ear kafka-transport.jar -d ${ORACLE_HOME}osb/lib/transports
    ls -lisa ${ORACLE_HOME}osb/lib/transports

    # Copying kafka plugin descriptor
    unzip ${HOME_INSTALL_FILES_ORACLE}kafka-transport-0.3.1.zip transport-kafka.xml -d ${ORACLE_HOME}osb/config/plugins
    ls -lisa ${ORACLE_HOME}osb/config/plugins

    # Deploying kafka
    cd ${HOME_INSTALL_FILES_ORACLE}
    unzip kafka-transport-0.3.1.zip install.py kafka-transport.ear kafka-transport.jar
    ${ORACLE_HOME}oracle_common/common/bin/wlst.sh install.py

    # Bugfixing “Issues with the Service Bus Console”
    # Adding the lines:
    desc.res.gallery.kafka=The Kafka transport allows you to create proxy and business services that communicate with Apache Kafka brokers.
    desc.res.gallery.kafka.proxy=The Kafka transport allows you to create proxy services that receive messages from Apache Kafka brokers.
    desc.res.gallery.kafka.business=The Kafka transport allows you to create business services that route messages to Apache Kafka brokers.
    # to:
    oracle/soa/osb/console/folder/l10n/FolderBundle.properties
    oracle/soa/osb/console/folder/l10n/FolderBundle_en.properties
    oracle/soa/osb/console/folder/l10n/FolderBundle_de.properties
    # of:
    ${ORACLE_HOME}osb/lib/osbconsoleEar/webapp/WEB-INF/lib/adflib_osb_folder.jar

    Service Bus Kafka Transport Provider and transport-kafka(12.1.3,12.1.3) is deployed and “Active”. All works fine. No error occurred. The log file is clean.

    But unfortunately, I can’t see “kafka” in the list of transports.

    Do you have any idea, what could be wrong?

    Best regards

    Stephan Rudolph

  5. Hi ..Thank you for your Quick Response.. really Appreciate it..

    We are using 12.1.3 and we have imported the transports into /osb/transports folder and ran the install.py script
    also copied the kafka libraries listed in the article into domain_home/lib.
    After the complete bounce of all servers, osb test console is not working also we are not able to deploy any services in oracle service bus as it is erroring out in activation.
    errors
    An unexpected error occurred accessing the test service: No runtime server is configured in the domain.

    please let us know if you need more information.

    Thanks,
    Balaji

  6. Hi , Thank you for the article. its great. We have installed the transport by following the steps exactly given. Even after the bounce , we do not see the kafka transport in the osb console. we could see the transport application successfully deployed and targeted in weblogic console deployments.

    can you please let us know what we are missing in the installation. below is the log of the instally.py script.

    Appreciate your help.

    Enter the location of the “kafka-transport.jar” file: ……/osb/lib/transports
    Enter the location of the “kafka-transport.ear” file: provided ……./osb/lib/transports
    Deploying application from ../Middleware/Infra_Home/osb/lib/transports/kafka-transport.jar to targets AdminServer,BAM_CLUSTER,SOA_CLUSTER,OSB_CLUSTER (upload=false) …

    .Completed the deployment of Application with status completed
    Current Status of your Deployment:
    Deployment command type: deploy
    Deployment State : completed
    Deployment Message : no message
    Deploying application from …./Middleware/Infra_Home/osb/lib/transports/kafka-transport.ear to targets AdminServer,BAM_CLUSTER,SOA_CLUSTER,OSB_CLUSTER (upload=false) …

    .Completed the deployment of Application with status completed
    Current Status of your Deployment:
    Deployment command type: deploy
    Deployment State : completed
    Deployment Message : no message
    Disconnected from weblogic server: AdminServer

  7. The Apache Kafka transport has not been tested with 11g version of Oracle Service Bus, only 12c. Therefore, issues like this might happen and unfortunately I cannot provide details to fix it. If possible, use the 12c version instead of 11g.

    If your customer really needs to use 11g and would like to leverage the Apache Kafka transport, please get in touch with the Oracle account team that supports your customer and ask them to request A-Team’s help to back port the transport to 11g. We need a customer use case to justify this back port.

    Thanks,

    Ricardo Ferreira

  8. Vignesh Kumar says:

    Thanks for the article. I am trying to implement the same in OSB 11G. The Kafka transport EAR file is failing, saying Version 6 java is not supported. I changed the EAR to Version 5. After that, the EAR file is not getting activated and gives the below error –
    Could not initialize class org.apache.kafka.clients.producer.KafkaProducer. I have added the mentioned files in the lib folder of the OSB home.
    Will this transport support OSB 11G? If yes, what else do I need to follow?
