Connecting ICS and Apache Kafka via REST Proxy API

Introduction

Apache Kafka (Kafka for short) is a proven and well-known technology for a variety of reasons. First, it is very scalable and can handle hundreds of thousands of messages per second without expensive hardware and with close to zero fine-tuning, as you can read here. Another reason is its client API capabilities. Kafka allows connections from different platforms by leveraging a number of client APIs that make it easy for developers to connect to and transact with Kafka. Being able to easily connect to a technology is a major requirement for open-source projects.

In a nutshell, Kafka’s client APIs are divided into three categories:

* Native Clients: This is the preferred way to develop client applications that must connect to Kafka. These APIs allow high-performance connectivity and leverage most of the features found in Kafka’s clustering protocol. When using this API, developers are responsible for writing code to handle aspects like fault-tolerance, offset management, etc. An example is the Oracle Service Bus Transport for Kafka, which has been built using the native clients and can be found here.

* Connect API: An SDK that allows the creation of reusable clients, which run on top of a pre-built connector infrastructure that takes care of details such as fault-tolerance, execution runtime and offset management. The Oracle GoldenGate adapter has been built on top of this SDK, as you can read here.

* REST Proxy API: For applications that for some reason can use neither the native clients nor the Connect API, there is an option to connect to Kafka using the REST Proxy API. This is an open-source project maintained by Confluent, the company behind Kafka, that allows REST-based calls against Kafka to perform transactions and administrative tasks. You can read more about this project here.

The objective of this blog is to detail how Kafka’s REST Proxy API can be used to allow connectivity from Oracle ICS (Integration Cloud Service). By leveraging the native REST adapter from ICS, it is possible to implement integration scenarios in which messages can be sent to Kafka. This blog is going to show the technical details about the REST Proxy API infrastructure and how to implement a use case on top of it.

Use_Case_Diagram

Figure 1: Use case where a request is made using SOAP and ICS delivers it to Kafka.

The use case is about leveraging ICS transformation capabilities so that applications limited to the SOAP protocol can send messages to Kafka. There may be applications out there that have no REST support and can only interact with SOAP-based endpoints. In this pattern, ICS can be used to adapt and transform the message so it can be properly delivered to Kafka. SOAP is just an example; the same applies to any other protocol/technology supported by ICS. Plus, any Oracle SaaS application that has built-in connectivity with ICS can also benefit from this pattern.

Getting Started with the REST Proxy API

As mentioned before, the REST Proxy API is an open-source project maintained by Confluent. Its source code can be found on GitHub, here. Please be aware that the REST Proxy API is not part of any Kafka deployment by default. That means that if you download and install a community version of Kafka, the bits for the REST Proxy API will not be there. You need to explicitly build the project and integrate it with your Kafka install. This can be a little tricky since the REST Proxy API project depends on other projects such as commons, rest-utils and the schema-registry.

Luckily, the Confluent folks provide an open-source version of their product, which has everything pre-integrated, including the REST Proxy API and the other dependencies. This distribution is called Confluent Open Source and can be downloaded here. It is strongly recommended to start with this distribution, so you can be sure that you face no errors caused by bad compilation/building/packaging. Oracle’s own distribution of Kafka, called Event Hub Cloud Service, could be used as well.

Once you have a Kafka install that has the REST Proxy API, you will be good to go. Everything was built to work out of the box (OOTB) through easy-to-use scripts. The only thing you have to keep in mind is the dependencies between the services. In a typical Kafka deployment, the brokers depend on the Zookeeper service, which has to be continuously up and running. Zookeeper is required to keep metadata information about the brokers, partitions and topics in a highly available fashion. Zookeeper’s default port is 2181.

The services from the REST Proxy API also depend on Zookeeper. To have a REST Proxy API deployment, you need a service called the REST Server. The REST Server depends on Zookeeper. It also depends on another service called Schema Registry, which in turn depends on Zookeeper as well. Figure 2 summarizes this dependency relationship between the services.

Services_Depedencies

Figure 2: Dependency relationship between the REST Proxy API services.

Although it may look like it, none of these services can become a SPOF (Single Point of Failure) or SPOB (Single Point of Bottleneck) in Kafka’s architecture. All of them were designed from scratch to be idempotent and stateless. Therefore, you can have multiple copies of each service running behind a load balancer to meet your performance and availability goals. In order to start a Kafka deployment with the REST Proxy API, you need to execute the following scripts in the order shown in listing 1.

/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &

/bin/kafka-server-start /etc/kafka/server.properties &

/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &

/bin/kafka-rest-start /etc/kafka-rest/kafka-rest.properties &

Listing 1: Starting a Kafka deployment with the REST Proxy API.
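The start order in listing 1 follows from the dependencies between the services. As a minimal sketch, the order can be derived with a topological sort. The service names and the startup_order function are hypothetical, and the two edges onto the broker are an assumption: the text only states the Zookeeper and Schema Registry dependencies, and the broker edges were added here so the result lines up with listing 1.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each service maps to the set of services that must already be running.
# Edges onto "zookeeper" and "schema-registry" come from the text; the
# edges onto "kafka-broker" are an assumption made to mirror listing 1.
DEPENDENCIES = {
    "zookeeper": set(),
    "kafka-broker": {"zookeeper"},
    "schema-registry": {"zookeeper", "kafka-broker"},
    "rest-server": {"zookeeper", "kafka-broker", "schema-registry"},
}


def startup_order(dependencies):
    """Return a start order in which every service follows its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())
```

Calling startup_order(DEPENDENCIES) yields Zookeeper first and the REST Server last, which is the same sequence as the scripts in listing 1.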

As you can see in listing 1, every script references a properties configuration file. These files are used to customize the behavior of a given service. Most properties in these files have been preset to meet a variety of workloads, so unless you are trying to fine-tune a given service, you most likely won’t need to change them.

There is an exception, though. For most production environments you will run these services on different boxes for high availability purposes. However, if you choose to run them on the same box, you might need to adjust some ports to avoid conflicts. That can be easily accomplished by editing the respective properties file and adjusting the corresponding property. If you are unsure about which property to change, consult the configuration properties documentation here.
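When everything runs on one box, it can help to list the ports you plan to use and check for clashes before editing the properties files. The sketch below is only an illustration: find_port_conflicts is a hypothetical helper, and the service names in the usage note are made up. Ports 2181, 9092 and 6666 are the ones mentioned in this blog.

```python
from collections import defaultdict


def find_port_conflicts(ports):
    """Return {port: [services]} for every port claimed by more than one service."""
    by_port = defaultdict(list)
    for service, port in ports.items():
        by_port[port].append(service)
    # Keep only the ports that two or more services are trying to use.
    return {port: sorted(names) for port, names in by_port.items() if len(names) > 1}
```

For example, find_port_conflicts({"zookeeper": 2181, "kafka-broker": 9092, "rest-server": 6666}) returns an empty dictionary, while two REST Servers both configured for port 6666 would be reported together under that port.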

Setting Up a Public Load Balancer

This section might be considered optional depending on the situation. In order for ICS to connect to the REST Proxy API, it needs network access to the endpoints exposed by the REST Server. This is because ICS runs on the OPC (Oracle Public Cloud) and can only access endpoints that are publicly available on the internet (or endpoints exposed through the connectivity agent). Therefore, you may need to set up a load balancer in front of your REST Servers to allow for this connection. This should be considered a best practice because otherwise you would need to set up firewall rules to allow public internet access to the boxes that hold your REST Servers. Moreover, running without a load balancer would make it difficult to transparently change your infrastructure if you need to scale your REST Servers up or down. This blog will show how to set up OTD (Oracle Traffic Director) in front of the REST Servers, but any other load balancer that supports TCP/HTTP would also suit the needs.

In OTD, the first step is to create a server pool that has all the exposed REST Server endpoints. In the setup built for this blog, I had a REST Server running on port 6666. Figure 3 shows an example of a server pool named rest-proxy-pool.

OTD_Config_1

Figure 3: Creating a server pool that references the REST Server services.

The second step is the creation of a route under your virtual server configuration that will forward any request that matches a certain pattern to the server pool created above. In the REST Proxy API, any request that intends to perform a transaction (which could be either to produce or consume messages) goes through a URI pattern that starts with /topics/*. Therefore, create a route that uses this pattern, as shown in figure 4.

OTD_Config_2

Figure 4: Creating a route to forward requests to the server pool.
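To make the intent of the route concrete, here is a rough sketch of which request paths the /topics/* pattern is meant to cover. It uses Python's shell-style wildcard matching as a stand-in; OTD evaluates route conditions with its own rules, so treat this as an illustration only. The is_forwarded helper is hypothetical.

```python
from fnmatch import fnmatchcase

# Pattern taken from the route described above.
ROUTE_PATTERN = "/topics/*"


def is_forwarded(path):
    """Return True when the request path falls under the /topics/* pattern."""
    return fnmatchcase(path, ROUTE_PATTERN)
```

With this stand-in, a request to /topics/orders matches the pattern, while a path such as /status does not.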

Finally, you need to make sure that a functional HTTP listener is associated with your virtual server. This HTTP listener will be used by ICS when it sends messages out. In the setup built for this blog, I have used an HTTP listener on port 8080 for non-SSL requests. Figure 5 depicts this.

OTD_Config_3

Figure 5: HTTP listener created to allow external communication.

Before moving to the following sections, it would be a good idea to validate the setup built so far, since there are a lot of moving parts that can fail. The best way to validate this is by sending a message to a topic using the REST Proxy API and checking if that message is received using Kafka’s console consumer. Thus, start a new console consumer instance to listen for messages sent to the topic orders, as shown in listing 2.

/bin/kafka-console-consumer --bootstrap-server <BROKER_ADDRESS>:9092 --topic orders

Listing 2: Starting a new console consumer that listens for messages.

Then, send a message out using the REST Proxy API exposed by your load balancer. Remember that the request should pass through the HTTP listener configured on OTD. Listing 3 shows a cURL example that sends a simple message to the topic using the infrastructure built so far.

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key":"12345", "value":{"message":"Hello World"}}]}' "http://<OTD_ADDRESS>:8080/topics/orders"

Listing 3: HTTP POST to send a message to the topic using the REST Proxy API.

If everything was set up correctly, you should see the JSON payload sent to the topic in the output of the console consumer started in listing 2. There are some interesting things to note about the example shown in listing 3. Firstly, you may have noticed that the actual payload sent has a strictly defined structure. It is a JSON payload with only one root element called “records”. This element’s value is an array with multiple entries of type key/value. This means that you can send multiple records at once in a single request to maximize throughput, since you avoid having to perform multiple network calls.

Secondly, the “key” field is not mandatory. If you send a record containing only the value, that will work as well. However, it is highly recommended to use a key every time you send a message out. That gives you more control over how the messages are grouped together within the partitions in Kafka: records that share a key go to the same partition, which keeps related messages together and in order.
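The effect of the key on partition placement can be illustrated with a small sketch. The idea is that the key is hashed and the hash is reduced to a partition index, so equal keys always map to the same partition. Note the assumptions: pick_partition is a hypothetical helper, and CRC32 is used here only as a stand-in. Kafka's default partitioner uses a different hash function (murmur2), so the numbers produced below will not match a real cluster.

```python
import zlib


def pick_partition(key, num_partitions):
    """Map a record key to a partition index in a deterministic way.

    CRC32 is a stand-in for Kafka's real hash; only the principle carries over.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

Calling pick_partition("PO000897", 6) twice returns the same index both times, which is why all records for a given order identifier would end up in one partition.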

Thirdly, you may also have noticed the content type header used in the cURL command. Instead of using a simple application/json as most applications would use, we used application/vnd.kafka.json.v1+json. This is a requirement for the REST Proxy API to work. Keep this in mind while developing flows in ICS.
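For readers who prefer code over cURL, the request from listing 3 could be assembled roughly as follows using only the Python standard library. This is a sketch under assumptions: build_produce_request and send are hypothetical helper names, the base URL is whatever your load balancer exposes, and send assumes the REST Proxy API answers with a JSON body.

```python
import json
import urllib.request

# Media type required by the REST Proxy API, as used in listing 3.
KAFKA_JSON_V1 = "application/vnd.kafka.json.v1+json"


def build_produce_request(base_url, topic, records):
    """Build (but do not send) the HTTP POST that listing 3 issues with cURL."""
    body = json.dumps({"records": records}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/topics/{topic}",
        data=body,
        headers={"Content-Type": KAFKA_JSON_V1},
        method="POST",
    )


def send(request, timeout=10):
    """Send the request; assumes the response body is JSON."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

A call such as build_produce_request("http://<OTD_ADDRESS>:8080", "orders", [{"key": "12345", "value": {"message": "Hello World"}}]) produces the same request as listing 3. Passing more than one entry in the records list sends several records in a single call.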

Message Design for REST Proxy API

Now it is time to start thinking about how we are going to map the SOAP messages sent to ICS into the JSON payload that needs to be sent to the REST Proxy API. This exercise is important because once you start using ICS to build the flow, it will ask for payload samples and message schemas that you may not have at hand. Therefore, this section will focus on generating these artifacts.

Let’s start by designing the SOAP messages. In this use case we are going to have ICS receiving order confirmation requests. Each order confirmation request will contain the details of an order made by a certain customer. Listing 4 shows an example of this SOAP message.

<soapenv:Envelope xmlns:blog="http://cloud.oracle.com/paas/ics/blogs"
   xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
   <soapenv:Body>
      <blog:confirmOrder>
         <order>
            <orderId>PO000897</orderId>
            <custId>C00803</custId>
            <dateTime>2017-02-09:11:06:35</dateTime>
            <amount>89.90</amount>
         </order>
      </blog:confirmOrder>
   </soapenv:Body>
</soapenv:Envelope>

Listing 4: SOAP message containing the order confirmation request.

In order to build the SOAP message shown in listing 4, it is necessary to have the corresponding message schemas typically found in a WSDL document. You can download the WSDL used to build this blog here. It will be necessary when we set up the connection in ICS later.
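To get a feel for the data carried by the SOAP message in listing 4, the sketch below pulls the order fields out of the envelope with Python's standard XML parser. The namespaces and element names come from listing 4; the parse_order function and the choice to read the amount as a float are assumptions made for illustration.

```python
import xml.etree.ElementTree as ET

# Namespace prefixes as declared in listing 4.
NAMESPACES = {
    "soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
    "blog": "http://cloud.oracle.com/paas/ics/blogs",
}

SAMPLE = """<soapenv:Envelope xmlns:blog="http://cloud.oracle.com/paas/ics/blogs"
   xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
   <soapenv:Body>
      <blog:confirmOrder>
         <order>
            <orderId>PO000897</orderId>
            <custId>C00803</custId>
            <dateTime>2017-02-09:11:06:35</dateTime>
            <amount>89.90</amount>
         </order>
      </blog:confirmOrder>
   </soapenv:Body>
</soapenv:Envelope>"""


def parse_order(envelope_xml):
    """Extract the order fields from a confirmOrder SOAP envelope."""
    root = ET.fromstring(envelope_xml)
    # The <order> element itself carries no namespace in listing 4.
    order = root.find("soapenv:Body/blog:confirmOrder/order", NAMESPACES)
    if order is None:
        raise ValueError("no confirmOrder/order element found")
    return {
        "orderId": order.findtext("orderId"),
        "custId": order.findtext("custId"),
        "dateTime": order.findtext("dateTime"),
        "amount": float(order.findtext("amount")),
    }
```

Running parse_order(SAMPLE) returns a dictionary with the four order fields, which is the input that the message mapping works from.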

The message that we really want to send to Kafka is in the JSON format. It has essentially all the fields shown on listing 4, except for the “orderId” field. Listing 5 shows the JSON message we need to send.

{
   "custId":"C00803",

   "dateTime":"2017-02-09:11:06:35",

   "amount":89.90

}

Listing 5: JSON message containing the order confirmation request.

The “orderId” field was omitted on purpose. We are going to use this field as the key for the record that will be sent to Kafka. This design provides a way to track orders by their identifiers. If you recall the JSON payload shown in listing 3, you will see that the JSON payload shown in listing 5 is the portion used in the “value” field. Listing 6 shows the concrete payload that needs to be built so the REST Proxy API can properly process it.

{
   "records":[
      {
         "key":"PO000897",
         "value": {
            "custId":"C00803",
            "dateTime":"2017-02-09:11:06:35",
            "amount":89.90
         }
      }
   ]
}

Listing 6: JSON payload used to process messages using the REST Proxy API.

Keep in mind that although the REST Proxy API will receive the payload shown on listing 6, what the topic consumers will effectively receive is only the record containing the key and the value set. When the consumer reads the “value” field of the record, it will have access to the actual payload containing the order confirmation request. Figure 6 shows the mapping that needs to be implemented in ICS.

Abstract_Source_Target_Mapping

Figure 6: Message mapping to be implemented in ICS.
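The mapping in figure 6 can be summarized in a few lines of code. This is only a sketch of the transformation that ICS will perform through its mapper; to_records_payload is a hypothetical name and the input is assumed to be a flat dictionary holding the order fields.

```python
def to_records_payload(order):
    """Use orderId as the record key; every other field becomes the value."""
    value = {name: field for name, field in order.items() if name != "orderId"}
    return {"records": [{"key": order["orderId"], "value": value}]}
```

Given the order from listing 4, the function returns the structure shown in listing 6: the “orderId” moves into “key”, and “custId”, “dateTime” and “amount” stay together under “value”.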

Developing the Integration Flow

Now that we have gone through the configuration necessary to establish communication with the REST Proxy API, we can start developing the integration flow in ICS. Let’s start with the configuration of the connections.

Create a SOAP-based connection as shown in figure 7. Since this connection will be used for inbound processing, you can skip any security configuration. Go ahead and attach the WSDL that contains the schemas to this newly created connection.

Creating_SOAP_Conn_2

Figure 7: SOAP-based connection used for inbound processing.

Next, create a REST-based connection as shown in figure 8. This is the connection that will be used to send messages out to Kafka. Therefore, make sure to set in the “REST API Base URL” field the correct endpoint that should point to your load balancer. Also make sure to set the /topics resource after the port.

Creating_REST_Conn_2

Figure 8: REST-based connection used for outbound processing.

With the inbound and outbound connections properly created, go ahead and create a new integration. For this use case we are going to use Basic Map Data as the integration style/pattern, although you could also leverage the outbound connection to the REST Proxy API in orchestration-based integrations.

Creating_Flow_1

Figure 9: Using Basic Map Data as the integration style for the use case.

Name the integration OrderService and provide a description, as shown in figure 10. Once the integration flow is created, go ahead and drag the SOAP connection to the source area of the flow. That will trigger the SOAP endpoint creation wizard. Go through the wizard pages until you reach the last one, accepting all values suggested by default. Then, click on the “Done” button to finish it.

Creating_Flow_2

Figure 10: Setting up the details for the newly created integration.

ICS will create the source mapping according to the information gathered from the wizard, along with the information from the WSDL attached to the connection, as shown in figure 11. At this point, we can drag the REST connection to the target area of the flow. That will trigger the REST endpoint creation wizard.

Creating_Flow_6

Figure 11: Integration flow with the inbound mapping built.

Unlike in the SOAP endpoint creation wizard, we will make some changes to the options shown in the REST endpoint creation wizard. The first one is setting the Kafka topic name in the “Relative Source URI” field. This is important because ICS will use this information to build the final URI that will be sent to the REST Proxy API. Therefore, make sure to set the appropriate topic name. For this use case, we are using a topic named orders, as shown in figure 12. Also, please select the option “Configure Request Payload” before clicking next.

Creating_Flow_7

Figure 12: Setting up details about the REST endpoint behavior.
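The final URI is the combination of the base URL set on the REST connection (ending in /topics) and the relative URI set in the wizard. How ICS joins the two internally is not described here, so the sketch below only shows the expected result; final_uri is a hypothetical helper.

```python
def final_uri(base_url, relative_uri):
    """Join the connection's base URL and the endpoint's relative URI."""
    return base_url.rstrip("/") + "/" + relative_uri.lstrip("/")
```

For instance, final_uri("http://<OTD_ADDRESS>:8080/topics", "/orders") gives http://<OTD_ADDRESS>:8080/topics/orders, the same address used by the cURL call in listing 3.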

On the next page, you will need to associate the schema that will be used to parse the request payload. Select the “JSON Sample” option and upload a JSON sample file that contains a payload like the one shown in listing 6. Please make sure to provide a JSON sample that has at least two sample values in the array section. ICS validates whether the samples provided have enough information to generate the internal schemas. If a JSON sample has an array construct, ICS will ask for at least two values within the array, to make sure that it is going to deal with a list of values instead of a single value. You can grab a copy of a valid JSON sample for this use case here.

Creating_Flow_8

Figure 13: Setting up details about schemas and media types.
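Before uploading a JSON sample, you can check on your own machine that every array in it holds at least two entries. The sketch below is not how ICS validates the sample; it is a hypothetical helper, arrays_with_too_few_entries, that applies the same two-entry rule. The second record in SAMPLE uses made-up values added only to satisfy that rule.

```python
import json

# First record from listing 6; the second record is invented for the sample.
SAMPLE = """
{
   "records":[
      {"key":"PO000897",
       "value":{"custId":"C00803", "dateTime":"2017-02-09:11:06:35", "amount":89.90}},
      {"key":"PO000898",
       "value":{"custId":"C00804", "dateTime":"2017-02-09:11:07:12", "amount":120.50}}
   ]
}
"""


def arrays_with_too_few_entries(node, path="$"):
    """Return the path of every array in the sample holding fewer than two entries."""
    problems = []
    if isinstance(node, list):
        if len(node) < 2:
            problems.append(path)
        for index, item in enumerate(node):
            problems.extend(arrays_with_too_few_entries(item, f"{path}[{index}]"))
    elif isinstance(node, dict):
        for name, item in node.items():
            problems.extend(arrays_with_too_few_entries(item, f"{path}.{name}"))
    return problems
```

An empty result from arrays_with_too_few_entries(json.loads(SAMPLE)) means the sample has enough entries. The single-record payload from listing 6 would be reported at the path $.records.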

In the “Type of Payload” section, make sure to select the “Other Media Type” option to allow the usage of custom media types. Then, set application/vnd.kafka.json.v1+json as the value, as shown in figure 13. Click next and review the options set. If everything looks like what is shown in figure 14, then you can click the “Done” button to finish the wizard.

Creating_Flow_9

Figure 14: Summary page of the REST endpoint creation wizard.

ICS will bring together the request and response mappings and expects that you set them up. Thus, go ahead and create the mappings for both request and response. For the request mapping, you should simply associate the fields as shown in figure 15. Remember that this field mapping should mimic what we had shown before in figure 6, including the usage of the “orderId” field as the record key.

Creating_Flow_11

Figure 15: Request mapping configuration.

The response mapping is much simpler: the only thing you have to do is associate the “orderId” field with the “confirmationId” field. The idea here is to give the caller a way to know whether the transaction succeeded. Returning the same order identifier that was provided signals success, because if any failure happens during the message transmission, the REST Proxy API will propagate a fault back to the caller, which in turn will force ICS to catch this fault and propagate it back. Figure 16 shows the response mapping.

Creating_Flow_12

Figure 16: Response mapping configuration.
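The request/response behavior described above can be pictured with a small sketch. It is not ICS code: confirm_order is a hypothetical function, and deliver stands for whatever sends the order to the REST Proxy API and raises an exception when the delivery fails.

```python
def confirm_order(order, deliver):
    """Deliver the order, then echo its identifier back as confirmationId.

    If deliver raises, the exception propagates to the caller, mirroring
    the fault path: no confirmationId is returned in that case.
    """
    deliver(order)
    return {"confirmationId": order["orderId"]}
```

In other words, the caller only ever sees a “confirmationId” when the delivery step completed without error.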

Now set up some tracking fields (for this use case, the “orderId” field would be a good idea) and finish the integration flow as shown in figure 17. You are now ready to activate and test the integration to check the end-to-end behavior of the use case.

Creating_Flow_13

Figure 17: Integration flow 100% complete in ICS.

You can download a copy of this use case here. Once the integration is active, you can validate whether it is working correctly by starting a console consumer as shown in listing 2. Then, open your favorite SOAP client utility and import the WSDL from the integration. You can easily access the integration’s WSDL in the UI by clicking on the information icon of the integration, as shown in figure 18.

Creating_Flow_14

Figure 18: Retrieving the integration’s WSDL from the UI.

Once the WSDL is properly imported in your SOAP client utility, send a payload request like the one shown in listing 4 to validate the integration. If everything was set up correctly, you should see the JSON payload sent to the topic in the output of the console consumer started in listing 2.

Conclusion

This blog has shown in detail how to configure ICS to send messages to Kafka. Since ICS has no built-in adapter for Kafka, we used the REST Proxy API project, which is part of the Kafka ecosystem.
