Requires pipeline agent v25.8.1 or later,
Core Operator v3.67.0 or later, and
Pipeline CLI v3.66.0 or later.
The S3 Input (SQS) source plugin (name: s3_sqs)
lets you continuously ingest new data from Amazon S3 buckets into a telemetry pipeline.
This plugin monitors an SQS queue configured to receive notifications
directly from S3
or through SNS,
then creates logs from the files described in S3 events. This plugin ignores
any SQS notifications that can’t be decoded as references to an object in an S3 bucket,
but it does not filter notifications by event type.
This is a pull-based source plugin.
This plugin doesn’t support duplicates of itself within the same pipeline.
Supported telemetry types
The S3 Input (SQS) plugin for Chronosphere Telemetry Pipeline supports these telemetry types:
- Logs
Requirements
To use the S3 Input (SQS) plugin, you must meet these requirements:
- Your IAM user or IAM role must have the following permissions for the ARN of your
  SQS queue:
- Your IAM user or IAM role must have the s3:GetObject permission for all buckets
  configured to notify your SQS queue.
- Your SQS queue must have a redrive policy and a dead letter queue. Additionally,
  Chronosphere recommends setting the maxReceiveCount of your redrive policy to a
  value greater than 1, which lets SQS retry sending messages upon failure.
Configuration parameters
Use the parameters in this section to configure the S3 Input (SQS) plugin. The
Telemetry Pipeline web interface uses the items in the Name column to
describe these parameters. Pipeline configuration files
use the items in the Key column as YAML keys.
Required
| Name | Key | Description | Default |
|---|---|---|---|
| AWS SQS Queue Name | sqs_queue_name | Required. The name of the SQS queue whose notifications you want to monitor. | none |
| AWS SQS Queue Region | sqs_queue_region | Required if aws_sqs_endpoint isn’t set. The name of the region where your SQS queue exists. For example, us-east-1. | none |
Advanced
| Name | Key | Description | Default |
|---|---|---|---|
| Regular Expression Object Match | match_regexp | The regular expression for matching or excluding object keys from S3. This plugin processes notifications only for objects that match the specified regular expression. If not set, the default value of .* matches all possible object keys. | .* |
| Delete Message from SQS | delete_messages | If true, deletes SQS messages after processing the associated S3 data. If false, the plugin re-processes each message at an interval specified by your SQS visibility timeout, and continues to process each message until a redrive policy is triggered or until you meet your SQS message retention policy. Chronosphere recommends not modifying this value unless you’re testing a pipeline during its initial setup. This is because deleting SQS messages prevents the plugin from processing the same message multiple times and creating duplicate logs. Accepted values: true, false. | true |
| Line Buffer Max Size | max_line_buffer_size | The maximum line size the plugin will read from JSON or plain text files. | 10MiB |
| S3 Assume Role ARN | s3_assume_role_arn | The ARN of the IAM role for accessing S3 buckets. This can be an ARN within the same account or across accounts. | none |
| S3 Role External ID | s3_role_external_id | The external ID of the role to assume in S3. | none |
| SQS Assume Role ARN | sqs_assume_role_arn | The ARN of the IAM role for accessing the SQS queue. This can be an ARN within the same account or across accounts. | none |
| SQS Role External ID | sqs_role_external_id | The external ID of the role to assume in SQS. | none |
| SQS Queue Owner Account ID | sqs_queue_owner_account_id | The AWS account ID of the queue owner for cross-account access. | none |
| S3 Read Concurrency | s3_read_concurrency | The maximum number of concurrent S3 GetObject calls that this plugin will make. | The number of logical CPUs allocated to each pipeline replica. |
| Memory Buffer Limit | mem_buf_limit | For pipelines with the Deployment or DaemonSet workload type only. Sets a limit for how much buffered data the plugin can write to memory, which affects backpressure. This value must follow Fluent Bit’s rules for unit sizes. If unspecified, no limit is enforced. | none |
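For reference, the following snippet is a minimal sketch of how these parameters might appear in a pipeline configuration file. It assumes the Fluent Bit-style YAML layout (a pipeline block with an inputs list); the queue name, region, and regular expression are placeholder values.

```yaml
pipeline:
  inputs:
    # The s3_sqs source with its two required parameters (placeholder values).
    - name: s3_sqs
      sqs_queue_name: my-notification-queue
      sqs_queue_region: us-east-1
      # Optional: process only objects whose keys end in .json or .json.gz.
      match_regexp: '.*\.json(\.gz)?$'
```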
Authentication methods
The S3 Input (SQS) plugin supports the following authentication methods:
EKS Pod Identities
To use EKS Pod Identities for authentication:
1. In AWS, configure EKS Pod Identities.
2. In Pipeline CLI, add the following flag to a create pipeline or update pipeline
   command:

   calyptia {create|update} pipeline --service-account VALUE

   Replace VALUE with the name of the Kubernetes service account associated
   with your Pods.
IMDS
To use IMDS for authentication:
IRSA
To use IRSA for authentication:
1. In AWS, set up IRSA for your EKS cluster.
2. Assign an IAM role to your Kubernetes service account.
3. In Pipeline CLI, add the following flag to a create pipeline or update pipeline
   command:

   calyptia {create|update} pipeline --service-account VALUE

   Replace VALUE with the name of your Kubernetes service account.
Static credentials
To use static credentials for authentication:
- In Telemetry Pipeline, create secrets
that contain the values of your IAM access keys.
These secrets must use the key names
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
You don’t need to add an explicit [reference](/pipeline-configure-secrets#reference-a-secret)
to these secrets in your pipeline configuration file. If secrets with the correct
key names are present, the S3 Input (SQS) plugin automatically detects these values
and uses them for authentication.
Cross-account access
The S3 Input (SQS) plugin supports cross-account access through the following methods.
Cross-account access and authentication are independent. You can use any cross-account
access method with any authentication method.
Cross-account access using IAM roles
To set up cross-account access using IAM roles, use the following
configuration parameters:
- sqs_assume_role_arn: Required.
- sqs_role_external_id: Required if you need to use an external ID to assume
  the SQS role.

The S3 Input (SQS) plugin extracts the necessary account ID from
the value of sqs_assume_role_arn, which means that the sqs_queue_owner_account_id
parameter isn’t required. However, if you do specify a value for sqs_queue_owner_account_id,
that value takes precedence over the value extracted from sqs_assume_role_arn.
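As an illustration, a source entry for cross-account access through an assumed role might look like the following sketch. The ARNs, external ID, and queue details are placeholders, and the layout assumes the Fluent Bit-style YAML structure; no sqs_queue_owner_account_id is set because the account ID is derived from sqs_assume_role_arn.

```yaml
pipeline:
  inputs:
    - name: s3_sqs
      sqs_queue_name: cross-account-queue
      sqs_queue_region: us-east-1
      # Role in the account that owns the queue (placeholder ARN).
      sqs_assume_role_arn: 'arn:aws:iam::111111111111:role/pipeline-sqs-reader'
      # Needed only if the role's trust policy requires an external ID.
      sqs_role_external_id: example-external-id
```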
Cross-account access using resource-based policies
To set up cross-account access using resource-based policies, use the
following configuration parameters:
- sqs_assume_role_arn: Required if you’re using an assumed role.
- sqs_role_external_id: Required if you need to use an external ID to assume
  the SQS role.
- sqs_queue_owner_account_id: Required if the SQS queue to which your policy
  is attached has a different owner than the account specified in
  sqs_assume_role_arn, or if you aren’t using assumed roles.
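For comparison, the following sketch (placeholder values, Fluent Bit-style YAML layout assumed) shows a queue shared through a resource-based policy without an assumed role, where the owner account must be stated explicitly:

```yaml
pipeline:
  inputs:
    - name: s3_sqs
      sqs_queue_name: shared-queue
      sqs_queue_region: us-east-1
      # Account that owns the queue the policy is attached to (placeholder ID).
      sqs_queue_owner_account_id: "222222222222"
```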
Supported data types
The S3 Input (SQS) plugin can ingest JSON objects and plain text
from files stored in S3 buckets, including gzip-compressed files. Additionally,
this plugin can extract and ingest compressed and uncompressed files from
tar archives.
JSON
This plugin can ingest data from JSON files with these file extensions:
If a file contains only a single JSON object, this plugin creates
a new log from that object. If a file contains multiple
newline-delimited JSON (NDJSON)
objects, this plugin creates a new log from each JSON object within that file.
Key/value pairs from JSON objects are stored as key/value pairs in the resulting log.
For JSON files that use gzip compression (with file extensions such as .json.gzip
or .json.gz), this plugin decompresses each file before processing it accordingly.
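To illustrate the NDJSON behavior, consider a hypothetical S3 object containing two newline-delimited JSON objects. The sketch below shows the file contents as comments and the two resulting logs as YAML key/value pairs; this is illustrative only, and how the logs are rendered depends on your destination.

```yaml
# Hypothetical S3 object events.ndjson containing two NDJSON objects:
#   {"service": "checkout", "level": "error", "message": "payment failed"}
#   {"service": "checkout", "level": "info", "message": "retry succeeded"}
# The plugin creates one log per object, preserving each key/value pair:
- service: checkout
  level: error
  message: payment failed
- service: checkout
  level: info
  message: retry succeeded
```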
Plain text
If a file doesn’t use a file extension that identifies it as a JSON file, the
S3 Input (SQS) plugin processes that file as plain text. It creates a new log from
each line of the file and stores the content in a key named _raw within the
resulting log.
For non-JSON files that use gzip compression (with file extensions that include
the .gzip or .gz suffix), this plugin decompresses each file before processing
it accordingly.
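For example, a line from a hypothetical plain text access log is ingested as a single log whose entire content is stored under the _raw key (shown here in YAML for illustration):

```yaml
# Hypothetical input line from access.log:
#   192.0.2.10 - - [01/Jan/2025:00:00:00 +0000] "GET /health HTTP/1.1" 200
# Resulting log record:
- _raw: '192.0.2.10 - - [01/Jan/2025:00:00:00 +0000] "GET /health HTTP/1.1" 200'
```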
Tar archives
The plugin can extract and consume files from tar archives with these file extensions:
After the plugin extracts these files, it processes
any JSON and plain text data accordingly, but skips directories
and symbolic links.
If files inside a tar archive are gzip-compressed, this plugin decompresses those
files accordingly.
Filtering
The S3 Input (SQS) plugin doesn’t filter notifications by event type. If a
notification contains a reference to an object in an S3 bucket, the plugin will
ingest data from that object, regardless of its associated event type.
To create filters based on event type, you must configure the event notification
settings of your S3 bucket in AWS. For more information, see the AWS
Event notification types and destinations
guide.
Metadata
The S3 Input (SQS) plugin attaches the following metadata to each log:
- __chrono_bucket: The name of the S3 bucket that contains the file from which
  the log was created.
- __chrono_file: The key of the S3 object from which the log was created.
- __chrono_tar_file_entry: For data extracted from tar archives
  only. The name of the tar archive that contained the file from which the log
  was created.
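As a rough illustration, a log created from a plain text file might carry metadata like the following. This is a sketch with placeholder bucket and object key names; how the metadata is surfaced in output depends on your destination.

```yaml
# Illustrative log with attached metadata (placeholder bucket and object key):
_raw: 'example log line'
__chrono_bucket: my-logs-bucket
__chrono_file: logs/2025/app.log
```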
Get started
To get started with the S3 Input (SQS) plugin, follow these steps.
1. Either create a new pipeline
   or modify an existing pipeline.
2. For testing purposes, set the pipeline’s destination to
   standard output.
3. Set the pipeline’s source to S3 Input (SQS), and then add values for all required
   parameters, along with any optional parameters of your choosing (see the
   configuration sketch after these steps).
4. Set up one of the supported authentication methods
   for the S3 Input (SQS) source plugin.
5. In the Telemetry Pipeline web interface, go to the summary page for that pipeline.
6. In the Pipeline Output section,
   click Get latest logs.
7. Review this log output to ensure that you’re receiving data from S3. If you don’t
   receive any data, or if you encounter connection errors, review your plugin
   configuration settings.
8. After you’ve confirmed that the S3 Input (SQS) plugin is functioning correctly,
   you can overwrite the standard output destination with the destination where
   you want to send your telemetry data.
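For reference, a test configuration along the lines of these steps might look like the following sketch, which pairs the s3_sqs source with a standard output destination. It assumes the Fluent Bit-style YAML layout and that the standard output destination uses the stdout plugin name; all values are placeholders.

```yaml
pipeline:
  inputs:
    - name: s3_sqs
      sqs_queue_name: my-notification-queue
      sqs_queue_region: us-east-1
  outputs:
    # Temporary destination for testing; replace it with your real destination
    # after you confirm that data is arriving from S3.
    - name: stdout
      match: '*'
```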