Driving Filebeat data into separate indices (uses legacy index templates)

Introduction

When driving data into Elasticsearch from Filebeat, the default behaviour is for all data to be sent into the same destination index regardless of the source of the data. This may not always be desirable, since data from different sources may have different access requirements, different retention policies, or different ingest processing requirements.

In this post, we’ll use Filebeat to send data from separate sources into multiple indices, and then we’ll use index lifecycle management (ILM), legacy index templates, and a custom ingest pipeline to further control that data.

If you are interested in using the newer composable templates and data streams to control your Filebeat data, see the blog How to manage Elasticsearch data across multiple indices with Filebeat, ILM, and data streams.

Driving different data types into different destinations

You may have several different types of data collected by Filebeat, such as “source1”, “source2”, and so on. As these may have different access/security requirements and/or different retention requirements, it can be useful to drive data from different sources into different Filebeat indices.

Keep in mind that splitting the Filebeat data into different destinations adds complexity to the deployment. For many customers, the default behaviour of driving all Filebeat data into a single destination pattern is acceptable and does not require the custom configuration that we outline below.

Step 1 – Create alias(es)

Each destination “index” that we will specify in Filebeat will actually be an alias so that index lifecycle management (ILM) will work correctly.

We can create an alias that will work with the Filebeat configuration that we give later in this blog, as follows:

PUT filebeat-7.10.2-source1-000001
{
  "aliases": {
    "filebeat-7.10.2-source1": {
      "is_write_index": true
    }
  }
}

You would want to do the same for other data sources.
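For example, a matching bootstrap index and write alias for source2 (assuming the same Filebeat version, 7.10.2) could be created as follows:

PUT filebeat-7.10.2-source2-000001
{
  "aliases": {
    "filebeat-7.10.2-source2": {
      "is_write_index": true
    }
  }
}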

In the above request, because the index name filebeat-7.10.2-source1-000001 includes the version number immediately after the word filebeat, we ensure that the default template pushed into the cluster by Filebeat (which matches the pattern filebeat-7.10.2-*) will be applied to the index.

Step 2 – Define an ILM policy

You should define an index lifecycle management policy (see the ILM documentation for instructions).

A single policy can be used by multiple indices, or you can define a new policy for each index. In the next section, I assume that you have created a policy called “filebeat-policy”.
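As a minimal sketch, a policy called “filebeat-policy” might look like the following. The rollover and delete thresholds shown here are illustrative values only and should be adjusted to your own retention requirements:

PUT _ilm/policy/filebeat-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "30d"
          }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}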

Step 3 – Ensure that ILM will use the correct rollover alias

For ILM to automate the rollover of indices, we must define the rollover alias that will be used. This can be done by creating a higher-order template (which overrides settings from lower-order templates) so that each unique data type has a unique rollover alias. For example, the following template can be used to ensure that the source1 data rolls over correctly:

PUT _template/filebeat-7.10.2-source1-ilm
{
  "order": 50,
  "index_patterns": [
    "filebeat-7.10.2-source1-*"
  ],
  "settings": {
    "index": {
      "lifecycle": {
        "name": "filebeat-policy",
        "rollover_alias": "filebeat-7.10.2-source1"
      }
    }
  }
}

Step 4 – (Optional) Define a custom ingest pipeline

Below is an example of a very simple ingest pipeline that we can use to modify documents as they are ingested into Elasticsearch. As we will see in the next section, this can be selectively applied to data from different sources depending on the destination index name.

If the ingest pipeline encounters a failure, then the document that triggered the failure is rejected. This is likely undesirable, and can be improved by including on_failure error handling in the pipeline code, as shown below:

PUT _ingest/pipeline/my_custom_pipeline
{
  "description": "Put your custom ingest pipeline code here",
  "processors": [
    {
      "set": {
        "field": "my-new-field",
        "value": "some new value",
        "on_failure": [
          {
            "set": {
              "field": "error.message",
              "value": "my_custom_pipeline failed to execute set - {{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "Uncaught failure in my_custom_pipeline: {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Step 5 – (Optional) Execution of the custom ingest pipeline

The following template will ensure that our ingest pipeline will be executed against all documents arriving at indices that match the specified index patterns.

Be sure to modify the index patterns below to include all of the index names that should have our custom pipeline applied.

PUT _template/filebeat-template-to-call-my-custom-processors
{
  "order": 51,
  "index_patterns": [
    "filebeat-*-source1-*",
    "filebeat-*-source2-*"
  ],
  "settings": {
    "index": {
      "final_pipeline": "my_custom_pipeline"
    }
  },
  "mappings": {},
  "aliases": {}
}

Notice that we have used final_pipeline rather than default_pipeline. This is to ensure that calling our custom pipeline does not override any of the default Filebeat pipelines that may be called by various Filebeat modules.
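If you would like to confirm that a given index has picked up this setting (template settings are applied when an index is created, so indices created before the template will not have it), you can request the setting by name. A minimal check might look like the following:

GET filebeat-*-source1-*/_settings/index.final_pipeline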

Step 6 – Filebeat code to drive data into different destination indices

The following Filebeat configuration can be used as an example of how to drive documents into different destination index aliases. Note that if the alias does not exist, then Filebeat will create an index with the specified name rather than writing into an alias with that name, which is undesirable. Therefore, the alias shown in Step 1 of this blog should be defined before running Filebeat with this configuration.

The example below would drive data into an alias (or index) called filebeat-7.10.2-source1 (assuming we are running Filebeat version 7.10.2).

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /tmp/<path to your input>.txt
  fields_under_root: true
  fields:
    data_type: "source1"


setup.template.enabled: true
setup.template.name: "filebeat-%{[agent.version]}"
setup.template.pattern: "filebeat-%{[agent.version]}-*"
setup.template.fields: "fields.yml"
setup.template.overwrite: false
setup.ilm.enabled: false # we handle ILM in the cluster, so not defined here


output.elasticsearch:
  hosts: ["localhost:9200"]
  indices:
    - index: "filebeat-%{[agent.version]}-%{[data_type]}"

In the above example, there are several setup.template settings which ensure that the default Filebeat templates are loaded correctly into the cluster if they do not already exist. See the Filebeat documentation on configuring Elasticsearch index template loading for more information.

Upgrading Filebeat

Once you have implemented the above, when upgrading to a new version of Filebeat you will have to ensure that a new index alias points to the correct underlying indices (re-execute Step 1), and that ILM uses the correct rollover alias (re-execute Step 3). Once these steps are done, the new version of Filebeat should be able to run the same filebeat.yml that we defined in Step 6 without modification.
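As a sketch, if you were upgrading to a hypothetical version 7.11.0, the equivalent of Step 1 would be to bootstrap a new index and write alias that include the new version number:

PUT filebeat-7.11.0-source1-000001
{
  "aliases": {
    "filebeat-7.11.0-source1": {
      "is_write_index": true
    }
  }
}

A copy of the Step 3 template with its index pattern and rollover_alias updated to the new version would also be required.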

Appendix 1 – code for testing the ingest pipeline

The request below sends two documents through the pipeline given in Step 4 using the simulate API. This can be used for validating that the pipeline is behaving as expected.

POST _ingest/pipeline/my_custom_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this is doc 1"
      }
    },
    {
      "_source": {
        "message": "this is doc 2"
      }
    }
  ]
}

Conclusion

In this post, I showed how to use Filebeat to send data from separate sources into multiple indices, and how to use index lifecycle management (ILM), legacy index templates, and a custom ingest pipeline to further control that data.

Using Kibana’s Painless Lab (Beta) to test an ingest processor script

Introduction

In several previous blog posts I have shown how a Painless script can be used to process new documents as they are ingested into an Elasticsearch cluster. In each of these posts I have made use of the simulate pipeline API to test the Painless scripts.

While developing such scripts, it can be helpful to use Painless Lab (Beta) in Kibana to debug them. In this blog I will show how to use Painless Lab to develop and debug custom scripts, and then show how these can then easily be copied into ingest pipelines.

Example

In the blog post titled Using Elasticsearch Painless scripting to recursively iterate through JSON fields, we demonstrated how to iterate over all elements in a document, and then delete each field where the value is an empty string. The code was written as a script processor in an ingest pipeline, and then simulated.

When developing this Painless script (before putting the code into an ingest pipeline), Painless Lab can be used to catch syntax errors in real time. The code from that blog can be tested in Painless Lab as demonstrated below.

A few modifications to the ingest pipeline code from the previous blog are needed to get it to execute correctly in Painless Lab.

When used in an ingest processor (which is where this script will ultimately execute once it has been debugged), the script expects the “ctx” variable to contain the source of the document that is currently being processed. However, because Painless Lab does not (yet) provide a way of directly passing “ctx” to the script, this can be faked by setting “Parameters” to a JSON document with a field called “ctx” that contains the “real” document as its value. We then create a variable “ctx” at the top of the script which is set to “params.ctx”.
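As a rough sketch of that setup (the field names below are illustrative rather than the exact code from the earlier blog), the “Parameters” editor would contain something like:

{
  "ctx": {
    "key1": "value1",
    "key2": "value2",
    "key3": ""
  }
}

and the script itself would then start by aliasing that value to a local “ctx” variable:

def ctx = params.ctx;

// Illustrative logic only: remove top-level fields whose value is an empty
// string (the blog referenced above does this recursively for nested fields).
def emptyFields = [];
for (entry in ctx.entrySet()) {
  if (entry.getValue() instanceof String && "".equals(entry.getValue())) {
    emptyFields.add(entry.getKey());
  }
}
for (field in emptyFields) {
  ctx.remove(field);
}
return ctx;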

You can easily view the result by clicking on the “Output” tab.

Notice that in the “Output” the result is as expected: “key3” has been removed because it contained an empty string. Now that we have confirmed that the script behaves as expected, it requires some modification before it can be used in an ingest pipeline: the line that sets “ctx” to “params.ctx” and the final return statement need to be removed. You will then end up with the same script as the one that was demonstrated and verified in Using Elasticsearch Painless scripting to recursively iterate through JSON fields. It is therefore quite straightforward to copy code that has been built in Painless Lab using this technique into an ingest pipeline.

Conclusion

In this blog, I have shown how you can use the Painless Lab for debugging scripts that are used in an ingest processor. This provides real-time syntax verification, and immediate feedback on what the output document will look like.

Acknowledgement

Thanks to Honza Kral for pointing out the trick of setting ctx = params.ctx.

Using Logstash and Elasticsearch to calculate transaction duration in a microservices architecture

Introduction

Elasticsearch allows you to unify your observability data in a powerful datastore so you can search and apply interactive analytics in real time to a huge number of use cases.

In one such use case, you may be using Elasticsearch to monitor a system that is composed of multiple microservices that process a given transaction. For such a system, you may be collecting an event corresponding to when the first microservice in the system starts processing the transaction, and another event corresponding to when the last microservice in the system finishes processing the transaction. In such an approach, each event should include a field with the transaction identifier, which will allow multiple events corresponding to a single transaction to be combined for analysis.

In this blog I discuss how Elasticsearch in combination with Logstash may be used to ingest multiple events that correspond to a given transaction as it is processed by multiple microservices, and how to calculate the time difference between these different events, which I will refer to as the “transaction duration”.

The approach discussed here will work even if the events corresponding to a given transaction arrive at Logstash out of order, and it could easily be extended to compute delays between any microservices that process a given transaction.

A note about event ordering

If the events corresponding to a given transaction are guaranteed to arrive in order, then it may be possible to use Logstash’s Elapsed filter plugin.

Alternatively, the approach described in this blog should work regardless of the order in which events arrive.

Using scripted upserts to transform data

In a previous blog post, I described how to use Logstash and Elasticsearch scripted upserts to transform data. The approach in this blog is very similar, but has the explicit goal of calculating the duration between the “start” and “end” events for a given transaction.

The approach described in this blog will ultimately result in two indices being written into Elasticsearch. One index will contain original documents corresponding to each monitoring event, and another index will contain transformed documents which will track the transaction duration.

For the purposes of this blog, we expect events to contain a “@timestamp” field, a “tags” array that contains a value of “start_event” or “end_event” somewhere in the array, and a transaction identifier which we have stored in a field called “ident”. For example, a document could look as follows:

{
  "message": "abc",
  "ident": "id1",
  "@timestamp": "2020-08-18T19:43:36.000Z",
  "other_field": "other_val 1",
  "tags": [
    "start_event"
  ]
}

As we will ultimately be using Logstash to call Elasticsearch scripted upserts to compute the duration of each transaction, it is worth highlighting that Logstash sends the source of each document into the scripted upsert as params.event rather than in the standard ctx._source that we normally expect.

The following script will calculate the time difference between the “start_time” and the “end_time” even if the end event arrives before the start event.

POST _scripts/calculate_transaction_duration
{
  "script": {
    "lang": "painless",
    "source": """
      def position_of_start_event_in_tags = params.event['tags'].indexOf('start_event');

      // if this is a "start event" then store the timestamp in the start_time field
      if (position_of_start_event_in_tags >= 0) {
        ctx._source['start_time'] = params.event['@timestamp']
      }

      def position_of_end_event_in_tags = params.event['tags'].indexOf('end_event');

      // if this is an "end event" then store the timestamp in the end_time field
      if (position_of_end_event_in_tags >= 0) {
        ctx._source['end_time'] = params.event['@timestamp']
      }

      // if both start and end times exist, calculate the difference
      if (ctx._source.containsKey('start_time') && ctx._source.containsKey('end_time')) {
        ctx._source['duration_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx._source['start_time']), ZonedDateTime.parse(ctx._source['end_time']))/1000;
      }

      // OPTIONAL COPY (PROBABLY NOT NEEDED) - copy remaining fields into the _source
      //for (x in params.event.entrySet()) {
      //  ctx._source[x.getKey()] = x.getValue();
      //}
    """
  }
}

We can then test the above script directly from Dev tools by running both of the following commands (in any order) which will update the document with an _id of “id1” in the test_script index:

POST test_script/_update/id1
{
  "scripted_upsert": true,
  "script": {
    "id": "calculate_transaction_duration",
    "params": {
      "event": {
        "message": "abc", "ident": "id1", "@timestamp": "2020-08-18T19:43:36.000Z", "other_field": "other_val 1", "tags": ["start_event"]
      }
    }
  },
  "upsert": {}
}


POST test_script/_update/id1
{
  "scripted_upsert": true,
  "script": {
    "id": "calculate_transaction_duration",
    "params": {
      "event": {
        "message": "def", "ident": "id1", "@timestamp": "2020-08-18T19:53:36.000Z", "other_field": "other_val 2", "tags": ["end_event"]
      }
    }
  },
  "upsert": {}
}

After running the above code, we can view the document that contains the transaction duration as follows:

GET test_script/_doc/id1

Which will respond with the following:

{
  "_index" : "test_script",
  "_type" : "_doc",
  "_id" : "id1",
  "_version" : 2,
  "_seq_no" : 4,
  "_primary_term" : 3,
  "found" : true,
  "_source" : {
    "start_time" : "2020-08-18T19:43:36.000Z",
    "end_time" : "2020-08-18T19:53:36.000Z",
    "duration_in_seconds" : 600
  }
}

We now have scripted upserts working and tested within Elasticsearch. Now let’s get this working from Logstash.

The following Logstash pipeline will send two transactions each with two events into Elasticsearch. Notice that the last two events corresponding to the transaction “id2” are out-of-order. This is no issue, as the script that we demonstrated above will handle this correctly.

input {
  # The generator creates input events.
  # Notice how the events associated with id2 are "out of order"
  generator {
    lines => [
     '{"message": "abc", "ident": "id1", "@timestamp": "2020-08-18T19:43:36.000Z", "other_field": "other_val 1", "tags": ["start_event"]}',
     '{"message": "def", "ident": "id1", "@timestamp": "2020-08-18T19:53:36.000Z", "other_field": "other_val 2", "tags": ["end_event"]}',
     '{"message": "ghi", "ident": "id2", "@timestamp": "2020-08-20T19:43:56.000Z", "other_field": "other_val 4", "tags": ["end_event"]}',
     '{"message": "jkl", "ident": "id2", "@timestamp": "2020-08-20T19:43:36.000Z", "other_field": "other_val 3", "tags": ["start_event"]}'
    ]
    count => 1
    codec =>  "json"
  }
}
filter {}
output {

  # Transformed data
  elasticsearch {
    index => "transaction_duration"
    document_id => "%{ident}"
    action => "update"
    scripted_upsert => true
    script_lang => ""
    script_type => "indexed"
    script => "calculate_transaction_duration"
  }

  # Original data
  elasticsearch {
    index => "transaction_original"
  }
}

The above pipeline can be saved into a file called scripted-elapsed.conf and directly executed as follows:

./bin/logstash -f scripted-elapsed.conf --config.reload.automatic

After running the above Logstash pipeline, there will be two indices created in the locally running Elasticsearch. One is the index that contains the original events and is called “transaction_original”, and the other is the transformed index called “transaction_duration” that contains the duration of each transaction.

We can look at the “transaction_duration” index with the following command:

GET transaction_duration/_search

Which will respond with the following two documents, one for each transaction:

    "hits" : [
      {
        "_index" : "transaction_duration",
        "_type" : "_doc",
        "_id" : "id2",
        "_score" : 1.0,
        "_source" : {
          "end_time" : "2020-08-20T19:43:56.000Z",
          "start_time" : "2020-08-20T19:43:36.000Z",
          "duration_in_seconds" : 20
        }
      },
      {
        "_index" : "transaction_duration",
        "_type" : "_doc",
        "_id" : "id1",
        "_score" : 1.0,
        "_source" : {
          "end_time" : "2020-08-18T19:53:36.000Z",
          "start_time" : "2020-08-18T19:43:36.000Z",
          "duration_in_seconds" : 600
        }
      }
    ]

We have now verified that the script to calculate event duration is functioning correctly when we call it from Logstash!

Conclusion

In this blog post, I first discussed how a given transaction may result in multiple events being sent into Elasticsearch. I then showed how you can use Logstash to execute scripted upserts which calculate the duration of a given transaction by comparing the timestamps of the related events.

Storing ingest time and calculating ingest lag in Elasticsearch

Introduction

When viewing and analysing data with Elasticsearch, it is not uncommon to see visualisations, as well as monitoring and alerting solutions, that make use of timestamps generated on remote/monitored systems. However, relying on remote-generated timestamps may be risky.

If there is a delay between the occurrence of a remote event and the event arriving at Elasticsearch, or if a malicious user changes the time on a remote system, then important events could fly under the radar. Therefore, when ingesting documents into Elasticsearch it is often helpful to store the ingest time of each document, as well as to monitor how long it takes for each event to arrive at the Elasticsearch cluster. A larger-than-normal lag may indicate that there is a problem with the ingest process or a problem with the time setting on a remote system.

In this blog I will show how to use an ingest node with the Set Processor to add an ingest timestamp to documents when they arrive at an Elasticsearch cluster. This timestamp can be used in visualisations, monitoring, and alerting.

Additionally, I will show how to use the Script Processor to calculate the ingest lag. This lag is the difference between the timestamp of when an event has occurred on a remote/monitored system versus the time that the corresponding document arrives at an Elasticsearch cluster. This can be used for ensuring that the ingest process is not causing excessive delay, and for verifying that remote timestamps are set correctly.

Adding an ingest timestamp and calculating ingest lag

Below I give an example of an ingest pipeline that adds an ingest timestamp called “ingest_time”. It also calculates the lag between the remote event timestamp and the time that the event arrives at Elasticsearch and stores this in a field called “lag_in_seconds”.

The “ingest_time” field serves two purposes: (1) it can and likely should be used as the time field in Kibana visualisations and for monitoring and alerting as discussed later in this blog, and (2) it is used in the lag calculation.

Note that we assume that each document arrives at the Elasticsearch cluster with a field called “event_timestamp” that corresponds to when the event occurred on the remote/monitored system. The name of your event timestamp field will likely be different for your data and should be modified accordingly.

We write our pipeline to Elasticsearch as follows:

PUT _ingest/pipeline/calculate_lag
{
  "description": "Add an ingest timestamp and calculate ingest lag",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
            if(ctx.containsKey("ingest_time") && ctx.containsKey("event_timestamp")) {
              ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['event_timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;
            }
        """
      }
    }
  ]
}

And we can then test the pipeline with the simulate API as follows:

POST _ingest/pipeline/calculate_lag/_simulate
{
  "docs": [
    {
      "_source": {
        "event_timestamp": "2019-11-07T20:39:00.000Z"
      }
    }
  ]
}

Which should respond with something similar to the following, which includes the “lag_in_seconds” and the “ingest_time” fields:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "lag_in_seconds" : 17950223,
          "ingest_time" : "2020-06-02T14:49:23.236Z",
          "event_timestamp" : "2019-11-07T20:39:00.000Z"
        },
        "_ingest" : {
          "timestamp" : "2020-06-02T14:49:23.236Z"
        }
      }
    }
  ]
}

Finally, we can write a real document to Elasticsearch with the pipeline as follows:

PUT test_index/_doc/1?pipeline=calculate_lag
{
  "event_timestamp": "2019-11-07T20:39:00.000Z",
  "other_field": "whatever"
}

And we can retrieve the document as follows:

GET test_index/_doc/1

Which should respond with the following:

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "lag_in_seconds" : 17950358,
    "ingest_time" : "2020-06-02T14:51:38.068Z",
    "event_timestamp" : "2019-11-07T20:39:00.000Z",
    "other_field" : "whatever"
  }
}

Specifying the pipeline in index settings

When using an ingest pipeline in a production deployment it may be preferable to apply the pipeline to the index settings, rather than specifying the pipeline in the PUT URL. This can be done by adding index.default_pipeline to the index settings as follows:

PUT test_index/_settings
{
  "index.default_pipeline": "calculate_lag"
}

Now any document that is sent into test_index will pass through the calculate_lag pipeline without the need for ?pipeline=calculate_lag in the URL. We can verify this is working with the following PUT command.

PUT test_index/_doc/2
{
  "event_timestamp": "2019-11-07T20:39:00.000Z",
  "other_field": "This is a new doc"
}

Execute the following command to see the document that we have just ingested:

GET test_index/_doc/2

Which should return an enriched document that looks like the following:

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "lag_in_seconds" : 17951461,
    "ingest_time" : "2020-06-02T15:10:01.670Z",
    "event_timestamp" : "2019-11-07T20:39:00.000Z",
    "other_field" : "This is a new doc"
  }
}

How to use ingest lag

Now that ingest lag has been calculated, it can be used to trigger alerts if the lag exceeds a certain threshold or it can be fed into a machine learning job to detect unexpected deviations in the ingest processing time. It could also be displayed in Kibana dashboards.

However, a full treatment of alerting or machine learning on this field is beyond the scope of this blog; the sketch below shows just one possible starting point.
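As a minimal illustration, a range query such as the following (the test_index name comes from the earlier examples, and the 300-second threshold is an arbitrary value) would return documents whose ingest lag exceeds five minutes:

GET test_index/_search
{
  "query": {
    "range": {
      "lag_in_seconds": {
        "gt": 300
      }
    }
  }
}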

Using the ingest timestamp

When looking at visualizations in Kibana or watching for anomalies, we often consider events that occurred in the last day or the last week. However, if we depend on the remotely-generated event timestamp as opposed to the ingest timestamp, then any lag on the ingest process may cause those documents to never be viewed or monitored. For example, if an event occurred yesterday, but just arrived at the cluster today, it would not show up in the dashboard for today’s events because its timestamp is from yesterday. Furthermore, it would not have been available when we looked at dashboards yesterday because it was not yet stored in Elasticsearch.

On the other hand, if we visualize data and set up alerts using the ingest timestamp, we are guaranteed that we are actually considering the most recent events to arrive at Elasticsearch, regardless of when the events occurred. This will ensure that events are not missed if the ingest process gets temporarily backed up.

Another advantage of using the ingest timestamp is related to the fact that the event timestamp could be tampered with. For example, imagine that we are monitoring a remote system and a hacker sets its internal clock to some date in the 1980s. If we depend on the remote event timestamp, it is possible that we will miss all activity that a malicious user is performing on that system, unless we specifically look for events that are stored as having occurred in the 1980s. On the other hand, if we depend on the ingest timestamp, we are guaranteed that when we look at recent events, we will consider all of the events that have recently arrived at the Elasticsearch cluster, regardless of the timestamp given to the event by a remote system.

Conclusion

In this blog I have shown how to use an ingest processor to store the ingest time and calculate the lag of the ingest process. Additionally, I outlined the advantages of using the ingest time for monitoring and in visualizations, as well as the risks of using remote event timestamps.