Further optimize CPU usage; Improve security and performance; 4 bug fixes.
This commit is contained in:
192
app/storage.py
192
app/storage.py
@@ -186,6 +186,7 @@ class ObjectStorage:
|
||||
self._cache_ttl = cache_ttl
|
||||
self._object_cache_max_size = object_cache_max_size
|
||||
self._object_key_max_length_bytes = object_key_max_length_bytes
|
||||
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
|
||||
|
||||
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
||||
"""Get or create a lock for a specific bucket. Reduces global lock contention."""
|
||||
@@ -243,10 +244,15 @@ class ObjectStorage:
|
||||
raise BucketNotFoundError("Bucket does not exist")
|
||||
|
||||
cache_path = self._system_bucket_root(bucket_name) / "stats.json"
|
||||
cached_stats = None
|
||||
cache_fresh = False
|
||||
|
||||
if cache_path.exists():
|
||||
try:
|
||||
if time.time() - cache_path.stat().st_mtime < cache_ttl:
|
||||
return json.loads(cache_path.read_text(encoding="utf-8"))
|
||||
cache_fresh = time.time() - cache_path.stat().st_mtime < cache_ttl
|
||||
cached_stats = json.loads(cache_path.read_text(encoding="utf-8"))
|
||||
if cache_fresh:
|
||||
return cached_stats
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
@@ -255,40 +261,45 @@ class ObjectStorage:
|
||||
version_count = 0
|
||||
version_bytes = 0
|
||||
|
||||
for path in bucket_path.rglob("*"):
|
||||
if path.is_file():
|
||||
rel = path.relative_to(bucket_path)
|
||||
if not rel.parts:
|
||||
continue
|
||||
top_folder = rel.parts[0]
|
||||
if top_folder not in self.INTERNAL_FOLDERS:
|
||||
stat = path.stat()
|
||||
object_count += 1
|
||||
total_bytes += stat.st_size
|
||||
|
||||
versions_root = self._bucket_versions_root(bucket_name)
|
||||
if versions_root.exists():
|
||||
for path in versions_root.rglob("*.bin"):
|
||||
try:
|
||||
for path in bucket_path.rglob("*"):
|
||||
if path.is_file():
|
||||
stat = path.stat()
|
||||
version_count += 1
|
||||
version_bytes += stat.st_size
|
||||
|
||||
rel = path.relative_to(bucket_path)
|
||||
if not rel.parts:
|
||||
continue
|
||||
top_folder = rel.parts[0]
|
||||
if top_folder not in self.INTERNAL_FOLDERS:
|
||||
stat = path.stat()
|
||||
object_count += 1
|
||||
total_bytes += stat.st_size
|
||||
|
||||
versions_root = self._bucket_versions_root(bucket_name)
|
||||
if versions_root.exists():
|
||||
for path in versions_root.rglob("*.bin"):
|
||||
if path.is_file():
|
||||
stat = path.stat()
|
||||
version_count += 1
|
||||
version_bytes += stat.st_size
|
||||
except OSError:
|
||||
if cached_stats is not None:
|
||||
return cached_stats
|
||||
raise
|
||||
|
||||
stats = {
|
||||
"objects": object_count,
|
||||
"bytes": total_bytes,
|
||||
"version_count": version_count,
|
||||
"version_bytes": version_bytes,
|
||||
"total_objects": object_count + version_count,
|
||||
"total_bytes": total_bytes + version_bytes,
|
||||
"total_bytes": total_bytes + version_bytes,
|
||||
}
|
||||
|
||||
|
||||
try:
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_path.write_text(json.dumps(stats), encoding="utf-8")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
return stats
|
||||
|
||||
def _invalidate_bucket_stats_cache(self, bucket_id: str) -> None:
|
||||
@@ -299,6 +310,34 @@ class ObjectStorage:
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _update_bucket_stats_cache(
|
||||
self,
|
||||
bucket_id: str,
|
||||
*,
|
||||
bytes_delta: int = 0,
|
||||
objects_delta: int = 0,
|
||||
version_bytes_delta: int = 0,
|
||||
version_count_delta: int = 0,
|
||||
) -> None:
|
||||
"""Incrementally update cached bucket statistics instead of invalidating.
|
||||
|
||||
This avoids expensive full directory scans on every PUT/DELETE by
|
||||
adjusting the cached values directly.
|
||||
"""
|
||||
cache_path = self._system_bucket_root(bucket_id) / "stats.json"
|
||||
try:
|
||||
if cache_path.exists():
|
||||
data = json.loads(cache_path.read_text(encoding="utf-8"))
|
||||
data["objects"] = max(0, data.get("objects", 0) + objects_delta)
|
||||
data["bytes"] = max(0, data.get("bytes", 0) + bytes_delta)
|
||||
data["version_count"] = max(0, data.get("version_count", 0) + version_count_delta)
|
||||
data["version_bytes"] = max(0, data.get("version_bytes", 0) + version_bytes_delta)
|
||||
data["total_objects"] = max(0, data.get("total_objects", 0) + objects_delta + version_count_delta)
|
||||
data["total_bytes"] = max(0, data.get("total_bytes", 0) + bytes_delta + version_bytes_delta)
|
||||
cache_path.write_text(json.dumps(data), encoding="utf-8")
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
def delete_bucket(self, bucket_name: str) -> None:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
if not bucket_path.exists():
|
||||
@@ -333,22 +372,35 @@ class ObjectStorage:
|
||||
Returns:
|
||||
ListObjectsResult with objects, truncation status, and continuation token
|
||||
"""
|
||||
import bisect
|
||||
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
if not bucket_path.exists():
|
||||
raise BucketNotFoundError("Bucket does not exist")
|
||||
bucket_id = bucket_path.name
|
||||
|
||||
object_cache = self._get_object_cache(bucket_id, bucket_path)
|
||||
|
||||
all_keys = sorted(object_cache.keys())
|
||||
|
||||
|
||||
cache_version = self._cache_version.get(bucket_id, 0)
|
||||
cached_entry = self._sorted_key_cache.get(bucket_id)
|
||||
if cached_entry and cached_entry[1] == cache_version:
|
||||
all_keys = cached_entry[0]
|
||||
else:
|
||||
all_keys = sorted(object_cache.keys())
|
||||
self._sorted_key_cache[bucket_id] = (all_keys, cache_version)
|
||||
|
||||
if prefix:
|
||||
all_keys = [k for k in all_keys if k.startswith(prefix)]
|
||||
|
||||
lo = bisect.bisect_left(all_keys, prefix)
|
||||
hi = len(all_keys)
|
||||
for i in range(lo, len(all_keys)):
|
||||
if not all_keys[i].startswith(prefix):
|
||||
hi = i
|
||||
break
|
||||
all_keys = all_keys[lo:hi]
|
||||
|
||||
total_count = len(all_keys)
|
||||
start_index = 0
|
||||
if continuation_token:
|
||||
import bisect
|
||||
start_index = bisect.bisect_right(all_keys, continuation_token)
|
||||
if start_index >= total_count:
|
||||
return ListObjectsResult(
|
||||
@@ -356,8 +408,8 @@ class ObjectStorage:
|
||||
is_truncated=False,
|
||||
next_continuation_token=None,
|
||||
total_count=total_count,
|
||||
)
|
||||
|
||||
)
|
||||
|
||||
end_index = start_index + max_keys
|
||||
keys_slice = all_keys[start_index:end_index]
|
||||
is_truncated = end_index < total_count
|
||||
@@ -403,7 +455,9 @@ class ObjectStorage:
|
||||
is_overwrite = destination.exists()
|
||||
existing_size = destination.stat().st_size if is_overwrite else 0
|
||||
|
||||
archived_version_size = 0
|
||||
if self._is_versioning_enabled(bucket_path) and is_overwrite:
|
||||
archived_version_size = existing_size
|
||||
self._archive_current_version(bucket_id, safe_key, reason="overwrite")
|
||||
|
||||
tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR
|
||||
@@ -416,11 +470,10 @@ class ObjectStorage:
|
||||
shutil.copyfileobj(_HashingReader(stream, checksum), target)
|
||||
|
||||
new_size = tmp_path.stat().st_size
|
||||
|
||||
size_delta = new_size - existing_size
|
||||
object_delta = 0 if is_overwrite else 1
|
||||
|
||||
if enforce_quota:
|
||||
size_delta = new_size - existing_size
|
||||
object_delta = 0 if is_overwrite else 1
|
||||
|
||||
quota_check = self.check_quota(
|
||||
bucket_name,
|
||||
additional_bytes=max(0, size_delta),
|
||||
@@ -432,7 +485,7 @@ class ObjectStorage:
|
||||
quota_check["quota"],
|
||||
quota_check["usage"],
|
||||
)
|
||||
|
||||
|
||||
shutil.move(str(tmp_path), str(destination))
|
||||
|
||||
finally:
|
||||
@@ -448,7 +501,13 @@ class ObjectStorage:
|
||||
combined_meta = {**internal_meta, **(metadata or {})}
|
||||
self._write_metadata(bucket_id, safe_key, combined_meta)
|
||||
|
||||
self._invalidate_bucket_stats_cache(bucket_id)
|
||||
self._update_bucket_stats_cache(
|
||||
bucket_id,
|
||||
bytes_delta=size_delta,
|
||||
objects_delta=object_delta,
|
||||
version_bytes_delta=archived_version_size,
|
||||
version_count_delta=1 if archived_version_size > 0 else 0,
|
||||
)
|
||||
|
||||
obj_meta = ObjectMeta(
|
||||
key=safe_key.as_posix(),
|
||||
@@ -498,15 +557,24 @@ class ObjectStorage:
|
||||
path = self._object_path(bucket_name, object_key)
|
||||
if not path.exists():
|
||||
return
|
||||
deleted_size = path.stat().st_size
|
||||
safe_key = path.relative_to(bucket_path)
|
||||
bucket_id = bucket_path.name
|
||||
archived_version_size = 0
|
||||
if self._is_versioning_enabled(bucket_path):
|
||||
archived_version_size = deleted_size
|
||||
self._archive_current_version(bucket_id, safe_key, reason="delete")
|
||||
rel = path.relative_to(bucket_path)
|
||||
self._safe_unlink(path)
|
||||
self._delete_metadata(bucket_id, rel)
|
||||
|
||||
self._invalidate_bucket_stats_cache(bucket_id)
|
||||
self._update_bucket_stats_cache(
|
||||
bucket_id,
|
||||
bytes_delta=-deleted_size,
|
||||
objects_delta=-1,
|
||||
version_bytes_delta=archived_version_size,
|
||||
version_count_delta=1 if archived_version_size > 0 else 0,
|
||||
)
|
||||
self._update_object_cache_entry(bucket_id, safe_key.as_posix(), None)
|
||||
self._cleanup_empty_parents(path, bucket_path)
|
||||
|
||||
@@ -828,7 +896,12 @@ class ObjectStorage:
|
||||
if not isinstance(metadata, dict):
|
||||
metadata = {}
|
||||
destination = bucket_path / safe_key
|
||||
if self._is_versioning_enabled(bucket_path) and destination.exists():
|
||||
restored_size = data_path.stat().st_size
|
||||
is_overwrite = destination.exists()
|
||||
existing_size = destination.stat().st_size if is_overwrite else 0
|
||||
archived_version_size = 0
|
||||
if self._is_versioning_enabled(bucket_path) and is_overwrite:
|
||||
archived_version_size = existing_size
|
||||
self._archive_current_version(bucket_id, safe_key, reason="restore-overwrite")
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy2(data_path, destination)
|
||||
@@ -837,7 +910,13 @@ class ObjectStorage:
|
||||
else:
|
||||
self._delete_metadata(bucket_id, safe_key)
|
||||
stat = destination.stat()
|
||||
self._invalidate_bucket_stats_cache(bucket_id)
|
||||
self._update_bucket_stats_cache(
|
||||
bucket_id,
|
||||
bytes_delta=restored_size - existing_size,
|
||||
objects_delta=0 if is_overwrite else 1,
|
||||
version_bytes_delta=archived_version_size,
|
||||
version_count_delta=1 if archived_version_size > 0 else 0,
|
||||
)
|
||||
return ObjectMeta(
|
||||
key=safe_key.as_posix(),
|
||||
size=stat.st_size,
|
||||
@@ -861,6 +940,7 @@ class ObjectStorage:
|
||||
meta_path = legacy_version_dir / f"{version_id}.json"
|
||||
if not data_path.exists() and not meta_path.exists():
|
||||
raise StorageError(f"Version {version_id} not found")
|
||||
deleted_version_size = data_path.stat().st_size if data_path.exists() else 0
|
||||
if data_path.exists():
|
||||
data_path.unlink()
|
||||
if meta_path.exists():
|
||||
@@ -868,6 +948,12 @@ class ObjectStorage:
|
||||
parent = data_path.parent
|
||||
if parent.exists() and not any(parent.iterdir()):
|
||||
parent.rmdir()
|
||||
if deleted_version_size > 0:
|
||||
self._update_bucket_stats_cache(
|
||||
bucket_id,
|
||||
version_bytes_delta=-deleted_version_size,
|
||||
version_count_delta=-1,
|
||||
)
|
||||
|
||||
def list_orphaned_objects(self, bucket_name: str) -> List[Dict[str, Any]]:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
@@ -1164,14 +1250,14 @@ class ObjectStorage:
|
||||
|
||||
safe_key = self._sanitize_object_key(manifest["object_key"], self._object_key_max_length_bytes)
|
||||
destination = bucket_path / safe_key
|
||||
|
||||
|
||||
is_overwrite = destination.exists()
|
||||
existing_size = destination.stat().st_size if is_overwrite else 0
|
||||
|
||||
size_delta = total_size - existing_size
|
||||
object_delta = 0 if is_overwrite else 1
|
||||
versioning_enabled = self._is_versioning_enabled(bucket_path)
|
||||
|
||||
if enforce_quota:
|
||||
size_delta = total_size - existing_size
|
||||
object_delta = 0 if is_overwrite else 1
|
||||
|
||||
quota_check = self.check_quota(
|
||||
bucket_name,
|
||||
additional_bytes=max(0, size_delta),
|
||||
@@ -1183,14 +1269,16 @@ class ObjectStorage:
|
||||
quota_check["quota"],
|
||||
quota_check["usage"],
|
||||
)
|
||||
|
||||
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
lock_file_path = self._system_bucket_root(bucket_id) / "locks" / f"{safe_key.as_posix().replace('/', '_')}.lock"
|
||||
|
||||
archived_version_size = 0
|
||||
try:
|
||||
with _atomic_lock_file(lock_file_path):
|
||||
if self._is_versioning_enabled(bucket_path) and destination.exists():
|
||||
if versioning_enabled and destination.exists():
|
||||
archived_version_size = destination.stat().st_size
|
||||
self._archive_current_version(bucket_id, safe_key, reason="overwrite")
|
||||
checksum = hashlib.md5()
|
||||
with destination.open("wb") as target:
|
||||
@@ -1210,7 +1298,13 @@ class ObjectStorage:
|
||||
|
||||
shutil.rmtree(upload_root, ignore_errors=True)
|
||||
|
||||
self._invalidate_bucket_stats_cache(bucket_id)
|
||||
self._update_bucket_stats_cache(
|
||||
bucket_id,
|
||||
bytes_delta=size_delta,
|
||||
objects_delta=object_delta,
|
||||
version_bytes_delta=archived_version_size,
|
||||
version_count_delta=1 if archived_version_size > 0 else 0,
|
||||
)
|
||||
|
||||
stat = destination.stat()
|
||||
etag = checksum.hexdigest()
|
||||
@@ -1586,6 +1680,8 @@ class ObjectStorage:
|
||||
objects.pop(key, None)
|
||||
else:
|
||||
objects[key] = meta
|
||||
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
|
||||
self._sorted_key_cache.pop(bucket_id, None)
|
||||
|
||||
def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None:
|
||||
"""Pre-warm the object cache for specified buckets or all buckets.
|
||||
|
||||
Reference in New Issue
Block a user