TELEMETRY PIPELINE
S3 Input (SQS)

S3 Input (SQS) source plugin

Requires pipeline agent v25.8.1 or later, Core Operator v3.67.0 or later, and Pipeline CLI v3.66.0 or later.

The S3 Input (SQS) source plugin (name: s3_sqs) lets you continuously ingest new data from Amazon S3 buckets into a telemetry pipeline. This plugin monitors an SQS queue configured to receive notifications directly from S3 or through SNS, then creates logs from the files described in S3 events. This plugin ignores any SQS notifications that can’t be decoded as references to an object in an S3 bucket, but it does not filter notifications by event type.

This is a pull-based source plugin.

This plugin doesn’t support duplicates of itself within the same pipeline.

Supported telemetry types

This plugin for Chronosphere Telemetry Pipeline provides the following support for each telemetry type:

  • Logs: supported
  • Metrics: not supported
  • Traces: not supported

Requirements

To use the S3 Input (SQS) plugin, you must meet these requirements:

  • Pipeline agent v25.8.1 or later
  • Core Operator v3.67.0 or later
  • Pipeline CLI v3.66.0 or later

Configuration parameters

Use the parameters in this section to configure your plugin. The Telemetry Pipeline web interface identifies each parameter by the name shown here; the value in parentheses after each name is the YAML key to use in pipeline configuration files.

Required

AWS SQS Queue Name (key: sqs_queue_name)
  Required. The name of the SQS queue whose notifications you want to monitor.
  Default: none

AWS SQS Queue Region (key: sqs_queue_region)
  Required if aws_sqs_endpoint isn’t set. The name of the region where your SQS queue exists. For example, us-east-1.
  Default: none
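
For example, a minimal source entry in a pipeline configuration file might look like the following sketch, which assumes the Fluent Bit-style YAML schema that pipeline configuration files use. The queue name is a placeholder:

    pipeline:
      inputs:
        - name: s3_sqs
          sqs_queue_name: my-telemetry-queue   # placeholder queue name
          sqs_queue_region: us-east-1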

Advanced

Regular Expression Object Match (key: match_regexp)
  The regular expression for matching or excluding object keys from S3. This plugin processes notifications only for objects whose keys match the specified regular expression. If not set, the default value of .* matches all possible object keys.
  Default: .*

Delete Message from SQS (key: delete_messages)
  If true, deletes SQS messages after processing the associated S3 data. If false, the plugin re-processes each message at an interval determined by your SQS visibility timeout, and continues to process each message until a redrive policy is triggered or your SQS message retention period elapses. Chronosphere recommends keeping the default value unless you’re testing a pipeline during its initial setup, because deleting SQS messages prevents the plugin from processing the same message multiple times and creating duplicate logs. Accepted values: true, false.
  Default: true

Line Buffer Max Size (key: max_line_buffer_size)
  The maximum line size the plugin will read from JSON or plain text files.
  Default: 10MiB

S3 Assume Role ARN (key: s3_assume_role_arn)
  The ARN of the role to assume in S3. This can be an ARN within the same account or across accounts.
  Default: none

S3 Role External ID (key: s3_role_external_id)
  The external ID of the role to assume in S3.
  Default: none

SQS Role ARN (key: sqs_cross_account_role_arn)
  The ARN of the role to assume in SQS. This can be an ARN within the same account or across accounts.
  Default: none

SQS Role External ID (key: sqs_role_external_id)
  The external ID of the role to assume in SQS.
  Default: none

S3 Read Concurrency (key: s3_read_concurrency)
  The maximum number of concurrent S3 GetObject calls that this plugin will make.
  Default: the number of logical CPUs allocated to each pipeline replica.
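
As an illustration, the following sketch combines several of these advanced parameters with the required ones. The queue name, regular expression, and role ARN are placeholders:

    pipeline:
      inputs:
        - name: s3_sqs
          sqs_queue_name: my-telemetry-queue
          sqs_queue_region: us-east-1
          match_regexp: '^logs/.*\.json$'   # process only .json objects under the logs/ prefix
          s3_assume_role_arn: arn:aws:iam::123456789012:role/pipeline-s3-read   # placeholder ARN
          s3_read_concurrency: 4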

Authentication methods

The S3 Input (SQS) plugin supports the following authentication methods:

EKS Pod Identities

To use EKS Pod Identities for authentication:

  1. In AWS, configure EKS Pod Identities.

  2. In Pipeline CLI, add the following flag to a create pipeline or update pipeline command:

    calyptia {create|update} pipeline --service-account VALUE

    Replace VALUE with the name of the Kubernetes service account associated with your Pods.
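
    For example, if the service account associated with your Pods is named pipeline-sa (a placeholder):

    calyptia update pipeline --service-account pipeline-sa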

IMDS

To use IMDS for authentication:

IRSA

To use IRSA for authentication:

  1. In AWS, set up IRSA for your EKS cluster.

  2. Assign an IAM role to your Kubernetes service account.

  3. In Pipeline CLI, add the following flag to a create pipeline or update pipeline command:

    calyptia {create|update} pipeline --service-account VALUE

    Replace VALUE with the name of your Kubernetes service account.

Static credentials

To use static credentials for authentication:

  1. Create pipeline secrets that contain the values of your AWS access key ID and AWS secret access key.

You don’t need to add an explicit reference to these secrets in your pipeline configuration file. If secrets with the correct key names are present, the S3 Input (SQS) plugin automatically detects these values and uses them for authentication.

Supported data types

The S3 Input (SQS) plugin can ingest JSON objects and plain text from files stored in S3 buckets, including gzip-compressed files. Additionally, this plugin can extract and ingest compressed and uncompressed files from tar archives.

JSON

This plugin can ingest data from JSON files with these file extensions:

  • .json
  • .jsonl
  • .ndjson

If a file contains only a single JSON object, this plugin creates a new log from that object. If a file contains multiple newline-delimited JSON (NDJSON) objects, this plugin creates a new log from each JSON object within that file. Key/value pairs from JSON objects are stored as key/value pairs in the resulting log.
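
For example, a hypothetical file named events.ndjson containing these two lines:

    {"level": "info", "message": "service started"}
    {"level": "error", "message": "connection refused"}

produces two logs, one per JSON object, each storing level and message as key/value pairs.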

For JSON files that use gzip compression (with file extensions such as .json.gzip or .json.gz), this plugin decompresses each file before processing it accordingly.

Plain text

If a file doesn’t use a file extension that identifies it as a JSON file, the S3 Input (SQS) plugin processes that file as plain text. It creates a new log from each line of the file and stores the content in a key named _raw within the resulting log.
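
For example, a hypothetical plain-text file containing the line:

    2025-01-15T12:00:00Z INFO service started

produces a log whose _raw key holds that entire line.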

For non-JSON files that use gzip compression (with file extensions that include the .gzip or .gz suffix), this plugin decompresses each file before processing it accordingly.

Tar archives

The plugin can extract and consume files from tar archives with these file extensions:

  • .tar
  • .tar.gz
  • .tar.gzip

After the plugin extracts these files, it processes any JSON and plain text data accordingly, but skips directories and symbolic links.

If files inside a tar archive are gzip-compressed, this plugin decompresses those files accordingly.
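
For example, given a hypothetical archive named logs.tar.gz with the following contents, the plugin processes the JSON and plain text entries and skips the rest:

    app/                        directory: skipped
    app/events.json             JSON file: one log per JSON object
    app/service.log             plain text: one log per line
    latest -> app/service.log   symbolic link: skipped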

Filtering

The S3 Input (SQS) plugin doesn’t filter notifications by event type. If a notification contains a reference to an object in an S3 bucket, the plugin will ingest data from that object, regardless of its associated event type.

To filter based on event type, you must configure the event notification settings of your S3 bucket in AWS. For more information, see the AWS Event notification types and destinations guide.
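
For example, the following sketch of an S3 bucket notification configuration routes only object-creation events to an SQS queue. The queue ARN is a placeholder:

    {
      "QueueConfigurations": [
        {
          "QueueArn": "arn:aws:sqs:us-east-1:123456789012:my-telemetry-queue",
          "Events": ["s3:ObjectCreated:*"]
        }
      ]
    }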

Metadata

The S3 Input (SQS) plugin attaches the following metadata to each log:

  • __chrono_bucket: The name of the S3 bucket that contains the file from which the log was created.
  • __chrono_file: The key of the S3 object from which the log was created.
  • __chrono_tar_file_entry: For data extracted from tar archives only. The name of the file entry within the tar archive from which the log was created.
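
For illustration, a log created from a file inside a tar archive might carry metadata values like the following. All names are placeholders:

    __chrono_bucket: my-telemetry-bucket
    __chrono_file: archives/2025-01-15.tar.gz
    __chrono_tar_file_entry: app/service.log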

Get started

To get started with the S3 Input (SQS) plugin, follow these steps.

  1. Either create a new pipeline or modify an existing pipeline.

  2. For testing purposes, set the pipeline’s destination to standard output.

  3. Set the pipeline’s source to S3 Input (SQS), and then add values for all required parameters, along with any optional parameters of your choosing. A sample configuration sketch appears after these steps.

  4. Set up one of the supported authentication methods for the S3 Input (SQS) source plugin.

  5. In the Telemetry Pipeline web interface, go to the summary page for that pipeline.

  6. In the Pipeline Output section, click Get latest logs.

  7. Review this log output to ensure that you’re receiving data from S3. If you don’t receive any data, or if you encounter connection errors, review your plugin configuration settings.

  8. After you’ve confirmed that the S3 Input (SQS) plugin is functioning correctly, you can overwrite the standard output destination with the destination where you want to send your telemetry data.
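
For reference, a test configuration for steps 2 and 3 might look like the following sketch, which assumes the Fluent Bit-style YAML schema that pipeline configuration files use. The queue name is a placeholder:

    pipeline:
      inputs:
        - name: s3_sqs
          sqs_queue_name: my-telemetry-queue
          sqs_queue_region: us-east-1
      outputs:
        - name: stdout   # temporary standard output destination for testing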