#!/usr/bin/env python3 import json import sys import os from datetime import datetime from rsc import RSCAuth, RSCGraphQL from tabulate import tabulate def find_database_by_name_or_id(identifier): """Find database by name or ID and return its details""" auth = RSCAuth() gql = RSCGraphQL(auth) # Check if identifier looks like a UUID (contains hyphens) if '-' in identifier: # It's likely a database ID query = """ query OracleDatabase($fid: UUID!) { oracleDatabase(fid: $fid) { dbUniqueName id cluster { id name } logicalPath { fid name objectType } } } """ variables = {"fid": identifier} else: # It's a database name query = """ query OracleDatabases($filter: [Filter!]) { oracleDatabases(filter: $filter) { nodes { dbUniqueName id cluster { id name } logicalPath { fid name objectType } } } } """ variables = { "filter": [ {"texts": [identifier], "field": "NAME_EXACT_MATCH"}, {"texts": ["false"], "field": "IS_RELIC"}, {"texts": ["false"], "field": "IS_REPLICATED"} ] } response = gql.query(query, variables) if '-' in identifier: # Direct ID lookup db = response['data']['oracleDatabase'] if not db: raise ValueError(f"Database with ID '{identifier}' not found") return db else: # Name lookup databases = response['data']['oracleDatabases']['nodes'] if not databases: raise ValueError(f"No databases found with name '{identifier}'") if len(databases) > 1: print(f"Multiple databases found with name '{identifier}':") for db in databases: host_name = db['logicalPath'][0]['name'] if db['logicalPath'] else 'Unknown' print(f" - {db['dbUniqueName']} (ID: {db['id']}, Host: {host_name})") raise ValueError("Please specify the database ID instead") return databases[0] def get_recoverable_ranges(db_id): """Get recoverable ranges for a database""" auth = RSCAuth() gql = RSCGraphQL(auth) query = """ query OracleDatabaseRecoverableRangesQuery($fid: String!) { oracleRecoverableRanges( input: {id: $fid, shouldIncludeDbSnapshotSummaries: true} ) { data { beginTime endTime status dbSnapshotSummaries { databaseName isValid hostOrRacName baseSnapshotSummary { id date isOnDemandSnapshot replicationLocationIds archivalLocationIds } } __typename } __typename } oracleMissedRecoverableRanges(input: {id: $fid}) { data { beginTime endTime __typename } __typename } } """ variables = {"fid": db_id} response = gql.query(query, variables) return response['data'] def get_cluster_name(cluster_id): """Get cluster name by ID""" if not cluster_id: return "Unknown" # Cache cluster names to avoid repeated queries if not hasattr(get_cluster_name, '_cache'): get_cluster_name._cache = {} if cluster_id in get_cluster_name._cache: return get_cluster_name._cache[cluster_id] try: auth = RSCAuth() gql = RSCGraphQL(auth) query = """ query ListClusters { allClusterConnection { nodes { id name } } } """ response = gql.query(query) clusters = response['data']['allClusterConnection']['nodes'] # Find the cluster with matching ID for cluster in clusters: if cluster['id'] == cluster_id: name = cluster['name'] get_cluster_name._cache[cluster_id] = name return name # If not found, return fallback return f"Cluster-{cluster_id[:8]}" except: return f"Cluster-{cluster_id[:8]}" def get_location_name(location_type, location_ids): """Get human-readable location names""" if not location_ids: return [] names = [] for loc_id in location_ids: if location_type == "replication": name = get_cluster_name(loc_id) names.append(f"Replicated: {name}") elif location_type == "archival": # For archival, we might not have cluster info, so use a generic name names.append(f"Archive: {loc_id[:8]}...") else: names.append(f"{location_type}: {loc_id[:8]}...") return names def format_timestamp(timestamp): """Format ISO timestamp to readable format""" try: dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) return dt.strftime('%Y-%m-%d %H:%M:%S UTC') except: return timestamp def list_snapshots_and_ranges(identifier): """List snapshots and recovery ranges for a database""" try: # Find the database db = find_database_by_name_or_id(identifier) print(f"Database: {db['dbUniqueName']}") print(f"ID: {db['id']}") cluster_name = db['cluster']['name'] if db['cluster'] else 'Unknown' print(f"Cluster: {cluster_name}") host_name = db['logicalPath'][0]['name'] if db['logicalPath'] else 'Unknown' print(f"Host: {host_name}") print("-" * 80) # Get recoverable ranges ranges_data = get_recoverable_ranges(db['id']) # Display recoverable ranges recoverable_ranges = ranges_data.get('oracleRecoverableRanges', {}).get('data', []) if recoverable_ranges: print(f"\nRecoverable Ranges ({len(recoverable_ranges)} found):") table_data = [] for range_item in recoverable_ranges: table_data.append([ format_timestamp(range_item['beginTime']), format_timestamp(range_item['endTime']), range_item.get('status', 'Unknown') ]) headers = ['Begin Time', 'End Time', 'Status'] print(tabulate(table_data, headers=headers, tablefmt='grid')) # Display snapshots by location print(f"\nSnapshot Distribution:") location_snapshots = {} for range_item in recoverable_ranges: snapshots = range_item.get('dbSnapshotSummaries', []) for snapshot in snapshots: base = snapshot.get('baseSnapshotSummary', {}) # Group by replication locations repl_locations = base.get('replicationLocationIds', []) arch_locations = base.get('archivalLocationIds', []) # Add to local (assuming no replication/archival IDs means local) if not repl_locations and not arch_locations: loc_name = "Local (Source)" if loc_name not in location_snapshots: location_snapshots[loc_name] = [] location_snapshots[loc_name].append({ 'id': base.get('id'), 'date': base.get('date'), 'isOnDemand': base.get('isOnDemandSnapshot', False), 'host': snapshot.get('hostOrRacName', 'Unknown') }) else: # Handle replication locations for repl_id in repl_locations: cluster_name = get_cluster_name(repl_id) loc_name = f"Replicated: {cluster_name}" if loc_name not in location_snapshots: location_snapshots[loc_name] = [] location_snapshots[loc_name].append({ 'id': base.get('id'), 'date': base.get('date'), 'isOnDemand': base.get('isOnDemandSnapshot', False), 'host': snapshot.get('hostOrRacName', 'Unknown') }) # Handle archival locations for arch_id in arch_locations: loc_name = f"Archived: {arch_id[:8]}..." if loc_name not in location_snapshots: location_snapshots[loc_name] = [] location_snapshots[loc_name].append({ 'id': base.get('id'), 'date': base.get('date'), 'isOnDemand': base.get('isOnDemandSnapshot', False), 'host': snapshot.get('hostOrRacName', 'Unknown') }) # Display snapshots by location for location, snapshots in location_snapshots.items(): print(f"\nšŸ“ {location} ({len(snapshots)} snapshots):") table_data = [] for snap in sorted(snapshots, key=lambda x: x['date']): table_data.append([ snap['id'][:8] + '...', # Truncate ID for readability format_timestamp(snap['date']), 'On-Demand' if snap['isOnDemand'] else 'Policy', snap['host'] ]) headers = ['Snapshot ID', 'Date', 'Type', 'Host'] print(tabulate(table_data, headers=headers, tablefmt='grid')) else: print("\nNo recoverable ranges found.") # Display missed recoverable ranges missed_ranges = ranges_data.get('oracleMissedRecoverableRanges', {}).get('data', []) if missed_ranges: print(f"\nMissed Recoverable Ranges ({len(missed_ranges)} found):") table_data = [] for range_item in missed_ranges: table_data.append([ format_timestamp(range_item['beginTime']), format_timestamp(range_item['endTime']) ]) headers = ['Begin Time', 'End Time'] print(tabulate(table_data, headers=headers, tablefmt='grid')) else: print("\nNo missed recoverable ranges found.") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def main(): if len(sys.argv) != 2: print("Usage: python list_db_snapshots.py ") print("Examples:") print(" python list_db_snapshots.py SCLONE") print(" python list_db_snapshots.py 2cb7e201-9da0-53f2-8c69-8fc21f82e0d2") sys.exit(1) identifier = sys.argv[1] list_snapshots_and_ranges(identifier) if __name__ == "__main__": main()