471f96b654
Builds on the partial bd6b137 fix (which only stripped server_default
expressions) by also:
- Add _StringUUID TypeDecorator: lets Text/String/UUID columns accept
uuid.UUID values on bind (SQLite has no native UUID type) and returns
uuid.UUID on read so existing test assertions like
isinstance(store.id, uuid.UUID) still pass.
- Replace UUID column types with _StringUUID before create_all so
CREATE TABLE uses CHAR(36) instead of the native UUID type that
SQLite can't bind.
- Extend before_insert listener to also set Text PK columns (User.id)
and func.now()-stripped columns (ingested_at) to Python-side defaults
so INSERTs without explicit values succeed under SQLite.
- Switch _create_test_user_and_session to use 32-char hex user/session
ids so they match the format bound by the TypeDecorator on FK reads.
- Simplify test_encrypted_json.py to use the shared engine/session
fixtures from conftest instead of duplicating its own broken engine.
Tests passing: tests/test_models.py (14), tests/test_encrypted_json.py (6).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
"""Tests for EncryptedJSON TypeDecorator and session_data encryption."""
|
|
|
|
import json
|
|
|
|
import pytest
|
|
from cryptography.fernet import Fernet
|
|
from pydantic import ValidationError
|
|
from sqlalchemy import column, table, text
|
|
|
|
from cartsnitch_api.config import settings
|
|
from cartsnitch_api.models.store import Store
|
|
from cartsnitch_api.models.user import User, UserStoreAccount
|
|
|
|
|
|
@pytest.fixture
|
|
def store(session):
|
|
s = Store(name="Test Store", slug="test-store")
|
|
session.add(s)
|
|
session.commit()
|
|
session.refresh(s)
|
|
return s
|
|
|
|
|
|
@pytest.fixture
|
|
def user(session):
|
|
u = User(email="alice@example.com", hashed_password="fakehash")
|
|
session.add(u)
|
|
session.commit()
|
|
session.refresh(u)
|
|
return u
|
|
|
|
|
|
class TestEncryptedJSONType:
|
|
"""Unit tests for the EncryptedJSON TypeDecorator."""
|
|
|
|
def test_round_trip(self, session, user, store):
|
|
"""Data written via the ORM comes back as the original dict."""
|
|
original = {"token": "abc123", "cookies": {"session_id": "xyz"}}
|
|
account = UserStoreAccount(user_id=user.id, store_id=store.id, session_data=original)
|
|
session.add(account)
|
|
session.commit()
|
|
|
|
loaded = session.get(UserStoreAccount, account.id)
|
|
assert loaded.session_data == original
|
|
|
|
def test_stored_value_is_encrypted(self, session, user, store):
|
|
"""The raw value in the DB should be a Fernet token, not plaintext JSON."""
|
|
original = {"secret": "do-not-leak"}
|
|
account = UserStoreAccount(user_id=user.id, store_id=store.id, session_data=original)
|
|
session.add(account)
|
|
session.commit()
|
|
|
|
# Use a raw table construct to bypass TypeDecorator on read
|
|
raw_table = table("user_store_accounts", column("id"), column("session_data"))
|
|
raw = session.execute(raw_table.select().where(raw_table.c.id == str(account.id))).first()
|
|
# If UUID matching fails with str, try bytes format
|
|
if raw is None:
|
|
raw = session.execute(
|
|
text("SELECT session_data FROM user_store_accounts LIMIT 1")
|
|
).scalar_one()
|
|
else:
|
|
raw = raw[1]
|
|
|
|
assert raw != json.dumps(original)
|
|
assert raw.startswith("gAAAAA")
|
|
|
|
# Verify we can decrypt the raw value manually
|
|
f = Fernet(settings.fernet_key.encode())
|
|
decrypted = json.loads(f.decrypt(raw.encode()))
|
|
assert decrypted == original
|
|
|
|
def test_null_round_trip(self, session, user, store):
|
|
"""NULL session_data stays NULL."""
|
|
account = UserStoreAccount(user_id=user.id, store_id=store.id, session_data=None)
|
|
session.add(account)
|
|
session.commit()
|
|
|
|
loaded = session.get(UserStoreAccount, account.id)
|
|
assert loaded.session_data is None
|
|
|
|
def test_empty_dict_round_trip(self, session, user, store):
|
|
"""Empty dict round-trips correctly."""
|
|
account = UserStoreAccount(user_id=user.id, store_id=store.id, session_data={})
|
|
session.add(account)
|
|
session.commit()
|
|
|
|
loaded = session.get(UserStoreAccount, account.id)
|
|
assert loaded.session_data == {}
|
|
|
|
def test_update_session_data(self, session, user, store):
|
|
"""Updating session_data re-encrypts the new value."""
|
|
account = UserStoreAccount(user_id=user.id, store_id=store.id, session_data={"v": 1})
|
|
session.add(account)
|
|
session.commit()
|
|
|
|
account.session_data = {"v": 2, "new_field": True}
|
|
session.commit()
|
|
|
|
loaded = session.get(UserStoreAccount, account.id)
|
|
assert loaded.session_data == {"v": 2, "new_field": True}
|
|
|
|
|
|
class TestEncryptionKeyValidation:
|
|
"""Test that invalid/missing keys are caught at startup."""
|
|
|
|
def test_invalid_fernet_key_rejected(self, monkeypatch):
|
|
"""Settings validation rejects a bad key."""
|
|
monkeypatch.setenv("CARTSNITCH_FERNET_KEY", "not-a-valid-key")
|
|
|
|
with pytest.raises(ValidationError):
|
|
from cartsnitch_api.config import Settings
|
|
|
|
Settings()
|