Restore data integrity guarantees: Content-MD5 validation, fsync durability, atomic metadata writes, concurrent write protection
This commit is contained in:
@@ -2844,6 +2844,18 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
if "Bucket" in message:
|
||||
return _error_response("NoSuchBucket", message, 404)
|
||||
return _error_response("InvalidArgument", message, 400)
|
||||
|
||||
content_md5 = request.headers.get("Content-MD5")
|
||||
if content_md5 and meta.etag:
|
||||
try:
|
||||
expected_md5 = base64.b64decode(content_md5).hex()
|
||||
except Exception:
|
||||
storage.delete_object(bucket_name, object_key)
|
||||
return _error_response("InvalidDigest", "Content-MD5 header is not valid base64", 400)
|
||||
if expected_md5 != meta.etag:
|
||||
storage.delete_object(bucket_name, object_key)
|
||||
return _error_response("BadDigest", "The Content-MD5 you specified did not match what we received", 400)
|
||||
|
||||
if current_app.logger.isEnabledFor(logging.INFO):
|
||||
current_app.logger.info(
|
||||
"Object uploaded",
|
||||
@@ -3650,6 +3662,15 @@ def _upload_part(bucket_name: str, object_key: str) -> Response:
|
||||
return _error_response("NoSuchUpload", str(exc), 404)
|
||||
return _error_response("InvalidArgument", str(exc), 400)
|
||||
|
||||
content_md5 = request.headers.get("Content-MD5")
|
||||
if content_md5 and etag:
|
||||
try:
|
||||
expected_md5 = base64.b64decode(content_md5).hex()
|
||||
except Exception:
|
||||
return _error_response("InvalidDigest", "Content-MD5 header is not valid base64", 400)
|
||||
if expected_md5 != etag:
|
||||
return _error_response("BadDigest", "The Content-MD5 you specified did not match what we received", 400)
|
||||
|
||||
response = Response(status=200)
|
||||
response.headers["ETag"] = f'"{etag}"'
|
||||
return response
|
||||
|
||||
111
app/storage.py
111
app/storage.py
@@ -361,7 +361,7 @@ class ObjectStorage:
|
||||
|
||||
try:
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_path.write_text(json.dumps(stats), encoding="utf-8")
|
||||
self._atomic_write_json(cache_path, stats)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@@ -423,7 +423,7 @@ class ObjectStorage:
|
||||
cache_path = self._system_bucket_root(bucket_id) / "stats.json"
|
||||
try:
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_path.write_text(json.dumps(data), encoding="utf-8")
|
||||
self._atomic_write_json(cache_path, data)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@@ -879,11 +879,6 @@ class ObjectStorage:
|
||||
is_overwrite = destination.exists()
|
||||
existing_size = destination.stat().st_size if is_overwrite else 0
|
||||
|
||||
archived_version_size = 0
|
||||
if self._is_versioning_enabled(bucket_path) and is_overwrite:
|
||||
archived_version_size = existing_size
|
||||
self._archive_current_version(bucket_id, safe_key, reason="overwrite")
|
||||
|
||||
tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR
|
||||
tmp_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@@ -910,19 +905,21 @@ class ObjectStorage:
|
||||
quota_check["quota"],
|
||||
quota_check["usage"],
|
||||
)
|
||||
|
||||
shutil.move(str(tmp_path), str(destination))
|
||||
finally:
|
||||
except BaseException:
|
||||
if tmp_path:
|
||||
try:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
else:
|
||||
tmp_path = tmp_dir / f"{uuid.uuid4().hex}.tmp"
|
||||
try:
|
||||
checksum = hashlib.md5()
|
||||
with tmp_path.open("wb") as target:
|
||||
shutil.copyfileobj(stream, target)
|
||||
shutil.copyfileobj(_HashingReader(stream, checksum), target)
|
||||
target.flush()
|
||||
os.fsync(target.fileno())
|
||||
|
||||
new_size = tmp_path.stat().st_size
|
||||
size_delta = new_size - existing_size
|
||||
@@ -941,27 +938,43 @@ class ObjectStorage:
|
||||
quota_check["usage"],
|
||||
)
|
||||
|
||||
checksum = hashlib.md5()
|
||||
with tmp_path.open("rb") as f:
|
||||
while True:
|
||||
chunk = f.read(1048576)
|
||||
if not chunk:
|
||||
break
|
||||
checksum.update(chunk)
|
||||
etag = checksum.hexdigest()
|
||||
|
||||
shutil.move(str(tmp_path), str(destination))
|
||||
finally:
|
||||
except BaseException:
|
||||
try:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
stat = destination.stat()
|
||||
lock_file_path = self._system_bucket_root(bucket_id) / "locks" / f"{safe_key.as_posix().replace('/', '_')}.lock"
|
||||
try:
|
||||
with _atomic_lock_file(lock_file_path):
|
||||
archived_version_size = 0
|
||||
if self._is_versioning_enabled(bucket_path) and is_overwrite:
|
||||
archived_version_size = existing_size
|
||||
self._archive_current_version(bucket_id, safe_key, reason="overwrite")
|
||||
|
||||
internal_meta = {"__etag__": etag, "__size__": str(stat.st_size), "__last_modified__": str(stat.st_mtime)}
|
||||
combined_meta = {**internal_meta, **(metadata or {})}
|
||||
self._write_metadata(bucket_id, safe_key, combined_meta)
|
||||
shutil.move(str(tmp_path), str(destination))
|
||||
tmp_path = None
|
||||
|
||||
stat = destination.stat()
|
||||
|
||||
internal_meta = {"__etag__": etag, "__size__": str(stat.st_size), "__last_modified__": str(stat.st_mtime)}
|
||||
combined_meta = {**internal_meta, **(metadata or {})}
|
||||
self._write_metadata(bucket_id, safe_key, combined_meta)
|
||||
except BlockingIOError:
|
||||
try:
|
||||
if tmp_path:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
raise StorageError("Another upload to this key is in progress")
|
||||
finally:
|
||||
if tmp_path:
|
||||
try:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
self._update_bucket_stats_cache(
|
||||
bucket_id,
|
||||
@@ -1553,18 +1566,16 @@ class ObjectStorage:
|
||||
temp_path = upload_root / f".{part_filename}.tmp"
|
||||
|
||||
try:
|
||||
with temp_path.open("wb") as target:
|
||||
shutil.copyfileobj(stream, target)
|
||||
if _HAS_RUST:
|
||||
with temp_path.open("wb") as target:
|
||||
shutil.copyfileobj(stream, target)
|
||||
part_etag = _rc.md5_file(str(temp_path))
|
||||
else:
|
||||
checksum = hashlib.md5()
|
||||
with temp_path.open("rb") as f:
|
||||
while True:
|
||||
chunk = f.read(1048576)
|
||||
if not chunk:
|
||||
break
|
||||
checksum.update(chunk)
|
||||
with temp_path.open("wb") as target:
|
||||
shutil.copyfileobj(_HashingReader(stream, checksum), target)
|
||||
target.flush()
|
||||
os.fsync(target.fileno())
|
||||
part_etag = checksum.hexdigest()
|
||||
temp_path.replace(part_path)
|
||||
except OSError:
|
||||
@@ -1598,7 +1609,7 @@ class ObjectStorage:
|
||||
|
||||
parts = manifest.setdefault("parts", {})
|
||||
parts[str(part_number)] = record
|
||||
manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
|
||||
self._atomic_write_json(manifest_path, manifest)
|
||||
break
|
||||
except OSError as exc:
|
||||
if attempt < max_retries - 1:
|
||||
@@ -1691,7 +1702,7 @@ class ObjectStorage:
|
||||
|
||||
parts = manifest.setdefault("parts", {})
|
||||
parts[str(part_number)] = record
|
||||
manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
|
||||
self._atomic_write_json(manifest_path, manifest)
|
||||
break
|
||||
except OSError as exc:
|
||||
if attempt < max_retries - 1:
|
||||
@@ -1797,6 +1808,8 @@ class ObjectStorage:
|
||||
break
|
||||
checksum.update(data)
|
||||
target.write(data)
|
||||
target.flush()
|
||||
os.fsync(target.fileno())
|
||||
checksum_hex = checksum.hexdigest()
|
||||
except BlockingIOError:
|
||||
raise StorageError("Another upload to this key is in progress")
|
||||
@@ -2303,6 +2316,23 @@ class ObjectStorage:
|
||||
):
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@staticmethod
|
||||
def _atomic_write_json(path: Path, data: Any) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path = path.with_suffix(".tmp")
|
||||
try:
|
||||
with tmp_path.open("w", encoding="utf-8") as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
tmp_path.replace(path)
|
||||
except BaseException:
|
||||
try:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
def _multipart_dir(self, bucket_name: str, upload_id: str) -> Path:
|
||||
return self._multipart_bucket_root(bucket_name) / upload_id
|
||||
|
||||
@@ -2337,7 +2367,7 @@ class ObjectStorage:
|
||||
def _write_bucket_config(self, bucket_name: str, payload: dict[str, Any]) -> None:
|
||||
config_path = self._bucket_config_path(bucket_name)
|
||||
config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
config_path.write_text(json.dumps(payload), encoding="utf-8")
|
||||
self._atomic_write_json(config_path, payload)
|
||||
try:
|
||||
mtime = config_path.stat().st_mtime
|
||||
except OSError:
|
||||
@@ -2371,8 +2401,7 @@ class ObjectStorage:
|
||||
|
||||
def _write_multipart_manifest(self, upload_root: Path, manifest: dict[str, Any]) -> None:
|
||||
manifest_path = upload_root / self.MULTIPART_MANIFEST
|
||||
manifest_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
|
||||
self._atomic_write_json(manifest_path, manifest)
|
||||
|
||||
def _metadata_file(self, bucket_name: str, key: Path) -> Path:
|
||||
meta_root = self._bucket_meta_root(bucket_name)
|
||||
@@ -2442,7 +2471,7 @@ class ObjectStorage:
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
index_data[entry_name] = entry
|
||||
index_path.write_text(json.dumps(index_data), encoding="utf-8")
|
||||
self._atomic_write_json(index_path, index_data)
|
||||
self._invalidate_meta_read_cache(bucket_name, key)
|
||||
|
||||
def _delete_index_entry(self, bucket_name: str, key: Path) -> None:
|
||||
@@ -2463,7 +2492,7 @@ class ObjectStorage:
|
||||
if entry_name in index_data:
|
||||
del index_data[entry_name]
|
||||
if index_data:
|
||||
index_path.write_text(json.dumps(index_data), encoding="utf-8")
|
||||
self._atomic_write_json(index_path, index_data)
|
||||
else:
|
||||
try:
|
||||
index_path.unlink()
|
||||
@@ -2512,7 +2541,7 @@ class ObjectStorage:
|
||||
"reason": reason,
|
||||
}
|
||||
manifest_path = version_dir / f"{version_id}.json"
|
||||
manifest_path.write_text(json.dumps(record), encoding="utf-8")
|
||||
self._atomic_write_json(manifest_path, record)
|
||||
|
||||
def _read_metadata(self, bucket_name: str, key: Path) -> Dict[str, str]:
|
||||
entry = self._read_index_entry(bucket_name, key)
|
||||
|
||||
@@ -46,6 +46,8 @@ pub fn stream_to_file_with_md5(
|
||||
|
||||
py.check_signals()?;
|
||||
}
|
||||
file.sync_all()
|
||||
.map_err(|e| PyIOError::new_err(format!("Failed to fsync: {}", e)))?;
|
||||
Ok(())
|
||||
})();
|
||||
|
||||
@@ -102,6 +104,9 @@ pub fn assemble_parts_with_md5(
|
||||
}
|
||||
}
|
||||
|
||||
target.sync_all()
|
||||
.map_err(|e| PyIOError::new_err(format!("Failed to fsync: {}", e)))?;
|
||||
|
||||
Ok(format!("{:x}", hasher.finalize()))
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user