diff --git a/src/databricks/sql/backend/sea/models/__init__.py b/src/databricks/sql/backend/sea/models/__init__.py index c9310d367..b7c8bd399 100644 --- a/src/databricks/sql/backend/sea/models/__init__.py +++ b/src/databricks/sql/backend/sea/models/__init__.py @@ -4,19 +4,49 @@ This package contains data models for SEA API requests and responses. """ +from databricks.sql.backend.sea.models.base import ( + ServiceError, + StatementStatus, + ExternalLink, + ResultData, + ColumnInfo, + ResultManifest, +) + from databricks.sql.backend.sea.models.requests import ( + StatementParameter, + ExecuteStatementRequest, + GetStatementRequest, + CancelStatementRequest, + CloseStatementRequest, CreateSessionRequest, DeleteSessionRequest, ) from databricks.sql.backend.sea.models.responses import ( + ExecuteStatementResponse, + GetStatementResponse, CreateSessionResponse, ) __all__ = [ + # Base models + "ServiceError", + "StatementStatus", + "ExternalLink", + "ResultData", + "ColumnInfo", + "ResultManifest", # Request models + "StatementParameter", + "ExecuteStatementRequest", + "GetStatementRequest", + "CancelStatementRequest", + "CloseStatementRequest", "CreateSessionRequest", "DeleteSessionRequest", # Response models + "ExecuteStatementResponse", + "GetStatementResponse", "CreateSessionResponse", ] diff --git a/src/databricks/sql/backend/sea/models/base.py b/src/databricks/sql/backend/sea/models/base.py new file mode 100644 index 000000000..b12c26eb0 --- /dev/null +++ b/src/databricks/sql/backend/sea/models/base.py @@ -0,0 +1,95 @@ +""" +Base models for the SEA (Statement Execution API) backend. + +These models define the common structures used in SEA API requests and responses. +""" + +from typing import Dict, List, Any, Optional, Union +from dataclasses import dataclass, field + +from databricks.sql.backend.types import CommandState + + +@dataclass +class ServiceError: + """Error information returned by the SEA API.""" + + message: str + error_code: Optional[str] = None + + +@dataclass +class StatementStatus: + """Status information for a statement execution.""" + + state: CommandState + error: Optional[ServiceError] = None + sql_state: Optional[str] = None + + +@dataclass +class ExternalLink: + """External link information for result data.""" + + external_link: str + expiration: str + chunk_index: int + byte_count: int = 0 + row_count: int = 0 + row_offset: int = 0 + next_chunk_index: Optional[int] = None + next_chunk_internal_link: Optional[str] = None + http_headers: Optional[Dict[str, str]] = None + + +@dataclass +class ChunkInfo: + """Information about a chunk in the result set.""" + + chunk_index: int + byte_count: int + row_offset: int + row_count: int + + +@dataclass +class ResultData: + """Result data from a statement execution.""" + + data: Optional[List[List[Any]]] = None + external_links: Optional[List[ExternalLink]] = None + byte_count: Optional[int] = None + chunk_index: Optional[int] = None + next_chunk_index: Optional[int] = None + next_chunk_internal_link: Optional[str] = None + row_count: Optional[int] = None + row_offset: Optional[int] = None + attachment: Optional[bytes] = None + + +@dataclass +class ColumnInfo: + """Information about a column in the result set.""" + + name: str + type_name: str + type_text: str + nullable: bool = True + precision: Optional[int] = None + scale: Optional[int] = None + ordinal_position: Optional[int] = None + + +@dataclass +class ResultManifest: + """Manifest information for a result set.""" + + format: str + schema: Dict[str, Any] # Will contain column information + total_row_count: int + total_byte_count: int + total_chunk_count: int + truncated: bool = False + chunks: Optional[List[ChunkInfo]] = None + result_compression: Optional[str] = None + is_volume_operation: Optional[bool] = None diff --git a/src/databricks/sql/backend/sea/models/requests.py b/src/databricks/sql/backend/sea/models/requests.py index 3175132bd..4c5071dba 100644 --- a/src/databricks/sql/backend/sea/models/requests.py +++ b/src/databricks/sql/backend/sea/models/requests.py @@ -1,5 +1,99 @@ -from typing import Dict, Any, Optional -from dataclasses import dataclass +""" +Request models for the SEA (Statement Execution API) backend. + +These models define the structures used in SEA API requests. +""" + +from typing import Dict, List, Any, Optional, Union +from dataclasses import dataclass, field + + +@dataclass +class StatementParameter: + """Representation of a parameter for a SQL statement.""" + + name: str + value: Optional[str] = None + type: Optional[str] = None + + +@dataclass +class ExecuteStatementRequest: + """Representation of a request to execute a SQL statement.""" + + session_id: str + statement: str + warehouse_id: str + disposition: str = "EXTERNAL_LINKS" + format: str = "JSON_ARRAY" + result_compression: Optional[str] = None + parameters: Optional[List[StatementParameter]] = None + wait_timeout: str = "10s" + on_wait_timeout: str = "CONTINUE" + row_limit: Optional[int] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert the request to a dictionary for JSON serialization.""" + result: Dict[str, Any] = { + "warehouse_id": self.warehouse_id, + "session_id": self.session_id, + "statement": self.statement, + "disposition": self.disposition, + "format": self.format, + "wait_timeout": self.wait_timeout, + "on_wait_timeout": self.on_wait_timeout, + } + + if self.row_limit is not None and self.row_limit > 0: + result["row_limit"] = self.row_limit + + if self.result_compression: + result["result_compression"] = self.result_compression + + if self.parameters: + result["parameters"] = [ + { + "name": param.name, + **({"value": param.value} if param.value is not None else {}), + **({"type": param.type} if param.type is not None else {}), + } + for param in self.parameters + ] + + return result + + +@dataclass +class GetStatementRequest: + """Representation of a request to get information about a statement.""" + + statement_id: str + + def to_dict(self) -> Dict[str, Any]: + """Convert the request to a dictionary for JSON serialization.""" + return {"statement_id": self.statement_id} + + +@dataclass +class CancelStatementRequest: + """Representation of a request to cancel a statement.""" + + statement_id: str + + def to_dict(self) -> Dict[str, Any]: + """Convert the request to a dictionary for JSON serialization.""" + return {"statement_id": self.statement_id} + + +@dataclass +class CloseStatementRequest: + """Representation of a request to close a statement.""" + + statement_id: str + + def to_dict(self) -> Dict[str, Any]: + """Convert the request to a dictionary for JSON serialization.""" + return {"statement_id": self.statement_id} @dataclass diff --git a/src/databricks/sql/backend/sea/models/responses.py b/src/databricks/sql/backend/sea/models/responses.py index 4eeb9eef7..0baf27ab2 100644 --- a/src/databricks/sql/backend/sea/models/responses.py +++ b/src/databricks/sql/backend/sea/models/responses.py @@ -1,6 +1,148 @@ +""" +Response models for the SEA (Statement Execution API) backend. + +These models define the structures used in SEA API responses. +""" + from typing import Dict, Any from dataclasses import dataclass +from databricks.sql.backend.types import CommandState +from databricks.sql.backend.sea.models.base import ( + StatementStatus, + ResultManifest, + ResultData, + ServiceError, + ExternalLink, + ChunkInfo, +) + + +def _parse_status(data: Dict[str, Any]) -> StatementStatus: + """Parse status from response data.""" + status_data = data.get("status", {}) + error = None + if "error" in status_data: + error_data = status_data["error"] + error = ServiceError( + message=error_data.get("message", ""), + error_code=error_data.get("error_code"), + ) + + state = CommandState.from_sea_state(status_data.get("state", "")) + if state is None: + raise ValueError(f"Invalid state: {status_data.get('state', '')}") + + return StatementStatus( + state=state, + error=error, + sql_state=status_data.get("sql_state"), + ) + + +def _parse_manifest(data: Dict[str, Any]) -> ResultManifest: + """Parse manifest from response data.""" + + manifest_data = data.get("manifest", {}) + chunks = None + if "chunks" in manifest_data: + chunks = [ + ChunkInfo( + chunk_index=chunk.get("chunk_index", 0), + byte_count=chunk.get("byte_count", 0), + row_offset=chunk.get("row_offset", 0), + row_count=chunk.get("row_count", 0), + ) + for chunk in manifest_data.get("chunks", []) + ] + + return ResultManifest( + format=manifest_data.get("format", ""), + schema=manifest_data.get("schema", {}), + total_row_count=manifest_data.get("total_row_count", 0), + total_byte_count=manifest_data.get("total_byte_count", 0), + total_chunk_count=manifest_data.get("total_chunk_count", 0), + truncated=manifest_data.get("truncated", False), + chunks=chunks, + result_compression=manifest_data.get("result_compression"), + is_volume_operation=manifest_data.get("is_volume_operation"), + ) + + +def _parse_result(data: Dict[str, Any]) -> ResultData: + """Parse result data from response data.""" + result_data = data.get("result", {}) + external_links = None + + if "external_links" in result_data: + external_links = [] + for link_data in result_data["external_links"]: + external_links.append( + ExternalLink( + external_link=link_data.get("external_link", ""), + expiration=link_data.get("expiration", ""), + chunk_index=link_data.get("chunk_index", 0), + byte_count=link_data.get("byte_count", 0), + row_count=link_data.get("row_count", 0), + row_offset=link_data.get("row_offset", 0), + next_chunk_index=link_data.get("next_chunk_index"), + next_chunk_internal_link=link_data.get("next_chunk_internal_link"), + http_headers=link_data.get("http_headers"), + ) + ) + + return ResultData( + data=result_data.get("data_array"), + external_links=external_links, + byte_count=result_data.get("byte_count"), + chunk_index=result_data.get("chunk_index"), + next_chunk_index=result_data.get("next_chunk_index"), + next_chunk_internal_link=result_data.get("next_chunk_internal_link"), + row_count=result_data.get("row_count"), + row_offset=result_data.get("row_offset"), + attachment=result_data.get("attachment"), + ) + + +@dataclass +class ExecuteStatementResponse: + """Representation of the response from executing a SQL statement.""" + + statement_id: str + status: StatementStatus + manifest: ResultManifest + result: ResultData + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "ExecuteStatementResponse": + """Create an ExecuteStatementResponse from a dictionary.""" + return cls( + statement_id=data.get("statement_id", ""), + status=_parse_status(data), + manifest=_parse_manifest(data), + result=_parse_result(data), + ) + + +@dataclass +class GetStatementResponse: + """Representation of the response from getting information about a statement.""" + + statement_id: str + status: StatementStatus + manifest: ResultManifest + result: ResultData + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "GetStatementResponse": + """Create a GetStatementResponse from a dictionary.""" + return cls( + statement_id=data.get("statement_id", ""), + status=_parse_status(data), + manifest=_parse_manifest(data), + result=_parse_result(data), + ) + @dataclass class CreateSessionResponse: