Compare commits
3 Commits
ad7b2a02cb
...
476b9bd2e4
| Author | SHA1 | Date | |
|---|---|---|---|
| 476b9bd2e4 | |||
| c2ef37b84e | |||
| be8e030940 |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -27,11 +27,11 @@ dist/
|
||||
.eggs/
|
||||
|
||||
# Rust / maturin build artifacts
|
||||
myfsio_core/target/
|
||||
myfsio_core/Cargo.lock
|
||||
python/myfsio_core/target/
|
||||
python/myfsio_core/Cargo.lock
|
||||
|
||||
# Rust engine build artifacts
|
||||
myfsio-engine/target/
|
||||
rust/myfsio-engine/target/
|
||||
|
||||
# Local runtime artifacts
|
||||
logs/
|
||||
|
||||
393
README.md
393
README.md
@@ -1,255 +1,212 @@
|
||||
# MyFSIO
|
||||
|
||||
A lightweight, S3-compatible object storage system built with Flask. MyFSIO implements core AWS S3 REST API operations with filesystem-backed storage, making it ideal for local development, testing, and self-hosted storage scenarios.
|
||||
MyFSIO is an S3-compatible object storage server with a Rust runtime and a filesystem-backed storage engine. The active server lives under `rust/myfsio-engine` and serves both the S3 API and the built-in web UI from a single process.
|
||||
|
||||
The repository still contains a `python/` tree, but you do not need Python to run the current server.
|
||||
|
||||
## Features
|
||||
|
||||
**Core Storage**
|
||||
- S3-compatible REST API with AWS Signature Version 4 authentication
|
||||
- Bucket and object CRUD operations
|
||||
- Object versioning with version history
|
||||
- Multipart uploads for large files
|
||||
- Presigned URLs (1 second to 7 days validity)
|
||||
- S3-compatible REST API with Signature Version 4 authentication
|
||||
- Browser UI for buckets, objects, IAM users, policies, replication, metrics, and site administration
|
||||
- Filesystem-backed storage rooted at `data/`
|
||||
- Bucket versioning, multipart uploads, presigned URLs, CORS, object and bucket tagging
|
||||
- Server-side encryption and built-in KMS support
|
||||
- Optional background services for lifecycle, garbage collection, integrity scanning, operation metrics, and system metrics history
|
||||
- Replication, site sync, and static website hosting support
|
||||
|
||||
**Security & Access Control**
|
||||
- IAM users with access key management and rotation
|
||||
- Bucket policies (AWS Policy Version 2012-10-17)
|
||||
- Server-side encryption (SSE-S3 and SSE-KMS)
|
||||
- Built-in Key Management Service (KMS)
|
||||
- Rate limiting per endpoint
|
||||
## Runtime Model
|
||||
|
||||
**Advanced Features**
|
||||
- Cross-bucket replication to remote S3-compatible endpoints
|
||||
- Hot-reload for bucket policies (no restart required)
|
||||
- CORS configuration per bucket
|
||||
MyFSIO now runs as one Rust process:
|
||||
|
||||
**Management UI**
|
||||
- Web console for bucket and object management
|
||||
- IAM dashboard for user administration
|
||||
- Inline JSON policy editor with presets
|
||||
- Object browser with folder navigation and bulk operations
|
||||
- Dark mode support
|
||||
- API listener on `HOST` + `PORT` (default `127.0.0.1:5000`)
|
||||
- UI listener on `HOST` + `UI_PORT` (default `127.0.0.1:5100`)
|
||||
- Shared state for storage, IAM, policies, sessions, metrics, and background workers
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
+------------------+ +------------------+
|
||||
| API Server | | UI Server |
|
||||
| (port 5000) | | (port 5100) |
|
||||
| | | |
|
||||
| - S3 REST API |<------->| - Web Console |
|
||||
| - SigV4 Auth | | - IAM Dashboard |
|
||||
| - Presign URLs | | - Bucket Editor |
|
||||
+--------+---------+ +------------------+
|
||||
|
|
||||
v
|
||||
+------------------+ +------------------+
|
||||
| Object Storage | | System Metadata |
|
||||
| (filesystem) | | (.myfsio.sys/) |
|
||||
| | | |
|
||||
| data/<bucket>/ | | - IAM config |
|
||||
| <objects> | | - Bucket policies|
|
||||
| | | - Encryption keys|
|
||||
+------------------+ +------------------+
|
||||
```
|
||||
If you want API-only mode, set `UI_ENABLED=false`. There is no separate "UI-only" runtime anymore.
|
||||
|
||||
## Quick Start
|
||||
|
||||
From the repository root:
|
||||
|
||||
```bash
|
||||
# Clone and setup
|
||||
git clone https://gitea.jzwsite.com/kqjy/MyFSIO
|
||||
cd s3
|
||||
python -m venv .venv
|
||||
|
||||
# Activate virtual environment
|
||||
# Windows PowerShell:
|
||||
.\.venv\Scripts\Activate.ps1
|
||||
# Windows CMD:
|
||||
.venv\Scripts\activate.bat
|
||||
# Linux/macOS:
|
||||
source .venv/bin/activate
|
||||
|
||||
# Install dependencies
|
||||
pip install -r requirements.txt
|
||||
|
||||
# (Optional) Build Rust native extension for better performance
|
||||
# Requires Rust toolchain: https://rustup.rs
|
||||
pip install maturin
|
||||
cd myfsio_core && maturin develop --release && cd ..
|
||||
|
||||
# Start both servers
|
||||
python run.py
|
||||
|
||||
# Or start individually
|
||||
python run.py --mode api # API only (port 5000)
|
||||
python run.py --mode ui # UI only (port 5100)
|
||||
cd rust/myfsio-engine
|
||||
cargo run -p myfsio-server --
|
||||
```
|
||||
|
||||
**Credentials:** Generated automatically on first run and printed to the console. If missed, check the IAM config file at `<STORAGE_ROOT>/.myfsio.sys/config/iam.json`.
|
||||
Useful URLs:
|
||||
|
||||
- **Web Console:** http://127.0.0.1:5100/ui
|
||||
- **API Endpoint:** http://127.0.0.1:5000
|
||||
- UI: `http://127.0.0.1:5100/ui`
|
||||
- API: `http://127.0.0.1:5000/`
|
||||
- Health: `http://127.0.0.1:5000/myfsio/health`
|
||||
|
||||
On first boot, MyFSIO creates `data/.myfsio.sys/config/iam.json` and prints the generated admin access key and secret key to the console.
|
||||
|
||||
### Common CLI commands
|
||||
|
||||
```bash
|
||||
# Show resolved configuration
|
||||
cargo run -p myfsio-server -- --show-config
|
||||
|
||||
# Validate configuration and exit non-zero on critical issues
|
||||
cargo run -p myfsio-server -- --check-config
|
||||
|
||||
# Reset admin credentials
|
||||
cargo run -p myfsio-server -- --reset-cred
|
||||
|
||||
# API only
|
||||
UI_ENABLED=false cargo run -p myfsio-server --
|
||||
```
|
||||
|
||||
## Building a Binary
|
||||
|
||||
```bash
|
||||
cd rust/myfsio-engine
|
||||
cargo build --release -p myfsio-server
|
||||
```
|
||||
|
||||
Binary locations:
|
||||
|
||||
- Linux/macOS: `rust/myfsio-engine/target/release/myfsio-server`
|
||||
- Windows: `rust/myfsio-engine/target/release/myfsio-server.exe`
|
||||
|
||||
Run the built binary directly:
|
||||
|
||||
```bash
|
||||
./target/release/myfsio-server
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
The server reads environment variables from the process environment and also loads, when present:
|
||||
|
||||
- `/opt/myfsio/myfsio.env`
|
||||
- `.env`
|
||||
- `myfsio.env`
|
||||
|
||||
Core settings:
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `STORAGE_ROOT` | `./data` | Filesystem root for bucket storage |
|
||||
| `IAM_CONFIG` | `.myfsio.sys/config/iam.json` | IAM user and policy store |
|
||||
| `BUCKET_POLICY_PATH` | `.myfsio.sys/config/bucket_policies.json` | Bucket policy store |
|
||||
| `API_BASE_URL` | `http://127.0.0.1:5000` | API endpoint for UI calls |
|
||||
| `MAX_UPLOAD_SIZE` | `1073741824` | Maximum upload size in bytes (1 GB) |
|
||||
| `MULTIPART_MIN_PART_SIZE` | `5242880` | Minimum multipart part size (5 MB) |
|
||||
| `UI_PAGE_SIZE` | `100` | Default page size for listings |
|
||||
| `SECRET_KEY` | `dev-secret-key` | Flask session secret |
|
||||
| `AWS_REGION` | `us-east-1` | Region for SigV4 signing |
|
||||
| `AWS_SERVICE` | `s3` | Service name for SigV4 signing |
|
||||
| `ENCRYPTION_ENABLED` | `false` | Enable server-side encryption |
|
||||
| `KMS_ENABLED` | `false` | Enable Key Management Service |
|
||||
| `LOG_LEVEL` | `INFO` | Logging verbosity |
|
||||
| `SIGV4_TIMESTAMP_TOLERANCE_SECONDS` | `900` | Max time skew for SigV4 requests |
|
||||
| `PRESIGNED_URL_MAX_EXPIRY_SECONDS` | `604800` | Max presigned URL expiry (7 days) |
|
||||
| `REPLICATION_CONNECT_TIMEOUT_SECONDS` | `5` | Replication connection timeout |
|
||||
| `SITE_SYNC_ENABLED` | `false` | Enable bi-directional site sync |
|
||||
| `OBJECT_TAG_LIMIT` | `50` | Maximum tags per object |
|
||||
| --- | --- | --- |
|
||||
| `HOST` | `127.0.0.1` | Bind address for API and UI listeners |
|
||||
| `PORT` | `5000` | API port |
|
||||
| `UI_PORT` | `5100` | UI port |
|
||||
| `UI_ENABLED` | `true` | Disable to run API-only |
|
||||
| `STORAGE_ROOT` | `./data` | Root directory for buckets and system metadata |
|
||||
| `IAM_CONFIG` | `<STORAGE_ROOT>/.myfsio.sys/config/iam.json` | IAM config path |
|
||||
| `API_BASE_URL` | unset | Public API base used by the UI and presigned URL generation |
|
||||
| `AWS_REGION` | `us-east-1` | Region used in SigV4 scope |
|
||||
| `SIGV4_TIMESTAMP_TOLERANCE_SECONDS` | `900` | Allowed request time skew |
|
||||
| `PRESIGNED_URL_MIN_EXPIRY_SECONDS` | `1` | Minimum presigned URL expiry |
|
||||
| `PRESIGNED_URL_MAX_EXPIRY_SECONDS` | `604800` | Maximum presigned URL expiry |
|
||||
| `SECRET_KEY` | loaded from `.myfsio.sys/config/.secret` if present | Session signing key and IAM-at-rest encryption key |
|
||||
| `ADMIN_ACCESS_KEY` | unset | Optional first-run or reset access key |
|
||||
| `ADMIN_SECRET_KEY` | unset | Optional first-run or reset secret key |
|
||||
|
||||
Feature toggles:
|
||||
|
||||
| Variable | Default |
|
||||
| --- | --- |
|
||||
| `ENCRYPTION_ENABLED` | `false` |
|
||||
| `KMS_ENABLED` | `false` |
|
||||
| `GC_ENABLED` | `false` |
|
||||
| `INTEGRITY_ENABLED` | `false` |
|
||||
| `LIFECYCLE_ENABLED` | `false` |
|
||||
| `METRICS_HISTORY_ENABLED` | `false` |
|
||||
| `OPERATION_METRICS_ENABLED` | `false` |
|
||||
| `WEBSITE_HOSTING_ENABLED` | `false` |
|
||||
| `SITE_SYNC_ENABLED` | `false` |
|
||||
|
||||
Metrics and replication tuning:
|
||||
|
||||
| Variable | Default |
|
||||
| --- | --- |
|
||||
| `OPERATION_METRICS_INTERVAL_MINUTES` | `5` |
|
||||
| `OPERATION_METRICS_RETENTION_HOURS` | `24` |
|
||||
| `METRICS_HISTORY_INTERVAL_MINUTES` | `5` |
|
||||
| `METRICS_HISTORY_RETENTION_HOURS` | `24` |
|
||||
| `REPLICATION_CONNECT_TIMEOUT_SECONDS` | `5` |
|
||||
| `REPLICATION_READ_TIMEOUT_SECONDS` | `30` |
|
||||
| `REPLICATION_MAX_RETRIES` | `2` |
|
||||
| `REPLICATION_STREAMING_THRESHOLD_BYTES` | `10485760` |
|
||||
| `REPLICATION_MAX_FAILURES_PER_BUCKET` | `50` |
|
||||
| `SITE_SYNC_INTERVAL_SECONDS` | `60` |
|
||||
| `SITE_SYNC_BATCH_SIZE` | `100` |
|
||||
| `SITE_SYNC_CONNECT_TIMEOUT_SECONDS` | `10` |
|
||||
| `SITE_SYNC_READ_TIMEOUT_SECONDS` | `120` |
|
||||
| `SITE_SYNC_MAX_RETRIES` | `2` |
|
||||
| `SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS` | `1.0` |
|
||||
|
||||
UI asset overrides:
|
||||
|
||||
| Variable | Default |
|
||||
| --- | --- |
|
||||
| `TEMPLATES_DIR` | built-in crate templates directory |
|
||||
| `STATIC_DIR` | built-in crate static directory |
|
||||
|
||||
See [docs.md](./docs.md) for the full Rust-side operations guide.
|
||||
|
||||
## Data Layout
|
||||
|
||||
```
|
||||
```text
|
||||
data/
|
||||
├── <bucket>/ # User buckets with objects
|
||||
└── .myfsio.sys/ # System metadata
|
||||
├── config/
|
||||
│ ├── iam.json # IAM users and policies
|
||||
│ ├── bucket_policies.json # Bucket policies
|
||||
│ ├── replication_rules.json
|
||||
│ └── connections.json # Remote S3 connections
|
||||
├── buckets/<bucket>/
|
||||
│ ├── meta/ # Object metadata (.meta.json)
|
||||
│ ├── versions/ # Archived object versions
|
||||
│ └── .bucket.json # Bucket config (versioning, CORS)
|
||||
├── multipart/ # Active multipart uploads
|
||||
└── keys/ # Encryption keys (SSE-S3/KMS)
|
||||
<bucket>/
|
||||
.myfsio.sys/
|
||||
config/
|
||||
iam.json
|
||||
bucket_policies.json
|
||||
connections.json
|
||||
operation_metrics.json
|
||||
metrics_history.json
|
||||
buckets/<bucket>/
|
||||
meta/
|
||||
versions/
|
||||
multipart/
|
||||
keys/
|
||||
```
|
||||
|
||||
## API Reference
|
||||
|
||||
All endpoints require AWS Signature Version 4 authentication unless using presigned URLs or public bucket policies.
|
||||
|
||||
### Bucket Operations
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
|--------|----------|-------------|
|
||||
| `GET` | `/` | List all buckets |
|
||||
| `PUT` | `/<bucket>` | Create bucket |
|
||||
| `DELETE` | `/<bucket>` | Delete bucket (must be empty) |
|
||||
| `HEAD` | `/<bucket>` | Check bucket exists |
|
||||
|
||||
### Object Operations
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
|--------|----------|-------------|
|
||||
| `GET` | `/<bucket>` | List objects (supports `list-type=2`) |
|
||||
| `PUT` | `/<bucket>/<key>` | Upload object |
|
||||
| `GET` | `/<bucket>/<key>` | Download object |
|
||||
| `DELETE` | `/<bucket>/<key>` | Delete object |
|
||||
| `HEAD` | `/<bucket>/<key>` | Get object metadata |
|
||||
| `POST` | `/<bucket>/<key>?uploads` | Initiate multipart upload |
|
||||
| `PUT` | `/<bucket>/<key>?partNumber=N&uploadId=X` | Upload part |
|
||||
| `POST` | `/<bucket>/<key>?uploadId=X` | Complete multipart upload |
|
||||
| `DELETE` | `/<bucket>/<key>?uploadId=X` | Abort multipart upload |
|
||||
|
||||
### Bucket Policies (S3-compatible)
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
|--------|----------|-------------|
|
||||
| `GET` | `/<bucket>?policy` | Get bucket policy |
|
||||
| `PUT` | `/<bucket>?policy` | Set bucket policy |
|
||||
| `DELETE` | `/<bucket>?policy` | Delete bucket policy |
|
||||
|
||||
### Versioning
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
|--------|----------|-------------|
|
||||
| `GET` | `/<bucket>/<key>?versionId=X` | Get specific version |
|
||||
| `DELETE` | `/<bucket>/<key>?versionId=X` | Delete specific version |
|
||||
| `GET` | `/<bucket>?versions` | List object versions |
|
||||
|
||||
### Health Check
|
||||
|
||||
| Method | Endpoint | Description |
|
||||
|--------|----------|-------------|
|
||||
| `GET` | `/myfsio/health` | Health check endpoint |
|
||||
|
||||
## IAM & Access Control
|
||||
|
||||
### Users and Access Keys
|
||||
|
||||
On first run, MyFSIO creates a default admin user (`localadmin`/`localadmin`). Use the IAM dashboard to:
|
||||
|
||||
- Create and delete users
|
||||
- Generate and rotate access keys
|
||||
- Attach inline policies to users
|
||||
- Control IAM management permissions
|
||||
|
||||
### Bucket Policies
|
||||
|
||||
Bucket policies follow AWS policy grammar (Version `2012-10-17`) with support for:
|
||||
|
||||
- Principal-based access (`*` for anonymous, specific users)
|
||||
- Action-based permissions (`s3:GetObject`, `s3:PutObject`, etc.)
|
||||
- Resource patterns (`arn:aws:s3:::bucket/*`)
|
||||
- Condition keys
|
||||
|
||||
**Policy Presets:**
|
||||
- **Public:** Grants anonymous read access (`s3:GetObject`, `s3:ListBucket`)
|
||||
- **Private:** Removes bucket policy (IAM-only access)
|
||||
- **Custom:** Manual policy editing with draft preservation
|
||||
|
||||
Policies hot-reload when the JSON file changes.
|
||||
|
||||
## Server-Side Encryption
|
||||
|
||||
MyFSIO supports two encryption modes:
|
||||
|
||||
- **SSE-S3:** Server-managed keys with automatic key rotation
|
||||
- **SSE-KMS:** Customer-managed keys via built-in KMS
|
||||
|
||||
Enable encryption with:
|
||||
```bash
|
||||
ENCRYPTION_ENABLED=true python run.py
|
||||
```
|
||||
|
||||
## Cross-Bucket Replication
|
||||
|
||||
Replicate objects to remote S3-compatible endpoints:
|
||||
|
||||
1. Configure remote connections in the UI
|
||||
2. Create replication rules specifying source/destination
|
||||
3. Objects are automatically replicated on upload
|
||||
|
||||
## Docker
|
||||
|
||||
Build the Rust image from the `rust/` directory:
|
||||
|
||||
```bash
|
||||
docker build -t myfsio .
|
||||
docker run -p 5000:5000 -p 5100:5100 -v ./data:/app/data myfsio
|
||||
docker build -t myfsio ./rust
|
||||
docker run --rm -p 5000:5000 -p 5100:5100 -v "${PWD}/data:/app/data" myfsio
|
||||
```
|
||||
|
||||
If the instance sits behind a reverse proxy, set `API_BASE_URL` to the public S3 endpoint.
|
||||
|
||||
## Linux Installation
|
||||
|
||||
The repository includes `scripts/install.sh` for systemd-style Linux installs. Build the Rust binary first, then pass it to the installer:
|
||||
|
||||
```bash
|
||||
cd rust/myfsio-engine
|
||||
cargo build --release -p myfsio-server
|
||||
|
||||
cd ../..
|
||||
sudo ./scripts/install.sh --binary ./rust/myfsio-engine/target/release/myfsio-server
|
||||
```
|
||||
|
||||
The installer copies the binary into `/opt/myfsio/myfsio`, writes `/opt/myfsio/myfsio.env`, and can register a `myfsio.service` unit.
|
||||
|
||||
## Testing
|
||||
|
||||
Run the Rust test suite from the workspace:
|
||||
|
||||
```bash
|
||||
# Run all tests
|
||||
pytest tests/ -v
|
||||
|
||||
# Run specific test file
|
||||
pytest tests/test_api.py -v
|
||||
|
||||
# Run with coverage
|
||||
pytest tests/ --cov=app --cov-report=html
|
||||
cd rust/myfsio-engine
|
||||
cargo test
|
||||
```
|
||||
|
||||
## References
|
||||
## Health Check
|
||||
|
||||
- [Amazon S3 Documentation](https://docs.aws.amazon.com/s3/)
|
||||
- [AWS Signature Version 4](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html)
|
||||
- [S3 Bucket Policy Examples](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-bucket-policies.html)
|
||||
`GET /myfsio/health` returns:
|
||||
|
||||
```json
|
||||
{
|
||||
"status": "ok",
|
||||
"version": "0.5.0"
|
||||
}
|
||||
```
|
||||
|
||||
The `version` field comes from the Rust crate version in `rust/myfsio-engine/crates/myfsio-server/Cargo.toml`.
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
#!/bin/sh
|
||||
set -e
|
||||
|
||||
ENGINE="${ENGINE:-rust}"
|
||||
|
||||
exec python run.py --prod --engine "$ENGINE"
|
||||
@@ -1,117 +0,0 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServerConfig {
|
||||
pub bind_addr: SocketAddr,
|
||||
pub storage_root: PathBuf,
|
||||
pub region: String,
|
||||
pub iam_config_path: PathBuf,
|
||||
pub sigv4_timestamp_tolerance_secs: u64,
|
||||
pub presigned_url_min_expiry: u64,
|
||||
pub presigned_url_max_expiry: u64,
|
||||
pub secret_key: Option<String>,
|
||||
pub encryption_enabled: bool,
|
||||
pub kms_enabled: bool,
|
||||
pub gc_enabled: bool,
|
||||
pub integrity_enabled: bool,
|
||||
pub metrics_enabled: bool,
|
||||
pub lifecycle_enabled: bool,
|
||||
pub website_hosting_enabled: bool,
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
pub fn from_env() -> Self {
|
||||
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let port: u16 = std::env::var("PORT")
|
||||
.unwrap_or_else(|_| "5000".to_string())
|
||||
.parse()
|
||||
.unwrap_or(5000);
|
||||
let storage_root = std::env::var("STORAGE_ROOT")
|
||||
.unwrap_or_else(|_| "./data".to_string());
|
||||
let region = std::env::var("AWS_REGION")
|
||||
.unwrap_or_else(|_| "us-east-1".to_string());
|
||||
|
||||
let storage_path = PathBuf::from(&storage_root);
|
||||
let iam_config_path = std::env::var("IAM_CONFIG")
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|_| {
|
||||
storage_path.join(".myfsio.sys").join("config").join("iam.json")
|
||||
});
|
||||
|
||||
let sigv4_timestamp_tolerance_secs: u64 = std::env::var("SIGV4_TIMESTAMP_TOLERANCE_SECONDS")
|
||||
.unwrap_or_else(|_| "900".to_string())
|
||||
.parse()
|
||||
.unwrap_or(900);
|
||||
|
||||
let presigned_url_min_expiry: u64 = std::env::var("PRESIGNED_URL_MIN_EXPIRY_SECONDS")
|
||||
.unwrap_or_else(|_| "1".to_string())
|
||||
.parse()
|
||||
.unwrap_or(1);
|
||||
|
||||
let presigned_url_max_expiry: u64 = std::env::var("PRESIGNED_URL_MAX_EXPIRY_SECONDS")
|
||||
.unwrap_or_else(|_| "604800".to_string())
|
||||
.parse()
|
||||
.unwrap_or(604800);
|
||||
|
||||
let secret_key = {
|
||||
let env_key = std::env::var("SECRET_KEY").ok();
|
||||
match env_key {
|
||||
Some(k) if !k.is_empty() && k != "dev-secret-key" => Some(k),
|
||||
_ => {
|
||||
let secret_file = storage_path
|
||||
.join(".myfsio.sys")
|
||||
.join("config")
|
||||
.join(".secret");
|
||||
std::fs::read_to_string(&secret_file).ok().map(|s| s.trim().to_string())
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let encryption_enabled = std::env::var("ENCRYPTION_ENABLED")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.to_lowercase() == "true";
|
||||
|
||||
let kms_enabled = std::env::var("KMS_ENABLED")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.to_lowercase() == "true";
|
||||
|
||||
let gc_enabled = std::env::var("GC_ENABLED")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.to_lowercase() == "true";
|
||||
|
||||
let integrity_enabled = std::env::var("INTEGRITY_ENABLED")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.to_lowercase() == "true";
|
||||
|
||||
let metrics_enabled = std::env::var("OPERATION_METRICS_ENABLED")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.to_lowercase() == "true";
|
||||
|
||||
let lifecycle_enabled = std::env::var("LIFECYCLE_ENABLED")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.to_lowercase() == "true";
|
||||
|
||||
let website_hosting_enabled = std::env::var("WEBSITE_HOSTING_ENABLED")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.to_lowercase() == "true";
|
||||
|
||||
Self {
|
||||
bind_addr: SocketAddr::new(host.parse().unwrap(), port),
|
||||
storage_root: storage_path,
|
||||
region,
|
||||
iam_config_path,
|
||||
sigv4_timestamp_tolerance_secs,
|
||||
presigned_url_min_expiry,
|
||||
presigned_url_max_expiry,
|
||||
secret_key,
|
||||
encryption_enabled,
|
||||
kms_enabled,
|
||||
gc_enabled,
|
||||
integrity_enabled,
|
||||
metrics_enabled,
|
||||
lifecycle_enabled,
|
||||
website_hosting_enabled,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,704 +0,0 @@
|
||||
use axum::body::Body;
|
||||
use axum::extract::{Path, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::Extension;
|
||||
use myfsio_common::types::Principal;
|
||||
use myfsio_storage::traits::StorageEngine;
|
||||
|
||||
use crate::services::site_registry::{PeerSite, SiteInfo};
|
||||
use crate::services::website_domains::{is_valid_domain, normalize_domain};
|
||||
use crate::state::AppState;
|
||||
|
||||
fn json_response(status: StatusCode, value: serde_json::Value) -> Response {
|
||||
(
|
||||
status,
|
||||
[("content-type", "application/json")],
|
||||
value.to_string(),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
fn json_error(code: &str, message: &str, status: StatusCode) -> Response {
|
||||
json_response(
|
||||
status,
|
||||
serde_json::json!({"error": {"code": code, "message": message}}),
|
||||
)
|
||||
}
|
||||
|
||||
fn require_admin(principal: &Principal) -> Option<Response> {
|
||||
if !principal.is_admin {
|
||||
return Some(json_error("AccessDenied", "Admin access required", StatusCode::FORBIDDEN));
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
async fn read_json_body(body: Body) -> Option<serde_json::Value> {
|
||||
let bytes = http_body_util::BodyExt::collect(body).await.ok()?.to_bytes();
|
||||
serde_json::from_slice(&bytes).ok()
|
||||
}
|
||||
|
||||
fn validate_site_id(site_id: &str) -> Option<String> {
|
||||
if site_id.is_empty() || site_id.len() > 63 {
|
||||
return Some("site_id must be 1-63 characters".to_string());
|
||||
}
|
||||
let first = site_id.chars().next().unwrap();
|
||||
if !first.is_ascii_alphanumeric() {
|
||||
return Some("site_id must start with alphanumeric".to_string());
|
||||
}
|
||||
if !site_id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') {
|
||||
return Some("site_id must contain only alphanumeric, hyphens, underscores".to_string());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn validate_endpoint(endpoint: &str) -> Option<String> {
|
||||
if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") {
|
||||
return Some("Endpoint must be http or https URL".to_string());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn validate_region(region: &str) -> Option<String> {
|
||||
let re = regex::Regex::new(r"^[a-z]{2,}-[a-z]+-\d+$").unwrap();
|
||||
if !re.is_match(region) {
|
||||
return Some("Region must match format like us-east-1".to_string());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn validate_priority(priority: i64) -> Option<String> {
|
||||
if priority < 0 || priority > 1000 {
|
||||
return Some("Priority must be between 0 and 1000".to_string());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub async fn get_local_site(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
|
||||
if let Some(ref registry) = state.site_registry {
|
||||
if let Some(local) = registry.get_local_site() {
|
||||
return json_response(StatusCode::OK, serde_json::to_value(&local).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
json_error("NotFound", "Local site not configured", StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
pub async fn update_local_site(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_error("InvalidRequest", "Site registry not available", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let payload = match read_json_body(body).await {
|
||||
Some(v) => v,
|
||||
None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let site_id = match payload.get("site_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => return json_error("ValidationError", "site_id is required", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
if let Some(err) = validate_site_id(&site_id) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
let endpoint = payload.get("endpoint").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
||||
if !endpoint.is_empty() {
|
||||
if let Some(err) = validate_endpoint(&endpoint) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(p) = payload.get("priority").and_then(|v| v.as_i64()) {
|
||||
if let Some(err) = validate_priority(p) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(r) = payload.get("region").and_then(|v| v.as_str()) {
|
||||
if let Some(err) = validate_region(r) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
|
||||
let existing = registry.get_local_site();
|
||||
let site = SiteInfo {
|
||||
site_id: site_id.clone(),
|
||||
endpoint,
|
||||
region: payload.get("region").and_then(|v| v.as_str()).unwrap_or("us-east-1").to_string(),
|
||||
priority: payload.get("priority").and_then(|v| v.as_i64()).unwrap_or(100) as i32,
|
||||
display_name: payload.get("display_name").and_then(|v| v.as_str()).unwrap_or(&site_id).to_string(),
|
||||
created_at: existing.and_then(|e| e.created_at),
|
||||
};
|
||||
|
||||
registry.set_local_site(site.clone());
|
||||
json_response(StatusCode::OK, serde_json::to_value(&site).unwrap())
|
||||
}
|
||||
|
||||
pub async fn list_all_sites(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_response(StatusCode::OK, serde_json::json!({"local": null, "peers": [], "total_peers": 0})),
|
||||
};
|
||||
|
||||
let local = registry.get_local_site();
|
||||
let peers = registry.list_peers();
|
||||
|
||||
json_response(StatusCode::OK, serde_json::json!({
|
||||
"local": local,
|
||||
"peers": peers,
|
||||
"total_peers": peers.len(),
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn register_peer_site(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_error("InvalidRequest", "Site registry not available", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let payload = match read_json_body(body).await {
|
||||
Some(v) => v,
|
||||
None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let site_id = match payload.get("site_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => return json_error("ValidationError", "site_id is required", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
if let Some(err) = validate_site_id(&site_id) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
let endpoint = match payload.get("endpoint").and_then(|v| v.as_str()) {
|
||||
Some(e) => e.to_string(),
|
||||
None => return json_error("ValidationError", "endpoint is required", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
if let Some(err) = validate_endpoint(&endpoint) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
let region = payload.get("region").and_then(|v| v.as_str()).unwrap_or("us-east-1").to_string();
|
||||
if let Some(err) = validate_region(®ion) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
let priority = payload.get("priority").and_then(|v| v.as_i64()).unwrap_or(100);
|
||||
if let Some(err) = validate_priority(priority) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
if registry.get_peer(&site_id).is_some() {
|
||||
return json_error("AlreadyExists", &format!("Peer site '{}' already exists", site_id), StatusCode::CONFLICT);
|
||||
}
|
||||
|
||||
let peer = PeerSite {
|
||||
site_id: site_id.clone(),
|
||||
endpoint,
|
||||
region,
|
||||
priority: priority as i32,
|
||||
display_name: payload.get("display_name").and_then(|v| v.as_str()).unwrap_or(&site_id).to_string(),
|
||||
connection_id: payload.get("connection_id").and_then(|v| v.as_str()).map(|s| s.to_string()),
|
||||
created_at: Some(chrono::Utc::now().to_rfc3339()),
|
||||
is_healthy: false,
|
||||
last_health_check: None,
|
||||
};
|
||||
|
||||
registry.add_peer(peer.clone());
|
||||
json_response(StatusCode::CREATED, serde_json::to_value(&peer).unwrap())
|
||||
}
|
||||
|
||||
pub async fn get_peer_site(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(site_id): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND),
|
||||
};
|
||||
|
||||
match registry.get_peer(&site_id) {
|
||||
Some(peer) => json_response(StatusCode::OK, serde_json::to_value(&peer).unwrap()),
|
||||
None => json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_peer_site(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(site_id): Path<String>,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND),
|
||||
};
|
||||
|
||||
let existing = match registry.get_peer(&site_id) {
|
||||
Some(p) => p,
|
||||
None => return json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND),
|
||||
};
|
||||
|
||||
let payload = match read_json_body(body).await {
|
||||
Some(v) => v,
|
||||
None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
if let Some(ep) = payload.get("endpoint").and_then(|v| v.as_str()) {
|
||||
if let Some(err) = validate_endpoint(ep) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
if let Some(p) = payload.get("priority").and_then(|v| v.as_i64()) {
|
||||
if let Some(err) = validate_priority(p) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
if let Some(r) = payload.get("region").and_then(|v| v.as_str()) {
|
||||
if let Some(err) = validate_region(r) {
|
||||
return json_error("ValidationError", &err, StatusCode::BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
|
||||
let peer = PeerSite {
|
||||
site_id: site_id.clone(),
|
||||
endpoint: payload.get("endpoint").and_then(|v| v.as_str()).unwrap_or(&existing.endpoint).to_string(),
|
||||
region: payload.get("region").and_then(|v| v.as_str()).unwrap_or(&existing.region).to_string(),
|
||||
priority: payload.get("priority").and_then(|v| v.as_i64()).unwrap_or(existing.priority as i64) as i32,
|
||||
display_name: payload.get("display_name").and_then(|v| v.as_str()).unwrap_or(&existing.display_name).to_string(),
|
||||
connection_id: payload.get("connection_id").and_then(|v| v.as_str()).map(|s| s.to_string()).or(existing.connection_id),
|
||||
created_at: existing.created_at,
|
||||
is_healthy: existing.is_healthy,
|
||||
last_health_check: existing.last_health_check,
|
||||
};
|
||||
|
||||
registry.update_peer(peer.clone());
|
||||
json_response(StatusCode::OK, serde_json::to_value(&peer).unwrap())
|
||||
}
|
||||
|
||||
pub async fn delete_peer_site(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(site_id): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND),
|
||||
};
|
||||
|
||||
if !registry.delete_peer(&site_id) {
|
||||
return json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND);
|
||||
}
|
||||
StatusCode::NO_CONTENT.into_response()
|
||||
}
|
||||
|
||||
pub async fn check_peer_health(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(site_id): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND),
|
||||
};
|
||||
|
||||
if registry.get_peer(&site_id).is_none() {
|
||||
return json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, serde_json::json!({
|
||||
"site_id": site_id,
|
||||
"is_healthy": false,
|
||||
"error": "Health check not implemented in standalone mode",
|
||||
"checked_at": chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn get_topology(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_response(StatusCode::OK, serde_json::json!({"sites": [], "total": 0, "healthy_count": 0})),
|
||||
};
|
||||
|
||||
let local = registry.get_local_site();
|
||||
let peers = registry.list_peers();
|
||||
|
||||
let mut sites: Vec<serde_json::Value> = Vec::new();
|
||||
if let Some(l) = local {
|
||||
let mut v = serde_json::to_value(&l).unwrap();
|
||||
v.as_object_mut().unwrap().insert("is_local".to_string(), serde_json::json!(true));
|
||||
v.as_object_mut().unwrap().insert("is_healthy".to_string(), serde_json::json!(true));
|
||||
sites.push(v);
|
||||
}
|
||||
for p in &peers {
|
||||
let mut v = serde_json::to_value(p).unwrap();
|
||||
v.as_object_mut().unwrap().insert("is_local".to_string(), serde_json::json!(false));
|
||||
sites.push(v);
|
||||
}
|
||||
|
||||
sites.sort_by_key(|s| s.get("priority").and_then(|v| v.as_i64()).unwrap_or(100));
|
||||
|
||||
let healthy_count = sites.iter().filter(|s| s.get("is_healthy").and_then(|v| v.as_bool()).unwrap_or(false)).count();
|
||||
|
||||
json_response(StatusCode::OK, serde_json::json!({
|
||||
"sites": sites,
|
||||
"total": sites.len(),
|
||||
"healthy_count": healthy_count,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn check_bidirectional_status(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(site_id): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let registry = match &state.site_registry {
|
||||
Some(r) => r,
|
||||
None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND),
|
||||
};
|
||||
|
||||
if registry.get_peer(&site_id).is_none() {
|
||||
return json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
let local = registry.get_local_site();
|
||||
json_response(StatusCode::OK, serde_json::json!({
|
||||
"site_id": site_id,
|
||||
"local_site_id": local.as_ref().map(|l| &l.site_id),
|
||||
"local_endpoint": local.as_ref().map(|l| &l.endpoint),
|
||||
"local_bidirectional_rules": [],
|
||||
"local_site_sync_enabled": false,
|
||||
"remote_status": null,
|
||||
"issues": [{"code": "NOT_IMPLEMENTED", "message": "Bidirectional status check not implemented in standalone mode", "severity": "warning"}],
|
||||
"is_fully_configured": false,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn iam_list_users(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let users = state.iam.list_users().await;
|
||||
json_response(StatusCode::OK, serde_json::json!({"users": users}))
|
||||
}
|
||||
|
||||
pub async fn iam_get_user(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(identifier): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match state.iam.get_user(&identifier).await {
|
||||
Some(user) => json_response(StatusCode::OK, user),
|
||||
None => json_error("NotFound", &format!("User '{}' not found", identifier), StatusCode::NOT_FOUND),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn iam_get_user_policies(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(identifier): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match state.iam.get_user_policies(&identifier) {
|
||||
Some(policies) => json_response(StatusCode::OK, serde_json::json!({"policies": policies})),
|
||||
None => json_error("NotFound", &format!("User '{}' not found", identifier), StatusCode::NOT_FOUND),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn iam_create_access_key(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(identifier): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match state.iam.create_access_key(&identifier) {
|
||||
Ok(result) => json_response(StatusCode::CREATED, result),
|
||||
Err(e) => json_error("InvalidRequest", &e, StatusCode::BAD_REQUEST),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn iam_delete_access_key(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path((_identifier, access_key)): Path<(String, String)>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match state.iam.delete_access_key(&access_key) {
|
||||
Ok(()) => StatusCode::NO_CONTENT.into_response(),
|
||||
Err(e) => json_error("InvalidRequest", &e, StatusCode::BAD_REQUEST),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn iam_disable_user(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(identifier): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match state.iam.set_user_enabled(&identifier, false).await {
|
||||
Ok(()) => json_response(StatusCode::OK, serde_json::json!({"status": "disabled"})),
|
||||
Err(e) => json_error("InvalidRequest", &e, StatusCode::BAD_REQUEST),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn iam_enable_user(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(identifier): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match state.iam.set_user_enabled(&identifier, true).await {
|
||||
Ok(()) => json_response(StatusCode::OK, serde_json::json!({"status": "enabled"})),
|
||||
Err(e) => json_error("InvalidRequest", &e, StatusCode::BAD_REQUEST),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list_website_domains(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let store = match &state.website_domains {
|
||||
Some(s) => s,
|
||||
None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
json_response(StatusCode::OK, serde_json::json!(store.list_all()))
|
||||
}
|
||||
|
||||
pub async fn create_website_domain(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let store = match &state.website_domains {
|
||||
Some(s) => s,
|
||||
None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let payload = match read_json_body(body).await {
|
||||
Some(v) => v,
|
||||
None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let domain = normalize_domain(payload.get("domain").and_then(|v| v.as_str()).unwrap_or(""));
|
||||
if domain.is_empty() {
|
||||
return json_error("ValidationError", "domain is required", StatusCode::BAD_REQUEST);
|
||||
}
|
||||
if !is_valid_domain(&domain) {
|
||||
return json_error("ValidationError", &format!("Invalid domain: '{}'", domain), StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
let bucket = payload.get("bucket").and_then(|v| v.as_str()).unwrap_or("").trim().to_string();
|
||||
if bucket.is_empty() {
|
||||
return json_error("ValidationError", "bucket is required", StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
match state.storage.bucket_exists(&bucket).await {
|
||||
Ok(true) => {}
|
||||
_ => return json_error("NoSuchBucket", &format!("Bucket '{}' does not exist", bucket), StatusCode::NOT_FOUND),
|
||||
}
|
||||
|
||||
if store.get_bucket(&domain).is_some() {
|
||||
return json_error("Conflict", &format!("Domain '{}' is already mapped", domain), StatusCode::CONFLICT);
|
||||
}
|
||||
|
||||
store.set_mapping(&domain, &bucket);
|
||||
json_response(StatusCode::CREATED, serde_json::json!({"domain": domain, "bucket": bucket}))
|
||||
}
|
||||
|
||||
pub async fn get_website_domain(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(domain): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let store = match &state.website_domains {
|
||||
Some(s) => s,
|
||||
None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let domain = normalize_domain(&domain);
|
||||
match store.get_bucket(&domain) {
|
||||
Some(bucket) => json_response(StatusCode::OK, serde_json::json!({"domain": domain, "bucket": bucket})),
|
||||
None => json_error("NotFound", &format!("No mapping found for domain '{}'", domain), StatusCode::NOT_FOUND),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_website_domain(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(domain): Path<String>,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let store = match &state.website_domains {
|
||||
Some(s) => s,
|
||||
None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let domain = normalize_domain(&domain);
|
||||
let payload = match read_json_body(body).await {
|
||||
Some(v) => v,
|
||||
None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let bucket = payload.get("bucket").and_then(|v| v.as_str()).unwrap_or("").trim().to_string();
|
||||
if bucket.is_empty() {
|
||||
return json_error("ValidationError", "bucket is required", StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
match state.storage.bucket_exists(&bucket).await {
|
||||
Ok(true) => {}
|
||||
_ => return json_error("NoSuchBucket", &format!("Bucket '{}' does not exist", bucket), StatusCode::NOT_FOUND),
|
||||
}
|
||||
|
||||
if store.get_bucket(&domain).is_none() {
|
||||
return json_error("NotFound", &format!("No mapping found for domain '{}'", domain), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
store.set_mapping(&domain, &bucket);
|
||||
json_response(StatusCode::OK, serde_json::json!({"domain": domain, "bucket": bucket}))
|
||||
}
|
||||
|
||||
pub async fn delete_website_domain(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(domain): Path<String>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let store = match &state.website_domains {
|
||||
Some(s) => s,
|
||||
None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let domain = normalize_domain(&domain);
|
||||
if !store.delete_mapping(&domain) {
|
||||
return json_error("NotFound", &format!("No mapping found for domain '{}'", domain), StatusCode::NOT_FOUND);
|
||||
}
|
||||
StatusCode::NO_CONTENT.into_response()
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, Default)]
|
||||
pub struct PaginationQuery {
|
||||
pub limit: Option<usize>,
|
||||
pub offset: Option<usize>,
|
||||
}
|
||||
|
||||
pub async fn gc_status(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match &state.gc {
|
||||
Some(gc) => json_response(StatusCode::OK, gc.status().await),
|
||||
None => json_response(StatusCode::OK, serde_json::json!({"enabled": false, "message": "GC is not enabled. Set GC_ENABLED=true to enable."})),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn gc_run(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let gc = match &state.gc {
|
||||
Some(gc) => gc,
|
||||
None => return json_error("InvalidRequest", "GC is not enabled", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let payload = read_json_body(body).await.unwrap_or(serde_json::json!({}));
|
||||
let dry_run = payload.get("dry_run").and_then(|v| v.as_bool()).unwrap_or(false);
|
||||
|
||||
match gc.run_now(dry_run).await {
|
||||
Ok(result) => json_response(StatusCode::OK, result),
|
||||
Err(e) => json_error("Conflict", &e, StatusCode::CONFLICT),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn gc_history(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match &state.gc {
|
||||
Some(gc) => json_response(StatusCode::OK, serde_json::json!({"executions": gc.history().await})),
|
||||
None => json_response(StatusCode::OK, serde_json::json!({"executions": []})),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn integrity_status(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match &state.integrity {
|
||||
Some(checker) => json_response(StatusCode::OK, checker.status().await),
|
||||
None => json_response(StatusCode::OK, serde_json::json!({"enabled": false, "message": "Integrity checker is not enabled. Set INTEGRITY_ENABLED=true to enable."})),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn integrity_run(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
let checker = match &state.integrity {
|
||||
Some(c) => c,
|
||||
None => return json_error("InvalidRequest", "Integrity checker is not enabled", StatusCode::BAD_REQUEST),
|
||||
};
|
||||
|
||||
let payload = read_json_body(body).await.unwrap_or(serde_json::json!({}));
|
||||
let dry_run = payload.get("dry_run").and_then(|v| v.as_bool()).unwrap_or(false);
|
||||
let auto_heal = payload.get("auto_heal").and_then(|v| v.as_bool()).unwrap_or(false);
|
||||
|
||||
match checker.run_now(dry_run, auto_heal).await {
|
||||
Ok(result) => json_response(StatusCode::OK, result),
|
||||
Err(e) => json_error("Conflict", &e, StatusCode::CONFLICT),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn integrity_history(
|
||||
State(state): State<AppState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
) -> Response {
|
||||
if let Some(err) = require_admin(&principal) { return err; }
|
||||
match &state.integrity {
|
||||
Some(checker) => json_response(StatusCode::OK, serde_json::json!({"executions": checker.history().await})),
|
||||
None => json_response(StatusCode::OK, serde_json::json!({"executions": []})),
|
||||
}
|
||||
}
|
||||
@@ -1,278 +0,0 @@
|
||||
use axum::body::Body;
|
||||
use axum::extract::State;
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use base64::engine::general_purpose::STANDARD as B64;
|
||||
use base64::Engine;
|
||||
use serde_json::json;
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
fn json_ok(value: serde_json::Value) -> Response {
|
||||
(
|
||||
StatusCode::OK,
|
||||
[("content-type", "application/json")],
|
||||
value.to_string(),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
fn json_err(status: StatusCode, msg: &str) -> Response {
|
||||
(
|
||||
status,
|
||||
[("content-type", "application/json")],
|
||||
json!({"error": msg}).to_string(),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
pub async fn list_keys(State(state): State<AppState>) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
let keys = kms.list_keys().await;
|
||||
let keys_json: Vec<serde_json::Value> = keys
|
||||
.iter()
|
||||
.map(|k| {
|
||||
json!({
|
||||
"KeyId": k.key_id,
|
||||
"Arn": k.arn,
|
||||
"Description": k.description,
|
||||
"CreationDate": k.creation_date.to_rfc3339(),
|
||||
"Enabled": k.enabled,
|
||||
"KeyState": k.key_state,
|
||||
"KeyUsage": k.key_usage,
|
||||
"KeySpec": k.key_spec,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
json_ok(json!({"keys": keys_json}))
|
||||
}
|
||||
|
||||
pub async fn create_key(State(state): State<AppState>, body: Body) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
let body_bytes = match http_body_util::BodyExt::collect(body).await {
|
||||
Ok(c) => c.to_bytes(),
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid request body"),
|
||||
};
|
||||
|
||||
let description = if body_bytes.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
match serde_json::from_slice::<serde_json::Value>(&body_bytes) {
|
||||
Ok(v) => v
|
||||
.get("Description")
|
||||
.or_else(|| v.get("description"))
|
||||
.and_then(|d| d.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string(),
|
||||
Err(_) => String::new(),
|
||||
}
|
||||
};
|
||||
|
||||
match kms.create_key(&description).await {
|
||||
Ok(key) => json_ok(json!({
|
||||
"KeyId": key.key_id,
|
||||
"Arn": key.arn,
|
||||
"Description": key.description,
|
||||
"CreationDate": key.creation_date.to_rfc3339(),
|
||||
"Enabled": key.enabled,
|
||||
"KeyState": key.key_state,
|
||||
})),
|
||||
Err(e) => json_err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_key(
|
||||
State(state): State<AppState>,
|
||||
axum::extract::Path(key_id): axum::extract::Path<String>,
|
||||
) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
match kms.get_key(&key_id).await {
|
||||
Some(key) => json_ok(json!({
|
||||
"KeyId": key.key_id,
|
||||
"Arn": key.arn,
|
||||
"Description": key.description,
|
||||
"CreationDate": key.creation_date.to_rfc3339(),
|
||||
"Enabled": key.enabled,
|
||||
"KeyState": key.key_state,
|
||||
"KeyUsage": key.key_usage,
|
||||
"KeySpec": key.key_spec,
|
||||
})),
|
||||
None => json_err(StatusCode::NOT_FOUND, "Key not found"),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_key(
|
||||
State(state): State<AppState>,
|
||||
axum::extract::Path(key_id): axum::extract::Path<String>,
|
||||
) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
match kms.delete_key(&key_id).await {
|
||||
Ok(true) => StatusCode::NO_CONTENT.into_response(),
|
||||
Ok(false) => json_err(StatusCode::NOT_FOUND, "Key not found"),
|
||||
Err(e) => json_err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn enable_key(
|
||||
State(state): State<AppState>,
|
||||
axum::extract::Path(key_id): axum::extract::Path<String>,
|
||||
) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
match kms.enable_key(&key_id).await {
|
||||
Ok(true) => json_ok(json!({"status": "enabled"})),
|
||||
Ok(false) => json_err(StatusCode::NOT_FOUND, "Key not found"),
|
||||
Err(e) => json_err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn disable_key(
|
||||
State(state): State<AppState>,
|
||||
axum::extract::Path(key_id): axum::extract::Path<String>,
|
||||
) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
match kms.disable_key(&key_id).await {
|
||||
Ok(true) => json_ok(json!({"status": "disabled"})),
|
||||
Ok(false) => json_err(StatusCode::NOT_FOUND, "Key not found"),
|
||||
Err(e) => json_err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn encrypt(State(state): State<AppState>, body: Body) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
let body_bytes = match http_body_util::BodyExt::collect(body).await {
|
||||
Ok(c) => c.to_bytes(),
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid request body"),
|
||||
};
|
||||
|
||||
let req: serde_json::Value = match serde_json::from_slice(&body_bytes) {
|
||||
Ok(v) => v,
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid JSON"),
|
||||
};
|
||||
|
||||
let key_id = match req.get("KeyId").and_then(|v| v.as_str()) {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::BAD_REQUEST, "Missing KeyId"),
|
||||
};
|
||||
let plaintext_b64 = match req.get("Plaintext").and_then(|v| v.as_str()) {
|
||||
Some(p) => p,
|
||||
None => return json_err(StatusCode::BAD_REQUEST, "Missing Plaintext"),
|
||||
};
|
||||
let plaintext = match B64.decode(plaintext_b64) {
|
||||
Ok(p) => p,
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid base64 Plaintext"),
|
||||
};
|
||||
|
||||
match kms.encrypt_data(key_id, &plaintext).await {
|
||||
Ok(ct) => json_ok(json!({
|
||||
"KeyId": key_id,
|
||||
"CiphertextBlob": B64.encode(&ct),
|
||||
})),
|
||||
Err(e) => json_err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn decrypt(State(state): State<AppState>, body: Body) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
let body_bytes = match http_body_util::BodyExt::collect(body).await {
|
||||
Ok(c) => c.to_bytes(),
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid request body"),
|
||||
};
|
||||
|
||||
let req: serde_json::Value = match serde_json::from_slice(&body_bytes) {
|
||||
Ok(v) => v,
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid JSON"),
|
||||
};
|
||||
|
||||
let key_id = match req.get("KeyId").and_then(|v| v.as_str()) {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::BAD_REQUEST, "Missing KeyId"),
|
||||
};
|
||||
let ct_b64 = match req.get("CiphertextBlob").and_then(|v| v.as_str()) {
|
||||
Some(c) => c,
|
||||
None => return json_err(StatusCode::BAD_REQUEST, "Missing CiphertextBlob"),
|
||||
};
|
||||
let ciphertext = match B64.decode(ct_b64) {
|
||||
Ok(c) => c,
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid base64"),
|
||||
};
|
||||
|
||||
match kms.decrypt_data(key_id, &ciphertext).await {
|
||||
Ok(pt) => json_ok(json!({
|
||||
"KeyId": key_id,
|
||||
"Plaintext": B64.encode(&pt),
|
||||
})),
|
||||
Err(e) => json_err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn generate_data_key(State(state): State<AppState>, body: Body) -> Response {
|
||||
let kms = match &state.kms {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::SERVICE_UNAVAILABLE, "KMS not enabled"),
|
||||
};
|
||||
|
||||
let body_bytes = match http_body_util::BodyExt::collect(body).await {
|
||||
Ok(c) => c.to_bytes(),
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid request body"),
|
||||
};
|
||||
|
||||
let req: serde_json::Value = match serde_json::from_slice(&body_bytes) {
|
||||
Ok(v) => v,
|
||||
Err(_) => return json_err(StatusCode::BAD_REQUEST, "Invalid JSON"),
|
||||
};
|
||||
|
||||
let key_id = match req.get("KeyId").and_then(|v| v.as_str()) {
|
||||
Some(k) => k,
|
||||
None => return json_err(StatusCode::BAD_REQUEST, "Missing KeyId"),
|
||||
};
|
||||
let num_bytes = req
|
||||
.get("NumberOfBytes")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(32) as usize;
|
||||
|
||||
if num_bytes < 1 || num_bytes > 1024 {
|
||||
return json_err(StatusCode::BAD_REQUEST, "NumberOfBytes must be 1-1024");
|
||||
}
|
||||
|
||||
match kms.generate_data_key(key_id, num_bytes).await {
|
||||
Ok((plaintext, wrapped)) => json_ok(json!({
|
||||
"KeyId": key_id,
|
||||
"Plaintext": B64.encode(&plaintext),
|
||||
"CiphertextBlob": B64.encode(&wrapped),
|
||||
})),
|
||||
Err(e) => json_err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
|
||||
}
|
||||
}
|
||||
@@ -1,73 +0,0 @@
|
||||
pub mod config;
|
||||
pub mod handlers;
|
||||
pub mod middleware;
|
||||
pub mod services;
|
||||
pub mod state;
|
||||
|
||||
use axum::Router;
|
||||
|
||||
pub const SERVER_HEADER: &str = concat!("MyFSIO-Rust/", env!("CARGO_PKG_VERSION"));
|
||||
|
||||
pub fn create_router(state: state::AppState) -> Router {
|
||||
let mut router = Router::new()
|
||||
.route("/", axum::routing::get(handlers::list_buckets))
|
||||
.route(
|
||||
"/{bucket}",
|
||||
axum::routing::put(handlers::create_bucket)
|
||||
.get(handlers::get_bucket)
|
||||
.delete(handlers::delete_bucket)
|
||||
.head(handlers::head_bucket)
|
||||
.post(handlers::post_bucket),
|
||||
)
|
||||
.route(
|
||||
"/{bucket}/{*key}",
|
||||
axum::routing::put(handlers::put_object)
|
||||
.get(handlers::get_object)
|
||||
.delete(handlers::delete_object)
|
||||
.head(handlers::head_object)
|
||||
.post(handlers::post_object),
|
||||
);
|
||||
|
||||
if state.config.kms_enabled {
|
||||
router = router
|
||||
.route("/kms/keys", axum::routing::get(handlers::kms::list_keys).post(handlers::kms::create_key))
|
||||
.route("/kms/keys/{key_id}", axum::routing::get(handlers::kms::get_key).delete(handlers::kms::delete_key))
|
||||
.route("/kms/keys/{key_id}/enable", axum::routing::post(handlers::kms::enable_key))
|
||||
.route("/kms/keys/{key_id}/disable", axum::routing::post(handlers::kms::disable_key))
|
||||
.route("/kms/encrypt", axum::routing::post(handlers::kms::encrypt))
|
||||
.route("/kms/decrypt", axum::routing::post(handlers::kms::decrypt))
|
||||
.route("/kms/generate-data-key", axum::routing::post(handlers::kms::generate_data_key));
|
||||
}
|
||||
|
||||
router = router
|
||||
.route("/admin/site/local", axum::routing::get(handlers::admin::get_local_site).put(handlers::admin::update_local_site))
|
||||
.route("/admin/site/all", axum::routing::get(handlers::admin::list_all_sites))
|
||||
.route("/admin/site/peers", axum::routing::post(handlers::admin::register_peer_site))
|
||||
.route("/admin/site/peers/{site_id}", axum::routing::get(handlers::admin::get_peer_site).put(handlers::admin::update_peer_site).delete(handlers::admin::delete_peer_site))
|
||||
.route("/admin/site/peers/{site_id}/health", axum::routing::post(handlers::admin::check_peer_health))
|
||||
.route("/admin/site/topology", axum::routing::get(handlers::admin::get_topology))
|
||||
.route("/admin/site/peers/{site_id}/bidirectional-status", axum::routing::get(handlers::admin::check_bidirectional_status))
|
||||
.route("/admin/iam/users", axum::routing::get(handlers::admin::iam_list_users))
|
||||
.route("/admin/iam/users/{identifier}", axum::routing::get(handlers::admin::iam_get_user))
|
||||
.route("/admin/iam/users/{identifier}/policies", axum::routing::get(handlers::admin::iam_get_user_policies))
|
||||
.route("/admin/iam/users/{identifier}/access-keys", axum::routing::post(handlers::admin::iam_create_access_key))
|
||||
.route("/admin/iam/users/{identifier}/access-keys/{access_key}", axum::routing::delete(handlers::admin::iam_delete_access_key))
|
||||
.route("/admin/iam/users/{identifier}/disable", axum::routing::post(handlers::admin::iam_disable_user))
|
||||
.route("/admin/iam/users/{identifier}/enable", axum::routing::post(handlers::admin::iam_enable_user))
|
||||
.route("/admin/website-domains", axum::routing::get(handlers::admin::list_website_domains).post(handlers::admin::create_website_domain))
|
||||
.route("/admin/website-domains/{domain}", axum::routing::get(handlers::admin::get_website_domain).put(handlers::admin::update_website_domain).delete(handlers::admin::delete_website_domain))
|
||||
.route("/admin/gc/status", axum::routing::get(handlers::admin::gc_status))
|
||||
.route("/admin/gc/run", axum::routing::post(handlers::admin::gc_run))
|
||||
.route("/admin/gc/history", axum::routing::get(handlers::admin::gc_history))
|
||||
.route("/admin/integrity/status", axum::routing::get(handlers::admin::integrity_status))
|
||||
.route("/admin/integrity/run", axum::routing::post(handlers::admin::integrity_run))
|
||||
.route("/admin/integrity/history", axum::routing::get(handlers::admin::integrity_history));
|
||||
|
||||
router
|
||||
.layer(axum::middleware::from_fn_with_state(
|
||||
state.clone(),
|
||||
middleware::auth_layer,
|
||||
))
|
||||
.layer(axum::middleware::from_fn(middleware::server_header))
|
||||
.with_state(state)
|
||||
}
|
||||
@@ -1,97 +0,0 @@
|
||||
use myfsio_server::config::ServerConfig;
|
||||
use myfsio_server::state::AppState;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let config = ServerConfig::from_env();
|
||||
let bind_addr = config.bind_addr;
|
||||
|
||||
tracing::info!("MyFSIO Rust Engine starting on {}", bind_addr);
|
||||
tracing::info!("Storage root: {}", config.storage_root.display());
|
||||
tracing::info!("Region: {}", config.region);
|
||||
tracing::info!(
|
||||
"Encryption: {}, KMS: {}, GC: {}, Lifecycle: {}, Integrity: {}, Metrics: {}",
|
||||
config.encryption_enabled,
|
||||
config.kms_enabled,
|
||||
config.gc_enabled,
|
||||
config.lifecycle_enabled,
|
||||
config.integrity_enabled,
|
||||
config.metrics_enabled
|
||||
);
|
||||
|
||||
let state = if config.encryption_enabled || config.kms_enabled {
|
||||
AppState::new_with_encryption(config.clone()).await
|
||||
} else {
|
||||
AppState::new(config.clone())
|
||||
};
|
||||
|
||||
let mut bg_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
|
||||
|
||||
if let Some(ref gc) = state.gc {
|
||||
bg_handles.push(gc.clone().start_background());
|
||||
tracing::info!("GC background service started");
|
||||
}
|
||||
|
||||
if let Some(ref integrity) = state.integrity {
|
||||
bg_handles.push(integrity.clone().start_background());
|
||||
tracing::info!("Integrity checker background service started");
|
||||
}
|
||||
|
||||
if let Some(ref metrics) = state.metrics {
|
||||
bg_handles.push(metrics.clone().start_background());
|
||||
tracing::info!("Metrics collector background service started");
|
||||
}
|
||||
|
||||
if config.lifecycle_enabled {
|
||||
let lifecycle = std::sync::Arc::new(
|
||||
myfsio_server::services::lifecycle::LifecycleService::new(
|
||||
state.storage.clone(),
|
||||
myfsio_server::services::lifecycle::LifecycleConfig::default(),
|
||||
),
|
||||
);
|
||||
bg_handles.push(lifecycle.start_background());
|
||||
tracing::info!("Lifecycle manager background service started");
|
||||
}
|
||||
|
||||
let app = myfsio_server::create_router(state);
|
||||
|
||||
let listener = match tokio::net::TcpListener::bind(bind_addr).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
if err.kind() == std::io::ErrorKind::AddrInUse {
|
||||
tracing::error!("Port already in use: {}", bind_addr);
|
||||
} else {
|
||||
tracing::error!("Failed to bind {}: {}", bind_addr, err);
|
||||
}
|
||||
for handle in bg_handles {
|
||||
handle.abort();
|
||||
}
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
tracing::info!("Listening on {}", bind_addr);
|
||||
|
||||
if let Err(err) = axum::serve(listener, app)
|
||||
.with_graceful_shutdown(shutdown_signal())
|
||||
.await
|
||||
{
|
||||
tracing::error!("Server exited with error: {}", err);
|
||||
for handle in bg_handles {
|
||||
handle.abort();
|
||||
}
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
for handle in bg_handles {
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
async fn shutdown_signal() {
|
||||
tokio::signal::ctrl_c()
|
||||
.await
|
||||
.expect("Failed to listen for Ctrl+C");
|
||||
tracing::info!("Shutdown signal received");
|
||||
}
|
||||
@@ -1,569 +0,0 @@
|
||||
use axum::extract::{Request, State};
|
||||
use axum::http::{Method, StatusCode};
|
||||
use axum::middleware::Next;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use myfsio_auth::sigv4;
|
||||
use myfsio_common::error::{S3Error, S3ErrorCode};
|
||||
use myfsio_common::types::Principal;
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
pub async fn auth_layer(
|
||||
State(state): State<AppState>,
|
||||
mut req: Request,
|
||||
next: Next,
|
||||
) -> Response {
|
||||
let uri = req.uri().clone();
|
||||
let path = uri.path().to_string();
|
||||
|
||||
if path == "/" && req.method() == axum::http::Method::GET {
|
||||
match try_auth(&state, &req) {
|
||||
AuthResult::Ok(principal) => {
|
||||
if let Err(err) = authorize_request(&state, &principal, &req) {
|
||||
return error_response(err, &path);
|
||||
}
|
||||
req.extensions_mut().insert(principal);
|
||||
}
|
||||
AuthResult::Denied(err) => return error_response(err, &path),
|
||||
AuthResult::NoAuth => {
|
||||
return error_response(
|
||||
S3Error::new(S3ErrorCode::AccessDenied, "Missing credentials"),
|
||||
&path,
|
||||
);
|
||||
}
|
||||
}
|
||||
return next.run(req).await;
|
||||
}
|
||||
|
||||
match try_auth(&state, &req) {
|
||||
AuthResult::Ok(principal) => {
|
||||
if let Err(err) = authorize_request(&state, &principal, &req) {
|
||||
return error_response(err, &path);
|
||||
}
|
||||
req.extensions_mut().insert(principal);
|
||||
next.run(req).await
|
||||
}
|
||||
AuthResult::Denied(err) => error_response(err, &path),
|
||||
AuthResult::NoAuth => {
|
||||
error_response(
|
||||
S3Error::new(S3ErrorCode::AccessDenied, "Missing credentials"),
|
||||
&path,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum AuthResult {
|
||||
Ok(Principal),
|
||||
Denied(S3Error),
|
||||
NoAuth,
|
||||
}
|
||||
|
||||
fn authorize_request(state: &AppState, principal: &Principal, req: &Request) -> Result<(), S3Error> {
|
||||
let path = req.uri().path();
|
||||
if path == "/" {
|
||||
if state.iam.authorize(principal, None, "list", None) {
|
||||
return Ok(());
|
||||
}
|
||||
return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"));
|
||||
}
|
||||
|
||||
if path.starts_with("/admin/") || path.starts_with("/kms/") {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut segments = path.trim_start_matches('/').split('/').filter(|s| !s.is_empty());
|
||||
let bucket = match segments.next() {
|
||||
Some(b) => b,
|
||||
None => {
|
||||
return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"));
|
||||
}
|
||||
};
|
||||
let remaining: Vec<&str> = segments.collect();
|
||||
let query = req.uri().query().unwrap_or("");
|
||||
|
||||
if remaining.is_empty() {
|
||||
let action = resolve_bucket_action(req.method(), query);
|
||||
if state.iam.authorize(principal, Some(bucket), action, None) {
|
||||
return Ok(());
|
||||
}
|
||||
return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"));
|
||||
}
|
||||
|
||||
let object_key = remaining.join("/");
|
||||
if req.method() == Method::PUT {
|
||||
if let Some(copy_source) = req
|
||||
.headers()
|
||||
.get("x-amz-copy-source")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
{
|
||||
let source = copy_source.strip_prefix('/').unwrap_or(copy_source);
|
||||
if let Some((src_bucket, src_key)) = source.split_once('/') {
|
||||
let source_allowed =
|
||||
state.iam.authorize(principal, Some(src_bucket), "read", Some(src_key));
|
||||
let dest_allowed =
|
||||
state.iam.authorize(principal, Some(bucket), "write", Some(&object_key));
|
||||
if source_allowed && dest_allowed {
|
||||
return Ok(());
|
||||
}
|
||||
return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let action = resolve_object_action(req.method(), query);
|
||||
if state
|
||||
.iam
|
||||
.authorize(principal, Some(bucket), action, Some(&object_key))
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"))
|
||||
}
|
||||
|
||||
fn resolve_bucket_action(method: &Method, query: &str) -> &'static str {
|
||||
if has_query_key(query, "versioning") {
|
||||
return "versioning";
|
||||
}
|
||||
if has_query_key(query, "tagging") {
|
||||
return "tagging";
|
||||
}
|
||||
if has_query_key(query, "cors") {
|
||||
return "cors";
|
||||
}
|
||||
if has_query_key(query, "location") {
|
||||
return "list";
|
||||
}
|
||||
if has_query_key(query, "encryption") {
|
||||
return "encryption";
|
||||
}
|
||||
if has_query_key(query, "lifecycle") {
|
||||
return "lifecycle";
|
||||
}
|
||||
if has_query_key(query, "acl") {
|
||||
return "share";
|
||||
}
|
||||
if has_query_key(query, "policy") || has_query_key(query, "policyStatus") {
|
||||
return "policy";
|
||||
}
|
||||
if has_query_key(query, "replication") {
|
||||
return "replication";
|
||||
}
|
||||
if has_query_key(query, "quota") {
|
||||
return "quota";
|
||||
}
|
||||
if has_query_key(query, "website") {
|
||||
return "website";
|
||||
}
|
||||
if has_query_key(query, "object-lock") {
|
||||
return "object_lock";
|
||||
}
|
||||
if has_query_key(query, "notification") {
|
||||
return "notification";
|
||||
}
|
||||
if has_query_key(query, "logging") {
|
||||
return "logging";
|
||||
}
|
||||
if has_query_key(query, "versions") || has_query_key(query, "uploads") {
|
||||
return "list";
|
||||
}
|
||||
if has_query_key(query, "delete") {
|
||||
return "delete";
|
||||
}
|
||||
|
||||
match *method {
|
||||
Method::GET => "list",
|
||||
Method::HEAD => "read",
|
||||
Method::PUT => "create_bucket",
|
||||
Method::DELETE => "delete_bucket",
|
||||
Method::POST => "write",
|
||||
_ => "list",
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_object_action(method: &Method, query: &str) -> &'static str {
|
||||
if has_query_key(query, "tagging") {
|
||||
return if *method == Method::GET { "read" } else { "write" };
|
||||
}
|
||||
if has_query_key(query, "acl") {
|
||||
return if *method == Method::GET { "read" } else { "write" };
|
||||
}
|
||||
if has_query_key(query, "retention") || has_query_key(query, "legal-hold") {
|
||||
return "object_lock";
|
||||
}
|
||||
if has_query_key(query, "attributes") {
|
||||
return "read";
|
||||
}
|
||||
if has_query_key(query, "uploads") || has_query_key(query, "uploadId") {
|
||||
return match *method {
|
||||
Method::GET => "read",
|
||||
_ => "write",
|
||||
};
|
||||
}
|
||||
if has_query_key(query, "select") {
|
||||
return "read";
|
||||
}
|
||||
|
||||
match *method {
|
||||
Method::GET | Method::HEAD => "read",
|
||||
Method::PUT => "write",
|
||||
Method::DELETE => "delete",
|
||||
Method::POST => "write",
|
||||
_ => "read",
|
||||
}
|
||||
}
|
||||
|
||||
fn has_query_key(query: &str, key: &str) -> bool {
|
||||
if query.is_empty() {
|
||||
return false;
|
||||
}
|
||||
query
|
||||
.split('&')
|
||||
.filter(|part| !part.is_empty())
|
||||
.any(|part| part == key || part.starts_with(&format!("{}=", key)))
|
||||
}
|
||||
|
||||
fn try_auth(state: &AppState, req: &Request) -> AuthResult {
|
||||
if let Some(auth_header) = req.headers().get("authorization") {
|
||||
if let Ok(auth_str) = auth_header.to_str() {
|
||||
if auth_str.starts_with("AWS4-HMAC-SHA256 ") {
|
||||
return verify_sigv4_header(state, req, auth_str);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let query = req.uri().query().unwrap_or("");
|
||||
if query.contains("X-Amz-Algorithm=AWS4-HMAC-SHA256") {
|
||||
return verify_sigv4_query(state, req);
|
||||
}
|
||||
|
||||
if let (Some(ak), Some(sk)) = (
|
||||
req.headers().get("x-access-key").and_then(|v| v.to_str().ok()),
|
||||
req.headers().get("x-secret-key").and_then(|v| v.to_str().ok()),
|
||||
) {
|
||||
return match state.iam.authenticate(ak, sk) {
|
||||
Some(principal) => AuthResult::Ok(principal),
|
||||
None => AuthResult::Denied(
|
||||
S3Error::from_code(S3ErrorCode::SignatureDoesNotMatch),
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
AuthResult::NoAuth
|
||||
}
|
||||
|
||||
fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthResult {
|
||||
let parts: Vec<&str> = auth_str
|
||||
.strip_prefix("AWS4-HMAC-SHA256 ")
|
||||
.unwrap()
|
||||
.split(", ")
|
||||
.collect();
|
||||
|
||||
if parts.len() != 3 {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "Malformed Authorization header"),
|
||||
);
|
||||
}
|
||||
|
||||
let credential = parts[0].strip_prefix("Credential=").unwrap_or("");
|
||||
let signed_headers_str = parts[1].strip_prefix("SignedHeaders=").unwrap_or("");
|
||||
let provided_signature = parts[2].strip_prefix("Signature=").unwrap_or("");
|
||||
|
||||
let cred_parts: Vec<&str> = credential.split('/').collect();
|
||||
if cred_parts.len() != 5 {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "Malformed credential"),
|
||||
);
|
||||
}
|
||||
|
||||
let access_key = cred_parts[0];
|
||||
let date_stamp = cred_parts[1];
|
||||
let region = cred_parts[2];
|
||||
let service = cred_parts[3];
|
||||
|
||||
let amz_date = req
|
||||
.headers()
|
||||
.get("x-amz-date")
|
||||
.or_else(|| req.headers().get("date"))
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
|
||||
if amz_date.is_empty() {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::AccessDenied, "Missing Date header"),
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(err) = check_timestamp_freshness(amz_date, state.config.sigv4_timestamp_tolerance_secs) {
|
||||
return AuthResult::Denied(err);
|
||||
}
|
||||
|
||||
let secret_key = match state.iam.get_secret_key(access_key) {
|
||||
Some(sk) => sk,
|
||||
None => {
|
||||
return AuthResult::Denied(
|
||||
S3Error::from_code(S3ErrorCode::InvalidAccessKeyId),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let method = req.method().as_str();
|
||||
let canonical_uri = req.uri().path();
|
||||
|
||||
let query_params = parse_query_params(req.uri().query().unwrap_or(""));
|
||||
|
||||
let payload_hash = req
|
||||
.headers()
|
||||
.get("x-amz-content-sha256")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("UNSIGNED-PAYLOAD");
|
||||
|
||||
let signed_headers: Vec<&str> = signed_headers_str.split(';').collect();
|
||||
let header_values: Vec<(String, String)> = signed_headers
|
||||
.iter()
|
||||
.map(|&name| {
|
||||
let value = req
|
||||
.headers()
|
||||
.get(name)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
(name.to_string(), value.to_string())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let verified = sigv4::verify_sigv4_signature(
|
||||
method,
|
||||
canonical_uri,
|
||||
&query_params,
|
||||
signed_headers_str,
|
||||
&header_values,
|
||||
payload_hash,
|
||||
amz_date,
|
||||
date_stamp,
|
||||
region,
|
||||
service,
|
||||
&secret_key,
|
||||
provided_signature,
|
||||
);
|
||||
|
||||
if !verified {
|
||||
return AuthResult::Denied(
|
||||
S3Error::from_code(S3ErrorCode::SignatureDoesNotMatch),
|
||||
);
|
||||
}
|
||||
|
||||
match state.iam.get_principal(access_key) {
|
||||
Some(p) => AuthResult::Ok(p),
|
||||
None => AuthResult::Denied(
|
||||
S3Error::from_code(S3ErrorCode::InvalidAccessKeyId),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn verify_sigv4_query(state: &AppState, req: &Request) -> AuthResult {
|
||||
let query = req.uri().query().unwrap_or("");
|
||||
let params = parse_query_params(query);
|
||||
let param_map: std::collections::HashMap<&str, &str> = params
|
||||
.iter()
|
||||
.map(|(k, v)| (k.as_str(), v.as_str()))
|
||||
.collect();
|
||||
|
||||
let credential = match param_map.get("X-Amz-Credential") {
|
||||
Some(c) => *c,
|
||||
None => {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "Missing X-Amz-Credential"),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let signed_headers_str = param_map
|
||||
.get("X-Amz-SignedHeaders")
|
||||
.copied()
|
||||
.unwrap_or("host");
|
||||
let provided_signature = match param_map.get("X-Amz-Signature") {
|
||||
Some(s) => *s,
|
||||
None => {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "Missing X-Amz-Signature"),
|
||||
);
|
||||
}
|
||||
};
|
||||
let amz_date = match param_map.get("X-Amz-Date") {
|
||||
Some(d) => *d,
|
||||
None => {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "Missing X-Amz-Date"),
|
||||
);
|
||||
}
|
||||
};
|
||||
let expires_str = match param_map.get("X-Amz-Expires") {
|
||||
Some(e) => *e,
|
||||
None => {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "Missing X-Amz-Expires"),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let cred_parts: Vec<&str> = credential.split('/').collect();
|
||||
if cred_parts.len() != 5 {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "Malformed credential"),
|
||||
);
|
||||
}
|
||||
|
||||
let access_key = cred_parts[0];
|
||||
let date_stamp = cred_parts[1];
|
||||
let region = cred_parts[2];
|
||||
let service = cred_parts[3];
|
||||
|
||||
let expires: u64 = match expires_str.parse() {
|
||||
Ok(e) => e,
|
||||
Err(_) => {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "Invalid X-Amz-Expires"),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
if expires < state.config.presigned_url_min_expiry
|
||||
|| expires > state.config.presigned_url_max_expiry
|
||||
{
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::InvalidArgument, "X-Amz-Expires out of range"),
|
||||
);
|
||||
}
|
||||
|
||||
if let Ok(request_time) =
|
||||
NaiveDateTime::parse_from_str(amz_date, "%Y%m%dT%H%M%SZ")
|
||||
{
|
||||
let request_utc = request_time.and_utc();
|
||||
let now = Utc::now();
|
||||
let elapsed = (now - request_utc).num_seconds();
|
||||
if elapsed > expires as i64 {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::AccessDenied, "Request has expired"),
|
||||
);
|
||||
}
|
||||
if elapsed < -(state.config.sigv4_timestamp_tolerance_secs as i64) {
|
||||
return AuthResult::Denied(
|
||||
S3Error::new(S3ErrorCode::AccessDenied, "Request is too far in the future"),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let secret_key = match state.iam.get_secret_key(access_key) {
|
||||
Some(sk) => sk,
|
||||
None => {
|
||||
return AuthResult::Denied(
|
||||
S3Error::from_code(S3ErrorCode::InvalidAccessKeyId),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let method = req.method().as_str();
|
||||
let canonical_uri = req.uri().path();
|
||||
|
||||
let query_params_no_sig: Vec<(String, String)> = params
|
||||
.iter()
|
||||
.filter(|(k, _)| k != "X-Amz-Signature")
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
let payload_hash = "UNSIGNED-PAYLOAD";
|
||||
|
||||
let signed_headers: Vec<&str> = signed_headers_str.split(';').collect();
|
||||
let header_values: Vec<(String, String)> = signed_headers
|
||||
.iter()
|
||||
.map(|&name| {
|
||||
let value = req
|
||||
.headers()
|
||||
.get(name)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
(name.to_string(), value.to_string())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let verified = sigv4::verify_sigv4_signature(
|
||||
method,
|
||||
canonical_uri,
|
||||
&query_params_no_sig,
|
||||
signed_headers_str,
|
||||
&header_values,
|
||||
payload_hash,
|
||||
amz_date,
|
||||
date_stamp,
|
||||
region,
|
||||
service,
|
||||
&secret_key,
|
||||
provided_signature,
|
||||
);
|
||||
|
||||
if !verified {
|
||||
return AuthResult::Denied(
|
||||
S3Error::from_code(S3ErrorCode::SignatureDoesNotMatch),
|
||||
);
|
||||
}
|
||||
|
||||
match state.iam.get_principal(access_key) {
|
||||
Some(p) => AuthResult::Ok(p),
|
||||
None => AuthResult::Denied(
|
||||
S3Error::from_code(S3ErrorCode::InvalidAccessKeyId),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn check_timestamp_freshness(amz_date: &str, tolerance_secs: u64) -> Option<S3Error> {
|
||||
let request_time = NaiveDateTime::parse_from_str(amz_date, "%Y%m%dT%H%M%SZ").ok()?;
|
||||
let request_utc = request_time.and_utc();
|
||||
let now = Utc::now();
|
||||
let diff = (now - request_utc).num_seconds().unsigned_abs();
|
||||
|
||||
if diff > tolerance_secs {
|
||||
return Some(S3Error::new(
|
||||
S3ErrorCode::AccessDenied,
|
||||
"Request timestamp too old or too far in the future",
|
||||
));
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn parse_query_params(query: &str) -> Vec<(String, String)> {
|
||||
if query.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
query
|
||||
.split('&')
|
||||
.filter_map(|pair| {
|
||||
let mut parts = pair.splitn(2, '=');
|
||||
let key = parts.next()?;
|
||||
let value = parts.next().unwrap_or("");
|
||||
Some((
|
||||
urlencoding_decode(key),
|
||||
urlencoding_decode(value),
|
||||
))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn urlencoding_decode(s: &str) -> String {
|
||||
percent_encoding::percent_decode_str(s)
|
||||
.decode_utf8_lossy()
|
||||
.into_owned()
|
||||
}
|
||||
|
||||
fn error_response(err: S3Error, resource: &str) -> Response {
|
||||
let status =
|
||||
StatusCode::from_u16(err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
let request_id = uuid::Uuid::new_v4().simple().to_string();
|
||||
let body = err
|
||||
.with_resource(resource.to_string())
|
||||
.with_request_id(request_id)
|
||||
.to_xml();
|
||||
(status, [("content-type", "application/xml")], body).into_response()
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
mod auth;
|
||||
|
||||
pub use auth::auth_layer;
|
||||
|
||||
use axum::extract::Request;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::Response;
|
||||
|
||||
pub async fn server_header(req: Request, next: Next) -> Response {
|
||||
let mut resp = next.run(req).await;
|
||||
resp.headers_mut().insert(
|
||||
"server",
|
||||
crate::SERVER_HEADER.parse().unwrap(),
|
||||
);
|
||||
resp
|
||||
}
|
||||
@@ -1,204 +0,0 @@
|
||||
use myfsio_storage::fs_backend::FsStorageBackend;
|
||||
use myfsio_storage::traits::StorageEngine;
|
||||
use serde_json::{json, Value};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
pub struct IntegrityConfig {
|
||||
pub interval_hours: f64,
|
||||
pub batch_size: usize,
|
||||
pub auto_heal: bool,
|
||||
pub dry_run: bool,
|
||||
}
|
||||
|
||||
impl Default for IntegrityConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval_hours: 24.0,
|
||||
batch_size: 1000,
|
||||
auto_heal: false,
|
||||
dry_run: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IntegrityService {
|
||||
storage: Arc<FsStorageBackend>,
|
||||
config: IntegrityConfig,
|
||||
running: Arc<RwLock<bool>>,
|
||||
history: Arc<RwLock<Vec<Value>>>,
|
||||
history_path: PathBuf,
|
||||
}
|
||||
|
||||
impl IntegrityService {
|
||||
pub fn new(
|
||||
storage: Arc<FsStorageBackend>,
|
||||
storage_root: &std::path::Path,
|
||||
config: IntegrityConfig,
|
||||
) -> Self {
|
||||
let history_path = storage_root
|
||||
.join(".myfsio.sys")
|
||||
.join("config")
|
||||
.join("integrity_history.json");
|
||||
|
||||
let history = if history_path.exists() {
|
||||
std::fs::read_to_string(&history_path)
|
||||
.ok()
|
||||
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
|
||||
.and_then(|v| v.get("executions").and_then(|e| e.as_array().cloned()))
|
||||
.unwrap_or_default()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
Self {
|
||||
storage,
|
||||
config,
|
||||
running: Arc::new(RwLock::new(false)),
|
||||
history: Arc::new(RwLock::new(history)),
|
||||
history_path,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn status(&self) -> Value {
|
||||
let running = *self.running.read().await;
|
||||
json!({
|
||||
"enabled": true,
|
||||
"running": running,
|
||||
"interval_hours": self.config.interval_hours,
|
||||
"batch_size": self.config.batch_size,
|
||||
"auto_heal": self.config.auto_heal,
|
||||
"dry_run": self.config.dry_run,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn history(&self) -> Value {
|
||||
let history = self.history.read().await;
|
||||
json!({ "executions": *history })
|
||||
}
|
||||
|
||||
pub async fn run_now(&self, dry_run: bool, auto_heal: bool) -> Result<Value, String> {
|
||||
{
|
||||
let mut running = self.running.write().await;
|
||||
if *running {
|
||||
return Err("Integrity check already running".to_string());
|
||||
}
|
||||
*running = true;
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
let result = self.check_integrity(dry_run, auto_heal).await;
|
||||
let elapsed = start.elapsed().as_secs_f64();
|
||||
|
||||
*self.running.write().await = false;
|
||||
|
||||
let mut result_json = result.clone();
|
||||
if let Some(obj) = result_json.as_object_mut() {
|
||||
obj.insert("execution_time_seconds".to_string(), json!(elapsed));
|
||||
}
|
||||
|
||||
let record = json!({
|
||||
"timestamp": chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
|
||||
"dry_run": dry_run,
|
||||
"auto_heal": auto_heal,
|
||||
"result": result_json,
|
||||
});
|
||||
|
||||
{
|
||||
let mut history = self.history.write().await;
|
||||
history.push(record);
|
||||
if history.len() > 50 {
|
||||
let excess = history.len() - 50;
|
||||
history.drain(..excess);
|
||||
}
|
||||
}
|
||||
self.save_history().await;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn check_integrity(&self, _dry_run: bool, _auto_heal: bool) -> Value {
|
||||
let buckets = match self.storage.list_buckets().await {
|
||||
Ok(b) => b,
|
||||
Err(e) => return json!({"error": e.to_string()}),
|
||||
};
|
||||
|
||||
let mut objects_scanned = 0u64;
|
||||
let mut corrupted = 0u64;
|
||||
let mut phantom_metadata = 0u64;
|
||||
let mut errors: Vec<String> = Vec::new();
|
||||
|
||||
for bucket in &buckets {
|
||||
let params = myfsio_common::types::ListParams {
|
||||
max_keys: self.config.batch_size,
|
||||
..Default::default()
|
||||
};
|
||||
let objects = match self.storage.list_objects(&bucket.name, ¶ms).await {
|
||||
Ok(r) => r.objects,
|
||||
Err(e) => {
|
||||
errors.push(format!("{}: {}", bucket.name, e));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for obj in &objects {
|
||||
objects_scanned += 1;
|
||||
match self.storage.get_object_path(&bucket.name, &obj.key).await {
|
||||
Ok(path) => {
|
||||
if !path.exists() {
|
||||
phantom_metadata += 1;
|
||||
} else if let Some(ref expected_etag) = obj.etag {
|
||||
match myfsio_crypto::hashing::md5_file(&path) {
|
||||
Ok(actual_etag) => {
|
||||
if &actual_etag != expected_etag {
|
||||
corrupted += 1;
|
||||
}
|
||||
}
|
||||
Err(e) => errors.push(format!("{}:{}: {}", bucket.name, obj.key, e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => errors.push(format!("{}:{}: {}", bucket.name, obj.key, e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
json!({
|
||||
"objects_scanned": objects_scanned,
|
||||
"buckets_scanned": buckets.len(),
|
||||
"corrupted_objects": corrupted,
|
||||
"phantom_metadata": phantom_metadata,
|
||||
"errors": errors,
|
||||
})
|
||||
}
|
||||
|
||||
async fn save_history(&self) {
|
||||
let history = self.history.read().await;
|
||||
let data = json!({ "executions": *history });
|
||||
if let Some(parent) = self.history_path.parent() {
|
||||
let _ = std::fs::create_dir_all(parent);
|
||||
}
|
||||
let _ = std::fs::write(
|
||||
&self.history_path,
|
||||
serde_json::to_string_pretty(&data).unwrap_or_default(),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn start_background(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
|
||||
let interval = std::time::Duration::from_secs_f64(self.config.interval_hours * 3600.0);
|
||||
tokio::spawn(async move {
|
||||
let mut timer = tokio::time::interval(interval);
|
||||
timer.tick().await;
|
||||
loop {
|
||||
timer.tick().await;
|
||||
tracing::info!("Integrity check starting");
|
||||
match self.run_now(false, false).await {
|
||||
Ok(result) => tracing::info!("Integrity check complete: {:?}", result),
|
||||
Err(e) => tracing::warn!("Integrity check failed: {}", e),
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,219 +0,0 @@
|
||||
use serde_json::{json, Value};
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
pub struct MetricsConfig {
|
||||
pub interval_minutes: u64,
|
||||
pub retention_hours: u64,
|
||||
}
|
||||
|
||||
impl Default for MetricsConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval_minutes: 5,
|
||||
retention_hours: 24,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct MethodStats {
|
||||
count: u64,
|
||||
success_count: u64,
|
||||
error_count: u64,
|
||||
bytes_in: u64,
|
||||
bytes_out: u64,
|
||||
latencies: Vec<f64>,
|
||||
}
|
||||
|
||||
impl MethodStats {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
count: 0,
|
||||
success_count: 0,
|
||||
error_count: 0,
|
||||
bytes_in: 0,
|
||||
bytes_out: 0,
|
||||
latencies: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn to_json(&self) -> Value {
|
||||
let (min, max, avg, p50, p95, p99) = if self.latencies.is_empty() {
|
||||
(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
|
||||
} else {
|
||||
let mut sorted = self.latencies.clone();
|
||||
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
|
||||
let len = sorted.len();
|
||||
let sum: f64 = sorted.iter().sum();
|
||||
(
|
||||
sorted[0],
|
||||
sorted[len - 1],
|
||||
sum / len as f64,
|
||||
sorted[len / 2],
|
||||
sorted[((len as f64 * 0.95) as usize).min(len - 1)],
|
||||
sorted[((len as f64 * 0.99) as usize).min(len - 1)],
|
||||
)
|
||||
};
|
||||
|
||||
json!({
|
||||
"count": self.count,
|
||||
"success_count": self.success_count,
|
||||
"error_count": self.error_count,
|
||||
"bytes_in": self.bytes_in,
|
||||
"bytes_out": self.bytes_out,
|
||||
"latency_min_ms": min,
|
||||
"latency_max_ms": max,
|
||||
"latency_avg_ms": avg,
|
||||
"latency_p50_ms": p50,
|
||||
"latency_p95_ms": p95,
|
||||
"latency_p99_ms": p99,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct CurrentWindow {
|
||||
by_method: HashMap<String, MethodStats>,
|
||||
by_status_class: HashMap<String, u64>,
|
||||
start_time: Instant,
|
||||
}
|
||||
|
||||
impl CurrentWindow {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
by_method: HashMap::new(),
|
||||
by_status_class: HashMap::new(),
|
||||
start_time: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&mut self) {
|
||||
self.by_method.clear();
|
||||
self.by_status_class.clear();
|
||||
self.start_time = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MetricsService {
|
||||
config: MetricsConfig,
|
||||
current: Arc<RwLock<CurrentWindow>>,
|
||||
snapshots: Arc<RwLock<Vec<Value>>>,
|
||||
snapshots_path: PathBuf,
|
||||
}
|
||||
|
||||
impl MetricsService {
|
||||
pub fn new(storage_root: &std::path::Path, config: MetricsConfig) -> Self {
|
||||
let snapshots_path = storage_root
|
||||
.join(".myfsio.sys")
|
||||
.join("config")
|
||||
.join("operation_metrics.json");
|
||||
|
||||
let snapshots = if snapshots_path.exists() {
|
||||
std::fs::read_to_string(&snapshots_path)
|
||||
.ok()
|
||||
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
|
||||
.and_then(|v| v.get("snapshots").and_then(|s| s.as_array().cloned()))
|
||||
.unwrap_or_default()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
Self {
|
||||
config,
|
||||
current: Arc::new(RwLock::new(CurrentWindow::new())),
|
||||
snapshots: Arc::new(RwLock::new(snapshots)),
|
||||
snapshots_path,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn record(&self, method: &str, status: u16, latency_ms: f64, bytes_in: u64, bytes_out: u64) {
|
||||
let mut window = self.current.write().await;
|
||||
let stats = window.by_method.entry(method.to_string()).or_insert_with(MethodStats::new);
|
||||
stats.count += 1;
|
||||
if status < 400 {
|
||||
stats.success_count += 1;
|
||||
} else {
|
||||
stats.error_count += 1;
|
||||
}
|
||||
stats.bytes_in += bytes_in;
|
||||
stats.bytes_out += bytes_out;
|
||||
stats.latencies.push(latency_ms);
|
||||
|
||||
let class = format!("{}xx", status / 100);
|
||||
*window.by_status_class.entry(class).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
pub async fn snapshot(&self) -> Value {
|
||||
let window = self.current.read().await;
|
||||
let mut by_method = serde_json::Map::new();
|
||||
for (method, stats) in &window.by_method {
|
||||
by_method.insert(method.clone(), stats.to_json());
|
||||
}
|
||||
|
||||
let snapshots = self.snapshots.read().await;
|
||||
json!({
|
||||
"enabled": true,
|
||||
"current_window": {
|
||||
"by_method": by_method,
|
||||
"by_status_class": window.by_status_class,
|
||||
"window_start_elapsed_secs": window.start_time.elapsed().as_secs_f64(),
|
||||
},
|
||||
"snapshots": *snapshots,
|
||||
})
|
||||
}
|
||||
|
||||
async fn flush_window(&self) {
|
||||
let snap = {
|
||||
let mut window = self.current.write().await;
|
||||
let mut by_method = serde_json::Map::new();
|
||||
for (method, stats) in &window.by_method {
|
||||
by_method.insert(method.clone(), stats.to_json());
|
||||
}
|
||||
let snap = json!({
|
||||
"timestamp": chrono::Utc::now().to_rfc3339(),
|
||||
"window_seconds": self.config.interval_minutes * 60,
|
||||
"by_method": by_method,
|
||||
"by_status_class": window.by_status_class,
|
||||
});
|
||||
window.reset();
|
||||
snap
|
||||
};
|
||||
|
||||
let max_snapshots = (self.config.retention_hours * 60 / self.config.interval_minutes) as usize;
|
||||
{
|
||||
let mut snapshots = self.snapshots.write().await;
|
||||
snapshots.push(snap);
|
||||
if snapshots.len() > max_snapshots {
|
||||
let excess = snapshots.len() - max_snapshots;
|
||||
snapshots.drain(..excess);
|
||||
}
|
||||
}
|
||||
self.save_snapshots().await;
|
||||
}
|
||||
|
||||
async fn save_snapshots(&self) {
|
||||
let snapshots = self.snapshots.read().await;
|
||||
let data = json!({ "snapshots": *snapshots });
|
||||
if let Some(parent) = self.snapshots_path.parent() {
|
||||
let _ = std::fs::create_dir_all(parent);
|
||||
}
|
||||
let _ = std::fs::write(
|
||||
&self.snapshots_path,
|
||||
serde_json::to_string_pretty(&data).unwrap_or_default(),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn start_background(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
|
||||
let interval = std::time::Duration::from_secs(self.config.interval_minutes * 60);
|
||||
tokio::spawn(async move {
|
||||
let mut timer = tokio::time::interval(interval);
|
||||
timer.tick().await;
|
||||
loop {
|
||||
timer.tick().await;
|
||||
self.flush_window().await;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,121 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::config::ServerConfig;
|
||||
use crate::services::gc::GcService;
|
||||
use crate::services::integrity::IntegrityService;
|
||||
use crate::services::metrics::MetricsService;
|
||||
use crate::services::site_registry::SiteRegistry;
|
||||
use crate::services::website_domains::WebsiteDomainStore;
|
||||
use myfsio_auth::iam::IamService;
|
||||
use myfsio_crypto::encryption::EncryptionService;
|
||||
use myfsio_crypto::kms::KmsService;
|
||||
use myfsio_storage::fs_backend::FsStorageBackend;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub config: ServerConfig,
|
||||
pub storage: Arc<FsStorageBackend>,
|
||||
pub iam: Arc<IamService>,
|
||||
pub encryption: Option<Arc<EncryptionService>>,
|
||||
pub kms: Option<Arc<KmsService>>,
|
||||
pub gc: Option<Arc<GcService>>,
|
||||
pub integrity: Option<Arc<IntegrityService>>,
|
||||
pub metrics: Option<Arc<MetricsService>>,
|
||||
pub site_registry: Option<Arc<SiteRegistry>>,
|
||||
pub website_domains: Option<Arc<WebsiteDomainStore>>,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn new(config: ServerConfig) -> Self {
|
||||
let storage = Arc::new(FsStorageBackend::new(config.storage_root.clone()));
|
||||
let iam = Arc::new(IamService::new_with_secret(
|
||||
config.iam_config_path.clone(),
|
||||
config.secret_key.clone(),
|
||||
));
|
||||
|
||||
let gc = if config.gc_enabled {
|
||||
Some(Arc::new(GcService::new(
|
||||
config.storage_root.clone(),
|
||||
crate::services::gc::GcConfig::default(),
|
||||
)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let integrity = if config.integrity_enabled {
|
||||
Some(Arc::new(IntegrityService::new(
|
||||
storage.clone(),
|
||||
&config.storage_root,
|
||||
crate::services::integrity::IntegrityConfig::default(),
|
||||
)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let metrics = if config.metrics_enabled {
|
||||
Some(Arc::new(MetricsService::new(
|
||||
&config.storage_root,
|
||||
crate::services::metrics::MetricsConfig::default(),
|
||||
)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let site_registry = Some(Arc::new(SiteRegistry::new(&config.storage_root)));
|
||||
|
||||
let website_domains = if config.website_hosting_enabled {
|
||||
Some(Arc::new(WebsiteDomainStore::new(&config.storage_root)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Self {
|
||||
config,
|
||||
storage,
|
||||
iam,
|
||||
encryption: None,
|
||||
kms: None,
|
||||
gc,
|
||||
integrity,
|
||||
metrics,
|
||||
site_registry,
|
||||
website_domains,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn new_with_encryption(config: ServerConfig) -> Self {
|
||||
let mut state = Self::new(config.clone());
|
||||
|
||||
let keys_dir = config.storage_root.join(".myfsio.sys").join("keys");
|
||||
|
||||
let kms = if config.kms_enabled {
|
||||
match KmsService::new(&keys_dir).await {
|
||||
Ok(k) => Some(Arc::new(k)),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to initialize KMS: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let encryption = if config.encryption_enabled {
|
||||
match myfsio_crypto::kms::load_or_create_master_key(&keys_dir).await {
|
||||
Ok(master_key) => {
|
||||
Some(Arc::new(EncryptionService::new(master_key, kms.clone())))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to initialize encryption: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
state.encryption = encryption;
|
||||
state.kms = kms;
|
||||
state
|
||||
}
|
||||
}
|
||||
@@ -11,5 +11,7 @@ htmlcov
|
||||
logs
|
||||
data
|
||||
tmp
|
||||
tests
|
||||
myfsio_core/target
|
||||
myfsio-engine/target
|
||||
Dockerfile
|
||||
.dockerignore
|
||||
@@ -1,9 +1,9 @@
|
||||
FROM python:3.14.3-slim
|
||||
FROM python:3.14.3-slim AS builder
|
||||
|
||||
ENV PYTHONDONTWRITEBYTECODE=1 \
|
||||
PYTHONUNBUFFERED=1
|
||||
|
||||
WORKDIR /app
|
||||
WORKDIR /build
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends build-essential curl \
|
||||
@@ -12,27 +12,34 @@ RUN apt-get update \
|
||||
|
||||
ENV PATH="/root/.cargo/bin:${PATH}"
|
||||
|
||||
COPY requirements.txt ./
|
||||
RUN pip install --no-cache-dir maturin
|
||||
|
||||
COPY myfsio_core ./myfsio_core
|
||||
RUN cd myfsio_core \
|
||||
&& maturin build --release --out /wheels
|
||||
|
||||
|
||||
FROM python:3.14.3-slim
|
||||
|
||||
ENV PYTHONDONTWRITEBYTECODE=1 \
|
||||
PYTHONUNBUFFERED=1
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt ./
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
COPY --from=builder /wheels/*.whl /tmp/
|
||||
RUN pip install --no-cache-dir /tmp/*.whl && rm /tmp/*.whl
|
||||
|
||||
RUN pip install --no-cache-dir maturin \
|
||||
&& cd myfsio_core \
|
||||
&& maturin build --release \
|
||||
&& pip install target/wheels/*.whl \
|
||||
&& cd ../myfsio-engine \
|
||||
&& cargo build --release \
|
||||
&& cp target/release/myfsio-server /usr/local/bin/myfsio-server \
|
||||
&& cd .. \
|
||||
&& rm -rf myfsio_core/target myfsio-engine/target \
|
||||
&& pip uninstall -y maturin \
|
||||
&& rustup self uninstall -y
|
||||
COPY app ./app
|
||||
COPY templates ./templates
|
||||
COPY static ./static
|
||||
COPY run.py ./
|
||||
COPY docker-entrypoint.sh ./
|
||||
|
||||
RUN chmod +x docker-entrypoint.sh
|
||||
|
||||
RUN mkdir -p /app/data \
|
||||
RUN chmod +x docker-entrypoint.sh \
|
||||
&& mkdir -p /app/data \
|
||||
&& useradd -m -u 1000 myfsio \
|
||||
&& chown -R myfsio:myfsio /app
|
||||
|
||||
@@ -41,8 +48,7 @@ USER myfsio
|
||||
EXPOSE 5000 5100
|
||||
ENV APP_HOST=0.0.0.0 \
|
||||
FLASK_ENV=production \
|
||||
FLASK_DEBUG=0 \
|
||||
ENGINE=rust
|
||||
FLASK_DEBUG=0
|
||||
|
||||
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
|
||||
CMD python -c "import requests; requests.get('http://localhost:5000/myfsio/health', timeout=2)"
|
||||
@@ -225,10 +225,10 @@ def _policy_allows_public_read(policy: dict[str, Any]) -> bool:
|
||||
|
||||
def _bucket_access_descriptor(policy: dict[str, Any] | None) -> tuple[str, str]:
|
||||
if not policy:
|
||||
return ("IAM only", "text-bg-secondary")
|
||||
return ("IAM only", "bg-secondary-subtle text-secondary-emphasis")
|
||||
if _policy_allows_public_read(policy):
|
||||
return ("Public read", "text-bg-warning")
|
||||
return ("Custom policy", "text-bg-info")
|
||||
return ("Public read", "bg-warning-subtle text-warning-emphasis")
|
||||
return ("Custom policy", "bg-info-subtle text-info-emphasis")
|
||||
|
||||
|
||||
def _current_principal():
|
||||
4
python/docker-entrypoint.sh
Normal file
4
python/docker-entrypoint.sh
Normal file
@@ -0,0 +1,4 @@
|
||||
#!/bin/sh
|
||||
set -e
|
||||
|
||||
exec python run.py --prod
|
||||
@@ -5,7 +5,6 @@ import argparse
|
||||
import atexit
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import warnings
|
||||
import multiprocessing
|
||||
@@ -75,49 +74,6 @@ def _serve_granian(target: str, port: int, config: Optional[AppConfig] = None) -
|
||||
server.serve()
|
||||
|
||||
|
||||
def _find_rust_binary() -> Optional[Path]:
|
||||
candidates = [
|
||||
Path("/usr/local/bin/myfsio-server"),
|
||||
Path(__file__).parent / "myfsio-engine" / "target" / "release" / "myfsio-server.exe",
|
||||
Path(__file__).parent / "myfsio-engine" / "target" / "release" / "myfsio-server",
|
||||
Path(__file__).parent / "myfsio-engine" / "target" / "debug" / "myfsio-server.exe",
|
||||
Path(__file__).parent / "myfsio-engine" / "target" / "debug" / "myfsio-server",
|
||||
]
|
||||
for p in candidates:
|
||||
if p.exists():
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def serve_rust_api(port: int, config: Optional[AppConfig] = None) -> None:
|
||||
binary = _find_rust_binary()
|
||||
if binary is None:
|
||||
print("ERROR: Rust engine binary not found. Build it first:")
|
||||
print(" cd myfsio-engine && cargo build --release")
|
||||
sys.exit(1)
|
||||
|
||||
env = os.environ.copy()
|
||||
env["PORT"] = str(port)
|
||||
env["HOST"] = _server_host()
|
||||
if config:
|
||||
env["STORAGE_ROOT"] = str(config.storage_root)
|
||||
env["AWS_REGION"] = config.aws_region
|
||||
if config.secret_key:
|
||||
env["SECRET_KEY"] = config.secret_key
|
||||
env.setdefault("ENCRYPTION_ENABLED", str(config.encryption_enabled).lower())
|
||||
env.setdefault("KMS_ENABLED", str(config.kms_enabled).lower())
|
||||
env.setdefault("LIFECYCLE_ENABLED", str(config.lifecycle_enabled).lower())
|
||||
env.setdefault("RUST_LOG", "info")
|
||||
|
||||
print(f"Starting Rust S3 engine: {binary}")
|
||||
proc = subprocess.Popen([str(binary)], env=env)
|
||||
try:
|
||||
proc.wait()
|
||||
except KeyboardInterrupt:
|
||||
proc.terminate()
|
||||
proc.wait(timeout=5)
|
||||
|
||||
|
||||
def serve_api(port: int, prod: bool = False, config: Optional[AppConfig] = None) -> None:
|
||||
if prod:
|
||||
_serve_granian("app:create_api_app", port, config)
|
||||
@@ -271,7 +227,6 @@ if __name__ == "__main__":
|
||||
parser.add_argument("--ui-port", type=int, default=5100)
|
||||
parser.add_argument("--prod", action="store_true", help="Run in production mode using Granian")
|
||||
parser.add_argument("--dev", action="store_true", help="Force development mode (Flask dev server)")
|
||||
parser.add_argument("--engine", choices=["python", "rust"], default=os.getenv("ENGINE", "python"), help="API engine: python (Flask) or rust (myfsio-engine)")
|
||||
parser.add_argument("--check-config", action="store_true", help="Validate configuration and exit")
|
||||
parser.add_argument("--show-config", action="store_true", help="Show configuration summary and exit")
|
||||
parser.add_argument("--reset-cred", action="store_true", help="Reset admin credentials and exit")
|
||||
@@ -325,16 +280,8 @@ if __name__ == "__main__":
|
||||
else:
|
||||
print("Running in development mode (Flask dev server)")
|
||||
|
||||
use_rust = args.engine == "rust"
|
||||
|
||||
if args.mode in {"api", "both"}:
|
||||
if use_rust:
|
||||
print(f"Starting Rust API engine on port {args.api_port}...")
|
||||
else:
|
||||
print(f"Starting API server on port {args.api_port}...")
|
||||
if use_rust:
|
||||
api_proc = Process(target=serve_rust_api, args=(args.api_port, config))
|
||||
else:
|
||||
api_proc = Process(target=serve_api, args=(args.api_port, prod_mode, config))
|
||||
api_proc.start()
|
||||
else:
|
||||
|
Before Width: | Height: | Size: 200 KiB After Width: | Height: | Size: 200 KiB |
|
Before Width: | Height: | Size: 872 KiB After Width: | Height: | Size: 872 KiB |
@@ -921,14 +921,14 @@
|
||||
<path d="M8 16A8 8 0 1 0 8 0a8 8 0 0 0 0 16zm.93-9.412-1 4.705c-.07.34.029.533.304.533.194 0 .487-.07.686-.246l-.088.416c-.287.346-.92.598-1.465.598-.703 0-1.002-.422-.808-1.319l.738-3.468c.064-.293.006-.399-.287-.47l-.451-.081.082-.381 2.29-.287zM8 5.5a1 1 0 1 1 0-2 1 1 0 0 1 0 2z"/>
|
||||
</svg>
|
||||
<div>
|
||||
<strong>Storage quota enabled</strong>
|
||||
<strong>Storage quota active</strong>
|
||||
<p class="mb-0 small">
|
||||
{% if max_bytes is not none and max_objects is not none %}
|
||||
Limited to {{ max_bytes | filesizeformat }} and {{ max_objects }} objects.
|
||||
This bucket is limited to {{ max_bytes | filesizeformat }} storage and {{ max_objects }} objects.
|
||||
{% elif max_bytes is not none %}
|
||||
Limited to {{ max_bytes | filesizeformat }} storage.
|
||||
This bucket is limited to {{ max_bytes | filesizeformat }} storage.
|
||||
{% else %}
|
||||
Limited to {{ max_objects }} objects.
|
||||
This bucket is limited to {{ max_objects }} objects.
|
||||
{% endif %}
|
||||
</p>
|
||||
</div>
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user