172 lines
5.0 KiB
Python
172 lines
5.0 KiB
Python
"""S3 SelectObjectContent SQL query execution using DuckDB."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Generator, Optional
|
|
|
|
try:
|
|
import duckdb
|
|
DUCKDB_AVAILABLE = True
|
|
except ImportError:
|
|
DUCKDB_AVAILABLE = False
|
|
|
|
|
|
class SelectError(Exception):
|
|
"""Error during SELECT query execution."""
|
|
pass
|
|
|
|
|
|
def execute_select_query(
|
|
file_path: Path,
|
|
expression: str,
|
|
input_format: str,
|
|
input_config: Dict[str, Any],
|
|
output_format: str,
|
|
output_config: Dict[str, Any],
|
|
chunk_size: int = 65536,
|
|
) -> Generator[bytes, None, None]:
|
|
"""Execute SQL query on object content."""
|
|
if not DUCKDB_AVAILABLE:
|
|
raise SelectError("DuckDB is not installed. Install with: pip install duckdb")
|
|
|
|
conn = duckdb.connect(":memory:")
|
|
|
|
try:
|
|
if input_format == "CSV":
|
|
_load_csv(conn, file_path, input_config)
|
|
elif input_format == "JSON":
|
|
_load_json(conn, file_path, input_config)
|
|
elif input_format == "Parquet":
|
|
_load_parquet(conn, file_path)
|
|
else:
|
|
raise SelectError(f"Unsupported input format: {input_format}")
|
|
|
|
normalized_expression = expression.replace("s3object", "data").replace("S3Object", "data")
|
|
|
|
try:
|
|
result = conn.execute(normalized_expression)
|
|
except duckdb.Error as exc:
|
|
raise SelectError(f"SQL execution error: {exc}")
|
|
|
|
if output_format == "CSV":
|
|
yield from _output_csv(result, output_config, chunk_size)
|
|
elif output_format == "JSON":
|
|
yield from _output_json(result, output_config, chunk_size)
|
|
else:
|
|
raise SelectError(f"Unsupported output format: {output_format}")
|
|
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _load_csv(conn, file_path: Path, config: Dict[str, Any]) -> None:
|
|
"""Load CSV file into DuckDB."""
|
|
file_header_info = config.get("file_header_info", "NONE")
|
|
delimiter = config.get("field_delimiter", ",")
|
|
quote = config.get("quote_character", '"')
|
|
|
|
header = file_header_info in ("USE", "IGNORE")
|
|
path_str = str(file_path).replace("\\", "/")
|
|
|
|
conn.execute(f"""
|
|
CREATE TABLE data AS
|
|
SELECT * FROM read_csv('{path_str}',
|
|
header={header},
|
|
delim='{delimiter}',
|
|
quote='{quote}'
|
|
)
|
|
""")
|
|
|
|
|
|
def _load_json(conn, file_path: Path, config: Dict[str, Any]) -> None:
|
|
"""Load JSON file into DuckDB."""
|
|
json_type = config.get("type", "DOCUMENT")
|
|
path_str = str(file_path).replace("\\", "/")
|
|
|
|
if json_type == "LINES":
|
|
conn.execute(f"""
|
|
CREATE TABLE data AS
|
|
SELECT * FROM read_json_auto('{path_str}', format='newline_delimited')
|
|
""")
|
|
else:
|
|
conn.execute(f"""
|
|
CREATE TABLE data AS
|
|
SELECT * FROM read_json_auto('{path_str}', format='array')
|
|
""")
|
|
|
|
|
|
def _load_parquet(conn, file_path: Path) -> None:
|
|
"""Load Parquet file into DuckDB."""
|
|
path_str = str(file_path).replace("\\", "/")
|
|
conn.execute(f"CREATE TABLE data AS SELECT * FROM read_parquet('{path_str}')")
|
|
|
|
|
|
def _output_csv(
|
|
result,
|
|
config: Dict[str, Any],
|
|
chunk_size: int,
|
|
) -> Generator[bytes, None, None]:
|
|
"""Output query results as CSV."""
|
|
delimiter = config.get("field_delimiter", ",")
|
|
record_delimiter = config.get("record_delimiter", "\n")
|
|
quote = config.get("quote_character", '"')
|
|
|
|
buffer = ""
|
|
|
|
while True:
|
|
rows = result.fetchmany(1000)
|
|
if not rows:
|
|
break
|
|
|
|
for row in rows:
|
|
fields = []
|
|
for value in row:
|
|
if value is None:
|
|
fields.append("")
|
|
elif isinstance(value, str):
|
|
if delimiter in value or quote in value or record_delimiter in value:
|
|
escaped = value.replace(quote, quote + quote)
|
|
fields.append(f'{quote}{escaped}{quote}')
|
|
else:
|
|
fields.append(value)
|
|
else:
|
|
fields.append(str(value))
|
|
|
|
buffer += delimiter.join(fields) + record_delimiter
|
|
|
|
while len(buffer) >= chunk_size:
|
|
yield buffer[:chunk_size].encode("utf-8")
|
|
buffer = buffer[chunk_size:]
|
|
|
|
if buffer:
|
|
yield buffer.encode("utf-8")
|
|
|
|
|
|
def _output_json(
|
|
result,
|
|
config: Dict[str, Any],
|
|
chunk_size: int,
|
|
) -> Generator[bytes, None, None]:
|
|
"""Output query results as JSON Lines."""
|
|
record_delimiter = config.get("record_delimiter", "\n")
|
|
columns = [desc[0] for desc in result.description]
|
|
|
|
buffer = ""
|
|
|
|
while True:
|
|
rows = result.fetchmany(1000)
|
|
if not rows:
|
|
break
|
|
|
|
for row in rows:
|
|
record = dict(zip(columns, row))
|
|
buffer += json.dumps(record, default=str) + record_delimiter
|
|
|
|
while len(buffer) >= chunk_size:
|
|
yield buffer[:chunk_size].encode("utf-8")
|
|
buffer = buffer[chunk_size:]
|
|
|
|
if buffer:
|
|
yield buffer.encode("utf-8")
|