Using Logstash and Elasticsearch scripted upserts to transform eCommerce purchasing data

Introduction

Logstash is a tool that can be used to collect, process, and forward events to Elasticsearch. In order to demonstrate the power of Logstash when used in conjunction with Elasticsearch’s scripted upserts, I will show you how to create a near-real-time entity-centric index. Once data is transformed into an entity-centric index, many kinds of analysis become possible with simple (cheap) queries rather than more computationally intensive aggregations.

As a note, using the approach demonstrated here would result in documents similar to those generated by Elasticsearch transforms. Nevertheless, the technique that is documented has not been benchmarked against Elasticsearch transforms, as the main goal of this blog is to demonstrate the power and flexibility of Logstash combined with scripted upserts.

Motivation

Scripted upserts are very powerful and flexible because they allow custom Painless code to be executed on each document as it is upserted. However, in my experience, scripted upserts are not commonly used. I believe this may be due to a lack of end-to-end examples. This blog aims to address that shortcoming by providing one such example.

In this blog I will provide an example that shows how Logstash can execute scripted upserts to transform data. This will achieve functionality similar to what is demonstrated in the tutorial on how to use transforms. The Elasticsearch documentation on transforms explains why one might wish to transform data as follows:

“A lot of Elasticsearch indices are organized as a stream of events: each event is an individual document, for example a single item purchase. Transforms enable you to summarize this data, bringing it into an organized, more analysis-friendly format. For example, you can summarize all the purchases of a single customer in a single document.”

As stated earlier, transforming data into an entity-centric view makes many kinds of analysis possible using simple (cheap) queries rather than more expensive aggregations. For example, if the total value of all purchases made by a given customer is stored in a single document, then it becomes very efficient to query and/or order customers by their total spending. By contrast, if each purchase were stored in a separate document, more computationally intensive aggregations would have to be executed to extract the same results.
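
For example, assuming the ecommerce_ls_transformed and ecommerce_copy indices that are created later in this blog, finding the five customers with the highest total spending is a simple sorted query on the entity-centric index, whereas the equivalent answer from the raw purchase documents requires a terms aggregation with a sum sub-aggregation:

// Entity-centric index: a cheap sorted query
GET ecommerce_ls_transformed/_search
{
  "size": 5,
  "sort": [
    { "taxless_total_price.sum": { "order": "desc" } }
  ]
}

// Raw purchase documents: a more expensive aggregation is required
GET ecommerce_copy/_search
{
  "size": 0,
  "aggs": {
    "spend_per_customer": {
      "terms": {
        "field": "customer_id",
        "size": 5,
        "order": { "total_spent": "desc" }
      },
      "aggs": {
        "total_spent": { "sum": { "field": "taxless_total_price" } }
      }
    }
  }
}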

Using Logstash and scripted upserts to transform the sample eCommerce data

Because the eCommerce transform tutorial is clearly described and because the sample eCommerce data is readily available, we implement similar functionality in this blog post to demonstrate the power and flexibility of scripted upserts.

In our example we use Logstash to read data from Elasticsearch, which is done only for demonstration purposes. Normally Logstash would receive data as an externally generated data stream.

Script for upserting the transformed data

Below is a script that computes several of the same metrics as those described in the eCommerce transforms tutorial. We accomplish this by creating a single document for each client that is updated each time that client makes a new purchase. This document stores running totals in the taxless_total_price.sum and total_quantity.sum fields. Additionally, we use a conditional so that the total_quantity.max field is only updated when the incoming event's total_quantity is greater than the previously stored maximum.

POST _scripts/transform_ecommerce
{
  "script": {
  "lang": "painless",
  "source": """
  
      // If new, then initialize relevant fields.
      if ("create".equals(ctx.op)) {
          ctx._source['total_quantity'] = new HashMap();
          ctx._source['taxless_total_price'] = new HashMap();
          
          ctx._source['total_quantity']['sum'] = 0;
          ctx._source['total_quantity']['max'] = 0;
          ctx._source['taxless_total_price']['sum'] = (double)0;
      }
      
      // compute some of the metrics from the eCommerce 
      // transforms demo
      ctx._source['total_quantity']['sum'] += params.event['total_quantity'];
      ctx._source['taxless_total_price']['sum'] += (double)params.event['taxless_total_price'];
      if (params.event['total_quantity'] > ctx._source['total_quantity']['max']) {
        ctx._source['total_quantity']['max'] = params.event['total_quantity'];
      }
    """
  }
}

Mappings for the transformed index

Set the mapping for the scripted upsert transformed index as follows:

// if it exists, delete it first
DELETE ecommerce_ls_transformed

PUT /ecommerce_ls_transformed
{
  "mappings": {
    "properties": {
      "customer_id": {
        "type": "keyword"
      },
      "taxless_total_price": {
        "properties": {
          "sum": {
            "type": "double"
          }
        }
      },
      "total_quantity": {
        "properties": {
          "max": {
            "type": "integer"
          },
          "sum": {
            "type": "integer"
          }
        }
      }
    }
  }
}

Test the upsert script

We can test the above script before attempting to use it in Logstash. Notice that we are sending the update values to the script in params.event, which is how Logstash passes event data to scripted upserts by default. The following update operation can be used to test the previously defined transform_ecommerce script:

POST test_script/_update/454
{
  "scripted_upsert": true,
  "script": {
    "id": "transform_ecommerce",
    "params": {
      "event": {
        "total_quantity": 1,
        "taxless_total_price": 80.25
      }
    }
  },
  "upsert": {}
}
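
For example, if the above command is executed twice against an empty test_script index, the first run creates document 454 (initializing the fields and then adding the event values), and the second run simply adds to the running totals while leaving the maximum unchanged. Retrieving the document with GET test_script/_doc/454 after two executions should therefore return a _source similar to the following:

{
  "total_quantity" : {
    "sum" : 2,
    "max" : 1
  },
  "taxless_total_price" : {
    "sum" : 160.5
  }
}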

Set mappings for the copy of the eCommerce index

Copy the mappings from the kibana_sample_data_ecommerce index into the ecommerce_copy index. This can be done by retrieving the mappings from kibana_sample_data_ecommerce and pasting them into the PUT request as follows:

// if it exists, remove the old copy
DELETE ecommerce_copy

GET kibana_sample_data_ecommerce/_mappings

PUT ecommerce_copy
{
  "mappings": {
     // paste mappings from kibana_sample_data_ecommerce
 }
}

Define the Logstash pipeline

In the approach documented here, each Logstash event will be driven into two Logstash outputs — one output will drive each event into a “raw” Elasticsearch index (i.e. into an index containing a document corresponding to each Logstash event), and the other output will drive that same event into a transformed index which will be updated based on a subset of the event’s contents.

Below is the Logstash pipeline that will be used for re-ingesting the eCommerce documents, as well as for generating the transformed index. The transformed index will be called ecommerce_ls_transformed and the original documents will be stored in an index called ecommerce_copy. Store the Logstash pipeline below in a file called logstash-transform.conf.

input {
  elasticsearch {
    hosts => "localhost"
    index => "kibana_sample_data_ecommerce"
    query => '{ "query": { "match_all": {} } }'
  }
}

output {

  # Transformed data
  elasticsearch {
    index => "ecommerce_ls_transformed"
    document_id => "%{customer_id}"
    action => "update"
    scripted_upsert => true
    script_lang => ""
    script_type => "indexed"
    script => "transform_ecommerce"
  }

  # Original data
  elasticsearch {
    index => "ecommerce_copy"
  }
}
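
Note that the elasticsearch outputs above assume an unsecured cluster listening on localhost:9200. If that is not the case, each output will also need connection settings along the lines of the following sketch (the host name, user name, and password variable below are placeholders):

  # Original data
  elasticsearch {
    hosts => ["https://my-es-host:9200"]
    user => "my_ingest_user"
    password => "${ES_PWD}"  # resolved from the environment or the Logstash keystore
    index => "ecommerce_copy"
  }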

Run Logstash

Depending on the directory layout, the above pipeline can be executed with a command similar to the following:

./bin/logstash -f ./config/logstash-transform.conf

View the copy of the eCommerce data

After executing the pipeline, the ecommerce_copy index should contain copies of the documents from the kibana_sample_data_ecommerce index. It can be viewed as follows:

GET ecommerce_copy/_search

View the transformed data

The transformed data is in the ecommerce_ls_transformed index, and can be viewed in the same order as the data in the transform tutorial by executing the following query:

GET ecommerce_ls_transformed/_search
{
  "sort": [
    {
      "_id": {
        "order": "asc"
      }
    }
  ]
}

The first document returned from the above query should be the following:

{
  "_index" : "ecommerce_ls_transformed",
  "_type" : "_doc",
  "_id" : "10",
  "_score" : null,
  "_source" : {
    "total_quantity" : {
      "max" : 2,
      "sum" : 118
    },
    "taxless_total_price" : {
      "sum" : 3946.8200000000006
    }
  }
}

Notice that the values in ecommerce_ls_transformed match the values computed in the transform tutorial quite closely: the total_quantity values in the first document match the tutorial exactly, and the taxless_total_price.sum is very close (3946.8200000000006 versus 3946.9765625 in the transform tutorial). Presumably this small difference is due to differences in floating-point precision and rounding between the two approaches.

Filtering data in Logstash

An astute reader may have noticed that the above approach sends the full Logstash event to each of the Elasticsearch outputs, even though the ecommerce_ls_transformed index only requires a few fields. The blog about splitting Logstash data demonstrates how to filter the data that is sent to each Elasticsearch index.

Caveats

There are a few caveats that should be considered if the scripted update approach is used:

  • Even if the original event document is ingested, there is a possibility that the associated scripted update fails on the transformed document. This could create an inconsistent view of the data.
  • If there is a stream of data that would trigger many scripted upserts on a single document in a short time period, then this approach may break down due to an overwhelming number of conflicting updates on that document.
  • The Logstash retry policy and the Elasticsearch retry_on_conflict setting should be understood and set appropriately (see the sketch after this list). More information is available in the documentation about updates and conflicts.
  • If a raw document is deleted, the associated transformed document will not reflect this change.
  • Every update on a document writes a new document to Elasticsearch. Therefore this approach, which indexes the original raw document as well as upserting the transformed document, effectively doubles the ingest workload compared to indexing only the raw documents.
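
Regarding the retry-related caveat above, one concrete option is to set retry_on_conflict on the Logstash elasticsearch output that writes the transformed data, so that Elasticsearch internally retries conflicting concurrent updates to the same customer document. A sketch is shown below; the value of 5 is arbitrary:

  # Transformed data
  elasticsearch {
    index => "ecommerce_ls_transformed"
    document_id => "%{customer_id}"
    action => "update"
    scripted_upsert => true
    script_lang => ""
    script_type => "indexed"
    script => "transform_ecommerce"
    retry_on_conflict => 5
  }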

Conclusions

In this blog we have demonstrated how Logstash can be used in conjunction with scripted upserts. In order to demonstrate this functionality, we explored how Logstash events can be used to create a near-real-time entity-centric view of indexed data. The approach demonstrated here only touches the surface of the capabilities of scripted upserts, and because scripted upserts are based on Painless, custom functionality can be painlessly implemented.

Improving the performance of Logstash persistent queues

Introduction

By default, Logstash uses in-memory bounded queues between pipeline stages (inputs → pipeline workers) to buffer events. However, in order to protect against data loss during abnormal termination, Logstash has a persistent queue feature which can be enabled to store the message queue on disk. The queue sits between the input and filter stages as follows:

input → queue → filter + output
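
For reference, the persistent queue is enabled with a few settings in logstash.yml (or per pipeline in pipelines.yml). The values below are a minimal sketch rather than tuning recommendations:

# logstash.yml
queue.type: persisted    # default is "memory" (in-memory queue)
queue.max_bytes: 4gb     # upper bound on the on-disk size of each queue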

According to an Elastic blog post, Logstash persistent queues should have only a small impact on overall throughput. While this is likely true for use cases where the pipeline is CPU bound, it is not always the case.

Motivation

In a recent Logstash implementation, enabling Logstash persistent queues caused a slowdown of about 75%, from about 40K events/s down to about 10K events/s. Somewhat surprisingly, based on disk I/O metrics it was clear that the disks were not saturated. Additionally, standard Logstash tuning techniques such as testing different batch sizes and adding more worker threads were unable to remedy this slowdown.

Why persistent queues may impact Logstash performance

Investigation showed that throughput was limited because a single Logstash pipeline runs a single-threaded persistent queue; to put it another way, a single Logstash pipeline drives data to disk from only a single thread. This is true even if that pipeline has multiple inputs, as additional inputs in a single pipeline do not add disk I/O threads. Furthermore, because enabling the persistent queue adds synchronous disk I/O (wait time) into the pipeline, it reduces throughput even if none of the resources on the system are maxed out.

Solution

Given that Logstash throughput was limited by synchronous disk I/O rather than by resource constraints, more threads running in parallel were needed to drive the disks harder and to increase overall throughput. This was accomplished by running multiple identical pipelines in parallel within a single Logstash process, and then load balancing the input data stream across the pipelines. If data is driven into Logstash by Filebeat, load balancing can be done by listing multiple Logstash endpoints (one per pipeline) in Filebeat's Logstash output and enabling its loadbalance option.
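
A sketch of such a setup is shown below. The pipeline ids, config paths, and ports are illustrative placeholders; the pipeline configurations are identical except for the port that each beats input listens on, and Filebeat balances the event stream across those ports:

# pipelines.yml -- run several identical pipelines, each with its own
# persistent queue (two are shown; the deployment described above used four)
- pipeline.id: ingest_a
  path.config: "/usr/share/logstash/config/ingest_a.conf"   # beats input on port 5044
  queue.type: persisted
- pipeline.id: ingest_b
  path.config: "/usr/share/logstash/config/ingest_b.conf"   # beats input on port 5045
  queue.type: persisted

# filebeat.yml -- load balance across the ports that the pipelines listen on
output.logstash:
  hosts: ["my-logstash-host:5044", "my-logstash-host:5045"]
  loadbalance: true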

Result

After increasing the number of pipelines to 4 and splitting the input data across these 4 pipelines, Logstash performance with persistent queues increased up to about 30K events/s, or only 25% worse than without persistent queues. At this point the disks were saturated, and no further performance improvements were possible.

Feedback

This approach has also helped other Logstash users achieve substantial performance gains.

Using Logstash to drive filtered data from a single source into multiple output destinations

Overview

In this blog post we demonstrate how Logstash can be used to accomplish the following tasks:

  1. Create multiple copies of an input stream.
  2. Filter each unique copy of the input stream to only contain desired fields.
  3. Drive the modified copies of the input stream into different output destinations.

Note that in this blog post, we do not make use of pipeline-to-pipeline communication (beta) which could also likely achieve much of the functionality described here.
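
For the curious, a rough and untested sketch of that pipeline-to-pipeline alternative is shown below. It uses the pipeline input and output plugins to distribute events from an intake pipeline to downstream pipelines via virtual addresses; the pipeline ids and addresses are illustrative, and the csv parsing shown later in this post would still need to be added to the intake pipeline:

# pipelines.yml (pipeline-to-pipeline sketch)
- pipeline.id: intake
  config.string: |
    input { file { path => "${HOME}/stocks.csv" start_position => "beginning" sincedb_path => "/dev/null" } }
    # the csv and date filters from the example below would go here
    output { pipeline { send_to => ["smi_branch", "ftse_branch", "originals"] } }
- pipeline.id: smi_branch
  config.string: |
    input { pipeline { address => "smi_branch" } }
    filter { prune { whitelist_names => ["SMI"] } }
    output { elasticsearch { index => "smi_data" } }
- pipeline.id: ftse_branch
  config.string: |
    input { pipeline { address => "ftse_branch" } }
    filter { prune { whitelist_names => ["FTSE"] } }
    output { elasticsearch { index => "ftse_data" } }
- pipeline.id: originals
  config.string: |
    input { pipeline { address => "originals" } }
    output { elasticsearch { index => "stocks_original" } }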

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example Logstash pipeline.

Example Logstash pipeline

Below is a Logstash pipeline that should be stored in a file called ‘clones.conf’. This pipeline does the following:

  1. Reads stock market trades as CSV-formatted input from a CSV file. Note that you should modify ‘clones.conf’ to use the correct path to your ‘stocks.csv’ file.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Parses the time field as a Unix timestamp (which is used to set the event's @timestamp).
  4. Uses the clone filter plugin to create two copies of each document. The clone filter will add a new ‘type’ field to each copy that corresponds to the names given in the clones array. (Note that the original version of each document will still exist in addition to the copies, but will not have a ‘type’ field added to it).
  5. For each copy:
    1. Adds metadata to each document corresponding to the ‘type’ that was added by the clone filter. This allows us to later remove the ‘type’ field, while retaining the information required for routing different copies to different outputs.
    2. Uses the prune filter plugin to remove all fields except those which are whitelisted for the specific output.
  6. Removes the ‘type’ field that the clone filter inserted into the documents. This is not strictly necessary, but it prevents the ‘type’ data from being written to Elasticsearch.
  7. Writes the resulting documents to different outputs, depending on the value defined in the metadata field that we added in step 5.
input {
  file {
    path => "${HOME}/stocks.csv"
    start_position => "beginning"

    # The following line will ensure re-reading of input 
    # each time logstash executes.
    sincedb_path => "/dev/null"
  }
}

filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 'DAX' => 'float'
    'SMI' => 'float'
    'CAC' => 'float'
    'FTSE' => 'float'}
  }
  date {
    match => ['time', 'UNIX']
  }

  # The following line will create 2 additional 
  # copies of each document (i.e. including the 
  # original, 3 in total). 
  # Each copy will have a "type" field added 
  # corresponding to the name given in the array.
  clone {
    clones => ['copy_only_SMI', 'copy_only_FTSE']
  }

  if [type] == 'copy_only_SMI' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_SMI" } 
    }
    # Remove everything except "SMI"
    prune {
       whitelist_names => [ "SMI"]
    }
  } 

  else if [type] == 'copy_only_FTSE' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_FTSE" } 
    }
    prune {
       whitelist_names => [ "FTSE"]
    }
  } 

  # Remove 'type' which was added in the clone
  mutate {
    remove_field => ['type']
  }
}

output {
  stdout { codec =>  "rubydebug" }

  if [@metadata][type] == 'copy_only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'copy_only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Testing the logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system. Note that specifying ‘config.reload.automatic’ is optional, but allows us to automatically reload ‘clones.conf’ without restarting Logstash. Remember that the ‘clones.conf’ used below is the file that contains the pipeline described in the previous section.

./logstash -f ./clones.conf --config.reload.automatic

Once Logstash has read the stocks.csv file, we can check the various outputs that have been written. Documents should have been written to three indices called ‘smi_data’, ‘ftse_data’, and ‘stocks_original’.

Check the SMI index

GET /smi_data/_search

This should display documents with the following structure. Notice that only the “SMI” field appears in documents in the ‘smi_data’ index.

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5
        }
      }

Check the FTSE index

GET /ftse_data/_search

This should display documents with the following structure. Notice that only the “FTSE” field appears in documents in the ‘ftse_data’ index.

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Check the original documents index

GET /stocks_original/_search

This should display documents with the following structure. Notice that the entire original version of each document appears in the ‘stocks_original’ index.

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Conclusion

In this blog post, we have demonstrated how to use Logstash to create multiple copies of an input stream, to then modify documents in each stream as required for different outputs, and to then drive the different streams into different outputs.

Using Logstash prune capabilities to whitelist sub-documents

Overview

Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash, and that all other fields are dropped. In this blog post we demonstrate the use of Logstash to whitelist desired fields and desired sub-documents before indexing into Elasticsearch.

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example command line given later in this blog post.

Example Logstash pipeline

Below is a Logstash pipeline that can be stored in a file called ‘stocks.conf’ and that does the following:

  1. Reads stock market trades as CSV-formatted input from stdin.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Parses the time field as a Unix timestamp (which is used to set the event's @timestamp).
  4. Moves DAX and CAC fields into a nested structure called “my_nest”.
  5. Whitelists the “my_nest” field (which contains a sub-document) and the “SMI” field so that all other (non-whitelisted) fields will be removed.
  6. Writes the resulting documents to an Elasticsearch index called “stocks_whitelist_test”.
# For this simple example, pipe in data from stdin. 
input {
    stdin {}
}

filter {
    csv {
        columns => ["time","DAX","SMI","CAC","FTSE"]
        separator => ","
        convert => { 'DAX' => 'float'
        'SMI' => 'float'
        'CAC' => 'float'
        'FTSE' => 'float'}
    }
    date {
        match => ['time', 'UNIX']
    }
    mutate {
        # Move DAX and CAC into a sub-document 
        # called 'my_nest'
        rename => {
            "DAX" => "[my_nest][DAX]"
            "CAC" => "[my_nest][CAC]"
        }
    }
     
    # Remove everything except "SMI" and the 
    # "my_nest" sub-document 
    prune {
         whitelist_names => [ "SMI", "my_nest" ]
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_whitelist_test"
    }
}

Testing the logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system:

cat ./stocks.csv | ./logstash -f ./stocks.conf

You can then check the data that you have stored in Elasticsearch by executing the following command from Kibana’s dev console:

GET /stocks_whitelist_test/_search

This should display documents with the following structure:

      {
        "_index": "stocks_whitelist_test",
        "_type": "doc",
        "_id": "KANygWUBsYalOV9yOKsD",
        "_score": 1,
        "_source": {
          "my_nest": {
            "CAC": 1718,
            "DAX": 1606.51
          },
          "SMI": 1678.6
        }
      }

Notice that only “my_nest” and “SMI” have been indexed as indicated by the contents of the document’s “_source”. Also note that the “FTSE” and “time” fields have been removed as they were not in the prune filter’s whitelist.
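
As a side note, the entries in whitelist_names are regular expressions and, unless anchored, they match anywhere within a field name; a bare "SMI" pattern would, for example, also retain a hypothetical field named "SMI_close". If exact field-name matching is desired, the patterns can be anchored as follows:

    prune {
         whitelist_names => [ "^SMI$", "^my_nest$" ]
    }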

Conclusion

In this blog post, we have demonstrated how Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash.