Oracle Cloud Infrastructure (OCI) provides a secure and reliable environment for managing secrets such as passwords, API keys, and other sensitive information. In multi-region or disaster recovery setups, it is crucial to replicate secrets from a source region to a destination region to ensure continuous availability and resilience. Replicating secrets between regions ensures that your data is available even during regional failures or disasters.

In this blog post, we will explore how to replicate secrets across regions in OCI.

 

Pre-Requisite

  • OCI enforces service limits on resources in each region. Before configuring replication, ensure you have the necessary resource limits in the destination region to accommodate the replicated vault. You can refer to OCI Vault Limits for more information.

 

Step 1: Perform a One-Time Replication for Existing Vaults

To replicate existing vaults and secrets to the destination region, execute the following Python script. Ensure that the user running the script has a valid OCI configuration and sufficient permissions to manage vaults and secrets. In the script, update the user parameters to specify the compartment_id, destination vault endpoints and optionally, the destination vault name (default: backup-[VaultName]-[SourceRegion]) and destination KMS key name (default: secret-replication-key).

Secrets in OCI can only be created within vaults and for replication purposes, vaults with names like backup-[vault display name]-[region code] will be created in the destination region to store the replicated secret values. Secrets in the target vault are created with the same name and content as in the source vault, but their OCID values in the destination region will be different from those in the source region.

Note: The script does not replicate the source vault or the master encryption keys associated with it. Instead, it creates a new vault in the destination region as a placeholder to store the replicated secrets. To learn more about Key replication, refer to this page
import oci
import sys
import traceback
import logging

# User parameters
compartment_id = "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxx"
dest_vault_endpoint = "https://kms.uk-london-1.oraclecloud.com"  #https://docs.oracle.com/en-us/iaas/api/#/en/key/release/
dest_vault_secret_endpoint = "https://vaults.uk-london-1.oci.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/secretmgmt/20180608/
dest_vault_secret_retrieval_endpoint="https://secrets.vaults.uk-london-1.oci.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/secretretrieval/20190301/
dst_key_name = 'secret-replication-key'
config = oci.config.from_file("~/.oci/config")

