HTTP API collector source plugin

Plugin name: http_loader.

The HTTP API Collector lets developers gather data from external sources through HTTP requests, so they can collect data from various HTTP endpoints for processing and storage.

You can use the HTTP API Collector source plugin to configure your Calyptia Core pipeline to collect data from your HTTP endpoints.

Configuration parameters

The HTTP API Collector source plugin provides these configuration parameters.

Request

Key | Description | Default
method | Request method. Defaults to GET, or POST if body is set. Supports templating. | GET
url | Required. Request URL. Supports templating. | no value
header | Request headers, string separated by new line character \n. Supports templating. | User-Agent: Fluent-Bit HTTP Loader Plugin
body | Request body. Supports templating. | no value

TLS

Key | Description | Default
tls_cert | PEM-encoded TLS certificate. Must be set in combination with tls_key. | no value
tls_key | PEM-encoded TLS key. Must be set in combination with tls_cert. | no value
ca_cert | PEM-encoded CA certificate. | no value

TLS (as file paths)

Key | Description | Default
tls_cert_file | PEM-encoded TLS certificate file path. Must be set in combination with tls_key_file. | no value
tls_key_file | PEM-encoded TLS key file path. Must be set in combination with tls_cert_file. | no value
ca_cert_file | PEM-encoded CA certificate file path. | no value

Proxy

Key | Description | Default
proxy | Proxy URL to make requests through. | no value
no_proxy | Comma-separated list of URLs to exclude from proxying. | no value

Time control

Key | Description | Default
timeout | Controls the request timeout, string duration. If set, must be greater than 0s. | 0s
pull_interval | Controls the time between requests, string duration. If set, must be greater than 0s. Supports templating. | 1s
wait | How much time to wait before starting collection. Useful to sync with pull_interval. Supports templating. Must evaluate to a string duration. | 0s

Output control

Key | Description | Default
skip | Controls when to skip sending records to Fluent Bit. Supports templating. Defaults to skipping error status codes and empty response bodies. | {{or (ge .Response.StatusCode 400) (empty .Response.Body)}}
stop | Controls when the plugin stops working. Use it to dump data into Fluent Bit, and then finish. | no value
out | Controls what to send to Fluent Bit. Supports templating. Defaults to sending the response body. | {{toJson .Response.Body}}

Retry

Key | Description | Default
retry | Controls whether to retry the current request. Supports templating. Must evaluate to a Boolean. | false
max_retries | Maximum number of retries. | 1

OAuth2

Key | Description | Default
oauth2_token_url | OAuth2 token endpoint where the token is exchanged. Enables OAuth2 using the client-credentials flow. | no value
oauth2_client_id | OAuth2 client ID. | no value
oauth2_client_secret | OAuth2 client secret. Sensitive field; prefer using pipeline secrets. | no value
oauth2_scopes | OAuth2 scopes. String, with each scope separated by a space. | no value
oauth2_endpoint_params | OAuth2 endpoint parameters. String in URL query string format. | no value

Cookie-based authentication

Key | Description | Default
auth_cookie_url | Cookie-based authentication URL. | no value
auth_cookie_method | Cookie-based authentication request method. Defaults to GET, or POST if auth_cookie_body is set. | no value
auth_cookie_header | Cookie-based authentication request headers. String separated by new line character \n. | no value
auth_cookie_body | Cookie-based authentication request body. | no value

Storage

Key | Description | Default
data_dir | Controls where to store the data used to resume collecting. Defaults to /data/storage if it exists, or a temporary directory if available. Otherwise, storage is disabled. | /data/storage
data_exp | Controls how long stored data can be used after resuming. | 0s
store_response_body | Supports go-templating. Defines what to store as the response body in storage. This is meant for large response payloads, so the plugin doesn't exceed the 5 MiB limit available for data storage in the Cloud at the pipeline metadata API. | toJson .Response.Body

Templating

The go-templates have access to the following data:

  • Index (int): the current fetch index. Always available.
  • Request (Request struct): available after a successful fetch.
  • Response (Response struct): available after a successful fetch.
  • LastRequestTime (*time.Time): the time when the last request was made. Available after a successful request.
  • LastResponseTime (*time.Time): the time when the last response was received. Available after a successful request and response round trip.

The Request struct includes these fields:

  • Method: string.
  • URL: *url.URL.
  • Header: http.Header.
  • Body: any.

The Response struct includes these fields:

  • StatusCode: int.
  • Header: http.Header.
  • Body: any.

You can also use the functions provided by sprig, along with the following helper functions:

  • timeRFC3339: func() string returns a constant RFC3339 (2006-01-02T15:04:05Z07:00) time format. Can be useful to format time.Time as JSON timestamp.
  • nextLink: func (http.Header) string returns the URL from the header Link: rel=next. Useful for pagination.
  • parseDuration: func (string) (Duration, error) parses a string as time.Duration.
  • has: func(obj any, key string) checks whether the given key exists inside an object.
  • jq: func(query string, data any) (any, error) transforms data using jq.
  • log: func(any...) prints a message to stdout.
  • logf: func(format string, args any...) prints a formatted message to stdout.
  • set_variable: func(key string, value any) stores a persistent variable that you can reference later in other go-template executions.
  • has_variable: func(key string) checks whether the given variable is set.
  • get_variable: func(key string) retrieves a previously stored persistent variable. Returns the value or an empty string.
  • unset_variable: func(key string) deletes a previously stored persistent variable.

Time

A common action is to manipulate time parameters. Here are some common operations:

  • now returns the current time as time.Time type.
  • now.Format "2006-01-02T15:04:05Z07:00" returns the current time as a string in RFC3339 format.
  • now.Format timeRFC3339 same as before but using a utility constant.
  • mustToDate timeRFC3339 .Response.Body.time converts the time field inside the response body to time.Time using the RFC3339 time format.
  • .Response.Body.time | mustToDate timeRFC3339 same as before but using the pipe operator.
  • mustDateModify "+1h" now returns the current time plus 1 hour as time.Time.
  • now | mustDateModify "-1h" similar to the previous one, but uses the pipe operator and subtracts 1 hour.
  • (now | mustDateModify "+1h").Format timeRFC3339 returns the current time plus 1 hour as a string in RFC3339 time format. Notice the use of parentheses.
  • ((.Response.Body.time | mustToDate timeRFC3339) | mustDateModify "+1h").Format timeRFC3339 parses the response body time field to time.Time, adds 1 hour, then formats it as a string in RFC3339 time format.
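
For readers more familiar with Go than with template syntax, the parse, add, and format chain above can be sketched in plain Go. This is only an illustration using the standard time package, not plugin code. The function name plusOneHourRFC3339 is made up for this sketch, and it assumes that mustDateModify "+1h" behaves like adding a one-hour duration.

```go
package main

import (
	"fmt"
	"time"
)

// plusOneHourRFC3339 parses an RFC3339 timestamp, adds one hour, and formats
// the result as RFC3339 again. It mirrors the parse, modify, format chain
// from the list above. The name is hypothetical.
func plusOneHourRFC3339(ts string) (string, error) {
	t, err := time.Parse(time.RFC3339, ts)
	if err != nil {
		return "", err
	}
	return t.Add(time.Hour).Format(time.RFC3339), nil
}

func main() {
	out, err := plusOneHourRFC3339("2023-08-14T01:00:00Z")
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // 2023-08-14T02:00:00Z
}
```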


Example: SWAPI

Here's an example using the SWAPI API:

pipeline:
    inputs:
        - name: http_loader
          url: |-
              {{with .Response.Body.next}}{{.}}{{else}}https://swapi.dev/api/people{{end}}
          out: |-
              {{toJson .Response.Body.results}}
          skip: |-
              {{or (ge .Response.StatusCode 400) (empty .Response.Body.results)}}

By fetching https://swapi.dev/api/people, you'll receive a response body similar to the following:

{
    "next": "https://swapi.dev/api/people/?page=2",
    "results": [
        {...},
        {...},
    ]
}

URL

If the response body includes a value for next, the template uses it as the URL. Otherwise, it falls back to the default, https://swapi.dev/api/people.
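
To see the with/else fallback in isolation, the following sketch runs the same url template through Go's standard text/template package. The nested-map context and the renderURL helper are assumptions made for this illustration; the plugin's real context types may differ. An empty body stands in for a response without a next value.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// urlTemplate is the url template from the SWAPI example.
const urlTemplate = `{{with .Response.Body.next}}{{.}}{{else}}https://swapi.dev/api/people{{end}}`

// renderURL executes the template against a response body. The map-based
// context is an assumption for this sketch, not the plugin's real types.
func renderURL(body map[string]any) (string, error) {
	tmpl, err := template.New("url").Parse(urlTemplate)
	if err != nil {
		return "", err
	}
	ctx := map[string]any{
		"Response": map[string]any{"Body": body},
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, ctx); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	// No next value: the else branch supplies the default URL.
	first, _ := renderURL(map[string]any{})
	// A next value is present: with rebinds the dot to it and prints it.
	second, _ := renderURL(map[string]any{"next": "https://swapi.dev/api/people/?page=2"})
	fmt.Println(first)
	fmt.Println(second)
}
```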

Out

Select the results field from the response body and transform it to JSON. This will attempt to send an array to Fluent Bit. However, arrays aren't supported, which causes the plugin to split the array and send each item in the following manner:

{ "index": 1, "value": {} }
{ "index": 2, "value": {} }
{ "index": 3, "value": {} }

Another way to visualize this is:

arr.map((value, index) => ({ index, value })).forEach(send)

Skip

With skip, you can control when to skip sending data to Fluent Bit. In this example, sending is skipped if the response status code is greater than or equal to 400, or if the results field in the response body is empty.

Transform

Say you are only interested in the name property on each result.

{
    "next": "https://swapi.dev/api/people/?page=2",
    "results": [
        {
            "name": "Luke Skywalker",
            ...
        },
        {
            "name": "C-3PO",
            ...
        }
    ]
}

Inside a go-template, you can use jq to transform the data. Change out to the following to map over each result and extract only the name:

{{.Response.Body | jq ".results.[] | {name}" | toJson}}

Example: JSON placeholder

The JSON placeholder API supports pagination, but doesn't provide a next URL to look up. Instead, the client passes a _page query string parameter, starting at 1:

pipeline:
    inputs:
        - name: http_loader
          url: |-
            https://jsonplaceholder.typicode.com/posts?_limit=10&_page={{add .Index 1}}
          retry: |-
            {{ge .Response.StatusCode 500}}
          max_retries: 3

Index

Inside the template context you will always find the variable .Index, which is an auto-increasing number each time the plugin makes a request. It starts at 0.

Because this API's pagination starts at 1, the template adds 1 to the index to get the page number.
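
As a rough illustration of how the index maps to page numbers, the sketch below renders the same url template with Go's standard text/template package. The add function registered here is a local stand-in for the sprig helper of the same name, and pageURL is a hypothetical helper for this example only.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// pageURL renders the JSON placeholder url template for a given fetch index.
// The add function is a local stand-in for the sprig helper of the same name.
func pageURL(index int) (string, error) {
	funcs := template.FuncMap{
		"add": func(a, b int) int { return a + b },
	}
	const src = `https://jsonplaceholder.typicode.com/posts?_limit=10&_page={{add .Index 1}}`
	tmpl, err := template.New("url").Funcs(funcs).Parse(src)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, map[string]any{"Index": index}); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	// Index 0, 1, 2 become pages 1, 2, 3.
	for i := 0; i < 3; i++ {
		u, err := pageURL(i)
		if err != nil {
			panic(err)
		}
		fmt.Println(u)
	}
}
```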

Retry

You can enable retrying of requests. In this case, you want to retry if you get a response with a status code greater than or equal to 500. Also, if the HTTP client fails for some other reason (perhaps a networking issue) and you don't get a response back, the request is retried.

You can control how many tries will be performed using max_retries. In this case, the plugin will try at most three (3) times.
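
The retry behavior described above can be pictured as a bounded loop. The following is a minimal sketch, not the plugin's implementation. It assumes that max_retries caps the total number of attempts, as the paragraph above describes, and fetchFunc and fetchWithRetry are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchFunc is a hypothetical stand-in for one HTTP request. It returns the
// response status code, or an error if no response was received.
type fetchFunc func() (int, error)

// fetchWithRetry calls fetch until it gets a status below 500 or the
// attempts run out. It returns the last status, the number of attempts
// made, and the last error.
func fetchWithRetry(fetch fetchFunc, maxTries int) (status, tries int, err error) {
	for tries < maxTries {
		tries++
		status, err = fetch()
		if err == nil && status < 500 {
			break // success or a non-retryable status: stop retrying
		}
	}
	return status, tries, err
}

func main() {
	calls := 0
	flaky := func() (int, error) {
		calls++
		switch calls {
		case 1:
			return 0, errors.New("connection reset") // no response at all
		case 2:
			return 503, nil // server error, retried
		default:
			return 200, nil
		}
	}
	status, tries, err := fetchWithRetry(flaky, 3)
	fmt.Println(status, tries, err) // 200 3 <nil>
}
```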

Example: Okta

The Okta API returns a Link: <url>; rel="next" header, which you can use for pagination:

pipeline:
    inputs:
        - name: http_loader
          url: |-
              {{- with nextLink .Response.Header -}}
                  {{.}}
              {{- else -}}
                  https://{replaceWithYourDomain}/api/v1/logs
              {{- end -}}
          header: "Authorization: SSWS {{secrets.oktaAPIToken}}"

Next link

There's a helper function called nextLink that takes some headers, and then finds the Link header with rel=next.

If the header is present, the template uses it as the URL. Otherwise, it falls back to the default Okta URL for your domain.

Notice that with works like an if, but it also overrides the current template data (.) with the result. In this case, it's equivalent to:

{{- if nextLink .Response.Header -}}
    {{nextLink .Response.Header}}
{{- end -}}
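
To make the idea behind nextLink more concrete, here is a simplified sketch of how a rel=next entry could be extracted from Link headers using only the Go standard library. It is an approximation for illustration, not the plugin's implementation: the names nextFromLinkValues and nextLinkSketch are hypothetical, example.okta.com is a placeholder domain, and splitting on commas would break for URLs that contain commas.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// nextFromLinkValues returns the URL of the first Link entry marked
// rel="next", or an empty string if there is none.
func nextFromLinkValues(values []string) string {
	for _, value := range values {
		for _, entry := range strings.Split(value, ",") {
			parts := strings.Split(entry, ";")
			target := strings.TrimSpace(parts[0])
			if len(target) < 2 || !strings.HasPrefix(target, "<") || !strings.HasSuffix(target, ">") {
				continue // not a well-formed <url> target
			}
			for _, param := range parts[1:] {
				p := strings.ReplaceAll(strings.TrimSpace(param), `"`, "")
				if strings.EqualFold(p, "rel=next") {
					return target[1 : len(target)-1]
				}
			}
		}
	}
	return ""
}

// nextLinkSketch applies nextFromLinkValues to the Link headers of a response.
func nextLinkSketch(h http.Header) string {
	return nextFromLinkValues(h.Values("Link"))
}

func main() {
	h := http.Header{}
	h.Add("Link", `<https://example.okta.com/api/v1/logs?limit=20>; rel="self"`)
	h.Add("Link", `<https://example.okta.com/api/v1/logs?limit=20&after=abc>; rel="next"`)
	fmt.Println(nextLinkSketch(h))
}
```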

Example: Carbon Black

The Carbon Black API uses offset-style pagination controlled by start and rows. The more interesting part of this example is time-range filtering, and how to combine it with pagination.

Control the time range with a combination of time_range.start and time_range.end, which are both fixed timestamps.

There's also the range option, which takes a relative value (such as -2h). However, to have complete control, use fixed timestamps.

Issue a POST request with a body like the following:

{
    "time_range": {
        "start": "2023-08-14T01:00:00.000Z",
        "end": "2023-08-14T02:00:00.000Z"
    },
    "start": 1,
    "rows": 5
}

Advance the page by changing start to previous start + rows. For example, 1, 6, 11, 16, and so on.

Change time_range only after you have finished paging through the current time range.

pipeline:
    inputs:
        - name: http_loader
          url: https://defense.conferdeploy.net/api/alerts/v7/orgs/ABCD1234/alerts/_search
          header: |-
              Content-Type: application/json
              X-Auth-Token: {{secrets.blackCarbonAuthToken}}
          body: |-
              {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
              {{- $timeStart := $now | mustDateModify "-1h" -}}
              {{- $timeEnd := $now -}}
              {{- $start := 1 -}}
 
              {{- if and .Request .Response -}}
                  {{- $prevTimeEnd := .Request.Body.time_range.end | mustToDate timeRFC3339 -}}
                  {{- $start = add .Request.Body.start 5 -}}
                  {{- if or (empty .Response.Body.results) (lt (len .Response.Body.results) 5) -}}
                      {{- $timeStart = $prevTimeEnd -}}
                      {{- $timeEnd = ($timeStart | mustDateModify "+1h") -}}
                      {{- $start = 1 -}}
                  {{- end -}}
 
                  {{- if $timeEnd.Before $prevTimeEnd -}}
                      {{- $timeEnd = $prevTimeEnd -}}
                      {{- $timeStart = ($timeEnd | mustDateModify "-1h") -}}
                  {{- end -}}
              {{- end -}}
 
              {"time_range":{"start":"{{$timeStart.Format timeRFC3339}}","end":"{{$timeEnd.Format timeRFC3339}}"},"start":{{$start}},"rows":5}
          out: |-
              {{toJson .Response.Body.results}}
          skip: |-
              {{or (ge .Response.StatusCode 400) (empty .Response.Body.results)}}
          pull_interval: |-
              {{- if ge .Response.StatusCode 400 -}}
                10s
              {{- else -}}
                {{- $interval := parseDuration "100ms" -}}
 
                {{- if or (empty .Response.Body.results) (lt (len .Response.Body.results) 5) -}}
                    {{- $interval = parseDuration "1h" -}}
                    {{- $currentTimeEnd := .Request.Body.time_range.end | mustToDate timeRFC3339 -}}
                    {{- $nextTimeEnd := $currentTimeEnd.Add $interval -}}
                    {{- if $nextTimeEnd.After now -}}
                        {{- $interval = $nextTimeEnd.Sub now -}}
                        {{- if le $interval.Nanoseconds 0 -}}
                            {{- $interval = parseDuration "1ns" -}}
                        {{- end -}}
                    {{- end -}}
                {{- end -}}
 
                {{$interval}}
              {{- end -}}

This example uses go-template variables to keep the logic organized.

Body

In the body, first define a variable to hold the current timestamp truncated to a 1 hour unit. Use it to define both time range start and end. Also, set the pagination to start at 1.

The variables .Request and .Response aren't available initially. They become available only after a successful fetch. The code that follows is inside an if that checks whether both are available.

Inside that block, retrieve the previous time range end, and advance the page by increasing the pagination start by 5 (rows).

Then, check whether fewer than 5 (rows) results were returned, which means you've reached the last page and pagination is finished. In this case, move the time range window forward: the new start is the previous end, and the new end is one hour later. Reset pagination by setting start to 1.

An additional if checks whether the new time range end is earlier than the previous time range end, in which case it reuses the previous end.

Finally, use these variables to construct the JSON request body.
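
The same decision logic may be easier to follow outside of template syntax. The sketch below restates the body template step by step in plain Go. It is an illustration only: the query type, nextQuery, and mustParse are hypothetical names, and the number of results is passed in directly instead of being read from a response.

```go
package main

import (
	"fmt"
	"time"
)

const rows = 5 // page size used in the example request body

// query holds the request body fields that change between requests.
type query struct {
	TimeStart time.Time
	TimeEnd   time.Time
	Start     int
}

// String formats the query as "start end offset" for easy comparison.
func (q query) String() string {
	return fmt.Sprintf("%s %s %d", q.TimeStart.Format(time.RFC3339), q.TimeEnd.Format(time.RFC3339), q.Start)
}

// mustParse parses an RFC3339 timestamp and panics on malformed input.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

// nextQuery mirrors the body template. prev is nil before the first
// successful fetch; gotResults is the number of results in the last response.
func nextQuery(now time.Time, prev *query, gotResults int) query {
	hour := now.UTC().Truncate(time.Hour)
	q := query{TimeStart: hour.Add(-time.Hour), TimeEnd: hour, Start: 1}
	if prev == nil {
		return q
	}
	q.Start = prev.Start + rows // advance to the next page
	if gotResults < rows {
		// Short or empty page: move the window forward and restart paging.
		q.TimeStart = prev.TimeEnd
		q.TimeEnd = q.TimeStart.Add(time.Hour)
		q.Start = 1
	}
	if q.TimeEnd.Before(prev.TimeEnd) {
		// Don't request a window that ends before the previous one.
		q.TimeEnd = prev.TimeEnd
		q.TimeStart = q.TimeEnd.Add(-time.Hour)
	}
	return q
}

func main() {
	now := mustParse("2023-08-14T02:30:00Z")
	first := nextQuery(now, nil, 0)
	second := nextQuery(now, &first, rows) // full page: keep paging
	third := nextQuery(now, &second, 2)    // short page: next window
	fmt.Println(first)
	fmt.Println(second)
	fmt.Println(third)
}
```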

Pull interval

It's also important to set pull_interval. If you're not careful, you could start requesting time windows in the future, which wouldn't return any data.

First, do a validation check. If you receive a status code greater than or equal to 400, return 10s as pull_interval (you can use a higher value).

Next, set an initial pull interval of 100ms. If results inside the response body is empty or has fewer than 5 (rows) items, you have finished paginating and need to move the time window. To do so, increase the pull_interval to 1h. Be sure to add an if to avoid moving the time window into the future, in which case you can limit the pull interval to next - now.

If the resulting pull_interval is zero or negative, set it to a minimum value of 1ns.

In summary: if there's an error, use a pull interval of 10s. If pagination has ended, use 1h, reduced to the time remaining until the end of the next window when that end is still in the future. Otherwise, if you're still paginating, use 100ms.
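
The three cases can be sketched as a small Go function. This is a restatement for illustration, not plugin code, and pullInterval and mustParse are hypothetical names. Because this sketch reads the clock once, it omits the 1ns guard: in the template, now is evaluated twice, so the guard covers the clock advancing between the two reads.

```go
package main

import (
	"fmt"
	"time"
)

const rows = 5 // page size used in the example request body

// mustParse parses an RFC3339 timestamp and panics on malformed input.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

// pullInterval mirrors the pull_interval template. currentEnd is the
// time_range.end of the request that was just made.
func pullInterval(now, currentEnd time.Time, status, gotResults int) time.Duration {
	if status >= 400 {
		return 10 * time.Second // back off after an error
	}
	interval := 100 * time.Millisecond // still paginating
	if gotResults < rows {
		// Pagination finished: wait before moving to the next window.
		interval = time.Hour
		nextEnd := currentEnd.Add(interval)
		if nextEnd.After(now) {
			// Only wait until the next window has fully elapsed.
			interval = nextEnd.Sub(now)
		}
	}
	return interval
}

func main() {
	now := mustParse("2023-08-14T02:30:00Z")
	end := mustParse("2023-08-14T02:00:00Z")
	fmt.Println(pullInterval(now, end, 500, 0))    // 10s
	fmt.Println(pullInterval(now, end, 200, rows)) // 100ms
	fmt.Println(pullInterval(now, end, 200, 2))    // 30m0s
}
```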

Example: Sentinel One

The Sentinel One API uses cursor-based pagination. You can also control a time window, similar to the Carbon Black example.

The Carbon Black API uses a token inside the request headers. Because Sentinel One uses cookies instead, there's a separate login endpoint that you can specify in the plugin to fetch the auth cookie, which is then stored in the cookie jar.

pipeline:
    inputs:
        - name: http_loader
          url: |-
            {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
            {{- $start := $now | mustDateModify "-1h" -}}
            {{- $end := $now -}}
 
            {{- if and .Request .Response -}}
                {{- $prevEnd := .Request.URL.Query.Get "updatedAt__lte" | mustToDate timeRFC3339 -}}
                {{- if (empty .Response.Body.pagination.nextCursor) -}}
                    {{- $start = $prevEnd -}}
                    {{- $end = ($start | mustDateModify "+1h") -}}
                {{- end -}}
 
                {{- if $end.Before $prevEnd -}}
                    {{- $end = $prevEnd -}}
                    {{- $start = ($end | mustDateModify "-1h") -}}
                {{- end -}}
            {{- end -}}
 
            https://usea1-partners.sentinelone.net/web/api/v2.1/threats?updatedAt__gte={{$start.Format timeRFC3339}}&updatedAt__lte={{$end.Format timeRFC3339}}&limit=5{{with .Response.Body.pagination.nextCursor}}&cursor={{.}}{{end}}
          out: |-
              {{toJson .Response.Body.data}}
          skip: |-
              {{or (ge .Response.StatusCode 400) (not (empty .Response.Body.errors)) (empty .Response.Body.data)}}
          pull_interval: |-
              {{- if or (ge .Response.StatusCode 400) (not (empty .Response.Body.errors)) -}}
                10s
              {{- else -}}
                {{- $interval := parseDuration "100ms" -}}
 
                {{- if empty .Response.Body.pagination.nextCursor -}}
                    {{- $interval = parseDuration "1h" -}}
                    {{- $currentEnd := .Request.URL.Query.Get "updatedAt__lte" | mustToDate timeRFC3339 -}}
                    {{- $nextEnd := $currentEnd.Add $interval -}}
                    {{- if $nextEnd.After now -}}
                        {{- $interval = $nextEnd.Sub now -}}
                        {{- if le $interval.Nanoseconds 0 -}}
                            {{- $interval = parseDuration "1ns" -}}
                        {{- end -}}
                    {{- end -}}
                {{- end -}}
 
                {{$interval}}
              {{- end -}}
          auth_cookie_url: |-
            https://usea1-partners.sentinelone.net/web/api/v2.1/users/login/by-api-token
          auth_cookie_body: |-
            {"data": {"apiToken": "{{secrets.sentinelOneAPIToken}}"}}
          auth_cookie_header: |-
            Content-Type: application/json

Auth Cookie URL

If you set auth_cookie_url, the plugin issues a request to this URL before it starts collecting.

Auth Cookie Exp

You can control how long the cookie is valid by using auth_cookie_exp, which accepts a duration string. After the cookie expires, the plugin attempts to renew it.

Important: The auth cookie body does not support go-templates. The {{secrets.*}} notation is a feature available across all Calyptia Core pipelines. Its syntax might resemble go-templates, but it isn't one.

Example: Dynatrace

The following example uses the Dynatrace API. One important aspect of this API is that it uses a nextPageKey URL query string parameter for pagination, and this parameter is exclusive: when you set it, you can't set any of the other filters.

This prevents you from referencing the previous request parameters. Instead, use the has_variable, get_variable, set_variable, and unset_variable helpers to persist data across plugin executions. These variables persist even after the Fluent Bit process restarts.

pipeline:
    inputs:
        - name: http_loader
          url: |-
              {{- $environmentID := "ggy66547" -}}
              {{- with .Response.Body.nextPageKey -}}
                  https://{{$environmentID}}.live.dynatrace.com/api/v2/auditlogs?nextPageKey={{.}}
              {{- else -}}
                  {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
                  {{- $from := $now | mustDateModify "-1h" -}}
                  {{- $to := $now -}}
 
                  {{- if has_variable "to" -}}
                      {{- $prevTo := get_variable "to" | mustToDate timeRFC3339 -}}
                      {{- $from = $prevTo -}}
                      {{- $to = ($from | mustDateModify "+1h") -}}
 
                      {{- if $to.Before $prevTo -}}
                          {{- $to = $prevTo -}}
                          {{- $from = ($to | mustDateModify "-1h") -}}
                      {{- end -}}
                  {{- end -}}
 
                  {{- set_variable "to" ($to.Format timeRFC3339) -}}
 
                  https://{{$environmentID}}.live.dynatrace.com/api/v2/auditlogs?from={{$from.Format timeRFC3339}}&to={{$to.Format timeRFC3339}}&sort=timestamp&pageSize=10
              {{- end -}}
          header: "Authorization: Api-Token {{secrets.dynatraceAPIToken}}"
          out: |-
              {{toJson .Response.Body.auditLogs}}
          skip: |-
              {{or (ge .Response.StatusCode 400) (empty .Response.Body.auditLogs)}}
          pull_interval: |-
              {{- if ge .Response.StatusCode 400 -}}
                {{ log "error:" .Response.Body }}
                10s
              {{- else -}}
                {{- $interval := parseDuration "100ms" -}}
 
                {{- if empty .Response.Body.nextPageKey -}}
                    {{- $interval = parseDuration "1h" -}}
                    {{- $currentTo := get_variable "to" | mustToDate timeRFC3339 -}}
                    {{- $nextTo := $currentTo.Add $interval -}}
                    {{- if $nextTo.After now -}}
                        {{- $interval = $nextTo.Sub now -}}
                        {{- if le $interval.Nanoseconds 0 -}}
                            {{- $interval = parseDuration "1ns" -}}
                        {{- end -}}
                    {{- end -}}
                {{- end -}}
 
                {{$interval}}
              {{- end -}}

The url go-template checks whether the response body included a nextPageKey. If so, it uses that to paginate. Otherwise, it sets the time window parameters, similar to the previous examples, and also uses set_variable to persist the to parameter. That lets you reference it later without depending on the previous request.
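
The exclusive nextPageKey rule can also be shown as a small URL builder. The sketch below is an illustration using Go's net/url package, and auditLogsURL is a hypothetical helper. Unlike the template, url.Values sorts the parameters and percent-encodes characters such as the colons in the timestamps, which yields an equivalent query string.

```go
package main

import (
	"fmt"
	"net/url"
)

// auditLogsURL builds the request URL. When nextPageKey is set, it is the
// only query parameter; otherwise the time window filters are used.
func auditLogsURL(envID, nextPageKey, from, to string) string {
	base := "https://" + envID + ".live.dynatrace.com/api/v2/auditlogs"
	q := url.Values{}
	if nextPageKey != "" {
		// nextPageKey is exclusive: send it alone, without the filters.
		q.Set("nextPageKey", nextPageKey)
	} else {
		q.Set("from", from)
		q.Set("to", to)
		q.Set("sort", "timestamp")
		q.Set("pageSize", "10")
	}
	return base + "?" + q.Encode()
}

func main() {
	// First request of a window: filters only.
	fmt.Println(auditLogsURL("ggy66547", "", "2023-08-14T01:00:00Z", "2023-08-14T02:00:00Z"))
	// Follow-up page: nextPageKey only ("abc123" is a made-up key).
	fmt.Println(auditLogsURL("ggy66547", "abc123", "2023-08-14T01:00:00Z", "2023-08-14T02:00:00Z"))
}
```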