Using AWS Lambda with Amazon Kinesis

Know How Guide and Hands on Guide for AWS


  1. Create the IAM execution role
    Permissions – AWSLambdaKinesisExecutionRole.
    Role name – lambda-kinesis-role.
  2. Create a Kinesis stream
aws kinesis create-stream --stream-name lambda-stream --shard-count 1 --region ${AWS_REGION}

# Record the ARN of the Kinesis stream
Stream_ARN=$(aws kinesis describe-stream --stream-name lambda-stream --region ${AWS_REGION} --query 'StreamDescription.StreamARN' --output text)
echo $Stream_ARN

Node.js Example

  1. Create the function
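# Package index.js into a zip archive, then pass that archive after fileb://
zip index.js
# Replace lambda-kinesis-role-arn with the ARN of the lambda-kinesis-role role
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb:// --handler index.handler --runtime nodejs12.x \
--role lambda-kinesis-role-arn

# Invoke the function with a sample Kinesis event (AWS CLI v2 also needs --cli-binary-format raw-in-base64-out)
aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.json out.txt --region ${AWS_REGION}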

# CloudWatch logs
INFO	Decoded payload: Hello, this is a test.
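The invoke command reads its test event from input.json, which is not shown in this guide. The following is a minimal sketch of how such a file could be generated, assuming the standard Kinesis event shape in which each record carries a base64-encoded payload. The helper name `build_test_event`, the placeholder ARN, and the sequence number are illustrative assumptions.

```python
import base64
import json


def build_test_event(message: str, stream_arn: str) -> dict:
    """Build a one-record event shaped like what Lambda receives from Kinesis."""
    # Kinesis delivers the record payload base64-encoded in the "data" field.
    data = base64.b64encode(message.encode("utf-8")).decode("ascii")
    return {
        "Records": [
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": "1",
                    "sequenceNumber": "0",  # placeholder value (assumption)
                    "data": data,
                },
                "eventSource": "aws:kinesis",
                "eventName": "aws:kinesis:record",
                "eventSourceARN": stream_arn,
            }
        ]
    }


if __name__ == "__main__":
    # Placeholder ARN (assumption); substitute the value of $Stream_ARN.
    event = build_test_event(
        "Hello, this is a test.",
        "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
    )
    print(json.dumps(event, indent=2))
```

Redirecting the script's output to input.json gives a file that can be passed to `aws lambda invoke`.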
  2. Add an event source in AWS Lambda
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source-arn $Stream_ARN --batch-size 100 --starting-position LATEST --region ${AWS_REGION}

# Confirm that the mapping was created
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source-arn $Stream_ARN --region ${AWS_REGION}


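  3. Test from Kinesis
# With AWS CLI v2, add --cli-binary-format raw-in-base64-out so the plain-text --data value is accepted
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, new stream record." --region ${AWS_REGION}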

View the logs in the CloudWatch console

INFO	Decoded payload: Hello, new stream record.
  4. Add an additional consumer
    • Add the DynamoDB and SNS permissions to lambda-kinesis-role.
    • Create a DynamoDB table named kinesis-lambda-table with recordId as the partition key.
    • Create an SNS topic named kinesis-lambda-sns.
    • Update the Lambda code (source code: index-ddb.js).

  5. Verify message delivery to the DynamoDB table and SNS topic
    aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.json out.txt --region ${AWS_REGION}
    aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
    --data "Hello, new stream record." --region ${AWS_REGION}



Lambda Python Example

  1. Producer
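The producer code is not included in this guide. As a rough sketch, a producer could generate small JSON records and write them to lambda-stream with the boto3 `put_record` call. The record fields (`event_time`, `ticker`, `price`), the ticker list, and the function names are assumptions chosen for illustration, not the original producer.

```python
import json
import random
from datetime import datetime, timezone

STREAM_NAME = "lambda-stream"  # stream created earlier in this guide
TICKERS = ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]  # hypothetical symbols


def make_stock_record(rng: random.Random) -> dict:
    """Return one fake stock tick (field names are illustrative assumptions)."""
    return {
        "event_time": datetime.now(timezone.utc).isoformat(),
        "ticker": rng.choice(TICKERS),
        "price": round(rng.uniform(10, 100), 2),
    }


def send_records(count: int, region: str) -> None:
    """Put `count` records on the stream, one PutRecord call per record."""
    import boto3  # imported here so make_stock_record works without boto3

    client = boto3.client("kinesis", region_name=region)
    rng = random.Random()
    for _ in range(count):
        record = make_stock_record(rng)
        client.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(record).encode("utf-8"),
            # Records with the same partition key land on the same shard.
            PartitionKey=record["ticker"],
        )
```

Calling `send_records(10, "us-east-1")` with valid AWS credentials would write ten records; the region here is only an example.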

  2. Consumer

aws lambda create-function --function-name ProcessStockKinesisRecords \
--zip-file fileb:// --handler stock-consumer.lambda_handler --runtime python3.8 \
--role lambda-kinesis-role-arn
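The consumer source is not shown here. A minimal sketch of a handler that would match `--handler stock-consumer.lambda_handler` is given below, assuming the producer sends UTF-8 text or JSON. The helper `decode_records` and the return value are illustrative assumptions.

```python
import base64
import json


def decode_records(event: dict) -> list:
    """Decode the base64 payload of every Kinesis record in the event."""
    decoded = []
    for record in event.get("Records", []):
        raw = base64.b64decode(record["kinesis"]["data"])
        text = raw.decode("utf-8")
        try:
            decoded.append(json.loads(text))
        except json.JSONDecodeError:
            decoded.append(text)  # keep non-JSON payloads as plain text
    return decoded


def lambda_handler(event, context):
    """Entry point invoked by Lambda with a batch of Kinesis records."""
    payloads = decode_records(event)
    for payload in payloads:
        # print output ends up in the function's CloudWatch log stream
        print(f"Decoded payload: {payload}")
    return {"processed": len(payloads)}
```

Raising an exception from the handler marks the whole batch as failed, which is what triggers retries and, if configured, the failed-event destination.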
  3. Add an event source in AWS Lambda
aws lambda create-event-source-mapping --function-name ProcessStockKinesisRecords \
--event-source-arn $Stream_ARN --batch-size 100 --starting-position LATEST --region ${AWS_REGION}

# Confirm that the mapping was created
aws lambda list-event-source-mappings --function-name ProcessStockKinesisRecords \
--event-source-arn $Stream_ARN --region ${AWS_REGION}

  4. Configure a destination for failed-event records
    • Under the Lambda function Designer, choose Add destination.
    • For Source, choose Stream invocation.
    • For Stream, choose a stream that is mapped to the function: stream/lambda-stream.
    • For Destination, choose SNS.
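The same setting can probably be applied without the console. The sketch below uses the boto3 Lambda client's `update_event_source_mapping` call with an `OnFailure` destination. The function names are hypothetical, the mapping UUID is assumed to come from the `list-event-source-mappings` output, and the topic ARN is assumed to be that of kinesis-lambda-sns.

```python
from typing import Optional


def build_destination_update(mapping_uuid: str, topic_arn: str,
                             max_retries: Optional[int] = None) -> dict:
    """Build keyword arguments for update_event_source_mapping."""
    kwargs = {
        "UUID": mapping_uuid,
        "DestinationConfig": {"OnFailure": {"Destination": topic_arn}},
    }
    if max_retries is not None:
        # Without a retry limit, a failing batch is retried until it expires.
        kwargs["MaximumRetryAttempts"] = max_retries
    return kwargs


def apply_destination_update(kwargs: dict, region: str) -> dict:
    """Send the update to Lambda (needs boto3 and AWS credentials)."""
    import boto3  # imported here so the builder works without boto3

    client = boto3.client("lambda", region_name=region)
    return client.update_event_source_mapping(**kwargs)
```

The execution role also needs permission to publish to the topic, otherwise failed-batch notifications are not delivered.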

  5. End-to-end verification

Single record

aws lambda invoke --function-name ProcessStockKinesisRecords --payload file://input.json out.txt --region ${AWS_REGION}

Multiple records
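No command is given for the multiple-record case. One way to exercise it, sketched under assumptions, is to send a batch through the boto3 `put_records` call, which accepts up to 500 records per request. The helper names and the `ticker` partition-key field are hypothetical.

```python
import json

MAX_BATCH = 500  # PutRecords accepts at most 500 records per request


def to_entries(payloads: list, key_field: str = "ticker") -> list:
    """Convert dict payloads into PutRecords request entries."""
    return [
        {
            "Data": json.dumps(p).encode("utf-8"),
            "PartitionKey": str(p.get(key_field, "1")),
        }
        for p in payloads
    ]


def chunk(entries: list, size: int = MAX_BATCH) -> list:
    """Split entries into lists of at most `size` items."""
    return [entries[i:i + size] for i in range(0, len(entries), size)]


def send_batch(payloads: list, stream_name: str, region: str) -> int:
    """Send all payloads and return the number of records Kinesis rejected."""
    import boto3  # imported here so the helpers above work without boto3

    client = boto3.client("kinesis", region_name=region)
    failed = 0
    for batch in chunk(to_entries(payloads)):
        response = client.put_records(StreamName=stream_name, Records=batch)
        failed += response["FailedRecordCount"]
    return failed
```

With the event source mapping's batch size of 100, a burst like this should reach the function as several invocations, each visible in the CloudWatch logs.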


Cleanup

# Delete the Lambda functions
aws lambda delete-function --function-name ProcessKinesisRecords --region ${AWS_REGION}
aws lambda delete-function --function-name ProcessStockKinesisRecords --region ${AWS_REGION}

# Delete the DynamoDB tables and SNS topic
    • DynamoDB tables kinesis-lambda-table and kinesis-stock-table
    • SNS topic kinesis-lambda-sns

# Delete the stream
aws kinesis delete-stream --stream-name lambda-stream --region ${AWS_REGION}

