7 Commits

12 changed files with 1863 additions and 177 deletions

View File

@@ -80,7 +80,7 @@ python run.py --mode api # API only (port 5000)
python run.py --mode ui # UI only (port 5100) python run.py --mode ui # UI only (port 5100)
``` ```
**Default Credentials:** `localadmin` / `localadmin` **Credentials:** Generated automatically on first run and printed to the console. If missed, check the IAM config file at `<STORAGE_ROOT>/.myfsio.sys/config/iam.json`.
- **Web Console:** http://127.0.0.1:5100/ui - **Web Console:** http://127.0.0.1:5100/ui
- **API Endpoint:** http://127.0.0.1:5000 - **API Endpoint:** http://127.0.0.1:5000

View File

@@ -19,6 +19,13 @@ from cryptography.hazmat.primitives import hashes
if sys.platform != "win32": if sys.platform != "win32":
import fcntl import fcntl
try:
import myfsio_core as _rc
_HAS_RUST = True
except ImportError:
_rc = None
_HAS_RUST = False
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -338,6 +345,69 @@ class StreamingEncryptor:
output.seek(0) output.seek(0)
return output return output
def encrypt_file(self, input_path: str, output_path: str) -> EncryptionMetadata:
data_key, encrypted_data_key = self.provider.generate_data_key()
base_nonce = secrets.token_bytes(12)
if _HAS_RUST:
_rc.encrypt_stream_chunked(
input_path, output_path, data_key, base_nonce, self.chunk_size
)
else:
with open(input_path, "rb") as stream:
aesgcm = AESGCM(data_key)
with open(output_path, "wb") as out:
out.write(b"\x00\x00\x00\x00")
chunk_index = 0
while True:
chunk = stream.read(self.chunk_size)
if not chunk:
break
chunk_nonce = self._derive_chunk_nonce(base_nonce, chunk_index)
encrypted_chunk = aesgcm.encrypt(chunk_nonce, chunk, None)
out.write(len(encrypted_chunk).to_bytes(self.HEADER_SIZE, "big"))
out.write(encrypted_chunk)
chunk_index += 1
out.seek(0)
out.write(chunk_index.to_bytes(4, "big"))
return EncryptionMetadata(
algorithm="AES256",
key_id=self.provider.KEY_ID if hasattr(self.provider, "KEY_ID") else "local",
nonce=base_nonce,
encrypted_data_key=encrypted_data_key,
)
def decrypt_file(self, input_path: str, output_path: str,
metadata: EncryptionMetadata) -> None:
data_key = self.provider.decrypt_data_key(metadata.encrypted_data_key, metadata.key_id)
base_nonce = metadata.nonce
if _HAS_RUST:
_rc.decrypt_stream_chunked(input_path, output_path, data_key, base_nonce)
else:
with open(input_path, "rb") as stream:
chunk_count_bytes = stream.read(4)
if len(chunk_count_bytes) < 4:
raise EncryptionError("Invalid encrypted stream: missing header")
chunk_count = int.from_bytes(chunk_count_bytes, "big")
aesgcm = AESGCM(data_key)
with open(output_path, "wb") as out:
for chunk_index in range(chunk_count):
size_bytes = stream.read(self.HEADER_SIZE)
if len(size_bytes) < self.HEADER_SIZE:
raise EncryptionError(f"Invalid encrypted stream: truncated at chunk {chunk_index}")
chunk_size = int.from_bytes(size_bytes, "big")
encrypted_chunk = stream.read(chunk_size)
if len(encrypted_chunk) < chunk_size:
raise EncryptionError(f"Invalid encrypted stream: incomplete chunk {chunk_index}")
chunk_nonce = self._derive_chunk_nonce(base_nonce, chunk_index)
try:
decrypted_chunk = aesgcm.decrypt(chunk_nonce, encrypted_chunk, None)
out.write(decrypted_chunk)
except Exception as exc:
raise EncryptionError(f"Failed to decrypt chunk {chunk_index}: {exc}") from exc
class EncryptionManager: class EncryptionManager:
"""Manages encryption providers and operations.""" """Manages encryption providers and operations."""

View File

