From 01e26754e824e32bd6a618d2fdfc3dd133935d4a Mon Sep 17 00:00:00 2001
From: kqjy
Date: Tue, 23 Dec 2025 13:48:02 +0800
Subject: [PATCH] Add option to display custom timezone; Fix timezone inconsistencies

---
 app/__init__.py | 17 ++++++++---
 app/config.py   |  6 +++-
 app/iam.py      |  8 ++---
 app/storage.py  | 77 ++++++++++---------------------------------
 4 files changed, 38 insertions(+), 70 deletions(-)

diff --git a/app/__init__.py b/app/__init__.py
index 3147a1c..4d7d68f 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -171,13 +171,22 @@ def create_app(

     @app.template_filter("timestamp_to_datetime")
     def timestamp_to_datetime(value: float) -> str:
-        """Format Unix timestamp as human-readable datetime."""
-        from datetime import datetime
+        """Format Unix timestamp as human-readable datetime in configured timezone."""
+        from datetime import datetime, timezone as dt_timezone
+        from zoneinfo import ZoneInfo
         if not value:
             return "Never"
         try:
-            dt = datetime.fromtimestamp(value)
-            return dt.strftime("%Y-%m-%d %H:%M:%S")
+            dt_utc = datetime.fromtimestamp(value, dt_timezone.utc)
+            display_tz = app.config.get("DISPLAY_TIMEZONE", "UTC")
+            if display_tz and display_tz != "UTC":
+                try:
+                    tz = ZoneInfo(display_tz)
+                    dt_local = dt_utc.astimezone(tz)
+                    return dt_local.strftime("%Y-%m-%d %H:%M:%S")
+                except (KeyError, ValueError):
+                    pass
+            return dt_utc.strftime("%Y-%m-%d %H:%M:%S UTC")
         except (ValueError, OSError):
             return "Unknown"

diff --git a/app/config.py b/app/config.py
index 2489397..206cce6 100644
--- a/app/config.py
+++ b/app/config.py
@@ -73,6 +73,7 @@ class AppConfig:
     kms_enabled: bool
     kms_keys_path: Path
     default_encryption_algorithm: str
+    display_timezone: str

     @classmethod
     def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -161,6 +162,7 @@ class AppConfig:
         kms_enabled = str(_get("KMS_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
         kms_keys_path = Path(_get("KMS_KEYS_PATH", encryption_keys_dir / "kms_keys.json")).resolve()
         default_encryption_algorithm = str(_get("DEFAULT_ENCRYPTION_ALGORITHM", "AES256"))
+        display_timezone = str(_get("DISPLAY_TIMEZONE", "UTC"))

         return cls(storage_root=storage_root,
                    max_upload_size=max_upload_size,
@@ -195,7 +197,8 @@ class AppConfig:
                    encryption_master_key_path=encryption_master_key_path,
                    kms_enabled=kms_enabled,
                    kms_keys_path=kms_keys_path,
-                   default_encryption_algorithm=default_encryption_algorithm)
+                   default_encryption_algorithm=default_encryption_algorithm,
+                   display_timezone=display_timezone)

     def validate_and_report(self) -> list[str]:
         """Validate configuration and return a list of warnings/issues.
@@ -320,4 +323,5 @@ class AppConfig:
             "KMS_ENABLED": self.kms_enabled,
             "KMS_KEYS_PATH": str(self.kms_keys_path),
             "DEFAULT_ENCRYPTION_ALGORITHM": self.default_encryption_algorithm,
+            "DISPLAY_TIMEZONE": self.display_timezone,
         }
diff --git a/app/iam.py b/app/iam.py
index 519ae74..465aca1 100644
--- a/app/iam.py
+++ b/app/iam.py
@@ -6,7 +6,7 @@ import math
 import secrets
 from collections import deque
 from dataclasses import dataclass
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set

@@ -148,7 +148,7 @@ class IamService:
             return
         attempts = self._failed_attempts.setdefault(access_key, deque())
         self._prune_attempts(attempts)
-        attempts.append(datetime.now())
+        attempts.append(datetime.now(timezone.utc))

     def _clear_failed_attempts(self, access_key: str) -> None:
         if not access_key:
@@ -156,7 +156,7 @@ class IamService:
         self._failed_attempts.pop(access_key, None)

     def _prune_attempts(self, attempts: Deque[datetime]) -> None:
-        cutoff = datetime.now() - self.auth_lockout_window
+        cutoff = datetime.now(timezone.utc) - self.auth_lockout_window
         while attempts and attempts[0] < cutoff:
             attempts.popleft()

@@ -177,7 +177,7 @@ class IamService:
         if len(attempts) < self.auth_max_attempts:
             return 0
         oldest = attempts[0]
-        elapsed = (datetime.now() - oldest).total_seconds()
+        elapsed = (datetime.now(timezone.utc) - oldest).total_seconds()
         return int(max(0, self.auth_lockout_window.total_seconds() - elapsed))

     def principal_for_key(self, access_key: str) -> Principal:
diff --git a/app/storage.py b/app/storage.py
index b8fcefd..c710847 100644
--- a/app/storage.py
+++ b/app/storage.py
@@ -128,13 +128,12 @@ class ObjectStorage:
     BUCKET_VERSIONS_DIR = "versions"
     MULTIPART_MANIFEST = "manifest.json"
     BUCKET_CONFIG_FILE = ".bucket.json"
-    KEY_INDEX_CACHE_TTL = 30  # seconds - longer TTL for better browsing performance
+    KEY_INDEX_CACHE_TTL = 30

     def __init__(self, root: Path) -> None:
         self.root = Path(root)
         self.root.mkdir(parents=True, exist_ok=True)
         self._ensure_system_roots()
-        # In-memory object metadata cache: bucket_id -> (dict[key -> ObjectMeta], timestamp)
         self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {}

     def list_buckets(self) -> List[BucketMeta]:
@@ -145,7 +144,7 @@
             buckets.append(
                 BucketMeta(
                     name=bucket.name,
-                    created_at=datetime.fromtimestamp(stat.st_ctime),
+                    created_at=datetime.fromtimestamp(stat.st_ctime, timezone.utc),
                 )
             )
         return buckets
@@ -192,8 +191,7 @@
         total_bytes = 0
         version_count = 0
         version_bytes = 0
-
-        # Count current objects in the bucket folder
+
         for path in bucket_path.rglob("*"):
             if path.is_file():
                 rel = path.relative_to(bucket_path)
@@ -204,8 +202,7 @@
                 stat = path.stat()
                 object_count += 1
                 total_bytes += stat.st_size
-
-        # Count archived versions in the system folder
+
         versions_root = self._bucket_versions_root(bucket_name)
         if versions_root.exists():
             for path in versions_root.rglob("*.bin"):
@@ -219,8 +216,8 @@
             "bytes": total_bytes,
             "version_count": version_count,
             "version_bytes": version_bytes,
-            "total_objects": object_count + version_count,  # All objects including versions
-            "total_bytes": total_bytes + version_bytes,  # All storage including versions
+            "total_objects": object_count + version_count,
+            "total_bytes": total_bytes + version_bytes,
         }

         try:
@@ -277,23 +274,17 @@
             raise StorageError("Bucket does not exist")

         bucket_id = bucket_path.name
-        # Use cached object metadata for fast listing
         object_cache = self._get_object_cache(bucket_id, bucket_path)

-        # Get sorted keys
         all_keys = sorted(object_cache.keys())

-        # Apply prefix filter if specified
         if prefix:
             all_keys = [k for k in all_keys if k.startswith(prefix)]

         total_count = len(all_keys)
-
-        # Handle continuation token (the key to start after)
         start_index = 0
         if continuation_token:
             try:
-                # Binary search for efficiency on large lists
                 import bisect
                 start_index = bisect.bisect_right(all_keys, continuation_token)
                 if start_index >= total_count:
@@ -304,14 +295,12 @@
                         total_count=total_count,
                     )
             except Exception:
-                pass  # Invalid token, start from beginning
+                pass

-        # Get the slice we need
         end_index = start_index + max_keys
         keys_slice = all_keys[start_index:end_index]
         is_truncated = end_index < total_count
-
-        # Build result from cached metadata (no file I/O!)
+
         objects: List[ObjectMeta] = []
         for key in keys_slice:
             obj = object_cache.get(key)
@@ -350,14 +339,12 @@
         destination = bucket_path / safe_key
         destination.parent.mkdir(parents=True, exist_ok=True)

-        # Check if this is an overwrite (won't add to object count)
        is_overwrite = destination.exists()
         existing_size = destination.stat().st_size if is_overwrite else 0

         if self._is_versioning_enabled(bucket_path) and is_overwrite:
             self._archive_current_version(bucket_id, safe_key, reason="overwrite")

-        # Write to temp file first to get actual size
         tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR
         tmp_dir.mkdir(parents=True, exist_ok=True)
         tmp_path = tmp_dir / f"{uuid.uuid4().hex}.tmp"
@@ -369,9 +356,7 @@

             new_size = tmp_path.stat().st_size

-            # Check quota before finalizing
             if enforce_quota:
-                # Calculate net change (new size minus size being replaced)
                 size_delta = new_size - existing_size
                 object_delta = 0 if is_overwrite else 1

@@ -387,11 +372,9 @@
                     quota_check["usage"],
                 )

-            # Move to final destination
             shutil.move(str(tmp_path), str(destination))

         finally:
-            # Clean up temp file if it still exists
             try:
                 tmp_path.unlink(missing_ok=True)
             except OSError:
@@ -400,7 +383,6 @@
         stat = destination.stat()
         etag = checksum.hexdigest()

-        # Always store internal metadata (etag, size) alongside user metadata
         internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
         combined_meta = {**internal_meta, **(metadata or {})}
         self._write_metadata(bucket_id, safe_key, combined_meta)
@@ -411,7 +393,7 @@
         return ObjectMeta(
             key=safe_key.as_posix(),
             size=stat.st_size,
-            last_modified=datetime.fromtimestamp(stat.st_mtime),
+            last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
             etag=etag,
             metadata=metadata,
         )
@@ -438,16 +420,14 @@
         for parent in path.parents:
             if parent == stop_at:
                 break
-            # Retry a few times with small delays for Windows/OneDrive
            for attempt in range(3):
                 try:
                     if parent.exists() and not any(parent.iterdir()):
                         parent.rmdir()
-                    break  # Success, move to next parent
+                    break
                 except OSError:
                     if attempt < 2:
-                        time.sleep(0.1)  # Brief delay before retry
-                    # Final attempt failed - continue to next parent
+                        time.sleep(0.1)
                     break

     def delete_object(self, bucket_name: str, object_key: str) -> None:
@@ -485,7 +465,6 @@
         if legacy_version_dir.exists():
             shutil.rmtree(legacy_version_dir, ignore_errors=True)

-        # Invalidate bucket stats cache
         self._invalidate_bucket_stats_cache(bucket_id)
         self._invalidate_object_cache(bucket_id)
         self._cleanup_empty_parents(target, bucket_path)
@@ -599,7 +578,6 @@
         bucket_path = self._require_bucket_path(bucket_name)

         if max_bytes is None and max_objects is None:
-            # Remove quota entirely
             self._set_bucket_config_entry(bucket_path.name, "quota", None)
             return

@@ -641,9 +619,7 @@
             "message": None,
         }

-        # Get current stats (uses cache when available)
         stats = self.bucket_stats(bucket_name)
-        # Use totals which include versions for quota enforcement
         current_bytes = stats.get("total_bytes", stats.get("bytes", 0))
         current_objects = stats.get("total_objects", stats.get("objects", 0))

@@ -804,7 +780,7 @@
         return ObjectMeta(
             key=safe_key.as_posix(),
             size=stat.st_size,
-            last_modified=datetime.fromtimestamp(stat.st_mtime),
+            last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
             etag=self._compute_etag(destination),
             metadata=metadata or None,
         )
@@ -907,14 +883,12 @@
             raise StorageError("part_number must be >= 1")

         bucket_path = self._bucket_path(bucket_name)
-        # Get the upload root directory
         upload_root = self._multipart_dir(bucket_path.name, upload_id)
         if not upload_root.exists():
             upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id)
             if not upload_root.exists():
                 raise StorageError("Multipart upload not found")

-        # Write the part data first (can happen concurrently)
         checksum = hashlib.md5()
         part_filename = f"part-{part_number:05d}.part"
         part_path = upload_root / part_filename
@@ -926,13 +900,11 @@
             "filename": part_filename,
         }

-        # Update manifest with file locking to prevent race conditions
         manifest_path = upload_root / self.MULTIPART_MANIFEST
         lock_path = upload_root / ".manifest.lock"

         with lock_path.open("w") as lock_file:
             with _file_lock(lock_file):
-                # Re-read manifest under lock to get latest state
                 try:
                     manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
                 except (OSError, json.JSONDecodeError) as exc:
@@ -986,11 +958,9 @@
         safe_key = self._sanitize_object_key(manifest["object_key"])
         destination = bucket_path / safe_key

-        # Check if this is an overwrite
         is_overwrite = destination.exists()
         existing_size = destination.stat().st_size if is_overwrite else 0

-        # Check quota before writing
         if enforce_quota:
             size_delta = total_size - existing_size
             object_delta = 0 if is_overwrite else 1
@@ -1052,7 +1022,7 @@
         return ObjectMeta(
             key=safe_key.as_posix(),
             size=stat.st_size,
-            last_modified=datetime.fromtimestamp(stat.st_mtime),
+            last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
             etag=checksum.hexdigest(),
             metadata=metadata,
         )
@@ -1168,9 +1138,8 @@
         bucket_id = bucket_path.name
         objects: Dict[str, ObjectMeta] = {}
         bucket_str = str(bucket_path)
-        bucket_len = len(bucket_str) + 1  # +1 for the separator
+        bucket_len = len(bucket_str) + 1

-        # Try to load persisted etag index first (single file read vs thousands)
         etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
         meta_cache: Dict[str, str] = {}
         index_mtime: float = 0
@@ -1183,12 +1152,10 @@
         except (OSError, json.JSONDecodeError):
             meta_cache = {}

-        # Check if we need to rebuild the index
         meta_root = self._bucket_meta_root(bucket_id)
         needs_rebuild = False

         if meta_root.exists() and index_mtime > 0:
-            # Quick check: if any meta file is newer than index, rebuild
             def check_newer(dir_path: str) -> bool:
                 try:
                     with os.scandir(dir_path) as it:
@@ -1211,7 +1178,6 @@
             meta_len = len(meta_str) + 1
             meta_files: list[tuple[str, str]] = []

-            # Collect all metadata file paths
             def collect_meta_files(dir_path: str) -> None:
                 try:
                     with os.scandir(dir_path) as it:
@@ -1227,7 +1193,6 @@

             collect_meta_files(meta_str)

-            # Parallel read of metadata files - only extract __etag__
             def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]:
                 key, path = item
                 try:
@@ -1252,7 +1217,6 @@
                 if etag:
                     meta_cache[key] = etag

-            # Persist the index for next time
             try:
                 etag_index_path.parent.mkdir(parents=True, exist_ok=True)
                 with open(etag_index_path, 'w', encoding='utf-8') as f:
@@ -1260,43 +1224,36 @@
             except OSError:
                 pass

-        # Now scan objects and use cached etags
         def scan_dir(dir_path: str) -> None:
             try:
                 with os.scandir(dir_path) as it:
                     for entry in it:
                         if entry.is_dir(follow_symlinks=False):
-                            # Skip internal folders
                             rel_start = entry.path[bucket_len:].split(os.sep)[0] if len(entry.path) > bucket_len else entry.name
                             if rel_start in self.INTERNAL_FOLDERS:
                                 continue
                             scan_dir(entry.path)
                         elif entry.is_file(follow_symlinks=False):
-                            # Get relative path and convert to POSIX
                             rel = entry.path[bucket_len:]

-                            # Check if in internal folder
                             first_part = rel.split(os.sep)[0] if os.sep in rel else rel
                             if first_part in self.INTERNAL_FOLDERS:
                                 continue

                             key = rel.replace(os.sep, '/')

                             try:
-                                # Use entry.stat() which is cached from scandir
                                 stat = entry.stat()
-                                # Get etag from cache (now just a string, not dict)
                                 etag = meta_cache.get(key)
-                                # Use placeholder for legacy objects without stored etag
                                 if not etag:
                                     etag = f'"{stat.st_size}-{int(stat.st_mtime)}"'

                                 objects[key] = ObjectMeta(
                                     key=key,
                                     size=stat.st_size,
-                                    last_modified=datetime.fromtimestamp(stat.st_mtime),
+                                    last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
                                     etag=etag,
-                                    metadata=None,  # Don't include user metadata in listing
+                                    metadata=None,
                                 )
                             except OSError:
                                 pass
@@ -1316,7 +1273,6 @@
             if now - timestamp < self.KEY_INDEX_CACHE_TTL:
                 return objects

-        # Rebuild cache
         objects = self._build_object_cache(bucket_path)
         self._object_cache[bucket_id] = (objects, now)
         return objects
@@ -1324,7 +1280,6 @@
     def _invalidate_object_cache(self, bucket_id: str) -> None:
         """Invalidate the object cache and etag index for a bucket."""
         self._object_cache.pop(bucket_id, None)
-        # Also invalidate persisted etag index
         etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
         try:
             etag_index_path.unlink(missing_ok=True)
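
Note (reviewer sketch, not part of the diff): the template filter in app/__init__.py above treats stored timestamps as UTC epoch seconds and converts them for display only when DISPLAY_TIMEZONE names a valid IANA zone, otherwise it falls back to an explicit UTC label. A minimal standalone illustration of that intended behaviour, using only the standard library; the helper name format_timestamp and the sample values are illustrative and do not appear in the patch:

    from datetime import datetime, timezone
    from zoneinfo import ZoneInfo

    def format_timestamp(value: float, display_tz: str = "UTC") -> str:
        """Render an epoch timestamp the way the patched template filter is expected to."""
        dt_utc = datetime.fromtimestamp(value, timezone.utc)  # interpret input as UTC
        if display_tz and display_tz != "UTC":
            try:
                tz = ZoneInfo(display_tz)  # unknown zone names raise a KeyError subclass
                return dt_utc.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")
            except (KeyError, ValueError):
                pass  # fall back to UTC rather than failing the page render
        return dt_utc.strftime("%Y-%m-%d %H:%M:%S UTC")

    # format_timestamp(1735000000, "Asia/Singapore") -> "2024-12-24 08:26:40"
    # format_timestamp(1735000000)                   -> "2024-12-24 00:26:40 UTC"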