Adding encryption

This commit is contained in:
2026-02-01 05:30:05 -05:00
parent bb35db384f
commit a7aedd3b64
7 changed files with 1214 additions and 57 deletions

View File

@@ -1,10 +1,19 @@
import os
import random
import json
from pathlib import Path
from fastapi import FastAPI
from datetime import datetime, timedelta
from typing import Optional, Tuple
import bcrypt
import hashlib
import hmac
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from typing import List
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
app = FastAPI()
@@ -17,15 +26,133 @@ app.add_middleware(
allow_headers=["*"],
)
# ============ Models ============
class LoginRequest(BaseModel):
pin: str
class LoginResponse(BaseModel):
token: str
message: str
class VideoMetadata(BaseModel):
filename: str
salt: str # hex encoded
iv: str # hex encoded
uploaded_at: str
VIDEOS_DIR = Path(os.getenv("VIDEOS_DIR", "/videos"))
ENCRYPTED_DIR = Path(os.getenv("ENCRYPTED_DIR", "/videos/encrypted"))
METADATA_DIR = Path(os.getenv("METADATA_DIR", "/videos/metadata"))
MARKER_FILE = Path(os.getenv("METADATA_DIR", "/videos/metadata")) / ".pinlock"
VIDEO_EXTENSIONS = {".mp4", ".webm", ".ogg", ".mov", ".avi", ".mkv"}
# Mount videos directory
# Create necessary directories
ENCRYPTED_DIR.mkdir(parents=True, exist_ok=True)
METADATA_DIR.mkdir(parents=True, exist_ok=True)
# Mount videos directory for unencrypted videos (optional)
if VIDEOS_DIR.exists():
app.mount("/videos", StaticFiles(directory=VIDEOS_DIR), name="videos")
# ============ Authentication ============
def get_video_files() -> List[str]:
def hash_pin(pin: str) -> str:
"""Hash PIN with bcrypt."""
return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode()
def verify_pin(pin: str, hashed: str) -> bool:
"""Verify PIN against hash."""
return bcrypt.checkpw(pin.encode(), hashed.encode())
def create_token(pin: str) -> str:
"""Create a simple auth token (in production, use JWT)."""
import base64
import secrets
token = base64.b64encode(
f"{pin}:{secrets.token_hex(16)}:{datetime.utcnow().isoformat()}".encode()
).decode()
return token
def verify_token(token: str) -> bool:
"""Verify token is valid format."""
try:
import base64
decoded = base64.b64decode(token.encode()).decode()
parts = decoded.split(":")
if len(parts) < 3:
return False
return True
except:
return False
# ============ Encryption Functions ============
def derive_key_from_password(password: str, salt: bytes) -> bytes:
"""
Derive encryption key from password using PBKDF2-like key derivation.
Same password + same salt = same key.
Key is never stored - only exists in memory during crypto operations.
"""
# Use hashlib's pbkdf2_hmac for key derivation (no external dependency needed)
key = hashlib.pbkdf2_hmac(
'sha256',
password.encode(),
salt,
iterations=480000 # OWASP recommendation
)
return key
def encrypt_file(file_bytes: bytes, password: str) -> Tuple[bytes, str, str]:
"""
Encrypt file with password-derived key.
Returns: (encrypted_data, salt_hex, iv_hex)
"""
import secrets
salt = secrets.token_bytes(16)
iv = secrets.token_bytes(16)
key = derive_key_from_password(password, salt)
# Add PKCS7 padding
padding_length = 16 - (len(file_bytes) % 16)
padded_data = file_bytes + bytes([padding_length] * padding_length)
# Encrypt with AES-256-CBC
cipher = Cipher(
algorithms.AES(key),
modes.CBC(iv),
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
return ciphertext, salt.hex(), iv.hex()
def decrypt_file(ciphertext: bytes, password: str, salt_hex: str, iv_hex: str) -> bytes:
"""
Decrypt file with password-derived key.
Only works with correct password - no backdoor.
"""
salt = bytes.fromhex(salt_hex)
iv = bytes.fromhex(iv_hex)
key = derive_key_from_password(password, salt)
cipher = Cipher(
algorithms.AES(key),
modes.CBC(iv),
)
decryptor = cipher.decryptor()
padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
# Remove PKCS7 padding
padding_length = padded_plaintext[-1]
plaintext = padded_plaintext[:-padding_length]
return plaintext
def get_video_files() -> list[str]:
"""Get all video files from the videos directory."""
if not VIDEOS_DIR.exists():
return []
@@ -36,16 +163,220 @@ def get_video_files() -> List[str]:
]
return videos
# ============ API Endpoints ============
@app.post("/api/login")
def login(request: LoginRequest):
"""
Authenticate user with PIN.
First login: Create marker file encrypted with PIN
Subsequent logins: Verify PIN by decrypting marker file
"""
pin = request.pin
# Validate PIN format (6 digits)
if not pin.isdigit() or len(pin) != 6:
raise HTTPException(status_code=400, detail="PIN must be 6 digits")
if MARKER_FILE.exists():
# Existing system - verify PIN by decrypting marker file
try:
with open(MARKER_FILE, "rb") as f:
marker_data = json.load(f)
# Try to decrypt the marker with the provided PIN
decrypted = decrypt_file(
bytes.fromhex(marker_data["data"]),
pin,
marker_data["salt"],
marker_data["iv"]
)
# If we get here, PIN is correct
token = create_token(pin)
return LoginResponse(
token=token,
message="Login successful"
)
except Exception as e:
raise HTTPException(status_code=401, detail="Invalid PIN")
else:
# First login - create marker file with this PIN
marker_content = b"PIN_MARKER"
encrypted_data, salt_hex, iv_hex = encrypt_file(marker_content, pin)
marker_data = {
"data": encrypted_data.hex(),
"salt": salt_hex,
"iv": iv_hex,
"created_at": datetime.utcnow().isoformat()
}
with open(MARKER_FILE, "w") as f:
json.dump(marker_data, f)
token = create_token(pin)
return LoginResponse(
token=token,
message="PIN initialized successfully"
)
def verify_auth(authorization: Optional[str] = Header(None)) -> str:
"""Dependency to verify auth token and extract PIN."""
if not authorization:
raise HTTPException(status_code=401, detail="Missing authorization token")
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid token format")
token = authorization.replace("Bearer ", "")
if not verify_token(token):
raise HTTPException(status_code=401, detail="Invalid token")
# Extract PIN from token
try:
import base64
decoded = base64.b64decode(token.encode()).decode()
pin = decoded.split(":")[0]
return pin
except:
raise HTTPException(status_code=401, detail="Invalid token")
@app.post("/api/upload")
async def upload_file(
file: UploadFile = File(...),
token: str = Depends(verify_auth)
):
"""
Upload and encrypt a file.
File is encrypted with the authenticated user's PIN - no key stored on server.
"""
try:
pin = token # token is actually the PIN extracted by verify_auth
# Read file content
content = await file.read()
if not content:
raise HTTPException(status_code=400, detail="Empty file")
# Encrypt with password-derived key
encrypted_data, salt_hex, iv_hex = encrypt_file(content, pin)
# Generate unique filename
import secrets
encrypted_filename = f"{secrets.token_hex(8)}.enc"
encrypted_path = ENCRYPTED_DIR / encrypted_filename
# Save encrypted file
with open(encrypted_path, "wb") as f:
f.write(encrypted_data)
# Save metadata (salt and IV are non-secret, needed for decryption)
metadata = VideoMetadata(
filename=file.filename or "unknown",
salt=salt_hex,
iv=iv_hex,
uploaded_at=datetime.utcnow().isoformat()
)
metadata_path = METADATA_DIR / f"{encrypted_filename}.json"
with open(metadata_path, "w") as f:
json.dump(metadata.dict(), f)
return {
"success": True,
"encrypted_id": encrypted_filename,
"original_filename": file.filename,
"message": "File encrypted and stored securely"
}
except HTTPException:
raise
except Exception as e:
print(f"Upload error: {e}")
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@app.get("/api/encrypted-videos")
def list_encrypted_videos(token: str = Depends(verify_auth)):
"""List all encrypted videos (metadata only - no decryption)."""
try:
videos = []
for metadata_file in METADATA_DIR.glob("*.json"):
with open(metadata_file, "r") as f:
metadata = json.load(f)
encrypted_id = metadata_file.stem.replace(".enc", "")
videos.append({
"id": encrypted_id,
"filename": metadata["filename"],
"uploaded_at": metadata["uploaded_at"]
})
return sorted(videos, key=lambda x: x["uploaded_at"], reverse=True)
except Exception as e:
print(f"List error: {e}")
raise HTTPException(status_code=500, detail="Failed to list videos")
@app.get("/api/decrypt-video/{encrypted_id}")
async def decrypt_video(
encrypted_id: str,
token: str = Depends(verify_auth)
):
"""
Decrypt and stream video file.
Only works with correct PIN (extracted from token) - key is derived, never stored.
"""
try:
pin = token # PIN extracted from token by verify_auth
encrypted_path = ENCRYPTED_DIR / f"{encrypted_id}.enc"
metadata_path = METADATA_DIR / f"{encrypted_id}.enc.json"
if not encrypted_path.exists() or not metadata_path.exists():
raise HTTPException(status_code=404, detail="Video not found")
# Load metadata
with open(metadata_path, "r") as f:
metadata = json.load(f)
# Read encrypted data
with open(encrypted_path, "rb") as f:
ciphertext = f.read()
# Decrypt with PIN-derived key
plaintext = decrypt_file(
ciphertext,
pin,
metadata["salt"],
metadata["iv"]
)
# Stream the decrypted file
def iterfile():
yield plaintext
return StreamingResponse(
iterfile(),
media_type="video/mp4",
headers={"Content-Disposition": f"inline; filename={metadata['filename']}"}
)
except HTTPException:
raise
except Exception as e:
print(f"Decryption error: {e}")
raise HTTPException(status_code=500, detail="Decryption failed")
@app.get("/api/health")
def health_check():
"""Health check endpoint."""
return {"status": "ok"}
@app.get("/api/videos")
def get_videos(limit: int = 10):
"""Get a list of random videos."""
"""Get a list of random videos (public endpoint for unencrypted videos)."""
try:
videos = get_video_files()

View File

@@ -2,3 +2,7 @@ fastapi==0.104.1
uvicorn==0.24.0
python-multipart==0.0.6
python-dotenv==1.0.0
cryptography==41.0.7
bcrypt==4.1.1
python-jose==3.3.0
pydantic==2.5.0