@@ -293,9 +293,7 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
raise IamError("Required headers not signed") raise IamError("Required headers not signed")
canonical_uri = _get_canonical_uri(req) canonical_uri = _get_canonical_uri(req)
payload_hash = req.headers.get("X-Amz-Content-Sha256") payload_hash = req.headers.get("X-Amz-Content-Sha256") or "UNSIGNED-PAYLOAD"
if not payload_hash:
payload_hash = hashlib.sha256(req.get_data()).hexdigest()
if _HAS_RUST: if _HAS_RUST:
query_params = list(req.args.items(multi=True)) query_params = list(req.args.items(multi=True))
@@ -305,16 +303,10 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
header_values, payload_hash, amz_date, date_stamp, region, header_values, payload_hash, amz_date, date_stamp, region,
service, secret_key, signature, service, secret_key, signature,
): ):
if current_app.config.get("DEBUG_SIGV4"):
logger.warning("SigV4 signature mismatch for %s %s", req.method, req.path)
raise IamError("SignatureDoesNotMatch") raise IamError("SignatureDoesNotMatch")
else: else:
method = req.method method = req.method
query_args = [] query_args = sorted(req.args.items(multi=True), key=lambda x: (x[0], x[1]))
for key, value in req.args.items(multi=True):
query_args.append((key, value))
query_args.sort(key=lambda x: (x[0], x[1]))
canonical_query_parts = [] canonical_query_parts = []
for k, v in query_args: for k, v in query_args:
canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}") canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}")
@@ -339,8 +331,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}" string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
if not hmac.compare_digest(calculated_signature, signature): if not hmac.compare_digest(calculated_signature, signature):
if current_app.config.get("DEBUG_SIGV4"):
logger.warning("SigV4 signature mismatch for %s %s", method, req.path)
raise IamError("SignatureDoesNotMatch") raise IamError("SignatureDoesNotMatch")
session_token = req.headers.get("X-Amz-Security-Token") session_token = req.headers.get("X-Amz-Security-Token")
@@ -682,7 +672,7 @@ def _extract_request_metadata() -> Dict[str, str]:
for header, value in request.headers.items(): for header, value in request.headers.items():
if header.lower().startswith("x-amz-meta-"): if header.lower().startswith("x-amz-meta-"):
key = header[11:] key = header[11:]
if key: if key and not (key.startswith("__") and key.endswith("__")):
metadata[key] = value metadata[key] = value
return metadata return metadata
@@ -1039,6 +1029,8 @@ def _apply_object_headers(
response.headers["ETag"] = f'"{etag}"' response.headers["ETag"] = f'"{etag}"'
response.headers["Accept-Ranges"] = "bytes" response.headers["Accept-Ranges"] = "bytes"
for key, value in (metadata or {}).items(): for key, value in (metadata or {}).items():
if key.startswith("__") and key.endswith("__"):
continue
safe_value = _sanitize_header_value(str(value)) safe_value = _sanitize_header_value(str(value))
response.headers[f"X-Amz-Meta-{key}"] = safe_value response.headers[f"X-Amz-Meta-{key}"] = safe_value
@@ -2467,7 +2459,7 @@ def _post_object(bucket_name: str) -> Response:
for field_name, value in request.form.items(): for field_name, value in request.form.items():
if field_name.lower().startswith("x-amz-meta-"): if field_name.lower().startswith("x-amz-meta-"):
key = field_name[11:] key = field_name[11:]
if key: if key and not (key.startswith("__") and key.endswith("__")):
metadata[key] = value metadata[key] = value
try: try:
meta = storage.put_object(bucket_name, object_key, file.stream, metadata=metadata or None) meta = storage.put_object(bucket_name, object_key, file.stream, metadata=metadata or None)
@@ -3445,8 +3437,8 @@ def _copy_object(dest_bucket: str, dest_key: str, copy_source: str) -> Response:
if validation_error: if validation_error:
return _error_response("InvalidArgument", validation_error, 400) return _error_response("InvalidArgument", validation_error, 400)
else: else:
metadata = source_metadata metadata = {k: v for k, v in source_metadata.items() if not (k.startswith("__") and k.endswith("__"))}
try: try:
with source_path.open("rb") as stream: with source_path.open("rb") as stream:
meta = storage.put_object( meta = storage.put_object(

View File

@@ -16,7 +16,7 @@ from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path, PurePosixPath
from typing import Any, BinaryIO, Dict, Generator, List, Optional from typing import Any, BinaryIO, Dict, Generator, List, Optional
try: try:
@@ -292,37 +292,43 @@ class ObjectStorage:
bucket_str = str(bucket_path) bucket_str = str(bucket_path)
try: try:
stack = [bucket_str] if _HAS_RUST:
while stack: versions_root = str(self._bucket_versions_root(bucket_name))
current = stack.pop() object_count, total_bytes, version_count, version_bytes = _rc.bucket_stats_scan(
try: bucket_str, versions_root
with os.scandir(current) as it: )
for entry in it: else:
if current == bucket_str and entry.name in internal: stack = [bucket_str]
continue while stack:
if entry.is_dir(follow_symlinks=False): current = stack.pop()
stack.append(entry.path)
elif entry.is_file(follow_symlinks=False):
object_count += 1
total_bytes += entry.stat(follow_symlinks=False).st_size
except PermissionError:
continue
versions_root = self._bucket_versions_root(bucket_name)
if versions_root.exists():
v_stack = [str(versions_root)]
while v_stack:
v_current = v_stack.pop()
try: try:
with os.scandir(v_current) as it: with os.scandir(current) as it:
for entry in it: for entry in it:
if current == bucket_str and entry.name in internal:
continue
if entry.is_dir(follow_symlinks=False): if entry.is_dir(follow_symlinks=False):
v_stack.append(entry.path) stack.append(entry.path)
elif entry.is_file(follow_symlinks=False) and entry.name.endswith(".bin"): elif entry.is_file(follow_symlinks=False):
version_count += 1 object_count += 1
version_bytes += entry.stat(follow_symlinks=False).st_size total_bytes += entry.stat(follow_symlinks=False).st_size
except PermissionError: except PermissionError:
continue continue
versions_root = self._bucket_versions_root(bucket_name)
if versions_root.exists():
v_stack = [str(versions_root)]
while v_stack:
v_current = v_stack.pop()
try:
with os.scandir(v_current) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
v_stack.append(entry.path)
elif entry.is_file(follow_symlinks=False) and entry.name.endswith(".bin"):
version_count += 1
version_bytes += entry.stat(follow_symlinks=False).st_size
except PermissionError:
continue
except OSError: except OSError:
if cached_stats is not None: if cached_stats is not None:
return cached_stats return cached_stats
@@ -559,47 +565,69 @@ class ObjectStorage:
entries_files: list[tuple[str, int, float, Optional[str]]] = [] entries_files: list[tuple[str, int, float, Optional[str]]] = []
entries_dirs: list[str] = [] entries_dirs: list[str] = []
try: if _HAS_RUST:
with os.scandir(str(target_dir)) as it: try:
for entry in it: raw = _rc.shallow_scan(str(target_dir), prefix, json.dumps(meta_cache))
name = entry.name entries_files = []
if name in self.INTERNAL_FOLDERS: for key, size, mtime, etag in raw["files"]:
continue if etag is None:
if entry.is_dir(follow_symlinks=False): safe_key = PurePosixPath(key)
cp = prefix + name + delimiter meta = self._read_metadata(bucket_id, Path(safe_key))
entries_dirs.append(cp) etag = meta.get("__etag__") if meta else None
elif entry.is_file(follow_symlinks=False): entries_files.append((key, size, mtime, etag))
key = prefix + name entries_dirs = raw["dirs"]
try: all_items = raw["merged_keys"]
st = entry.stat() except OSError:
etag = meta_cache.get(key) return ShallowListResult(
entries_files.append((key, st.st_size, st.st_mtime, etag)) objects=[], common_prefixes=[],
except OSError: is_truncated=False, next_continuation_token=None,
pass )
except OSError: else:
return ShallowListResult( try:
objects=[], common_prefixes=[], with os.scandir(str(target_dir)) as it:
is_truncated=False, next_continuation_token=None, for entry in it:
) name = entry.name
if name in self.INTERNAL_FOLDERS:
continue
if entry.is_dir(follow_symlinks=False):
cp = prefix + name + delimiter
entries_dirs.append(cp)
elif entry.is_file(follow_symlinks=False):
key = prefix + name
try:
st = entry.stat()
etag = meta_cache.get(key)
if etag is None:
safe_key = PurePosixPath(key)
meta = self._read_metadata(bucket_id, Path(safe_key))
etag = meta.get("__etag__") if meta else None
entries_files.append((key, st.st_size, st.st_mtime, etag))
except OSError:
pass
except OSError:
return ShallowListResult(
objects=[], common_prefixes=[],
is_truncated=False, next_continuation_token=None,
)
entries_dirs.sort() entries_dirs.sort()
entries_files.sort(key=lambda x: x[0]) entries_files.sort(key=lambda x: x[0])
all_items: list[tuple[str, bool]] = [] all_items: list[tuple[str, bool]] = []
fi, di = 0, 0 fi, di = 0, 0
while fi < len(entries_files) and di < len(entries_dirs): while fi < len(entries_files) and di < len(entries_dirs):
if entries_files[fi][0] <= entries_dirs[di]: if entries_files[fi][0] <= entries_dirs[di]:
all_items.append((entries_files[fi][0], False))
fi += 1
else:
all_items.append((entries_dirs[di], True))
di += 1
while fi < len(entries_files):
all_items.append((entries_files[fi][0], False)) all_items.append((entries_files[fi][0], False))
fi += 1 fi += 1
else: while di < len(entries_dirs):
all_items.append((entries_dirs[di], True)) all_items.append((entries_dirs[di], True))
di += 1 di += 1
while fi < len(entries_files):
all_items.append((entries_files[fi][0], False))
fi += 1
while di < len(entries_dirs):
all_items.append((entries_dirs[di], True))
di += 1
files_map = {e[0]: e for e in entries_files} files_map = {e[0]: e for e in entries_files}
@@ -714,6 +742,22 @@ class ObjectStorage:
else: else:
search_root = bucket_path search_root = bucket_path
if _HAS_RUST:
raw = _rc.search_objects_scan(
str(bucket_path), str(search_root), query, limit
)
results = [
{
"key": k,
"size": s,
"last_modified": datetime.fromtimestamp(
m, tz=timezone.utc
).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
}
for k, s, m in raw["results"]
]
return {"results": results, "truncated": raw["truncated"]}
query_lower = query.lower() query_lower = query.lower()
results: list[Dict[str, Any]] = [] results: list[Dict[str, Any]] = []
internal = self.INTERNAL_FOLDERS internal = self.INTERNAL_FOLDERS
@@ -797,40 +841,78 @@ class ObjectStorage:
tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR
tmp_dir.mkdir(parents=True, exist_ok=True) tmp_dir.mkdir(parents=True, exist_ok=True)
tmp_path = tmp_dir / f"{uuid.uuid4().hex}.tmp"
try:
checksum = hashlib.md5()
with tmp_path.open("wb") as target:
shutil.copyfileobj(_HashingReader(stream, checksum), target)
new_size = tmp_path.stat().st_size
size_delta = new_size - existing_size
object_delta = 0 if is_overwrite else 1
if enforce_quota: if _HAS_RUST:
quota_check = self.check_quota( tmp_path = None
bucket_name,
additional_bytes=max(0, size_delta),
additional_objects=object_delta,
)
if not quota_check["allowed"]:
raise QuotaExceededError(
quota_check["message"] or "Quota exceeded",
quota_check["quota"],
quota_check["usage"],
)
shutil.move(str(tmp_path), str(destination))
finally:
try: try:
tmp_path.unlink(missing_ok=True) tmp_path_str, etag, new_size = _rc.stream_to_file_with_md5(
except OSError: stream, str(tmp_dir)
pass )
tmp_path = Path(tmp_path_str)
size_delta = new_size - existing_size
object_delta = 0 if is_overwrite else 1
if enforce_quota:
quota_check = self.check_quota(
bucket_name,
additional_bytes=max(0, size_delta),
additional_objects=object_delta,
)
if not quota_check["allowed"]:
raise QuotaExceededError(
quota_check["message"] or "Quota exceeded",
quota_check["quota"],
quota_check["usage"],
)
shutil.move(str(tmp_path), str(destination))
finally:
if tmp_path:
try:
tmp_path.unlink(missing_ok=True)
except OSError:
pass
else:
tmp_path = tmp_dir / f"{uuid.uuid4().hex}.tmp"
try:
with tmp_path.open("wb") as target:
shutil.copyfileobj(stream, target)
new_size = tmp_path.stat().st_size
size_delta = new_size - existing_size
object_delta = 0 if is_overwrite else 1
if enforce_quota:
quota_check = self.check_quota(
bucket_name,
additional_bytes=max(0, size_delta),
additional_objects=object_delta,
)
if not quota_check["allowed"]:
raise QuotaExceededError(
quota_check["message"] or "Quota exceeded",
quota_check["quota"],
quota_check["usage"],
)
checksum = hashlib.md5()
with tmp_path.open("rb") as f:
while True:
chunk = f.read(1048576)
if not chunk:
break
checksum.update(chunk)
etag = checksum.hexdigest()
shutil.move(str(tmp_path), str(destination))
finally:
try:
tmp_path.unlink(missing_ok=True)
except OSError:
pass
stat = destination.stat() stat = destination.stat()
etag = checksum.hexdigest()
internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)} internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
combined_meta = {**internal_meta, **(metadata or {})} combined_meta = {**internal_meta, **(metadata or {})}
@@ -1421,14 +1503,24 @@ class ObjectStorage:
if not upload_root.exists(): if not upload_root.exists():
raise StorageError("Multipart upload not found") raise StorageError("Multipart upload not found")
checksum = hashlib.md5()
part_filename = f"part-{part_number:05d}.part" part_filename = f"part-{part_number:05d}.part"
part_path = upload_root / part_filename part_path = upload_root / part_filename
temp_path = upload_root / f".{part_filename}.tmp" temp_path = upload_root / f".{part_filename}.tmp"
try: try:
with temp_path.open("wb") as target: with temp_path.open("wb") as target:
shutil.copyfileobj(_HashingReader(stream, checksum), target) shutil.copyfileobj(stream, target)
if _HAS_RUST:
part_etag = _rc.md5_file(str(temp_path))
else:
checksum = hashlib.md5()
with temp_path.open("rb") as f:
while True:
chunk = f.read(1048576)
if not chunk:
break
checksum.update(chunk)
part_etag = checksum.hexdigest()
temp_path.replace(part_path) temp_path.replace(part_path)
except OSError: except OSError:
try: try:
@@ -1438,7 +1530,7 @@ class ObjectStorage:
raise raise
record = { record = {
"etag": checksum.hexdigest(), "etag": part_etag,
"size": part_path.stat().st_size, "size": part_path.stat().st_size,
"filename": part_filename, "filename": part_filename,
} }
@@ -1638,19 +1730,29 @@ class ObjectStorage:
if versioning_enabled and destination.exists(): if versioning_enabled and destination.exists():
archived_version_size = destination.stat().st_size archived_version_size = destination.stat().st_size
self._archive_current_version(bucket_id, safe_key, reason="overwrite") self._archive_current_version(bucket_id, safe_key, reason="overwrite")
checksum = hashlib.md5() if _HAS_RUST:
with destination.open("wb") as target: part_paths = []
for _, record in validated: for _, record in validated:
part_path = upload_root / record["filename"] pp = upload_root / record["filename"]
if not part_path.exists(): if not pp.exists():
raise StorageError(f"Missing part file {record['filename']}") raise StorageError(f"Missing part file {record['filename']}")
with part_path.open("rb") as chunk: part_paths.append(str(pp))
while True: checksum_hex = _rc.assemble_parts_with_md5(part_paths, str(destination))
data = chunk.read(1024 * 1024) else:
if not data: checksum = hashlib.md5()
break with destination.open("wb") as target:
checksum.update(data) for _, record in validated:
target.write(data) part_path = upload_root / record["filename"]
if not part_path.exists():
raise StorageError(f"Missing part file {record['filename']}")
with part_path.open("rb") as chunk:
while True:
data = chunk.read(1024 * 1024)
if not data:
break
checksum.update(data)
target.write(data)
checksum_hex = checksum.hexdigest()
except BlockingIOError: except BlockingIOError:
raise StorageError("Another upload to this key is in progress") raise StorageError("Another upload to this key is in progress")
@@ -1665,7 +1767,7 @@ class ObjectStorage:
) )
stat = destination.stat() stat = destination.stat()
etag = checksum.hexdigest() etag = checksum_hex
metadata = manifest.get("metadata") metadata = manifest.get("metadata")
internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)} internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
@@ -1838,21 +1940,41 @@ class ObjectStorage:
return list(self._build_object_cache(bucket_path).keys()) return list(self._build_object_cache(bucket_path).keys())
def _build_object_cache(self, bucket_path: Path) -> Dict[str, ObjectMeta]: def _build_object_cache(self, bucket_path: Path) -> Dict[str, ObjectMeta]:
"""Build a complete object metadata cache for a bucket.
Uses os.scandir for fast directory walking and a persistent etag index.
"""
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
bucket_id = bucket_path.name bucket_id = bucket_path.name
objects: Dict[str, ObjectMeta] = {} objects: Dict[str, ObjectMeta] = {}
bucket_str = str(bucket_path) bucket_str = str(bucket_path)
bucket_len = len(bucket_str) + 1 bucket_len = len(bucket_str) + 1
if _HAS_RUST:
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
raw = _rc.build_object_cache(
bucket_str,
str(self._bucket_meta_root(bucket_id)),
str(etag_index_path),
)
if raw["etag_cache_changed"] and raw["etag_cache"]:
try:
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f:
json.dump(raw["etag_cache"], f)
except OSError:
pass
for key, size, mtime, etag in raw["objects"]:
objects[key] = ObjectMeta(
key=key,
size=size,
last_modified=datetime.fromtimestamp(mtime, timezone.utc),
etag=etag,
metadata=None,
)
return objects
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
meta_cache: Dict[str, str] = {} meta_cache: Dict[str, str] = {}
index_mtime: float = 0 index_mtime: float = 0
if etag_index_path.exists(): if etag_index_path.exists():
try: try:
index_mtime = etag_index_path.stat().st_mtime index_mtime = etag_index_path.stat().st_mtime
@@ -1860,10 +1982,10 @@ class ObjectStorage:
meta_cache = json.load(f) meta_cache = json.load(f)
except (OSError, json.JSONDecodeError): except (OSError, json.JSONDecodeError):
meta_cache = {} meta_cache = {}
meta_root = self._bucket_meta_root(bucket_id) meta_root = self._bucket_meta_root(bucket_id)
needs_rebuild = False needs_rebuild = False
if meta_root.exists() and index_mtime > 0: if meta_root.exists() and index_mtime > 0:
def check_newer(dir_path: str) -> bool: def check_newer(dir_path: str) -> bool:
try: try:
@@ -1881,7 +2003,7 @@ class ObjectStorage:
needs_rebuild = check_newer(str(meta_root)) needs_rebuild = check_newer(str(meta_root))
elif not meta_cache: elif not meta_cache:
needs_rebuild = True needs_rebuild = True
if needs_rebuild and meta_root.exists(): if needs_rebuild and meta_root.exists():
meta_str = str(meta_root) meta_str = str(meta_root)
meta_len = len(meta_str) + 1 meta_len = len(meta_str) + 1
@@ -1962,7 +2084,7 @@ class ObjectStorage:
json.dump(meta_cache, f) json.dump(meta_cache, f)
except OSError: except OSError:
pass pass
def scan_dir(dir_path: str) -> None: def scan_dir(dir_path: str) -> None:
try: try:
with os.scandir(dir_path) as it: with os.scandir(dir_path) as it:
@@ -1977,11 +2099,11 @@ class ObjectStorage:
first_part = rel.split(os.sep)[0] if os.sep in rel else rel first_part = rel.split(os.sep)[0] if os.sep in rel else rel
if first_part in self.INTERNAL_FOLDERS: if first_part in self.INTERNAL_FOLDERS:
continue continue
key = rel.replace(os.sep, '/') key = rel.replace(os.sep, '/')
try: try:
stat = entry.stat() stat = entry.stat()
etag = meta_cache.get(key) etag = meta_cache.get(key)
objects[key] = ObjectMeta( objects[key] = ObjectMeta(
@@ -1989,13 +2111,13 @@ class ObjectStorage:
size=stat.st_size, size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc), last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=etag, etag=etag,
metadata=None, metadata=None,
) )
except OSError: except OSError:
pass pass
except OSError: except OSError:
pass pass
scan_dir(bucket_str) scan_dir(bucket_str)
return objects return objects
@@ -2094,16 +2216,15 @@ class ObjectStorage:
def _update_etag_index(self, bucket_id: str, key: str, etag: Optional[str]) -> None: def _update_etag_index(self, bucket_id: str, key: str, etag: Optional[str]) -> None:
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
if not etag_index_path.exists():
return
try: try:
index: Dict[str, str] = {} with open(etag_index_path, 'r', encoding='utf-8') as f:
if etag_index_path.exists(): index = json.load(f)
with open(etag_index_path, 'r', encoding='utf-8') as f:
index = json.load(f)
if etag is None: if etag is None:
index.pop(key, None) index.pop(key, None)
else: else:
index[key] = etag index[key] = etag
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f: with open(etag_index_path, 'w', encoding='utf-8') as f:
json.dump(index, f) json.dump(index, f)
except (OSError, json.JSONDecodeError): except (OSError, json.JSONDecodeError):
@@ -2281,15 +2402,18 @@ class ObjectStorage:
index_path, entry_name = self._index_file_for_key(bucket_name, key) index_path, entry_name = self._index_file_for_key(bucket_name, key)
lock = self._get_meta_index_lock(str(index_path)) lock = self._get_meta_index_lock(str(index_path))
with lock: with lock:
index_path.parent.mkdir(parents=True, exist_ok=True) if _HAS_RUST:
index_data: Dict[str, Any] = {} _rc.write_index_entry(str(index_path), entry_name, json.dumps(entry))
if index_path.exists(): else:
try: index_path.parent.mkdir(parents=True, exist_ok=True)
index_data = json.loads(index_path.read_text(encoding="utf-8")) index_data: Dict[str, Any] = {}
except (OSError, json.JSONDecodeError): if index_path.exists():
pass try:
index_data[entry_name] = entry index_data = json.loads(index_path.read_text(encoding="utf-8"))
index_path.write_text(json.dumps(index_data), encoding="utf-8") except (OSError, json.JSONDecodeError):
pass
index_data[entry_name] = entry
index_path.write_text(json.dumps(index_data), encoding="utf-8")
self._invalidate_meta_read_cache(bucket_name, key) self._invalidate_meta_read_cache(bucket_name, key)
def _delete_index_entry(self, bucket_name: str, key: Path) -> None: def _delete_index_entry(self, bucket_name: str, key: Path) -> None:
@@ -2299,20 +2423,23 @@ class ObjectStorage:
return return
lock = self._get_meta_index_lock(str(index_path)) lock = self._get_meta_index_lock(str(index_path))
with lock: with lock:
try: if _HAS_RUST:
index_data = json.loads(index_path.read_text(encoding="utf-8")) _rc.delete_index_entry(str(index_path), entry_name)
except (OSError, json.JSONDecodeError): else:
self._invalidate_meta_read_cache(bucket_name, key) try:
return index_data = json.loads(index_path.read_text(encoding="utf-8"))
if entry_name in index_data: except (OSError, json.JSONDecodeError):
del index_data[entry_name] self._invalidate_meta_read_cache(bucket_name, key)
if index_data: return
index_path.write_text(json.dumps(index_data), encoding="utf-8") if entry_name in index_data:
else: del index_data[entry_name]
try: if index_data:
index_path.unlink() index_path.write_text(json.dumps(index_data), encoding="utf-8")
except OSError: else:
pass try:
index_path.unlink()
except OSError:
pass
self._invalidate_meta_read_cache(bucket_name, key) self._invalidate_meta_read_cache(bucket_name, key)
def _normalize_metadata(self, metadata: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]: def _normalize_metadata(self, metadata: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
@@ -2410,15 +2537,24 @@ class ObjectStorage:
continue continue
def _check_bucket_contents(self, bucket_path: Path) -> tuple[bool, bool, bool]: def _check_bucket_contents(self, bucket_path: Path) -> tuple[bool, bool, bool]:
"""Check bucket for objects, versions, and multipart uploads in a single pass. bucket_name = bucket_path.name
if _HAS_RUST:
return _rc.check_bucket_contents(
str(bucket_path),
[
str(self._bucket_versions_root(bucket_name)),
str(self._legacy_versions_root(bucket_name)),
],
[
str(self._multipart_bucket_root(bucket_name)),
str(self._legacy_multipart_bucket_root(bucket_name)),
],
)
Returns (has_visible_objects, has_archived_versions, has_active_multipart_uploads).
Uses early exit when all three are found.
"""
has_objects = False has_objects = False
has_versions = False has_versions = False
has_multipart = False has_multipart = False
bucket_name = bucket_path.name
for path in bucket_path.rglob("*"): for path in bucket_path.rglob("*"):
if has_objects: if has_objects:

View File

@@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
APP_VERSION = "0.3.2" APP_VERSION = "0.3.4"
def get_version() -> str: def get_version() -> str:

View File

@@ -19,3 +19,6 @@ regex = "1"
lru = "0.14" lru = "0.14"
parking_lot = "0.12" parking_lot = "0.12"
percent-encoding = "2" percent-encoding = "2"
aes-gcm = "0.10"
hkdf = "0.12"
uuid = { version = "1", features = ["v4"] }

192
myfsio_core/src/crypto.rs Normal file
View File

@@ -0,0 +1,192 @@
use aes_gcm::aead::Aead;
use aes_gcm::{Aes256Gcm, KeyInit, Nonce};
use hkdf::Hkdf;
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::prelude::*;
use sha2::Sha256;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
const DEFAULT_CHUNK_SIZE: usize = 65536;
const HEADER_SIZE: usize = 4;
fn read_exact_chunk(reader: &mut impl Read, buf: &mut [u8]) -> std::io::Result<usize> {
let mut filled = 0;
while filled < buf.len() {
match reader.read(&mut buf[filled..]) {
Ok(0) => break,
Ok(n) => filled += n,
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
Ok(filled)
}
fn derive_chunk_nonce(base_nonce: &[u8], chunk_index: u32) -> Result<[u8; 12], String> {
let hkdf = Hkdf::<Sha256>::new(Some(base_nonce), b"chunk_nonce");
let mut okm = [0u8; 12];
hkdf.expand(&chunk_index.to_be_bytes(), &mut okm)
.map_err(|e| format!("HKDF expand failed: {}", e))?;
Ok(okm)
}
#[pyfunction]
#[pyo3(signature = (input_path, output_path, key, base_nonce, chunk_size=DEFAULT_CHUNK_SIZE))]
pub fn encrypt_stream_chunked(
py: Python<'_>,
input_path: &str,
output_path: &str,
key: &[u8],
base_nonce: &[u8],
chunk_size: usize,
) -> PyResult<u32> {
if key.len() != 32 {
return Err(PyValueError::new_err(format!(
"Key must be 32 bytes, got {}",
key.len()
)));
}
if base_nonce.len() != 12 {
return Err(PyValueError::new_err(format!(
"Base nonce must be 12 bytes, got {}",
base_nonce.len()
)));
}
let chunk_size = if chunk_size == 0 {
DEFAULT_CHUNK_SIZE
} else {
chunk_size
};
let inp = input_path.to_owned();
let out = output_path.to_owned();
let key_arr: [u8; 32] = key.try_into().unwrap();
let nonce_arr: [u8; 12] = base_nonce.try_into().unwrap();
py.detach(move || {
let cipher = Aes256Gcm::new(&key_arr.into());
let mut infile = File::open(&inp)
.map_err(|e| PyIOError::new_err(format!("Failed to open input: {}", e)))?;
let mut outfile = File::create(&out)
.map_err(|e| PyIOError::new_err(format!("Failed to create output: {}", e)))?;
outfile
.write_all(&[0u8; 4])
.map_err(|e| PyIOError::new_err(format!("Failed to write header: {}", e)))?;
let mut buf = vec![0u8; chunk_size];
let mut chunk_index: u32 = 0;
loop {
let n = read_exact_chunk(&mut infile, &mut buf)
.map_err(|e| PyIOError::new_err(format!("Failed to read: {}", e)))?;
if n == 0 {
break;
}
let nonce_bytes = derive_chunk_nonce(&nonce_arr, chunk_index)
.map_err(|e| PyValueError::new_err(e))?;
let nonce = Nonce::from_slice(&nonce_bytes);
let encrypted = cipher
.encrypt(nonce, &buf[..n])
.map_err(|e| PyValueError::new_err(format!("Encrypt failed: {}", e)))?;
let size = encrypted.len() as u32;
outfile
.write_all(&size.to_be_bytes())
.map_err(|e| PyIOError::new_err(format!("Failed to write chunk size: {}", e)))?;
outfile
.write_all(&encrypted)
.map_err(|e| PyIOError::new_err(format!("Failed to write chunk: {}", e)))?;
chunk_index += 1;
}
outfile
.seek(SeekFrom::Start(0))
.map_err(|e| PyIOError::new_err(format!("Failed to seek: {}", e)))?;
outfile
.write_all(&chunk_index.to_be_bytes())
.map_err(|e| PyIOError::new_err(format!("Failed to write chunk count: {}", e)))?;
Ok(chunk_index)
})
}
#[pyfunction]
pub fn decrypt_stream_chunked(
py: Python<'_>,
input_path: &str,
output_path: &str,
key: &[u8],
base_nonce: &[u8],
) -> PyResult<u32> {
if key.len() != 32 {
return Err(PyValueError::new_err(format!(
"Key must be 32 bytes, got {}",
key.len()
)));
}
if base_nonce.len() != 12 {
return Err(PyValueError::new_err(format!(
"Base nonce must be 12 bytes, got {}",
base_nonce.len()
)));
}
let inp = input_path.to_owned();
let out = output_path.to_owned();
let key_arr: [u8; 32] = key.try_into().unwrap();
let nonce_arr: [u8; 12] = base_nonce.try_into().unwrap();
py.detach(move || {
let cipher = Aes256Gcm::new(&key_arr.into());
let mut infile = File::open(&inp)
.map_err(|e| PyIOError::new_err(format!("Failed to open input: {}", e)))?;
let mut outfile = File::create(&out)
.map_err(|e| PyIOError::new_err(format!("Failed to create output: {}", e)))?;
let mut header = [0u8; HEADER_SIZE];
infile
.read_exact(&mut header)
.map_err(|e| PyIOError::new_err(format!("Failed to read header: {}", e)))?;
let chunk_count = u32::from_be_bytes(header);
let mut size_buf = [0u8; HEADER_SIZE];
for chunk_index in 0..chunk_count {
infile
.read_exact(&mut size_buf)
.map_err(|e| {
PyIOError::new_err(format!(
"Failed to read chunk {} size: {}",
chunk_index, e
))
})?;
let chunk_size = u32::from_be_bytes(size_buf) as usize;
let mut encrypted = vec![0u8; chunk_size];
infile.read_exact(&mut encrypted).map_err(|e| {
PyIOError::new_err(format!("Failed to read chunk {}: {}", chunk_index, e))
})?;
let nonce_bytes = derive_chunk_nonce(&nonce_arr, chunk_index)
.map_err(|e| PyValueError::new_err(e))?;
let nonce = Nonce::from_slice(&nonce_bytes);
let decrypted = cipher.decrypt(nonce, encrypted.as_ref()).map_err(|e| {
PyValueError::new_err(format!("Decrypt chunk {} failed: {}", chunk_index, e))
})?;
outfile.write_all(&decrypted).map_err(|e| {
PyIOError::new_err(format!("Failed to write chunk {}: {}", chunk_index, e))
})?;
}
Ok(chunk_count)
})
}

View File

@@ -1,6 +1,9 @@
mod crypto;
mod hashing; mod hashing;
mod metadata; mod metadata;
mod sigv4; mod sigv4;
mod storage;
mod streaming;
mod validation; mod validation;
use pyo3::prelude::*; use pyo3::prelude::*;
@@ -29,6 +32,20 @@ mod myfsio_core {
m.add_function(wrap_pyfunction!(metadata::read_index_entry, m)?)?; m.add_function(wrap_pyfunction!(metadata::read_index_entry, m)?)?;
m.add_function(wrap_pyfunction!(storage::write_index_entry, m)?)?;
m.add_function(wrap_pyfunction!(storage::delete_index_entry, m)?)?;
m.add_function(wrap_pyfunction!(storage::check_bucket_contents, m)?)?;
m.add_function(wrap_pyfunction!(storage::shallow_scan, m)?)?;
m.add_function(wrap_pyfunction!(storage::bucket_stats_scan, m)?)?;
m.add_function(wrap_pyfunction!(storage::search_objects_scan, m)?)?;
m.add_function(wrap_pyfunction!(storage::build_object_cache, m)?)?;
m.add_function(wrap_pyfunction!(streaming::stream_to_file_with_md5, m)?)?;
m.add_function(wrap_pyfunction!(streaming::assemble_parts_with_md5, m)?)?;
m.add_function(wrap_pyfunction!(crypto::encrypt_stream_chunked, m)?)?;
m.add_function(wrap_pyfunction!(crypto::decrypt_stream_chunked, m)?)?;
Ok(()) Ok(())
} }
} }

