Files
2026-06-11 09:39:37 +01:00

236 lines
7.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Rotate the RSA master key for an RCV archival location.
Usage:
python rotate_rcv_rsa_key.py [--nowait] <location_name_or_id> <rsa_key_file>
Arguments:
location_name_or_id Name or UUID of the archival location to rekey.
If a name is given, it is matched case-insensitively
against all targets; an error is raised if ambiguous.
rsa_key_file Path to the PEM file containing the new RSA private key,
or '-' to read from stdin.
Options:
--nowait Submit the job and exit immediately without polling.
Examples:
python rotate_rcv_rsa_key.py "My RCV Location" new_key.pem
python rotate_rcv_rsa_key.py --nowait "My RCV Location" new_key.pem
python rotate_rcv_rsa_key.py 6088cabc-6a3f-489e-846c-4009238839b7 new_key.pem
cat new_key.pem | python rotate_rcv_rsa_key.py "My RCV Location" -
"""
import sys
import os
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from rsc import RSCAuth, RSCGraphQL
# ── Queries / mutations ──────────────────────────────────────────────────────
QUERY_TARGETS = """
query ArchivalLocationsWithClustersInfoQuery($filter: [TargetFilterInput!]) {
targets(filter: $filter) {
edges {
node {
id
name
isActive
status
targetType
__typename
}
}
}
}
"""
MUTATION_REKEY = """
mutation RekeyMasterKeyWithRsaKeyMutation($input: RekeyArchivalLocationMasterKeyWithRsaKeyInput!) {
rekeyArchivalLocationMasterKeyWithRsaKey(input: $input) {
jobId
taskchainId
__typename
}
}
"""
QUERY_TASKCHAIN_STATUS = """
query TaskchainStatus($id: UUID!) {
activitySeries(
input: {
activitySeriesId: $id
clusterUuid: "00000000-0000-0000-0000-000000000000"
}
) {
activitySeriesId
lastActivityStatus
lastActivityType
lastUpdated
activityConnection(first: 1) {
nodes {
status
message
time
}
}
}
}
"""
# Statuses that mean the job has stopped (success or failure)
TERMINAL_STATUSES = {"Success", "Failure", "Canceled", "Canceling"}
POLL_INTERVAL_SECONDS = 5
POLL_TIMEOUT_SECONDS = 300
# ── Location lookup ──────────────────────────────────────────────────────────
def resolve_location_id(gql, name_or_id):
"""Return a location UUID, resolving from name if needed."""
if len(name_or_id) == 36 and name_or_id.count('-') == 4:
return name_or_id
response = gql.query(QUERY_TARGETS)
targets = [edge['node'] for edge in response['data']['targets']['edges']]
needle = name_or_id.lower()
# Exact match
exact = [t for t in targets if t['name'].lower() == needle]
if len(exact) == 1:
return exact[0]['id']
if len(exact) > 1:
_print_matches(exact)
raise ValueError(f"Ambiguous name '{name_or_id}' — use the UUID instead.")
# Substring fallback
partial = [t for t in targets if needle in t['name'].lower()]
if len(partial) == 1:
print(f"Matched location: {partial[0]['name']} ({partial[0]['id']})")
return partial[0]['id']
if len(partial) > 1:
_print_matches(partial)
raise ValueError(f"Ambiguous name '{name_or_id}' — use the UUID instead.")
print("Available archival locations:", file=sys.stderr)
for t in sorted(targets, key=lambda x: x['name']):
print(f" {t['name']:<50} {t['id']}", file=sys.stderr)
raise ValueError(f"No archival location found matching '{name_or_id}'.")
def _print_matches(matches):
print("Matched locations:", file=sys.stderr)
for m in matches:
print(f" {m['name']:<50} {m['id']}", file=sys.stderr)
# ── Polling ──────────────────────────────────────────────────────────────────
def poll_taskchain(gql, taskchain_id):
"""Poll until the job reaches a terminal state. Returns True on success."""
print(f"\nPolling for job completion (timeout {POLL_TIMEOUT_SECONDS}s, "
f"interval {POLL_INTERVAL_SECONDS}s)...")
deadline = time.time() + POLL_TIMEOUT_SECONDS
last_status = None
while time.time() < deadline:
response = gql.query(QUERY_TASKCHAIN_STATUS, {"id": taskchain_id})
series = response['data']['activitySeries']
status = series['lastActivityStatus']
updated = series['lastUpdated']
# Print a line only when status changes
if status != last_status:
# Pull the latest message if available
nodes = series.get('activityConnection', {}).get('nodes', [])
message = nodes[0]['message'] if nodes else ''
msg_suffix = f" — {message}" if message else ''
print(f" [{updated}] {status}{msg_suffix}")
last_status = status
if status in TERMINAL_STATUSES:
if status == "Success":
print("\nJob completed successfully.")
return True
else:
nodes = series.get('activityConnection', {}).get('nodes', [])
message = nodes[0]['message'] if nodes else '(no message)'
print(f"\nJob ended with status '{status}': {message}", file=sys.stderr)
return False
time.sleep(POLL_INTERVAL_SECONDS)
print(f"\nTimed out after {POLL_TIMEOUT_SECONDS}s. "
f"Last status: {last_status}", file=sys.stderr)
return False
# ── Core logic ───────────────────────────────────────────────────────────────
def load_rsa_key(path):
if path == '-':
return sys.stdin.read().strip()
if not os.path.exists(path):
raise FileNotFoundError(f"Key file not found: {path}")
with open(path, 'r') as f:
return f.read().strip()
def rotate_key(location_arg, rsa_key, nowait=False):
auth = RSCAuth()
gql = RSCGraphQL(auth)
location_id = resolve_location_id(gql, location_arg)
response = gql.query(MUTATION_REKEY, {
"input": {
"locationId": location_id,
"newRsaKey": rsa_key,
}
})
result = response['data']['rekeyArchivalLocationMasterKeyWithRsaKey']
taskchain_id = result['taskchainId']
print(f"Rekey job submitted.")
print(f" Location ID: {location_id}")
print(f" Job ID: {result['jobId']}")
print(f" Taskchain ID: {taskchain_id}")
if nowait:
return
success = poll_taskchain(gql, taskchain_id)
if not success:
sys.exit(1)
# ── Entry point ──────────────────────────────────────────────────────────────
def main():
args = sys.argv[1:]
nowait = '--nowait' in args
if nowait:
args = [a for a in args if a != '--nowait']
if len(args) != 2:
print(__doc__)
sys.exit(1)
location_arg, key_path = args
try:
rsa_key = load_rsa_key(key_path)
rotate_key(location_arg, rsa_key, nowait=nowait)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()