def list_secrets(compartment_id, vault_client, src_vault_id, secrets_client):
    secrets_list_response = {}
    try:
        paginator = vault_client.list_secrets(compartment_id=compartment_id, vault_id=src_vault_id)
        for page in paginator.data:
                secrets_info = {}     
                secrets_info["vault_id"] = page.vault_id
                secrets_info["secret_name"] = page.secret_name
                secrets_info["lifecycle_state"] = page.lifecycle_state
                secrets_info["key_id"] = page.key_id
                secrets_info["secret_ocid"] = page.id
                secrets_info["freeform_tags"] = page.freeform_tags
                secrets_info["description"] = page.description
                secrets_info["time_of_deletion"] = paginator.data[0].time_of_deletion 
                if page.lifecycle_state == 'ACTIVE':
                    get_secret_bundle_by_name_response = secrets_client.get_secret_bundle(secret_id=page.id, stage="LATEST")
                    secrets_info["secret_bundle_content"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content
                    secrets_info["secret_bundle_content_type"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content_type
                    secrets_info["secret_stages"] = get_secret_bundle_by_name_response.data.stages
                    secrets_info["version_number"] = get_secret_bundle_by_name_response.data.version_number
                
                secrets_list_response[page.secret_name] = secrets_info

    except Exception as e:
        print(f"Error listing secrets: {e}")

    return secrets_list_response

def get_vault(kms_vault_client, compartment_id):
    vault_list = {}
    try:
        vault_resp = kms_vault_client.list_vaults(compartment_id)
        for v in vault_resp.data:
            vault_data = {}
            vault_data["display_name"] = v.display_name
            vault_data["freeform_tags"] = v.freeform_tags
            vault_data["management_endpoint"] = v.management_endpoint
            vault_data["vault_type"] = v.vault_type
            vault_data["vault_id"] = v.id
            vault_data["lifecycle_state"] = v.lifecycle_state
            vault_list[v.display_name] = vault_data
    except Exception as e:
        print(f"Error getting vaults: {e}")

    return vault_list

def create_vault(compartment_id, vault_info, kms_vault_client_composite):    
    try:        
        dest_vault_name = "Backup-" + vault_info["display_name"] + "-" + vault_info["vault_id"].split('.')[-3]
        print("  Creating vault {} in {} compartment".format(dest_vault_name, compartment_id))
        vault_details = oci.key_management.models.CreateVaultDetails(
            compartment_id=compartment_id,
            vault_type=vault_info["vault_type"],
            display_name=dest_vault_name,
            freeform_tags=vault_info["freeform_tags"]
        )

        response = kms_vault_client_composite.create_vault_and_wait_for_state(
            vault_details,
            wait_for_states=[oci.key_management.models.Vault.LIFECYCLE_STATE_ACTIVE]
        )
        return response
    except Exception as e:
        print(f"Error creating vault: {e}")
        return None

def create_secret(vaults_management_client_composite, compartment_id, vault_id, key_id, secret_details):
    try:
        if 'PENDING' in secret_details['secret_stages']:
            print("Latest version of Secret {} is in 'Pending' stage and it will be replicated as Active version".format(secret_details["secret_name"]))
            
        secret_content_details = oci.vault.models.Base64SecretContentDetails(
            content_type=oci.vault.models.SecretContentDetails.CONTENT_TYPE_BASE64,
            name=secret_details["secret_name"],
            stage='CURRENT',
            content=secret_details['secret_bundle_content']
        )

        secrets_details = oci.vault.models.CreateSecretDetails(
            compartment_id=compartment_id,
            description=secret_details['description'],
            secret_content=secret_content_details,
            secret_name=secret_details["secret_name"],
            vault_id=vault_id,
            key_id=key_id,
            freeform_tags=secret_details["freeform_tags"]
        )

        response = vaults_management_client_composite.create_secret_and_wait_for_state(
            create_secret_details=secrets_details,
            wait_for_states=[oci.vault.models.Secret.LIFECYCLE_STATE_ACTIVE]
        )
        return response
    except Exception as e:
        print(f"Error creating secret: {e}")
        return None

def create_key(key_mgmt_composite, dst_key_name, compartment_id):
    try:
        print("Creating KMS key {} in compartment {}.".format(dst_key_name, compartment_id))

        key_shape = oci.key_management.models.KeyShape(algorithm="AES", length=32)
        key_details = oci.key_management.models.CreateKeyDetails(
            compartment_id=compartment_id,
            display_name=dst_key_name,
            key_shape=key_shape
        )

        response = key_mgmt_composite.create_key_and_wait_for_state(
            key_details,
            wait_for_states=[oci.key_management.models.Key.LIFECYCLE_STATE_ENABLED]
        )
        return response
    except Exception as e:
        print(f"Error creating key: {e}")
        return None

def list_keys(key_management_client,compartment_id):
    try:
        list_keys_response = key_management_client.list_keys(compartment_id=compartment_id)            
    except Exception as e:
        print(f"An error occurred: {e}")
    return list_keys_response

def update_secret(vaults_management_client_composite,secret_details,secret_id):
    try:
        Stage='CURRENT'
        if 'PENDING' in secret_details['secret_stages']:
            logging.getLogger().info(f"[INFO]Latest version of Secret {secret_details['secret_name']} is in 'Pending' stage")
            Stage='PENDING'
        secret_content_details = oci.vault.models.Base64SecretContentDetails(
            content_type=oci.vault.models.SecretContentDetails.CONTENT_TYPE_BASE64,
            stage=Stage,
            content=secret_details['secret_bundle_content']
        )
        secrets_details = oci.vault.models.UpdateSecretDetails(
            description=secret_details['description'],
            secret_content=secret_content_details
        )
        response = vaults_management_client_composite.update_secret_and_wait_for_state(
        secret_id=secret_id,
        update_secret_details=secrets_details,
        wait_for_states=[oci.vault.models.Secret.LIFECYCLE_STATE_ACTIVE]
        )
        return response
    except Exception as e:
        logging.getLogger().info(f"[Error]update_secret - {e}")
        return None

def schedule_secret_deletion(dst_secrets_client,dst_vaults_client, secret_id, deletion_time):
    print("Deleting a secret")
    #Get secret info to get time of deletion    
    schedule_secret_deletion_response = dst_vaults_client.schedule_secret_deletion(
        secret_id=secret_id,
        schedule_secret_deletion_details=oci.vault.models.ScheduleSecretDeletionDetails(
            time_of_deletion=deletion_time))


try:
    # Configure OCI clients
    src_secrets_client = oci.secrets.SecretsClient(config)
    dst_secrets_client = oci.secrets.SecretsClient(config,service_endpoint=dest_vault_secret_retrieval_endpoint)
    src_kms_vault_client = oci.key_management.KmsVaultClient(config)
    src_kms_vault_client_composite = oci.key_management.KmsVaultClientCompositeOperations(src_kms_vault_client)
    dst_kms_vault_client = oci.key_management.KmsVaultClient(config, service_endpoint=dest_vault_endpoint)
    dst_kms_vault_client_composite = oci.key_management.KmsVaultClientCompositeOperations(dst_kms_vault_client)
    src_vault_client = oci.vault.VaultsClient(config)
    dst_vaults_client = oci.vault.VaultsClient(config, service_endpoint=dest_vault_secret_endpoint)
    dst_vaults_management_client_composite = oci.vault.VaultsClientCompositeOperations(dst_vaults_client)

    # Step 1: Getting source Vault details
    print("Getting source Vault information.")
    src_vault_list = get_vault(src_kms_vault_client, compartment_id)
    #ensure the vault is not present in destination region. 
    dst_vault_list = get_vault(dst_kms_vault_client, compartment_id)     
    # Step 2: Creating vaults in the secondary region
    for src_vault_info in src_vault_list.values():
        src_list_secrets_response = list_secrets(compartment_id, src_vault_client, src_vault_info["vault_id"], src_secrets_client) 
        if src_vault_info["lifecycle_state"] != 'ACTIVE' or not any(secret["lifecycle_state"] == 'ACTIVE' for secret in src_list_secrets_response.values()):
            #avoid creating duplicate Vaults
            print(f"[Info] No active secrets in {src_vault_info['display_name'] } vault. Skipping vault creation in destination region.")
            continue  
        
        print(f"Processing {src_vault_info['display_name']} Vault")
        dest_vault_name = "Backup-" + src_vault_info["display_name"] + "-" + src_vault_info["vault_id"].split('.')[-3]
        dst_vault_presence=False 
        for vault_name,vault_info in dst_vault_list.items():
            if vault_name == dest_vault_name:
                dst_vault_presence=True
                break 
        
        if dst_vault_presence == True:
            ext_dst_vault=dst_vault_list[dest_vault_name]
            dst_vault_id=ext_dst_vault["vault_id"]
            dst_vault_management_endpoint=ext_dst_vault["management_endpoint"]
            print("[Warning] Vault {} exists in Dest region {}".format(ext_dst_vault['display_name'], ext_dst_vault['vault_id'].split('.')[-3]))
            # Step 2.1: Validating KMS key in the destination to avoid duplicates                
            dst_vault_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=dst_vault_management_endpoint)
            dst_list_keys=list_keys(dst_vault_management_client,compartment_id)
            key_flag=False
            for key_dtls in dst_list_keys.data:
                if key_dtls.display_name == dst_key_name and key_dtls.lifecycle_state == 'ENABLED':
                    dst_key_id=key_dtls.id
                    key_flag=True
                    break
            if  key_flag == False:
                # Step 2.1: Creating KMS key in the destination                
                dst_vault_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=dst_vault_management_endpoint)
                dst_vault_management_client_composite = oci.key_management.KmsManagementClientCompositeOperations(dst_vault_management_client)
                key = create_key(dst_vault_management_client_composite, dst_key_name, compartment_id).data
                dst_key_id = key.id
            else:
                print("[Warning] Key {} exists in Dest region".format(dst_key_name))
        else:
            dst_vault = create_vault(compartment_id, src_vault_info, dst_kms_vault_client_composite).data
            print("Created Vault {} in Dest region {}".format(dst_vault.display_name, dst_vault.id.split('.')[-3]))
            dest_vault_create=True
            dst_vault_id=dst_vault.id
            dst_vault_management_endpoint=dst_vault.management_endpoint
            # Step 2.1: Creating KMS key in the destination                
            dst_vault_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=dst_vault_management_endpoint)
            dst_vault_management_client_composite = oci.key_management.KmsManagementClientCompositeOperations(dst_vault_management_client)
            key = create_key(dst_vault_management_client_composite, dst_key_name, compartment_id).data
            dst_key_id = key.id

        # Step 3: Replicating secrets from source to destination
        print("Retrieving secrets from Source vault {}".format(src_vault_info["display_name"]))            
        dst_list_secrets_response = list_secrets(compartment_id, dst_vaults_client, dst_vault_id, dst_secrets_client)
        for secret_name, secret_dtls in src_list_secrets_response.items():
            #Avoid creating duplicate secrets
            dst_secret_presence=False
            for dst_secret_name,dst_secret_dtls in dst_list_secrets_response.items():
                if dst_secret_dtls["secret_name"] == secret_name:
                    dst_secret_presence=True
                    dst_secret_id=dst_secret_dtls["secret_ocid"]
                    dst_secret_state=dst_secret_dtls["lifecycle_state"]
                    break
                    
            if dst_secret_presence == False and secret_dtls["lifecycle_state"] == 'ACTIVE':
                create_secret_key_response = create_secret(dst_vaults_management_client_composite, compartment_id, dst_vault_id, dst_key_id, secret_dtls).data
                print("Created Secret {}".format(create_secret_key_response.secret_name))
            elif dst_secret_presence == True and secret_dtls["lifecycle_state"] == 'ACTIVE':
                update_secret(dst_vaults_management_client_composite,secret_dtls,dst_secret_id)  
                print("[Info]  Secret {} already present in dest vault, updating the secret".format(secret_name))
            elif dst_secret_presence == True and secret_dtls["lifecycle_state"] == 'PENDING_DELETION' :                    
                if dst_secret_state == 'ACTIVE':
                    schedule_secret_deletion(dst_secrets_client,dst_vaults_client, dst_secret_id, secret_dtls['time_of_deletion'])
                    print("[Info]  Marking Secret {} for deletion at {} ".format(secret_name,secret_dtls['time_of_deletion'])   )     
                else:
                    print("[Info]  Secret {} is scheduled for deletion on {}. Skipping Replication ".format(secret_name,secret_dtls['time_of_deletion'])   ) 
            else:
                print("[Error] Secret {} is in {} state. Skipping Replication".format(secret_name,secret_dtls['lifecycle_state']))
        
