Oracle GoldenGate Big Data Adapter: Apache Kafka Producer

Introduction

Apache Kafka is a distributed, partitioned, replicated commit log service that provides the functionality of a messaging system. The Oracle GoldenGate for Big Data Kafka Handler acts as a Kafka Producer that writes serialized change capture data from an Oracle GoldenGate Trail to a Kafka Topic.

In this article we will set up the Oracle GoldenGate Big Data Kafka Handler, configure data apply from Oracle 12c tables, and show examples of the different Big Data formatters available in the Kafka Handler.

This document covers functionality present in Oracle GoldenGate version 12.3, which may not be available in earlier product releases.

The concepts, scripts, and information presented in this article are for educational purposes only. They are not supported by Oracle Development or Support, and come with no guarantee or warranty of functionality in any environment other than the test system used to prepare this article. Before applying any changes presented in this article to your environment, you should thoroughly test to assess functionality and performance implications.

Main Article

The Oracle GoldenGate for Big Data product contains built-in support to write operation data from Oracle GoldenGate trail records into various Big Data targets, such as Apache Kafka. The functionality is separated into handlers that integrate with third party applications and formatters, which transform the data into various formats, such as Avro, JSON, delimited text, and XML. The Oracle GoldenGate Big Data Kafka module seamlessly integrates with Oracle GoldenGate Replicat for data apply to Kafka Topics.

For this article, we shall be using Apache Kafka version 1.1.0 running on a Linux Virtual Machine as the Oracle GoldenGate target instance. Oracle GoldenGate for Big Data version 12.3.1.1 and Java version 8 complete the requisite software components. Environment variables set in the shell include:

# User specific environment and startup programs
export KAFKA_HOME=/home/oracle/kafka_2.11-0.9.0.0
export JAVA_HOME=/usr/java/jdk1.8.0_92
export JRE_HOME=/usr/java/jdk1.8.0_92/jre
export LD_LIBRARY_PATH=/usr/java/jdk1.8.0_92/jre/lib/amd64/server
export OGG_HOME=/u01/OGG123BD
#
export PATH=$KAFKA_HOME/bin:$JAVA_HOME/bin:$JRE_HOME/bin:$KAFKA_HOME:$PATH
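
A quick sanity check that the environment is wired up correctly is to confirm the Java version and the presence of the Kafka client libraries under KAFKA_HOME (a minimal sketch, assuming the paths above from my test VM):

java -version
ls $KAFKA_HOME/libs/kafka-clients-*.jar
echo $OGG_HOME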

Installation

Oracle GoldenGate for Big Data requires Java 1.7 or later; both the Java Runtime Environment (JRE) and the Java Development Kit (JDK) are supported.

For Replicat to function as a Kafka Producer, the Big Data Kafka module may be integrated with any database version of Oracle GoldenGate; however, the best practice is to install Oracle GoldenGate for Non-Oracle Databases (also known as generic GoldenGate), which is packaged as part of the Oracle GoldenGate for Big Data release.
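
After unpacking the Oracle GoldenGate for Big Data distribution into OGG_HOME, the standard GoldenGate subdirectories can be created from GGSCI (a minimal sketch; the exact installation steps for your release may differ, so follow the product installation guide):

cd $OGG_HOME
./ggsci
GGSCI> CREATE SUBDIRS
GGSCI> EXIT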

Configuring the Apache Kafka Handler

To set up the Kafka Handler for data delivery, we need to configure three files: (1) a GoldenGate Replicat parameter file, (2) a GoldenGate Big Data properties file, and (3) a custom Kafka Producer properties file. I have placed all three files in the Oracle GoldenGate default configuration directory, dirprm.

Replicat Parameter File

Below is my Replicat parameter file:

REPLICAT RKAFKA
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
GROUPTRANSOPS 10000
MAP PDBORCL.TPC.ORDERS, TARGET TPC.ORDERS;
MAP PDBORCL.TPC.ORDERS_PRODUCTS, TARGET TPC.ORDERS_PRODUCTS;
MAP PDBORCL.TPC.ORDERS_STATUS_HISTORY, TARGET TPC.ORDERS_STATUS_HISTORY;

Important items to note in the Replicat parameter file:

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props : Directs Replicat to load the GoldenGate Java delivery library and its Kafka-specific functionality in place of a database-specific delivery module. In this example my Kafka properties file is named kafka.props and resides in the GoldenGate dirprm directory.

GROUPTRANSOPS 10000 : Groups operations from the source trail into target transactions of at least 10,000 operations. This is the default and improves the performance of Big Data integrations. Typically you do not specify default settings in GoldenGate parameter files; however, in this case it is a good idea to do so to minimize confusion in environments where a database-specific GoldenGate installation is also present.
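
Once the parameter file is in place, the Replicat is registered and started from GGSCI in the usual way (a sketch; the trail name ./dirdat/tp is an assumption, so substitute the trail delivered from your source instance):

GGSCI> ADD REPLICAT RKAFKA, EXTTRAIL ./dirdat/tp
GGSCI> START REPLICAT RKAFKA
GGSCI> INFO REPLICAT RKAFKA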

Properties File

Below is my Kafka properties file:

## RKAFKA properties for Kafka Topic apply
##
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp}
gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=|
gg.handler.kafkahandler.SchemaTopicName=tpcSchemaTopic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
##
gg.log=log4j
gg.log.level=INFO
##
gg.report.time=30sec
##
gg.classpath=dirprm/:/home/oracle/kafka_2.11-0.9.0.0/libs/*
##
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

Items to note in the Kafka properties file:

gg.handlerlist : Defines a unique name for this handler. All properties defined with this handler name will be used to configure the handler and its Kafka Producer connection.

gg.handler.[handler name].type : Defines the Oracle GoldenGate Big Data Handler to use.

gg.handler.[handler name].KafkaProducerConfigFile : File that contains the configuration properties for an Apache Kafka Producer. This file must be in a location defined by the classpath setting.

gg.handler.[handler name].topicMappingTemplate : A template string value to resolve the Kafka topic name at runtime.

gg.handler.[handler name].keyMappingTemplate : A template string value to resolve the Kafka message key at runtime.

gg.handler.[handler name].format : Formatter to be used when writing data to the Kafka Topic: xml, delimitedtext, json, avro_row, or avro_op.

gg.handler.[handler name].BlockingSend : Defines how messages are sent to Kafka; true for Blocking Mode or false for Non-Blocking Mode. In Blocking Mode messages are delivered synchronously: the Kafka Handler waits for an acknowledgment from the Kafka server before sending the next message. This mode guarantees message delivery but reduces performance. In Non-Blocking Mode messages are delivered asynchronously and are published to Kafka as they are read from the GoldenGate Trail by Replicat. For best performance, Non-Blocking Mode is the recommended practice.

gg.classpath : Defines the location of the Kafka libraries required by the Big Data Handler to connect to Kafka and format messages, and the location of the Apache Kafka producer configuration file.
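
A note on the two *MappingTemplate properties above: the template string is resolved per operation at runtime, so the topic and message key can be derived from the data being applied. For example, a common alternative to the static topic name used here is to route each table to its own topic and key messages by primary key (a hedged sketch; verify the keyword names against the template documentation for your release):

gg.handler.kafkahandler.topicMappingTemplate=${fullyQualifiedTableName}
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}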

Kafka Producer Properties File

My Kafka Producer properties file contains:

bootstrap.servers=kafka-0:9092,kafka-0:9093,kafka-0:9094
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
batch.size = 102400
linger.ms = 10000
max.request.size = 5024000
send.buffer.bytes = 5024000

These configuration items are defined in the Apache Kafka documentation. Any configurable Producer options may be defined in this file. Items to note in the Kafka Producer properties file include:

– bootstrap.servers : A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Since these servers are just used for the initial connection to discover the full cluster membership, this list need not contain the full set of servers; however, you may want more than one in case a server is down. If no server in this list is available the GoldenGate Replicat will abend.

– acks : The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.

– batch.size : The Producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

– linger.ms : The amount of time the Producer waits for additional records to accumulate before sending a batch.

– max.request.size : The maximum size of a request. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.

– send.buffer.bytes : The size of the TCP send buffer to use when sending data.
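
With the broker(s) listed in bootstrap.servers running, a console consumer can be attached to the target Topic to watch messages arrive as Replicat applies them (a sketch using the Kafka 0.9 consumer syntax from my test environment; adjust the host and port for yours):

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic oggtopic --from-beginning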

Data Formats

As noted previously, the following data formats are supported by the GoldenGate Kafka Handler: xml, delimitedtext, json, avro_row, or avro_op. Let’s capture some Oracle Database records and see the Kafka Topic data created for each format.

XML Format

To enable the xml data format, modify the kafka.props file as follows:

gg.handler.kafkahandler.format=xml
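
Whenever kafka.props is changed, the Replicat must be bounced for the new format to take effect (a minimal GGSCI sketch):

GGSCI> STOP REPLICAT RKAFKA
GGSCI> START REPLICAT RKAFKA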

After restarting the RKAFKA Replicat, data coming over from my Oracle Database is processed and published to the Topic in the xml format:

<operation table='TPC.ORDERS_STATUS_HISTORY' type='I' ts='2016-06-16 17:32:22.997966' current_ts='2016-06-21T12:51:59.505014' pos='00000000000000151048' numCols='6'>
<col name='ORDERS_STATUS_HISTORY_ID' index='0'>
<before missing='true'/>
<after><![CDATA[2]]></after>
</col>
<col name='ORDERS_ID' index='1'>
<before missing='true'/>
<after><![CDATA[7]]></after>
</col>
<col name='ORDERS_STATUS' index='2'>
<before missing='true'/>
<after><![CDATA[1]]></after>
</col>
<col name='DATE_ADDED' index='3'>
<before missing='true'/>
<after><![CDATA[2016-06-16:13:32:23.172212000]]></after>
</col>
<col name='CUSTOMER_NOTIFIED' index='4'>
<before missing='true'/>
<after><![CDATA[1]]></after>
</col>
<col name='COMMENTS' index='5'>
<before missing='true'/>
<after><![CDATA[Order received, customer notified]]></after>
</col>
</operation>

Delimited Text Format

Setting delimitedtext as the output format in the kafka.props file:

gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=|

Publishes the following output to the Kafka Topic:

I|TPC.ORDERS_STATUS_HISTORY|2016-06-16 17:32:22.997966|2016-06-21T13:07:00.913024|00000000000000151048|2|7|1|2016-06-16:13:32:23.172212000|1|Order received, customer notified

Note that I changed the field delimiter from the default Hive delimiter (ASCII \u0001) to pipe (|) for clarity.

JSON Format

Setting json as the output format in the kafka.props file:

gg.handler.kafkahandler.format=json

Publishes the following output to the Kafka Topic:

{"table":"TPC.ORDERS_STATUS_HISTORY","op_type":"I","op_ts":"2016-06-16 17:32:22.997966","current_ts":"2016-06-21T13:27:22.392000","pos":"00000000000000151048","after":{"ORDERS_STATUS_HISTORY_ID":2,"ORDERS_ID":7,"ORDERS_STATUS":1,"DATE_ADDED":"2016-06-16:13:32:23.172212000","CUSTOMER_NOTIFIED":1,"COMMENTS":"Order received, customer notified"}}

Avro Operation Format

The Avro Operation Formatter formats operation data from the source trail file into messages in an Avro binary array format. Each insert, update, delete and truncate operation will be formatted into an individual Avro message. The Avro Operation Formatter takes the before and after image data from the GoldenGate Trail and formats the data into an Avro binary representation of the operation data.

When the Avro formatters are used, a schema Topic must be specified if schema data is to be propagated. Set the following in the kafka.props file to enable the Avro Operation Format.

gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.SchemaTopicName=tpcSchemaTopic

For the Avro Operation Format, the Kafka Handler publishes the following to the data Topic:

2TPC.ORDERS_STATUS_HISTORYU42016-06-22 16:53:40.00064742016-06-22T13:13:13.650000(000000000200000022730ORDERS_STATUS_HISTORY_IDORDERS_IDDATE_ADDED@@�?:2016-06-16:13:29:49.144858000�?BOrder received, customer notified@@”@:2016-06-16:13:29:49.1448580008customer aknowledged receipt
2TPC.ORDERS_STATUS_HISTORYU42016-06-22 17:06:13.00066542016-06-22T13:13:13.670000(000000000200000031510ORDERS_STATUS_HISTORY_IDORDERS_IDDATE_ADDED@@”@:2016-06-16:13:29:49.144858000�?8customer aknowledged receipt@@�?:2016-06-16:13:29:49.144858000�?BOrder received, customer notified

Avro schemas are represented as JSON documents. They define the format of the generated Avro messages and are required to serialize and deserialize them. Schemas are generated on a just-in-time (JIT) basis when the first operation for a table is encountered. Because Avro schemas are specific to a table definition, a separate schema is generated for every table processed. The Kafka Handler publishes the following to the Avro schema Topic:

{
"type" : "record",
"name" : "ORDERS_STATUS_HISTORY",
"namespace" : "TPC",
"fields" : [ {
"name" : "table",
"type" : "string"
}, {
"name" : "op_type",
"type" : "string"
}, {
"name" : "op_ts",
"type" : "string"
}, {
"name" : "current_ts",
"type" : "string"
}, {
"name" : "pos",
"type" : "string"
}, {
"name" : "primary_keys",
"type" : {
"type" : "array",
"items" : "string"
}
}, {
"name" : "tokens",
"type" : {
"type" : "map",
"values" : "string"
},
"default" : { }
}, {
"name" : "before",
"type" : [ "null", {
"type" : "record",
"name" : "columns",
"fields" : [ {
"name" : "ORDERS_STATUS_HISTORY_ID",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "ORDERS_STATUS_HISTORY_ID_isMissing",
"type" : "boolean"
}, {
"name" : "ORDERS_ID",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "ORDERS_ID_isMissing",
"type" : "boolean"
}, {
"name" : "ORDERS_STATUS",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "ORDERS_STATUS_isMissing",
"type" : "boolean"
}, {
"name" : "DATE_ADDED",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "DATE_ADDED_isMissing",
"type" : "boolean"
}, {
"name" : "CUSTOMER_NOTIFIED",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "CUSTOMER_NOTIFIED_isMissing",
"type" : "boolean"
}, {
"name" : "COMMENTS",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "COMMENTS_isMissing",
"type" : "boolean"
} ]
} ],
"default" : null
}, {
"name" : "after",
"type" : [ "null", "columns" ],
"default" : null
} ]
}
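
Both the Avro data messages and the schema messages can be inspected with a console consumer; the schema Topic is the one configured via SchemaTopicName above (a sketch using the same 0.9 consumer syntax as earlier):

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic tpcSchemaTopic --from-beginning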

Avro Row Format

The Avro Row Formatter formats operation data from the source trail file into messages in an Avro binary array format that represents the row data rather than the full before and after operation images. This format is more compact than the output from the Avro Operation Formatter.

When the Avro formatters are used, a schema Topic must be specified if schema data is to be propagated. Set the following in the kafka.props file to enable the Avro Row Format.

gg.handler.kafkahandler.format=avro_row
gg.handler.kafkahandler.SchemaTopicName=tpcSchemaTopic

For the Avro Row Format, the Kafka Handler publishes the following to the data Topic:

TPC.ORDERS_STATUS_HISTORYU42016-06-22 16:53:40.00064742016-06-22T13:10:21.786000(000000000200000022730ORDERS_STATUS_HISTORY_IDORDERS_IDDATE_ADDED@@”@:2016-06-16:13:29:49.1448580008customer aknowledged receipt2TPC.ORDERS_STATUS_HISTORYU42016-06-22 17:06:13.00066542016-06-22T13:10:21.804000(000000000200000031510ORDERS_STATUS_HISTORY_IDORDERS_IDDATE_ADDED@@�?:2016-06-16:13:29:49.144858000�?BOrder received, customer notified

As with the Avro Operation Format, a schema is generated on a just-in-time basis when the first operation for a table is encountered, and a separate schema is generated for every table processed. The Kafka Handler publishes the following to the Avro schema Topic:

{
"type" : "record",
"name" : "ORDERS_STATUS_HISTORY",
"namespace" : "TPC",
"fields" : [ {
"name" : "table",
"type" : "string"
}, {
"name" : "op_type",
"type" : "string"
}, {
"name" : "op_ts",
"type" : "string"
}, {
"name" : "current_ts",
"type" : "string"
}, {
"name" : "pos",
"type" : "string"
}, {
"name" : "primary_keys",
"type" : {
"type" : "array",
"items" : "string"
}
}, {
"name" : "tokens",
"type" : {
"type" : "map",
"values" : "string"
},
"default" : { }
}, {
"name" : "before",
"type" : [ "null", {
"type" : "record",
"name" : "columns",
"fields" : [ {
"name" : "ORDERS_STATUS_HISTORY_ID",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "ORDERS_STATUS_HISTORY_ID_isMissing",
"type" : "boolean"
}, {
"name" : "ORDERS_ID",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "ORDERS_ID_isMissing",
"type" : "boolean"
}, {
"name" : "ORDERS_STATUS",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "ORDERS_STATUS_isMissing",
"type" : "boolean"
}, {
"name" : "DATE_ADDED",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "DATE_ADDED_isMissing",
"type" : "boolean"
}, {
"name" : "CUSTOMER_NOTIFIED",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "CUSTOMER_NOTIFIED_isMissing",
"type" : "boolean"
}, {
"name" : "COMMENTS",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "COMMENTS_isMissing",
"type" : "boolean"
} ]
} ],
"default" : null
}, {
"name" : "after",
"type" : [ "null", "columns" ],
"default" : null
} ]
}

Publishing to Multiple Topics

The Kafka Handler allows operation data from the source trail file to be published to separate topics based on the corresponding table name of the operation data. This feature allows sorting of operation data from the source trail file by the source table name. The feature is enabled by setting the following configuration in the kafka.props file:

gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.mode=op

The mode must be set to op, and the Kafka topic name is resolved from the topicMappingTemplate at runtime; with the ${tableName} keyword shown above it resolves to the source table name of each operation. The topics are created automatically, with the topic name equal to the resolved table name.

With this configuration in place, listing the Kafka topics displays the following in my test environment:

[oracle@kafka-0 ~]$ $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
ORDERS
ORDERS_PRODUCTS
ORDERS_STATUS_HISTORY
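
Each per-table topic can then be consumed independently, for example (a sketch against my test environment):

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ORDERS --from-beginning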

 

Summary

In this article we introduced you to the Oracle GoldenGate Big Data Adapter for Apache Kafka by providing an example of how to set up and configure the Oracle GoldenGate Replicat to act as a Kafka Producer. We further demonstrated the data formatter capabilities and how to enable publishing to multiple Kafka Topics.

For more information on other articles available for Oracle GoldenGate, please view our index page.

Comments

  1. Boris Tyukin says:

    Hi Loren,

    Do you have any recommendations on how to handle very large BLOBs with Oracle–>Kafka? In our case our BLOBs can be near 100MB on the extreme end. One common method with custom Java Kafka producers is to split them and then combine them on the consumer side, or store them in HDFS and keep a reference in the Kafka message. But since we have no control over GoldenGate, what is the best way to support large BLOBs?

    • Hi Boris,

      Since Apache Kafka limits the size of messages OGG for Big Data may apply into the stream, we are limited to their API specification. But, you can use the OGG-BD HDFS handler to store them in HDFS and have a reference in Kafka as you described. Without knowing a lot of detail about your BLOB data and specific use case, we really cannot provide any guidance other than this. I spoke with Product Management and they recommend that you open a SR to enhance the OGG for Big Data product to support the splitting of BLOB data into smaller packets that can be applied to KAFKA. We will need specifics about the use case, BLOB data, and some details about how your consumer logic puts the messages back together for use downstream.

      Regards,
      Loren Penton
      Oracle Data Integration A-Team

  2. Bibin John says:

    Hi Loren,

    Could you please add an article for cassandra capture using OGGBD 12.3.2.1.0?

    Thank you
    Bibin John

  3. Boris Tyukin says:

    Hi Loren,

    I’ve read almost all your posts about GoldenGate and I am very impressed with a wealth of your knowledge and willingness to share it with the world. We are thinking about purchasing GG for Kafka but I have a few questions that I am still not clear on after reading the docs 3 times. We need to stream data from our vendor-managed Oracle system into Kafka. They have to stay on 11g version for a while and they already have GG classical version (non-Big Data) that they use for DR and other purposes.

    Our plan is to license Oracle GG for Big Data and use trail files, produced by that GG instance and then replicate changes to Kafka. But according to this doc https://docs.oracle.com/goldengate/bd1221/gg-bd/GBDIG/toc.htm#GBDIG108 it looks like a special version of GG must be installed at the source database. Is that right? Does it mean DBAs would have to support two versions of GG on the same database?

    My other concern is a version of Kafka, supported by GG as we want to run the latest/greatest, provided with our Cloudera distro. Does GG assume/certifies a specific version of Kafka?

    Lastly, if we want to use latest GG features, can we still use it with Oracle 11g or we have to upgrade database to 12?

    Hope my questions make sense,
    Boris

    • Hi Boris,

      The use case you presented seems fairly straightforward; but you may want to run this by your Oracle account team as they know your environment and implementation better than I do.

      To me, the simplest solution would be to add an Extract Data Pump to the existing OGG Classic instance running at the Oracle Database server that will transport the captured data to an OGG Big Data 12.3.2.1 instance that will apply the data to Kafka.

      As of this date OGG-BD 12.3.2.1 is the latest release, supports Kafka versions 0.9.0.* through 1.0.0, and will be able to process the pre-12.3.2.1 OGG Trails coming from the OGG Classic instance. This OGG-BD instance may reside on the Oracle Database server with the OGG Classic instance, or may be placed on another server; so, yes, the DBAs will need to manage two different OGG environments.

      You did not specify which version of Oracle 11g your source database may be; but OGG Classic 12.2 supports Oracle Database 11.2.0.1 and above, while OGG Classic 12.3 and OGG Microservices Architecture support Oracle Database 11.2.0.4 and above.

      Regards,
      Loren

      • Boris Tyukin says:

        thanks for the detailed response, Loren!

        Just to make sure I understand. Our source Oracle database v11.2.0.4.0 and it runs older OGG Classic version 11. Our target environment is Kafka and OGG BD 12.3.2.1 let say installed on a dedicated Linux server with no Oracle DB on it.

        Are you saying that it would be fine to ship trails, produced by OGG 11 Classic to a remote server running OGG BD 12, without upgrading OGG 11 at the source? People who will be managing source OGG and target OGG are different teams if it matters.

        thanks,
        Boris

        • Boris Tyukin says:

          I guess I am confused by this line in the doc:

          The Oracle GoldenGate for Big Data can be installed with the same major release as your Oracle GoldenGate instance. Therefore, 11.1.x releases of the Big Data can only be installed to 11.1.x releases of Oracle GoldenGate; 11.2.x with 11.2.x, and 12.1.2.x with 12.1.2.x.

          • That is indeed confusing.

            It is referring to functionality that allows customers to combine Big Data functionality into their existing Oracle GoldenGate instance (which is not a best practice and frowned upon because it is a support and maintenance nightmare). What this means is that if a customer really wants to add Big Data Replicat to their OGG instance capturing from a database; then the existing OGG instance and the Big Data Adapters must be the same release version.

            Regards,
            Loren

        • Boris,

          You really need to be talking to your Oracle Account Team. Oracle GoldenGate 11.1 is 8 year old software and Oracle A-Team only works with the latest and upcoming product releases. I cannot say for certain that the 12.3 target will be able to process the 11.1 Trails.

          Regards,
          Loren

  4. Hello Loren,

    I get the following error. can you please help resolve?

    t property: topicPartitioning:=”table” (class: oracle.goldengate.handler.kafka.KafkaHandler).
    at oracle.goldengate.datasource.DataSourceLauncher.(DataSourceLauncher.java:161)
    at oracle.goldengate.datasource.UserExitMain.main(UserExitMain.java:108)
    Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [oracle.goldengate.datasource.GGDataSource]: Factory method ‘getDataSource’ threw exception; nested exception is oracle.goldengate.util.ConfigException: Unable to set property on handler ‘kafkahandler’ (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: topicPartitioning:=”table” (class: oracle.goldengate.handler.kafka.KafkaHandler).
    2017-09-21T12:47:31.179-0400 ERROR OGG-01668 Oracle GoldenGate Delivery, rkafka.prm: PROCESS ABENDING.

    I also tried with the recommendation

    – Remove or comment out the settings:
    gg.handler.kafkahandler.TopicName=oggtopic
    gg.handler.kafkahandler.mode=tx

    – Add the settings:
    gg.handler.kafkahandler.topicPartitioning=table
    gg.handler.kafkahandler.mode=op

    But no luck!

    Thanks,
    Sujitha

    • Hi Sujitha,

      There is not enough information here to provide any direction. The error message is saying that the property “table” cannot be set for the handler “kafkahandler”, is that the name you defined via the gg.handlerlist setting?

      If you provide your complete properties file, I’ll take a look; however, A-Team is not able to provide customer support or troubleshooting assistance via our blog. For that you should open a ticket with Oracle Support.

      Regards,
      Loren Penton

      • Loren,

        I have the properties file below:

        fyi – I am following – Step by Step Installation of 12.2 GoldenGate Replicat for KafKa example (Doc ID 2238324.1)

        ######### kafka.props
        gg.handlerlist = kafkahandler
        gg.handler.kafkahandler.type = kafka
        gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
        #gg.handler.kafkahandler.TopicName =oggtopic
        gg.handler.kafkahandler.format =json
        #gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
        gg.handler.kafkahandler.BlockingSend =false
        gg.handler.kafkahandler.includeTokens=false
        gg.handler.kafkahandler.topicPartitioning=table
        gg.handler.kafkahandler.mode=op
        #gg.handler.kafkahandler.mode =tx
        #gg.handler.kafkahandler.maxGroupSize =100, 1Mb
        #gg.handler.kafkahandler.minGroupSize =50, 500Kb

        goldengate.userexit.timestamp=utc
        goldengate.userexit.writers=javawriter
        javawriter.stats.display=TRUE
        javawriter.stats.full=TRUE
        gg.log=log4j
        gg.log.level=INFO
        gg.report.time=30sec
        gg.classpath=dirprm/:/u01/kafka_2.11-0.9.0.1/libs/*:
        javawriter.bootoptions=-Xmx128m -Xms32m -Djava.class.path=ggjava/ggjava.jar:./dirprm -Dlog_file_name=my_kafka

        ###########custom_kafka_producer.properties
        bootstrap.servers=localhost:9092 ### this points to kafka server listener configured in step I-4-2-2
        acks=1
        compression.type=gzip
        reconnect.backoff.ms=1000
        value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
        key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
        # 100KB per partition
        batch.size=102400
        linger.ms=10000

        Unable to start the replicat process

        Thanks,
        Sujitha

        • Sujitha,

          I cannot troubleshoot documents published by Oracle Support in their Knowledge Base. The document you referenced is unrelated to this blog article. Please open a ticket for assistance with Oracle Support.

          Regards,
          Loren Penton

  5. Thomas Robert says:

    Hi Loren,

    I am unable to get the “Publishing to Multiple Topics” to work. I am following the official documentation:
    Integrating Oracle GoldenGate for Big Data – Using the Kafka Handler – Chapter 5.2.

    There is says:

    “The Kafka Handler allows operation data from the source trail file to be published to separate topics based on the corresponding table name of the operation data. It allows the sorting of operation data from the source trail file by the source table name. It is enabled by setting the following configuration property in the Java Adapter properties file as follows:

    gg.handler.kafka.topicPartitioning=table
    gg.handler.kafka.mode=op

    The mode must be set to op and the Kafka topic name used is the fully qualified table name of the source operation.”

    But that doesn’t work for me.
    If I comment the default topic, like

    # gg.handler.kafkahandler.TopicName =oggtopic

    then my replicat doesn’t start anymore, with the error message:

    ERROR OGG-15051 Java or JNI exception:
    oracle.goldengate.util.GGException: org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘userExitDataSource’ defined in class path resource [oracle/goldengate/datasource/DataSource-context.xml]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [oracle.goldengate.datasource.GGDataSource]: Factory method ‘getDataSource’ threw exception; nested exception is oracle.goldengate.util.ConfigException: A valid topic name needs to be defined for the Kafka Handler or configure table selection based on table name.

    What’s wrong??

    • Hi Thomas,

      Excellent question as I did not cover this in the original blog article. To have OGG create a topic for each table being replicated, make the following changes:
      1. In the kafka server properties file add: auto.create.topics.enable=true
      – The OGG documentation says this is the default, but that was not the case in my test environment.

      2. Modify the OGG Replicat kafka properties file as follows (these settings are from my properties file for the handler named “kafkahandler”):
      – Remove or comment out the settings:
      gg.handler.kafkahandler.TopicName=oggtopic
      gg.handler.kafkahandler.mode=tx

      – Add the settings:
      gg.handler.kafkahandler.topicPartitioning=table
      gg.handler.kafkahandler.mode=op

      I then tested by starting Zookeeper, the Kafka server, the OGG Replicat, and two Kafka Consumers. I did not predefine any Kafka Topics, instead I let OGG create the topics on the fly. Here are my test results:

      ./bin/kafka-console-consumer.sh --zookeeper kafka-0:2181 --topic TPC.ORDERS --from-beginning

      TPC.ORDERS[1]I42016-09-13 14:23:08.99936242017-02-09T08:57:30.189000(00000000000000010775[1] ORDERS_ID[1][1]t?@[1]??[1]Loren Penton[1]3268W 33rd St[1]New York[1]90458[1]GA[1]United States[1]504.555.1212[1]*loren@lorenpenton.com[1]@[1]Loren Penton[1]3268W 33rd St[1]New York[1]90458[1]GA[1]United States[1]@[1]Loren Penton[1]3268W 33rd St[1]New York[1]90458[1]GA[1]United States[1]@[1]Credit Card[1] American Express[1]Loren Penton[1] 43541516626308514.20[1]:2016-09-13:10:23:09.614011000[1]??[1]USD[1]??

      ./bin/kafka-console-consumer.sh --zookeeper kafka-0:2181 --topic TPC.ORDERS_STATUS_HISTORY --from-beginning

      2TPC.ORDERS_STATUS_HISTORY[1]I42016-09-13 14:23:09.99936242017-02-09T08:57:30.580000(000000000000000212410ORDERS_STATUS_HISTORY_ID ORDERS_ID DATE_ADDED[1][1]@[1]t?@[1]??[1]:2016-09-13:10:23:10.046815000[1]??[1]BOrder received, customer notified

      Regards,
      Loren Penton

      • Thomas Robert says:

        Hi Loren,

        oh yes … it is
        gg.handler.kafkahandler.topicPartitioning=table
        and not
        gg.handler.kafka.topicPartitioning=table

        Now it works perfectly well 🙂

        Thank you
        Thomas
