First porting of Python to Rust - update docs and bug fixes

This commit is contained in:
2026-04-20 21:27:02 +08:00
parent c2ef37b84e
commit 476b9bd2e4
82 changed files with 24682 additions and 4132 deletions

View File

@@ -0,0 +1,105 @@
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoggingConfiguration {
pub target_bucket: String,
#[serde(default)]
pub target_prefix: String,
#[serde(default = "default_enabled")]
pub enabled: bool,
}
fn default_enabled() -> bool {
true
}
#[derive(Serialize, Deserialize)]
struct StoredLoggingFile {
#[serde(rename = "LoggingEnabled")]
logging_enabled: Option<StoredLoggingEnabled>,
}
#[derive(Serialize, Deserialize)]
struct StoredLoggingEnabled {
#[serde(rename = "TargetBucket")]
target_bucket: String,
#[serde(rename = "TargetPrefix", default)]
target_prefix: String,
}
pub struct AccessLoggingService {
storage_root: PathBuf,
cache: RwLock<HashMap<String, Option<LoggingConfiguration>>>,
}
impl AccessLoggingService {
pub fn new(storage_root: &Path) -> Self {
Self {
storage_root: storage_root.to_path_buf(),
cache: RwLock::new(HashMap::new()),
}
}
fn config_path(&self, bucket: &str) -> PathBuf {
self.storage_root
.join(".myfsio.sys")
.join("buckets")
.join(bucket)
.join("logging.json")
}
pub fn get(&self, bucket: &str) -> Option<LoggingConfiguration> {
if let Some(cached) = self.cache.read().get(bucket).cloned() {
return cached;
}
let path = self.config_path(bucket);
let config = if path.exists() {
std::fs::read_to_string(&path)
.ok()
.and_then(|s| serde_json::from_str::<StoredLoggingFile>(&s).ok())
.and_then(|f| f.logging_enabled)
.map(|e| LoggingConfiguration {
target_bucket: e.target_bucket,
target_prefix: e.target_prefix,
enabled: true,
})
} else {
None
};
self.cache
.write()
.insert(bucket.to_string(), config.clone());
config
}
pub fn set(&self, bucket: &str, config: LoggingConfiguration) -> std::io::Result<()> {
let path = self.config_path(bucket);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let stored = StoredLoggingFile {
logging_enabled: Some(StoredLoggingEnabled {
target_bucket: config.target_bucket.clone(),
target_prefix: config.target_prefix.clone(),
}),
};
let json = serde_json::to_string_pretty(&stored)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
std::fs::write(&path, json)?;
self.cache.write().insert(bucket.to_string(), Some(config));
Ok(())
}
pub fn delete(&self, bucket: &str) {
let path = self.config_path(bucket);
if path.exists() {
let _ = std::fs::remove_file(&path);
}
self.cache.write().insert(bucket.to_string(), None);
}
}

View File

