

AWS Lambda functions subscription with AWS Kinesis streams


Kinesis is the streaming data service in the AWS infrastructure, while Lambda is the AWS service for running code in a serverless architecture.

Combined, the two can be used to build effective and scalable solutions based on streaming data. There are plenty of good tutorials on setting up Kinesis and Lambda functions, so we will skip those details in this article and focus on the integration and related parts.

 

Suppose we have an existing Kinesis stream called TestStream with 1 shard, and a Lambda function called TestFunction in the Node.js 6.10 environment, with the following code processing the stream data.

 

 

    const promise = require('bluebird');

    exports.handler = (event, context, callback) => {
        // Decode and process every record in the batch.
        const promises = event.Records.map((row, idx) => {
            // Kinesis delivers the payload base64 encoded.
            const parsedData = Buffer.from(row.kinesis.data, 'base64').toString();

            try {
                console.log(parsedData);
                return showInput(parsedData);
            } catch (err) {
                console.log(err);
                return null;
            }
        });

        // Wait for all records before reporting the result to Lambda.
        promise.all(promises)
            .then(results => callback(null, results))
            .catch(err => callback(err));

        function showInput(parsedData) {
            console.log('--------- RECORDS ---------');
            console.log(parsedData);
            return parsedData;
        }
    };

 

 

Kinesis sends data to the function in the following format:

 

    {
        "Records": [
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": "$(newuuid())",
                    "sequenceNumber": "49575565746068647617510853834761057612953005562844413954",
                    "data": "eyJkYXRhIjpbeyJhY2MiOjEwNy4wMjUsImFsdCI6MCwiY2xpZW50aWQiOiIwMTIxNzk5MTk2NzYiLfQ==",
                    "approximateArrivalTimestamp": 1501671734.604
                },
                "eventSource": "aws:kinesis",
                "eventVersion": "1.0",
                "eventID": "shardId-000000000000:49575565746068647617510853834761057612953005562844413954",
                "eventName": "aws:kinesis:record",
                "invokeIdentityArn": "arn:aws:iam::011111111111:role/service-role/lambdaRole",
                "awsRegion": "us-east-1",
                "eventSourceARN": "arn:aws:kinesis:us-east-1:011111111111:stream/TestStream"
            }
        ]
    }

 

 

There can be multiple rows in Records, so we need to process all of them. The data field under kinesis in each row holds the record's bytes, base64 encoded. We first need to base64 decode it and convert the bytes to a string; only then can we use the data for further processing.
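The decoding step can be tried on its own, outside Lambda. The following is a minimal sketch, not the article's handler: decodeKinesisRecords is a hypothetical helper name, and it assumes producers usually send JSON, falling back to the raw string when parsing fails.

```javascript
// Decode the payload of every record in a Kinesis-triggered Lambda event.
// `decodeKinesisRecords` is a hypothetical helper name, not part of any AWS SDK.
function decodeKinesisRecords(event) {
    return event.Records.map((record) => {
        // record.kinesis.data is a base64 string: decode to bytes, then to UTF-8 text.
        const text = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
        try {
            // Assumption: the producer sent JSON. Otherwise keep the raw text.
            return JSON.parse(text);
        } catch (err) {
            return text;
        }
    });
}

// Usage with a hand-built sample event (only the fields the helper reads).
const sampleEvent = {
    Records: [
        { kinesis: { data: Buffer.from(JSON.stringify({ clientid: 'abc' })).toString('base64') } },
        { kinesis: { data: Buffer.from('plain text').toString('base64') } }
    ]
};
const decoded = decodeKinesisRecords(sampleEvent);
console.log(decoded);
```

Returning the raw string for non-JSON payloads is only one possible choice; a stricter function might prefer to report such records as errors.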

 

On the setup side, there are two things to take care of.

 

1) Add the Kinesis stream as a trigger for the Lambda function.

2) Set up the Lambda role with the required stream access permissions.

 

For the first, go to the Triggers tab -> Add trigger, then choose Kinesis. This shows the options for linking a Kinesis stream. Choose TestStream from the Kinesis stream dropdown. The second option, Batch size, is the largest number of records Lambda retrieves from Kinesis in one invocation. Then there are three options for the starting position.

 

a) Trim horizon is the oldest point of the Kinesis stream. So if a stream holds 24 hours of data, Trim horizon sends it from the start.

b) Latest sends only the newest available records; this is more suitable for processing latest / current data.

c) At timestamp lets us provide a timestamp to start from.
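The same trigger options can also be expressed as parameters for the Lambda CreateEventSourceMapping API instead of the console. The sketch below only builds and checks the parameter object; buildTriggerParams is a hypothetical helper, and the batch size of 100 is an illustrative value rather than a recommendation.

```javascript
// Build the parameters for a Kinesis event source mapping (the trigger).
// `buildTriggerParams` is a hypothetical helper; the field names follow the
// Lambda CreateEventSourceMapping API.
function buildTriggerParams(streamArn, functionName, startingPosition, options) {
    const opts = options || {};
    const allowed = ['TRIM_HORIZON', 'LATEST', 'AT_TIMESTAMP'];
    if (allowed.indexOf(startingPosition) === -1) {
        throw new Error('Unknown starting position: ' + startingPosition);
    }
    const params = {
        EventSourceArn: streamArn,
        FunctionName: functionName,
        BatchSize: opts.batchSize || 100, // illustrative value
        StartingPosition: startingPosition
    };
    if (startingPosition === 'AT_TIMESTAMP') {
        // A timestamp is only meaningful for the "At timestamp" option.
        if (!(opts.timestamp instanceof Date)) {
            throw new Error('AT_TIMESTAMP needs a Date in options.timestamp');
        }
        params.StartingPositionTimestamp = opts.timestamp;
    }
    return params;
}

// Usage: the equivalent of choosing "Latest" in the console.
const triggerParams = buildTriggerParams(
    'arn:aws:kinesis:us-east-1:011111111111:stream/TestStream',
    'TestFunction',
    'LATEST'
);
console.log(triggerParams);
```

With the AWS SDK for JavaScript (v2), an object like this could be passed to the createEventSourceMapping method of an aws.Lambda client; that call is left out here so the sketch runs without credentials.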

 

Let us choose Latest and save the trigger. Now we need to give the Lambda role the proper permissions to read Kinesis data.

 

A policy document similar to the following assigns the required permissions to the role.

 

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Stmt1495188255000",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:ListStreams"
                ],
                "Resource": [
                    "arn:aws:kinesis:us-east-1:011111111111:stream/TestStream"
                ]
            }
        ]
    }

 

 

 

 

 

This is the basic way a Lambda function can be used with Kinesis streams.

 

When we start working on more complex Lambda functions, we face some issues. Let's look at them.

 

First, any unhandled error in the code makes Lambda retry the same batch of records from the stream. The same data can then be processed again and again, which blocks processing of the other data behind it.

One way to fix this is to put a front-facing Lambda function before our actual Lambda function. The front-facing function can use the AWS SDK for Node.js to asynchronously invoke our processing function.

 

    const promise = require('bluebird');
    const aws = require('aws-sdk');

    exports.handler = (event, context, callback) => {
        aws.config.update({ region: process.env.region });
        const awsLambda = new aws.Lambda({
            region: process.env.region, // change to your region
            accessKeyId: process.env.accessKeyId,
            secretAccessKey: process.env.secretAccessKey
        });

        return new promise((resolve, reject) => {
            awsLambda.invoke(
                {
                    FunctionName: 'TestFunction',
                    // 'Event' invokes the function asynchronously.
                    InvocationType: 'Event',
                    Payload: JSON.stringify(event, null, 2)
                },
                (error, result) => {
                    if (error) {
                        // Return so resolve() is not also called.
                        return reject(error);
                    }
                    resolve('OK');
                }
            );
        })
            .then(results => {
                // The first callback argument is the error, so pass null on success.
                callback(null, 'OK');
            })
            .catch(err => {
                // Log and still report success so the batch is not retried.
                console.log(err);
                callback(null);
            });
    };
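The processing function itself can also be written so that one bad record does not raise an unhandled error for the whole batch. The following is a rough sketch of that idea, not part of the setup above: processBatch and processRecord are hypothetical names, and what to do with the collected failures (log them, store them elsewhere) is left open.

```javascript
// Process every record independently and collect failures instead of throwing.
// `processBatch` and `processRecord` are hypothetical names for illustration.
function processBatch(event, processRecord) {
    const results = [];
    const failures = [];
    event.Records.forEach((record) => {
        try {
            const text = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
            results.push(processRecord(text));
        } catch (err) {
            // Remember which record failed, then continue with the next one.
            failures.push({
                sequenceNumber: record.kinesis.sequenceNumber,
                message: err.message
            });
        }
    });
    return { results: results, failures: failures };
}

// Usage: the second record is not valid JSON, so JSON.parse throws for it.
const batchEvent = {
    Records: [
        { kinesis: { sequenceNumber: '1', data: Buffer.from('{"acc":107.025}').toString('base64') } },
        { kinesis: { sequenceNumber: '2', data: Buffer.from('not json').toString('base64') } }
    ]
};
const outcome = processBatch(batchEvent, JSON.parse);
console.log(outcome.results.length, 'processed,', outcome.failures.length, 'failed');
```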

 
