From 8552f193de4816a07b52ef2085b7e1c5eba94bcc Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 2 Mar 2026 22:05:54 +0800 Subject: [PATCH 1/3] Reduce CPU/lock contention under concurrent uploads: split cache lock, in-memory stats, dict copy, lightweight request IDs, defaultdict metrics --- app/__init__.py | 8 ++- app/operation_metrics.py | 14 ++-- app/storage.py | 148 +++++++++++++++++++++------------------ app/version.py | 2 +- tests/conftest.py | 5 ++ 5 files changed, 97 insertions(+), 80 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 316e247..aea5d78 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,13 +1,13 @@ from __future__ import annotations import html as html_module +import itertools import logging import mimetypes import os import shutil import sys import time -import uuid from logging.handlers import RotatingFileHandler from pathlib import Path from datetime import timedelta @@ -39,6 +39,8 @@ from .storage import ObjectStorage, StorageError from .version import get_version from .website_domains import WebsiteDomainStore +_request_counter = itertools.count(1) + def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path: """Migrate config file from legacy locations to the active path. @@ -481,7 +483,7 @@ def _configure_logging(app: Flask) -> None: @app.before_request def _log_request_start() -> None: - g.request_id = uuid.uuid4().hex + g.request_id = f"{os.getpid():x}{next(_request_counter):012x}" g.request_started_at = time.perf_counter() g.request_bytes_in = request.content_length or 0 app.logger.info( @@ -616,7 +618,7 @@ def _configure_logging(app: Flask) -> None: duration_ms = 0.0 if hasattr(g, "request_started_at"): duration_ms = (time.perf_counter() - g.request_started_at) * 1000 - request_id = getattr(g, "request_id", uuid.uuid4().hex) + request_id = getattr(g, "request_id", f"{os.getpid():x}{next(_request_counter):012x}") response.headers.setdefault("X-Request-ID", request_id) app.logger.info( "Request completed", diff --git a/app/operation_metrics.py b/app/operation_metrics.py index 3a002e1..0917d8e 100644 --- a/app/operation_metrics.py +++ b/app/operation_metrics.py @@ -5,6 +5,7 @@ import logging import random import threading import time +from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path @@ -138,8 +139,8 @@ class OperationMetricsCollector: self.interval_seconds = interval_minutes * 60 self.retention_hours = retention_hours self._lock = threading.Lock() - self._by_method: Dict[str, OperationStats] = {} - self._by_endpoint: Dict[str, OperationStats] = {} + self._by_method: Dict[str, OperationStats] = defaultdict(OperationStats) + self._by_endpoint: Dict[str, OperationStats] = defaultdict(OperationStats) self._by_status_class: Dict[str, int] = {} self._error_codes: Dict[str, int] = {} self._totals = OperationStats() @@ -211,8 +212,8 @@ class OperationMetricsCollector: self._prune_old_snapshots() self._save_history() - self._by_method.clear() - self._by_endpoint.clear() + self._by_method = defaultdict(OperationStats) + self._by_endpoint = defaultdict(OperationStats) self._by_status_class.clear() self._error_codes.clear() self._totals = OperationStats() @@ -232,12 +233,7 @@ class OperationMetricsCollector: status_class = f"{status_code // 100}xx" with self._lock: - if method not in self._by_method: - self._by_method[method] = OperationStats() self._by_method[method].record(latency_ms, success, bytes_in, bytes_out) - - if endpoint_type not in 
self._by_endpoint: - self._by_endpoint[endpoint_type] = OperationStats() self._by_endpoint[endpoint_type].record(latency_ms, success, bytes_in, bytes_out) self._by_status_class[status_class] = self._by_status_class.get(status_class, 0) + 1 diff --git a/app/storage.py b/app/storage.py index 7296abb..f50961e 100644 --- a/app/storage.py +++ b/app/storage.py @@ -1,6 +1,5 @@ from __future__ import annotations -import copy import hashlib import json import os @@ -196,7 +195,9 @@ class ObjectStorage: self.root.mkdir(parents=True, exist_ok=True) self._ensure_system_roots() self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float, float]] = OrderedDict() - self._cache_lock = threading.Lock() + self._obj_cache_lock = threading.Lock() + self._meta_cache_lock = threading.Lock() + self._registry_lock = threading.Lock() self._bucket_locks: Dict[str, threading.Lock] = {} self._cache_version: Dict[str, int] = {} self._bucket_config_cache: Dict[str, tuple[dict[str, Any], float]] = {} @@ -209,10 +210,14 @@ class ObjectStorage: self._meta_read_cache: OrderedDict[tuple, Optional[Dict[str, Any]]] = OrderedDict() self._meta_read_cache_max = 2048 self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup") + self._stats_mem: Dict[str, Dict[str, int]] = {} + self._stats_serial: Dict[str, int] = {} + self._stats_lock = threading.Lock() + self._stats_dirty: set[str] = set() + self._stats_flush_timer: Optional[threading.Timer] = None def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: - """Get or create a lock for a specific bucket. Reduces global lock contention.""" - with self._cache_lock: + with self._registry_lock: if bucket_id not in self._bucket_locks: self._bucket_locks[bucket_id] = threading.Lock() return self._bucket_locks[bucket_id] @@ -260,26 +265,20 @@ class ObjectStorage: self._system_bucket_root(bucket_path.name).mkdir(parents=True, exist_ok=True) def bucket_stats(self, bucket_name: str, cache_ttl: int = 60) -> dict[str, int]: - """Return object count and total size for the bucket (cached). 
- - Args: - bucket_name: Name of the bucket - cache_ttl: Cache time-to-live in seconds (default 60) - """ bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") + with self._stats_lock: + if bucket_name in self._stats_mem: + return dict(self._stats_mem[bucket_name]) + cache_path = self._system_bucket_root(bucket_name) / "stats.json" cached_stats = None - cache_fresh = False if cache_path.exists(): try: - cache_fresh = time.time() - cache_path.stat().st_mtime < cache_ttl cached_stats = json.loads(cache_path.read_text(encoding="utf-8")) - if cache_fresh: - return cached_stats except (OSError, json.JSONDecodeError): pass @@ -348,6 +347,11 @@ class ObjectStorage: "_cache_serial": existing_serial, } + with self._stats_lock: + if bucket_name not in self._stats_mem: + self._stats_mem[bucket_name] = stats + self._stats_serial[bucket_name] = existing_serial + try: cache_path.parent.mkdir(parents=True, exist_ok=True) cache_path.write_text(json.dumps(stats), encoding="utf-8") @@ -357,7 +361,10 @@ class ObjectStorage: return stats def _invalidate_bucket_stats_cache(self, bucket_id: str) -> None: - """Invalidate the cached bucket statistics.""" + with self._stats_lock: + self._stats_mem.pop(bucket_id, None) + self._stats_serial[bucket_id] = self._stats_serial.get(bucket_id, 0) + 1 + self._stats_dirty.discard(bucket_id) cache_path = self._system_bucket_root(bucket_id) / "stats.json" try: cache_path.unlink(missing_ok=True) @@ -373,29 +380,48 @@ class ObjectStorage: version_bytes_delta: int = 0, version_count_delta: int = 0, ) -> None: - """Incrementally update cached bucket statistics instead of invalidating. + with self._stats_lock: + if bucket_id not in self._stats_mem: + self._stats_mem[bucket_id] = { + "objects": 0, "bytes": 0, "version_count": 0, + "version_bytes": 0, "total_objects": 0, "total_bytes": 0, + "_cache_serial": 0, + } + data = self._stats_mem[bucket_id] + data["objects"] = max(0, data["objects"] + objects_delta) + data["bytes"] = max(0, data["bytes"] + bytes_delta) + data["version_count"] = max(0, data["version_count"] + version_count_delta) + data["version_bytes"] = max(0, data["version_bytes"] + version_bytes_delta) + data["total_objects"] = max(0, data["total_objects"] + objects_delta + version_count_delta) + data["total_bytes"] = max(0, data["total_bytes"] + bytes_delta + version_bytes_delta) + data["_cache_serial"] = data["_cache_serial"] + 1 + self._stats_serial[bucket_id] = self._stats_serial.get(bucket_id, 0) + 1 + self._stats_dirty.add(bucket_id) + self._schedule_stats_flush() - This avoids expensive full directory scans on every PUT/DELETE by - adjusting the cached values directly. Also signals cross-process cache - invalidation by incrementing _cache_serial. 
- """ - cache_path = self._system_bucket_root(bucket_id) / "stats.json" - try: - cache_path.parent.mkdir(parents=True, exist_ok=True) - if cache_path.exists(): - data = json.loads(cache_path.read_text(encoding="utf-8")) - else: - data = {"objects": 0, "bytes": 0, "version_count": 0, "version_bytes": 0, "total_objects": 0, "total_bytes": 0, "_cache_serial": 0} - data["objects"] = max(0, data.get("objects", 0) + objects_delta) - data["bytes"] = max(0, data.get("bytes", 0) + bytes_delta) - data["version_count"] = max(0, data.get("version_count", 0) + version_count_delta) - data["version_bytes"] = max(0, data.get("version_bytes", 0) + version_bytes_delta) - data["total_objects"] = max(0, data.get("total_objects", 0) + objects_delta + version_count_delta) - data["total_bytes"] = max(0, data.get("total_bytes", 0) + bytes_delta + version_bytes_delta) - data["_cache_serial"] = data.get("_cache_serial", 0) + 1 - cache_path.write_text(json.dumps(data), encoding="utf-8") - except (OSError, json.JSONDecodeError): - pass + def _schedule_stats_flush(self) -> None: + if self._stats_flush_timer is None or not self._stats_flush_timer.is_alive(): + self._stats_flush_timer = threading.Timer(3.0, self._flush_stats) + self._stats_flush_timer.daemon = True + self._stats_flush_timer.start() + + def _flush_stats(self) -> None: + with self._stats_lock: + dirty = list(self._stats_dirty) + self._stats_dirty.clear() + snapshots = {b: dict(self._stats_mem[b]) for b in dirty if b in self._stats_mem} + for bucket_id, data in snapshots.items(): + cache_path = self._system_bucket_root(bucket_id) / "stats.json" + try: + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_text(json.dumps(data), encoding="utf-8") + except OSError: + pass + + def shutdown_stats(self) -> None: + if self._stats_flush_timer is not None: + self._stats_flush_timer.cancel() + self._flush_stats() def delete_bucket(self, bucket_name: str) -> None: bucket_path = self._bucket_path(bucket_name) @@ -413,13 +439,18 @@ class ObjectStorage: self._remove_tree(self._system_bucket_root(bucket_id)) self._remove_tree(self._multipart_bucket_root(bucket_id)) self._bucket_config_cache.pop(bucket_id, None) - with self._cache_lock: + with self._obj_cache_lock: self._object_cache.pop(bucket_id, None) self._cache_version.pop(bucket_id, None) self._sorted_key_cache.pop(bucket_id, None) + with self._meta_cache_lock: stale = [k for k in self._meta_read_cache if k[0] == bucket_id] for k in stale: del self._meta_read_cache[k] + with self._stats_lock: + self._stats_mem.pop(bucket_id, None) + self._stats_serial.pop(bucket_id, None) + self._stats_dirty.discard(bucket_id) def list_objects( self, @@ -2131,7 +2162,7 @@ class ObjectStorage: now = time.time() current_stats_mtime = self._get_cache_marker_mtime(bucket_id) - with self._cache_lock: + with self._obj_cache_lock: cached = self._object_cache.get(bucket_id) if cached: objects, timestamp, cached_stats_mtime = cached @@ -2143,7 +2174,7 @@ class ObjectStorage: bucket_lock = self._get_bucket_lock(bucket_id) with bucket_lock: current_stats_mtime = self._get_cache_marker_mtime(bucket_id) - with self._cache_lock: + with self._obj_cache_lock: cached = self._object_cache.get(bucket_id) if cached: objects, timestamp, cached_stats_mtime = cached @@ -2154,7 +2185,7 @@ class ObjectStorage: objects = self._build_object_cache(bucket_path) new_stats_mtime = self._get_cache_marker_mtime(bucket_id) - with self._cache_lock: + with self._obj_cache_lock: current_version = self._cache_version.get(bucket_id, 0) if 
current_version != cache_version: objects = self._build_object_cache(bucket_path) @@ -2170,12 +2201,7 @@ class ObjectStorage: return objects def _invalidate_object_cache(self, bucket_id: str) -> None: - """Invalidate the object cache and etag index for a bucket. - - Increments version counter to signal stale reads. - Cross-process invalidation is handled by checking stats.json mtime. - """ - with self._cache_lock: + with self._obj_cache_lock: self._object_cache.pop(bucket_id, None) self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1 @@ -2186,22 +2212,10 @@ class ObjectStorage: pass def _get_cache_marker_mtime(self, bucket_id: str) -> float: - """Get a cache marker combining serial and object count for cross-process invalidation. - - Returns a combined value that changes if either _cache_serial or object count changes. - This handles cases where the serial was reset but object count differs. - """ - stats_path = self._system_bucket_root(bucket_id) / "stats.json" - try: - data = json.loads(stats_path.read_text(encoding="utf-8")) - serial = data.get("_cache_serial", 0) - count = data.get("objects", 0) - return float(serial * 1000000 + count) - except (OSError, json.JSONDecodeError): - return 0 + return float(self._stats_serial.get(bucket_id, 0)) def _update_object_cache_entry(self, bucket_id: str, key: str, meta: Optional[ObjectMeta]) -> None: - with self._cache_lock: + with self._obj_cache_lock: cached = self._object_cache.get(bucket_id) if cached: objects, timestamp, stats_mtime = cached @@ -2359,19 +2373,19 @@ class ObjectStorage: return meta_root / parent / "_index.json", entry_name def _get_meta_index_lock(self, index_path: str) -> threading.Lock: - with self._cache_lock: + with self._registry_lock: if index_path not in self._meta_index_locks: self._meta_index_locks[index_path] = threading.Lock() return self._meta_index_locks[index_path] def _read_index_entry(self, bucket_name: str, key: Path) -> Optional[Dict[str, Any]]: cache_key = (bucket_name, str(key)) - with self._cache_lock: + with self._meta_cache_lock: hit = self._meta_read_cache.get(cache_key) if hit is not None: self._meta_read_cache.move_to_end(cache_key) cached = hit[0] - return copy.deepcopy(cached) if cached is not None else None + return dict(cached) if cached is not None else None index_path, entry_name = self._index_file_for_key(bucket_name, key) if _HAS_RUST: @@ -2386,16 +2400,16 @@ class ObjectStorage: except (OSError, json.JSONDecodeError): result = None - with self._cache_lock: + with self._meta_cache_lock: while len(self._meta_read_cache) >= self._meta_read_cache_max: self._meta_read_cache.popitem(last=False) - self._meta_read_cache[cache_key] = (copy.deepcopy(result) if result is not None else None,) + self._meta_read_cache[cache_key] = (dict(result) if result is not None else None,) return result def _invalidate_meta_read_cache(self, bucket_name: str, key: Path) -> None: cache_key = (bucket_name, str(key)) - with self._cache_lock: + with self._meta_cache_lock: self._meta_read_cache.pop(cache_key, None) def _write_index_entry(self, bucket_name: str, key: Path, entry: Dict[str, Any]) -> None: diff --git a/app/version.py b/app/version.py index b3225c6..edc8de6 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.3.4" +APP_VERSION = "0.3.5" def get_version() -> str: diff --git a/tests/conftest.py b/tests/conftest.py index 59cb5dd..89bcca1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -43,6 +43,11 @@ def app(tmp_path: 
Path): } ) yield flask_app + storage = flask_app.extensions.get("object_storage") + if storage: + base = getattr(storage, "storage", storage) + if hasattr(base, "shutdown_stats"): + base.shutdown_stats() @pytest.fixture() -- 2.49.1 From 5f24bd920d329e974acbdd8daf8f17b07ff8f39f Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 2 Mar 2026 22:39:37 +0800 Subject: [PATCH 2/3] Reduce P99 tail latency: defer etag index writes, eliminate double cache rebuild, skip redundant stat() in bucket config --- app/storage.py | 63 ++++++++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/app/storage.py b/app/storage.py index f50961e..c3e1afa 100644 --- a/app/storage.py +++ b/app/storage.py @@ -215,6 +215,8 @@ class ObjectStorage: self._stats_lock = threading.Lock() self._stats_dirty: set[str] = set() self._stats_flush_timer: Optional[threading.Timer] = None + self._etag_index_dirty: set[str] = set() + self._etag_index_flush_timer: Optional[threading.Timer] = None def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: with self._registry_lock: @@ -422,6 +424,9 @@ class ObjectStorage: if self._stats_flush_timer is not None: self._stats_flush_timer.cancel() self._flush_stats() + if self._etag_index_flush_timer is not None: + self._etag_index_flush_timer.cancel() + self._flush_etag_indexes() def delete_bucket(self, bucket_name: str) -> None: bucket_path = self._bucket_path(bucket_name) @@ -451,6 +456,7 @@ class ObjectStorage: self._stats_mem.pop(bucket_id, None) self._stats_serial.pop(bucket_id, None) self._stats_dirty.discard(bucket_id) + self._etag_index_dirty.discard(bucket_id) def list_objects( self, @@ -2169,10 +2175,10 @@ class ObjectStorage: if now - timestamp < self._cache_ttl and current_stats_mtime == cached_stats_mtime: self._object_cache.move_to_end(bucket_id) return objects - cache_version = self._cache_version.get(bucket_id, 0) bucket_lock = self._get_bucket_lock(bucket_id) with bucket_lock: + now = time.time() current_stats_mtime = self._get_cache_marker_mtime(bucket_id) with self._obj_cache_lock: cached = self._object_cache.get(bucket_id) @@ -2186,16 +2192,12 @@ class ObjectStorage: new_stats_mtime = self._get_cache_marker_mtime(bucket_id) with self._obj_cache_lock: - current_version = self._cache_version.get(bucket_id, 0) - if current_version != cache_version: - objects = self._build_object_cache(bucket_path) - new_stats_mtime = self._get_cache_marker_mtime(bucket_id) while len(self._object_cache) >= self._object_cache_max_size: self._object_cache.popitem(last=False) self._object_cache[bucket_id] = (objects, time.time(), new_stats_mtime) self._object_cache.move_to_end(bucket_id) - self._cache_version[bucket_id] = current_version + 1 + self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1 self._sorted_key_cache.pop(bucket_id, None) return objects @@ -2205,6 +2207,7 @@ class ObjectStorage: self._object_cache.pop(bucket_id, None) self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1 + self._etag_index_dirty.discard(bucket_id) etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" try: etag_index_path.unlink(missing_ok=True) @@ -2226,23 +2229,32 @@ class ObjectStorage: self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1 self._sorted_key_cache.pop(bucket_id, None) - self._update_etag_index(bucket_id, key, meta.etag if meta else None) + self._etag_index_dirty.add(bucket_id) + self._schedule_etag_index_flush() - def _update_etag_index(self, bucket_id: str, 
key: str, etag: Optional[str]) -> None: - etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" - if not etag_index_path.exists(): - return - try: - with open(etag_index_path, 'r', encoding='utf-8') as f: - index = json.load(f) - if etag is None: - index.pop(key, None) - else: - index[key] = etag - with open(etag_index_path, 'w', encoding='utf-8') as f: - json.dump(index, f) - except (OSError, json.JSONDecodeError): - pass + def _schedule_etag_index_flush(self) -> None: + if self._etag_index_flush_timer is None or not self._etag_index_flush_timer.is_alive(): + self._etag_index_flush_timer = threading.Timer(5.0, self._flush_etag_indexes) + self._etag_index_flush_timer.daemon = True + self._etag_index_flush_timer.start() + + def _flush_etag_indexes(self) -> None: + dirty = set(self._etag_index_dirty) + self._etag_index_dirty.clear() + for bucket_id in dirty: + with self._obj_cache_lock: + cached = self._object_cache.get(bucket_id) + if not cached: + continue + objects = cached[0] + index = {k: v.etag for k, v in objects.items() if v.etag} + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + try: + etag_index_path.parent.mkdir(parents=True, exist_ok=True) + with open(etag_index_path, 'w', encoding='utf-8') as f: + json.dump(index, f) + except OSError: + pass def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None: """Pre-warm the object cache for specified buckets or all buckets. @@ -2299,12 +2311,7 @@ class ObjectStorage: if cached: config, cached_time, cached_mtime = cached if now - cached_time < self._bucket_config_cache_ttl: - try: - current_mtime = config_path.stat().st_mtime if config_path.exists() else 0.0 - except OSError: - current_mtime = 0.0 - if current_mtime == cached_mtime: - return config.copy() + return config.copy() if not config_path.exists(): self._bucket_config_cache[bucket_name] = ({}, now, 0.0) -- 2.49.1 From 81ef0fe4c73f8088642bb83bc0474e0b728be63a Mon Sep 17 00:00:00 2001 From: kqjy Date: Tue, 3 Mar 2026 19:42:37 +0800 Subject: [PATCH 3/3] Fix stale object count in bucket header and metrics dashboard after deletes --- app/storage.py | 16 ++++++++++++---- static/js/bucket-detail-main.js | 4 +++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/app/storage.py b/app/storage.py index c3e1afa..3e613f9 100644 --- a/app/storage.py +++ b/app/storage.py @@ -212,6 +212,7 @@ class ObjectStorage: self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup") self._stats_mem: Dict[str, Dict[str, int]] = {} self._stats_serial: Dict[str, int] = {} + self._stats_mem_time: Dict[str, float] = {} self._stats_lock = threading.Lock() self._stats_dirty: set[str] = set() self._stats_flush_timer: Optional[threading.Timer] = None @@ -273,7 +274,11 @@ class ObjectStorage: with self._stats_lock: if bucket_name in self._stats_mem: - return dict(self._stats_mem[bucket_name]) + cached_at = self._stats_mem_time.get(bucket_name, 0.0) + if (time.monotonic() - cached_at) < cache_ttl: + return dict(self._stats_mem[bucket_name]) + self._stats_mem.pop(bucket_name, None) + self._stats_mem_time.pop(bucket_name, None) cache_path = self._system_bucket_root(bucket_name) / "stats.json" cached_stats = None @@ -350,9 +355,9 @@ class ObjectStorage: } with self._stats_lock: - if bucket_name not in self._stats_mem: - self._stats_mem[bucket_name] = stats - self._stats_serial[bucket_name] = existing_serial + self._stats_mem[bucket_name] = stats + self._stats_mem_time[bucket_name] = time.monotonic() + 
self._stats_serial[bucket_name] = existing_serial try: cache_path.parent.mkdir(parents=True, exist_ok=True) @@ -365,6 +370,7 @@ class ObjectStorage: def _invalidate_bucket_stats_cache(self, bucket_id: str) -> None: with self._stats_lock: self._stats_mem.pop(bucket_id, None) + self._stats_mem_time.pop(bucket_id, None) self._stats_serial[bucket_id] = self._stats_serial.get(bucket_id, 0) + 1 self._stats_dirty.discard(bucket_id) cache_path = self._system_bucket_root(bucket_id) / "stats.json" @@ -398,6 +404,7 @@ class ObjectStorage: data["total_bytes"] = max(0, data["total_bytes"] + bytes_delta + version_bytes_delta) data["_cache_serial"] = data["_cache_serial"] + 1 self._stats_serial[bucket_id] = self._stats_serial.get(bucket_id, 0) + 1 + self._stats_mem_time[bucket_id] = time.monotonic() self._stats_dirty.add(bucket_id) self._schedule_stats_flush() @@ -454,6 +461,7 @@ class ObjectStorage: del self._meta_read_cache[k] with self._stats_lock: self._stats_mem.pop(bucket_id, None) + self._stats_mem_time.pop(bucket_id, None) self._stats_serial.pop(bucket_id, None) self._stats_dirty.discard(bucket_id) self._etag_index_dirty.discard(bucket_id) diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 7b5d6ff..05a7b34 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -321,7 +321,7 @@ `; }; - const bucketTotalObjects = objectsContainer ? parseInt(objectsContainer.dataset.bucketTotalObjects || '0', 10) : 0; + let bucketTotalObjects = objectsContainer ? parseInt(objectsContainer.dataset.bucketTotalObjects || '0', 10) : 0; const updateObjectCountBadge = () => { if (!objectCountBadge) return; @@ -702,6 +702,7 @@ flushPendingStreamObjects(); hasMoreObjects = false; totalObjectCount = loadedObjectCount; + if (!currentPrefix) bucketTotalObjects = totalObjectCount; updateObjectCountBadge(); if (objectsLoadingRow && objectsLoadingRow.parentNode) { @@ -766,6 +767,7 @@ } totalObjectCount = data.total_count || 0; + if (!append && !currentPrefix) bucketTotalObjects = totalObjectCount; nextContinuationToken = data.next_continuation_token; if (!append && objectsLoadingRow) { -- 2.49.1
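
The recurring shape in this series — for both stats.json and etag_index.json — is: mutate an in-memory dict under a dedicated lock, mark the bucket dirty, and let a short-lived daemon threading.Timer coalesce the actual disk writes. A minimal standalone sketch of that pattern follows (illustrative only: the class and method names DebouncedStats/bump/flush are not part of the patches, and it assumes a single writer process, the same assumption _bump_bucket_stats_cache/_schedule_stats_flush/_flush_stats make above):

    import json
    import threading
    from pathlib import Path
    from typing import Dict, Optional

    class DebouncedStats:
        """Coalesce frequent counter updates into periodic disk writes."""

        def __init__(self, path: Path, delay: float = 3.0) -> None:
            self._path = path
            self._delay = delay
            self._lock = threading.Lock()
            self._data: Dict[str, int] = {"objects": 0, "bytes": 0}
            self._dirty = False
            self._timer: Optional[threading.Timer] = None

        def bump(self, objects: int = 0, nbytes: int = 0) -> None:
            with self._lock:
                self._data["objects"] = max(0, self._data["objects"] + objects)
                self._data["bytes"] = max(0, self._data["bytes"] + nbytes)
                self._dirty = True
                # Schedule a flush only if one is not already pending,
                # so a burst of uploads produces a single write.
                if self._timer is None or not self._timer.is_alive():
                    self._timer = threading.Timer(self._delay, self.flush)
                    self._timer.daemon = True
                    self._timer.start()

        def flush(self) -> None:
            with self._lock:
                if not self._dirty:
                    return
                snapshot = dict(self._data)  # write outside the lock
                self._dirty = False
            self._path.parent.mkdir(parents=True, exist_ok=True)
            self._path.write_text(json.dumps(snapshot), encoding="utf-8")

Callers that need durability at shutdown call flush() directly, which mirrors what shutdown_stats() does in patch 1: cancel the pending timer, then flush whatever is still dirty.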
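
The request-ID change in patch 1 trades uuid4's per-request entropy draw for a per-process counter. As a self-contained illustration of the same construction (the helper name make_request_id is hypothetical; the patch inlines the expression in before_request/after_request):

    import itertools
    import os

    # Process-wide counter; next() on itertools.count is effectively atomic
    # in CPython, so no lock is needed and no randomness is consumed per request.
    _request_counter = itertools.count(1)

    def make_request_id() -> str:
        # pid prefix keeps IDs distinct across worker processes; 12 hex digits
        # of counter keep them unique (and roughly ordered) within a process.
        return f"{os.getpid():x}{next(_request_counter):012x}"

The resulting IDs are only unique for the lifetime of a process (pid plus counter), which is enough for correlating log lines, but unlike uuid4 hex strings they can repeat across restarts.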