Using Elasticsearch Painless scripting to recursively iterate through JSON fields

Authors

  • Alexander Marquardt
  • Honza Kral

Introduction

Painless is a simple, secure scripting language designed specifically for use with Elasticsearch. It is the default scripting language for Elasticsearch and can safely be used for inline and stored scripts. In one of its many use cases, Painless can modify documents as they are ingested into your Elasticsearch cluster. In this use case, you may find that you would like to use Painless to evaluate every field in each document that is received by Elasticsearch. However, because of the hierarchical nature of JSON documents, how to iterate over all of the fields may be non-obvious.

This blog provides examples that demonstrate how Painless can iterate across all fields in each document that Elasticsearch receives, regardless of whether the fields appear directly in the top-level JSON body or are contained in sub-documents or arrays.

Example one – remove empty fields

The following Painless script, called “remove_empty_fields”, shows how to loop over all elements in a document and delete each field whose value is an empty string.

PUT _ingest/pipeline/remove_empty_fields
 {
   "processors": [
     {
       "script": {
         "lang": "painless",
         "source": """

           void iterateAllFields(def x) {
             if (x instanceof List) {
               for (def v: x) {
                 iterateAllFields(v);
               }
             }
             if (!(x instanceof Map)) {
               return;
             }
             x.entrySet().removeIf(e -> e.getValue() == "");
             for (def v: x.values()) {
               iterateAllFields(v);
             }
           }

           iterateAllFields(ctx);
       """
       }
     }
   ]
 }

Notice that we use removeIf in the above code, which correctly removes fields that have an empty string as a value. A more naive approach that uses a for loop to iterate over the fields returned by “x.entrySet()” and then executes a remove statement inside that loop to directly delete an element will result in a “ConcurrentModificationException”, because you cannot modify the Map while it is being looped over.
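
For illustration only, a naive sketch of that approach might look like the following, which fails because the Map is modified while its entry set is still being iterated over:

// Do NOT do this: removing entries from the Map while iterating over its
// entry set results in a ConcurrentModificationException.
for (def e: x.entrySet()) {
  if (e.getValue() == "") {
    x.remove(e.getKey());
  }
}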

We can test the above script with the following call to the simulate pipeline API.

POST _ingest/pipeline/remove_empty_fields/_simulate
 {
   "docs": [
     {
       "_source": {
         "key1": "first value",
         "key2": "some other value",
         "key3": "",
         "sudoc": {
           "a": "abc",
           "b": ""
         }
       }
     },
     {
       "_source": {
         "key1": "",
         "key2": "some other value",
         "list_of_docs": [
           {
             "foo": "abc",
             "bar": ""
           },
           {
             "baz": "",
             "subdoc_in_list": {"child1": "xxx", "child2": ""}
           }
         ]
       }
     }
   ]
 }

This will return the following results, in which each field that contained an empty string has been removed.

{
   "docs" : [
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "key1" : "first value",
           "key2" : "some other value",
           "sudoc" : {
             "a" : "abc"
           }
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T10:59:29.105406Z"
         }
       }
     },
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "list_of_docs" : [
             {
               "foo" : "abc"
             },
             {
               "subdoc_in_list" : {
                 "child1" : "xxx"
               }
             }
           ],
           "key2" : "some other value"
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T10:59:29.105411Z"
         }
       }
     }
   ]
 }

Example two – remove fields where the field name matches a regular expression

The following Painless script, called “remove_unwanted_keys”, shows how you can remove keys whose names match a regular expression. In this example, we delete any fields where the field name starts with “unwanted_key_”.

Note that regexes are disabled by default. To load this script, you will first need to set “script.painless.regex.enabled” to “true” in “elasticsearch.yml”.
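
For reference, the relevant line in elasticsearch.yml looks like the following:

script.painless.regex.enabled: true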

PUT _ingest/pipeline/remove_unwanted_keys
 {
   "processors": [
     {
       "script": {
         "lang": "painless",
         "source": """

           void iterateAllFields(def x) {
             if (x instanceof List) {
               for (def v: x) {
                 iterateAllFields(v);
               }
             }
             if (!(x instanceof Map)) {
               return;
             }
             x.entrySet().removeIf(e -> e.getKey() =~ /unwanted_key_.*/);
             for (def v: x.values()) {
               iterateAllFields(v);
             }
           }

           iterateAllFields(ctx);
       """
       }
     }
   ]
 }

We can then test the above script with the following call to the simulate pipeline API.

POST _ingest/pipeline/remove_unwanted_keys/_simulate
 {
   "docs": [
     {
       "_source": {
         "key1": "first value",
         "key2": "some other value",
         "key3": "",
         "unwanted_key_something": "get rid of this",
         "unwanted_key_2": "this too",
         "sudoc": {
           "foo": "abc",
           "bar": ""
         }
       }
     }
   ]
 }

This will return the following results, in which each field whose name started with “unwanted_key_” has been removed.

{
   "docs" : [
     {
       "doc" : {
         "_index" : "_index",
         "_type" : "_doc",
         "_id" : "_id",
         "_source" : {
           "key1" : "first value",
           "key2" : "some other value",
           "key3" : "",
           "sudoc" : {
             "bar" : "",
             "foo" : "abc"
           }
         },
         "_ingest" : {
           "timestamp" : "2020-11-06T11:19:56.839119Z"
         }
       }
     }
   ]
 }

Conclusion

In this blog we have presented two examples of how all elements in a JSON document can be iterated over, regardless of whether they are included in the top-level JSON or within sub-documents or arrays.

Understanding and fixing “too many script compilations” errors in Elasticsearch

Introduction

When using Elasticsearch, in some rare instances you may see an error such as “Too many dynamic script compilations within X minutes”. Such an error may be caused by poor script design in which parameters are hard-coded into the script source rather than passed in as params. In other cases it may be due to the script cache being too small or the compilation limit being too low. In this article, I will show how to determine whether these default limits are too low, and how they can be modified.

Warning

In this blog I will show you how to change the default settings that Elasticsearch uses for caching scripts. Changing these to very large values may impact cluster performance, and in the worst case could even cause your cluster to crash.

Script caching

Scripts are cached by default so that they only need to be recompiled when they are updated. However, if the cache fills up, some previously compiled scripts are evicted from the cache and will need to be recompiled if they are needed again in the future. For more information, see the documentation on script caching.

Deprecated script settings (Read this if you are running 7.8 or earlier)

Versions of Elasticsearch 7.8 and earlier will compile up to 15 inline scripts per minute. These compiled scripts are then stored in the script cache which by default can store up to 100 scripts.

The statistics for the script cache can be viewed with the following command:

GET /_nodes/stats?metric=script&filter_path=nodes.*.script.* 

This should respond with something similar to the following:

{
  "nodes" : {
    "XfXvXJ7xSLynbdZBsFwG3A" : {
      "script" : {
        "compilations" : 28,
        "cache_evictions" : 0,
        "compilation_limit_triggered" : 0
      }
    },
    "pzrnXnehTrKEN0urD7j9eg" : {
      "script" : {
        "compilations" : 407081,
        "cache_evictions" : 406981,
        "compilation_limit_triggered" : 5176579
      }
    }
    ... etc ...

The numbers shown are counted from the last restart of each node. If compilations and cache_evictions are large or constantly increasing, the cache may be churning, which may indicate that it is too small.

A high value for compilation_limit_triggered may be a side effect of a cache that is too small, or of poor script design in which parameters are hard-coded into the script source.
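
To illustrate what “hard-coded parameters” means, consider the following sketch (my_index, counter, and increment are hypothetical names used only for illustration). In the first request the value is baked into the script source, so every distinct value results in a new compilation; in the second request the value is passed via params, so a single compiled script can be reused:

// Anti-pattern: the value 4 is part of the script source, so a different
// value produces a different source string and triggers a new compilation.
POST my_index/_update/1
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.counter += 4"
  }
}

// Preferred: the source string never changes, so the script is compiled
// once and reused; only the params change.
POST my_index/_update/1
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.counter += params.increment",
    "params": {
      "increment": 4
    }
  }
}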

The script cache may be configured by setting script.cache.max_size in the elasticsearch.yml configuration file as follows.

script.cache.max_size: 300

And you can dynamically set script.max_compilations_rate as follows:

PUT _cluster/settings
{
  "persistent": {
    "script.max_compilations_rate": "250/5m"
  }
}

However, both of these settings are now deprecated.

Script settings in Elasticsearch 7.9 and newer

Starting in Elasticsearch 7.9, scripts are by default cached based on the contexts in which they execute. Contexts allow different defaults to be set for different kinds of scripts that Elasticsearch may execute. There are many contexts available, such as “watcher_transform”, “bucket aggregation”, “aggs_combine”, and many others. For those adventurous enough to look in the source code, instantiation of contexts can be seen with this search on GitHub.

Contexts are enabled by default starting in 7.9. However, if contexts (for some reason) are not currently enabled, they can be enabled with the following command:

PUT _cluster/settings
{
    "persistent": {
        "script.max_compilations_rate": "use-context"
    }
}

If contexts are used, they can be viewed with the following command:

GET /_nodes/stats?filter_path=nodes.*.script_cache.contexts

This should respond with a list of the contexts that are used for executing scripts, such as the following:

{
  "nodes" : {
    "lqxteGihTpifU5lvV7BEmg" : {
      "script_cache" : {
        "contexts" : [
          {
            "context" : "aggregation_selector",
            "compilations" : 1,
            "cache_evictions" : 0,
            "compilation_limit_triggered" : 0
          },

          ... etc ...

          {
            "context" : "xpack_template",
            "compilations" : 0,
            "cache_evictions" : 0,
            "compilation_limit_triggered" : 0
          }

          ... etc ...

If the response above is empty, then “use-context” may not be enabled, and can be enabled as described above.

As with previous versions of Elasticsearch, if compilations and cache_evictions are large or constantly increasing, the cache may be churning, which may indicate that it is too small.

For most contexts, you can compile up to 75 scripts per 5 minutes by default. For ingest contexts, the default script compilation rate is unlimited. For most contexts, the default cache size is 100. For ingest contexts, the default cache size is 200. These defaults are given in the 7.9 documentation on how to use scripts.

You can set script.context.$CONTEXT.cache_max_size in the elasticsearch.yml configuration file. For example, to set the max size for the xpack_template context, you would add the following to elasticsearch.yml.

script.context.xpack_template.cache_max_size: 300

On the other hand, script.context.$CONTEXT.max_compilations_rate may be set dynamically. For example, you can configure the compilation rate for the xpack_template context as follows:

PUT _cluster/settings
{
    "persistent": {
        "script.context.xpack_template.max_compilations_rate": "150/5m"
    }
}

Conclusion

In this blog, I have shown how you can look deeper into Elasticsearch to try to diagnose the potential cause of script compilation errors, and how to modify default settings if necessary.

Acknowledgement

Thanks to my Elastic colleague Michael Bischoff for providing guidance on how to investigate and fix the “too many script compilations within X minutes” issue.

Converting CSV to JSON in Filebeat

Introduction

Many organisations use Excel files for creating and storing important data, and for various reasons it may be useful to import such data into Elasticsearch. For example, one may need to get master data that is created in a spreadsheet into Elasticsearch, where it can be used for enriching Elasticsearch documents. Or one may wish to use Elasticsearch and Kibana for analysing a dataset that is only available in a spreadsheet. In such cases, one option is to use Filebeat for uploading such CSV data into an Elasticsearch cluster.

In this blog I will show how Filebeat can be used to convert CSV data into JSON-formatted data that can be sent into an Elasticsearch cluster. This will be accomplished by using a built-in CSV processor as well as a custom JavaScript processor which will be applied to every line in a CSV file.

Note that Filebeat is intended for sending log lines into Elasticsearch. The technique described in this blog, by contrast, is not intended to run on a CSV file that continually has lines added to it. The technique and code presented in this article are intended for ingesting an existing CSV file a single time, after which Filebeat terminates.

Motivation

Filebeat supports a CSV processor which extracts values from a CSV string, and stores the result in an array. However, this processor does not create key-value pairs to maintain the relation between the column names and the extracted values. When using the CSV processor, additional processing (and hard-coding of the field names) is generally required in an ingest node or in Logstash to add the correct field names back into the extracted data.

In contrast, the approach presented in this blog automatically extracts the field names from the CSV header row, and then generates key-value pairs by combining each subsequent row’s values with those field names. This technique therefore eliminates the need for the additional ingest node or Logstash processing that would otherwise be required.
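
As a concrete illustration (using the first data row of the test.csv file shown later in this post), the CSV processor on its own produces an anonymous array of values, whereas the approach described here produces a JSON object keyed by the names from the header row:

// Result of the decode_csv_fields processor alone: values without field names
"decoded_csv_arr" : ["1234", "first 1", "123", "third 1"]

// Result of the approach in this post: key-value pairs built from the header row
"json_from_csv" : {
  "first_col" : "1234",
  "col2" : "first 1",
  "col3" : "123",
  "fourth_col" : "third 1"
}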

Code

All code presented in this blog is available at: https://github.com/alexander-marquardt/filebeat-csv-to-json

A note on the Filebeat registry

Because Filebeat is designed for sending log lines from files which are actively being written, it keeps track of the most recent log entry that it has sent to Elasticsearch and ensures that each entry is only sent once. This is tracked in the Filebeat registry. We should be aware of the existence of the registry, because it prevents the same CSV data from being sent to Elasticsearch multiple times, which can be undesirable when testing.

A note on Filebeat processors

Processors are executed on data as it passes through Filebeat. The code presented in this blog makes use of the CSV processor as well as a custom script processor. The custom script processor applies custom JavaScript code to each event (in our case, to each CSV line), which converts the CSV values into key-value pairs in a JSON object.

Example CSV input

We will store the following data in a file called test.csv, which will be used to show how CSV can be converted into JSON. The CSV intentionally mixes quoted and unquoted values, to ensure that the code works correctly for different formats:

first_col,col2,col3,fourth_col
1234,"first 1",123,third 1
5678,first 2,456,"third 2"

Filebeat configuration

We use the following filebeat.yml configuration to call the CSV processor as well as our custom JavaScript.

max_procs: 1 # This code will not work correctly on multiple threads
 
filebeat.inputs:
- type: log
  enabled: true
  close_eof: true
  paths:
    - ${PWD}/test.csv


  processors:
  - decode_csv_fields:
      fields:
        message: decoded_csv_arr
      separator: ","
      ignore_missing: false
      overwrite_keys: true
      trim_leading_space: false
      fail_on_error: true

  - script:
      lang: javascript
      id: convert_csv_into_json
      file: ${PWD}/convert_csv_to_json.js

  - drop_fields:
      fields: ["decoded_csv_arr"]

output.elasticsearch:
  hosts: ["localhost:9200"]

  index: "csv_to_json-%{+YYYY.MM.dd}" 

setup.ilm.enabled: false
setup.template.enabled: false

JavaScript processor code

Below we present the JavaScript code that we use for converting lines in a CSV file into JSON objects. This should be stored in a file called convert_csv_to_json.js which is referenced in the filebeat.yml configuration that we presented above.

When the first line of CSV is passed into this JavaScript processor, the code uses a JavaScript closure to “remember” the header values. When subsequent lines from the CSV file are passed in, the headers are combined with the values in each row to create key-value pairs that are stored in a JSON object.

Note that this will only work as a single-threaded process, since the closure containing the header fields is only available in the thread that processes the header row. This is ensured by setting max_procs: 1 in filebeat.yml.

// This function takes an array containing the field names, and another that
// contains field values, and returns a json dictionary that combines them.
function convert_csv_to_dict(csv_headers_row, csv_values_row) {
  var json_from_csv =  csv_values_row.reduce(function(result, field, index) {
    result[csv_headers_row[index]] = field;
    return result;
  }, {})

  return json_from_csv;
}


// Define the JavaScript function that will be used to combine the 
// header row with subsequent rows in the CSV file
var headers_fn = (function() {
  var csv_headers_row = null; 

  // Use a JavaScript closure to store the header line (csv_headers_row), 
  // so that we can use the header values for all subsequent CSV entries. 
  return function(csv_arr) {

    var json_from_csv = null;

    if (!csv_headers_row) {
      // if this is the first row, store the headers
      csv_headers_row = csv_arr;
    } else {
      // combine the csv_headers_row with the values to get a dict
      json_from_csv = convert_csv_to_dict(csv_headers_row, csv_arr)
    }
    return json_from_csv;
  }

})();  


// This function is called for each "event" 
// (eg. called once for each line in the log file)
function process(event) {
    var csv_arr = event.Get("decoded_csv_arr");
    var json_from_csv = headers_fn(csv_arr);

    // If the current event was triggered to process the header row,
    // then the json_from_csv will be empty - it only returns a json dict
    // for subsequent rows. Cancel the event so that nothing
    // is sent to the output.
    if (!json_from_csv) {
      event.Cancel();
    }
    event.Put("json_from_csv", json_from_csv);
}

Executing the code

The following command line can be used for executing the code which converts the CSV into JSON, and then sends the resulting documents into Elasticsearch.

rm -rf my_reg; ./filebeat  -once -E filebeat.registry.path=${PWD}/my_reg

There are a few things to point out about this command line.

  1. It deletes the registry directory before executing Filebeat. This means that the input file will be sent each time Filebeat is executed. To prevent multiple copies of the same documents from appearing in the destination index, the destination index should be deleted before re-running this code.
  2. It stores the registry in the local directory, which makes it easier to find and delete.
  3. It runs with the “-once” option, which makes Filebeat exit once the input file has been read.

Results

Once the above code has executed, there should be an index written into Elasticsearch that starts with “csv_to_json-“. Looking into this index, we can see that the documents contain the following field, which has been extracted from the CSV file.

"json_from_csv" : {
  "col3" : "123",
  "fourth_col" : "third 1",
  "first_col" : "1234",
  "col2" : "first 1"
}
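
As a quick check, a query such as the following sketch (the full index name will include the ingest date) can be used to confirm that the extracted fields are searchable:

GET csv_to_json-*/_search
{
  "query": {
    "match": {
      "json_from_csv.col2": "first 1"
    }
  }
}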

Conclusion

In this blog, I have shown how Filebeat can be used to convert CSV data into JSON objects in the documents that are sent to Elasticsearch. Because the field names in the JSON objects are extracted directly from the CSV header, this technique eliminates the need for the ingest node or Logstash processing that would otherwise be required to add the correct field names to the CSV data.

Using Logstash and Elasticsearch scripted upserts to transform eCommerce purchasing data

Introduction

Logstash is a tool that can be used to collect, process, and forward events to Elasticsearch. In order to demonstrate the power of Logstash when used in conjunction with Elasticsearch’s scripted upserts, I will show you how to create a near-real-time entity-centric index. Once data is transformed into an entity-centric index, many kinds of analysis become possible with simple (cheap) queries rather than more computationally intensive aggregations.

As a note, using the approach demonstrated here would result in documents similar to those generated by Elasticsearch transforms. Nevertheless, the technique that is documented has not been benchmarked against Elasticsearch transforms, as the main goal of this blog is to demonstrate the power and flexibility of Logstash combined with scripted upserts.

Motivation

Scripted upserts are very powerful and flexible, as they allow custom Painless code to be executed on each document as it is being upserted. However, in my experience, scripted upserts are not commonly used, possibly due to a lack of end-to-end examples. This blog aims to address that shortcoming by providing one such example.

In this blog I will provide an example that shows how Logstash can execute scripted upserts to transform data. This will achieve functionality similar to what is demonstrated in the tutorial on how to use transforms. The Elasticsearch documentation on transforms explains why one might wish to transform data as follows:

“A lot of Elasticsearch indices are organized as a stream of events: each event is an individual document, for example a single item purchase. Transforms enable you to summarize this data, bringing it into an organized, more analysis-friendly format. For example, you can summarize all the purchases of a single customer in a single document.”

As stated earlier, transforming data into an entity-centric view makes many kinds of analysis possible using simple (cheap) queries rather than more expensive aggregations. For example, if the total value of all purchases for a given customer is stored in a single document, then it becomes very efficient to query and/or order customers by their total spending. In contrast, if each purchase were contained in a separate document, then more computationally intensive aggregations would have to be executed to extract the same results.
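
For example, once the per-customer documents described later in this post exist in the ecommerce_ls_transformed index, the top-spending customers can be found with a simple sorted query rather than an aggregation. A minimal sketch:

GET ecommerce_ls_transformed/_search
{
  "size": 10,
  "sort": [
    { "taxless_total_price.sum": { "order": "desc" } }
  ]
}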

Using Logstash and scripted upserts to transform the sample eCommerce data

Because the eCommerce transform tutorial is clearly described and because the sample eCommerce data is readily available, we implement similar functionality in this blog post to demonstrate the power and flexibility of scripted upserts.

In our example we use Logstash to read data from Elasticsearch, which is done only for demonstration purposes. Normally Logstash would receive data as an externally generated data stream.

Script for upserting the transformed data

Below is a script that computes several of the same metrics as those described in the eCommerce transforms tutorial. We accomplish this by creating a single document for each customer, which is updated each time that customer makes a new purchase. This document stores running totals in the taxless_total_price.sum and total_quantity.sum fields. Additionally, we use a conditional to update the total_quantity.max field only if the new purchase’s quantity is greater than the previously stored value.

POST _scripts/transform_ecommerce
{
  "script": {
  "lang": "painless",
  "source": """
  
      // If new, then initialize relevant fields.
      if ("create".equals(ctx.op)) {
          ctx._source['total_quantity'] = new HashMap();
          ctx._source['taxless_total_price'] = new HashMap();
          
          ctx._source['total_quantity']['sum'] = 0;
          ctx._source['total_quantity']['max'] = 0;
          ctx._source['taxless_total_price']['sum'] = (double)0;
      }
      
      // compute some of the metrics from the eCommerce 
      // transforms demo
      ctx._source['total_quantity']['sum'] += params.event['total_quantity'];
      ctx._source['taxless_total_price']['sum'] += (double)params.event['taxless_total_price'];
      if (params.event['total_quantity'] > ctx._source['total_quantity']['max']) {
        ctx._source['total_quantity']['max'] = params.event['total_quantity'];
      }
    """
  }
}

Mappings for the transformed index

Set the mapping for the scripted upsert transformed index as follows:

// if it exists, delete it first
DELETE ecommerce_ls_transformed

PUT /ecommerce_ls_transformed
{
  "mappings": {
    "properties": {
      "customer_id": {
        "type": "keyword"
      },
      "taxless_total_price": {
        "properties": {
          "sum": {
            "type": "double"
          }
        }
      },
      "total_quantity": {
        "properties": {
          "max": {
            "type": "integer"
          },
          "sum": {
            "type": "integer"
          }
        }
      }
    }
  }
}

Test the upsert script

We can test the above script before attempting to use it in Logstash. Notice that we are sending the update values to the script in params.event, which is how Logstash passes event data to the script by default. The following update operation can be used to test the previously defined transform_ecommerce script:

POST test_script/_doc/454/_update
{
  "scripted_upsert": true,
  "script": {
    "id": "transform_ecommerce",
    "params": {
      "event": {
        "total_quantity": 1,
        "taxless_total_price": 80.25
      }
    }
  },
  "upsert": {}
}

Set mappings for the copy of the eCommerce index

Copy the mappings from the kibana_sample_data_ecommerce index into the ecommerce_copy index. This can be done by retrieving the mappings from kibana_sample_data_ecommerce and pasting them into the PUT request as follows:

// if it exists, remove the old copy
DELETE ecommerce_copy

GET kibana_sample_data_ecommerce/_mappings

PUT ecommerce_copy
{
  "mappings": {
     // paste mappings from kibana_sample_data_ecommerce
 }
}

Define the Logstash pipeline

In the approach documented here, each Logstash event is sent to two Logstash outputs: one output writes each event to a “raw” Elasticsearch index (i.e. an index containing one document per Logstash event), and the other output sends the same event to a transformed index, which is updated based on a subset of the event’s contents.

Below is the Logstash pipeline that will be used for re-ingesting the eCommerce documents, as well as for generating the transformed index. The transformed index will be called ecommerce_ls_transformed, and the original documents will be stored in an index called ecommerce_copy. Store the pipeline below in a file called logstash-transform.conf.

input {
  elasticsearch {
    hosts => "localhost"
    index => "kibana_sample_data_ecommerce"
    query => '{ "query": { "match_all": {} } }'
  }
}

output {

  # Transformed data
  elasticsearch {
    index => "ecommerce_ls_transformed"
    document_id => "%{customer_id}"
    action => "update"
    scripted_upsert => true
    script_lang => ""
    script_type => "indexed"
    script => "transform_ecommerce"
  }

  # Original data
  elasticsearch {
    index => "ecommerce_copy"
  }
}

Run Logstash

Depending on the directory layout, the above pipeline can be executed with a command similar to the following:

./bin/logstash -f ./config/logstash-transform.conf

View the copy of the eCommerce data

After executing the pipeline, the ecommerce_copy index should contain copies of the documents from the kibana_sample_data_ecommerce index. It can be viewed as follows:

GET ecommerce_copy/_search

View the transformed data

The transformed data is in the ecommerce_ls_transformed index, and can be viewed in the same order as the data in the transform tutorial by executing the following query:

GET ecommerce_ls_transformed/_search
{
  "sort": [
    {
      "_id": {
        "order": "asc"
      }
    }
  ]
}

The first document returned from the above query should be the following:

{
  "_index" : "ecommerce_ls_transformed",
  "_type" : "_doc",
  "_id" : "10",
  "_score" : null,
  "_source" : {
    "total_quantity" : {
      "max" : 2,
      "sum" : 118
    },
    "taxless_total_price" : {
      "sum" : 3946.8200000000006
    }
  }
}

Notice that the values in ecommerce_ls_transformed closely match the values computed in the transform tutorial: the total_quantity values in the first document match the tutorial exactly, and the taxless_total_price.sum is very close (3946.8200000000006 versus 3946.9765625 in the transform tutorial). Presumably this small difference is due to floating-point rounding errors.

Filtering data in Logstash

An astute reader may have noticed that the above approach sends the full Logstash event to each of the Elasticsearch outputs, even though the ecommerce_ls_transformed index only requires a few fields. The blog about splitting Logstash data demonstrates how to filter the data that is sent to each Elasticsearch index.

Caveats

There are a few caveats that should be considered if the scripted update approach is used:

  • Even if the original event document is ingested, there is a possibility that the associated scripted update fails on the transformed document. This could create an inconsistent view of the data.
  • If there is a stream of data that would trigger many scripted upserts on a single document in a short time period, then this approach may break down due to an overwhelming number of conflicting updates on that document.
  • The Logstash retry policy and the Elasticsearch retry_on_conflict setting should be understood and set appropriately (see the sketch after this list). More information is available in this documentation about updates and conflicts.
  • If a raw document is deleted, the associated transformed document will not reflect this change.
  • Every update of a document writes a new document to Elasticsearch. Therefore, this approach, which indexes the original raw document as well as upserting the transformed document, will effectively double the ingest workload compared to indexing only the raw documents.
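
For example, the retry_on_conflict option of the Logstash Elasticsearch output plugin can be set on the output that performs the scripted upsert. Below is a minimal sketch of how the transformed-data output from the pipeline above might be adjusted; the value of 3 is only illustrative:

  # Transformed data, with retries in case of conflicting concurrent updates
  elasticsearch {
    index => "ecommerce_ls_transformed"
    document_id => "%{customer_id}"
    action => "update"
    scripted_upsert => true
    script_lang => ""
    script_type => "indexed"
    script => "transform_ecommerce"
    retry_on_conflict => 3   # illustrative value
  }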

Conclusions

In this blog we have shown how Logstash can be used in conjunction with scripted upserts. To demonstrate this functionality, we explored how Logstash events can be used to create a near-real-time entity-centric view of indexed data. The approach presented here only scratches the surface of what scripted upserts can do, and because scripted upserts are based on Painless, custom functionality can be painlessly implemented.