Files
oracle-livemount-python/list_oracle_snapshots.py
2025-11-25 10:51:26 -05:00

268 lines
9.6 KiB
Python
Executable File

#!/usr/bin/env python3
import json
import os
import sys
import uuid
from datetime import datetime, timezone

from rsc import RSCAuth, RSCGraphQL
from tabulate import tabulate
def _is_database_id(identifier):
    """Return True when *identifier* is a canonically formatted UUID string."""
    try:
        # uuid.UUID also accepts braces, "urn:uuid:" prefixes and un-hyphenated
        # hex; comparing against the canonical form keeps only 8-4-4-4-12 input.
        return str(uuid.UUID(identifier)) == identifier.lower()
    except (AttributeError, TypeError, ValueError):
        return False


def find_database_by_name_or_id(identifier):
    """Find an Oracle database by name or ID and return its details.

    The identifier is treated as a database ID only when it is a well-formed
    UUID; anything else (including names that merely contain hyphens) is
    looked up by exact name, excluding relic and replicated databases.

    Args:
        identifier: Database name or database ID (UUID string).

    Returns:
        The database record returned by the GraphQL query, containing
        ``dbUniqueName``, ``id``, ``cluster`` and ``logicalPath``.

    Raises:
        ValueError: If no database matches, or if a name matches more than
            one database (the candidates are printed first).
    """
    auth = RSCAuth()
    gql = RSCGraphQL(auth)

    if _is_database_id(identifier):
        # Direct ID lookup
        query = """
        query OracleDatabase($fid: UUID!) {
            oracleDatabase(fid: $fid) {
                dbUniqueName
                id
                cluster {
                    id
                    name
                }
                logicalPath {
                    fid
                    name
                    objectType
                }
            }
        }
        """
        response = gql.query(query, {"fid": identifier})
        db = response['data']['oracleDatabase']
        if not db:
            raise ValueError(f"Database with ID '{identifier}' not found")
        return db

    # Name lookup
    query = """
    query OracleDatabases($filter: [Filter!]) {
        oracleDatabases(filter: $filter) {
            nodes {
                dbUniqueName
                id
                cluster {
                    id
                    name
                }
                logicalPath {
                    fid
                    name
                    objectType
                }
            }
        }
    }
    """
    variables = {
        "filter": [
            {"texts": [identifier], "field": "NAME_EXACT_MATCH"},
            {"texts": ["false"], "field": "IS_RELIC"},
            {"texts": ["false"], "field": "IS_REPLICATED"},
        ]
    }
    response = gql.query(query, variables)
    databases = response['data']['oracleDatabases']['nodes']
    if not databases:
        raise ValueError(f"No databases found with name '{identifier}'")
    if len(databases) > 1:
        # Ambiguous name: list every candidate so the caller can retry by ID.
        print(f"Multiple databases found with name '{identifier}':")
        for db in databases:
            # The first logicalPath entry is what this script reports as the host.
            host_name = db['logicalPath'][0]['name'] if db['logicalPath'] else 'Unknown'
            print(f" - {db['dbUniqueName']} (ID: {db['id']}, Host: {host_name})")
        raise ValueError("Please specify the database ID instead")
    return databases[0]
def format_timestamp(timestamp):
    """Format an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM:SS UTC``.

    Accepts timestamps with or without fractional seconds and with either a
    trailing ``Z`` or a numeric UTC offset. The value is converted to UTC
    before formatting so the ``UTC`` label is always accurate.

    Args:
        timestamp: ISO-8601 timestamp string.

    Returns:
        The formatted string, or *timestamp* unchanged when it is not a
        string or cannot be parsed.
    """
    if not isinstance(timestamp, str):
        return timestamp

    # strptime's %z only understands a literal "Z" on newer Python versions,
    # so normalise the suffix to an explicit offset.
    normalized = timestamp[:-1] + '+0000' if timestamp.endswith('Z') else timestamp

    for fmt in ('%Y-%m-%dT%H:%M:%S.%f%z', '%Y-%m-%dT%H:%M:%S%z'):
        try:
            parsed = datetime.strptime(normalized, fmt)
        except ValueError:
            continue
        return parsed.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')

    # Best effort: show the raw value rather than fail the whole listing.
    return timestamp
