Managing user access securely in Oracle Cloud Infrastructure (OCI) requires more than just assigning roles — it also involves regular audits of long-lived credentials such as API keys. Rotating API keys every 90 days is a widely recommended security practice, and OCI offers native support for this through Cloud Guard.
Cloud Guard includes a detector rule called “API key is too old”, which triggers an alert when a key exceeds the recommended age. This helps promote rotation hygiene and reduce exposure to compromised credentials. However, there are situations where you may:
- Not have Cloud Guard enabled in your tenancy, or
- Want a quick, on-demand audit of all API keys across identity domains for compliance checks or operational reviews.
To support those use cases, we’ve created a standalone Python script that complements Cloud Guard by:
- Querying all users in each identity domain.
- Identifying users who have associated API keys.
- Calculating the age of each key.
- Reporting keys that are either older than 90 days (non-compliant) or nearing expiry (between 75–90 days).
- Optionally exporting results to a CSV file for automation or reporting purposes.

1. Load the config
config_path = "~/.oci/config" config = oci.config.from_file(config_path)
The script uses the oci.config.from_file() method to load authentication details (tenancy, user OCID, key file path, etc.) from a predefined config file.
The config file is a simple INI-style file that contains one or more profiles (usually [DEFAULT]), with keys that define how the SDK connects to OCI. For example
[DEFAULT] user=ocid1.user.oc1..aaaa... fingerprint=aa:bb:cc:dd:... key_file=/Users/you/.oci/oci_api_key.pem tenancy=ocid1.tenancy.oc1..aaaa... region=us-ashburn-1
where
user: OCID of the user running the script.
fingerprint: Fingerprint of the public API key uploaded to the user’s OCI profile.
key_file: Path to the private key matching the uploaded public key.
tenancy: OCID of the tenancy being audited.
region: Region to send API calls to (e.g., us-phoenix-1, eu-frankfurt-1).
2.Initialization
identity_domains_client = oci.identity.IdentityClient(config)
- Used to discover all compartments and list identity domains.
- These actions require tenancy-wide access and are not domain-specific.
domain_client = oci.identity_domains.IdentityDomainsClient(
config=config,
service_endpoint=domain['url']
)
- Used to list users, fetch API keys, and inspect key metadata.
- This client targets a specific identity domain via its SCIM-compatible service_endpoint URL.
3. Timeout against Domains and Faulty Domains
To avoid hanging during long-running API calls, the script uses threading with built-in timeouts. If a domain takes longer than 2 minutes to respond, the script prompts the user for input. If no response is received within 30 seconds, the domain check is automatically terminated.
- If the user responds with N, the domain is immediately skipped.
- If the user responds with y, the script waits up to an additional 3 minutes (total of 5 minutes) for the domain to respond.
If the domain still doesn’t respond or returns an error (like 404 or 429), it’s marked as a faulty domain and reported at the end of the audit.
Example
- Domain ‘Test’ is taking longer than 2 minutes to respond. Do you want to wait longer? (y/N): Terminated check for domain Test by user choice.
- domainxxxx (error: {‘target_service’: ‘identity_domains’, ‘status’: 404, ‘code’: None, ‘opc-request-id’: ‘FCA0D0367F5E4236B4C14D45EC9C5C1F/YBz^L014q00000000’, ‘message’: ‘The service returned error code 404’, ‘operation_name’: ‘list_api_keys’,
- TestDomain (error: {‘target_service’: ‘identity_domains’, ‘status’: 429, ‘code’: None, ‘opc-request-id’: ‘82837DE989154A01A06E8E964277CA93/fObES0aCC10000000’, ‘message’: ‘The service returned error code 429’, ‘operation_name’: ‘list_users’,
4. Auditing Domains
def discover_all_auditable_domains():
print("Discovering all Identity Domains...")
all_domains_info = []
try:
all_comps = oci.pagination.list_call_get_all_results(
identity_domains_client.list_compartments,
compartment_id=tenancy_id,
compartment_id_in_subtree=True
).data
all_comps.append(identity_domains_client.get_compartment(compartment_id=tenancy_id).data)
for comp in all_comps:
domains_in_comp = oci.pagination.list_call_get_all_results(
identity_domains_client.list_domains,
compartment_id=comp.id
).data
for dom in domains_in_comp:
if dom.lifecycle_state == 'ACTIVE' and dom.url:
all_domains_info.append({
"url": dom.url,
"display_name": dom.display_name,
"type": dom.type,
"compartment_id": dom.compartment_id,
"compartment_name": comp.name
})
except oci.exceptions.ServiceError as e:
print(f"Could not list compartments or domains. Error: {e}")
return []
print(f"Found {len(all_domains_info)} auditable domains.")
return all_domains_info
This function lists all compartments and searches for active identity domains. Each domain is stored along with its name, type, compartment ID, and compartment name for reference.
5.Listing Users and API Keys
For each domain:
- A domain-specific client is created using its endpoint URL.
- The script fetches all users in that domain using:
def get_users_in_domain(domain_client):
users = []
next_page = None
while True:
response = domain_client.list_users(
page=next_page,
attribute_sets=["all"]
)
users.extend(response.data.resources)
next_page = response.headers.get("opc-next-page")
if not next_page:
break
return users
- It checks each user using to see if they have api_keys
def search_api_keys_by_user_id(domain_client, user_id):
try:
response = domain_client.list_api_keys(
filter=f'user.value eq "{user_id}"'
)
return response.data.resources
except oci.exceptions.ServiceError as e:
if e.status in [404, 429]:
raise e
else:
print(f" Failed to list API keys for user ID {user_id}: {e}")
return []
This retrieves all API keys associated with the given user.
6.API Key Age Calculation
To determine how long an API key has been active, the script retrieves the key’s creation timestamp using the meta.created field returned from the get_api_key() API call. This value is an ISO 8601-formatted string, which the script then parses into a timezone-aware Python datetime object using datetime.fromisoformat(). It calculates the age by subtracting the creation date from the current UTC time (datetime.now(timezone.utc)). If the key is older than 90 days, the script flags it for rotation. Keys that are between 75 and 90 days old are considered “nearing expiry” and included in a separate list.
for key in api_keys:
key_details = get_api_key_details(domain_client, key.id)
if key_details:
created_date_raw = key_details.meta.created
created_date = parse_datetime(created_date_raw)
if not created_date:
continue
age = now - created_date
#print(f" API Key ID: {key_details.id}")
#print(f" Created On: {created_date}")
if age > rotation_threshold:
print(f" Found API key's older than 90 days and should be rotated. The Current API keys age is: {age.days} days")
users_needing_rotation.append({
'domain': domain['display_name'],
'compartment': domain['compartment_name'],
'user_name': user.user_name,
'api_key_id': key_details.id,
'age_days': age.days
})
elif 75 <= age.days <= 90:
print(f" API key is nearing expiry. Current age: {age.days} days")
users_expiring_soon.append({
'domain': domain['display_name'],
'compartment': domain['compartment_name'],
'user_name': user.user_name,
'api_key_id': key_details.id,
'age_days': age.days
})
if not user_with_keys_found:
print(" No users with API keys in this domain.")
else:
print(f" Users with API keys found: {len(users_with_keys)}\n")
except (oci.exceptions.ServiceError, Exception) as e:
failed_domains.append(f"{domain['display_name']} (error: {str(e)})")
print(f" Error processing domain {domain['display_name']}: {e}")
7.Output
The script prints the results in a human-readable table format and groups findings by domain, sorted by API key age.
Here is the sample script
import oci
from oci.identity_domains.models import ApiKeySearchRequest
from datetime import datetime, timezone, timedelta
import concurrent.futures
import threading
import sys
import queue
def timed_input(prompt, timeout=30):
print(prompt, end='', flush=True)
input_queue = queue.Queue()
def read_input():
try:
user_input = sys.stdin.readline().strip()
input_queue.put(user_input)
except Exception:
input_queue.put('')
t = threading.Thread(target=read_input)
t.daemon = True
t.start()
try:
return input_queue.get(timeout=timeout)
except queue.Empty:
return ''
# Load OCI config
config_path = "~/.oci/config"
config = oci.config.from_file(config_path)
tenancy_id = config["tenancy"]
region = config["region"]
# Identity clients
identity_domains_client = oci.identity.IdentityClient(config)
# Timeout settings
DOMAIN_TIMEOUT_SECONDS = 120 # Set to 2 minutes
def discover_all_auditable_domains():
print("Discovering all Identity Domains...")
all_domains_info = []
try:
all_comps = oci.pagination.list_call_get_all_results(
identity_domains_client.list_compartments,
compartment_id=tenancy_id,
compartment_id_in_subtree=True
).data
all_comps.append(identity_domains_client.get_compartment(compartment_id=tenancy_id).data)
for comp in all_comps:
domains_in_comp = oci.pagination.list_call_get_all_results(
identity_domains_client.list_domains,
compartment_id=comp.id
).data
for dom in domains_in_comp:
if dom.lifecycle_state == 'ACTIVE' and dom.url:
all_domains_info.append({
"url": dom.url,
"display_name": dom.display_name,
"type": dom.type,
"compartment_id": dom.compartment_id,
"compartment_name": comp.name
})
except oci.exceptions.ServiceError as e:
print(f"Could not list compartments or domains. Error: {e}")
return []
print(f"Found {len(all_domains_info)} auditable domains.")
return all_domains_info
def get_users_in_domain(domain_client):
users = []
next_page = None
while True:
response = domain_client.list_users(
page=next_page,
attribute_sets=["all"]
)
users.extend(response.data.resources)
next_page = response.headers.get("opc-next-page")
if not next_page:
break
return users
def search_api_keys_by_user_id(domain_client, user_id):
try:
response = domain_client.list_api_keys(
filter=f'user.value eq "{user_id}"'
)
return response.data.resources
except oci.exceptions.ServiceError as e:
if e.status in [404, 429]:
raise e
else:
print(f" Failed to list API keys for user ID {user_id}: {e}")
return []
def get_api_key_details(domain_client, api_key_id):
try:
key = domain_client.get_api_key(
api_key_id=api_key_id
)
return key.data
except Exception as e:
print(f" Failed to get key details for {api_key_id}: {str(e)}")
return None
def parse_datetime(iso_string):
try:
return datetime.fromisoformat(iso_string.replace("Z", "+00:00"))
except Exception as e:
print(f" Failed to parse date '{iso_string}': {str(e)}")
return None
def audit_domain(domain, failed_domains, rotation_threshold, now, users_needing_rotation):
try:
print(f"Auditing Domain: {domain['display_name']} Compartment Name: {domain['compartment_name']}")
domain_client = oci.identity_domains.IdentityDomainsClient(
config=config,
service_endpoint=domain['url']
)
users = get_users_in_domain(domain_client)
print(f" - Found {len(users)} users in the Domain. We will now check for users with API Keys")
user_with_keys_found = False
users_with_keys = []
for user in users: # iterate through each user to check for API keys
try:
api_keys = search_api_keys_by_user_id(domain_client, user.id)
except oci.exceptions.ServiceError as e:
if e.status in [404, 429]:
raise e
else:
continue
if not api_keys:
continue
user_with_keys_found = True
users_with_keys.append(user)
print(f" User: {user.user_name}")
for key in api_keys:
key_details = get_api_key_details(domain_client, key.id)
if key_details:
created_date_raw = key_details.meta.created
created_date = parse_datetime(created_date_raw)
if not created_date:
continue
age = now - created_date
#print(f" API Key ID: {key_details.id}")
#print(f" Created On: {created_date}")
if age > rotation_threshold:
print(f" Found API key's older than 90 days and should be rotated. The Current API keys age is: {age.days} days")
users_needing_rotation.append({
'domain': domain['display_name'],
'compartment': domain['compartment_name'],
'user_name': user.user_name,
'api_key_id': key_details.id,
'age_days': age.days
})
elif 75 <= age.days <= 90:
print(f" API key is nearing expiry. Current age: {age.days} days")
users_expiring_soon.append({
'domain': domain['display_name'],
'compartment': domain['compartment_name'],
'user_name': user.user_name,
'api_key_id': key_details.id,
'age_days': age.days
})
if not user_with_keys_found:
print(" No users with API keys in this domain.")
else:
print(f" Users with API keys found: {len(users_with_keys)}\n")
except (oci.exceptions.ServiceError, Exception) as e:
failed_domains.append(f"{domain['display_name']} (error: {str(e)})")
print(f" Error processing domain {domain['display_name']}: {e}")
import argparse
def main():
try:
parser = argparse.ArgumentParser(description="Audit OCI Identity Domains for stale API keys.")
parser.add_argument('--non-interactive', action='store_true', help='Run in non-interactive mode')
args = parser.parse_args()
all_domains = discover_all_auditable_domains()
print("Available Domains:")
for idx, dom in enumerate(all_domains, 1):
print(f" [{idx}] {dom['display_name']}")
selection = input("Enter domain numbers to audit (comma-separated), or press Enter to audit all: ").strip()
if selection:
indices = [int(i)-1 for i in selection.split(',') if i.strip().isdigit() and 0 < int(i) <= len(all_domains)]
domains = [all_domains[i] for i in indices]
else:
domains = all_domains
rotation_threshold = timedelta(days=90)
now = datetime.now(timezone.utc)
failed_domains = []
users_needing_rotation = []
users_expiring_soon = []
users_expiring_soon = []
for domain in domains:
thread = threading.Thread(
target=audit_domain,
args=(domain, failed_domains, rotation_threshold, now, users_needing_rotation),
daemon=True
)
thread.start()
thread.join(timeout=30)
if thread.is_alive():
user_input = 'n'
if not args.non_interactive:
user_input = 'n'
if not args.non_interactive:
user_input = timed_input(f"Domain '{domain['display_name']}' is taking longer than 2 minutes to respond. Do you want to wait longer? (y/N): ",timeout=30).strip().lower()
#user_input = input(f"Domain '{domain['display_name']}' is taking longer than 2 minutes to respond. Do you want to wait longer? (y/N): ").strip().lower()
if user_input != 'y':
failed_domains.append(f"{domain['display_name']} (user terminated)")
print(f" Terminated check for domain {domain['display_name']} by user choice.")
continue
print(f" Waiting up to 5 minutes for domain '{domain['display_name']}' to complete...")
thread.join(timeout=180)
if thread.is_alive():
user_input = input(f"[?] Domain '{domain['display_name']}' is taking longer than 2 minutes to respond. Do you want to wait longer? (y/N): ").strip().lower()
if user_input == 'y':
print(f" Waiting up to 5 minutes for domain '{domain['display_name']}' to complete...")
thread.join(timeout=180)
if thread.is_alive():
failed_domains.append(f"{domain['display_name']} (timeout)")
print(f" Timeout while auditing domain {domain['display_name']}")
thread.join(timeout=DOMAIN_TIMEOUT_SECONDS)
if thread.is_alive():
print(f" Still waiting for domain {domain['display_name']} to respond. Will wait until it completes...")
thread.join()
if thread.is_alive():
failed_domains.append(f"{domain['display_name']} (timeout)")
print(f" Timeout while auditing domain {domain['display_name']}")
if failed_domains:
print("Domains that couldn't be audited due to errors:")
for name in failed_domains:
print(f" - {name}")
if users_needing_rotation:
print("\nUsers with API keys older than 90 days (rotation needed):\n")
print("{:<60} {:<40} {:<25} {:<40} {:>10}".format("User", "Domain", "Compartment", "API Key ID", "Age (days)"))
print("=" * 180)
import csv
from collections import defaultdict
grouped = defaultdict(list)
for entry in users_needing_rotation:
grouped[entry['domain']].append(entry)
with open("api_keys_rotation_report.csv", "w", newline="") as csvfile:
fieldnames = ["User", "Domain", "Compartment", "API Key ID", "Age (days)"]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for domain in sorted(grouped):
print(f"\nDomain: {domain} \n")
print("=" * 180)
sorted_entries = sorted(grouped[domain], key=lambda x: -x['age_days'])
for entry in sorted_entries:
print("{:<60} {:<40} {:<25} {:<40} {:>10}".format(
entry['user_name'], entry['domain'], entry['compartment'], entry['api_key_id'], entry['age_days']
))
writer.writerow({
"User": entry['user_name'],
"Domain": entry['domain'],
"Compartment": entry['compartment'],
"API Key ID": entry['api_key_id'],
"Age (days)": entry['age_days']
})
print("=" * 180)
if not users_expiring_soon:
print("\n No users found with API keys nearing expiry (75–90 days old). \n")
else:
print("\n Users with API keys nearing expiry (75–90 days): \n")
print("{:<60} {:<40} {:<25} {:<40} {:>10}".format("User", "Domain", "Compartment", "API Key ID", "Age (days)"))
print("=" * 190)
grouped_expiring = defaultdict(list)
for entry in users_expiring_soon:
grouped_expiring[entry['domain']].append(entry)
for domain in sorted(grouped_expiring):
print(f"Domain: {domain}\n")
print("=" * 190)
sorted_entries = sorted(grouped_expiring[domain], key=lambda x: -x['age_days'])
for entry in sorted_entries:
print("{:<60} {:<40} {:<25} {:<40} {:>10}".format(
entry['user_name'], entry['domain'], entry['compartment'], entry['api_key_id'], entry['age_days']
))
with open("api_keys_expiring_soon_report.csv", "w", newline="") as csvfile:
fieldnames = ["User", "Domain", "Compartment", "API Key ID", "Age (days)"]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for domain in sorted(grouped_expiring):
sorted_entries = sorted(grouped_expiring[domain], key=lambda x: -x['age_days'])
for entry in sorted_entries:
writer.writerow({
"User": entry['user_name'],
"Domain": entry['domain'],
"Compartment": entry['compartment'],
"API Key ID": entry['api_key_id'],
"Age (days)": entry['age_days']
})
except Exception as e:
print(f" Unexpected error occurred during execution: {e}")
if __name__ == "__main__":
main()
Note: This is a sample script provided for educational and illustrative purposes only. It should be tested thoroughly in a non-production or sandbox environment before being used in any customer tenancy. Ensure appropriate error handling, logging, and access controls are in place when adapting this for use in real-world environments.
