Data Engineering — AWS Data and Analytics — Collection — Kinesis Data Stream

Sanjeeb Mohapatra
7 min read · Jan 18, 2023

Amazon Kinesis Data Streams is used to collect and process large streams of data records in real time. Applications that write data records to a stream are called producers, and downstream applications that read and process those records are called consumers. Kinesis Data Streams is a scalable and durable real-time data streaming service.

High-level architecture (the diagram below is from the AWS documentation; see https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html)

Notes (the points below are key Kinesis Data Streams concepts taken from the AWS documentation above):

1. Producers continuously push data to the Kinesis data stream. The data is stored in shards within the stream.

2. The Consumers (such as a custom application running on Amazon EC2 or an Amazon Kinesis Data Firehose delivery stream) can store their results using an AWS service such as Amazon DynamoDB, Amazon Redshift, or Amazon S3.

3. A Kinesis data stream is a set of shards. Each shard has a sequence of data records. Each data record has a sequence number that is assigned by Kinesis Data Streams. A data record is the unit of data stored in a shard. It consists of a sequence number, a partition key, and a data blob, and it is immutable (it cannot be changed).

4. The default retention period of data records is 24 hours. It can be extended to 7 days (168 hours) and, with long-term retention, up to 365 days (8,760 hours).

5. A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity. Each shard can support up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second and up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys). The data capacity of your stream is a function of the number of shards that you specify for the stream. The total capacity of the stream is the sum of the capacities of its shards.
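
As a rough illustration of how the per-shard limits in point 5 translate into a shard count, the sketch below estimates the shards needed for a workload. The function name and the example workload are hypothetical, 1 MB is assumed to mean 1 MiB, and every consumer is assumed to read every record from the shared 2 MB per second read limit. Treat the result as a starting point, not a sizing guarantee.

```python
import math

# Per-shard limits quoted in point 5 above.
WRITE_BYTES_PER_SEC = 1 * 1024 * 1024   # 1 MB/s (assumed to mean 1 MiB)
WRITE_RECORDS_PER_SEC = 1000            # 1,000 records/s
READ_BYTES_PER_SEC = 2 * 1024 * 1024    # 2 MB/s, shared by all consumers


def estimate_shards(avg_record_bytes, records_per_sec, num_consumers):
    """Return a rough lower bound on the number of shards for a workload."""
    write_bytes = avg_record_bytes * records_per_sec
    # Assumption: every consumer reads every record from the shared read limit.
    read_bytes = write_bytes * num_consumers
    needed = max(
        write_bytes / WRITE_BYTES_PER_SEC,
        records_per_sec / WRITE_RECORDS_PER_SEC,
        read_bytes / READ_BYTES_PER_SEC,
    )
    return max(1, math.ceil(needed))


if __name__ == "__main__":
    # Hypothetical workload: 2 KB records, 1,500 records/s, 2 consumers.
    print(estimate_shards(2048, 1500, 2))
```

The estimate takes the largest of the three ratios because a stream has to satisfy the write bandwidth, write record rate, and read bandwidth limits at the same time.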

The main objective of this post is to work through the use case below.

Problem Statement: Build a real-time streaming application using Amazon Kinesis Data Streams.

Details: In this lab, we are going to perform the steps below.

1. Create a real-time data streaming system using Amazon Kinesis Data Streams (KDS).

2. We are going to use Amazon Kinesis, AWS Lambda, Amazon S3, and IAM.

3. A user uploads a file to S3 (for the demo, we upload it through the AWS Management Console).

4. We set up an event notification on S3 that triggers a Lambda function. The Lambda function works as the producer and writes the file content to the Kinesis data stream.

5. We use 2 consumers (Lambda functions) that read the data from the Kinesis data stream. Any downstream application can then be integrated with these consumers.

The high level architecture diagram is:

Let’s jump into the lab

Step -1: Create a Kinesis Data Stream

1. To create a Kinesis data stream, log in to the AWS Management Console and search for the Kinesis service.

2. Select Kinesis Data Streams and click Create data stream.

3. Enter a name for the data stream.

4. Choose the data stream capacity mode. Here we select provisioned, but you can select on-demand instead, depending on the workload. We set the number of shards to 1. The required number of shards can be calculated from the input record size, the number of records per second, and the number of consumers.

5. Finally, click Create data stream.
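
The same stream can also be created from code. The following is a minimal sketch using the boto3 Kinesis client, not the method used in this lab; the helper names are hypothetical, and the stream name is the one the producer code in this post writes to.

```python
def build_create_stream_request(stream_name, shard_count=1):
    """Build keyword arguments for the Kinesis CreateStream API call."""
    return {
        "StreamName": stream_name,
        "ShardCount": shard_count,
        "StreamModeDetails": {"StreamMode": "PROVISIONED"},
    }


def create_stream(kinesis_client, stream_name, shard_count=1):
    """Create a provisioned stream and wait until it reports as existing."""
    kinesis_client.create_stream(
        **build_create_stream_request(stream_name, shard_count)
    )
    kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)


# Example usage (needs AWS credentials and a region, so it is commented out):
# import boto3
# create_stream(boto3.client("kinesis"), "sanjeeb-test-kinesis-stream")
```

Passing the client in as a parameter keeps the helper easy to exercise with a stub before pointing it at a real account.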

Step -2: Once the Kinesis data stream (KDS) is created, click the stream, go to the Configuration tab, go to the Encryption section, click Edit, check Enable server-side encryption, and use the default encryption type (in this case we used the AWS managed CMK). Click Save changes.
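
For reference, server-side encryption can also be switched on through the API. This is a small sketch under the assumption that the AWS managed key for Kinesis (alias/aws/kinesis) is the one wanted; the helper name is hypothetical.

```python
def enable_encryption(kinesis_client, stream_name, key_id="alias/aws/kinesis"):
    """Enable KMS server-side encryption on an existing stream."""
    kinesis_client.start_stream_encryption(
        StreamName=stream_name,
        EncryptionType="KMS",
        KeyId=key_id,  # assumption: the AWS managed key for Kinesis
    )


# Example usage (needs AWS credentials, so it is commented out):
# import boto3
# enable_encryption(boto3.client("kinesis"), "sanjeeb-test-kinesis-stream")
```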

Step -3: Create S3 bucket

1. Search for the S3 service in the AWS Management Console search bar.

2. Click Create bucket.

3. Enter the bucket name and select the right region (the same region where you created your Kinesis data stream).

4. Enable bucket versioning (whether you need it depends on your requirements).

5. Select the default encryption option (Amazon S3 managed keys, SSE-S3) and click Create bucket.
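
The bucket settings above map to three S3 API calls. The sketch below shows one way to issue them with boto3; the helper names and the bucket name in the usage comment are hypothetical.

```python
def build_bucket_requests(bucket, region):
    """Build the arguments for creating, versioning, and encrypting a bucket."""
    create = {"Bucket": bucket}
    if region != "us-east-1":
        # us-east-1 is the default location and must not be passed explicitly.
        create["CreateBucketConfiguration"] = {"LocationConstraint": region}
    versioning = {
        "Bucket": bucket,
        "VersioningConfiguration": {"Status": "Enabled"},
    }
    encryption = {
        "Bucket": bucket,
        "ServerSideEncryptionConfiguration": {
            "Rules": [
                # AES256 corresponds to SSE-S3 (Amazon S3 managed keys).
                {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
            ]
        },
    }
    return create, versioning, encryption


def create_bucket(s3_client, bucket, region):
    """Create the bucket, then enable versioning and default encryption."""
    create, versioning, encryption = build_bucket_requests(bucket, region)
    s3_client.create_bucket(**create)
    s3_client.put_bucket_versioning(**versioning)
    s3_client.put_bucket_encryption(**encryption)


# Example usage (needs AWS credentials, so it is commented out):
# import boto3
# create_bucket(boto3.client("s3", region_name="us-west-2"),
#               "my-kinesis-demo-bucket", "us-west-2")
```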

Step -4: Create lambda function (Producer and Consumers)

In this step, we will create 3 Lambda functions (one producer and 2 consumers).

Before creating the Lambda functions, create a role in IAM. The role should have access to S3 and Kinesis and include the Lambda basic execution role permissions. We do not cover how to create the role here; the policy details JSON is available below.

To create the first Lambda function (producer):

1. Search for the Lambda service in the AWS Management Console.
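
The policy JSON itself does not appear in this text, so the following is only an illustrative sketch of what such a policy could look like, not the author's actual policy. The function name and the ARNs in the usage example (account ID, region, bucket name) are hypothetical placeholders; the stream name is the one the producer code in this post writes to.

```python
import json


def build_lambda_policy(stream_arn, bucket_arn):
    """Return an IAM policy document covering the producer and consumers."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Producer reads the uploaded object.
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"{bucket_arn}/*",
            },
            {   # Producer writes to the stream; consumers read from it.
                "Effect": "Allow",
                "Action": [
                    "kinesis:PutRecord",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:ListShards",
                ],
                "Resource": stream_arn,
            },
            {   # ListStreams is not scoped to a single stream.
                "Effect": "Allow",
                "Action": ["kinesis:ListStreams"],
                "Resource": "*",
            },
            {   # Basic execution: write logs to CloudWatch Logs.
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "*",
            },
        ],
    }


if __name__ == "__main__":
    policy = build_lambda_policy(
        "arn:aws:kinesis:us-east-1:123456789012:stream/sanjeeb-test-kinesis-stream",
        "arn:aws:s3:::my-kinesis-demo-bucket",
    )
    print(json.dumps(policy, indent=2))
```

In a real setup you might split this into separate producer and consumer roles so that each function only gets the actions it uses.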

2. Click Create function.

3. Select Author from scratch.

Next Steps:

1. Enter the basic information for the Lambda function, such as the function name and the runtime environment (in this case we selected Python).

2. Under Permissions, select Use an existing role and attach the role you created in the step above.

In the Lambda code editor, paste the code below. It is simple Python code that reads the uploaded object from S3 based on the event trigger and then pushes its content to the Kinesis data stream.

import json
import urllib.parse

import boto3


def lambda_handler(event, context):
    # The S3 event carries the bucket name and the URL-encoded object key.
    s3_info = event['Records'][0]['s3']
    bucket = s3_info['bucket']['name']
    print("Bucket Name: ", bucket)
    file_key_name = urllib.parse.unquote_plus(s3_info['object']['key'])
    s3_full_path = bucket + "/" + file_key_name
    print(s3_full_path)

    # Read the uploaded object and decode it as UTF-8 text.
    client = boto3.client('s3')
    data_obj = client.get_object(Bucket=bucket, Key=file_key_name)
    body = data_obj['Body']
    data_string = body.read().decode('utf-8')
    print(data_string)

    # Push the file content to the Kinesis data stream as a single record.
    K_client = boto3.client('kinesis')
    response = K_client.put_record(
        StreamName='sanjeeb-test-kinesis-stream',
        Data=data_string,
        PartitionKey='123'
    )
    print(response)
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

Note: We have not configured the S3 event trigger yet. To do so:

1. Go to S3 and click the bucket you created earlier.

2. Click Properties.

3. Click Create event notification, give it a name, and set the suffix (in this case .txt, so that uploading any file with a .txt extension triggers the target, which is a Lambda function).

4. Select the event type All object create events.

5. Select Lambda function as the destination and choose your producer Lambda function.
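
The notification set up in the console above corresponds to a bucket notification configuration. The sketch below shows roughly what that looks like through boto3; the helper names and the statement ID are hypothetical. When the notification is created from code, S3 also has to be granted permission to invoke the function, which the console normally takes care of.

```python
def build_notification_config(lambda_arn, suffix=".txt"):
    """Build a notification config: object-created events filtered by suffix."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}
                },
            }
        ]
    }


def configure_trigger(s3_client, lambda_client, bucket, lambda_arn):
    """Allow S3 to invoke the function, then attach the notification."""
    lambda_client.add_permission(
        FunctionName=lambda_arn,
        StatementId="s3-invoke-producer",  # hypothetical statement ID
        Action="lambda:InvokeFunction",
        Principal="s3.amazonaws.com",
        SourceArn=f"arn:aws:s3:::{bucket}",
    )
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=build_notification_config(lambda_arn),
    )
```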

Step -5: Create the consumers (2 Lambda functions; only the names differ, while the code and other steps are the same).

To create a consumer Lambda function:

1. Search for Lambda in the AWS Management Console.

2. Select Lambda and click Create function.

3. Select Author from scratch.

4. Enter the basic information.

5. Select Python as the runtime environment.

6. Under Permissions, attach the same Lambda role that was created for the producer.

In the code section of the Lambda function, paste the Python code below.

import json
import base64


def lambda_handler(event, context):
    # Each Kinesis record arrives with its payload base64-encoded.
    records = event['Records']
    for rec in records:
        data_record = rec['kinesis']['data']
        data = base64.b64decode(data_record)
        print(data)
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
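
Before wiring the consumer to the stream, the decoding logic can be checked locally. The sketch below builds a minimal stand-in for the event Lambda passes to a Kinesis consumer and decodes it the same way the handler does. Both helper names are hypothetical, and the sample event contains only the fields used here, not the full event structure.

```python
import base64


def make_kinesis_event(payloads, partition_key="123"):
    """Build a minimal, simplified Kinesis event for local testing."""
    return {
        "Records": [
            {
                "eventSource": "aws:kinesis",
                "kinesis": {
                    "partitionKey": partition_key,
                    # Lambda delivers the record payload base64-encoded.
                    "data": base64.b64encode(p.encode("utf-8")).decode("ascii"),
                },
            }
            for p in payloads
        ]
    }


def decode_records(event):
    """Decode every record payload in the event to a UTF-8 string."""
    return [
        base64.b64decode(rec["kinesis"]["data"]).decode("utf-8")
        for rec in event["Records"]
    ]


if __name__ == "__main__":
    print(decode_records(make_kinesis_event(["first line", "second line"])))
```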

Next Steps:

1. To consume data from Kinesis, we need to add a trigger. Click the Configuration tab of the Lambda function.

2. Click Triggers.

3. Click Add trigger.

4. Select Kinesis as the source and choose the data stream created in Step -1.

5. Select Activate trigger and leave the other settings at their defaults.
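
Behind the console, a Kinesis trigger is an event source mapping on the Lambda function. A minimal sketch of creating one with boto3 follows; the helper names are hypothetical, and the starting position and batch size are assumptions chosen for illustration rather than values taken from this lab.

```python
def build_event_source_mapping(stream_arn, function_name, batch_size=100):
    """Build arguments for mapping a Kinesis stream to a Lambda function."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "StartingPosition": "LATEST",  # assumption: only read new records
        "BatchSize": batch_size,       # assumption: up to 100 records per invoke
        "Enabled": True,               # same effect as "Activate trigger"
    }


def add_kinesis_trigger(lambda_client, stream_arn, function_name):
    """Create the event source mapping and return the API response."""
    return lambda_client.create_event_source_mapping(
        **build_event_source_mapping(stream_arn, function_name)
    )
```

Repeating the call with a different function name would attach the second consumer to the same stream.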

To test end to end, upload a .txt file to the S3 bucket and check the CloudWatch logs (log groups). Select the consumer log group to see the consumer log and the producer log group to see the producer log.

Note: We did not create the second consumer Lambda function in this walkthrough. You can follow the same steps and script to create it (only the function name needs to change).
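
The same end-to-end check can be scripted. The sketch below uploads a small text file and then pulls recent log messages for a function. The helper names are hypothetical, and it assumes the functions log to the default /aws/lambda/<function name> log group.

```python
def upload_text(s3_client, bucket, key, text):
    """Upload a small UTF-8 text object, which should fire the S3 trigger."""
    s3_client.put_object(Bucket=bucket, Key=key, Body=text.encode("utf-8"))


def recent_log_messages(logs_client, function_name, limit=20):
    """Return up to `limit` log messages from the function's log group."""
    response = logs_client.filter_log_events(
        # Assumption: the function uses the default Lambda log group name.
        logGroupName=f"/aws/lambda/{function_name}",
        limit=limit,
    )
    return [event["message"] for event in response.get("events", [])]


# Example usage (needs AWS credentials, so it is commented out;
# the bucket and function names are placeholders):
# import boto3
# upload_text(boto3.client("s3"), "my-kinesis-demo-bucket", "sample.txt", "hello")
# print(recent_log_messages(boto3.client("logs"), "my-consumer-function"))
```

Log delivery is not instantaneous, so the messages may take a short while to show up after the upload.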
