187 lines
5.8 KiB
Python
187 lines
5.8 KiB
Python
import asyncio
|
|
import json
|
|
from types import SimpleNamespace
|
|
|
|
from app.api.v1 import esp
|
|
from app.core.security import get_machine_token_hash, get_password_hash, verify_machine_token
|
|
from app.models.models import EspReaderProvisioningStatus
|
|
from app.schemas import EspReaderRegistrationRequest
|
|
|
|
|
|
class _FakeDb:
|
|
def __init__(self, reader=None) -> None:
|
|
self.reader = reader
|
|
|
|
class _Query:
|
|
def __init__(self, reader) -> None:
|
|
self.reader = reader
|
|
|
|
def filter(self, *args, **kwargs):
|
|
return self
|
|
|
|
def first(self):
|
|
return self.reader
|
|
|
|
def commit(self) -> None:
|
|
return None
|
|
|
|
def add(self, _obj) -> None:
|
|
return None
|
|
|
|
def refresh(self, _obj) -> None:
|
|
return None
|
|
|
|
def query(self, _model):
|
|
return self._Query(self.reader)
|
|
|
|
|
|
def test_provisioning_status_returns_api_key_for_enum_status(monkeypatch) -> None:
|
|
reader = SimpleNamespace(
|
|
device_id="esp32-123456",
|
|
provisioning_status=EspReaderProvisioningStatus.APPROVED,
|
|
pending_api_key=None,
|
|
api_key_hash=None,
|
|
provisioned_at=None,
|
|
updated_at=None,
|
|
)
|
|
|
|
monkeypatch.setattr(esp, "_get_reader_by_registration_token", lambda *args, **kwargs: reader)
|
|
monkeypatch.setattr(esp, "_new_api_key", lambda: "generated-api-key")
|
|
|
|
response = asyncio.run(
|
|
esp.get_provisioning_status(
|
|
x_esp_device_id="esp32-123456",
|
|
x_esp_registration_token="token",
|
|
db=_FakeDb(),
|
|
)
|
|
)
|
|
payload = json.loads(response.body)
|
|
|
|
assert payload["provisioning_status"] == EspReaderProvisioningStatus.PROVISIONED.value
|
|
assert payload["api_key"] == "generated-api-key"
|
|
assert payload["apiKey"] == "generated-api-key"
|
|
|
|
|
|
def test_register_reader_allows_recovery_before_first_authenticated_call(monkeypatch) -> None:
|
|
reader = SimpleNamespace(
|
|
device_id="esp32-123456",
|
|
name="Old Reader",
|
|
location="Old Location",
|
|
reader_type="checkin_checkout",
|
|
can_write_cards=False,
|
|
firmware_version="old-fw",
|
|
notes="old",
|
|
registration_token_hash="old-hash",
|
|
provisioning_status=EspReaderProvisioningStatus.PROVISIONED,
|
|
is_active=True,
|
|
pending_api_key="pending-api-key",
|
|
last_seen_at=None,
|
|
updated_at=None,
|
|
)
|
|
db = _FakeDb(reader)
|
|
|
|
monkeypatch.setattr(esp, "_new_registration_token", lambda: "replacement-token")
|
|
monkeypatch.setattr(esp, "get_machine_token_hash", lambda value: f"hashed:{value}")
|
|
|
|
response = asyncio.run(
|
|
esp.register_reader(
|
|
EspReaderRegistrationRequest(
|
|
device_id="esp32-123456",
|
|
name="Recovered Reader",
|
|
location="Front Desk",
|
|
reader_type="checkin_checkout",
|
|
can_write_cards=True,
|
|
firmware_version="new-fw",
|
|
notes="recovered",
|
|
),
|
|
db=db,
|
|
)
|
|
)
|
|
|
|
assert response["provisioning_status"] == EspReaderProvisioningStatus.APPROVED
|
|
assert response["registration_token"] == "replacement-token"
|
|
assert response["message"] == "Reader recovery accepted. Poll provisioning to receive the API key again."
|
|
assert reader.registration_token_hash == "hashed:replacement-token"
|
|
assert reader.pending_api_key == "pending-api-key"
|
|
assert reader.provisioning_status == EspReaderProvisioningStatus.APPROVED
|
|
|
|
|
|
def test_machine_token_hash_round_trip() -> None:
|
|
token = "esp-device-token"
|
|
stored_hash = get_machine_token_hash(token)
|
|
|
|
assert verify_machine_token(token, stored_hash) is True
|
|
assert verify_machine_token("wrong-token", stored_hash) is False
|
|
|
|
|
|
def test_machine_token_verify_supports_legacy_bcrypt_hash() -> None:
|
|
token = "legacy-esp-token"
|
|
stored_hash = get_password_hash(token)
|
|
|
|
assert verify_machine_token(token, stored_hash) is True
|
|
assert verify_machine_token("wrong-token", stored_hash) is False
|
|
|
|
|
|
def test_get_current_reader_migrates_legacy_bcrypt_api_key() -> None:
|
|
api_key = "legacy-api-key"
|
|
reader = SimpleNamespace(
|
|
device_id="esp32-123456",
|
|
provisioning_status=EspReaderProvisioningStatus.PROVISIONED,
|
|
is_active=True,
|
|
api_key_hash=get_password_hash(api_key),
|
|
pending_api_key=None,
|
|
last_seen_at=None,
|
|
)
|
|
db = _FakeDb(reader)
|
|
|
|
response_reader = asyncio.run(
|
|
esp.get_current_reader(
|
|
x_esp_device_id="esp32-123456",
|
|
x_esp_api_key=api_key,
|
|
db=db,
|
|
)
|
|
)
|
|
|
|
assert response_reader is reader
|
|
assert reader.api_key_hash == get_machine_token_hash(api_key)
|
|
|
|
|
|
def test_compact_tap_response_uses_short_keys() -> None:
|
|
tap = SimpleNamespace(
|
|
accepted=True,
|
|
action=SimpleNamespace(value="check_in"),
|
|
message="Checked in",
|
|
)
|
|
|
|
response = esp._compact_tap_response(tap)
|
|
payload = json.loads(response.body)
|
|
|
|
assert payload == {"ok": True, "a": "check_in", "m": "Checked in"}
|
|
|
|
|
|
def test_provisioning_status_returns_api_key_for_string_status(monkeypatch) -> None:
|
|
reader = SimpleNamespace(
|
|
device_id="esp32-123456",
|
|
provisioning_status="approved",
|
|
pending_api_key=None,
|
|
api_key_hash=None,
|
|
provisioned_at=None,
|
|
updated_at=None,
|
|
)
|
|
|
|
monkeypatch.setattr(esp, "_get_reader_by_registration_token", lambda *args, **kwargs: reader)
|
|
monkeypatch.setattr(esp, "_new_api_key", lambda: "generated-api-key")
|
|
|
|
response = asyncio.run(
|
|
esp.get_provisioning_status(
|
|
x_esp_device_id="esp32-123456",
|
|
x_esp_registration_token="token",
|
|
db=_FakeDb(),
|
|
)
|
|
)
|
|
payload = json.loads(response.body)
|
|
|
|
assert payload["provisioning_status"] == EspReaderProvisioningStatus.PROVISIONED.value
|
|
assert payload["api_key"] == "generated-api-key"
|
|
assert payload["apiKey"] == "generated-api-key"
|