#!/usr/bin/env python3
"""
Rotate the RSA master key for an RCV archival location.

Usage:
    python rotate_rcv_rsa_key.py [--nowait] <location_name_or_id> <rsa_key_file>

Arguments:
    location_name_or_id   Name or UUID of the archival location to rekey.
                          If a name is given, it is matched case-insensitively
                          against all targets; an error is raised if ambiguous.
    rsa_key_file          Path to the PEM file containing the new RSA private
                          key, or '-' to read from stdin.

Options:
    --nowait              Submit the job and exit immediately without polling.

Examples:
    python rotate_rcv_rsa_key.py "My RCV Location" new_key.pem
    python rotate_rcv_rsa_key.py --nowait "My RCV Location" new_key.pem
    python rotate_rcv_rsa_key.py 6088cabc-6a3f-489e-846c-4009238839b7 new_key.pem
    cat new_key.pem | python rotate_rcv_rsa_key.py "My RCV Location" -
"""

import os
import re
import sys
import time

# Make the sibling ``rsc`` module importable regardless of the caller's CWD.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from rsc import RSCAuth, RSCGraphQL


# ── Queries / mutations ──────────────────────────────────────────────────────

QUERY_TARGETS = """
query ArchivalLocationsWithClustersInfoQuery($filter: [TargetFilterInput!]) {
  targets(filter: $filter) {
    edges {
      node {
        id
        name
        isActive
        status
        targetType
        __typename
      }
    }
  }
}
"""

MUTATION_REKEY = """
mutation RekeyMasterKeyWithRsaKeyMutation($input: RekeyArchivalLocationMasterKeyWithRsaKeyInput!) {
  rekeyArchivalLocationMasterKeyWithRsaKey(input: $input) {
    jobId
    taskchainId
    __typename
  }
}
"""

QUERY_TASKCHAIN_STATUS = """
query TaskchainStatus($id: UUID!) {
  activitySeries(
    input: {
      activitySeriesId: $id
      clusterUuid: "00000000-0000-0000-0000-000000000000"
    }
  ) {
    activitySeriesId
    lastActivityStatus
    lastActivityType
    lastUpdated
    activityConnection(first: 1) {
      nodes {
        status
        message
        time
      }
    }
  }
}
"""

# Statuses that mean the job has stopped (success or failure)
# NOTE(review): "Canceling" reads like a transitional state rather than a
# terminal one; it is kept here to preserve existing behaviour — confirm
# against the API before removing it.
TERMINAL_STATUSES = {"Success", "Failure", "Canceled", "Canceling"}

POLL_INTERVAL_SECONDS = 5
POLL_TIMEOUT_SECONDS = 300

