MyFSIO v0.4.2 Release #35

Merged
kqjy merged 9 commits from next into main 2026-04-01 08:33:29 +00:00
14 changed files with 613 additions and 306 deletions

View File

@@ -72,6 +72,11 @@ source .venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# (Optional) Build Rust native extension for better performance
# Requires Rust toolchain: https://rustup.rs
pip install maturin
cd myfsio_core && maturin develop --release && cd ..
# Start both servers
python run.py

View File

@@ -184,6 +184,7 @@ def create_app(
object_cache_max_size=app.config.get("OBJECT_CACHE_MAX_SIZE", 100),
bucket_config_cache_ttl=app.config.get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0),
object_key_max_length_bytes=app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024),
meta_read_cache_max=app.config.get("META_READ_CACHE_MAX", 2048),
)
if app.config.get("WARM_CACHE_ON_STARTUP", True) and not app.config.get("TESTING"):

View File

@@ -136,6 +136,7 @@ class AppConfig:
site_sync_clock_skew_tolerance_seconds: float
object_key_max_length_bytes: int
object_cache_max_size: int
meta_read_cache_max: int
bucket_config_cache_ttl_seconds: float
object_tag_limit: int
encryption_chunk_size_bytes: int
@@ -315,6 +316,7 @@ class AppConfig:
site_sync_clock_skew_tolerance_seconds = float(_get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0))
object_key_max_length_bytes = int(_get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024))
object_cache_max_size = int(_get("OBJECT_CACHE_MAX_SIZE", 100))
meta_read_cache_max = int(_get("META_READ_CACHE_MAX", 2048))
bucket_config_cache_ttl_seconds = float(_get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0))
object_tag_limit = int(_get("OBJECT_TAG_LIMIT", 50))
encryption_chunk_size_bytes = int(_get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024))
@@ -421,6 +423,7 @@ class AppConfig:
site_sync_clock_skew_tolerance_seconds=site_sync_clock_skew_tolerance_seconds,
object_key_max_length_bytes=object_key_max_length_bytes,
object_cache_max_size=object_cache_max_size,
meta_read_cache_max=meta_read_cache_max,
bucket_config_cache_ttl_seconds=bucket_config_cache_ttl_seconds,
object_tag_limit=object_tag_limit,
encryption_chunk_size_bytes=encryption_chunk_size_bytes,
@@ -648,6 +651,7 @@ class AppConfig:
"SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS": self.site_sync_clock_skew_tolerance_seconds,
"OBJECT_KEY_MAX_LENGTH_BYTES": self.object_key_max_length_bytes,
"OBJECT_CACHE_MAX_SIZE": self.object_cache_max_size,
"META_READ_CACHE_MAX": self.meta_read_cache_max,
"BUCKET_CONFIG_CACHE_TTL_SECONDS": self.bucket_config_cache_ttl_seconds,
"OBJECT_TAG_LIMIT": self.object_tag_limit,
"ENCRYPTION_CHUNK_SIZE_BYTES": self.encryption_chunk_size_bytes,

View File

@@ -21,6 +21,10 @@ if sys.platform != "win32":
try:
import myfsio_core as _rc
if not all(hasattr(_rc, f) for f in (
"encrypt_stream_chunked", "decrypt_stream_chunked",
)):
raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release")
_HAS_RUST = True
except ImportError:
_rc = None

View File

@@ -398,9 +398,11 @@ class IamService:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
self._enforce_key_and_user_status(access_key)
return principal
self._maybe_reload()
self._enforce_key_and_user_status(access_key)
user_id = self._key_index.get(access_key)
if not user_id:
raise IamError("Unknown access key")
@@ -414,6 +416,7 @@ class IamService:
def secret_for_key(self, access_key: str) -> str:
self._maybe_reload()
self._enforce_key_and_user_status(access_key)
secret = self._key_secrets.get(access_key)
if not secret:
raise IamError("Unknown access key")
@@ -1028,6 +1031,16 @@ class IamService:
user, _ = self._resolve_raw_user(access_key)
return user
def _enforce_key_and_user_status(self, access_key: str) -> None:
key_status = self._key_status.get(access_key, "active")
if key_status != "active":
raise IamError("Access key is inactive")
user_id = self._key_index.get(access_key)
if user_id:
record = self._user_records.get(user_id)
if record and not record.get("enabled", True):
raise IamError("User account is disabled")
def get_secret_key(self, access_key: str) -> str | None:
now = time.time()
cached = self._secret_key_cache.get(access_key)
@@ -1039,6 +1052,7 @@ class IamService:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
self._enforce_key_and_user_status(access_key)
return secret_key
self._maybe_reload()
@@ -1049,6 +1063,7 @@ class IamService:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
self._enforce_key_and_user_status(access_key)
self._secret_key_cache[access_key] = (secret, now)
return secret
return None
@@ -1064,9 +1079,11 @@ class IamService:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
self._enforce_key_and_user_status(access_key)
return principal
self._maybe_reload()
self._enforce_key_and_user_status(access_key)
user_id = self._key_index.get(access_key)
if user_id:
record = self._user_records.get(user_id)

View File

@@ -12,6 +12,8 @@ from typing import Any, Dict, List, Optional
try:
import myfsio_core as _rc
if not hasattr(_rc, "md5_file"):
raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release")
_HAS_RUST = True
except ImportError:
_HAS_RUST = False
@@ -192,10 +194,26 @@ class IntegrityCursorStore:
except OSError as e:
logger.error("Failed to save integrity cursor: %s", e)
def update_bucket(self, bucket_name: str, timestamp: float) -> None:
def update_bucket(
self,
bucket_name: str,
timestamp: float,
last_key: Optional[str] = None,
completed: bool = False,
) -> None:
with self._lock:
data = self.load()
data["buckets"][bucket_name] = {"last_scanned": timestamp}
entry = data["buckets"].get(bucket_name, {})
if completed:
entry["last_scanned"] = timestamp
entry.pop("last_key", None)
entry["completed"] = True
else:
entry["last_scanned"] = timestamp
if last_key is not None:
entry["last_key"] = last_key
entry["completed"] = False
data["buckets"][bucket_name] = entry
self.save(data)
def clean_stale(self, existing_buckets: List[str]) -> None:
@@ -208,17 +226,32 @@ class IntegrityCursorStore:
del data["buckets"][k]
self.save(data)
def get_last_key(self, bucket_name: str) -> Optional[str]:
data = self.load()
entry = data.get("buckets", {}).get(bucket_name)
if entry is None:
return None
return entry.get("last_key")
def get_bucket_order(self, bucket_names: List[str]) -> List[str]:
data = self.load()
buckets_info = data.get("buckets", {})
def sort_key(name: str) -> float:
incomplete = []
complete = []
for name in bucket_names:
entry = buckets_info.get(name)
if entry is None:
return 0.0
return entry.get("last_scanned", 0.0)
incomplete.append((name, 0.0))
elif entry.get("last_key") is not None:
incomplete.append((name, entry.get("last_scanned", 0.0)))
else:
complete.append((name, entry.get("last_scanned", 0.0)))
return sorted(bucket_names, key=sort_key)
incomplete.sort(key=lambda x: x[1])
complete.sort(key=lambda x: x[1])
return [n for n, _ in incomplete] + [n for n, _ in complete]
def get_info(self) -> Dict[str, Any]:
data = self.load()
@@ -226,7 +259,11 @@ class IntegrityCursorStore:
return {
"tracked_buckets": len(buckets),
"buckets": {
name: info.get("last_scanned")
name: {
"last_scanned": info.get("last_scanned"),
"last_key": info.get("last_key"),
"completed": info.get("completed", False),
}
for name, info in buckets.items()
},
}
@@ -325,13 +362,19 @@ class IntegrityChecker:
if self._batch_exhausted(result):
break
result.buckets_scanned += 1
self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
cursor_key = self.cursor_store.get_last_key(bucket_name)
key_corrupted = self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key)
key_orphaned = self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key)
key_phantom = self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key)
self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
self.cursor_store.update_bucket(bucket_name, time.time())
returned_keys = [k for k in (key_corrupted, key_orphaned, key_phantom) if k is not None]
bucket_exhausted = self._batch_exhausted(result)
if bucket_exhausted and returned_keys:
self.cursor_store.update_bucket(bucket_name, time.time(), last_key=min(returned_keys))
else:
self.cursor_store.update_bucket(bucket_name, time.time(), completed=True)
result.execution_time_seconds = time.time() - start
@@ -399,108 +442,172 @@ class IntegrityChecker:
if len(result.issues) < MAX_ISSUES:
result.issues.append(issue)
def _check_corrupted_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
def _collect_index_keys(
self, meta_root: Path, cursor_key: Optional[str] = None,
) -> Dict[str, Dict[str, Any]]:
all_keys: Dict[str, Dict[str, Any]] = {}
if not meta_root.exists():
return
return all_keys
try:
for index_file in meta_root.rglob("_index.json"):
if self._throttle():
return
if self._batch_exhausted(result):
return
if not index_file.is_file():
continue
rel_dir = index_file.parent.relative_to(meta_root)
dir_prefix = "" if rel_dir == Path(".") else rel_dir.as_posix()
if cursor_key is not None and dir_prefix:
full_prefix = dir_prefix + "/"
if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix:
continue
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
for key_name, entry in list(index_data.items()):
if self._throttle():
return
if self._batch_exhausted(result):
return
rel_dir = index_file.parent.relative_to(meta_root)
if rel_dir == Path("."):
full_key = key_name
else:
full_key = rel_dir.as_posix() + "/" + key_name
object_path = bucket_path / full_key
if not object_path.exists():
for key_name, entry in index_data.items():
full_key = (dir_prefix + "/" + key_name) if dir_prefix else key_name
if cursor_key is not None and full_key <= cursor_key:
continue
all_keys[full_key] = {
"entry": entry,
"index_file": index_file,
"key_name": key_name,
}
except OSError:
pass
return all_keys
result.objects_scanned += 1
def _walk_bucket_files_sorted(
self, bucket_path: Path, cursor_key: Optional[str] = None,
):
def _walk(dir_path: Path, prefix: str):
try:
entries = list(os.scandir(dir_path))
except OSError:
return
meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
stored_etag = meta.get("__etag__")
if not stored_etag:
def _sort_key(e):
if e.is_dir(follow_symlinks=False):
return e.name + "/"
return e.name
entries.sort(key=_sort_key)
for entry in entries:
if entry.is_dir(follow_symlinks=False):
if not prefix and entry.name in self.INTERNAL_FOLDERS:
continue
try:
actual_etag = _compute_etag(object_path)
except OSError:
new_prefix = (prefix + "/" + entry.name) if prefix else entry.name
if cursor_key is not None:
full_prefix = new_prefix + "/"
if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix:
continue
yield from _walk(Path(entry.path), new_prefix)
elif entry.is_file(follow_symlinks=False):
full_key = (prefix + "/" + entry.name) if prefix else entry.name
if cursor_key is not None and full_key <= cursor_key:
continue
yield full_key
if actual_etag != stored_etag:
result.corrupted_objects += 1
issue = IntegrityIssue(
issue_type="corrupted_object",
bucket=bucket_name,
key=full_key,
detail=f"stored_etag={stored_etag} actual_etag={actual_etag}",
)
yield from _walk(bucket_path, "")
if auto_heal and not dry_run:
try:
stat = object_path.stat()
meta["__etag__"] = actual_etag
meta["__size__"] = str(stat.st_size)
meta["__last_modified__"] = str(stat.st_mtime)
index_data[key_name] = {"metadata": meta}
self._atomic_write_index(index_file, index_data)
issue.healed = True
issue.heal_action = "updated etag in index"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}")
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check corrupted {bucket_name}: {e}")
def _check_orphaned_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
def _check_corrupted_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool,
cursor_key: Optional[str] = None,
) -> Optional[str]:
if self._batch_exhausted(result):
return None
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
if not meta_root.exists():
return None
last_key = None
try:
for entry in bucket_path.rglob("*"):
all_keys = self._collect_index_keys(meta_root, cursor_key)
sorted_keys = sorted(all_keys.keys())
for full_key in sorted_keys:
if self._throttle():
return
return last_key
if self._batch_exhausted(result):
return
if not entry.is_file():
continue
try:
rel = entry.relative_to(bucket_path)
except ValueError:
continue
if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
return last_key
info = all_keys[full_key]
entry = info["entry"]
index_file = info["index_file"]
key_name = info["key_name"]
object_path = bucket_path / full_key
if not object_path.exists():
continue
result.objects_scanned += 1
full_key = rel.as_posix()
key_name = rel.name
parent = rel.parent
last_key = full_key
meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
stored_etag = meta.get("__etag__")
if not stored_etag:
continue
try:
actual_etag = _compute_etag(object_path)
except OSError:
continue
if actual_etag != stored_etag:
result.corrupted_objects += 1
issue = IntegrityIssue(
issue_type="corrupted_object",
bucket=bucket_name,
key=full_key,
detail=f"stored_etag={stored_etag} actual_etag={actual_etag}",
)
if auto_heal and not dry_run:
try:
stat = object_path.stat()
meta["__etag__"] = actual_etag
meta["__size__"] = str(stat.st_size)
meta["__last_modified__"] = str(stat.st_mtime)
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
index_data = {}
index_data[key_name] = {"metadata": meta}
self._atomic_write_index(index_file, index_data)
issue.healed = True
issue.heal_action = "updated etag in index"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}")
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check corrupted {bucket_name}: {e}")
return last_key
def _check_orphaned_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool,
cursor_key: Optional[str] = None,
) -> Optional[str]:
if self._batch_exhausted(result):
return None
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
last_key = None
try:
for full_key in self._walk_bucket_files_sorted(bucket_path, cursor_key):
if self._throttle():
return last_key
if self._batch_exhausted(result):
return last_key
result.objects_scanned += 1
last_key = full_key
key_path = Path(full_key)
key_name = key_path.name
parent = key_path.parent
if parent == Path("."):
index_path = meta_root / "_index.json"
@@ -526,8 +633,9 @@ class IntegrityChecker:
if auto_heal and not dry_run:
try:
etag = _compute_etag(entry)
stat = entry.stat()
object_path = bucket_path / full_key
etag = _compute_etag(object_path)
stat = object_path.stat()
meta = {
"__etag__": etag,
"__size__": str(stat.st_size),
@@ -550,58 +658,56 @@ class IntegrityChecker:
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check orphaned {bucket_name}: {e}")
return last_key
def _check_phantom_metadata(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool,
cursor_key: Optional[str] = None,
) -> Optional[str]:
if self._batch_exhausted(result):
return None
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
if not meta_root.exists():
return
return None
last_key = None
try:
for index_file in meta_root.rglob("_index.json"):
if self._throttle():
return
all_keys = self._collect_index_keys(meta_root, cursor_key)
sorted_keys = sorted(all_keys.keys())
heal_by_index: Dict[Path, List[str]] = {}
for full_key in sorted_keys:
if self._batch_exhausted(result):
return
if not index_file.is_file():
continue
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
break
keys_to_remove = []
for key_name in list(index_data.keys()):
if self._batch_exhausted(result):
break
result.objects_scanned += 1
rel_dir = index_file.parent.relative_to(meta_root)
if rel_dir == Path("."):
full_key = key_name
else:
full_key = rel_dir.as_posix() + "/" + key_name
result.objects_scanned += 1
last_key = full_key
object_path = bucket_path / full_key
if not object_path.exists():
result.phantom_metadata += 1
issue = IntegrityIssue(
issue_type="phantom_metadata",
bucket=bucket_name,
key=full_key,
detail="metadata entry without file on disk",
)
if auto_heal and not dry_run:
keys_to_remove.append(key_name)
issue.healed = True
issue.heal_action = "removed stale index entry"
result.issues_healed += 1
self._add_issue(result, issue)
object_path = bucket_path / full_key
if not object_path.exists():
result.phantom_metadata += 1
info = all_keys[full_key]
issue = IntegrityIssue(
issue_type="phantom_metadata",
bucket=bucket_name,
key=full_key,
detail="metadata entry without file on disk",
)
if auto_heal and not dry_run:
index_file = info["index_file"]
heal_by_index.setdefault(index_file, []).append(info["key_name"])
issue.healed = True
issue.heal_action = "removed stale index entry"
result.issues_healed += 1
self._add_issue(result, issue)
if keys_to_remove and auto_heal and not dry_run:
if heal_by_index and auto_heal and not dry_run:
for index_file, keys_to_remove in heal_by_index.items():
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
for k in keys_to_remove:
index_data.pop(k, None)
if index_data:
@@ -612,10 +718,13 @@ class IntegrityChecker:
result.errors.append(f"heal phantom {bucket_name}: {e}")
except OSError as e:
result.errors.append(f"check phantom {bucket_name}: {e}")
return last_key
def _check_stale_versions(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
if self._batch_exhausted(result):
return
versions_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR
if not versions_root.exists():
@@ -682,6 +791,8 @@ class IntegrityChecker:
def _check_etag_cache(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
if self._batch_exhausted(result):
return
etag_index_path = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / "etag_index.json"
if not etag_index_path.exists():
@@ -751,6 +862,8 @@ class IntegrityChecker:
def _check_legacy_metadata(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
if self._batch_exhausted(result):
return
legacy_meta_root = self.storage_root / bucket_name / ".meta"
if not legacy_meta_root.exists():
return

View File

@@ -19,6 +19,10 @@ from defusedxml.ElementTree import fromstring
try:
import myfsio_core as _rc
if not all(hasattr(_rc, f) for f in (
"verify_sigv4_signature", "derive_signing_key", "clear_signing_key_cache",
)):
raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release")
_HAS_RUST = True
except ImportError:
_rc = None
@@ -201,6 +205,11 @@ _SIGNING_KEY_CACHE_LOCK = threading.Lock()
_SIGNING_KEY_CACHE_TTL = 60.0
_SIGNING_KEY_CACHE_MAX_SIZE = 256
_SIGV4_HEADER_RE = re.compile(
r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)"
)
_SIGV4_REQUIRED_HEADERS = frozenset({'host', 'x-amz-date'})
def clear_signing_key_cache() -> None:
if _HAS_RUST:
@@ -259,10 +268,7 @@ def _get_canonical_uri(req: Any) -> str:
def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
match = re.match(
r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)",
auth_header,
)
match = _SIGV4_HEADER_RE.match(auth_header)
if not match:
return None
@@ -286,14 +292,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
if time_diff > tolerance:
raise IamError("Request timestamp too old or too far in the future")
required_headers = {'host', 'x-amz-date'}
signed_headers_set = set(signed_headers_str.split(';'))
if not required_headers.issubset(signed_headers_set):
if 'date' in signed_headers_set:
required_headers.remove('x-amz-date')
required_headers.add('date')
if not required_headers.issubset(signed_headers_set):
if not _SIGV4_REQUIRED_HEADERS.issubset(signed_headers_set):
if not ({'host', 'date'}.issubset(signed_headers_set)):
raise IamError("Required headers not signed")
canonical_uri = _get_canonical_uri(req)
@@ -533,21 +534,6 @@ def _authorize_action(principal: Principal | None, bucket_name: str | None, acti
raise iam_error or IamError("Access denied")
def _enforce_bucket_policy(principal: Principal | None, bucket_name: str | None, object_key: str | None, action: str) -> None:
if not bucket_name:
return
policy_context = _build_policy_context()
decision = _bucket_policies().evaluate(
principal.access_key if principal else None,
bucket_name,
object_key,
action,
policy_context,
)
if decision == "deny":
raise IamError("Access denied by bucket policy")
def _object_principal(action: str, bucket_name: str, object_key: str):
principal, error = _require_principal()
try:
@@ -556,121 +542,7 @@ def _object_principal(action: str, bucket_name: str, object_key: str):
except IamError as exc:
if not error:
return None, _error_response("AccessDenied", str(exc), 403)
if not _has_presign_params():
return None, error
try:
principal = _validate_presigned_request(action, bucket_name, object_key)
_enforce_bucket_policy(principal, bucket_name, object_key, action)
return principal, None
except IamError as exc:
return None, _error_response("AccessDenied", str(exc), 403)
def _has_presign_params() -> bool:
return bool(request.args.get("X-Amz-Algorithm"))
def _validate_presigned_request(action: str, bucket_name: str, object_key: str) -> Principal:
algorithm = request.args.get("X-Amz-Algorithm")
credential = request.args.get("X-Amz-Credential")
amz_date = request.args.get("X-Amz-Date")
signed_headers = request.args.get("X-Amz-SignedHeaders")
expires = request.args.get("X-Amz-Expires")
signature = request.args.get("X-Amz-Signature")
if not all([algorithm, credential, amz_date, signed_headers, expires, signature]):
raise IamError("Malformed presigned URL")
if algorithm != "AWS4-HMAC-SHA256":
raise IamError("Unsupported signing algorithm")
parts = credential.split("/")
if len(parts) != 5:
raise IamError("Invalid credential scope")
access_key, date_stamp, region, service, terminal = parts
if terminal != "aws4_request":
raise IamError("Invalid credential scope")
config_region = current_app.config["AWS_REGION"]
config_service = current_app.config["AWS_SERVICE"]
if region != config_region or service != config_service:
raise IamError("Credential scope mismatch")
try:
expiry = int(expires)
except ValueError as exc:
raise IamError("Invalid expiration") from exc
min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1)
max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800)
if expiry < min_expiry or expiry > max_expiry:
raise IamError(f"Expiration must be between {min_expiry} second(s) and {max_expiry} seconds")
try:
request_time = datetime.strptime(amz_date, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc)
except ValueError as exc:
raise IamError("Invalid X-Amz-Date") from exc
now = datetime.now(timezone.utc)
tolerance = timedelta(seconds=current_app.config.get("SIGV4_TIMESTAMP_TOLERANCE_SECONDS", 900))
if request_time > now + tolerance:
raise IamError("Request date is too far in the future")
if now > request_time + timedelta(seconds=expiry):
raise IamError("Presigned URL expired")
signed_headers_list = [header.strip().lower() for header in signed_headers.split(";") if header]
signed_headers_list.sort()
canonical_headers = _canonical_headers_from_request(signed_headers_list)
canonical_query = _canonical_query_from_request()
payload_hash = request.args.get("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
canonical_request = "\n".join(
[
request.method,
_canonical_uri(bucket_name, object_key),
canonical_query,
canonical_headers,
";".join(signed_headers_list),
payload_hash,
]
)
hashed_request = hashlib.sha256(canonical_request.encode()).hexdigest()
scope = f"{date_stamp}/{region}/{service}/aws4_request"
string_to_sign = "\n".join([
"AWS4-HMAC-SHA256",
amz_date,
scope,
hashed_request,
])
secret = _iam().secret_for_key(access_key)
signing_key = _derive_signing_key(secret, date_stamp, region, service)
expected = hmac.new(signing_key, string_to_sign.encode(), hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, signature):
raise IamError("Signature mismatch")
return _iam().principal_for_key(access_key)
def _canonical_query_from_request() -> str:
parts = []
for key in sorted(request.args.keys()):
if key == "X-Amz-Signature":
continue
values = request.args.getlist(key)
encoded_key = quote(str(key), safe="-_.~")
for value in sorted(values):
encoded_value = quote(str(value), safe="-_.~")
parts.append(f"{encoded_key}={encoded_value}")
return "&".join(parts)
def _canonical_headers_from_request(headers: list[str]) -> str:
lines = []
for header in headers:
if header == "host":
api_base = current_app.config.get("API_BASE_URL")
if api_base:
value = urlparse(api_base).netloc
else:
value = request.host
else:
value = request.headers.get(header, "")
canonical_value = " ".join(value.strip().split()) if value else ""
lines.append(f"{header}:{canonical_value}")
return "\n".join(lines) + "\n"
def _canonical_uri(bucket_name: str, object_key: str | None) -> str:
@@ -736,8 +608,8 @@ def _generate_presigned_url(
host = parsed.netloc
scheme = parsed.scheme
else:
host = request.headers.get("X-Forwarded-Host", request.host)
scheme = request.headers.get("X-Forwarded-Proto", request.scheme or "http")
host = request.host
scheme = request.scheme or "http"
canonical_headers = f"host:{host}\n"
canonical_request = "\n".join(
@@ -1010,7 +882,7 @@ def _render_encryption_document(config: dict[str, Any]) -> Element:
return root
def _stream_file(path, chunk_size: int = 256 * 1024):
def _stream_file(path, chunk_size: int = 1024 * 1024):
with path.open("rb") as handle:
while True:
chunk = handle.read(chunk_size)
@@ -2961,9 +2833,12 @@ def object_handler(bucket_name: str, object_key: str):
is_encrypted = "x-amz-server-side-encryption" in metadata
cond_etag = metadata.get("__etag__")
_etag_was_healed = False
if not cond_etag and not is_encrypted:
try:
cond_etag = storage._compute_etag(path)
_etag_was_healed = True
storage.heal_missing_etag(bucket_name, object_key, cond_etag)
except OSError:
cond_etag = None
if cond_etag:
@@ -3009,7 +2884,7 @@ def object_handler(bucket_name: str, object_key: str):
try:
stat = path.stat()
file_size = stat.st_size
etag = metadata.get("__etag__") or storage._compute_etag(path)
etag = cond_etag or storage._compute_etag(path)
except PermissionError:
return _error_response("AccessDenied", "Permission denied accessing object", 403)
except OSError as exc:
@@ -3057,7 +2932,7 @@ def object_handler(bucket_name: str, object_key: str):
try:
stat = path.stat()
response = Response(status=200)
etag = metadata.get("__etag__") or storage._compute_etag(path)
etag = cond_etag or storage._compute_etag(path)
except PermissionError:
return _error_response("AccessDenied", "Permission denied accessing object", 403)
except OSError as exc:
@@ -3442,9 +3317,13 @@ def head_object(bucket_name: str, object_key: str) -> Response:
return error
try:
_authorize_action(principal, bucket_name, "read", object_key=object_key)
path = _storage().get_object_path(bucket_name, object_key)
metadata = _storage().get_object_metadata(bucket_name, object_key)
etag = metadata.get("__etag__") or _storage()._compute_etag(path)
storage = _storage()
path = storage.get_object_path(bucket_name, object_key)
metadata = storage.get_object_metadata(bucket_name, object_key)
etag = metadata.get("__etag__")
if not etag:
etag = storage._compute_etag(path)
storage.heal_missing_etag(bucket_name, object_key, etag)
head_mtime = float(metadata["__last_modified__"]) if "__last_modified__" in metadata else None
if head_mtime is None:

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import shutil
@@ -20,12 +21,21 @@ from typing import Any, BinaryIO, Dict, Generator, List, Optional
try:
import myfsio_core as _rc
if not all(hasattr(_rc, f) for f in (
"validate_bucket_name", "validate_object_key", "md5_file",
"shallow_scan", "bucket_stats_scan", "search_objects_scan",
"stream_to_file_with_md5", "assemble_parts_with_md5",
"build_object_cache", "read_index_entry", "write_index_entry",
"delete_index_entry", "check_bucket_contents",
)):
raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release")
_HAS_RUST = True
except ImportError:
_rc = None
_HAS_RUST = False
# Platform-specific file locking
logger = logging.getLogger(__name__)
if os.name == "nt":
import msvcrt
@@ -190,6 +200,7 @@ class ObjectStorage:
object_cache_max_size: int = 100,
bucket_config_cache_ttl: float = 30.0,
object_key_max_length_bytes: int = 1024,
meta_read_cache_max: int = 2048,
) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
@@ -208,7 +219,7 @@ class ObjectStorage:
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
self._meta_index_locks: Dict[str, threading.Lock] = {}
self._meta_read_cache: OrderedDict[tuple, Optional[Dict[str, Any]]] = OrderedDict()
self._meta_read_cache_max = 2048
self._meta_read_cache_max = meta_read_cache_max
self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
self._stats_mem: Dict[str, Dict[str, int]] = {}
self._stats_serial: Dict[str, int] = {}
@@ -218,6 +229,7 @@ class ObjectStorage:
self._stats_flush_timer: Optional[threading.Timer] = None
self._etag_index_dirty: set[str] = set()
self._etag_index_flush_timer: Optional[threading.Timer] = None
self._etag_index_mem: Dict[str, tuple[Dict[str, str], float]] = {}
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
with self._registry_lock:
@@ -427,7 +439,7 @@ class ObjectStorage:
cache_path = self._system_bucket_root(bucket_id) / "stats.json"
try:
cache_path.parent.mkdir(parents=True, exist_ok=True)
self._atomic_write_json(cache_path, data)
self._atomic_write_json(cache_path, data, sync=False)
except OSError:
pass
@@ -602,14 +614,7 @@ class ObjectStorage:
is_truncated=False, next_continuation_token=None,
)
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
meta_cache: Dict[str, str] = {}
if etag_index_path.exists():
try:
with open(etag_index_path, 'r', encoding='utf-8') as f:
meta_cache = json.load(f)
except (OSError, json.JSONDecodeError):
pass
meta_cache: Dict[str, str] = self._get_etag_index(bucket_id)
entries_files: list[tuple[str, int, float, Optional[str]]] = []
entries_dirs: list[str] = []
@@ -1079,6 +1084,30 @@ class ObjectStorage:
safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes)
return self._read_metadata(bucket_path.name, safe_key) or {}
def heal_missing_etag(self, bucket_name: str, object_key: str, etag: str) -> None:
"""Persist a computed ETag back to metadata (self-heal on read)."""
try:
bucket_path = self._bucket_path(bucket_name)
if not bucket_path.exists():
return
bucket_id = bucket_path.name
safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes)
existing = self._read_metadata(bucket_id, safe_key) or {}
if existing.get("__etag__"):
return
existing["__etag__"] = etag
self._write_metadata(bucket_id, safe_key, existing)
with self._obj_cache_lock:
cached = self._object_cache.get(bucket_id)
if cached:
obj = cached[0].get(safe_key.as_posix())
if obj and not obj.etag:
obj.etag = etag
self._etag_index_dirty.add(bucket_id)
self._schedule_etag_index_flush()
except Exception:
logger.warning("Failed to heal missing ETag for %s/%s", bucket_name, object_key)
def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
"""Remove empty parent directories in a background thread.
@@ -2088,6 +2117,7 @@ class ObjectStorage:
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f:
json.dump(raw["etag_cache"], f)
self._etag_index_mem[bucket_id] = (dict(raw["etag_cache"]), etag_index_path.stat().st_mtime)
except OSError:
pass
for key, size, mtime, etag in raw["objects"]:
@@ -2211,6 +2241,7 @@ class ObjectStorage:
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f:
json.dump(meta_cache, f)
self._etag_index_mem[bucket_id] = (dict(meta_cache), etag_index_path.stat().st_mtime)
except OSError:
pass
@@ -2324,6 +2355,25 @@ class ObjectStorage:
self._etag_index_dirty.add(bucket_id)
self._schedule_etag_index_flush()
def _get_etag_index(self, bucket_id: str) -> Dict[str, str]:
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
try:
current_mtime = etag_index_path.stat().st_mtime
except OSError:
return {}
cached = self._etag_index_mem.get(bucket_id)
if cached:
cache_dict, cached_mtime = cached
if current_mtime == cached_mtime:
return cache_dict
try:
with open(etag_index_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self._etag_index_mem[bucket_id] = (data, current_mtime)
return data
except (OSError, json.JSONDecodeError):
return {}
def _schedule_etag_index_flush(self) -> None:
if self._etag_index_flush_timer is None or not self._etag_index_flush_timer.is_alive():
self._etag_index_flush_timer = threading.Timer(5.0, self._flush_etag_indexes)
@@ -2342,11 +2392,10 @@ class ObjectStorage:
index = {k: v.etag for k, v in objects.items() if v.etag}
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
try:
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f:
json.dump(index, f)
self._atomic_write_json(etag_index_path, index, sync=False)
self._etag_index_mem[bucket_id] = (index, etag_index_path.stat().st_mtime)
except OSError:
pass
logger.warning("Failed to flush etag index for bucket %s", bucket_id)
def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None:
"""Pre-warm the object cache for specified buckets or all buckets.
@@ -2388,14 +2437,15 @@ class ObjectStorage:
path.mkdir(parents=True, exist_ok=True)
@staticmethod
def _atomic_write_json(path: Path, data: Any) -> None:
def _atomic_write_json(path: Path, data: Any, *, sync: bool = True) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_suffix(".tmp")
try:
with tmp_path.open("w", encoding="utf-8") as f:
json.dump(data, f)
f.flush()
os.fsync(f.fileno())
if sync:
f.flush()
os.fsync(f.fileno())
tmp_path.replace(path)
except BaseException:
try:

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
APP_VERSION = "0.4.1"
APP_VERSION = "0.4.2"
def get_version() -> str:

View File

@@ -1,4 +1,4 @@
Flask>=3.1.2
Flask>=3.1.3
Flask-Limiter>=4.1.1
Flask-Cors>=6.0.2
Flask-WTF>=1.2.2
@@ -6,8 +6,8 @@ python-dotenv>=1.2.1
pytest>=9.0.2
requests>=2.32.5
boto3>=1.42.14
granian>=2.2.0
psutil>=7.1.3
cryptography>=46.0.3
granian>=2.7.2
psutil>=7.2.2
cryptography>=46.0.5
defusedxml>=0.7.1
duckdb>=1.4.4
duckdb>=1.5.1

2
run.py
View File

@@ -26,6 +26,7 @@ from typing import Optional
from app import create_api_app, create_ui_app
from app.config import AppConfig
from app.iam import IamService, IamError, ALLOWED_ACTIONS, _derive_fernet_key
from app.version import get_version
def _server_host() -> str:
@@ -229,6 +230,7 @@ if __name__ == "__main__":
parser.add_argument("--check-config", action="store_true", help="Validate configuration and exit")
parser.add_argument("--show-config", action="store_true", help="Show configuration summary and exit")
parser.add_argument("--reset-cred", action="store_true", help="Reset admin credentials and exit")
parser.add_argument("--version", action="version", version=f"MyFSIO {get_version()}")
args = parser.parse_args()
if args.reset_cred or args.mode == "reset-cred":