817
myfsio_core/src/storage.rs Normal file
View File

@@ -0,0 +1,817 @@
use pyo3::exceptions::PyIOError;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList, PyString, PyTuple};
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::SystemTime;
const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"];
fn system_time_to_epoch(t: SystemTime) -> f64 {
t.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0)
}
fn extract_etag_from_meta_bytes(content: &[u8]) -> Option<String> {
let marker = b"\"__etag__\"";
let idx = content.windows(marker.len()).position(|w| w == marker)?;
let after = &content[idx + marker.len()..];
let start = after.iter().position(|&b| b == b'"')? + 1;
let rest = &after[start..];
let end = rest.iter().position(|&b| b == b'"')?;
std::str::from_utf8(&rest[..end]).ok().map(|s| s.to_owned())
}
fn has_any_file(root: &str) -> bool {
let root_path = Path::new(root);
if !root_path.is_dir() {
return false;
}
let mut stack = vec![root_path.to_path_buf()];
while let Some(current) = stack.pop() {
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_file() {
return true;
}
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
}
}
}
false
}
#[pyfunction]
pub fn write_index_entry(
py: Python<'_>,
path: &str,
entry_name: &str,
entry_data_json: &str,
) -> PyResult<()> {
let path_owned = path.to_owned();
let entry_owned = entry_name.to_owned();
let data_owned = entry_data_json.to_owned();
py.detach(move || -> PyResult<()> {
let entry_value: Value = serde_json::from_str(&data_owned)
.map_err(|e| PyIOError::new_err(format!("Failed to parse entry data: {}", e)))?;
if let Some(parent) = Path::new(&path_owned).parent() {
let _ = fs::create_dir_all(parent);
}
let mut index_data: serde_json::Map<String, Value> = match fs::read_to_string(&path_owned)
{
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
Err(_) => serde_json::Map::new(),
};
index_data.insert(entry_owned, entry_value);
let serialized = serde_json::to_string(&Value::Object(index_data))
.map_err(|e| PyIOError::new_err(format!("Failed to serialize index: {}", e)))?;
fs::write(&path_owned, serialized)
.map_err(|e| PyIOError::new_err(format!("Failed to write index: {}", e)))?;
Ok(())
})
}
#[pyfunction]
pub fn delete_index_entry(py: Python<'_>, path: &str, entry_name: &str) -> PyResult<bool> {
let path_owned = path.to_owned();
let entry_owned = entry_name.to_owned();
py.detach(move || -> PyResult<bool> {
let content = match fs::read_to_string(&path_owned) {
Ok(c) => c,
Err(_) => return Ok(false),
};
let mut index_data: serde_json::Map<String, Value> =
match serde_json::from_str(&content) {
Ok(v) => v,
Err(_) => return Ok(false),
};
if index_data.remove(&entry_owned).is_none() {
return Ok(false);
}
if index_data.is_empty() {
let _ = fs::remove_file(&path_owned);
return Ok(true);
}
let serialized = serde_json::to_string(&Value::Object(index_data))
.map_err(|e| PyIOError::new_err(format!("Failed to serialize index: {}", e)))?;
fs::write(&path_owned, serialized)
.map_err(|e| PyIOError::new_err(format!("Failed to write index: {}", e)))?;
Ok(false)
})
}
#[pyfunction]
pub fn check_bucket_contents(
py: Python<'_>,
bucket_path: &str,
version_roots: Vec<String>,
multipart_roots: Vec<String>,
) -> PyResult<(bool, bool, bool)> {
let bucket_owned = bucket_path.to_owned();
py.detach(move || -> PyResult<(bool, bool, bool)> {
let mut has_objects = false;
let bucket_p = Path::new(&bucket_owned);
if bucket_p.is_dir() {
let mut stack = vec![bucket_p.to_path_buf()];
'obj_scan: while let Some(current) = stack.pop() {
let is_root = current == bucket_p;
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if is_root {
if let Some(name) = entry.file_name().to_str() {
if INTERNAL_FOLDERS.contains(&name) {
continue;
}
}
}
if ft.is_file() && !ft.is_symlink() {
has_objects = true;
break 'obj_scan;
}
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
}
}
}
}
let mut has_versions = false;
for root in &version_roots {
if has_versions {
break;
}
has_versions = has_any_file(root);
}
let mut has_multipart = false;
for root in &multipart_roots {
if has_multipart {
break;
}
has_multipart = has_any_file(root);
}
Ok((has_objects, has_versions, has_multipart))
})
}
#[pyfunction]
pub fn shallow_scan(
py: Python<'_>,
target_dir: &str,
prefix: &str,
meta_cache_json: &str,
) -> PyResult<Py<PyAny>> {
let target_owned = target_dir.to_owned();
let prefix_owned = prefix.to_owned();
let cache_owned = meta_cache_json.to_owned();
let result: (
Vec<(String, u64, f64, Option<String>)>,
Vec<String>,
Vec<(String, bool)>,
) = py.detach(move || -> PyResult<(
Vec<(String, u64, f64, Option<String>)>,
Vec<String>,
Vec<(String, bool)>,
)> {
let meta_cache: HashMap<String, String> =
serde_json::from_str(&cache_owned).unwrap_or_default();
let mut files: Vec<(String, u64, f64, Option<String>)> = Vec::new();
let mut dirs: Vec<String> = Vec::new();
let entries = match fs::read_dir(&target_owned) {
Ok(e) => e,
Err(_) => return Ok((files, dirs, Vec::new())),
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let name = match entry.file_name().into_string() {
Ok(n) => n,
Err(_) => continue,
};
if INTERNAL_FOLDERS.contains(&name.as_str()) {
continue;
}
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
let cp = format!("{}{}/", prefix_owned, name);
dirs.push(cp);
} else if ft.is_file() && !ft.is_symlink() {
let key = format!("{}{}", prefix_owned, name);
let md = match entry.metadata() {
Ok(m) => m,
Err(_) => continue,
};
let size = md.len();
let mtime = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
let etag = meta_cache.get(&key).cloned();
files.push((key, size, mtime, etag));
}
}
files.sort_by(|a, b| a.0.cmp(&b.0));
dirs.sort();
let mut merged: Vec<(String, bool)> = Vec::with_capacity(files.len() + dirs.len());
let mut fi = 0;
let mut di = 0;
while fi < files.len() && di < dirs.len() {
if files[fi].0 <= dirs[di] {
merged.push((files[fi].0.clone(), false));
fi += 1;
} else {
merged.push((dirs[di].clone(), true));
di += 1;
}
}
while fi < files.len() {
merged.push((files[fi].0.clone(), false));
fi += 1;
}
while di < dirs.len() {
merged.push((dirs[di].clone(), true));
di += 1;
}
Ok((files, dirs, merged))
})?;
let (files, dirs, merged) = result;
let dict = PyDict::new(py);
let files_list = PyList::empty(py);
for (key, size, mtime, etag) in &files {
let etag_py: Py<PyAny> = match etag {
Some(e) => PyString::new(py, e).into_any().unbind(),
None => py.None(),
};
let tuple = PyTuple::new(py, &[
PyString::new(py, key).into_any().unbind(),
size.into_pyobject(py)?.into_any().unbind(),
mtime.into_pyobject(py)?.into_any().unbind(),
etag_py,
])?;
files_list.append(tuple)?;
}
dict.set_item("files", files_list)?;
let dirs_list = PyList::empty(py);
for d in &dirs {
dirs_list.append(PyString::new(py, d))?;
}
dict.set_item("dirs", dirs_list)?;
let merged_list = PyList::empty(py);
for (key, is_dir) in &merged {
let bool_obj: Py<PyAny> = if *is_dir {
true.into_pyobject(py)?.to_owned().into_any().unbind()
} else {
false.into_pyobject(py)?.to_owned().into_any().unbind()
};
let tuple = PyTuple::new(py, &[
PyString::new(py, key).into_any().unbind(),
bool_obj,
])?;
merged_list.append(tuple)?;
}
dict.set_item("merged_keys", merged_list)?;
Ok(dict.into_any().unbind())
}
#[pyfunction]
pub fn bucket_stats_scan(
py: Python<'_>,
bucket_path: &str,
versions_root: &str,
) -> PyResult<(u64, u64, u64, u64)> {
let bucket_owned = bucket_path.to_owned();
let versions_owned = versions_root.to_owned();
py.detach(move || -> PyResult<(u64, u64, u64, u64)> {
let mut object_count: u64 = 0;
let mut total_bytes: u64 = 0;
let bucket_p = Path::new(&bucket_owned);
if bucket_p.is_dir() {
let mut stack = vec![bucket_p.to_path_buf()];
while let Some(current) = stack.pop() {
let is_root = current == bucket_p;
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
if is_root {
if let Some(name) = entry.file_name().to_str() {
if INTERNAL_FOLDERS.contains(&name) {
continue;
}
}
}
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
} else if ft.is_file() && !ft.is_symlink() {
object_count += 1;
if let Ok(md) = entry.metadata() {
total_bytes += md.len();
}
}
}
}
}
let mut version_count: u64 = 0;
let mut version_bytes: u64 = 0;
let versions_p = Path::new(&versions_owned);
if versions_p.is_dir() {
let mut stack = vec![versions_p.to_path_buf()];
while let Some(current) = stack.pop() {
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
} else if ft.is_file() && !ft.is_symlink() {
if let Some(name) = entry.file_name().to_str() {
if name.ends_with(".bin") {
version_count += 1;
if let Ok(md) = entry.metadata() {
version_bytes += md.len();
}
}
}
}
}
}
}
Ok((object_count, total_bytes, version_count, version_bytes))
})
}
#[pyfunction]
#[pyo3(signature = (bucket_path, search_root, query, limit))]
pub fn search_objects_scan(
py: Python<'_>,
bucket_path: &str,
search_root: &str,
query: &str,
limit: usize,
) -> PyResult<Py<PyAny>> {
let bucket_owned = bucket_path.to_owned();
let search_owned = search_root.to_owned();
let query_owned = query.to_owned();
let result: (Vec<(String, u64, f64)>, bool) = py.detach(
move || -> PyResult<(Vec<(String, u64, f64)>, bool)> {
let query_lower = query_owned.to_lowercase();
let bucket_len = bucket_owned.len() + 1;
let scan_limit = limit * 4;
let mut matched: usize = 0;
let mut results: Vec<(String, u64, f64)> = Vec::new();
let search_p = Path::new(&search_owned);
if !search_p.is_dir() {
return Ok((results, false));
}
let bucket_p = Path::new(&bucket_owned);
let mut stack = vec![search_p.to_path_buf()];
'scan: while let Some(current) = stack.pop() {
let is_bucket_root = current == bucket_p;
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
if is_bucket_root {
if let Some(name) = entry.file_name().to_str() {
if INTERNAL_FOLDERS.contains(&name) {
continue;
}
}
}
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
} else if ft.is_file() && !ft.is_symlink() {
let full_path = entry.path();
let full_str = full_path.to_string_lossy();
if full_str.len() <= bucket_len {
continue;
}
let key = full_str[bucket_len..].replace('\\', "/");
if key.to_lowercase().contains(&query_lower) {
if let Ok(md) = entry.metadata() {
let size = md.len();
let mtime = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
results.push((key, size, mtime));
matched += 1;
}
}
if matched >= scan_limit {
break 'scan;
}
}
}
}
results.sort_by(|a, b| a.0.cmp(&b.0));
let truncated = results.len() > limit;
results.truncate(limit);
Ok((results, truncated))
},
)?;
let (results, truncated) = result;
let dict = PyDict::new(py);
let results_list = PyList::empty(py);
for (key, size, mtime) in &results {
let tuple = PyTuple::new(py, &[
PyString::new(py, key).into_any().unbind(),
size.into_pyobject(py)?.into_any().unbind(),
mtime.into_pyobject(py)?.into_any().unbind(),
])?;
results_list.append(tuple)?;
}
dict.set_item("results", results_list)?;
dict.set_item("truncated", truncated)?;
Ok(dict.into_any().unbind())
}
#[pyfunction]
pub fn build_object_cache(
py: Python<'_>,
bucket_path: &str,
meta_root: &str,
etag_index_path: &str,
) -> PyResult<Py<PyAny>> {
let bucket_owned = bucket_path.to_owned();
let meta_owned = meta_root.to_owned();
let index_path_owned = etag_index_path.to_owned();
let result: (HashMap<String, String>, Vec<(String, u64, f64, Option<String>)>, bool) =
py.detach(move || -> PyResult<(
HashMap<String, String>,
Vec<(String, u64, f64, Option<String>)>,
bool,
)> {
let mut meta_cache: HashMap<String, String> = HashMap::new();
let mut index_mtime: f64 = 0.0;
let mut etag_cache_changed = false;
let index_p = Path::new(&index_path_owned);
if index_p.is_file() {
if let Ok(md) = fs::metadata(&index_path_owned) {
index_mtime = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
}
if let Ok(content) = fs::read_to_string(&index_path_owned) {
if let Ok(parsed) = serde_json::from_str::<HashMap<String, String>>(&content) {
meta_cache = parsed;
}
}
}
let meta_p = Path::new(&meta_owned);
let mut needs_rebuild = false;
if meta_p.is_dir() && index_mtime > 0.0 {
fn check_newer(dir: &Path, index_mtime: f64) -> bool {
let entries = match fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return false,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
if check_newer(&entry.path(), index_mtime) {
return true;
}
} else if ft.is_file() {
if let Some(name) = entry.file_name().to_str() {
if name.ends_with(".meta.json") || name == "_index.json" {
if let Ok(md) = entry.metadata() {
let mt = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
if mt > index_mtime {
return true;
}
}
}
}
}
}
false
}
needs_rebuild = check_newer(meta_p, index_mtime);
} else if meta_cache.is_empty() {
needs_rebuild = true;
}
if needs_rebuild && meta_p.is_dir() {
let meta_str = meta_owned.clone();
let meta_len = meta_str.len() + 1;
let mut index_files: Vec<String> = Vec::new();
let mut legacy_meta_files: Vec<(String, String)> = Vec::new();
fn collect_meta(
dir: &Path,
meta_len: usize,
index_files: &mut Vec<String>,
legacy_meta_files: &mut Vec<(String, String)>,
) {
let entries = match fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
collect_meta(&entry.path(), meta_len, index_files, legacy_meta_files);
} else if ft.is_file() {
if let Some(name) = entry.file_name().to_str() {
let full = entry.path().to_string_lossy().to_string();
if name == "_index.json" {
index_files.push(full);
} else if name.ends_with(".meta.json") {
if full.len() > meta_len {
let rel = &full[meta_len..];
let key = if rel.len() > 10 {
rel[..rel.len() - 10].replace('\\', "/")
} else {
continue;
};
legacy_meta_files.push((key, full));
}
}
}
}
}
}
collect_meta(
meta_p,
meta_len,
&mut index_files,
&mut legacy_meta_files,
);
meta_cache.clear();
for idx_path in &index_files {
if let Ok(content) = fs::read_to_string(idx_path) {
if let Ok(idx_data) = serde_json::from_str::<HashMap<String, Value>>(&content) {
let rel_dir = if idx_path.len() > meta_len {
let r = &idx_path[meta_len..];
r.replace('\\', "/")
} else {
String::new()
};
let dir_prefix = if rel_dir.ends_with("/_index.json") {
&rel_dir[..rel_dir.len() - "/_index.json".len()]
} else {
""
};
for (entry_name, entry_data) in &idx_data {
let key = if dir_prefix.is_empty() {
entry_name.clone()
} else {
format!("{}/{}", dir_prefix, entry_name)
};
if let Some(meta_obj) = entry_data.get("metadata") {
if let Some(etag) = meta_obj.get("__etag__") {
if let Some(etag_str) = etag.as_str() {
meta_cache.insert(key, etag_str.to_owned());
}
}
}
}
}
}
}
for (key, path) in &legacy_meta_files {
if meta_cache.contains_key(key) {
continue;
}
if let Ok(content) = fs::read(path) {
if let Some(etag) = extract_etag_from_meta_bytes(&content) {
meta_cache.insert(key.clone(), etag);
}
}
}
etag_cache_changed = true;
}
let bucket_p = Path::new(&bucket_owned);
let bucket_len = bucket_owned.len() + 1;
let mut objects: Vec<(String, u64, f64, Option<String>)> = Vec::new();
if bucket_p.is_dir() {
let mut stack = vec![bucket_p.to_path_buf()];
while let Some(current) = stack.pop() {
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
let full = entry.path();
let full_str = full.to_string_lossy();
if full_str.len() > bucket_len {
let first_part: &str = if let Some(sep_pos) =
full_str[bucket_len..].find(|c: char| c == '\\' || c == '/')
{
&full_str[bucket_len..bucket_len + sep_pos]
} else {
&full_str[bucket_len..]
};
if INTERNAL_FOLDERS.contains(&first_part) {
continue;
}
} else if let Some(name) = entry.file_name().to_str() {
if INTERNAL_FOLDERS.contains(&name) {
continue;
}
}
stack.push(full);
} else if ft.is_file() && !ft.is_symlink() {
let full = entry.path();
let full_str = full.to_string_lossy();
if full_str.len() <= bucket_len {
continue;
}
let rel = &full_str[bucket_len..];
let first_part: &str =
if let Some(sep_pos) = rel.find(|c: char| c == '\\' || c == '/') {
&rel[..sep_pos]
} else {
rel
};
if INTERNAL_FOLDERS.contains(&first_part) {
continue;
}
let key = rel.replace('\\', "/");
if let Ok(md) = entry.metadata() {
let size = md.len();
let mtime = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
let etag = meta_cache.get(&key).cloned();
objects.push((key, size, mtime, etag));
}
}
}
}
}
Ok((meta_cache, objects, etag_cache_changed))
})?;
let (meta_cache, objects, etag_cache_changed) = result;
let dict = PyDict::new(py);
let cache_dict = PyDict::new(py);
for (k, v) in &meta_cache {
cache_dict.set_item(k, v)?;
}
dict.set_item("etag_cache", cache_dict)?;
let objects_list = PyList::empty(py);
for (key, size, mtime, etag) in &objects {
let etag_py: Py<PyAny> = match etag {
Some(e) => PyString::new(py, e).into_any().unbind(),
None => py.None(),
};
let tuple = PyTuple::new(py, &[
PyString::new(py, key).into_any().unbind(),
size.into_pyobject(py)?.into_any().unbind(),
mtime.into_pyobject(py)?.into_any().unbind(),
etag_py,
])?;
objects_list.append(tuple)?;
}
dict.set_item("objects", objects_list)?;
dict.set_item("etag_cache_changed", etag_cache_changed)?;
Ok(dict.into_any().unbind())
}

