Oracle GoldenGate: Apply to Apache Flume File Roll Sink

Introduction

Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of log data from many different sources to a centralized data store. The Oracle GoldenGate for Big Data Flume Handler is designed to stream change capture data from an Oracle GoldenGate trail to a Flume File Roll Sink.

In this article, we will set up the Oracle GoldenGate for Big Data Flume Handler and configure it to apply data captured from Oracle 12c tables.

This document covers functionality present in Oracle GoldenGate version 12.3, which may not be available in earlier product releases.

The concepts, scripts, and information presented in this article are for educational purposes only. They are not supported by Oracle Development or Support, and come with no guarantee or warranty for functionality in any environment other than the test system used to prepare this article. Before applying any changes presented in this article to your environment, you should thoroughly test to assess functionality and performance implications.

Main Article

For this article, we shall be using Apache Flume version 1.8.0 running on a Linux virtual machine as the Oracle GoldenGate target instance. Oracle GoldenGate for Big Data version 12.3.1.1 and Java version 8 complete the requisite software components.

Simply put, Apache Flume is a data ingestion mechanism for collecting, aggregating, and transporting large amounts of streaming data from various sources to a centralized data store. The basic architecture of a Flume Agent is depicted below.

[Figure: Flume Architecture]

Data Generator

A data generator is any data feed, such as Twitter, Facebook, or, in our case, the Oracle GoldenGate Big Data Adapter, that creates data to be collected by the Flume Agent.

Flume Agent

The Flume Agent is a JVM daemon process that receives events from data generator clients or other agents and forwards them to a destination sink or agent. The Flume Agent consists of:

Source

The source component receives data from the data generators and transfers it to one or more channels in the form of Flume events.

Channel

A channel is a transient store that receives Flume events from the source and buffers them until they are consumed by sinks.

Sink

A sink consumes Flume events from the channels and delivers them to the destination. The destination of the sink may be another agent or an external repository, such as HDFS, HBase, or, in our case, text files in a Linux file system.
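To make the source, channel, and sink flow concrete, here is a minimal Java sketch of that pipeline. It is a toy model, not the Flume API: the class ToyAgent and its run method are hypothetical, a bounded ArrayBlockingQueue stands in for the memory channel, and a plain list stands in for the sink's destination.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Toy model of a Flume Agent's data path (hypothetical; this is not the Flume API).
 * A source thread puts events on a bounded in-memory channel, and a sink thread
 * takes them off and delivers them to a destination, here just a list.
 */
final class ToyAgent {

    // Sentinel event telling the sink that the source has finished.
    private static final String END_OF_STREAM = "__end_of_stream__";

    static List<String> run(List<String> generatedData, int channelCapacity) {
        // Channel: a transient, bounded buffer between the source and the sink.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(channelCapacity);
        List<String> destination = new ArrayList<>();

        // Source: receives data from the generator and puts it on the channel.
        Thread source = new Thread(() -> {
            try {
                for (String record : generatedData) {
                    channel.put(record); // blocks while the channel is full
                }
                channel.put(END_OF_STREAM);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Sink: takes events off the channel and delivers them to the destination.
        Thread sink = new Thread(() -> {
            try {
                String event;
                while (!(event = channel.take()).equals(END_OF_STREAM)) {
                    destination.add(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        source.start();
        sink.start();
        try {
            source.join();
            sink.join(); // join() also makes the sink's writes to destination visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the agent", e);
        }
        return destination;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("insert 1", "insert 2", "insert 3"), 2));
    }
}
```

In this toy model a full channel simply makes the source wait in put(); Flume's real channels have their own capacity and transaction semantics, which the sketch does not attempt to reproduce.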

Oracle GoldenGate

Now that we have an understanding of Apache Flume, we can begin setting up Oracle GoldenGate for data capture and delivery. The first step is to install Oracle GoldenGate for the source database (Oracle 12c in this case), alter the database settings to enable data capture, and create a GoldenGate Change Data Capture Extract that retrieves near real-time transactional data from the Oracle redo logs.

The Oracle GoldenGate architecture we’ll be configuring is depicted below.

[Figure: OGG Architecture]

Configuring the Apache Flume Handler

We assume you already know how to set up Oracle GoldenGate to capture from a source database and transmit data to a target instance; therefore, we are only covering the setup of Oracle GoldenGate for delivery to Apache Flume in this article.

To set up the Apache Flume Handler for data delivery, we need to configure three files: (1) a GoldenGate Replicat parameter file, (2) a GoldenGate Big Data properties file, and (3) a custom Apache Flume properties file. I have placed all three files in the Oracle GoldenGate default configuration file directory, dirprm.

Replicat Parameter File

Below is my Replicat parameter file:

REPLICAT rflume
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume_frs.properties
REPORTCOUNT EVERY 20 MINUTES, RATE
GROUPTRANSOPS 100
MAP orcl.moviedemo.movie TARGET orcl.moviedemo.movie;

Important items to note in the Replicat parameter file:

TARGETDB LIBFILE libggjava.so SET property=dirprm/flume_frs.properties

This parameter serves as a trigger to initialize the Java module. The SET clause specifies the location and name of a Java properties file for the Java module. The Java properties file location may be specified as either an absolute path, or path relative to the Replicat executable location. In the configuration above, I used a relative path for the Oracle GoldenGate instance installed at /u01/ogg-bd.

GROUPTRANSOPS 100

This parameter controls the number of operations that are grouped into a single Replicat transaction: Replicat combines source transactions until the operation count reaches this value. The default setting, 1000, is the best-practice setting for a production environment. However, my test server is very small and I do not want to overwhelm my Flume channel, so I elected to reduce the number of operations the Replicat will apply in a transaction.
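The grouping behavior can be sketched in Java as follows. This is a simplified model under stated assumptions: whole source transactions are accumulated until the operation count reaches or exceeds the GROUPTRANSOPS value, and a source transaction is never split. Replicat may also commit a group for other reasons, which the sketch models only as a final flush. The class GroupTransOps and its group method are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Simplified, hypothetical model of GROUPTRANSOPS batching (not Replicat's code). */
final class GroupTransOps {

    /**
     * @param sourceTxSizes operations per source transaction, in trail order
     * @param threshold     the GROUPTRANSOPS value
     * @return operations per resulting target (Replicat) transaction
     */
    static List<Integer> group(List<Integer> sourceTxSizes, int threshold) {
        List<Integer> targetTxSizes = new ArrayList<>();
        int pending = 0;
        for (int size : sourceTxSizes) {
            pending += size;                // whole source transactions are never split
            if (pending >= threshold) {     // threshold reached: commit the group
                targetTxSizes.add(pending);
                pending = 0;
            }
        }
        if (pending > 0) {
            targetTxSizes.add(pending);     // final flush of whatever is left over
        }
        return targetTxSizes;
    }

    public static void main(String[] args) {
        // Twelve source transactions of 30 operations each, with GROUPTRANSOPS 100.
        System.out.println(group(Collections.nCopies(12, 30), 100)); // [120, 120, 120]
    }
}
```

With twelve 30-operation source transactions and GROUPTRANSOPS 100, the sketch commits three target transactions of 120 operations each; in tx mode each of these becomes one batch handed to the Flume handler, so a smaller value means smaller, more frequent deliveries.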

Properties File

Below is my Oracle GoldenGate Big Data properties file, dirprm/flume_frs.properties:

## RFLUME properties for Apache Flume Sink apply
##
gg.handlerlist=flumehandler
gg.handler.flumehandler.type=flume
gg.handler.flumehandler.RpcClientPropertiesFile=custom-flume-rpc.properties
gg.handler.flumehandler.format=delimitedtext
gg.handler.flumehandler.format.fieldDelimiter=|
gg.handler.flumehandler.mode=tx
gg.handler.flumehandler.EventMapsTo=tx
gg.handler.flumehandler.PropagateSchema=true
gg.handler.flumehandler.includeTokens=false
##
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
##
gg.log=log4j
gg.log.level=INFO
##
gg.report.time=30sec
##
gg.classpath=dirprm/:/home/oracle/apache-flume-1.8.0/lib/*:
##
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

The flume_frs.properties file defines the handler, logging, classpath, and Java Virtual Machine settings for the Oracle GoldenGate Big Data Flume Handler:

gg.handlerlist=flumehandler

Defines the name of the handler. The handler's configuration properties in this file are prefixed with gg.handler.flumehandler.
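As a rough illustration of how the handler name ties the properties together, the following Java sketch loads properties with java.util.Properties and collects every gg.handler.<name>.* entry for each name in gg.handlerlist. It is not GoldenGate's implementation; the class HandlerProperties is hypothetical, and treating gg.handlerlist as a comma-separated list is an assumption that only matters when more than the single handler used here is configured.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper: groups gg.handler.<name>.* properties by handler name. */
final class HandlerProperties {

    /** Loads properties from a string (the real file would be read from dirprm). */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with a StringReader
        }
        return props;
    }

    /** Returns the gg.handler.<name>.* properties with that prefix stripped, sorted by key. */
    static Map<String, String> forHandler(Properties props, String handlerName) {
        String prefix = "gg.handler." + handlerName + ".";
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = load(String.join("\n",
                "gg.handlerlist=flumehandler",
                "gg.handler.flumehandler.type=flume",
                "gg.handler.flumehandler.mode=tx",
                "gg.log.level=INFO"));
        for (String name : props.getProperty("gg.handlerlist").split(",")) {
            // Prints: flumehandler -> {mode=tx, type=flume}
            System.out.println(name.trim() + " -> " + forHandler(props, name.trim()));
        }
    }
}
```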

gg.handler.flumehandler.type=flume

Defines the type of this handler.

gg.handler.flumehandler.RpcClientPropertiesFile=custom-flume-rpc.properties

Specifies the name of the Flume Agent configuration file. This file defines the Flume connection information for Replicat to perform remote procedure calls.

gg.handler.flumehandler.format=delimitedtext

Sets the format of the output. Supported formatters are: avro_row, avro_op, delimitedtext, xml, and json.

gg.handler.flumehandler.format.fieldDelimiter=|

The default delimiter is an unprintable character; I changed it to a vertical bar with this setting.
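The choice of delimiter matters for this data set, because several PLOT_SUMMARY values in the test data contain commas. The hypothetical sketch below joins four column values with a delimiter and then splits the line naively, with no quoting. It does not reproduce the delimitedtext formatter, which may also write metadata fields (such as the operation type and table name) that are not modeled here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical check of how a delimiter survives a naive join and split. */
final class DelimiterCheck {

    static String join(List<String> columns, String delimiter) {
        return String.join(delimiter, columns);
    }

    /** Naive split with no quoting; Pattern.quote treats '|' literally, -1 keeps trailing empties. */
    static int fieldCount(String line, String delimiter) {
        return line.split(Pattern.quote(delimiter), -1).length;
    }

    public static void main(String[] args) {
        // MOVIE_ID, TITLE, YEAR and PLOT_SUMMARY from the third test INSERT used later in this article.
        List<String> row = Arrays.asList("1173973", "Like Stars on Earth", "2007",
                "An eight-year-old boy is thought to be lazy and a trouble-maker, until the new "
                        + "art teacher has the patience and compassion to discover the real problem "
                        + "behind his struggles in school.");
        for (String delimiter : Arrays.asList(",", "|")) {
            String line = join(row, delimiter);
            // Prints ',' -> 5 fields, then '|' -> 4 fields
            System.out.println("'" + delimiter + "' -> " + fieldCount(line, delimiter) + " fields");
        }
    }
}
```

Splitting on commas reads back five fields from a four-column row, while the vertical bar reads back the original four.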

gg.handler.flumehandler.mode=tx

Sets the operating mode of the Java Adapter. In tx mode, output is written in transactional groups defined by the Replicat GROUPTRANSOPS setting.

gg.handler.flumehandler.EventMapsTo=tx

Defines whether each Flume event represents a single operation (op) or a whole transaction (tx). Here it is set to tx, matching the gg.handler.flumehandler.mode setting.
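The difference between the two EventMapsTo values can be sketched as follows. This is a hypothetical model: EventMapping is not part of GoldenGate, record strings such as I|1173971 are made-up placeholders rather than the handler's output format, and joining a transaction's operations with newlines is an assumption used only to show one event per transaction.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical model of op versus tx event mapping. */
final class EventMapping {

    /** Maps transactions (each a list of operation records) to Flume event bodies. */
    static List<String> toEvents(List<List<String>> transactions, String eventMapsTo) {
        switch (eventMapsTo) {
            case "op":
                // One event per operation.
                return transactions.stream()
                        .flatMap(List::stream)
                        .collect(Collectors.toList());
            case "tx":
                // One event per transaction; operations joined by newlines (an assumption).
                return transactions.stream()
                        .map(ops -> String.join("\n", ops))
                        .collect(Collectors.toList());
            default:
                throw new IllegalArgumentException("EventMapsTo must be op or tx: " + eventMapsTo);
        }
    }

    public static void main(String[] args) {
        // Three placeholder INSERT records committed in one source transaction.
        List<List<String>> txs = Collections.singletonList(
                Arrays.asList("I|1173971", "I|1173972", "I|1173973"));
        System.out.println("op -> " + toEvents(txs, "op").size() + " event(s)"); // op -> 3 event(s)
        System.out.println("tx -> " + toEvents(txs, "tx").size() + " event(s)"); // tx -> 1 event(s)
    }
}
```

For the three INSERTs committed together later in this article, op mapping would yield three events and tx mapping a single event.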

gg.handler.flumehandler.PropagateSchema=true

Defines whether or not the Flume handler publishes schema events.

gg.handler.flumehandler.includeTokens=false

When set to true, includes token data from the source trail files in the output. When set to false, excludes the token data from the output.

gg.classpath=dirprm/:/home/oracle/apache-flume-1.8.0/lib/*:

Specifies the user-defined Java classes and packages used by the Java Virtual Machine to connect to Flume and run. The classpath setting must include (1) the directory containing the Flume Agent configuration file and (2) the Flume client jars required for the Big Data Adapter to work with Flume. In this example, the Flume client jars are installed at /home/oracle/apache-flume-1.8.0/lib and we use a wildcard, *, to load all of the jars.

The Flume client library versions must match the version of Flume to which the Flume Handler is connecting.
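A wildcard classpath entry is expanded by the JVM into the .jar (or .JAR) files directly inside that directory; subdirectories and loose .class files are not included, and the order of the jars is unspecified. The Java sketch below approximates that expansion for the gg.classpath value, assuming the entries end up on the JVM classpath and that ':' is the separator (as on Linux). ClasspathExpander is a hypothetical helper for checking which jars would be picked up, not part of GoldenGate.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical approximation of JVM classpath wildcard expansion. */
final class ClasspathExpander {

    static List<String> expand(String classpath) {
        List<String> entries = new ArrayList<>();
        for (String entry : classpath.split(":")) {
            if (entry.isEmpty()) {
                continue; // this sketch skips empty entries, such as the one after a trailing ':'
            }
            if (!entry.endsWith("*")) {
                entries.add(entry); // a plain directory (e.g. dirprm/) or a single jar
                continue;
            }
            // "dir/*" matches the .jar/.JAR files directly in dir; subdirectories are not searched.
            Path dir = Paths.get(entry.substring(0, entry.length() - 1));
            List<String> jars = new ArrayList<>();
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.{jar,JAR}")) {
                for (Path jar : stream) {
                    if (Files.isRegularFile(jar)) {
                        jars.add(jar.toString());
                    }
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e); // e.g. the directory does not exist
            }
            Collections.sort(jars); // the JVM's order is unspecified; sorted here for stable output
            entries.addAll(jars);
        }
        return entries;
    }

    public static void main(String[] args) {
        // Throws UncheckedIOException on a machine without this Flume lib directory.
        expand("dirprm/:/home/oracle/apache-flume-1.8.0/lib/*:").forEach(System.out::println);
    }
}
```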

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

Sets the JVM runtime memory allocation and the location of the Oracle GoldenGate Java Adapter dependencies (ggjava.jar) file.

Flume Agent Configuration File

My Flume Agent configuration file contains:

client.type=default
hosts=h1
hosts.h1=localhost:41414
batch-size=100
connect-timeout=20000
request-timeout=20000

This file contains the settings Oracle GoldenGate will use to connect to the Flume Agent. In this example, the Oracle GoldenGate Big Data Adapter will attempt an Avro connection to a Flume Agent running on the local machine and listening for connections on port 41414. Data will be sent to Flume in batches of 100 events. Connection attempts and requests to the Flume Agent will time out and fail if there is no response within 20000 ms (20 seconds).
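The following Java sketch reads these RPC client settings the way the paragraph above describes them: the aliases in hosts (assumed to be space-separated, as in Flume's multi-host client configurations) resolve to hosts.<alias> addresses, and events are sent in batches of at most batch-size. RpcClientSettings is a hypothetical helper; the actual connection is made by the Flume client libraries inside the handler, and splitting a larger set of events into batch-size chunks is an assumption about that client's behavior.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical reader for the Flume RPC client properties shown above. */
final class RpcClientSettings {

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with a StringReader
        }
        return props;
    }

    /** Resolves each alias listed in "hosts" to its hosts.<alias> address. */
    static List<String> hostAddresses(Properties props) {
        List<String> addresses = new ArrayList<>();
        for (String alias : props.getProperty("hosts", "").trim().split("\\s+")) {
            if (alias.isEmpty()) {
                continue; // "hosts" missing or blank
            }
            String address = props.getProperty("hosts." + alias);
            if (address == null) {
                throw new IllegalArgumentException("hosts." + alias + " is not set");
            }
            addresses.add(address.trim());
        }
        return addresses;
    }

    /** Splits totalEvents into batches of at most batchSize events. */
    static List<Integer> batchSizes(int totalEvents, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch-size must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        for (int remaining = totalEvents; remaining > 0; remaining -= batchSize) {
            sizes.add(Math.min(batchSize, remaining));
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Same settings as the custom-flume-rpc.properties file shown above.
        Properties props = load(String.join("\n",
                "client.type=default",
                "hosts=h1",
                "hosts.h1=localhost:41414",
                "batch-size=100",
                "connect-timeout=20000",
                "request-timeout=20000"));
        int batchSize = Integer.parseInt(props.getProperty("batch-size"));
        long timeoutMillis = Long.parseLong(props.getProperty("request-timeout"));
        System.out.println("hosts: " + hostAddresses(props));            // hosts: [localhost:41414]
        System.out.println("250 events: " + batchSizes(250, batchSize)); // 250 events: [100, 100, 50]
        System.out.println("timeout: " + timeoutMillis / 1000 + " s");   // timeout: 20 s
    }
}
```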

Flume Agent

As previously shown, the Oracle GoldenGate Big Data Adapter is configured to connect to a Flume Agent on the local machine, listening on port 41414. To configure the Flume Agent, I created the file /home/oracle/flume/flume_frs.conf, with the following settings:

## Flume Agent Settings
##
oggfrs.channels = memoryChannel
oggfrs.sources = rpcSource
oggfrs.sinks = fr1
#
oggfrs.channels.memoryChannel.type = memory
#
oggfrs.sources.rpcSource.channels = memoryChannel
oggfrs.sources.rpcSource.type = avro
oggfrs.sources.rpcSource.bind = 0.0.0.0
oggfrs.sources.rpcSource.port = 41414
#
oggfrs.sinks.fr1.type = file_roll
oggfrs.sinks.fr1.channel = memoryChannel
oggfrs.sinks.fr1.sink.directory = /u01/flumeOut/moviedemo/movie
#
# Create a new file every 120 seconds, default is 30
oggfrs.sinks.fr1.sink.rollInterval = 120

In this file, we create a single Flume Agent named oggfrs. The oggfrs Agent will consist of an Avro RPC source that listens on port 41414 of our local host, a channel that buffers event data in memory, and a sink that writes events to files in a directory of the local file system. The sink will close the existing file and create a new one every 120 seconds.

You will notice that I set the sink directory to include the Oracle source schema and table name. This is a personal preference, because the File Roll Sink does not provide a mechanism for naming the output files. Currently, the File Roll Sink names each file with a timestamp taken when its PathManager is created, followed by an incrementing sequence number. The Flume source code sets the file name as follows:

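import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

public class PathManager {

  // Set once, when the PathManager is created.
  private long seriesTimestamp;
  private File baseDirectory;
  // Incremented every time a new output file is requested.
  private AtomicInteger fileIndex;

  private File currentFile;

  public PathManager() {
    seriesTimestamp = System.currentTimeMillis();
    fileIndex = new AtomicInteger();
  }

  // File name is "<seriesTimestamp>-<index>", e.g. 1500000000000-1, 1500000000000-2, ...
  public File nextFile() {
    currentFile = new File(baseDirectory, seriesTimestamp + "-"
        + fileIndex.incrementAndGet());

    return currentFile;
  }

  public File getCurrentFile() {
    if (currentFile == null) {
      return nextFile();
    }

    return currentFile;
  }

  // Remaining methods omitted from this excerpt.
}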

We could write a Java module to override the Flume PathManager class; however, that is beyond the scope of this article.
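To see what the File Roll Sink output directory will contain, the sketch below reproduces the naming expression from the PathManager excerpt above: a series timestamp fixed once, followed by an index that increases each time a new file is started. The base directory comes from the agent configuration, while the timestamp 1500000000000L is a made-up value and RollFileNames is a hypothetical class.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reproduction of the PathManager naming scheme. */
final class RollFileNames {

    /** Names of the first {@code count} files in a series started at {@code seriesTimestamp}. */
    static List<String> names(File baseDirectory, long seriesTimestamp, int count) {
        AtomicInteger fileIndex = new AtomicInteger();
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Same expression as PathManager.nextFile(): "<seriesTimestamp>-<next index>".
            File file = new File(baseDirectory, seriesTimestamp + "-" + fileIndex.incrementAndGet());
            names.add(file.getPath());
        }
        return names;
    }

    public static void main(String[] args) {
        File dir = new File("/u01/flumeOut/moviedemo/movie");
        long seriesTimestamp = 1500000000000L; // made-up start time, epoch milliseconds
        // On Linux prints /u01/flumeOut/moviedemo/movie/1500000000000-1, then -2, then -3.
        names(dir, seriesTimestamp, 3).forEach(System.out::println);
    }
}
```

Because the timestamp is taken once, the file names identify when the series started and the order of the rolls, not the time at which each file's data was written.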

For more information on setting up Apache Flume Agents, refer to the Apache documentation.

Start the Flume Agent

We are now ready to start the Flume Agent. In my /home/oracle/flume directory, I created a shell script to start the agent, with the following commands:

[Figure: Flume Agent shell]

After creating the output directory /u01/flumeOut/moviedemo/movie, execute the script to start the agent. Upon a successful agent startup, you will see output similar to the following on the terminal screen:

[Figure: Agent Start]

You will also see the files created by the File Roll Sink in the output directory.

[Figure: File Roll Files]

The File Roll Sink will create new files at the interval specified in the Flume Agent configuration file, even when there is no Oracle GoldenGate activity.

Start Oracle GoldenGate and Test

To start and test our configuration, go to the target Oracle GoldenGate instance and make sure the Oracle GoldenGate Manager is in the RUNNING state.

[Figure: Target OGG Manager]

Go to the source Oracle GoldenGate instance, start GGSCI, and then start the Extract and Extract Data Pump (if they are not already running).

In the Oracle GoldenGate target, start the rflume Replicat. Executing the command status rflume should show it in the RUNNING state.

[Figure: OGG RFLUME Start]

The Flume Agent output will show the Replicat connecting to the Flume source.

[Figure: Agent Connect]

Now let’s generate some data in the Oracle Database and verify it flows through to the Flume File Roll Sink. In SQL Developer, I connect to the Oracle Pluggable Database and execute the following:

INSERT INTO "MOVIEDEMO"."MOVIE" (MOVIE_ID, TITLE, YEAR, BUDGET, GROSS, PLOT_SUMMARY) VALUES ('1173971','Jack Reacher: Never Go Back', '2016','96000000','0','Jack Reacher must uncover the truth behind a major government conspiracy in order to clear his name. On the run as a fugitive from the law, Reacher uncovers a potential secret from his past that could change his life forever.');
INSERT INTO "MOVIEDEMO"."MOVIE" (MOVIE_ID, TITLE, YEAR, BUDGET, GROSS, PLOT_SUMMARY) VALUES ('1173972','Boo! A Madea Holloween', '2016','0','0','Madea winds up in the middle of mayhem when she spends a haunted Halloween fending off killers, paranormal poltergeists, ghosts, ghouls and zombies while keeping a watchful eye on a group of misbehaving teens.');
INSERT INTO "MOVIEDEMO"."MOVIE" (MOVIE_ID, TITLE, YEAR, BUDGET, GROSS, PLOT_SUMMARY) VALUES ('1173973','Like Stars on Earth', '2007','0','1204660','An eight-year-old boy is thought to be lazy and a trouble-maker, until the new art teacher has the patience and compassion to discover the real problem behind his struggles in school.');
commit;

Executing the GGSCI command view report rflume on the Oracle GoldenGate target shows that data was captured and sent to the rflume Replicat.

[Figure: RFLUME Report]

We will also see a file containing data in our Flume File Roll Sink output directory:

[Figure: FRS Directory]

This is a delimited text file, so we can view the contents of the file using cat.

[Figure: flume frs output]

We now have data queued in Apache Flume ready for further processing.
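As a starting point for that further processing, here is a minimal Java sketch that reads every file in the sink directory and splits each line on the vertical bar. It assumes one operation per line, UTF-8 text, and no '|' characters inside column values; note also that the most recent file may still be open and growing while the sink writes to it. RollFileReader and its methods are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical reader for the File Roll Sink output files. */
final class RollFileReader {

    /** Splits each non-empty line on '|'; the -1 limit keeps trailing empty fields. */
    static List<List<String>> parse(List<String> lines) {
        List<List<String>> records = new ArrayList<>();
        for (String line : lines) {
            if (!line.isEmpty()) {
                records.add(Arrays.asList(line.split("\\|", -1)));
            }
        }
        return records;
    }

    /** Reads and parses every regular file in the sink output directory. */
    static List<List<String>> readDirectory(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path file : stream) {
                if (Files.isRegularFile(file)) {
                    files.add(file);
                }
            }
        }
        // Lexicographic name order: note that "...-10" sorts before "...-2".
        Collections.sort(files);
        List<List<String>> records = new ArrayList<>();
        for (Path file : files) {
            records.addAll(parse(Files.readAllLines(file, StandardCharsets.UTF_8)));
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : "/u01/flumeOut/moviedemo/movie");
        List<List<String>> records = readDirectory(dir);
        System.out.println(records.size() + " records read from " + dir);
    }
}
```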

Summary

In this article we demonstrated the functionality of Oracle GoldenGate to capture database transactions and apply this data to an Apache Flume Agent configured with a File Roll Sink.

For more information on other available Oracle GoldenGate articles, please view our index page.
