Files
MyFSIO/app/select_content.py

172 lines
5.0 KiB
Python

"""S3 SelectObjectContent SQL query execution using DuckDB."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, Generator, Optional
try:
import duckdb
DUCKDB_AVAILABLE = True
except ImportError:
DUCKDB_AVAILABLE = False
class SelectError(Exception):
"""Error during SELECT query execution."""
pass
def execute_select_query(
file_path: Path,
expression: str,
input_format: str,
input_config: Dict[str, Any],
output_format: str,
output_config: Dict[str, Any],
chunk_size: int = 65536,
) -> Generator[bytes, None, None]:
"""Execute SQL query on object content."""
if not DUCKDB_AVAILABLE:
raise SelectError("DuckDB is not installed. Install with: pip install duckdb")
conn = duckdb.connect(":memory:")
try:
if input_format == "CSV":
_load_csv(conn, file_path, input_config)
elif input_format == "JSON":
_load_json(conn, file_path, input_config)
elif input_format == "Parquet":
_load_parquet(conn, file_path)
else:
raise SelectError(f"Unsupported input format: {input_format}")
normalized_expression = expression.replace("s3object", "data").replace("S3Object", "data")
try:
result = conn.execute(normalized_expression)
except duckdb.Error as exc:
raise SelectError(f"SQL execution error: {exc}")
if output_format == "CSV":
yield from _output_csv(result, output_config, chunk_size)
elif output_format == "JSON":
yield from _output_json(result, output_config, chunk_size)
else:
raise SelectError(f"Unsupported output format: {output_format}")
finally:
conn.close()
def _load_csv(conn, file_path: Path, config: Dict[str, Any]) -> None:
"""Load CSV file into DuckDB."""
file_header_info = config.get("file_header_info", "NONE")
delimiter = config.get("field_delimiter", ",")
quote = config.get("quote_character", '"')
header = file_header_info in ("USE", "IGNORE")
path_str = str(file_path).replace("\\", "/")
conn.execute(f"""
CREATE TABLE data AS
SELECT * FROM read_csv('{path_str}',
header={header},
delim='{delimiter}',
quote='{quote}'
)
""")
def _load_json(conn, file_path: Path, config: Dict[str, Any]) -> None:
"""Load JSON file into DuckDB."""
json_type = config.get("type", "DOCUMENT")
path_str = str(file_path).replace("\\", "/")
if json_type == "LINES":
conn.execute(f"""
CREATE TABLE data AS
SELECT * FROM read_json_auto('{path_str}', format='newline_delimited')
""")
else:
conn.execute(f"""
CREATE TABLE data AS
SELECT * FROM read_json_auto('{path_str}', format='array')
""")
def _load_parquet(conn, file_path: Path) -> None:
"""Load Parquet file into DuckDB."""
path_str = str(file_path).replace("\\", "/")
conn.execute(f"CREATE TABLE data AS SELECT * FROM read_parquet('{path_str}')")
def _output_csv(
result,
config: Dict[str, Any],
chunk_size: int,
) -> Generator[bytes, None, None]:
"""Output query results as CSV."""
delimiter = config.get("field_delimiter", ",")
record_delimiter = config.get("record_delimiter", "\n")
quote = config.get("quote_character", '"')
buffer = ""
while True:
rows = result.fetchmany(1000)
if not rows:
break
for row in rows:
fields = []
for value in row:
if value is None:
fields.append("")
elif isinstance(value, str):
if delimiter in value or quote in value or record_delimiter in value:
escaped = value.replace(quote, quote + quote)
fields.append(f'{quote}{escaped}{quote}')
else:
fields.append(value)
else:
fields.append(str(value))
buffer += delimiter.join(fields) + record_delimiter
while len(buffer) >= chunk_size:
yield buffer[:chunk_size].encode("utf-8")
buffer = buffer[chunk_size:]
if buffer:
yield buffer.encode("utf-8")
def _output_json(
result,
config: Dict[str, Any],
chunk_size: int,
) -> Generator[bytes, None, None]:
"""Output query results as JSON Lines."""
record_delimiter = config.get("record_delimiter", "\n")
columns = [desc[0] for desc in result.description]
buffer = ""
while True:
rows = result.fetchmany(1000)
if not rows:
break
for row in rows:
record = dict(zip(columns, row))
buffer += json.dumps(record, default=str) + record_delimiter
while len(buffer) >= chunk_size:
yield buffer[:chunk_size].encode("utf-8")
buffer = buffer[chunk_size:]
if buffer:
yield buffer.encode("utf-8")