AWS : REST API + Athena (Gateway & Lambda)

REST API + Athena using API Gateway and Lambda

1. Create a Lambda function

First, follow the steps here to load the data from S3 into Athena.

Then follow the steps here to create the Lambda function.

2. Code to integrate Athena, Lambda and the REST response

#####################################################
# Code : To integrate Athena and API Gateway response
# Developer : Debaditya Chakravorty
# Credits : Thanks to all AWS and other blogs
#####################################################

import json
import boto3
import time


def lambda_handler(event,context):
	client = boto3.client('athena')

	state = event['queryStringParameters']['state']
	county = event['queryStringParameters']['county']
	
	col_name = state+","+county

	query_string = 'select * from debdata where statecode = \'' + state + '\' and county = \'' + county +'\' ;'
	DATABASE_NAME = 'debtest'
	output_dir = 's3://deb-lambda-output/data/'

	query_id = client.start_query_execution(
			QueryString = query_string,
			QueryExecutionContext = {
				'Database': DATABASE_NAME
			},
			ResultConfiguration = {
				'OutputLocation': output_dir
			}
		)['QueryExecutionId']

	# Poll Athena until the query leaves the QUEUED/RUNNING states
	query_status = None
	while query_status in (None, 'QUEUED', 'RUNNING'):
		query_status = client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
		if query_status in ('FAILED', 'CANCELLED'):
			raise Exception('Athena query with the string "{}" failed or was cancelled'.format(query_string))
		if query_status != 'SUCCEEDED':
			# A pause between polls is required; it can be brought down to 5 seconds
			time.sleep(10)
	results_paginator = client.get_paginator('get_query_results')
	results_iter = results_paginator.paginate(
		QueryExecutionId=query_id,
		PaginationConfig={
		'PageSize': 1000
		}
	)
	results = []
	data_list = []
	for results_page in results_iter:
		for row in results_page['ResultSet']['Rows']:
			data_list.append(row['Data'])
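	# The first row holds the column headers, so skip it
	for datum in data_list[1:]:
		# NULL cells carry no 'VarCharValue' key; .get() returns None instead of raising KeyError
		results.append([x.get('VarCharValue') for x in datum])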
	
	#2. Construct the body of the response object
	queryResponse = {}
	queryResponse[col_name] = results


	#3. Construct http response object
	responseObject = {}
	responseObject['statusCode'] = 200
	responseObject['headers'] = {}
	responseObject['headers']['Content-Type'] = 'application/json'
	responseObject['body'] = json.dumps(queryResponse)
	
	#4. Return the response object
	return responseObject
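
The handler above places the raw state and county values straight into the SQL text and assumes both parameters are always present. Below is a minimal sketch of one way to validate them first and to escape single quotes before building the query. The helper names (parse_params, sql_literal, build_query, bad_request) and the allowed-character patterns are assumptions made for illustration; they are not part of the original code and should be adjusted to the real data.

```python
import json
import re

# Assumption: state codes are two letters and county names use only
# letters, spaces, periods, apostrophes and hyphens (up to 64 characters).
STATE_RE = re.compile(r"[A-Za-z]{2}")
COUNTY_RE = re.compile(r"[A-Za-z .'-]{1,64}")


def parse_params(event):
    """Return (state, county) from the query string, or raise ValueError."""
    # queryStringParameters can be missing or None when no query string is sent
    params = event.get('queryStringParameters') or {}
    state = params.get('state', '')
    county = params.get('county', '')
    if not STATE_RE.fullmatch(state):
        raise ValueError('state must be a two-letter code')
    if not COUNTY_RE.fullmatch(county):
        raise ValueError('county is missing or contains unsupported characters')
    return state, county


def sql_literal(value):
    """Quote a string as a SQL literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_query(state, county):
    """Build the same SELECT as the handler, with escaped literals."""
    return 'select * from debdata where statecode = {} and county = {}'.format(
        sql_literal(state), sql_literal(county))


def bad_request(message):
    """Build a 400 response in the same shape the handler returns."""
    return {
        'statusCode': 400,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps({'error': message}),
    }


# Possible use at the top of lambda_handler:
#     try:
#         state, county = parse_params(event)
#     except ValueError as err:
#         return bad_request(str(err))
#     query_string = build_query(state, county)
```

Returning a 400 for bad input keeps a missing parameter from surfacing as an unhandled KeyError inside the Lambda function.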

3. Create API Gateway

Please follow the steps here to create the REST endpoint.

4. Test the REST endpoint with a query string

Example: state=FL&county=CLAY COUNTY
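
The county value in this example contains a space, which has to be percent-encoded when the query string is put into a URL. The sketch below builds the encoded query string with the Python standard library. The invoke URL is a placeholder, not the author's endpoint; substitute the URL that API Gateway shows for your deployed stage.

```python
from urllib.parse import quote, urlencode

# Placeholder invoke URL (assumption); replace with your own stage URL.
BASE_URL = 'https://example.execute-api.us-east-1.amazonaws.com/dev/query'

params = {'state': 'FL', 'county': 'CLAY COUNTY'}

# quote_via=quote encodes the space as %20 rather than '+'
query_string = urlencode(params, quote_via=quote)
url = BASE_URL + '?' + query_string

print(query_string)  # state=FL&county=CLAY%20COUNTY
print(url)
```

The resulting URL can be pasted into a browser or passed to any HTTP client to call the endpoint.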

Below is the result from Athena; the table is kept up to date from S3 by a Glue crawler.

Note :

  • The steps mentioned above are for a POC.

  • In production, or in any organization, a CloudFormation template and proper IAM roles (following the principle of least privilege) would be used.
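
As a rough illustration of what least privilege could look like for the Lambda execution role, the sketch below builds an IAM policy document as a Python dictionary and prints it as JSON. It is a starting point under stated assumptions, not a verified production policy: the source-data bucket name is a placeholder, the wildcard resources should be narrowed to your own workgroup and catalog ARNs, and the exact list of actions needed depends on how Athena, Glue and S3 are set up in your account.

```python
import json

# Results bucket taken from output_dir in the handler above.
OUTPUT_BUCKET = 'deb-lambda-output'
# Placeholder (assumption): the bucket that holds the source data.
DATA_BUCKET = 'your-source-data-bucket'


def bucket_arns(bucket):
    """Return the ARNs for a bucket and for the objects inside it."""
    return ['arn:aws:s3:::' + bucket, 'arn:aws:s3:::' + bucket + '/*']


policy = {
    'Version': '2012-10-17',
    'Statement': [
        {   # Start the query, poll its state and read its results
            'Effect': 'Allow',
            'Action': [
                'athena:StartQueryExecution',
                'athena:GetQueryExecution',
                'athena:GetQueryResults',
            ],
            'Resource': '*',  # assumption: narrow to your workgroup ARN
        },
        {   # Look up table metadata in the Glue Data Catalog
            'Effect': 'Allow',
            'Action': ['glue:GetDatabase', 'glue:GetTable', 'glue:GetPartitions'],
            'Resource': '*',  # assumption: narrow to your catalog/database/table ARNs
        },
        {   # Read the source data that the table points at
            'Effect': 'Allow',
            'Action': ['s3:GetObject', 's3:ListBucket', 's3:GetBucketLocation'],
            'Resource': bucket_arns(DATA_BUCKET),
        },
        {   # Write and read the query results
            'Effect': 'Allow',
            'Action': ['s3:GetObject', 's3:PutObject', 's3:ListBucket',
                       's3:GetBucketLocation'],
            'Resource': bucket_arns(OUTPUT_BUCKET),
        },
    ],
}

print(json.dumps(policy, indent=2))
```

The printed JSON can be used as the policy document of a role in a CloudFormation template or pasted into the IAM console for review.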