Know-How and Hands-On Guide for AWS
Every 2 hours, I need to execute the same Athena query.

1. Package the lambda function code
```bash
cd .. && zip -g function.zip basic-athena-query-lambda.py athena_query_helper.py
```
2. Create lambda function
```bash
aws lambda create-function --function-name BasicAthenaQuery --runtime python3.7 \
--zip-file fileb://function.zip --handler basic-athena-query-lambda.lambda_handler \
--role arn:aws-cn:iam::$account_id:role/lambda_basic_execution \
--timeout 300 --memory-size 256

# Invoke the function and decode the tail of the execution log
aws lambda invoke --function-name BasicAthenaQuery \
--payload '{ "database": "sampledb", "table": "user_email", "s3_location": "s3://ray-datalake-lab/sample/user_email" }' \
out --log-type Tail --query 'LogResult' --output text | base64 -d

cat out
#[{"name": "user1", "email": "user1@example.com"}, {"name": "user2", "email": "user2@example.com"}, {"name": "user3", "email": "user3@example.com"}, {"name": "user4", "email": "user4@example.com"}, {"name": "user5", "email": "user5@example.com"}, {"name": "user6", "email": "user6@example.com"}]

# After editing basic-athena-query-lambda.py, repackage and update the function code
zip -g function.zip basic-athena-query-lambda.py
aws lambda update-function-code --function-name BasicAthenaQuery \
--zip-file fileb://function.zip
```
Check the Lambda execution result and verify that the Athena query executed successfully.

Cleanup:

```bash
# lambda
aws lambda delete-function --function-name BasicAthenaQuery
```

```sql
DROP TABLE user_email;
DROP DATABASE sampledb;
```
Below is the main logic for running a query against Athena with the SDK (boto3).
```python
import boto3

athena_client = boto3.client("athena")

query = "Your query"
database = "database_name"
athena_result_bucket = "s3://my-bucket/"

response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': database
    },
    ResultConfiguration={
        'OutputLocation': athena_result_bucket,
    }
)
query_execution_id = response["QueryExecutionId"]

# poll_status (defined below) waits until the query reaches a terminal state
result = poll_status(query_execution_id)

if result['QueryExecution']['Status']['State'] == 'SUCCEEDED':
    # get query results
    result = athena_client.get_query_results(
        QueryExecutionId=query_execution_id)
```
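The raw `get_query_results` response nests every value under `ResultSet.Rows[].Data[].VarCharValue`. To turn it into the row dictionaries shown in the invoke output above, the header row and data rows have to be zipped together. A minimal sketch follows; `parse_query_results` is a name I made up for illustration, the repository's own helper in athena_query_helper.py may differ:

```python
def parse_query_results(results):
    """Convert an Athena get_query_results response into a list of dicts.

    Assumes the first row is the header row, which is the case for
    standard SELECT results returned by Athena.
    """
    rows = results["ResultSet"]["Rows"]
    headers = [col["VarCharValue"] for col in rows[0]["Data"]]
    records = []
    for row in rows[1:]:
        values = [col.get("VarCharValue") for col in row["Data"]]
        records.append(dict(zip(headers, values)))
    return records

# e.g. [{"name": "user1", "email": "user1@example.com"}, ...]
```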
If the query takes a long time, I use the @retry decorator from the Python retrying library to poll the execution status. This works, but it is not a loosely coupled approach: if my query takes longer, I have to modify my code.
```python
import logging

from retrying import retry

logger = logging.getLogger()
logger.setLevel(logging.INFO)


@retry(stop_max_attempt_number=10,
       wait_exponential_multiplier=300,
       wait_exponential_max=1 * 60 * 1000)
def poll_status(_id):
    result = athena_client.get_query_execution(QueryExecutionId=_id)
    state = result['QueryExecution']['Status']['State']
    logger.info("STATUS:" + state)

    if state == 'SUCCEEDED':
        logger.info(result)
        return result
    elif state == 'FAILED':
        logger.error(result)
        return result
    else:
        logger.info(result)
        # still QUEUED/RUNNING: raise so that @retry keeps polling
        raise Exception("Query %s has not finished yet" % _id)
```
I need a way to resolve these two requirements: execute the query on a schedule (every 2 hours), and handle long-running queries without having to modify the code. It seems Step Functions can orchestrate such a workflow, so let's try it.

Let's make the scenario more complex and automate the whole workflow:
- Create the lambda function: athena_createdb_createtable
```bash
aws lambda create-function --function-name athena_createdb_createtable --runtime python3.7 \
--zip-file fileb://function.zip --handler athena_createdb_createtable.lambda_handler \
--role arn:aws-cn:iam::$account_id:role/lambda_basic_execution \
--timeout 300 --memory-size 256
```
- Create the lambda functions from athena_automate_handler
```bash
zip -g function.zip athena_automate_handler.py athena_query_helper.py
aws lambda create-function --function-name athena_short_running_query --runtime python3.7 \
--zip-file fileb://function.zip --handler athena_automate_handler.athena_short_running_query \
--role arn:aws-cn:iam::$account_id:role/lambda_basic_execution \
--timeout 300 --memory-size 256
aws lambda create-function --function-name athena_start_long_running_query --runtime python3.7 \
--zip-file fileb://function.zip --handler athena_automate_handler.athena_start_long_running_query \
--role arn:aws-cn:iam::$account_id:role/lambda_basic_execution \
--timeout 300 --memory-size 256
aws lambda create-function --function-name athena_get_long_running_status --runtime python3.7 \
--zip-file fileb://function.zip --handler athena_automate_handler.athena_get_long_running_status \
--role arn:aws-cn:iam::$account_id:role/lambda_basic_execution \
--timeout 300 --memory-size 256
aws lambda create-function --function-name athena_get_long_running_result --runtime python3.7 \
--zip-file fileb://function.zip --handler athena_automate_handler.athena_get_long_running_result \
--role arn:aws-cn:iam::$account_id:role/lambda_basic_execution \
--timeout 300 --memory-size 256
```
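For orientation, here is a rough sketch of how these handlers split the work so that no single Lambda invocation has to wait for the query to finish. The real implementations live in athena_automate_handler.py and athena_query_helper.py; the event field names below are my own assumptions:

```python
import boto3

athena_client = boto3.client("athena")


def athena_start_long_running_query(event, context):
    # Submit the query and return immediately with its execution id.
    response = athena_client.start_query_execution(
        QueryString=event["QueryString"],          # assumed event fields
        QueryExecutionContext={"Database": event["Athena_Database"]},
        ResultConfiguration={"OutputLocation": event["OutputLocation"]},
    )
    event["QueryExecutionId"] = response["QueryExecutionId"]
    return event


def athena_get_long_running_status(event, context):
    # Called repeatedly by the Step Functions Wait/Choice loop.
    result = athena_client.get_query_execution(
        QueryExecutionId=event["QueryExecutionId"])
    event["Status"] = result["QueryExecution"]["Status"]["State"]
    return event


def athena_get_long_running_result(event, context):
    # Only reached once the Choice state has seen Status == SUCCEEDED.
    results = athena_client.get_query_results(
        QueryExecutionId=event["QueryExecutionId"])
    return results["ResultSet"]["Rows"]
```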
The Step Functions definition can be found in scripts/stepfunction.json; replace the Lambda ARNs with the values from your environment.
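The core idea is that the polling loop moves out of the Lambda code and into Wait and Choice states. The fragment below is only a sketch of that pattern, with state names, the `$.taskresult.Status` path, and placeholder ARNs chosen by me; the authoritative definition is scripts/stepfunction.json:

```python
import json

# A sketch of the long-running-query polling pattern (hypothetical state names and paths).
polling_pattern = {
    "StartAt": "Start Athena Query",
    "States": {
        "Start Athena Query": {
            "Type": "Task",
            "Resource": "arn:aws-cn:lambda:<region>:<account_id>:function:athena_start_long_running_query",
            "ResultPath": "$.taskresult",
            "Next": "Wait"
        },
        "Wait": {
            "Type": "Wait",
            "Seconds": 30,
            "Next": "Get Query Status"
        },
        "Get Query Status": {
            "Type": "Task",
            "Resource": "arn:aws-cn:lambda:<region>:<account_id>:function:athena_get_long_running_status",
            "ResultPath": "$.taskresult",
            "Next": "Query Finished?"
        },
        "Query Finished?": {
            "Type": "Choice",
            "Choices": [
                {"Variable": "$.taskresult.Status", "StringEquals": "SUCCEEDED", "Next": "Get Query Result"},
                {"Variable": "$.taskresult.Status", "StringEquals": "FAILED", "Next": "Query Failed"}
            ],
            "Default": "Wait"
        },
        "Get Query Result": {
            "Type": "Task",
            "Resource": "arn:aws-cn:lambda:<region>:<account_id>:function:athena_get_long_running_result",
            "End": True
        },
        "Query Failed": {"Type": "Fail"}
    }
}

print(json.dumps(polling_pattern, indent=2))
```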
Start an execution of the state machine with input such as:

```json
{
    "InputData": {
        "Athena_Database": "blogdb",
        "Athena_Table": "original_csv",
        "Output_Data_Bucket": "ray-datalake-lab",
        "Output_Prefix": "results/blogdb",
        "Athena_DDL_Bucket": "ray-datalake-lab"
    }
}
```
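If you want to trigger the workflow from code (for example from a scheduled job) rather than from the console, a minimal sketch looks like the following; the state machine name AthenaWorkflow and the ARN placeholders are assumptions:

```python
import json

import boto3

sfn_client = boto3.client("stepfunctions")

# Assumed ARN; use the ARN of the state machine you created from scripts/stepfunction.json.
state_machine_arn = "arn:aws-cn:states:<region>:<account_id>:stateMachine:AthenaWorkflow"

execution_input = {
    "InputData": {
        "Athena_Database": "blogdb",
        "Athena_Table": "original_csv",
        "Output_Data_Bucket": "ray-datalake-lab",
        "Output_Prefix": "results/blogdb",
        "Athena_DDL_Bucket": "ray-datalake-lab"
    }
}

response = sfn_client.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps(execution_input),
)
print(response["executionArn"])
```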
"ResultPath": "$.taskresult",
"Parameters":{
"TaskName": "Create_database_and_table",
"Athena_DDL_File": "scripts/blogdb/create_blogdb_original_csv.ddl",
"Athena_Database.$":"$.InputData.Athena_Database",
"Athena_Table.$": "$.InputData.Athena_Table",
"Output_Data_Bucket.$": "$.InputData.Output_Data_Bucket",
"Output_Prefix.$": "$.InputData.Output_Prefix",
"Athena_DDL_Bucket.$": "$.InputData.Athena_DDL_Bucket"
}
"ResultPath": "$.taskresult",
"Parameters": {
"TaskName": "Convert_Parquet_table_as_select",
"Athena_DDL_File": "scripts/blogdb/create_blogdb_new_parquet.ddl",
"Athena_Database.$": "$.InputData.Athena_Database",
"Athena_Table": "new_parquet",
"Output_Data_Bucket.$": "$.InputData.Output_Data_Bucket",
"Output_Prefix.$": "$.InputData.Output_Prefix",
"Athena_DDL_Bucket.$": "$.InputData.Athena_DDL_Bucket"
}
The Step Functions task uses a CTAS statement to create the table new_parquet: it partitions the data by year, converts the data to Parquet, and compresses the data with Snappy.
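For orientation, here is a hedged sketch of the kind of CTAS statement scripts/blogdb/create_blogdb_new_parquet.ddl might contain, written the way a query is submitted through the SDK; the column list, the year filter, and the exact S3 locations are assumptions, so refer to the actual DDL file:

```python
# A sketch only; the real statement lives in scripts/blogdb/create_blogdb_new_parquet.ddl.
ctas_query = """
CREATE TABLE blogdb.new_parquet
WITH (
      format = 'PARQUET',
      parquet_compression = 'SNAPPY',
      partitioned_by = ARRAY['year'],
      external_location = 's3://ray-datalake-lab/sample/athena-ctas-insert-into-optimized/'
)
AS SELECT col1, col2, year   -- hypothetical column list; the partition column must come last
   FROM blogdb.original_csv
   WHERE CAST(year AS BIGINT) >= 2015
"""

# Earlier years are added later with INSERT INTO (Step 3).
response = athena_client.start_query_execution(
    QueryString=ctas_query,
    ResultConfiguration={"OutputLocation": "s3://ray-datalake-lab/results/blogdb/"},
)
```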
Verify the data written by the CTAS statement:
```bash
aws s3 ls s3://ray-datalake-lab/sample/athena-ctas-insert-into-optimized/ --recursive --human-readable --region cn-northwest-1 | head -5
```
3. Step 3: Use INSERT INTO to Add Data from years 2010 to 2014.
- Define the step function parameter
```json
"ResultPath": "$.taskresult",
"Parameters": {
"TaskName": "Insert_Data",
"Athena_DDL_File": "scripts/blogdb/insert_blogdb_new_parquet.ddl",
"Athena_Database.$": "$.InputData.Athena_Database",
"Athena_Table": "new_parquet",
"Output_Data_Bucket.$": "$.InputData.Output_Data_Bucket",
"Output_Prefix.$": "$.InputData.Output_Prefix",
"Athena_DDL_Bucket.$": "$.InputData.Athena_DDL_Bucket"
}
The Step Functions task runs an INSERT INTO statement that adds the data for years 2010 to 2014 to new_parquet.
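Again only as a hedged sketch of what scripts/blogdb/insert_blogdb_new_parquet.ddl might contain (hypothetical column list; refer to the actual DDL file):

```python
# A sketch only; the real statement lives in scripts/blogdb/insert_blogdb_new_parquet.ddl.
insert_query = """
INSERT INTO blogdb.new_parquet
SELECT col1, col2, year   -- hypothetical column list; the partition column comes last
FROM blogdb.original_csv
WHERE CAST(year AS BIGINT) BETWEEN 2010 AND 2014
"""

response = athena_client.start_query_execution(
    QueryString=insert_query,
    ResultConfiguration={"OutputLocation": "s3://ray-datalake-lab/results/blogdb/"},
)
```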
Check the partitions and Parquet files created by the CTAS and INSERT INTO statements:

```bash
aws s3 ls s3://ray-datalake-lab/sample/athena-ctas-insert-into-optimized/ --region cn-northwest-1
# PRE year=2010/
# PRE year=2011/
# PRE year=2012/
# PRE year=2013/
# PRE year=2014/
# PRE year=2015/
# PRE year=2016/
# PRE year=2017/
# PRE year=2018/
# PRE year=2019/

aws s3 ls s3://ray-datalake-lab/sample/athena-ctas-insert-into-optimized/ --recursive --human-readable --summarize --region cn-northwest-1
```
4. Step 4: Query the data and measure performance
- Define the step function parameter
```json
"ResultPath": "$.taskresult",
"Parameters": {
"TaskName": "Query_data_measure_performance1",
"Athena_DDL_File": "scripts/blogdb/query_orders_group_by_year.ddl",
"Athena_Database.$": "$.InputData.Athena_Database",
"Athena_Table": "new_parquet",
"Output_Data_Bucket.$": "$.InputData.Output_Data_Bucket",
"Output_Prefix.$": "$.InputData.Output_Prefix",
"Athena_DDL_Bucket.$": "$.InputData.Athena_DDL_Bucket"
}
The Step Functions task invokes the query and returns the result:

```json
{"query_execution_result":[{"year":"2014","_col1":"41278"},{"year":"2013","_col1":"41955"},{"year":"2012","_col1":"42088"},{"year":"2011","_col1":"41076"},{"year":"2010","_col1":"40289"}]}
```
"ResultPath": "$.taskresult",
"Parameters": {
"TaskName": "Query_data_measure_performance2",
"Athena_DDL_File": "scripts/blogdb/analysis_orders_in_2018.ddl",
"Athena_Database.$": "$.InputData.Athena_Database",
"Athena_Table": "new_parquet",
"Output_Data_Bucket.$": "$.InputData.Output_Data_Bucket",
"Output_Prefix.$": "$.InputData.Output_Prefix",
"Athena_DDL_Bucket.$": "$.InputData.Athena_DDL_Bucket"
},
Cleanup:

```bash
aws lambda delete-function --function-name athena_createdb_createtable
aws lambda delete-function --function-name athena_short_running_query
aws lambda delete-function --function-name athena_start_long_running_query
aws lambda delete-function --function-name athena_get_long_running_status
aws lambda delete-function --function-name athena_get_long_running_result
```
```sql
DROP TABLE IF EXISTS new_parquet;
DROP TABLE IF EXISTS original_csv;
DROP DATABASE blogdb;
```
Clean up the data under s3://ray-datalake-lab/sample/athena-ctas-insert-into-optimized/.