#!/usr/bin/env python3
"""List the snapshots of an Oracle database through the RSC GraphQL client.

The database is selected by name or by ID on the command line; with no
argument the ID returned by ``RSCGraphQL.get_local_database_id()`` is used.
"""
import json
import os
import re
import sys
from datetime import datetime, timezone

from rsc import RSCAuth, RSCGraphQL
from tabulate import tabulate

# Canonical 8-4-4-4-12 hexadecimal UUID. Used instead of a bare "contains a
# hyphen" check so that database names such as "prod-db" are looked up by name.
_UUID_RE = re.compile(r'[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}')

# Timestamp layouts accepted by format_timestamp (with and without fractional
# seconds); both require a UTC offset.
_TIMESTAMP_FORMATS = ('%Y-%m-%dT%H:%M:%S.%f%z', '%Y-%m-%dT%H:%M:%S%z')

# Maximum number of snapshots requested in the single snapshot query.
_SNAPSHOT_LIMIT = 1000

_TABLE_HEADERS = ['Snapshot ID', 'Date', 'Type', 'SLA Domain', 'Location Status']

_DATABASE_BY_ID_QUERY = """
query OracleDatabase($fid: UUID!) {
  oracleDatabase(fid: $fid) {
    dbUniqueName
    id
    cluster { id name }
    logicalPath { fid name objectType }
  }
}
"""

_DATABASES_BY_NAME_QUERY = """
query OracleDatabases($filter: [Filter!]) {
  oracleDatabases(filter: $filter) {
    nodes {
      dbUniqueName
      id
      cluster { id name }
      logicalPath { fid name objectType }
    }
  }
}
"""

_SNAPSHOTS_QUERY = """
query SnapshotsListSingleQuery($snappableId: String!, $first: Int, $sortBy: SnapshotQuerySortByField, $sortOrder: SortOrder, $includeOnlySourceSnapshots: Boolean) {
  snapshotsListConnection: snapshotOfASnappableConnection(
    workloadId: $snappableId
    first: $first
    sortBy: $sortBy
    sortOrder: $sortOrder
    includeOnlySourceSnapshots: $includeOnlySourceSnapshots
  ) {
    edges {
      node {
        __typename
        id
        date
        isOnDemandSnapshot
        ... on CdmSnapshot {
          cluster { id name }
          slaDomain { id name }
          snapshotRetentionInfo {
            localInfo { name isExpirationDateCalculated expirationTime }
            replicationInfos { locationId name }
            archivalInfos { locationId name }
          }
        }
        ... on PolarisSnapshot {
          archivalLocationName
          isReplica
          isArchivalCopy
          slaDomain {
            name
            ... on ClusterSlaDomain { cluster { id name } }
          }
        }
      }
    }
  }
}
"""


def _host_name(db):
    """Return the name of the first logicalPath entry of *db*, or 'Unknown'."""
    logical_path = db.get('logicalPath')
    return logical_path[0]['name'] if logical_path else 'Unknown'


def find_database_by_name_or_id(identifier, gql=None):
    """Find a database by name or ID and return its details.

    Args:
        identifier: Database ID (a UUID string) or exact database name.
        gql: Optional GraphQL client to reuse. When omitted, a new
            ``RSCGraphQL(RSCAuth())`` client is created.

    Returns:
        The database dict as returned by the GraphQL API (``dbUniqueName``,
        ``id``, ``cluster``, ``logicalPath``).

    Raises:
        ValueError: If no database matches, or if a name matches more than
            one database (the candidates are printed first).
    """
    if gql is None:
        gql = RSCGraphQL(RSCAuth())

    if _UUID_RE.fullmatch(identifier):
        # Direct ID lookup
        response = gql.query(_DATABASE_BY_ID_QUERY, {"fid": identifier})
        db = response['data']['oracleDatabase']
        if not db:
            raise ValueError(f"Database with ID '{identifier}' not found")
        return db

    # Name lookup, excluding relic and replicated objects
    variables = {
        "filter": [
            {"texts": [identifier], "field": "NAME_EXACT_MATCH"},
            {"texts": ["false"], "field": "IS_RELIC"},
            {"texts": ["false"], "field": "IS_REPLICATED"},
        ]
    }
    response = gql.query(_DATABASES_BY_NAME_QUERY, variables)
    databases = response['data']['oracleDatabases']['nodes']

    if not databases:
        raise ValueError(f"No databases found with name '{identifier}'")

    if len(databases) > 1:
        print(f"Multiple databases found with name '{identifier}':")
        for db in databases:
            print(f"  - {db['dbUniqueName']} (ID: {db['id']}, Host: {_host_name(db)})")
        raise ValueError("Please specify the database ID instead")

    return databases[0]


def format_timestamp(timestamp):
    """Format an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM:SS UTC``.

    Timestamps with or without fractional seconds are accepted, and values
    carrying a non-zero UTC offset are converted to UTC before formatting.
    Anything that cannot be parsed (including non-string values) is returned
    unchanged so that callers can still display it.
    """
    if not isinstance(timestamp, str):
        return timestamp

    # strptime's %z does not accept a literal "Z" on every supported Python
    # version, so normalise the suffix to an explicit offset.
    normalized = timestamp[:-1] + '+0000' if timestamp.endswith('Z') else timestamp

    for fmt in _TIMESTAMP_FORMATS:
        try:
            parsed = datetime.strptime(normalized, fmt)
        except ValueError:
            continue
        return parsed.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')

    return timestamp


def _retention_info(snap):
    """Return the snapshot's retention info dict, or {} when absent or null."""
    return snap.get('snapshotRetentionInfo') or {}


def _exists_on_source(snap):
    """Tell whether *snap* still has a local copy.

    Nodes that carry ``snapshotRetentionInfo`` are local when ``localInfo`` is
    set. The query only selects that field for ``CdmSnapshot``; for other
    node types this falls back to ``isReplica``.
    NOTE(review): the isReplica fallback is an assumption - confirm against
    the API semantics for PolarisSnapshot.
    """
    if 'snapshotRetentionInfo' in snap:
        return _retention_info(snap).get('localInfo') is not None
    return not snap.get('isReplica', False)


def _source_location_status(snap):
    """Describe where a locally available snapshot is stored."""
    if _retention_info(snap).get('replicationInfos'):
        return 'Local + Replica'
    return 'Local only'


def _replica_location_status(snap):
    """Describe a snapshot that no longer has a local copy."""
    return 'Expired from source'


def _replica_location_names(snapshots):
    """Return the sorted, distinct replication location names of *snapshots*."""
    names = set()
    for snap in snapshots:
        for info in _retention_info(snap).get('replicationInfos') or []:
            if info and info.get('name'):
                names.add(info['name'])
    return sorted(names)


def _print_snapshot_table(title, snapshots, location_status):
    """Print a titled grid table with one row per snapshot.

    *location_status* is a callable mapping a snapshot dict to the text shown
    in the "Location Status" column.
    """
    print(f"\n{title} ({len(snapshots)})")
    print("=" * 80)

    rows = []
    for snap in snapshots:
        sla_domain = snap.get('slaDomain')
        rows.append([
            snap['id'][:8] + '...',
            format_timestamp(snap['date']),
            'On-Demand' if snap['isOnDemandSnapshot'] else 'Policy',
            sla_domain['name'] if sla_domain else 'None',
            location_status(snap),
        ])

    print(tabulate(rows, headers=_TABLE_HEADERS, tablefmt='grid'))


def list_snapshots(identifier):
    """List all snapshots for a database using snapshotsListConnection.

    Prints the database details, a summary, and one table for snapshots that
    still exist locally and one for snapshots that only exist as replicas.
    Any error is reported on stderr and terminates the process with exit
    status 1.

    Args:
        identifier: Database ID or exact database name.
    """
    try:
        # One client is shared by the database lookup and the snapshot query.
        gql = RSCGraphQL(RSCAuth())

        # Find the database
        db = find_database_by_name_or_id(identifier, gql)
        cluster_name = db['cluster']['name'] if db['cluster'] else 'Unknown'

        print(f"Database: {db['dbUniqueName']}")
        print(f"ID: {db['id']}")
        print(f"Cluster: {cluster_name}")
        print(f"Host: {_host_name(db)}")
        print("-" * 80)

        variables = {
            "snappableId": db['id'],
            "first": _SNAPSHOT_LIMIT,
            "sortBy": "CREATION_TIME",
            "sortOrder": "DESC",
            # Include replicated and archived snapshots
            "includeOnlySourceSnapshots": False,
        }
        response = gql.query(_SNAPSHOTS_QUERY, variables)
        edges = response['data']['snapshotsListConnection']['edges']

        if not edges:
            print("\nNo snapshots found.")
            return

        snapshots = [edge['node'] for edge in edges]

        if len(snapshots) >= _SNAPSHOT_LIMIT:
            # The query is not paginated, so a full page may hide older data.
            print(
                f"WARNING: only the {_SNAPSHOT_LIMIT} most recent snapshots "
                "are shown; older snapshots may exist.",
                file=sys.stderr,
            )

        # Group snapshots by actual location status
        source_snapshots = []
        replica_only_snapshots = []
        for snap in snapshots:
            if _exists_on_source(snap):
                source_snapshots.append(snap)
            else:
                replica_only_snapshots.append(snap)

        replica_names = _replica_location_names(replica_only_snapshots)
        replica_label = f" ({', '.join(replica_names)})" if replica_names else ""

        # Display summary
        print("\nSnapshot Summary:")
        print(f"  Source cluster ({cluster_name}): {len(source_snapshots)} snapshots")
        print(f"  Replica only{replica_label}: {len(replica_only_snapshots)} snapshots")
        print(f"  Total: {len(snapshots)} snapshots")
        print("-" * 80)

        # Display source snapshots first, then replica-only snapshots
        if source_snapshots:
            _print_snapshot_table(
                "SOURCE CLUSTER SNAPSHOTS", source_snapshots, _source_location_status
            )
        if replica_only_snapshots:
            _print_snapshot_table(
                "REPLICA-ONLY SNAPSHOTS", replica_only_snapshots, _replica_location_status
            )

    except Exception as e:
        # Top-level boundary of the script: report and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


def main():
    """Parse the command line and list the snapshots of the chosen database."""
    args = sys.argv[1:]

    if len(args) > 1:
        print("Usage: python list_db_snapshots.py [database_name_or_id]")
        print("If no database is specified, uses the local database.")
        print("Examples:")
        print("  python list_db_snapshots.py SCLONE")
        print("  python list_db_snapshots.py 2cb7e201-9da0-53f2-8c69-8fc21f82e0d2")
        print("  python list_db_snapshots.py")
        sys.exit(1)

    if args:
        identifier = args[0]
    else:
        # No argument, find local database
        gql = RSCGraphQL(RSCAuth())
        identifier = gql.get_local_database_id()
        print(f"INFO: Using local database ID: {identifier}")

    list_snapshots(identifier)


if __name__ == "__main__":
    main()