266 lines
8.9 KiB
Python
266 lines
8.9 KiB
Python
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import logging
|
|
import queue
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class AccessLogEntry:
    """One S3-style server access log record.

    String fields default to the access-log placeholder ``"-"``; the
    timestamp is timezone-aware UTC and the request id is a random
    16-hex-char token unless supplied by the caller.
    """

    bucket_owner: str = "-"
    bucket: str = "-"
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    remote_ip: str = "-"
    requester: str = "-"
    request_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16].upper())
    operation: str = "-"
    key: str = "-"
    request_uri: str = "-"
    http_status: int = 200
    error_code: str = "-"
    bytes_sent: int = 0
    object_size: int = 0
    total_time_ms: int = 0
    turn_around_time_ms: int = 0
    referrer: str = "-"
    user_agent: str = "-"
    version_id: str = "-"
    host_id: str = "-"
    signature_version: str = "SigV4"
    cipher_suite: str = "-"
    authentication_type: str = "AuthHeader"
    host_header: str = "-"
    tls_version: str = "-"

    def to_log_line(self) -> str:
        """Render this entry as a single space-delimited access-log line.

        Falsy numeric/error fields are emitted as ``-``; the request URI,
        referrer and user agent are wrapped in double quotes.
        """

        def dash(value: Any) -> Any:
            # Access-log convention: 0 / "" are written as the "-" placeholder.
            return value or "-"

        when = self.timestamp.strftime("[%d/%b/%Y:%H:%M:%S %z]")
        fields = [
            self.bucket_owner,
            self.bucket,
            when,
            self.remote_ip,
            self.requester,
            self.request_id,
            self.operation,
            self.key,
            f'"{self.request_uri}"',
            str(self.http_status),
            str(dash(self.error_code)),
            str(dash(self.bytes_sent)),
            str(dash(self.object_size)),
            str(dash(self.total_time_ms)),
            str(dash(self.turn_around_time_ms)),
            f'"{self.referrer}"',
            f'"{self.user_agent}"',
            self.version_id,
        ]
        return " ".join(fields)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of the commonly consumed fields.

        Only a subset of fields is exported; the timestamp is converted
        to an ISO-8601 string. Key order matches the historical layout.
        """
        payload: Dict[str, Any] = {}
        for name in ("bucket_owner", "bucket"):
            payload[name] = getattr(self, name)
        # Timestamp sits third in the historical key order.
        payload["timestamp"] = self.timestamp.isoformat()
        for name in (
            "remote_ip",
            "requester",
            "request_id",
            "operation",
            "key",
            "request_uri",
            "http_status",
            "error_code",
            "bytes_sent",
            "object_size",
            "total_time_ms",
            "referrer",
            "user_agent",
            "version_id",
        ):
            payload[name] = getattr(self, name)
        return payload
|
|
|
|
|
|
@dataclass
class LoggingConfiguration:
    """Per-bucket access-logging settings (S3 ``BucketLoggingStatus`` shape)."""

    target_bucket: str       # bucket that receives the generated log objects
    target_prefix: str = ""  # key prefix prepended to every log object
    enabled: bool = True     # from_dict only ever produces enabled configs

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to the S3-style ``LoggingEnabled`` payload."""
        inner = {
            "TargetBucket": self.target_bucket,
            "TargetPrefix": self.target_prefix,
        }
        return {"LoggingEnabled": inner}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Optional["LoggingConfiguration"]:
        """Parse an S3-style payload.

        Returns None when the ``LoggingEnabled`` section is absent or empty,
        i.e. logging is not configured for the bucket.
        """
        section = data.get("LoggingEnabled")
        if not section:
            return None
        return cls(
            target_bucket=section.get("TargetBucket", ""),
            target_prefix=section.get("TargetPrefix", ""),
            enabled=True,
        )
