235 lines
8.6 KiB
Python
235 lines
8.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
|
|
class RetentionMode(Enum):
|
|
GOVERNANCE = "GOVERNANCE"
|
|
COMPLIANCE = "COMPLIANCE"
|
|
|
|
|
|
class ObjectLockError(Exception):
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class ObjectLockRetention:
|
|
mode: RetentionMode
|
|
retain_until_date: datetime
|
|
|
|
def to_dict(self) -> Dict[str, str]:
|
|
return {
|
|
"Mode": self.mode.value,
|
|
"RetainUntilDate": self.retain_until_date.isoformat(),
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> Optional["ObjectLockRetention"]:
|
|
if not data:
|
|
return None
|
|
mode_str = data.get("Mode")
|
|
date_str = data.get("RetainUntilDate")
|
|
if not mode_str or not date_str:
|
|
return None
|
|
try:
|
|
mode = RetentionMode(mode_str)
|
|
retain_until = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
|
|
return cls(mode=mode, retain_until_date=retain_until)
|
|
except (ValueError, KeyError):
|
|
return None
|
|
|
|
def is_expired(self) -> bool:
|
|
return datetime.now(timezone.utc) > self.retain_until_date
|
|
|
|
|
|
@dataclass
|
|
class ObjectLockConfig:
|
|
enabled: bool = False
|
|
default_retention: Optional[ObjectLockRetention] = None
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
result: Dict[str, Any] = {"ObjectLockEnabled": "Enabled" if self.enabled else "Disabled"}
|
|
if self.default_retention:
|
|
result["Rule"] = {
|
|
"DefaultRetention": {
|
|
"Mode": self.default_retention.mode.value,
|
|
"Days": None,
|
|
"Years": None,
|
|
}
|
|
}
|
|
return result
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> "ObjectLockConfig":
|
|
enabled = data.get("ObjectLockEnabled") == "Enabled"
|
|
default_retention = None
|
|
rule = data.get("Rule")
|
|
if rule and "DefaultRetention" in rule:
|
|
dr = rule["DefaultRetention"]
|
|
mode_str = dr.get("Mode", "GOVERNANCE")
|
|
days = dr.get("Days")
|
|
years = dr.get("Years")
|
|
if days or years:
|
|
from datetime import timedelta
|
|
now = datetime.now(timezone.utc)
|
|
if years:
|
|
delta = timedelta(days=int(years) * 365)
|
|
else:
|
|
delta = timedelta(days=int(days))
|
|
default_retention = ObjectLockRetention(
|
|
mode=RetentionMode(mode_str),
|
|
retain_until_date=now + delta,
|
|
)
|
|
return cls(enabled=enabled, default_retention=default_retention)
|
|
|
|
|
|
class ObjectLockService:
|
|
def __init__(self, storage_root: Path):
|
|
self.storage_root = storage_root
|
|
self._config_cache: Dict[str, ObjectLockConfig] = {}
|
|
|
|
def _bucket_lock_config_path(self, bucket_name: str) -> Path:
|
|
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "object_lock.json"
|
|
|
|
def _object_lock_meta_path(self, bucket_name: str, object_key: str) -> Path:
|
|
safe_key = object_key.replace("/", "_").replace("\\", "_")
|
|
return (
|
|
self.storage_root / ".myfsio.sys" / "buckets" / bucket_name /
|
|
"locks" / f"{safe_key}.lock.json"
|
|
)
|
|
|
|
def get_bucket_lock_config(self, bucket_name: str) -> ObjectLockConfig:
|
|
if bucket_name in self._config_cache:
|
|
return self._config_cache[bucket_name]
|
|
|
|
config_path = self._bucket_lock_config_path(bucket_name)
|
|
if not config_path.exists():
|
|
return ObjectLockConfig(enabled=False)
|
|
|
|
try:
|
|
data = json.loads(config_path.read_text(encoding="utf-8"))
|
|
config = ObjectLockConfig.from_dict(data)
|
|
self._config_cache[bucket_name] = config
|
|
return config
|
|
except (json.JSONDecodeError, OSError):
|
|
return ObjectLockConfig(enabled=False)
|
|
|
|
def set_bucket_lock_config(self, bucket_name: str, config: ObjectLockConfig) -> None:
|
|
config_path = self._bucket_lock_config_path(bucket_name)
|
|
config_path.parent.mkdir(parents=True, exist_ok=True)
|
|
config_path.write_text(json.dumps(config.to_dict()), encoding="utf-8")
|
|
self._config_cache[bucket_name] = config
|
|
|
|
def enable_bucket_lock(self, bucket_name: str) -> None:
|
|
config = self.get_bucket_lock_config(bucket_name)
|
|
config.enabled = True
|
|
self.set_bucket_lock_config(bucket_name, config)
|
|
|
|
def is_bucket_lock_enabled(self, bucket_name: str) -> bool:
|
|
return self.get_bucket_lock_config(bucket_name).enabled
|
|
|
|
def get_object_retention(self, bucket_name: str, object_key: str) -> Optional[ObjectLockRetention]:
|
|
meta_path = self._object_lock_meta_path(bucket_name, object_key)
|
|
if not meta_path.exists():
|
|
return None
|
|
try:
|
|
data = json.loads(meta_path.read_text(encoding="utf-8"))
|
|
return ObjectLockRetention.from_dict(data.get("retention", {}))
|
|
except (json.JSONDecodeError, OSError):
|
|
return None
|
|
|
|
def set_object_retention(
|
|
self,
|
|
bucket_name: str,
|
|
object_key: str,
|
|
retention: ObjectLockRetention,
|
|
bypass_governance: bool = False,
|
|
) -> None:
|
|
existing = self.get_object_retention(bucket_name, object_key)
|
|
if existing and not existing.is_expired():
|
|
if existing.mode == RetentionMode.COMPLIANCE:
|
|
raise ObjectLockError(
|
|
"Cannot modify retention on object with COMPLIANCE mode until retention expires"
|
|
)
|
|
if existing.mode == RetentionMode.GOVERNANCE and not bypass_governance:
|
|
raise ObjectLockError(
|
|
"Cannot modify GOVERNANCE retention without bypass-governance permission"
|
|
)
|
|
|
|
meta_path = self._object_lock_meta_path(bucket_name, object_key)
|
|
meta_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
existing_data: Dict[str, Any] = {}
|
|
if meta_path.exists():
|
|
try:
|
|
existing_data = json.loads(meta_path.read_text(encoding="utf-8"))
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
|
|
existing_data["retention"] = retention.to_dict()
|
|
meta_path.write_text(json.dumps(existing_data), encoding="utf-8")
|
|
|
|
def get_legal_hold(self, bucket_name: str, object_key: str) -> bool:
|
|
meta_path = self._object_lock_meta_path(bucket_name, object_key)
|
|
if not meta_path.exists():
|
|
return False
|
|
try:
|
|
data = json.loads(meta_path.read_text(encoding="utf-8"))
|
|
return data.get("legal_hold", False)
|
|
except (json.JSONDecodeError, OSError):
|
|
return False
|
|
|
|
def set_legal_hold(self, bucket_name: str, object_key: str, enabled: bool) -> None:
|
|
meta_path = self._object_lock_meta_path(bucket_name, object_key)
|
|
meta_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
existing_data: Dict[str, Any] = {}
|
|
if meta_path.exists():
|
|
try:
|
|
existing_data = json.loads(meta_path.read_text(encoding="utf-8"))
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
|
|
existing_data["legal_hold"] = enabled
|
|
meta_path.write_text(json.dumps(existing_data), encoding="utf-8")
|
|
|
|
def can_delete_object(
|
|
self,
|
|
bucket_name: str,
|
|
object_key: str,
|
|
bypass_governance: bool = False,
|
|
) -> tuple[bool, str]:
|
|
if self.get_legal_hold(bucket_name, object_key):
|
|
return False, "Object is under legal hold"
|
|
|
|
retention = self.get_object_retention(bucket_name, object_key)
|
|
if retention and not retention.is_expired():
|
|
if retention.mode == RetentionMode.COMPLIANCE:
|
|
return False, f"Object is locked in COMPLIANCE mode until {retention.retain_until_date.isoformat()}"
|
|
if retention.mode == RetentionMode.GOVERNANCE:
|
|
if not bypass_governance:
|
|
return False, f"Object is locked in GOVERNANCE mode until {retention.retain_until_date.isoformat()}"
|
|
|
|
return True, ""
|
|
|
|
def can_overwrite_object(
|
|
self,
|
|
bucket_name: str,
|
|
object_key: str,
|
|
bypass_governance: bool = False,
|
|
) -> tuple[bool, str]:
|
|
return self.can_delete_object(bucket_name, object_key, bypass_governance)
|
|
|
|
def delete_object_lock_metadata(self, bucket_name: str, object_key: str) -> None:
|
|
meta_path = self._object_lock_meta_path(bucket_name, object_key)
|
|
try:
|
|
if meta_path.exists():
|
|
meta_path.unlink()
|
|
except OSError:
|
|
pass
|