View File

@@ -0,0 +1,107 @@
use md5::{Digest, Md5};
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::prelude::*;
use std::fs::{self, File};
use std::io::{Read, Write};
use uuid::Uuid;
const DEFAULT_CHUNK_SIZE: usize = 262144;
#[pyfunction]
#[pyo3(signature = (stream, tmp_dir, chunk_size=DEFAULT_CHUNK_SIZE))]
pub fn stream_to_file_with_md5(
py: Python<'_>,
stream: &Bound<'_, PyAny>,
tmp_dir: &str,
chunk_size: usize,
) -> PyResult<(String, String, u64)> {
let chunk_size = if chunk_size == 0 {
DEFAULT_CHUNK_SIZE
} else {
chunk_size
};
fs::create_dir_all(tmp_dir)
.map_err(|e| PyIOError::new_err(format!("Failed to create tmp dir: {}", e)))?;
let tmp_name = format!("{}.tmp", Uuid::new_v4().as_hyphenated());
let tmp_path_buf = std::path::PathBuf::from(tmp_dir).join(&tmp_name);
let tmp_path = tmp_path_buf.to_string_lossy().into_owned();
let mut file = File::create(&tmp_path)
.map_err(|e| PyIOError::new_err(format!("Failed to create temp file: {}", e)))?;
let mut hasher = Md5::new();
let mut total_bytes: u64 = 0;
let result: PyResult<()> = (|| {
loop {
let chunk: Vec<u8> = stream.call_method1("read", (chunk_size,))?.extract()?;
if chunk.is_empty() {
break;
}
hasher.update(&chunk);
file.write_all(&chunk)
.map_err(|e| PyIOError::new_err(format!("Failed to write: {}", e)))?;
total_bytes += chunk.len() as u64;
py.check_signals()?;
}
Ok(())
})();
if let Err(e) = result {
drop(file);
let _ = fs::remove_file(&tmp_path);
return Err(e);
}
drop(file);
let md5_hex = format!("{:x}", hasher.finalize());
Ok((tmp_path, md5_hex, total_bytes))
}
#[pyfunction]
pub fn assemble_parts_with_md5(
py: Python<'_>,
part_paths: Vec<String>,
dest_path: &str,
) -> PyResult<String> {
if part_paths.is_empty() {
return Err(PyValueError::new_err("No parts to assemble"));
}
let dest = dest_path.to_owned();
let parts = part_paths;
py.detach(move || {
if let Some(parent) = std::path::Path::new(&dest).parent() {
fs::create_dir_all(parent)
.map_err(|e| PyIOError::new_err(format!("Failed to create dest dir: {}", e)))?;
}
let mut target = File::create(&dest)
.map_err(|e| PyIOError::new_err(format!("Failed to create dest file: {}", e)))?;
let mut hasher = Md5::new();
let mut buf = vec![0u8; 1024 * 1024];
for part_path in &parts {
let mut part = File::open(part_path)
.map_err(|e| PyIOError::new_err(format!("Failed to open part {}: {}", part_path, e)))?;
loop {
let n = part
.read(&mut buf)
.map_err(|e| PyIOError::new_err(format!("Failed to read part: {}", e)))?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
target
.write_all(&buf[..n])
.map_err(|e| PyIOError::new_err(format!("Failed to write: {}", e)))?;
}
}
Ok(format!("{:x}", hasher.finalize()))
})
}