def _is_local_snapshot(snap):
    """Return True when *snap* should be listed under the source cluster."""
    retention = snap.get('snapshotRetentionInfo')
    if retention is not None:
        return retention.get('localInfo') is not None
    # snapshotRetentionInfo is only selected for CdmSnapshot nodes; for
    # PolarisSnapshot fall back to the replica/archival flags.
    # NOTE(review): treating a non-replica, non-archival PolarisSnapshot as
    # "source" is an assumption - confirm against the API semantics.
    if snap.get('__typename') == 'PolarisSnapshot':
        return not (snap.get('isReplica') or snap.get('isArchivalCopy'))
    return False


def _replication_names(snapshots):
    """Return the sorted, de-duplicated replicationInfos names in *snapshots*."""
    names = set()
    for snap in snapshots:
        retention = snap.get('snapshotRetentionInfo') or {}
        for info in retention.get('replicationInfos') or []:
            if info.get('name'):
                names.add(info['name'])
    return sorted(names)


def _snapshot_row(snap, location_status):
    """Build one table row for *snap* with the given location status text."""
    sla_domain = snap.get('slaDomain')
    return [
        snap['id'][:8] + '...',
        format_timestamp(snap['date']),
        'On-Demand' if snap['isOnDemandSnapshot'] else 'Policy',
        sla_domain['name'] if sla_domain else 'None',
        location_status,
    ]


def _print_snapshot_table(title, rows):
    """Print a titled grid table of snapshot rows."""
    print(f"\n{title} ({len(rows)})")
    print("=" * 80)
    headers = ['Snapshot ID', 'Date', 'Type', 'SLA Domain', 'Location Status']
    print(tabulate(rows, headers=headers, tablefmt='grid'))


