Using AWS Lambda with Amazon Kinesis

Prepare

  1. Create the IAM execution role (trusted entity: AWS Lambda)
    Permissions – AWSLambdaKinesisExecutionRole (AWS managed policy).
    Role name – lambda-kinesis-role.
    
  2. Create a Kinesis stream
aws kinesis create-stream --stream-name lambda-stream --shard-count 1 --region ${AWS_REGION}

# Record the ARN of the Kinesis stream
Stream_ARN=$(aws kinesis describe-stream --stream-name lambda-stream --region ${AWS_REGION} --query 'StreamDescription.StreamARN' --output text)
echo "$Stream_ARN"
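
A newly created stream starts in the CREATING state and can only be used once it is ACTIVE. The sketch below polls until then and returns the ARN, as a scripted counterpart to the describe-stream command above. It assumes `kinesis` is a boto3 Kinesis client (for example `boto3.client("kinesis")`); the function name, delay, and attempt limit are illustrative choices, not part of the original guide.

```python
import time


def wait_until_active(kinesis, stream_name, delay=2.0, max_attempts=30, sleep=time.sleep):
    """Poll describe_stream until the stream is ACTIVE, then return its ARN.

    `kinesis` is assumed to be a boto3 Kinesis client.
    """
    for _ in range(max_attempts):
        description = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]
        if description["StreamStatus"] == "ACTIVE":
            return description["StreamARN"]
        sleep(delay)  # stream is not ACTIVE yet (for example still CREATING); try again
    raise TimeoutError(f"stream {stream_name} did not become ACTIVE")
```

With real credentials this could be called as `wait_until_active(boto3.client("kinesis"), "lambda-stream")`.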

Node.js Example

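  1. Create the function
zip function.zip index.js
# Replace lambda-kinesis-role-arn with the ARN of lambda-kinesis-role
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \
--role lambda-kinesis-role-arn --region ${AWS_REGION}

# Invoke the function with a sample Kinesis event
# (with AWS CLI v2, add --cli-binary-format raw-in-base64-out when passing a raw JSON payload)
aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.json out.txt --region ${AWS_REGION}

# CloudWatch logs
INFO	Decoded payload: Hello, this is a test.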
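
The guide does not show input.json. Kinesis hands record data to Lambda base64-encoded, so a test event has to carry the message in that form. The following is a minimal sketch of how such an event could be generated; the helper name and the placeholder sequence number are assumptions, and only the fields a simple handler typically reads are included.

```python
import base64
import json


def build_test_event(message, partition_key="1"):
    """Return a dict shaped like a Kinesis event containing one record."""
    data = base64.b64encode(message.encode("utf-8")).decode("ascii")
    return {
        "Records": [
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": partition_key,
                    # Placeholder value, not taken from a real stream
                    "sequenceNumber": "0",
                    "data": data,
                },
                "eventSource": "aws:kinesis",
                "eventName": "aws:kinesis:record",
            }
        ]
    }


if __name__ == "__main__":
    # Print the event so it can be redirected into input.json
    print(json.dumps(build_test_event("Hello, this is a test."), indent=2))
```

Redirecting the script's output to input.json gives a file that can be passed to `aws lambda invoke --payload file://input.json`.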
  2. Add an event source in AWS Lambda
```bash
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source-arn $Stream_ARN --batch-size 100 --starting-position LATEST --region ${AWS_REGION}

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source-arn $Stream_ARN --region ${AWS_REGION}
```


![kinesis-lambda](/aws-is-how/lambda/kinesis-lambda/media/kinesis-lambda.png)

3. Test from Kinesis
```bash
# With AWS CLI v2, add --cli-binary-format raw-in-base64-out so the text is not parsed as base64
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, new stream record." --region ${AWS_REGION}
```

View the logs in the CloudWatch console:

INFO	Decoded payload: Hello, new stream record.
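
The "Decoded payload" log line exists because each record reaches the function base64-encoded and has to be decoded before use. The guide's index.js is not reproduced here; the sketch below is a Python rendition of that decoding step under the assumption that the payload is UTF-8 text, not the guide's actual source.

```python
import base64


def handler(event, context):
    """Decode each Kinesis record in the batch and log its payload."""
    payloads = []
    for record in event["Records"]:
        # Kinesis delivers record data base64-encoded
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        print("Decoded payload:", payload)
        payloads.append(payload)
    return payloads
```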
  4. Add an additional consumer
    • Add DynamoDB and SNS permissions to lambda-kinesis-role
    • Create a DynamoDB table named kinesis-lambda-table with recordId as the partition key
    • Create an SNS topic named kinesis-lambda-sns
    • Update the Lambda code

source code: index-ddb.js
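
index-ddb.js itself is not shown in this guide. As an illustration of the pattern it names, the sketch below decodes each record, stores it in DynamoDB, and publishes it to SNS. It is written in Python and rests on several assumptions: `table` is a boto3 DynamoDB Table resource for kinesis-lambda-table, `sns` is a boto3 SNS client, `topic_arn` is the ARN of kinesis-lambda-sns, and recordId is filled from the record's eventID. The real index-ddb.js may differ on every one of these points.

```python
import base64


def process_records(event, table, sns, topic_arn):
    """Store each decoded record in DynamoDB and publish it to SNS.

    `table` and `sns` are passed in so the function can be exercised with stubs.
    Using eventID as recordId is an assumption made for this sketch.
    """
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        table.put_item(Item={"recordId": record["eventID"], "payload": payload})
        sns.publish(TopicArn=topic_arn, Message=payload)
    return len(event["Records"])
```

Passing the clients as arguments keeps the AWS calls out of module import time, which also makes the function easy to try locally before deploying.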

  5. Verify message delivery to the DynamoDB table and SNS topic
    aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.json out.txt --region ${AWS_REGION}
    aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
    --data "Hello, new stream record." --region ${AWS_REGION}
    

Lambda Python Example

  1. Producer stock-producer.py

  2. Consumer stock-consumer.py

zip function.zip stock-consumer.py
# Replace lambda-kinesis-role-arn with the ARN of lambda-kinesis-role
aws lambda create-function --function-name ProcessStockKinesisRecords \
--zip-file fileb://function.zip --handler stock-consumer.lambda_handler --runtime python3.8 \
--role lambda-kinesis-role-arn --region ${AWS_REGION}
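
stock-consumer.py is not reproduced in this guide. A minimal sketch of a handler matching the `stock-consumer.lambda_handler` entry point might look like the following. It assumes each record is a JSON object with hypothetical `ticker` and `price` fields; the real consumer may use different fields and do more with them.

```python
import base64
import json


def lambda_handler(event, context):
    """Parse JSON stock records and return the latest price seen per ticker."""
    latest = {}
    for record in event["Records"]:
        raw = base64.b64decode(record["kinesis"]["data"])
        # A malformed record raises here, which fails the whole batch
        stock = json.loads(raw)
        latest[stock["ticker"]] = stock["price"]
    print("Latest prices:", latest)
    return latest
```

Letting the exception propagate is deliberate in this sketch: for stream sources Lambda retries a failed batch, which is what makes a failed-event destination useful.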
  3. Add an event source in AWS Lambda
```bash
aws lambda create-event-source-mapping --function-name ProcessStockKinesisRecords \
--event-source-arn $Stream_ARN --batch-size 100 --starting-position LATEST --region ${AWS_REGION}

aws lambda list-event-source-mappings --function-name ProcessStockKinesisRecords \
--event-source-arn $Stream_ARN --region ${AWS_REGION}
```


4. Configure a destination for failed-event records

- In the Lambda function Designer, choose Add destination.
- For Source, choose Stream invocation.
- For Stream, choose a stream that is mapped to the function: stream/lambda-stream.
- For Destination type, choose SNS topic.

5. End-to-end verification

Single record

aws lambda invoke --function-name ProcessStockKinesisRecords --payload file://input.json out.txt --region ${AWS_REGION}

Multiple records

python stock-producer.py
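
stock-producer.py is not included in this guide. The sketch below shows one way such a producer could put several records on lambda-stream. The record fields, the ticker list, and the function names are hypothetical, and `kinesis` is assumed to be a boto3 Kinesis client.

```python
import json
import random
import time


def make_stock_record(rng=random):
    """Return one random stock quote; the field names are illustrative."""
    return {
        "ticker": rng.choice(["AAPL", "AMZN", "MSFT", "INTC"]),
        "price": round(rng.uniform(10, 100), 2),
        "event_time": time.time(),
    }


def send_records(kinesis, stream_name, count=10):
    """Put `count` random stock records on the stream, keyed by ticker."""
    for _ in range(count):
        record = make_stock_record()
        kinesis.put_record(
            StreamName=stream_name,
            Data=json.dumps(record).encode("utf-8"),
            PartitionKey=record["ticker"],
        )
    return count
```

Using the ticker as the partition key keeps quotes for the same symbol on the same shard, which preserves their order once the stream has more than one shard.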


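## Cleanup
```bash
# Delete the Lambda functions
aws lambda delete-function --function-name ProcessKinesisRecords --region ${AWS_REGION}
aws lambda delete-function --function-name ProcessStockKinesisRecords --region ${AWS_REGION}

# Delete the DynamoDB tables kinesis-lambda-table and kinesis-stock-table
aws dynamodb delete-table --table-name kinesis-lambda-table --region ${AWS_REGION}
aws dynamodb delete-table --table-name kinesis-stock-table --region ${AWS_REGION}

# Delete the SNS topic kinesis-lambda-sns (set Topic_ARN to the topic's ARN first)
aws sns delete-topic --topic-arn ${Topic_ARN} --region ${AWS_REGION}

# Delete the stream
aws kinesis delete-stream --stream-name lambda-stream --region ${AWS_REGION}
```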

Resources

Using AWS Lambda with Amazon Kinesis

https://www.casleyconsulting.co.jp/blog/engineer/5752/

https://note.com/tsukamoto/n/n9af2d6fec470