Oracle GoldenGate Big Data Adapter: Apache Kafka Producer

Introduction

Apache Kafka is a distributed, partitioned, replicated commit log service that provides the functionality of a messaging system. The Oracle GoldenGate for Big Data Kafka Handler acts as a Kafka Producer that writes serialized change capture data from an Oracle GoldenGate Trail to a Kafka Topic.

In this article we will set up the Oracle GoldenGate Big Data Kafka Handler, configure data apply from Oracle 12c tables, and show examples of the different Big Data formatters available in the Kafka Handler.

This document covers functionality present in Oracle GoldenGate version 12.3, which may not be available in earlier product releases.

The concepts, scripts, and information presented in this article are for educational purposes only. They are not supported by Oracle Development or Support, and come with no guarantee or warranty of functionality in any environment other than the test system used to prepare this article. Before applying any changes presented in this article to your environment, you should test thoroughly to assess functionality and performance implications.

Main Article

The Oracle GoldenGate for Big Data product contains built-in support for writing operation data from Oracle GoldenGate trail records to various Big Data targets, such as Apache Kafka. The functionality is separated into handlers, which integrate with third-party applications, and formatters, which transform the data into formats such as Avro, JSON, delimited text, and XML. The Oracle GoldenGate Big Data Kafka module integrates seamlessly with Oracle GoldenGate Replicat for data apply to Kafka Topics.

For this article, we will be using Apache Kafka version 1.1.0 running on a Linux virtual machine as the Oracle GoldenGate target instance. Oracle GoldenGate for Big Data version 12.3.1.1 and Java version 8 complete the requisite software components. Environment variables set in the shell include:

# User specific environment and startup programs
export KAFKA_HOME=/home/oracle/kafka_2.11-0.9.0.0
export JAVA_HOME=/usr/java/jdk1.8.0_92
export JRE_HOME=/usr/java/jdk1.8.0_92/jre
export LD_LIBRARY_PATH=/usr/java/jdk1.8.0_92/jre/lib/amd64/server
export OGG_HOME=/u01/OGG123BD
#
export PATH=$KAFKA_HOME/bin:$JAVA_HOME/bin:$JRE_HOME/bin:$KAFKA_HOME:$PATH
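
If the Kafka broker is not already running, it can be started with the scripts shipped in the Kafka distribution. A minimal sketch, assuming the single-node $KAFKA_HOME install above:

# Start a single-node ZooKeeper and Kafka broker in the background.
cd $KAFKA_HOME
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties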

Installation

Oracle GoldenGate for Big Data requires Java 1.7 or later; both the Java Runtime Environment (JRE) and Java Development Kit (JDK) are supported.

For Replicat to function as a Kafka Producer, the Big Data Kafka module may be integrated with any database version of Oracle GoldenGate; however, the best practice is to install Oracle GoldenGate for Non-Oracle Databases (also known as generic GoldenGate), which is packaged as part of the Oracle GoldenGate for Big Data release.
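
Installation itself amounts to extracting the distribution into $OGG_HOME and creating the GoldenGate subdirectories. A minimal sketch follows; the archive names are illustrative and vary by release:

# Minimal installation sketch; archive names are illustrative.
unzip OGG_BigData_Linux_x64_12.3.1.1.0.zip
tar -xf OGG_BigData_Linux_x64_12.3.1.1.0.tar -C $OGG_HOME
cd $OGG_HOME
./ggsci
GGSCI> CREATE SUBDIRS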

Configuring the Apache Kafka Handler

To set up the Kafka Handler for data delivery, we need to configure three files: (1) a GoldenGate Replicat parameter file, (2) a GoldenGate Big Data properties file, and (3) a custom Kafka Producer properties file. I have placed all three files in the Oracle GoldenGate configuration file default directory, dirprm.

Replicat Parameter File

Below is my Replicat parameter file:

REPLICAT RKAFKA
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
GROUPTRANSOPS 10000
MAP PDBORCL.TPC.ORDERS, TARGET TPC.ORDERS;
MAP PDBORCL.TPC.ORDERS_PRODUCTS, TARGET TPC.ORDERS_PRODUCTS;
MAP PDBORCL.TPC.ORDERS_STATUS_HISTORY, TARGET TPC.ORDERS_STATUS_HISTORY;

Important items to note in the Replicat parameter file:

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props : Overrides the default Replicat processing by loading the GoldenGate Java adapter and its Kafka-specific functionality. In this example my Kafka properties file is named kafka.props and resides in the GoldenGate dirprm directory.

GROUPTRANSOPS 10000 : Groups 10,000 operations from the source trail files into a single target transaction. This is the default and improves the performance of Big Data integrations. Typically you do not specify default settings in GoldenGate parameter files; however, in this case it is a good idea to do so to minimize confusion in environments where a database-specific GoldenGate application is also installed.
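
With the parameter file saved to dirprm, the Replicat is registered against the source trail and started from GGSCI. A minimal sketch follows; the trail prefix ./dirdat/tp is a placeholder for the trail delivered to this instance:

GGSCI> ADD REPLICAT RKAFKA, EXTTRAIL ./dirdat/tp
GGSCI> START REPLICAT RKAFKA
GGSCI> INFO REPLICAT RKAFKA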

Properties File

Below is my Kafka properties file:

## RKAFKA properties for Kafka Topic apply
##
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp}
gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=|
gg.handler.kafkahandler.SchemaTopicName=tpcSchemaTopic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
##
gg.log=log4j
gg.log.level=INFO
##
gg.report.time=30sec
##
gg.classpath=dirprm/:/home/oracle/kafka_2.11-0.9.0.0/libs/*
##
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

Items to note in the Kafka properties file:

gg.handlerlist : Defines a unique name for this handler. All properties defined with this handler name will be used to make the Kafka Producer connection.

gg.handler.[handler name].type : Defines the Oracle GoldenGate Big Data Handler to use.

gg.handler.[handler name].KafkaProducerConfigFile : File that contains the configuration properties for an Apache Kafka Producer. This file must be in a location defined by the classpath setting.

gg.handler.[handler name].topicMappingTemplate : A template string value used to resolve the Kafka topic name at runtime (alternative template keywords are shown in the example after this list).

gg.handler.[handler name].keyMappingTemplate : A template string value to resolve the Kafka message key at runtime.

gg.handler.[handler name].format : Formatter to be used when writing data to the Kafka Topic: xml, delimitedtext, json, avro_row, or avro_op.

gg.handler.[handler name].BlockingSend : Defines how messages are sent to Kafka; true for Blocking Mode or false for Non-Blocking Mode. In Blocking Mode, messages are delivered synchronously: the Kafka Handler waits for an acknowledgment from the Kafka server before sending the next message. This mode guarantees message delivery, but at reduced performance. In Non-Blocking Mode, messages are delivered asynchronously and are published to Kafka as they are read from the GoldenGate Trail by Replicat. For best performance, Non-Blocking Mode is the best practice.

gg.classpath : Defines the location of the Kafka libraries required by the Big Data Handler to connect to Kafka and format messages, and the location of the Apache Kafka producer configuration file.
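
The mapping templates accept keywords that are resolved per operation at runtime. As an illustration, and assuming the template keywords documented for release 12.3, the topic could carry a literal prefix and the message key could be derived from the primary key value(s):

# Illustrative alternatives; template keywords per the 12.3 documentation.
gg.handler.kafkahandler.topicMappingTemplate=ogg_${tableName}
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}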

Kafka Producer Properties File

My Kafka Producer properties file contains:

bootstrap.servers=kafka-0:9092,kafka-0:9093,kafka-0:9094
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
batch.size = 102400
linger.ms = 10000
max.request.size = 5024000
send.buffer.bytes = 5024000

These configuration items are defined in the Apache Kafka documentation. Any configurable Producer options may be defined in this file. Items to note in the Kafka Producer properties file include:

– bootstrap.servers : A list of host/port pairs used for establishing the initial connection to the Kafka cluster. Since these servers are only used for the initial connection to discover the full cluster membership, the list need not contain the full set of servers; however, you may want more than one in case a server is down. If no server in this list is available, the GoldenGate Replicat will abend.

– acks : The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.

– batch.size : The Producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

– linger.ms : The amount of time the Producer waits for additional records to accumulate before sending the current batch.

– max.request.size : The maximum size of a request. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.

– send.buffer.bytes : The size of the TCP send buffer to use when sending data.
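
Before starting the Replicat it is worth verifying broker connectivity with the console tools shipped with Kafka. A minimal sketch, assuming the oggtopic topic name from kafka.props and the 0.9-era tool flags used throughout this article (newer releases replace --zookeeper with --bootstrap-server):

# Create the target topic, then watch it with a console consumer.
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic oggtopic
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic oggtopic --from-beginning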

Data Formats

As noted previously, the following data formats are supported by the GoldenGate Kafka Handler: xml, delimitedtext, json, avro_row, or avro_op. Let’s capture some Oracle Database records and see the Kafka Topic data created for each format.

XML Format

To enable the xml data format, modify the kafka.props file as follows:

gg.handler.kafkahandler.format=xml

I then start the RKAFKA Replicat to process data coming over from my Oracle Database. Data is published to the Topic in the xml format:

<operation table='TPC.ORDERS_STATUS_HISTORY' type='I' ts='2016-06-16 17:32:22.997966' current_ts='2016-06-21T12:51:59.505014' pos='00000000000000151048' numCols='6'>
<col name='ORDERS_STATUS_HISTORY_ID' index='0'>
<before missing='true'/>
<after><![CDATA[2]]></after>
</col>
<col name='ORDERS_ID' index='1'>
<before missing='true'/>
<after><![CDATA[7]]></after>
</col>
<col name='ORDERS_STATUS' index='2'>
<before missing='true'/>
<after><![CDATA[1]]></after>
</col>
<col name='DATE_ADDED' index='3'>
<before missing='true'/>
<after><![CDATA[2016-06-16:13:32:23.172212000]]></after>
</col>
<col name='CUSTOMER_NOTIFIED' index='4'>
<before missing='true'/>
<after><![CDATA[1]]></after>
</col>
<col name='COMMENTS' index='5'>
<before missing='true'/>
<after><![CDATA[Order received, customer notified]]></after>
</col>
</operation>

Delimited Text Format

Setting delimitedtext as the output format in the kafka.props file:

gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=|

Publishes the following output to the Kafka Topic:

I|TPC.ORDERS_STATUS_HISTORY|2016-06-16 17:32:22.997966|2016-06-21T13:07:00.913024|00000000000000151048|2|7|1|2016-06-16:13:32:23.172212000|1|Order received, customer notified

Note that for clarity I changed the field delimiter from the default Hive-style delimiter (CDATA[\u0001]) to the pipe character (|).

JSON Format

Setting json as the output format in the kafka.props file:

gg.handler.kafkahandler.format=json

Publishes the following output to the Kafka Topic:

{"table":"TPC.ORDERS_STATUS_HISTORY","op_type":"I","op_ts":"2016-06-16 17:32:22.997966","current_ts":"2016-06-21T13:27:22.392000","pos":"00000000000000151048","after":{"ORDERS_STATUS_HISTORY_ID":2,"ORDERS_ID":7,"ORDERS_STATUS":1,"DATE_ADDED":"2016-06-16:13:32:23.172212000","CUSTOMER_NOTIFIED":1,"COMMENTS":"Order received, customer notified"}}
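
The same message, pretty-printed for readability:

{
  "table": "TPC.ORDERS_STATUS_HISTORY",
  "op_type": "I",
  "op_ts": "2016-06-16 17:32:22.997966",
  "current_ts": "2016-06-21T13:27:22.392000",
  "pos": "00000000000000151048",
  "after": {
    "ORDERS_STATUS_HISTORY_ID": 2,
    "ORDERS_ID": 7,
    "ORDERS_STATUS": 1,
    "DATE_ADDED": "2016-06-16:13:32:23.172212000",
    "CUSTOMER_NOTIFIED": 1,
    "COMMENTS": "Order received, customer notified"
  }
}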

Avro Operation Format

The Avro Operation Formatter formats operation data from the source trail file into messages in an Avro binary array format. Each insert, update, delete and truncate operation will be formatted into an individual Avro message. The Avro Operation Formatter takes the before and after image data from the GoldenGate Trail and formats the data into an Avro binary representation of the operation data.

When the Avro formatters are used, a schema Topic must be specified if schema data is to be propagated. Set the following in the kafka.props file to enable the Avro Operation format:

gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.SchemaTopicName=tpcSchemaTopic

For the Avro Operation format, the Kafka Handler publishes the following (binary Avro rendered as text) to the data topic:

2TPC.ORDERS_STATUS_HISTORYU42016-06-22 16:53:40.00064742016-06-22T13:13:13.650000(000000000200000022730ORDERS_STATUS_HISTORY_IDORDERS_IDDATE_ADDED@@�?:2016-06-16:13:29:49.144858000�?BOrder received, customer notified@@”@:2016-06-16:13:29:49.1448580008customer aknowledged receipt
2TPC.ORDERS_STATUS_HISTORYU42016-06-22 17:06:13.00066542016-06-22T13:13:13.670000(000000000200000031510ORDERS_STATUS_HISTORY_IDORDERS_IDDATE_ADDED@@”@:2016-06-16:13:29:49.144858000�?8customer aknowledged receipt@@�?:2016-06-16:13:29:49.144858000�?BOrder received, customer notified

Avro schemas are represented as JSON documents. They define the format of generated Avro messages and are required to both serialize and deserialize those messages. Schemas are generated on a just-in-time basis, when the first operation for a table is encountered. Because an Avro schema is specific to a table definition, a separate schema is generated for every table processed. The Kafka Handler publishes the following to the Avro schema topic:

{
  "type" : "record",
  "name" : "ORDERS_STATUS_HISTORY",
  "namespace" : "TPC",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
  }, {
    "name" : "op_type",
    "type" : "string"
  }, {
    "name" : "op_ts",
    "type" : "string"
  }, {
    "name" : "current_ts",
    "type" : "string"
  }, {
    "name" : "pos",
    "type" : "string"
  }, {
    "name" : "primary_keys",
    "type" : {
      "type" : "array",
      "items" : "string"
    }
  }, {
    "name" : "tokens",
    "type" : {
      "type" : "map",
      "values" : "string"
    },
    "default" : { }
  }, {
    "name" : "before",
    "type" : [ "null", {
      "type" : "record",
      "name" : "columns",
      "fields" : [ {
        "name" : "ORDERS_STATUS_HISTORY_ID",
        "type" : [ "null", "double" ],
        "default" : null
      }, {
        "name" : "ORDERS_STATUS_HISTORY_ID_isMissing",
        "type" : "boolean"
      }, {
        "name" : "ORDERS_ID",
        "type" : [ "null", "double" ],
        "default" : null
      }, {
        "name" : "ORDERS_ID_isMissing",
        "type" : "boolean"
      }, {
        "name" : "ORDERS_STATUS",
        "type" : [ "null", "double" ],
        "default" : null
      }, {
        "name" : "ORDERS_STATUS_isMissing",
        "type" : "boolean"
      }, {
        "name" : "DATE_ADDED",
        "type" : [ "null", "string" ],
        "default" : null
      }, {
        "name" : "DATE_ADDED_isMissing",
        "type" : "boolean"
      }, {
        "name" : "CUSTOMER_NOTIFIED",
        "type" : [ "null", "double" ],
        "default" : null
      }, {
        "name" : "CUSTOMER_NOTIFIED_isMissing",
        "type" : "boolean"
      }, {
        "name" : "COMMENTS",
        "type" : [ "null", "string" ],
        "default" : null
      }, {
        "name" : "COMMENTS_isMissing",
        "type" : "boolean"
      } ]
    } ],
    "default" : null
  }, {
    "name" : "after",
    "type" : [ "null", "columns" ],
    "default" : null
  } ]
}
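
To inspect the published schema, point a console consumer at the schema topic (a sketch, assuming the tpcSchemaTopic name set in kafka.props and the 0.9-era consumer flags):

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic tpcSchemaTopic --from-beginning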

Avro Row Format

The Avro Row Formatter formats operation data from the source trail file into Avro binary array messages that represent the row data. This format is more compact than the output from the Avro Operation Formatter.

When the Avro formatters are used, a schema Topic must be specified if schema data is to be propagated. Set the following in the kafka.props file to enable the Avro Row format:

gg.handler.kafkahandler.format=avro_row
gg.handler.kafkahandler.SchemaTopicName=tpcSchemaTopic

For the Avro Row format, the Kafka Handler publishes the following (binary Avro rendered as text) to the data topic:

TPC.ORDERS_STATUS_HISTORYU42016-06-22 16:53:40.00064742016-06-22T13:10:21.786000(000000000200000022730ORDERS_STATUS_HISTORY_IDORDERS_IDDATE_ADDED@@”@:2016-06-16:13:29:49.1448580008customer aknowledged receipt2TPC.ORDERS_STATUS_HISTORYU42016-06-22 17:06:13.00066542016-06-22T13:10:21.804000(000000000200000031510ORDERS_STATUS_HISTORY_IDORDERS_IDDATE_ADDED@@�?:2016-06-16:13:29:49.144858000�?BOrder received, customer notified

As with the Avro Operation Formatter, a table-specific Avro schema is generated on a just-in-time basis when the first operation for a table is encountered. The Kafka Handler publishes the following to the Avro schema topic:

{
  "type" : "record",
  "name" : "ORDERS_STATUS_HISTORY",
  "namespace" : "TPC",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
  }, {
    "name" : "op_type",
    "type" : "string"
  }, {
    "name" : "op_ts",
    "type" : "string"
  }, {
    "name" : "current_ts",
    "type" : "string"
  }, {
    "name" : "pos",
    "type" : "string"
  }, {
    "name" : "primary_keys",
    "type" : {
      "type" : "array",
      "items" : "string"
    }
  }, {
    "name" : "tokens",
    "type" : {
      "type" : "map",
      "values" : "string"
    },
    "default" : { }
  }, {
    "name" : "before",
    "type" : [ "null", {
      "type" : "record",
      "name" : "columns",
      "fields" : [ {
        "name" : "ORDERS_STATUS_HISTORY_ID",
        "type" : [ "null", "double" ],
        "default" : null
      }, {
        "name" : "ORDERS_STATUS_HISTORY_ID_isMissing",
        "type" : "boolean"
      }, {
        "name" : "ORDERS_ID",
        "type" : [ "null", "double" ],
        "default" : null
      }, {
        "name" : "ORDERS_ID_isMissing",
        "type" : "boolean"
      }, {
        "name" : "ORDERS_STATUS",
        "type" : [ "null", "double" ],
        "default" : null
      }, {
        "name" : "ORDERS_STATUS_isMissing",
        "type" : "boolean"
      }, {
        "name" : "DATE_ADDED",
        "type" : [ "null", "string" ],
        "default" : null
      }, {
        "name" : "DATE_ADDED_isMissing",
        "type" : "boolean"
      }, {
        "name" : "CUSTOMER_NOTIFIED",
        "type" : [ "null", "double" ],
        "default" : null
      }, {
        "name" : "CUSTOMER_NOTIFIED_isMissing",
        "type" : "boolean"
      }, {
        "name" : "COMMENTS",
        "type" : [ "null", "string" ],
        "default" : null
      }, {
        "name" : "COMMENTS_isMissing",
        "type" : "boolean"
      } ]
    } ],
    "default" : null
  }, {
    "name" : "after",
    "type" : [ "null", "columns" ],
    "default" : null
  } ]
}

Publishing to Multiple Topics

The Kafka Handler allows operation data from the source trail file to be published to separate topics based on the corresponding table name of the operation data. This feature allows sorting of operation data from the source trail file by the source table name. The feature is enabled by setting the following configuration in the kafka.props file:

gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.mode=op

The mode must be set to op. With the ${tableName} template shown above, the Kafka topic name resolves to the unqualified source table name of each operation, and the topics are created automatically.
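
If topic names that include the schema are preferred, the fully qualified keyword can be used instead (a sketch, based on the template keywords documented for release 12.3):

gg.handler.kafkahandler.topicMappingTemplate=${fullyQualifiedTableName}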

With this configuration in place, running the Kafka command to list all topics displays the following in my test environment:

[oracle@kafka-0 ~]$ $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
ORDERS
ORDERS_PRODUCTS
ORDERS_STATUS_HISTORY


Summary

In this article we introduced the Oracle GoldenGate Big Data Adapter for Apache Kafka by providing an example of how to set up and configure an Oracle GoldenGate Replicat to act as a Kafka Producer. We further demonstrated the data formatter capabilities and how to enable publishing to multiple Kafka Topics.

For more information on what other articles are available for Oracle GoldenGate please view our index page.

Comments

  1. Hello Loren,

    I get the following error. can you please help resolve?

    t property: topicPartitioning:="table" (class: oracle.goldengate.handler.kafka.KafkaHandler).
    at oracle.goldengate.datasource.DataSourceLauncher.(DataSourceLauncher.java:161)
    at oracle.goldengate.datasource.UserExitMain.main(UserExitMain.java:108)
    Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [oracle.goldengate.datasource.GGDataSource]: Factory method 'getDataSource' threw exception; nested exception is oracle.goldengate.util.ConfigException: Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: topicPartitioning:="table" (class: oracle.goldengate.handler.kafka.KafkaHandler).
    2017-09-21T12:47:31.179-0400 ERROR OGG-01668 Oracle GoldenGate Delivery, rkafka.prm: PROCESS ABENDING.

    I also tried with the recommendation

    – Remove or comment out the settings:
    gg.handler.kafkahandler.TopicName=oggtopic
    gg.handler.kafkahandler.mode=tx

    – Add the settings:
    gg.handler.kafkahandler.topicPartitioning=table
    gg.handler.kafkahandler.mode=op

    But no luck!

    Thanks,
    Sujitha

    • Hi Sujitha,

      There is not enough information here to provide any direction. The error message is saying that the property “table” cannot be set for the handler “kafkahandler”, is that the name you defined via the gg.handlerlist setting?

      If you provide your complete properties file, I’ll take a look; however, A-Team is not able to provide customer support or troubleshooting assistance via our blog. For that you should open a ticket with Oracle Support.

      Regards,
      Loren Penton

      • Loren,

        I have the properties file below:

        fyi – I am following – Step by Step Installation of 12.2 GoldenGate Replicat for KafKa example (Doc ID 2238324.1)

        ######### kafka.props
        gg.handlerlist = kafkahandler
        gg.handler.kafkahandler.type = kafka
        gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
        #gg.handler.kafkahandler.TopicName =oggtopic
        gg.handler.kafkahandler.format =json
        #gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
        gg.handler.kafkahandler.BlockingSend =false
        gg.handler.kafkahandler.includeTokens=false
        gg.handler.kafkahandler.topicPartitioning=table
        gg.handler.kafkahandler.mode=op
        #gg.handler.kafkahandler.mode =tx
        #gg.handler.kafkahandler.maxGroupSize =100, 1Mb
        #gg.handler.kafkahandler.minGroupSize =50, 500Kb

        goldengate.userexit.timestamp=utc
        goldengate.userexit.writers=javawriter
        javawriter.stats.display=TRUE
        javawriter.stats.full=TRUE
        gg.log=log4j
        gg.log.level=INFO
        gg.report.time=30sec
        gg.classpath=dirprm/:/u01/kafka_2.11-0.9.0.1/libs/*:
        javawriter.bootoptions=-Xmx128m -Xms32m -Djava.class.path=ggjava/ggjava.jar:./dirprm -Dlog_file_name=my_kafka

        ###########custom_kafka_producer.properties
        bootstrap.servers=localhost:9092 ### this points to kafka server listener configured in step I-4-2-2
        acks=1
        compression.type=gzip
        reconnect.backoff.ms=1000
        value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
        key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
        # 100KB per partition
        batch.size=102400
        linger.ms=10000

        Unable to start the replicat process

        Thanks,
        Sujitha

        • Sujitha,

          I cannot troubleshoot documents published by Oracle Support in their Knowledge Base. The document you referenced is unrelated to this blog article. Please open a ticket for assistance with Oracle Support.

          Regards,
          Loren Penton

  2. Thomas Robert says:

    Hi Loren,

    I am unable to get the “Publishing to Multiple Topics” to work. I am following the official documentation:
    Integrating Oracle GoldenGate for Big Data – Using the Kafka Handler – Chapter 5.2.

    There it says:

    “The Kafka Handler allows operation data from the source trail file to be published to separate topics based on the corresponding table name of the operation data. It allows the sorting of operation data from the source trail file by the source table name. It is enabled by setting the following configuration property in the Java Adapter properties file as follows:

    gg.handler.kafka.topicPartitioning=table
    gg.handler.kafka.mode=op

    The mode must be set to op and the Kafka topic name used is the fully qualified table name of the source operation.”

    But that doesn’t work for me.
    If I comment the default topic, like

    # gg.handler.kafkahandler.TopicName =oggtopic

    then my replicat doesn’t start anymore, with the error message:

    ERROR OGG-15051 Java or JNI exception:
    oracle.goldengate.util.GGException: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'userExitDataSource' defined in class path resource [oracle/goldengate/datasource/DataSource-context.xml]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [oracle.goldengate.datasource.GGDataSource]: Factory method 'getDataSource' threw exception; nested exception is oracle.goldengate.util.ConfigException: A valid topic name needs to be defined for the Kafka Handler or configure table selection based on table name.

    What’s wrong??

    • Hi Thomas,

      Excellent question as I did not cover this in the original blog article. To have OGG create a topic for each table being replicated, make the following changes:
      1. In the kafka server properties file add: auto.create.topics.enable=true
      – The OGG documentation says this is the default, but that was not the case in my test environment.

      2. Modify the OGG Replicat kafka properties file as follows (these settings are from my properties file for the handler named “kafkahandler”):
      – Remove or comment out the settings:
      gg.handler.kafkahandler.TopicName=oggtopic
      gg.handler.kafkahandler.mode=tx

      – Add the settings:
      gg.handler.kafkahandler.topicPartitioning=table
      gg.handler.kafkahandler.mode=op

      I then tested by starting Zookeeper, the Kafka server, the OGG Replicat, and two Kafka Consumers. I did not predefine any Kafka Topics, instead I let OGG create the topics on the fly. Here are my test results:

      ./bin/kafka-console-consumer.sh --zookeeper kafka-0:2181 --topic TPC.ORDERS --from-beginning

      TPC.ORDERS[1]I42016-09-13 14:23:08.99936242017-02-09T08:57:30.189000(00000000000000010775[1] ORDERS_ID[1][1]t?@[1]??[1]Loren Penton[1]3268W 33rd St[1]New York[1]90458[1]GA[1]United States[1]504.555.1212[1]*loren@lorenpenton.com[1]@[1]Loren Penton[1]3268W 33rd St[1]New York[1]90458[1]GA[1]United States[1]@[1]Loren Penton[1]3268W 33rd St[1]New York[1]90458[1]GA[1]United States[1]@[1]Credit Card[1] American Express[1]Loren Penton[1] 43541516626308514.20[1]:2016-09-13:10:23:09.614011000[1]??[1]USD[1]??

      ./bin/kafka-console-consumer.sh --zookeeper kafka-0:2181 --topic TPC.ORDERS_STATUS_HISTORY --from-beginning

      2TPC.ORDERS_STATUS_HISTORY[1]I42016-09-13 14:23:09.99936242017-02-09T08:57:30.580000(000000000000000212410ORDERS_STATUS_HISTORY_ID ORDERS_ID DATE_ADDED[1][1]@[1]t?@[1]??[1]:2016-09-13:10:23:10.046815000[1]??[1]BOrder received, customer notified

      Regards,
      Loren Penton

      • Thomas Robert says:

        Hi Loren,

        oh yes … it is
        gg.handler.kafkahandler.topicPartitioning=table
        and not
        gg.handler.kafka.topicPartitioning=table

        Now it works perfectly well 🙂

        Thank you
        Thomas