# Canonical 8-4-4-4-12 hexadecimal UUID text form.
_UUID_RE = re.compile(
    r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-"
    r"[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)


# ── Location lookup ──────────────────────────────────────────────────────────

def _target_name(target):
    """Return the target's name, or '' when the name is missing or null."""
    return target.get('name') or ''


def resolve_location_id(gql, name_or_id):
    """Return a location UUID, resolving from name if needed.

    Args:
        gql: Client object exposing ``query(query_string, variables=None)``
            and returning the decoded GraphQL response as a dict.
        name_or_id: Either a canonical UUID string (returned unchanged) or a
            location name. Names are matched case-insensitively, first
            exactly and then as a substring.

    Returns:
        The ``id`` of the single matching target.

    Raises:
        ValueError: If the argument is empty, matches more than one target,
            or matches none. Candidate locations are listed on stderr first.
    """
    if _UUID_RE.fullmatch(name_or_id):
        return name_or_id

    needle = name_or_id.strip().lower()
    if not needle:
        # An empty needle is a substring of every name; refuse it rather than
        # risk rekeying whichever location happens to be the only one.
        raise ValueError("Location name or ID must not be empty.")

    response = gql.query(QUERY_TARGETS)
    targets = [edge['node'] for edge in response['data']['targets']['edges']]

    # Exact match takes priority over the substring fallback.
    exact = [t for t in targets if _target_name(t).lower() == needle]
    if len(exact) == 1:
        return exact[0]['id']
    if len(exact) > 1:
        _print_matches(exact)
        raise ValueError(f"Ambiguous name '{name_or_id}' — use the UUID instead.")

    # Substring fallback
    partial = [t for t in targets if needle in _target_name(t).lower()]
    if len(partial) == 1:
        print(f"Matched location: {_target_name(partial[0])} ({partial[0]['id']})")
        return partial[0]['id']
    if len(partial) > 1:
        _print_matches(partial)
        raise ValueError(f"Ambiguous name '{name_or_id}' — use the UUID instead.")

    print("Available archival locations:", file=sys.stderr)
    for t in sorted(targets, key=_target_name):
        print(f"  {_target_name(t):<50} {t['id']}", file=sys.stderr)
    raise ValueError(f"No archival location found matching '{name_or_id}'.")


def _print_matches(matches):
    """Write the name and ID of each candidate location to stderr."""
    print("Matched locations:", file=sys.stderr)
    for m in matches:
        print(f"  {_target_name(m):<50} {m['id']}", file=sys.stderr)


# ── Polling ──────────────────────────────────────────────────────────────────

def _latest_message(series):
    """Return the newest activity message of *series*, or '' if none.

    Tolerates ``activityConnection`` / ``nodes`` / ``message`` being absent
    or explicitly null in the response.
    """
    connection = series.get('activityConnection') or {}
    nodes = connection.get('nodes') or []
    if not nodes:
        return ''
    return nodes[0].get('message') or ''


def poll_taskchain(gql, taskchain_id):
    """Poll until the job reaches a terminal state.

    Args:
        gql: Client object exposing ``query(query_string, variables)``.
        taskchain_id: ID passed as the ``$id`` variable of the status query.

    Returns:
        True if the final status is "Success"; False on any other terminal
        status or when POLL_TIMEOUT_SECONDS elapses first.
    """
    print(f"\nPolling for job completion (timeout {POLL_TIMEOUT_SECONDS}s, "
          f"interval {POLL_INTERVAL_SECONDS}s)...")

    # Monotonic clock: the timeout must not be affected by wall-clock jumps.
    deadline = time.monotonic() + POLL_TIMEOUT_SECONDS
    last_status = None

    while time.monotonic() < deadline:
        response = gql.query(QUERY_TASKCHAIN_STATUS, {"id": taskchain_id})
        series = response['data']['activitySeries']
        status = series['lastActivityStatus']
        updated = series['lastUpdated']
        message = _latest_message(series)

        # Print a line only when status changes
        if status != last_status:
            msg_suffix = f" — {message}" if message else ''
            print(f"  [{updated}] {status}{msg_suffix}")
            last_status = status

        if status in TERMINAL_STATUSES:
            if status == "Success":
                print("\nJob completed successfully.")
                return True
            print(f"\nJob ended with status '{status}': "
                  f"{message or '(no message)'}", file=sys.stderr)
            return False

        time.sleep(POLL_INTERVAL_SECONDS)

    print(f"\nTimed out after {POLL_TIMEOUT_SECONDS}s. "
          f"Last status: {last_status}", file=sys.stderr)
    return False


# ── Core logic ───────────────────────────────────────────────────────────────

def load_rsa_key(path):
    """Read the RSA key text from *path*, or from stdin when *path* is '-'.

    Returns:
        The key text with surrounding whitespace stripped.

    Raises:
        FileNotFoundError: If *path* does not exist.
        ValueError: If the key source is empty after stripping.
    """
    if path == '-':
        key = sys.stdin.read().strip()
    else:
        try:
            with open(path, 'r') as f:
                key = f.read().strip()
        except FileNotFoundError:
            raise FileNotFoundError(f"Key file not found: {path}") from None

    if not key:
        # Never submit a rekey request with empty key material.
        raise ValueError("RSA key is empty.")
    return key


def rotate_key(location_arg, rsa_key, nowait=False):
    """Submit the rekey mutation and, unless *nowait*, wait for the job.

    Args:
        location_arg: Location name or UUID, resolved via resolve_location_id.
        rsa_key: Key text sent as ``newRsaKey``.
        nowait: When True, return right after the job is submitted.

    Exits the process with status 1 if polling reports failure or times out.
    """
    auth = RSCAuth()
    gql = RSCGraphQL(auth)

    location_id = resolve_location_id(gql, location_arg)

    response = gql.query(MUTATION_REKEY, {
        "input": {
            "locationId": location_id,
            "newRsaKey": rsa_key,
        }
    })

    result = response['data']['rekeyArchivalLocationMasterKeyWithRsaKey']
    taskchain_id = result['taskchainId']

    print("Rekey job submitted.")
    print(f"  Location ID:  {location_id}")
    print(f"  Job ID:       {result['jobId']}")
    print(f"  Taskchain ID: {taskchain_id}")

    if nowait:
        return

    success = poll_taskchain(gql, taskchain_id)
    if not success:
        sys.exit(1)


# ── Entry point ──────────────────────────────────────────────────────────────

def main():
    """Parse the command line, load the key and run the rotation.

    Prints the module usage text and exits with status 1 unless exactly two
    positional arguments remain after removing ``--nowait``. Any exception
    from loading the key or rotating it is reported on stderr with exit
    status 1.
    """
    args = sys.argv[1:]

    nowait = '--nowait' in args
    if nowait:
        args = [a for a in args if a != '--nowait']

    if len(args) != 2:
        print(__doc__)
        sys.exit(1)

    location_arg, key_path = args

    try:
        rsa_key = load_rsa_key(key_path)
        rotate_key(location_arg, rsa_key, nowait=nowait)
    except Exception as e:
        # Top-level boundary: turn any failure into a one-line CLI error.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()