|
|
|
|
|
|
class AccessLoggingService:
    """Buffers S3-style access log entries and delivers them to target buckets.

    Entries are buffered in memory per ``"target_bucket:target_prefix"``
    key and written to object storage as a single log object by a
    background thread every ``flush_interval`` seconds, eagerly when a
    buffer reaches ``max_buffer_size``, or explicitly via :meth:`flush` /
    :meth:`shutdown`. Per-bucket configuration is persisted as
    ``logging.json`` under ``storage_root/.myfsio.sys/buckets/<bucket>/``.
    """

    def __init__(self, storage_root: Path, flush_interval: int = 60, max_buffer_size: int = 1000):
        """
        Args:
            storage_root: Root directory that holds the per-bucket
                ``logging.json`` configuration files.
            flush_interval: Seconds between periodic background flushes.
            max_buffer_size: Per-target entry count that triggers an eager flush.
        """
        self.storage_root = storage_root
        self.flush_interval = flush_interval
        self.max_buffer_size = max_buffer_size
        # In-memory cache of loaded per-bucket logging configurations.
        self._configs: Dict[str, LoggingConfiguration] = {}
        # Pending entries keyed by "target_bucket:target_prefix".
        self._buffer: Dict[str, List[AccessLogEntry]] = {}
        self._buffer_lock = threading.Lock()
        self._shutdown = threading.Event()
        # Object-storage backend; must be injected via set_storage() before
        # buffered entries can actually be written out.
        self._storage = None

        self._flush_thread = threading.Thread(target=self._flush_loop, name="access-log-flush", daemon=True)
        self._flush_thread.start()

    def set_storage(self, storage: Any) -> None:
        """Attach the object-storage backend used to write log objects."""
        self._storage = storage

    def _config_path(self, bucket_name: str) -> Path:
        """Path of the persisted logging configuration for *bucket_name*."""
        return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "logging.json"

    def get_bucket_logging(self, bucket_name: str) -> Optional[LoggingConfiguration]:
        """Return the bucket's logging configuration, or None if unset.

        Results are cached in memory; the JSON file is only read on a
        cache miss. Corrupt or unreadable files are treated as unset.
        """
        if bucket_name in self._configs:
            return self._configs[bucket_name]

        config_path = self._config_path(bucket_name)
        if not config_path.exists():
            return None

        try:
            data = json.loads(config_path.read_text(encoding="utf-8"))
            config = LoggingConfiguration.from_dict(data)
            if config:
                self._configs[bucket_name] = config
            return config
        except (json.JSONDecodeError, OSError) as e:
            # Lazy %-style args avoid formatting when the level is disabled.
            logger.warning("Failed to load logging config for %s: %s", bucket_name, e)
            return None

    def set_bucket_logging(self, bucket_name: str, config: LoggingConfiguration) -> None:
        """Persist *config* to disk and cache it for *bucket_name*."""
        config_path = self._config_path(bucket_name)
        config_path.parent.mkdir(parents=True, exist_ok=True)
        config_path.write_text(json.dumps(config.to_dict(), indent=2), encoding="utf-8")
        self._configs[bucket_name] = config

    def delete_bucket_logging(self, bucket_name: str) -> None:
        """Remove the bucket's logging configuration (best effort on disk)."""
        config_path = self._config_path(bucket_name)
        try:
            if config_path.exists():
                config_path.unlink()
        except OSError:
            # Best effort: a stale file on disk is harmless; evicting the
            # cache entry below is what disables logging for this process.
            pass
        self._configs.pop(bucket_name, None)

    def log_request(
        self,
        bucket_name: str,
        *,
        operation: str,
        key: str = "-",
        remote_ip: str = "-",
        requester: str = "-",
        request_uri: str = "-",
        http_status: int = 200,
        error_code: str = "",
        bytes_sent: int = 0,
        object_size: int = 0,
        total_time_ms: int = 0,
        referrer: str = "-",
        user_agent: str = "-",
        version_id: str = "-",
        request_id: str = "",
    ) -> None:
        """Record one request against *bucket_name* if logging is enabled.

        The entry is buffered and written to the configured target bucket
        on the next flush. No-op when the bucket has no enabled logging
        configuration. Triggers an immediate flush once the target's
        buffer reaches ``max_buffer_size``.
        """
        config = self.get_bucket_logging(bucket_name)
        if not config or not config.enabled:
            return

        entry = AccessLogEntry(
            bucket_owner="local-owner",
            bucket=bucket_name,
            remote_ip=remote_ip,
            requester=requester,
            request_id=request_id or uuid.uuid4().hex[:16].upper(),
            operation=operation,
            key=key,
            request_uri=request_uri,
            http_status=http_status,
            error_code=error_code,
            bytes_sent=bytes_sent,
            object_size=object_size,
            total_time_ms=total_time_ms,
            referrer=referrer,
            user_agent=user_agent,
            version_id=version_id,
        )

        target_key = f"{config.target_bucket}:{config.target_prefix}"
        should_flush = False
        with self._buffer_lock:
            if target_key not in self._buffer:
                self._buffer[target_key] = []
            self._buffer[target_key].append(entry)
            should_flush = len(self._buffer[target_key]) >= self.max_buffer_size

        # Flush outside the lock: _flush_buffer re-acquires it.
        if should_flush:
            self._flush_buffer(target_key)

    def _flush_loop(self) -> None:
        """Background loop: flush all targets every ``flush_interval`` seconds.

        Wakes early when the shutdown event is set (shutdown() performs
        the final flush itself).
        """
        while not self._shutdown.is_set():
            self._shutdown.wait(timeout=self.flush_interval)
            if not self._shutdown.is_set():
                self._flush_all()

    def _flush_all(self) -> None:
        """Flush every target that currently has buffered entries."""
        # Snapshot keys under the lock; flush each outside it so a slow
        # storage write does not block concurrent log_request() calls.
        with self._buffer_lock:
            targets = list(self._buffer.keys())

        for target_key in targets:
            self._flush_buffer(target_key)

    def _flush_buffer(self, target_key: str) -> None:
        """Write all pending entries for one target as a single log object.

        Entries are kept buffered while no storage backend is attached and
        are re-queued (in order) if the write fails, so they are retried
        on a later flush.
        """
        # Bug fix: check for a storage backend BEFORE popping the entries.
        # Previously the entries were popped first and then silently dropped
        # whenever a flush fired before set_storage() had been called.
        if self._storage is None:
            return

        with self._buffer_lock:
            entries = self._buffer.pop(target_key, [])

        if not entries:
            return

        try:
            bucket_name, prefix = target_key.split(":", 1)
        except ValueError:
            logger.error("Invalid target key: %s", target_key)
            return

        now = datetime.now(timezone.utc)
        # Timestamped key with a random suffix to avoid collisions when two
        # flushes for the same target land within the same second.
        log_key = f"{prefix}{now.strftime('%Y-%m-%d-%H-%M-%S')}-{uuid.uuid4().hex[:8]}"

        log_content = "\n".join(entry.to_log_line() for entry in entries) + "\n"

        try:
            stream = io.BytesIO(log_content.encode("utf-8"))
            # Log delivery must not be blocked by the target bucket's quota.
            self._storage.put_object(bucket_name, log_key, stream, enforce_quota=False)
            logger.info("Flushed %d access log entries to %s/%s", len(entries), bucket_name, log_key)
        except Exception as e:
            logger.error("Failed to write access log to %s/%s: %s", bucket_name, log_key, e)
            # Re-queue in front of anything buffered meanwhile so entry
            # ordering is preserved for the retry.
            with self._buffer_lock:
                self._buffer[target_key] = entries + self._buffer.get(target_key, [])

    def flush(self) -> None:
        """Synchronously flush all buffered entries."""
        self._flush_all()

    def shutdown(self) -> None:
        """Stop the background thread and perform a final flush."""
        self._shutdown.set()
        self._flush_all()
        self._flush_thread.join(timeout=5.0)

    def get_stats(self) -> Dict[str, Any]:
        """Return buffer statistics: total pending entries and target count."""
        with self._buffer_lock:
            buffered = sum(len(entries) for entries in self._buffer.values())
            return {
                "buffered_entries": buffered,
                "target_buckets": len(self._buffer),
            }
|