Converting CSV to JSON in Filebeat

Introduction

Many organisations use Excel files for creating and storing important data. For various reasons it may be useful to import such data into Elasticsearch. For example, one may need to get master data that is created in a spreadsheet into Elasticsearch, where it could be used for enriching Elasticsearch documents. Or one may wish to use Elasticsearch and Kibana for analysing a dataset that is only available in a spreadsheet. In such cases, one option is to export the spreadsheet to CSV and use Filebeat for uploading the CSV data into an Elasticsearch cluster.

In this blog I will show how Filebeat can be used to convert CSV data into JSON-formatted data that can be sent into an Elasticsearch cluster. This will be accomplished by using a built-in CSV processor as well as a custom JavaScript processor which will be applied to every line in a CSV file.

Note that Filebeat is intended for sending log lines into Elasticsearch, and the technique described in this blog is not intended to run on a CSV file that continually has lines appended to it. The technique and code presented in this article are intended for ingesting an existing CSV file a single time, with Filebeat terminating immediately after the file has been ingested.

Motivation

Filebeat supports a CSV processor which extracts values from a CSV string, and stores the result in an array. However, this processor does not create key-value pairs to maintain the relation between the column names and the extracted values. When using the CSV processor, additional processing (and hard-coding of the field names) is generally required in an ingest node or in Logstash to add the correct field names back into the extracted data.

On the other hand, the approach presented in this blog will automatically extract field names from the CSV header, and then generate key-value pairs based on each row’s values combined with the field names that are extracted from the header row. This technique therefore eliminates the need for additional ingest node or Logstash processing that would otherwise be required.
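The key idea, pairing each row's values with the field names taken from the header row, can be sketched in a few lines of standalone Python (purely illustrative; the actual implementation in this blog uses Filebeat's JavaScript processor):

```python
import csv
import io

# Sample CSV matching the test.csv file used later in this post.
raw = '''first_col,col2,col3,fourth_col
1234,"first 1",123,third 1
5678,first 2,456,"third 2"'''

rows = list(csv.reader(io.StringIO(raw)))
header, data_rows = rows[0], rows[1:]

# Combine the header with each row's values to produce key-value pairs.
docs = [dict(zip(header, row)) for row in data_rows]
print(docs[0])  # {'first_col': '1234', 'col2': 'first 1', 'col3': '123', 'fourth_col': 'third 1'}
```

Because the field names come from the file itself, no field names need to be hard-coded anywhere in the pipeline.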

Code

All code presented in this blog is available at: https://github.com/alexander-marquardt/filebeat-csv-to-json

A note on the Filebeat registry

Because Filebeat is designed for sending log lines from files which are actively being written, it keeps track of the most recent log entry that it has sent to Elasticsearch, and ensures that each entry is only sent once. This is tracked in the Filebeat registry. We should be aware of the existence of the registry, as it will prevent the same CSV data from being sent to Elasticsearch multiple times, which can be undesirable when testing.

A note on Filebeat processors

Processors are executed on data as it passes through Filebeat. The code presented in this blog makes use of the CSV processor as well as a custom script processor. The custom script processor applies custom JavaScript code to each event (in our case, to each CSV line), which converts the CSV values into key-value pairs in a JSON object.

Example CSV input

We will store the following data in a file called test.csv. This file will be used to show how CSV can be converted into JSON. The CSV is intentionally written with inconsistent quoting, to demonstrate that the code works correctly for different formats:

first_col,col2,col3,fourth_col
1234,"first 1",123,third 1
5678,first 2,456,"third 2"

Filebeat configuration

We use the following filebeat.yml configuration to call the CSV processor as well as our custom JavaScript.

max_procs: 1 # This code will not work correctly on multiple threads
 
filebeat.inputs:
- type: log
  enabled: true
  close_eof: true
  paths:
    - ${PWD}/test.csv


  processors:
  - decode_csv_fields:
      fields:
        message: decoded_csv_arr
      separator: ","
      ignore_missing: false
      overwrite_keys: true
      trim_leading_space: false
      fail_on_error: true

  - script:
      lang: javascript
      id: convert_csv_into_json
      file: ${PWD}/convert_csv_to_json.js

  - drop_fields:
      fields: ["decoded_csv_arr"]

output.elasticsearch:
  hosts: ["localhost:9200"]

  index: "csv_to_json-%{+YYYY.MM.dd}" 

setup.ilm.enabled: false
setup.template.enabled: false

JavaScript processor code

Below we present the JavaScript code that we use for converting lines in a CSV file into JSON objects. This should be stored in a file called convert_csv_to_json.js which is referenced in the filebeat.yml configuration that we presented above.

When the first line of CSV is passed into this JavaScript processor, the code uses a JavaScript closure to “remember” the header values. When subsequent lines from the CSV file are passed in, the headers are combined with the values in each row to create key-value pairs that are stored in a JSON object.

Note that this will only work as a single-threaded process, since the closure containing the header fields is only available in the thread that processes the header row. This is ensured by setting max_procs: 1 in filebeat.yml.

// This function takes an array containing the field names, and another that
// contains field values, and returns a json dictionary that combines them.
function convert_csv_to_dict(csv_headers_row, csv_values_row) {
  var json_from_csv =  csv_values_row.reduce(function(result, field, index) {
    result[csv_headers_row[index]] = field;
    return result;
  }, {})

  return json_from_csv;
}


// Define the JavaScript function that will be used to combine the 
// header row with subsequent rows in the CSV file
var headers_fn = (function() {
  var csv_headers_row = null; 

  // Use a JavaScript closure to store the header line (csv_headers_row), 
  // so that we can use the header values for all subsequent CSV entries. 
  return function(csv_arr) {

    var json_from_csv = null;

    if (!csv_headers_row) {
      // if this is the first row, store the headers
      csv_headers_row = csv_arr;
    } else {
      // combine the csv_headers_row with the values to get a dict
      json_from_csv = convert_csv_to_dict(csv_headers_row, csv_arr)
    }
    return json_from_csv;
  }

})();  


// This function is called for each "event" 
// (eg. called once for each line in the log file)
function process(event) {
    var csv_arr = event.Get("decoded_csv_arr");
    var json_from_csv = headers_fn(csv_arr);

    // If the current event was triggered by the header row,
    // then json_from_csv will be null - headers_fn only returns
    // a json dict for subsequent rows. Cancel the event so that
    // nothing is sent to the output, and skip the Put below.
    if (!json_from_csv) {
      event.Cancel();
      return;
    }
    event.Put("json_from_csv", json_from_csv);
}

Executing the code

The following command line can be used for executing the code which converts the CSV into JSON, and then sends the resulting documents into Elasticsearch.

rm -rf my_reg; ./filebeat -once -E filebeat.registry.path=${PWD}/my_reg

There are a few things to point out about this command line.

  1. It deletes the registry directory before executing filebeat. This means that the input file will be sent each time that Filebeat is executed. To prevent multiple copies of the same document from appearing in the destination index, the destination index should be deleted before running this code.
  2. It is storing the registry in the local directory, which makes it easier to find and delete it.
  3. It is running with the “-once” option, which makes Filebeat exit once the input file has been fully read and published.

Results

Once the above code has executed, there should be an index written into Elasticsearch that starts with “csv_to_json-“. Looking into this index, we can see that the documents contain the following field, which has been extracted from the CSV file.

"json_from_csv" : {
  "col3" : "123",
  "fourth_col" : "third 1",
  "first_col" : "1234",
  "col2" : "first 1"
}

Conclusion

In this blog, I have shown how Filebeat can be used to convert CSV data into JSON objects in the documents that are sent to Elasticsearch. Because the field names in the JSON object are extracted directly from the CSV file, this technique eliminates the need for the ingest node or Logstash processing that would otherwise be required for adding the correct field names to the CSV data.

Using Logstash and Elasticsearch scripted upserts to transform eCommerce purchasing data

Introduction

Logstash is a tool that can be used to collect, process, and forward events to Elasticsearch. In order to demonstrate the power of Logstash when used in conjunction with Elasticsearch’s scripted upserts, I will show you how to create a near-real-time entity-centric index. Once data is transformed into an entity-centric index, many kinds of analysis become possible with simple (cheap) queries rather than more computationally intensive aggregations.

As a note, using the approach demonstrated here would result in documents similar to those generated by Elasticsearch transforms. Nevertheless, the technique that is documented has not been benchmarked against Elasticsearch transforms, as the main goal of this blog is to demonstrate the power and flexibility of Logstash combined with scripted upserts.

Motivation

Scripted upserts are very powerful and flexible, as they allow custom Painless code to be executed on each document as it is being upserted. However, in my experience, scripted upserts are not commonly used. I believe this may be due to a lack of end-to-end examples. This blog aims to address this shortcoming by providing one such example.

In this blog I will provide an example that shows how Logstash can execute scripted upserts to transform data. This will achieve functionality similar to what is demonstrated in the tutorial on how to use transforms. The Elasticsearch documentation on transforms explains why one might wish to transform data as follows:

“A lot of Elasticsearch indices are organized as a stream of events: each event is an individual document, for example a single item purchase. Transforms enable you to summarize this data, bringing it into an organized, more analysis-friendly format. For example, you can summarize all the purchases of a single customer in a single document.”

As stated earlier, transforming data into an entity-centric view makes many kinds of analysis possible using simple (cheap) queries rather than more expensive aggregations. For example, if the total value of all purchases for a given customer is transformed so that it is contained in a single document, then it becomes very efficient to query and/or order customers by their total spending. By contrast, if each purchase were contained in a separate document, then more computationally intensive aggregations would have to be executed to extract the same results.
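To make the contrast concrete, the following Python sketch (using made-up purchase events, not the actual sample dataset) shows what an entity-centric transform produces: one summary document per customer, after which ordering customers by total spend is a simple sort over one document each rather than an aggregation over every purchase:

```python
# Hypothetical per-purchase events, one document each. Field names
# mirror those in the eCommerce sample data discussed below.
events = [
    {"customer_id": "10", "total_quantity": 2, "taxless_total_price": 20.0},
    {"customer_id": "10", "total_quantity": 1, "taxless_total_price": 9.5},
    {"customer_id": "27", "total_quantity": 3, "taxless_total_price": 99.0},
]

transformed = {}
for ev in events:
    doc = transformed.setdefault(ev["customer_id"], {
        "total_quantity": {"sum": 0, "max": 0},
        "taxless_total_price": {"sum": 0.0},
    })
    doc["total_quantity"]["sum"] += ev["total_quantity"]
    doc["total_quantity"]["max"] = max(doc["total_quantity"]["max"],
                                       ev["total_quantity"])
    doc["taxless_total_price"]["sum"] += ev["taxless_total_price"]

# Ordering customers by total spending is now a cheap sort over the
# per-customer summaries instead of an aggregation over all purchases.
top_spenders = sorted(transformed,
                      key=lambda c: transformed[c]["taxless_total_price"]["sum"],
                      reverse=True)
```

The Painless script presented below performs the same per-customer accumulation, but incrementally, one event at a time, as documents flow through Logstash.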

Using Logstash and scripted upserts to transform the sample eCommerce data

Because the eCommerce transform tutorial is clearly described and because the sample eCommerce data is readily available, we implement similar functionality in this blog post to demonstrate the power and flexibility of scripted upserts.

In our example we use Logstash to read data from Elasticsearch, which is done only for demonstration purposes. Normally Logstash would receive data as an externally generated data stream.

Script for upserting the transformed data

Below is a script that computes several of the same metrics as those described in the eCommerce transforms tutorial. We accomplish this by creating a single document for each customer, which will be updated each time a new purchase is made by that customer. This document stores a running total of the taxless_total_price.sum and total_quantity.sum fields. Additionally, a conditional ensures that the total_quantity.max field is only updated if the new value is greater than the previously stored value.

POST _scripts/transform_ecommerce
{
  "script": {
  "lang": "painless",
  "source": """
  
      // If new, then initialize relevant fields.
      if ("create".equals(ctx.op)) {
          ctx._source['total_quantity'] = new HashMap();
          ctx._source['taxless_total_price'] = new HashMap();
          
          ctx._source['total_quantity']['sum'] = 0;
          ctx._source['total_quantity']['max'] = 0;
          ctx._source['taxless_total_price']['sum'] = (double)0;
      }
      
      // compute some of the metrics from the eCommerce 
      // transforms demo
      ctx._source['total_quantity']['sum'] += params.event['total_quantity'];
      ctx._source['taxless_total_price']['sum'] += (double)params.event['taxless_total_price'];
      if (params.event['total_quantity'] > ctx._source['total_quantity']['max']) {
        ctx._source['total_quantity']['max'] = params.event['total_quantity'];
      }
    """
  }
}

Mappings for the transformed index

Set the mapping for the scripted upsert transformed index as follows:

// if it exists, delete it first
DELETE ecommerce_ls_transformed

PUT /ecommerce_ls_transformed
{
  "mappings": {
    "properties": {
      "customer_id": {
        "type": "keyword"
      },
      "taxless_total_price": {
        "properties": {
          "sum": {
            "type": "double"
          }
        }
      },
      "total_quantity": {
        "properties": {
          "max": {
            "type": "integer"
          },
          "sum": {
            "type": "integer"
          }
        }
      }
    }
  }
}

Test the upsert script

We can test the above script before attempting to use it in Logstash. Notice that we are sending the update values to the script in params.event, which is how Logstash sends scripted upsert data by default. The following update operation can be used to test the previously defined transform_ecommerce script:

POST test_script/_update/454
{
  "scripted_upsert": true,
  "script": {
    "id": "transform_ecommerce",
    "params": {
      "event": {
        "total_quantity": 1,
        "taxless_total_price": 80.25
      }
    }
  },
  "upsert": {}
}

Set mappings for the copy of the eCommerce index

Copy the mappings from the kibana_sample_data_ecommerce index into the ecommerce_copy index. Setting the mapping on the ecommerce_copy index can be done by pasting the mappings from kibana_sample_data_ecommerce as follows:

// if it exists, remove the old copy
DELETE ecommerce_copy

GET kibana_sample_data_ecommerce/_mappings

PUT ecommerce_copy
{
  "mappings": {
     // paste mappings from kibana_sample_data_ecommerce
 }
}

Define the Logstash pipeline

In the approach documented here, each Logstash event will be driven into two Logstash outputs — one output will drive each event into a “raw” Elasticsearch index (i.e. into an index containing a document corresponding to each Logstash event), and the other output will drive that same event into a transformed index which will be updated based on a subset of the event’s contents.

Below is the Logstash pipeline that will be used for re-ingesting the eCommerce documents, as well as for generating the transformed index. The transformed index will be called ecommerce_ls_transformed and the original documents will be stored in the index called ecommerce_copy. Store the Logstash pipeline below in a file called logstash-transform.conf.

input {
  elasticsearch {
    hosts => "localhost"
    index => "kibana_sample_data_ecommerce"
    query => '{ "query": { "match_all": {} } }'
  }
}

output {

  # Transformed data
  elasticsearch {
    index => "ecommerce_ls_transformed"
    document_id => "%{customer_id}"
    action => "update"
    scripted_upsert => true
    script_lang => ""
    script_type => "indexed"
    script => "transform_ecommerce"
  }

  # Original data
  elasticsearch {
    index => "ecommerce_copy"
  }
}

Run Logstash

Depending on the directory layout, the above pipeline can be executed with a command similar to the following:

./bin/logstash -f ./config/logstash-transform.conf

View the copy of the eCommerce data

After executing the pipeline, the ecommerce_copy index should contain copies of the documents from the kibana_sample_data_ecommerce index. It can be viewed as follows:

GET ecommerce_copy/_search

View the transformed data

The transformed data is in the ecommerce_ls_transformed index, and can be viewed in the same order as the data in the transform tutorial by executing the following query:

GET ecommerce_ls_transformed/_search
{
  "sort": [
    {
      "_id": {
        "order": "asc"
      }
    }
  ]
}

The first document returned from the above query should be the following:

{
  "_index" : "ecommerce_ls_transformed",
  "_type" : "_doc",
  "_id" : "10",
  "_score" : null,
  "_source" : {
    "total_quantity" : {
      "max" : 2,
      "sum" : 118
    },
    "taxless_total_price" : {
      "sum" : 3946.8200000000006
    }
  }
}

Notice that the values in ecommerce_ls_transformed match quite closely with the values computed in the transform tutorial – the total_quantity values in the first document match perfectly with the tutorial, and the taxless_total_price.sum is very close (3946.8200000000006 versus 3946.9765625 in the transform tutorial). Presumably this small difference is due to floating-point rounding errors.
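The rounding explanation is easy to verify: floating-point addition is not associative, so summing the same values in a different order can produce slightly different totals. A minimal Python illustration:

```python
# The same three values summed in two different groupings.
values = [0.1, 0.2, 0.3]
left_to_right = (values[0] + values[1]) + values[2]
right_to_left = values[0] + (values[1] + values[2])

print(left_to_right)   # 0.6000000000000001
print(right_to_left)   # 0.6
```

The two sums differ in the last bits of precision, which is the same class of discrepancy seen between the Logstash-computed total and the transform tutorial's total.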

Filtering data in Logstash

An astute reader may have noticed that the above approach sends the full Logstash event to each of the Elasticsearch outputs, even though the ecommerce_ls_transformed index only requires a few fields. The blog about splitting Logstash data demonstrates how to filter the data that is sent to each Elasticsearch index.

Caveats

There are a few caveats that should be considered if the scripted update approach is used:

  • Even if the original event document is ingested, there is a possibility that the associated scripted update fails on the transformed document. This could create an inconsistent view of the data.
  • If there is a stream of data that would trigger many scripted upserts on a single document in a short time period, then this approach may break down due to an overwhelming number of conflicting updates on that document.
  • The Logstash retry policy and the Elasticsearch retry_on_conflict setting should be understood and set appropriately. More information is available in the documentation about updates and conflicts.
  • If a raw document is deleted, the associated transformed document will not reflect this change.
  • Every update on a document writes a new document to Elasticsearch. Therefore this approach, which indexes the original raw document as well as upserting the transformed document, will effectively double the ingest workload compared to indexing only the raw documents.
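To illustrate the conflicting-updates caveat, here is a small in-memory simulation of optimistic concurrency with retries. Real code would rely on Elasticsearch's document versioning (_seq_no/_primary_term) or simply set the retry_on_conflict update parameter; the store and helper names below are hypothetical stand-ins:

```python
# In-memory stand-in for a versioned document store.
store = {"customer_10": {"version": 0, "doc": {"sum": 0}}}

def conditional_write(doc_id, expected_version, new_doc):
    """Succeed only if the stored version still matches what we read."""
    entry = store[doc_id]
    if entry["version"] != expected_version:
        return False  # version conflict
    store[doc_id] = {"version": expected_version + 1, "doc": new_doc}
    return True

def update_with_retry(doc_id, delta, max_retries=3, interfere=None):
    """Read-modify-write with retries, mimicking retry_on_conflict."""
    for attempt in range(max_retries + 1):
        entry = store[doc_id]
        read_version, read_sum = entry["version"], entry["doc"]["sum"]
        if interfere is not None:  # simulate another writer sneaking in
            interfere()
            interfere = None
        if conditional_write(doc_id, read_version, {"sum": read_sum + delta}):
            return attempt
    raise RuntimeError("too many conflicting updates")

# A competing update bumps the version between our read and write, so
# the first attempt conflicts and the second succeeds.
attempts = update_with_retry(
    "customer_10", 5,
    interfere=lambda: conditional_write("customer_10", 0, {"sum": 100}))
print(attempts, store["customer_10"])  # 1 {'version': 2, 'doc': {'sum': 105}}
```

When many events target the same transformed document in a short window, each conflict costs a re-read and re-write, which is why a sufficiently hot document can overwhelm this scheme.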

Conclusions

In this blog we have demonstrated how Logstash can be used in conjunction with scripted upserts. In order to demonstrate this functionality, we explored how Logstash events can be used to create a near-real-time entity-centric view of indexed data. The approach demonstrated here only touches the surface of the capabilities of scripted upserts, and because scripted upserts are based on Painless, custom functionality can be painlessly implemented.

Emulating transactional functionality in Elasticsearch with two-phase commits

Introduction

Elasticsearch supports atomic create, update, and delete operations at the individual document level, but does not have built-in support for multi-document transactions. Although Elasticsearch does not position itself as a system of record for storing data, in some cases it may be necessary to modify multiple documents as a single cohesive unit. Therefore, in this blog post we present a two-phase commit protocol which can be used to emulate multi-document transactions.

Overview

Create, update, and delete operations in Elasticsearch are atomic at the document level, which means that creating, modifying, or deleting a single document either fully succeeds or fails. However, there is no guarantee that an operation that has multiple steps and that impacts multiple documents will either succeed or fail as a cohesive unit.

In some cases inner objects and nested types can be used to combine multiple documents into a single atomic unit, which would guarantee atomicity of create, update, and delete operations on the parent and its embedded sub-documents. In other cases it may not be possible or desirable to embed related documents inside a parent document. Therefore multi-document transactional functionality may be desired.

Given the lack of built-in multi-document transactions in Elasticsearch, multi-document transactional functionality that is built on top of Elasticsearch must be implemented in application code. Such functionality can be achieved with the two-phase commit protocol.

What is the two-phase commit protocol?

The two-phase commit protocol is a type of atomic commitment protocol that coordinates the processes participating in a distributed atomic transaction. The protocol ultimately determines whether to commit or roll back a transaction, and it achieves this goal even in the event of most temporary system failures.

To permit recovery from a failure, the two-phase commit protocol logs the state of a given transaction as the sequential steps to perform the transaction are executed. In the event of a failure at any stage in a transaction these transaction logs will be used by recovery procedures to either complete the transaction or to roll it back.

A high-level overview of a two-phase commit implementation

In this article, we present an example of a two-phase commit transaction that is used for tracking the movement of “units” between two accounts, as described in the following sections:

  1. Create mappings: Define the mappings for an accounts index and a transactions index.
  2. Initialize accounts: Create sample accounts with balances that will be used in subsequent steps to demonstrate how the two-phase commit protocol can be used to ensure completion of the movement of units from one account balance to another.
  3. Define an ingest pipeline for inserting the ingest time: An ingest pipeline is used to ensure that every transaction has a creation time written into it.
  4. Scripts that are used by the two-phase commit: Define painless scripts which are used in the two-phase commit.
  5. Create a transaction: A transaction defines which accounts to move units between and how many units to move. It also logs the status of the transaction, which can be used for recovery or rollback.
  6. Core two-phase commit operations: The steps that will be executed to implement the two-phase commit. By following a specific sequence of operations for multi-document transactions, we maintain sufficient state (i.e. a transaction log) at every step to either fix or rollback transactions that have only partially completed. The core operations are the following:
    1. Update transaction state to “pending”.
    2. Apply the transaction to the source account.
    3. Apply the transaction to the destination account.
    4. Update the transaction state to “committed”.
    5. Remove the transaction identifier from the source account.
    6. Remove the transaction identifier from the destination account.
    7. Update transaction state to “finished”.
  7. Rolling back transactions: In rare cases some transactions may not be able to complete, and should be rolled-back. This section describes how to undo a two-phase commit that previously started.
  8. Recovering from errors: This section describes the operations that are periodically executed to detect transactions that have become stuck in one of the two-phase commit or rollback stages. Stalled operations will then be restarted based on the most recent transaction state.
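The core operations in step 6 can be sketched as an in-memory simulation, with plain Python dictionaries standing in for the accounts and transactions indices (a real implementation would instead issue the scripted update operations defined later in this article against Elasticsearch):

```python
# In-memory stand-ins for the accounts and transactions indices.
accounts = {
    "A": {"balance": 500, "pending_transactions": []},
    "B": {"balance": 500, "pending_transactions": []},
}
transactions = {}

def apply_to_account(acct_id, trans_id, amount):
    """Idempotent: skip if this transaction was already applied."""
    acct = accounts[acct_id]
    if trans_id not in acct["pending_transactions"]:
        acct["pending_transactions"].append(trans_id)
        acct["balance"] += amount

def remove_from_account(acct_id, trans_id):
    """Idempotent: skip if this transaction was already removed."""
    acct = accounts[acct_id]
    if trans_id in acct["pending_transactions"]:
        acct["pending_transactions"].remove(trans_id)

def run_two_phase_commit(trans_id, src, dest, amount):
    transactions[trans_id] = {"src_account": src, "dest_account": dest,
                              "amount": amount, "transaction_state": "created"}
    t = transactions[trans_id]
    t["transaction_state"] = "pending"        # core step 1
    apply_to_account(src, trans_id, -amount)  # core step 2
    apply_to_account(dest, trans_id, amount)  # core step 3
    t["transaction_state"] = "committed"      # core step 4
    remove_from_account(src, trans_id)        # core step 5
    remove_from_account(dest, trans_id)       # core step 6
    t["transaction_state"] = "finished"       # core step 7

run_two_phase_commit("txn1", "A", "B", 100)
print(accounts["A"]["balance"], accounts["B"]["balance"])  # 400 600
```

Because every step is idempotent and the transaction state is persisted between steps, a recovery procedure can safely re-run the sequence from the most recent recorded state after a failure.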

Create mappings

Mapping is the process of defining how a document, and the fields it contains, are stored and indexed. This example defines an accounts index and a transactions index, for which the mappings are defined below.

Define the mappings for the accounts index

For our example, we define the following fields on the accounts index:

  • balance: the balance of each account. In this article we refer to the value stored in this field as “units”, which could be a currency, inventory, etc.
  • pending_transactions: an array that will contain a list of the _ids of each transaction that is currently “pending” on each account. The list of pending transactions serves two purposes: (1) it is used for ensuring that the same transaction will never be applied twice to the same account, and (2) it maintains state that will allow rollback steps to set this account back to its pre-transaction state in the event that the transaction cannot run to completion.

Mappings should be defined before writing any documents into the accounts index, as follows:

PUT accounts
{
  "mappings": {
    "properties": {
      "balance": {
        "type":  "long"
      }, 
      "pending_transactions": {
        "type": "keyword"
      }
    }
  }
}

Define the mappings for the transactions index

The transactions index will be used for tracking the state of each multi-document transaction, and will have the following fields defined:

  • amount: the value of the transfer.
  • creation_time: when the transaction was created. This is not strictly required, but is stored for additional context.
  • modification_time: when the transaction was last modified. This will be used for determining if recovery should be started.
  • src_account: the _id of the source account for the transfer.
  • dest_account: the _id of the destination account for the transfer.
  • transaction_state: one of “created”, “pending”, “committed”, “finished”, “terminating”, or “rolled-back”. In a normal transaction without errors or rollbacks, the transaction state will move from “created” to “pending” to “committed”, and then to “finished”.
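The transaction states and the transitions between them can be captured as a small lookup table. The transition set below is a sketch inferred from the descriptions in this article (the normal path, plus a rollback path from “pending”):

```python
# Allowed transaction_state transitions, inferred from this article:
# normal path: created -> pending -> committed -> finished
# rollback path: pending -> terminating -> rolled-back
ALLOWED_TRANSITIONS = {
    "created": {"pending"},
    "pending": {"committed", "terminating"},
    "committed": {"finished"},
    "terminating": {"rolled-back"},
    "finished": set(),
    "rolled-back": set(),
}

def can_transition(current_state, desired_state):
    """Return True if moving from current_state to desired_state is legal."""
    return desired_state in ALLOWED_TRANSITIONS.get(current_state, set())

print(can_transition("created", "pending"))   # True
print(can_transition("pending", "finished"))  # False
```

This mirrors the guard in the update_transaction_state script shown later, which only moves the state forward when the stored state matches the expected current state.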

Mappings should be defined before writing any documents into the transactions index, as follows:

PUT transactions
{
  "mappings": {
    "properties": {
      "amount": {
        "type":  "long"
      },
      "creation_time": {
        "type": "date"
      },
      "modification_time": {
        "type": "date"
      },
      "dest_account": {
        "type": "keyword"
      },
      "src_account": {
        "type": "keyword"
      },
      "transaction_state": {
        "type": "keyword"
      }
    }
  }
}

Initialize accounts

We can initialize documents for accounts A and B, each with a balance of 500 as follows:

PUT accounts/_doc/A
{
  "balance": 500,
  "pending_transactions": []
}

PUT accounts/_doc/B
{
 "balance": 500,
 "pending_transactions": []
}

Define an ingest pipeline for inserting the ingest time

The following pipeline will be used to add “creation_time” and “modification_time” to the transaction documents. The modification_time will be required at later stages in the two-phase commit process for detecting transactions that have failed to complete within a reasonable amount of time. The creation time is stored for additional context.

PUT _ingest/pipeline/initialize_time
{
 "description": "Add ingest timestamp",
 "processors": [
   {
     "set": {
       "field": "_source.creation_time",
       "value": "{{_ingest.timestamp}}"
     }
   },
   {
     "set": {
       "field": "_source.modification_time",
       "value": "{{_ingest.timestamp}}"
     }
   }
 ]
}

Scripts that are used by the two-phase commit

In this section we define several Painless scripts which will be executed by update operations that are performed in the two-phase commit steps. Given that updates are atomic, all of the operations inside each of these scripts will either succeed or fail as a unit.

Script to update the transaction state

This script will be used to update the transaction state from the current state to the desired state. The transaction state can be one of “created”, “pending”, “committed”, “finished”, “terminating”, or “rolled-back”.

Note that if the transaction state on the current transaction is not equal to the expected current state (e.g. if it is already set to the desired state), then this script will not modify the document, and the corresponding update operation will return a result of “noop”.

If the transaction state is updated, then this script also updates the modification time of the transaction document, which will be required by the procedures for recovering from errors.

POST _scripts/update_transaction_state
{
  "script": {
    "lang": "painless",
    "source": """
      if (ctx._source.transaction_state == params.current_state) {
        ctx._source.transaction_state = params.desired_state;

        // Set the modification time in ISO 8601 (UTC)
        Date date = new Date();
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'");
        df.setTimeZone(TimeZone.getTimeZone("UTC"));
        ctx._source.modification_time = df.format(date);

      } else {
        ctx.op = "noop"
      }
    """
  }
}

Script to apply a transaction to an account

This script is used to add the current transaction amount to an account’s balance, and to push the transaction identifier onto the account’s list of pending transactions. Given the atomic nature of document updates, both of these operations will succeed or fail as an atomic unit. This script will be used for updating accounts once a transaction has entered into the “pending” state.

In the event that the transaction has already been applied to the account (as determined by the transaction identifier being in the array of pending transactions for this account), then this script will not update the document and the corresponding update operation will return a result of “noop”.

POST _scripts/apply_transaction_to_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored 
      // in pending_transactions. 
      // If the transaction is already on this account then the 
      // location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);

      if (idx >= 0) {
        // if the transaction already exists on this account, 
        // do nothing 
        ctx.op = "noop";
      } 
      
      else {
        // New transaction - update the account balance 
        // and add the transaction to pending_transactions
        ctx._source.pending_transactions.add(params.trans_id);
        ctx._source.balance += params.amount;
      }
    """
  }
}

Script to remove a transaction from an account

This script is used for removing a transaction from the pending_transactions array on an account. This will be done once a transaction is no longer in the “pending” transaction state, and immediately after it has entered into the “committed” transaction state.

In the event that the current transaction has already been removed from the account (as determined by the transaction identifier not appearing in the array of pending transactions for this account), then this script will not modify the document and the corresponding update operation will return a result of “noop”.

POST _scripts/remove_transaction_from_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored in 
      // pending_transactions. 
      // If it exists, the location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);
      

      if (idx >= 0) {
        // the transaction exists on this account, remove it 
        ctx._source.pending_transactions.remove(idx);
      } 
      
      else {
        // previously already removed, do nothing 
        ctx.op = "noop";
      }
    """
  }
}

Script to undo a transaction on an account

This script will be used in the event that a transaction is being rolled back. It reverses the previous change that was made to the account balance, and removes the transaction from the account’s pending_transactions array. Given the atomic nature of document updates, both of these operations will succeed or fail as an atomic unit.

In the event that the current transaction has already been removed from the account (as determined by the transaction identifier not appearing in the array of pending transactions for this account), then this script will not modify the document and the corresponding update operation will return a result of “noop”.

POST _scripts/undo_transaction_on_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored in pending_transactions. 
      // If it exists, the location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);
      

      if (idx >= 0) {
        // the transaction exists on this account, remove it 
        ctx._source.pending_transactions.remove(idx);
        ctx._source.balance -= params.amount;
      } 
      
      else {
        // previously already removed, do nothing 
        ctx.op = "noop";
      }
  """
  }
}
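The undo script's behaviour can be mirrored the same way in Python (an in-memory sketch; the dictionary shape is assumed for illustration):

```python
def undo_transaction_on_account(account, trans_id, amount):
    """Mirror of the Painless undo script: reverse the balance change
    and remove the transaction id, or report "noop" if already undone."""
    if trans_id not in account["pending_transactions"]:
        # previously already removed, do nothing
        return "noop"
    account["pending_transactions"].remove(trans_id)
    account["balance"] -= amount
    return "updated"

# the source account after a -100 transaction was applied
account = {"balance": 400, "pending_transactions": ["txn1"]}
assert undo_transaction_on_account(account, "txn1", -100) == "updated"
assert account["balance"] == 500
# a second rollback attempt does nothing
assert undo_transaction_on_account(account, "txn1", -100) == "noop"
assert account["balance"] == 500
```

Because the balance reversal and the removal of the transaction id happen in one script, a single document update either does both or neither, which is what makes the rollback safe to retry.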

Create a transaction

We create a new transaction, and once this transaction has been received by Elasticsearch, the subsequent steps covered in the remainder of this article will ensure that the transaction will eventually run to completion or will alternatively be rolled back. This is true even if there is an intermediate failure in any of the individual two-phase commit steps.

Note that the transaction is initially created with a transaction state of “created” and will be moved into a “pending” state when the transaction begins processing. By breaking the transaction into a “created” state followed by a “pending” state, the creation of the transaction can be decoupled from the two-phase commit processing of the transaction. For example, after receiving confirmation that a transaction has been created, an application can spin up a separate thread to start execution of Core step 1 – Update transaction state to “pending”, and the current thread can immediately respond to the end user with confirmation that the transaction has been received and created. If desired, the application can then periodically poll Elasticsearch to get an updated transaction state which can be displayed to the end user.

We request a transfer of 100 from account A to account B by creating a new document in the transactions index. This request should use the _create endpoint with an application-generated transaction identifier (set to “txn1” for this transaction) to ensure idempotency of the transaction creation. This can be done as follows:

PUT transactions/_create/txn1?pipeline=initialize_time
{ 
  "src_acct": "A", 
  "dest_acct": "B", 
  "amount" : 100, 
  "transaction_state": "created"
}

If we do not receive a response, or if the request fails, then it should be re-submitted, as we cannot be sure whether Elasticsearch received and processed the request for the new transaction. However, an unacknowledged transaction could theoretically have been received and already changed state, because recovery threads may have picked it up and moved it forward (this will be covered in detail later in this article). It is therefore important to use the _create endpoint, so that multiple submissions of a given transaction will not overwrite the transaction state of a transaction that was already accepted.

If the response from Elasticsearch includes “result”: “created”, then the above request has created the document, and we can now begin the two-phase commit of operations for the transaction.

If the application receives a response that indicates “document already exists”, then the transaction document was previously created. In order to avoid the risk of multiple threads simultaneously working on this transaction, do not begin two-phase commit operations, and instead let the recovery procedures handle the processing of this transaction.

Note that in subsequent steps we will refer to the newly submitted transaction as “curr_trans”. If the instructions from this blog are being tested in Kibana dev tools, then the values from the above document will need to be manually copied into subsequent commands wherever we refer to “curr_trans”. If these steps are to be executed in code, then this document could be stored in an object.
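The protection that the _create endpoint provides can be sketched with an in-memory store. The function and variable names below are illustrative, not Elasticsearch API identifiers:

```python
transactions = {}

def create_transaction(trans_id, doc):
    """Emulate PUT transactions/_create/<id>: create-only, never overwrite."""
    if trans_id in transactions:
        return "document already exists"
    transactions[trans_id] = dict(doc, transaction_state="created")
    return "created"

request = {"src_acct": "A", "dest_acct": "B", "amount": 100}
assert create_transaction("txn1", request) == "created"

# suppose a recovery thread has meanwhile moved the transaction forward
transactions["txn1"]["transaction_state"] = "pending"

# a blind re-submission of the same request cannot clobber that state
assert create_transaction("txn1", request) == "document already exists"
assert transactions["txn1"]["transaction_state"] == "pending"
```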

Core two-phase commit operations

In this section we present a two-phase commit which ensures that all steps required to complete a transaction are executed, and that any failed transactions can be detected and executed to completion or alternatively rolled back. This two-phase commit will prevent partial completion of transactions if there is a failure in any of the sequential two-phase commit steps.

Normal two-phase commit operations will execute in a single thread that will start at Core step 1 – Update transaction state to “pending”, and will end at Core step 7 – Update transaction state to “finished”. Recovery procedures may start executing at different stages in the two-phase commit, and will also execute sequentially in a single thread for a single transaction.

If any of the core two-phase commit operations fails or does not receive a response, then it should be retried several times before stopping processing of the current thread. If this happens then the failed transaction will be picked up later by recovery procedures.
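The retry rule above can be sketched as a small helper. The retry count and the simulated failures are assumptions for illustration; a real implementation would wrap the HTTP call to Elasticsearch:

```python
def with_retries(operation, max_attempts=3):
    """Attempt an operation several times; return its result, or None
    if every attempt failed, leaving the transaction for recovery."""
    for _ in range(max_attempts):
        try:
            return operation()
        except ConnectionError:
            continue  # transient failure, try again
    return None  # give up; recovery procedures will pick this up later

attempts = {"n": 0}
def flaky_update():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("simulated timeout")
    return "updated"

def always_fails():
    raise ConnectionError("simulated outage")

assert with_retries(flaky_update) == "updated"  # succeeds on the third try
assert with_retries(always_fails) is None       # thread stops; recovery takes over
```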

Core step 1 – Update transaction state to “pending”

This step (Core step 1) can be arrived at immediately after the transaction has been created; however, in the event of an error immediately after creating the transaction, this step will be executed after the step called Recover transactions that are stuck in the “created” state. The document referred to by “curr_trans” will be determined by whichever step was executed prior to arriving at this step.

Update the transaction to indicate that processing has started. This is accomplished by updating the transaction state of the “curr_trans” document to “pending” as follows:

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "created",
      "desired_state": "pending"
    }
  }
}

Ensure that the “result” value in the response is “updated”, and continue to the next step.

If the result field in the response is “noop” then processing of this transaction should be stopped. The _source returned by the above command should be inspected to ensure that the transaction is in the “pending” state. If necessary, recovery procedures will eventually detect this stopped transaction and then restart the transaction at Core step 2 – Apply the transaction to the source account.

If the request fails and continues to fail after several retries, the current thread can exit and the transaction can remain in the “created” state to be detected and fixed by recovery procedures.
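The update_transaction_state script (defined earlier in the article) is invoked here as a compare-and-set on the transaction state. Assuming that behaviour, it can be mirrored in Python as follows:

```python
def update_transaction_state(transaction, current_state, desired_state):
    """Compare-and-set on transaction_state: "updated" if the expected
    state matched, "noop" otherwise."""
    if transaction["transaction_state"] != current_state:
        return "noop"
    transaction["transaction_state"] = desired_state
    return "updated"

txn = {"transaction_state": "created"}
assert update_transaction_state(txn, "created", "pending") == "updated"
# a duplicate attempt (e.g. a retried request) becomes a no-op
assert update_transaction_state(txn, "created", "pending") == "noop"
assert txn["transaction_state"] == "pending"
```

The compare-and-set is what prevents two threads (or a thread and a recovery procedure) from both believing they moved the transaction forward.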

Core step 2 – Apply the transaction to the source account

This step (Core step 2) will normally execute immediately after Core step 1 – Update transaction state to “pending”. However, in the event of a previous error that has prevented this step from executing, this step may be triggered by the step called Recover transactions that are in the “pending” transaction state.

Update the balance and pending_transactions in the source account as follows:

// "A" is from <curr_trans._source.src_acct>
POST accounts/_update/A?_source
{
  "script": {
    "id": "apply_transaction_to_account",
    "params": {
      "trans_id": "txn1", // from <curr_trans._id>
      "amount": -100      // i.e. remove <curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”.

If it returns a result of “noop”, then the returned _source should be inspected to ensure that the transaction has already been applied to the source account. As long as the transaction has been applied to the source account, continue to the next step as we must guarantee that the destination account is also updated before continuing on to change the transaction state to “committed”.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “pending” state to be detected and fixed by recovery procedures.

Core step 3 – Apply the transaction to the destination account

This step (Core step 3) will execute immediately after Core step 2. Execute the following code to apply the transaction to the destination account:

// "B" is from <curr_trans._source.dest_acct>
POST accounts/_update/B?_source
{
  "script": {
    "id": "apply_transaction_to_account",
    "params": {
      "trans_id": "txn1", // from <curr_trans._id>
      "amount": 100       // from +<curr_trans._source.amount>
    }
  }
}

The response to the above command should return a result with the value “updated”.

If it returns a result of “noop”, then the returned _source should be inspected to ensure that the transaction has already been applied to the destination account. As long as the transaction has been applied to the destination account, continue with the next step to update the transaction state to “committed”.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “pending” state to be detected and fixed by recovery procedures.

Core step 4 – Update the transaction state to “committed”

This step (Core step 4) will only be called immediately after a pending transaction has been applied to both the source account and the destination account. This step is not called directly by any recovery procedures. This ensures that a “committed” transaction state means that both the source and destination balances are guaranteed to have been updated.

Once a transaction moves into a “committed” transaction state, the order of execution of the two-phase commit steps (or of the recovery procedures) guarantee that no further attempts to modify the source or destination account balances will take place for the current transaction.

Execute the following command to update the transaction state to “committed”:

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "pending",
      "desired_state": "committed"
    }
  }
}

Ensure that the “result” value in the response is equal to “updated”, and continue to the next step.

If the result field in the response is “noop” then processing of this transaction should be stopped. The returned _source should be inspected to ensure that the transaction is in the “committed” state. If necessary, recovery procedures will eventually detect this stopped transaction and will then restart the transaction at Core step 5 – Remove the transaction identifier from the source account.

If the request fails and continues to fail after several retries, the current thread can exit as the transaction can remain in the “pending” state to be detected and fixed by recovery procedures.

Core step 5 – Remove the transaction identifier from the source account

This step (Core step 5) will normally execute immediately after Core step 4 – Update transaction state to “committed”. However, in the event of a previous error that has prevented this step from executing, this will be called after Recover transactions that are in the “committed” transaction state.

The transaction id in the pending transactions array on the source account was previously required to ensure that a given transaction would only be applied once to the source account’s balance. Now that the account balances have been committed, the transaction identifier can be removed as follows:

// "A" is from <curr_trans._source.src_acct>
POST accounts/_update/A?_source
{
  "script": {
    "id": "remove_transaction_from_account",
    "params": {
      "trans_id": "txn1" // from <curr_trans._id>
    }
  }
}

The response to the above command should normally have a result of “updated”.

If it returns a result of “noop”, the returned _source should be inspected to ensure that the transaction has been removed from the source account. If so, continue to the next step as we must guarantee that the destination account is also updated before moving to the “finished” state.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “committed” state to be detected and fixed by future recovery procedures.

Core step 6 – Remove the transaction identifier from the destination account

This step (Core step 6) will execute immediately after Core step 5. We can remove the transaction from the destination account as follows:

// "B" is from <curr_trans._source.dest_acct>
POST accounts/_update/B?_source
{
  "script": {
    "id": "remove_transaction_from_account",
    "params": {
      "trans_id": "txn1" // from <curr_trans._id>
    }
  }
}

The response to the above command should return a result with the value “updated”.

If it returns a result of “noop”, the returned _source should be inspected to ensure that the transaction has been removed from the destination account. If so, continue to the next step.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “committed” state to be detected and fixed by recovery procedures.

Core step 7 – Update transaction state to “finished”

This step (Core step 7) will only be called immediately after a committed transaction has been removed from both the source account and the destination account. This step is not called directly by any recovery procedures. This sequence ensures that a “finished” transaction state means that the current transaction has been removed from both the source and destination account’s pending transactions arrays.

Execute the following command to update the transaction state to “finished”:

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "committed",
      "desired_state": "finished"
    }
  }
}

The response to the above command should normally return a result with the value “updated”.

If it returns a result of “noop”, then inspect the returned _source to confirm that the transaction state is “finished”.

If the request fails and continues to fail after several retries, the current thread can exit as the transaction can remain in the “committed” state to be detected and fixed by recovery procedures.
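Taken together, Core steps 1 through 7 can be exercised end to end against an in-memory model. The helper names, dictionary shapes, and starting balances below are assumptions for illustration; a real implementation would issue the corresponding _update requests instead:

```python
def cas_state(txn, current, desired):
    # compare-and-set on the transaction state ("updated" / "noop")
    if txn["state"] != current:
        return "noop"
    txn["state"] = desired
    return "updated"

def apply_txn(acct, trans_id, amount):
    # idempotent balance update (Core steps 2 and 3)
    if trans_id in acct["pending"]:
        return "noop"
    acct["pending"].append(trans_id)
    acct["balance"] += amount
    return "updated"

def remove_txn(acct, trans_id):
    # drop the idempotency marker (Core steps 5 and 6)
    if trans_id not in acct["pending"]:
        return "noop"
    acct["pending"].remove(trans_id)
    return "updated"

accounts = {"A": {"balance": 500, "pending": []},
            "B": {"balance": 500, "pending": []}}
txn = {"id": "txn1", "src": "A", "dest": "B", "amount": 100, "state": "created"}

cas_state(txn, "created", "pending")                        # Core step 1
apply_txn(accounts[txn["src"]], txn["id"], -txn["amount"])  # Core step 2
apply_txn(accounts[txn["dest"]], txn["id"], txn["amount"])  # Core step 3
cas_state(txn, "pending", "committed")                      # Core step 4
remove_txn(accounts[txn["src"]], txn["id"])                 # Core step 5
remove_txn(accounts[txn["dest"]], txn["id"])                # Core step 6
cas_state(txn, "committed", "finished")                     # Core step 7

assert accounts["A"]["balance"] == 400
assert accounts["B"]["balance"] == 600
assert accounts["A"]["pending"] == [] and accounts["B"]["pending"] == []
assert txn["state"] == "finished"
```

Because every step is idempotent, the same sequence can be safely re-run from any intermediate step by a recovery thread.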

Rolling back transactions

In some cases it may be necessary to roll back a transaction, which can be done as described below.

Rollback of “created” transactions

A transaction that is in a “created” state can be directly set to “rolled-back”. This can be accomplished as follows:

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "created",
      "desired_state": "rolled-back"
    }
  }
}

If the result field in the response is “updated” then rollback of this transaction has succeeded.

If the result is “noop” then the transaction may have already moved past the “created” state, and therefore an alternate rollback procedure should be followed, depending on the current state of the transaction. The current transaction state can be viewed in the returned _source.

Rollback of “pending” transactions

If a transaction is in the “pending” state, then it can be rolled back by executing the following steps, which reverse the modifications to the source and destination accounts and change the status of the transaction.

Rollback step 1 – Update the transaction state to “terminating”

Update the transaction state on the current transaction from “pending” to “terminating” as shown below.

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "pending",
      "desired_state": "terminating"
    }
  }
}

The response to the above command should return a result with the value “updated”.

If a result of “noop” is returned in the response, then the rollback of the current transaction should be stopped and the returned _source should be inspected to confirm that the transaction is not in the “pending” state, and that it is already in either the “committed” state or “terminating” state.

If the request fails and continues to fail even after several retries, the current thread should exit, as the transaction should remain in the “pending” state to be detected and fixed by the normal recovery procedures described in Recovering from errors (which, if successful, would move the transaction to a “finished” state rather than “rolled-back”), or alternatively a new rollback can be attempted.

Rollback step 2 – Undo the transaction on the source account

This step (Rollback step 2) can be arrived at immediately after the transaction has been set to the “terminating” state, however in the event of an error after updating the state, this step will be executed after the step called Recover transactions that are in the “terminating” transaction state.

If the source account balance has been modified, then it will have an entry for the current transaction’s _id in its pending transactions array. Therefore the amount that was removed from the source account balance must be added back, and the transaction must be removed from the pending transactions array. This can be accomplished with the following code:

// "A" is from <curr_trans._source.src_acct>
POST accounts/_update/A?_source
{
  "script": {
    "id": "undo_transaction_on_account",
    "params": {
      "trans_id": "txn1", // <curr_trans._id>
      "amount": -100      // Undo -<curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”.

If it returns a result of “noop”, the returned _source should be inspected to confirm that the transaction has already been rolled-back on the source account. In this case, continue to the next step as it is possible that the destination account has not yet been rolled-back.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state to be detected and fixed by recovery procedures.

Rollback step 3 – Undo the transaction from the destination account

If the destination account balance has been modified, then it will have an entry for the current transaction’s _id in its pending transactions array. Therefore the amount that was added to the destination account balance must be removed, and the transaction must be removed from the pending transactions array. This can be accomplished with the following code:

// "B" is from <curr_trans._source.dest_acct>
POST accounts/_update/B?_source
{
  "script": {
    "id": "undo_transaction_on_account",
    "params": {
      "trans_id": "txn1", // <curr_trans._id>
      "amount": 100      //  Undo <curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”.

If it returns a result of “noop”, the returned _source should be inspected to confirm that the transaction has already been rolled-back on the destination account. In this case, continue to the next step as the transaction state needs to be set to “rolled-back”.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state to be detected and fixed by recovery procedures.

Rollback step 4 – Set the transaction state to “rolled-back”

The rollback operation is completed by updating the transaction state to “rolled-back”.

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "terminating",
      "desired_state": "rolled-back"
    }
  }
}

The response to the above command should return a result with the value “updated”.

If the response result is “noop”, then the returned _source should be inspected to confirm that the transaction state has already been set to “rolled-back”.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state, which will be detected and fixed by recovery procedures.
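Rollback steps 1 through 4 can likewise be exercised against an in-memory model (names, shapes, and balances are illustrative assumptions). Starting from the state left behind by Core steps 2 and 3, the rollback restores both balances:

```python
def cas_state(txn, current, desired):
    # compare-and-set on the transaction state ("updated" / "noop")
    if txn["state"] != current:
        return "noop"
    txn["state"] = desired
    return "updated"

def undo_txn(acct, trans_id, amount):
    # reverse the balance change and drop the marker, exactly once
    if trans_id not in acct["pending"]:
        return "noop"
    acct["pending"].remove(trans_id)
    acct["balance"] -= amount
    return "updated"

# state after Core steps 2 and 3 applied a 100 transfer from A to B
accounts = {"A": {"balance": 400, "pending": ["txn1"]},
            "B": {"balance": 600, "pending": ["txn1"]}}
txn = {"id": "txn1", "amount": 100, "state": "pending"}

cas_state(txn, "pending", "terminating")            # Rollback step 1
undo_txn(accounts["A"], txn["id"], -txn["amount"])  # Rollback step 2
undo_txn(accounts["B"], txn["id"], txn["amount"])   # Rollback step 3
cas_state(txn, "terminating", "rolled-back")        # Rollback step 4

assert accounts["A"]["balance"] == 500 and accounts["B"]["balance"] == 500
assert txn["state"] == "rolled-back"
```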

Rollback of “committed” transactions

If a transaction is in the “committed” state, then it should be allowed to complete (i.e. it should be allowed to move to a transaction state of “finished”). After completion, another transaction can be executed to reverse the original transaction.

If for some reason a transaction in the “committed” state cannot proceed to the “finished” state on its own, even after the steps in Recover transactions that are in the “committed” transaction state have been executed, then manual intervention may be required to understand and fix the root cause of the issue.

Recovering from errors

Recovery operations can be looped over to detect and recover from a failure in any of the stages in the Core two-phase commit operations. Recovery should only be initiated on a transaction that has not completed within a reasonable amount of time since its last update (i.e. since its modification timestamp). Recovery should not be attempted on transactions that may still be executing, or that may have unacknowledged modifications to their transaction state, as this could introduce race conditions.

A simple heuristic to ensure that recovery is not attempted on transactions that are currently executing is the following:

  1. Before recovering a transaction, wait until the time that has passed since its last modification is greater than the maximum processing time of all two-phase commit steps. In other words, wait long enough to ensure that the transaction should have either executed to completion or failed, including retries and timeouts.
  2. Ensure that the looping of the recovery operation does not happen faster than the maximum processing time of all two-phase commit steps. In other words, do not start the next iteration of the recovery loop until after waiting long enough to ensure that previous recovery operations will have either completed or failed, including retries and timeouts.

Unacknowledged modifications (a.k.a. dirty reads) can be avoided by using the modification_time to compute how much time has passed since the last update to the transaction, and not attempting to recover a transaction until several minutes have passed since its most recent update. This will give Elasticsearch sufficient time to detect and fix any unacknowledged writes.

In the examples below, we wait 2 minutes from the most recent update to a transaction before we attempt recovery, which should meet the above requirements. We could also consider executing the recovery loop once every minute. Both of these values should be validated based on the timeouts and number of retries for a given implementation.
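The eligibility check described above can be sketched as follows. The 2-minute threshold matches the example value; the function name and argument shapes are illustrative:

```python
from datetime import datetime, timedelta, timezone

RECOVERY_AGE = timedelta(minutes=2)
RECOVERABLE_STATES = {"created", "pending", "terminating", "committed"}

def needs_recovery(transaction_state, modification_time, now):
    """A transaction is a recovery candidate only if it is in a
    non-terminal state and has been idle longer than RECOVERY_AGE."""
    if transaction_state not in RECOVERABLE_STATES:
        return False
    return now - modification_time >= RECOVERY_AGE

now = datetime(2019, 10, 16, 12, 0, tzinfo=timezone.utc)
assert not needs_recovery("pending", now - timedelta(seconds=30), now)  # may still be running
assert needs_recovery("pending", now - timedelta(minutes=5), now)       # idle long enough
assert not needs_recovery("finished", now - timedelta(minutes=5), now)  # terminal state
```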

More sophisticated and precise control over the timing of the launching of recovery procedures could be achieved by maintaining a mapping of transactions and their associated threads. Before launching a recovery procedure, the thread associated with each transaction could be checked to ensure that it is no longer alive. The details of such an implementation will be language dependent, and are beyond the scope of this article.

Get transactions that need to be recovered

Transactions that have not been modified in the past 2 minutes may be “stuck”, and can be detected with the following query:

GET transactions/_search
{
  "size": 100,
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "transaction_state": [
              "created",
              "pending",
              "terminating",
              "committed"
            ]
          }
        },
        {
          "range": {
            "modification_time": {
              "lte": "now-2m"
            }
          }
        }
      ]
    }
  },
  "sort": [
    {
      "modification_time": {
        "order": "asc"
      }
    }
  ]
}

Recover transactions that are stuck in the “created” state

For each “created” transaction returned from the above query, create a new thread to resume executing it starting at Core step 1 – Update transaction state to “pending”.

Recover transactions that are in the “pending” state

For each “pending” transaction returned from the above query, create a new thread to resume executing it starting at Core step 2 – Apply the transaction to the source account.

Recover transactions that are in the “committed” state

For each “committed” transaction that is returned, create a new thread to resume executing it starting at Core step 5 – Remove the transaction identifier from the source account.

Recover transactions that are in the “terminating” state

For each “terminating” transaction that is returned, create a new thread to resume executing it starting at Rollback step 2 – Undo the transaction on the source account.
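The four dispatch rules above amount to a lookup from transaction state to the step where processing resumes. Sketched in Python, with the step labels as descriptive strings rather than API identifiers:

```python
# Where recovery resumes processing for each recoverable state.
RECOVERY_ENTRY_POINT = {
    "created": "Core step 1 - update transaction state to pending",
    "pending": "Core step 2 - apply the transaction to the source account",
    "committed": "Core step 5 - remove the transaction id from the source account",
    "terminating": "Rollback step 2 - undo the transaction on the source account",
}

def recovery_entry_point(transaction_state):
    """Return the step label where recovery should resume."""
    return RECOVERY_ENTRY_POINT[transaction_state]

assert recovery_entry_point("committed").startswith("Core step 5")
```

Note that recovery never re-enters at Core step 4 or Core step 7: those state transitions are only ever reached by falling through from the preceding account updates, which is what gives the “committed” and “finished” states their guarantees.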

Recover very long running transactions

If any “created”, “pending”, or “committed” transactions are returned with a modification time that is more than an hour old, they may need additional investigation and should possibly be rolled back as described in the Rolling back transactions section.

If a “terminating” transaction is unable to move into the “rolled-back” transaction state, then it should be investigated and manual intervention may be required.

Conclusions

In some cases it may be necessary to modify multiple documents as a single cohesive unit. Elasticsearch does not have built-in support for multi-document transactions; therefore, in this blog post we have presented a two-phase commit protocol which can be used to emulate multi-document transactions.

Converting local time to ISO 8601 time in Elasticsearch

Introduction

If a timestamp field is sent into Elasticsearch without any timezone information, then it will be assumed to be UTC time (Coordinated Universal Time). However, if the timestamp actually represents a local time, then assuming that it is UTC will likely cause problems when displaying the data in Kibana or other applications. In this blog I describe how to use an ingest pipeline to convert timestamps from a local timezone into universal timestamps that conform to the ISO 8601 date and time format.

Converting timestamps from local into universal time

If the timezone of the originating data is known, then it is possible to use an ingest processor to convert from the local time to ISO 8601 format. Below is an example ingest pipeline that uses the date processor to convert the field called ‘my_time’ from ‘Europe/Madrid’ time into ISO 8601 (UTC + offset) format.

PUT _ingest/pipeline/chage_local_time_to_iso
{
  "processors": [
    {
      "date" : {
        "field" : "my_time",
        "target_field": "my_time", 
        "formats" : ["dd/MM/yyyy HH:mm:ss"],
        "timezone" : "Europe/Madrid"
      }
    }
  ]
}
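The same conversion can be reproduced client-side with Python's zoneinfo module (Python 3.9+, assuming the system time zone database is available), which is a convenient way to sanity-check what the date processor will produce:

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+

def local_to_iso(my_time, fmt="%d/%m/%Y %H:%M:%S", tz="Europe/Madrid"):
    """Client-side equivalent of the date processor above: parse a
    naive local timestamp and render it with its UTC offset."""
    local = datetime.strptime(my_time, fmt).replace(tzinfo=ZoneInfo(tz))
    return local.isoformat()

# Madrid is on CEST (+02:00) on October 12th...
assert local_to_iso("12/10/2019 21:31:12") == "2019-10-12T21:31:12+02:00"
# ...and back on CET (+01:00) after daylight saving time ends on October 27th
assert local_to_iso("30/10/2019 21:31:12") == "2019-10-30T21:31:12+01:00"
```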

In order to test the above pipeline, we can execute the following code to simulate inserting a document:

POST _ingest/pipeline/chage_local_time_to_iso/_simulate
{
  "docs": [
    {
      "_source": {
        "my_time": "12/10/2019 21:31:12",
        "other_field": "whatever"
      }
    }
  ]
}

The above will respond with output that looks as follows, which has the correct ISO 8601 offset for ‘Europe/Madrid’ on October 12th, when daylight saving time is in effect. As expected, the field ‘my_time’ shows an offset of ‘+02:00’.

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "my_time" : "2019-10-12T21:31:12.000+02:00",
          "other_field" : "whatever"
        },
        "_ingest" : {
          "timestamp" : "2019-10-16T19:28:20.999077Z"
        }
      }
    }
  ]
}

We can also verify that the above pipeline respects daylight saving time (which ends on October 27th in Spain) by submitting a document with a date of October 30th as follows:

POST _ingest/pipeline/chage_local_time_to_iso/_simulate
{
  "docs": [
    {
      "_source": {
        "my_time": "30/10/2019 21:31:12",
        "other_field": "whatever"
      }
    }
  ]
}

Which responds with the following output, which has an offset of ‘+01:00’ as expected, since daylight saving time is no longer in effect:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "my_time" : "2019-10-30T21:31:12.000+01:00",
          "other_field" : "whatever"
        },
        "_ingest" : {
          "timestamp" : "2019-10-16T19:30:40.401948Z"
        }
      }
    }
  ]
}

Finally, we can insert a “real” document into Elasticsearch as follows:

PUT test_index/_doc/1?pipeline=chage_local_time_to_iso
{
  "my_time": "15/10/2019 01:11:55",
  "other_field": "whatever"
}

And we can retrieve the document as follows:

GET test_index/_doc/1

Which should respond with the following, that has the correct offset of ‘+02:00’ on Oct 15th for the ‘Europe/Madrid’ timezone:

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "my_time" : "2019-10-15T01:11:55.000+02:00",
    "other_field" : "whatever"
  }
}

Conclusion

In this blog we have demonstrated how Elasticsearch ingest processors can be used to ingest data into Elasticsearch with unambiguous ISO 8601 timestamps.