Introduction

Oracle Data Transforms offers powerful capabilities for designing and orchestrating ETL pipelines, now accessible programmatically via its Python API. In this article, I’ll walk through a solution that automates data pipeline creation using metadata-driven logic, extracting mappings dynamically from a metadata table.

Prerequisites

  • Target tables must be created in the Oracle Database beforehand; automatically creating target tables from source tables is not currently supported. (A quick existence check for the target tables is sketched below.)
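
Because of this prerequisite, it can be useful to verify upfront that every target table listed in the metadata already exists. The snippet below is a minimal sketch of such a check, not part of the original solution; it assumes the oracledb wallet connection and the metadata_info table that are set up in step 1.

import oracledb
from walletCredentials import username, pwd, config_dir, dsn

# Connect with the wallet credentials (same pattern as step 1)
connection = oracledb.connect(user=username, password=pwd, dsn=dsn,
                              config_dir=config_dir, wallet_location=config_dir,
                              wallet_password=pwd)
cursor = connection.cursor()

# Report any target tables referenced in metadata_info that do not exist yet
cursor.execute("""
    SELECT m.target_schema, m.target_table_name
      FROM metadata_info m
     WHERE NOT EXISTS (
           SELECT 1
             FROM all_tables t
            WHERE t.owner = UPPER(m.target_schema)
              AND t.table_name = UPPER(m.target_table_name))
""")
missing = cursor.fetchall()
if missing:
    print("Missing target tables:", missing)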

Solution Overview

Our approach reads metadata from an Oracle table that identifies source (BICC) and target (ADW) connection/table pairs. For each pair, we create a DataFlow with the Data Transforms Python API, dynamically resolving columns, auto-matching columns by name, and building a data load between the source and target tables.

Step-by-Step Implementation

1 – Setting up the Environment and Connections

First, we create the metadata table, which we populate with the metadata for the connections and tables we will read from and load into.

CREATE TABLE metadata_info (
    id                  NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    source_connection   VARCHAR2(100)  NOT NULL,
    source_schema       VARCHAR2(100)  NOT NULL,
    source_table_name   VARCHAR2(128)  NOT NULL,
    target_connection   VARCHAR2(100)  NOT NULL,
    target_schema       VARCHAR2(100)  NOT NULL,
    target_table_name   VARCHAR2(128)  NOT NULL
);

 

Example:

[Image: example rows in the metadata_info table]

In Oracle Data Transforms, I previously created the source and target connections. Replace these with your own connection names.

Second, we import the required libraries and set up the database connection using Oracle wallet credentials, which ensures secure and compliant database access.

walletCredentials.py file:

username = 'ADMIN'
pwd = ""
config_dir = "<path to wallet folder>"
dsn = "<adb_name>_high"
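
If you prefer not to keep the ADMIN password in the file at all, one optional variation (not part of the original setup) is to read it from an environment variable or prompt for it at runtime; the variable name ADB_PASSWORD below is just an example.

# walletCredentials.py (variant): avoid hardcoding the password
import os
from getpass import getpass

username = os.environ.get("ADB_USER", "ADMIN")
pwd = os.environ.get("ADB_PASSWORD") or getpass("ADB password: ")
config_dir = "<path to wallet folder>"
dsn = "<adb_name>_high"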

 

from datatransforms.workbench import DataTransformsWorkbench, WorkbenchConfig
from datatransforms.dataflow import DataFlow
from datatransforms.dataflow import SourceDataStore
from datatransforms.dataentity import DataEntity
from datatransforms.dataflow_load_options import OracleInsert, DataFlowIntegrationType
import logging
import oracledb
from walletCredentials import username,pwd,config_dir,dsn

# Get workbench config and connect
connect_params = WorkbenchConfig.get_workbench_config("<password>")
workbench = DataTransformsWorkbench()
workbench.connect_workbench(connect_params)

# Connect to Oracle
connection = oracledb.connect(
    user=username,
    password=pwd,
    dsn=dsn,
    config_dir=config_dir,
    wallet_location=config_dir,
    wallet_password=pwd
)
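
For completeness, here is one way the metadata_info table could be seeded through this connection. This is purely illustrative and not part of the original flow: the connection names, schemas, and table names below are placeholders, and the real values must match the connections you created in Data Transforms.

# Hypothetical example row; replace connection/schema/table names with your own
rows = [
    ("BICC_SOURCE", "BICC_SCHEMA", "FscmTopModelAM.SomeAM.SomeExtractPVO",
     "ADW_TARGET", "DWH", "SOME_TARGET_TABLE"),
]
cursor = connection.cursor()
cursor.executemany("""
    INSERT INTO metadata_info
        (source_connection, source_schema, source_table_name,
         target_connection, target_schema, target_table_name)
    VALUES (:1, :2, :3, :4, :5, :6)
""", rows)
connection.commit()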

 

2 – Reading ETL Metadata

We query the metadata_info table to retrieve the source and target connection information for each ETL flow. This table contains the source and target connection, schema, and table name columns.

cursor = connection.cursor()
cursor.execute("""
    SELECT source_connection, source_schema, source_table_name,
           target_connection, target_schema, target_table_name
      FROM metadata_info
""")
metadata_rows = cursor.fetchall()
project = “MetadataProject”

 

3 – Importing BICC Entities

For each metadata row, we:

  • Dynamically build the CLI command from the metadata.

  • Execute the CLI tool to generate the data entities for the source table.

 

# Standalone script: reconnect and re-read the source metadata
import oracledb
from walletCredentials import username, pwd, config_dir, dsn
import subprocess

connection = oracledb.connect(
    user=username,
    password=pwd,
    dsn=dsn,
    config_dir=config_dir,
    wallet_location=config_dir,
    wallet_password=pwd)
cursor = connection.cursor()
cursor.execute("""
    SELECT source_connection, source_schema, source_table_name
      FROM metadata_info
""")
metadata_rows = cursor.fetchall()

# Run the entity-generation CLI once per source table
for row in metadata_rows:
    source_conn, source_schema, source_table = row
    command = [
        "python",
        "-m",
        "datatransforms.cli.generate_data_entities",
        "--connection", source_conn,
        "--schema", source_schema,
        "--matching", source_table,
        "--live", "true"
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)
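
Because the CLI runs as a separate process, a failed import only shows up in the printed STDERR. As an optional hardening step (not part of the original flow, and assuming the CLI reports failures through a non-zero exit code), you can wrap the call in a helper that fails fast:

import subprocess

# Optional: stop the run if entity generation fails for a table
def generate_entities(source_conn, source_schema, source_table):
    command = [
        "python", "-m", "datatransforms.cli.generate_data_entities",
        "--connection", source_conn,
        "--schema", source_schema,
        "--matching", source_table,
        "--live", "true"
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"Entity generation failed for {source_schema}.{source_table}:\n{result.stderr}")
    return result.stdout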

4 – Dynamic DataFlow Creation

For each metadata row, we:

  • Initialize a DataFlow.
  • Define source and target data entities.
  • Dynamically resolve and map columns by matching column names.
  • Build and persist the DataFlow.

# Use the six-column metadata_rows fetched in step 2
for row in metadata_rows:
    source_conn, source_schema, source_table, target_conn, target_schema, target_table = row
    source_table_split = str(source_table).split(".")[-1]
    alias = f"{target_table}_TGT"
    flow = DataFlow(f"DF_{source_table_split}", project)

    # Register the source and target data entities in the flow
    source_entity = flow.use(connection_name=source_conn,
                             schema_name=source_schema,
                             data_entity_name=source_table,
                             alias_name=source_table_split)
    target_entity = flow.use(connection_name=target_conn,
                             schema_name=target_schema,
                             data_entity_name=target_table,
                             alias_name=alias)

    source_columns = source_entity.resolved_columns()
    target_columns = target_entity.resolved_columns()

    # Reverse-lookup column names by their last segment (unqualified name)
    source_lookup = {v.split('.')[-1].upper(): v for v in source_columns.values()}
    target_lookup = {v.split('.')[-1].upper(): v for v in target_columns.values()}

    # Map each target column to the source column with the same name
    manual_column_mappings = {}
    for col_name in source_lookup:
        if col_name in target_lookup:
            manual_column_mappings[target_lookup[col_name]] = source_lookup[col_name]

    override_column_mappings = {
        "column_mappings": manual_column_mappings
    }
    load_options = DataFlowIntegrationType.append()
    load_options.update(OracleInsert.options())
    load_options.update(override_column_mappings)

    # Wire the source to the target and persist the data flow
    flow.from_source(source_table_split, f"{source_conn}.{source_schema}.{source_table}") \
        .load(alias, f"{target_conn}.{target_schema}.{target_table}", load_options)

    result = flow.create()

    print("Data flow = " + flow.data_flow_name + " Project= " + flow.project + " Result= " + str(result))

 

5 – Clean Up

Finally, we close the database resources:

cursor.close()
connection.close()
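
As a small variant, you can let context managers release the cursor and connection even if an earlier step raises an exception; python-oracledb connections and cursors both support the with statement. A minimal sketch:

import oracledb
from walletCredentials import username, pwd, config_dir, dsn

# Variant: context managers close the cursor and connection automatically
with oracledb.connect(user=username, password=pwd, dsn=dsn,
                      config_dir=config_dir, wallet_location=config_dir,
                      wallet_password=pwd) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) FROM metadata_info")
        print("Rows in metadata_info:", cursor.fetchone()[0])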

 

 

If you are interested in scheduling these data flows, you can check this A-Team blog.

Conclusion

By combining metadata-driven automation, dynamic column mapping, and Python API integration, organizations can streamline ETL processes, reduce errors, and enable faster, more scalable data transformations.