Introduction

Oracle BI Publisher data models often hide their most valuable implementation detail: the SQL. In this project, the source artifact is a BI Publisher `.xdm.catalog` file. The goal is to extract the embedded SQL and convert it into a Data extraction definition.

This process does that in three stages:

1. Extract SQL queries from the BI Publisher data model.

2. Send a selected SQL query to Oracle AI Agent Studio for conversion into extraction query JSON.

3. Use that generated payload to create and run a Data extract through the Batch API.

The result is a practical bridge from BIP data model SQL to an API-created Data export definition.

Workflow

The project workflow starts with a compressed BI Publisher catalog file and ends with Data and SaaS Batch API response artifacts written to disk.

Workflow

Important: Currently, only a small subset of tables are mapped to Extraction Views. For tables that are not supported, the agent cannot provide an SQL translation. As a prerequisite, customers should first verify that the tables they want to translate are supported.

Step 1: Extract SQL from the BIP Catalog

The first script is `scripts/extract_bip_xdm_queries.py`. It reads the catalog file as bytes, decompresses it, finds the embedded `<dataModel>` XML, and parses SQL nodes from normal data sets and bursting definitions.

Run it with an output directory:

python3 scripts/extract_bip_xdm_queries.py \
  bip.xdm.catalog \
  --out-dir extracted

The core decompression step is small:

def inflate_catalog(catalog_path):
    return zlib.decompress(Path(catalog_path).read_bytes()).decode(
        "utf-8",
        errors="replace",
    )

After inflation, the script finds the data model XML directly inside the payload:

def extract_data_model_xml(inflated_text):
    start = inflated_text.find("<dataModel")
    if start == -1:
        raise ValueError("Could not find a <dataModel> element in the inflated catalog payload.")

    end_tag = "</dataModel>"
    end = inflated_text.find(end_tag, start)
    if end == -1:
        raise ValueError("Found <dataModel>, but could not find the closing </dataModel> tag.")

    return inflated_text[start : end + len(end_tag)]

Once the XML is parsed, top-level data set SQL is collected from `<dataSets>/<dataSet>/<sql>`:

def top_level_data_sets(root):
    data_sets = child_named(root, "dataSets")
    if data_sets is None:
        return []

    queries = []
    for data_set in children_named(data_sets, "dataSet"):
        sql = child_named(data_set, "sql")
        if sql is None:
            continue

        queries.append(
            {
                "kind": "dataSet",
                "name": data_set.attrib.get("name", "unnamed_data_set"),
                "dataSourceRef": sql.attrib.get("dataSourceRef"),
                "sql": extract_sql(sql),
            }
        )

    return queries

Bursting queries are extracted separately because they live under `<bursting>/<burst>/<dataSet>/<sql>`:

def bursting_data_sets(root):
    bursting = child_named(root, "bursting")
    if bursting is None:
        return []

    queries = []
    for burst in children_named(bursting, "burst"):
        burst_name = burst.attrib.get("name")
        for index, data_set in enumerate(children_named(burst, "dataSet"), start=1):
            sql = child_named(data_set, "sql")
            if sql is None:
                continue

            fallback_name = f"{burst_name or 'burst'}_{index}"
            queries.append(
                {
                    "kind": "bursting",
                    "name": data_set.attrib.get("name", fallback_name),
                    "burstName": burst_name,
                    "dataSourceRef": sql.attrib.get("dataSourceRef"),
                    "sql": extract_sql(sql),
                }
            )

    return queries

For the included sample, the script writes four SQL files:

extracted/
  data_model.xml
  queries.json
  extracted_sql1.sql
  extracted_sql2.sql

The manifest in queries.json records the query kind, name, data source reference, and generated SQL file name.

Step 2: Convert SQL with Oracle AI Agent Studio

After extraction, `scripts/call_agent_studio.py` sends one SQL file to an Oracle AI Agent Studio workflow.

Run it like this:

python3 scripts/call_agent_studio.py \
  --sql-file extracted/extracted_sql1.sql \
  --out-dir extracted/agent_studio/extracted_sql1

When a SQL file is provided, the script prepends fixed instructions so the agent returns only the JSON object needed by the downstream API:

AGENT_OUTPUT_INSTRUCTIONS = """\
Convert the following SQL into the extraction query JSON format.
Return only the final BQL representation (viewQueries/select/namedSelect).
Do not include markdown, explanations, comments, or any text outside the JSON object.
Keep the output concise.
"""

The script then invokes the async Agent Studio endpoint:

def invoke_agent(args, token, message):
    url = (
        f"{args.agent_base_url.rstrip('/')}/api/fusion-ai/orchestrator/agent/v2/"
        f"{urllib.parse.quote(args.workflow_code)}/invokeAsync"
    )
    payload = {
        "conversational": True,
        "version": args.version,
        "status": args.status,
        "message": message,
    }
    return request_json(
        url,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        data=json.dumps(payload).encode("utf-8"),
    )

Because the job is asynchronous, the script polls until the status is COMPLETE:

