Debug replication corruption issue
@@ -133,18 +133,7 @@ class ReplicationManager:
         content_type, _ = mimetypes.guess_type(path)
         file_size = path.stat().st_size
 
-        # Debug: Calculate MD5 of source file
-        import hashlib
-        md5_hash = hashlib.md5()
-        with path.open("rb") as f:
-            # Log first 32 bytes
-            header = f.read(32)
-            logger.info(f"Source first 32 bytes: {header.hex()}")
-            md5_hash.update(header)
-            for chunk in iter(lambda: f.read(4096), b""):
-                md5_hash.update(chunk)
-        source_md5 = md5_hash.hexdigest()
-        logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, MD5={source_md5}, ContentType={content_type}")
+        logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
 
         try:
             with path.open("rb") as f:
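The debug block removed above is the standard streaming-checksum idiom. For spot-checking that source and replica bytes now match, the same logic as a standalone sketch (the helper name file_md5 is ours, not part of the codebase):

import hashlib
from pathlib import Path

def file_md5(path: Path) -> str:
    # Hash in 4 KiB chunks, exactly as the removed debug code did.
    md5_hash = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            md5_hash.update(chunk)
    return md5_hash.hexdigest()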
@@ -405,7 +405,11 @@ def _canonical_headers_from_request(headers: list[str]) -> str:
     lines = []
     for header in headers:
         if header == "host":
-            value = request.host
+            api_base = current_app.config.get("API_BASE_URL")
+            if api_base:
+                value = urlparse(api_base).netloc
+            else:
+                value = request.host
         else:
             value = request.headers.get(header, "")
         canonical_value = " ".join(value.strip().split()) if value else ""
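The host change above likely matters for signature verification: behind a proxy, request.host can differ from the host the client signed, so the configured public URL wins when present. A minimal sketch of what urlparse(...).netloc yields (the API_BASE_URL value is hypothetical):

from urllib.parse import urlparse

# Hypothetical config value; netloc keeps host and port, drops scheme and path.
api_base = "https://storage.example.com:9000/api"
assert urlparse(api_base).netloc == "storage.example.com:9000"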
@@ -1084,7 +1088,12 @@ def object_handler(bucket_name: str, object_key: str):
     current_app.logger.info(f"Headers: {dict(request.headers)}")
     current_app.logger.info(f"Content-Length: {request.content_length}")
 
-    stream = DebugStream(request.stream, current_app.logger)
+    stream = request.stream
+    content_encoding = request.headers.get("Content-Encoding", "").lower()
+    if "aws-chunked" in content_encoding:
+        current_app.logger.info("Decoding aws-chunked stream")
+        stream = AwsChunkedDecoder(stream)
+
     metadata = _extract_request_metadata()
     try:
         meta = storage.put_object(
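For reference, a body sent with Content-Encoding: aws-chunked wraps the payload in hex-length frames with per-chunk signatures; before this change those frames reached storage.put_object verbatim. An illustrative 11-byte payload (signature values are made up):

body = (
    b"5;chunk-signature=aaaa\r\n"   # 5 payload bytes follow
    b"hello\r\n"
    b"6;chunk-signature=bbbb\r\n"   # 6 more payload bytes
    b" world\r\n"
    b"0;chunk-signature=cccc\r\n"   # zero-length chunk terminates the body
    b"\r\n"
)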
@@ -1260,17 +1269,77 @@ def head_object(bucket_name: str, object_key: str) -> Response:
         return _error_response("AccessDenied", str(exc), 403)
 
 
-class DebugStream:
-    def __init__(self, stream, logger):
+class AwsChunkedDecoder:
+    """Decodes aws-chunked encoded streams."""
+    def __init__(self, stream):
         self.stream = stream
-        self.logger = logger
-        self.first_chunk = True
+        self.buffer = b""
+        self.chunk_remaining = 0
+        self.finished = False
 
     def read(self, size=-1):
-        chunk = self.stream.read(size)
-        if self.first_chunk and chunk:
-            # Log first 32 bytes
-            prefix = chunk[:32]
-            self.logger.info(f"Received first 32 bytes: {prefix.hex()}")
-            self.first_chunk = False
-        return chunk
+        if self.finished:
+            return b""
+
+        result = b""
+        while size == -1 or len(result) < size:
+            if self.chunk_remaining > 0:
+                to_read = self.chunk_remaining
+                if size != -1:
+                    to_read = min(to_read, size - len(result))
+
+                chunk = self.stream.read(to_read)
+                if not chunk:
+                    raise IOError("Unexpected EOF in chunk data")
+
+                result += chunk
+                self.chunk_remaining -= len(chunk)
+
+                if self.chunk_remaining == 0:
+                    # Read CRLF after chunk data
+                    crlf = self.stream.read(2)
+                    if crlf != b"\r\n":
+                        raise IOError("Malformed chunk: missing CRLF")
+            else:
+                # Read chunk size line
+                line = b""
+                while True:
+                    char = self.stream.read(1)
+                    if not char:
+                        if not line:  # EOF at start of chunk size
+                            self.finished = True
+                            return result
+                        raise IOError("Unexpected EOF in chunk size")
+                    line += char
+                    if line.endswith(b"\r\n"):
+                        break
+
+                # Parse chunk size (hex)
+                try:
+                    line_str = line.decode("ascii").strip()
+                    # Handle chunk-signature extension if present (e.g. "1000;chunk-signature=...")
+                    if ";" in line_str:
+                        line_str = line_str.split(";")[0]
+                    chunk_size = int(line_str, 16)
+                except ValueError:
+                    raise IOError(f"Invalid chunk size: {line}")
+
+                if chunk_size == 0:
+                    self.finished = True
+                    # Read trailers if any (until empty line)
+                    while True:
+                        line = b""
+                        while True:
+                            char = self.stream.read(1)
+                            if not char:
+                                break
+                            line += char
+                            if line.endswith(b"\r\n"):
+                                break
+                        if line == b"\r\n" or not line:
+                            break
+                    return result
+
+                self.chunk_remaining = chunk_size
+
+        return result
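A quick sanity check of the decoder against the framing shown earlier; a sketch assuming the AwsChunkedDecoder class from this commit is in scope:

import io

raw = (
    b"5;chunk-signature=aaaa\r\n"
    b"hello\r\n"
    b"6;chunk-signature=bbbb\r\n"
    b" world\r\n"
    b"0;chunk-signature=cccc\r\n"
    b"\r\n"
)

dec = AwsChunkedDecoder(io.BytesIO(raw))
assert dec.read() == b"hello world"  # framing and signatures stripped
assert dec.read() == b""             # EOF after the zero-length chunk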