Using Logstash and Elasticsearch scripted upserts to transform eCommerce purchasing data

Introduction

Logstash is a tool that can be used to collect, process, and forward events to Elasticsearch. In order to demonstrate the power of Logstash when used in conjunction with Elasticsearch’s scripted upserts, I will show you how to create a near-real-time entity-centric index. Once data is transformed into an entity-centric index, many kinds of analysis become possible with simple (cheap) queries rather than more computationally intensive aggregations.

As a note, using the approach demonstrated here would result in documents similar to those generated by Elasticsearch transforms. Nevertheless, the technique that is documented has not been benchmarked against Elasticsearch transforms, as the main goal of this blog is to demonstrate the power and flexibility of Logstash combined with scripted upserts.

Motivation

Scripted upserts are powerful and flexible because they allow custom Painless code to be executed on each document as it is upserted. However, in my experience, scripted upserts are not commonly used, possibly because end-to-end examples are scarce. This blog aims to address that gap by providing one such example.

In this blog I will provide an example that shows how Logstash can execute scripted upserts to transform data. This will achieve functionality similar to what is demonstrated in the tutorial on how to use transforms. The Elasticsearch documentation on transforms explains why one might wish to transform data as follows:

“A lot of Elasticsearch indices are organized as a stream of events: each event is an individual document, for example a single item purchase. Transforms enable you to summarize this data, bringing it into an organized, more analysis-friendly format. For example, you can summarize all the purchases of a single customer in a single document.”

As stated earlier, transforming data into an entity-centric view makes many kinds of analysis possible using simple (cheap) queries rather than more expensive aggregations. For example, if the total value of all purchases for a given customer is stored in a single document, then it becomes very efficient to query and/or order customers by their total spending. By contrast, if each purchase were contained in a separate document, then more computationally intensive aggregations would have to be executed to extract the same results.
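To make the difference concrete, here is a minimal plain-Python sketch (it does not talk to Elasticsearch). The events and their values are made up for illustration, and the function names are hypothetical. The first function answers a question by scanning every purchase event, while the second folds the events once into one record per customer so that later questions become simple lookups.

```python
from collections import defaultdict

# Hypothetical purchase events; field names mirror the sample data, values are made up.
events = [
    {"customer_id": "10", "taxless_total_price": 80.25},
    {"customer_id": "11", "taxless_total_price": 20.0},
    {"customer_id": "10", "taxless_total_price": 19.75},
]

def total_spend_by_scan(events, customer_id):
    """Event-centric: every question requires a pass over all purchase events."""
    return sum(e["taxless_total_price"] for e in events
               if e["customer_id"] == customer_id)

def build_entity_view(events):
    """Entity-centric: fold the events once into one record per customer."""
    view = defaultdict(lambda: {"taxless_total_price": {"sum": 0.0}})
    for e in events:
        view[e["customer_id"]]["taxless_total_price"]["sum"] += e["taxless_total_price"]
    return dict(view)

entity_view = build_entity_view(events)

# Ordering customers by spending is now a sort over one record per customer.
top_customer = max(entity_view,
                   key=lambda cid: entity_view[cid]["taxless_total_price"]["sum"])
```

The per-customer record plays the role of the transformed document: the work of summing is paid once at ingest time rather than on every query.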

Using Logstash and scripted upserts to transform the sample eCommerce data

Because the eCommerce transform tutorial is clearly described and because the sample eCommerce data is readily available, we implement similar functionality in this blog post to demonstrate the power and flexibility of scripted upserts.

In our example we use Logstash to read data from Elasticsearch, which is done only for demonstration purposes. Normally Logstash would receive data as an externally generated data stream.

Script for upserting the transformed data

Below is a script that computes several of the same metrics as those described in the eCommerce transforms tutorial. We accomplish this by creating a single document for each customer that is updated each time that customer makes a new purchase. The document stores running totals in the taxless_total_price.sum and total_quantity.sum fields. Additionally, a conditional updates the total_quantity.max field only when the incoming purchase's total_quantity is greater than the previously stored value.

POST _scripts/transform_ecommerce
{
  "script": {
    "lang": "painless",
    "source": """
      // If the document is new, initialize the relevant fields.
      if ("create".equals(ctx.op)) {
        ctx._source['total_quantity'] = new HashMap();
        ctx._source['taxless_total_price'] = new HashMap();

        ctx._source['total_quantity']['sum'] = 0;
        ctx._source['total_quantity']['max'] = 0;
        ctx._source['taxless_total_price']['sum'] = (double)0;
      }

      // Compute some of the metrics from the eCommerce transforms demo:
      // running totals, plus the largest single-purchase quantity seen so far.
      ctx._source['total_quantity']['sum'] += params.event['total_quantity'];
      ctx._source['taxless_total_price']['sum'] += (double)params.event['taxless_total_price'];
      if (params.event['total_quantity'] > ctx._source['total_quantity']['max']) {
        ctx._source['total_quantity']['max'] = params.event['total_quantity'];
      }
    """
  }
}
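For readers who want to trace the script's logic without a cluster, here is a rough plain-Python mirror of it. This is only a sketch: `source` stands in for ctx._source, `event` for params.event, and `op` for ctx.op, and the sample purchases are invented. I am assuming that `op` is "create" for the first upsert and some other value (here "index") afterwards.

```python
def transform_ecommerce(source, event, op):
    """Plain-Python mirror of the stored Painless script (illustration only)."""
    # If the document is new, initialize the relevant fields.
    if op == "create":
        source["total_quantity"] = {"sum": 0, "max": 0}
        source["taxless_total_price"] = {"sum": 0.0}

    # Running totals.
    source["total_quantity"]["sum"] += event["total_quantity"]
    source["taxless_total_price"]["sum"] += float(event["taxless_total_price"])

    # Only raise the stored maximum, never lower it.
    if event["total_quantity"] > source["total_quantity"]["max"]:
        source["total_quantity"]["max"] = event["total_quantity"]
    return source

# Three made-up purchases by the same customer.
doc = {}
transform_ecommerce(doc, {"total_quantity": 1, "taxless_total_price": 80.25}, op="create")
transform_ecommerce(doc, {"total_quantity": 4, "taxless_total_price": 19.75}, op="index")
transform_ecommerce(doc, {"total_quantity": 2, "taxless_total_price": 10.0}, op="index")
print(doc)
```

The third purchase increases both sums but leaves the maximum at 4, which is the behavior the conditional in the script is meant to provide.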

Mappings for the transformed index

Set the mapping for the scripted upsert transformed index as follows:

// if it exists, delete it first
DELETE ecommerce_ls_transformed

PUT /ecommerce_ls_transformed
{
  "mappings": {
    "properties": {
      "customer_id": {
        "type": "keyword"
      },
      "taxless_total_price": {
        "properties": {
          "sum": {
            "type": "double"
          }
        }
      },
      "total_quantity": {
        "properties": {
          "max": {
            "type": "integer"
          },
          "sum": {
            "type": "integer"
          }
        }
      }
    }
  }
}

Test the upsert script

We can test the above script before attempting to use it in Logstash. Notice that we are sending the update values to the script in params.event, which is how Logstash sends scripted upsert data by default. The following update operation can be used to test the previously defined transform_ecommerce script:

POST test_script/_update/454
{
  "scripted_upsert": true,
  "script": {
    "id": "transform_ecommerce",
    "params": {
      "event": {
        "total_quantity": 1,
        "taxless_total_price": 80.25
      }
    }
  },
  "upsert": {}
}
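If you want to generate test requests like the one above for several sample events, the body can be assembled programmatically. The helper below is a hypothetical convenience function of my own, not part of any Elasticsearch client; it only builds and prints the JSON body, and sending it to the cluster is left to whatever HTTP tool you prefer.

```python
import json

def build_scripted_upsert(script_id, event):
    """Hypothetical helper: assemble an update body that runs a stored script."""
    return {
        # Run the script even when the document does not exist yet.
        "scripted_upsert": True,
        # The event is passed under params.event, matching what the script reads.
        "script": {"id": script_id, "params": {"event": event}},
        # Empty starting document; the script initializes the fields itself.
        "upsert": {},
    }

body = build_scripted_upsert(
    "transform_ecommerce",
    {"total_quantity": 1, "taxless_total_price": 80.25},
)
print(json.dumps(body, indent=2))
```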

Set mappings for the copy of the eCommerce index

Copy the mappings from the kibana_sample_data_ecommerce index into the ecommerce_copy index. Setting the mapping on the ecommerce_copy index can be done by pasting the mappings from kibana_sample_data_ecommerce as follows:

// if it exists, remove the old copy
DELETE ecommerce_copy

GET kibana_sample_data_ecommerce/_mappings

PUT ecommerce_copy
{
  "mappings": {
     // paste mappings from kibana_sample_data_ecommerce
 }
}

Define the Logstash pipeline

In the approach documented here, each Logstash event will be driven into two Logstash outputs — one output will drive each event into a “raw” Elasticsearch index (i.e. into an index containing a document corresponding to each Logstash event), and the other output will drive that same event into a transformed index which will be updated based on a subset of the event’s contents.

Below is the Logstash pipeline that will be used for re-ingesting the eCommerce documents, as well as for generating the transformed index. The transformed index will be called ecommerce_ls_transformed and the original documents will be stored in the index called ecommerce_copy. Store the Logstash pipeline below in a file called logstash-transform.conf.

input {
  elasticsearch {
    hosts => "localhost"
    index => "kibana_sample_data_ecommerce"
    query => '{ "query": { "match_all": {} } }'
  }
}

output {

  # Transformed data
  elasticsearch {
    index => "ecommerce_ls_transformed"
    document_id => "%{customer_id}"
    action => "update"
    scripted_upsert => true
    # script_lang must be an empty string when referencing a stored script
    script_lang => ""
    script_type => "indexed"
    script => "transform_ecommerce"
  }

  # Original data
  elasticsearch {
    index => "ecommerce_copy"
  }
}
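The effect of the two outputs can be pictured with a small plain-Python simulation. This is a sketch under simplifying assumptions: a list stands in for ecommerce_copy, a dict keyed by document ID stands in for ecommerce_ls_transformed, only the quantity metrics are tracked, and the events are invented.

```python
def run_pipeline(events):
    """Simulate sending every event to both outputs of the pipeline."""
    raw_index = []          # stands in for ecommerce_copy
    transformed_index = {}  # stands in for ecommerce_ls_transformed, keyed by _id

    for event in events:
        # "Original data" output: one document per event.
        raw_index.append(dict(event))

        # "Transformed data" output: the customer_id is used as the document ID,
        # so every purchase by a customer lands on the same document.
        doc_id = str(event["customer_id"])
        doc = transformed_index.setdefault(
            doc_id, {"total_quantity": {"sum": 0, "max": 0}}
        )
        quantity = event["total_quantity"]
        doc["total_quantity"]["sum"] += quantity
        doc["total_quantity"]["max"] = max(doc["total_quantity"]["max"], quantity)

    return raw_index, transformed_index

# Made-up events: two customers, three purchases.
sample_events = [
    {"customer_id": 10, "total_quantity": 2},
    {"customer_id": 11, "total_quantity": 1},
    {"customer_id": 10, "total_quantity": 3},
]
raw_index, transformed_index = run_pipeline(sample_events)
```

The raw index grows with every event, while the transformed index grows only when a new customer appears.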

Run Logstash

Depending on the directory layout, the above pipeline can be executed with a command similar to the following:

./bin/logstash -f ./config/logstash-transform.conf

View the copy of the eCommerce data

After executing the pipeline, the ecommerce_copy index should contain copies of the documents from the kibana_sample_data_ecommerce index. It can be viewed as follows:

GET ecommerce_copy/_search

View the transformed data

The transformed data is in the ecommerce_ls_transformed index, and can be viewed in the same order as the data in the transform tutorial by executing the following query:

GET ecommerce_ls_transformed/_search
{
  "sort": [
    {
      "_id": {
        "order": "asc"
      }
    }
  ]
}

The first document returned from the above query should be the following:

{
  "_index" : "ecommerce_ls_transformed",
  "_type" : "_doc",
  "_id" : "10",
  "_score" : null,
  "_source" : {
    "total_quantity" : {
      "max" : 2,
      "sum" : 118
    },
    "taxless_total_price" : {
      "sum" : 3946.8200000000006
    }
  }
}

Notice that the values in ecommerce_ls_transformed closely match the values computed in the transform tutorial. The total_quantity values in the first document match the tutorial exactly, and taxless_total_price.sum is very close: 3946.8200000000006 versus 3946.9765625 in the transform tutorial. Presumably this small difference is due to floating-point rounding errors.
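The following plain-Python sketch illustrates two ways rounding can creep into a sum. It uses made-up prices of 0.1 rather than the sample data. The first effect, a tiny residue from accumulating doubles, is the kind of thing that produces a trailing ...0006. The second effect, rounding each value to a lower precision before summing, produces a larger drift; whether the tutorial's figure arises this way is an assumption on my part that I have not verified.

```python
import math
import struct

def to_half(x):
    """Round x to IEEE 754 half precision and back (struct format 'e')."""
    return struct.unpack("<e", struct.pack("<e", x))[0]

prices = [0.1] * 10  # made-up values, chosen because 0.1 is not exact in binary

# Accumulating doubles one at a time leaves a tiny residue.
running = 0.0
for p in prices:
    running += p

# math.fsum tracks the lost low-order bits and returns the correctly rounded sum.
exact = math.fsum(prices)

# Rounding each value to half precision first shifts the sum far more.
half_sum = math.fsum(to_half(p) for p in prices)

print(running, exact, half_sum)
```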

Filtering data in Logstash

An astute reader may have noticed that the above approach sends the full Logstash event to each of the Elasticsearch outputs, even though the ecommerce_ls_transformed index only requires a few fields. The blog about splitting Logstash data demonstrates how to filter the data that is sent to each Elasticsearch index.
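Conceptually, the filtering amounts to projecting each event onto the handful of fields that the script reads before it is sent to the transformed index. Here is a minimal plain-Python sketch of that idea; the field list is my reading of what the script and the document ID need, and the extra fields in the sample event are placeholders. It is not the Logstash configuration itself.

```python
# Fields needed by the transformed output: the document ID plus the script inputs.
TRANSFORM_FIELDS = ("customer_id", "total_quantity", "taxless_total_price")

def project(event, keep=TRANSFORM_FIELDS):
    """Return a copy of the event containing only the fields in `keep`."""
    return {name: event[name] for name in keep if name in event}

# Made-up event with extra fields that the transformed index does not need.
full_event = {
    "customer_id": 10,
    "total_quantity": 2,
    "taxless_total_price": 80.25,
    "customer_full_name": "placeholder name",
    "products": ["placeholder product"],
}
slim_event = project(full_event)
```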

Caveats

There are a few caveats that should be considered if the scripted update approach is used:

  • Even if the original event document is ingested, there is a possibility that the associated scripted update fails on the transformed document. This could create an inconsistent view of the data.
  • If there is a stream of data that would trigger many scripted upserts on a single document in a short time period, then this approach may break down due to an overwhelming number of conflicting updates on that document.
  • The Logstash retry policy and Elasticsearch retry_on_conflict should be understood and set appropriately. More information is available in this documentation about updates and conflicts.
  • If a raw document is deleted, the associated transformed document will not reflect this change.
  • Every update on a document writes a new document to Elasticsearch. Therefore, this approach, which indexes the original raw document and also upserts the transformed document, effectively doubles the ingest workload compared with indexing only the raw documents.
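To illustrate the conflict caveats, here is a toy plain-Python model of versioned updates with a retry budget. It is only a sketch of the general idea behind retry_on_conflict: the Store class, the exception, and the rival writer are all hypothetical stand-ins, and real Elasticsearch behavior is more involved.

```python
class VersionConflict(Exception):
    """Raised when a write is based on a stale version of the document."""

class Store:
    """Toy single-document store with optimistic concurrency control."""
    def __init__(self):
        self.doc = {"sum": 0}
        self.version = 1

    def read(self):
        return dict(self.doc), self.version

    def write(self, doc, expected_version):
        if expected_version != self.version:
            raise VersionConflict()
        self.doc = doc
        self.version += 1

def update_with_retry(store, amount, retry_on_conflict=0, between_read_and_write=None):
    """Read, modify, write; on conflict, re-read and retry up to the budget."""
    for attempt in range(retry_on_conflict + 1):
        doc, version = store.read()
        if between_read_and_write is not None:
            between_read_and_write(attempt)  # lets a rival writer sneak in
        doc["sum"] += amount
        try:
            store.write(doc, version)
            return attempt + 1  # number of attempts used
        except VersionConflict:
            if attempt == retry_on_conflict:
                raise

def rival_once(store):
    """Hypothetical concurrent writer that interferes with the first attempt only."""
    def hook(attempt):
        if attempt == 0:
            doc, version = store.read()
            doc["sum"] += 5
            store.write(doc, version)
    return hook

store = Store()
attempts_used = update_with_retry(
    store, 10, retry_on_conflict=1, between_read_and_write=rival_once(store)
)
```

With a budget of one retry, the update succeeds on its second attempt and both writers' amounts are kept. With no retries, the same interference would surface as a failed update, which is how the raw and transformed indices can drift apart.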

Conclusions

In this blog we have demonstrated how Logstash can be used in conjunction with scripted upserts. In order to demonstrate this functionality, we explored how Logstash events can be used to create a near-real-time entity-centric view of indexed data. The approach demonstrated here only scratches the surface of the capabilities of scripted upserts, and because scripted upserts are based on Painless, custom functionality can be painlessly implemented.