SQS Events
Important: These docs are for the outdated Jets 5 versions and below. For the latest Jets docs: docs.rubyonjets.com
Jets supports SQS Events as a Lambda trigger: when you send a message to an SQS queue, it triggers a Lambda function to run. The Lambda function has access to the message data via event.
There are a few ways to connect an SQS queue to a Lambda function with Jets.
- Existing SQS Queue
- Generated Function SQS Queue
- Generated Shared SQS Queue
We’ll cover each of them:
Existing SQS Queue
Here is an example of connecting an existing SQS queue to a Lambda function in a Job.
Generate code.
jets generate job waiter --type sqs --name order
It looks something like this.
app/jobs/waiter_job.rb
class WaiterJob < ApplicationJob
class_timeout 30 # must be less than or equal to the SQS queue default visibility timeout
sqs_event "hello-queue"
def order
puts "order event #{JSON.dump(event)}"
end
end
Ultimately, the sqs_event declaration generates a Lambda::EventSourceMapping. The properties of the mapping can be set with an additional Hash argument:
sqs_event("hello-queue", batch_size: 10)
Generated Function SQS Queue
Jets can create and manage an SQS queue for a specific function. This is done with the special :generate_queue argument.
class HardJob < ApplicationJob
class_timeout 30 # must be less than or equal to the SQS queue default visibility timeout
sqs_event :generate_queue
def lift
puts "lift event #{JSON.dump(event)}"
end
end
A special :queue_properties key sets the SQS::Queue properties. Other keys set the Lambda::EventSourceMapping properties. Example:
sqs_event(:generate_queue,
batch_size: 10, # property of EventSourceMapping
queue_properties: {
message_retention_period: 345600, # 4 days in seconds
})
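The message_retention_period value is given in seconds, so it can help to derive it rather than hard-code it. Below is a minimal sketch in plain Ruby; retention_seconds is a hypothetical helper, not part of Jets. The bounds it checks reflect the SQS limit of 60 seconds to 14 days for message retention.

```ruby
# Hypothetical helper (not a Jets API): converts a number of days into
# the seconds value expected by message_retention_period.
def retention_seconds(days)
  seconds_per_day = 24 * 60 * 60
  min_seconds = 60                   # SQS minimum retention
  max_seconds = 14 * seconds_per_day # SQS maximum retention (14 days)

  seconds = (days * seconds_per_day).to_i
  unless seconds.between?(min_seconds, max_seconds)
    raise ArgumentError,
          "retention must be between #{min_seconds} and #{max_seconds} seconds"
  end
  seconds
end

puts retention_seconds(4) # 4 days, the same value as the 345600 used above
```

The result could then be passed as the message_retention_period value inside queue_properties.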
Here’s an example screenshot of a generated SQS queue:
Note: SQS queues managed by Jets are deleted when you delete the Jets application.
Generated Shared SQS Queue
Jets also supports creating a shared SQS queue via a Shared Resource. Here’s how you create the SQS queue as a shared resource:
app/shared/resources/list.rb
class List < Jets::Stack
sqs_queue(:waitlist)
end
You can reference the Shared Queue like so:
app/jobs/hard_job.rb
class HardJob < ApplicationJob
class_timeout 30 # must be less than or equal to the SQS queue default visibility timeout
depends_on :list # so we can reference list shared resources
sqs_event ref(:waitlist) # reference sqs queue in shared resource
def fix
puts "fix #{JSON.dump(event)}"
end
end
Under the hood, Jets provisions resources via CloudFormation. The use of depends_on ensures that Jets passes the outputs of the shared resource List stack to the HardJob stack as input parameters. This allows HardJob to reference resources from the separate child List stack.
For those learning CloudFormation, these resources might help:
- AWS CloudFormation Declarative Infrastructure Code Tutorial
- A Simple Introduction to AWS CloudFormation Part 1: EC2 Instance
- Working with Nested Stacks
Accessing the SQS URL
You can access the SQS URL with the lookup method. The method is available to Jets::Stack subclasses like the List class here. Here’s an example:
app/jobs/postman_job.rb
class PostmanJob < ApplicationJob
include Jets::AwsServices
iam_policy "sqs"
def deliver
puts "queue arn: #{List.lookup(:waitlist)}"
puts "queue url: #{List.lookup(:waitlist_url)}"
queue_url = List.lookup(:waitlist_url)
message_body = JSON.dump({"test": "hello world"})
sqs.send_message(
queue_url: queue_url,
message_body: message_body,
)
end
end
Send Test Message
Here’s an example of sending a message to an SQS queue via the aws sqs send-message CLI:
aws sqs send-message --queue-url https://sqs.us-west-2.amazonaws.com/112233445566/test-queue --message-body '{"test": "hello world"}'
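When the message body is JSON, shell quoting is easy to get wrong. The following is a small sketch in plain Ruby that builds the same command line with the standard shellwords library; send_message_command is a hypothetical helper name, and the queue URL is the example one from above.

```ruby
require "json"
require "shellwords"

# Hypothetical helper: builds an `aws sqs send-message` command line,
# shell-escaping each argument so the JSON body survives intact.
def send_message_command(queue_url, payload)
  [
    "aws", "sqs", "send-message",
    "--queue-url", queue_url,
    "--message-body", JSON.generate(payload),
  ].shelljoin
end

cmd = send_message_command(
  "https://sqs.us-west-2.amazonaws.com/112233445566/test-queue",
  { "test" => "hello world" }
)
puts cmd
```

The printed string can be pasted into a shell, or the unjoined argument list could be passed to system directly, which avoids the shell altogether.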
You can also send a message via the SQS Console, an AWS SDK, and so on.
Tailing Logs
It helps to tail the logs and watch the event as it comes through.
jets logs -f -n waiter_job-order
Event Payloads
Here’s an example of the event payload.
{
"Records": [
{
"messageId": "1e0bfe01-f9df-46c0-8d86-2fd898e4dee9",
"receiptHandle": "AQEBgxVw0hjHeNKB1brir4hr0Fxvz4ERJIqd7bP/iHw82/+UUx/r4W0KG3FSiEA4A+Vk0oS8dT6W8be/Bn7eJjKspZfW2KzC0xzsCmS+BihySk1SX9FM5SW1rFd3bFWYtT6s7pOX2inaU/THtn7Envp5Rs+zehmNIspnLPZkf9h3RFSQk12xaVaOmCQnHtz9o8uKIXwMEwn5IhlJgC0DIuM1v8NZK8Hc65b4xpf09vf01LEA/XdXm24SjfJ0fl7ev2rBXtkMitAfNmKd8x0fcbG3O7H7wB+CIKR4+QvGcI6u9QuAdPU5MpIJ46niJmrtnIx70S5Go1paUYMa77ABBjFWoJkJHvHouuiohEQHdMrH1QSyabNBS2Nw2dikhBcXVtLQW4iH+xNXwLIVUxarAk9EHokh1iGWZsG91whmPaAl0t2Vdfo6Dcm0/6IgXhKcLFIw",
"body": "{\"test\": \"hello world\"}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1550605918693",
"SenderId": "AIDAJTCD6O457Q7BMTLYM",
"ApproximateFirstReceiveTimestamp": "1550605918704"
},
"messageAttributes": {},
"md5OfBody": "3d635e69eb93fd184b47a31d460ca2b6",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-west-2:112233445566:demo-dev-List-3VJ13ADFT5VZ-Waitlist-X35N8JKWZTL3",
"awsRegion": "us-west-2"
}
]
}
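Besides the body, each record carries metadata that can be useful for logging. As a rough sketch in plain Ruby, assuming the event is a Hash with string keys shaped like the payload above, the hypothetical summarize_record helper below pulls out a few fields. The record used here is a trimmed copy of the example.

```ruby
# Hypothetical helper (not a Jets API): summarizes one entry of "Records".
def summarize_record(record)
  attrs = record["attributes"]
  sent_ms = attrs["SentTimestamp"].to_i # epoch milliseconds, sent as a string
  {
    message_id: record["messageId"],
    # The queue name is the last colon-separated segment of the ARN.
    queue_name: record["eventSourceARN"].split(":").last,
    sent_at: Time.at(sent_ms / 1000, sent_ms % 1000, :millisecond).utc,
    receive_count: attrs["ApproximateReceiveCount"].to_i,
  }
end

# Trimmed copy of the example payload above.
event = {
  "Records" => [
    {
      "messageId" => "1e0bfe01-f9df-46c0-8d86-2fd898e4dee9",
      "body" => "{\"test\": \"hello world\"}",
      "attributes" => {
        "ApproximateReceiveCount" => "1",
        "SentTimestamp" => "1550605918693",
      },
      "eventSourceARN" => "arn:aws:sqs:us-west-2:112233445566:demo-dev-List-3VJ13ADFT5VZ-Waitlist-X35N8JKWZTL3",
    },
  ],
}

event["Records"].each { |record| p summarize_record(record) }
```

Note that the numeric attributes arrive as strings, so they need converting before any arithmetic or comparison.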
sqs_events
The sqs_events helper method unravels the data and provides the SQS message body.
[{
"test": "hello world"
}]
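To make the unraveling concrete, here is a rough equivalent in plain Ruby. It is a sketch rather than the Jets implementation: message_bodies is a hypothetical name, and parsing each body as JSON is an assumption based on the output shown above.

```ruby
require "json"

# Hypothetical stand-in for the helper: returns one parsed body per record.
# Assumes every message body is valid JSON.
def message_bodies(event)
  event.fetch("Records", []).map { |record| JSON.parse(record["body"]) }
end

# Minimal event with only the field this sketch reads.
event = {
  "Records" => [
    { "body" => "{\"test\": \"hello world\"}" },
  ],
}

p message_bodies(event)
```

Because one invocation can receive several records, depending on the batch_size of the event source mapping, the result is an array even when only one message was sent.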
IAM Policy
An IAM policy granting the needed permissions is generated for the Lambda function associated with the SQS event. If needed, you can control and override it with normal IAM policies.