@@ -28,6 +28,7 @@ pub struct GcService {
storage_root: PathBuf,
config: GcConfig,
running: Arc<RwLock<bool>>,
started_at: Arc<RwLock<Option<Instant>>>,
history: Arc<RwLock<Vec<Value>>>,
history_path: PathBuf,
}
@@ -53,6 +54,7 @@ impl GcService {
storage_root,
config,
running: Arc::new(RwLock::new(false)),
started_at: Arc::new(RwLock::new(None)),
history: Arc::new(RwLock::new(history)),
history_path,
}
@@ -60,9 +62,17 @@ impl GcService {
pub async fn status(&self) -> Value {
let running = *self.running.read().await;
let scan_elapsed_seconds = self
.started_at
.read()
.await
.as_ref()
.map(|started| started.elapsed().as_secs_f64());
json!({
"enabled": true,
"running": running,
"scanning": running,
"scan_elapsed_seconds": scan_elapsed_seconds,
"interval_hours": self.config.interval_hours,
"temp_file_max_age_hours": self.config.temp_file_max_age_hours,
"multipart_max_age_days": self.config.multipart_max_age_days,
@@ -73,7 +83,9 @@ impl GcService {
pub async fn history(&self) -> Value {
let history = self.history.read().await;
json!({ "executions": *history })
let mut executions: Vec<Value> = history.iter().cloned().collect();
executions.reverse();
json!({ "executions": executions })
}
pub async fn run_now(&self, dry_run: bool) -> Result<Value, String> {
@@ -84,12 +96,14 @@ impl GcService {
}
*running = true;
}
*self.started_at.write().await = Some(Instant::now());
let start = Instant::now();
let result = self.execute_gc(dry_run || self.config.dry_run).await;
let elapsed = start.elapsed().as_secs_f64();
*self.running.write().await = false;
*self.started_at.write().await = None;
let mut result_json = result.clone();
if let Some(obj) = result_json.as_object_mut() {
@@ -124,9 +138,12 @@ impl GcService {
let mut errors: Vec<String> = Vec::new();
let now = std::time::SystemTime::now();
let temp_max_age = std::time::Duration::from_secs_f64(self.config.temp_file_max_age_hours * 3600.0);
let multipart_max_age = std::time::Duration::from_secs(self.config.multipart_max_age_days * 86400);
let lock_max_age = std::time::Duration::from_secs_f64(self.config.lock_file_max_age_hours * 3600.0);
let temp_max_age =
std::time::Duration::from_secs_f64(self.config.temp_file_max_age_hours * 3600.0);
let multipart_max_age =
std::time::Duration::from_secs(self.config.multipart_max_age_days * 86400);
let lock_max_age =
std::time::Duration::from_secs_f64(self.config.lock_file_max_age_hours * 3600.0);
let tmp_dir = self.storage_root.join(".myfsio.sys").join("tmp");
if tmp_dir.exists() {
@@ -140,7 +157,10 @@ impl GcService {
let size = metadata.len();
if !dry_run {
if let Err(e) = std::fs::remove_file(entry.path()) {
errors.push(format!("Failed to remove temp file: {}", e));
errors.push(format!(
"Failed to remove temp file: {}",
e
));
continue;
}
}
@@ -242,7 +262,10 @@ impl GcService {
if let Some(parent) = self.history_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::write(&self.history_path, serde_json::to_string_pretty(&data).unwrap_or_default());
let _ = std::fs::write(
&self.history_path,
serde_json::to_string_pretty(&data).unwrap_or_default(),
);
}
pub fn start_background(self: Arc<Self>) -> tokio::task::JoinHandle<()> {

View File

@@ -1,11 +1,17 @@
use myfsio_common::constants::{
BUCKET_META_DIR, BUCKET_VERSIONS_DIR, INDEX_FILE, SYSTEM_BUCKETS_DIR, SYSTEM_ROOT,
};
use myfsio_storage::fs_backend::FsStorageBackend;
use myfsio_storage::traits::StorageEngine;
use serde_json::{json, Value};
use std::path::PathBuf;
use serde_json::{json, Map, Value};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
const MAX_ISSUES: usize = 500;
const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"];
pub struct IntegrityConfig {
pub interval_hours: f64,
pub batch_size: usize,
@@ -17,7 +23,7 @@ impl Default for IntegrityConfig {
fn default() -> Self {
Self {
interval_hours: 24.0,
batch_size: 1000,
batch_size: 10_000,
auto_heal: false,
dry_run: false,
}
@@ -25,21 +31,70 @@ impl Default for IntegrityConfig {
}
pub struct IntegrityService {
#[allow(dead_code)]
storage: Arc<FsStorageBackend>,
storage_root: PathBuf,
config: IntegrityConfig,
running: Arc<RwLock<bool>>,
started_at: Arc<RwLock<Option<Instant>>>,
history: Arc<RwLock<Vec<Value>>>,
history_path: PathBuf,
}
#[derive(Default)]
struct ScanState {
objects_scanned: u64,
buckets_scanned: u64,
corrupted_objects: u64,
orphaned_objects: u64,
phantom_metadata: u64,
stale_versions: u64,
etag_cache_inconsistencies: u64,
issues: Vec<Value>,
errors: Vec<String>,
}
impl ScanState {
fn batch_exhausted(&self, batch_size: usize) -> bool {
self.objects_scanned >= batch_size as u64
}
fn push_issue(&mut self, issue_type: &str, bucket: &str, key: &str, detail: String) {
if self.issues.len() < MAX_ISSUES {
self.issues.push(json!({
"issue_type": issue_type,
"bucket": bucket,
"key": key,
"detail": detail,
}));
}
}
fn into_json(self, elapsed: f64) -> Value {
json!({
"objects_scanned": self.objects_scanned,
"buckets_scanned": self.buckets_scanned,
"corrupted_objects": self.corrupted_objects,
"orphaned_objects": self.orphaned_objects,
"phantom_metadata": self.phantom_metadata,
"stale_versions": self.stale_versions,
"etag_cache_inconsistencies": self.etag_cache_inconsistencies,
"issues_healed": 0,
"issues": self.issues,
"errors": self.errors,
"execution_time_seconds": elapsed,
})
}
}
impl IntegrityService {
pub fn new(
storage: Arc<FsStorageBackend>,
storage_root: &std::path::Path,
storage_root: &Path,
config: IntegrityConfig,
) -> Self {
let history_path = storage_root
.join(".myfsio.sys")
.join(SYSTEM_ROOT)
.join("config")
.join("integrity_history.json");
@@ -55,8 +110,10 @@ impl IntegrityService {
Self {
storage,
storage_root: storage_root.to_path_buf(),
config,
running: Arc::new(RwLock::new(false)),
started_at: Arc::new(RwLock::new(None)),
history: Arc::new(RwLock::new(history)),
history_path,
}
@@ -64,9 +121,17 @@ impl IntegrityService {
pub async fn status(&self) -> Value {
let running = *self.running.read().await;
let scan_elapsed_seconds = self
.started_at
.read()
.await
.as_ref()
.map(|started| started.elapsed().as_secs_f64());
json!({
"enabled": true,
"running": running,
"scanning": running,
"scan_elapsed_seconds": scan_elapsed_seconds,
"interval_hours": self.config.interval_hours,
"batch_size": self.config.batch_size,
"auto_heal": self.config.auto_heal,
@@ -76,7 +141,9 @@ impl IntegrityService {
pub async fn history(&self) -> Value {
let history = self.history.read().await;
json!({ "executions": *history })
let mut executions: Vec<Value> = history.iter().cloned().collect();
executions.reverse();
json!({ "executions": executions })
}
pub async fn run_now(&self, dry_run: bool, auto_heal: bool) -> Result<Value, String> {
@@ -87,23 +154,31 @@ impl IntegrityService {
}
*running = true;
}
*self.started_at.write().await = Some(Instant::now());
let start = Instant::now();
let result = self.check_integrity(dry_run, auto_heal).await;
let storage_root = self.storage_root.clone();
let batch_size = self.config.batch_size;
let result =
tokio::task::spawn_blocking(move || scan_all_buckets(&storage_root, batch_size))
.await
.unwrap_or_else(|e| {
let mut st = ScanState::default();
st.errors.push(format!("scan task failed: {}", e));
st
});
let elapsed = start.elapsed().as_secs_f64();
*self.running.write().await = false;
*self.started_at.write().await = None;
let mut result_json = result.clone();
if let Some(obj) = result_json.as_object_mut() {
obj.insert("execution_time_seconds".to_string(), json!(elapsed));
}
let result_json = result.into_json(elapsed);
let record = json!({
"timestamp": chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
"dry_run": dry_run,
"auto_heal": auto_heal,
"result": result_json,
"result": result_json.clone(),
});
{
@@ -116,62 +191,7 @@ impl IntegrityService {
}
self.save_history().await;
Ok(result)
}
async fn check_integrity(&self, _dry_run: bool, _auto_heal: bool) -> Value {
let buckets = match self.storage.list_buckets().await {
Ok(b) => b,
Err(e) => return json!({"error": e.to_string()}),
};
let mut objects_scanned = 0u64;
let mut corrupted = 0u64;
let mut phantom_metadata = 0u64;
let mut errors: Vec<String> = Vec::new();
for bucket in &buckets {
let params = myfsio_common::types::ListParams {
max_keys: self.config.batch_size,
..Default::default()
};
let objects = match self.storage.list_objects(&bucket.name, &params).await {
Ok(r) => r.objects,
Err(e) => {
errors.push(format!("{}: {}", bucket.name, e));
continue;
}
};
for obj in &objects {
objects_scanned += 1;
match self.storage.get_object_path(&bucket.name, &obj.key).await {
Ok(path) => {
if !path.exists() {
phantom_metadata += 1;
} else if let Some(ref expected_etag) = obj.etag {
match myfsio_crypto::hashing::md5_file(&path) {
Ok(actual_etag) => {
if &actual_etag != expected_etag {
corrupted += 1;
}
}
Err(e) => errors.push(format!("{}:{}: {}", bucket.name, obj.key, e)),
}
}
}
Err(e) => errors.push(format!("{}:{}: {}", bucket.name, obj.key, e)),
}
}
}
json!({
"objects_scanned": objects_scanned,
"buckets_scanned": buckets.len(),
"corrupted_objects": corrupted,
"phantom_metadata": phantom_metadata,
"errors": errors,
})
Ok(result_json)
}
async fn save_history(&self) {
@@ -202,3 +222,511 @@ impl IntegrityService {
})
}
}
fn scan_all_buckets(storage_root: &Path, batch_size: usize) -> ScanState {
let mut state = ScanState::default();
let buckets = match list_bucket_names(storage_root) {
Ok(b) => b,
Err(e) => {
state.errors.push(format!("list buckets: {}", e));
return state;
}
};
for bucket in &buckets {
if state.batch_exhausted(batch_size) {
break;
}
state.buckets_scanned += 1;
let bucket_path = storage_root.join(bucket);
let meta_root = storage_root
.join(SYSTEM_ROOT)
.join(SYSTEM_BUCKETS_DIR)
.join(bucket)
.join(BUCKET_META_DIR);
let index_entries = collect_index_entries(&meta_root);
check_corrupted(&mut state, bucket, &bucket_path, &index_entries, batch_size);
check_phantom(&mut state, bucket, &bucket_path, &index_entries, batch_size);
check_orphaned(&mut state, bucket, &bucket_path, &index_entries, batch_size);
check_stale_versions(&mut state, storage_root, bucket, batch_size);
check_etag_cache(&mut state, storage_root, bucket, &index_entries, batch_size);
}
state
}
fn list_bucket_names(storage_root: &Path) -> std::io::Result<Vec<String>> {
let mut names = Vec::new();
if !storage_root.exists() {
return Ok(names);
}
for entry in std::fs::read_dir(storage_root)? {
let entry = entry?;
let name = entry.file_name().to_string_lossy().to_string();
if name == SYSTEM_ROOT {
continue;
}
if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
names.push(name);
}
}
Ok(names)
}
#[allow(dead_code)]
struct IndexEntryInfo {
entry: Value,
index_file: PathBuf,
key_name: String,
}
fn collect_index_entries(meta_root: &Path) -> HashMap<String, IndexEntryInfo> {
let mut out: HashMap<String, IndexEntryInfo> = HashMap::new();
if !meta_root.exists() {
return out;
}
let mut stack: Vec<PathBuf> = vec![meta_root.to_path_buf()];
while let Some(dir) = stack.pop() {
let rd = match std::fs::read_dir(&dir) {
Ok(r) => r,
Err(_) => continue,
};
for entry in rd.flatten() {
let path = entry.path();
let ft = match entry.file_type() {
Ok(t) => t,
Err(_) => continue,
};
if ft.is_dir() {
stack.push(path);
continue;
}
if entry.file_name().to_string_lossy() != INDEX_FILE {
continue;
}
let rel_dir = match path.parent().and_then(|p| p.strip_prefix(meta_root).ok()) {
Some(p) => p.to_path_buf(),
None => continue,
};
let dir_prefix = if rel_dir.as_os_str().is_empty() {
String::new()
} else {
rel_dir
.components()
.map(|c| c.as_os_str().to_string_lossy().to_string())
.collect::<Vec<_>>()
.join("/")
};
let content = match std::fs::read_to_string(&path) {
Ok(c) => c,
Err(_) => continue,
};
let index_data: Map<String, Value> = match serde_json::from_str(&content) {
Ok(Value::Object(m)) => m,
_ => continue,
};
for (key_name, entry_val) in index_data {
let full_key = if dir_prefix.is_empty() {
key_name.clone()
} else {
format!("{}/{}", dir_prefix, key_name)
};
out.insert(
full_key,
IndexEntryInfo {
entry: entry_val,
index_file: path.clone(),
key_name,
},
);
}
}
}
out
}
fn stored_etag(entry: &Value) -> Option<String> {
entry
.get("metadata")
.and_then(|m| m.get("__etag__"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
fn check_corrupted(
state: &mut ScanState,
bucket: &str,
bucket_path: &Path,
entries: &HashMap<String, IndexEntryInfo>,
batch_size: usize,
) {
let mut keys: Vec<&String> = entries.keys().collect();
keys.sort();
for full_key in keys {
if state.batch_exhausted(batch_size) {
return;
}
let info = &entries[full_key];
let object_path = bucket_path.join(full_key);
if !object_path.exists() {
continue;
}
state.objects_scanned += 1;
let Some(stored) = stored_etag(&info.entry) else {
continue;
};
match myfsio_crypto::hashing::md5_file(&object_path) {
Ok(actual) => {
if actual != stored {
state.corrupted_objects += 1;
state.push_issue(
"corrupted_object",
bucket,
full_key,
format!("stored_etag={} actual_etag={}", stored, actual),
);
}
}
Err(e) => state
.errors
.push(format!("hash {}/{}: {}", bucket, full_key, e)),
}
}
}
fn check_phantom(
state: &mut ScanState,
bucket: &str,
bucket_path: &Path,
entries: &HashMap<String, IndexEntryInfo>,
batch_size: usize,
) {
let mut keys: Vec<&String> = entries.keys().collect();
keys.sort();
for full_key in keys {
if state.batch_exhausted(batch_size) {
return;
}
state.objects_scanned += 1;
let object_path = bucket_path.join(full_key);
if !object_path.exists() {
state.phantom_metadata += 1;
state.push_issue(
"phantom_metadata",
bucket,
full_key,
"metadata entry without file on disk".to_string(),
);
}
}
}
fn check_orphaned(
state: &mut ScanState,
bucket: &str,
bucket_path: &Path,
entries: &HashMap<String, IndexEntryInfo>,
batch_size: usize,
) {
let indexed: HashSet<&String> = entries.keys().collect();
let mut stack: Vec<(PathBuf, String)> = vec![(bucket_path.to_path_buf(), String::new())];
while let Some((dir, prefix)) = stack.pop() {
if state.batch_exhausted(batch_size) {
return;
}
let rd = match std::fs::read_dir(&dir) {
Ok(r) => r,
Err(_) => continue,
};
for entry in rd.flatten() {
if state.batch_exhausted(batch_size) {
return;
}
let name = entry.file_name().to_string_lossy().to_string();
let ft = match entry.file_type() {
Ok(t) => t,
Err(_) => continue,
};
if ft.is_dir() {
if prefix.is_empty() && INTERNAL_FOLDERS.contains(&name.as_str()) {
continue;
}
let new_prefix = if prefix.is_empty() {
name
} else {
format!("{}/{}", prefix, name)
};
stack.push((entry.path(), new_prefix));
} else if ft.is_file() {
let full_key = if prefix.is_empty() {
name
} else {
format!("{}/{}", prefix, name)
};
state.objects_scanned += 1;
if !indexed.contains(&full_key) {
state.orphaned_objects += 1;
state.push_issue(
"orphaned_object",
bucket,
&full_key,
"file exists without metadata entry".to_string(),
);
}
}
}
}
}
fn check_stale_versions(
state: &mut ScanState,
storage_root: &Path,
bucket: &str,
batch_size: usize,
) {
let versions_root = storage_root
.join(SYSTEM_ROOT)
.join(SYSTEM_BUCKETS_DIR)
.join(bucket)
.join(BUCKET_VERSIONS_DIR);
if !versions_root.exists() {
return;
}
let mut stack: Vec<PathBuf> = vec![versions_root.clone()];
while let Some(dir) = stack.pop() {
if state.batch_exhausted(batch_size) {
return;
}
let rd = match std::fs::read_dir(&dir) {
Ok(r) => r,
Err(_) => continue,
};
let mut bin_stems: HashMap<String, PathBuf> = HashMap::new();
let mut json_stems: HashMap<String, PathBuf> = HashMap::new();
let mut subdirs: Vec<PathBuf> = Vec::new();
for entry in rd.flatten() {
let ft = match entry.file_type() {
Ok(t) => t,
Err(_) => continue,
};
let path = entry.path();
if ft.is_dir() {
subdirs.push(path);
continue;
}
let name = entry.file_name().to_string_lossy().to_string();
if let Some(stem) = name.strip_suffix(".bin") {
bin_stems.insert(stem.to_string(), path);
} else if let Some(stem) = name.strip_suffix(".json") {
json_stems.insert(stem.to_string(), path);
}
}
for (stem, path) in &bin_stems {
if state.batch_exhausted(batch_size) {
return;
}
state.objects_scanned += 1;
if !json_stems.contains_key(stem) {
state.stale_versions += 1;
let key = path
.strip_prefix(&versions_root)
.map(|p| p.to_string_lossy().replace('\\', "/"))
.unwrap_or_else(|_| path.display().to_string());
state.push_issue(
"stale_version",
bucket,
&key,
"version data without manifest".to_string(),
);
}
}
for (stem, path) in &json_stems {
if state.batch_exhausted(batch_size) {
return;
}
state.objects_scanned += 1;
if !bin_stems.contains_key(stem) {
state.stale_versions += 1;
let key = path
.strip_prefix(&versions_root)
.map(|p| p.to_string_lossy().replace('\\', "/"))
.unwrap_or_else(|_| path.display().to_string());
state.push_issue(
"stale_version",
bucket,
&key,
"version manifest without data".to_string(),
);
}
}
stack.extend(subdirs);
}
}
fn check_etag_cache(
state: &mut ScanState,
storage_root: &Path,
bucket: &str,
entries: &HashMap<String, IndexEntryInfo>,
batch_size: usize,
) {
let etag_index_path = storage_root
.join(SYSTEM_ROOT)
.join(SYSTEM_BUCKETS_DIR)
.join(bucket)
.join("etag_index.json");
if !etag_index_path.exists() {
return;
}
let cache: HashMap<String, Value> = match std::fs::read_to_string(&etag_index_path)
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
{
Some(Value::Object(m)) => m.into_iter().collect(),
_ => return,
};
for (full_key, cached_val) in cache {
if state.batch_exhausted(batch_size) {
return;
}
state.objects_scanned += 1;
let Some(cached_etag) = cached_val.as_str() else {
continue;
};
let Some(info) = entries.get(&full_key) else {
continue;
};
let Some(stored) = stored_etag(&info.entry) else {
continue;
};
if cached_etag != stored {
state.etag_cache_inconsistencies += 1;
state.push_issue(
"etag_cache_inconsistency",
bucket,
&full_key,
format!("cached_etag={} index_etag={}", cached_etag, stored),
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
fn md5_hex(bytes: &[u8]) -> String {
myfsio_crypto::hashing::md5_bytes(bytes)
}
fn write_index(meta_dir: &Path, entries: &[(&str, &str)]) {
fs::create_dir_all(meta_dir).unwrap();
let mut map = Map::new();
for (name, etag) in entries {
map.insert(
name.to_string(),
json!({ "metadata": { "__etag__": etag } }),
);
}
fs::write(
meta_dir.join(INDEX_FILE),
serde_json::to_string(&Value::Object(map)).unwrap(),
)
.unwrap();
}
#[test]
fn scan_detects_each_issue_type() {
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let bucket = "testbucket";
let bucket_path = root.join(bucket);
let meta_root = root
.join(SYSTEM_ROOT)
.join(SYSTEM_BUCKETS_DIR)
.join(bucket)
.join(BUCKET_META_DIR);
fs::create_dir_all(&bucket_path).unwrap();
let clean_bytes = b"clean file contents";
let clean_etag = md5_hex(clean_bytes);
fs::write(bucket_path.join("clean.txt"), clean_bytes).unwrap();
let corrupted_bytes = b"actual content";
fs::write(bucket_path.join("corrupted.txt"), corrupted_bytes).unwrap();
fs::write(bucket_path.join("orphan.txt"), b"no metadata").unwrap();
write_index(
&meta_root,
&[
("clean.txt", &clean_etag),
("corrupted.txt", "00000000000000000000000000000000"),
("phantom.txt", "deadbeefdeadbeefdeadbeefdeadbeef"),
],
);
let versions_root = root
.join(SYSTEM_ROOT)
.join(SYSTEM_BUCKETS_DIR)
.join(bucket)
.join(BUCKET_VERSIONS_DIR)
.join("someobject");
fs::create_dir_all(&versions_root).unwrap();
fs::write(versions_root.join("v1.bin"), b"orphan bin").unwrap();
fs::write(versions_root.join("v2.json"), b"{}").unwrap();
let etag_index = root
.join(SYSTEM_ROOT)
.join(SYSTEM_BUCKETS_DIR)
.join(bucket)
.join("etag_index.json");
fs::write(
&etag_index,
serde_json::to_string(&json!({ "clean.txt": "stale-cached-etag" })).unwrap(),
)
.unwrap();
let state = scan_all_buckets(root, 10_000);
assert_eq!(state.corrupted_objects, 1, "corrupted");
assert_eq!(state.phantom_metadata, 1, "phantom");
assert_eq!(state.orphaned_objects, 1, "orphaned");
assert_eq!(state.stale_versions, 2, "stale versions");
assert_eq!(state.etag_cache_inconsistencies, 1, "etag cache");
assert_eq!(state.buckets_scanned, 1);
assert!(
state.errors.is_empty(),
"unexpected errors: {:?}",
state.errors
);
}
#[test]
fn skips_system_root_as_bucket() {
let tmp = tempfile::tempdir().unwrap();
fs::create_dir_all(tmp.path().join(SYSTEM_ROOT).join("config")).unwrap();
let state = scan_all_buckets(tmp.path(), 100);
assert_eq!(state.buckets_scanned, 0);
}
}

View File

@@ -66,7 +66,10 @@ impl LifecycleService {
None => continue,
};
let rules = match lifecycle.as_str().and_then(|s| serde_json::from_str::<Value>(s).ok()) {
let rules = match lifecycle
.as_str()
.and_then(|s| serde_json::from_str::<Value>(s).ok())
{
Some(v) => v,
None => continue,
};
@@ -93,7 +96,11 @@ impl LifecycleService {
let cutoff = chrono::Utc::now() - chrono::Duration::days(days as i64);
let params = myfsio_common::types::ListParams {
max_keys: 1000,
prefix: if prefix.is_empty() { None } else { Some(prefix.to_string()) },
prefix: if prefix.is_empty() {
None
} else {
Some(prefix.to_string())
},
..Default::default()
};
if let Ok(result) = self.storage.list_objects(&bucket.name, &params).await {
@@ -101,7 +108,8 @@ impl LifecycleService {
if obj.last_modified < cutoff {
match self.storage.delete_object(&bucket.name, &obj.key).await {
Ok(()) => total_expired += 1,
Err(e) => errors.push(format!("{}:{}: {}", bucket.name, obj.key, e)),
Err(e) => errors
.push(format!("{}:{}: {}", bucket.name, obj.key, e)),
}
}
}
@@ -112,12 +120,18 @@ impl LifecycleService {
if let Some(abort) = rule.get("AbortIncompleteMultipartUpload") {
if let Some(days) = abort.get("DaysAfterInitiation").and_then(|d| d.as_u64()) {
let cutoff = chrono::Utc::now() - chrono::Duration::days(days as i64);
if let Ok(uploads) = self.storage.list_multipart_uploads(&bucket.name).await {
if let Ok(uploads) = self.storage.list_multipart_uploads(&bucket.name).await
{
for upload in &uploads {
if upload.initiated < cutoff {
match self.storage.abort_multipart(&bucket.name, &upload.upload_id).await {
match self
.storage
.abort_multipart(&bucket.name, &upload.upload_id)
.await
{
Ok(()) => total_multipart_aborted += 1,
Err(e) => errors.push(format!("abort {}: {}", upload.upload_id, e)),
Err(e) => errors
.push(format!("abort {}: {}", upload.upload_id, e)),
}
}
}

View File

@@ -165,8 +165,9 @@ impl MetricsService {
.ok()
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
.and_then(|v| {
v.get("snapshots")
.and_then(|s| serde_json::from_value::<Vec<MetricsSnapshot>>(s.clone()).ok())
v.get("snapshots").and_then(|s| {
serde_json::from_value::<Vec<MetricsSnapshot>>(s.clone()).ok()
})
})
.unwrap_or_default()
} else {
@@ -218,7 +219,9 @@ impl MetricsService {
if let Some(code) = error_code {
*inner.error_codes.entry(code.to_string()).or_insert(0) += 1;
}
inner.totals.record(latency_ms, success, bytes_in, bytes_out);
inner
.totals
.record(latency_ms, success, bytes_in, bytes_out);
}
pub fn get_current_stats(&self) -> Value {

View File

@@ -1,9 +1,11 @@
pub mod access_logging;
pub mod gc;
pub mod lifecycle;
pub mod integrity;
pub mod lifecycle;
pub mod metrics;
pub mod replication;
pub mod s3_client;
pub mod site_registry;
pub mod site_sync;
pub mod system_metrics;
pub mod website_domains;

View File

@@ -8,6 +8,7 @@ use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use myfsio_common::types::ListParams;
use myfsio_storage::fs_backend::FsStorageBackend;
use myfsio_storage::traits::StorageEngine;
@@ -124,7 +125,10 @@ impl ReplicationFailureStore {
}
let trimmed = &failures[..failures.len().min(self.max_failures_per_bucket)];
let data = serde_json::json!({ "failures": trimmed });
let _ = std::fs::write(&path, serde_json::to_string_pretty(&data).unwrap_or_default());
let _ = std::fs::write(
&path,
serde_json::to_string_pretty(&data).unwrap_or_default(),
);
}
pub fn load(&self, bucket: &str) -> Vec<ReplicationFailure> {
@@ -148,7 +152,10 @@ impl ReplicationFailureStore {
pub fn add(&self, bucket: &str, failure: ReplicationFailure) {
let mut failures = self.load(bucket);
if let Some(existing) = failures.iter_mut().find(|f| f.object_key == failure.object_key) {
if let Some(existing) = failures
.iter_mut()
.find(|f| f.object_key == failure.object_key)
{
existing.failure_count += 1;
existing.timestamp = failure.timestamp;
existing.error_message = failure.error_message.clone();
@@ -318,7 +325,101 @@ impl ReplicationManager {
let manager = self.clone();
tokio::spawn(async move {
let _permit = permit;
manager.replicate_task(&bucket, &key, &rule, &connection, &action).await;
manager
.replicate_task(&bucket, &key, &rule, &connection, &action)
.await;
});
}
pub async fn replicate_existing_objects(self: Arc<Self>, bucket: String) -> usize {
let rule = match self.get_rule(&bucket) {
Some(r) if r.enabled => r,
_ => return 0,
};
let connection = match self.connections.get(&rule.target_connection_id) {
Some(c) => c,
None => {
tracing::warn!(
"Cannot replicate existing objects for {}: connection {} not found",
bucket,
rule.target_connection_id
);
return 0;
}
};
if !self.check_endpoint(&connection).await {
tracing::warn!(
"Cannot replicate existing objects for {}: endpoint {} is unreachable",
bucket,
connection.endpoint_url
);
return 0;
}
let mut continuation_token: Option<String> = None;
let mut submitted = 0usize;
loop {
let page = match self
.storage
.list_objects(
&bucket,
&ListParams {
max_keys: 1000,
continuation_token: continuation_token.clone(),
prefix: rule.filter_prefix.clone(),
start_after: None,
},
)
.await
{
Ok(page) => page,
Err(err) => {
tracing::error!(
"Failed to list existing objects for replication in {}: {}",
bucket,
err
);
break;
}
};
let next_token = page.next_continuation_token.clone();
let is_truncated = page.is_truncated;
for object in page.objects {
submitted += 1;
self.clone()
.trigger(bucket.clone(), object.key, "write".to_string())
.await;
}
if !is_truncated {
break;
}
continuation_token = next_token;
if continuation_token.is_none() {
break;
}
}
submitted
}
pub fn schedule_existing_objects_sync(self: Arc<Self>, bucket: String) {
tokio::spawn(async move {
let submitted = self
.clone()
.replicate_existing_objects(bucket.clone())
.await;
if submitted > 0 {
tracing::info!(
"Scheduled {} existing object(s) for replication in {}",
submitted,
bucket
);
}
});
}
@@ -330,7 +431,8 @@ impl ReplicationManager {
conn: &RemoteConnection,
action: &str,
) {
if object_key.contains("..") || object_key.starts_with('/') || object_key.starts_with('\\') {
if object_key.contains("..") || object_key.starts_with('/') || object_key.starts_with('\\')
{
tracing::error!("Invalid object key (path traversal): {}", object_key);
return;
}
@@ -358,7 +460,12 @@ impl ReplicationManager {
}
Err(err) => {
let msg = format!("{:?}", err);
tracing::error!("Replication DELETE failed {}/{}: {}", bucket, object_key, msg);
tracing::error!(
"Replication DELETE failed {}/{}: {}",
bucket,
object_key,
msg
);
self.failures.add(
bucket,
ReplicationFailure {
@@ -414,16 +521,18 @@ impl ReplicationManager {
.send()
.await
{
Ok(_) | Err(_) => upload_object(
&client,
&rule.target_bucket,
object_key,
&src_path,
file_size,
self.streaming_threshold_bytes,
content_type.as_deref(),
)
.await,
Ok(_) | Err(_) => {
upload_object(
&client,
&rule.target_bucket,
object_key,
&src_path,
file_size,
self.streaming_threshold_bytes,
content_type.as_deref(),
)
.await
}
}
}
other => other,
@@ -577,9 +686,9 @@ async fn upload_object(
)))
})?
} else {
let bytes = tokio::fs::read(path).await.map_err(|e| {
aws_sdk_s3::error::SdkError::construction_failure(Box::new(e))
})?;
let bytes = tokio::fs::read(path)
.await
.map_err(|e| aws_sdk_s3::error::SdkError::construction_failure(Box::new(e)))?;
ByteStream::from(bytes)
};

View File

@@ -37,8 +37,8 @@ pub fn build_client(connection: &RemoteConnection, options: &ClientOptions) -> C
.read_timeout(options.read_timeout)
.build();
let retry_config = aws_smithy_types::retry::RetryConfig::standard()
.with_max_attempts(options.max_attempts);
let retry_config =
aws_smithy_types::retry::RetryConfig::standard().with_max_attempts(options.max_attempts);
let config = aws_sdk_s3::config::Builder::new()
.behavior_version(BehaviorVersion::latest())

View File

@@ -102,7 +102,12 @@ impl SiteRegistry {
}
pub fn get_peer(&self, site_id: &str) -> Option<PeerSite> {
self.data.read().peers.iter().find(|p| p.site_id == site_id).cloned()
self.data
.read()
.peers
.iter()
.find(|p| p.site_id == site_id)
.cloned()
}
pub fn add_peer(&self, peer: PeerSite) {

View File

@@ -102,7 +102,10 @@ impl SiteSyncWorker {
}
pub async fn run(self: Arc<Self>) {
tracing::info!("Site sync worker started (interval={}s)", self.interval.as_secs());
tracing::info!(
"Site sync worker started (interval={}s)",
self.interval.as_secs()
);
loop {
tokio::select! {
_ = tokio::time::sleep(self.interval) => {}
@@ -309,11 +312,10 @@ impl SiteSyncWorker {
let resp = match req.send().await {
Ok(r) => r,
Err(err) => {
let msg = format!("{:?}", err);
if msg.contains("NoSuchBucket") {
if is_not_found_error(&err) {
return Ok(result);
}
return Err(msg);
return Err(format!("{:?}", err));
}
};
for obj in resp.contents() {
@@ -409,11 +411,9 @@ impl SiteSyncWorker {
}
};
let metadata: Option<HashMap<String, String>> = head.metadata().map(|m| {
m.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
});
let metadata: Option<HashMap<String, String>> = head
.metadata()
.map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect());
let stream = resp.body.into_async_read();
let boxed: Pin<Box<dyn AsyncRead + Send>> = Box::pin(stream);
@@ -428,7 +428,12 @@ impl SiteSyncWorker {
true
}
Err(err) => {
tracing::error!("Store pulled object failed {}/{}: {}", local_bucket, key, err);
tracing::error!(
"Store pulled object failed {}/{}: {}",
local_bucket,
key,
err
);
false
}
}
@@ -483,3 +488,11 @@ fn now_secs() -> f64 {
.map(|d| d.as_secs_f64())
.unwrap_or(0.0)
}
fn is_not_found_error<E: std::fmt::Debug>(err: &aws_sdk_s3::error::SdkError<E>) -> bool {
let msg = format!("{:?}", err);
msg.contains("NoSuchBucket")
|| msg.contains("code: Some(\"NotFound\")")
|| msg.contains("code: Some(\"NoSuchBucket\")")
|| msg.contains("status: 404")
}

View File

@@ -0,0 +1,203 @@
use chrono::{DateTime, Utc};
use myfsio_storage::fs_backend::FsStorageBackend;
use myfsio_storage::traits::StorageEngine;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use sysinfo::{Disks, System};
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct SystemMetricsConfig {
pub interval_minutes: u64,
pub retention_hours: u64,
}
impl Default for SystemMetricsConfig {
fn default() -> Self {
Self {
interval_minutes: 5,
retention_hours: 24,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetricsSnapshot {
pub timestamp: DateTime<Utc>,
pub cpu_percent: f64,
pub memory_percent: f64,
pub disk_percent: f64,
pub storage_bytes: u64,
}
pub struct SystemMetricsService {
storage_root: PathBuf,
storage: Arc<FsStorageBackend>,
config: SystemMetricsConfig,
history: Arc<RwLock<Vec<SystemMetricsSnapshot>>>,
history_path: PathBuf,
}
impl SystemMetricsService {
pub fn new(
storage_root: &Path,
storage: Arc<FsStorageBackend>,
config: SystemMetricsConfig,
) -> Self {
let history_path = storage_root
.join(".myfsio.sys")
.join("config")
.join("metrics_history.json");
let mut history = if history_path.exists() {
std::fs::read_to_string(&history_path)
.ok()
.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok())
.and_then(|v| {
v.get("history").and_then(|h| {
serde_json::from_value::<Vec<SystemMetricsSnapshot>>(h.clone()).ok()
})
})
.unwrap_or_default()
} else {
Vec::new()
};
prune_history(&mut history, config.retention_hours);
Self {
storage_root: storage_root.to_path_buf(),
storage,
config,
history: Arc::new(RwLock::new(history)),
history_path,
}
}
pub async fn get_history(&self, hours: Option<u64>) -> Vec<SystemMetricsSnapshot> {
let mut history = self.history.read().await.clone();
prune_history(&mut history, hours.unwrap_or(self.config.retention_hours));
history
}
async fn take_snapshot(&self) {
let snapshot = collect_snapshot(&self.storage_root, &self.storage).await;
let mut history = self.history.write().await;
history.push(snapshot);
prune_history(&mut history, self.config.retention_hours);
drop(history);
self.save_history().await;
}
async fn save_history(&self) {
let history = self.history.read().await;
let data = json!({ "history": *history });
if let Some(parent) = self.history_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::write(
&self.history_path,
serde_json::to_string_pretty(&data).unwrap_or_default(),
);
}
pub fn start_background(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
let interval =
std::time::Duration::from_secs(self.config.interval_minutes.saturating_mul(60));
tokio::spawn(async move {
self.take_snapshot().await;
let mut timer = tokio::time::interval(interval);
loop {
timer.tick().await;
self.take_snapshot().await;
}
})
}
}
fn prune_history(history: &mut Vec<SystemMetricsSnapshot>, retention_hours: u64) {
let cutoff = Utc::now() - chrono::Duration::hours(retention_hours as i64);
history.retain(|item| item.timestamp > cutoff);
}
fn sample_system_now() -> (f64, f64) {
let mut system = System::new();
system.refresh_cpu_usage();
std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
system.refresh_cpu_usage();
system.refresh_memory();
let cpu_percent = system.global_cpu_usage() as f64;
let memory_percent = if system.total_memory() > 0 {
(system.used_memory() as f64 / system.total_memory() as f64) * 100.0
} else {
0.0
};
(cpu_percent, memory_percent)
}
fn normalize_path_for_mount(path: &Path) -> String {
let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
let raw = canonical.to_string_lossy().to_string();
let stripped = raw.strip_prefix(r"\\?\").unwrap_or(&raw);
stripped.to_lowercase()
}
fn sample_disk(path: &Path) -> (u64, u64) {
let disks = Disks::new_with_refreshed_list();
let path_str = normalize_path_for_mount(path);
let mut best: Option<(usize, u64, u64)> = None;
for disk in disks.list() {
let mount_raw = disk.mount_point().to_string_lossy().to_string();
let mount = mount_raw
.strip_prefix(r"\\?\")
.unwrap_or(&mount_raw)
.to_lowercase();
let total = disk.total_space();
let free = disk.available_space();
if path_str.starts_with(&mount) {
let len = mount.len();
match best {
Some((best_len, _, _)) if len <= best_len => {}
_ => best = Some((len, total, free)),
}
}
}
best.map(|(_, total, free)| (total, free)).unwrap_or((0, 0))
}
async fn collect_snapshot(
storage_root: &Path,
storage: &Arc<FsStorageBackend>,
) -> SystemMetricsSnapshot {
let (cpu_percent, memory_percent) = sample_system_now();
let (disk_total, disk_free) = sample_disk(storage_root);
let disk_percent = if disk_total > 0 {
((disk_total - disk_free) as f64 / disk_total as f64) * 100.0
} else {
0.0
};
let mut storage_bytes = 0u64;
let buckets = storage.list_buckets().await.unwrap_or_default();
for bucket in buckets {
if let Ok(stats) = storage.bucket_stats(&bucket.name).await {
storage_bytes += stats.total_bytes();
}
}
SystemMetricsSnapshot {
timestamp: Utc::now(),
cpu_percent: round2(cpu_percent),
memory_percent: round2(memory_percent),
disk_percent: round2(disk_percent),
storage_bytes,
}
}
fn round2(value: f64) -> f64 {
(value * 100.0).round() / 100.0
}

View File

@@ -64,7 +64,10 @@ impl WebsiteDomainStore {
}
pub fn set_mapping(&self, domain: &str, bucket: &str) {
self.data.write().mappings.insert(domain.to_string(), bucket.to_string());
self.data
.write()
.mappings
.insert(domain.to_string(), bucket.to_string());
self.save();
}