TELEMETRY PIPELINE
HTTP API collector

HTTP API collector source plugin

The HTTP API Collector source plugin lets you retrieve data from your HTTP endpoints and ingest it into Chronosphere Telemetry Pipeline.

Supported telemetry types

This plugin supports these telemetry types:

LogsMetricsTraces

Configuration parameters

Use the parameters in this section to configure your plugin. The Telemetry Pipeline web interface uses the values in the Name column to describe the parameters. Items in the Key column are the YAML keys to use in pipeline configuration files.

General

NameKeyDescriptionDefault
URLurlRequired. The request URL. Supports templating.none
HTTP MethodmethodThe request method. Defaults to GET, or POST if body is set. Supports templating.GET
HeadersheaderAny request headers, formatted as strings separated by the new line character \n. Supports templating.User-Agent: Fluent-Bit HTTP Loader Plugin
Optional Request bodybodyThe request body. Supports templating.none
Optional Request bodystore_response_bodyWhat to store as the response body. This is meant for large response payloads so the plugin doesn't exceed the 5 MiB limit available for data storage in the Cloud at the pipeline metadata API. Supports templating.toJson .Response.Body
Request TimeouttimeoutControls the request timeout, string duration. If set, must be greater than 0s.0s
Pull Intervalpull_intervalControls the time between requests, string duration. If set, must be greater than 0s. Supports templating.1s
WaitwaitHow much time to wait before starting collection. Useful to sync with pull_interval. Supports templating. Evaluate to a string duration.0s
RetryretryControls whether to retry the current request. Supports templating. Evaluate to a Boolean.false
Max Retriesmax_retriesThe maximum number of retries.1
StopstopControls when to stop collecting. Supports templating.false

Cookie-Based Authentication

NameKeyDescriptionDefault
Auth Cookie URLauth_cookie_urlCookie-based authentication URL.none
Auth Cookie Methodauth_cookie_methodCookie-based authentication request method.Defaults to GET, or POST if auth_cookie_body is set.
Auth Cookie Headerauth_cookie_headerCookie-based authentication request headers. String separated by new line character \n.none
Auth Cookie Bodyauth_cookie_bodyCookie-based authentication request body.none

OAuth2

NameKeyDescriptionDefault
OAuth2 Client IDoauth2_client_idOAuth2 client ID.none
OAuth2 Client Secretoauth2_client_secretOAuth2 client secret. Sensible field, prefer using pipeline secrets.none
OAuth2 Scopesoauth2_scopesOAuth2 scopes. String, each scope separated by space.none
OAuth2 Additional Paramsoauth2_endpoint_paramsOAuth2 endpoint parameters. String in URL query string format.none

Digest Auth

NameKeyDescriptionDefault
Usernameauth_digest_usernameUsername for HTTP Digest authentication.none
Passwordauth_digest_passwordPassword for HTTP Digest authentication.none

Storage

NameKeyDescriptionDefault
Data Directorydata_dirControls where to store data used to resume collecting. Defaults to /data/storage if exists, or a temporary directory if available, otherwise storage is disabled./data/storage
Data Expiration Timedata_expControls how much time data can be used after resume.0s

Output

NameKeyDescriptionDefault
SkipskipControls when to skip sending records to Telemetry Pipeline. Supports templating. Defaults to ignore error status codes, and empty response body.{{or (ge .Response.StatusCode 400) (empty .Response.Body)}}
OutoutControls what to send to Telemetry Pipeline. Supports templating. Defaults to send the response body.{{toJson .Response.Body}}

Templating

The available go-templates (opens in a new tab) include the following data:

  • Index: (int) always available. Is the current fetch index.
  • Request: (Request struct) available after a successful fetch.
  • Response: (Response struct) available after a successful fetch.
  • LastRequestTime (*time.Time) stores the time when the last request was made. Available after a successful request.
  • LastResponseTime (*time.Time) stores the time when the last response was received. Available after a successful request and response roundtrip.

The Request struct includes these fields:

  • Method: string.
  • URL: *url.URL.
  • Header: http.Header.
  • Body: any.

The Response struct includes these fields:

  • StatusCode: int.
  • Header: http.Header.
  • Body: any.

