diff --git a/README.md b/README.md index 24b61d4..772178d 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,11 @@ source .venv/bin/activate # Install dependencies pip install -r requirements.txt +# (Optional) Build Rust native extension for better performance +# Requires Rust toolchain: https://rustup.rs +pip install maturin +cd myfsio_core && maturin develop --release && cd .. + # Start both servers python run.py diff --git a/app/__init__.py b/app/__init__.py index 9961fff..25f0028 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -184,6 +184,7 @@ def create_app( object_cache_max_size=app.config.get("OBJECT_CACHE_MAX_SIZE", 100), bucket_config_cache_ttl=app.config.get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0), object_key_max_length_bytes=app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024), + meta_read_cache_max=app.config.get("META_READ_CACHE_MAX", 2048), ) if app.config.get("WARM_CACHE_ON_STARTUP", True) and not app.config.get("TESTING"): diff --git a/app/config.py b/app/config.py index 04cae7a..38906bc 100644 --- a/app/config.py +++ b/app/config.py @@ -136,6 +136,7 @@ class AppConfig: site_sync_clock_skew_tolerance_seconds: float object_key_max_length_bytes: int object_cache_max_size: int + meta_read_cache_max: int bucket_config_cache_ttl_seconds: float object_tag_limit: int encryption_chunk_size_bytes: int @@ -315,6 +316,7 @@ class AppConfig: site_sync_clock_skew_tolerance_seconds = float(_get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0)) object_key_max_length_bytes = int(_get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024)) object_cache_max_size = int(_get("OBJECT_CACHE_MAX_SIZE", 100)) + meta_read_cache_max = int(_get("META_READ_CACHE_MAX", 2048)) bucket_config_cache_ttl_seconds = float(_get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0)) object_tag_limit = int(_get("OBJECT_TAG_LIMIT", 50)) encryption_chunk_size_bytes = int(_get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024)) @@ -421,6 +423,7 @@ class AppConfig: site_sync_clock_skew_tolerance_seconds=site_sync_clock_skew_tolerance_seconds, object_key_max_length_bytes=object_key_max_length_bytes, object_cache_max_size=object_cache_max_size, + meta_read_cache_max=meta_read_cache_max, bucket_config_cache_ttl_seconds=bucket_config_cache_ttl_seconds, object_tag_limit=object_tag_limit, encryption_chunk_size_bytes=encryption_chunk_size_bytes, @@ -648,6 +651,7 @@ class AppConfig: "SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS": self.site_sync_clock_skew_tolerance_seconds, "OBJECT_KEY_MAX_LENGTH_BYTES": self.object_key_max_length_bytes, "OBJECT_CACHE_MAX_SIZE": self.object_cache_max_size, + "META_READ_CACHE_MAX": self.meta_read_cache_max, "BUCKET_CONFIG_CACHE_TTL_SECONDS": self.bucket_config_cache_ttl_seconds, "OBJECT_TAG_LIMIT": self.object_tag_limit, "ENCRYPTION_CHUNK_SIZE_BYTES": self.encryption_chunk_size_bytes, diff --git a/app/encryption.py b/app/encryption.py index cb40199..f000176 100644 --- a/app/encryption.py +++ b/app/encryption.py @@ -21,6 +21,10 @@ if sys.platform != "win32": try: import myfsio_core as _rc + if not all(hasattr(_rc, f) for f in ( + "encrypt_stream_chunked", "decrypt_stream_chunked", + )): + raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release") _HAS_RUST = True except ImportError: _rc = None diff --git a/app/iam.py b/app/iam.py index f4d895d..7e8f3fa 100644 --- a/app/iam.py +++ b/app/iam.py @@ -398,9 +398,11 @@ class IamService: record = self._user_records.get(user_id) if record: self._check_expiry(access_key, record) + self._enforce_key_and_user_status(access_key) return principal self._maybe_reload() + self._enforce_key_and_user_status(access_key) user_id = self._key_index.get(access_key) if not user_id: raise IamError("Unknown access key") @@ -414,6 +416,7 @@ class IamService: def secret_for_key(self, access_key: str) -> str: self._maybe_reload() + self._enforce_key_and_user_status(access_key) secret = self._key_secrets.get(access_key) if not secret: raise IamError("Unknown access key") @@ -1028,6 +1031,16 @@ class IamService: user, _ = self._resolve_raw_user(access_key) return user + def _enforce_key_and_user_status(self, access_key: str) -> None: + key_status = self._key_status.get(access_key, "active") + if key_status != "active": + raise IamError("Access key is inactive") + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record and not record.get("enabled", True): + raise IamError("User account is disabled") + def get_secret_key(self, access_key: str) -> str | None: now = time.time() cached = self._secret_key_cache.get(access_key) @@ -1039,6 +1052,7 @@ class IamService: record = self._user_records.get(user_id) if record: self._check_expiry(access_key, record) + self._enforce_key_and_user_status(access_key) return secret_key self._maybe_reload() @@ -1049,6 +1063,7 @@ class IamService: record = self._user_records.get(user_id) if record: self._check_expiry(access_key, record) + self._enforce_key_and_user_status(access_key) self._secret_key_cache[access_key] = (secret, now) return secret return None @@ -1064,9 +1079,11 @@ class IamService: record = self._user_records.get(user_id) if record: self._check_expiry(access_key, record) + self._enforce_key_and_user_status(access_key) return principal self._maybe_reload() + self._enforce_key_and_user_status(access_key) user_id = self._key_index.get(access_key) if user_id: record = self._user_records.get(user_id) diff --git a/app/integrity.py b/app/integrity.py index b79cbc8..2ca3eb5 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -12,6 +12,8 @@ from typing import Any, Dict, List, Optional try: import myfsio_core as _rc + if not hasattr(_rc, "md5_file"): + raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release") _HAS_RUST = True except ImportError: _HAS_RUST = False @@ -192,10 +194,26 @@ class IntegrityCursorStore: except OSError as e: logger.error("Failed to save integrity cursor: %s", e) - def update_bucket(self, bucket_name: str, timestamp: float) -> None: + def update_bucket( + self, + bucket_name: str, + timestamp: float, + last_key: Optional[str] = None, + completed: bool = False, + ) -> None: with self._lock: data = self.load() - data["buckets"][bucket_name] = {"last_scanned": timestamp} + entry = data["buckets"].get(bucket_name, {}) + if completed: + entry["last_scanned"] = timestamp + entry.pop("last_key", None) + entry["completed"] = True + else: + entry["last_scanned"] = timestamp + if last_key is not None: + entry["last_key"] = last_key + entry["completed"] = False + data["buckets"][bucket_name] = entry self.save(data) def clean_stale(self, existing_buckets: List[str]) -> None: @@ -208,17 +226,32 @@ class IntegrityCursorStore: del data["buckets"][k] self.save(data) + def get_last_key(self, bucket_name: str) -> Optional[str]: + data = self.load() + entry = data.get("buckets", {}).get(bucket_name) + if entry is None: + return None + return entry.get("last_key") + def get_bucket_order(self, bucket_names: List[str]) -> List[str]: data = self.load() buckets_info = data.get("buckets", {}) - def sort_key(name: str) -> float: + incomplete = [] + complete = [] + for name in bucket_names: entry = buckets_info.get(name) if entry is None: - return 0.0 - return entry.get("last_scanned", 0.0) + incomplete.append((name, 0.0)) + elif entry.get("last_key") is not None: + incomplete.append((name, entry.get("last_scanned", 0.0))) + else: + complete.append((name, entry.get("last_scanned", 0.0))) - return sorted(bucket_names, key=sort_key) + incomplete.sort(key=lambda x: x[1]) + complete.sort(key=lambda x: x[1]) + + return [n for n, _ in incomplete] + [n for n, _ in complete] def get_info(self) -> Dict[str, Any]: data = self.load() @@ -226,7 +259,11 @@ class IntegrityCursorStore: return { "tracked_buckets": len(buckets), "buckets": { - name: info.get("last_scanned") + name: { + "last_scanned": info.get("last_scanned"), + "last_key": info.get("last_key"), + "completed": info.get("completed", False), + } for name, info in buckets.items() }, } @@ -325,13 +362,19 @@ class IntegrityChecker: if self._batch_exhausted(result): break result.buckets_scanned += 1 - self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + cursor_key = self.cursor_store.get_last_key(bucket_name) + key_corrupted = self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key) + key_orphaned = self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key) + key_phantom = self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key) self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) - self.cursor_store.update_bucket(bucket_name, time.time()) + returned_keys = [k for k in (key_corrupted, key_orphaned, key_phantom) if k is not None] + bucket_exhausted = self._batch_exhausted(result) + if bucket_exhausted and returned_keys: + self.cursor_store.update_bucket(bucket_name, time.time(), last_key=min(returned_keys)) + else: + self.cursor_store.update_bucket(bucket_name, time.time(), completed=True) result.execution_time_seconds = time.time() - start @@ -399,108 +442,172 @@ class IntegrityChecker: if len(result.issues) < MAX_ISSUES: result.issues.append(issue) - def _check_corrupted_objects( - self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool - ) -> None: - bucket_path = self.storage_root / bucket_name - meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR - + def _collect_index_keys( + self, meta_root: Path, cursor_key: Optional[str] = None, + ) -> Dict[str, Dict[str, Any]]: + all_keys: Dict[str, Dict[str, Any]] = {} if not meta_root.exists(): - return - + return all_keys try: for index_file in meta_root.rglob("_index.json"): - if self._throttle(): - return - if self._batch_exhausted(result): - return if not index_file.is_file(): continue + rel_dir = index_file.parent.relative_to(meta_root) + dir_prefix = "" if rel_dir == Path(".") else rel_dir.as_posix() + if cursor_key is not None and dir_prefix: + full_prefix = dir_prefix + "/" + if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix: + continue try: index_data = json.loads(index_file.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue - - for key_name, entry in list(index_data.items()): - if self._throttle(): - return - if self._batch_exhausted(result): - return - - rel_dir = index_file.parent.relative_to(meta_root) - if rel_dir == Path("."): - full_key = key_name - else: - full_key = rel_dir.as_posix() + "/" + key_name - - object_path = bucket_path / full_key - if not object_path.exists(): + for key_name, entry in index_data.items(): + full_key = (dir_prefix + "/" + key_name) if dir_prefix else key_name + if cursor_key is not None and full_key <= cursor_key: continue + all_keys[full_key] = { + "entry": entry, + "index_file": index_file, + "key_name": key_name, + } + except OSError: + pass + return all_keys - result.objects_scanned += 1 + def _walk_bucket_files_sorted( + self, bucket_path: Path, cursor_key: Optional[str] = None, + ): + def _walk(dir_path: Path, prefix: str): + try: + entries = list(os.scandir(dir_path)) + except OSError: + return - meta = entry.get("metadata", {}) if isinstance(entry, dict) else {} - stored_etag = meta.get("__etag__") - if not stored_etag: + def _sort_key(e): + if e.is_dir(follow_symlinks=False): + return e.name + "/" + return e.name + + entries.sort(key=_sort_key) + + for entry in entries: + if entry.is_dir(follow_symlinks=False): + if not prefix and entry.name in self.INTERNAL_FOLDERS: continue - - try: - actual_etag = _compute_etag(object_path) - except OSError: + new_prefix = (prefix + "/" + entry.name) if prefix else entry.name + if cursor_key is not None: + full_prefix = new_prefix + "/" + if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix: + continue + yield from _walk(Path(entry.path), new_prefix) + elif entry.is_file(follow_symlinks=False): + full_key = (prefix + "/" + entry.name) if prefix else entry.name + if cursor_key is not None and full_key <= cursor_key: continue + yield full_key - if actual_etag != stored_etag: - result.corrupted_objects += 1 - issue = IntegrityIssue( - issue_type="corrupted_object", - bucket=bucket_name, - key=full_key, - detail=f"stored_etag={stored_etag} actual_etag={actual_etag}", - ) + yield from _walk(bucket_path, "") - if auto_heal and not dry_run: - try: - stat = object_path.stat() - meta["__etag__"] = actual_etag - meta["__size__"] = str(stat.st_size) - meta["__last_modified__"] = str(stat.st_mtime) - index_data[key_name] = {"metadata": meta} - self._atomic_write_index(index_file, index_data) - issue.healed = True - issue.heal_action = "updated etag in index" - result.issues_healed += 1 - except OSError as e: - result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}") - - self._add_issue(result, issue) - except OSError as e: - result.errors.append(f"check corrupted {bucket_name}: {e}") - - def _check_orphaned_objects( - self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool - ) -> None: + def _check_corrupted_objects( + self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool, + cursor_key: Optional[str] = None, + ) -> Optional[str]: + if self._batch_exhausted(result): + return None bucket_path = self.storage_root / bucket_name meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR + if not meta_root.exists(): + return None + + last_key = None try: - for entry in bucket_path.rglob("*"): + all_keys = self._collect_index_keys(meta_root, cursor_key) + sorted_keys = sorted(all_keys.keys()) + + for full_key in sorted_keys: if self._throttle(): - return + return last_key if self._batch_exhausted(result): - return - if not entry.is_file(): - continue - try: - rel = entry.relative_to(bucket_path) - except ValueError: - continue - if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS: + return last_key + + info = all_keys[full_key] + entry = info["entry"] + index_file = info["index_file"] + key_name = info["key_name"] + + object_path = bucket_path / full_key + if not object_path.exists(): continue result.objects_scanned += 1 - full_key = rel.as_posix() - key_name = rel.name - parent = rel.parent + last_key = full_key + + meta = entry.get("metadata", {}) if isinstance(entry, dict) else {} + stored_etag = meta.get("__etag__") + if not stored_etag: + continue + + try: + actual_etag = _compute_etag(object_path) + except OSError: + continue + + if actual_etag != stored_etag: + result.corrupted_objects += 1 + issue = IntegrityIssue( + issue_type="corrupted_object", + bucket=bucket_name, + key=full_key, + detail=f"stored_etag={stored_etag} actual_etag={actual_etag}", + ) + + if auto_heal and not dry_run: + try: + stat = object_path.stat() + meta["__etag__"] = actual_etag + meta["__size__"] = str(stat.st_size) + meta["__last_modified__"] = str(stat.st_mtime) + try: + index_data = json.loads(index_file.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + index_data = {} + index_data[key_name] = {"metadata": meta} + self._atomic_write_index(index_file, index_data) + issue.healed = True + issue.heal_action = "updated etag in index" + result.issues_healed += 1 + except OSError as e: + result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}") + + self._add_issue(result, issue) + except OSError as e: + result.errors.append(f"check corrupted {bucket_name}: {e}") + return last_key + + def _check_orphaned_objects( + self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool, + cursor_key: Optional[str] = None, + ) -> Optional[str]: + if self._batch_exhausted(result): + return None + bucket_path = self.storage_root / bucket_name + meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR + + last_key = None + try: + for full_key in self._walk_bucket_files_sorted(bucket_path, cursor_key): + if self._throttle(): + return last_key + if self._batch_exhausted(result): + return last_key + + result.objects_scanned += 1 + last_key = full_key + key_path = Path(full_key) + key_name = key_path.name + parent = key_path.parent if parent == Path("."): index_path = meta_root / "_index.json" @@ -526,8 +633,9 @@ class IntegrityChecker: if auto_heal and not dry_run: try: - etag = _compute_etag(entry) - stat = entry.stat() + object_path = bucket_path / full_key + etag = _compute_etag(object_path) + stat = object_path.stat() meta = { "__etag__": etag, "__size__": str(stat.st_size), @@ -550,58 +658,56 @@ class IntegrityChecker: self._add_issue(result, issue) except OSError as e: result.errors.append(f"check orphaned {bucket_name}: {e}") + return last_key def _check_phantom_metadata( - self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool - ) -> None: + self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool, + cursor_key: Optional[str] = None, + ) -> Optional[str]: + if self._batch_exhausted(result): + return None bucket_path = self.storage_root / bucket_name meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR if not meta_root.exists(): - return + return None + last_key = None try: - for index_file in meta_root.rglob("_index.json"): - if self._throttle(): - return + all_keys = self._collect_index_keys(meta_root, cursor_key) + sorted_keys = sorted(all_keys.keys()) + + heal_by_index: Dict[Path, List[str]] = {} + + for full_key in sorted_keys: if self._batch_exhausted(result): - return - if not index_file.is_file(): - continue - try: - index_data = json.loads(index_file.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError): - continue + break - keys_to_remove = [] - for key_name in list(index_data.keys()): - if self._batch_exhausted(result): - break - result.objects_scanned += 1 - rel_dir = index_file.parent.relative_to(meta_root) - if rel_dir == Path("."): - full_key = key_name - else: - full_key = rel_dir.as_posix() + "/" + key_name + result.objects_scanned += 1 + last_key = full_key - object_path = bucket_path / full_key - if not object_path.exists(): - result.phantom_metadata += 1 - issue = IntegrityIssue( - issue_type="phantom_metadata", - bucket=bucket_name, - key=full_key, - detail="metadata entry without file on disk", - ) - if auto_heal and not dry_run: - keys_to_remove.append(key_name) - issue.healed = True - issue.heal_action = "removed stale index entry" - result.issues_healed += 1 - self._add_issue(result, issue) + object_path = bucket_path / full_key + if not object_path.exists(): + result.phantom_metadata += 1 + info = all_keys[full_key] + issue = IntegrityIssue( + issue_type="phantom_metadata", + bucket=bucket_name, + key=full_key, + detail="metadata entry without file on disk", + ) + if auto_heal and not dry_run: + index_file = info["index_file"] + heal_by_index.setdefault(index_file, []).append(info["key_name"]) + issue.healed = True + issue.heal_action = "removed stale index entry" + result.issues_healed += 1 + self._add_issue(result, issue) - if keys_to_remove and auto_heal and not dry_run: + if heal_by_index and auto_heal and not dry_run: + for index_file, keys_to_remove in heal_by_index.items(): try: + index_data = json.loads(index_file.read_text(encoding="utf-8")) for k in keys_to_remove: index_data.pop(k, None) if index_data: @@ -612,10 +718,13 @@ class IntegrityChecker: result.errors.append(f"heal phantom {bucket_name}: {e}") except OSError as e: result.errors.append(f"check phantom {bucket_name}: {e}") + return last_key def _check_stale_versions( self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool ) -> None: + if self._batch_exhausted(result): + return versions_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR if not versions_root.exists(): @@ -682,6 +791,8 @@ class IntegrityChecker: def _check_etag_cache( self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool ) -> None: + if self._batch_exhausted(result): + return etag_index_path = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / "etag_index.json" if not etag_index_path.exists(): @@ -751,6 +862,8 @@ class IntegrityChecker: def _check_legacy_metadata( self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool ) -> None: + if self._batch_exhausted(result): + return legacy_meta_root = self.storage_root / bucket_name / ".meta" if not legacy_meta_root.exists(): return diff --git a/app/s3_api.py b/app/s3_api.py index 2c151ee..97cfd0a 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -19,6 +19,10 @@ from defusedxml.ElementTree import fromstring try: import myfsio_core as _rc + if not all(hasattr(_rc, f) for f in ( + "verify_sigv4_signature", "derive_signing_key", "clear_signing_key_cache", + )): + raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release") _HAS_RUST = True except ImportError: _rc = None @@ -201,6 +205,11 @@ _SIGNING_KEY_CACHE_LOCK = threading.Lock() _SIGNING_KEY_CACHE_TTL = 60.0 _SIGNING_KEY_CACHE_MAX_SIZE = 256 +_SIGV4_HEADER_RE = re.compile( + r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)" +) +_SIGV4_REQUIRED_HEADERS = frozenset({'host', 'x-amz-date'}) + def clear_signing_key_cache() -> None: if _HAS_RUST: @@ -259,10 +268,7 @@ def _get_canonical_uri(req: Any) -> str: def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: - match = re.match( - r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)", - auth_header, - ) + match = _SIGV4_HEADER_RE.match(auth_header) if not match: return None @@ -286,14 +292,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: if time_diff > tolerance: raise IamError("Request timestamp too old or too far in the future") - required_headers = {'host', 'x-amz-date'} signed_headers_set = set(signed_headers_str.split(';')) - if not required_headers.issubset(signed_headers_set): - if 'date' in signed_headers_set: - required_headers.remove('x-amz-date') - required_headers.add('date') - - if not required_headers.issubset(signed_headers_set): + if not _SIGV4_REQUIRED_HEADERS.issubset(signed_headers_set): + if not ({'host', 'date'}.issubset(signed_headers_set)): raise IamError("Required headers not signed") canonical_uri = _get_canonical_uri(req) @@ -533,21 +534,6 @@ def _authorize_action(principal: Principal | None, bucket_name: str | None, acti raise iam_error or IamError("Access denied") -def _enforce_bucket_policy(principal: Principal | None, bucket_name: str | None, object_key: str | None, action: str) -> None: - if not bucket_name: - return - policy_context = _build_policy_context() - decision = _bucket_policies().evaluate( - principal.access_key if principal else None, - bucket_name, - object_key, - action, - policy_context, - ) - if decision == "deny": - raise IamError("Access denied by bucket policy") - - def _object_principal(action: str, bucket_name: str, object_key: str): principal, error = _require_principal() try: @@ -556,121 +542,7 @@ def _object_principal(action: str, bucket_name: str, object_key: str): except IamError as exc: if not error: return None, _error_response("AccessDenied", str(exc), 403) - if not _has_presign_params(): return None, error - try: - principal = _validate_presigned_request(action, bucket_name, object_key) - _enforce_bucket_policy(principal, bucket_name, object_key, action) - return principal, None - except IamError as exc: - return None, _error_response("AccessDenied", str(exc), 403) - - -def _has_presign_params() -> bool: - return bool(request.args.get("X-Amz-Algorithm")) - - -def _validate_presigned_request(action: str, bucket_name: str, object_key: str) -> Principal: - algorithm = request.args.get("X-Amz-Algorithm") - credential = request.args.get("X-Amz-Credential") - amz_date = request.args.get("X-Amz-Date") - signed_headers = request.args.get("X-Amz-SignedHeaders") - expires = request.args.get("X-Amz-Expires") - signature = request.args.get("X-Amz-Signature") - if not all([algorithm, credential, amz_date, signed_headers, expires, signature]): - raise IamError("Malformed presigned URL") - if algorithm != "AWS4-HMAC-SHA256": - raise IamError("Unsupported signing algorithm") - - parts = credential.split("/") - if len(parts) != 5: - raise IamError("Invalid credential scope") - access_key, date_stamp, region, service, terminal = parts - if terminal != "aws4_request": - raise IamError("Invalid credential scope") - config_region = current_app.config["AWS_REGION"] - config_service = current_app.config["AWS_SERVICE"] - if region != config_region or service != config_service: - raise IamError("Credential scope mismatch") - - try: - expiry = int(expires) - except ValueError as exc: - raise IamError("Invalid expiration") from exc - min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1) - max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800) - if expiry < min_expiry or expiry > max_expiry: - raise IamError(f"Expiration must be between {min_expiry} second(s) and {max_expiry} seconds") - - try: - request_time = datetime.strptime(amz_date, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc) - except ValueError as exc: - raise IamError("Invalid X-Amz-Date") from exc - now = datetime.now(timezone.utc) - tolerance = timedelta(seconds=current_app.config.get("SIGV4_TIMESTAMP_TOLERANCE_SECONDS", 900)) - if request_time > now + tolerance: - raise IamError("Request date is too far in the future") - if now > request_time + timedelta(seconds=expiry): - raise IamError("Presigned URL expired") - - signed_headers_list = [header.strip().lower() for header in signed_headers.split(";") if header] - signed_headers_list.sort() - canonical_headers = _canonical_headers_from_request(signed_headers_list) - canonical_query = _canonical_query_from_request() - payload_hash = request.args.get("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") - canonical_request = "\n".join( - [ - request.method, - _canonical_uri(bucket_name, object_key), - canonical_query, - canonical_headers, - ";".join(signed_headers_list), - payload_hash, - ] - ) - hashed_request = hashlib.sha256(canonical_request.encode()).hexdigest() - scope = f"{date_stamp}/{region}/{service}/aws4_request" - string_to_sign = "\n".join([ - "AWS4-HMAC-SHA256", - amz_date, - scope, - hashed_request, - ]) - secret = _iam().secret_for_key(access_key) - signing_key = _derive_signing_key(secret, date_stamp, region, service) - expected = hmac.new(signing_key, string_to_sign.encode(), hashlib.sha256).hexdigest() - if not hmac.compare_digest(expected, signature): - raise IamError("Signature mismatch") - return _iam().principal_for_key(access_key) - - -def _canonical_query_from_request() -> str: - parts = [] - for key in sorted(request.args.keys()): - if key == "X-Amz-Signature": - continue - values = request.args.getlist(key) - encoded_key = quote(str(key), safe="-_.~") - for value in sorted(values): - encoded_value = quote(str(value), safe="-_.~") - parts.append(f"{encoded_key}={encoded_value}") - return "&".join(parts) - - -def _canonical_headers_from_request(headers: list[str]) -> str: - lines = [] - for header in headers: - if header == "host": - api_base = current_app.config.get("API_BASE_URL") - if api_base: - value = urlparse(api_base).netloc - else: - value = request.host - else: - value = request.headers.get(header, "") - canonical_value = " ".join(value.strip().split()) if value else "" - lines.append(f"{header}:{canonical_value}") - return "\n".join(lines) + "\n" def _canonical_uri(bucket_name: str, object_key: str | None) -> str: @@ -736,8 +608,8 @@ def _generate_presigned_url( host = parsed.netloc scheme = parsed.scheme else: - host = request.headers.get("X-Forwarded-Host", request.host) - scheme = request.headers.get("X-Forwarded-Proto", request.scheme or "http") + host = request.host + scheme = request.scheme or "http" canonical_headers = f"host:{host}\n" canonical_request = "\n".join( @@ -1010,7 +882,7 @@ def _render_encryption_document(config: dict[str, Any]) -> Element: return root -def _stream_file(path, chunk_size: int = 256 * 1024): +def _stream_file(path, chunk_size: int = 1024 * 1024): with path.open("rb") as handle: while True: chunk = handle.read(chunk_size) @@ -2961,9 +2833,12 @@ def object_handler(bucket_name: str, object_key: str): is_encrypted = "x-amz-server-side-encryption" in metadata cond_etag = metadata.get("__etag__") + _etag_was_healed = False if not cond_etag and not is_encrypted: try: cond_etag = storage._compute_etag(path) + _etag_was_healed = True + storage.heal_missing_etag(bucket_name, object_key, cond_etag) except OSError: cond_etag = None if cond_etag: @@ -3009,7 +2884,7 @@ def object_handler(bucket_name: str, object_key: str): try: stat = path.stat() file_size = stat.st_size - etag = metadata.get("__etag__") or storage._compute_etag(path) + etag = cond_etag or storage._compute_etag(path) except PermissionError: return _error_response("AccessDenied", "Permission denied accessing object", 403) except OSError as exc: @@ -3057,7 +2932,7 @@ def object_handler(bucket_name: str, object_key: str): try: stat = path.stat() response = Response(status=200) - etag = metadata.get("__etag__") or storage._compute_etag(path) + etag = cond_etag or storage._compute_etag(path) except PermissionError: return _error_response("AccessDenied", "Permission denied accessing object", 403) except OSError as exc: @@ -3442,9 +3317,13 @@ def head_object(bucket_name: str, object_key: str) -> Response: return error try: _authorize_action(principal, bucket_name, "read", object_key=object_key) - path = _storage().get_object_path(bucket_name, object_key) - metadata = _storage().get_object_metadata(bucket_name, object_key) - etag = metadata.get("__etag__") or _storage()._compute_etag(path) + storage = _storage() + path = storage.get_object_path(bucket_name, object_key) + metadata = storage.get_object_metadata(bucket_name, object_key) + etag = metadata.get("__etag__") + if not etag: + etag = storage._compute_etag(path) + storage.heal_missing_etag(bucket_name, object_key, etag) head_mtime = float(metadata["__last_modified__"]) if "__last_modified__" in metadata else None if head_mtime is None: diff --git a/app/storage.py b/app/storage.py index fa66332..d2469e9 100644 --- a/app/storage.py +++ b/app/storage.py @@ -2,6 +2,7 @@ from __future__ import annotations import hashlib import json +import logging import os import re import shutil @@ -20,12 +21,21 @@ from typing import Any, BinaryIO, Dict, Generator, List, Optional try: import myfsio_core as _rc + if not all(hasattr(_rc, f) for f in ( + "validate_bucket_name", "validate_object_key", "md5_file", + "shallow_scan", "bucket_stats_scan", "search_objects_scan", + "stream_to_file_with_md5", "assemble_parts_with_md5", + "build_object_cache", "read_index_entry", "write_index_entry", + "delete_index_entry", "check_bucket_contents", + )): + raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release") _HAS_RUST = True except ImportError: _rc = None _HAS_RUST = False -# Platform-specific file locking +logger = logging.getLogger(__name__) + if os.name == "nt": import msvcrt @@ -190,6 +200,7 @@ class ObjectStorage: object_cache_max_size: int = 100, bucket_config_cache_ttl: float = 30.0, object_key_max_length_bytes: int = 1024, + meta_read_cache_max: int = 2048, ) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) @@ -208,7 +219,7 @@ class ObjectStorage: self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {} self._meta_index_locks: Dict[str, threading.Lock] = {} self._meta_read_cache: OrderedDict[tuple, Optional[Dict[str, Any]]] = OrderedDict() - self._meta_read_cache_max = 2048 + self._meta_read_cache_max = meta_read_cache_max self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup") self._stats_mem: Dict[str, Dict[str, int]] = {} self._stats_serial: Dict[str, int] = {} @@ -218,6 +229,7 @@ class ObjectStorage: self._stats_flush_timer: Optional[threading.Timer] = None self._etag_index_dirty: set[str] = set() self._etag_index_flush_timer: Optional[threading.Timer] = None + self._etag_index_mem: Dict[str, tuple[Dict[str, str], float]] = {} def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: with self._registry_lock: @@ -427,7 +439,7 @@ class ObjectStorage: cache_path = self._system_bucket_root(bucket_id) / "stats.json" try: cache_path.parent.mkdir(parents=True, exist_ok=True) - self._atomic_write_json(cache_path, data) + self._atomic_write_json(cache_path, data, sync=False) except OSError: pass @@ -602,14 +614,7 @@ class ObjectStorage: is_truncated=False, next_continuation_token=None, ) - etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" - meta_cache: Dict[str, str] = {} - if etag_index_path.exists(): - try: - with open(etag_index_path, 'r', encoding='utf-8') as f: - meta_cache = json.load(f) - except (OSError, json.JSONDecodeError): - pass + meta_cache: Dict[str, str] = self._get_etag_index(bucket_id) entries_files: list[tuple[str, int, float, Optional[str]]] = [] entries_dirs: list[str] = [] @@ -1079,6 +1084,30 @@ class ObjectStorage: safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) return self._read_metadata(bucket_path.name, safe_key) or {} + def heal_missing_etag(self, bucket_name: str, object_key: str, etag: str) -> None: + """Persist a computed ETag back to metadata (self-heal on read).""" + try: + bucket_path = self._bucket_path(bucket_name) + if not bucket_path.exists(): + return + bucket_id = bucket_path.name + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) + existing = self._read_metadata(bucket_id, safe_key) or {} + if existing.get("__etag__"): + return + existing["__etag__"] = etag + self._write_metadata(bucket_id, safe_key, existing) + with self._obj_cache_lock: + cached = self._object_cache.get(bucket_id) + if cached: + obj = cached[0].get(safe_key.as_posix()) + if obj and not obj.etag: + obj.etag = etag + self._etag_index_dirty.add(bucket_id) + self._schedule_etag_index_flush() + except Exception: + logger.warning("Failed to heal missing ETag for %s/%s", bucket_name, object_key) + def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None: """Remove empty parent directories in a background thread. @@ -2088,6 +2117,7 @@ class ObjectStorage: etag_index_path.parent.mkdir(parents=True, exist_ok=True) with open(etag_index_path, 'w', encoding='utf-8') as f: json.dump(raw["etag_cache"], f) + self._etag_index_mem[bucket_id] = (dict(raw["etag_cache"]), etag_index_path.stat().st_mtime) except OSError: pass for key, size, mtime, etag in raw["objects"]: @@ -2211,6 +2241,7 @@ class ObjectStorage: etag_index_path.parent.mkdir(parents=True, exist_ok=True) with open(etag_index_path, 'w', encoding='utf-8') as f: json.dump(meta_cache, f) + self._etag_index_mem[bucket_id] = (dict(meta_cache), etag_index_path.stat().st_mtime) except OSError: pass @@ -2324,6 +2355,25 @@ class ObjectStorage: self._etag_index_dirty.add(bucket_id) self._schedule_etag_index_flush() + def _get_etag_index(self, bucket_id: str) -> Dict[str, str]: + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + try: + current_mtime = etag_index_path.stat().st_mtime + except OSError: + return {} + cached = self._etag_index_mem.get(bucket_id) + if cached: + cache_dict, cached_mtime = cached + if current_mtime == cached_mtime: + return cache_dict + try: + with open(etag_index_path, 'r', encoding='utf-8') as f: + data = json.load(f) + self._etag_index_mem[bucket_id] = (data, current_mtime) + return data + except (OSError, json.JSONDecodeError): + return {} + def _schedule_etag_index_flush(self) -> None: if self._etag_index_flush_timer is None or not self._etag_index_flush_timer.is_alive(): self._etag_index_flush_timer = threading.Timer(5.0, self._flush_etag_indexes) @@ -2342,11 +2392,10 @@ class ObjectStorage: index = {k: v.etag for k, v in objects.items() if v.etag} etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" try: - etag_index_path.parent.mkdir(parents=True, exist_ok=True) - with open(etag_index_path, 'w', encoding='utf-8') as f: - json.dump(index, f) + self._atomic_write_json(etag_index_path, index, sync=False) + self._etag_index_mem[bucket_id] = (index, etag_index_path.stat().st_mtime) except OSError: - pass + logger.warning("Failed to flush etag index for bucket %s", bucket_id) def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None: """Pre-warm the object cache for specified buckets or all buckets. @@ -2388,14 +2437,15 @@ class ObjectStorage: path.mkdir(parents=True, exist_ok=True) @staticmethod - def _atomic_write_json(path: Path, data: Any) -> None: + def _atomic_write_json(path: Path, data: Any, *, sync: bool = True) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp_path = path.with_suffix(".tmp") try: with tmp_path.open("w", encoding="utf-8") as f: json.dump(data, f) - f.flush() - os.fsync(f.fileno()) + if sync: + f.flush() + os.fsync(f.fileno()) tmp_path.replace(path) except BaseException: try: diff --git a/app/version.py b/app/version.py index 5c24c4a..0abc36a 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.4.1" +APP_VERSION = "0.4.2" def get_version() -> str: diff --git a/requirements.txt b/requirements.txt index 8a29238..abc53ed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -Flask>=3.1.2 +Flask>=3.1.3 Flask-Limiter>=4.1.1 Flask-Cors>=6.0.2 Flask-WTF>=1.2.2 @@ -6,8 +6,8 @@ python-dotenv>=1.2.1 pytest>=9.0.2 requests>=2.32.5 boto3>=1.42.14 -granian>=2.2.0 -psutil>=7.1.3 -cryptography>=46.0.3 +granian>=2.7.2 +psutil>=7.2.2 +cryptography>=46.0.5 defusedxml>=0.7.1 -duckdb>=1.4.4 \ No newline at end of file +duckdb>=1.5.1 \ No newline at end of file diff --git a/run.py b/run.py index d4ea394..40ab540 100644 --- a/run.py +++ b/run.py @@ -26,6 +26,7 @@ from typing import Optional from app import create_api_app, create_ui_app from app.config import AppConfig from app.iam import IamService, IamError, ALLOWED_ACTIONS, _derive_fernet_key +from app.version import get_version def _server_host() -> str: @@ -229,6 +230,7 @@ if __name__ == "__main__": parser.add_argument("--check-config", action="store_true", help="Validate configuration and exit") parser.add_argument("--show-config", action="store_true", help="Show configuration summary and exit") parser.add_argument("--reset-cred", action="store_true", help="Reset admin credentials and exit") + parser.add_argument("--version", action="version", version=f"MyFSIO {get_version()}") args = parser.parse_args() if args.reset_cred or args.mode == "reset-cred": diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index f35f46f..3f4edb2 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -245,12 +245,12 @@