View File

@@ -53,15 +53,17 @@ def test_special_characters_in_metadata(tmp_path: Path):
assert meta["special"] == "!@#$%^&*()" assert meta["special"] == "!@#$%^&*()"
def test_disk_full_scenario(tmp_path: Path, monkeypatch): def test_disk_full_scenario(tmp_path: Path, monkeypatch):
# Simulate disk full by mocking write to fail import app.storage as _storage_mod
monkeypatch.setattr(_storage_mod, "_HAS_RUST", False)
storage = ObjectStorage(tmp_path) storage = ObjectStorage(tmp_path)
storage.create_bucket("full") storage.create_bucket("full")
def mock_copyfileobj(*args, **kwargs): def mock_copyfileobj(*args, **kwargs):
raise OSError(28, "No space left on device") raise OSError(28, "No space left on device")
import shutil import shutil
monkeypatch.setattr(shutil, "copyfileobj", mock_copyfileobj) monkeypatch.setattr(shutil, "copyfileobj", mock_copyfileobj)
with pytest.raises(OSError, match="No space left on device"): with pytest.raises(OSError, match="No space left on device"):
storage.put_object("full", "file", io.BytesIO(b"data")) storage.put_object("full", "file", io.BytesIO(b"data"))

View File

@@ -0,0 +1,350 @@
import hashlib
import io
import os
import secrets
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
try:
import myfsio_core as _rc
HAS_RUST = True
except ImportError:
_rc = None
HAS_RUST = False
pytestmark = pytest.mark.skipif(not HAS_RUST, reason="myfsio_core not available")
class TestStreamToFileWithMd5:
def test_basic_write(self, tmp_path):
data = b"hello world" * 1000
stream = io.BytesIO(data)
tmp_dir = str(tmp_path / "tmp")
tmp_path_str, md5_hex, size = _rc.stream_to_file_with_md5(stream, tmp_dir)
assert size == len(data)
assert md5_hex == hashlib.md5(data).hexdigest()
assert Path(tmp_path_str).exists()
assert Path(tmp_path_str).read_bytes() == data
def test_empty_stream(self, tmp_path):
stream = io.BytesIO(b"")
tmp_dir = str(tmp_path / "tmp")
tmp_path_str, md5_hex, size = _rc.stream_to_file_with_md5(stream, tmp_dir)
assert size == 0
assert md5_hex == hashlib.md5(b"").hexdigest()
assert Path(tmp_path_str).read_bytes() == b""
def test_large_data(self, tmp_path):
data = os.urandom(1024 * 1024 * 2)
stream = io.BytesIO(data)
tmp_dir = str(tmp_path / "tmp")
tmp_path_str, md5_hex, size = _rc.stream_to_file_with_md5(stream, tmp_dir)
assert size == len(data)
assert md5_hex == hashlib.md5(data).hexdigest()
def test_custom_chunk_size(self, tmp_path):
data = b"x" * 10000
stream = io.BytesIO(data)
tmp_dir = str(tmp_path / "tmp")
tmp_path_str, md5_hex, size = _rc.stream_to_file_with_md5(
stream, tmp_dir, chunk_size=128
)
assert size == len(data)
assert md5_hex == hashlib.md5(data).hexdigest()
class TestAssemblePartsWithMd5:
def test_basic_assembly(self, tmp_path):
parts = []
combined = b""
for i in range(3):
data = f"part{i}data".encode() * 100
combined += data
p = tmp_path / f"part{i}"
p.write_bytes(data)
parts.append(str(p))
dest = str(tmp_path / "output")
md5_hex = _rc.assemble_parts_with_md5(parts, dest)
assert md5_hex == hashlib.md5(combined).hexdigest()
assert Path(dest).read_bytes() == combined
def test_single_part(self, tmp_path):
data = b"single part data"
p = tmp_path / "part0"
p.write_bytes(data)
dest = str(tmp_path / "output")
md5_hex = _rc.assemble_parts_with_md5([str(p)], dest)
assert md5_hex == hashlib.md5(data).hexdigest()
assert Path(dest).read_bytes() == data
def test_empty_parts_list(self):
with pytest.raises(ValueError, match="No parts"):
_rc.assemble_parts_with_md5([], "dummy")
def test_missing_part_file(self, tmp_path):
with pytest.raises(OSError):
_rc.assemble_parts_with_md5(
[str(tmp_path / "nonexistent")], str(tmp_path / "out")
)
def test_large_parts(self, tmp_path):
parts = []
combined = b""
for i in range(5):
data = os.urandom(512 * 1024)
combined += data
p = tmp_path / f"part{i}"
p.write_bytes(data)
parts.append(str(p))
dest = str(tmp_path / "output")
md5_hex = _rc.assemble_parts_with_md5(parts, dest)
assert md5_hex == hashlib.md5(combined).hexdigest()
assert Path(dest).read_bytes() == combined
class TestEncryptDecryptStreamChunked:
def _python_derive_chunk_nonce(self, base_nonce, chunk_index):
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=12,
salt=base_nonce,
info=chunk_index.to_bytes(4, "big"),
)
return hkdf.derive(b"chunk_nonce")
def test_encrypt_decrypt_roundtrip(self, tmp_path):
data = b"Hello, encryption!" * 500
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "plaintext")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
chunk_count = _rc.encrypt_stream_chunked(
input_path, encrypted_path, key, base_nonce
)
assert chunk_count > 0
chunk_count_dec = _rc.decrypt_stream_chunked(
encrypted_path, decrypted_path, key, base_nonce
)
assert chunk_count_dec == chunk_count
assert Path(decrypted_path).read_bytes() == data
def test_empty_file(self, tmp_path):
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "empty")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(b"")
chunk_count = _rc.encrypt_stream_chunked(
input_path, encrypted_path, key, base_nonce
)
assert chunk_count == 0
chunk_count_dec = _rc.decrypt_stream_chunked(
encrypted_path, decrypted_path, key, base_nonce
)
assert chunk_count_dec == 0
assert Path(decrypted_path).read_bytes() == b""
def test_custom_chunk_size(self, tmp_path):
data = os.urandom(10000)
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "plaintext")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
chunk_count = _rc.encrypt_stream_chunked(
input_path, encrypted_path, key, base_nonce, chunk_size=1024
)
assert chunk_count == 10
_rc.decrypt_stream_chunked(encrypted_path, decrypted_path, key, base_nonce)
assert Path(decrypted_path).read_bytes() == data
def test_invalid_key_length(self, tmp_path):
input_path = str(tmp_path / "in")
Path(input_path).write_bytes(b"data")
with pytest.raises(ValueError, match="32 bytes"):
_rc.encrypt_stream_chunked(
input_path, str(tmp_path / "out"), b"short", secrets.token_bytes(12)
)
def test_invalid_nonce_length(self, tmp_path):
input_path = str(tmp_path / "in")
Path(input_path).write_bytes(b"data")
with pytest.raises(ValueError, match="12 bytes"):
_rc.encrypt_stream_chunked(
input_path, str(tmp_path / "out"), secrets.token_bytes(32), b"short"
)
def test_wrong_key_fails_decrypt(self, tmp_path):
data = b"sensitive data"
key = secrets.token_bytes(32)
wrong_key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "plaintext")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
_rc.encrypt_stream_chunked(input_path, encrypted_path, key, base_nonce)
with pytest.raises((ValueError, OSError)):
_rc.decrypt_stream_chunked(
encrypted_path, decrypted_path, wrong_key, base_nonce
)
def test_cross_compat_python_encrypt_rust_decrypt(self, tmp_path):
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
data = b"cross compat test data" * 100
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
chunk_size = 1024
encrypted_path = str(tmp_path / "py_encrypted")
with open(encrypted_path, "wb") as f:
f.write(b"\x00\x00\x00\x00")
aesgcm = AESGCM(key)
chunk_index = 0
offset = 0
while offset < len(data):
chunk = data[offset:offset + chunk_size]
nonce = self._python_derive_chunk_nonce(base_nonce, chunk_index)
enc = aesgcm.encrypt(nonce, chunk, None)
f.write(len(enc).to_bytes(4, "big"))
f.write(enc)
chunk_index += 1
offset += chunk_size
f.seek(0)
f.write(chunk_index.to_bytes(4, "big"))
decrypted_path = str(tmp_path / "rust_decrypted")
_rc.decrypt_stream_chunked(encrypted_path, decrypted_path, key, base_nonce)
assert Path(decrypted_path).read_bytes() == data
def test_cross_compat_rust_encrypt_python_decrypt(self, tmp_path):
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
data = b"cross compat reverse test" * 100
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
chunk_size = 1024
input_path = str(tmp_path / "plaintext")
encrypted_path = str(tmp_path / "rust_encrypted")
Path(input_path).write_bytes(data)
chunk_count = _rc.encrypt_stream_chunked(
input_path, encrypted_path, key, base_nonce, chunk_size=chunk_size
)
aesgcm = AESGCM(key)
with open(encrypted_path, "rb") as f:
count_bytes = f.read(4)
assert int.from_bytes(count_bytes, "big") == chunk_count
decrypted = b""
for i in range(chunk_count):
size = int.from_bytes(f.read(4), "big")
enc_chunk = f.read(size)
nonce = self._python_derive_chunk_nonce(base_nonce, i)
decrypted += aesgcm.decrypt(nonce, enc_chunk, None)
assert decrypted == data
def test_large_file_roundtrip(self, tmp_path):
data = os.urandom(1024 * 1024)
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "large")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
_rc.encrypt_stream_chunked(input_path, encrypted_path, key, base_nonce)
_rc.decrypt_stream_chunked(encrypted_path, decrypted_path, key, base_nonce)
assert Path(decrypted_path).read_bytes() == data
class TestStreamingEncryptorFileMethods:
def test_encrypt_file_decrypt_file_roundtrip(self, tmp_path):
from app.encryption import LocalKeyEncryption, StreamingEncryptor
master_key_path = tmp_path / "master.key"
provider = LocalKeyEncryption(master_key_path)
encryptor = StreamingEncryptor(provider, chunk_size=512)
data = b"file method test data" * 200
input_path = str(tmp_path / "input")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
metadata = encryptor.encrypt_file(input_path, encrypted_path)
assert metadata.algorithm == "AES256"
encryptor.decrypt_file(encrypted_path, decrypted_path, metadata)
assert Path(decrypted_path).read_bytes() == data
def test_encrypt_file_matches_encrypt_stream(self, tmp_path):
from app.encryption import LocalKeyEncryption, StreamingEncryptor
master_key_path = tmp_path / "master.key"
provider = LocalKeyEncryption(master_key_path)
encryptor = StreamingEncryptor(provider, chunk_size=512)
data = b"stream vs file comparison" * 100
input_path = str(tmp_path / "input")
Path(input_path).write_bytes(data)
file_encrypted_path = str(tmp_path / "file_enc")
metadata_file = encryptor.encrypt_file(input_path, file_encrypted_path)
file_decrypted_path = str(tmp_path / "file_dec")
encryptor.decrypt_file(file_encrypted_path, file_decrypted_path, metadata_file)
assert Path(file_decrypted_path).read_bytes() == data
stream_enc, metadata_stream = encryptor.encrypt_stream(io.BytesIO(data))
stream_dec = encryptor.decrypt_stream(stream_enc, metadata_stream)
assert stream_dec.read() == data