diff --git a/examples/monitoring/export_inference_pipeline_data.py b/examples/monitoring/export_inference_pipeline_data.py new file mode 100644 index 00000000..83a83eb1 --- /dev/null +++ b/examples/monitoring/export_inference_pipeline_data.py @@ -0,0 +1,336 @@ +#!/usr/bin/env python3 +""" +Example: Export data from inference pipelines using the Openlayer Python SDK. + +This example demonstrates how to: +1. Export data from an inference pipeline for a specified time range +2. Poll for task completion +3. Retrieve the presigned URL for downloading the exported data +4. Handle different export formats (JSON, CSV) +5. Work with both synchronous and asynchronous clients + +Requirements: +- OPENLAYER_API_KEY environment variable +- OPENLAYER_PIPELINE_ID environment variable +""" + +import os +import time +import asyncio +from datetime import datetime, timedelta +from typing import Optional + +import openlayer + + +def export_pipeline_data_sync( + pipeline_id: str, + start_datetime: datetime, + end_datetime: datetime, + format_type: str = "json", + api_key: Optional[str] = None, +) -> str: + """ + Export data from inference pipeline synchronously. 
+ + Args: + pipeline_id: The inference pipeline ID + start_datetime: Start datetime for export range + end_datetime: End datetime for export range + format_type: Export format ('json' or 'csv') + api_key: Optional API key (uses environment variable if not provided) + + Returns: + The storage URI of the exported data + + Raises: + ValueError: If the export task fails + TimeoutError: If the task doesn't complete within the timeout period + """ + # Initialize the client + client = openlayer.Openlayer(api_key=api_key) + + # Convert datetime objects to Unix timestamps + start_timestamp = int(start_datetime.timestamp()) + end_timestamp = int(end_datetime.timestamp()) + + print(f"Exporting data from {start_datetime} to {end_datetime}") + print(f"Time range: {start_timestamp} - {end_timestamp} (Unix timestamps)") + print(f"Format: {format_type}") + + # Start the export task + export_response = client.inference_pipelines.export_data( + inference_pipeline_id=pipeline_id, + start=start_timestamp, + end=end_timestamp, + fmt=format_type, + ) + + print(f"Export task started. 
Task URL: {export_response.task_result_url}") + + # Poll for task completion + max_attempts = 60 # Maximum 5 minutes with 5-second intervals + attempt = 0 + + while attempt < max_attempts: + # Check task status + task_status = client.inference_pipelines.get_task_status( + task_result_url=export_response.task_result_url + ) + + print(f"Attempt {attempt + 1}: Task complete = {task_status.complete}") + + if task_status.complete: + if task_status.error: + raise ValueError(f"Export task failed: {task_status.error}") + + if task_status.outputs and task_status.outputs.storage_uri: + print(f"Export completed successfully!") + print(f"Storage URI: {task_status.outputs.storage_uri}") + return task_status.outputs.storage_uri + else: + raise ValueError("Export completed but no storage URI available") + + # Wait before next attempt + time.sleep(5) + attempt += 1 + + raise TimeoutError("Export task did not complete within the timeout period") + + +async def export_pipeline_data_async( + pipeline_id: str, + start_datetime: datetime, + end_datetime: datetime, + format_type: str = "json", + api_key: Optional[str] = None, +) -> str: + """ + Export data from inference pipeline asynchronously. 
+ + Args: + pipeline_id: The inference pipeline ID + start_datetime: Start datetime for export range + end_datetime: End datetime for export range + format_type: Export format ('json' or 'csv') + api_key: Optional API key (uses environment variable if not provided) + + Returns: + The storage URI of the exported data + + Raises: + ValueError: If the export task fails + TimeoutError: If the task doesn't complete within the timeout period + """ + # Initialize the async client + async with openlayer.AsyncOpenlayer(api_key=api_key) as client: + # Convert datetime objects to Unix timestamps + start_timestamp = int(start_datetime.timestamp()) + end_timestamp = int(end_datetime.timestamp()) + + print(f"[Async] Exporting data from {start_datetime} to {end_datetime}") + print(f"[Async] Time range: {start_timestamp} - {end_timestamp} (Unix timestamps)") + print(f"[Async] Format: {format_type}") + + # Start the export task + export_response = await client.inference_pipelines.export_data( + inference_pipeline_id=pipeline_id, + start=start_timestamp, + end=end_timestamp, + fmt=format_type, + ) + + print(f"[Async] Export task started. 
Task URL: {export_response.task_result_url}") + + # Poll for task completion + max_attempts = 60 # Maximum 5 minutes with 5-second intervals + attempt = 0 + + while attempt < max_attempts: + # Check task status + task_status = await client.inference_pipelines.get_task_status( + task_result_url=export_response.task_result_url + ) + + print(f"[Async] Attempt {attempt + 1}: Task complete = {task_status.complete}") + + if task_status.complete: + if task_status.error: + raise ValueError(f"Export task failed: {task_status.error}") + + if task_status.outputs and task_status.outputs.storage_uri: + print(f"[Async] Export completed successfully!") + print(f"[Async] Storage URI: {task_status.outputs.storage_uri}") + return task_status.outputs.storage_uri + else: + raise ValueError("Export completed but no storage URI available") + + # Wait before next attempt + await asyncio.sleep(5) + attempt += 1 + + raise TimeoutError("Export task did not complete within the timeout period") + + +def get_presigned_url_for_download(storage_uri: str, api_key: Optional[str] = None) -> str: + """ + Get a presigned URL for downloading the exported data. 
+ + Args: + storage_uri: The storage URI returned from the export task + api_key: Optional API key (uses environment variable if not provided) + + Returns: + The presigned URL for downloading the data + """ + client = openlayer.Openlayer(api_key=api_key) + + # Get presigned URL for the storage URI + presigned_response = client.storage.presigned_url.create( + object_name=storage_uri + ) + + print(f"Presigned URL obtained: {presigned_response.download_url}") + return presigned_response.download_url + + +def main(): + """Main function demonstrating export functionality.""" + # Get configuration from environment variables + pipeline_id = os.getenv("OPENLAYER_PIPELINE_ID") + api_key = os.getenv("OPENLAYER_API_KEY") + + if not pipeline_id: + raise ValueError("OPENLAYER_PIPELINE_ID environment variable is required") + + if not api_key: + raise ValueError("OPENLAYER_API_KEY environment variable is required") + + # Example: Export data from the last 24 hours + end_datetime = datetime.now() + start_datetime = end_datetime - timedelta(days=1) + + print("=== Synchronous Export Example ===") + try: + # Export as JSON + storage_uri_json = export_pipeline_data_sync( + pipeline_id=pipeline_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + format_type="json", + api_key=api_key, + ) + + # Get presigned URL for download + download_url_json = get_presigned_url_for_download( + storage_uri=storage_uri_json, + api_key=api_key, + ) + + print(f"JSON export download URL: {download_url_json}") + + except Exception as e: + print(f"Synchronous export failed: {e}") + + print("\n=== CSV Export Example ===") + try: + # Export as CSV + storage_uri_csv = export_pipeline_data_sync( + pipeline_id=pipeline_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + format_type="csv", + api_key=api_key, + ) + + # Get presigned URL for download + download_url_csv = get_presigned_url_for_download( + storage_uri=storage_uri_csv, + api_key=api_key, + ) + + print(f"CSV export 
download URL: {download_url_csv}") + + except Exception as e: + print(f"CSV export failed: {e}") + + print("\n=== Asynchronous Export Example ===") + try: + # Example with async client + storage_uri_async = asyncio.run( + export_pipeline_data_async( + pipeline_id=pipeline_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + format_type="json", + api_key=api_key, + ) + ) + + # Get presigned URL for download + download_url_async = get_presigned_url_for_download( + storage_uri=storage_uri_async, + api_key=api_key, + ) + + print(f"Async export download URL: {download_url_async}") + + except Exception as e: + print(f"Asynchronous export failed: {e}") + + +def example_with_specific_dates(): + """Example with specific date ranges.""" + pipeline_id = os.getenv("OPENLAYER_PIPELINE_ID") + api_key = os.getenv("OPENLAYER_API_KEY") + + if not pipeline_id or not api_key: + print("Skipping specific dates example - missing environment variables") + return + + # Example: Export data for a specific date range + start_dt = datetime(2024, 1, 1, 0, 0, 0) + end_dt = datetime(2024, 1, 2, 0, 0, 0) + + print(f"\n=== Specific Date Range Export ===") + print(f"Exporting data from {start_dt} to {end_dt}") + + try: + storage_uri = export_pipeline_data_sync( + pipeline_id=pipeline_id, + start_datetime=start_dt, + end_datetime=end_dt, + format_type="json", + api_key=api_key, + ) + + download_url = get_presigned_url_for_download( + storage_uri=storage_uri, + api_key=api_key, + ) + + print(f"Specific date range export download URL: {download_url}") + + except Exception as e: + print(f"Specific date range export failed: {e}") + + +if __name__ == "__main__": + # Example: current datetime and timestamp conversions + dt_object = datetime.now() + timestamp_float = dt_object.timestamp() + timestamp_integer = int(timestamp_float) + + print(f"Datetime object: {dt_object}") + print(f"Timestamp (float): {timestamp_float}") + print(f"Timestamp (integer): {timestamp_integer}") + print("=" * 50) 
+ + # Run the main examples + main() + + # Run example with specific dates + example_with_specific_dates() + + print("\n" + "=" * 50) + print("Export examples completed!") diff --git a/examples/monitoring/export_with_utils.py b/examples/monitoring/export_with_utils.py new file mode 100644 index 00000000..f6c62356 --- /dev/null +++ b/examples/monitoring/export_with_utils.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +""" +Example using the high-level utility functions for exporting inference pipeline data. + +This example demonstrates the easiest way to export data using the utility functions +that handle all the complexity of polling and error handling. +""" + +import os +import asyncio +from datetime import datetime, timedelta + +import openlayer +from openlayer.lib.data import ( + export_inference_pipeline_data, + export_inference_pipeline_data_async, + get_download_url, + get_download_url_async, +) + + +def simple_export_example(): + """Simple synchronous export using utility functions.""" + print("=== Simple Export with Utility Functions ===") + + # Get configuration from environment + pipeline_id = os.getenv("OPENLAYER_PIPELINE_ID") + api_key = os.getenv("OPENLAYER_API_KEY") + + if not pipeline_id or not api_key: + print("Missing OPENLAYER_PIPELINE_ID or OPENLAYER_API_KEY environment variables") + return + + # Initialize client + client = openlayer.Openlayer(api_key=api_key) + + # Define export timeframe (last 24 hours) + end_time = datetime.now() + start_time = end_time - timedelta(days=1) + + print(f"Exporting data from {start_time} to {end_time}") + + try: + # Export data (this handles all the polling automatically) + storage_uri = export_inference_pipeline_data( + client=client, + inference_pipeline_id=pipeline_id, + start_datetime=start_time, + end_datetime=end_time, + format_type="json", + timeout_seconds=300, # 5 minutes timeout + ) + + print(f"āœ… Export completed! 
Storage URI: {storage_uri}") + + # Get download URL + download_url = get_download_url(client, storage_uri) + print(f"šŸ“„ Download URL: {download_url}") + + except Exception as e: + print(f"āŒ Export failed: {e}") + + +async def async_export_example(): + """Asynchronous export using utility functions.""" + print("\n=== Async Export with Utility Functions ===") + + # Get configuration from environment + pipeline_id = os.getenv("OPENLAYER_PIPELINE_ID") + api_key = os.getenv("OPENLAYER_API_KEY") + + if not pipeline_id or not api_key: + print("Missing OPENLAYER_PIPELINE_ID or OPENLAYER_API_KEY environment variables") + return + + # Use async context manager + async with openlayer.AsyncOpenlayer(api_key=api_key) as client: + # Define export timeframe (last week) + end_time = datetime.now() + start_time = end_time - timedelta(days=7) + + print(f"[Async] Exporting data from {start_time} to {end_time}") + + try: + # Export data asynchronously + storage_uri = await export_inference_pipeline_data_async( + client=client, + inference_pipeline_id=pipeline_id, + start_datetime=start_time, + end_datetime=end_time, + format_type="csv", # Try CSV format + timeout_seconds=600, # 10 minutes timeout for larger export + ) + + print(f"āœ… [Async] Export completed! 
Storage URI: {storage_uri}") + + # Get download URL asynchronously + download_url = await get_download_url_async(client, storage_uri) + print(f"šŸ“„ [Async] Download URL: {download_url}") + + except Exception as e: + print(f"āŒ [Async] Export failed: {e}") + + +def multiple_exports_example(): + """Example of exporting multiple date ranges.""" + print("\n=== Multiple Exports Example ===") + + # Get configuration from environment + pipeline_id = os.getenv("OPENLAYER_PIPELINE_ID") + api_key = os.getenv("OPENLAYER_API_KEY") + + if not pipeline_id or not api_key: + print("Missing OPENLAYER_PIPELINE_ID or OPENLAYER_API_KEY environment variables") + return + + client = openlayer.Openlayer(api_key=api_key) + + # Export data for each day of the last week + base_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + + for i in range(7): + start_time = base_date - timedelta(days=i+1) + end_time = base_date - timedelta(days=i) + + print(f"Exporting day {i+1}: {start_time.date()}") + + try: + storage_uri = export_inference_pipeline_data( + client=client, + inference_pipeline_id=pipeline_id, + start_datetime=start_time, + end_datetime=end_time, + format_type="json", + timeout_seconds=120, # Shorter timeout for smaller exports + ) + + print(f" āœ… Day {i+1} exported: {storage_uri}") + + except Exception as e: + print(f" āŒ Day {i+1} failed: {e}") + + +def main(): + """Run all examples.""" + print("šŸš€ Starting inference pipeline data export examples...") + + # Run synchronous example + simple_export_example() + + # Run asynchronous example + asyncio.run(async_export_example()) + + # Run multiple exports example + multiple_exports_example() + + print("\nšŸŽ‰ All examples completed!") + + +if __name__ == "__main__": + main() diff --git a/src/openlayer/lib/core/metrics.py b/src/openlayer/lib/core/metrics.py index c314ba24..4b887352 100644 --- a/src/openlayer/lib/core/metrics.py +++ b/src/openlayer/lib/core/metrics.py @@ -100,7 +100,7 @@ def _parse_args(self) 
-> None: type=str, required=False, default="", - help="The name of the dataset to compute the metric on. Runs on all " "datasets if not provided.", + help="The name of the dataset to compute the metric on. Runs on all datasets if not provided.", ) # Parse the arguments @@ -231,7 +231,7 @@ def compute(self, datasets: List[Dataset]) -> None: for dataset in datasets: # Check if the metric has already been computed if os.path.exists(os.path.join(dataset.output_path, "metrics", f"{self.key}.json")): - print(f"Metric ({self.key}) already computed on {dataset.name}. " "Skipping.") + print(f"Metric ({self.key}) already computed on {dataset.name}. Skipping.") continue try: diff --git a/src/openlayer/lib/data/export_utils.py b/src/openlayer/lib/data/export_utils.py new file mode 100644 index 00000000..f52dfff3 --- /dev/null +++ b/src/openlayer/lib/data/export_utils.py @@ -0,0 +1,280 @@ +""" +Utility functions for exporting data from inference pipelines. + +This module provides high-level utility functions that encapsulate the export workflow, +making it easier for users to export inference pipeline data without dealing with +the low-level API details. +""" + +import time +from datetime import datetime +from typing import Optional, Literal + +from ... import Openlayer, AsyncOpenlayer +from ...types.inference_pipelines.export_data_response import ExportDataResponse +from ...types.inference_pipelines.task_status_response import TaskStatusResponse + + +def export_inference_pipeline_data( + client: Openlayer, + inference_pipeline_id: str, + start_datetime: datetime, + end_datetime: datetime, + format_type: Literal["json", "csv"] = "json", + timeout_seconds: int = 300, + poll_interval_seconds: int = 5, +) -> str: + """ + Export data from an inference pipeline and wait for completion. + + This is a high-level utility function that handles the entire export workflow: + 1. Initiates the export request + 2. Polls for task completion + 3. 
Returns the storage URI when ready + + Args: + client: The Openlayer client instance + inference_pipeline_id: The ID of the inference pipeline to export data from + start_datetime: Start datetime for the export range + end_datetime: End datetime for the export range + format_type: Export format, either "json" or "csv" + timeout_seconds: Maximum time to wait for export completion (default: 5 minutes) + poll_interval_seconds: Time between status checks (default: 5 seconds) + + Returns: + The storage URI of the exported data + + Raises: + ValueError: If the export task fails + TimeoutError: If the task doesn't complete within the timeout period + Exception: For other API errors + + Example: + ```python + import openlayer + from datetime import datetime, timedelta + from openlayer.lib.data.export_utils import export_inference_pipeline_data + + client = openlayer.Openlayer() + + # Export last 24 hours of data + end_time = datetime.now() + start_time = end_time - timedelta(days=1) + + storage_uri = export_inference_pipeline_data( + client=client, + inference_pipeline_id="your-pipeline-id", + start_datetime=start_time, + end_datetime=end_time, + format_type="json" + ) + + print(f"Data exported to: {storage_uri}") + ``` + """ + # Convert datetime objects to Unix timestamps + start_timestamp = int(start_datetime.timestamp()) + end_timestamp = int(end_datetime.timestamp()) + + # Start the export task + export_response = client.inference_pipelines.export_data( + inference_pipeline_id=inference_pipeline_id, + start=start_timestamp, + end=end_timestamp, + fmt=format_type, + ) + + # Poll for task completion + max_attempts = timeout_seconds // poll_interval_seconds + attempt = 0 + + while attempt < max_attempts: + task_status = client.inference_pipelines.get_task_status( + task_result_url=export_response.task_result_url + ) + + if task_status.complete: + if task_status.error: + raise ValueError(f"Export task failed: {task_status.error}") + + if task_status.outputs and 
task_status.outputs.storage_uri: + return task_status.outputs.storage_uri + else: + raise ValueError("Export completed but no storage URI available") + + time.sleep(poll_interval_seconds) + attempt += 1 + + raise TimeoutError( + f"Export task did not complete within {timeout_seconds} seconds" + ) + + +async def export_inference_pipeline_data_async( + client: AsyncOpenlayer, + inference_pipeline_id: str, + start_datetime: datetime, + end_datetime: datetime, + format_type: Literal["json", "csv"] = "json", + timeout_seconds: int = 300, + poll_interval_seconds: int = 5, +) -> str: + """ + Asynchronously export data from an inference pipeline and wait for completion. + + This is the async version of export_inference_pipeline_data. + + Args: + client: The AsyncOpenlayer client instance + inference_pipeline_id: The ID of the inference pipeline to export data from + start_datetime: Start datetime for the export range + end_datetime: End datetime for the export range + format_type: Export format, either "json" or "csv" + timeout_seconds: Maximum time to wait for export completion (default: 5 minutes) + poll_interval_seconds: Time between status checks (default: 5 seconds) + + Returns: + The storage URI of the exported data + + Raises: + ValueError: If the export task fails + TimeoutError: If the task doesn't complete within the timeout period + Exception: For other API errors + + Example: + ```python + import asyncio + import openlayer + from datetime import datetime, timedelta + from openlayer.lib.data.export_utils import export_inference_pipeline_data_async + + async def main(): + async with openlayer.AsyncOpenlayer() as client: + end_time = datetime.now() + start_time = end_time - timedelta(days=1) + + storage_uri = await export_inference_pipeline_data_async( + client=client, + inference_pipeline_id="your-pipeline-id", + start_datetime=start_time, + end_datetime=end_time, + format_type="json" + ) + + print(f"Data exported to: {storage_uri}") + + asyncio.run(main()) + ``` 
+ """ + import asyncio + + # Convert datetime objects to Unix timestamps + start_timestamp = int(start_datetime.timestamp()) + end_timestamp = int(end_datetime.timestamp()) + + # Start the export task + export_response = await client.inference_pipelines.export_data( + inference_pipeline_id=inference_pipeline_id, + start=start_timestamp, + end=end_timestamp, + fmt=format_type, + ) + + # Poll for task completion + max_attempts = timeout_seconds // poll_interval_seconds + attempt = 0 + + while attempt < max_attempts: + task_status = await client.inference_pipelines.get_task_status( + task_result_url=export_response.task_result_url + ) + + if task_status.complete: + if task_status.error: + raise ValueError(f"Export task failed: {task_status.error}") + + if task_status.outputs and task_status.outputs.storage_uri: + return task_status.outputs.storage_uri + else: + raise ValueError("Export completed but no storage URI available") + + await asyncio.sleep(poll_interval_seconds) + attempt += 1 + + raise TimeoutError( + f"Export task did not complete within {timeout_seconds} seconds" + ) + + +def get_download_url( + client: Openlayer, + storage_uri: str, +) -> str: + """ + Get a presigned download URL for exported data. + + Args: + client: The Openlayer client instance + storage_uri: The storage URI returned from an export operation + + Returns: + The presigned download URL + + Example: + ```python + import openlayer + from openlayer.lib.data.export_utils import export_inference_pipeline_data, get_download_url + + client = openlayer.Openlayer() + + # Export data + storage_uri = export_inference_pipeline_data(...) 
+ + # Get download URL + download_url = get_download_url(client, storage_uri) + print(f"Download your data from: {download_url}") + ``` + """ + presigned_response = client.storage.presigned_url.create( + object_name=storage_uri + ) + return presigned_response.download_url + + +async def get_download_url_async( + client: AsyncOpenlayer, + storage_uri: str, +) -> str: + """ + Asynchronously get a presigned download URL for exported data. + + Args: + client: The AsyncOpenlayer client instance + storage_uri: The storage URI returned from an export operation + + Returns: + The presigned download URL + + Example: + ```python + import asyncio + import openlayer + from openlayer.lib.data.export_utils import export_inference_pipeline_data_async, get_download_url_async + + async def main(): + async with openlayer.AsyncOpenlayer() as client: + # Export data + storage_uri = await export_inference_pipeline_data_async(...) + + # Get download URL + download_url = await get_download_url_async(client, storage_uri) + print(f"Download your data from: {download_url}") + + asyncio.run(main()) + ``` + """ + presigned_response = await client.storage.presigned_url.create( + object_name=storage_uri + ) + return presigned_response.download_url + diff --git a/src/openlayer/resources/inference_pipelines/inference_pipelines.py b/src/openlayer/resources/inference_pipelines/inference_pipelines.py index c9c29f5c..aa3e228f 100644 --- a/src/openlayer/resources/inference_pipelines/inference_pipelines.py +++ b/src/openlayer/resources/inference_pipelines/inference_pipelines.py @@ -24,6 +24,7 @@ AsyncRowsResourceWithStreamingResponse, ) from ...types import inference_pipeline_update_params, inference_pipeline_retrieve_params +from ...types.inference_pipelines import export_data_params from ..._types import NOT_GIVEN, Body, Query, Headers, NoneType, NotGiven from ..._utils import maybe_transform, async_maybe_transform from ..._compat import cached_property @@ -45,6 +46,8 @@ from ..._base_client import 
make_request_options from ...types.inference_pipeline_update_response import InferencePipelineUpdateResponse from ...types.inference_pipeline_retrieve_response import InferencePipelineRetrieveResponse +from ...types.inference_pipelines.export_data_response import ExportDataResponse +from ...types.inference_pipelines.task_status_response import TaskStatusResponse __all__ = ["InferencePipelinesResource", "AsyncInferencePipelinesResource"] @@ -214,6 +217,95 @@ def delete( cast_to=NoneType, ) + def export_data( + self, + inference_pipeline_id: str, + *, + start: int, + end: int, + fmt: Literal["json", "csv"], + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> ExportDataResponse: + """ + Export data from inference pipeline for a specified time range. + + Args: + start: Start timestamp (Unix timestamp in seconds) for the data export range. + + end: End timestamp (Unix timestamp in seconds) for the data export range. + + fmt: Export format. Supported formats: 'json', 'csv'. 
+ + extra_headers: Send extra headers + + extra_query: Add additional query parameters to the request + + extra_body: Add additional JSON properties to the request + + timeout: Override the client-level default timeout for this request, in seconds + """ + if not inference_pipeline_id: + raise ValueError( + f"Expected a non-empty value for `inference_pipeline_id` but received {inference_pipeline_id!r}" + ) + return self._post( + f"/inference-pipelines/{inference_pipeline_id}/export", + body=maybe_transform( + { + "start": start, + "end": end, + "fmt": fmt, + }, + export_data_params.ExportDataParams, + ), + options=make_request_options( + extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout + ), + cast_to=ExportDataResponse, + ) + + def get_task_status( + self, + task_result_url: str, + *, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> TaskStatusResponse: + """ + Get the status of an export task using the task result URL. + + Args: + task_result_url: The task result URL returned from export_data method. 
+ + extra_headers: Send extra headers + + extra_query: Add additional query parameters to the request + + extra_body: Add additional JSON properties to the request + + timeout: Override the client-level default timeout for this request, in seconds + """ + if not task_result_url: + raise ValueError( + f"Expected a non-empty value for `task_result_url` but received {task_result_url!r}" + ) + return self._get( + task_result_url, + options=make_request_options( + extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout + ), + cast_to=TaskStatusResponse, + ) + class AsyncInferencePipelinesResource(AsyncAPIResource): @cached_property @@ -380,6 +472,95 @@ async def delete( cast_to=NoneType, ) + async def export_data( + self, + inference_pipeline_id: str, + *, + start: int, + end: int, + fmt: Literal["json", "csv"], + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> ExportDataResponse: + """ + Export data from inference pipeline for a specified time range. + + Args: + start: Start timestamp (Unix timestamp in seconds) for the data export range. + + end: End timestamp (Unix timestamp in seconds) for the data export range. + + fmt: Export format. Supported formats: 'json', 'csv'. 
+ + extra_headers: Send extra headers + + extra_query: Add additional query parameters to the request + + extra_body: Add additional JSON properties to the request + + timeout: Override the client-level default timeout for this request, in seconds + """ + if not inference_pipeline_id: + raise ValueError( + f"Expected a non-empty value for `inference_pipeline_id` but received {inference_pipeline_id!r}" + ) + return await self._post( + f"/inference-pipelines/{inference_pipeline_id}/export", + body=await async_maybe_transform( + { + "start": start, + "end": end, + "fmt": fmt, + }, + export_data_params.ExportDataParams, + ), + options=make_request_options( + extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout + ), + cast_to=ExportDataResponse, + ) + + async def get_task_status( + self, + task_result_url: str, + *, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN, + ) -> TaskStatusResponse: + """ + Get the status of an export task using the task result URL. + + Args: + task_result_url: The task result URL returned from export_data method. 
+ + extra_headers: Send extra headers + + extra_query: Add additional query parameters to the request + + extra_body: Add additional JSON properties to the request + + timeout: Override the client-level default timeout for this request, in seconds + """ + if not task_result_url: + raise ValueError( + f"Expected a non-empty value for `task_result_url` but received {task_result_url!r}" + ) + return await self._get( + task_result_url, + options=make_request_options( + extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout + ), + cast_to=TaskStatusResponse, + ) + class InferencePipelinesResourceWithRawResponse: def __init__(self, inference_pipelines: InferencePipelinesResource) -> None: @@ -394,6 +575,12 @@ def __init__(self, inference_pipelines: InferencePipelinesResource) -> None: self.delete = to_raw_response_wrapper( inference_pipelines.delete, ) + self.export_data = to_raw_response_wrapper( + inference_pipelines.export_data, + ) + self.get_task_status = to_raw_response_wrapper( + inference_pipelines.get_task_status, + ) @cached_property def data(self) -> DataResourceWithRawResponse: @@ -421,6 +608,12 @@ def __init__(self, inference_pipelines: AsyncInferencePipelinesResource) -> None self.delete = async_to_raw_response_wrapper( inference_pipelines.delete, ) + self.export_data = async_to_raw_response_wrapper( + inference_pipelines.export_data, + ) + self.get_task_status = async_to_raw_response_wrapper( + inference_pipelines.get_task_status, + ) @cached_property def data(self) -> AsyncDataResourceWithRawResponse: @@ -448,6 +641,12 @@ def __init__(self, inference_pipelines: InferencePipelinesResource) -> None: self.delete = to_streamed_response_wrapper( inference_pipelines.delete, ) + self.export_data = to_streamed_response_wrapper( + inference_pipelines.export_data, + ) + self.get_task_status = to_streamed_response_wrapper( + inference_pipelines.get_task_status, + ) @cached_property def data(self) -> 
DataResourceWithStreamingResponse: @@ -475,6 +674,12 @@ def __init__(self, inference_pipelines: AsyncInferencePipelinesResource) -> None self.delete = async_to_streamed_response_wrapper( inference_pipelines.delete, ) + self.export_data = async_to_streamed_response_wrapper( + inference_pipelines.export_data, + ) + self.get_task_status = async_to_streamed_response_wrapper( + inference_pipelines.get_task_status, + ) @cached_property def data(self) -> AsyncDataResourceWithStreamingResponse: diff --git a/src/openlayer/types/inference_pipelines/__init__.py b/src/openlayer/types/inference_pipelines/__init__.py index 3ccedd4e..7b434638 100644 --- a/src/openlayer/types/inference_pipelines/__init__.py +++ b/src/openlayer/types/inference_pipelines/__init__.py @@ -8,3 +8,6 @@ from .data_stream_response import DataStreamResponse as DataStreamResponse from .test_result_list_params import TestResultListParams as TestResultListParams from .test_result_list_response import TestResultListResponse as TestResultListResponse +from .export_data_params import ExportDataParams as ExportDataParams +from .export_data_response import ExportDataResponse as ExportDataResponse +from .task_status_response import TaskStatusResponse as TaskStatusResponse diff --git a/src/openlayer/types/inference_pipelines/export_data_params.py b/src/openlayer/types/inference_pipelines/export_data_params.py new file mode 100644 index 00000000..dbb2b59b --- /dev/null +++ b/src/openlayer/types/inference_pipelines/export_data_params.py @@ -0,0 +1,18 @@ +# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. 
__all__ = ["ExportDataParams", "ExportDataResponse", "Outputs"]


class ExportDataParams(TypedDict, total=False):
    """Request body for starting an inference-pipeline data export."""

    start: Required[int]
    """Beginning of the export window, as a Unix timestamp in seconds."""

    end: Required[int]
    """End of the export window, as a Unix timestamp in seconds."""

    fmt: Required[Literal["json", "csv"]]
    """Serialization format for the exported data: either 'json' or 'csv'."""


class Outputs(BaseModel):
    """Result location of a finished export task."""

    storage_uri: Optional[str] = None
    """Storage URI where the exported data can be retrieved."""


class ExportDataResponse(BaseModel):
    """Response returned when an export task is created."""

    task_result_url: str
    """URL that can be polled to track completion of the export task."""

    complete: Optional[bool] = None
    """True once the export task has finished."""

    outputs: Optional[Outputs] = None
    """Export result details; populated only after the task completes."""
__all__ = ["TaskStatusResponse", "Outputs"]


class Outputs(BaseModel):
    """Result location produced by a completed export task."""

    storage_uri: Optional[str] = None
    """Storage URI where the exported data can be retrieved."""


class TaskStatusResponse(BaseModel):
    """Polling response describing the state of a long-running export task."""

    complete: bool
    """True once the task has finished running."""

    outputs: Optional[Outputs] = None
    """Result details; populated only after the task completes."""

    error: Optional[str] = None
    """Failure message, present only when the task did not succeed."""