Driving Filebeat data into separate indices (uses legacy index templates)

Introduction

When driving data into Elasticsearch from Filebeat, the default behaviour is for all data to be sent to the same destination index regardless of the source of the data. This may not always be desirable, since data from different sources may have different access requirements, different retention policies, or different ingest processing requirements.

In this post, we’ll use Filebeat to send data from separate sources into multiple indices, and then we’ll use index lifecycle management (ILM), legacy index templates, and a custom ingest pipeline to further control that data.

If you are interested in using the newer composable templates and data streams to control your Filebeat data, see the blog How to manage Elasticsearch data across multiple indices with Filebeat, ILM, and data streams.

Driving different data types into different destinations

You may have several different types of data collected by Filebeat, such as “source1”, “source2”, etc. As these may have different access/security requirements and/or different retention requirements, it may be useful to drive data from different sources into different Filebeat indices.

Keep in mind that splitting the Filebeat data into different destinations adds complexity to the deployment. For many customers, the default behaviour of driving all Filebeat data into a single destination pattern is acceptable and does not require the custom configuration that we outline below.

Step 1 – Create alias(es)

Each destination “index” that we will specify in Filebeat will actually be an alias so that index lifecycle management (ILM) will work correctly.

We can create an alias that will work with the Filebeat configuration that we give later in this blog, as follows:

PUT filebeat-7.10.2-source1-000001
{
  "aliases": {
    "filebeat-7.10.2-source1": {
      "is_write_index": true
    }
  }
}

You would want to do the same for other data sources (e.g. source2).
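
For example, a bootstrap index and write alias for source2 would follow exactly the same pattern:

PUT filebeat-7.10.2-source2-000001
{
  "aliases": {
    "filebeat-7.10.2-source2": {
      "is_write_index": true
    }
  }
}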

In the above requests, the index names begin with the word filebeat followed by the version number (e.g. filebeat-7.10.2-source1-000001), which ensures that the default template that is pushed into the cluster by Filebeat will be applied to these indices.

Step 2 – Define an ILM policy

You should define an index lifecycle management (ILM) policy; see the Elasticsearch ILM documentation for instructions.

A single policy can be used by multiple indices, or you can define a new policy for each index. In the steps that follow, I assume that you have created a policy called “filebeat-policy”.
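
For example, a minimal sketch of such a policy might look like the following. The rollover and delete thresholds shown here are only placeholders; choose values that match your own retention requirements:

PUT _ilm/policy/filebeat-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "30d"
          }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}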

Step 3 – Ensure that ILM will use the correct rollover alias

In order for ILM to automate the rollover of indices, we define the rollover alias that will be used. This can be done by creating a higher-order template (which overrides the settings of lower-order templates) so that each unique data type will have a unique rollover alias. For example, the following template can be used to ensure that the source1 data rolls over correctly:

PUT _template/filebeat-7.10.2-source1-ilm
{
  "order": 50,
  "index_patterns": [
    "filebeat-7.10.2-source1-*"
  ],
  "settings": {
    "index": {
      "lifecycle": {
        "name": "filebeat-policy",
        "rollover_alias": "filebeat-7.10.2-source1"
      }
    }
  }
}

Step 4 – (Optional) Define a custom ingest pipeline

Below is an example of a very simple ingest pipeline that we can use to modify documents that are ingested into Elasticsearch. As we will see in the next step, this can be selectively applied to data from different sources depending on the destination index name.

If a processor in the ingest pipeline fails, then by default the document that triggered the failure is rejected. This is likely undesirable, and can be handled by including on_failure error handling in the pipeline code, as shown below:

PUT _ingest/pipeline/my_custom_pipeline
{
  "description": "Put your custom ingest pipeline code here",
  "processors": [
    {
      "set": {
        "field": "my-new-field",
        "value": "some new value",
        "on_failure": [
          {
            "set": {
              "field": "error.message",
              "value": "my_custom_pipeline failed to execute set - {{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "Uncaught failure in my_custom_pipeline: {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Step 5 – (Optional) Execute the custom ingest pipeline

The following template ensures that our ingest pipeline is executed against all documents arriving in indices that match the specified index patterns.

Be sure to modify the index patterns below to include all of the index names that should have our custom pipeline applied.

PUT _template/filebeat-template-to-call-my-custom-processors
{
  "order": 51,
  "index_patterns": [
    "filebeat-*-source1-*",
    "filebeat-*-source2-*"
  ],
  "settings": {
    "index": {
      "final_pipeline": "my_custom_pipeline"
    }
  },
  "mappings": {},
  "aliases": {}
}

Notice that we have used final_pipeline rather than default_pipeline. This ensures that our custom pipeline runs in addition to (after) any default Filebeat pipelines that may be invoked by various Filebeat modules, rather than replacing them.
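
If you want to confirm that the final_pipeline setting has been picked up by an index that was created after this template was in place, a request along the following lines (using the index name from Step 1 as an example) should show it:

GET filebeat-7.10.2-source1-000001/_settings?filter_path=*.settings.index.final_pipeline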

Step 6 – Filebeat code to drive data into different destination indices

The following Filebeat configuration can be used as an example of how to drive documents into different destination index aliases. Note that if the alias does not exist, then Filebeat will create an index with the specified name rather than writing into an alias with that name, which is undesirable. Therefore, the alias should be defined as shown in Step 1 of this blog before running Filebeat with this configuration.

The example below would drive data into an alias (or index) called filebeat-7.10.2-source1 (assuming we are running Filebeat version 7.10.2).

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /tmp/<path to your input>.txt
    fields_under_root: true
    fields:
      data_type: "source1"

setup.template.enabled: true
setup.template.name: "filebeat-%{[agent.version]}"
setup.template.pattern: "filebeat-%{[agent.version]}-*"
setup.template.fields: "fields.yml"
setup.template.overwrite: false
setup.ilm.enabled: false # we handle ILM in the cluster, so not defined here

output.elasticsearch:
  hosts: ["localhost:9200"]
  indices:
    - index: "filebeat-%{[agent.version]}-%{[data_type]}"

In the above example, there are several setup.template settings which ensure that the default Filebeat templates are loaded correctly into the cluster if they do not already exist. See the Filebeat documentation on configuring Elasticsearch index template loading for more information.
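
To verify which Filebeat-related legacy templates are present in the cluster (the default template loaded by Filebeat plus the higher-order templates from Steps 3 and 5), a request such as the following can be used:

GET _cat/templates/filebeat*?v&h=name,order,index_patterns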

Upgrading Filebeat

Once you have implemented the above, when upgrading to a new version of Filebeat you will have to ensure that a new index alias points to the correct underlying indices (re-execute Step 1 for the new version), and that ILM will use the correct rollover alias (re-execute Step 3 for the new version). If these steps are done, then the new version of Filebeat should be able to use the same filebeat.yml that we defined above in Step 6 without modification.
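
For example, for a hypothetical upgrade to Filebeat 7.11.0, the re-executed steps would look roughly like the following (only the version number changes):

PUT filebeat-7.11.0-source1-000001
{
  "aliases": {
    "filebeat-7.11.0-source1": {
      "is_write_index": true
    }
  }
}

PUT _template/filebeat-7.11.0-source1-ilm
{
  "order": 50,
  "index_patterns": [
    "filebeat-7.11.0-source1-*"
  ],
  "settings": {
    "index": {
      "lifecycle": {
        "name": "filebeat-policy",
        "rollover_alias": "filebeat-7.11.0-source1"
      }
    }
  }
}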

Appendix 1 – code for testing the ingest pipeline

The request below sends two documents through the pipeline given in Step 4. This can be used to validate that the pipeline is behaving as expected.

POST _ingest/pipeline/my_custom_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this is doc 1"
      }
    },
    {
      "_source": {
        "message": "this is doc 2"
      }
    }
  ]
}

Conclusion

In this post, I showed how to use Filebeat to send data from separate sources into multiple indices, and how to use index lifecycle management (ILM), legacy index templates, and a custom ingest pipeline to further control that data.

Using Kibana’s Painless Lab (Beta) to test an ingest processor script

Introduction

In several previous blog posts I have shown how a Painless script can be used to process new documents as they are ingested into an Elasticsearch cluster. In each of these posts I have made use of the simulate pipeline API to test the Painless scripts.

While developing such scripts, it may be helpful to use Painless Lab (Beta) in Kibana to debug Painless scripts. In this blog I will show how to use Painless Lab to develop and debug custom scripts, and then show how these scripts can easily be copied into ingest pipelines.

Example

In the blog post titled Using Elasticsearch Painless scripting to recursively iterate through JSON fields, we demonstrated how to iterate over all elements in a document, and then delete each field where the value is an empty string. The code was written as a script processor in an ingest pipeline, and then simulated.

When developing this Painless script (before putting the code into an ingest pipeline), Painless Lab can be used to catch syntax errors in real time. The code from that blog can be tested in Painless Lab as demonstrated below.

A few modifications to the ingest pipeline code from the previous blog are needed to get it to execute correctly in Painless Lab.

When used in an ingest processor (which is where this will ultimately execute after it is debugged), the script will expect the “ctx” variable to contain the source of the document that is currently being processed. However, because Painless Lab does not (yet) provide a way of directly passing “ctx” to the script, this can be faked by setting the “Parameters” panel to a JSON document with a field called “ctx” that contains the “real” document as its value. We then create a variable called “ctx” near the top of the script, which is set to “params.ctx”.
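
Since the original screenshots are not reproduced here, below is a minimal sketch of what the two Painless Lab panels might contain when testing the remove_empty_fields logic from that blog (the exact editor layout may differ between Kibana versions). The “Parameters” panel holds a test document wrapped in a “ctx” field, for example:

{
  "ctx": {
    "key1": "first value",
    "key2": "some other value",
    "key3": ""
  }
}

The code panel then holds the script itself, with an extra line to define “ctx” from “params.ctx” and an extra return statement at the end so that the modified document appears in the “Output” tab:

void iterateAllFields(def x) {
  if (x instanceof List) {
    for (def v: x) {
      iterateAllFields(v);
    }
  }
  if (!(x instanceof Map)) {
    return;
  }
  // remove any field whose value is an empty string
  x.entrySet().removeIf(e -> e.getValue() == "");
  for (def v: x.values()) {
    iterateAllFields(v);
  }
}

// "def ctx = params.ctx" and "return ctx" are extra lines for Painless Lab
// only; they are removed when copying the script back into an ingest pipeline
def ctx = params.ctx;
iterateAllFields(ctx);
return ctx;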

You can easily view the output by clicking on the “Output” tab.

Notice that in the “Output” tab the result is as expected – “key3” has been removed as it contained an empty string. Now that we have confirmed that the script is behaving as expected, it will require some modification to get it into a state that can be used in an ingest pipeline. In the example above, the extra lines that were added for Painless Lab (the definition of “ctx” from “params.ctx” and the final return statement) would need to be removed. You will then end up with the same script as the one that was demonstrated and verified in Using Elasticsearch Painless scripting to recursively iterate through JSON fields. It is therefore quite straightforward to copy code that has been built in Painless Lab using this technique into an ingest pipeline.

Conclusion

In this blog, I have shown how you can use the Painless Lab for debugging scripts that are used in an ingest processor. This provides real-time syntax verification, and immediate feedback on what the output document will look like.

Acknowledgement

Thanks to Honza Kral for pointing out the trick of setting ctx = params.ctx.

Using Elasticsearch Painless scripting to recursively iterate through JSON fields

Authors

  • Alexander Marquardt
  • Honza Kral

Introduction

Painless is a simple, secure scripting language designed specifically for use with Elasticsearch. It is the default scripting language for Elasticsearch and can safely be used for inline and stored scripts. In one of its many use cases, Painless can modify documents as they are ingested into your Elasticsearch cluster. In this use case, you may find that you would like to use Painless to evaluate every field in each document that is received by Elasticsearch. However, because of the hierarchical nature of JSON documents, how to iterate over all of the fields may be non-obvious.

This blog provides examples that demonstrate how Painless can iterate across all fields in each document that Elasticsearch receives, regardless of whether the fields appear directly in the top-level JSON body or are contained in sub-documents or arrays.

Example one – remove empty fields

The following Painless script, called “remove_empty_fields”, shows how to loop over all elements in a document and delete each field where the value is an empty string.

PUT _ingest/pipeline/remove_empty_fields
 {
   "processors": [
     {
       "script": {
         "lang": "painless",
         "source": """

           void iterateAllFields(def x) {
             if (x instanceof List) {
               for (def v: x) {
                 iterateAllFields(v);
               }
             }
             if (!(x instanceof Map)) {
               return;
             }
             x.entrySet().removeIf(e -> e.getValue() == "");
             for (def v: x.values()) {
               iterateAllFields(v);
             }
           }

           iterateAllFields(ctx);
       """
       }
     }
   ]
 }

Notice that we use removeIf in the above code, which will correctly remove fields with an empty string as a value. A more naive approach, in which a for loop iterates over the entries returned by “x.entrySet()” and a remove statement is executed inside the loop to delete elements directly, will result in a “ConcurrentModificationException”, as you cannot modify the Map while it is being looped over.
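
For illustration, the hypothetical snippet below sketches what that naive replacement for the removeIf line might look like; do not use it, as it fails at runtime once an entry has been removed:

// naive approach: structurally modifying the Map while iterating over its
// entrySet() triggers a ConcurrentModificationException
for (def e : x.entrySet()) {
  if (e.getValue() == "") {
    x.remove(e.getKey());
  }
}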

We can test the above script with the following call to the simulate pipeline API.

POST _ingest/pipeline/remove_empty_fields/_simulate
 {
   "docs": [
     {
       "_source": {
         "key1": "first value",
         "key2": "some other value",
         "key3": "",
         "sudoc": {
           "a": "abc",
           "b": ""
         }
       }
     },
     {
       "_source": {
         "key1": "",
         "key2": "some other value",
         "list_of_docs": [
           {
             "foo": "abc",
             "bar": ""
           },
           {
             "baz": "",
             "subdoc_in_list": {"child1": "xxx", "child2": ""}
           }
         ]
       }
     }
   ]
 }

This will return the following results, in which each field that contained an empty string has been removed.

{
   "docs" : [
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "key1" : "first value",
           "key2" : "some other value",
           "sudoc" : {
             "a" : "abc"
           }
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T10:59:29.105406Z"
         }
       }
     },
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "list_of_docs" : [
             {
               "foo" : "abc"
             },
             {
               "subdoc_in_list" : {
                 "child1" : "xxx"
               }
             }
           ],
           "key2" : "some other value"
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T10:59:29.105411Z"
         }
       }
     }
   ]
 }

Example two – remove fields where the field name matches a regular expression

The following Painless script, called “remove_unwanted_keys”, shows how you can remove keys with names that match a regular expression. In this example, we delete any fields where the field name starts with “unwanted_key_”.

Note that by default regexes are disabled. To load this script you will first need to set “script.painless.regex.enabled” to “true” in “elasticsearch.yml”.
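
For example, adding the following line to each node’s elasticsearch.yml enables regular expressions in Painless (a node restart is required for changes to elasticsearch.yml to take effect):

script.painless.regex.enabled: true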

PUT _ingest/pipeline/remove_unwanted_keys
 {
   "processors": [
     {
       "script": {
         "lang": "painless",
         "source": """

           void iterateAllFields(def x) {
             if (x instanceof List) {
               for (def v: x) {
                 iterateAllFields(v);
               }
             }
             if (!(x instanceof Map)) {
               return;
             }
             x.entrySet().removeIf(e -> e.getKey() =~ /unwanted_key_.*/);
             for (def v: x.values()) {
               iterateAllFields(v);
             }
           }

           iterateAllFields(ctx);
       """
       }
     }
   ]
 }

We can then test the above script with the following call to the simulate pipeline API.

POST _ingest/pipeline/remove_unwanted_keys/_simulate
 {
   "docs": [
     {
       "_source": {
         "key1": "first value",
         "key2": "some other value",
         "key3": "",
         "unwanted_key_something": "get rid of this",
         "unwanted_key_2": "this too",
         "sudoc": {
           "foo": "abc",
           "bar": ""
         }
       }
     }
   ]
 }

This will return the following results, in which each field whose name started with “unwanted_key_” has been removed.

{
   "docs" : [
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "key1" : "first value",
           "key2" : "some other value",
           "key3" : "",
           "sudoc" : {
             "bar" : "",
             "foo" : "abc"
           }
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T11:19:56.839119Z"
         }
       }
     }
   ]
 }

Conclusion

In this blog we have presented two examples of how all elements in a JSON document can be iterated over, regardless of whether they are included in the top-level JSON or contained within sub-documents or arrays.

Understanding and fixing “too many script compilations” errors in Elasticsearch

Introduction

When using Elasticsearch, in some rare instances you may see an error such as “Too many dynamic script compilations within X minutes”. Such an error may be caused by poor script design in which parameters are hard-coded. In other cases it may be due to the script cache being too small or the compilation limit being too low. In this article, I will show how to determine whether these default limits are too low, and how they can be modified.

Warning

In this blog I will show you how to change the default settings used for caching scripts in Elasticsearch. Changing these to very large values may impact cluster performance, and in the worst case could even cause your cluster to crash.

Script caching

Scripts are cached by default so that they only need to be recompiled when they are updated. However, because these scripts are stored in a cache, if the cache fills up then some of the previously compiled scripts will be evicted and will need to be recompiled if they are needed again in the future. For more information, see the documentation on script caching.

Deprecated script settings

Versions of Elasticsearch 7.8 and earlier will compile up to 15 inline scripts per minute. These compiled scripts are then stored in the script cache which by default can store up to 100 scripts.

The statistics for the script cache can be viewed with the following command:

GET /_nodes/stats?metric=script&filter_path=nodes.*.script.* 

This should respond with something similar to the following:

{
  "nodes" : {
    "XfXvXJ7xSLynbdZBsFwG3A" : {
      "script" : {
        "compilations" : 28,
        "cache_evictions" : 0,
        "compilation_limit_triggered" : 0
      }
    },
    "pzrnXnehTrKEN0urD7j9eg" : {
      "script" : {
        "compilations" : 407081,
        "cache_evictions" : 406981,
        "compilation_limit_triggered" : 5176579
      }
    }
    ... etc ...

The numbers shown are counted since the last restart of each node. If the compilations and cache_evictions counts are large or constantly increasing, this may indicate that the cache is churning, and therefore that the cache is too small.

A high value for compilation_limit_triggered may be a side effect of having a cache that is too small, or of poor script design in which values that should be passed in as parameters are instead hard-coded into the script source, as illustrated below.
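
As an illustration of the hard-coding problem, consider the two hypothetical search requests below (the index name my-index and the field price are placeholders). Both compute the same score, but the first embeds the multiplier directly in the script source, so every distinct value produces a new script that must be compiled, while the second passes the value through params, so the same compiled script is reused:

# hard-coded value: changing 1.2 to another number forces a new compilation
GET my-index/_search
{
  "query": {
    "script_score": {
      "query": { "match_all": {} },
      "script": {
        "source": "doc['price'].value * 1.2"
      }
    }
  }
}

# parameterised value: the script source stays the same and is compiled once
GET my-index/_search
{
  "query": {
    "script_score": {
      "query": { "match_all": {} },
      "script": {
        "source": "doc['price'].value * params.factor",
        "params": { "factor": 1.2 }
      }
    }
  }
}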

The script cache may be configured by setting script.cache.max_size in the elasticsearch.yml configuration file as follows.

script.cache.max_size: 300

And you can dynamically set script.max_compilations_rate as follows:

PUT _cluster/settings
{
  "persistent": {
    "script.max_compilations_rate": "250/5m"
  }
}

However, both of these settings are now deprecated.

Script settings in Elasticsearch 7.9 and newer

Starting in Elasticsearch 7.9, scripts are by default cached based on the contexts in which they execute. Contexts allow different defaults to be set for different kinds of scripts that Elasticsearch may execute. There are many contexts available, such as “watcher_transform”, “bucket_aggregation”, “aggs_combine”, and many others. For those adventurous enough to look in the source code, the instantiation of contexts can be seen with this search on GitHub.

Contexts are enabled by default starting in 7.9. However, if contexts (for some reason) are not currently enabled, they can be enabled with the following command:

PUT _cluster/settings
{
    "persistent": {
        "script.max_compilations_rate": "use-context"
    }
}

If contexts are used, they can be viewed with the following command:

GET /_nodes/stats?filter_path=nodes.*.script_cache.contexts

This should respond with a list of the contexts that are used for executing scripts, such as the following:

{
  "nodes" : {
    "lqxteGihTpifU5lvV7BEmg" : {
      "script_cache" : {
        "contexts" : [
          {
            "context" : "aggregation_selector",
            "compilations" : 1,
            "cache_evictions" : 0,
            "compilation_limit_triggered" : 0
          },

          ... etc ...

          {
            "context" : "xpack_template",
            "compilations" : 0,
            "cache_evictions" : 0,
            "compilation_limit_triggered" : 0
          }

          ... etc ...

If the response above is empty, then “use-context” may not be enabled, and can be enabled as described above.

As with previous versions of Elasticsearch, if the compilations and cache_evictions have large numbers or are constantly increasing, this may indicate that the cache is churning, and may be an indicator that the cache is too small.

For most contexts, you can compile up to 75 scripts per 5 minutes by default. For ingest contexts, the default script compilation rate is unlimited. For most contexts, the default cache size is 100. For ingest contexts, the default cache size is 200. These defaults are given in the 7.9 documentation on how to use scripts.

You can set script.context.$CONTEXT.cache_max_size in the elasticsearch.yml configuration file. For example, to set the max size for the xpack_template context, you would add the following to elasticsearch.yml.

script.context.xpack_template.cache_max_size: 300

On the other hand, script.context.$CONTEXT.max_compilations_rate may be set dynamically. For example, you can configure the compilation rate for the xpack_template context as follows:

PUT _cluster/settings
{
    "persistent": {
        "script.context.xpack_template.max_compilations_rate": "150/5m"
    }
}

Conclusion

In this blog, I have shown how you can look deeper into Elasticsearch to try to diagnose the potential cause of script compilation errors, and how to modify default settings if necessary.

Acknowledgement

Thanks to my Elastic colleague Michael Bischoff for providing guidance on how to investigate and fix the “too many script compilations within X minutes” issue.