View File

@@ -245,12 +245,12 @@
</svg>
</a>
<div class="dropdown d-inline-block">
<button class="btn btn-outline-secondary btn-icon dropdown-toggle" type="button" data-bs-toggle="dropdown" data-bs-auto-close="true" aria-expanded="false" title="More actions">
<button class="btn btn-outline-secondary btn-icon dropdown-toggle" type="button" data-bs-toggle="dropdown" data-bs-auto-close="true" data-bs-config='{"popperConfig":{"strategy":"fixed"}}' aria-expanded="false" title="More actions">
<svg xmlns="http://www.w3.org/2000/svg" width="14" height="14" fill="currentColor" viewBox="0 0 16 16">
<path d="M9.5 13a1.5 1.5 0 1 1-3 0 1.5 1.5 0 0 1 3 0zm0-5a1.5 1.5 0 1 1-3 0 1.5 1.5 0 0 1 3 0zm0-5a1.5 1.5 0 1 1-3 0 1.5 1.5 0 0 1 3 0z"/>
</svg>
</button>
<ul class="dropdown-menu dropdown-menu-end" style="position: fixed;">
<ul class="dropdown-menu dropdown-menu-end">
<li><button class="dropdown-item" type="button" onclick="openCopyMoveModal('copy', '${escapeHtml(obj.key)}')">
<svg xmlns="http://www.w3.org/2000/svg" width="14" height="14" fill="currentColor" class="me-2" viewBox="0 0 16 16"><path fill-rule="evenodd" d="M4 2a2 2 0 0 1 2-2h8a2 2 0 0 1 2 2v8a2 2 0 0 1-2 2H6a2 2 0 0 1-2-2V2Zm2-1a1 1 0 0 0-1 1v8a1 1 0 0 0 1 1h8a1 1 0 0 0 1-1V2a1 1 0 0 0-1-1H6ZM2 5a1 1 0 0 0-1 1v8a1 1 0 0 0 1 1h8a1 1 0 0 0 1-1v-1h1v1a2 2 0 0 1-2 2H2a2 2 0 0 1-2-2V6a2 2 0 0 1 2-2h1v1H2Z"/></svg>
Copy
@@ -948,7 +948,7 @@
const row = e.target.closest('[data-object-row]');
if (!row) return;
if (e.target.closest('[data-delete-object]') || e.target.closest('[data-object-select]') || e.target.closest('a')) {
if (e.target.closest('[data-delete-object]') || e.target.closest('[data-object-select]') || e.target.closest('a') || e.target.closest('.dropdown')) {
return;
}

View File

@@ -1,3 +1,56 @@
import hashlib
import hmac
from datetime import datetime, timezone
from urllib.parse import quote
def _build_presigned_query(path: str, *, access_key: str = "test", secret_key: str = "secret", expires: int = 60) -> str:
now = datetime.now(timezone.utc)
amz_date = now.strftime("%Y%m%dT%H%M%SZ")
date_stamp = now.strftime("%Y%m%d")
region = "us-east-1"
service = "s3"
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
query_items = [
("X-Amz-Algorithm", "AWS4-HMAC-SHA256"),
("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD"),
("X-Amz-Credential", f"{access_key}/{credential_scope}"),
("X-Amz-Date", amz_date),
("X-Amz-Expires", str(expires)),
("X-Amz-SignedHeaders", "host"),
]
canonical_query = "&".join(
f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}" for k, v in sorted(query_items)
)
canonical_request = "\n".join([
"GET",
quote(path, safe="/-_.~"),
canonical_query,
"host:localhost\n",
"host",
"UNSIGNED-PAYLOAD",
])
hashed_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
string_to_sign = "\n".join([
"AWS4-HMAC-SHA256",
amz_date,
credential_scope,
hashed_request,
])
def _sign(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
k_date = _sign(("AWS4" + secret_key).encode("utf-8"), date_stamp)
k_region = _sign(k_date, region)
k_service = _sign(k_region, service)
signing_key = _sign(k_service, "aws4_request")
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
return canonical_query + f"&X-Amz-Signature={signature}"
def test_bucket_and_object_lifecycle(client, signer):
headers = signer("PUT", "/photos")
response = client.put("/photos", headers=headers)
@@ -114,6 +167,45 @@ def test_missing_credentials_denied(client):
assert response.status_code == 403
def test_presigned_url_denied_for_disabled_user(client, signer):
headers = signer("PUT", "/secure")
assert client.put("/secure", headers=headers).status_code == 200
payload = b"hello"
headers = signer("PUT", "/secure/file.txt", body=payload)
assert client.put("/secure/file.txt", headers=headers, data=payload).status_code == 200
iam = client.application.extensions["iam"]
iam.disable_user("test")
query = _build_presigned_query("/secure/file.txt")
response = client.get(f"/secure/file.txt?{query}", headers={"Host": "localhost"})
assert response.status_code == 403
assert b"User account is disabled" in response.data
def test_presigned_url_denied_for_inactive_key(client, signer):
headers = signer("PUT", "/secure2")
assert client.put("/secure2", headers=headers).status_code == 200
payload = b"hello"
headers = signer("PUT", "/secure2/file.txt", body=payload)
assert client.put("/secure2/file.txt", headers=headers, data=payload).status_code == 200
iam = client.application.extensions["iam"]
for user in iam._raw_config.get("users", []):
for key_info in user.get("access_keys", []):
if key_info.get("access_key") == "test":
key_info["status"] = "inactive"
iam._save()
iam._load()
query = _build_presigned_query("/secure2/file.txt")
response = client.get(f"/secure2/file.txt?{query}", headers={"Host": "localhost"})
assert response.status_code == 403
assert b"Access key is inactive" in response.data
def test_bucket_policies_deny_reads(client, signer):
import json

View File

@@ -644,5 +644,145 @@ class TestCursorRotation:
after = time.time()
cursor_info = checker.cursor_store.get_info()
ts = cursor_info["buckets"]["mybucket"]
assert before <= ts <= after
entry = cursor_info["buckets"]["mybucket"]
assert before <= entry["last_scanned"] <= after
assert entry["completed"] is True
class TestIntraBucketCursor:
def test_resumes_from_cursor_key(self, storage_root):
objects = {f"file_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
result1 = checker.run_now()
assert result1.objects_scanned == 3
cursor_info = checker.cursor_store.get_info()
entry = cursor_info["buckets"]["mybucket"]
assert entry["last_key"] is not None
assert entry["completed"] is False
result2 = checker.run_now()
assert result2.objects_scanned == 3
cursor_after = checker.cursor_store.get_info()["buckets"]["mybucket"]
assert cursor_after["last_key"] > entry["last_key"]
def test_cursor_resets_after_full_pass(self, storage_root):
objects = {f"file_{i}.txt": f"data{i}".encode() for i in range(3)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=100)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
entry = cursor_info["buckets"]["mybucket"]
assert entry["last_key"] is None
assert entry["completed"] is True
def test_full_coverage_across_cycles(self, storage_root):
objects = {f"obj_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
all_scanned = 0
for _ in range(10):
result = checker.run_now()
all_scanned += result.objects_scanned
if checker.cursor_store.get_info()["buckets"]["mybucket"]["completed"]:
break
assert all_scanned >= 10
def test_deleted_cursor_key_skips_gracefully(self, storage_root):
objects = {f"file_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(6)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
cursor_key = cursor_info["buckets"]["mybucket"]["last_key"]
assert cursor_key is not None
obj_path = storage_root / "mybucket" / cursor_key
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
key_path = Path(cursor_key)
index_path = meta_root / key_path.parent / "_index.json" if key_path.parent != Path(".") else meta_root / "_index.json"
if key_path.parent == Path("."):
index_path = meta_root / "_index.json"
else:
index_path = meta_root / key_path.parent / "_index.json"
if obj_path.exists():
obj_path.unlink()
if index_path.exists():
index_data = json.loads(index_path.read_text())
index_data.pop(key_path.name, None)
index_path.write_text(json.dumps(index_data))
result2 = checker.run_now()
assert result2.objects_scanned > 0
def test_incomplete_buckets_prioritized(self, storage_root):
_setup_bucket(storage_root, "bucket-a", {f"a{i}.txt": b"a" for i in range(5)})
_setup_bucket(storage_root, "bucket-b", {f"b{i}.txt": b"b" for i in range(5)})
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
incomplete = [
name for name, info in cursor_info["buckets"].items()
if info.get("last_key") is not None
]
assert len(incomplete) >= 1
result2 = checker.run_now()
assert result2.objects_scanned > 0
def test_cursor_skips_nested_directories(self, storage_root):
objects = {
"aaa/file1.txt": b"a1",
"aaa/file2.txt": b"a2",
"bbb/file1.txt": b"b1",
"bbb/file2.txt": b"b2",
"ccc/file1.txt": b"c1",
"ccc/file2.txt": b"c2",
}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
result1 = checker.run_now()
assert result1.objects_scanned == 4
cursor_info = checker.cursor_store.get_info()
cursor_key = cursor_info["buckets"]["mybucket"]["last_key"]
assert cursor_key is not None
assert cursor_key.startswith("aaa/") or cursor_key.startswith("bbb/")
result2 = checker.run_now()
assert result2.objects_scanned >= 2
all_scanned = result1.objects_scanned + result2.objects_scanned
for _ in range(10):
if checker.cursor_store.get_info()["buckets"]["mybucket"]["completed"]:
break
r = checker.run_now()
all_scanned += r.objects_scanned
assert all_scanned >= 6
def test_sorted_walk_order(self, storage_root):
objects = {
"bar.txt": b"bar",
"bar/inner.txt": b"inner",
"abc.txt": b"abc",
"zzz/deep.txt": b"deep",
}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=100)
result = checker.run_now()
assert result.objects_scanned >= 4
assert result.total_issues == 0