def poll_agent(args, token, job_id):
    url = (
        f"{args.agent_base_url.rstrip('/')}/api/fusion-ai/orchestrator/agent/v2/"
        f"{urllib.parse.quote(args.workflow_code)}/status/{urllib.parse.quote(job_id)}"
    )

    while time.monotonic() <= deadline:
        last_response = request_json(
            url,
            method="GET",
            headers={"Authorization": f"Bearer {token}"},
        )
        status = last_response.get("status") or "UNKNOWN"
        if status == "COMPLETE":
            return last_response
        time.sleep(args.poll_interval)


The output field can contain JSON embedded inside text, so the script scans for the first valid JSON object or array:

def extract_first_json(value):
    decoder = json.JSONDecoder()
    text = str(value)
    for index, char in enumerate(text):
        if char not in "{[":
            continue
        try:
            parsed, _ = decoder.raw_decode(text[index:])
            return parsed
        except json.JSONDecodeError:
            continue

    raise ValueError("Could not find a JSON object or array in the agent output.")

Finally, the extracted JSON is wrapped in the shape expected by the Data Tool API:

def build_Data_artifacts_payload(agent_output):
    extraction_query = extract_first_json(agent_output)
    if not isinstance(extraction_query, dict):
        raise ValueError("Agent output must contain a JSON object for extractionQuery.")

    return {
        "DataArtifacts": {
            "extractionQuery": json.dumps(extraction_query, separators=(",", ":")),
        }
    }

With `–out-dir`, this stage writes:

agent_invoke_response.json
agent_status_response.json
agent_output.json
agent_api_payload.json

The most important file for the next stage is agent_api_payload.json.

Step 3: Create and Run the Data Extract

The final script, scripts/call_Data_extract_api.py, reads extracted/agent_api_payload.json and uses it to create a Data export definition.

Run it from the project root:

python3 scripts/call_Data_extract_api.py

The script starts by loading the generated Agent Studio payload:

def load_agent_api_payload(path):
    payload = json.loads(Path(path).read_text(encoding="utf-8"))
    Data_artifacts = payload.get("DataArtifacts")
    if not isinstance(Data_artifacts, dict):
        raise ValueError("agent_api_payload.json must contain a DataArtifacts object.")

    extraction_query = Data_artifacts.get("extractionQuery")
    if not isinstance(extraction_query, str) or not extraction_query.strip():
        raise ValueError(
            "agent_api_payload.json must contain DataArtifacts.extractionQuery "
            "as a non-empty string."
        )

    return Data_artifacts

It then builds the export definition payload:

def build_create_extract_payload(extract_name, Data_artifacts):
    return {
        "name": extract_name,
        "description": EXTRACT_DESCRIPTION,
        "owner": OWNER,
        "ownerRole": OWNER_ROLE,
        "exportConfiguration": {
            "type": "BV",
            "outputDataFormat": OUTPUT_DATA_FORMAT,
            "csvDelimiter": CSV_DELIMITER,
            "sortBy": None,
            "csvFormat": None,
            "historyStartDate": None,
        },
        "DataArtifacts": Data_artifacts,
    }

Next, it creates an extract group that points to the newly created extract:

def build_create_group_payload(extract_name, extract_group):
    return {
        "name": extract_group,
        "owner": OWNER,
        "ownerRole": OWNER_ROLE,
        "groupedExtracts": {
            "items": [
                {
                    "name": extract_name,
                }
            ]
        },
    }

Then it submits a SaaS Batch job for the Data `DataExport` job definition:

def build_batch_payload(extract_name):
    schedule_name = f"{extract_name}Schedule"
    return {
        "serviceName": "Data",
        "jobDefinitionName": "DataExport",
        "description": f"{schedule_name}::{extract_name}::{BATCH_EXTRACT_TYPE}::{BATCH_RECURRENCE}",
        "requestParameters": {
            "submit.argument1": extract_name,
            "submit.argument2": BATCH_SUBMIT_ARGUMENT2,
        },
    }

The batch job is polled until it reaches a success or failure state:

def poll_batch(batch_token, job_request_id):
    while time.monotonic() <= deadline:
        last_response = api_request(
            "GET",
            f"/api/saas-batch/jobscheduler/v1/jobRequests/{urllib.parse.quote(job_request_id)}",
            batch_token,
        )
        body = last_response["body"]
        status = (
            body.get("jobStatus")
            or body.get("jobDetails", {}).get("jobStatus")
            or body.get("jobProgress", {}).get("status")
            or "UNKNOWN"
        )

        if status in BATCH_FAILURE_STATES:
            raise RuntimeError(f"Batch job {job_request_id} ended with status {status}")
        if status in BATCH_SUCCESS_STATES:
            return last_response

        time.sleep(POLL_INTERVAL_SECONDS)

If the job succeeds, the script lists the output files:

output_files_response = api_request(
    "GET",
    f"/api/saas-batch/jobfilemanager/v1/jobRequests/{urllib.parse.quote(job_request_id)}/outputFiles",
    batch_token,
)
output_files = output_files_response["body"].get("items", [])

This stage writes Data and batch request and response artifacts under `extracted/Data`.

Conclusion

This project turns a difficult to inspect BI Publisher .xdm.catalog artifact into a repeatable extraction and conversion workflow. The extractor makes the hidden SQL visible, Agent Studio transforms that SQL into the extraction-query JSON expected by Data, and the final script creates and executes the Data export through Batch.