diff --git a/app/s3_api.py b/app/s3_api.py index d6620eb..5c5cde5 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -305,16 +305,10 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: header_values, payload_hash, amz_date, date_stamp, region, service, secret_key, signature, ): - if current_app.config.get("DEBUG_SIGV4"): - logger.warning("SigV4 signature mismatch for %s %s", req.method, req.path) raise IamError("SignatureDoesNotMatch") else: method = req.method - query_args = [] - for key, value in req.args.items(multi=True): - query_args.append((key, value)) - query_args.sort(key=lambda x: (x[0], x[1])) - + query_args = sorted(req.args.items(multi=True), key=lambda x: (x[0], x[1])) canonical_query_parts = [] for k, v in query_args: canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}") @@ -339,8 +333,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}" calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() if not hmac.compare_digest(calculated_signature, signature): - if current_app.config.get("DEBUG_SIGV4"): - logger.warning("SigV4 signature mismatch for %s %s", method, req.path) raise IamError("SignatureDoesNotMatch") session_token = req.headers.get("X-Amz-Security-Token") @@ -682,7 +674,7 @@ def _extract_request_metadata() -> Dict[str, str]: for header, value in request.headers.items(): if header.lower().startswith("x-amz-meta-"): key = header[11:] - if key: + if key and not (key.startswith("__") and key.endswith("__")): metadata[key] = value return metadata @@ -1039,6 +1031,8 @@ def _apply_object_headers( response.headers["ETag"] = f'"{etag}"' response.headers["Accept-Ranges"] = "bytes" for key, value in (metadata or {}).items(): + if key.startswith("__") and key.endswith("__"): + continue safe_value = _sanitize_header_value(str(value)) response.headers[f"X-Amz-Meta-{key}"] = safe_value @@ -2467,7 +2461,7 @@ def _post_object(bucket_name: str) -> Response: for field_name, value in request.form.items(): if field_name.lower().startswith("x-amz-meta-"): key = field_name[11:] - if key: + if key and not (key.startswith("__") and key.endswith("__")): metadata[key] = value try: meta = storage.put_object(bucket_name, object_key, file.stream, metadata=metadata or None) @@ -3445,8 +3439,8 @@ def _copy_object(dest_bucket: str, dest_key: str, copy_source: str) -> Response: if validation_error: return _error_response("InvalidArgument", validation_error, 400) else: - metadata = source_metadata - + metadata = {k: v for k, v in source_metadata.items() if not (k.startswith("__") and k.endswith("__"))} + try: with source_path.open("rb") as stream: meta = storage.put_object( diff --git a/app/storage.py b/app/storage.py index 0d4111e..e9b1c0c 100644 --- a/app/storage.py +++ b/app/storage.py @@ -16,7 +16,7 @@ from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timezone -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import Any, BinaryIO, Dict, Generator, List, Optional try: @@ -292,37 +292,43 @@ class ObjectStorage: bucket_str = str(bucket_path) try: - stack = [bucket_str] - while stack: - current = stack.pop() - try: - with os.scandir(current) as it: - for entry in it: - if current == bucket_str and entry.name in internal: - continue - if entry.is_dir(follow_symlinks=False): - stack.append(entry.path) - elif entry.is_file(follow_symlinks=False): - object_count += 1 - total_bytes += entry.stat(follow_symlinks=False).st_size - except PermissionError: - continue - - versions_root = self._bucket_versions_root(bucket_name) - if versions_root.exists(): - v_stack = [str(versions_root)] - while v_stack: - v_current = v_stack.pop() + if _HAS_RUST: + versions_root = str(self._bucket_versions_root(bucket_name)) + object_count, total_bytes, version_count, version_bytes = _rc.bucket_stats_scan( + bucket_str, versions_root + ) + else: + stack = [bucket_str] + while stack: + current = stack.pop() try: - with os.scandir(v_current) as it: + with os.scandir(current) as it: for entry in it: + if current == bucket_str and entry.name in internal: + continue if entry.is_dir(follow_symlinks=False): - v_stack.append(entry.path) - elif entry.is_file(follow_symlinks=False) and entry.name.endswith(".bin"): - version_count += 1 - version_bytes += entry.stat(follow_symlinks=False).st_size + stack.append(entry.path) + elif entry.is_file(follow_symlinks=False): + object_count += 1 + total_bytes += entry.stat(follow_symlinks=False).st_size except PermissionError: continue + + versions_root = self._bucket_versions_root(bucket_name) + if versions_root.exists(): + v_stack = [str(versions_root)] + while v_stack: + v_current = v_stack.pop() + try: + with os.scandir(v_current) as it: + for entry in it: + if entry.is_dir(follow_symlinks=False): + v_stack.append(entry.path) + elif entry.is_file(follow_symlinks=False) and entry.name.endswith(".bin"): + version_count += 1 + version_bytes += entry.stat(follow_symlinks=False).st_size + except PermissionError: + continue except OSError: if cached_stats is not None: return cached_stats @@ -559,47 +565,69 @@ class ObjectStorage: entries_files: list[tuple[str, int, float, Optional[str]]] = [] entries_dirs: list[str] = [] - try: - with os.scandir(str(target_dir)) as it: - for entry in it: - name = entry.name - if name in self.INTERNAL_FOLDERS: - continue - if entry.is_dir(follow_symlinks=False): - cp = prefix + name + delimiter - entries_dirs.append(cp) - elif entry.is_file(follow_symlinks=False): - key = prefix + name - try: - st = entry.stat() - etag = meta_cache.get(key) - entries_files.append((key, st.st_size, st.st_mtime, etag)) - except OSError: - pass - except OSError: - return ShallowListResult( - objects=[], common_prefixes=[], - is_truncated=False, next_continuation_token=None, - ) + if _HAS_RUST: + try: + raw = _rc.shallow_scan(str(target_dir), prefix, json.dumps(meta_cache)) + entries_files = [] + for key, size, mtime, etag in raw["files"]: + if etag is None: + safe_key = PurePosixPath(key) + meta = self._read_metadata(bucket_id, Path(safe_key)) + etag = meta.get("__etag__") if meta else None + entries_files.append((key, size, mtime, etag)) + entries_dirs = raw["dirs"] + all_items = raw["merged_keys"] + except OSError: + return ShallowListResult( + objects=[], common_prefixes=[], + is_truncated=False, next_continuation_token=None, + ) + else: + try: + with os.scandir(str(target_dir)) as it: + for entry in it: + name = entry.name + if name in self.INTERNAL_FOLDERS: + continue + if entry.is_dir(follow_symlinks=False): + cp = prefix + name + delimiter + entries_dirs.append(cp) + elif entry.is_file(follow_symlinks=False): + key = prefix + name + try: + st = entry.stat() + etag = meta_cache.get(key) + if etag is None: + safe_key = PurePosixPath(key) + meta = self._read_metadata(bucket_id, Path(safe_key)) + etag = meta.get("__etag__") if meta else None + entries_files.append((key, st.st_size, st.st_mtime, etag)) + except OSError: + pass + except OSError: + return ShallowListResult( + objects=[], common_prefixes=[], + is_truncated=False, next_continuation_token=None, + ) - entries_dirs.sort() - entries_files.sort(key=lambda x: x[0]) + entries_dirs.sort() + entries_files.sort(key=lambda x: x[0]) - all_items: list[tuple[str, bool]] = [] - fi, di = 0, 0 - while fi < len(entries_files) and di < len(entries_dirs): - if entries_files[fi][0] <= entries_dirs[di]: + all_items: list[tuple[str, bool]] = [] + fi, di = 0, 0 + while fi < len(entries_files) and di < len(entries_dirs): + if entries_files[fi][0] <= entries_dirs[di]: + all_items.append((entries_files[fi][0], False)) + fi += 1 + else: + all_items.append((entries_dirs[di], True)) + di += 1 + while fi < len(entries_files): all_items.append((entries_files[fi][0], False)) fi += 1 - else: + while di < len(entries_dirs): all_items.append((entries_dirs[di], True)) di += 1 - while fi < len(entries_files): - all_items.append((entries_files[fi][0], False)) - fi += 1 - while di < len(entries_dirs): - all_items.append((entries_dirs[di], True)) - di += 1 files_map = {e[0]: e for e in entries_files} @@ -714,6 +742,22 @@ class ObjectStorage: else: search_root = bucket_path + if _HAS_RUST: + raw = _rc.search_objects_scan( + str(bucket_path), str(search_root), query, limit + ) + results = [ + { + "key": k, + "size": s, + "last_modified": datetime.fromtimestamp( + m, tz=timezone.utc + ).strftime("%Y-%m-%dT%H:%M:%S.000Z"), + } + for k, s, m in raw["results"] + ] + return {"results": results, "truncated": raw["truncated"]} + query_lower = query.lower() results: list[Dict[str, Any]] = [] internal = self.INTERNAL_FOLDERS @@ -1838,21 +1882,41 @@ class ObjectStorage: return list(self._build_object_cache(bucket_path).keys()) def _build_object_cache(self, bucket_path: Path) -> Dict[str, ObjectMeta]: - """Build a complete object metadata cache for a bucket. - - Uses os.scandir for fast directory walking and a persistent etag index. - """ from concurrent.futures import ThreadPoolExecutor - + bucket_id = bucket_path.name objects: Dict[str, ObjectMeta] = {} bucket_str = str(bucket_path) bucket_len = len(bucket_str) + 1 - + + if _HAS_RUST: + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + raw = _rc.build_object_cache( + bucket_str, + str(self._bucket_meta_root(bucket_id)), + str(etag_index_path), + ) + if raw["etag_cache_changed"] and raw["etag_cache"]: + try: + etag_index_path.parent.mkdir(parents=True, exist_ok=True) + with open(etag_index_path, 'w', encoding='utf-8') as f: + json.dump(raw["etag_cache"], f) + except OSError: + pass + for key, size, mtime, etag in raw["objects"]: + objects[key] = ObjectMeta( + key=key, + size=size, + last_modified=datetime.fromtimestamp(mtime, timezone.utc), + etag=etag, + metadata=None, + ) + return objects + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" meta_cache: Dict[str, str] = {} index_mtime: float = 0 - + if etag_index_path.exists(): try: index_mtime = etag_index_path.stat().st_mtime @@ -1860,10 +1924,10 @@ class ObjectStorage: meta_cache = json.load(f) except (OSError, json.JSONDecodeError): meta_cache = {} - + meta_root = self._bucket_meta_root(bucket_id) needs_rebuild = False - + if meta_root.exists() and index_mtime > 0: def check_newer(dir_path: str) -> bool: try: @@ -1881,7 +1945,7 @@ class ObjectStorage: needs_rebuild = check_newer(str(meta_root)) elif not meta_cache: needs_rebuild = True - + if needs_rebuild and meta_root.exists(): meta_str = str(meta_root) meta_len = len(meta_str) + 1 @@ -1962,7 +2026,7 @@ class ObjectStorage: json.dump(meta_cache, f) except OSError: pass - + def scan_dir(dir_path: str) -> None: try: with os.scandir(dir_path) as it: @@ -1977,11 +2041,11 @@ class ObjectStorage: first_part = rel.split(os.sep)[0] if os.sep in rel else rel if first_part in self.INTERNAL_FOLDERS: continue - + key = rel.replace(os.sep, '/') try: stat = entry.stat() - + etag = meta_cache.get(key) objects[key] = ObjectMeta( @@ -1989,13 +2053,13 @@ class ObjectStorage: size=stat.st_size, last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc), etag=etag, - metadata=None, + metadata=None, ) except OSError: pass except OSError: pass - + scan_dir(bucket_str) return objects @@ -2094,16 +2158,15 @@ class ObjectStorage: def _update_etag_index(self, bucket_id: str, key: str, etag: Optional[str]) -> None: etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + if not etag_index_path.exists(): + return try: - index: Dict[str, str] = {} - if etag_index_path.exists(): - with open(etag_index_path, 'r', encoding='utf-8') as f: - index = json.load(f) + with open(etag_index_path, 'r', encoding='utf-8') as f: + index = json.load(f) if etag is None: index.pop(key, None) else: index[key] = etag - etag_index_path.parent.mkdir(parents=True, exist_ok=True) with open(etag_index_path, 'w', encoding='utf-8') as f: json.dump(index, f) except (OSError, json.JSONDecodeError): @@ -2281,15 +2344,18 @@ class ObjectStorage: index_path, entry_name = self._index_file_for_key(bucket_name, key) lock = self._get_meta_index_lock(str(index_path)) with lock: - index_path.parent.mkdir(parents=True, exist_ok=True) - index_data: Dict[str, Any] = {} - if index_path.exists(): - try: - index_data = json.loads(index_path.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError): - pass - index_data[entry_name] = entry - index_path.write_text(json.dumps(index_data), encoding="utf-8") + if _HAS_RUST: + _rc.write_index_entry(str(index_path), entry_name, json.dumps(entry)) + else: + index_path.parent.mkdir(parents=True, exist_ok=True) + index_data: Dict[str, Any] = {} + if index_path.exists(): + try: + index_data = json.loads(index_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + pass + index_data[entry_name] = entry + index_path.write_text(json.dumps(index_data), encoding="utf-8") self._invalidate_meta_read_cache(bucket_name, key) def _delete_index_entry(self, bucket_name: str, key: Path) -> None: @@ -2299,20 +2365,23 @@ class ObjectStorage: return lock = self._get_meta_index_lock(str(index_path)) with lock: - try: - index_data = json.loads(index_path.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError): - self._invalidate_meta_read_cache(bucket_name, key) - return - if entry_name in index_data: - del index_data[entry_name] - if index_data: - index_path.write_text(json.dumps(index_data), encoding="utf-8") - else: - try: - index_path.unlink() - except OSError: - pass + if _HAS_RUST: + _rc.delete_index_entry(str(index_path), entry_name) + else: + try: + index_data = json.loads(index_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + self._invalidate_meta_read_cache(bucket_name, key) + return + if entry_name in index_data: + del index_data[entry_name] + if index_data: + index_path.write_text(json.dumps(index_data), encoding="utf-8") + else: + try: + index_path.unlink() + except OSError: + pass self._invalidate_meta_read_cache(bucket_name, key) def _normalize_metadata(self, metadata: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]: @@ -2410,15 +2479,24 @@ class ObjectStorage: continue def _check_bucket_contents(self, bucket_path: Path) -> tuple[bool, bool, bool]: - """Check bucket for objects, versions, and multipart uploads in a single pass. + bucket_name = bucket_path.name + + if _HAS_RUST: + return _rc.check_bucket_contents( + str(bucket_path), + [ + str(self._bucket_versions_root(bucket_name)), + str(self._legacy_versions_root(bucket_name)), + ], + [ + str(self._multipart_bucket_root(bucket_name)), + str(self._legacy_multipart_bucket_root(bucket_name)), + ], + ) - Returns (has_visible_objects, has_archived_versions, has_active_multipart_uploads). - Uses early exit when all three are found. - """ has_objects = False has_versions = False has_multipart = False - bucket_name = bucket_path.name for path in bucket_path.rglob("*"): if has_objects: diff --git a/app/version.py b/app/version.py index 00712b1..b14910a 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.3.2" +APP_VERSION = "0.3.3" def get_version() -> str: diff --git a/myfsio_core/src/lib.rs b/myfsio_core/src/lib.rs index fc1b9f3..3ff04f9 100644 --- a/myfsio_core/src/lib.rs +++ b/myfsio_core/src/lib.rs @@ -1,6 +1,7 @@ mod hashing; mod metadata; mod sigv4; +mod storage; mod validation; use pyo3::prelude::*; @@ -29,6 +30,14 @@ mod myfsio_core { m.add_function(wrap_pyfunction!(metadata::read_index_entry, m)?)?; + m.add_function(wrap_pyfunction!(storage::write_index_entry, m)?)?; + m.add_function(wrap_pyfunction!(storage::delete_index_entry, m)?)?; + m.add_function(wrap_pyfunction!(storage::check_bucket_contents, m)?)?; + m.add_function(wrap_pyfunction!(storage::shallow_scan, m)?)?; + m.add_function(wrap_pyfunction!(storage::bucket_stats_scan, m)?)?; + m.add_function(wrap_pyfunction!(storage::search_objects_scan, m)?)?; + m.add_function(wrap_pyfunction!(storage::build_object_cache, m)?)?; + Ok(()) } } diff --git a/myfsio_core/src/storage.rs b/myfsio_core/src/storage.rs new file mode 100644 index 0000000..cb80dae --- /dev/null +++ b/myfsio_core/src/storage.rs @@ -0,0 +1,817 @@ +use pyo3::exceptions::PyIOError; +use pyo3::prelude::*; +use pyo3::types::{PyDict, PyList, PyString, PyTuple}; +use serde_json::Value; +use std::collections::HashMap; +use std::fs; +use std::path::Path; +use std::time::SystemTime; + +const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"]; + +fn system_time_to_epoch(t: SystemTime) -> f64 { + t.duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0) +} + +fn extract_etag_from_meta_bytes(content: &[u8]) -> Option { + let marker = b"\"__etag__\""; + let idx = content.windows(marker.len()).position(|w| w == marker)?; + let after = &content[idx + marker.len()..]; + let start = after.iter().position(|&b| b == b'"')? + 1; + let rest = &after[start..]; + let end = rest.iter().position(|&b| b == b'"')?; + std::str::from_utf8(&rest[..end]).ok().map(|s| s.to_owned()) +} + +fn has_any_file(root: &str) -> bool { + let root_path = Path::new(root); + if !root_path.is_dir() { + return false; + } + let mut stack = vec![root_path.to_path_buf()]; + while let Some(current) = stack.pop() { + let entries = match fs::read_dir(¤t) { + Ok(e) => e, + Err(_) => continue, + }; + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_file() { + return true; + } + if ft.is_dir() && !ft.is_symlink() { + stack.push(entry.path()); + } + } + } + false +} + +#[pyfunction] +pub fn write_index_entry( + py: Python<'_>, + path: &str, + entry_name: &str, + entry_data_json: &str, +) -> PyResult<()> { + let path_owned = path.to_owned(); + let entry_owned = entry_name.to_owned(); + let data_owned = entry_data_json.to_owned(); + + py.detach(move || -> PyResult<()> { + let entry_value: Value = serde_json::from_str(&data_owned) + .map_err(|e| PyIOError::new_err(format!("Failed to parse entry data: {}", e)))?; + + if let Some(parent) = Path::new(&path_owned).parent() { + let _ = fs::create_dir_all(parent); + } + + let mut index_data: serde_json::Map = match fs::read_to_string(&path_owned) + { + Ok(content) => serde_json::from_str(&content).unwrap_or_default(), + Err(_) => serde_json::Map::new(), + }; + + index_data.insert(entry_owned, entry_value); + + let serialized = serde_json::to_string(&Value::Object(index_data)) + .map_err(|e| PyIOError::new_err(format!("Failed to serialize index: {}", e)))?; + + fs::write(&path_owned, serialized) + .map_err(|e| PyIOError::new_err(format!("Failed to write index: {}", e)))?; + + Ok(()) + }) +} + +#[pyfunction] +pub fn delete_index_entry(py: Python<'_>, path: &str, entry_name: &str) -> PyResult { + let path_owned = path.to_owned(); + let entry_owned = entry_name.to_owned(); + + py.detach(move || -> PyResult { + let content = match fs::read_to_string(&path_owned) { + Ok(c) => c, + Err(_) => return Ok(false), + }; + + let mut index_data: serde_json::Map = + match serde_json::from_str(&content) { + Ok(v) => v, + Err(_) => return Ok(false), + }; + + if index_data.remove(&entry_owned).is_none() { + return Ok(false); + } + + if index_data.is_empty() { + let _ = fs::remove_file(&path_owned); + return Ok(true); + } + + let serialized = serde_json::to_string(&Value::Object(index_data)) + .map_err(|e| PyIOError::new_err(format!("Failed to serialize index: {}", e)))?; + + fs::write(&path_owned, serialized) + .map_err(|e| PyIOError::new_err(format!("Failed to write index: {}", e)))?; + + Ok(false) + }) +} + +#[pyfunction] +pub fn check_bucket_contents( + py: Python<'_>, + bucket_path: &str, + version_roots: Vec, + multipart_roots: Vec, +) -> PyResult<(bool, bool, bool)> { + let bucket_owned = bucket_path.to_owned(); + + py.detach(move || -> PyResult<(bool, bool, bool)> { + let mut has_objects = false; + let bucket_p = Path::new(&bucket_owned); + if bucket_p.is_dir() { + let mut stack = vec![bucket_p.to_path_buf()]; + 'obj_scan: while let Some(current) = stack.pop() { + let is_root = current == bucket_p; + let entries = match fs::read_dir(¤t) { + Ok(e) => e, + Err(_) => continue, + }; + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if is_root { + if let Some(name) = entry.file_name().to_str() { + if INTERNAL_FOLDERS.contains(&name) { + continue; + } + } + } + if ft.is_file() && !ft.is_symlink() { + has_objects = true; + break 'obj_scan; + } + if ft.is_dir() && !ft.is_symlink() { + stack.push(entry.path()); + } + } + } + } + + let mut has_versions = false; + for root in &version_roots { + if has_versions { + break; + } + has_versions = has_any_file(root); + } + + let mut has_multipart = false; + for root in &multipart_roots { + if has_multipart { + break; + } + has_multipart = has_any_file(root); + } + + Ok((has_objects, has_versions, has_multipart)) + }) +} + +#[pyfunction] +pub fn shallow_scan( + py: Python<'_>, + target_dir: &str, + prefix: &str, + meta_cache_json: &str, +) -> PyResult> { + let target_owned = target_dir.to_owned(); + let prefix_owned = prefix.to_owned(); + let cache_owned = meta_cache_json.to_owned(); + + let result: ( + Vec<(String, u64, f64, Option)>, + Vec, + Vec<(String, bool)>, + ) = py.detach(move || -> PyResult<( + Vec<(String, u64, f64, Option)>, + Vec, + Vec<(String, bool)>, + )> { + let meta_cache: HashMap = + serde_json::from_str(&cache_owned).unwrap_or_default(); + + let mut files: Vec<(String, u64, f64, Option)> = Vec::new(); + let mut dirs: Vec = Vec::new(); + + let entries = match fs::read_dir(&target_owned) { + Ok(e) => e, + Err(_) => return Ok((files, dirs, Vec::new())), + }; + + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + let name = match entry.file_name().into_string() { + Ok(n) => n, + Err(_) => continue, + }; + if INTERNAL_FOLDERS.contains(&name.as_str()) { + continue; + } + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_dir() && !ft.is_symlink() { + let cp = format!("{}{}/", prefix_owned, name); + dirs.push(cp); + } else if ft.is_file() && !ft.is_symlink() { + let key = format!("{}{}", prefix_owned, name); + let md = match entry.metadata() { + Ok(m) => m, + Err(_) => continue, + }; + let size = md.len(); + let mtime = md + .modified() + .map(system_time_to_epoch) + .unwrap_or(0.0); + let etag = meta_cache.get(&key).cloned(); + files.push((key, size, mtime, etag)); + } + } + + files.sort_by(|a, b| a.0.cmp(&b.0)); + dirs.sort(); + + let mut merged: Vec<(String, bool)> = Vec::with_capacity(files.len() + dirs.len()); + let mut fi = 0; + let mut di = 0; + while fi < files.len() && di < dirs.len() { + if files[fi].0 <= dirs[di] { + merged.push((files[fi].0.clone(), false)); + fi += 1; + } else { + merged.push((dirs[di].clone(), true)); + di += 1; + } + } + while fi < files.len() { + merged.push((files[fi].0.clone(), false)); + fi += 1; + } + while di < dirs.len() { + merged.push((dirs[di].clone(), true)); + di += 1; + } + + Ok((files, dirs, merged)) + })?; + + let (files, dirs, merged) = result; + + let dict = PyDict::new(py); + + let files_list = PyList::empty(py); + for (key, size, mtime, etag) in &files { + let etag_py: Py = match etag { + Some(e) => PyString::new(py, e).into_any().unbind(), + None => py.None(), + }; + let tuple = PyTuple::new(py, &[ + PyString::new(py, key).into_any().unbind(), + size.into_pyobject(py)?.into_any().unbind(), + mtime.into_pyobject(py)?.into_any().unbind(), + etag_py, + ])?; + files_list.append(tuple)?; + } + dict.set_item("files", files_list)?; + + let dirs_list = PyList::empty(py); + for d in &dirs { + dirs_list.append(PyString::new(py, d))?; + } + dict.set_item("dirs", dirs_list)?; + + let merged_list = PyList::empty(py); + for (key, is_dir) in &merged { + let bool_obj: Py = if *is_dir { + true.into_pyobject(py)?.to_owned().into_any().unbind() + } else { + false.into_pyobject(py)?.to_owned().into_any().unbind() + }; + let tuple = PyTuple::new(py, &[ + PyString::new(py, key).into_any().unbind(), + bool_obj, + ])?; + merged_list.append(tuple)?; + } + dict.set_item("merged_keys", merged_list)?; + + Ok(dict.into_any().unbind()) +} + +#[pyfunction] +pub fn bucket_stats_scan( + py: Python<'_>, + bucket_path: &str, + versions_root: &str, +) -> PyResult<(u64, u64, u64, u64)> { + let bucket_owned = bucket_path.to_owned(); + let versions_owned = versions_root.to_owned(); + + py.detach(move || -> PyResult<(u64, u64, u64, u64)> { + let mut object_count: u64 = 0; + let mut total_bytes: u64 = 0; + + let bucket_p = Path::new(&bucket_owned); + if bucket_p.is_dir() { + let mut stack = vec![bucket_p.to_path_buf()]; + while let Some(current) = stack.pop() { + let is_root = current == bucket_p; + let entries = match fs::read_dir(¤t) { + Ok(e) => e, + Err(_) => continue, + }; + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + if is_root { + if let Some(name) = entry.file_name().to_str() { + if INTERNAL_FOLDERS.contains(&name) { + continue; + } + } + } + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_dir() && !ft.is_symlink() { + stack.push(entry.path()); + } else if ft.is_file() && !ft.is_symlink() { + object_count += 1; + if let Ok(md) = entry.metadata() { + total_bytes += md.len(); + } + } + } + } + } + + let mut version_count: u64 = 0; + let mut version_bytes: u64 = 0; + + let versions_p = Path::new(&versions_owned); + if versions_p.is_dir() { + let mut stack = vec![versions_p.to_path_buf()]; + while let Some(current) = stack.pop() { + let entries = match fs::read_dir(¤t) { + Ok(e) => e, + Err(_) => continue, + }; + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_dir() && !ft.is_symlink() { + stack.push(entry.path()); + } else if ft.is_file() && !ft.is_symlink() { + if let Some(name) = entry.file_name().to_str() { + if name.ends_with(".bin") { + version_count += 1; + if let Ok(md) = entry.metadata() { + version_bytes += md.len(); + } + } + } + } + } + } + } + + Ok((object_count, total_bytes, version_count, version_bytes)) + }) +} + +#[pyfunction] +#[pyo3(signature = (bucket_path, search_root, query, limit))] +pub fn search_objects_scan( + py: Python<'_>, + bucket_path: &str, + search_root: &str, + query: &str, + limit: usize, +) -> PyResult> { + let bucket_owned = bucket_path.to_owned(); + let search_owned = search_root.to_owned(); + let query_owned = query.to_owned(); + + let result: (Vec<(String, u64, f64)>, bool) = py.detach( + move || -> PyResult<(Vec<(String, u64, f64)>, bool)> { + let query_lower = query_owned.to_lowercase(); + let bucket_len = bucket_owned.len() + 1; + let scan_limit = limit * 4; + let mut matched: usize = 0; + let mut results: Vec<(String, u64, f64)> = Vec::new(); + + let search_p = Path::new(&search_owned); + if !search_p.is_dir() { + return Ok((results, false)); + } + + let bucket_p = Path::new(&bucket_owned); + let mut stack = vec![search_p.to_path_buf()]; + + 'scan: while let Some(current) = stack.pop() { + let is_bucket_root = current == bucket_p; + let entries = match fs::read_dir(¤t) { + Ok(e) => e, + Err(_) => continue, + }; + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + if is_bucket_root { + if let Some(name) = entry.file_name().to_str() { + if INTERNAL_FOLDERS.contains(&name) { + continue; + } + } + } + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_dir() && !ft.is_symlink() { + stack.push(entry.path()); + } else if ft.is_file() && !ft.is_symlink() { + let full_path = entry.path(); + let full_str = full_path.to_string_lossy(); + if full_str.len() <= bucket_len { + continue; + } + let key = full_str[bucket_len..].replace('\\', "/"); + if key.to_lowercase().contains(&query_lower) { + if let Ok(md) = entry.metadata() { + let size = md.len(); + let mtime = md + .modified() + .map(system_time_to_epoch) + .unwrap_or(0.0); + results.push((key, size, mtime)); + matched += 1; + } + } + if matched >= scan_limit { + break 'scan; + } + } + } + } + + results.sort_by(|a, b| a.0.cmp(&b.0)); + let truncated = results.len() > limit; + results.truncate(limit); + + Ok((results, truncated)) + }, + )?; + + let (results, truncated) = result; + + let dict = PyDict::new(py); + + let results_list = PyList::empty(py); + for (key, size, mtime) in &results { + let tuple = PyTuple::new(py, &[ + PyString::new(py, key).into_any().unbind(), + size.into_pyobject(py)?.into_any().unbind(), + mtime.into_pyobject(py)?.into_any().unbind(), + ])?; + results_list.append(tuple)?; + } + dict.set_item("results", results_list)?; + dict.set_item("truncated", truncated)?; + + Ok(dict.into_any().unbind()) +} + +#[pyfunction] +pub fn build_object_cache( + py: Python<'_>, + bucket_path: &str, + meta_root: &str, + etag_index_path: &str, +) -> PyResult> { + let bucket_owned = bucket_path.to_owned(); + let meta_owned = meta_root.to_owned(); + let index_path_owned = etag_index_path.to_owned(); + + let result: (HashMap, Vec<(String, u64, f64, Option)>, bool) = + py.detach(move || -> PyResult<( + HashMap, + Vec<(String, u64, f64, Option)>, + bool, + )> { + let mut meta_cache: HashMap = HashMap::new(); + let mut index_mtime: f64 = 0.0; + let mut etag_cache_changed = false; + + let index_p = Path::new(&index_path_owned); + if index_p.is_file() { + if let Ok(md) = fs::metadata(&index_path_owned) { + index_mtime = md + .modified() + .map(system_time_to_epoch) + .unwrap_or(0.0); + } + if let Ok(content) = fs::read_to_string(&index_path_owned) { + if let Ok(parsed) = serde_json::from_str::>(&content) { + meta_cache = parsed; + } + } + } + + let meta_p = Path::new(&meta_owned); + let mut needs_rebuild = false; + + if meta_p.is_dir() && index_mtime > 0.0 { + fn check_newer(dir: &Path, index_mtime: f64) -> bool { + let entries = match fs::read_dir(dir) { + Ok(e) => e, + Err(_) => return false, + }; + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_dir() && !ft.is_symlink() { + if check_newer(&entry.path(), index_mtime) { + return true; + } + } else if ft.is_file() { + if let Some(name) = entry.file_name().to_str() { + if name.ends_with(".meta.json") || name == "_index.json" { + if let Ok(md) = entry.metadata() { + let mt = md + .modified() + .map(system_time_to_epoch) + .unwrap_or(0.0); + if mt > index_mtime { + return true; + } + } + } + } + } + } + false + } + needs_rebuild = check_newer(meta_p, index_mtime); + } else if meta_cache.is_empty() { + needs_rebuild = true; + } + + if needs_rebuild && meta_p.is_dir() { + let meta_str = meta_owned.clone(); + let meta_len = meta_str.len() + 1; + let mut index_files: Vec = Vec::new(); + let mut legacy_meta_files: Vec<(String, String)> = Vec::new(); + + fn collect_meta( + dir: &Path, + meta_len: usize, + index_files: &mut Vec, + legacy_meta_files: &mut Vec<(String, String)>, + ) { + let entries = match fs::read_dir(dir) { + Ok(e) => e, + Err(_) => return, + }; + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_dir() && !ft.is_symlink() { + collect_meta(&entry.path(), meta_len, index_files, legacy_meta_files); + } else if ft.is_file() { + if let Some(name) = entry.file_name().to_str() { + let full = entry.path().to_string_lossy().to_string(); + if name == "_index.json" { + index_files.push(full); + } else if name.ends_with(".meta.json") { + if full.len() > meta_len { + let rel = &full[meta_len..]; + let key = if rel.len() > 10 { + rel[..rel.len() - 10].replace('\\', "/") + } else { + continue; + }; + legacy_meta_files.push((key, full)); + } + } + } + } + } + } + + collect_meta( + meta_p, + meta_len, + &mut index_files, + &mut legacy_meta_files, + ); + + meta_cache.clear(); + + for idx_path in &index_files { + if let Ok(content) = fs::read_to_string(idx_path) { + if let Ok(idx_data) = serde_json::from_str::>(&content) { + let rel_dir = if idx_path.len() > meta_len { + let r = &idx_path[meta_len..]; + r.replace('\\', "/") + } else { + String::new() + }; + let dir_prefix = if rel_dir.ends_with("/_index.json") { + &rel_dir[..rel_dir.len() - "/_index.json".len()] + } else { + "" + }; + for (entry_name, entry_data) in &idx_data { + let key = if dir_prefix.is_empty() { + entry_name.clone() + } else { + format!("{}/{}", dir_prefix, entry_name) + }; + if let Some(meta_obj) = entry_data.get("metadata") { + if let Some(etag) = meta_obj.get("__etag__") { + if let Some(etag_str) = etag.as_str() { + meta_cache.insert(key, etag_str.to_owned()); + } + } + } + } + } + } + } + + for (key, path) in &legacy_meta_files { + if meta_cache.contains_key(key) { + continue; + } + if let Ok(content) = fs::read(path) { + if let Some(etag) = extract_etag_from_meta_bytes(&content) { + meta_cache.insert(key.clone(), etag); + } + } + } + + etag_cache_changed = true; + } + + let bucket_p = Path::new(&bucket_owned); + let bucket_len = bucket_owned.len() + 1; + let mut objects: Vec<(String, u64, f64, Option)> = Vec::new(); + + if bucket_p.is_dir() { + let mut stack = vec![bucket_p.to_path_buf()]; + while let Some(current) = stack.pop() { + let entries = match fs::read_dir(¤t) { + Ok(e) => e, + Err(_) => continue, + }; + for entry_result in entries { + let entry = match entry_result { + Ok(e) => e, + Err(_) => continue, + }; + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_dir() && !ft.is_symlink() { + let full = entry.path(); + let full_str = full.to_string_lossy(); + if full_str.len() > bucket_len { + let first_part: &str = if let Some(sep_pos) = + full_str[bucket_len..].find(|c: char| c == '\\' || c == '/') + { + &full_str[bucket_len..bucket_len + sep_pos] + } else { + &full_str[bucket_len..] + }; + if INTERNAL_FOLDERS.contains(&first_part) { + continue; + } + } else if let Some(name) = entry.file_name().to_str() { + if INTERNAL_FOLDERS.contains(&name) { + continue; + } + } + stack.push(full); + } else if ft.is_file() && !ft.is_symlink() { + let full = entry.path(); + let full_str = full.to_string_lossy(); + if full_str.len() <= bucket_len { + continue; + } + let rel = &full_str[bucket_len..]; + let first_part: &str = + if let Some(sep_pos) = rel.find(|c: char| c == '\\' || c == '/') { + &rel[..sep_pos] + } else { + rel + }; + if INTERNAL_FOLDERS.contains(&first_part) { + continue; + } + let key = rel.replace('\\', "/"); + if let Ok(md) = entry.metadata() { + let size = md.len(); + let mtime = md + .modified() + .map(system_time_to_epoch) + .unwrap_or(0.0); + let etag = meta_cache.get(&key).cloned(); + objects.push((key, size, mtime, etag)); + } + } + } + } + } + + Ok((meta_cache, objects, etag_cache_changed)) + })?; + + let (meta_cache, objects, etag_cache_changed) = result; + + let dict = PyDict::new(py); + + let cache_dict = PyDict::new(py); + for (k, v) in &meta_cache { + cache_dict.set_item(k, v)?; + } + dict.set_item("etag_cache", cache_dict)?; + + let objects_list = PyList::empty(py); + for (key, size, mtime, etag) in &objects { + let etag_py: Py = match etag { + Some(e) => PyString::new(py, e).into_any().unbind(), + None => py.None(), + }; + let tuple = PyTuple::new(py, &[ + PyString::new(py, key).into_any().unbind(), + size.into_pyobject(py)?.into_any().unbind(), + mtime.into_pyobject(py)?.into_any().unbind(), + etag_py, + ])?; + objects_list.append(tuple)?; + } + dict.set_item("objects", objects_list)?; + dict.set_item("etag_cache_changed", etag_cache_changed)?; + + Ok(dict.into_any().unbind()) +}