Oracle Cloud Infrastructure (OCI) provides a secure and reliable environment for managing secrets such as passwords, API keys, and other sensitive information. In multi-region or disaster recovery setups, it is crucial to replicate secrets from a source region to a destination region to ensure continuous availability and resilience. Replicating secrets between regions ensures that your data is available even during regional failures or disasters.
In this blog post, we will explore how to replicate secrets across regions in OCI.

Pre-Requisite
- OCI enforces service limits on resources in each region. Before configuring replication, ensure you have the necessary resource limits in the destination region to accommodate the replicated vault. You can refer to OCI Vault Limits for more information.
Step 1: Perform a One-Time Replication for Existing Vaults
To replicate existing vaults and secrets to the destination region, execute the following Python script. Ensure that the user running the script has a valid OCI configuration and sufficient permissions to manage vaults and secrets. In the script, update the user parameters to specify the compartment_id, destination vault endpoints and optionally, the destination vault name (default: backup-[VaultName]-[SourceRegion]) and destination KMS key name (default: secret-replication-key).
Secrets in OCI can only be created within vaults and for replication purposes, vaults with names like backup-[vault display name]-[region code] will be created in the destination region to store the replicated secret values. Secrets in the target vault are created with the same name and content as in the source vault, but their OCID values in the destination region will be different from those in the source region.
import oci
import sys
import traceback
import logging
# User parameters
compartment_id = "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxx"
dest_vault_endpoint = "https://kms.uk-london-1.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/key/release/
dest_vault_secret_endpoint = "https://vaults.uk-london-1.oci.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/secretmgmt/20180608/
dest_vault_secret_retrieval_endpoint="https://secrets.vaults.uk-london-1.oci.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/secretretrieval/20190301/
dst_key_name = 'secret-replication-key'
config = oci.config.from_file("~/.oci/config")
def list_secrets(compartment_id, vault_client, src_vault_id, secrets_client):
secrets_list_response = {}
try:
paginator = vault_client.list_secrets(compartment_id=compartment_id, vault_id=src_vault_id)
for page in paginator.data:
secrets_info = {}
secrets_info["vault_id"] = page.vault_id
secrets_info["secret_name"] = page.secret_name
secrets_info["lifecycle_state"] = page.lifecycle_state
secrets_info["key_id"] = page.key_id
secrets_info["secret_ocid"] = page.id
secrets_info["freeform_tags"] = page.freeform_tags
secrets_info["description"] = page.description
secrets_info["time_of_deletion"] = paginator.data[0].time_of_deletion
if page.lifecycle_state == 'ACTIVE':
get_secret_bundle_by_name_response = secrets_client.get_secret_bundle(secret_id=page.id, stage="LATEST")
secrets_info["secret_bundle_content"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content
secrets_info["secret_bundle_content_type"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content_type
secrets_info["secret_stages"] = get_secret_bundle_by_name_response.data.stages
secrets_info["version_number"] = get_secret_bundle_by_name_response.data.version_number
secrets_list_response[page.secret_name] = secrets_info
except Exception as e:
print(f"Error listing secrets: {e}")
return secrets_list_response
def get_vault(kms_vault_client, compartment_id):
vault_list = {}
try:
vault_resp = kms_vault_client.list_vaults(compartment_id)
for v in vault_resp.data:
vault_data = {}
vault_data["display_name"] = v.display_name
vault_data["freeform_tags"] = v.freeform_tags
vault_data["management_endpoint"] = v.management_endpoint
vault_data["vault_type"] = v.vault_type
vault_data["vault_id"] = v.id
vault_data["lifecycle_state"] = v.lifecycle_state
vault_list[v.display_name] = vault_data
except Exception as e:
print(f"Error getting vaults: {e}")
return vault_list
def create_vault(compartment_id, vault_info, kms_vault_client_composite):
try:
dest_vault_name = "Backup-" + vault_info["display_name"] + "-" + vault_info["vault_id"].split('.')[-3]
print(" Creating vault {} in {} compartment".format(dest_vault_name, compartment_id))
vault_details = oci.key_management.models.CreateVaultDetails(
compartment_id=compartment_id,
vault_type=vault_info["vault_type"],
display_name=dest_vault_name,
freeform_tags=vault_info["freeform_tags"]
)
response = kms_vault_client_composite.create_vault_and_wait_for_state(
vault_details,
wait_for_states=[oci.key_management.models.Vault.LIFECYCLE_STATE_ACTIVE]
)
return response
except Exception as e:
print(f"Error creating vault: {e}")
return None
def create_secret(vaults_management_client_composite, compartment_id, vault_id, key_id, secret_details):
try:
if 'PENDING' in secret_details['secret_stages']:
print("Latest version of Secret {} is in 'Pending' stage and it will be replicated as Active version".format(secret_details["secret_name"]))
secret_content_details = oci.vault.models.Base64SecretContentDetails(
content_type=oci.vault.models.SecretContentDetails.CONTENT_TYPE_BASE64,
name=secret_details["secret_name"],
stage='CURRENT',
content=secret_details['secret_bundle_content']
)
secrets_details = oci.vault.models.CreateSecretDetails(
compartment_id=compartment_id,
description=secret_details['description'],
secret_content=secret_content_details,
secret_name=secret_details["secret_name"],
vault_id=vault_id,
key_id=key_id,
freeform_tags=secret_details["freeform_tags"]
)
response = vaults_management_client_composite.create_secret_and_wait_for_state(
create_secret_details=secrets_details,
wait_for_states=[oci.vault.models.Secret.LIFECYCLE_STATE_ACTIVE]
)
return response
except Exception as e:
print(f"Error creating secret: {e}")
return None
def create_key(key_mgmt_composite, dst_key_name, compartment_id):
try:
print("Creating KMS key {} in compartment {}.".format(dst_key_name, compartment_id))
key_shape = oci.key_management.models.KeyShape(algorithm="AES", length=32)
key_details = oci.key_management.models.CreateKeyDetails(
compartment_id=compartment_id,
display_name=dst_key_name,
key_shape=key_shape
)
response = key_mgmt_composite.create_key_and_wait_for_state(
key_details,
wait_for_states=[oci.key_management.models.Key.LIFECYCLE_STATE_ENABLED]
)
return response
except Exception as e:
print(f"Error creating key: {e}")
return None
def list_keys(key_management_client,compartment_id):
try:
list_keys_response = key_management_client.list_keys(compartment_id=compartment_id)
except Exception as e:
print(f"An error occurred: {e}")
return list_keys_response
def update_secret(vaults_management_client_composite,secret_details,secret_id):
try:
Stage='CURRENT'
if 'PENDING' in secret_details['secret_stages']:
logging.getLogger().info(f"[INFO]Latest version of Secret {secret_details['secret_name']} is in 'Pending' stage")
Stage='PENDING'
secret_content_details = oci.vault.models.Base64SecretContentDetails(
content_type=oci.vault.models.SecretContentDetails.CONTENT_TYPE_BASE64,
stage=Stage,
content=secret_details['secret_bundle_content']
)
secrets_details = oci.vault.models.UpdateSecretDetails(
description=secret_details['description'],
secret_content=secret_content_details
)
response = vaults_management_client_composite.update_secret_and_wait_for_state(
secret_id=secret_id,
update_secret_details=secrets_details,
wait_for_states=[oci.vault.models.Secret.LIFECYCLE_STATE_ACTIVE]
)
return response
except Exception as e:
logging.getLogger().info(f"[Error]update_secret - {e}")
return None
def schedule_secret_deletion(dst_secrets_client,dst_vaults_client, secret_id, deletion_time):
print("Deleting a secret")
#Get secret info to get time of deletion
schedule_secret_deletion_response = dst_vaults_client.schedule_secret_deletion(
secret_id=secret_id,
schedule_secret_deletion_details=oci.vault.models.ScheduleSecretDeletionDetails(
time_of_deletion=deletion_time))
try:
# Configure OCI clients
src_secrets_client = oci.secrets.SecretsClient(config)
dst_secrets_client = oci.secrets.SecretsClient(config,service_endpoint=dest_vault_secret_retrieval_endpoint)
src_kms_vault_client = oci.key_management.KmsVaultClient(config)
src_kms_vault_client_composite = oci.key_management.KmsVaultClientCompositeOperations(src_kms_vault_client)
dst_kms_vault_client = oci.key_management.KmsVaultClient(config, service_endpoint=dest_vault_endpoint)
dst_kms_vault_client_composite = oci.key_management.KmsVaultClientCompositeOperations(dst_kms_vault_client)
src_vault_client = oci.vault.VaultsClient(config)
dst_vaults_client = oci.vault.VaultsClient(config, service_endpoint=dest_vault_secret_endpoint)
dst_vaults_management_client_composite = oci.vault.VaultsClientCompositeOperations(dst_vaults_client)
# Step 1: Getting source Vault details
print("Getting source Vault information.")
src_vault_list = get_vault(src_kms_vault_client, compartment_id)
#ensure the vault is not present in destination region.
dst_vault_list = get_vault(dst_kms_vault_client, compartment_id)
# Step 2: Creating vaults in the secondary region
for src_vault_info in src_vault_list.values():
src_list_secrets_response = list_secrets(compartment_id, src_vault_client, src_vault_info["vault_id"], src_secrets_client)
if src_vault_info["lifecycle_state"] != 'ACTIVE' or not any(secret["lifecycle_state"] == 'ACTIVE' for secret in src_list_secrets_response.values()):
#avoid creating duplicate Vaults
print(f"[Info] No active secrets in {src_vault_info['display_name'] } vault. Skipping vault creation in destination region.")
continue
print(f"Processing {src_vault_info['display_name']} Vault")
dest_vault_name = "Backup-" + src_vault_info["display_name"] + "-" + src_vault_info["vault_id"].split('.')[-3]
dst_vault_presence=False
for vault_name,vault_info in dst_vault_list.items():
if vault_name == dest_vault_name:
dst_vault_presence=True
break
if dst_vault_presence == True:
ext_dst_vault=dst_vault_list[dest_vault_name]
dst_vault_id=ext_dst_vault["vault_id"]
dst_vault_management_endpoint=ext_dst_vault["management_endpoint"]
print("[Warning] Vault {} exists in Dest region {}".format(ext_dst_vault['display_name'], ext_dst_vault['vault_id'].split('.')[-3]))
# Step 2.1: Validating KMS key in the destination to avoid duplicates
dst_vault_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=dst_vault_management_endpoint)
dst_list_keys=list_keys(dst_vault_management_client,compartment_id)
key_flag=False
for key_dtls in dst_list_keys.data:
if key_dtls.display_name == dst_key_name and key_dtls.lifecycle_state == 'ENABLED':
dst_key_id=key_dtls.id
key_flag=True
break
if key_flag == False:
# Step 2.1: Creating KMS key in the destination
dst_vault_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=dst_vault_management_endpoint)
dst_vault_management_client_composite = oci.key_management.KmsManagementClientCompositeOperations(dst_vault_management_client)
key = create_key(dst_vault_management_client_composite, dst_key_name, compartment_id).data
dst_key_id = key.id
else:
print("[Warning] Key {} exists in Dest region".format(dst_key_name))
else:
dst_vault = create_vault(compartment_id, src_vault_info, dst_kms_vault_client_composite).data
print("Created Vault {} in Dest region {}".format(dst_vault.display_name, dst_vault.id.split('.')[-3]))
dest_vault_create=True
dst_vault_id=dst_vault.id
dst_vault_management_endpoint=dst_vault.management_endpoint
# Step 2.1: Creating KMS key in the destination
dst_vault_management_client = oci.key_management.KmsManagementClient(config, service_endpoint=dst_vault_management_endpoint)
dst_vault_management_client_composite = oci.key_management.KmsManagementClientCompositeOperations(dst_vault_management_client)
key = create_key(dst_vault_management_client_composite, dst_key_name, compartment_id).data
dst_key_id = key.id
# Step 3: Replicating secrets from source to destination
print("Retrieving secrets from Source vault {}".format(src_vault_info["display_name"]))
dst_list_secrets_response = list_secrets(compartment_id, dst_vaults_client, dst_vault_id, dst_secrets_client)
for secret_name, secret_dtls in src_list_secrets_response.items():
#Avoid creating duplicate secrets
dst_secret_presence=False
for dst_secret_name,dst_secret_dtls in dst_list_secrets_response.items():
if dst_secret_dtls["secret_name"] == secret_name:
dst_secret_presence=True
dst_secret_id=dst_secret_dtls["secret_ocid"]
dst_secret_state=dst_secret_dtls["lifecycle_state"]
break
if dst_secret_presence == False and secret_dtls["lifecycle_state"] == 'ACTIVE':
create_secret_key_response = create_secret(dst_vaults_management_client_composite, compartment_id, dst_vault_id, dst_key_id, secret_dtls).data
print("Created Secret {}".format(create_secret_key_response.secret_name))
elif dst_secret_presence == True and secret_dtls["lifecycle_state"] == 'ACTIVE':
update_secret(dst_vaults_management_client_composite,secret_dtls,dst_secret_id)
print("[Info] Secret {} already present in dest vault, updating the secret".format(secret_name))
elif dst_secret_presence == True and secret_dtls["lifecycle_state"] == 'PENDING_DELETION' :
if dst_secret_state == 'ACTIVE':
schedule_secret_deletion(dst_secrets_client,dst_vaults_client, dst_secret_id, secret_dtls['time_of_deletion'])
print("[Info] Marking Secret {} for deletion at {} ".format(secret_name,secret_dtls['time_of_deletion']) )
else:
print("[Info] Secret {} is scheduled for deletion on {}. Skipping Replication ".format(secret_name,secret_dtls['time_of_deletion']) )
else:
print("[Error] Secret {} is in {} state. Skipping Replication".format(secret_name,secret_dtls['lifecycle_state']))
except Exception as e:
print(f"An error occurred: {e}")
# Display full error details using traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback_details = {
'filename': exc_traceback.tb_frame.f_code.co_filename,
'lineno': exc_traceback.tb_lineno,
'name': exc_traceback.tb_frame.f_code.co_name,
'type': exc_type.__name__,
'message': str(exc_value)
}
print("Exception Details:")
for key, value in traceback_details.items():
print(f"{key}: {value}")
traceback.print_exception(exc_type, exc_value, exc_traceback)
This script will list all active secrets from the source vault, replicate them to the destination region, and create a new vault if necessary.
Step 2: Automating Secret Replication Across Regions with Event-Driven Architecture
Once the existing vaults and secrets are replicated, you can automate the replication of future vaults and secrets by setting up an event-driven architecture using OCI Cloud Native services.
Step 2.1: Grant Necessary Permissions
First, ensure that the OCI Function has the required permissions to manage vaults and secrets and that Connector Hub can invoke the OCI Function.
- Create Dynamic Groups in your Identity Domain for functions that will execute the replication logic.
- Group Name – Per your Organization standards (for example Sec-sc-fn-policy)
- Matching Rules
-
ALL {resource.type = 'fnfunc', resource.compartment.id = 'ocid1.compartment.oc1..xxxxxxxxxxxxxxx}
-
- Add identity Policies to grant OCI Functions necessary permissions to replicate secrets.
-
Allow dynamic-group sec-sc-fn-policy to manage keys in compartment CompartmentName Allow dynamic-group sec-sc-fn-policy to manage vaults in compartment CompartmentName Allow dynamic-group sec-sc-fn-policy to manage secret-family in compartment CompartmentName Allow dynamic-group sec-sc-fn-policy to manage repos in compartment CompartmentName
-
- Create another Identity Policy to grant Connector Hub permissions to invoke OCI Function
-
allow any-user to use fn-function in compartment id ocid1.compartment.oc1..XXXXXXXX where all {request.principal.type='serviceconnector', request.principal.compartment.id='ocid1.compartment.oc1..XXXXXXXXX}
-
compartment id with the actual OCID of your compartment.
Step 2.2 – Configure OCI Function
- Create a custom OCI Function to handle replication events. Below is a basic configuration for the Function. For additional details on creating OCI Function refer to this page
func.yaml
schema_version: 20180708 name: func-replicate-secrets version: 0.0.1 runtime: python build_image: fnproject/python:3.11-dev run_image: fnproject/python:3.11 entrypoint: /python/bin/fdk /function/func.py handler memory: 256
requirements.txt
fdk requests oci
func.py
import io
import oci
import json
from datetime import datetime, timedelta
import logging
from fdk import response
def extract_info_from_logs(logs):
log_entry = logs[0].get("data", {})
compartment_id = log_entry.get("compartmentId")
event_name = log_entry.get("eventName")
secret_id = log_entry.get("resourceId")
freeform_Tags=log_entry.get("freeformTags")
secret_name=log_entry.get("source")
return compartment_id,event_name,secret_id,freeform_Tags,secret_name
def get_vault(kms_vault_client, vault_id):
vault_list = {}
try:
vault_resp = kms_vault_client.get_vault(vault_id=vault_id)
except Exception as e:
logging.getLogger().info(f"[Error]get_vault - Error : {e}")
return vault_resp
def list_vault(kms_vault_client, compartment_id,vault_name):
try:
vault_resp = kms_vault_client.list_vaults(compartment_id)
vault_data = {}
for v in vault_resp.data:
if v.display_name == vault_name and v.lifecycle_state == 'ACTIVE':
vault_data["display_name"] = v.display_name
vault_data["freeform_tags"] = v.freeform_tags
vault_data["management_endpoint"] = v.management_endpoint
vault_data["vault_type"] = v.vault_type
vault_data["vault_id"] = v.id
vault_data["lifecycle_state"] = v.lifecycle_state
break
except Exception as e:
logging.getLogger().info(f"[Error]list_vault - {e}")
return vault_data
def get_secret (secrets_client,vault_client,secret_id):
secrets_info = {}
secret_data=vault_client.get_secret(secret_id=secret_id)
secrets_info["vault_id"] = secret_data.data.vault_id
secrets_info["secret_name"] = secret_data.data.secret_name
secrets_info["lifecycle_state"] = secret_data.data.lifecycle_state
secrets_info["key_id"] = secret_data.data.key_id
secrets_info["secret_ocid"] = secret_data.data.id
secrets_info["freeform_tags"] = secret_data.data.freeform_tags
secrets_info["description"] = secret_data.data.description
#secret_bundle_content library will only work for Active Secrets
if secret_data.data.lifecycle_state == 'ACTIVE':
get_secret_bundle_by_name_response = secrets_client.get_secret_bundle(secret_id=secret_id, stage="LATEST")
secrets_info["secret_bundle_content"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content
secrets_info["secret_bundle_content_type"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content_type
secrets_info["secret_stages"] = get_secret_bundle_by_name_response.data.stages
secrets_info["version_number"] = get_secret_bundle_by_name_response.data.version_number
elif secret_data.data.lifecycle_state == 'PENDING_DELETION':
secrets_info["time-of-deletion"] = secret_data.data.time_of_deletion
else:
logging.getLogger().info(f"[Error] Secret {secret_data.data.secret_name} is in {secret_data.data.lifecycle_state} State")
return secrets_info
def create_vault(compartment_id, vault_info, kms_vault_client_composite):
logging.getLogger().info(f"[Info] Vault info {vault_info.data}")
dest_vault_name = "Backup-" + vault_info.data.display_name + "-" + vault_info.data.id.split('.')[-3]
logging.getLogger().info(f"[Info] Creating vault {dest_vault_name} in {compartment_id} compartment")
vault_details = oci.key_management.models.CreateVaultDetails(
compartment_id=compartment_id,
vault_type=vault_info.data.vault_type,
display_name=dest_vault_name,
freeform_tags=vault_info.data.freeform_tags
)
response = kms_vault_client_composite.create_vault_and_wait_for_state(
vault_details,
wait_for_states=[oci.key_management.models.Vault.LIFECYCLE_STATE_ACTIVE]
)
return response
def get_secret_by_name(compartment_id, vault_client,secret_name, vault_id, secrets_client):
paginator = vault_client.list_secrets(compartment_id=compartment_id, vault_id=vault_id,name=secret_name)
secrets_info = {}
if paginator.data:
secrets_info['vault_id'] = paginator.data[0].vault_id
secrets_info["secret_name"] = paginator.data[0].secret_name
secrets_info["lifecycle_state"] = paginator.data[0].lifecycle_state
secrets_info["key_id"] = paginator.data[0].key_id
secrets_info["secret_ocid"] = paginator.data[0].id
secrets_info["freeform_tags"] = paginator.data[0].freeform_tags
secrets_info["description"] = paginator.data[0].description
secrets_info["time_of_deletion"] = paginator.data[0].time_of_deletion
if paginator.data[0].lifecycle_state == 'ACTIVE':
get_secret_bundle_by_name_response = secrets_client.get_secret_bundle(secret_id=paginator.data[0].id, stage="LATEST")
secrets_info["secret_bundle_content"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content
secrets_info["secret_bundle_content_type"] = get_secret_bundle_by_name_response.data.secret_bundle_content.content_type
secrets_info["secret_stages"] = get_secret_bundle_by_name_response.data.stages
secrets_info["version_number"] = get_secret_bundle_by_name_response.data.version_number
else:
logging.getLogger().info(f"[Warning] Secret {paginator.data[0].secret_name} is in {paginator.data[0].lifecycle_state} state and cannot be replicated or updated")
return secrets_info
def create_secret(vaults_management_client_composite, compartment_id, vault_id, key_id, secret_details):
try:
if 'PENDING' in secret_details['secret_stages']:
logging.getLogger().info(f"[Info]Latest version of Secret {secret_details['secret_name']} is in 'Pending' stage and it will be replicated as Active version")
else:
secret_content_details = oci.vault.models.Base64SecretContentDetails(
content_type=oci.vault.models.SecretContentDetails.CONTENT_TYPE_BASE64,
name=secret_details["secret_name"],
stage='CURRENT',
content=secret_details['secret_bundle_content']
)
secrets_details = oci.vault.models.CreateSecretDetails(
compartment_id=compartment_id,
description=secret_details['description'],
secret_content=secret_content_details,
secret_name=secret_details["secret_name"],
vault_id=vault_id,
key_id=key_id,
freeform_tags=secret_details["freeform_tags"]
)
response = vaults_management_client_composite.create_secret_and_wait_for_state(
create_secret_details=secrets_details,
wait_for_states=[oci.vault.models.Secret.LIFECYCLE_STATE_ACTIVE]
)
return response
except Exception as e:
logging.getLogger().info(f"[Error] Creating secret: {e}")
return None
def update_secret(vaults_management_client_composite,secret_details,secret_id):
try:
Stage='CURRENT'
if 'PENDING' in secret_details['secret_stages']:
logging.getLogger().info(f"[INFO]Latest version of Secret {secret_details['secret_name']} is in 'Pending' stage")
Stage='PENDING'
secret_content_details = oci.vault.models.Base64SecretContentDetails(
content_type=oci.vault.models.SecretContentDetails.CONTENT_TYPE_BASE64,
stage=Stage,
content=secret_details['secret_bundle_content']
)
secrets_details = oci.vault.models.UpdateSecretDetails(
description=secret_details['description'],
secret_content=secret_content_details
)
response = vaults_management_client_composite.update_secret_and_wait_for_state(
secret_id=secret_id,
update_secret_details=secrets_details,
wait_for_states=[oci.vault.models.Secret.LIFECYCLE_STATE_ACTIVE]
)
return response
except Exception as e:
logging.getLogger().info(f"[Error]update_secret - {e}")
return None
def create_key(key_mgmt_composite, dst_key_name, compartment_id):
try:
key_shape = oci.key_management.models.KeyShape(algorithm="AES", length=32)
key_details = oci.key_management.models.CreateKeyDetails(
compartment_id=compartment_id,
display_name=dst_key_name,
key_shape=key_shape
)
response = key_mgmt_composite.create_key_and_wait_for_state(
key_details,
wait_for_states=[oci.key_management.models.Key.LIFECYCLE_STATE_ENABLED]
)
return response
except Exception as e:
logging.getLogger().info(f"[Error]create_key - {e}")
return None
def list_keys(key_management_client,compartment_id):
try:
list_keys_response = key_management_client.list_keys(compartment_id=compartment_id)
except Exception as e:
logging.getLogger().info(f"[Error]list_keys - An error occurred: {e}")
return list_keys_response
def schedule_secret_deletion(dst_vaults_client, secret_id, deletion_time):
result_date = datetime.now() + timedelta(days=44) #Since secret info on pending delete secret cannot be retireved, set default delete to 44 days
logging.getLogger().info(f"[Info]Deleting a secret {secret_id} and delete date is set to {result_date}")
schedule_secret_deletion_response = dst_vaults_client.schedule_secret_deletion(
secret_id=secret_id,
schedule_secret_deletion_details=oci.vault.models.ScheduleSecretDeletionDetails(
time_of_deletion=result_date))
return schedule_secret_deletion
def cancel_secret_deletion(dst_vaults_client, secret_id):
cancel_secret_deletion_response = dst_vaults_client.cancel_secret_deletion(
secret_id=secret_id)
return cancel_secret_deletion_response.headers
def schedule_secret_deletion(dst_secrets_client,dst_vaults_client, secret_id, deletion_time):
print("Deleting a secret")
#Get secret info to get time of deletion
dst_secret_info=get_secret(dst_secrets_client,dst_vaults_client,secret_id=secret_id)
schedule_secret_deletion_response = dst_vaults_client.schedule_secret_deletion(
secret_id=secret_id,
schedule_secret_deletion_details=oci.vault.models.ScheduleSecretDeletionDetails(
time_of_deletion=deletion_time))
def cance_secret_deletion(dst_vaults_client, secret_id):
cancel_secret_deletion_response = dst_vaults_client.cancel_secret_deletion(
secret_id=secret_id)
return cancel_secret_deletion_response.headers
def handler(ctx, data: io.BytesIO=None):
try:
#Limit functions logs
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Parse Connector Hub response and get the key information
logs = json.loads(data.getvalue())
compartment_id,src_event_name,src_secret_id,freeform_Tags,src_secret_name = extract_info_from_logs(logs)
logging.getLogger().info(f"[INFO] Variables from Connector Hub - Compartment is {compartment_id}, event name is {src_event_name},secret_id is {src_secret_id},freeform_Tags is {freeform_Tags},secret_name is {src_secret_name} ")
# Create OCI signer using resource principals
signer = oci.auth.signers.get_resource_principals_signer()
"""
Set defaults and hard code values as needed. For advanced users, store the data in a JSON config file and dynamically pull them
"""
dst_key_name = 'secret-replication-key'
dest_vault_endpoint = "https://kms.ca-toronto-1.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/key/release/
dest_vault_secret_endpoint = "https://vaults.ca-toronto-1.oci.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/secretmgmt/20180608/
dest_vault_secret_retrieval_endpoint="https://secrets.vaults.ca-toronto-1.oci.oraclecloud.com" #https://docs.oracle.com/en-us/iaas/api/#/en/secretretrieval/20190301/
dst_secrets_client = oci.secrets.SecretsClient(config={},signer=signer,service_endpoint=dest_vault_secret_retrieval_endpoint)
src_secrets_client = oci.secrets.SecretsClient(config={},signer=signer)
src_kms_vault_client = oci.key_management.KmsVaultClient(config={},signer=signer)
src_kms_vault_client_composite = oci.key_management.KmsVaultClientCompositeOperations(src_kms_vault_client)
dst_kms_vault_client = oci.key_management.KmsVaultClient(config={},signer=signer, service_endpoint=dest_vault_endpoint)
dst_kms_vault_client_composite = oci.key_management.KmsVaultClientCompositeOperations(dst_kms_vault_client)
src_vault_client = oci.vault.VaultsClient(config={},signer=signer)
dst_vaults_client = oci.vault.VaultsClient(config={},signer=signer, service_endpoint=dest_vault_secret_endpoint)
dst_vaults_management_client_composite = oci.vault.VaultsClientCompositeOperations(dst_vaults_client)
#get source vault and secret info
src_secret_info=get_secret(src_secrets_client,src_vault_client,secret_id=src_secret_id)
src_vault_info=get_vault(src_kms_vault_client, vault_id=src_secret_info["vault_id"])
logging.getLogger().info(f"[INFO] Source Secret id is {src_secret_id} and Vault ID is {src_secret_info['vault_id']} ")
#validate destination vault info
dest_vault_name = "Backup-" + src_vault_info.data.display_name + "-" + src_vault_info.data.id.split('.')[-3]
dst_vault_info=list_vault(dst_kms_vault_client, compartment_id=compartment_id,vault_name=dest_vault_name)
logging.getLogger().info(f"[INFO] Destination vault info {dst_vault_info}")
#If vault exists
if dst_vault_info:
dst_vault_id=dst_vault_info["vault_id"]
dst_vault_management_endpoint=dst_vault_info["management_endpoint"]
logging.getLogger().info(f"[INFO] Dest Vault ID {dst_vault_id}")
#create/validate KMS key only for Create or Update secret operations. For all other, we dont need this info
if src_event_name == 'CreateSecret' or src_event_name == 'UpdateSecret':
# Step 2.1: Validating KMS key in the destination to avoid duplicates
# you will need key id only for creating or updating secrets
dst_vault_management_client = oci.key_management.KmsManagementClient(config={},signer=signer, service_endpoint=dst_vault_management_endpoint)
dst_list_keys=list_keys(dst_vault_management_client,compartment_id)
key_flag=False
for key_dtls in dst_list_keys.data:
if key_dtls.display_name == dst_key_name and key_dtls.lifecycle_state == 'ENABLED':
dst_key_id=key_dtls.id
key_flag=True # KMS Key with same name exists. No need to create new one
logging.getLogger().info(f"[INFO] KMS Key {dst_key_id} with same name exists. No need to create new one")
break
if key_flag == False:
# Step 2.1: Creating KMS key in the destination
dst_vault_management_client = oci.key_management.KmsManagementClient(config={},signer=signer, service_endpoint=dst_vault_management_endpoint)
dst_vault_management_client_composite = oci.key_management.KmsManagementClientCompositeOperations(dst_vault_management_client)
key = create_key(dst_vault_management_client_composite, dst_key_name, compartment_id).data
dst_key_id = key.id
logging.getLogger().info(f"[INFO] KMS Key {dst_key_id} doesnt exist, Created new key")
#For existing vaults, ensure there are no duplicate secrets
dst_secret_presence=False
resp_get_secret_by_name = get_secret_by_name(compartment_id, dst_vaults_client,src_secret_info["secret_name"], dst_vault_id, dst_secrets_client)
if resp_get_secret_by_name:
dst_secret_presence=True # secret with same name exists in destination vault
dst_secret_id=resp_get_secret_by_name["secret_ocid"]
logging.getLogger().info(f"[INFO] Secret {resp_get_secret_by_name['secret_name']} exists in destination and OCID is {dst_secret_id}")
if dst_secret_presence == False and (src_event_name == 'CreateSecret' or src_event_name == 'UpdateSecret'):
create_secret_key = create_secret(dst_vaults_management_client_composite, compartment_id, dst_vault_id, dst_key_id, src_secret_info).data
logging.getLogger().info(f"[INFO] Created Secret {create_secret_key.secret_name}")
elif dst_secret_presence == True and (src_event_name == 'CreateSecret' or src_event_name == 'UpdateSecret'):
logging.getLogger().info(f"[Warning] Secret {src_secret_info['secret_name']} already present in dest vault. Updating the secret {resp_get_secret_by_name['secret_name']}")
update_secret_key_response = update_secret(dst_vaults_management_client_composite,src_secret_info,dst_secret_id)
elif src_event_name == 'CancelSecretDeletion':
cance_secret_deletion(dst_vaults_client, dst_secret_id)
logging.getLogger().info(f"[Info] CancelSecretDeletion operation executed on {resp_get_secret_by_name['secret_name']}")
elif src_event_name == 'ScheduleSecretDeletion':
schedule_secret_deletion(dst_secrets_client,dst_vaults_client, dst_secret_id, resp_get_secret_by_name['time_of_deletion'])
logging.getLogger().info(f"[Info] ScheduleSecretDeletion operation executed on {resp_get_secret_by_name['secret_name']} and time of delettion is {resp_get_secret_by_name['time_of_deletion']}")
else:
#handle any other state(if there is one) here
logging.getLogger().info(f"[Info] This code works only for create,update and cancelScheduleDelete events.All other Events will be discarded")
else:
logging.getLogger().info(f"[Info] Vault dest_vault_name doest exist in destination region. New Vault will be created")
if src_event_name == 'CreateSecret' or src_event_name == 'UpdateSecret':
#destination vault is missing and only allowed operations are Create Secret and Update secret
dst_new_vault_info=create_vault(compartment_id=compartment_id, vault_info=src_vault_info, kms_vault_client_composite=dst_kms_vault_client_composite).data
dst_vault_id=dst_new_vault_info.id
vault_exists = False
logging.getLogger().info(f"[Info] Created Vault {dst_new_vault_info.display_name} in Dest region")
dest_vault_create=True
dst_vault_id=dst_new_vault_info.id
dst_vault_management_endpoint=dst_new_vault_info.management_endpoint
# Step 2.1: Creating KMS key in the destination
dst_vault_management_client = oci.key_management.KmsManagementClient(config={},signer=signer, service_endpoint=dst_vault_management_endpoint)
dst_vault_management_client_composite = oci.key_management.KmsManagementClientCompositeOperations(dst_vault_management_client)
key = create_key(dst_vault_management_client_composite, dst_key_name, compartment_id).data
dst_key_id = key.id
# since its a new vault, no secrets will be present. Create the secret irrespective of its source state.
create_secret_key_response = create_secret(dst_vaults_management_client_composite, compartment_id, dst_vault_id, dst_key_id, src_secret_info).data
else:
logging.getLogger().info(f"[Error] Vault doesnt exist in the destination region and secret deletion or Cancel Secret deletion will be ignored")
except (Exception, ValueError) as ex:
logging.getLogger().error(f"[Error] An error occurred: {ex}", exc_info=True)
return
SecretsClient, KmsVaultClient, and VaultsClient) are updated in Func.py code based on the specific region to which the secrets are being replicated.
Step 2.3 – Configure Connector Hub to listen to vault events and trigger the function. Set the following in the connector configuration:
- Navigate to OCI connector Hub and setup a new connector with below information
- Name – Any name per your organization naming standards and description
- Source – Logging and Target – Function
- Configure Source – Under your Tenancy, select Log Group _Audit
- For filters, switch to Advanced mode and use below filter.
-
search "ocid1.compartment.oc1..xxxxxxxxxxxx/_Audit" | (type='com.oraclecloud.VaultSecret.UpdateSecret.end' or type='com.oraclecloud.VaultSecret.CreateSecret.end' or type='com.oraclecloud.VaultSecret.ScheduleSecretDeletion.end' or type='com.oraclecloud.VaultSecret.CancelSecretDeletion.end')
-
-
- Under Configure Target, select the OCI Function that was created in step 2.2.
Step 3 – Testing
- Create, Update, Delete, Un-Delete secret in the source region (one at a time with time delay of 5 minutes per operation) and ensure the changes are replicated in the destination region.
- Verify logging information is stored under Function Logs.
Conclusion
Replicating vaults and secrets across regions in OCI ensures business continuity and enhances security for sensitive data. This approach provides resilience against regional outages and strengthens your security architecture across geographically distributed environments.