You can also use any of the following functions provided by sprig (opens in a new tab):

  • timeRFC3339: func() string returns a constant RFC3339 (2006-01-02T15:04:05Z07:00) time format. Can be useful to format time.Time as JSON timestamp.
  • nextLink: func (http.Header) string returns the URL from the header Link: rel=next. Useful for pagination.
  • parseDuration: func (string) (Duration, error) parses a string as time.Duration.
  • has: func(obj any, key string) checks whether the given key exists inside an object.
  • jq: func(query string, data any) (any, error) transforms data using jq.
  • log: func(any...) prints a message to stdout.
  • logf: func(format string, args any...) prints a formatted message to stdout.
  • set_variable: func(key string, value any) store a persistent variable you can reference later on inside other go-template executions.
  • has_variable: func(key string) checks whether the given variable is set.
  • get_variable: func(key string) retrieve a previously stored persistent variable, returns the value or an empty string.
  • unset_variable: func(key string) deletes a previously stored persistent variable.

Time

A common action is to manipulate time parameters. Here are some common operations:

  • now returns the current time as time.Time type.
  • now.Format "2006-01-02T15:04:05Z07:00" returns the current time as a string in RFC3339 format.
  • now.Format timeRFC3339 same as before but using a utility constant.
  • mustToDate timeRFC3339 .Response.Body.time converts the time field inside the response body to time.Time using the RFC3339 time format.
  • .Response.Body.time | mustToDate timeRFC3339 same as before but using the pipe operator.
  • mustDateModify "+1h" now return the current time plus 1 hour as time.Time.
  • now | mustDateModify "-1h" similar to the previous one, but uses the pipe operator, and also subtracts 1 hour. - (now | mustDateModify "+1h").Format timeRFC3339 returns the current time plus 1 hour as a string in RFC3339 time format. Notice the usage of parenthesis. - ((.Response.Body.time | mustToDate timeRFC3339) | mustDateModify "+1h").Format timeRFC3339 parses the response body time field to time.Time then adds 1 hour, then formats it to string in RFC3339 time format.

Additional Go template resources

Examples

Use the following examples to help you retrieve data from your HTTP endpoints and ingest it into Chronosphere Telemetry Pipeline.

SWAPI example

Here's an example using the SWAPI (opens in a new tab) API:

pipeline:
    inputs:
        - name: http_loader
          url: |-
              {{with .Response.Body.next}}{{.}}{{else}}https://swapi.dev/api/people{{end}}
          out: |-
              {{toJson .Response.Body.results}}
          skip: |-
              {{or (ge .Response.StatusCode 400) (empty .Response.Body.results)}}

By fetching https://swapi.dev/api/people, you'll receive a response body similar to the following:

{
    "next": "https://swapi.dev/api/people/?page=2",
    "results": [
        {...},
        {...},
    ]
}

URL

If a value is returned for next in the response body, be sure to use that as the URL. Otherwise, set the default to https://swapi.dev/api/people.

Out

Select the results field from the response body and transform it to JSON. This will attempt to send an array to Telemetry Pipeline. However, arrays aren't supported, which causes the plugin to split the array and send each item in the following manner:

{ "index": 1, "value": {} }
{ "index": 2, "value": {} }
{ "index": 3, "value": {} }

Another way to visualize this is:

arr.map((value, index) => ({ index, value })).forEach(send)

Skip

With skip, you can control when to skip sending data to Telemetry Pipeline. As an example, you can skip if the response status code is greater or equal to 400, or if the results field in the response body is empty.

Transform

Say you are only interested in the name property on each result.

{
    "next": "https://swapi.dev/api/people/?page=2",
    "results": [
        {
            "name": "Luke Skywalker",
            ...
        },
        {
            "name": "C-3PO",
            ...
        }
    ]
}

Inside the go-templating, you can use jq to transform the data. Change out to the following to map over each result and extract only the name:

{{.Response.Body | jq ".results.[] | {name}" | toJson}}

JSON placeholder example

The JSON placeholder (opens in a new tab) API supports pagination, but doesn't provide a next URL to lookup. Instead, the client should pass a _page query string parameter, starting at 1:

pipeline:
    inputs:
        - name: http_loader
          url: |-
            https://jsonplaceholder.typicode.com/posts?_limit=10&_page={{add .Index 1}}
          retry: |-
            {{ge .Response.StatusCode 500}}
          max_retries: 3

Index

Inside the template context you will always find the variable .Index, which is an auto-increasing number each time the plugin makes a request. It starts at 0.

In this case, to start the pagination at 1, add 1 to the index to advance the page.

Retry

You can enable retrying of requests. In this case, you want to retry if you get a response with an status code greater than or equal to 500. Also, in the case the HTTP client fails for some other reason (perhaps a networking issue) and you don't get a response back, the request will be retried.

You can control how many tries will be performed using max_retries. In this case, the plugin will try at most three (3) times.

Okta example

When using the Okta API (opens in a new tab), it returns a Link: <url>; rel="next" header which you can use for pagination:

pipeline:
    inputs:
        - name: http_loader
          url: |-
              {{- with nextLink .Response.Header -}}
                  {{.}}
              {{- else -}}
                  https://{replaceWithYourDomain}/api/v1/logs
              {{- end -}}
          headers: "Authorization: SSWS {{secrets.oktaAPIToken}}"

Next link

There's a helper function called nextLink that takes some headers, and then finds the Link header with rel=next.

In case you get the header, use that as the URL, otherwise you'll set the default Okta URL of your domain.

Notice that with works as an if, but overrides that current template data with the result. In this case, it's equivalent to:

{{- if nextLink .Response.Header -}}
    {{nextLink .Response.Header}}

Carbon Black example

The Carbon Black (opens in a new tab) API uses an offset style pagination controlled by start and rows. You should be more interested in the time-range filtering, and mixing pagination with it.

Control time range with either a combination of time_range.start and time_range.end, which are both fixed timestamps.

There's also the option range a relative value (-2h). However, to have complete control, use fixed timestamps.

Issue a POST request with a body like the following:

{
    "time_range": {
        "start": "2023-08-14T01:00:00.000Z",
        "end": "2023-08-14T02:00:00.000Z"
    },
    "start": 1,
    "rows": 5
}

Advance the page by changing start to previous start + rows. For example, 1, 6, 11, 16, and so on.

Change time_range only after you have finished paging through the current time range.

