Written by Illia Kavaliou
Introduction
Databricks and other modern data platforms offer powerful analytical capabilities, but sharing that data externally can be challenging.
This article, the first in a three-part series on building data APIs for modern data platforms including Databricks, Snowflake, and S3 Data Lake, offers step-by-step instructions to implement a secure, scalable, serverless RESTful API that interfaces with Databricks on AWS. It focuses on implementation and does not discuss use cases, or whether providing a highly available API to share analytical data via Databricks (which is rarely used as a transactional database) is the right approach.
By following this guide, you will be able to:
Validate a GET query with parameters
Translate the query into a SQL statement
Return the response in a compressed JSON format
Secure data by ensuring Databricks credentials and schema are never shared
Create an auto-generated OpenAPI specification that serves as live documentation
The article also offers recommendations for authentication, authorisation, application firewalls, and custom domain name implementation.
Architecture Overview
The solution leverages several AWS services to create a robust and secure API:
Amazon API Gateway for API management
Containerised Python AWS Lambda for request validation and response processing
AWS CloudWatch and X-Ray for observability
Amazon Cognito for authentication (optional)
AWS WAF for security (optional)
Amazon Route 53 as a DNS server (optional)
The Databricks SQL Statement Execution API, used to query Databricks catalogs, allows the execution of SQL statements via API, eliminating the need for installing ODBC/JDBC drivers and managing SQL connections. It is compatible with both Classic and Serverless compute workflows, enabling the construction of a fully serverless Databricks API layer.
Implementation Guide
Prerequisites
Python 3.12.7 and pip3 installed
Colima or docker to build containers locally
Databricks access token created with permissions to run a warehouse
CLI access to AWS environment
Scenario
This article assumes that Databricks houses a sample dataset of aggregated licensed-driver counts, which can be filtered by year and state.
Disclaimer
To be concise, logging statements, exception handling, validations, code optimisation and comments have been omitted from these snippet examples, but it is crucial to incorporate them into your production code. Furthermore, the setup of IDE and workspaces, Python versions and virtual environments, and execution of unit testing is intentionally not included in this guide to ensure that the primary focus remains on the core concepts and architectural design.
1. Project Structure
First, we will create a simple project structure, which is fairly typical for Python-based Lambda implementations using CDK.
- cdk
- src
- - helpers
- - lambda_handler.py
- - requirements.txt
- tests
- requirements.txt
This project uses two requirements.txt files to separate concerns: the root-level file manages tooling and dependencies for infrastructure and deployment (CDK deployments, unit testing), while the src directory file handles runtime dependencies for the Lambda code so they will be included in the deployment package.
2. Base Lambda Code
First, let's add code to the Python-based Lambda function src/lambda_handler.py that contains a single GET endpoint. This endpoint will retrieve drivers from the Databricks table based on optional query parameters. Below is the basic structure, which utilises AWS Lambda Powertools for built-in logging and tracing:
src/lambda_handler.py
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger, Tracer

app = APIGatewayRestResolver(enable_validation=True)
logger = Logger()
tracer = Tracer()


@app.get("/v1/drivers", compress=True)
def get_drivers():
    """
    Code to retrieve data from Databricks will go here
    """
    return []


@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """
    This is the entry point for the lambda function.
    """
    return app.resolve(event, context)
The architecture employs a lambdalith approach, utilising an APIGatewayRestResolver from aws_lambda_powertools to manage API routing, which greatly simplifies the internal API routing process. For optimisation, a 'compress' parameter can be added to the decorator to enable gzip and base64 encoding of the responses. This compression is particularly recommended when anticipated payload sizes exceed ~100 kilobytes, as it can significantly reduce response times and bandwidth usage.
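As a rough illustration of what this kind of compression does to a response body, the sketch below uses only the standard library to gzip and base64-encode a JSON payload, then reverses the process as a client would. The sample records and the helper names compress_body and decompress_body are made up for illustration; this is not the Powertools implementation.

```python
import base64
import gzip
import json


def compress_body(payload: list) -> str:
    """Serialise to JSON, gzip the bytes, and base64-encode the result."""
    raw = json.dumps(payload).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")


def decompress_body(body: str) -> list:
    """Reverse compress_body: base64-decode, gunzip, and parse the JSON."""
    return json.loads(gzip.decompress(base64.b64decode(body)))


# Hypothetical sample rows using the columns selected by the article's query.
records = [
    {"year": 2016, "state": "California", "cohort": "20-24", "gender": "Female", "drivers": i}
    for i in range(2000)
]

raw_size = len(json.dumps(records).encode("utf-8"))
encoded = compress_body(records)
print(f"raw: {raw_size} bytes, gzip+base64: {len(encoded)} bytes")
```

Repetitive tabular JSON such as this tends to compress well, which is why the benefit grows with payload size. Actual ratios depend on the data.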
3. Helper Methods to Interact with Databricks
Now, let’s add some code to the lambda_handler.get_drivers() method.
src/lambda_handler.py
import json
from typing import Optional
from helpers import data_processing, config, sql_statements
from aws_lambda_powertools.event_handler.exceptions import InternalServerError
...
@app.get("/v1/drivers", compress=True)
def get_drivers(
    year: Optional[str] = None,
    state: Optional[str] = None
):
    """
    Retrieves the drivers by state and year.
    """
    databricks_config: config.DatabricksConfig = config.get_config_values()
    sql_statement = sql_statements.generate_sql_statement(
        databricks_config,
        year,
        state,
    )
    databricks_response = data_processing.call_sql_statement_api(
        databricks_config, sql_statement
    )
    if databricks_response["status"]["state"] == "SUCCEEDED":
        if databricks_response["result"]:
            df = data_processing.process_get_response(databricks_response)
            return df.to_dict(orient="records")
        return []
    else:
        raise InternalServerError(json.dumps(databricks_response["status"]))
...
The given code expands a GET endpoint method that retrieves drivers based on two optional parameters, and utilises several helper modules. These modules are responsible for retrieving Databricks credentials, generating SQL statements, executing those statements, and transforming the results into JSON format for output.
The Databricks configuration is sourced from the Lambda environment variables, while the Databricks token is securely stored in the Secrets Manager.
The first helper, config.py, contains a get_config_values() method used to retrieve required Databricks configuration information such as domain, warehouse ID, catalog, schema and token.
src/helpers/config.py
import os
from aws_lambda_powertools.utilities import parameters


class DatabricksConfig:
    def __init__(
        self,
        databricks_domain: str,
        databricks_warehouse_id: str,
        databricks_catalog: str,
        databricks_schema: str,
        databricks_token: str,
    ):
        self.databricks_domain = databricks_domain
        self.databricks_warehouse_id = databricks_warehouse_id
        self.databricks_catalog = databricks_catalog
        self.databricks_schema = databricks_schema
        self.databricks_token = databricks_token


def get_config_values() -> DatabricksConfig:
    """
    Retrieve the required environment variables from the Lambda function
    """
    databricks_domain = os.environ.get("DATABRICKS_DOMAIN")
    databricks_warehouse_id = os.environ.get("DATABRICKS_WAREHOUSE_ID")
    databricks_catalog = os.environ.get("DATABRICKS_CATALOG")
    databricks_schema = os.environ.get("DATABRICKS_SCHEMA")
    databricks_token_arn = os.environ.get("DATABRICKS_TOKEN_ARN")
    # Fetch the token value from Secrets Manager using the secret's ARN
    databricks_token = parameters.get_secret(databricks_token_arn)
    return DatabricksConfig(
        databricks_domain=databricks_domain,
        databricks_warehouse_id=databricks_warehouse_id,
        databricks_catalog=databricks_catalog,
        databricks_schema=databricks_schema,
        databricks_token=databricks_token,
    )
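Because os.environ.get() returns None for a variable that is not set, a misconfigured deployment would only surface later as a confusing request failure. One possible way to fail fast is sketched below. The helper name require_variables is hypothetical and not part of the article's code; it takes any mapping so that it can be exercised without touching the real environment.

```python
from typing import Mapping

# Variable names as read by get_config_values() in the article.
REQUIRED_VARIABLES = (
    "DATABRICKS_DOMAIN",
    "DATABRICKS_WAREHOUSE_ID",
    "DATABRICKS_CATALOG",
    "DATABRICKS_SCHEMA",
    "DATABRICKS_TOKEN_ARN",
)


def require_variables(environ: Mapping[str, str]) -> dict:
    """Return the required settings, or raise an error naming every missing one."""
    missing = [name for name in REQUIRED_VARIABLES if not environ.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in REQUIRED_VARIABLES}


# Placeholder values for illustration only.
example_environment = {name: f"<{name.lower()}>" for name in REQUIRED_VARIABLES}
print(require_variables(example_environment))
```

Inside the Lambda function, the same helper could be called as require_variables(os.environ).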
The next step involves constructing a SQL Statement API query by utilising the Databricks configuration that was acquired in the previous method.
src/helpers/sql_statements.py
from helpers import config


def generate_sql_statement(
    databricks_config: config.DatabricksConfig,
    year: str,
    state_name: str
):
    """
    Prepare the SQL statement to retrieve the drivers from the Databricks SQL API based on the input parameters
    """
    request_statement = {
        "warehouse_id": databricks_config.databricks_warehouse_id,
        "catalog": databricks_config.databricks_catalog,
        "schema": databricks_config.databricks_schema,
        "statement": "SELECT year, state, cohort, gender, drivers FROM licensed_drivers WHERE 1=1",
        "parameters": [],
        "wait_timeout": "20s",
        "on_wait_timeout": "CANCEL",
    }
    if year:
        request_statement["statement"] += " AND Year = :year"
        request_statement["parameters"].append({"name": "year", "value": year})
    if state_name:
        request_statement["statement"] += " AND lower(State) = :state"
        request_statement["parameters"].append(
            {"name": "state", "value": state_name.lower()}
        )
    return request_statement
The wait_timeout is set to 20 seconds to ensure that Lambda has enough time to execute before the 29-second API integration timeout is reached.
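The named parameter markers (:year and :state) used in generate_sql_statement() keep caller input out of the SQL text itself. The sketch below repeats only the statement-building part of that function, without the warehouse settings, to show that a hostile value ends up in the parameters list rather than in the statement. The function name build_statement and the hostile input are illustrative assumptions.

```python
from typing import Optional

BASE_STATEMENT = (
    "SELECT year, state, cohort, gender, drivers FROM licensed_drivers WHERE 1=1"
)


def build_statement(year: Optional[str], state: Optional[str]) -> dict:
    """Build the statement text and parameter list from optional filters."""
    statement = BASE_STATEMENT
    parameters = []
    if year:
        statement += " AND Year = :year"
        parameters.append({"name": "year", "value": year})
    if state:
        statement += " AND lower(State) = :state"
        parameters.append({"name": "state", "value": state.lower()})
    return {"statement": statement, "parameters": parameters}


# A value that would alter the query if it were concatenated into the SQL text.
hostile = "California' OR '1'='1"
request = build_statement("2016", hostile)
print(request["statement"])
print(request["parameters"])
```

The statement text contains only the fixed markers, so the value is passed to Databricks as data rather than being parsed as SQL.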
The data_processing.call_sql_statement_api() method is then executed, making an API call to the Databricks SQL Statements API. This uses Databricks credentials and the SQL statement that was generated earlier.
src/helpers/data_processing.py
import json
import requests
from helpers import config
...
def call_sql_statement_api(
    databricks_config: config.DatabricksConfig, sql_statement: dict
) -> dict:
    # Call the Databricks SQL API to execute the SQL statement
    response = requests.post(
        f"https://{databricks_config.databricks_domain}/api/2.0/sql/statements/",
        headers={"Authorization": f"Bearer {databricks_config.databricks_token}"},
        data=json.dumps(sql_statement),
    ).json()
    return response
...
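The dictionary returned by this call carries the statement outcome in status.state, which the endpoint checks against "SUCCEEDED". A production handler would probably want to distinguish the other outcomes too. The sketch below is one hedged way to do that: the helper name http_status_for and the chosen status codes are assumptions, not part of the article's code, and it treats a body without a status field (for example an authentication error) as a server error.

```python
def http_status_for(response: dict) -> int:
    """Map a SQL Statement Execution API response to an HTTP status code."""
    state = response.get("status", {}).get("state")
    if state == "SUCCEEDED":
        return 200
    if state == "CANCELED":
        # With on_wait_timeout set to CANCEL, a statement that exceeds
        # wait_timeout is cancelled, so report it as a gateway timeout.
        return 504
    # FAILED, CLOSED, still-running states, or an error body without "status".
    return 500


print(http_status_for({"status": {"state": "SUCCEEDED"}}))
print(http_status_for({"status": {"state": "CANCELED"}}))
print(http_status_for({"error_code": "EXAMPLE", "message": "illustrative error body"}))
```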
The data is transformed into a DataFrame object for manipulation and formatting. This is crucial for tasks like JSON serialisation, ensuring numeric data types are accurately represented and preventing further conversion errors. The DataFrame object's flexibility allows for additional data transformation to match the required response payload schema.
src/helpers/data_processing.py
import numpy as np
import pandas as pd
...
def process_get_response(response: dict) -> pd.DataFrame:
    """
    Process the response from the Databricks SQL API into a DataFrame object
    """
    schema = response["manifest"]["schema"]
    data = np.array(response["result"]["data_array"])
    df = pd.DataFrame(data, columns=[col["name"] for col in schema["columns"]])
    for column in schema["columns"]:
        df[column["name"]] = map_data_type(df[column["name"]], column["type_name"])
    return df


def map_data_type(series: pd.Series, type_name: str) -> pd.Series:
    """
    Map the Databricks SQL data types to the corresponding Pandas data types
    """
    if type_name in ("INT", "SHORT", "LONG", "FLOAT", "DOUBLE", "DECIMAL"):
        return pd.to_numeric(series, errors="coerce")
    return series
...
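The type mapping matters because, in the JSON array result format, the API returns cell values as strings (or nulls), so a year would otherwise be serialised as "2016" rather than 2016. To make the idea concrete without pandas, the sketch below performs a comparable conversion using only the standard library. The function name rows_to_records and the sample response values are invented for illustration; the response shape mirrors the keys read by process_get_response().

```python
from typing import Any, Callable, Dict, List

# Type names follow the ones handled by map_data_type() in the article.
CONVERTERS: Dict[str, Callable[[str], Any]] = {
    "INT": int,
    "SHORT": int,
    "LONG": int,
    "FLOAT": float,
    "DOUBLE": float,
    "DECIMAL": float,
}


def rows_to_records(response: dict) -> List[dict]:
    """Convert string cells into typed values, one dictionary per row."""
    columns = response["manifest"]["schema"]["columns"]
    records = []
    for row in response["result"]["data_array"]:
        record = {}
        for column, value in zip(columns, row):
            convert = CONVERTERS.get(column["type_name"], str)
            # Nulls stay as None instead of being passed to a converter.
            record[column["name"]] = None if value is None else convert(value)
        records.append(record)
    return records


# Made-up sample response; the figures are not real data.
sample = {
    "manifest": {"schema": {"columns": [
        {"name": "year", "type_name": "INT"},
        {"name": "state", "type_name": "STRING"},
        {"name": "drivers", "type_name": "LONG"},
    ]}},
    "result": {"data_array": [["2016", "California", "123456"], ["2016", "Nevada", None]]},
}
print(rows_to_records(sample))
```

Unlike pd.to_numeric(errors="coerce"), this version raises on a malformed number instead of replacing it with NaN, which may or may not be the behaviour you want.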
4. Request Validation
A notable feature of AWS Lambda Powertools is its integrated request validation, which leverages Pydantic. By adding annotations to the parameters and specifying the expected response type, developers can enforce data integrity and ensure that the function receives input that adheres to predefined constraints.
src/lambda_handler.py
from typing import List, Optional
from typing_extensions import Annotated
from aws_lambda_powertools.event_handler.openapi.params import Query
from helpers import models
...
@app.get("/v1/drivers", compress=True)
def get_drivers(
    year: Annotated[
        Optional[str],
        Query(
            pattern=r"^\d{4}$",
            title="Year",
            description="Year (YYYY) to query dataset",
        ),
    ] = None,
    state: Annotated[
        Optional[str],
        Query(
            title="State",
            description="US State",
        ),
    ] = None,
) -> Optional[List[models.LicensedDriver]]:
    ...
...
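To see which inputs the year pattern accepts, the following standard-library sketch applies the same regular expression outside the framework. The helper name is_valid_year is hypothetical, and the regex engine used by the validation layer may differ from Python's re module in edge cases, so treat this as an approximation of the rule rather than of the library's behaviour.

```python
import re

# Same expression as the pattern passed to Query() for the year parameter.
YEAR_PATTERN = re.compile(r"^\d{4}$")


def is_valid_year(value: str) -> bool:
    """Return True only when the whole value is exactly four digits."""
    return YEAR_PATTERN.fullmatch(value) is not None


for candidate in ("2016", "16", "20166", "20a6"):
    print(candidate, is_valid_year(candidate))
```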
The response type class can be declared in a separate file as shown below.
src/helpers/models.py
from pydantic import BaseModel, Field
from typing import Annotated


class LicensedDriver(BaseModel):
    # Field (rather than Query) carries metadata for model attributes
    year: Annotated[
        int,
        Field(
            title="Year",
        ),
    ]
    state: Annotated[
        str,
        Field(
            title="State",
        ),
    ]
Besides validation benefits, this approach also enables the automatic generation of an OpenAPI (Swagger) specification available at GET /swagger endpoint. This specification remains current without manual updates, as it is automatically refreshed whenever the code changes.
src/lambda_handler.py
...
app = APIGatewayRestResolver(enable_validation=True)
app.enable_swagger(
    path="/swagger",
    title="Databricks Data API",
)
...
5. Lambda Dependencies
Below is an example 'requirements.txt' file located in the lambda code directory. Please ensure that the packages listed are updated to their latest compatible versions.
src/requirements.txt
aws-lambda-powertools==3.5.0
aws-lambda-powertools[tracer]
pydantic==2.6.4
jmespath==1.0.1
requests==2.32.3
numpy==1.26.4
pandas==2.2.1
6. AWS Infrastructure Using CDK
When the core Lambda code is ready, it should be packaged and deployed to AWS. Then, an API endpoint can be created to invoke the code and retrieve the response from Databricks.
Start by adding CDK and Pytest dependencies to the requirements.txt file located in the root directory.
aws-cdk-lib==2.136.1
constructs>=10.0.0,<11.0.0
pytest==8.1.1
pyyaml==6.0.2
Next, install the CDK dependencies:
pip install -r requirements.txt
If you are unfamiliar with CDK, you may use the command cdk init app --language python from the cdk folder. However, it is recommended to clean up the auto-generated structure for clarity. The cdk folder structure should look like the example provided below. For better organisation, move unit tests to a separate tests folder in the project root directory.
-- cdk
-- -- templates
-- -- -- databricks_api_stack.py
-- -- app.py
-- -- cdk.json
Modify app.py, the entry point for CDK synth and deploy commands, to synthesise the Databricks API stack using templates from the repository.
cdk/app.py
#!/usr/bin/env python3
import os
import aws_cdk as cdk
from templates.databricks_api_stack import DatabricksApiStack

app = cdk.App()
# Target account and region are resolved from the active AWS CLI profile
env = cdk.Environment(
    account=os.environ["CDK_DEFAULT_ACCOUNT"],
    region=os.environ["CDK_DEFAULT_REGION"],
)
DatabricksApiStack(
    app,
    "serverless-databricks-api",
    env=env,
)
app.synth()
The CDK stack template code provided below outlines a deployment of the basic containerised Lambda function with environment variables sourced from cdk.json, a Secrets Manager secret, an API Gateway REST API, a Lambda integration, and routing for both the GET /swagger and GET /v1/drivers endpoints.
cdk/templates/databricks_api_stack.py
import os
from constructs import Construct
from aws_cdk import (
    Stack,
    Tags,
    Duration,
    BundlingOptions,
    BundlingFileAccess,
    aws_lambda,
    aws_apigateway,
    aws_secretsmanager,
    SecretValue,
)


class DatabricksApiStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        Tags.of(self).add("stack_name", construct_id)
        self.construct_id = construct_id
        self.create_lambda()
        self.create_api_gateway()

    def create_lambda(self):
        # Add your Databricks token value via CI/CD or manual input
        token_secret_arn = aws_secretsmanager.Secret(
            self,
            "DatabricksTokenSecret",
            secret_name="/serverless-databricks-api/databricks-token",
            description="Databricks token",
            secret_string_value=SecretValue.plain_text("YOUR_DATABASE_TOKEN"),
        )
        # Create the Lambda function
        self.lambda_function = aws_lambda.Function(
            self,
            "DatabricksApiLambda",
            runtime=aws_lambda.Runtime.PYTHON_3_12,
            architecture=aws_lambda.Architecture.ARM_64,
            timeout=Duration.seconds(29),
            code=aws_lambda.Code.from_asset(
                # The asset root is the src folder, which holds
                # lambda_handler.py, helpers and requirements.txt
                os.path.join(os.path.dirname(__file__), "../../src"),
                bundling=BundlingOptions(
                    image=aws_lambda.Runtime.PYTHON_3_12.bundling_image,
                    bundling_file_access=BundlingFileAccess.VOLUME_COPY,
                    command=[
                        "bash",
                        "-c",
                        "pip install --no-cache -r requirements.txt -t /asset-output && cp -au . /asset-output",
                    ],
                ),
            ),
            handler="lambda_handler.lambda_handler",
            memory_size=512,
            tracing=aws_lambda.Tracing.ACTIVE,
            environment={
                "DATABRICKS_DOMAIN": self.node.try_get_context("databricksDomain"),
                "DATABRICKS_WAREHOUSE_ID": self.node.try_get_context(
                    "databricksWarehouseId"
                ),
                "DATABRICKS_CATALOG": self.node.try_get_context("databricksCatalog"),
                "DATABRICKS_SCHEMA": self.node.try_get_context("databricksSchema"),
                "DATABRICKS_TOKEN_ARN": token_secret_arn.secret_arn,
                "POWERTOOLS_SERVICE_NAME": "databricks-api",
                "POWERTOOLS_LOG_LEVEL": "INFO",
            },
        )
        # Grant the Lambda function permission to read the Secrets Manager
        token_secret_arn.grant_read(self.lambda_function)

    def create_api_gateway(self):
        # Create the API Gateway RestApi
        self.api = aws_apigateway.RestApi(
            self,
            "DatabricksApiGateway",
            rest_api_name="Serverless Databricks API",
            description="API Gateway for Databricks API",
            api_key_source_type=aws_apigateway.ApiKeySourceType.HEADER,
            deploy=True,  # Ensure the API is deployed
            deploy_options=aws_apigateway.StageOptions(
                stage_name="prod",  # Specify a stage name
            ),
        )
        # Create the Lambda integration
        self.lambda_integration = aws_apigateway.LambdaIntegration(
            self.lambda_function,
            proxy=True,
        )
        # Add the /swagger and /v1/drivers resources with GET methods
        swagger = self.api.root.add_resource("swagger")
        swagger.add_method(
            "GET",
            self.lambda_integration,
        )
        v1 = self.api.root.add_resource("v1")
        self.v1_drivers = v1.add_resource("drivers")
        self.v1_drivers.add_method(
            "GET",
            self.lambda_integration
        )
Remember that if CDK templates are deployed from Intel chip-based machines, you should use the x86_64 architecture instead of the ARM_64 architecture for AWS Lambda resource deployment.
Finally, add context variables that define the Databricks Domain, Warehouse ID, Catalog, and Schema for Lambda environment variables to the cdk/cdk.json file.
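The stack reads four context keys with try_get_context(), which returns None when a key is absent. As a rough guide to what cdk.json needs to contain, the sketch below builds an equivalent structure in Python, prints it as JSON, and checks that every key is present. All values are placeholders, the "app" entry is an assumption based on what cdk init typically generates, and the helper name missing_context_keys is hypothetical.

```python
import json

# Context keys read by the stack via self.node.try_get_context().
REQUIRED_CONTEXT_KEYS = (
    "databricksDomain",
    "databricksWarehouseId",
    "databricksCatalog",
    "databricksSchema",
)

# Placeholder values only; substitute the details of your own workspace.
cdk_config = {
    "app": "python3 app.py",
    "context": {
        "databricksDomain": "<workspace-host>",
        "databricksWarehouseId": "<warehouse-id>",
        "databricksCatalog": "<catalog>",
        "databricksSchema": "<schema>",
    },
}


def missing_context_keys(config: dict) -> list:
    """List the required context keys that are absent or empty."""
    context = config.get("context", {})
    return [key for key in REQUIRED_CONTEXT_KEYS if not context.get(key)]


print(json.dumps(cdk_config, indent=2))
print("missing:", missing_context_keys(cdk_config))
```

A check along these lines could run before synthesis so that a missing key is reported by name rather than surfacing as an invalid Lambda environment variable.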
Consider expanding this CDK code to incorporate VPC and subnets for the Lambda environment, Cognito user pools and app clients, API authentication and authorisation, API keys and usage plans, WAF rules, custom domain names, and other features as needed. These examples are excluded from the guide to ensure the focus remains on core concepts and functionality.
Once CDK templates are created and you are programmatically authenticated into your AWS environment, you can deploy them via the terminal using the “cdk deploy” command.
The deployed API stack can be validated using Postman or similar tools by running the command below. The API Gateway URL for the deployed stage can be found in CloudFormation stack outputs or accessed via AWS console.
curl --location 'https://{API_URL}/v1/drivers?state=California&year=2016' \
--header 'Accept: application/json'
Best Practices and Considerations
Authentication & Security: It’s recommended to use Cognito user pools with access limited to users with pre-defined scopes. Implement WAF with some basic rules for additional security. Rotate Databricks access tokens regularly, preferably as a part of automated release pipelines.
Rate Limiting: Implement throttle limits at both API Gateway and Lambda levels. Implement usage plans with API keys attached and quota setting for each consumer to restrict excessive use of Databricks.
Error Handling: Use proper error status codes and meaningful messages, utilising built-in AWS Lambda Powertools features.
Unit Testing: Use unit tests to validate CDK resources as well as the end-to-end flow of the core functionality, using mocked responses.
Performance: Use built-in response compression and ensure Databricks queries are optimised. Consider using S3 presigned URLs for large dataset responses over 6 MB.
Agility: Use .yaml files to store environment-specific parameters and load them dynamically when synthesising CDK code, instead of relying on cdk.json variables.
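The 6 MB figure in the performance recommendation corresponds to the response payload limit for synchronous Lambda invocations. A handler could estimate the serialised size before deciding how to respond, as in the minimal sketch below. The helper name fits_inline and the 10% safety margin are assumptions, and the check measures the uncompressed JSON only, so it is a conservative approximation when compression is enabled.

```python
import json

LAMBDA_RESPONSE_LIMIT_BYTES = 6 * 1024 * 1024
SAFETY_MARGIN = 0.9  # hypothetical headroom for headers and encoding overhead


def fits_inline(records: list, limit: int = LAMBDA_RESPONSE_LIMIT_BYTES) -> bool:
    """Return True when the JSON-serialised records fit under the limit."""
    size = len(json.dumps(records).encode("utf-8"))
    return size <= limit * SAFETY_MARGIN


small = [{"year": 2016, "state": "California", "drivers": 1}]
print(fits_inline(small))
print(fits_inline(["x" * 100], limit=50))
```

When the check fails, the response could instead be written to S3 and returned as a presigned URL, as suggested above.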
Conclusion
This implementation provides a foundation to build a secure, cost-effective and maintainable solution to expose Databricks data through a serverless RESTful API. The combination of AWS services ensures high availability, scalability, and observability, while maintaining control over data access.
The next articles in this series will cover similar implementations for Snowflake and S3 data lake, highlighting the unique challenges and solutions for each platform.
The Mantel Group team has extensive experience and has developed boilerplates and accelerators to assist you in building high-quality solutions rapidly. Please contact us if you require assistance with best practices for Data API implementation.