Kinesis Events
Important: These docs are for the outdated Jets 5 versions and below. For the latest Jets docs: docs.rubyonjets.com
Jets supports Kinesis Events as a Lambda trigger. You can use Lambda to process the data from Kinesis. The Lambda function has access to the stream data via event
and kinesis_data
.
Example
Generate code.
jets generate job data --type kinesis --name file
It looks something like this.
Here is an example connecting an existing Kinesis stream to a Lambda function in a Job.
app/jobs/data_job.rb
class DataJob < ApplicationJob
kinesis_event "my-stream" # existing stream
def file
puts "event #{JSON.dump(event)}"
puts "kinesis_data #{JSON.dump(kinesis_data)}"
end
end
Here’s the Lambda function with Kinesis as a trigger.
You can also check that the Lambda function is connected to Kinesis via the aws lambda list-event-source-mappings:
$ aws lambda list-event-source-mappings
{
"EventSourceMappings": [
{
"UUID": "861c866f-356a-4dba-9191-6ed853118fba",
"StateTransitionReason": "User action",
"LastModified": 1550287680.0,
"BatchSize": 100,
"State": "Enabled",
"FunctionArn": "arn:aws:lambda:us-west-2:112233445566:function:demo-dev-data_job-file",
"EventSourceArn": "arn:aws:kinesis:us-west-2:112233445566:stream/my-stream",
"LastProcessingResult": "OK"
}
]
}
$
Send Test Data
Here’s an example of sending the data. Note, you have to base64 encode the payload data.
aws kinesis put-record --stream-name my-stream --partition-key 1 --data $(echo "hello world" | base64)
Tailing Logs
It helps to tail the logs and watch the event as it comes through.
jets logs -f -n data_job-file
Event Payload
The event payload from CloudWatch Log is a base64 encoded String within a JSON structure. Here’s an example to help explain:
event
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49593016666855735301798073856083438124424404568371101698",
"data": "aGVsbG8gd29ybGQ=",
"approximateArrivalTimestamp": 1550289189.474
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49593016666855735301798073856083438124424404568371101698",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::112233445566:role/demo-dev-DataJob-OUD26QSQSWKN-DataJobFileIamRole-CM2L7G0KZVY",
"awsRegion": "us-west-2",
"eventSourceARN": "arn:aws:kinesis:us-west-2:112233445566:stream/my-stream"
}
]
}
kinesis_data
["hello world"]
Here the data aGVsbG8gd29ybGQ=
is hello world
. Example:
$ echo "aGVsbG8gd29ybGQ=" | base64 -d
hello world
Since there can be multiple records, kinesis_data is an Array that contains the unencoded data of each element.
Here’s a screenshot of the event in the CloudWatch Log console.
Create Stream Example
Here’s an example of creating a kinesis stream via the CLI:
aws kinesis create-stream --stream-name my-stream --shard-count 1
IAM Policy
Jets generates an IAM policy for the Lambda function associated with the Kinesis event that allows the permissions needed. You can control and override the IAM policy with normal IAM Policies if required, though.