except Exception as e:
    print(f"An error occurred: {e}")
        # Display full error details using traceback
    exc_type, exc_value, exc_traceback = sys.exc_info()
    traceback_details = {
        'filename': exc_traceback.tb_frame.f_code.co_filename,
        'lineno': exc_traceback.tb_lineno,
        'name': exc_traceback.tb_frame.f_code.co_name,
        'type': exc_type.__name__,
        'message': str(exc_value)
    }
    print("Exception Details:")
    for key, value in traceback_details.items():
        print(f"{key}: {value}")
    traceback.print_exception(exc_type, exc_value, exc_traceback)


This script will list all active secrets from the source vault, replicate them to the destination region, and create a new vault if necessary.

 

Step 2: Automating Secret Replication Across Regions with Event-Driven Architecture

Once the existing vaults and secrets are replicated, you can automate the replication of future vaults and secrets by setting up an event-driven architecture using OCI Cloud Native services.

 

Step 2.1: Grant Necessary Permissions

First, ensure that the OCI Function has the required permissions to manage vaults and secrets and that Connector Hub can invoke the OCI Function.

  • Create Dynamic Groups in your Identity Domain for functions that will execute the replication logic.
    • Group Name – Per your Organization standards (for example Sec-sc-fn-policy)
    • Matching Rules
      • ALL {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..xxxxxxxxxxxxxxx}
        

 

  • Add identity Policies to grant OCI Functions necessary permissions to replicate secrets.
    • Allow dynamic-group sec-sc-fn-policy to manage keys in compartment CompartmentName
      Allow dynamic-group sec-sc-fn-policy to manage vaults in compartment CompartmentName
      Allow dynamic-group sec-sc-fn-policy to manage secret-family in compartment CompartmentName
      Allow dynamic-group sec-sc-fn-policy to manage repos in compartment CompartmentName
  • Create another Identity Policy to grant Connector Hub permissions to invoke OCI Function
    • allow any-user to use fn-function in compartment id ocid1.compartment.oc1..XXXXXXXX where all {request.principal.type='serviceconnector', request.principal.compartment.id='ocid1.compartment.oc1..XXXXXXXXX}
Note: Make sure to replace the placeholder compartment id with the actual OCID of your compartment.

 

Step 2.2 – Configure OCI Function

  1. Create a custom OCI Function to handle replication events. Below is a basic configuration for the Function. For additional details on creating OCI Function refer to this page

func.yaml

schema_version: 20180708
name: func-replicate-secrets
version: 0.0.1
runtime: python
build_image: fnproject/python:3.11-dev
run_image: fnproject/python:3.11
entrypoint: /python/bin/fdk /function/func.py handler
memory: 256

requirements.txt

fdk
requests
oci

func.py

import io
import oci
import json
from datetime import datetime, timedelta
import logging
from fdk import response

def extract_info_from_logs(logs):

    log_entry = logs[0].get("data", {})
    compartment_id = log_entry.get("compartmentId")
    event_name = log_entry.get("eventName")
    secret_id = log_entry.get("resourceId")
    freeform_Tags=log_entry.get("freeformTags")
    secret_name=log_entry.get("source")

    return compartment_id,event_name,secret_id,freeform_Tags,secret_name

def get_vault(kms_vault_client, vault_id):
    vault_list = {}
    try:
        vault_resp = kms_vault_client.get_vault(vault_id=vault_id)
    except Exception as e:
        logging.getLogger().info(f"[Error]get_vault - Error : {e}")

    return vault_resp

def list_vault(kms_vault_client, compartment_id,vault_name):
    try:
        vault_resp = kms_vault_client.list_vaults(compartment_id)
        vault_data = {}
        for v in vault_resp.data:            
            if v.display_name == vault_name and v.lifecycle_state == 'ACTIVE':                
                vault_data["display_name"] = v.display_name
                vault_data["freeform_tags"] = v.freeform_tags
                vault_data["management_endpoint"] = v.management_endpoint
                vault_data["vault_type"] = v.vault_type
                vault_data["vault_id"] = v.id
                vault_data["lifecycle_state"] = v.lifecycle_state
                break
    except Exception as e:
        logging.getLogger().info(f"[Error]list_vault - {e}")

    return vault_data

def get_secret (secrets_client,vault_client,secret_id):
    secrets_info = {}
    secret_data=vault_client.get_secret(secret_id=secret_id)  
    secrets_info["vault_id"] = secret_data.data.vault_id
    secrets_info["secret_name"] = secret_data.data.secret_name
    secrets_info["lifecycle_state"] = secret_data.data.lifecycle_state
    secrets_info["key_id"] = secret_data.data.key_id
    secrets_info["secret_ocid"] = secret_data.data.id
    secrets_info["freeform_tags"] = secret_data.data.freeform_tags
    secrets_info["description"] = secret_data.data.description   
    #secret_bundle_content library will only work for Active Secrets
    if secret_data.data.lifecycle_state == 'ACTIVE':        
        get_secret_bundle_by_name_response = secrets_client.get_secret_bundle(secret_id=secret_id, stage="LATEST")
        secrets_info["secret_bundle_content"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content
        secrets_info["secret_bundle_content_type"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content_type
        secrets_info["secret_stages"] = get_secret_bundle_by_name_response.data.stages
        secrets_info["version_number"] = get_secret_bundle_by_name_response.data.version_number  
    elif secret_data.data.lifecycle_state == 'PENDING_DELETION':        
        secrets_info["time-of-deletion"] = secret_data.data.time_of_deletion  
    else:
        logging.getLogger().info(f"[Error] Secret {secret_data.data.secret_name} is in {secret_data.data.lifecycle_state} State")
    
    return secrets_info

def create_vault(compartment_id, vault_info, kms_vault_client_composite):    
          
        logging.getLogger().info(f"[Info] Vault info {vault_info.data}")
        dest_vault_name = "Backup-" + vault_info.data.display_name + "-" + vault_info.data.id.split('.')[-3]
        logging.getLogger().info(f"[Info] Creating vault {dest_vault_name} in {compartment_id} compartment")
        vault_details = oci.key_management.models.CreateVaultDetails(
            compartment_id=compartment_id,
            vault_type=vault_info.data.vault_type,
            display_name=dest_vault_name,
            freeform_tags=vault_info.data.freeform_tags
        )

        response = kms_vault_client_composite.create_vault_and_wait_for_state(
            vault_details,
            wait_for_states=[oci.key_management.models.Vault.LIFECYCLE_STATE_ACTIVE]
        )
        return response

def get_secret_by_name(compartment_id, vault_client,secret_name, vault_id, secrets_client):
    paginator = vault_client.list_secrets(compartment_id=compartment_id, vault_id=vault_id,name=secret_name)   
    secrets_info = {}
    if paginator.data:
        secrets_info['vault_id'] = paginator.data[0].vault_id
        secrets_info["secret_name"] = paginator.data[0].secret_name
        secrets_info["lifecycle_state"] = paginator.data[0].lifecycle_state
        secrets_info["key_id"] = paginator.data[0].key_id
        secrets_info["secret_ocid"] = paginator.data[0].id
        secrets_info["freeform_tags"] = paginator.data[0].freeform_tags
        secrets_info["description"] = paginator.data[0].description 
        secrets_info["time_of_deletion"] = paginator.data[0].time_of_deletion        
        if paginator.data[0].lifecycle_state == 'ACTIVE':                
            get_secret_bundle_by_name_response = secrets_client.get_secret_bundle(secret_id=paginator.data[0].id, stage="LATEST")
            secrets_info["secret_bundle_content"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content
            secrets_info["secret_bundle_content_type"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content_type
            secrets_info["secret_stages"] = get_secret_bundle_by_name_response.data.stages
            secrets_info["version_number"] = get_secret_bundle_by_name_response.data.version_number                             
        else:                
            logging.getLogger().info(f"[Warning] Secret {paginator.data[0].secret_name} is in {paginator.data[0].lifecycle_state} state and cannot be replicated or updated")
       
    return secrets_info

def create_secret(vaults_management_client_composite, compartment_id, vault_id, key_id, secret_details):
    try:
        if 'PENDING' in secret_details['secret_stages']:
            logging.getLogger().info(f"[Info]Latest version of Secret {secret_details['secret_name']} is in 'Pending' stage and it will be replicated as Active version")
        else:    
            secret_content_details = oci.vault.models.Base64SecretContentDetails(
                content_type=oci.vault.models.SecretContentDetails.CONTENT_TYPE_BASE64,
                name=secret_details["secret_name"],
                stage='CURRENT',
                content=secret_details['secret_bundle_content']
            )

            secrets_details = oci.vault.models.CreateSecretDetails(
                compartment_id=compartment_id,
                description=secret_details['description'],
                secret_content=secret_content_details,
                secret_name=secret_details["secret_name"],
                vault_id=vault_id,
                key_id=key_id,
                freeform_tags=secret_details["freeform_tags"]
            )

            response = vaults_management_client_composite.create_secret_and_wait_for_state(
                create_secret_details=secrets_details,
                wait_for_states=[oci.vault.models.Secret.LIFECYCLE_STATE_ACTIVE]
            )
        return response
    except Exception as e:
        logging.getLogger().info(f"[Error] Creating secret: {e}")
        return None

def update_secret(vaults_management_client_composite,secret_details,secret_id):
    try:
        Stage='CURRENT'
        if 'PENDING' in secret_details['secret_stages']:
            logging.getLogger().info(f"[INFO]Latest version of Secret {secret_details['secret_name']} is in 'Pending' stage")
            Stage='PENDING'
        secret_content_details = oci.vault.models.Base64SecretContentDetails(
            content_type=oci.vault.models.SecretContentDetails.CONTENT_TYPE_BASE64,
            stage=Stage,
            content=secret_details['secret_bundle_content']
        )
        secrets_details = oci.vault.models.UpdateSecretDetails(
            description=secret_details['description'],
            secret_content=secret_content_details
        )
        response = vaults_management_client_composite.update_secret_and_wait_for_state(
        secret_id=secret_id,
        update_secret_details=secrets_details,
        wait_for_states=[oci.vault.models.Secret.LIFECYCLE_STATE_ACTIVE]
        )
        return response
    except Exception as e:
        logging.getLogger().info(f"[Error]update_secret - {e}")
        return None

def create_key(key_mgmt_composite, dst_key_name, compartment_id):
    try:
        key_shape = oci.key_management.models.KeyShape(algorithm="AES", length=32)
        key_details = oci.key_management.models.CreateKeyDetails(
            compartment_id=compartment_id,
            display_name=dst_key_name,
            key_shape=key_shape
        )

        response = key_mgmt_composite.create_key_and_wait_for_state(
            key_details,
            wait_for_states=[oci.key_management.models.Key.LIFECYCLE_STATE_ENABLED]
        )
        return response
    except Exception as e:
        logging.getLogger().info(f"[Error]create_key - {e}")
        return None

def list_keys(key_management_client,compartment_id):
    try:
        list_keys_response = key_management_client.list_keys(compartment_id=compartment_id)            
    except Exception as e:
        logging.getLogger().info(f"[Error]list_keys - An error occurred: {e}")
    return list_keys_response

def schedule_secret_deletion(dst_vaults_client, secret_id, deletion_time):    
    result_date = datetime.now() + timedelta(days=44) #Since secret info on pending delete secret cannot be retireved, set default delete to 44 days
    logging.getLogger().info(f"[Info]Deleting a secret {secret_id} and delete date is set to {result_date}")
    schedule_secret_deletion_response = dst_vaults_client.schedule_secret_deletion(
        secret_id=secret_id,
        schedule_secret_deletion_details=oci.vault.models.ScheduleSecretDeletionDetails(
            time_of_deletion=result_date))
    return schedule_secret_deletion    

def cancel_secret_deletion(dst_vaults_client, secret_id):
    cancel_secret_deletion_response = dst_vaults_client.cancel_secret_deletion(
    secret_id=secret_id)
    return cancel_secret_deletion_response.headers

def schedule_secret_deletion(dst_secrets_client,dst_vaults_client, secret_id, deletion_time):
    print("Deleting a secret")
    #Get secret info to get time of deletion
    dst_secret_info=get_secret(dst_secrets_client,dst_vaults_client,secret_id=secret_id)
    schedule_secret_deletion_response = dst_vaults_client.schedule_secret_deletion(
        secret_id=secret_id,
        schedule_secret_deletion_details=oci.vault.models.ScheduleSecretDeletionDetails(
            time_of_deletion=deletion_time))

def cance_secret_deletion(dst_vaults_client, secret_id):
    cancel_secret_deletion_response = dst_vaults_client.cancel_secret_deletion(
    secret_id=secret_id)
    return cancel_secret_deletion_response.headers

def handler(ctx, data: io.BytesIO=None):
    try:
        
        #Limit functions logs
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
        # Parse Connector Hub response and get the key information
        logs = json.loads(data.getvalue())
        compartment_id,src_event_name,src_secret_id,freeform_Tags,src_secret_name = extract_info_from_logs(logs)
        logging.getLogger().info(f"[INFO] Variables from Connector Hub - Compartment is {compartment_id}, event name is {src_event_name},secret_id is {src_secret_id},freeform_Tags is {freeform_Tags},secret_name is {src_secret_name} ")
        # Create OCI signer using resource principals
        signer = oci.auth.signers.get_resource_principals_signer()
        """
        Set defaults and hard code values as needed. For advanced users, store the data in a JSON config file and dynamically pull them
        """
        dst_key_name = 'secret-replication-key'
        dest_vault_endpoint = "https://kms.ca-toronto-1.oraclecloud.com"  #https://docs.oracle.com/en-us/iaas/api/#/en/key/release/
        dest_vault_secret_endpoint = "https://vaults.ca-toronto-1.oci.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/secretmgmt/20180608/
        dest_vault_secret_retrieval_endpoint="https://secrets.vaults.ca-toronto-1.oci.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/secretretrieval/20190301/
        dst_secrets_client = oci.secrets.SecretsClient(config={},signer=signer,service_endpoint=dest_vault_secret_retrieval_endpoint)
        src_secrets_client = oci.secrets.SecretsClient(config={},signer=signer)
        src_kms_vault_client = oci.key_management.KmsVaultClient(config={},signer=signer)
        src_kms_vault_client_composite = oci.key_management.KmsVaultClientCompositeOperations(src_kms_vault_client)
        dst_kms_vault_client = oci.key_management.KmsVaultClient(config={},signer=signer, service_endpoint=dest_vault_endpoint)
        dst_kms_vault_client_composite = oci.key_management.KmsVaultClientCompositeOperations(dst_kms_vault_client)
        src_vault_client = oci.vault.VaultsClient(config={},signer=signer)
        dst_vaults_client = oci.vault.VaultsClient(config={},signer=signer, service_endpoint=dest_vault_secret_endpoint)
        dst_vaults_management_client_composite = oci.vault.VaultsClientCompositeOperations(dst_vaults_client)

        #get source vault and secret info
        src_secret_info=get_secret(src_secrets_client,src_vault_client,secret_id=src_secret_id)
        src_vault_info=get_vault(src_kms_vault_client, vault_id=src_secret_info["vault_id"])
        logging.getLogger().info(f"[INFO] Source Secret id is {src_secret_id} and Vault ID is {src_secret_info['vault_id']} ")

        #validate destination vault info
        dest_vault_name = "Backup-" + src_vault_info.data.display_name + "-" + src_vault_info.data.id.split('.')[-3]
        dst_vault_info=list_vault(dst_kms_vault_client, compartment_id=compartment_id,vault_name=dest_vault_name)
        logging.getLogger().info(f"[INFO] Destination vault info {dst_vault_info}")

        #If vault exists
        if dst_vault_info:    
            dst_vault_id=dst_vault_info["vault_id"]
            dst_vault_management_endpoint=dst_vault_info["management_endpoint"]
            logging.getLogger().info(f"[INFO] Dest Vault ID {dst_vault_id}")
            #create/validate KMS key only for Create or Update secret operations. For all other, we dont need this info
            if src_event_name == 'CreateSecret' or src_event_name == 'UpdateSecret':
                # Step 2.1: Validating KMS key in the destination to avoid duplicates 
                # you will need key id only for creating or updating secrets               
                dst_vault_management_client = oci.key_management.KmsManagementClient(config={},signer=signer, service_endpoint=dst_vault_management_endpoint)
                dst_list_keys=list_keys(dst_vault_management_client,compartment_id)
                key_flag=False
                for key_dtls in dst_list_keys.data:                        
                    if key_dtls.display_name == dst_key_name and key_dtls.lifecycle_state == 'ENABLED':
                        dst_key_id=key_dtls.id
                        key_flag=True # KMS Key with same name exists. No need to create new one
                        logging.getLogger().info(f"[INFO] KMS Key {dst_key_id} with same name exists. No need to create new one")
                        break     
                if  key_flag == False:
                    # Step 2.1: Creating KMS key in the destination                
                    dst_vault_management_client = oci.key_management.KmsManagementClient(config={},signer=signer, service_endpoint=dst_vault_management_endpoint)
                    dst_vault_management_client_composite = oci.key_management.KmsManagementClientCompositeOperations(dst_vault_management_client)
                    key = create_key(dst_vault_management_client_composite, dst_key_name, compartment_id).data
                    dst_key_id = key.id
                    logging.getLogger().info(f"[INFO] KMS Key {dst_key_id} doesnt exist, Created new key")

            #For existing vaults, ensure there are no duplicate secrets
            dst_secret_presence=False
            resp_get_secret_by_name = get_secret_by_name(compartment_id, dst_vaults_client,src_secret_info["secret_name"], dst_vault_id, dst_secrets_client)
            if resp_get_secret_by_name:
                dst_secret_presence=True # secret with same name exists in destination vault
                dst_secret_id=resp_get_secret_by_name["secret_ocid"]
                logging.getLogger().info(f"[INFO] Secret {resp_get_secret_by_name['secret_name']} exists in destination and OCID is {dst_secret_id}")
 
            if dst_secret_presence == False and (src_event_name == 'CreateSecret' or src_event_name == 'UpdateSecret'):
                create_secret_key = create_secret(dst_vaults_management_client_composite, compartment_id, dst_vault_id, dst_key_id, src_secret_info).data
                logging.getLogger().info(f"[INFO] Created Secret {create_secret_key.secret_name}")
            elif dst_secret_presence == True and (src_event_name == 'CreateSecret' or src_event_name == 'UpdateSecret'):
                logging.getLogger().info(f"[Warning]  Secret {src_secret_info['secret_name']} already present in dest vault. Updating the secret {resp_get_secret_by_name['secret_name']}")
                update_secret_key_response = update_secret(dst_vaults_management_client_composite,src_secret_info,dst_secret_id)         
            elif src_event_name == 'CancelSecretDeletion':
                cance_secret_deletion(dst_vaults_client, dst_secret_id)
                logging.getLogger().info(f"[Info] CancelSecretDeletion operation executed on {resp_get_secret_by_name['secret_name']}")
            elif src_event_name == 'ScheduleSecretDeletion':
                schedule_secret_deletion(dst_secrets_client,dst_vaults_client, dst_secret_id, resp_get_secret_by_name['time_of_deletion'])
                logging.getLogger().info(f"[Info] ScheduleSecretDeletion operation executed on {resp_get_secret_by_name['secret_name']} and time of delettion is {resp_get_secret_by_name['time_of_deletion']}")                
            else:
                #handle any other state(if there is one) here
                logging.getLogger().info(f"[Info] This code works only for create,update and cancelScheduleDelete events.All other Events will be discarded")
        else:
            logging.getLogger().info(f"[Info] Vault dest_vault_name doest exist in destination region. New Vault will be created")
            if src_event_name == 'CreateSecret' or src_event_name == 'UpdateSecret':
                #destination vault is missing and only allowed operations are Create Secret and Update secret    
                dst_new_vault_info=create_vault(compartment_id=compartment_id, vault_info=src_vault_info, kms_vault_client_composite=dst_kms_vault_client_composite).data
                dst_vault_id=dst_new_vault_info.id
                vault_exists = False
                logging.getLogger().info(f"[Info] Created Vault {dst_new_vault_info.display_name} in Dest region")
                dest_vault_create=True
                dst_vault_id=dst_new_vault_info.id
                dst_vault_management_endpoint=dst_new_vault_info.management_endpoint
                # Step 2.1: Creating KMS key in the destination                
                dst_vault_management_client = oci.key_management.KmsManagementClient(config={},signer=signer, service_endpoint=dst_vault_management_endpoint)
                dst_vault_management_client_composite = oci.key_management.KmsManagementClientCompositeOperations(dst_vault_management_client)
                key = create_key(dst_vault_management_client_composite, dst_key_name, compartment_id).data
                dst_key_id = key.id
                # since its a new vault, no secrets will be present. Create the secret irrespective of its source state.
                create_secret_key_response = create_secret(dst_vaults_management_client_composite, compartment_id, dst_vault_id, dst_key_id, src_secret_info).data
            else:
                logging.getLogger().info(f"[Error] Vault doesnt exist in the destination region and secret deletion or Cancel Secret deletion will be ignored")
    except (Exception, ValueError) as ex:
        logging.getLogger().error(f"[Error] An error occurred: {ex}", exc_info=True)
        return       

Note: Ensure that the destination service endpoints for the OCI clients (such as SecretsClient, KmsVaultClient, and VaultsClient) are updated in Func.py code based on the specific region to which the secrets are being replicated.

Step 2.3 – Configure Connector Hub to listen to vault events and trigger the function. Set the following in the connector configuration:

  1. Navigate to OCI connector Hub and setup a new connector with below information
    • Name – Any name per your organization naming standards and description
    • Source – Logging and Target – Function
    • Configure Source – Under your Tenancy, select Log Group _Audit
    • For filters, switch to Advanced mode and use below filter.
      • search "ocid1.compartment.oc1..xxxxxxxxxxxx/_Audit" | (type='com.oraclecloud.VaultSecret.UpdateSecret.end' or type='com.oraclecloud.VaultSecret.CreateSecret.end' or type='com.oraclecloud.VaultSecret.ScheduleSecretDeletion.end' or type='com.oraclecloud.VaultSecret.CancelSecretDeletion.end') 
        
    • Under Configure Target, select the OCI Function that was created in step 2.2.

Step 3 – Testing

  1. Create, Update, Delete, Un-Delete secret in the source region (one at a time with time delay of 5 minutes per operation) and ensure the changes are replicated in the destination region.
  2. Verify logging information is stored under Function Logs.

Conclusion

Replicating vaults and secrets across regions in OCI ensures business continuity and enhances security for sensitive data. This approach provides resilience against regional outages and strengthens your security architecture across geographically distributed environments.