Using Kibana’s Painless Lab (Beta) to test an ingest processor script

Introduction

In several previous blog posts I have shown how a Painless script can be used to process new documents as they are ingested into an Elasticsearch cluster. In each of these posts I have made use of the simulate pipeline API to test the Painless scripts.

While developing such scripts, it can be helpful to use Painless Lab (Beta) in Kibana to debug them. In this blog I will show how to use Painless Lab to develop and debug custom scripts, and then show how these scripts can then be easily copied into ingest pipelines.

Example

In the blog post titled Using Elasticsearch Painless scripting to recursively iterate through JSON fields, we demonstrated how to iterate over all elements in a document, and then delete each field where the value is an empty string. The code was written as a script processor in an ingest pipeline, and then simulated.

When developing this Painless script (before putting the code into an ingest pipeline), Painless Lab can be used to catch syntax errors in real time. The code from that blog can be tested in Painless Lab as demonstrated below.

A few modifications to the ingest pipeline code from the previous blog are required to get it to execute correctly in Painless Lab.

When used in an ingest processor (which is where this script will ultimately execute once it has been debugged), the script expects the “ctx” variable to contain the source of the document that is currently being processed. However, because Painless Lab does not (yet) provide a way of passing “ctx” directly into the script, this can be faked by setting “Parameters” to a JSON document with a field called “ctx” that contains the “real” document as its value, and then creating a variable “ctx” in the script which is set to “params.ctx”.
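
As a sketch of that setup (the original post showed it in a screenshot that is not reproduced here), the “Code” panel might contain something like the following, with a trailing return statement so that the modified document is visible in the “Output” tab:

// Painless Lab "Code" panel (sketch only – not the exact code from the original screenshot)
def ctx = params.ctx;   // fake the ingest "ctx" by reading it from the Parameters

void iterateAllFields(def x) {
  if (x instanceof List) {
    for (def v: x) {
      iterateAllFields(v);
    }
  }
  if (!(x instanceof Map)) {
    return;
  }
  // remove any entry whose value is an empty string
  x.entrySet().removeIf(e -> e.getValue() == "");
  for (def v: x.values()) {
    iterateAllFields(v);
  }
}

iterateAllFields(ctx);
return ctx;             // return the modified document so it appears in the "Output" tab

The “Parameters” panel would then contain the test document wrapped in a “ctx” field, for example:

{
  "ctx": {
    "key1": "first value",
    "key2": "some other value",
    "key3": "",
    "sudoc": {
      "a": "abc",
      "b": ""
    }
  }
}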

You can then view the result by clicking on the “Output” tab.

Notice that in the “Output” the result is as expected – “key3” has been removed because it contained an empty string. Now that we have confirmed that the script behaves as expected, it requires some modification before it can be used in an ingest pipeline: the line that sets “ctx” to “params.ctx” (along with any statement added to return the result for display in Painless Lab) needs to be removed. You will then end up with the same script as the one that was demonstrated and verified in Using Elasticsearch Painless scripting to recursively iterate through JSON fields. It is therefore quite straightforward to copy code that has been built in Painless Lab using this technique into an ingest pipeline.

Conclusion

In this blog, I have shown how you can use the Painless Lab for debugging scripts that are used in an ingest processor. This provides real-time syntax verification, and immediate feedback on what the output document will look like.

Acknowledgement

Thanks to Honza Kral for pointing out the trick of setting ctx = params.ctx.

Using Elasticsearch Painless scripting to recursively iterate through JSON fields

Authors

  • Alexander Marquardt
  • Honza Kral

Introduction

Painless is a simple, secure scripting language designed specifically for use with Elasticsearch. It is the default scripting language for Elasticsearch and can safely be used for inline and stored scripts. In one of its many use cases, Painless can modify documents as they are ingested into your Elasticsearch cluster. In this use case, you may find that you would like to use Painless to evaluate every field in each document that is received by Elasticsearch. However, because of the hierarchical nature of JSON documents, it may not be obvious how to iterate over all of the fields.

This blog provides examples that demonstrate how Painless can iterate across all fields in each document that Elasticsearch receives, regardless of whether the fields appear directly in the top-level JSON body or are contained in sub-documents or arrays.

Example one – remove empty fields

The following Painless script, called “remove_empty_fields”, shows how to loop over all elements in a document and delete each field whose value is an empty string.

PUT _ingest/pipeline/remove_empty_fields
 {
   "processors": [
     {
       "script": {
         "lang": "painless",
         "source": """

           void iterateAllFields(def x) {
             if (x instanceof List) {
               for (def v: x) {
                 iterateAllFields(v);
               }
             }
             if (!(x instanceof Map)) {
               return;
             }
             x.entrySet().removeIf(e -> e.getValue() == "");
             for (def v: x.values()) {
               iterateAllFields(v);
             }
           }

           iterateAllFields(ctx);
       """
       }
     }
   ]
 }

Notice that we use removeIf in the above code, which correctly removes fields with an empty string as a value. A more naive approach – using a for loop to iterate over the entries returned by “x.entrySet()” and executing a remove statement inside the loop to directly delete an element – would result in a “ConcurrentModificationException”, as you cannot modify the Map while it is being looped over.
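
For illustration, a naive version of the removal step might look like the following sketch, which is exactly the pattern to avoid:

// Naive approach (sketch) – do NOT use this: removing entries from the Map while
// iterating over its entry set throws a ConcurrentModificationException
for (def e : x.entrySet()) {
  if (e.getValue() == "") {
    x.remove(e.getKey());
  }
}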

We can test the above script with the following call to the simulate pipeline API.

POST _ingest/pipeline/remove_empty_fields/_simulate
 {
   "docs": [
     {
       "_source": {
         "key1": "first value",
         "key2": "some other value",
         "key3": "",
         "sudoc": {
           "a": "abc",
           "b": ""
         }
       }
     },
     {
       "_source": {
         "key1": "",
         "key2": "some other value",
         "list_of_docs": [
           {
             "foo": "abc",
             "bar": ""
           },
           {
             "baz": "",
             "subdoc_in_list": {"child1": "xxx", "child2": ""}
           }
         ]
       }
     }
   ]
 }

This will return the following results, in which each field that contained an empty string has been removed.

{
   "docs" : [
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "key1" : "first value",
           "key2" : "some other value",
           "sudoc" : {
             "a" : "abc"
           }
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T10:59:29.105406Z"
         }
       }
     },
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "list_of_docs" : [
             {
               "foo" : "abc"
             },
             {
               "subdoc_in_list" : {
                 "child1" : "xxx"
               }
             }
           ],
           "key2" : "some other value"
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T10:59:29.105411Z"
         }
       }
     }
   ]
 }

Example two – remove fields where the field name matches a regular expression

The following Painless script, called “remove_unwanted_keys”, shows how you can remove keys whose names match a regular expression. In this example, we delete any field whose name starts with “unwanted_key_”.

Note that by default regexes are disabled. To load this script you will first need to set “script.painless.regex.enabled” to “true” in “elasticsearch.yml”.
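
For example, the following line could be added to elasticsearch.yml (this is a static node setting, so a node restart is required for it to take effect):

script.painless.regex.enabled: true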

PUT _ingest/pipeline/remove_unwanted_keys
 {
   "processors": [
     {
       "script": {
         "lang": "painless",
         "source": """

           void iterateAllFields(def x) {
             if (x instanceof List) {
               for (def v: x) {
                 iterateAllFields(v);
               }
             }
             if (!(x instanceof Map)) {
               return;
             }
             x.entrySet().removeIf(e -> e.getKey() =~ /unwanted_key_.*/);
             for (def v: x.values()) {
               iterateAllFields(v);
             }
           }

           iterateAllFields(ctx);
       """
       }
     }
   ]
 }

We can then test the above script with the following call to the simulate pipeline API.

POST _ingest/pipeline/remove_unwanted_keys/_simulate
 {
   "docs": [
     {
       "_source": {
         "key1": "first value",
         "key2": "some other value",
         "key3": "",
         "unwanted_key_something": "get rid of this",
         "unwanted_key_2": "this too",
         "sudoc": {
           "foo": "abc",
           "bar": ""
         }
       }
     }
   ]
 }

This will return the following results, in which each field whose name starts with “unwanted_key_” has been removed.

{
   "docs" : [
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "key1" : "first value",
           "key2" : "some other value",
           "key3" : "",
           "sudoc" : {
             "bar" : "",
             "foo" : "abc"
           }
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T11:19:56.839119Z"
         }
       }
     }
   ]
 }

Conclusion

In this blog we have presented two examples of how all elements in a JSON document can be iterated over, regardless of whether they are included in the top-level JSON or within sub-documents or arrays.

Understanding and fixing “too many script compilations” errors in Elasticsearch

Introduction

When using Elasticsearch, in some rare instances you may see an error such as “Too many dynamic script compilations within X minutes”. Such an error may be caused by poor script design in which parameters are hard-coded into the script source. In other cases it may be due to the script cache being too small or the compilation limit being too low. In this article, I will show how to determine whether these default limits are too low, and how they can be modified.

Warning

In this blog I will show you how to change the default settings that Elasticsearch uses for caching scripts. Changing these to very large values may impact cluster performance, and in the worst case could even cause your cluster to crash.

Script caching

Scripts are cached by default so that they only need to be recompiled when updates occur. However, because these scripts are stored in a cache, if the cache fills up then some of the previously compiled scripts will be evicted and will need to be recompiled if they are required again in the future. For more information, see the documentation on script caching.

Deprecated script settings

Versions of Elasticsearch 7.8 and earlier will compile up to 15 inline scripts per minute. These compiled scripts are then stored in the script cache, which by default can hold up to 100 scripts.

The statistics for the script cache can be viewed with the following command:

GET /_nodes/stats?metric=script&filter_path=nodes.*.script.* 

This should respond with something similar to the following:

{
  "nodes" : {
    "XfXvXJ7xSLynbdZBsFwG3A" : {
      "script" : {
        "compilations" : 28,
        "cache_evictions" : 0,
        "compilation_limit_triggered" : 0
      }
    },
    "pzrnXnehTrKEN0urD7j9eg" : {
      "script" : {
        "compilations" : 407081,
        "cache_evictions" : 406981,
        "compilation_limit_triggered" : 5176579
      }
    }
    ... etc ...

The numbers shown are counted since the last restart of each node. If compilations and cache_evictions are large or constantly increasing, the cache is likely churning, which may indicate that it is too small.

A high value for compilation_limit_triggered may be a side effect of a cache that is too small, or of poor script design in which parameters are hard-coded.
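
To illustrate the hard-coding problem, the sketch below uses a hypothetical index and field: in the first request the multiplier is baked into the script source, so every different multiplier produces a new script that must be compiled; in the second request the value is passed via params, so the compiled script can be reused from the cache.

# Anti-pattern (sketch): the value 2 is part of the script source. If callers
# generate this query with different multipliers (2, 3, 4, ...), each variant
# is a new script that must be compiled.
GET my-index/_search
{
  "query": {
    "script_score": {
      "query": { "match_all": {} },
      "script": { "source": "doc['price'].value * 2" }
    }
  }
}

# Better (sketch): the script source never changes, only the params do, so the
# compiled script is reused from the cache.
GET my-index/_search
{
  "query": {
    "script_score": {
      "query": { "match_all": {} },
      "script": {
        "source": "doc['price'].value * params.factor",
        "params": { "factor": 2 }
      }
    }
  }
}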

The script cache may be configured by setting script.cache.max_size in the elasticsearch.yml configuration file as follows.

script.cache.max_size: 300

And you can dynamically set script.max_compilations_rate as follows:

PUT _cluster/settings
{
  "persistent": {
    "script.max_compilations_rate": "250/5m"
  }
}

However, both of these settings are now deprecated.

Script settings in Elasticsearch 7.9 and newer

Starting in Elasticsearch 7.9, scripts are cached by default according to the context in which they execute. Contexts allow different defaults to be set for the different kinds of scripts that Elasticsearch may execute. There are many contexts available, such as “watcher_transform”, “bucket aggregation”, “aggs_combine”, and many others. For those adventurous enough to look in the source code, instantiation of contexts can be seen with this search on GitHub.

Contexts are enabled by default starting in 7.9. However, if contexts (for some reason) are not currently enabled, they can be enabled with the following command:

PUT _cluster/settings
{
    "persistent": {
        "script.max_compilations_rate": "use-context"
    }
}

If contexts are in use, the per-context statistics can be viewed with the following command:

GET /_nodes/stats?filter_path=nodes.*.script_cache.contexts

This should respond with a list of the contexts that are used for executing scripts, such as the following:

{
  "nodes" : {
    "lqxteGihTpifU5lvV7BEmg" : {
      "script_cache" : {
        "contexts" : [
          {
            "context" : "aggregation_selector",
            "compilations" : 1,
            "cache_evictions" : 0,
            "compilation_limit_triggered" : 0
          },

          ... etc ...

          {
            "context" : "xpack_template",
            "compilations" : 0,
            "cache_evictions" : 0,
            "compilation_limit_triggered" : 0
          }

          ... etc ...

If the response above is empty, then “use-context” may not be enabled, and can be enabled as described above.

As with previous versions of Elasticsearch, if compilations and cache_evictions are large or constantly increasing, this may indicate that the cache is churning and is therefore too small.

For most contexts, you can compile up to 75 scripts per 5 minutes by default. For ingest contexts, the default script compilation rate is unlimited. For most contexts, the default cache size is 100. For ingest contexts, the default cache size is 200. These defaults are given in the 7.9 documentation on how to use scripts.
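
If you want to double-check the values that your own cluster is using, one option (a sketch – the exact keys in the response vary by version) is to ask the cluster settings API to include defaults and filter for the script-related entries:

GET _cluster/settings?include_defaults=true&filter_path=defaults.script*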

You can set script.context.$CONTEXT.cache_max_size in the elasticsearch.yml configuration file. For example, to set the max size for the xpack_template context, you would add the following to elasticsearch.yml.

script.context.xpack_template.cache_max_size: 300

On the other hand, script.context.$CONTEXT.max_compilations_rate may be set dynamically. For example, you can configure the compilation rate for the xpack_template context as follows:

PUT _cluster/settings
{
    "persistent": {
        "script.context.xpack_template.max_compilations_rate": "150/5m"
    }
}

Conclusion

In this blog, I have shown how you can look deeper into Elasticsearch to try to diagnose the potential cause of script compilation errors, and how to modify default settings if necessary.

Acknowledgement

Thanks to my Elastic colleague Michael Bischoff for providing guidance on how to investigate and fix the “too many script compilations within X minutes” issue.

Using Logstash and Elasticsearch to calculate transaction duration in a microservices architecture

September 16, 2020

Introduction

Elasticsearch allows you to unify your observability data in a powerful datastore so you can search and apply interactive analytics in real time to a huge number of use cases.

In one such use case, you may be using Elasticsearch to monitor a system that is composed of multiple microservices that process a given transaction. For such a system, you may be collecting an event corresponding to when the first microservice in the system starts processing the transaction, and another event corresponding to when the last microservice in the system finishes processing the transaction. In such an approach, each event should include a field with the transaction identifier, which will allow multiple events corresponding to a single transaction to be combined for analysis.

In this blog I discuss how Elasticsearch in combination with Logstash may be used to ingest multiple events that correspond to a given transaction as it is processed by multiple microservices, and how to calculate the time difference between these different events, which I will refer to as the “transaction duration”.

The approach discussed here will work even if the events corresponding to a given transaction arrive to Logstash out-of-order, and it could be easily extended to compute delays between any microservices that process a given transaction.

A note about event ordering

If the events corresponding to a given transaction are guaranteed to arrive in order, then it may be possible to use Logstash’s Elapsed filter plugin.
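
For reference, a minimal sketch of such an elapsed filter configuration, using the tag and field names of the events shown later in this post, might look like the following (this is illustrative only and is not the approach used in the remainder of this blog):

filter {
  elapsed {
    # tags that mark the first and last event of a transaction
    start_tag       => "start_event"
    end_tag         => "end_event"
    # field whose value ties the start and end events together
    unique_id_field => "ident"
    # seconds to wait for the end event before giving up
    timeout         => 600
  }
}

When a matching start/end pair is seen, the elapsed filter adds an “elapsed_time” field (in seconds) to the end event.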

Alternatively, the approach described in this article works regardless of the order in which the events arrive.

Using scripted upserts to transform data

In a previous blog post, I described how to use Logstash and Elasticsearch scripted upserts to transform data. The approach in this blog is very similar, but has the explicit goal of calculating the duration between the “start” and “end” events for a given transaction.

The approach described in this blog will ultimately result in two indices being written into Elasticsearch. One index will contain original documents corresponding to each monitoring event, and another index will contain transformed documents which will track the transaction duration.

For the purposes of this blog, we expect events to contain a “@timestamp” field, a “tags” array that contains a value of “start_event” or “end_event” somewhere in the array, and a transaction identifier which we have stored in a field called “ident”. For example, a document could look as follows:

{
  "message": "abc",
  "ident": "id1",
  "@timestamp": "2020-08-18T19:43:36.000Z",
  "other_field": "other_val 1",
  "tags": [
    "start_event"
  ]
}

As we will ultimately be using Logstash to call Elasticsearch scripted upserts to compute the duration of each transaction, it is worth highlighting that Logstash sends the source of each document into the scripted upsert as params.event rather than in the standard ctx._source that we normally expect.

The following script will calculate the time difference between the “start_time” and the “end_time” even if the end event arrives before the start event.

POST _scripts/calculate_transaction_duration
{
  "script": {
    "lang": "painless",
    "source": """
      def position_of_start_event_in_tags = params.event['tags'].indexOf('start_event');

      // if this is a "start event" then store the timestamp in the start_time field
      if (position_of_start_event_in_tags >= 0) {
        ctx._source['start_time'] = params.event['@timestamp']
      }

      def position_of_end_event_in_tags = params.event['tags'].indexOf('end_event');

      // if this is an "end event" then store the timestamp in the end_time field
      if (position_of_end_event_in_tags >= 0) {
        ctx._source['end_time'] = params.event['@timestamp']
      }

      // if both start and end times exist, calculate the difference in seconds
      if (ctx._source.containsKey('start_time') && ctx._source.containsKey('end_time')) {
        ctx._source['duration_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx._source['start_time']), ZonedDateTime.parse(ctx._source['end_time']))/1000;
      }

      // OPTIONAL COPY (PROBABLY NOT NEEDED) - copy remaining fields into the _source
      //for (x in params.event.entrySet()) {
      //  ctx._source[x.getKey()] = x.getValue();
      //}
    """
  }
}

We can then test the above script directly from Dev Tools by running both of the following commands (in any order), which will update the document with an _id of “id1” in the test_script index:

POST test_script/_doc/id1/_update
{
  "scripted_upsert": true,
  "script": {
    "id": "calculate_transaction_duration",
    "params": {
      "event": {
        "message": "abc", "ident": "id1", "@timestamp": "2020-08-18T19:43:36.000Z", "other_field": "other_val 1", "tags": ["start_event"]
      }
    }
  },
  "upsert": {}
}


POST test_script/_doc/id1/_update
{
  "scripted_upsert": true,
  "script": {
    "id": "calculate_transaction_duration",
    "params": {
      "event": {
        "message": "def", "ident": "id1", "@timestamp": "2020-08-18T19:53:36.000Z", "other_field": "other_val 2", "tags": ["end_event"]
      }
    }
  },
  "upsert": {}
}

After running the above code, we can view the document that contains the transaction duration as follows:

GET test_script/_doc/id1

This will respond with the following:

 {
  "_index" : "test_script",
  "_type" : "_doc",
  "_id" : "id1",
  "_version" : 2,
  "_seq_no" : 4,
  "_primary_term" : 3,
  "found" : true,
  "_source" : {
    "start_time" : "2020-08-18T19:43:36.000Z",
    "end_time" : "2020-08-18T19:53:36.000Z",
    "duration_in_seconds" : 600
  }
}

We now have scripted upserts working and tested within Elasticsearch. Next, let’s get this working from Logstash.

The following Logstash pipeline sends two transactions, each consisting of two events, into Elasticsearch. Notice that the last two events, which correspond to the transaction “id2”, are out of order. This is not an issue, as the script that we demonstrated above handles this correctly.

input {
  # The generator creates input events.
  # Notice how the events associated with id2 are "out of order"
  generator {
    lines => [
     '{"message": "abc", "ident": "id1", "@timestamp": "2020-08-18T19:43:36.000Z", "other_field": "other_val 1", "tags": ["start_event"]}',
     '{"message": "def", "ident": "id1", "@timestamp": "2020-08-18T19:53:36.000Z", "other_field": "other_val 2", "tags": ["end_event"]}',
     '{"message": "ghi", "ident": "id2", "@timestamp": "2020-08-20T19:43:56.000Z", "other_field": "other_val 4", "tags": ["end_event"]}',
     '{"message": "jkl", "ident": "id2", "@timestamp": "2020-08-20T19:43:36.000Z", "other_field": "other_val 3", "tags": ["start_event"]}'
    ]
    count => 1
    codec =>  "json"
  }
}
filter {}
output {

  # Transformed data
  elasticsearch {
    index => "transaction_duration"
    document_id => "%{ident}"
    action => "update"
    scripted_upsert => true
    script_lang => ""
    script_type => "indexed"
    script => "calculate_transaction_duration"
  }

  # Original data
  elasticsearch {
    index => "transaction_original"
  }
}

The above pipeline can be saved into a file called scripted-elapsed.conf and directly executed as follows:

./bin/logstash -f scripted-elapsed.conf --config.reload.automatic

After running the above Logstash pipeline, there will be two indices created in the locally running Elasticsearch. One is the index that contains the original events and is called “transaction_original”, and the other is the transformed index called “transaction_duration” that contains the duration of each transaction.
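
As a quick check that both indices exist, you can use the cat indices API (the index names are those defined in the pipeline above):

GET _cat/indices/transaction_*?v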

We can look at the “transaction_duration” index with the following command:

GET transaction_duration/_search

This will respond with the following two documents, which correspond to the two transactions:

    "hits" : [
      {
        "_index" : "transaction_duration",
        "_type" : "_doc",
        "_id" : "id2",
        "_score" : 1.0,
        "_source" : {
          "end_time" : "2020-08-20T19:43:56.000Z",
          "start_time" : "2020-08-20T19:43:36.000Z",
          "duration_in_seconds" : 20
        }
      },
      {
        "_index" : "transaction_duration",
        "_type" : "_doc",
        "_id" : "id1",
        "_score" : 1.0,
        "_source" : {
          "end_time" : "2020-08-18T19:53:36.000Z",
          "start_time" : "2020-08-18T19:43:36.000Z",
          "duration_in_seconds" : 600
        }
      }
    ]

We have now verified that the script to calculate event duration is functioning correctly when we call it from Logstash!

Conclusion

In this blog post, I first discussed how a given transaction may result in multiple events being sent into Elasticsearch. I then showed how you can use Logstash to execute scripted upserts which calculate the duration of a given transaction by comparing the timestamps of the related events.