This blog covers a subset of REST APIs that are used by the Data Extraction tool. Although these APIs are not owned by Data Extraction, this post focuses on their use within the tool while related documentation is being updated. The blog title may be revised in the future to better reflect the broader scope.

Data extraction pipelines often start as manual workflows: define an extract, run it, wait for completion, download the output, unpack files, and upload them somewhere durable for downstream consumption.

This post walks through a practical pattern for automating a data extract from Fusion and landing the resulting files in OCI Object Storage. The workflow uses OAuth-based API access, creates an extract definition and extract group, submits a Batch job, monitors the job, downloads the output files, extracts ZIP archives, and uploads the final data files to an OCI Object Storage bucket.

What the pipeline does

At a high level, the pipeline performs the following steps:

  1. Load configuration and credentials securely.
  2. Request OAuth access tokens for the data extraction APIs.
  3. Create an extract definition.
  4. Create an extract group.
  5. Submit a Batch Processing job for the extract.
  6. Poll the Batch Processing job until it completes.
  7. List the output files produced by the job.
  8. Download the output files.
  9. Extract ZIP files.
  10. Upload the extracted files to OCI Object Storage.

Architecture overview

The pipeline connects three main systems:

Data Export API
Used to create the extract definition and group.

Batch Processing API
Used to submit and monitor the extract execution job, and to retrieve generated output files.

OCI Object Storage
Used as the durable landing zone for the extracted data.

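A simplified flow looks like this: the Data Export API creates the extract definition and group, the Batch Processing API runs the job and serves its output files, and the files are then uploaded to OCI Object Storage.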

Prerequisites

Before you start, make sure you have:

  • Access to the Fusion environment and its base URL.
  • OAuth confidential applications already created.
  • IDCS OAuth details: host, client ID, client secret, username, and password.
  • Permission to create extract definitions and submit Batch jobs.
  • An existing OCI Object Storage bucket.
  • OCI SDK/config profile with permission to upload objects to that bucket.
  • A runtime environment with network access to Fusion, IDCS, and OCI Object Storage.

1. Load configuration

Start by separating non-secret configuration from sensitive credentials.

Note: I tested the code using Oracle AI Data Platform Workbench. I stored all sensitive information in the credential store.

Non-secret values can be kept in environment variables or deployment configuration, while secrets are read from the credential store:

import os

def mask(value: str, visible: int = 4) -> str:
    """Hide all but the last `visible` characters of a value for safe logging."""
    if not value:
        return "<empty>"
    return "*" * max(0, len(value) - visible) + value[-visible:]

BASE_URL = "https://<fusion host>.fa.ocs.oraclecloud.com"

# Secrets come from the credential store. aidputils is assumed to be provided
# by the workbench runtime, so it is not imported here.
IDCS_HOST = aidputils.secrets.get(name="DataExtractionAPIs", key="IDCS_HOST")
IDCS_CLIENT_ID = aidputils.secrets.get(name="DataExtractionAPIs", key="IDCS_CLIENT_ID")
IDCS_CLIENT_SECRET = aidputils.secrets.get(name="DataExtractionAPIs", key="IDCS_CLIENT_SECRET")
IDCS_USERNAME = aidputils.secrets.get(name="DataExtractionAPIs", key="USERNAME")
IDCS_PASSWORD = aidputils.secrets.get(name="DataExtractionAPIs", key="password")

# Replace <fusion hostname> with the short environment name. For example, if your Fusion URL is
# https://fa-myfusion-test-saasfaprod1.fa.ocs.oraclecloud.com/, use only myfusion-test.
BOF_SCOPE = os.getenv("BOF_SCOPE", "urn:opc:resource:fusion:<fusion hostname>:boss/")
BATCH_SCOPE = os.getenv("BATCH_SCOPE", "urn:opc:resource:fusion:<fusion hostname>:saas-batch/")
BUCKET_NAME = os.getenv("OCI_BUCKET_NAME", "bucket-data")

print(f"Configured host {BASE_URL}, client {mask(IDCS_CLIENT_ID)}")

The code snippets sometimes include extra characters that weren’t part of the original code, so please pay extra attention when copying them.
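
Because the configuration above ships with angle-bracket placeholders, a fail-fast check before the first API call can save a confusing authentication error later. The following is only a sketch: `find_config_problems` is a hypothetical helper that is not part of the original pipeline, and it assumes placeholders are always written in angle brackets as in the snippet above.

```python
def find_config_problems(settings: dict[str, str]) -> list[str]:
    """Return one human-readable problem per empty or placeholder value."""
    problems = []
    for name, value in settings.items():
        if not value or not value.strip():
            problems.append(f"{name} is empty")
        elif "<" in value and ">" in value:
            # Angle brackets mark template placeholders such as <fusion hostname>.
            problems.append(f"{name} still contains a placeholder")
    return problems


# Made-up values for illustration; in the pipeline you would pass BASE_URL,
# BOF_SCOPE, BATCH_SCOPE, and so on.
example = {
    "BASE_URL": "https://<fusion host>.fa.ocs.oraclecloud.com",
    "BUCKET_NAME": "bucket-data",
    "IDCS_HOST": "",
}
for problem in find_config_problems(example):
    print(problem)
```

Only setting names are reported, never values, so the output is safe to log even when a secret is among the checked settings.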

2. Request OAuth tokens

The pipeline needs two access tokens, one per API scope: one for the Data Export API (BOF_SCOPE) and one for the Batch Processing API (BATCH_SCOPE). A reusable token function keeps authentication consistent:

import requests
from requests.auth import HTTPBasicAuth

session = requests.Session()
session.headers.update({"Accept": "application/json"})

class HttpError(RuntimeError):
    pass

def request_token(scope: str) -> str:
    url = f"{IDCS_HOST.rstrip('/')}/oauth2/v1/token"

    response = session.post(
        url,
        data={
            "grant_type": "password",
            "username": IDCS_USERNAME,
            "password": IDCS_PASSWORD,
            "scope": scope,
        },
        auth=HTTPBasicAuth(IDCS_CLIENT_ID, IDCS_CLIENT_SECRET),
        headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
        timeout=30,
    )

    if response.status_code != 200:
        raise HttpError(
            f"Token request failed with status {response.status_code}: "
            f"{response.text[:500]}"
        )

    token = response.json().get("access_token")
    if not token:
        raise HttpError("Token response did not include access_token")

    return token

Then request both tokens:

BOF_TOKEN = request_token(BOF_SCOPE)
BATCH_TOKEN = request_token(BATCH_SCOPE)

A helper for authenticated API calls keeps the rest of the code clean:

def auth_headers(token: str, content_type: str = "application/json") -> dict:
    return {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": content_type,
    }
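
Access tokens expire, and a long-running extract can outlive the token that was requested at the start. One possible approach is to cache each token and refresh it shortly before expiry. The sketch below is an assumption-laden illustration: `TokenCache` is a hypothetical class, and it assumes the token response carries the standard OAuth 2.0 `expires_in` field. The `request_token` function above returns only the token string, so it would need to return the lifetime as well.

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Caches one access token and refreshes it shortly before it expires."""

    def __init__(self, fetch: Callable[[], tuple[str, float]],
                 clock: Callable[[], float] = time.monotonic,
                 safety_margin: float = 60.0) -> None:
        self._fetch = fetch            # returns (access_token, lifetime_in_seconds)
        self._clock = clock            # injectable so the cache can be tested
        self._margin = safety_margin   # refresh this many seconds before expiry
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + lifetime
        return self._token


# Demonstration with a fake fetch function and a fake clock (no network access).
calls = []
fake_now = [0.0]

def fake_fetch() -> tuple[str, float]:
    calls.append(1)
    return f"token-{len(calls)}", 3600.0

cache = TokenCache(fake_fetch, clock=lambda: fake_now[0])
first = cache.get()
second = cache.get()      # served from the cache
fake_now[0] = 3590.0      # inside the 60-second safety margin
third = cache.get()       # triggers a refresh
print(first, second, third)
```

With one cache per scope, the polling and download steps could call `cache.get()` instead of reusing a token captured at startup.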

3. Add a reusable API request helper

API automation becomes easier to maintain when error handling is centralized.

def api_request(method: str, endpoint: str, token: str, **kwargs):
    url = f"{BASE_URL.rstrip('/')}{endpoint}"

    response = session.request(
        method,
        url,
        headers=auth_headers(token),
        timeout=60,
        **kwargs,
    )

    if response.status_code >= 400:
        raise HttpError(
            f"{method} {endpoint} failed: "
            f"{response.status_code} {response.text[:500]}"
        )

    return response

This helper handles URL construction, headers, timeouts, and basic error reporting.
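
The helper raises on the first error response. For transient failures, a retry with exponential backoff is a common addition. The sketch below is illustrative only: `with_retries`, `TransientError`, and the choice of status codes are assumptions rather than part of the original pipeline or of any Oracle API contract.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Assumption: these HTTP status codes are worth retrying.
RETRYABLE_STATUS = {429, 502, 503, 504}


class TransientError(Exception):
    """Raised by the wrapped call when a retry is worth attempting."""


def with_retries(call: Callable[[], T], attempts: int = 4, base_delay: float = 1.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Run call(), retrying on TransientError with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except TransientError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))  # 1 s, 2 s, 4 s, ...
    raise AssertionError("unreachable")


# Demonstration with a fake call that fails twice and then succeeds.
delays = []
outcomes = iter([TransientError("503"), TransientError("503"), "ok"])

def flaky() -> str:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome

result = with_retries(flaky, sleep=delays.append)
print(result, delays)
```

To use it, the request helper would raise `TransientError` when the status code is in `RETRYABLE_STATUS`. Retrying GET requests is generally safe. Retrying a POST that creates a resource may create duplicates, so it is probably best limited to read-only calls unless the API is known to be idempotent.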

4. Define the extract

An extract definition describes what data should be exported and how the output should be formatted.

The key parts are:

extract_name = "ExampleCostDistributionExtract"
extract_group = f"{extract_name}Group"

owner = "<owner>"  # usually your username
owner_role = "<the role configured previously>"

create_extract_payload = {
    "name": extract_name,
    "description": "Data extract definition",
    "owner": owner,
    "ownerRole": owner_role,
    "exportConfiguration": {
        "type": "BV",
        "outputDataFormat": "CSV",
        "csvDelimiter": ",",
        "sortBy": None,
        "csvFormat": None,
        "historyStartDate": None,
    },
    "bossArtifacts": {
        # extractionQuery is a JSON document passed as one string. Adjacent string
        # literals are concatenated by Python, so the value contains no line breaks.
        "extractionQuery": (
            "{\"viewQueries\":{\"costDistributionExtract\":{\"module\":\"oraScmCoreCostMgmtReviewDistribution\","
            "\"businessObject\":\"CostDistribution\",\"view\":\"costDistributionExtract\",\"fieldAliases\":{"
            "\"accountingStatus\":\"accountingStatus\",\"additionalProcessingCode\":\"additionalProcessingCode\","
            "\"costBook.id\":\"costBookid\",\"costOrganization.id\":\"costOrganizationid\","
            "\"costTransactionType\":\"costTransactionType\",\"createdBy\":\"createdBy\","
            "\"currency.currencyCode\":\"currencycurrencyCode\",\"effectiveDate\":\"effectiveDate\","
            "\"effectiveDateCharacter\":\"effectiveDateCharacter\",\"generalLedgerDate\":\"generalLedgerDate\","
            "\"grossMarginStatus\":\"grossMarginStatus\",\"id\":\"id\",\"layerQuantity\":\"layerQuantity\","
            "\"ledger.ledgerId\":\"ledgerledgerId\",\"legalEntity.legalEntityId\":\"legalEntitylegalEntityId\","
            "\"projectTransactionStatus\":\"projectTransactionStatus\","
            "\"subledgerAccountingEvent.eventId\":\"subledgerAccountingEventeventId\","
            "\"subledgerAccountingEventClass.entityCode\":\"subledgerAccountingEventClassentityCode\","
            "\"subledgerAccountingEventClass.eventClassCode\":\"subledgerAccountingEventClasseventClassCode\","
            "\"subledgerAccountingEventType.entityCode\":\"subledgerAccountingEventTypeentityCode\","
            "\"subledgerAccountingEventType.eventClassCode\":\"subledgerAccountingEventTypeeventClassCode\","
            "\"subledgerAccountingEventType.eventTypeCode\":\"subledgerAccountingEventTypeeventTypeCode\","
            "\"timeCreated\":\"timeCreated\",\"timeUpdated\":\"timeUpdated\",\"transactionNumber\":\"transactionNumber\","
            "\"unitOfMeasure.code\":\"unitOfMeasurecode\",\"updatedBy\":\"updatedBy\","
            "\"valuationOnhandStatus\":\"valuationOnhandStatus\"}}},"
            "\"select\":\"select accountingStatus, additionalProcessingCode, costBookid, costOrganizationid, "
            "costTransactionType, createdBy, currencycurrencyCode, effectiveDate, effectiveDateCharacter, "
            "generalLedgerDate, grossMarginStatus, id, layerQuantity, ledgerledgerId, legalEntitylegalEntityId, "
            "projectTransactionStatus, subledgerAccountingEventeventId, subledgerAccountingEventClassentityCode, "
            "subledgerAccountingEventClasseventClassCode, subledgerAccountingEventTypeentityCode, "
            "subledgerAccountingEventTypeeventClassCode, subledgerAccountingEventTypeeventTypeCode, "
            "timeCreated, timeUpdated, transactionNumber, unitOfMeasurecode, updatedBy, valuationOnhandStatus "
            "from costDistributionExtract "
            "where timeUpdated between to_timestamp(:minSrcLastUpdateDate ,'YYYY-MM-DD HH24:MI:SS.FF9') "
            "and to_timestamp(:maxSrcLastUpdateDate,'YYYY-MM-DD HH24:MI:SS.FF9') "
            "and timeCreated >=  to_timestamp(:minCreationDate,'YYYY-MM-DD HH24:MI:SS.FF9') \"}"
        ),
    },
}

The extractionQuery contains the business object view, selected fields, aliases, and filter conditions. In many real-world pipelines, this query is generated from metadata or maintained as a versioned configuration artifact.
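
As an illustration of generating the query from metadata, the sketch below builds the serialized string from a list of field paths. `build_extraction_query` is a hypothetical helper. The alias rule (remove the dots from the field path) is inferred from the example payload, and whether the API accepts every field combination produced this way has not been verified here.

```python
import json


def build_extraction_query(module: str, business_object: str, view: str,
                           fields: list[str], where: str = "") -> str:
    """Serialize a single-view extraction query from a list of field paths."""
    # Alias convention inferred from the example payload: drop the dots.
    aliases = {field: field.replace(".", "") for field in fields}
    select = f"select {', '.join(aliases.values())} from {view}"
    if where:
        select += f" where {where}"
    query = {
        "viewQueries": {
            view: {
                "module": module,
                "businessObject": business_object,
                "view": view,
                "fieldAliases": aliases,
            }
        },
        "select": select,
    }
    # Compact separators mirror the hand-written string in the payload.
    return json.dumps(query, separators=(",", ":"))


query = build_extraction_query(
    module="oraScmCoreCostMgmtReviewDistribution",
    business_object="CostDistribution",
    view="costDistributionExtract",
    fields=["id", "costBook.id", "timeUpdated"],
)
print(query)
```

Letting `json.dumps` handle quoting avoids the hand-escaped string, which is easy to break when a field is added or removed.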

For incremental-style extracts, the query can include date filters such as:

where timeUpdated between to_timestamp(:minSrcLastUpdateDate, 'YYYY-MM-DD HH24:MI:SS.FF9')
  and to_timestamp(:maxSrcLastUpdateDate, 'YYYY-MM-DD HH24:MI:SS.FF9')

This keeps the extract definition reusable while allowing runtime parameters to control the extract window.
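
The format mask in the filter uses FF9, which means nine fractional-second digits, while Python datetimes carry six. The sketch below shows one way to produce matching strings. It is an illustration only: `to_extract_timestamp` and `extract_window` are hypothetical helpers, this post does not show how the bind values are supplied to the job, and UTC is used in the example purely for demonstration.

```python
from datetime import datetime, timedelta, timezone


def to_extract_timestamp(moment: datetime) -> str:
    """Format a datetime as 'YYYY-MM-DD HH24:MI:SS.FF9' (nine fractional digits)."""
    # Python stores microseconds (6 digits); pad with zeros to reach 9 digits.
    return moment.strftime("%Y-%m-%d %H:%M:%S.") + f"{moment.microsecond:06d}000"


def extract_window(end: datetime, hours: int) -> dict[str, str]:
    """Build the two window values for an extract ending at `end`."""
    return {
        "minSrcLastUpdateDate": to_extract_timestamp(end - timedelta(hours=hours)),
        "maxSrcLastUpdateDate": to_extract_timestamp(end),
    }


window = extract_window(
    datetime(2026, 5, 15, 12, 0, 0, 250000, tzinfo=timezone.utc), hours=1
)
print(window)
```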

5. Create the extract definition

Submit the extract definition to the Data Export API:

create_extract_response = api_request(
    "POST",
    "/api/boss/data/objects/ora/commonBoss/dataExport/v1/exportDefinitions",
    BOF_TOKEN,
    json=create_extract_payload,
)

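extract_definition_location = create_extract_response.headers.get("Location")
if not extract_definition_location:
    raise RuntimeError("Extract definition request did not return a Location header")

# The resource ID is the last path segment of the Location URL.
extract_definition_id = extract_definition_location.rstrip("/").split("/")[-1]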

print("Created extract definition:", extract_definition_id)

The Location response header holds the URL of the created extract definition; its last path segment is the resource ID.
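
The same Location parsing is needed for the extract definition, the extract group, and the job request, so it could be factored into one function. A minimal sketch follows; `resource_id_from_location` is a hypothetical helper and the URL in the example is made up.

```python
from typing import Mapping
from urllib.parse import urlparse


def resource_id_from_location(headers: Mapping[str, str]) -> str:
    """Return the last path segment of the Location header, or raise if missing."""
    location = headers.get("Location")
    if not location:
        raise RuntimeError("Response did not include a Location header")
    # urlparse drops any query string, so only the path is inspected.
    path = urlparse(location).path.rstrip("/")
    resource_id = path.rsplit("/", 1)[-1]
    if not resource_id:
        raise RuntimeError(f"Could not parse a resource ID from {location!r}")
    return resource_id


# Made-up header for illustration.
example_headers = {"Location": "https://example.invalid/api/exportDefinitions/12345/"}
print(resource_id_from_location(example_headers))
```

When called with `response.headers` from the requests library, the lookup is case-insensitive. With a plain dictionary, as in the example, the key must be spelled exactly `Location`.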

6. Create the extract group

The extract group links one or more extract definitions into a runnable group.

create_group_payload = {
    "name": extract_group,
    "owner": owner,
    "ownerRole": owner_role,
    "groupedExtracts": {
        "items": [
            {
                "name": extract_name
            }
        ]
    },
}

Create the group:

create_group_response = api_request(
    "POST",
    "/api/boss/data/objects/ora/commonBoss/dataExport/v1/exportGroupDefinitions",
    BOF_TOKEN,
    json=create_group_payload,
)

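export_group_location = create_group_response.headers.get("Location")
if not export_group_location:
    raise RuntimeError("Extract group request did not return a Location header")

# The resource ID is the last path segment of the Location URL.
export_group_id = export_group_location.rstrip("/").split("/")[-1]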

print("Created extract group:", export_group_id)

At this point, the extract metadata exists and is ready to be executed.

7. Submit the Batch Processing job

Submit the extract through Batch Processing.

Example for Full Extract:

schedule_name = f"{extract_name}Schedule"
extract_type = "Full"      # allowed values: Full, Incremental
recurrence = "Immediate"   # allowed values: Immediate, Simple, Hourly, Daily, Weekly, Monthly, Yearly

schedule_objst_payload = {
    "serviceName": "boss",
    "jobDefinitionName": "DataExport",
    "description": f"{schedule_name}::{extract_name}::{extract_type}::{recurrence}",
    "requestParameters": {
        "submit.argument1": extract_name,
        "submit.argument2": "Full Data Extract",
    },
}

Example for Incremental Extract:

schedule_name = f"{extract_name}Schedule"
extract_type = "Incremental"
recurrence = "Hourly"

schedule_objst_payload = {
    "serviceName": "boss",
    "jobDefinitionName": "DataExport",
    "description": f"{schedule_name}::{extract_name}::{extract_type}::{recurrence}",
    "requestParameters": {
        "submit.argument1": extract_name,
        "submit.argument2": "Incremental Data Extract",
    },
    "runAtTimes": {
        "fixedInterval": {
            "interval": 1,
            "timeUnit": "HOUR",
        },
        "startDate": "2026-05-15",  # dates must use the format YYYY-MM-DD
        "endDate": "2026-05-15",
        "startTime": "1200",
        "endTime": "1500",
    },
}

The fixedInterval object defines a fixed-rate or fixed-interval schedule, for example every 30 minutes or every 150 minutes. The time unit can be a minute, an hour, or a day.

The allowed values for timeUnit are “MINUTE”, “HOUR”, and “DAY”.

The startDate and endDate together define the overall time frame in which the scheduling rules apply. The startDate should be in the future.

The startTime and endTime define the window within which a scheduled run should start when its prerequisites are all met. The default startTime is midnight at “0000” and the default endTime is “2359”. These values are given in the format of “HHMM”.
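
These format rules are easy to get wrong by hand, so a local check before submitting the job can help. The sketch below is an illustration under stated assumptions: `build_run_at_times` is a hypothetical helper, it validates only the formats described in this post, and it does not check that startDate lies in the future or reproduce any server-side validation.

```python
import re
from datetime import datetime

ALLOWED_TIME_UNITS = {"MINUTE", "HOUR", "DAY"}  # the units named in this post


def build_run_at_times(interval: int, time_unit: str, start_date: str, end_date: str,
                       start_time: str = "0000", end_time: str = "2359") -> dict:
    """Validate schedule inputs locally and return a runAtTimes dictionary."""
    if interval < 1:
        raise ValueError("interval must be at least 1")
    if time_unit not in ALLOWED_TIME_UNITS:
        raise ValueError(f"timeUnit must be one of {sorted(ALLOWED_TIME_UNITS)}")
    for label, value in (("startDate", start_date), ("endDate", end_date)):
        if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", value):
            raise ValueError(f"{label} must use the format YYYY-MM-DD")
        datetime.strptime(value, "%Y-%m-%d")  # rejects impossible dates such as 2026-02-30
    for label, value in (("startTime", start_time), ("endTime", end_time)):
        if not re.fullmatch(r"\d{4}", value):
            raise ValueError(f"{label} must use the format HHMM")
        datetime.strptime(value, "%H%M")  # rejects values such as 2460
    if end_date < start_date:  # ISO dates compare correctly as strings
        raise ValueError("endDate must not be earlier than startDate")
    return {
        "fixedInterval": {"interval": interval, "timeUnit": time_unit},
        "startDate": start_date,
        "endDate": end_date,
        "startTime": start_time,
        "endTime": end_time,
    }


run_at_times = build_run_at_times(1, "HOUR", "2026-05-15", "2026-05-15", "1200", "1500")
print(run_at_times)
```

The returned dictionary has the same shape as the runAtTimes object in the incremental example and could be assigned to that key of the payload.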

Submit the job request:

schedule_objst_response = api_request(
    "POST",
    "/api/saas-batch/jobscheduler/v1/jobRequests",
    BATCH_TOKEN,
    json=schedule_objst_payload,
)

# requests treats header names case-insensitively, so a single lookup is enough.
location = schedule_objst_response.headers.get("Location")

if not location:
    raise RuntimeError("Batch Processing job request did not return a Location header")

job_request_id = location.rstrip("/").split("/")[-1]

print("Created Batch Processing jobRequestId:", job_request_id)

The jobRequestId is the key identifier used for monitoring job progress and retrieving output files.

8. Poll the Batch Processing job status

Once the job is submitted, poll the Batch Processing API until the job reaches a terminal state.

import time

terminal_statuses = {
    "SUCCEEDED",
    "COMPLETED",
    "ERROR",
    "FAILED",
    "CANCELLED",
    "PAUSED",
    "WARNING",
}

while True:
    batch_status_response = api_request(
        "GET",
        f"/api/saas-batch/jobscheduler/v1/jobRequests/{job_request_id}",
        BATCH_TOKEN,
    )

    batch_status = batch_status_response.json()

    status = (
        batch_status.get("jobStatus")
        or batch_status.get("jobDetails", {}).get("jobStatus")
        or batch_status.get("jobProgress", {}).get("status")
    )

    print(f"Batch jobRequestId {job_request_id} status {status}")

    if status in terminal_statuses:
        break

    time.sleep(10)
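
The loop above runs until a terminal status appears and does not distinguish success from failure. A variant with a deadline and an explicit success check might look like the sketch below. It is an illustration only: `wait_for_job` is a hypothetical helper, treating only SUCCEEDED and COMPLETED as success is an assumption, and the non-terminal status name in the demonstration is made up.

```python
import time
from typing import Callable

# Assumption: these two statuses mean the output files are ready.
SUCCESS_STATUSES = {"SUCCEEDED", "COMPLETED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"ERROR", "FAILED", "CANCELLED", "PAUSED", "WARNING"}


def wait_for_job(get_status: Callable[[], str], timeout: float = 3600.0,
                 interval: float = 10.0,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status() until success; raise on failure or when the deadline passes."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            if status not in SUCCESS_STATUSES:
                raise RuntimeError(f"Job ended with status {status}")
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Job still {status} after {timeout} seconds")
        sleep(interval)


# Demonstration with a fake status source (no network access, no real sleeping).
fake_statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
final_status = wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

In the pipeline, `get_status` would wrap the GET request and the status lookup from the loop above. Whether WARNING should count as a failure depends on how the job reports partial results, so adjust the sets accordingly.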

9. List output files

After the job completes, list the generated files:

output_files_response = api_request(
    "GET",
    f"/api/saas-batch/jobfilemanager/v1/jobRequests/{job_request_id}/outputFiles",
    BATCH_TOKEN,
)

output_files = output_files_response.json().get("items", [])

print("Found", len(output_files), "output file(s)")

Each item typically contains metadata about the file, such as fileName, and a download link at $context.links.enclosure.href.
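
Since the link sits several levels deep, a defensive lookup avoids a KeyError when an item has no download link. The sketch below is illustrative: `enclosure_href` is a hypothetical helper, and the sample items are made up and may not match every field a real response carries.

```python
from typing import Any, Optional


def enclosure_href(item: dict[str, Any]) -> Optional[str]:
    """Return the download URL of an output-file item, or None if it is absent."""
    links = (item.get("$context") or {}).get("links") or {}
    return (links.get("enclosure") or {}).get("href")


# Made-up items for illustration only.
sample_items = [
    {
        "fileName": "extract_1.zip",
        "$context": {"links": {"enclosure": {"href": "https://example.invalid/files/1"}}},
    },
    {"fileName": "job.log"},  # no download link
]
downloadable = [item for item in sample_items if enclosure_href(item)]
print([item["fileName"] for item in downloadable])
```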

10. Download output files

import tempfile
from pathlib import Path
from urllib.parse import urlparse

def download_output_files(items, batch_token: str) -> list[Path]:
    paths = []
    tmp_dir = Path(tempfile.mkdtemp(prefix="bof_extract_"))

    headers = {
        "Authorization": f"Bearer {batch_token}",
        "Accept": "application/octet-stream",
    }

    for item in items:
        file_name = Path(item["fileName"]).name
        href = item["$context"]["links"]["enclosure"]["href"]

        parsed = urlparse(href)
        if parsed.scheme != "https":
            raise RuntimeError(f"Refusing non-HTTPS download URL: {href}")

        destination = tmp_dir / file_name

        with session.get(href, headers=headers, stream=True, timeout=120) as response:
            response.raise_for_status()

            with open(destination, "wb") as file:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        file.write(chunk)

        print(f"Downloaded {file_name} to temporary storage")
        paths.append(destination)

    return paths

This function includes two important safeguards:

First, it strips any directory path from the remote filename by using Path(item["fileName"]).name. Second, it refuses non-HTTPS download URLs.
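
A third safeguard worth considering is to send the bearer token only to a host you trust, so that a manipulated link cannot leak the token elsewhere. The sketch below assumes that download links are served from the same host as BASE_URL, which this post does not confirm. If they come from a different host, an explicit allow-list would be needed instead. `is_trusted_download_url` is a hypothetical helper.

```python
from urllib.parse import urlparse


def is_trusted_download_url(href: str, base_url: str) -> bool:
    """True if href uses HTTPS and points at the same host as base_url."""
    target = urlparse(href)
    trusted = urlparse(base_url)
    return (
        target.scheme == "https"
        and target.hostname is not None
        and target.hostname == trusted.hostname  # hostname is already lower-cased
    )


# Made-up URLs for illustration.
base = "https://fusion.example.invalid"
print(is_trusted_download_url("https://fusion.example.invalid/files/1", base))
print(is_trusted_download_url("https://other.example.invalid/files/1", base))
print(is_trusted_download_url("http://fusion.example.invalid/files/1", base))
```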

11. Extract ZIP files

import os
import zipfile

def extract_zip(zip_path: Path, extract_dir: Path) -> None:
    extract_dir.mkdir(parents=True, exist_ok=True)

    base = extract_dir.resolve()

    with zipfile.ZipFile(zip_path, "r") as zip_file:
        for member in zip_file.infolist():
            target = (extract_dir / member.filename).resolve()

            if not str(target).startswith(str(base) + os.sep):
                raise RuntimeError(
                    f"Unsafe ZIP member path blocked: {member.filename}"
                )

        zip_file.extractall(extract_dir)

Then extract downloaded ZIP files:

extract_dir = Path("/tmp/extract_output")

downloaded_paths = download_output_files(output_files, BATCH_TOKEN)

for downloaded_path in downloaded_paths:
    # download_output_files already returns Path objects.
    if downloaded_path.suffix.lower() == ".zip":
        extract_zip(downloaded_path, extract_dir)
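
To see what the path check in extract_zip guards against, the sketch below builds a small archive in memory with one normal member and one member that tries to escape the target folder. `unsafe_members` is a hypothetical helper that applies the same idea using path ancestry rather than string prefixes. As far as I know, Python's own zipfile extraction also removes `..` components and leading slashes from member names, so a check like this acts as an additional, explicit layer.

```python
import io
import tempfile
import zipfile
from pathlib import Path


def unsafe_members(archive: zipfile.ZipFile, extract_dir: Path) -> list[str]:
    """Return member names that would resolve outside extract_dir."""
    base = extract_dir.resolve()
    flagged = []
    for member in archive.infolist():
        target = (base / member.filename).resolve()
        # Safe targets are the base folder itself or something beneath it.
        if target != base and base not in target.parents:
            flagged.append(member.filename)
    return flagged


# Build a test archive in memory: one normal member, one path-traversal member.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("data/part-0001.csv", "id\n1\n")
    archive.writestr("../outside.csv", "id\n2\n")

with tempfile.TemporaryDirectory() as tmp, zipfile.ZipFile(buffer) as archive:
    flagged = unsafe_members(archive, Path(tmp))
print(flagged)
```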

12. Upload files to OCI Object Storage

Use the OCI Python SDK to upload the extracted files to Object Storage.

import oci

profile = os.getenv("OCI_CONFIG_PROFILE", "DEFAULT")
config_path = os.getenv("OCI_CONFIG_PATH", "~/.oci/config")

config = oci.config.from_file(config_path, profile)
object_storage_client = oci.object_storage.ObjectStorageClient(config)

namespace = object_storage_client.get_namespace().data

Create a small upload helper:

def upload_to_object_storage(file_path: Path, object_name: str | None = None):
    object_name = object_name or file_path.name

    with open(file_path, "rb") as file:
        object_storage_client.put_object(
            namespace_name=namespace,
            bucket_name=BUCKET_NAME,
            object_name=object_name,
            put_object_body=file,
        )

    print(f"Uploaded object {object_name} to bucket {BUCKET_NAME}")

Upload all extracted files:

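for file_path in extract_dir.rglob("*"):
    if file_path.is_file():
        # Use the path relative to extract_dir as the object name so that files
        # with the same name in different subfolders do not overwrite each other.
        upload_to_object_storage(file_path, file_path.relative_to(extract_dir).as_posix())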
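
Repeated runs that upload to the same bucket will overwrite objects that share a name. One way to keep runs apart is to prefix object names with the extract name and the run date. The naming scheme below is only a suggestion, and `build_object_name` is a hypothetical helper that is not part of the original pipeline.

```python
from datetime import date
from pathlib import Path, PurePosixPath


def build_object_name(file_path: Path, root: Path, extract_name: str, run_date: date) -> str:
    """Return '<extract_name>/<YYYY-MM-DD>/<path relative to root>' with forward slashes."""
    relative = file_path.relative_to(root).as_posix()
    return str(PurePosixPath(extract_name) / run_date.isoformat() / relative)


name = build_object_name(
    Path("/tmp/extract_output/data/part-0001.csv"),
    Path("/tmp/extract_output"),
    "ExampleCostDistributionExtract",
    date(2026, 5, 15),
)
print(name)
```

The result can be passed as the object_name argument of the upload helper, which keeps each day's files under their own prefix in the bucket.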

Conclusion

By replacing manual extract handling with a secure, API-driven pipeline, you create a repeatable foundation for delivering Fusion data into OCI Object Storage with less operational effort and fewer handoffs.