Separate Python and Rust into python/ and rust/ with per-stack Dockerfiles

This commit is contained in:
2026-04-19 14:01:05 +08:00
parent be8e030940
commit c2ef37b84e
184 changed files with 96 additions and 85 deletions

176
python/tests/conftest.py Normal file
View File

@@ -0,0 +1,176 @@
import json
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote, urlparse
import hashlib
import hmac
import pytest
from werkzeug.serving import make_server
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from app import create_api_app
@pytest.fixture()
def app(tmp_path: Path):
storage_root = tmp_path / "data"
iam_config = tmp_path / "iam.json"
bucket_policies = tmp_path / "bucket_policies.json"
iam_payload = {
"users": [
{
"access_key": "test",
"secret_key": "secret",
"display_name": "Test User",
"policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy",
"create_bucket", "delete_bucket", "share", "versioning", "tagging",
"encryption", "cors", "lifecycle", "replication", "quota",
"object_lock", "notification", "logging", "website"]}],
}
]
}
iam_config.write_text(json.dumps(iam_payload))
flask_app = create_api_app(
{
"TESTING": True,
"SECRET_KEY": "testing",
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver",
}
)
yield flask_app
storage = flask_app.extensions.get("object_storage")
if storage:
base = getattr(storage, "storage", storage)
if hasattr(base, "shutdown_stats"):
base.shutdown_stats()
@pytest.fixture()
def client(app):
return app.test_client()
@pytest.fixture()
def live_server(app):
server = make_server("127.0.0.1", 0, app)
host, port = server.server_address
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
time.sleep(0.05)
try:
yield f"http://{host}:{port}"
finally:
server.shutdown()
thread.join(timeout=1)
def _sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def _get_signature_key(key, date_stamp, region_name, service_name):
k_date = _sign(("AWS4" + key).encode("utf-8"), date_stamp)
k_region = _sign(k_date, region_name)
k_service = _sign(k_region, service_name)
k_signing = _sign(k_service, "aws4_request")
return k_signing
@pytest.fixture
def signer():
def _signer(
method,
path,
headers=None,
body=None,
access_key="test",
secret_key="secret",
region="us-east-1",
service="s3",
):
if headers is None:
headers = {}
now = datetime.now(timezone.utc)
amz_date = now.strftime("%Y%m%dT%H%M%SZ")
date_stamp = now.strftime("%Y%m%d")
headers["X-Amz-Date"] = amz_date
# Host header is required for SigV4
if "Host" not in headers:
headers["Host"] = "localhost" # Default for Flask test client
# Payload hash
if body is None:
body = b""
elif isinstance(body, str):
body = body.encode("utf-8")
payload_hash = hashlib.sha256(body).hexdigest()
headers["X-Amz-Content-Sha256"] = payload_hash
# Canonical Request
canonical_uri = quote(path.split("?")[0])
# Query string
parsed = urlparse(path)
query_args = []
if parsed.query:
for pair in parsed.query.split("&"):
if "=" in pair:
k, v = pair.split("=", 1)
else:
k, v = pair, ""
query_args.append((k, v))
query_args.sort(key=lambda x: (x[0], x[1]))
canonical_query_parts = []
for k, v in query_args:
canonical_query_parts.append(f"{quote(k, safe='')}={quote(v, safe='')}")
canonical_query_string = "&".join(canonical_query_parts)
# Canonical Headers
canonical_headers_parts = []
signed_headers_parts = []
for k, v in sorted(headers.items(), key=lambda x: x[0].lower()):
k_lower = k.lower()
v_trim = " ".join(str(v).split())
canonical_headers_parts.append(f"{k_lower}:{v_trim}\n")
signed_headers_parts.append(k_lower)
canonical_headers = "".join(canonical_headers_parts)
signed_headers = ";".join(signed_headers_parts)
canonical_request = (
f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers}\n{payload_hash}"
)
# String to Sign
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
string_to_sign = (
f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"
)
# Signature
signing_key = _get_signature_key(secret_key, date_stamp, region, service)
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
authorization = (
f"AWS4-HMAC-SHA256 Credential={access_key}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}"
)
headers["Authorization"] = authorization
return headers
return _signer