81 lines
2.8 KiB
Python
81 lines
2.8 KiB
Python
import json
|
|
import os
|
|
import time
|
|
import requests
|
|
|
|
class RSCAuth:
|
|
def __init__(self, config_file='rsc.json'):
|
|
self.config_file = config_file
|
|
self.load_config()
|
|
self.token = None
|
|
self.token_expiration = None
|
|
|
|
def load_config(self):
|
|
if not os.path.exists(self.config_file):
|
|
raise FileNotFoundError(f"Configuration file {self.config_file} not found")
|
|
|
|
with open(self.config_file, 'r') as f:
|
|
config = json.load(f)
|
|
|
|
self.client_id = config.get('client_id')
|
|
self.client_secret = config.get('client_secret')
|
|
self.access_token_uri = config.get('access_token_uri')
|
|
|
|
if not all([self.client_id, self.client_secret, self.access_token_uri]):
|
|
raise ValueError("Missing required fields in config: client_id, client_secret, access_token_uri")
|
|
|
|
# Derive host from access_token_uri
|
|
self.host = self.access_token_uri.replace('https://', '').replace('/api/client_token', '')
|
|
|
|
def get_token(self):
|
|
# Check if we have a cached token
|
|
cache_file = self._get_cache_file()
|
|
if os.path.exists(cache_file):
|
|
with open(cache_file, 'r') as f:
|
|
expiration, token = f.read().strip().split(' ', 1)
|
|
expiration = int(expiration)
|
|
if time.time() < expiration - 1800: # Refresh 30 min before expiry
|
|
self.token = token
|
|
self.token_expiration = expiration
|
|
return token
|
|
|
|
# Get new token
|
|
return self._fetch_token()
|
|
|
|
def _fetch_token(self):
|
|
payload = {
|
|
'client_id': self.client_id,
|
|
'client_secret': self.client_secret
|
|
}
|
|
headers = {'accept': 'application/json', 'Content-Type': 'application/json'}
|
|
|
|
response = requests.post(self.access_token_uri, json=payload, headers=headers)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
self.token = data['access_token']
|
|
expires_in = data['expires_in']
|
|
self.token_expiration = int(time.time()) + expires_in
|
|
|
|
# Cache the token
|
|
cache_file = self._get_cache_file()
|
|
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
|
|
with open(cache_file, 'w') as f:
|
|
f.write(f"{self.token_expiration} {self.token}")
|
|
os.chmod(cache_file, 0o600)
|
|
|
|
return self.token
|
|
|
|
def _get_cache_file(self):
|
|
# Use the id part after 'client|'
|
|
if '|' in self.client_id:
|
|
id_part = self.client_id.split('|')[1]
|
|
else:
|
|
id_part = self.client_id
|
|
return os.path.expanduser(f"~/.rbkRscsession.{id_part}")
|
|
|
|
def get_headers(self):
|
|
return {
|
|
'Authorization': f'Bearer {self.get_token()}',
|
|
'Content-Type': 'application/json'
|
|
} |