S3 Input (SQS) source plugin
Requires pipeline agent v25.8.1 or later, Core Operator v3.67.0 or later, and Pipeline CLI v3.66.0 or later.
The S3 Input (SQS) source plugin (name: s3_sqs) lets you continuously ingest new data from Amazon S3 buckets into a telemetry pipeline.
This plugin monitors an SQS queue configured to receive notifications
directly from S3 or through SNS,
then creates logs from the files described in S3 events. This plugin ignores
any SQS notifications that can’t be decoded as references to an object in an S3 bucket,
but it does not filter notifications by event type.
This is a pull-based source plugin.
This plugin doesn’t support duplicates of itself within the same pipeline.
Supported telemetry types
This plugin for Chronosphere Telemetry Pipeline supports these telemetry types:
Logs | Metrics | Traces |
---|---|---|
Requirements
To use the S3 Input (SQS) plugin, you must meet these requirements:
- Your IAM user or IAM role must have the following permissions for the ARN of your SQS queue:
- Your IAM user or IAM role must have the s3:GetObject permission for all buckets configured to notify your SQS queue.
- Your SQS queue must have a redrive policy and a dead letter queue. Additionally, Chronosphere recommends setting the maxReceiveCount of your redrive policy to a value greater than 1, which lets SQS retry sending messages upon failure.
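As an illustration of the redrive requirement, the following Python sketch builds the RedrivePolicy attribute that SQS stores as a JSON-encoded string. The function name, the queue ARN, and the default retry count of 5 are hypothetical placeholders, not values required by this plugin.

```python
import json


def build_redrive_policy(dead_letter_queue_arn: str, max_receive_count: int = 5) -> dict:
    """Return an SQS attribute map that contains a RedrivePolicy."""
    # A value of 1 would send a message to the dead letter queue after a
    # single failed receive, which leaves no room for a retry.
    if max_receive_count <= 1:
        raise ValueError("max_receive_count should be greater than 1")
    policy = {
        "deadLetterTargetArn": dead_letter_queue_arn,
        "maxReceiveCount": max_receive_count,
    }
    # SQS expects the policy as a JSON string under the RedrivePolicy key.
    return {"RedrivePolicy": json.dumps(policy)}


# Placeholder ARN for illustration only.
attributes = build_redrive_policy("arn:aws:sqs:us-east-1:123456789012:example-dlq")
print(attributes["RedrivePolicy"])
```

If you manage queues with boto3, an attribute map like this one can be passed as the Attributes argument of the SQS client's set_queue_attributes call, together with the QueueUrl of your source queue.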
Configuration parameters
Use the parameters in this section to configure your plugin. The Telemetry Pipeline web interface uses the values in the Name column to describe the parameters. Items in the Key column are the YAML keys to use in pipeline configuration files.
Required
Name | Key | Description | Default |
---|---|---|---|
AWS SQS Queue Name | sqs_queue_name | Required. The name of the SQS queue whose notifications you want to monitor. | none |
AWS SQS Queue Region | sqs_queue_region | Required if aws_sqs_endpoint isn’t set. The name of the region where your SQS queue exists. For example, us-east-1. | none |
Advanced
Name | Key | Description | Default |
---|---|---|---|
Regular Expression Object Match | match_regexp | The regular expression for matching or excluding object keys from S3. This plugin processes notifications only for objects that match the specified regular expression. If not set, the default value of .* matches all possible object keys. | .* |
Delete Message from SQS | delete_messages | If true, deletes SQS messages after processing the associated S3 data. If false, the plugin re-processes each message at an interval specified by your SQS visibility timeout, and continues to process each message until a redrive policy is triggered or until you meet your SQS message retention policy. Chronosphere recommends not modifying this value unless you’re testing a pipeline during its initial setup. This is because deleting SQS messages prevents the plugin from processing the same message multiple times and creating duplicate logs. Accepted values: true, false. | true |
Line Buffer Max Size | max_line_buffer_size | The maximum line size the plugin will read from JSON or plain text files. | 10MiB |
S3 Assume Role ARN | s3_assume_role_arn | The ARN of the role to assume in S3. This can be an ARN within the same account or across accounts. | none |
S3 Role External ID | s3_role_external_id | The external ID of the role to assume in S3. | none |
SQS Role ARN | sqs_cross_account_role_arn | The ARN of the role to assume in SQS. This can be an ARN within the same account or across accounts. | none |
SQS Role External ID | sqs_role_external_id | The external ID of the role to assume in SQS. | none |
S3 Read Concurrency | s3_read_concurrency | The maximum number of concurrent S3 GetObject calls that this plugin will make. | The number of logical CPUs allocated to each pipeline replica. |
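Before you set match_regexp, it can help to preview which object keys a candidate expression selects. The following Python sketch is one way to do that. It assumes unanchored matching and uses Python's re module, whose syntax might differ from the regular expression engine the plugin uses, so treat the result as an approximation. The function name and sample keys are hypothetical.

```python
import re


def preview_matches(pattern: str, object_keys: list[str]) -> list[str]:
    """Return the object keys that a candidate match_regexp would select."""
    compiled = re.compile(pattern)
    # Assumption: the expression can match anywhere in the key. Anchor the
    # pattern with ^ and $ to make the intent explicit either way.
    return [key for key in object_keys if compiled.search(key)]


sample_keys = [
    "logs/app/2024-01-01.json.gz",
    "logs/app/readme.txt",
    "tmp/scratch.json",
]
print(preview_matches(r"^logs/.*\.json(\.gz)?$", sample_keys))
print(preview_matches(r".*", sample_keys))
```

The first call selects only the gzip-compressed JSON file under logs/, and the second call, which uses the default value, selects every key.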
Authentication methods
The S3 Input (SQS) plugin supports the following authentication methods:
EKS Pod Identities
To use EKS Pod Identities for authentication:
- In AWS, configure EKS Pod Identities.
- In Pipeline CLI, add the following flag to a create pipeline or update pipeline command:
  calyptia {create|update} pipeline --service-account VALUE
  Replace VALUE with the name of the Kubernetes service account associated with your Pods.
IMDS
To use IMDS for authentication:
- In AWS, configure IAM roles for your EC2 instance.
IRSA
To use IRSA for authentication:
- In AWS, set up IRSA for your EKS cluster.
- Assign an IAM role to your Kubernetes service account.
- In Pipeline CLI, add the following flag to a create pipeline or update pipeline command:
  calyptia {create|update} pipeline --service-account VALUE
  Replace VALUE with the name of your Kubernetes service account.
Static credentials
To use static credentials for authentication:
- In Telemetry Pipeline, create secrets that contain the values of your IAM access keys. These secrets must use the key names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
You don’t need to add an explicit reference to these secrets in your pipeline configuration file. If secrets with the correct key names are present, the S3 Input (SQS) plugin automatically detects these values and uses them for authentication.
Supported data types
The S3 Input (SQS) plugin can ingest JSON objects and plain text from files stored in S3 buckets, including gzip-compressed files. Additionally, this plugin can extract and ingest compressed and uncompressed files from tar archives.
JSON
This plugin can ingest data from JSON files with these file extensions:
- .json
- .jsonl
- .ndjson
If a file contains only a single JSON object, this plugin creates a new log from that object. If a file contains multiple newline-delimited JSON (NDJSON) objects, this plugin creates a new log from each JSON object within that file. Key/value pairs from JSON objects are stored as key/value pairs in the resulting log.
For JSON files that use gzip compression (with file extensions such as .json.gzip or .json.gz), this plugin decompresses each file before processing it accordingly.
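The JSON handling described in this section can be approximated with the following Python sketch. It is not the plugin's implementation. The function name is hypothetical, and the sketch assumes UTF-8 content in which every NDJSON line holds one JSON object.

```python
import gzip
import json


def json_file_to_logs(object_key: str, data: bytes) -> list[dict]:
    """Turn the bytes of a JSON or NDJSON file into a list of log records."""
    # Decompress first when the key carries a gzip suffix.
    if object_key.endswith((".gz", ".gzip")):
        data = gzip.decompress(data)
    text = data.decode("utf-8")

    # Case 1: the whole file is a single JSON object, which yields one log.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return [parsed]

    # Case 2: newline-delimited JSON, which yields one log per non-empty line.
    return [json.loads(line) for line in text.splitlines() if line.strip()]


single = json_file_to_logs("event.json", b'{"level": "info",\n "msg": "started"}')
many = json_file_to_logs("events.ndjson.gz", gzip.compress(b'{"id": 1}\n{"id": 2}\n'))
print(len(single), len(many))
```

Trying the whole file first lets a pretty-printed object that spans several lines count as a single log, while files with one object per line fall through to the line-by-line path.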
Plain text
If a file doesn’t use a file extension that identifies it as a JSON file, the
S3 Input (SQS) plugin processes that file as plain text. It creates a new log from
each line of the file and stores the content in a key named _raw within the
resulting log.
For non-JSON files that use gzip compression (with file extensions that include
the .gzip or .gz suffix), this plugin decompresses each file before processing
it accordingly.
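As a rough illustration of the plain text behavior, the following Python sketch creates one record per line and stores the line under the _raw key. The function name is hypothetical, and the handling of character encoding and empty lines is an assumption that this page doesn't specify.

```python
import gzip


def text_file_to_logs(object_key: str, data: bytes) -> list[dict]:
    """Turn the bytes of a plain text file into one log record per line."""
    # Decompress first when the key carries a gzip suffix.
    if object_key.endswith((".gz", ".gzip")):
        data = gzip.decompress(data)
    # Assumption: content is UTF-8, and undecodable bytes are replaced.
    text = data.decode("utf-8", errors="replace")
    return [{"_raw": line} for line in text.splitlines()]


print(text_file_to_logs("app.log", b"first line\nsecond line\n"))
```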
Tar archives
The plugin can extract and consume files from tar archives with these file extensions:
- .tar
- .tar.gz
- .tar.gzip
After the plugin extracts these files, it processes any JSON and plain text data accordingly, but skips directories and symbolic links.
If files inside a tar archive are gzip-compressed, this plugin decompresses those files accordingly.
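To see which entries of an archive would be eligible for ingestion, you can approximate the extraction rules with Python's standard tarfile module. This sketch is an illustration under stated assumptions rather than the plugin's implementation: the function name is hypothetical, and the sketch keeps only regular files, which also leaves out hard links.

```python
import io
import tarfile


def extract_regular_files(archive_bytes: bytes) -> dict[str, bytes]:
    """Return the name and content of each regular file in a tar archive."""
    files = {}
    # Mode "r:*" lets tarfile detect gzip compression automatically.
    with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:*") as archive:
        for member in archive.getmembers():
            # Skip directories, symbolic links, and other non-regular entries.
            if not member.isfile():
                continue
            extracted = archive.extractfile(member)
            if extracted is not None:
                files[member.name] = extracted.read()
    return files
```

Each returned entry can then be handled as JSON or plain text according to its file extension, as described in the previous sections.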
Filtering
The S3 Input (SQS) plugin doesn’t filter notifications by event type. If a notification contains a reference to an object in an S3 bucket, the plugin will ingest data from that object, regardless of its associated event type.
To filter by event type, you must configure the event notification settings in AWS that publish to your SQS queue. For more information, see the AWS Event notification types and destinations guide.
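The following Python sketch illustrates what decoding a notification as an object reference can look like for both delivery paths: direct from S3, and wrapped in an SNS envelope. It follows the publicly documented S3 event and SNS notification message shapes. The function name is hypothetical, and the plugin's internal decoding logic isn't described on this page. The sketch never inspects the eventName field, which mirrors the lack of event type filtering.

```python
import json
from urllib.parse import unquote_plus


def extract_object_refs(sqs_body: str) -> list[tuple[str, str]]:
    """Return (bucket, key) pairs found in an SQS message body, if any."""
    try:
        payload = json.loads(sqs_body)
    except json.JSONDecodeError:
        return []
    if not isinstance(payload, dict):
        return []

    # SNS delivers the original S3 event as a JSON string in "Message".
    if payload.get("Type") == "Notification" and isinstance(payload.get("Message"), str):
        try:
            payload = json.loads(payload["Message"])
        except json.JSONDecodeError:
            return []
        if not isinstance(payload, dict):
            return []

    records = payload.get("Records")
    if not isinstance(records, list):
        return []

    refs = []
    for record in records:
        try:
            bucket = record["s3"]["bucket"]["name"]
            key = record["s3"]["object"]["key"]
        except (KeyError, TypeError):
            continue  # Not a reference to an S3 object, so ignore it.
        if isinstance(bucket, str) and isinstance(key, str):
            # Object keys in S3 event notifications are URL-encoded.
            refs.append((bucket, unquote_plus(key)))
    return refs
```

A message that yields an empty list corresponds to a notification that can't be decoded as an object reference, such as the test event that S3 sends when you first enable notifications.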
Metadata
The S3 Input (SQS) plugin attaches the following metadata to each log:
- __chrono_bucket: The name of the S3 bucket that contains the file from which the log was created.
- __chrono_file: The key of the S3 object from which the log was created.
- __chrono_tar_file_entry: For data extracted from tar archives only. The name of the tar archive that contained the file from which the log was created.
Get started
To get started with the S3 Input (SQS) plugin, follow these steps.
- Either create a new pipeline or modify an existing pipeline.
- For testing purposes, set the pipeline’s destination to standard output.
- Set the pipeline’s source to S3 Input (SQS), and then add values for all required parameters, along with any optional parameters of your choosing.
- Set up one of the supported authentication methods for the S3 Input (SQS) source plugin.
- In the Telemetry Pipeline web interface, go to the summary page for that pipeline.
- In the Pipeline Output section, click Get latest logs.
- Review this log output to ensure that you’re receiving data from S3. If you don’t receive any data, or if you encounter connection errors, review your plugin configuration settings.
- After you’ve confirmed that the S3 Input (SQS) plugin is functioning correctly, you can overwrite the standard output destination with the destination where you want to send your telemetry data.