def list_snapshots(identifier):
    """List the snapshots of a database, grouped by where they still exist.

    Resolves the database, prints its details, queries up to 1000 snapshots
    (newest first, including replicated and archived copies) and prints a
    summary followed by one table for snapshots still present on the source
    cluster and one for snapshots that only exist elsewhere.

    Args:
        identifier: Database name or database ID.

    Any exception is reported on stderr and the process exits with status 1.
    """
    page_limit = 1000  # Maximum number of snapshots requested in one query
    try:
        # Find the database
        db = find_database_by_name_or_id(identifier)
        print(f"Database: {db['dbUniqueName']}")
        print(f"ID: {db['id']}")
        cluster_name = db['cluster']['name'] if db['cluster'] else 'Unknown'
        print(f"Cluster: {cluster_name}")
        host_name = db['logicalPath'][0]['name'] if db['logicalPath'] else 'Unknown'
        print(f"Host: {host_name}")
        print("-" * 80)

        # Initialize auth and GraphQL client
        auth = RSCAuth()
        gql = RSCGraphQL(auth)

        # Query to get all snapshots using snapshotOfASnappableConnection
        query = """
        query SnapshotsListSingleQuery($snappableId: String!, $first: Int, $sortBy: SnapshotQuerySortByField, $sortOrder: SortOrder, $includeOnlySourceSnapshots: Boolean) {
            snapshotsListConnection: snapshotOfASnappableConnection(
                workloadId: $snappableId
                first: $first
                sortBy: $sortBy
                sortOrder: $sortOrder
                includeOnlySourceSnapshots: $includeOnlySourceSnapshots
            ) {
                edges {
                    node {
                        __typename
                        id
                        date
                        isOnDemandSnapshot
                        ... on CdmSnapshot {
                            cluster {
                                id
                                name
                            }
                            slaDomain {
                                id
                                name
                            }
                            snapshotRetentionInfo {
                                localInfo {
                                    name
                                    isExpirationDateCalculated
                                    expirationTime
                                }
                                replicationInfos {
                                    locationId
                                    name
                                }
                                archivalInfos {
                                    locationId
                                    name
                                }
                            }
                        }
                        ... on PolarisSnapshot {
                            archivalLocationName
                            isReplica
                            isArchivalCopy
                            slaDomain {
                                name
                                ... on ClusterSlaDomain {
                                    cluster {
                                        id
                                        name
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        """
        # Filter for the specific database
        variables = {
            "snappableId": db['id'],
            "first": page_limit,
            "sortBy": "CREATION_TIME",
            "sortOrder": "DESC",
            "includeOnlySourceSnapshots": False,  # Include replicated and archived snapshots
        }
        response = gql.query(query, variables)
        snapshots = [edge['node'] for edge in response['data']['snapshotsListConnection']['edges']]
        if not snapshots:
            print("\nNo snapshots found.")
            return

        if len(snapshots) >= page_limit:
            # No pagination is performed, so make truncation visible.
            print(
                f"Warning: only the {page_limit} most recent snapshots were "
                "requested; older snapshots may not be listed.",
                file=sys.stderr,
            )

        # Group snapshots by actual location status
        source_snapshots = [snap for snap in snapshots if _is_local_snapshot(snap)]
        replica_only_snapshots = [snap for snap in snapshots if not _is_local_snapshot(snap)]

        # Label the summary with names from the data, not a fixed environment.
        # NOTE(review): replicationInfos[].name is presumed to be the
        # replication target's name - confirm.
        replica_names = ', '.join(_replication_names(snapshots))
        replica_label = f" ({replica_names})" if replica_names else ""

        # Display summary
        print("\nSnapshot Summary:")
        print(f" Source cluster ({cluster_name}): {len(source_snapshots)} snapshots")
        print(f" Replica only{replica_label}: {len(replica_only_snapshots)} snapshots")
        print(f" Total: {len(snapshots)} snapshots")
        print("-" * 80)

        # Display source snapshots first
        if source_snapshots:
            rows = []
            for snap in source_snapshots:
                retention = snap.get('snapshotRetentionInfo') or {}
                status = 'Local + Replica' if retention.get('replicationInfos') else 'Local only'
                rows.append(_snapshot_row(snap, status))
            _print_snapshot_table("SOURCE CLUSTER SNAPSHOTS", rows)

        # Display replica-only snapshots
        if replica_only_snapshots:
            rows = [_snapshot_row(snap, 'Expired from source') for snap in replica_only_snapshots]
            _print_snapshot_table("REPLICA-ONLY SNAPSHOTS", rows)
    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def main():
    """Command-line entry point.

    With one argument, lists snapshots for that database name or ID. With no
    argument, asks the GraphQL client for the local database ID and uses
    that. With any other argument count, prints usage to stderr and exits
    with status 1.
    """
    if len(sys.argv) == 2:
        identifier = sys.argv[1]
    elif len(sys.argv) == 1:
        # No argument, find local database
        try:
            auth = RSCAuth()
            gql = RSCGraphQL(auth)
            identifier = gql.get_local_database_id()
        except Exception as e:
            # Report lookup failures the same way list_snapshots does.
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
        print(f"INFO: Using local database ID: {identifier}")
    else:
        # Derive the script name from the invocation so the usage text
        # always matches the file actually being run.
        script = os.path.basename(sys.argv[0]) if sys.argv else ""
        script = script or "list_oracle_snapshots.py"
        usage_lines = [
            f"Usage: python {script} [<database_name_or_id>]",
            "If no database is specified, uses the local database.",
            "Examples:",
            f" python {script} SCLONE",
            f" python {script} 2cb7e201-9da0-53f2-8c69-8fc21f82e0d2",
            f" python {script}",
        ]
        print("\n".join(usage_lines), file=sys.stderr)
        sys.exit(1)
    list_snapshots(identifier)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()