Add option to display custom timezone; Fix timezone inconsistencies

2025-12-23 13:48:02 +08:00
parent b592fa9fdb
commit 01e26754e8
4 changed files with 38 additions and 70 deletions
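The storage-layer change in the hunks below replaces naive `datetime.fromtimestamp(...)` calls with timezone-aware UTC ones, so object timestamps can later be rendered in a user-selected display timezone. A minimal sketch of the difference, assuming Python 3.9+ (the timestamp value and zone name are illustrative, not taken from the commit):

    from datetime import datetime, timezone
    from zoneinfo import ZoneInfo

    ts = 1703310482  # arbitrary Unix timestamp, for illustration only

    naive = datetime.fromtimestamp(ts)                # local wall-clock time, no tzinfo
    aware = datetime.fromtimestamp(ts, timezone.utc)  # explicit UTC, timezone-aware

    # An aware value can then be converted to whatever display timezone is configured:
    print(aware.astimezone(ZoneInfo("Asia/Singapore")).isoformat())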


@@ -128,13 +128,12 @@ class ObjectStorage:
BUCKET_VERSIONS_DIR = "versions"
MULTIPART_MANIFEST = "manifest.json"
BUCKET_CONFIG_FILE = ".bucket.json"
- KEY_INDEX_CACHE_TTL = 30 # seconds - longer TTL for better browsing performance
+ KEY_INDEX_CACHE_TTL = 30
def __init__(self, root: Path) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
self._ensure_system_roots()
# In-memory object metadata cache: bucket_id -> (dict[key -> ObjectMeta], timestamp)
self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {}
def list_buckets(self) -> List[BucketMeta]:
@@ -145,7 +144,7 @@ class ObjectStorage:
buckets.append(
BucketMeta(
name=bucket.name,
- created_at=datetime.fromtimestamp(stat.st_ctime),
+ created_at=datetime.fromtimestamp(stat.st_ctime, timezone.utc),
)
)
return buckets
@@ -192,8 +191,7 @@ class ObjectStorage:
total_bytes = 0
version_count = 0
version_bytes = 0
# Count current objects in the bucket folder
for path in bucket_path.rglob("*"):
if path.is_file():
rel = path.relative_to(bucket_path)
@@ -204,8 +202,7 @@ class ObjectStorage:
stat = path.stat()
object_count += 1
total_bytes += stat.st_size
# Count archived versions in the system folder
versions_root = self._bucket_versions_root(bucket_name)
if versions_root.exists():
for path in versions_root.rglob("*.bin"):
@@ -219,8 +216,8 @@ class ObjectStorage:
"bytes": total_bytes,
"version_count": version_count,
"version_bytes": version_bytes,
"total_objects": object_count + version_count, # All objects including versions
"total_bytes": total_bytes + version_bytes, # All storage including versions
"total_objects": object_count + version_count,
"total_bytes": total_bytes + version_bytes,
}
try:
@@ -277,23 +274,17 @@ class ObjectStorage:
raise StorageError("Bucket does not exist")
bucket_id = bucket_path.name
# Use cached object metadata for fast listing
object_cache = self._get_object_cache(bucket_id, bucket_path)
# Get sorted keys
all_keys = sorted(object_cache.keys())
# Apply prefix filter if specified
if prefix:
all_keys = [k for k in all_keys if k.startswith(prefix)]
total_count = len(all_keys)
# Handle continuation token (the key to start after)
start_index = 0
if continuation_token:
try:
# Binary search for efficiency on large lists
import bisect
start_index = bisect.bisect_right(all_keys, continuation_token)
if start_index >= total_count:
@@ -304,14 +295,12 @@ class ObjectStorage:
total_count=total_count,
)
except Exception:
- pass # Invalid token, start from beginning
+ pass
# Get the slice we need
end_index = start_index + max_keys
keys_slice = all_keys[start_index:end_index]
is_truncated = end_index < total_count
# Build result from cached metadata (no file I/O!)
objects: List[ObjectMeta] = []
for key in keys_slice:
obj = object_cache.get(key)
@@ -350,14 +339,12 @@ class ObjectStorage:
destination = bucket_path / safe_key
destination.parent.mkdir(parents=True, exist_ok=True)
# Check if this is an overwrite (won't add to object count)
is_overwrite = destination.exists()
existing_size = destination.stat().st_size if is_overwrite else 0
if self._is_versioning_enabled(bucket_path) and is_overwrite:
self._archive_current_version(bucket_id, safe_key, reason="overwrite")
# Write to temp file first to get actual size
tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR
tmp_dir.mkdir(parents=True, exist_ok=True)
tmp_path = tmp_dir / f"{uuid.uuid4().hex}.tmp"
@@ -369,9 +356,7 @@ class ObjectStorage:
new_size = tmp_path.stat().st_size
# Check quota before finalizing
if enforce_quota:
# Calculate net change (new size minus size being replaced)
size_delta = new_size - existing_size
object_delta = 0 if is_overwrite else 1
@@ -387,11 +372,9 @@ class ObjectStorage:
quota_check["usage"],
)
# Move to final destination
shutil.move(str(tmp_path), str(destination))
finally:
# Clean up temp file if it still exists
try:
tmp_path.unlink(missing_ok=True)
except OSError:
@@ -400,7 +383,6 @@ class ObjectStorage:
stat = destination.stat()
etag = checksum.hexdigest()
# Always store internal metadata (etag, size) alongside user metadata
internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
combined_meta = {**internal_meta, **(metadata or {})}
self._write_metadata(bucket_id, safe_key, combined_meta)
@@ -411,7 +393,7 @@ class ObjectStorage:
return ObjectMeta(
key=safe_key.as_posix(),
size=stat.st_size,
- last_modified=datetime.fromtimestamp(stat.st_mtime),
+ last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=etag,
metadata=metadata,
)
@@ -438,16 +420,14 @@ class ObjectStorage:
for parent in path.parents:
if parent == stop_at:
break
# Retry a few times with small delays for Windows/OneDrive
for attempt in range(3):
try:
if parent.exists() and not any(parent.iterdir()):
parent.rmdir()
- break # Success, move to next parent
+ break
except OSError:
if attempt < 2:
- time.sleep(0.1) # Brief delay before retry
- # Final attempt failed - continue to next parent
+ time.sleep(0.1)
break
def delete_object(self, bucket_name: str, object_key: str) -> None:
@@ -485,7 +465,6 @@ class ObjectStorage:
if legacy_version_dir.exists():
shutil.rmtree(legacy_version_dir, ignore_errors=True)
# Invalidate bucket stats cache
self._invalidate_bucket_stats_cache(bucket_id)
self._invalidate_object_cache(bucket_id)
self._cleanup_empty_parents(target, bucket_path)
@@ -599,7 +578,6 @@ class ObjectStorage:
bucket_path = self._require_bucket_path(bucket_name)
if max_bytes is None and max_objects is None:
# Remove quota entirely
self._set_bucket_config_entry(bucket_path.name, "quota", None)
return
@@ -641,9 +619,7 @@ class ObjectStorage:
"message": None,
}
# Get current stats (uses cache when available)
stats = self.bucket_stats(bucket_name)
# Use totals which include versions for quota enforcement
current_bytes = stats.get("total_bytes", stats.get("bytes", 0))
current_objects = stats.get("total_objects", stats.get("objects", 0))
@@ -804,7 +780,7 @@ class ObjectStorage:
return ObjectMeta(
key=safe_key.as_posix(),
size=stat.st_size,
- last_modified=datetime.fromtimestamp(stat.st_mtime),
+ last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=self._compute_etag(destination),
metadata=metadata or None,
)
@@ -907,14 +883,12 @@ class ObjectStorage:
raise StorageError("part_number must be >= 1")
bucket_path = self._bucket_path(bucket_name)
# Get the upload root directory
upload_root = self._multipart_dir(bucket_path.name, upload_id)
if not upload_root.exists():
upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id)
if not upload_root.exists():
raise StorageError("Multipart upload not found")
# Write the part data first (can happen concurrently)
checksum = hashlib.md5()
part_filename = f"part-{part_number:05d}.part"
part_path = upload_root / part_filename
@@ -926,13 +900,11 @@ class ObjectStorage:
"filename": part_filename,
}
# Update manifest with file locking to prevent race conditions
manifest_path = upload_root / self.MULTIPART_MANIFEST
lock_path = upload_root / ".manifest.lock"
with lock_path.open("w") as lock_file:
with _file_lock(lock_file):
# Re-read manifest under lock to get latest state
try:
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
@@ -986,11 +958,9 @@ class ObjectStorage:
safe_key = self._sanitize_object_key(manifest["object_key"])
destination = bucket_path / safe_key
# Check if this is an overwrite
is_overwrite = destination.exists()
existing_size = destination.stat().st_size if is_overwrite else 0
# Check quota before writing
if enforce_quota:
size_delta = total_size - existing_size
object_delta = 0 if is_overwrite else 1
@@ -1052,7 +1022,7 @@ class ObjectStorage:
return ObjectMeta(
key=safe_key.as_posix(),
size=stat.st_size,
- last_modified=datetime.fromtimestamp(stat.st_mtime),
+ last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=checksum.hexdigest(),
metadata=metadata,
)
@@ -1168,9 +1138,8 @@ class ObjectStorage:
bucket_id = bucket_path.name
objects: Dict[str, ObjectMeta] = {}
bucket_str = str(bucket_path)
- bucket_len = len(bucket_str) + 1 # +1 for the separator
+ bucket_len = len(bucket_str) + 1
# Try to load persisted etag index first (single file read vs thousands)
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
meta_cache: Dict[str, str] = {}
index_mtime: float = 0
@@ -1183,12 +1152,10 @@ class ObjectStorage:
except (OSError, json.JSONDecodeError):
meta_cache = {}
# Check if we need to rebuild the index
meta_root = self._bucket_meta_root(bucket_id)
needs_rebuild = False
if meta_root.exists() and index_mtime > 0:
# Quick check: if any meta file is newer than index, rebuild
def check_newer(dir_path: str) -> bool:
try:
with os.scandir(dir_path) as it:
@@ -1211,7 +1178,6 @@ class ObjectStorage:
meta_len = len(meta_str) + 1
meta_files: list[tuple[str, str]] = []
# Collect all metadata file paths
def collect_meta_files(dir_path: str) -> None:
try:
with os.scandir(dir_path) as it:
@@ -1227,7 +1193,6 @@ class ObjectStorage:
collect_meta_files(meta_str)
# Parallel read of metadata files - only extract __etag__
def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]:
key, path = item
try:
@@ -1252,7 +1217,6 @@ class ObjectStorage:
if etag:
meta_cache[key] = etag
# Persist the index for next time
try:
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f:
@@ -1260,43 +1224,36 @@ class ObjectStorage:
except OSError:
pass
# Now scan objects and use cached etags
def scan_dir(dir_path: str) -> None:
try:
with os.scandir(dir_path) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
# Skip internal folders
rel_start = entry.path[bucket_len:].split(os.sep)[0] if len(entry.path) > bucket_len else entry.name
if rel_start in self.INTERNAL_FOLDERS:
continue
scan_dir(entry.path)
elif entry.is_file(follow_symlinks=False):
# Get relative path and convert to POSIX
rel = entry.path[bucket_len:]
# Check if in internal folder
first_part = rel.split(os.sep)[0] if os.sep in rel else rel
if first_part in self.INTERNAL_FOLDERS:
continue
key = rel.replace(os.sep, '/')
try:
# Use entry.stat() which is cached from scandir
stat = entry.stat()
# Get etag from cache (now just a string, not dict)
etag = meta_cache.get(key)
# Use placeholder for legacy objects without stored etag
if not etag:
etag = f'"{stat.st_size}-{int(stat.st_mtime)}"'
objects[key] = ObjectMeta(
key=key,
size=stat.st_size,
- last_modified=datetime.fromtimestamp(stat.st_mtime),
+ last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=etag,
- metadata=None, # Don't include user metadata in listing
+ metadata=None,
)
except OSError:
pass
@@ -1316,7 +1273,6 @@ class ObjectStorage:
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
return objects
# Rebuild cache
objects = self._build_object_cache(bucket_path)
self._object_cache[bucket_id] = (objects, now)
return objects
@@ -1324,7 +1280,6 @@ class ObjectStorage:
def _invalidate_object_cache(self, bucket_id: str) -> None:
"""Invalidate the object cache and etag index for a bucket."""
self._object_cache.pop(bucket_id, None)
# Also invalidate persisted etag index
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
try:
etag_index_path.unlink(missing_ok=True)