Elasticsearch – too many script compilations

Introduction

When using Elasticsearch, in some rare instances you may see an error such as “Too many dynamic script compilations within X minutes”. Such an error may be caused by poor script design in which parameters are hard-coded, or it may be due to the script cache being too small or the compilation limit being too low. In this article, I will show how to determine whether these default limits are too low, and how they can be modified.

Script caching

Scripts are cached by default so that they only need to be recompiled when updates occur. For more information, see the documentation on script caching.
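
As a quick illustration of the “hard-coded parameters” problem mentioned in the introduction, consider the two hypothetical search requests below (the index and field names are invented for this sketch). The first variant embeds the multiplier directly in the script source, so every different multiplier value forces a new compilation; the second passes the value through params, so the compiled script can be reused from the cache:

# Hard-coded value: each different multiplier results in a new script compilation
GET my_index/_search
{
  "query": {
    "script_score": {
      "query": { "match_all": {} },
      "script": {
        "source": "doc['price'].value * 1.2"
      }
    }
  }
}

# Parameterized value: the script is compiled once and reused for any multiplier
GET my_index/_search
{
  "query": {
    "script_score": {
      "query": { "match_all": {} },
      "script": {
        "source": "doc['price'].value * params.factor",
        "params": { "factor": 1.2 }
      }
    }
  }
}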

Deprecated script settings (can be used prior to 8.0)

In older versions of Elasticsearch, up to 75 scripts per 5 minutes may be compiled by default. These compiled scripts are then stored in the script cache. The statistics for the script cache can be viewed with the following command:

GET /_nodes/stats?metric=script&filter_path=nodes.*.script.* 

Which should respond with something similar to the following:

{
  "nodes" : {
    "XfXvXJ7xSLynbdZBsFwG3A" : {
      "script" : {
        "compilations" : 28,
        "cache_evictions" : 0,
        "compilation_limit_triggered" : 0
      }
    },
    "pzrnXnehTrKEN0urD7j9eg" : {
      "script" : {
        "compilations" : 407081,
        "cache_evictions" : 406981,
        "compilation_limit_triggered" : 5176579
      }
    }
    ... etc ...


The numbers shown are counted since the last restart of each node. If compilations and cache_evictions are large or constantly increasing, the cache is likely churning, which may indicate that the cache is too small.

A high value for compilation_limit_triggered may be a side effect of a cache that is too small, or of poor script design in which parameters are hard-coded.

In versions of Elasticsearch prior to 8.0, the script cache may be configured by setting script.cache.max_size in the elasticsearch.yml configuration file as follows.

script.cache.max_size: 300

And you can dynamically set script.max_compilations_rate as follows:

PUT _cluster/settings
{
  "persistent": {
    "script.max_compilations_rate": "250/5m"
  }
}

However, both of these settings are now deprecated.

Script settings in 8.0 and newer

Starting in Elasticsearch 7.9, scripts may be stored in different caches for different “contexts”. This behaviour can be enabled with the following command (and it is the default behaviour in more recent versions):

PUT _cluster/settings
{
    "persistent": {
        "script.max_compilations_rate": "use-context"
    }
}

If contexts are used, they can be viewed with the following command:

GET /_nodes/stats?filter_path=nodes.*.script_cache.contexts

This should respond with a list of the contexts that are currently being used for executing scripts, such as the following:

{
  "nodes" : {
    "lqxteGihTpifU5lvV7BEmg" : {
      "script_cache" : {
        "contexts" : [
          {
            "context" : "aggregation_selector",
            "compilations" : 1,
            "cache_evictions" : 0,
            "compilation_limit_triggered" : 0
          },

          ... etc ...

          {
            "context" : "xpack_template",
            "compilations" : 0,
            "cache_evictions" : 0,
            "compilation_limit_triggered" : 0
          }

          ... etc ...

If the response above is empty, then you may not have yet enabled “use-context” as described above.

As with previous versions of Elasticsearch, if compilations and cache_evictions are large or constantly increasing for a given context, that cache is likely churning and may be too small.

In newer versions of Elasticsearch, script cache size and compilation limits are set per $CONTEXT, where $CONTEXT is determined by where/how the script is being executed.

You can set script.context.$CONTEXT.cache_max_size in your elasticsearch.yml configuration file. For example, to set the max size for the xpack_template context, you would add the following to elasticsearch.yml. Note that in Elasticsearch 7.9, this setting may only be used if you have enabled “use-context” as instructed above.

script.context.xpack_template.cache_max_size: 300

On the other hand, script.context.$CONTEXT.max_compilations_rate may be set dynamically. For example, you can configure the compilation rate for the xpack_template context as follows:

PUT _cluster/settings
{
    "persistent": {
        "script.context.xpack_template.max_compilations_rate": "150/5m"
    }
}
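
To confirm which script-related settings are currently in effect on your cluster, you can query the cluster settings. The filter_path below is just a convenience to trim the response, and it assumes the settings were stored as persistent settings as in the examples above:

GET _cluster/settings?filter_path=persistent.script*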

Conclusion

In this blog, I have shown how you can look deeper into Elasticsearch to try to diagnose the potential cause of script compilation errors, and how to modify default settings if necessary.

Using Logstash and Elasticsearch to calculate transaction duration in a microservices architecture

September 16, 2020

Introduction

Elasticsearch allows you to unify your observability data in a powerful datastore so you can search and apply interactive analytics in real time to a huge number of use cases.

In one such use case, you may be using Elasticsearch to monitor a system that is composed of multiple microservices that process a given transaction. For such a system, you may be collecting an event corresponding to when the first microservice in the system starts processing the transaction, and another event corresponding to when the last microservice in the system finishes processing the transaction. In such an approach, each event should include a field with the transaction identifier, which will allow multiple events corresponding to a single transaction to be combined for analysis.

In this blog I discuss how Elasticsearch in combination with Logstash may be used to ingest multiple events that correspond to a given transaction as it is processed by multiple microservices, and how to calculate the time difference between these different events, which I will refer to as the “transaction duration”.

The approach discussed here will work even if the events corresponding to a given transaction arrive at Logstash out of order, and it could easily be extended to compute delays between any microservices that process a given transaction.

A note about event ordering

If the events corresponding to a given transaction are guaranteed to arrive in order, then it may be possible to use Logstash’s Elapsed filter plugin.
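
For reference, a rough (untested) sketch of such an Elapsed filter configuration is shown below, assuming events are tagged with “start_event” and “end_event” and correlated by an “ident” field, as in the examples later in this blog. Keep in mind that this only behaves correctly when the start event is always processed before the end event:

filter {
  elapsed {
    start_tag => "start_event"
    end_tag => "end_event"
    unique_id_field => "ident"
    timeout => 600  # seconds to wait for the end event before giving up
  }
}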

Alternatively, the approach described in this article works regardless of the order in which events arrive.

Using scripted upserts to transform data

In a previous blog post, I described how to use Logstash and Elasticsearch scripted upserts to transform data. The approach in this blog is very similar, but has the explicit goal of calculating the duration between the “start” and “end” events for a given transaction.

The approach described in this blog will ultimately result in two indices being written into Elasticsearch. One index will contain original documents corresponding to each monitoring event, and another index will contain transformed documents which will track the transaction duration.

For the purposes of this blog, we expect events to contain a “@timestamp” field, a “tags” array that contains a value of “start_event” or “end_event” somewhere in the array, and a transaction identifier which we have stored in a field called “ident”. For example, a document could look as follows:

{
  "message": "abc",
  "ident": "id1",
  "@timestamp": "2020-08-18T19:43:36.000Z",
  "other_field": "other_val 1",
  "tags": [
    "start_event"
  ]
}

As we will ultimately be using Logstash to call Elasticsearch scripted upserts to compute the duration of each transaction, it is worth highlighting that Logstash sends the source of each document into the scripted upsert as params.event rather than in the standard ctx._source that we normally expect.

The following script will calculate the time difference between the “start_time” and the “end_time” even if the end event arrives before the start event.

POST _scripts/calculate_transaction_duration
{
  "script": {
  "lang": "painless",
  "source": """
  

        def position_of_start_event_in_tags = params.event['tags'].indexOf('start_event');

        // if this is a "start event" then store the timestamp in the start_time field
        if (position_of_start_event_in_tags >= 0) {
          ctx._source['start_time'] = params.event['@timestamp']
        }
      
        def position_of_end_event_in_tags = params.event['tags'].indexOf('end_event');

        // if this is an "end event" then store the timestamp in the end_time field
        if (position_of_end_event_in_tags >= 0) {
          ctx._source['end_time'] = params.event['@timestamp']
        }
        
        // if both start and end times exist, calculate the difference 
        if (ctx._source.containsKey('start_time') && ctx._source.containsKey('end_time')) {
          ctx._source['duration_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx._source['start_time']), ZonedDateTime.parse(ctx._source['end_time']))/1000;
        }
        // OPTIONAL COPY (PROBABLY NOT NEEDED)  - copy remaining fields into the _source
        //for (x in params.event.entrySet()) {
        //  ctx._source[x.getKey()] = x.getValue();
        //}

    """
  }
}

We can then test the above script directly from Dev Tools by running both of the following commands (in any order), which will update the document with an _id of “id1” in the test_script index:

POST test_script/_doc/id1/_update
{
  "scripted_upsert": true,
  "script": {
    "id": "calculate_transaction_duration",
    "params": {
      "event": {
        "message": "abc", "ident": "id1", "@timestamp": "2020-08-18T19:43:36.000Z", "other_field": "other_val 1", "tags": ["start_event"]
      }
    }
  },
  "upsert": {}
}


POST test_script/_doc/id1/_update
{
  "scripted_upsert": true,
  "script": {
    "id": "calculate_transaction_duration",
    "params": {
      "event": {
        "message": "def", "ident": "id1", "@timestamp": "2020-08-18T19:53:36.000Z", "other_field": "other_val 2", "tags": ["end_event"]
      }
    }
  },
  "upsert": {}
}

After running the above code, we can view the document that contains the transaction duration as follows:

GET test_script/_doc/id1

Which will respond with the following:

 {
  "_index" : "test_script",
  "_type" : "_doc",
  "_id" : "id1",
  "_version" : 2,
  "_seq_no" : 4,
  "_primary_term" : 3,
  "found" : true,
  "_source" : {
    "start_time" : "2020-08-18T19:43:36.000Z",
    "end_time" : "2020-08-18T19:53:36.000Z",
    "duration_in_seconds" : 600
  }
}

We now have scripted upserts working and tested within Elasticsearch. Now let’s get this working from Logstash.

The following Logstash pipeline will send two transactions, each with two events, into Elasticsearch. Notice that the last two events, corresponding to the transaction “id2”, are out of order. This is not a problem, as the script that we demonstrated above handles this case correctly.

input {
  # The generator creates input events.
  # Notice how the events associated with id2 are "out of order"
  generator {
    lines => [
     '{"message": "abc", "ident": "id1", "@timestamp": "2020-08-18T19:43:36.000Z", "other_field": "other_val 1", "tags": ["start_event"]}',
     '{"message": "def", "ident": "id1", "@timestamp": "2020-08-18T19:53:36.000Z", "other_field": "other_val 2", "tags": ["end_event"]}',
     '{"message": "ghi", "ident": "id2", "@timestamp": "2020-08-20T19:43:56.000Z", "other_field": "other_val 4", "tags": ["end_event"]}',
     '{"message": "jkl", "ident": "id2", "@timestamp": "2020-08-20T19:43:36.000Z", "other_field": "other_val 3", "tags": ["start_event"]}'
    ]
    count => 1
    codec =>  "json"
  }
}
filter {}
output {

  # Transformed data
  elasticsearch {
    index => "transaction_duration"
    document_id => "%{ident}"
    action => "update"
    scripted_upsert => true
    script_lang => ""
    script_type => "indexed"
    script => "calculate_transaction_duration"
  }

  # Original data
  elasticsearch {
    index => "transaction_original"
  }
}

The above pipeline can be saved into a file called scripted-elapsed.conf and directly executed as follows:

./bin/logstash -f scripted-elapsed.conf --config.reload.automatic

After running the above Logstash pipeline, there will be two indices created in the locally running Elasticsearch. One is the index that contains the original events and is called “transaction_original”, and the other is the transformed index called “transaction_duration” that contains the duration of each transaction.

We can look at the “transaction_duration” index with the following command:

GET transaction_duration/_search

Which will respond with the following two documents which correspond to each transaction:

    "hits" : [
      {
        "_index" : "transaction_duration",
        "_type" : "_doc",
        "_id" : "id2",
        "_score" : 1.0,
        "_source" : {
          "end_time" : "2020-08-20T19:43:56.000Z",
          "start_time" : "2020-08-20T19:43:36.000Z",
          "duration_in_seconds" : 20
        }
      },
      {
        "_index" : "transaction_duration",
        "_type" : "_doc",
        "_id" : "id1",
        "_score" : 1.0,
        "_source" : {
          "end_time" : "2020-08-18T19:53:36.000Z",
          "start_time" : "2020-08-18T19:43:36.000Z",
          "duration_in_seconds" : 600
        }
      }
    ]

We have now verified that the script to calculate event duration is functioning correctly when we call it from Logstash!

Conclusion

In this blog post, I first discussed how a given transaction may result in multiple events being sent into Elasticsearch. I then showed how you can use Logstash to execute scripted upserts which calculate the duration of a given transaction by comparing the timestamps of the related events.

Storing ingest time and calculating ingest lag in Elasticsearch

Introduction

When viewing and analysing data with Elasticsearch, it is not uncommon to see visualisations and monitoring and alerting solutions that make use of timestamps that have been generated on remote/monitored systems. However, relying on remote-generated timestamps may be risky.

If there is a delay between the occurrence of a remote event and the arrival of that event at Elasticsearch, or if a malicious user changes the time on a remote system, then important events could fly under the radar. Therefore, when ingesting documents into Elasticsearch it is often helpful to store the ingest time of each document, as well as to monitor how long it takes for each event to arrive at the Elasticsearch cluster. A larger than normal lag may indicate a problem with the ingest process or with the time setting on a remote system.

In this blog I will show how to use an ingest node with the Set Processor to add an ingest timestamp to documents when they arrive at an Elasticsearch cluster. This timestamp can be used in visualisations, monitoring, and alerting.

Additionally, I will show how to use the Script Processor to calculate the ingest lag. This lag is the difference between the timestamp of when an event has occurred on a remote/monitored system versus the time that the corresponding document arrives at an Elasticsearch cluster. This can be used for ensuring that the ingest process is not causing excessive delay, and for verifying that remote timestamps are set correctly.

Adding an ingest timestamp and calculating ingest lag

Below I give an example of an ingest pipeline that adds an ingest timestamp called “ingest_time”. It also calculates the lag between the remote event timestamp and the time that the event arrives at Elasticsearch and stores this in a field called “lag_in_seconds”.

The “ingest_time” field serves two purposes: (1) it can and likely should be used as the time field in Kibana visualisations and for monitoring and alerting as discussed later in this blog, and (2) it is used in the lag calculation.

Note that we assume that each document arrives at the Elasticsearch cluster with a field called “event_timestamp” that corresponds to when each event occurred on the remote/monitored system. The name of your event timestamp field will likely be different for your data and should be modified accordingly.

We write our pipeline to Elasticsearch as follows:

PUT _ingest/pipeline/calculate_lag
{
  "description": "Add an ingest timestamp and calculate ingest lag",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
            if(ctx.containsKey("ingest_time") && ctx.containsKey("event_timestamp")) {
              ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['event_timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;
            }
        """
      }
    }
  ]
}

And we can then test the pipeline with the simulate API as follows:

POST _ingest/pipeline/calculate_lag/_simulate
{
  "docs": [
    {
      "_source": {
        "event_timestamp": "2019-11-07T20:39:00.000Z"
      }
    }
  ]
}

Which should respond with something similar to the following, which includes the “lag_in_seconds” and the “ingest_time” fields:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "lag_in_seconds" : 17950223,
          "ingest_time" : "2020-06-02T14:49:23.236Z",
          "event_timestamp" : "2019-11-07T20:39:00.000Z"
        },
        "_ingest" : {
          "timestamp" : "2020-06-02T14:49:23.236Z"
        }
      }
    }
  ]
}

Finally, we can write a real document to Elasticsearch with the pipeline as follows:

PUT test_index/_doc/1?pipeline=calculate_lag
{
  "event_timestamp": "2019-11-07T20:39:00.000Z",
  "other_field": "whatever"
}

And we can retrieve the document as follows:

GET test_index/_doc/1

Which should respond with the following:

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "lag_in_seconds" : 17950358,
    "ingest_time" : "2020-06-02T14:51:38.068Z",
    "event_timestamp" : "2019-11-07T20:39:00.000Z",
    "other_field" : "whatever"
  }
}

Specifying the pipeline in index settings

When using an ingest pipeline in a production deployment it may be preferable to apply the pipeline to the index settings, rather than specifying the pipeline in the PUT URL. This can be done by adding index.default_pipeline to the index settings as follows:

PUT test_index/_settings
{
  "index.default_pipeline": "calculate_lag"
}

Now any document that is sent into test_index will pass through the calculate_lag pipeline without the need for ?pipeline=calculate_lag in the URL. We can verify this is working with the following PUT command.

PUT test_index/_doc/2
{
  "event_timestamp": "2019-11-07T20:39:00.000Z",
  "other_field": "This is a new doc"
}

Execute the following command to see the document that we have just ingested:

GET test_index/_doc/2

Which should return an enriched document that looks like the following:

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "lag_in_seconds" : 17951461,
    "ingest_time" : "2020-06-02T15:10:01.670Z",
    "event_timestamp" : "2019-11-07T20:39:00.000Z",
    "other_field" : "This is a new doc"
  }
}

How to use ingest lag

Now that ingest lag has been calculated, it can be used to trigger alerts if the lag exceeds a certain threshold or it can be fed into a machine learning job to detect unexpected deviations in the ingest processing time. It could also be displayed in Kibana dashboards.

A full treatment of alerting and machine learning configurations is beyond the scope of this blog, but a minimal example of querying for high-lag documents is sketched below.
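
The following search returns documents whose ingest lag exceeds five minutes. The test_index name comes from the earlier examples, and the 300-second threshold is an arbitrary value chosen for illustration:

GET test_index/_search
{
  "query": {
    "range": {
      "lag_in_seconds": {
        "gt": 300
      }
    }
  }
}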

Using the ingest timestamp

When looking at visualizations in Kibana or watching for anomalies, we often consider events that occurred in the last day or the last week. However, if we depend on the remotely-generated event timestamp as opposed to the ingest timestamp, then any lag on the ingest process may cause those documents to never be viewed or monitored. For example, if an event occurred yesterday, but just arrived at the cluster today, it would not show up in the dashboard for today’s events because its timestamp is from yesterday. Furthermore, it would not have been available when we looked at dashboards yesterday because it was not yet stored in Elasticsearch.

On the other hand, if we visualize data and set up alerts using the ingest timestamp, we are guaranteed that we are actually considering the most recent events to arrive at Elasticsearch, regardless of when the events occurred. This will ensure that events are not missed if the ingest process gets temporarily backed up.

Another advantage of using the ingest timestamp is related to the fact that the event timestamp could be tampered with. For example, imagine that we are monitoring a remote system and a hacker sets its internal clock to some date in the 1980s. If we depend on the remote event timestamp, it is possible that we will miss all activity that a malicious user is performing on that system — unless we specifically look for events that are stored as having occurred in the 1980s. On the other hand, if we depend on the ingest timestamp, we are guaranteed that when we look at recent events, we will consider all of the events that have recently arrived to the Elasticsearch cluster regardless of the timestamp given to the event by a remote system.

Conclusion

In this blog I have shown how to use an ingest processor to store the ingest time and calculate the lag of the ingest process. Additionally, I outlined the advantages of using the ingest time for monitoring and in visualizations, as well as the risks of using remote event timestamps.

Converting CSV to JSON in Filebeat

Introduction

Many organisations use Excel files for creating and storing important data. For various reasons it may be useful to import such data into Elasticsearch. For example, one may need to get master data that is created in a spreadsheet into Elasticsearch, where it could be used for enriching Elasticsearch documents. Or one may wish to use Elasticsearch and Kibana for analysing a dataset that is only available in a spreadsheet. In such cases, one option is to use Filebeat for uploading such CSV data into an Elasticsearch cluster.

In this blog I will show how Filebeat can be used to convert CSV data into JSON-formatted data that can be sent into an Elasticsearch cluster. This will be accomplished by using a built-in CSV processor as well as a custom JavaScript processor which will be applied to every line in a CSV file.

Note that Filebeat is normally intended for sending log lines into Elasticsearch as they are written. By contrast, the technique described in this blog is not intended to run on a CSV file that continually has lines added to it. The technique and code presented in this article are intended for ingesting an existing CSV file a single time, and Filebeat terminates immediately after the file has been ingested.

Motivation

Filebeat supports a CSV processor which extracts values from a CSV string, and stores the result in an array. However, this processor does not create key-value pairs to maintain the relation between the column names and the extracted values. When using the CSV processor, additional processing (and hard-coding of the field names) is generally required in an ingest node or in Logstash to add the correct field names back into the extracted data.
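
To make the hard-coding concrete, one alternative would be to leave the raw CSV line in the message field and parse it in an Elasticsearch ingest pipeline with the csv processor, explicitly listing the column names. A rough sketch of that approach (using the column names from the example file shown later in this blog) might look like this:

PUT _ingest/pipeline/hardcoded_csv_fields
{
  "description": "Parse a CSV line with hard-coded field names",
  "processors": [
    {
      "csv": {
        "field": "message",
        "target_fields": ["first_col", "col2", "col3", "fourth_col"]
      }
    }
  ]
}

The drawback is clear: every time the spreadsheet columns change, a pipeline like this has to be updated by hand.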

On the other hand, the approach presented in this blog will automatically extract field names from the CSV header, and then generate key-value pairs based on each row’s values combined with the field names that are extracted from the header row. This technique therefore eliminates the need for additional ingest node or Logstash processing that would otherwise be required.

Code

All code presented in this blog is available at: https://github.com/alexander-marquardt/filebeat-csv-to-json

A note on the Filebeat registry

Because Filebeat is designed for sending log lines from files which are actively being written, it keeps track of the most recent log entry that it has sent to Elasticsearch, and ensures that each entry is only sent once. This is tracked in the Filebeat registry. We should be aware of the existence of the registry, because it will prevent the same CSV data from being sent to Elasticsearch multiple times, which can be undesirable when testing.

A note on Filebeat processors

Processors are executed on data as it passes through Filebeat. The code presented in this blog makes use of the CSV processor as well as a custom script processor. The custom script processor applies custom JavaScript code to each event (in our case, to each CSV line), which converts the CSV values into key-value pairs in a JSON object.

Example CSV input

We will store the following data in a file called test.csv. This file will be used to show how CSV can be converted into JSON. This CSV is intentionally written in an inconsistent format, to ensure that the code is working correctly for different formats:

first_col,col2,col3,fourth_col
1234,"first 1",123,third 1
5678,first 2,456,"third 2"

Filebeat configuration

We use the following filebeat.yml configuration to call the CSV processor as well as our custom JavaScript.

max_procs: 1 # This code will not work correctly on multiple threads
 
filebeat.inputs:
- type: log
  enabled: true
  close_eof: true
  paths:
    - ${PWD}/test.csv


  processors:
  - decode_csv_fields:
      fields:
        message: decoded_csv_arr
      separator: ","
      ignore_missing: false
      overwrite_keys: true
      trim_leading_space: false
      fail_on_error: true

  - script:
      lang: javascript
      id: convert_csv_into_json
      file: ${PWD}/convert_csv_to_json.js

  - drop_fields:
      fields: ["decoded_csv_arr"]

output.elasticsearch:
  hosts: ["localhost:9200"]

  index: "csv_to_json-%{+YYYY.MM.dd}" 

setup.ilm.enabled: false
setup.template.enabled: false

JavaScript processor code

Below we present the JavaScript code that we use for converting lines in a CSV file into JSON objects. This should be stored in a file called convert_csv_to_json.js which is referenced in the filebeat.yml configuration that we presented above.

When the first line of CSV is passed into this JavaScript processor, the code uses a JavaScript closure to “remember” the header values. When subsequent lines from the CSV file are passed in, the headers are combined with the values in each row to create key-value pairs that are stored in a JSON object.

Note that this will only work as a single threaded process since the closure containing the header fields is only available in the thread that processes the header row. This is ensured by setting max_procs: 1 in filebeat.yml.

// This function takes an array containing the field names, and another that
// contains field values, and returns a json dictionary that combines them.
function convert_csv_to_dict(csv_headers_row, csv_values_row) {
  var json_from_csv =  csv_values_row.reduce(function(result, field, index) {
    result[csv_headers_row[index]] = field;
    return result;
  }, {})

  return json_from_csv;
}


// Define the JavaScript function that will be used to combine the 
// header row with subsequent rows in the CSV file
var headers_fn = (function() {
  var csv_headers_row = null; 

  // Use a JavaScript closure to store the header line (csv_headers_row), 
  // so that we can use the header values for all subsequent CSV entries. 
  return function(csv_arr) {

    var json_from_csv = null;

    if (!csv_headers_row) {
      // if this is the first row, store the headers
      csv_headers_row = csv_arr;
    } else {
      // combine the csv_headers_row with the values to get a dict
      json_from_csv = convert_csv_to_dict(csv_headers_row, csv_arr)
    }
    return json_from_csv;
  }

})();  


// This function is called for each "event" 
// (eg. called once for each line in the log file)
function process(event) {
    var csv_arr = event.Get("decoded_csv_arr");
    var json_from_csv = headers_fn(csv_arr);

    // If the current event was triggered to process the header row,
    // then the json_from_csv will be empty - it only returns a json dict
    // for subsequent rows. Cancel the event so that nothing
    // is sent to the output.
    if (!json_from_csv) {
      event.Cancel();
    }
    event.Put("json_from_csv", json_from_csv);
}

Executing the code

The following command line can be used for executing the code which converts the CSV into JSON, and then sends the resulting documents into Elasticsearch.

rm -rf my_reg; ./filebeat  -once -E filebeat.registry.path=${PWD}/my_reg

There are a few things to point out about this command line.

  1. It deletes the registry directory before executing Filebeat. This means that the input file will be sent each time that Filebeat is executed. To prevent multiple copies of the same document from appearing in the destination index, the destination index should be deleted before running this code (an example delete request is shown after this list).
  2. It is storing the registry in the local directory, which makes it easier to find and delete it.
  3. It is running with the “-once” option, which makes Filebeat exit once it has finished processing the input.
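
If you are re-running this test and want to clear out previously ingested documents, the destination indices can be removed with a request such as the following. Be careful, as this permanently deletes data, and some clusters require wildcard deletes to be explicitly allowed:

DELETE csv_to_json-*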

Results

Once the above code has executed, there should be an index written into Elasticsearch that starts with “csv_to_json-“. Looking into this index, we can see that the documents contain the following field, which has been extracted from the CSV file.

"json_from_csv" : {
  "col3" : "123",
  "fourth_col" : "third 1",
  "first_col" : "1234",
  "col2" : "first 1"
}
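
For reference, the documents shown above can be retrieved with a simple search from Dev Tools:

GET csv_to_json-*/_search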

Conclusion

In this blog, I have shown how Filebeat can be used to convert CSV data into JSON objects in the documents that are sent to Elasticsearch. Because the field names in the JSON object are extracted directly from the CSV file, this technique eliminates the need for the additional ingest node or Logstash processing that would otherwise be required to add the correct field names to the CSV data.