16 Commits

Author SHA1 Message Date
eb0e435a5a MyFSIO v0.3.6 Release
Reviewed-on: #29
2026-03-08 04:46:31 +00:00
72f5d9d70c Restore data integrity guarantees: Content-MD5 validation, fsync durability, atomic metadata writes, concurrent write protection 2026-03-07 17:54:00 +08:00
be63e27c15 Reduce per-request CPU overhead: eliminate double stat(), cache content type and policy context, gate logging, configurable stat intervals 2026-03-07 14:08:23 +08:00
7633007a08 MyFSIO v0.3.5 Release
Reviewed-on: #28
2026-03-07 05:53:02 +00:00
81ef0fe4c7 Fix stale object count in bucket header and metrics dashboard after deletes 2026-03-03 19:42:37 +08:00
5f24bd920d Reduce P99 tail latency: defer etag index writes, eliminate double cache rebuild, skip redundant stat() in bucket config 2026-03-02 22:39:37 +08:00
8552f193de Reduce CPU/lock contention under concurrent uploads: split cache lock, in-memory stats, dict copy, lightweight request IDs, defaultdict metrics 2026-03-02 22:05:54 +08:00
de0d869c9f Merge pull request 'MyFSIO v0.3.4 Release' (#27) from next into main
Reviewed-on: #27
2026-03-02 08:31:32 +00:00
5536330aeb Move performance-critical Python functions to Rust: streaming I/O, multipart assembly, and AES-256-GCM encryption 2026-02-27 22:55:20 +08:00
d4657c389d Fix misleading default credentials in README to match actual random generation behavior 2026-02-27 21:58:10 +08:00
3827235232 Reduce CPU usage on heavy uploads: skip SHA256 body hashing in SigV4, use Rust md5_file post-write instead of per-chunk _HashingReader 2026-02-27 21:57:13 +08:00
fdd068feee MyFSIO v0.3.3 Release
Reviewed-on: #26
2026-02-27 04:49:32 +00:00
dfc0058d0d Extend myfsio_core Rust extension with 7 storage hot paths (directory scanning, metadata I/O, object listing, search, bucket stats, cache building) 2026-02-27 12:22:39 +08:00
27aef84311 Fix rclone CopyObject SignatureDoesNotMatch caused by internal metadata leaking as X-Amz-Meta headers 2026-02-26 21:39:43 +08:00
66b7677d2c MyFSIO v0.3.2 Release
Reviewed-on: #25
2026-02-26 10:10:19 +00:00
5003514a3d Fix null ETags in shallow listing by updating etag index on store/delete 2026-02-26 18:09:08 +08:00
18 changed files with 2139 additions and 326 deletions

View File

@@ -80,7 +80,7 @@ python run.py --mode api # API only (port 5000)
python run.py --mode ui # UI only (port 5100)
```
**Default Credentials:** `localadmin` / `localadmin`
**Credentials:** Generated automatically on first run and printed to the console. If missed, check the IAM config file at `<STORAGE_ROOT>/.myfsio.sys/config/iam.json`.
- **Web Console:** http://127.0.0.1:5100/ui
- **API Endpoint:** http://127.0.0.1:5000

View File

@@ -1,13 +1,13 @@
from __future__ import annotations
import html as html_module
import itertools
import logging
import mimetypes
import os
import shutil
import sys
import time
import uuid
from logging.handlers import RotatingFileHandler
from pathlib import Path
from datetime import timedelta
@@ -39,6 +39,8 @@ from .storage import ObjectStorage, StorageError
from .version import get_version
from .website_domains import WebsiteDomainStore
_request_counter = itertools.count(1)
def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path:
"""Migrate config file from legacy locations to the active path.
@@ -481,13 +483,9 @@ def _configure_logging(app: Flask) -> None:
@app.before_request
def _log_request_start() -> None:
g.request_id = uuid.uuid4().hex
g.request_id = f"{os.getpid():x}{next(_request_counter):012x}"
g.request_started_at = time.perf_counter()
g.request_bytes_in = request.content_length or 0
app.logger.info(
"Request started",
extra={"path": request.path, "method": request.method, "remote_addr": request.remote_addr},
)
@app.before_request
def _maybe_serve_website():
@@ -616,16 +614,17 @@ def _configure_logging(app: Flask) -> None:
duration_ms = 0.0
if hasattr(g, "request_started_at"):
duration_ms = (time.perf_counter() - g.request_started_at) * 1000
request_id = getattr(g, "request_id", uuid.uuid4().hex)
request_id = getattr(g, "request_id", f"{os.getpid():x}{next(_request_counter):012x}")
response.headers.setdefault("X-Request-ID", request_id)
app.logger.info(
"Request completed",
extra={
"path": request.path,
"method": request.method,
"remote_addr": request.remote_addr,
},
)
if app.logger.isEnabledFor(logging.INFO):
app.logger.info(
"Request completed",
extra={
"path": request.path,
"method": request.method,
"remote_addr": request.remote_addr,
},
)
response.headers["X-Request-Duration-ms"] = f"{duration_ms:.2f}"
operation_metrics = app.extensions.get("operation_metrics")

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import ipaddress
import json
import os
import re
import time
from dataclasses import dataclass, field
@@ -268,7 +269,7 @@ class BucketPolicyStore:
self._last_mtime = self._current_mtime()
# Performance: Avoid stat() on every request
self._last_stat_check = 0.0
self._stat_check_interval = 1.0 # Only check mtime every 1 second
self._stat_check_interval = float(os.environ.get("BUCKET_POLICY_STAT_CHECK_INTERVAL_SECONDS", "2.0"))
def maybe_reload(self) -> None:
# Performance: Skip stat check if we checked recently

View File

@@ -19,6 +19,13 @@ from cryptography.hazmat.primitives import hashes
if sys.platform != "win32":
import fcntl
try:
import myfsio_core as _rc
_HAS_RUST = True
except ImportError:
_rc = None
_HAS_RUST = False
logger = logging.getLogger(__name__)
@@ -338,6 +345,69 @@ class StreamingEncryptor:
output.seek(0)
return output
def encrypt_file(self, input_path: str, output_path: str) -> EncryptionMetadata:
data_key, encrypted_data_key = self.provider.generate_data_key()
base_nonce = secrets.token_bytes(12)
if _HAS_RUST:
_rc.encrypt_stream_chunked(
input_path, output_path, data_key, base_nonce, self.chunk_size
)
else:
with open(input_path, "rb") as stream:
aesgcm = AESGCM(data_key)
with open(output_path, "wb") as out:
out.write(b"\x00\x00\x00\x00")
chunk_index = 0
while True:
chunk = stream.read(self.chunk_size)
if not chunk:
break
chunk_nonce = self._derive_chunk_nonce(base_nonce, chunk_index)
encrypted_chunk = aesgcm.encrypt(chunk_nonce, chunk, None)
out.write(len(encrypted_chunk).to_bytes(self.HEADER_SIZE, "big"))
out.write(encrypted_chunk)
chunk_index += 1
out.seek(0)
out.write(chunk_index.to_bytes(4, "big"))
return EncryptionMetadata(
algorithm="AES256",
key_id=self.provider.KEY_ID if hasattr(self.provider, "KEY_ID") else "local",
nonce=base_nonce,
encrypted_data_key=encrypted_data_key,
)
def decrypt_file(self, input_path: str, output_path: str,
metadata: EncryptionMetadata) -> None:
data_key = self.provider.decrypt_data_key(metadata.encrypted_data_key, metadata.key_id)
base_nonce = metadata.nonce
if _HAS_RUST:
_rc.decrypt_stream_chunked(input_path, output_path, data_key, base_nonce)
else:
with open(input_path, "rb") as stream:
chunk_count_bytes = stream.read(4)
if len(chunk_count_bytes) < 4:
raise EncryptionError("Invalid encrypted stream: missing header")
chunk_count = int.from_bytes(chunk_count_bytes, "big")
aesgcm = AESGCM(data_key)
with open(output_path, "wb") as out:
for chunk_index in range(chunk_count):
size_bytes = stream.read(self.HEADER_SIZE)
if len(size_bytes) < self.HEADER_SIZE:
raise EncryptionError(f"Invalid encrypted stream: truncated at chunk {chunk_index}")
chunk_size = int.from_bytes(size_bytes, "big")
encrypted_chunk = stream.read(chunk_size)
if len(encrypted_chunk) < chunk_size:
raise EncryptionError(f"Invalid encrypted stream: incomplete chunk {chunk_index}")
chunk_nonce = self._derive_chunk_nonce(base_nonce, chunk_index)
try:
decrypted_chunk = aesgcm.decrypt(chunk_nonce, encrypted_chunk, None)
out.write(decrypted_chunk)
except Exception as exc:
raise EncryptionError(f"Failed to decrypt chunk {chunk_index}: {exc}") from exc
class EncryptionManager:
"""Manages encryption providers and operations."""

View File

@@ -125,7 +125,7 @@ class IamService:
self._secret_key_cache: Dict[str, Tuple[str, float]] = {}
self._cache_ttl = float(os.environ.get("IAM_CACHE_TTL_SECONDS", "5.0"))
self._last_stat_check = 0.0
self._stat_check_interval = 1.0
self._stat_check_interval = float(os.environ.get("IAM_STAT_CHECK_INTERVAL_SECONDS", "2.0"))
self._sessions: Dict[str, Dict[str, Any]] = {}
self._session_lock = threading.Lock()
self._load()

View File

@@ -5,6 +5,7 @@ import logging
import random
import threading
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
@@ -138,8 +139,8 @@ class OperationMetricsCollector:
self.interval_seconds = interval_minutes * 60
self.retention_hours = retention_hours
self._lock = threading.Lock()
self._by_method: Dict[str, OperationStats] = {}
self._by_endpoint: Dict[str, OperationStats] = {}
self._by_method: Dict[str, OperationStats] = defaultdict(OperationStats)
self._by_endpoint: Dict[str, OperationStats] = defaultdict(OperationStats)
self._by_status_class: Dict[str, int] = {}
self._error_codes: Dict[str, int] = {}
self._totals = OperationStats()
@@ -211,8 +212,8 @@ class OperationMetricsCollector:
self._prune_old_snapshots()
self._save_history()
self._by_method.clear()
self._by_endpoint.clear()
self._by_method = defaultdict(OperationStats)
self._by_endpoint = defaultdict(OperationStats)
self._by_status_class.clear()
self._error_codes.clear()
self._totals = OperationStats()
@@ -232,12 +233,7 @@ class OperationMetricsCollector:
status_class = f"{status_code // 100}xx"
with self._lock:
if method not in self._by_method:
self._by_method[method] = OperationStats()
self._by_method[method].record(latency_ms, success, bytes_in, bytes_out)
if endpoint_type not in self._by_endpoint:
self._by_endpoint[endpoint_type] = OperationStats()
self._by_endpoint[endpoint_type].record(latency_ms, success, bytes_in, bytes_out)
self._by_status_class[status_class] = self._by_status_class.get(status_class, 0) + 1

View File

@@ -85,6 +85,9 @@ def _bucket_policies() -> BucketPolicyStore:
def _build_policy_context() -> Dict[str, Any]:
cached = getattr(g, "_policy_context", None)
if cached is not None:
return cached
ctx: Dict[str, Any] = {}
if request.headers.get("Referer"):
ctx["aws:Referer"] = request.headers.get("Referer")
@@ -98,6 +101,7 @@ def _build_policy_context() -> Dict[str, Any]:
ctx["aws:SecureTransport"] = str(request.is_secure).lower()
if request.headers.get("User-Agent"):
ctx["aws:UserAgent"] = request.headers.get("User-Agent")
g._policy_context = ctx
return ctx
@@ -293,9 +297,7 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
raise IamError("Required headers not signed")
canonical_uri = _get_canonical_uri(req)
payload_hash = req.headers.get("X-Amz-Content-Sha256")
if not payload_hash:
payload_hash = hashlib.sha256(req.get_data()).hexdigest()
payload_hash = req.headers.get("X-Amz-Content-Sha256") or "UNSIGNED-PAYLOAD"
if _HAS_RUST:
query_params = list(req.args.items(multi=True))
@@ -305,16 +307,10 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
header_values, payload_hash, amz_date, date_stamp, region,
service, secret_key, signature,
):
if current_app.config.get("DEBUG_SIGV4"):
logger.warning("SigV4 signature mismatch for %s %s", req.method, req.path)
raise IamError("SignatureDoesNotMatch")
else:
method = req.method
query_args = []
for key, value in req.args.items(multi=True):
query_args.append((key, value))
query_args.sort(key=lambda x: (x[0], x[1]))
query_args = sorted(req.args.items(multi=True), key=lambda x: (x[0], x[1]))
canonical_query_parts = []
for k, v in query_args:
canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}")
@@ -339,8 +335,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
if not hmac.compare_digest(calculated_signature, signature):
if current_app.config.get("DEBUG_SIGV4"):
logger.warning("SigV4 signature mismatch for %s %s", method, req.path)
raise IamError("SignatureDoesNotMatch")
session_token = req.headers.get("X-Amz-Security-Token")
@@ -682,7 +676,7 @@ def _extract_request_metadata() -> Dict[str, str]:
for header, value in request.headers.items():
if header.lower().startswith("x-amz-meta-"):
key = header[11:]
if key:
if key and not (key.startswith("__") and key.endswith("__")):
metadata[key] = value
return metadata
@@ -1031,14 +1025,20 @@ def _apply_object_headers(
file_stat,
metadata: Dict[str, str] | None,
etag: str,
size_override: int | None = None,
mtime_override: float | None = None,
) -> None:
if file_stat is not None:
if response.status_code != 206:
response.headers["Content-Length"] = str(file_stat.st_size)
response.headers["Last-Modified"] = http_date(file_stat.st_mtime)
effective_size = size_override if size_override is not None else (file_stat.st_size if file_stat is not None else None)
effective_mtime = mtime_override if mtime_override is not None else (file_stat.st_mtime if file_stat is not None else None)
if effective_size is not None and response.status_code != 206:
response.headers["Content-Length"] = str(effective_size)
if effective_mtime is not None:
response.headers["Last-Modified"] = http_date(effective_mtime)
response.headers["ETag"] = f'"{etag}"'
response.headers["Accept-Ranges"] = "bytes"
for key, value in (metadata or {}).items():
if key.startswith("__") and key.endswith("__"):
continue
safe_value = _sanitize_header_value(str(value))
response.headers[f"X-Amz-Meta-{key}"] = safe_value
@@ -2467,7 +2467,7 @@ def _post_object(bucket_name: str) -> Response:
for field_name, value in request.form.items():
if field_name.lower().startswith("x-amz-meta-"):
key = field_name[11:]
if key:
if key and not (key.startswith("__") and key.endswith("__")):
metadata[key] = value
try:
meta = storage.put_object(bucket_name, object_key, file.stream, metadata=metadata or None)
@@ -2828,6 +2828,8 @@ def object_handler(bucket_name: str, object_key: str):
if validation_error:
return _error_response("InvalidArgument", validation_error, 400)
metadata["__content_type__"] = content_type or mimetypes.guess_type(object_key)[0] or "application/octet-stream"
try:
meta = storage.put_object(
bucket_name,
@@ -2842,10 +2844,23 @@ def object_handler(bucket_name: str, object_key: str):
if "Bucket" in message:
return _error_response("NoSuchBucket", message, 404)
return _error_response("InvalidArgument", message, 400)
current_app.logger.info(
"Object uploaded",
extra={"bucket": bucket_name, "key": object_key, "size": meta.size},
)
content_md5 = request.headers.get("Content-MD5")
if content_md5 and meta.etag:
try:
expected_md5 = base64.b64decode(content_md5).hex()
except Exception:
storage.delete_object(bucket_name, object_key)
return _error_response("InvalidDigest", "Content-MD5 header is not valid base64", 400)
if expected_md5 != meta.etag:
storage.delete_object(bucket_name, object_key)
return _error_response("BadDigest", "The Content-MD5 you specified did not match what we received", 400)
if current_app.logger.isEnabledFor(logging.INFO):
current_app.logger.info(
"Object uploaded",
extra={"bucket": bucket_name, "key": object_key, "size": meta.size},
)
response = Response(status=200)
if meta.etag:
response.headers["ETag"] = f'"{meta.etag}"'
@@ -2879,7 +2894,7 @@ def object_handler(bucket_name: str, object_key: str):
except StorageError as exc:
return _error_response("NoSuchKey", str(exc), 404)
metadata = storage.get_object_metadata(bucket_name, object_key)
mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
mimetype = metadata.get("__content_type__") or mimetypes.guess_type(object_key)[0] or "application/octet-stream"
is_encrypted = "x-amz-server-side-encryption" in metadata
@@ -2971,10 +2986,7 @@ def object_handler(bucket_name: str, object_key: str):
response.headers["Content-Type"] = mimetype
logged_bytes = 0
try:
file_stat = path.stat() if not is_encrypted else None
except (PermissionError, OSError):
file_stat = None
file_stat = stat if not is_encrypted else None
_apply_object_headers(response, file_stat=file_stat, metadata=metadata, etag=etag)
if request.method == "GET":
@@ -2991,8 +3003,9 @@ def object_handler(bucket_name: str, object_key: str):
if value:
response.headers[header] = _sanitize_header_value(value)
action = "Object read" if request.method == "GET" else "Object head"
current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes})
if current_app.logger.isEnabledFor(logging.INFO):
action = "Object read" if request.method == "GET" else "Object head"
current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes})
return response
if "uploadId" in request.args:
@@ -3010,7 +3023,8 @@ def object_handler(bucket_name: str, object_key: str):
storage.delete_object(bucket_name, object_key)
lock_service.delete_object_lock_metadata(bucket_name, object_key)
current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})
if current_app.logger.isEnabledFor(logging.INFO):
current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})
principal, _ = _require_principal()
_notifications().emit_object_removed(
@@ -3351,12 +3365,20 @@ def head_object(bucket_name: str, object_key: str) -> Response:
_authorize_action(principal, bucket_name, "read", object_key=object_key)
path = _storage().get_object_path(bucket_name, object_key)
metadata = _storage().get_object_metadata(bucket_name, object_key)
stat = path.stat()
etag = metadata.get("__etag__") or _storage()._compute_etag(path)
response = Response(status=200)
_apply_object_headers(response, file_stat=stat, metadata=metadata, etag=etag)
response.headers["Content-Type"] = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
cached_size = metadata.get("__size__")
cached_mtime = metadata.get("__last_modified__")
if cached_size is not None and cached_mtime is not None:
size_val = int(cached_size)
mtime_val = float(cached_mtime)
response = Response(status=200)
_apply_object_headers(response, file_stat=None, metadata=metadata, etag=etag, size_override=size_val, mtime_override=mtime_val)
else:
stat = path.stat()
response = Response(status=200)
_apply_object_headers(response, file_stat=stat, metadata=metadata, etag=etag)
response.headers["Content-Type"] = metadata.get("__content_type__") or mimetypes.guess_type(object_key)[0] or "application/octet-stream"
return response
except (StorageError, FileNotFoundError):
return _error_response("NoSuchKey", "Object not found", 404)
@@ -3445,7 +3467,7 @@ def _copy_object(dest_bucket: str, dest_key: str, copy_source: str) -> Response:
if validation_error:
return _error_response("InvalidArgument", validation_error, 400)
else:
metadata = source_metadata
metadata = {k: v for k, v in source_metadata.items() if not (k.startswith("__") and k.endswith("__"))}
try:
with source_path.open("rb") as stream:
@@ -3586,6 +3608,8 @@ def _initiate_multipart_upload(bucket_name: str, object_key: str) -> Response:
return error
metadata = _extract_request_metadata()
content_type = request.headers.get("Content-Type")
metadata["__content_type__"] = content_type or mimetypes.guess_type(object_key)[0] or "application/octet-stream"
try:
upload_id = _storage().initiate_multipart_upload(
bucket_name,
@@ -3638,6 +3662,15 @@ def _upload_part(bucket_name: str, object_key: str) -> Response:
return _error_response("NoSuchUpload", str(exc), 404)
return _error_response("InvalidArgument", str(exc), 400)
content_md5 = request.headers.get("Content-MD5")
if content_md5 and etag:
try:
expected_md5 = base64.b64decode(content_md5).hex()
except Exception:
return _error_response("InvalidDigest", "Content-MD5 header is not valid base64", 400)
if expected_md5 != etag:
return _error_response("BadDigest", "The Content-MD5 you specified did not match what we received", 400)
response = Response(status=200)
response.headers["ETag"] = f'"{etag}"'
return response

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
APP_VERSION = "0.3.1"
APP_VERSION = "0.3.6"
def get_version() -> str:

View File

@@ -19,3 +19,6 @@ regex = "1"
lru = "0.14"
parking_lot = "0.12"
percent-encoding = "2"
aes-gcm = "0.10"
hkdf = "0.12"
uuid = { version = "1", features = ["v4"] }

192
myfsio_core/src/crypto.rs Normal file
View File

@@ -0,0 +1,192 @@
use aes_gcm::aead::Aead;
use aes_gcm::{Aes256Gcm, KeyInit, Nonce};
use hkdf::Hkdf;
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::prelude::*;
use sha2::Sha256;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
const DEFAULT_CHUNK_SIZE: usize = 65536;
const HEADER_SIZE: usize = 4;
fn read_exact_chunk(reader: &mut impl Read, buf: &mut [u8]) -> std::io::Result<usize> {
let mut filled = 0;
while filled < buf.len() {
match reader.read(&mut buf[filled..]) {
Ok(0) => break,
Ok(n) => filled += n,
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
Ok(filled)
}
fn derive_chunk_nonce(base_nonce: &[u8], chunk_index: u32) -> Result<[u8; 12], String> {
let hkdf = Hkdf::<Sha256>::new(Some(base_nonce), b"chunk_nonce");
let mut okm = [0u8; 12];
hkdf.expand(&chunk_index.to_be_bytes(), &mut okm)
.map_err(|e| format!("HKDF expand failed: {}", e))?;
Ok(okm)
}
#[pyfunction]
#[pyo3(signature = (input_path, output_path, key, base_nonce, chunk_size=DEFAULT_CHUNK_SIZE))]
pub fn encrypt_stream_chunked(
py: Python<'_>,
input_path: &str,
output_path: &str,
key: &[u8],
base_nonce: &[u8],
chunk_size: usize,
) -> PyResult<u32> {
if key.len() != 32 {
return Err(PyValueError::new_err(format!(
"Key must be 32 bytes, got {}",
key.len()
)));
}
if base_nonce.len() != 12 {
return Err(PyValueError::new_err(format!(
"Base nonce must be 12 bytes, got {}",
base_nonce.len()
)));
}
let chunk_size = if chunk_size == 0 {
DEFAULT_CHUNK_SIZE
} else {
chunk_size
};
let inp = input_path.to_owned();
let out = output_path.to_owned();
let key_arr: [u8; 32] = key.try_into().unwrap();
let nonce_arr: [u8; 12] = base_nonce.try_into().unwrap();
py.detach(move || {
let cipher = Aes256Gcm::new(&key_arr.into());
let mut infile = File::open(&inp)
.map_err(|e| PyIOError::new_err(format!("Failed to open input: {}", e)))?;
let mut outfile = File::create(&out)
.map_err(|e| PyIOError::new_err(format!("Failed to create output: {}", e)))?;
outfile
.write_all(&[0u8; 4])
.map_err(|e| PyIOError::new_err(format!("Failed to write header: {}", e)))?;
let mut buf = vec![0u8; chunk_size];
let mut chunk_index: u32 = 0;
loop {
let n = read_exact_chunk(&mut infile, &mut buf)
.map_err(|e| PyIOError::new_err(format!("Failed to read: {}", e)))?;
if n == 0 {
break;
}
let nonce_bytes = derive_chunk_nonce(&nonce_arr, chunk_index)
.map_err(|e| PyValueError::new_err(e))?;
let nonce = Nonce::from_slice(&nonce_bytes);
let encrypted = cipher
.encrypt(nonce, &buf[..n])
.map_err(|e| PyValueError::new_err(format!("Encrypt failed: {}", e)))?;
let size = encrypted.len() as u32;
outfile
.write_all(&size.to_be_bytes())
.map_err(|e| PyIOError::new_err(format!("Failed to write chunk size: {}", e)))?;
outfile
.write_all(&encrypted)
.map_err(|e| PyIOError::new_err(format!("Failed to write chunk: {}", e)))?;
chunk_index += 1;
}
outfile
.seek(SeekFrom::Start(0))
.map_err(|e| PyIOError::new_err(format!("Failed to seek: {}", e)))?;
outfile
.write_all(&chunk_index.to_be_bytes())
.map_err(|e| PyIOError::new_err(format!("Failed to write chunk count: {}", e)))?;
Ok(chunk_index)
})
}
#[pyfunction]
pub fn decrypt_stream_chunked(
py: Python<'_>,
input_path: &str,
output_path: &str,
key: &[u8],
base_nonce: &[u8],
) -> PyResult<u32> {
if key.len() != 32 {
return Err(PyValueError::new_err(format!(
"Key must be 32 bytes, got {}",
key.len()
)));
}
if base_nonce.len() != 12 {
return Err(PyValueError::new_err(format!(
"Base nonce must be 12 bytes, got {}",
base_nonce.len()
)));
}
let inp = input_path.to_owned();
let out = output_path.to_owned();
let key_arr: [u8; 32] = key.try_into().unwrap();
let nonce_arr: [u8; 12] = base_nonce.try_into().unwrap();
py.detach(move || {
let cipher = Aes256Gcm::new(&key_arr.into());
let mut infile = File::open(&inp)
.map_err(|e| PyIOError::new_err(format!("Failed to open input: {}", e)))?;
let mut outfile = File::create(&out)
.map_err(|e| PyIOError::new_err(format!("Failed to create output: {}", e)))?;
let mut header = [0u8; HEADER_SIZE];
infile
.read_exact(&mut header)
.map_err(|e| PyIOError::new_err(format!("Failed to read header: {}", e)))?;
let chunk_count = u32::from_be_bytes(header);
let mut size_buf = [0u8; HEADER_SIZE];
for chunk_index in 0..chunk_count {
infile
.read_exact(&mut size_buf)
.map_err(|e| {
PyIOError::new_err(format!(
"Failed to read chunk {} size: {}",
chunk_index, e
))
})?;
let chunk_size = u32::from_be_bytes(size_buf) as usize;
let mut encrypted = vec![0u8; chunk_size];
infile.read_exact(&mut encrypted).map_err(|e| {
PyIOError::new_err(format!("Failed to read chunk {}: {}", chunk_index, e))
})?;
let nonce_bytes = derive_chunk_nonce(&nonce_arr, chunk_index)
.map_err(|e| PyValueError::new_err(e))?;
let nonce = Nonce::from_slice(&nonce_bytes);
let decrypted = cipher.decrypt(nonce, encrypted.as_ref()).map_err(|e| {
PyValueError::new_err(format!("Decrypt chunk {} failed: {}", chunk_index, e))
})?;
outfile.write_all(&decrypted).map_err(|e| {
PyIOError::new_err(format!("Failed to write chunk {}: {}", chunk_index, e))
})?;
}
Ok(chunk_count)
})
}

View File

@@ -1,6 +1,9 @@
mod crypto;
mod hashing;
mod metadata;
mod sigv4;
mod storage;
mod streaming;
mod validation;
use pyo3::prelude::*;
@@ -29,6 +32,20 @@ mod myfsio_core {
m.add_function(wrap_pyfunction!(metadata::read_index_entry, m)?)?;
m.add_function(wrap_pyfunction!(storage::write_index_entry, m)?)?;
m.add_function(wrap_pyfunction!(storage::delete_index_entry, m)?)?;
m.add_function(wrap_pyfunction!(storage::check_bucket_contents, m)?)?;
m.add_function(wrap_pyfunction!(storage::shallow_scan, m)?)?;
m.add_function(wrap_pyfunction!(storage::bucket_stats_scan, m)?)?;
m.add_function(wrap_pyfunction!(storage::search_objects_scan, m)?)?;
m.add_function(wrap_pyfunction!(storage::build_object_cache, m)?)?;
m.add_function(wrap_pyfunction!(streaming::stream_to_file_with_md5, m)?)?;
m.add_function(wrap_pyfunction!(streaming::assemble_parts_with_md5, m)?)?;
m.add_function(wrap_pyfunction!(crypto::encrypt_stream_chunked, m)?)?;
m.add_function(wrap_pyfunction!(crypto::decrypt_stream_chunked, m)?)?;
Ok(())
}
}

817
myfsio_core/src/storage.rs Normal file
View File

@@ -0,0 +1,817 @@
use pyo3::exceptions::PyIOError;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList, PyString, PyTuple};
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::SystemTime;
const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"];
fn system_time_to_epoch(t: SystemTime) -> f64 {
t.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0)
}
fn extract_etag_from_meta_bytes(content: &[u8]) -> Option<String> {
let marker = b"\"__etag__\"";
let idx = content.windows(marker.len()).position(|w| w == marker)?;
let after = &content[idx + marker.len()..];
let start = after.iter().position(|&b| b == b'"')? + 1;
let rest = &after[start..];
let end = rest.iter().position(|&b| b == b'"')?;
std::str::from_utf8(&rest[..end]).ok().map(|s| s.to_owned())
}
fn has_any_file(root: &str) -> bool {
let root_path = Path::new(root);
if !root_path.is_dir() {
return false;
}
let mut stack = vec![root_path.to_path_buf()];
while let Some(current) = stack.pop() {
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_file() {
return true;
}
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
}
}
}
false
}
#[pyfunction]
pub fn write_index_entry(
py: Python<'_>,
path: &str,
entry_name: &str,
entry_data_json: &str,
) -> PyResult<()> {
let path_owned = path.to_owned();
let entry_owned = entry_name.to_owned();
let data_owned = entry_data_json.to_owned();
py.detach(move || -> PyResult<()> {
let entry_value: Value = serde_json::from_str(&data_owned)
.map_err(|e| PyIOError::new_err(format!("Failed to parse entry data: {}", e)))?;
if let Some(parent) = Path::new(&path_owned).parent() {
let _ = fs::create_dir_all(parent);
}
let mut index_data: serde_json::Map<String, Value> = match fs::read_to_string(&path_owned)
{
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
Err(_) => serde_json::Map::new(),
};
index_data.insert(entry_owned, entry_value);
let serialized = serde_json::to_string(&Value::Object(index_data))
.map_err(|e| PyIOError::new_err(format!("Failed to serialize index: {}", e)))?;
fs::write(&path_owned, serialized)
.map_err(|e| PyIOError::new_err(format!("Failed to write index: {}", e)))?;
Ok(())
})
}
#[pyfunction]
pub fn delete_index_entry(py: Python<'_>, path: &str, entry_name: &str) -> PyResult<bool> {
let path_owned = path.to_owned();
let entry_owned = entry_name.to_owned();
py.detach(move || -> PyResult<bool> {
let content = match fs::read_to_string(&path_owned) {
Ok(c) => c,
Err(_) => return Ok(false),
};
let mut index_data: serde_json::Map<String, Value> =
match serde_json::from_str(&content) {
Ok(v) => v,
Err(_) => return Ok(false),
};
if index_data.remove(&entry_owned).is_none() {
return Ok(false);
}
if index_data.is_empty() {
let _ = fs::remove_file(&path_owned);
return Ok(true);
}
let serialized = serde_json::to_string(&Value::Object(index_data))
.map_err(|e| PyIOError::new_err(format!("Failed to serialize index: {}", e)))?;
fs::write(&path_owned, serialized)
.map_err(|e| PyIOError::new_err(format!("Failed to write index: {}", e)))?;
Ok(false)
})
}
#[pyfunction]
pub fn check_bucket_contents(
py: Python<'_>,
bucket_path: &str,
version_roots: Vec<String>,
multipart_roots: Vec<String>,
) -> PyResult<(bool, bool, bool)> {
let bucket_owned = bucket_path.to_owned();
py.detach(move || -> PyResult<(bool, bool, bool)> {
let mut has_objects = false;
let bucket_p = Path::new(&bucket_owned);
if bucket_p.is_dir() {
let mut stack = vec![bucket_p.to_path_buf()];
'obj_scan: while let Some(current) = stack.pop() {
let is_root = current == bucket_p;
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if is_root {
if let Some(name) = entry.file_name().to_str() {
if INTERNAL_FOLDERS.contains(&name) {
continue;
}
}
}
if ft.is_file() && !ft.is_symlink() {
has_objects = true;
break 'obj_scan;
}
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
}
}
}
}
let mut has_versions = false;
for root in &version_roots {
if has_versions {
break;
}
has_versions = has_any_file(root);
}
let mut has_multipart = false;
for root in &multipart_roots {
if has_multipart {
break;
}
has_multipart = has_any_file(root);
}
Ok((has_objects, has_versions, has_multipart))
})
}
#[pyfunction]
pub fn shallow_scan(
py: Python<'_>,
target_dir: &str,
prefix: &str,
meta_cache_json: &str,
) -> PyResult<Py<PyAny>> {
let target_owned = target_dir.to_owned();
let prefix_owned = prefix.to_owned();
let cache_owned = meta_cache_json.to_owned();
let result: (
Vec<(String, u64, f64, Option<String>)>,
Vec<String>,
Vec<(String, bool)>,
) = py.detach(move || -> PyResult<(
Vec<(String, u64, f64, Option<String>)>,
Vec<String>,
Vec<(String, bool)>,
)> {
let meta_cache: HashMap<String, String> =
serde_json::from_str(&cache_owned).unwrap_or_default();
let mut files: Vec<(String, u64, f64, Option<String>)> = Vec::new();
let mut dirs: Vec<String> = Vec::new();
let entries = match fs::read_dir(&target_owned) {
Ok(e) => e,
Err(_) => return Ok((files, dirs, Vec::new())),
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let name = match entry.file_name().into_string() {
Ok(n) => n,
Err(_) => continue,
};
if INTERNAL_FOLDERS.contains(&name.as_str()) {
continue;
}
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
let cp = format!("{}{}/", prefix_owned, name);
dirs.push(cp);
} else if ft.is_file() && !ft.is_symlink() {
let key = format!("{}{}", prefix_owned, name);
let md = match entry.metadata() {
Ok(m) => m,
Err(_) => continue,
};
let size = md.len();
let mtime = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
let etag = meta_cache.get(&key).cloned();
files.push((key, size, mtime, etag));
}
}
files.sort_by(|a, b| a.0.cmp(&b.0));
dirs.sort();
let mut merged: Vec<(String, bool)> = Vec::with_capacity(files.len() + dirs.len());
let mut fi = 0;
let mut di = 0;
while fi < files.len() && di < dirs.len() {
if files[fi].0 <= dirs[di] {
merged.push((files[fi].0.clone(), false));
fi += 1;
} else {
merged.push((dirs[di].clone(), true));
di += 1;
}
}
while fi < files.len() {
merged.push((files[fi].0.clone(), false));
fi += 1;
}
while di < dirs.len() {
merged.push((dirs[di].clone(), true));
di += 1;
}
Ok((files, dirs, merged))
})?;
let (files, dirs, merged) = result;
let dict = PyDict::new(py);
let files_list = PyList::empty(py);
for (key, size, mtime, etag) in &files {
let etag_py: Py<PyAny> = match etag {
Some(e) => PyString::new(py, e).into_any().unbind(),
None => py.None(),
};
let tuple = PyTuple::new(py, &[
PyString::new(py, key).into_any().unbind(),
size.into_pyobject(py)?.into_any().unbind(),
mtime.into_pyobject(py)?.into_any().unbind(),
etag_py,
])?;
files_list.append(tuple)?;
}
dict.set_item("files", files_list)?;
let dirs_list = PyList::empty(py);
for d in &dirs {
dirs_list.append(PyString::new(py, d))?;
}
dict.set_item("dirs", dirs_list)?;
let merged_list = PyList::empty(py);
for (key, is_dir) in &merged {
let bool_obj: Py<PyAny> = if *is_dir {
true.into_pyobject(py)?.to_owned().into_any().unbind()
} else {
false.into_pyobject(py)?.to_owned().into_any().unbind()
};
let tuple = PyTuple::new(py, &[
PyString::new(py, key).into_any().unbind(),
bool_obj,
])?;
merged_list.append(tuple)?;
}
dict.set_item("merged_keys", merged_list)?;
Ok(dict.into_any().unbind())
}
#[pyfunction]
pub fn bucket_stats_scan(
py: Python<'_>,
bucket_path: &str,
versions_root: &str,
) -> PyResult<(u64, u64, u64, u64)> {
let bucket_owned = bucket_path.to_owned();
let versions_owned = versions_root.to_owned();
py.detach(move || -> PyResult<(u64, u64, u64, u64)> {
let mut object_count: u64 = 0;
let mut total_bytes: u64 = 0;
let bucket_p = Path::new(&bucket_owned);
if bucket_p.is_dir() {
let mut stack = vec![bucket_p.to_path_buf()];
while let Some(current) = stack.pop() {
let is_root = current == bucket_p;
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
if is_root {
if let Some(name) = entry.file_name().to_str() {
if INTERNAL_FOLDERS.contains(&name) {
continue;
}
}
}
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
} else if ft.is_file() && !ft.is_symlink() {
object_count += 1;
if let Ok(md) = entry.metadata() {
total_bytes += md.len();
}
}
}
}
}
let mut version_count: u64 = 0;
let mut version_bytes: u64 = 0;
let versions_p = Path::new(&versions_owned);
if versions_p.is_dir() {
let mut stack = vec![versions_p.to_path_buf()];
while let Some(current) = stack.pop() {
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
} else if ft.is_file() && !ft.is_symlink() {
if let Some(name) = entry.file_name().to_str() {
if name.ends_with(".bin") {
version_count += 1;
if let Ok(md) = entry.metadata() {
version_bytes += md.len();
}
}
}
}
}
}
}
Ok((object_count, total_bytes, version_count, version_bytes))
})
}
#[pyfunction]
#[pyo3(signature = (bucket_path, search_root, query, limit))]
pub fn search_objects_scan(
py: Python<'_>,
bucket_path: &str,
search_root: &str,
query: &str,
limit: usize,
) -> PyResult<Py<PyAny>> {
let bucket_owned = bucket_path.to_owned();
let search_owned = search_root.to_owned();
let query_owned = query.to_owned();
let result: (Vec<(String, u64, f64)>, bool) = py.detach(
move || -> PyResult<(Vec<(String, u64, f64)>, bool)> {
let query_lower = query_owned.to_lowercase();
let bucket_len = bucket_owned.len() + 1;
let scan_limit = limit * 4;
let mut matched: usize = 0;
let mut results: Vec<(String, u64, f64)> = Vec::new();
let search_p = Path::new(&search_owned);
if !search_p.is_dir() {
return Ok((results, false));
}
let bucket_p = Path::new(&bucket_owned);
let mut stack = vec![search_p.to_path_buf()];
'scan: while let Some(current) = stack.pop() {
let is_bucket_root = current == bucket_p;
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
if is_bucket_root {
if let Some(name) = entry.file_name().to_str() {
if INTERNAL_FOLDERS.contains(&name) {
continue;
}
}
}
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
stack.push(entry.path());
} else if ft.is_file() && !ft.is_symlink() {
let full_path = entry.path();
let full_str = full_path.to_string_lossy();
if full_str.len() <= bucket_len {
continue;
}
let key = full_str[bucket_len..].replace('\\', "/");
if key.to_lowercase().contains(&query_lower) {
if let Ok(md) = entry.metadata() {
let size = md.len();
let mtime = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
results.push((key, size, mtime));
matched += 1;
}
}
if matched >= scan_limit {
break 'scan;
}
}
}
}
results.sort_by(|a, b| a.0.cmp(&b.0));
let truncated = results.len() > limit;
results.truncate(limit);
Ok((results, truncated))
},
)?;
let (results, truncated) = result;
let dict = PyDict::new(py);
let results_list = PyList::empty(py);
for (key, size, mtime) in &results {
let tuple = PyTuple::new(py, &[
PyString::new(py, key).into_any().unbind(),
size.into_pyobject(py)?.into_any().unbind(),
mtime.into_pyobject(py)?.into_any().unbind(),
])?;
results_list.append(tuple)?;
}
dict.set_item("results", results_list)?;
dict.set_item("truncated", truncated)?;
Ok(dict.into_any().unbind())
}
#[pyfunction]
pub fn build_object_cache(
py: Python<'_>,
bucket_path: &str,
meta_root: &str,
etag_index_path: &str,
) -> PyResult<Py<PyAny>> {
let bucket_owned = bucket_path.to_owned();
let meta_owned = meta_root.to_owned();
let index_path_owned = etag_index_path.to_owned();
let result: (HashMap<String, String>, Vec<(String, u64, f64, Option<String>)>, bool) =
py.detach(move || -> PyResult<(
HashMap<String, String>,
Vec<(String, u64, f64, Option<String>)>,
bool,
)> {
let mut meta_cache: HashMap<String, String> = HashMap::new();
let mut index_mtime: f64 = 0.0;
let mut etag_cache_changed = false;
let index_p = Path::new(&index_path_owned);
if index_p.is_file() {
if let Ok(md) = fs::metadata(&index_path_owned) {
index_mtime = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
}
if let Ok(content) = fs::read_to_string(&index_path_owned) {
if let Ok(parsed) = serde_json::from_str::<HashMap<String, String>>(&content) {
meta_cache = parsed;
}
}
}
let meta_p = Path::new(&meta_owned);
let mut needs_rebuild = false;
if meta_p.is_dir() && index_mtime > 0.0 {
fn check_newer(dir: &Path, index_mtime: f64) -> bool {
let entries = match fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return false,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
if check_newer(&entry.path(), index_mtime) {
return true;
}
} else if ft.is_file() {
if let Some(name) = entry.file_name().to_str() {
if name.ends_with(".meta.json") || name == "_index.json" {
if let Ok(md) = entry.metadata() {
let mt = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
if mt > index_mtime {
return true;
}
}
}
}
}
}
false
}
needs_rebuild = check_newer(meta_p, index_mtime);
} else if meta_cache.is_empty() {
needs_rebuild = true;
}
if needs_rebuild && meta_p.is_dir() {
let meta_str = meta_owned.clone();
let meta_len = meta_str.len() + 1;
let mut index_files: Vec<String> = Vec::new();
let mut legacy_meta_files: Vec<(String, String)> = Vec::new();
fn collect_meta(
dir: &Path,
meta_len: usize,
index_files: &mut Vec<String>,
legacy_meta_files: &mut Vec<(String, String)>,
) {
let entries = match fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
collect_meta(&entry.path(), meta_len, index_files, legacy_meta_files);
} else if ft.is_file() {
if let Some(name) = entry.file_name().to_str() {
let full = entry.path().to_string_lossy().to_string();
if name == "_index.json" {
index_files.push(full);
} else if name.ends_with(".meta.json") {
if full.len() > meta_len {
let rel = &full[meta_len..];
let key = if rel.len() > 10 {
rel[..rel.len() - 10].replace('\\', "/")
} else {
continue;
};
legacy_meta_files.push((key, full));
}
}
}
}
}
}
collect_meta(
meta_p,
meta_len,
&mut index_files,
&mut legacy_meta_files,
);
meta_cache.clear();
for idx_path in &index_files {
if let Ok(content) = fs::read_to_string(idx_path) {
if let Ok(idx_data) = serde_json::from_str::<HashMap<String, Value>>(&content) {
let rel_dir = if idx_path.len() > meta_len {
let r = &idx_path[meta_len..];
r.replace('\\', "/")
} else {
String::new()
};
let dir_prefix = if rel_dir.ends_with("/_index.json") {
&rel_dir[..rel_dir.len() - "/_index.json".len()]
} else {
""
};
for (entry_name, entry_data) in &idx_data {
let key = if dir_prefix.is_empty() {
entry_name.clone()
} else {
format!("{}/{}", dir_prefix, entry_name)
};
if let Some(meta_obj) = entry_data.get("metadata") {
if let Some(etag) = meta_obj.get("__etag__") {
if let Some(etag_str) = etag.as_str() {
meta_cache.insert(key, etag_str.to_owned());
}
}
}
}
}
}
}
for (key, path) in &legacy_meta_files {
if meta_cache.contains_key(key) {
continue;
}
if let Ok(content) = fs::read(path) {
if let Some(etag) = extract_etag_from_meta_bytes(&content) {
meta_cache.insert(key.clone(), etag);
}
}
}
etag_cache_changed = true;
}
let bucket_p = Path::new(&bucket_owned);
let bucket_len = bucket_owned.len() + 1;
let mut objects: Vec<(String, u64, f64, Option<String>)> = Vec::new();
if bucket_p.is_dir() {
let mut stack = vec![bucket_p.to_path_buf()];
while let Some(current) = stack.pop() {
let entries = match fs::read_dir(&current) {
Ok(e) => e,
Err(_) => continue,
};
for entry_result in entries {
let entry = match entry_result {
Ok(e) => e,
Err(_) => continue,
};
let ft = match entry.file_type() {
Ok(ft) => ft,
Err(_) => continue,
};
if ft.is_dir() && !ft.is_symlink() {
let full = entry.path();
let full_str = full.to_string_lossy();
if full_str.len() > bucket_len {
let first_part: &str = if let Some(sep_pos) =
full_str[bucket_len..].find(|c: char| c == '\\' || c == '/')
{
&full_str[bucket_len..bucket_len + sep_pos]
} else {
&full_str[bucket_len..]
};
if INTERNAL_FOLDERS.contains(&first_part) {
continue;
}
} else if let Some(name) = entry.file_name().to_str() {
if INTERNAL_FOLDERS.contains(&name) {
continue;
}
}
stack.push(full);
} else if ft.is_file() && !ft.is_symlink() {
let full = entry.path();
let full_str = full.to_string_lossy();
if full_str.len() <= bucket_len {
continue;
}
let rel = &full_str[bucket_len..];
let first_part: &str =
if let Some(sep_pos) = rel.find(|c: char| c == '\\' || c == '/') {
&rel[..sep_pos]
} else {
rel
};
if INTERNAL_FOLDERS.contains(&first_part) {
continue;
}
let key = rel.replace('\\', "/");
if let Ok(md) = entry.metadata() {
let size = md.len();
let mtime = md
.modified()
.map(system_time_to_epoch)
.unwrap_or(0.0);
let etag = meta_cache.get(&key).cloned();
objects.push((key, size, mtime, etag));
}
}
}
}
}
Ok((meta_cache, objects, etag_cache_changed))
})?;
let (meta_cache, objects, etag_cache_changed) = result;
let dict = PyDict::new(py);
let cache_dict = PyDict::new(py);
for (k, v) in &meta_cache {
cache_dict.set_item(k, v)?;
}
dict.set_item("etag_cache", cache_dict)?;
let objects_list = PyList::empty(py);
for (key, size, mtime, etag) in &objects {
let etag_py: Py<PyAny> = match etag {
Some(e) => PyString::new(py, e).into_any().unbind(),
None => py.None(),
};
let tuple = PyTuple::new(py, &[
PyString::new(py, key).into_any().unbind(),
size.into_pyobject(py)?.into_any().unbind(),
mtime.into_pyobject(py)?.into_any().unbind(),
etag_py,
])?;
objects_list.append(tuple)?;
}
dict.set_item("objects", objects_list)?;
dict.set_item("etag_cache_changed", etag_cache_changed)?;
Ok(dict.into_any().unbind())
}

View File

@@ -0,0 +1,112 @@
use md5::{Digest, Md5};
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::prelude::*;
use std::fs::{self, File};
use std::io::{Read, Write};
use uuid::Uuid;
const DEFAULT_CHUNK_SIZE: usize = 262144;
#[pyfunction]
#[pyo3(signature = (stream, tmp_dir, chunk_size=DEFAULT_CHUNK_SIZE))]
pub fn stream_to_file_with_md5(
py: Python<'_>,
stream: &Bound<'_, PyAny>,
tmp_dir: &str,
chunk_size: usize,
) -> PyResult<(String, String, u64)> {
let chunk_size = if chunk_size == 0 {
DEFAULT_CHUNK_SIZE
} else {
chunk_size
};
fs::create_dir_all(tmp_dir)
.map_err(|e| PyIOError::new_err(format!("Failed to create tmp dir: {}", e)))?;
let tmp_name = format!("{}.tmp", Uuid::new_v4().as_hyphenated());
let tmp_path_buf = std::path::PathBuf::from(tmp_dir).join(&tmp_name);
let tmp_path = tmp_path_buf.to_string_lossy().into_owned();
let mut file = File::create(&tmp_path)
.map_err(|e| PyIOError::new_err(format!("Failed to create temp file: {}", e)))?;
let mut hasher = Md5::new();
let mut total_bytes: u64 = 0;
let result: PyResult<()> = (|| {
loop {
let chunk: Vec<u8> = stream.call_method1("read", (chunk_size,))?.extract()?;
if chunk.is_empty() {
break;
}
hasher.update(&chunk);
file.write_all(&chunk)
.map_err(|e| PyIOError::new_err(format!("Failed to write: {}", e)))?;
total_bytes += chunk.len() as u64;
py.check_signals()?;
}
file.sync_all()
.map_err(|e| PyIOError::new_err(format!("Failed to fsync: {}", e)))?;
Ok(())
})();
if let Err(e) = result {
drop(file);
let _ = fs::remove_file(&tmp_path);
return Err(e);
}
drop(file);
let md5_hex = format!("{:x}", hasher.finalize());
Ok((tmp_path, md5_hex, total_bytes))
}
#[pyfunction]
pub fn assemble_parts_with_md5(
py: Python<'_>,
part_paths: Vec<String>,
dest_path: &str,
) -> PyResult<String> {
if part_paths.is_empty() {
return Err(PyValueError::new_err("No parts to assemble"));
}
let dest = dest_path.to_owned();
let parts = part_paths;
py.detach(move || {
if let Some(parent) = std::path::Path::new(&dest).parent() {
fs::create_dir_all(parent)
.map_err(|e| PyIOError::new_err(format!("Failed to create dest dir: {}", e)))?;
}
let mut target = File::create(&dest)
.map_err(|e| PyIOError::new_err(format!("Failed to create dest file: {}", e)))?;
let mut hasher = Md5::new();
let mut buf = vec![0u8; 1024 * 1024];
for part_path in &parts {
let mut part = File::open(part_path)
.map_err(|e| PyIOError::new_err(format!("Failed to open part {}: {}", part_path, e)))?;
loop {
let n = part
.read(&mut buf)
.map_err(|e| PyIOError::new_err(format!("Failed to read part: {}", e)))?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
target
.write_all(&buf[..n])
.map_err(|e| PyIOError::new_err(format!("Failed to write: {}", e)))?;
}
}
target.sync_all()
.map_err(|e| PyIOError::new_err(format!("Failed to fsync: {}", e)))?;
Ok(format!("{:x}", hasher.finalize()))
})
}

View File

@@ -321,7 +321,7 @@
`;
};
const bucketTotalObjects = objectsContainer ? parseInt(objectsContainer.dataset.bucketTotalObjects || '0', 10) : 0;
let bucketTotalObjects = objectsContainer ? parseInt(objectsContainer.dataset.bucketTotalObjects || '0', 10) : 0;
const updateObjectCountBadge = () => {
if (!objectCountBadge) return;
@@ -702,6 +702,7 @@
flushPendingStreamObjects();
hasMoreObjects = false;
totalObjectCount = loadedObjectCount;
if (!currentPrefix) bucketTotalObjects = totalObjectCount;
updateObjectCountBadge();
if (objectsLoadingRow && objectsLoadingRow.parentNode) {
@@ -766,6 +767,7 @@
}
totalObjectCount = data.total_count || 0;
if (!append && !currentPrefix) bucketTotalObjects = totalObjectCount;
nextContinuationToken = data.next_continuation_token;
if (!append && objectsLoadingRow) {

View File

@@ -43,6 +43,11 @@ def app(tmp_path: Path):
}
)
yield flask_app
storage = flask_app.extensions.get("object_storage")
if storage:
base = getattr(storage, "storage", storage)
if hasattr(base, "shutdown_stats"):
base.shutdown_stats()
@pytest.fixture()