pipeline:
    inputs:
        - name: http_loader
          url: https://defense.conferdeploy.net/api/alerts/v7/orgs/ABCD1234/alerts/_search
          header: |-
              Content-Type: application/json
              X-Auth-Token: {{secrets.blackCarbonAuthToken}}
          body: |-
              {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
              {{- $timeStart := $now | mustDateModify "-1h" -}}
              {{- $timeEnd := $now -}}
              {{- $start := 1 -}}
 
              {{- if and .Request .Response -}}
                  {{- $prevTimeEnd := .Request.Body.time_range.end | mustToDate timeRFC3339 -}}
                  {{- $start = add .Request.Body.start 5 -}}
                  {{- if or (empty .Response.Body.results) (lt (len .Response.Body.results) 5) -}}
                      {{- $timeStart = $prevTimeEnd -}}
                      {{- $timeEnd = ($timeStart | mustDateModify "+1h") -}}
                      {{- $start = 1 -}}
                  {{- end -}}
 
                  {{- if $timeEnd.Before $prevTimeEnd -}}
                      {{- $timeEnd = $prevTimeEnd -}}
                      {{- $timeStart = ($timeEnd | mustDateModify "-1h") -}}
                  {{- end -}}
              {{- end -}}
 
              {"time_range":{"start":"{{$timeStart.Format timeRFC3339}}","end":"{{$timeEnd.Format timeRFC3339}}"},"start":{{$start}},"rows":5}
          out: |-
              {{toJson .Response.Body.results}}
          skip: |-
              {{or (ge .Response.StatusCode 400) (empty .Response.Body.results)}}
          pull_interval: |-
              {{- if ge .Response.StatusCode 400 -}}
                10s
              {{- else -}}
                {{- $interval := parseDuration "100ms" -}}
 
                {{- if or (empty .Response.Body.results) (lt (len .Response.Body.results) 5) -}}
                    {{- $interval = parseDuration "1h" -}}
                    {{- $currentTimeEnd := .Request.Body.time_range.end | mustToDate timeRFC3339 -}}
                    {{- $nextTimeEnd := $currentTimeEnd.Add $interval -}}
                    {{- if $nextTimeEnd.After now -}}
                        {{- $interval = $nextTimeEnd.Sub now -}}
                        {{- if le $interval.Nanoseconds 0 -}}
                            {{- $interval = parseDuration "1ns" -}}
                        {{- end -}}
                    {{- end -}}
                {{- end -}}
 
                {{$interval}}
              {{- end -}}

Make use of go-templating variables to keep order.

Body

In the body, first define a variable to hold the current timestamp truncated to a 1 hour unit. Use it to define both time range start and end. Also, set the pagination to start at 1.

The variables .Request and .Response are not initially available, and are available only after a successful fetch. The following code happens inside an if that determines if both of them are available.

There you can retrieve the previous time range end. Advanced the page by increasing the pagination start by 5 (rows).

Then, determine if less than 5 (rows) results were returned, which means you have reached the pages end, and pagination is finished. In this case, change the time range window by adding to the previous request values. You can reset pagination by setting page start to 1.

There's an additional if that checks if the time range window is less than the previous time range window, in which case it sets it to the previous one.

Finally, use these variables to construct the JSON request body.

Pull interval

Then, it's important to set pull_interval. If you're not careful, you could start to ask for time windows in the future, which wouldn't return any data.

First, do a validation check. If you receive a status code greater than or equal to 400, return 10s as pull_interval (you may even use a higher value).

Now, set an initial pull interval of 100ms. Then, in case results inside the response body is empty or less than 5 (rows), it means you have finished paginating. In such case, move the time window. To do so, increase the pull_interval to 1h. Be sure to add an if to avoid moving the time window into the future, in which case you can limit the pull interval to next - now.

Determine if the resulting pull_interval is negative, and set it to a minimum value of 1ns.

In resume, in case of error, use a pull interval of 10s. In case the pagination has ended, use 1h but maxed to now. Otherwise, if you're still paginating, set it to 100ms.

Sentinel One example

When using the Sentinel One (opens in a new tab) API, pagination uses a cursor-based style. You can also control a time window, similar to how it is done on the Carbon Black API.

The Carbon Black API used a token inside the request headers. But, because Sentinel One uses cookies, there's a separate login endpoint that you can specify in the plugin to fetch the auth cookie, and then store in the cookie jar.

pipeline:
    inputs:
        - name: http_loader
          url: |-
            {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
            {{- $start := $now | mustDateModify "-1h" -}}
            {{- $end := $now -}}
 
            {{- if and .Request .Response -}}
                {{- $prevEnd := .Request.URL.Query.Get "updatedAt__lte" | mustToDate timeRFC3339 -}}
                {{- if (empty .Response.Body.pagination.nextCursor) -}}
                    {{- $start = $prevEnd -}}
                    {{- $end = ($start | mustDateModify "+1h") -}}
                {{- end -}}
 
                {{- if $end.Before $prevEnd -}}
                    {{- $end = $prevEnd -}}
                    {{- $start = ($end | mustDateModify "-1h") -}}
                {{- end -}}
            {{- end -}}
 
            https://usea1-partners.sentinelone.net/web/api/v2.1/threats?updatedAt__gte={{$start.Format timeRFC3339}}&updatedAt__lte={{$end.Format timeRFC3339}}&limit=5{{with .Response.Body.pagination.nextCursor}}&cursor={{.}}{{end}}
          out: |-
              {{toJson .Response.Body.data}}
          skip: |-
              {{or (ge .Response.StatusCode 400) (not (empty .Response.Body.errors)) (empty .Response.Body.data)}}
          pull_interval: |-
              {{- if or (ge .Response.StatusCode 400) (not (empty .Response.Body.errors)) -}}
                10s
              {{- else -}}
                {{- $interval := parseDuration "100ms" -}}
 
                {{- if empty .Response.Body.pagination.nextCursor -}}
                    {{- $interval = parseDuration "1h" -}}
                    {{- $currentEnd := .Request.URL.Query.Get "updatedAt__lte" | mustToDate timeRFC3339 -}}
                    {{- $nextEnd := $currentEnd.Add $interval -}}
                    {{- if $nextEnd.After now -}}
                        {{- $interval = $nextEnd.Sub now -}}
                        {{- if le $interval.Nanoseconds 0 -}}
                            {{- $interval = parseDuration "1ns" -}}
                        {{- end -}}
                    {{- end -}}
                {{- end -}}
 
                {{$interval}}
              {{- end -}}
          auth_cookie_url: |-
            https://usea1-partners.sentinelone.net/web/api/v2.1/users/login/by-api-token
          auth_cookie_body: |-
            {"data": {"apiToken": "{{secrets.sentinelOneAPIToken}}"}}
          auth_cookie_header: |-
            Content-Type: application/json

Auth Cookie URL

If you set auth_cookie_url, the plugin issues a request to this URL before starting collecting.

Auth Cookie Exp

You can control for how much time the cookie is valid by using auth_cookie_exp, which accepts a duration string. After the cookie is expired, the plugin will attempt to renew the cookie.

This body doesn't support go-templates. The notation {{ secrets.* }} is a feature available in Telemetry Pipeline. Its syntax might resemble go-templates, but they are not go-templates.

Dynatrace example

The following examples uses the Dynatrace API (opens in a new tab). One important aspect of this API is that it uses a nextPageKey for pagination on the URL query string parameters, but this parameter is exclusive. When setting this parameter, you can set only that parameter, and not the rest of the filters.

This prevents you from referencing the previous request parameters. For that, use the has_variable, get_variable, set_variable and unset_variable helpers so you can persist data in the plugin execution. These variables are persisted even after the Telemetry Pipeline process is restarted.

pipeline:
    inputs:
        - Name: http_loader
          url: |-
              {{- $environmentID := "ggy66547" -}}
              {{- with .Response.Body.nextPageKey -}}
                  https://{{$environmentID}}.live.dynatrace.com/api/v2/auditlogs?nextPageKey={{.}}
              {{- else -}}
                  {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
                  {{- $from := $now | mustDateModify "-1h" -}}
                  {{- $to := $now -}}
 
                  {{- if has_variable "to" -}}
                      {{- $prevTo := get_variable "to" | mustToDate timeRFC3339 -}}
                      {{- $from = $prevTo -}}
                      {{- $to = ($from | mustDateModify "+1h") -}}
 
                      {{- if $to.Before $prevTo -}}
                          {{- $to = $prevTo -}}
                          {{- $from = ($to | mustDateModify "-1h") -}}
                      {{- end -}}
                  {{- end -}}
 
                  {{- set_variable "to" ($to.Format timeRFC3339) -}}
 
                  https://{{$environmentID}}.live.dynatrace.com/api/v2/auditlogs?from={{$from.Format timeRFC3339}}&to={{$to.Format timeRFC3339}}&sort=timestamp&pageSize=10
              {{- end -}}
          header: "Authorization: Api-Token {{secrets.dynatraceAPIToken}}"
          out: |-
              {{toJson .Response.Body.auditLogs}}
          skip: |-
              {{or (ge .Response.StatusCode 400) (empty .Response.Body.auditLogs)}}
          pull_interval: |-
              {{- if ge .Response.StatusCode 400 -}}
                {{ log "error:" .Response.Body }}
                10s
              {{- else -}}
                {{- $interval := parseDuration "100ms" -}}
 
                {{- if empty .Response.Body.nextPageKey -}}
                    {{- $interval = parseDuration "1h" -}}
                    {{- $currentTo := get_variable "to" | mustToDate timeRFC3339 -}}
                    {{- $nextTo := $currentTo.Add $interval -}}
                    {{- if $nextTo.After now -}}
                        {{- $interval = $nextTo.Sub now -}}
                        {{- if le $interval.Nanoseconds 0 -}}
                            {{- $interval = parseDuration "1ns" -}}
                        {{- end -}}
                    {{- end -}}
                {{- end -}}
 
                {{$interval}}
              {{- end -}}

The url go-template determines if you received a nextPageKey in the response body. In that case, use that to paginate. Otherwise, set the time window parameters, similar to the previous examples, but also make use of set_variable to persist the to parameter. Doing that lets you reference it later, and not depend on the previous request.