HTTP API collector source plugin
Plugin name: http_loader
The HTTP API Collector source plugin lets you retrieve data from your HTTP endpoints and ingest it into Chronosphere Telemetry Pipeline.
Configuration parameters
The HTTP API Collector source plugin provides these configuration parameters. Items in the Name column display in the Calyptia Dashboard. Items in the Key column are the YAML keys to use in pipeline configuration files.
General
Name | Key | Description | Default |
---|---|---|---|
URL | url | Required. Request URL. Supports templating. | none |
HTTP Method | method | Request method. Defaults to GET , or POST if body is set. Supports templating. | GET |
Headers | header | Request headers, string separated by new line character \n . Supports templating. | User-Agent: Fluent-Bit HTTP Loader Plugin |
Optional Request body | body | Request body. Supports templating. | none |
Optional Request body | store_response_body | Supports go-templating. Allows definition of what to store as response body in storage. This is meant for large response payloads so the plugin doesn't exceed the 5 MiB limit available for data storage in the Cloud at the pipeline metadata API. | toJson .Response.Body |
Request Timeout | timeout | Controls the request timeout, string duration. If set, must be greater than 0s . | 0s |
Pull Interval | pull_interval | Controls the time between requests, string duration. If set, must be greater than 0s . Supports templating. | 1s |
Wait | wait | How much time to wait before starting collection. Useful to sync with pull_interval . Supports templating. Must evaluate to a string duration. | 0s |
Retry | retry | Controls whether to retry the current request. Supports templating. Must evaluate to a Boolean. | false |
Max Retries | max_retries | Maximum number of retries. | 1 |
Stop | stop | Controls when to stop collecting. Supports templating. | false |
Cookie-Based Authentication
Name | Key | Description | Default |
---|---|---|---|
Auth Cookie URL | auth_cookie_url | Cookie-based authentication URL. | none |
Auth Cookie Method | auth_cookie_method | Cookie-based authentication request method. | Defaults to GET , or POST if auth_cookie_body is set. |
Auth Cookie Header | auth_cookie_header | Cookie-based authentication request headers. String separated by new line character \n . | none |
Auth Cookie Body | auth_cookie_body | Cookie-based authentication request body. | none |
OAuth2
Name | Key | Description | Default |
---|---|---|---|
OAuth2 Client ID | oauth2_client_id | OAuth2 client ID. | none |
OAuth2 Client Secret | oauth2_client_secret | OAuth2 client secret. Sensitive field; prefer using pipeline secrets. | none |
OAuth2 Scopes | oauth2_scopes | OAuth2 scopes. String, each scope separated by space. | none |
OAuth2 Additional Params | oauth2_endpoint_params | OAuth2 endpoint parameters. String in URL query string format. | none |
Storage
Name | Key | Description | Default |
---|---|---|---|
Data Directory | data_dir | Controls where to store data used to resume collecting. Defaults to /data/storage if exists, or a temporary directory if available, otherwise storage is disabled. | /data/storage |
Data Expiration Time | data_exp | Controls how much time data can be used after resume. | 0s |
Output
Name | Key | Description | Default |
---|---|---|---|
Skip | skip | Controls when to skip sending records to Fluent Bit. Supports templating. Defaults to ignore error status codes, and empty response body. | {{or (ge .Response.StatusCode 400) (empty .Response.Body)}} |
Out | out | Controls what to send to Fluent Bit. Supports templating. Defaults to send the response body. | {{toJson .Response.Body}} |
Templating
The available go-templates include the following data:
- Index (int): always available. The current fetch index.
- Request (Request struct): available after a successful fetch.
- Response (Response struct): available after a successful fetch.
- LastRequestTime (*time.Time): stores the time when the last request was made. Available after a successful request.
- LastResponseTime (*time.Time): stores the time when the last response was received. Available after a successful request and response roundtrip.
The Request struct includes these fields:
- Method: string.
- URL: *url.URL.
- Header: http.Header.
- Body: any.
The Response struct includes these fields:
- StatusCode: int.
- Header: http.Header.
- Body: any.
You can also use the functions provided by sprig, plus the following helper functions:
- timeRFC3339: func() string. Returns a constant RFC3339 (2006-01-02T15:04:05Z07:00) time format. Can be useful to format time.Time as a JSON timestamp.
- nextLink: func(http.Header) string. Returns the URL from the Link header with rel=next. Useful for pagination.
- parseDuration: func(string) (Duration, error). Parses a string as time.Duration.
- has: func(obj any, key string). Checks whether the given key exists inside an object.
- jq: func(query string, data any) (any, error). Transforms data using jq.
- log: func(any...). Prints a message to stdout.
- logf: func(format string, args any...). Prints a formatted message to stdout.
- set_variable: func(key string, value any). Stores a persistent variable you can reference later inside other go-template executions.
- has_variable: func(key string). Checks whether the given variable is set.
- get_variable: func(key string). Retrieves a previously stored persistent variable. Returns the value or an empty string.
- unset_variable: func(key string). Deletes a previously stored persistent variable.
Time
A common action is to manipulate time parameters. Here are some common operations:
- now: returns the current time as a time.Time type.
- now.Format "2006-01-02T15:04:05Z07:00": returns the current time as a string in RFC3339 format.
- now.Format timeRFC3339: same as the previous item, but using a utility constant.
- mustToDate timeRFC3339 .Response.Body.time: converts the time field inside the response body to time.Time using the RFC3339 time format.
- .Response.Body.time | mustToDate timeRFC3339: same as the previous item, but using the pipe operator.
- mustDateModify "+1h" now: returns the current time plus 1 hour as time.Time.
- now | mustDateModify "-1h": similar to the previous item, but uses the pipe operator and subtracts 1 hour.
- (now | mustDateModify "+1h").Format timeRFC3339: returns the current time plus 1 hour as a string in RFC3339 time format. Notice the use of parentheses.
- ((.Response.Body.time | mustToDate timeRFC3339) | mustDateModify "+1h").Format timeRFC3339: parses the response body time field to time.Time, then adds 1 hour, then formats it as a string in RFC3339 time format.
Additional Go template resources
- Go package template documentation
- Gopher Academy Blog Using Go Templates post
- Mastering HTML templates in Go - The fundamentals, by Philipp Tanlak
- HashiCorp Learn Go template syntax page
- Sprig Function Documentation
Examples
Use the following examples to help you retrieve data from your HTTP endpoints and ingest it into Chronosphere Telemetry Pipeline.
SWAPI example
Here's an example using the SWAPI API:
pipeline:
  inputs:
    - name: http_loader
      url: |-
        {{with .Response.Body.next}}{{.}}{{else}}https://swapi.dev/api/people{{end}}
      out: |-
        {{toJson .Response.Body.results}}
      skip: |-
        {{or (ge .Response.StatusCode 400) (empty .Response.Body.results)}}
By fetching https://swapi.dev/api/people, you'll receive a response body similar to the following:
{
  "next": "https://swapi.dev/api/people/?page=2",
  "results": [
    {...},
    {...}
  ]
}
URL
If a value is returned for next in the response body, use it as the URL. Otherwise, fall back to the default, https://swapi.dev/api/people.
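To see how this url template behaves on the first request and on later ones, you can render it with Go's standard text/template package. This is a sketch under assumptions: the template context is modeled as nested maps, which may not match the plugin's internal types, and renderURL is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// urlTemplate is the url template from the SWAPI example.
const urlTemplate = `{{with .Response.Body.next}}{{.}}{{else}}https://swapi.dev/api/people{{end}}`

// renderURL executes the template against a context. Modeling the context as
// nested maps is an assumption made for this sketch.
func renderURL(ctx map[string]any) (string, error) {
	tmpl, err := template.New("url").Parse(urlTemplate)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, ctx); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	// First request: there is no response yet, so the default URL is used.
	first, err := renderURL(map[string]any{"Index": 0})
	if err != nil {
		panic(err)
	}
	fmt.Println(first)

	// Later request: the previous response body carried a next URL.
	later, err := renderURL(map[string]any{
		"Index": 1,
		"Response": map[string]any{
			"Body": map[string]any{"next": "https://swapi.dev/api/people/?page=2"},
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(later)
}
```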
Out
Select the results field from the response body and transform it to JSON. This attempts to send an array to Fluent Bit. However, arrays aren't supported, so the plugin splits the array and sends each item in the following manner:
{ "index": 1, "value": {} }
{ "index": 2, "value": {} }
{ "index": 3, "value": {} }
Another way to visualize this is:
arr.map((value, index) => ({ index, value })).forEach(send)
Skip
With skip, you can control when to skip sending data to Fluent Bit. As an example, you can skip if the response status code is greater than or equal to 400, or if the results field in the response body is empty.
Transform
Say you are only interested in the name property on each result.
{
  "next": "https://swapi.dev/api/people/?page=2",
  "results": [
    {
      "name": "Luke Skywalker",
      ...
    },
    {
      "name": "C-3PO",
      ...
    }
  ]
}
Inside the go-templating, you can use jq to transform the data. Change out to the following to map over each result and extract only the name:
{{.Response.Body | jq ".results[] | {name}" | toJson}}
JSON placeholder example
The JSON placeholder API supports pagination, but doesn't provide a next URL to look up. Instead, the client should pass a _page query string parameter, starting at 1:
pipeline:
  inputs:
    - name: http_loader
      url: |-
        https://jsonplaceholder.typicode.com/posts?_limit=10&_page={{add .Index 1}}
      retry: |-
        {{ge .Response.StatusCode 500}}
      max_retries: 3
Index
Inside the template context you will always find the variable .Index, which is a number that increases automatically each time the plugin makes a request. It starts at 0.
Because pagination in this case starts at 1, add 1 to the index to get the page number.
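The mapping from index to page number can be checked by rendering the url template for a few index values. The sketch below uses Go's text/template with a small stand-in for sprig's add that only handles two integers; renderPageURL is a hypothetical helper, and the map-based context is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// funcs holds a minimal stand-in for sprig's add, restricted to two ints.
var funcs = template.FuncMap{
	"add": func(a, b int) int { return a + b },
}

// pageURL is the url template from the JSON placeholder example.
const pageURL = `https://jsonplaceholder.typicode.com/posts?_limit=10&_page={{add .Index 1}}`

// renderPageURL renders the URL for a given fetch index.
func renderPageURL(index int) (string, error) {
	tmpl, err := template.New("url").Funcs(funcs).Parse(pageURL)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, map[string]any{"Index": index}); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	// Index 0, 1, 2 should request pages 1, 2, 3.
	for index := 0; index < 3; index++ {
		u, err := renderPageURL(index)
		if err != nil {
			panic(err)
		}
		fmt.Println(u)
	}
}
```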
Retry
You can enable retrying of requests. In this case, you want to retry if you get a response with a status code greater than or equal to 500. The request is also retried if the HTTP client fails for some other reason (perhaps a networking issue) and you don't get a response back.
You can control how many tries are performed using max_retries. In this case, the plugin will try at most three (3) times.
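The retry behavior described here can be pictured as a bounded loop. The following Go sketch is an illustration only, not the plugin's code: fetchFunc and fetchWithRetry are hypothetical names, and treating the limit as the total number of attempts is an assumption based on the "at most three times" wording above.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchFunc performs one request and returns the HTTP status code.
type fetchFunc func() (status int, err error)

// fetchWithRetry calls fetch until it returns a status below 500 with no
// transport error, or until maxTries attempts have been made. It returns the
// last status, the number of attempts, and the last error.
func fetchWithRetry(fetch fetchFunc, maxTries int) (status, tries int, err error) {
	for tries = 1; tries <= maxTries; tries++ {
		status, err = fetch()
		if err == nil && status < 500 {
			return status, tries, nil
		}
	}
	return status, maxTries, err
}

func main() {
	// Simulated endpoint: a network error, then a 503, then a 200.
	calls := 0
	fetch := func() (int, error) {
		calls++
		switch calls {
		case 1:
			return 0, errors.New("connection reset")
		case 2:
			return 503, nil
		default:
			return 200, nil
		}
	}

	status, tries, err := fetchWithRetry(fetch, 3)
	fmt.Println(status, tries, err) // 200 3 <nil>
}
```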
Okta example
The Okta API returns a Link: <url>; rel="next" header, which you can use for pagination:
pipeline:
  inputs:
    - name: http_loader
      url: |-
        {{- with nextLink .Response.Header -}}
        {{.}}
        {{- else -}}
        https://{replaceWithYourDomain}/api/v1/logs
        {{- end -}}
      header: "Authorization: SSWS {{secrets.oktaAPIToken}}"
Next link
There's a helper function called nextLink that takes the response headers and finds the Link header with rel=next.
If you get the header, use that as the URL. Otherwise, fall back to the default Okta URL for your domain.
Notice that with works like an if, but also replaces the current template data (the dot) with the result. In this case, it's equivalent to:
{{- if nextLink .Response.Header -}}
{{nextLink .Response.Header}}
{{- end -}}
Carbon Black example
The Carbon Black API uses offset-style pagination controlled by start and rows. The more interesting part is the time-range filtering, and how to combine it with pagination.
You can control the time range with a combination of time_range.start and time_range.end, which are both fixed timestamps.
There's also the range option, which takes a relative value (for example, -2h). However, to have complete control, use fixed timestamps.
Issue a POST request with a body like the following:
{
  "time_range": {
    "start": "2023-08-14T01:00:00.000Z",
    "end": "2023-08-14T02:00:00.000Z"
  },
  "start": 1,
  "rows": 5
}
Advance the page by changing start to the previous start + rows. For example, 1, 6, 11, 16, and so on.
Change time_range only after you have finished paging through the current time range.
pipeline:
  inputs:
    - name: http_loader
      url: https://defense.conferdeploy.net/api/alerts/v7/orgs/ABCD1234/alerts/_search
      header: |-
        Content-Type: application/json
        X-Auth-Token: {{secrets.blackCarbonAuthToken}}
      body: |-
        {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
        {{- $timeStart := $now | mustDateModify "-1h" -}}
        {{- $timeEnd := $now -}}
        {{- $start := 1 -}}
        {{- if and .Request .Response -}}
        {{- $prevTimeEnd := .Request.Body.time_range.end | mustToDate timeRFC3339 -}}
        {{- $start = add .Request.Body.start 5 -}}
        {{- if or (empty .Response.Body.results) (lt (len .Response.Body.results) 5) -}}
        {{- $timeStart = $prevTimeEnd -}}
        {{- $timeEnd = ($timeStart | mustDateModify "+1h") -}}
        {{- $start = 1 -}}
        {{- end -}}
        {{- if $timeEnd.Before $prevTimeEnd -}}
        {{- $timeEnd = $prevTimeEnd -}}
        {{- $timeStart = ($timeEnd | mustDateModify "-1h") -}}
        {{- end -}}
        {{- end -}}
        {"time_range":{"start":"{{$timeStart.Format timeRFC3339}}","end":"{{$timeEnd.Format timeRFC3339}}"},"start":{{$start}},"rows":5}
      out: |-
        {{toJson .Response.Body.results}}
      skip: |-
        {{or (ge .Response.StatusCode 400) (empty .Response.Body.results)}}
      pull_interval: |-
        {{- if ge .Response.StatusCode 400 -}}
        10s
        {{- else -}}
        {{- $interval := parseDuration "100ms" -}}
        {{- if or (empty .Response.Body.results) (lt (len .Response.Body.results) 5) -}}
        {{- $interval = parseDuration "1h" -}}
        {{- $currentTimeEnd := .Request.Body.time_range.end | mustToDate timeRFC3339 -}}
        {{- $nextTimeEnd := $currentTimeEnd.Add $interval -}}
        {{- if $nextTimeEnd.After now -}}
        {{- $interval = $nextTimeEnd.Sub now -}}
        {{- if le $interval.Nanoseconds 0 -}}
        {{- $interval = parseDuration "1ns" -}}
        {{- end -}}
        {{- end -}}
        {{- end -}}
        {{$interval}}
        {{- end -}}
This example uses go-template variables to keep the logic organized.
Body
In the body, first define a variable that holds the current timestamp truncated to the hour. Use it to define both the start and the end of the time range. Also, set the pagination start to 1.
The variables .Request and .Response aren't available initially. They become available only after a successful fetch. The code that follows sits inside an if that checks whether both of them are available.
Inside that if, retrieve the previous time range end, and advance the page by increasing the pagination start by 5 (rows).
Then, check whether fewer than 5 results (rows) were returned, which means you have reached the last page and pagination is finished. In this case, move the time range window forward, starting from the end of the previous request's window, and reset pagination by setting start to 1.
An additional if checks whether the new time range window ends before the previous one, in which case it falls back to the previous window's end.
Finally, use these variables to construct the JSON request body.
Pull interval
Next, it's important to set pull_interval. If you're not careful, you could start requesting time windows in the future, which wouldn't return any data.
First, do a validation check. If you receive a status code greater than or equal to 400, return 10s as pull_interval (you can use a higher value).
Otherwise, set an initial pull interval of 100ms. If results inside the response body is empty or has fewer than 5 items (rows), you have finished paginating. In that case, move the time window. To do so, increase the pull_interval to 1h.
Be sure to add an if to avoid moving the time window into the future, in which case you can limit the pull interval to next - now.
Check whether the resulting pull_interval is zero or negative, and if so, set it to a minimum value of 1ns.
In summary: on error, use a pull interval of 10s. When pagination has ended, use 1h, shortened if needed so that the next window doesn't end in the future. Otherwise, while you're still paginating, use 100ms.
Sentinel One example
The Sentinel One API uses cursor-based pagination. You can also control a time window, similar to the Carbon Black example.
The Carbon Black example passes a token in the request headers. Because Sentinel One uses cookies, there's a separate login endpoint that you can specify in the plugin to fetch the auth cookie, which is then stored in the cookie jar.
pipeline:
  inputs:
    - name: http_loader
      url: |-
        {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
        {{- $start := $now | mustDateModify "-1h" -}}
        {{- $end := $now -}}
        {{- if and .Request .Response -}}
        {{- $prevEnd := .Request.URL.Query.Get "updatedAt__lte" | mustToDate timeRFC3339 -}}
        {{- if (empty .Response.Body.pagination.nextCursor) -}}
        {{- $start = $prevEnd -}}
        {{- $end = ($start | mustDateModify "+1h") -}}
        {{- end -}}
        {{- if $end.Before $prevEnd -}}
        {{- $end = $prevEnd -}}
        {{- $start = ($end | mustDateModify "-1h") -}}
        {{- end -}}
        {{- end -}}
        https://usea1-partners.sentinelone.net/web/api/v2.1/threats?updatedAt__gte={{$start.Format timeRFC3339}}&updatedAt__lte={{$end.Format timeRFC3339}}&limit=5{{with .Response.Body.pagination.nextCursor}}&cursor={{.}}{{end}}
      out: |-
        {{toJson .Response.Body.data}}
      skip: |-
        {{or (ge .Response.StatusCode 400) (not (empty .Response.Body.errors)) (empty .Response.Body.data)}}
      pull_interval: |-
        {{- if or (ge .Response.StatusCode 400) (not (empty .Response.Body.errors)) -}}
        10s
        {{- else -}}
        {{- $interval := parseDuration "100ms" -}}
        {{- if empty .Response.Body.pagination.nextCursor -}}
        {{- $interval = parseDuration "1h" -}}
        {{- $currentEnd := .Request.URL.Query.Get "updatedAt__lte" | mustToDate timeRFC3339 -}}
        {{- $nextEnd := $currentEnd.Add $interval -}}
        {{- if $nextEnd.After now -}}
        {{- $interval = $nextEnd.Sub now -}}
        {{- if le $interval.Nanoseconds 0 -}}
        {{- $interval = parseDuration "1ns" -}}
        {{- end -}}
        {{- end -}}
        {{- end -}}
        {{$interval}}
        {{- end -}}
      auth_cookie_url: |-
        https://usea1-partners.sentinelone.net/web/api/v2.1/users/login/by-api-token
      auth_cookie_body: |-
        {"data": {"apiToken": "{{secrets.sentinelOneAPIToken}}"}}
      auth_cookie_header: |-
        Content-Type: application/json
Auth Cookie URL
If you set auth_cookie_url, the plugin issues a request to this URL before it starts collecting.
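The general pattern of logging in first and reusing the resulting cookie can be sketched with Go's standard HTTP client and a cookie jar. This illustrates the pattern only and is not the plugin's implementation. The local test server, the /login and /threats paths, the session cookie name, and the token value are all hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/cookiejar"
	"net/http/httptest"
	"strings"
)

// newTestServer returns a stand-in API: a login endpoint that sets a session
// cookie and a data endpoint that requires it. All names are hypothetical.
func newTestServer() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/login", func(w http.ResponseWriter, r *http.Request) {
		http.SetCookie(w, &http.Cookie{Name: "session", Value: "abc123", Path: "/"})
		w.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/threats", func(w http.ResponseWriter, r *http.Request) {
		if c, err := r.Cookie("session"); err != nil || c.Value != "abc123" {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		fmt.Fprint(w, `{"data":[]}`)
	})
	return httptest.NewServer(mux)
}

// collect logs in once so the cookie jar stores the auth cookie, then fetches
// the data endpoint with the same client, which sends the cookie back.
func collect(baseURL string) (int, string, error) {
	jar, err := cookiejar.New(nil)
	if err != nil {
		return 0, "", err
	}
	client := &http.Client{Jar: jar}

	loginBody := strings.NewReader(`{"data":{"apiToken":"example-token"}}`)
	resp, err := client.Post(baseURL+"/login", "application/json", loginBody)
	if err != nil {
		return 0, "", err
	}
	resp.Body.Close()

	resp, err = client.Get(baseURL + "/threats")
	if err != nil {
		return 0, "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return resp.StatusCode, string(body), err
}

func main() {
	srv := newTestServer()
	defer srv.Close()
	fmt.Println(collect(srv.URL)) // 200 {"data":[]} <nil>
}
```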
Auth Cookie Exp
You can control how long the cookie is valid by using auth_cookie_exp, which accepts a duration string. After the cookie expires, the plugin attempts to renew it.
The auth_cookie_body parameter doesn't support go-templates. The {{secrets.*}} notation is a feature available in all Calyptia Core pipelines. Its syntax might resemble go-templates, but it isn't a go-template.
Dynatrace example
The following example uses the Dynatrace API.
One important aspect of this API is that it uses a nextPageKey query string parameter for pagination, but this parameter is exclusive. When you set it, you can't set any of the other filters.
This prevents you from referencing the previous request parameters. Instead, use the has_variable, get_variable, set_variable, and unset_variable helpers to persist data across plugin executions. These variables persist even after the Fluent Bit process is restarted.
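To show how such helpers let one template execution hand a value to the next, the sketch below wires four stand-in functions into Go's text/template. It's an illustration under assumptions: the store type is a hypothetical in-memory map, so it doesn't survive restarts the way the plugin's variables do, and the template text is invented for the demonstration.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// store is an in-memory stand-in for the plugin's persistent variables.
type store map[string]any

// funcs exposes the store to templates under the helper names used above.
func (s store) funcs() template.FuncMap {
	return template.FuncMap{
		"set_variable": func(key string, value any) string {
			s[key] = value
			return ""
		},
		"has_variable": func(key string) bool {
			_, ok := s[key]
			return ok
		},
		"get_variable": func(key string) any {
			if v, ok := s[key]; ok {
				return v
			}
			return ""
		},
		"unset_variable": func(key string) string {
			delete(s, key)
			return ""
		},
	}
}

// windowTemplate reports the previously stored value, then stores a new one.
const windowTemplate = `{{if has_variable "to"}}previous to: {{get_variable "to"}}{{else}}no previous window{{end}}{{set_variable "to" .To}}`

// render executes the template once, sharing state through the store.
func render(s store, to string) (string, error) {
	tmpl, err := template.New("window").Funcs(s.funcs()).Parse(windowTemplate)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, map[string]any{"To": to}); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	s := store{}
	first, _ := render(s, "2023-08-14T02:00:00Z")
	fmt.Println(first) // no previous window
	second, _ := render(s, "2023-08-14T03:00:00Z")
	fmt.Println(second) // previous to: 2023-08-14T02:00:00Z
}
```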
pipeline:
  inputs:
    - name: http_loader
      url: |-
        {{- $environmentID := "ggy66547" -}}
        {{- with .Response.Body.nextPageKey -}}
        https://{{$environmentID}}.live.dynatrace.com/api/v2/auditlogs?nextPageKey={{.}}
        {{- else -}}
        {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
        {{- $from := $now | mustDateModify "-1h" -}}
        {{- $to := $now -}}
        {{- if has_variable "to" -}}
        {{- $prevTo := get_variable "to" | mustToDate timeRFC3339 -}}
        {{- $from = $prevTo -}}
        {{- $to = ($from | mustDateModify "+1h") -}}
        {{- if $to.Before $prevTo -}}
        {{- $to = $prevTo -}}
        {{- $from = ($to | mustDateModify "-1h") -}}
        {{- end -}}
        {{- end -}}
        {{- set_variable "to" ($to.Format timeRFC3339) -}}
        https://{{$environmentID}}.live.dynatrace.com/api/v2/auditlogs?from={{$from.Format timeRFC3339}}&to={{$to.Format timeRFC3339}}&sort=timestamp&pageSize=10
        {{- end -}}
      header: "Authorization: Api-Token {{secrets.dynatraceAPIToken}}"
      out: |-
        {{toJson .Response.Body.auditLogs}}
      skip: |-
        {{or (ge .Response.StatusCode 400) (empty .Response.Body.auditLogs)}}
      pull_interval: |-
        {{- if ge .Response.StatusCode 400 -}}
        {{ log "error:" .Response.Body }}
        10s
        {{- else -}}
        {{- $interval := parseDuration "100ms" -}}
        {{- if empty .Response.Body.nextPageKey -}}
        {{- $interval = parseDuration "1h" -}}
        {{- $currentTo := get_variable "to" | mustToDate timeRFC3339 -}}
        {{- $nextTo := $currentTo.Add $interval -}}
        {{- if $nextTo.After now -}}
        {{- $interval = $nextTo.Sub now -}}
        {{- if le $interval.Nanoseconds 0 -}}
        {{- $interval = parseDuration "1ns" -}}
        {{- end -}}
        {{- end -}}
        {{- end -}}
        {{$interval}}
        {{- end -}}
The url go-template checks whether you received a nextPageKey in the response body. If so, use it to paginate. Otherwise, set the time window parameters, similar to the previous examples, but also use set_variable to persist the to parameter. Doing that lets you reference it later without depending on the previous request.