Initial commit

This commit is contained in:
2026-06-11 09:39:37 +01:00
commit ca92551217
4 changed files with 452 additions and 0 deletions
+24
View File
@@ -0,0 +1,24 @@
# Python
__pycache__/
*.py[cod]
*.pyo
# Virtual environment
bin/
lib/
include/
pyvenv.cfg
share/
# RSC credentials and token cache
rsc.json
~/.rbkRscsession.*
# RSA keys
*.pem
*.key
SCRATCH
# OS
.DS_Store
+235
View File
@@ -0,0 +1,235 @@
#!/usr/bin/env python3
"""
Rotate the RSA master key for an RCV archival location.
Usage:
python rotate_rcv_rsa_key.py [--nowait] <location_name_or_id> <rsa_key_file>
Arguments:
location_name_or_id Name or UUID of the archival location to rekey.
If a name is given, it is matched case-insensitively
against all targets; an error is raised if ambiguous.
rsa_key_file Path to the PEM file containing the new RSA private key,
or '-' to read from stdin.
Options:
--nowait Submit the job and exit immediately without polling.
Examples:
python rotate_rcv_rsa_key.py "My RCV Location" new_key.pem
python rotate_rcv_rsa_key.py --nowait "My RCV Location" new_key.pem
python rotate_rcv_rsa_key.py 6088cabc-6a3f-489e-846c-4009238839b7 new_key.pem
cat new_key.pem | python rotate_rcv_rsa_key.py "My RCV Location" -
"""
import sys
import os
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from rsc import RSCAuth, RSCGraphQL
# ── Queries / mutations ──────────────────────────────────────────────────────
QUERY_TARGETS = """
query ArchivalLocationsWithClustersInfoQuery($filter: [TargetFilterInput!]) {
targets(filter: $filter) {
edges {
node {
id
name
isActive
status
targetType
__typename
}
}
}
}
"""
MUTATION_REKEY = """
mutation RekeyMasterKeyWithRsaKeyMutation($input: RekeyArchivalLocationMasterKeyWithRsaKeyInput!) {
rekeyArchivalLocationMasterKeyWithRsaKey(input: $input) {
jobId
taskchainId
__typename
}
}
"""
QUERY_TASKCHAIN_STATUS = """
query TaskchainStatus($id: UUID!) {
activitySeries(
input: {
activitySeriesId: $id
clusterUuid: "00000000-0000-0000-0000-000000000000"
}
) {
activitySeriesId
lastActivityStatus
lastActivityType
lastUpdated
activityConnection(first: 1) {
nodes {
status
message
time
}
}
}
}
"""
# Statuses that mean the job has stopped (success or failure)
TERMINAL_STATUSES = {"Success", "Failure", "Canceled", "Canceling"}
POLL_INTERVAL_SECONDS = 5
POLL_TIMEOUT_SECONDS = 300
# ── Location lookup ──────────────────────────────────────────────────────────
def resolve_location_id(gql, name_or_id):
"""Return a location UUID, resolving from name if needed."""
if len(name_or_id) == 36 and name_or_id.count('-') == 4:
return name_or_id
response = gql.query(QUERY_TARGETS)
targets = [edge['node'] for edge in response['data']['targets']['edges']]
needle = name_or_id.lower()
# Exact match
exact = [t for t in targets if t['name'].lower() == needle]
if len(exact) == 1:
return exact[0]['id']
if len(exact) > 1:
_print_matches(exact)
raise ValueError(f"Ambiguous name '{name_or_id}' — use the UUID instead.")
# Substring fallback
partial = [t for t in targets if needle in t['name'].lower()]
if len(partial) == 1:
print(f"Matched location: {partial[0]['name']} ({partial[0]['id']})")
return partial[0]['id']
if len(partial) > 1:
_print_matches(partial)
raise ValueError(f"Ambiguous name '{name_or_id}' — use the UUID instead.")
print("Available archival locations:", file=sys.stderr)
for t in sorted(targets, key=lambda x: x['name']):
print(f" {t['name']:<50} {t['id']}", file=sys.stderr)
raise ValueError(f"No archival location found matching '{name_or_id}'.")
def _print_matches(matches):
print("Matched locations:", file=sys.stderr)
for m in matches:
print(f" {m['name']:<50} {m['id']}", file=sys.stderr)
# ── Polling ──────────────────────────────────────────────────────────────────
def poll_taskchain(gql, taskchain_id):
"""Poll until the job reaches a terminal state. Returns True on success."""
print(f"\nPolling for job completion (timeout {POLL_TIMEOUT_SECONDS}s, "
f"interval {POLL_INTERVAL_SECONDS}s)...")
deadline = time.time() + POLL_TIMEOUT_SECONDS
last_status = None
while time.time() < deadline:
response = gql.query(QUERY_TASKCHAIN_STATUS, {"id": taskchain_id})
series = response['data']['activitySeries']
status = series['lastActivityStatus']
updated = series['lastUpdated']
# Print a line only when status changes
if status != last_status:
# Pull the latest message if available
nodes = series.get('activityConnection', {}).get('nodes', [])
message = nodes[0]['message'] if nodes else ''
msg_suffix = f"{message}" if message else ''
print(f" [{updated}] {status}{msg_suffix}")
last_status = status
if status in TERMINAL_STATUSES:
if status == "Success":
print("\nJob completed successfully.")
return True
else:
nodes = series.get('activityConnection', {}).get('nodes', [])
message = nodes[0]['message'] if nodes else '(no message)'
print(f"\nJob ended with status '{status}': {message}", file=sys.stderr)
return False
time.sleep(POLL_INTERVAL_SECONDS)
print(f"\nTimed out after {POLL_TIMEOUT_SECONDS}s. "
f"Last status: {last_status}", file=sys.stderr)
return False
# ── Core logic ───────────────────────────────────────────────────────────────
def load_rsa_key(path):
if path == '-':
return sys.stdin.read().strip()
if not os.path.exists(path):
raise FileNotFoundError(f"Key file not found: {path}")
with open(path, 'r') as f:
return f.read().strip()
def rotate_key(location_arg, rsa_key, nowait=False):
auth = RSCAuth()
gql = RSCGraphQL(auth)
location_id = resolve_location_id(gql, location_arg)
response = gql.query(MUTATION_REKEY, {
"input": {
"locationId": location_id,
"newRsaKey": rsa_key,
}
})
result = response['data']['rekeyArchivalLocationMasterKeyWithRsaKey']
taskchain_id = result['taskchainId']
print(f"Rekey job submitted.")
print(f" Location ID: {location_id}")
print(f" Job ID: {result['jobId']}")
print(f" Taskchain ID: {taskchain_id}")
if nowait:
return
success = poll_taskchain(gql, taskchain_id)
if not success:
sys.exit(1)
# ── Entry point ──────────────────────────────────────────────────────────────
def main():
args = sys.argv[1:]
nowait = '--nowait' in args
if nowait:
args = [a for a in args if a != '--nowait']
if len(args) != 2:
print(__doc__)
sys.exit(1)
location_arg, key_path = args
try:
rsa_key = load_rsa_key(key_path)
rotate_key(location_arg, rsa_key, nowait=nowait)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
+6
View File
@@ -0,0 +1,6 @@
{
"client_id": "client|your-client-id-here",
"client_secret": "your-client-secret-here",
"name": "Your RSC Service Account Name",
"access_token_uri": "https://your-organization.my.rubrik.com/api/client_token"
}
+187
View File
@@ -0,0 +1,187 @@
import json
import os
import time
import requests
class RSCAuth:
def __init__(self, config_file='rsc.json'):
self.config_file = config_file
self.load_config()
self.token = None
self.token_expiration = None
def load_config(self):
if not os.path.exists(self.config_file):
raise FileNotFoundError(f"Configuration file {self.config_file} not found")
with open(self.config_file, 'r') as f:
config = json.load(f)
self.client_id = config.get('client_id')
self.client_secret = config.get('client_secret')
self.access_token_uri = config.get('access_token_uri')
if not all([self.client_id, self.client_secret, self.access_token_uri]):
raise ValueError("Missing required fields in config: client_id, client_secret, access_token_uri")
# Derive host from access_token_uri
self.host = self.access_token_uri.replace('https://', '').replace('/api/client_token', '')
def get_token(self):
# Check if we have a cached token
cache_file = self._get_cache_file()
if os.path.exists(cache_file):
with open(cache_file, 'r') as f:
expiration, token = f.read().strip().split(' ', 1)
expiration = int(expiration)
if time.time() < expiration - 1800: # Refresh 30 min before expiry
self.token = token
self.token_expiration = expiration
return token
# Get new token
return self._fetch_token()
def _fetch_token(self):
payload = {
'client_id': self.client_id,
'client_secret': self.client_secret
}
headers = {'accept': 'application/json', 'Content-Type': 'application/json'}
response = requests.post(self.access_token_uri, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
self.token = data['access_token']
expires_in = data['expires_in']
self.token_expiration = int(time.time()) + expires_in
# Cache the token
cache_file = self._get_cache_file()
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
with open(cache_file, 'w') as f:
f.write(f"{self.token_expiration} {self.token}")
os.chmod(cache_file, 0o600)
return self.token
def _get_cache_file(self):
# Use the id part after 'client|'
if '|' in self.client_id:
id_part = self.client_id.split('|')[1]
else:
id_part = self.client_id
return os.path.expanduser(f"~/.rbkRscsession.{id_part}")
def get_headers(self):
return {
'Authorization': f'Bearer {self.get_token()}',
'Content-Type': 'application/json'
}
class RSCGraphQL:
def __init__(self, auth):
self.auth = auth
self.endpoint = f"https://{self.auth.host}/api/graphql"
def query(self, query, variables=None):
payload = {'query': query}
if variables:
payload['variables'] = variables
headers = self.auth.get_headers()
response = requests.post(self.endpoint, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
# Check for GraphQL errors
if 'errors' in data:
raise Exception(f"GraphQL errors: {data['errors']}")
return data
def introspect_schema(self):
"""Introspect the GraphQL schema to get type information"""
introspection_query = """
query IntrospectionQuery {
__schema {
types {
name
kind
description
fields(includeDeprecated: true) {
name
description
type {
name
kind
ofType {
name
kind
}
}
args {
name
description
type {
name
kind
ofType {
name
kind
}
}
}
}
}
}
}
"""
return self.query(introspection_query)
def get_type_info(self, type_name):
"""Get detailed information about a specific GraphQL type"""
query = """
query GetTypeInfo($typeName: String!) {
__type(name: $typeName) {
name
kind
description
fields(includeDeprecated: true) {
name
description
type {
name
kind
ofType {
name
kind
ofType {
name
kind
}
}
}
args {
name
description
type {
name
kind
ofType {
name
kind
ofType {
name
kind
}
}
}
}
}
}
}
"""
return self.query(query, {"typeName": type_name})