Combining Elasticsearch stemmers and synonyms to improve search relevance

The article called The same, but different: Boosting the power of Elasticsearch with synonyms gives a great introduction to why and how you can incorporate synonyms into your Elasticsearch-powered application. Here I build upon that blog and show how you can combine stemmers and multi-word synonyms to take the quality of your search results to the next level.

Motivation

Imagine that you are using Elasticsearch to power a search application for finding books, and in this application you want to treat the following words as synonyms:

  • brainstorm
  • brainstorming
  • brainstormed
  • brain storm
  • brain storming
  • brain stormed
  • envisage
  • envisaging
  • envisaged
  • etc.

It is tedious and error-prone to explicitly define synonyms for every possible conjugation, declension, and inflection of a word or of a compound word.

However, it is possible to reduce the size of the synonym list by using a stemmer to extract the stem of each word before synonyms are applied. This allows us to get the same results as the synonym list above by specifying only the following synonyms:

  • brainstorm
  • brain storm
  • envisage

Custom analyzers

In this section, I show code snippets that define custom analyzers that can be used for matching synonyms. Later on in this blog I show how to submit the analyzers to Elasticsearch.

The blog called The same, but different: Boosting the power of Elasticsearch with synonyms goes into detail on the difference between index-time and search-time synonyms. In the solution presented here, I make use of search-time synonyms.

We create a synonym graph token filter called “my_graph_synonyms”, which is able to match multi-word synonyms, as follows:

        "filter": {
          "my_graph_synonyms": {
            "type": "synonym_graph",
            "synonyms": [
              "mind, brain",
              "brain storm, brainstorm, envisage"
            ]
          }
        }

Next we need to define two separate custom analyzers, one that will be applied to text at index-time, and another that will be applied to text at search-time.

We define an analyzer called “my_index_time_analyzer”, which uses the standard tokenizer along with the lowercase and stemmer token filters, as follows:

      "my_index_time_analyzer": {
        "tokenizer": "standard",
        "filter": [
          "lowercase",
          "stemmer"
        ]
      }

We define an analyzer called “my_search_time_analyzer”, which also makes use of the standard tokenizer along with the lowercase and stemmer token filters (as above). However, it additionally includes our custom token filter called “my_graph_synonyms”, which ensures that synonyms will be matched at search time:

      "my_search_time_analyzer": {
        "tokenizer": "standard",
        "filter": [
          "lowercase",
          "stemmer",
          "my_graph_synonyms"
        ]
      }

Mappings

Mapping is the process of defining how a document, and the fields it contains, are stored and indexed. Each document is a collection of fields, which each have their own data type. In this example we define the mapping for a document with a single field called “my_new_text_field”, which is of type “text”. This field will make use of “my_index_time_analyzer” when documents are indexed, and will make use of “my_search_time_analyzer” when documents are searched. The mapping looks as follows:

  "mappings": {
    "properties": {
      "my_new_text_field": {
        "type": "text",
        "analyzer": "my_index_time_analyzer",
        "search_analyzer": "my_search_time_analyzer"
      }
    }
  }

Bringing it together

Below we bring together our custom analyzers and mappings and apply them to an index called “test_index” as follows:

PUT /test_index
{
  "settings": {
    "index": {
      "analysis": {
        "filter": {
          "my_graph_synonyms": {
            "type": "synonym_graph",
            "synonyms": [
              "mind, brain",
              "brain storm, brainstorm, envisage"
            ]
          }
        },
        "analyzer": {
          "my_index_time_analyzer": {
            "tokenizer": "standard",
            "filter": [
              "lowercase",
              "stemmer"
            ]
          },
          "my_search_time_analyzer": {
            "tokenizer": "standard",
            "filter": [
              "lowercase",
              "stemmer",
              "my_graph_synonyms"
            ]
          }
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "my_new_text_field": {
        "type": "text",
        "analyzer": "my_index_time_analyzer",
        "search_analyzer": "my_search_time_analyzer"
      }
    }
  }
}

Testing our custom search-time analyzer

If we wish to see how an analyzer tokenizes and normalizes a given string, we can call the _analyze API directly as follows:

POST test_index/_analyze
{
  "text" : "Brainstorm",
  "analyzer": "my_search_time_analyzer"
}
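
For comparison, we can also run the _analyze API against the index-time analyzer to see which tokens are actually stored in the index. This is an optional sanity check, and the exact tokens returned depend on the stemmer in use:

POST test_index/_analyze
{
  "text" : "Brainstorming",
  "analyzer": "my_index_time_analyzer"
}

Comparing the two responses makes it easier to see that the search-time synonym expansion operates on the same lowercased, stemmed tokens that are stored in the index.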

Testing on real documents

We can use the _bulk API to drive several documents into Elasticsearch as follows:

POST test_index/_bulk
{ "index" : { "_id" : "1" } }
{"my_new_text_field": "This is a brainstorm" }
{ "index" : { "_id" : "2" } }
{"my_new_text_field": "A different brain storm" }
{ "index" : { "_id" : "3" } }
{"my_new_text_field": "About brainstorming" }
{ "index" : { "_id" : "4" } }
{"my_new_text_field": "I had a storm in my brain" }
{ "index" : { "_id" : "5" } }
{"my_new_text_field": "I envisaged something like that" }

After driving the sample documents into “test_index”, we can execute a search that will correctly respond with documents #1, #2, #3, and #5, as follows:

GET test_index/_search
{
  "query": {
    "match": {
      "my_new_text_field": "brain storm"
    }
  }
}

We can execute the following search, which correctly returns only documents #2 and #4:

GET test_index/_search
{
  "query": {
    "match": {
      "my_new_text_field": "brain"
    }
  }
}

We can execute the following search, which correctly returns documents #1, #2, #3, and #5:

GET test_index/_search
{
  "query": {
    "match": {
      "my_new_text_field": "brainstorming"
    }
  }
}

We can execute the following search, which correctly returns documents #2 and #4:

GET test_index/_search
{
  "query": {
    "match": {
      "my_new_text_field": "mind storm"
    }
  }
}

And finally, we can execute the following search, which correctly returns only documents #2 and #4:

GET test_index/_search
{
  "query": {
    "match": {
      "my_new_text_field": {
        "query": "storm brain"
      }
    }
  }
}

Conclusion

In this blog I demonstrated how you can combine stemmers and multi-word synonyms in Elasticsearch to improve the quality of your search results.

Emulating transactional functionality in Elasticsearch with two-phase commits

Introduction

Elasticsearch supports atomic create, update, and delete operations at the individual document level, but does not have built-in support for multi-document transactions. Although Elasticsearch does not position itself as a system of record for storing data, in some cases it may be necessary to modify multiple documents as a single cohesive unit. Therefore, in this blog post we present a two-phase commit protocol which can be used to emulate multi-document transactions.

Overview

Create, update, and delete operations in Elasticsearch are atomic at the document level, which means that creating, modifying, or deleting a single document either fully succeeds or fails. However, there is no guarantee that an operation that has multiple steps and that impacts multiple documents will either succeed or fail as a cohesive unit.

In some cases inner objects and nested types can be used to combine multiple documents into a single atomic unit, which would guarantee atomicity of create, update, and delete operations on the parent and its embedded sub-documents. In other cases it may not be possible or desirable to embed related documents inside a parent document. Therefore multi-document transactional functionality may be desired.
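
As a minimal illustration of the embedding alternative, a parent document and its related sub-documents can be modeled with a nested field so that they are always written as a single atomic document. The orders index and its fields below are hypothetical and only serve to show the idea:

PUT orders
{
  "mappings": {
    "properties": {
      "order_id": {
        "type": "keyword"
      },
      "line_items": {
        "type": "nested",
        "properties": {
          "sku": {
            "type": "keyword"
          },
          "quantity": {
            "type": "long"
          }
        }
      }
    }
  }
}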

Given the lack of built-in multi-document transactions in Elasticsearch, multi-document transactional functionality that is built on top of Elasticsearch must be implemented in application code. Such functionality can be achieved with the two-phase commit protocol.

What is the two-phase commit protocol

The two-phase commit protocol is a type of atomic commitment protocol that coordinates the processes that participate in a distributed atomic transaction. The protocol ultimately determines whether to commit or roll back a transaction, and it achieves this goal even in the event of most temporary system failures.

To permit recovery from a failure, the two-phase commit protocol logs the state of a given transaction as the sequential steps of the transaction are executed. In the event of a failure at any stage in a transaction, these transaction logs are used by recovery procedures to either complete the transaction or to roll it back.

A high-level overview of a two-phase commit implementation

In this article, we present an example of a two-phase commit transaction that is used for tracking the movement of “units” between two accounts, as described in the following sections:

  1. Create mappings: Define the mappings for an accounts index and a transactions index.
  2. Initialize accounts: Create sample accounts with balances that will be used in subsequent steps to demonstrate how the two-phase commit protocol can be used to ensure completion of the movement of units from one account balance to another.
  3. Define an ingest pipeline for inserting the ingest time: An ingest pipeline is used to ensure that every transaction has a creation time written into it.
  4. Scripts that are used by the two-phase commit: Define the Painless scripts that are used by the two-phase commit steps.
  5. Create a transaction: A transaction defines which accounts to move units between and how many units to move. It also logs the status of the transaction, which can be used for recovery or rollback.
  6. Core two-phase commit operations: The steps that will be executed to implement the two-phase commit. By following a specific sequence of operations for multi-document transactions, we maintain sufficient state (i.e. a transaction log) at every step to either fix or rollback transactions that have only partially completed. The core operations are the following:
    1. Update transaction state to “pending”.
    2. Apply the transaction to the source account.
    3. Apply the transaction to the destination account.
    4. Update the transaction state to “committed”.
    5. Remove the transaction identifier from the source account.
    6. Remove the transaction identifier from the destination account.
    7. Update transaction state to “finished”.
  7. Rolling back transactions: In rare cases some transactions may not be able to complete, and should be rolled back. This section describes how to undo a two-phase commit that has previously started.
  8. Recovering from errors: This section describes the operations that are periodically executed to detect transactions that have become stuck in one of the two-phase commit or rollback stages. Stalled operations will then be restarted based on the most recent transaction state.

Create mappings

Mapping is the process of defining how a document, and the fields it contains, are stored and indexed. This example defines an accounts index and a transactions index, for which the mappings are defined below.

Define the mappings for the accounts index

For our example, we define the following fields on the accounts index:

  • balance: the balance of each account. In this article we refer to the value stored in this field as “units”, which could be a currency, inventory, etc.
  • pending_transactions: an array that will contain a list of the _ids of each transaction that is currently “pending” on each account. The list of pending transactions serves two purposes: (1) it is used for ensuring that the same transaction will never be applied twice to the same account, and (2) it maintains state that will allow rollback steps to set this account back to its pre-transaction state in the event that the transaction cannot run to completion.

Mappings should be defined before writing any documents into the accounts index, as follows:

PUT accounts
{
  "mappings": {
    "properties": {
      "balance": {
        "type":  "long"
      }, 
      "pending_transactions": {
        "type": "keyword"
      }
    }
  }
}

Define the mappings for the transactions index

The transactions index will be used for tracking the state of each multi-document transaction, and will have the following fields defined:

  • amount: the value of the transfer.
  • creation_time: when the transaction was created. This is not strictly required, but is stored for additional context.
  • modification_time: when the transaction was last modified. This is used for determining whether recovery should be started.
  • src_acct: the _id of the source account for the transfer.
  • dest_acct: the _id of the destination account for the transfer.
  • transaction_state: one of “created”, “pending”, “committed”, “finished”, “terminating”, or “rolled-back”. In a normal transaction without errors or rollbacks, the transaction state will move from “created” to “pending” to “committed”, and then to “finished”.

Mappings should be defined before writing any documents into the transactions index, as follows:

PUT transactions
{
  "mappings": {
    "properties": {
      "amount": {
        "type":  "long"
      },
      "creation_time": {
        "type": "date"
      },
      "modification_time": {
        "type": "date"
      },
      "dest_account": {
        "type": "keyword"
      },
      "src_account": {
        "type": "keyword"
      },
      "transaction_state": {
        "type": "keyword"
      }
    }
  }
}

Initialize accounts

We can initialize documents for accounts A and B, each with a balance of 500 as follows:

PUT accounts/_doc/A
{
  "balance": 500,
  "pending_transactions": []
}

PUT accounts/_doc/B
{
 "balance": 500,
 "pending_transactions": []
}
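
If desired, we can confirm the starting state of both accounts with a single multi-get request. This is an optional sanity check and is not part of the protocol itself:

GET accounts/_mget
{
  "ids": ["A", "B"]
}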

Define an ingest pipeline for inserting the ingest time

The following pipeline will be used to add “creation_time” and “modification_time” to the transaction documents. The modification_time will be required at later stages in the two-phase commit process for detecting transactions that have failed to complete within a reasonable amount of time. The creation time is stored for additional context.

PUT _ingest/pipeline/initialize_time
{
 "description": "Add ingest timestamp",
 "processors": [
   {
     "set": {
       "field": "_source.creation_time",
       "value": "{{_ingest.timestamp}}"
     }
   },
   {
     "set": {
       "field": "_source.modification_time",
       "value": "{{_ingest.timestamp}}"
     }
   }
 ]
}
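
Before relying on this pipeline, we can optionally verify that it sets both timestamp fields by simulating it against a sample document (the document body below is only an example):

POST _ingest/pipeline/initialize_time/_simulate
{
  "docs": [
    {
      "_source": {
        "src_acct": "A",
        "dest_acct": "B",
        "amount": 100,
        "transaction_state": "created"
      }
    }
  ]
}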

Scripts that are used by the two-phase commit

In this section we define several Painless scripts which will be executed by update operations that are performed in the two-phase commit steps. Given that updates are atomic, all of the operations inside each of these scripts will either succeed or fail as a unit.

Script to update the transaction state

This script will be used to update the transaction state from the current state to the desired state. The transaction state can be one of “created”, “pending”, “committed”, “finished”, “terminating”, or “rolled-back”.

Note that if the transaction state on the current transaction is not equal to the current state (e.g. if it is already set to the desired state), then this script will not modify the document and the corresponding update operation will return a result of “noop”.

If the transaction state is updated, then this script also updates the modification time of the transaction document, which is required by the procedures described in Recovering from errors.

POST _scripts/update_transaction_state
{
  "script": {
    "lang": "painless",
    "source": """
      if (ctx._source.transaction_state == params.current_state) {
        ctx._source.transaction_state = params.desired_state;

        // Set the modification time in ISO 8601 (UTC)
        Date date = new Date();
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'");
        df.setTimeZone(TimeZone.getTimeZone("UTC"));
        ctx._source.modification_time = df.format(date);

      } else {
        ctx.op = "noop";
      }
    """
  }
}

Script to apply a transaction to an account

This script is used to add the current transaction amount to an account, and to push the transaction identifier onto a list of pending transactions. Given the atomic nature of document updates, both of these operations will succeed or fail as an atomic unit. This script will be used for updating accounts once a transaction has entered into the “pending” state.

In the event that the transaction has already been applied to the account (as determined by the transaction identifier being in the array of pending transactions for this account), then this script will not update the document and the corresponding update operation will return a result of “noop”.

POST _scripts/apply_transaction_to_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored 
      // in pending_transactions. 
      // If the transaction is already on this account then the 
      // location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);

      if (idx >= 0) {
        // if the transaction already exists on this account, 
        // do nothing 
        ctx.op = "noop";
      } 
      
      else {
        // New transaction - update the account balance 
        // and add the transaction to pending_transactions
        ctx._source.pending_transactions.add(params.trans_id);
        ctx._source.balance += params.amount;
      }
    """
  }
}

Script to remove a transaction from an account

This script is used for removing a transaction from the pending_transactions array on an account. This will be done once a transaction is no longer in the “pending” transaction state, and immediately after it has entered into the “committed” transaction state.

In the event that the current transaction has already been removed from the account (as determined by the transaction identifier not appearing in the array of pending transactions for this account), then this script will not modify the document and the corresponding update operation will return a result of “noop”.

POST _scripts/remove_transaction_from_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored in 
      // pending_transactions. 
      // If it exists, the location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);
      

      if (idx >= 0) {
        // the transaction exists on this account, remove it 
        ctx._source.pending_transactions.remove(idx);
      } 
      
      else {
        // previously already removed, do nothing 
        ctx.op = "noop";
      }
    """
  }
}

Script to undo a transaction on an account

This script will be used in the event that a transaction is being rolled back. It reverses the previous change that was made to the account balance, and removes the transaction from the account’s pending_transactions array. Given the atomic nature of document updates, both of these operations will succeed or fail as an atomic unit.

In the event that the current transaction has already been removed from the account (as determined by the transaction identifier not appearing in the array of pending transactions for this account), then this script will not modify the document and the corresponding update operation will return a result of “noop”.

POST _scripts/undo_transaction_on_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored in pending_transactions. 
      // If it exists, the location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);
      

      if (idx >= 0) {
        // the transaction exists on this account, remove it 
        ctx._source.pending_transactions.remove(idx);
        ctx._source.balance -= params.amount;
      } 
      
      else {
        // previously already removed, do nothing 
        ctx.op = "noop";
      }
  """
  }
}

Create a transaction

We create a new transaction, and once this transaction has been received by Elasticsearch, the subsequent steps covered in the remainder of this article will ensure that the transaction will eventually run to completion or will alternatively be rolled back. This is true even if there is an intermediate failure in any of the individual two-phase commit steps.

Note that the transaction is initially created with a transaction state of “created” and will be moved into a “pending” state when the transaction begins processing. By breaking the transaction into a “created” state followed by a “pending” state, the creation of the  transaction can be decoupled from the two-phase commit processing of the transaction. For example, after receiving confirmation that a transaction has been created, an application can spin up a separate thread to start execution of Core step 1 – Update transaction state to “pending”, and the current thread can immediately respond to the end user with confirmation that the transaction has been received and created. If desired, the application can then periodically poll Elasticsearch to get an updated transaction state which can be displayed to the end user.

We request a transfer of 100 from account A to account B by creating a new document in the transactions index. This request should use the _create endpoint with an application-generated transaction identifier (set to “txn1” for this transaction) to ensure idempotency of the transaction creation. This can be done as follows:

PUT transactions/_create/txn1?pipeline=initialize_time
{ 
  "src_acct": "A", 
  "dest_acct": "B", 
  "amount" : 100, 
  "transaction_state": "created"
}

If we do not receive any response or if the request fails, then it should be re-submitted, as we cannot be sure whether Elasticsearch received and processed the request for the new transaction. However, because an unacknowledged transaction could theoretically have been received and could have already changed state due to recovery threads picking it up and moving it forward (this will be covered in detail later in this article), it is important that we use the _create endpoint, so that multiple submissions of a given transaction will not overwrite the transaction state of a transaction that was already accepted.

If the response from Elasticsearch includes “result”: “created”, then the above request has created the document, and we can now begin the two-phase commit of operations for the transaction.

If the application receives a response that indicates “document already exists”, then the transaction document was previously created. In order to avoid the risk of multiple threads simultaneously working on this transaction, do not begin two-phase commit operations, and instead let the recovery procedures handle the processing of this transaction.

Note that in subsequent steps we will refer to the newly submitted transaction as “curr_trans”. If the instructions from this blog are being tested in Kibana Dev Tools, then the values from the above document will need to be manually copied into subsequent commands wherever we refer to “curr_trans”. If these steps are executed in code, then this document could be stored in an object.
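
If desired, the transaction document can be retrieved at any point to confirm that the ingest pipeline populated creation_time and modification_time, and an application could use the same request to poll the current transaction state. This is an optional check and is not part of the protocol itself:

GET transactions/_doc/txn1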

Core two-phase commit operations

In this section we present a two-phase commit which ensures that all steps required to complete a transaction are executed, and that any failed transactions can be detected and executed to completion or alternatively rolled back. This two-phase commit will prevent partial completion of transactions if there is a failure in any of the sequential two-phase commit steps.

Normal two-phase commit operations will execute in a single thread that will start at Core step 1 – Update transaction state to “pending”, and will end at Core step 7 – Update transaction state to finished. Recovery procedures may start executing at different stages in the two-phase commit, and will also execute sequentially in a single thread for a single transaction.

If any of the core two-phase commit operations fails or does not receive a response, then it should be retried several times before stopping processing of the current thread. If this happens then the failed transaction will be picked up later by recovery procedures.

Core step 1 – Update transaction state to “pending”

This step (Core step 1) can be arrived at immediately after the transaction has been created; however, in the event of an error immediately after creating the transaction, this step will be executed after the step called Recover transactions that are stuck in the “created” state. The document referred to by “curr_trans” will be determined by whichever step was executed prior to arriving at this step.

Update the transaction to indicate that processing has started. This is accomplished by updating the transaction state of the “curr_trans” document to “pending” as follows:

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "created",
      "desired_state": "pending"
    }
  }
}

Ensure that the “result” value in the response is “updated”, and continue to the next step.

If the result field in the response is “noop” then processing of this transaction should be stopped. The _source returned by the above command should be inspected to ensure that the transaction is in the “pending” state. If necessary, recovery procedures will eventually detect this stopped transaction and then restart the transaction at Core step 2 – Apply the transaction to the source account.

If the request fails and continues to fail after several retries, the current thread can exit and the transaction can remain in the “created” state to be detected and fixed by recovery procedures.

Core step 2 – Apply the transaction to the source account

This step (Core step 2) will normally execute immediately after Core step 1 – Update transaction state to “pending”. However, in the event of a previous error that has prevented this step from executing, this step may be triggered by the step called Recover transactions that are in the “pending” transaction state.

Update the balance and pending_transactions fields in the source account as follows:

// "A" is from <curr_trans._source.src_acct>
POST accounts/_update/A?_source
{
  "script": {
    "id": "apply_transaction_to_account",
    "params": {
      "trans_id": "txn1", // from <curr_trans._id>
      "amount": -100      // i.e. remove <curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”.

If it returns a result of “noop”, then the returned _source should be inspected to ensure that the transaction has already been applied to the source account. As long as the transaction has been applied to the source account, continue to the next step as we must guarantee that the destination account is also updated before continuing on to change the transaction state to “committed”.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “pending” state to be detected and fixed by recovery procedures.

Core step 3 – Apply the transaction to the destination account

This step (Core step 3) will execute immediately after Core step 2. Execute the following code to apply the transaction to the destination account:

// "B" is from <curr_trans._source.dest_acct>
POST accounts/_update/B?_source
{
  "script": {
    "id": "apply_transaction_to_account",
    "params": {
      "trans_id": "txn1", // from <curr_trans._id>
      "amount": 100       // from +<curr_trans._source.amount>
    }
  }
}

The response to the above command should return a result with the value “updated”.

If it returns a result of “noop”, then the returned _source should be inspected to ensure that the transaction has already been applied to the destination account. As long as the transaction has been applied to the destination account, continue with the next step to update the transaction state to “committed”.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “pending” state to be detected and fixed by recovery procedures.

Core step 4 – Update the transaction state to “committed”

This step (Core step 4) will only be called immediately after a pending transaction has been applied to both the source account and the destination account. This step is not called directly by any recovery procedures. This ensures that a “committed” transaction state means that both the source and destination balances are guaranteed to have been updated.

Once a transaction moves into a “committed” transaction state, the order of execution of the two-phase commit steps (or of the recovery procedures) guarantees that no further attempts to modify the source or destination account balances will take place for the current transaction.

Execute the following command to update the transaction state to “committed”:

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "pending",
      "desired_state": "committed"
    }
  }
}

Ensure that the “result” value in the response is equal to “updated”, and continue to the next step.

If the result field in the response is “noop”, then processing of this transaction should be stopped. The returned _source should be inspected to ensure that the transaction is in the “committed” state. If necessary, recovery procedures will eventually detect this stopped transaction and will then restart the transaction at Core step 5 – Remove the transaction identifier from the source account.

If the request fails and continues to fail after several retries, the current thread can exit as the transaction can remain in the “pending” state to be detected and fixed by recovery procedures.

Core step 5 – Remove the transaction identifier from the source account

This step (Core step 5) will normally execute immediately after Core step 4 – Update transaction state to “committed”. However, in the event of a previous error that has prevented this step from executing, this will be called after Recover transactions that are in the “committed” transaction state.

The transaction id in the pending transactions array on the source account was previously required to ensure that a given transaction would only be applied once to the source account’s balance. Now that the account balances have been committed, this identifier can be removed as follows:

// "A" is from <curr_trans._source.src_acct>
POST accounts/_update/A?_source
{
  "script": {
    "id": "remove_transaction_from_account",
    "params": {
      "trans_id": "txn1" // from <curr_trans._id>
    }
  }
}

The response to the above command should normally have a result of “updated”.

If it returns a result of “noop”, the returned _source should be inspected to ensure that the transaction has been removed from the source account. If so, continue to the next step as we must guarantee that the destination account is also updated before moving to the “finished” state.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “committed” state to be detected and fixed by future recovery procedures.

Core step 6 – Remove the transaction identifier from the destination account

This step (Core step 6) will execute immediately after Core step 5. We can remove the transaction from the destination account as follows:

// "B" is from <curr_trans._source.dest_acct>
POST accounts/_update/B?_source
{
  "script": {
    "id": "remove_transaction_from_account",
    "params": {
      "trans_id": "txn1" // from <curr_trans._id>
    }
  }
}

The response to the above command should return a result with the value “updated”.

If it returns a result of “noop”, the returned _source should be inspected to ensure that the transaction has been removed from the destination account. If so, continue to the next step.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “committed” state to be detected and fixed by recovery procedures.

Core step 7 – Update transaction state to “finished”

This step (Core step 7) will only be called immediately after a committed transaction has been removed from both the source account and the destination account. This step is not called directly by any recovery procedures. This sequence ensures that a “finished” transaction state means that the current transaction has been removed from both the source and destination account’s pending transactions arrays.

Execute the following command to update the transaction state to “finished”:

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "committed",
      "desired_state": "finished"
    }
  }
}

The response to the above command should normally return a result with the value “updated”.

If it returns a result of “noop”, then inspect the returned _source to confirm that the transaction state is “finished”.

If the request fails and continues to fail after several retries, the current thread can exit as the transaction can remain in the “committed” state to be detected and fixed by recovery procedures.
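
Assuming the happy path above completed for “txn1”, the source and destination accounts should now have balances of 400 and 600 respectively, both pending_transactions arrays should be empty, and the transaction document should have a transaction_state of “finished”. This can optionally be verified as follows:

GET accounts/_mget
{
  "ids": ["A", "B"]
}

GET transactions/_doc/txn1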

Rolling back transactions

In some cases it may be necessary to roll back a transaction, which can be done as described below.

Rollback of “created” transactions

A transaction that is in a “created” state can be directly set to “rolled-back”. This can be accomplished as follows:

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "created",
      "desired_state": "rolled-back"
    }
  }
}

If the result field in the response is “updated” then rollback of this transaction has succeeded.

If the result is “noop” then the transaction may have already moved past the “created” state, and therefore an alternate rollback procedure should be followed, depending on the current state of the transaction. The current transaction state can be viewed in the returned _source.

Rollback of “pending” transactions

If a transaction is in the “pending” state, then it can be rolled back by executing the following steps, which will reverse the modifications to the source and destination accounts, as well as change the status of the transaction.

Rollback step 1 – Update the transaction state to “terminating”

Update the transaction state on the current transaction from “pending” to “terminating” as shown below.

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "pending",
      "desired_state": "terminating"
    }
  }
}

The response to the above command should return a result with the value “updated”.

If a result of “noop” is returned in the response, then the rollback of the current transaction should be stopped and the returned _source should be inspected to confirm that the transaction is not in the “pending” state, and that it is already in either the “committed” state or “terminating” state.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “pending” state to be detected and fixed by the normal Recovering from errors procedures (which, if successful, would move the transaction to a “finished” state rather than “rolled-back”), or alternatively a new rollback can be attempted.

Rollback step 2 – Undo the transaction on the source account

This step (Rollback step 2) can be arrived at immediately after the transaction has been set to the “terminating” state; however, in the event of an error after updating the state, this step will be executed after the step called Recover transactions that are in the “terminating” state.

If the source account balance has been modified, then it will have an entry for the current transaction’s _id in its pending transactions array. Therefore the amount that was removed from the source account balance must be added back, and the transaction must be removed from the pending transactions array. This can be accomplished with the following code:

// "A" is from <curr_trans._source.src_acct>
POST accounts/_update/A?_source
{
  "script": {
    "id": "undo_transaction_on_account",
    "params": {
      "trans_id": "txn1", // <curr_trans._id>
      "amount": -100      // Undo -<curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”.

If it returns a result of “noop”, the returned _source should be inspected to confirm that the transaction has already been rolled-back on the source account. In this case, continue to the next step as it is possible that the destination account has not yet been rolled-back.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state to be detected and fixed by recovery procedures.

Rollback step 3 – Undo the transaction from the destination account

If the destination account balance has been modified, then it will have an entry for the current transaction’s _id in its pending transactions array. Therefore the amount that was added to the destination account balance must be removed, and the transaction must be removed from the pending transactions array. This can be accomplished with the following code:

// "B" is from <curr_trans._source.dest_acct>
POST accounts/_update/B?_source
{
  "script": {
    "id": "undo_transaction_on_account",
    "params": {
      "trans_id": "txn1", // <curr_trans._id>
      "amount": 100      //  Undo <curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”.

If it returns a result of “noop”, the returned _source should be inspected to confirm that the transaction has already been rolled-back on the destination account. In this case, continue to the next step as the transaction state needs to be set to “rolled-back”.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state to be detected and fixed by recovery procedures.

Rollback step 4 – Set the transaction state to “rolled-back”

The rollback operation is completed by updating the transaction state to “rolled-back”.

// "txn1" is <curr_trans._id>
POST transactions/_update/txn1?_source
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "terminating",
      "desired_state": "rolled-back"
    }
  }
}

The response to the above command should return a result with the value “updated”.

If the response result is “noop”, then the returned _source should be inspected to confirm that the transaction state has already been set to “rolled-back”.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state, which will be detected and fixed by recovery procedures.

Rollback of “committed” transactions

If a transaction is in the “committed” state, then it should be allowed to complete (i.e. it should be allowed to move to a transaction state of “finished”). After completion, another transaction can be executed to reverse the original one.

If for some reason a transaction in the “committed” state cannot proceed to the “finished” state on its own, even after the steps in Recover transactions that are in the “committed” transaction state have been executed, then manual intervention may be required to understand and fix the root cause of the issue.

Recovering from errors

Recovery operations can be executed in a loop to detect and recover from a failure in any of the stages of the Core two-phase commit operations. Recovery should only be initiated on a transaction that has not completed within a reasonable amount of time since its last update (i.e. since its modification timestamp). Recovery should not be attempted on transactions that may still be executing, or that may have unacknowledged modifications to their transaction state, as this could introduce race conditions.

A simple heuristic to ensure that recovery is not attempted on transactions that are currently executing is the following:

  1. Before recovering a transaction, wait until the time that has passed since its last modification is greater than the maximum processing time of all two-phase commit steps. In other words, wait long enough to ensure that the transaction should have either executed to completion or failed, including retries and timeouts.
  2. Ensure that the looping of the recovery operation does not happen faster than the maximum processing time of all two-phase commit steps. In other words, do not start the next iteration of the recovery loop until after waiting long enough to ensure that previous recovery operations will have either completed or failed, including retries and timeouts.

Unacknowledged modifications (a.k.a. dirty reads) can be avoided by using the modification_time to compute how much time has passed since the last update to the transaction, and not attempting to recover a transaction until several minutes have passed since its most recent update. This will give Elasticsearch sufficient time to detect and fix any unacknowledged writes.

In the examples below, we wait 2 minutes from the most recent update to a transaction before we attempt recovery, which should meet the above requirements. We could also consider executing the recovery loop once every minute. Both of these values should be validated based on the timeouts and number of retries for a given implementation.

More sophisticated and precise control over the timing of the launching of recovery procedures could be achieved by maintaining a mapping of transactions and their associated threads. Before launching a recovery procedure, the thread associated with each transaction could be checked to ensure that it is no longer alive. The details of such an implementation will be language dependent, and are beyond the scope of this article.

Get transactions that need to be recovered

Transactions that have not been modified in the past 2 minutes may be “stuck”, and can be detected with the following query:

GET transactions/_search
{
 "size": 100,
 "query": {
   "bool": {
     "filter": [
       {
         "terms": {
           "transaction_state": [
             "created", 
             "pending",
             "terminating", 
             "committed"
             ]
         }
       },
       {
         "range": {
           "modification_time": {
             "lte": "now-2m"
           }
         }
       }

     ]
   }
 },
 "sort": [
   {
     "modification_time": {
       "order": "asc"
     }
   }
 ]
}

Recover transactions that are stuck in the “created” state

For each “created” transaction returned from the above query, create a new thread to resume executing it starting at Core step 1 – Update transaction state to “pending”.

Recover transactions that are in the “pending” state

For each “pending” transaction returned from the above query, create a new thread to resume executing it starting at Core step 2 – Apply the transaction to the source account.

Recover transactions that are in the “committed” state

For each “committed” transaction that is returned, create a new thread to resume executing it starting at Core step 5 – Remove the transaction from the source account.

Recover transactions that are in the “terminating” state

For each “terminating” transaction that is returned, create a new thread to resume executing it starting at Rollback step 2 – Undo the transaction on the source account.

Recover very long running transactions

If any “created”, “pending”, or “committed” transactions are returned that have a modification time that is more than an hour in the past, they may need additional investigation and should possibly be rolled back as described in the Rolling back transactions section.

If a “terminating” transaction is unable to move into the “rolled-back” transaction state, then it should be investigated and manual intervention may be required.

Conclusions

In some cases it may be necessary to modify multiple documents as a single cohesive unit. Elasticsearch does not have built-in support for multi-document transactions; therefore, in this blog post we have presented a two-phase commit protocol which can be used to emulate multi-document transactions.

Using Logstash prune capabilities to whitelist sub-documents

Overview

Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash, and that all other fields are dropped. In this blog post we demonstrate the use of Logstash to whitelist desired fields and desired sub-documents before indexing into Elasticsearch.

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example command line given later in this blog post.

Example Logstash pipeline

Below is a Logstash pipeline that can be stored in a file called ‘stocks.conf’ and that does the following:

  1. Reads stock market trades as CSV-formatted input from stdin.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Parses the time field as a Unix timestamp.
  4. Moves DAX and CAC fields into a nested structure called “my_nest”.
  5. Whitelists the “my_nest” field (which contains a sub-document) and the “SMI” field so that all other (non-whitelisted) fields will be removed.
  6. Writes the resulting documents to an Elasticsearch index called “stocks_whitelist_test”.

# For this simple example, pipe in data from stdin.
input {
    stdin {}
}

filter {
    csv {
        columns => ["time","DAX","SMI","CAC","FTSE"]
        separator => ","
        convert => { 'DAX' => 'float'
        'SMI' => 'float'
        'CAC' => 'float'
        'FTSE' => 'float'}
    }
    date {
        match => ['time', 'UNIX']
    }
    mutate {
        # Move DAX and CAC into a sub-document 
        # called 'my_nest'
        rename => {
            "DAX" => "[my_nest][DAX]"
            "CAC" => "[my_nest][CAC]"
        }
    }
     
    # Remove everything except "SMI" and the 
    # "my_nest" sub-document 
    prune {
         whitelist_names => [ "SMI", "my_nest" ]
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_whitelist_test"
    }
}

Testing the Logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system:

cat ./stocks.csv | ./logstash -f ./stocks.conf

You can then check the data that you have stored in Elasticsearch by executing the following command from Kibana’s dev console:

GET /stocks_whitelist_test/_search

This should display documents with the following structure:

      {
        "_index": "stocks_whitelist_test",
        "_type": "doc",
        "_id": "KANygWUBsYalOV9yOKsD",
        "_score": 1,
        "_source": {
          "my_nest": {
            "CAC": 1718,
            "DAX": 1606.51
          },
          "SMI": 1678.6
        }
      }

Notice that only “my_nest” and “SMI” have been indexed as indicated by the contents of the document’s “_source”. Also note that the “FTSE” and “time” fields have been removed as they were not in the prune filter’s whitelist.

Conclusion

In this blog post, we have demonstrated how Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash.

Deduplicating documents in Elasticsearch

Overview

In this blog post we cover how to detect and remove duplicate documents from Elasticsearch by using either Logstash or alternatively by using custom code written in Python.

Example document structure

For the purposes of this blog post, we assume that the documents in the Elasticsearch cluster have the following structure. This corresponds to a dataset that contains documents representing stock market trades.

    {
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

Given this example document structure, for the purposes of this blog we arbitrarily assume that if multiple documents have the same values for the [“CAC”, “FTSE”, “SMI”] fields, then they are duplicates of each other.

Using Logstash for deduplicating Elasticsearch documents

Logstash may be used for detecting and removing duplicate documents from an Elasticsearch index. This process is described below.

In the example below I have written a simple Logstash configuration that reads documents from an index on an Elasticsearch cluster, then uses the fingerprint filter to compute a unique _id value for each document based on a hash of the [“CAC”, “FTSE”, “SMI”] fields, and finally writes each document back to a new index on that same Elasticsearch cluster such that duplicate documents will be written to the same _id and therefore eliminated.

Additionally, with minor modifications, the same Logstash filter could also be applied to future documents written into the newly created index in order to ensure that duplicates are removed in near real-time. This could be accomplished by changing the input section in the example below to accept documents from your real-time input source rather than pulling documents from an existing index.

Be aware that using custom _id values (i.e. an _id that is not generated by Elasticsearch) will have some impact on the write performance of your index operations. 

Also, it is worth noting that, depending on the hash algorithm used, this approach may theoretically result in a non-zero number of hash collisions for the _id value, which could cause two non-identical documents to be mapped to the same _id. For most practical cases, the probability of a hash collision is likely very low.

A simple Logstash configuration to de-duplicate an existing index using the fingerprint filter is given below.

input {
  # Read all documents from Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}

filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}
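
After this pipeline has run, one quick way to gauge how many duplicates were collapsed is to compare the document counts of the source index and the newly written index. This is an optional check that uses the index names from the configuration above:

GET stocks/_count

GET stocks_after_fingerprint/_count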

A custom Python script for deduplicating Elasticsearch documents

A memory-efficient approach

If Logstash is not used, then deduplication may be efficiently accomplished with a custom Python script. For this approach, we compute the hash of the [“CAC”, “FTSE”, “SMI”] fields that we have defined to uniquely identify a document. We then use this hash as a key in a Python dictionary, where the associated value of each dictionary entry will be an array of the _ids of the documents that map to the same hash.

If more than one document has the same hash, then the duplicate documents that map to the same hash can be deleted.  Alternatively, if you are concerned about the possibility of hash collisions, then the contents of documents that map to the same hash can be examined to see if the documents are really identical, and if so, then duplicates can be eliminated.

Detection algorithm analysis

For a 50GB index, if we assume that the index contains 0.4 kB documents, then there would be 125 million documents in the index. In this case, the amount of memory required for storing the deduplication data structures in memory when using a 128-bit MD5 hash would be on the order of 128 bits x 125 million = 2GB of memory, plus the 160-bit _ids would require another 160 bits x 125 million = 2.5GB of memory. This algorithm will therefore require on the order of 4.5GB of RAM to keep all relevant data structures in memory. This memory footprint can be dramatically reduced if the approach discussed in the following section can be applied.

Potential enhancement to the algorithm to reduce memory usage, as well as to continuously remove new duplicate documents

If you are storing time-series data, and you know that duplicate documents will only occur within some small amount of time of each other, then you may be able to improve this algorithm’s memory footprint by repeatedly executing the algorithm on a subset of the documents in the index, with each subset corresponding to a different time window. For example, if you have a year’s worth of data, then you could use range queries on your datetime field (inside a filter context for best performance) to step through your data set one week at a time. This would require that the algorithm is executed 52 times (once for each week), and in this case, this approach would reduce the worst-case memory footprint by a factor of 52.

In the above example, you may be concerned about not detecting duplicate documents that span two consecutive weeks. Let’s assume that you know that duplicate documents cannot occur more than 2 hours apart. Then you would need to ensure that each execution of the algorithm includes documents that overlap by 2 hours with the last set of documents analyzed by the previous execution of the algorithm. For the weekly example, you would therefore need to query 170 hours (1 week + 2 hours) worth of time-series documents to ensure that no duplicates are missed.
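
As a minimal sketch of this windowed approach, and assuming the documents contain the @timestamp field shown in the example document earlier in this section, one week of data plus a two-hour overlap could be selected with a range query inside a filter context (the dates below are placeholders):

GET stocks/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "2017-01-01T00:00:00Z",
              "lt": "2017-01-08T02:00:00Z"
            }
          }
        }
      ]
    }
  }
}

In the Python code shown later in this section, a query like this could be passed to helpers.scan through its query parameter so that each execution of the algorithm only scans one window of documents.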

If you wish to periodically clear out duplicate documents from your indices on an on-going basis, you can execute this algorithm on recently received documents. The same logic applies as above — ensure that recently received documents are included in the analysis along with enough of an overlap with slightly older documents to ensure that duplicates are not inadvertently missed.

Python code to detect duplicate documents

The following code demonstrates how documents can be efficiently evaluated to see if they are identical, and then eliminated if desired. However, in order to prevent accidental deletion of documents, in this example we do not actually execute a delete operation. Such functionality would be straightforward to include.

The code to deduplicate documents from Elasticsearch can also be found on github.

#!/usr/local/bin/python3

# A description and analysis of this code can be found at 
# https://alexmarquardt.com/2018/07/23/deduplicating-documents-in-elasticsearch/

import hashlib
from elasticsearch import Elasticsearch, helpers

ES_HOST = 'localhost:9200'
ES_USER = 'elastic'
ES_PASSWORD = 'elastic'

es = Elasticsearch([ES_HOST], http_auth=(ES_USER, ES_PASSWORD))
dict_of_duplicate_docs = {}

# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]


# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hit):

    combined_key = ""
    for mykey in keys_to_include_in_hash:
        combined_key += str(hit['_source'][mykey])

    _id = hit["_id"]

    hashval = hashlib.md5(combined_key.encode('utf-8')).digest()

    # If the hashval is new, then we will create a new key
    # in the dict_of_duplicate_docs, which will be
    # assigned a value of an empty array.
    # We then immediately push the _id onto the array.
    # If hashval already exists, then
    # we will just push the new _id onto the existing array
    dict_of_duplicate_docs.setdefault(hashval, []).append(_id)


# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    for hit in helpers.scan(es, index='stocks'):
        populate_dict_of_duplicate_docs(hit)


def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Get the documents that have mapped to the current hashval
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # In order to remove the possibility of hash collisions,
            # write code here to check all fields in the docs to
            # see if they are truly identical - if so, then execute a
            # DELETE operation on all except one.
            # In this example, we just print the docs.
            print("doc=%s\n" % doc)



def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()


main()