View File

@@ -53,7 +53,9 @@ def test_special_characters_in_metadata(tmp_path: Path):
assert meta["special"] == "!@#$%^&*()"
def test_disk_full_scenario(tmp_path: Path, monkeypatch):
# Simulate disk full by mocking write to fail
import app.storage as _storage_mod
monkeypatch.setattr(_storage_mod, "_HAS_RUST", False)
storage = ObjectStorage(tmp_path)
storage.create_bucket("full")

View File

@@ -0,0 +1,350 @@
import hashlib
import io
import os
import secrets
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
try:
import myfsio_core as _rc
HAS_RUST = True
except ImportError:
_rc = None
HAS_RUST = False
pytestmark = pytest.mark.skipif(not HAS_RUST, reason="myfsio_core not available")
class TestStreamToFileWithMd5:
def test_basic_write(self, tmp_path):
data = b"hello world" * 1000
stream = io.BytesIO(data)
tmp_dir = str(tmp_path / "tmp")
tmp_path_str, md5_hex, size = _rc.stream_to_file_with_md5(stream, tmp_dir)
assert size == len(data)
assert md5_hex == hashlib.md5(data).hexdigest()
assert Path(tmp_path_str).exists()
assert Path(tmp_path_str).read_bytes() == data
def test_empty_stream(self, tmp_path):
stream = io.BytesIO(b"")
tmp_dir = str(tmp_path / "tmp")
tmp_path_str, md5_hex, size = _rc.stream_to_file_with_md5(stream, tmp_dir)
assert size == 0
assert md5_hex == hashlib.md5(b"").hexdigest()
assert Path(tmp_path_str).read_bytes() == b""
def test_large_data(self, tmp_path):
data = os.urandom(1024 * 1024 * 2)
stream = io.BytesIO(data)
tmp_dir = str(tmp_path / "tmp")
tmp_path_str, md5_hex, size = _rc.stream_to_file_with_md5(stream, tmp_dir)
assert size == len(data)
assert md5_hex == hashlib.md5(data).hexdigest()
def test_custom_chunk_size(self, tmp_path):
data = b"x" * 10000
stream = io.BytesIO(data)
tmp_dir = str(tmp_path / "tmp")
tmp_path_str, md5_hex, size = _rc.stream_to_file_with_md5(
stream, tmp_dir, chunk_size=128
)
assert size == len(data)
assert md5_hex == hashlib.md5(data).hexdigest()
class TestAssemblePartsWithMd5:
def test_basic_assembly(self, tmp_path):
parts = []
combined = b""
for i in range(3):
data = f"part{i}data".encode() * 100
combined += data
p = tmp_path / f"part{i}"
p.write_bytes(data)
parts.append(str(p))
dest = str(tmp_path / "output")
md5_hex = _rc.assemble_parts_with_md5(parts, dest)
assert md5_hex == hashlib.md5(combined).hexdigest()
assert Path(dest).read_bytes() == combined
def test_single_part(self, tmp_path):
data = b"single part data"
p = tmp_path / "part0"
p.write_bytes(data)
dest = str(tmp_path / "output")
md5_hex = _rc.assemble_parts_with_md5([str(p)], dest)
assert md5_hex == hashlib.md5(data).hexdigest()
assert Path(dest).read_bytes() == data
def test_empty_parts_list(self):
with pytest.raises(ValueError, match="No parts"):
_rc.assemble_parts_with_md5([], "dummy")
def test_missing_part_file(self, tmp_path):
with pytest.raises(OSError):
_rc.assemble_parts_with_md5(
[str(tmp_path / "nonexistent")], str(tmp_path / "out")
)
def test_large_parts(self, tmp_path):
parts = []
combined = b""
for i in range(5):
data = os.urandom(512 * 1024)
combined += data
p = tmp_path / f"part{i}"
p.write_bytes(data)
parts.append(str(p))
dest = str(tmp_path / "output")
md5_hex = _rc.assemble_parts_with_md5(parts, dest)
assert md5_hex == hashlib.md5(combined).hexdigest()
assert Path(dest).read_bytes() == combined
class TestEncryptDecryptStreamChunked:
def _python_derive_chunk_nonce(self, base_nonce, chunk_index):
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=12,
salt=base_nonce,
info=chunk_index.to_bytes(4, "big"),
)
return hkdf.derive(b"chunk_nonce")
def test_encrypt_decrypt_roundtrip(self, tmp_path):
data = b"Hello, encryption!" * 500
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "plaintext")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
chunk_count = _rc.encrypt_stream_chunked(
input_path, encrypted_path, key, base_nonce
)
assert chunk_count > 0
chunk_count_dec = _rc.decrypt_stream_chunked(
encrypted_path, decrypted_path, key, base_nonce
)
assert chunk_count_dec == chunk_count
assert Path(decrypted_path).read_bytes() == data
def test_empty_file(self, tmp_path):
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "empty")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(b"")
chunk_count = _rc.encrypt_stream_chunked(
input_path, encrypted_path, key, base_nonce
)
assert chunk_count == 0
chunk_count_dec = _rc.decrypt_stream_chunked(
encrypted_path, decrypted_path, key, base_nonce
)
assert chunk_count_dec == 0
assert Path(decrypted_path).read_bytes() == b""
def test_custom_chunk_size(self, tmp_path):
data = os.urandom(10000)
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "plaintext")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
chunk_count = _rc.encrypt_stream_chunked(
input_path, encrypted_path, key, base_nonce, chunk_size=1024
)
assert chunk_count == 10
_rc.decrypt_stream_chunked(encrypted_path, decrypted_path, key, base_nonce)
assert Path(decrypted_path).read_bytes() == data
def test_invalid_key_length(self, tmp_path):
input_path = str(tmp_path / "in")
Path(input_path).write_bytes(b"data")
with pytest.raises(ValueError, match="32 bytes"):
_rc.encrypt_stream_chunked(
input_path, str(tmp_path / "out"), b"short", secrets.token_bytes(12)
)
def test_invalid_nonce_length(self, tmp_path):
input_path = str(tmp_path / "in")
Path(input_path).write_bytes(b"data")
with pytest.raises(ValueError, match="12 bytes"):
_rc.encrypt_stream_chunked(
input_path, str(tmp_path / "out"), secrets.token_bytes(32), b"short"
)
def test_wrong_key_fails_decrypt(self, tmp_path):
data = b"sensitive data"
key = secrets.token_bytes(32)
wrong_key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "plaintext")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
_rc.encrypt_stream_chunked(input_path, encrypted_path, key, base_nonce)
with pytest.raises((ValueError, OSError)):
_rc.decrypt_stream_chunked(
encrypted_path, decrypted_path, wrong_key, base_nonce
)
def test_cross_compat_python_encrypt_rust_decrypt(self, tmp_path):
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
data = b"cross compat test data" * 100
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
chunk_size = 1024
encrypted_path = str(tmp_path / "py_encrypted")
with open(encrypted_path, "wb") as f:
f.write(b"\x00\x00\x00\x00")
aesgcm = AESGCM(key)
chunk_index = 0
offset = 0
while offset < len(data):
chunk = data[offset:offset + chunk_size]
nonce = self._python_derive_chunk_nonce(base_nonce, chunk_index)
enc = aesgcm.encrypt(nonce, chunk, None)
f.write(len(enc).to_bytes(4, "big"))
f.write(enc)
chunk_index += 1
offset += chunk_size
f.seek(0)
f.write(chunk_index.to_bytes(4, "big"))
decrypted_path = str(tmp_path / "rust_decrypted")
_rc.decrypt_stream_chunked(encrypted_path, decrypted_path, key, base_nonce)
assert Path(decrypted_path).read_bytes() == data
def test_cross_compat_rust_encrypt_python_decrypt(self, tmp_path):
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
data = b"cross compat reverse test" * 100
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
chunk_size = 1024
input_path = str(tmp_path / "plaintext")
encrypted_path = str(tmp_path / "rust_encrypted")
Path(input_path).write_bytes(data)
chunk_count = _rc.encrypt_stream_chunked(
input_path, encrypted_path, key, base_nonce, chunk_size=chunk_size
)
aesgcm = AESGCM(key)
with open(encrypted_path, "rb") as f:
count_bytes = f.read(4)
assert int.from_bytes(count_bytes, "big") == chunk_count
decrypted = b""
for i in range(chunk_count):
size = int.from_bytes(f.read(4), "big")
enc_chunk = f.read(size)
nonce = self._python_derive_chunk_nonce(base_nonce, i)
decrypted += aesgcm.decrypt(nonce, enc_chunk, None)
assert decrypted == data
def test_large_file_roundtrip(self, tmp_path):
data = os.urandom(1024 * 1024)
key = secrets.token_bytes(32)
base_nonce = secrets.token_bytes(12)
input_path = str(tmp_path / "large")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
_rc.encrypt_stream_chunked(input_path, encrypted_path, key, base_nonce)
_rc.decrypt_stream_chunked(encrypted_path, decrypted_path, key, base_nonce)
assert Path(decrypted_path).read_bytes() == data
class TestStreamingEncryptorFileMethods:
def test_encrypt_file_decrypt_file_roundtrip(self, tmp_path):
from app.encryption import LocalKeyEncryption, StreamingEncryptor
master_key_path = tmp_path / "master.key"
provider = LocalKeyEncryption(master_key_path)
encryptor = StreamingEncryptor(provider, chunk_size=512)
data = b"file method test data" * 200
input_path = str(tmp_path / "input")
encrypted_path = str(tmp_path / "encrypted")
decrypted_path = str(tmp_path / "decrypted")
Path(input_path).write_bytes(data)
metadata = encryptor.encrypt_file(input_path, encrypted_path)
assert metadata.algorithm == "AES256"
encryptor.decrypt_file(encrypted_path, decrypted_path, metadata)
assert Path(decrypted_path).read_bytes() == data
def test_encrypt_file_matches_encrypt_stream(self, tmp_path):
from app.encryption import LocalKeyEncryption, StreamingEncryptor
master_key_path = tmp_path / "master.key"
provider = LocalKeyEncryption(master_key_path)
encryptor = StreamingEncryptor(provider, chunk_size=512)
data = b"stream vs file comparison" * 100
input_path = str(tmp_path / "input")
Path(input_path).write_bytes(data)
file_encrypted_path = str(tmp_path / "file_enc")
metadata_file = encryptor.encrypt_file(input_path, file_encrypted_path)
file_decrypted_path = str(tmp_path / "file_dec")
encryptor.decrypt_file(file_encrypted_path, file_decrypted_path, metadata_file)
assert Path(file_decrypted_path).read_bytes() == data
stream_enc, metadata_stream = encryptor.encrypt_stream(io.BytesIO(data))
stream_dec = encryptor.decrypt_stream(stream_enc, metadata_stream)
assert stream_dec.read() == data