Emulating transactional functionality in Elasticsearch with two-phase commits

Introduction

Elasticsearch supports atomic create, update, and delete operations at the individual document level. However, in some cases it may be necessary to modify multiple documents as a single cohesive unit. Because Elasticsearch does not have built-in support for multi-document transactions, in this blog post we present a two-phase commit protocol which can be used to emulate them.

Overview

Create, update, and delete operations in Elasticsearch are atomic at the document level, which means that creating, modifying, or deleting a single document either fully succeeds or fully fails. However, there is no built-in guarantee that an operation that has multiple steps and impacts multiple documents will succeed or fail as a cohesive unit.

In some cases inner objects and nested types can be used to combine multiple documents into a single atomic unit, which would guarantee atomicity of create, update, and delete operations on the parent and its embedded sub-documents. In other cases it may not be possible or desirable to embed related documents inside a parent document. Therefore multi-document transactional functionality may be desired.

Given the lack of built-in multi-document transactions in Elasticsearch, multi-document transactional functionality must be implemented in application code. Such functionality can be achieved with the two-phase commit protocol.

What is the two-phase commit protocol

The two-phase commit protocol is a type of atomic commitment protocol that coordinates processes that participate in a distributed atomic transaction. The protocol ultimately determines whether to commit or rollback a transaction. The protocol achieves this goal even in the event of most temporary system failures.

To permit recovery from a failure, the two-phase commit protocol logs the state of a given transaction as the sequential steps to perform a transaction are executed. In the event of a failure at any stage in a transaction, these transaction logs will be used by recovery procedures to either complete the transaction or to roll it back.

A high-level overview of a two-phase commit implementation

In this article, we present an example of a two-phase commit transaction that is used for tracking the movement of “units” between two accounts, as described in the following sections:

  1. Create mappings: Define the mappings for an accounts index and a transactions index.
  2. Initialize accounts: Create sample accounts with balances that will be used in subsequent steps to demonstrate how the two-phase commit protocol can be used to ensure completion of the movement of units from one account balance to another.
  3. Define an ingest pipeline for inserting the ingest time: An ingest pipeline is used to ensure that every transaction has a creation time written into it.
  4. Scripts that are used by the two-phase commit: Define painless scripts which are used in the two-phase commit.
  5. Create a transaction: A transaction defines which accounts to move units between and how many units to move. It also logs the status of the transaction, which can be used for recovery or rollback.
  6. Core two-phase commit operations: The steps that will be executed to implement the two-phase commit. By following a specific sequence of operations for multi-document transactions, we maintain sufficient state (i.e. a transaction log) at every step to either fix or rollback transactions that have partially completed and then failed to continue execution.
  7. Rolling back transactions: In rare cases some transactions may not be able to complete, and should be rolled-back. This section describes how to undo a two-phase commit that previously started.
  8. Recovering from errors: This section describes the operations that are periodically executed to detect transactions that have become stuck in one of the two-phase commit or rollback stages. Stalled operations will then be restarted based on the most recent transaction state.

Create mappings

Mapping is the process of defining how a document, and the fields it contains, are stored and indexed. This example defines an accounts index and a transactions index, for which the mappings are defined below.

Define the mappings for the accounts index

For our example, we define the following fields on the accounts index:

  • balance: the balance of each account. In this article we refer to the value stored in this field as “units”, which could be a currency, inventory, etc.
  • pending_transactions: an array that will contain a list of the _ids of each transaction that is currently “pending” on each account. The list of pending transactions serves two purposes: (1) it is used for ensuring that the same transaction will never be applied twice to the same account, and (2) it maintains state that will allow rollback steps to set this account back to its pre-transaction state in the event that the transaction cannot run to completion.

Mappings should be defined before writing any documents into the accounts index, as follows:

PUT accounts
{
  "mappings": {
    "properties": {
      "balance": {
        "type":  "long"
      }, 
      "pending_transactions": {
        "type": "keyword"
      }
    }
  }
}

Define the mappings for the transactions index

The transactions index will be used for tracking the state of each multi-document transaction, and will have the following fields defined:

  • amount: the value of the transfer.
  • creation_time: when the transaction was created. This is not strictly required, but it is stored for additional context.
  • modification_time: when the transaction was last modified. This will be used to determine whether recovery should be started.
  • src_acct: the _id of the source account for the transfer.
  • dest_acct: the _id of the destination account for the transfer.
  • transaction_state: one of “created”, “pending”, “committed”, “finished”, “terminating”, or “rolled-back”. In a normal transaction without errors or rollbacks, the transaction state will move from “created” to “pending” to “committed”, and then to “finished”.

Mappings should be defined before writing any documents into the transactions index, as follows:

PUT transactions
{
  "mappings": {
    "properties": {
      "amount": {
        "type": "long"
      },
      "creation_time": {
        "type": "date"
      },
      "modification_time": {
        "type": "date"
      },
      "dest_account": {
        "type": "keyword"
      },
      "src_account": {
        "type": "keyword"
      },
      "transaction_state": {
        "type": "keyword"
      }
    }
  }
}

Initialize accounts

We can initialize documents for accounts A and B, each with a balance of 500 as follows:

PUT accounts/_doc/A
{
  "balance": 500,
  "pending_transactions": []
}

PUT accounts/_doc/B
{
 "balance": 500,
 "pending_transactions": []
}

Define an ingest pipeline for inserting the ingest time

The following pipeline will be used to add “creation_time” and “modification_time” to the transaction documents. The modification_time will be required at later stages in the two-phase commit process for detecting transactions that have failed to complete within a reasonable amount of time. The creation time is stored for additional context.

PUT _ingest/pipeline/initialize_time
{
 "description": "Add ingest timestamp",
 "processors": [
   {
     "set": {
       "field": "_source.creation_time",
       "value": "{{_ingest.timestamp}}"
     }
   },
   {
     "set": {
       "field": "_source.modification_time",
       "value": "{{_ingest.timestamp}}"
     }
   }
 ]
}

Scripts that are used by the two-phase commit

In this section we define several painless scripts which will be executed by update operations that are performed in the two-phase commit steps. Given that updates are atomic, all of the operations inside each of these scripts will either succeed or fail as a unit.

Script to update the transaction state

This script will be used to update the transaction state from the current state to the desired state. Note that if the transaction state on the current transaction is not equal to the expected current state (e.g. if it is already set to the desired state), then this script will not modify the document and the corresponding update operation will return a result of “noop”.

If the transaction state is updated, then this script also updates the modification time of the transaction document, which will be required by the Recovering from errors procedures.

POST _scripts/update_transaction_state
{
  "script": {
    "lang": "painless",
    "source": """
      if (ctx._source.transaction_state == params.current_state) {
        ctx._source.transaction_state = params.desired_state;

        // Set the modification time in ISO 8601 (UTC)
        Date date = new Date();
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'");
        df.setTimeZone(TimeZone.getTimeZone("UTC"));
        ctx._source.modification_time = df.format(date);

      } else {
        ctx.op = "noop"
      }
    """
  }
}

Script to apply a transaction to an account

This script is used to add the current transaction amount to an account, and to push the transaction identifier onto a list of pending transactions. Given the atomic nature of document updates, both of these operations will succeed or fail as an atomic unit. This script will be used for updating accounts once a transaction has entered into the “pending” state.

In the event that the transaction has already been applied to the account (as determined by the transaction identifier being in the array of pending transactions for this account), then this script will not update the document and the corresponding update operation will return a result of “noop”.

POST _scripts/apply_transaction_to_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored 
      // in pending_transactions. 
      // If the transaction is already on this account then the 
      // location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);

      if (idx >= 0) {
        // if the transaction already exists on this account, 
        // do nothing 
        ctx.op = "noop";
      } 
      
      else {
        // New transaction - update the account balance 
        // and add the transaction to pending_transactions
        ctx._source.pending_transactions.add(params.trans_id);
        ctx._source.balance += params.amount;
      }
    """
  }
}

Script to remove a transaction from an account

This script is used for removing a transaction from the pending_transactions array on an account. This will be done once a transaction is no longer in the “pending” transaction state, and immediately after it has entered into the “committed” transaction state.

In the event that the current transaction has already been removed from the account (as determined by the transaction identifier not appearing in the array of pending transactions for this account), then this script will not modify the document and the corresponding update operation will return a result of “noop”.

POST _scripts/remove_transaction_from_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored in 
      // pending_transactions. 
      // If it exists, the location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);
      

      if (idx >= 0) {
        // the transaction exists on this account, remove it 
        ctx._source.pending_transactions.remove(idx);
      } 
      
      else {
        // previously already removed, do nothing 
        ctx.op = "noop";
      }
    """
  }
}

Script to undo a transaction on an account

This script will be used in the event that a transaction is being rolled back. It reverses the previous change that was made to the account balance, and removes the transaction from the account’s pending_transactions array. Given the atomic nature of document updates, both of these operations will succeed or fail as an atomic unit.

In the event that the current transaction has already been removed from the account (as determined by the transaction identifier not appearing in the array of pending transactions for this account), then this script will not modify the document and the corresponding update operation will return a result of “noop”.

POST _scripts/undo_transaction_on_account
{
  "script": {
    "lang": "painless",
    "source": """
    
      // Check if the transaction is already stored in pending_transactions. 
      // If it exists, the location is >= 0. Otherwise it is -1. 
      def idx = ctx._source.pending_transactions.indexOf(params.trans_id);
      

      if (idx >= 0) {
        // the transaction exists on this account, remove it 
        ctx._source.pending_transactions.remove(idx);
        ctx._source.balance -= params.amount;
      } 
      
      else {
        // previously already removed, do nothing 
        ctx.op = "noop";
      }
  """
  }
}

Create a transaction

We create a new transaction, and once this transaction has been received by Elasticsearch, the subsequent steps covered in the remainder of this article will ensure that the transaction will eventually run to completion or will alternatively be rolled back. This is true even if there is an intermediate failure in any of the individual two-phase commit steps.

Note that the transaction is initially created with a transaction state of “created” and will be moved into a “pending” state when the transaction begins processing. By breaking the transaction into a “created” state followed by a “pending” state, the creation of the transaction can be decoupled from the two-phase commit processing of the transaction. For example, after receiving confirmation that a transaction has been created, an application can spin up a separate thread to start execution of Core step 1 – Update transaction state to “pending”, and the current thread can then immediately respond to the end user with confirmation that the transaction has been received and created. Subsequently, the application can then periodically poll Elasticsearch to get an updated transaction state, which can be displayed to the end user.

We request a transfer of 100 from account A to account B by creating a new document in the transactions index. This request should use the _create endpoint with an application-generated transaction identifier (set to “txn1” for this transaction) to ensure idempotency of the transaction creation. This can be done as follows:

PUT transactions/_create/txn1?pipeline=initialize_time
{ 
  "src_acct": "A", 
  "dest_acct": "B", 
  "amount" : 100, 
  "transaction_state": "created"
}

If we do not receive a response or if the request fails, then the above request should be re-submitted, as we cannot be sure whether Elasticsearch received the request for the new transaction. Because an unacknowledged transaction could theoretically have been received and could already have changed state due to other threads picking it up and moving it forward (this will be covered in detail later in this article), it is important to use the _create endpoint, so that multiple submissions of a given transaction will not overwrite the transaction state.

If the response from Elasticsearch includes “result”: “created”, then the above request has created the document, as desired. If the application receives a response that indicates “document already exists”, then the document was created in a previous request. In either case, we are assured that Elasticsearch has received the transaction, and we can now begin the two-phase commit of operations for the transaction.

In subsequent steps we will refer to the newly submitted transaction as “curr_trans”. If the instructions from this blog are being tested in Kibana dev tools, then the values from the above document will need to be manually copied into subsequent commands wherever we refer to “curr_trans”. If these steps are to be executed in code, then this document could be stored in an object which we will refer to as “curr_trans”.
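
As a hedged illustration of this create-and-retry behaviour, the sketch below (Python with the requests library; the cluster URL, function name, and retry count are assumptions rather than part of the original protocol) resubmits the _create request until it receives either a “created” response or a 409 “document already exists” conflict:

import requests

ES = "http://localhost:9200"  # assumed cluster address


def create_transaction(trans_id, src_acct, dest_acct, amount, retries=5):
    """Idempotently create a transaction document via the _create endpoint."""
    doc = {
        "src_acct": src_acct,
        "dest_acct": dest_acct,
        "amount": amount,
        "transaction_state": "created",
    }
    for _ in range(retries):
        try:
            resp = requests.put(
                f"{ES}/transactions/_create/{trans_id}",
                params={"pipeline": "initialize_time"},
                json=doc,
                timeout=10,
            )
        except requests.RequestException:
            continue  # no response received - resubmit the same _create request
        if resp.status_code == 201:
            return "created"         # Elasticsearch created the document
        if resp.status_code == 409:
            return "already-exists"  # created by an earlier (unacknowledged) attempt
    raise RuntimeError(f"could not confirm creation of transaction {trans_id}")


# Request a transfer of 100 units from account A to account B.
create_transaction("txn1", "A", "B", 100)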

Core two-phase commit operations

In this section we present a two-phase commit which ensures that all steps required to complete a transaction are executed, and that any failed transactions can be detected and executed to completion or alternatively rolled back. This two-phase commit will prevent partial completion of transactions if there is a failure in any of the sequential two-phase commit steps.

Normal two-phase commit operations will execute in a single thread that starts at Core step 1 – Update transaction state to “pending” and ends at Core step 7 – Update transaction state to “finished”. Recovery procedures may start executing at different stages in the two-phase commit, and will also execute sequentially in a single thread for a single transaction.

If any of the core two-phase commit operations fails or does not receive a response, then it should be retried several times before processing of the current thread is stopped. If this happens, the current transaction will be picked up later by the recovery procedures.
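
To make this retry behaviour concrete, the helper below is a minimal Python sketch (using the requests library; the cluster URL, retry count, and helper name are illustrative assumptions) that runs one of the stored scripts against a single document and returns the update “result”:

import time

import requests

ES = "http://localhost:9200"  # assumed cluster address


def scripted_update(index, doc_id, script_id, params, retries=3, wait=1.0):
    """Run a stored script against one document and return the update "result".

    Returns "updated" or "noop" on success, or None if every retry failed, in
    which case the caller should stop and leave the transaction for the
    recovery procedures to pick up later.
    """
    body = {"script": {"id": script_id, "params": params}}
    for _ in range(retries):
        try:
            resp = requests.post(f"{ES}/{index}/_update/{doc_id}", json=body, timeout=10)
            if resp.status_code == 200:
                return resp.json()["result"]
        except requests.RequestException:
            pass  # no response - fall through and retry
        time.sleep(wait)
    return None

Each of the core steps that follow can then be expressed as a single call to this helper, for example scripted_update("transactions", "txn1", "update_transaction_state", {"current_state": "created", "desired_state": "pending"}), with processing of the current thread stopped whenever the helper returns None.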

Core step 1 – Update transaction state to “pending”

This step (Core step 1) can be arrived at immediately after the transaction has been created. However, in the event of an error after creating the transaction, this step will be executed after the step called Recover transactions that are stuck in the “created” state. The document referred to by “curr_trans” is determined by whichever step was executed prior to arriving at this step.

Update the transaction to indicate that processing has started, by updating the transaction state of the “curr_trans” document to “pending” as follows:

POST transactions/_update/txn1  // "txn1" is <curr_trans._id>
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "created",
      "desired_state": "pending"
    }
  }
}

Ensure that the “result” value in the response is equal to “updated”.

If the result field in the response is “noop” then processing of this transaction should be stopped, as this transaction document has already been updated. This could indicate that another thread has already updated the transaction state and could be in the process of applying the transaction to the source and destination accounts. In some rare cases a “noop” response could also be caused by an internal retry which has internally updated the document more than once. Regardless of the cause of a “noop” response, if necessary, a transaction that is stopped here will be detected and restarted by the recovery procedures.

Core step 2 – Apply the transaction to the source account

This step (Core step 2) will normally execute immediately after Core step 1 – Update transaction state to “pending”. However, in the event of a previous error that has prevented this step from executing, it will be called after Recover transactions that are in the “pending” state.

In other words, if a previous thread updated the transaction state to “pending” and then failed without subsequently executing this step, this situation will eventually be detected by Recover transactions that are in the “pending” transaction state, and this step will then be subsequently restarted.

Update the balance and pending_transactions on the source account as follows:

POST accounts/_update/A   // "A" is from curr_trans._source.src_acct
{
  "script": {
    "id": "apply_transaction_to_account",
    "params": {
      "trans_id": "txn1", // from <curr_trans._id>
      "amount": -100      // i.e. remove <curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”. If it returns a result of “noop”, it may mean that another thread previously updated the balance on the source account. In both cases (“updated” and “noop”), continue to the next step, as it is possible that the alternate thread failed before updating the destination account.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “pending” state to be detected and fixed by future recovery procedures.

Core step 3 – Apply the transaction to the destination account

This step (Core step 3) will execute immediately after Core step 2. Execute the following code to apply the transaction to the destination account:

POST accounts/_update/B   // "B" is from curr_trans._source.dest_acct
{
  "script": {
    "id": "apply_transaction_to_account",
    "params": {
      "trans_id": "txn1", // from <curr_trans._id>
      "amount": 100       // from +<curr_trans._source.amount>
    }
  }
}

The response to the above command should return a result with the value “updated”. If it returns a result of “noop”, it means that another thread previously updated the balance on the destination account. As long as an error is not received, continue to the next step.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “pending” state to be detected and fixed by recovery procedures.

Core step 4 – Update the transaction state to “committed”

This step (Core step 4) will only be called immediately after a pending transaction has been applied to both the source account and the destination account. This step is not called directly by any recovery procedures. This ensures that a “committed” transaction state means that both the source and destination balances are guaranteed to have been updated.

Once a transaction moves into a “committed” transaction state, the order of execution of the two-phase commit steps (or of the recovery procedures) guarantee that no further attempts to modify the source or destination account balances will take place, as those steps are only executed on “pending” transactions.

Execute the following command to update the transaction state to “committed”:

POST transactions/_update/txn1  // "txn1" is <curr_trans._id>
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "pending",
      "desired_state": "committed"
    }
  }
}

Ensure that the “result” value in the response is equal to “updated”.

If the result field in the response is “noop” then processing of this transaction should be stopped, as this transaction document has already been updated. This could indicate that another thread has already updated the transaction state and is currently in the process of removing the transaction from the source and destination accounts. In some rare cases a “noop” response could also be caused by an internal retry which has internally updated the document more than once. Regardless of the cause of a “noop” response, if necessary, a transaction that is stopped here will be detected and restarted by the recovery procedures.

Core step 5 – Remove the transaction from the source account

This step (Core step 5) will normally execute immediately after Core step 4 – Update transaction state to “committed”. However, in the event of a previous error that has prevented this step from executing, this will be called after Recover transactions that are in the “committed” transaction state.

Now that the account balances have been committed, we can remove the transaction from the pending transactions array on the source account. This was previously required to ensure that a given transaction would only be applied once to the source account’s balance. As the subsequent steps in the two-phase commit sequence do not modify the account balance, this can now be removed as follows:

POST accounts/_update/A  // "A" is from curr_trans._source.src_acct
{
  "script": {
    "id": "remove_transaction_from_account",
    "params": {
      "trans_id": "txn1" // from <curr_trans._id>
    }
  }
}

The response to the above command should normally have a result of “updated”. If it returns a result of “noop”, it means that another thread may have previously removed the transaction from the source account. In either case, continue to the next step as it is possible that the alternate thread failed before updating the destination account.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “committed” state to be detected and fixed by future recovery procedures.

Core step 6 – Remove the transaction from the destination account

This step (Core step 6) will execute immediately after Core step 5. We can remove the transaction from the destination account as follows:

POST accounts/_update/B  // "B" is from curr_trans._source.dest_acct
{
  "script": {
    "id": "remove_transaction_from_account",
    "params": {
      "trans_id": "txn1" // from <curr_trans._id>
    }
  }
}

The response to the above command should normally return a result with the value “updated”. As long as an error is not received, continue to the next step.

If the request fails and continues to fail after several retries, do not continue to the next step, as the transaction should remain in the “committed” state to be detected and fixed by recovery procedures.

Core step 7 – Update transaction state to “finished”

This step (Core step 7) will only be called immediately after a committed transaction has been removed from both the source account and the destination account. This step is not called directly by any recovery procedures. This ensures that a “finished” transaction state means that the current transaction has been removed from both the source and destination account’s pending transactions arrays.

Execute the following command to update the transaction state to “finished”:

POST transactions/_update/txn1  // "txn1" is <curr_trans._id>
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "committed",
      "desired_state": "finished"
    }
  }
}

The response to the above command should normally return a result with the value “updated”. If it returns a result of “noop” then another thread may have already modified the state to finished. In this case get the transaction to confirm that the transaction state is “finished”.

If the request fails and continues to fail after several retries, the current thread can exit as the transaction can remain in the “committed” state to be detected and fixed by recovery procedures.
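
Tying the seven core steps together, the following sketch reuses the scripted_update helper defined at the start of this section to drive the happy path in a single thread. The function and variable names are illustrative assumptions, not a reference implementation:

def run_two_phase_commit(trans):
    """Drive one transaction from "created" to "finished".

    `trans` is the transaction document, e.g. {"_id": "txn1", "_source": {...}}.
    Returning early leaves the transaction for the recovery procedures.
    """
    tid = trans["_id"]
    src = trans["_source"]["src_acct"]
    dest = trans["_source"]["dest_acct"]
    amount = trans["_source"]["amount"]

    # Core step 1 - move the transaction into the "pending" state.
    if scripted_update("transactions", tid, "update_transaction_state",
                       {"current_state": "created", "desired_state": "pending"}) != "updated":
        return

    # Core steps 2 and 3 - apply the transaction to both account balances.
    # A "noop" is acceptable here (another thread may already have applied it).
    if scripted_update("accounts", src, "apply_transaction_to_account",
                       {"trans_id": tid, "amount": -amount}) is None:
        return
    if scripted_update("accounts", dest, "apply_transaction_to_account",
                       {"trans_id": tid, "amount": amount}) is None:
        return

    # Core step 4 - both balances updated, so the transaction is "committed".
    if scripted_update("transactions", tid, "update_transaction_state",
                       {"current_state": "pending", "desired_state": "committed"}) != "updated":
        return

    # Core steps 5 and 6 - remove the transaction from both pending_transactions arrays.
    if scripted_update("accounts", src, "remove_transaction_from_account",
                       {"trans_id": tid}) is None:
        return
    if scripted_update("accounts", dest, "remove_transaction_from_account",
                       {"trans_id": tid}) is None:
        return

    # Core step 7 - all bookkeeping done, so the transaction is "finished".
    scripted_update("transactions", tid, "update_transaction_state",
                    {"current_state": "committed", "desired_state": "finished"})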

Rolling back transactions

In some cases it may be necessary to roll back a transaction, which can be done as described below.

Rollback of “created” transactions

A transaction that is in a “created” state can be directly set to “rolled-back”. This can be accomplished as follows:

POST transactions/_update/txn1  // "txn1" is <curr_trans._id>
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "created",
      "desired_state": "rolled-back"
    }
  }
}

If the result field in the response is “updated” then rollback of this transaction has succeeded.

If the result is “noop” then the transaction may have already moved past the “created” state, and therefore an alternate rollback procedure should be followed, depending on the current state of the transaction.

Rollback of “pending” transactions

If a transaction is in the “pending” state, then the transaction can be rolled back by executing the following steps, which will reverse the modifications to the source and destination accounts, as well as change the status of the transaction.

Rollback step 1 – Update the transaction state to “terminating”

Update the transaction state on the current transaction from “pending” to “terminating” as shown below.

POST transactions/_update/txn1  // "txn1" is <curr_trans._id>
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "pending",
      "desired_state": "terminating"
    }
  }
}

The response to the above command should return a result with the value “updated”. If a result of “noop” is returned in the response, then the rollback of the current transaction should be stopped as the transaction is no longer in the “pending” state, and should therefore be allowed to continue to the “finished” state, at which point we should then submit a new transaction to reverse it.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “pending” state to be detected and fixed by the normal Recovering from errors procedures (which, if successful, would move the transaction to a “finished” state rather than “rolled-back”), or alternatively the rollback can be retried.

Rollback step 2 – Undo the transaction on the source account

This step (Rollback step 2) can be arrived at immediately after the transaction has been set to the “terminating” state. However, in the event of an error after updating the state, this step will be executed after the step called Recover transactions that are in the “terminating” state.

If the source account balance has been modified, then it will have an entry for the current transaction’s _id in its pending transactions array. Therefore the amount that was removed from the source account balance must be added back, and the transaction must be removed from the pending transactions array. This can be accomplished with the following code:

POST accounts/_update/A   // "A" is from curr_trans._source.src_acct
{
  "script": {
    "id": "undo_transaction_on_account",
    "params": {
      "trans_id": "txn1", // <curr_trans._id>
      "amount": -100      // Undo -<curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”. If it returns a result of “noop”, it means that another thread previously rolled-back the transaction on the source account. In either case, continue to the next step as it is possible that the alternate thread updated the source account but failed before updating the destination account.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state to be detected and fixed by recovery procedures.

Rollback step 3 – Undo the transaction from the destination account

If the destination account balance has been modified, then it will have an entry for the current transaction’s _id in its pending transactions array. Therefore the amount that was added to the destination account balance must be removed, and the transaction must be removed from the pending transactions array. This can be accomplished with the following code:

POST accounts/_update/B   // "B" is from curr_trans._source.dest_acct
{
  "script": {
    "id": "undo_transaction_on_account",
    "params": {
      "trans_id": "txn1", // <curr_trans._id>
      "amount": 100      //  Undo <curr_trans._source.amount>
    }
  }
}

The response to the above command should normally have a result of “updated”. If it returns a result of “noop”, it means that another thread previously rolled back the transaction on the destination account. In either case, continue to the next step, as it is possible that the alternate thread undid the transaction on this account but then failed before setting the transaction state to “rolled-back”.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state to be detected and fixed by recovery procedures.

Rollback step 4 – Set the transaction state to “rolled-back”

The rollback operation is completed by updating the transaction state to “rolled-back”.

POST transactions/_update/txn1  // "txn1" is <curr_trans._id>
{
  "script": {
    "id": "update_transaction_state",
    "params": {
      "current_state": "terminating",
      "desired_state": "rolled-back"
    }
  }
}

The response to the above command should return a result with the value “updated”. If the response result is “noop”, then the rollback for the current transaction may have already been completed by another thread. In this case, get the transaction to confirm that it is in the “rolled-back” transaction state.

If the request fails and continues to fail even after several retries, the current thread should exit as the transaction should remain in the “terminating” state, which will be detected and fixed by recovery procedures.
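
For completeness, the rollback sequence for a “pending” transaction can be sketched as a short driver that again reuses the scripted_update helper from the core section (the names are illustrative assumptions):

def rollback_pending_transaction(trans):
    """Roll back a "pending" transaction (Rollback steps 1 to 4)."""
    tid = trans["_id"]
    src = trans["_source"]["src_acct"]
    dest = trans["_source"]["dest_acct"]
    amount = trans["_source"]["amount"]

    # Rollback step 1 - only proceed if the transaction is still "pending".
    if scripted_update("transactions", tid, "update_transaction_state",
                       {"current_state": "pending", "desired_state": "terminating"}) != "updated":
        return

    # Rollback steps 2 and 3 - undo the balance changes ("noop" means already undone).
    if scripted_update("accounts", src, "undo_transaction_on_account",
                       {"trans_id": tid, "amount": -amount}) is None:
        return
    if scripted_update("accounts", dest, "undo_transaction_on_account",
                       {"trans_id": tid, "amount": amount}) is None:
        return

    # Rollback step 4 - mark the transaction as rolled back.
    scripted_update("transactions", tid, "update_transaction_state",
                    {"current_state": "terminating", "desired_state": "rolled-back"})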

Rollback of “committed” transactions

If a transaction is in the “committed” state, then it should be allowed to complete (i.e. it should be allowed to move to a transaction state of “finished”). After completion, another transaction can be executed to reverse it.

If for some reason a transaction in the “committed” state cannot proceed to the “finished” state on its own, even after the steps in Recover transactions that are in the “committed” transaction state have been executed, then manual intervention may be required to understand and fix the root cause of the issue.

Recovering from errors

Recovery operations can be looped over to detect and recover from a failure in any of the stages in the Core two-phase commit operations. Recovery should only be initiated on a transaction that has not completed within a reasonable amount of time since its last update (as determined by the current time and its modification timestamp). Recovery should not be attempted on transactions that may still be in-flight or that may have unacknowledged modifications to their transaction state, as this could introduce race conditions.

A simple heuristic to ensure that recovery is not attempted on in-flight transactions is to do the following:

  1. Before recovering a transaction, wait until the time that has passed since its last modification is greater than the maximum processing time of any of the two-phase commit states. In other words, wait long enough to ensure that the transaction state and the source and destination accounts have either been updated or failed, including retries and timeouts.
  2. Ensure that the looping of the recovery operation does not happen faster than the maximum processing time of any of the two-phase commit states. In other words, do not start the next iteration of the recovery loop until after waiting long enough to ensure that modifications to the transaction state as well as to the source and destination accounts have completed or failed, including retries and timeouts.

Unacknowledged modifications (a.k.a. dirty reads) can be avoided by using the modification_time to compute how much time has passed since the last update to the transaction, and not attempting to recover a transaction until several minutes have passed since its most recent update. This will give Elasticsearch sufficient time to detect and fix any unacknowledged writes.

In the examples below, we wait 5 minutes from the most recent update to a transaction before we attempt recovery, which should meet the above requirements. We could also consider executing the recovery loop once every minute. Both of these values should be validated based on the timeouts and number of retries for a given implementation.

More sophisticated control over the timing of the launching of recovery procedures could be achieved by maintaining a mapping of transactions and their associated threads. Before launching a recovery procedure, the thread associated with each transaction could be checked to ensure that it is no longer alive. The details of such an implementation will be language dependent, and are beyond the scope of this article.

Get transactions that need to be recovered

Transactions that have not been modified in the past 5 minutes may be “stuck”, and can be detected with the following query:

GET transactions/_search
{
 "size": 100,
 "query": {
   "bool": {
     "filter": [
       {
         "terms": {
           "transaction_state": [
             "created", 
             "pending",
             "terminating", 
             "committed"
             ]
         }
       },
       {
         "range": {
           "modification_time": {
             "lte": "now-5m/m"
           }
         }
       }

     ]
   }
 },
 "sort": [
   {
     "modification_time": {
       "order": "asc"
     }
   }
 ]
}

Recover transactions that are stuck in the “created” state

For each “created” transaction returned from the above query, create a new thread to resume executing it starting at Core step 1 – Update transaction state to “pending”.

Recover transactions that are in the “pending” state

For each “pending” transaction returned from the above query, create a new thread to resume executing it starting at Core step 2 – Apply the transaction to the source account.

Recover transactions that are in the “committed” state

For each “committed” transaction that is returned, create a new thread to resume executing it starting at Core step 5 – Remove the transaction from the source account.

Recover transactions that are in the “terminating” state

For each “terminating” transaction that is returned, create a new thread to resume executing it starting at Rollback step 2 – Undo the transaction on the source account.
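
A sketch of such a recovery loop is shown below (Python with the requests library; the resume_from_* functions are hypothetical variants of the run_two_phase_commit driver shown earlier that skip the already-completed steps, and the work is done inline here rather than in separate threads for brevity):

import time

import requests

ES = "http://localhost:9200"  # assumed cluster address

STUCK_TRANSACTIONS_QUERY = {
    "size": 100,
    "query": {
        "bool": {
            "filter": [
                {"terms": {"transaction_state": ["created", "pending", "terminating", "committed"]}},
                {"range": {"modification_time": {"lte": "now-5m/m"}}},
            ]
        }
    },
    "sort": [{"modification_time": {"order": "asc"}}],
}


def recovery_loop():
    while True:
        resp = requests.post(f"{ES}/transactions/_search", json=STUCK_TRANSACTIONS_QUERY, timeout=10)
        for hit in resp.json()["hits"]["hits"]:
            state = hit["_source"]["transaction_state"]
            if state == "created":
                run_two_phase_commit(hit)         # resume at Core step 1
            elif state == "pending":
                resume_from_core_step_2(hit)      # hypothetical: driver starting at Core step 2
            elif state == "committed":
                resume_from_core_step_5(hit)      # hypothetical: driver starting at Core step 5
            elif state == "terminating":
                resume_from_rollback_step_2(hit)  # hypothetical: driver starting at Rollback step 2
        time.sleep(60)  # never loop faster than the maximum processing time of any step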

Recover very long running transactions

If any “created”, “pending”, or “committed” transactions are returned that have a modification time that is more than an hour old, they may need additional investigation and should possibly be rolled back as described in the Rolling back transactions section.

If a “terminating” transaction is unable to move into the “rolled-back” transaction state, then it should be investigated and manual intervention may be required.

Conclusions

In some cases it may be necessary to modify multiple documents as a single cohesive unit. Elasticsearch does not have built-in support for multi-document transactions; therefore, in this blog post we have presented a two-phase commit protocol which can be used to emulate multi-document transactions.

Converting local time to ISO 8601 time in Elasticsearch

Now posted on Elastic’s website

Nov 7, 2019 – This article has now been published on Elastic’s website:  https://www.elastic.co/blog/converting-local-time-to-iso-8601-time-in-elasticsearch

Introduction

If a timestamp field is sent into Elasticsearch without any timezone information, then it will be assumed to be UTC time (Coordinated Universal Time). However, if the timestamp actually represents a local time, then assuming that it is UTC will likely cause problems when displaying the data in Kibana or other applications. In this blog I describe how to use an ingest pipeline to convert timestamps from a local timezone into universal timestamps that conform to the ISO 8601 date and time format.

Converting timestamps from local into universal time

If the timezone of the originating data is known, then it is possible to use an ingest processor to convert from the local time to ISO 8601 format. Below is an example ingest pipeline that uses the date processor to convert the field called ‘my_time’ from ‘Europe/Madrid’ time into ISO 8601 (UTC + offset) format.

PUT _ingest/pipeline/chage_local_time_to_iso
{
  "processors": [
    {
      "date" : {
        "field" : "my_time",
        "target_field": "my_time", 
        "formats" : ["dd/MM/yyyy HH:mm:ss"],
        "timezone" : "Europe/Madrid"
      }
    }
  ]
}

In order to test the above pipeline, we can execute the following code to simulate inserting a document:

POST _ingest/pipeline/chage_local_time_to_iso/_simulate
{
  "docs": [
    {
      "_source": {
        "my_time": "12/10/2019 21:31:12",
        "other_field": "whatever"
      }
    }
  ]
}

The above will respond with output that looks as follows, which we can see has the correct ISO 8601 offset for ‘Europe/Madrid’ for October 12th when daylight savings is in effect. As expected, the field ‘my_time’ shows an offset of ‘+02:00’.

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "my_time" : "2019-10-12T21:31:12.000+02:00",
          "other_field" : "whatever"
        },
        "_ingest" : {
          "timestamp" : "2019-10-16T19:28:20.999077Z"
        }
      }
    }
  ]
}

We can also verify that the above pipeline is respecting daylight savings (which ends on October 27th in Spain) by submitting a document with a date of October 30th as follows:

POST _ingest/pipeline/chage_local_time_to_iso/_simulate
{
  "docs": [
    {
      "_source": {
        "my_time": "30/10/2019 21:31:12",
        "other_field": "whatever"
      }
    }
  ]
}

This responds with the following output, which has an offset of ‘+01:00’ as expected, since daylight savings is no longer in effect:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "my_time" : "2019-10-30T21:31:12.000+01:00",
          "other_field" : "whatever"
        },
        "_ingest" : {
          "timestamp" : "2019-10-16T19:30:40.401948Z"
        }
      }
    }
  ]
}

Finally, we can insert a “real” document into Elasticsearch as follows:

PUT test_index/_doc/1?pipeline=chage_local_time_to_iso
{
  "my_time": "15/10/2019 01:11:55",
  "other_field": "whatever"
}

And we can retrieve the document as follows:

GET test_index/_doc/1

This should respond with the following, which has the correct offset of ‘+02:00’ on Oct 15th for the ‘Europe/Madrid’ timezone:

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "my_time" : "2019-10-15T01:11:55.000+02:00",
    "other_field" : "whatever"
  }
}

Conclusion

In this blog we have demonstrated how Elasticsearch ingest processors can be used to ingest data into Elasticsearch with unambiguous ISO 8601 timestamps.

How to mitigate hangovers

Introduction

Many natural products exist that are purported to be able to prevent hangovers, but unfortunately when one talks about natural hangover remedies it is difficult to find many well funded scientific studies that validate or refute such claims. This is at least in part because few if any companies are willing to invest millions of dollars investigating and running clinical trials to validate natural products that cannot then be patented to make a profit from.

In this article, I briefly discuss the main cause of hangovers, and will then discuss two natural supplements that can mitigate the negative impacts of drinking alcohol. This article refers to scientific publications wherever possible, but like many other articles on this topic it is also partially based on pseudo-science and personal experience.

What causes a hangover

In order to understand the cause of a hangover, it is necessary to first understand at a high level the process by which alcohol is broken down. As described in this report from the National Institutes of Health, alcohol is broken down in two stages:

  1. Most of the ethanol (aka alcohol) is broken down in the liver, which transforms the ethanol into a toxic compound called acetaldehyde, a known carcinogen.
  2. Then, in a second step, acetaldehyde (from step 1) is further metabolised by the liver into another, less active byproduct called acetate. This acetate is then broken down into water and carbon dioxide for easy elimination.

As discussed in this article from Scientific American, most hangover symptoms are linked to elevated levels of acetaldehyde, and it specifically states the following:

That dreadful feeling the next day is the condition often called a hangover, which the journal Alcohol and Alcoholism characterizes as “general misery” with symptoms of drowsiness, concentration problems, dry mouth, dizziness, gastrointestinal complaints, sweating, nausea, hyperexcitability and anxiety. Most of these symptoms have been linked to elevated levels of acetaldehyde.

If acetaldehyde is the cause of hangovers, then the natural conclusion is that in order to reduce hangovers, one should try to reduce the amount of acetaldehyde in the body.  This can be achieved in the following ways:

  1. Drink less alcohol.
  2. Improve liver function to speed up the conversion of acetaldehyde into acetate.

For the remainder of this blog, we focus on option #2.

How to reduce acetaldehyde buildup caused by heavy drinking

Given that the breakdown of alcohol occurs primarily in the liver, if one can improve liver function then one could theoretically remove acetaldehyde from one’s system faster, and therefore mitigate the resultant hangover. It turns out that a naturally occurring product that improves liver function exists, and that it is readily and cheaply available. It is called milk thistle, and it has been used for thousands of years for improving liver function.

According to this article from the National Institutes of Health, the following has been shown about milk thistle:

  1. Evidence exists that milk thistle may be hepatoprotective (protects the liver) through a number of mechanisms: antioxidant activity, toxin blockade at the membrane level, enhanced protein synthesis, antifibrotic activity, and possible anti-inflammatory or immunomodulating effects.
  2. Among six studies of milk thistle and chronic alcoholic liver disease, four reported significant improvement in at least one measurement of liver function.

Given that milk thistle has been shown to improve liver function and to protect the liver, it stands to reason that if it is taken after drinking it should help metabolise acetaldehyde. Therefore, if one takes milk thistle after drinking and before going to bed, theoretically the amount of acetaldehyde in the body should be reduced while one sleeps, and the hangover should therefore be reduced.

How to reduce that lethargic feeling after drinking

Alcohol is known to reduce vitamin B concentrations as reported in this article. Additionally, low vitamin B levels can cause anaemia as documented in this article. Therefore, taking vitamin B supplements can reduce the feelings of lethargy if one’s vitamin B levels have been depleted from excess alcohol consumption.

Conclusions

Two cheap and readily available supplements that can reduce hangovers and the effects of excessive alcohol consumption are milk thistle, and vitamin B. If these supplements are taken before going to sleep after a night of drinking, they should help mitigate the hangover that would have otherwise been experienced. It is important to note that these supplements must be taken before going to bed – by morning it may be too late.

Financial implications of exercising share options

Disclaimer

I am not an accountant and this article should not be considered as financial or tax advice. I am providing analysis and calculations which may be used at your own peril. This article is written to demonstrate basic concepts, and does not account for country-specific tax laws or company-specific share option details. Your individual situation may invalidate some or all of the arguments and/or calculations made in this blog post.

Introduction

In many jurisdictions when share options are exercised, the income from such an exercise is taxed at the normal income tax rate. On the other hand, gains on shares (not options) are generally taxed at the more favourable capital gains rate. One therefore might assume that it is beneficial from a taxation perspective to convert company-issued share options into shares. In this blog we analyse the total profit generated from different strategies related to when and how share options are exercised, and will demonstrate that holding on to share options for as long as possible is likely a better strategy than converting options to shares.

The purpose of this blog is to demonstrate a thought process and methodology that can then be extended to account for country-specific tax rates, and country-specific tax benefits. It does not claim to give a universal answer on whether share options should be exercised to convert into shares.

What is a share option

A share option (aka stock option) is the right to buy shares in a company at a fixed price. For example, if one has been issued 1000 share options with a strike price of $10, this means that at some point in the future one can buy up to 1000 shares for $10 each. If those shares are then trading at $100, then each share option would be worth $90, as determined by the market price at that time minus the strike price. Another way of thinking about this is that if someone can buy something for $10 that they can immediately sell for $100, then whatever it is that they are buying is worth $90. In our example, the 1000 share options would therefore have a value of $90,000.

Assumptions

In the remainder of this blog there are some simplifying assumptions made:

  1. The value of shares of the company that you have share options for will continue to increase.
  2. Tax is incurred when a share option is exercised.
  3. The rate of taxation on gains from a share exercise is higher than the rate of taxation on capital gains.

Country-specific tax considerations and/or benefits are out of scope of this blog.

What does it mean to exercise a share option

Exercising a share option refers to the act of paying the strike price to convert the share option into a share. Continuing with the previous example, we saw that 1000 options to buy shares for $10 would have a value of $90,000, assuming that the shares are currently worth $100 each. However, if we go ahead and actually exercise the options, then we have triggered a taxable event of $90,000.

The $90,000 benefit from exercising the share options would generally be considered income, and will normally be taxed at one’s standard income tax rate. For example, if one is in the 40% income tax bracket then one would have to pay $36,000 in tax in order to be allowed to hold on to one’s shares. In order to convert these share options into shares, the total cost is the strike price of $10,000 plus the $36,000 of income tax. After paying $46,000 one would have shares worth $100,000. One is therefore $54,000 ahead compared to if one had not been granted the stock options.

In order to avoid having to pay cash out-of-pocket to exercise a share, one also has the option of a cashless exercise – this is where the money to pay the tax and the exercise price is raised by exercising and immediately selling a portion of one’s share options to cover the strike price and taxes. In the above example, one would pay $46,000 to cover the exercise price plus tax, which can be covered by selling $46,000/$100 = 460 shares. In this approach, one would be left with 540 shares worth $100 each. As expected, in this scenario one is also $54,000 ahead compared to if one had not been granted the stock options.
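
The cashless-exercise arithmetic above can be reproduced with a few lines of Python (a sketch using only the illustrative numbers from this example):

options = 1000
strike = 10        # per-share exercise (strike) price
price = 100        # per-share market value at the time of exercise
income_tax = 0.40

taxable_income = options * (price - strike)   # 90,000
tax_due = income_tax * taxable_income         # 36,000
exercise_cost = options * strike + tax_due    # 46,000 total cost of the cashless exercise

shares_sold = exercise_cost / price           # 460 shares sold to cover the cost
shares_kept = options - shares_sold           # 540 shares remaining
print(shares_kept * price)                    # 54,000 ahead of holding no options at all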

Is it a good idea to exercise share options as soon as possible?

If one expects the share price to continue to rise, then one may be tempted to exercise all share options to convert into shares, to take advantage of the lower tax rate applied to capital gains. In this section, we consider why this is unlikely to be a good strategy.

Let’s imagine a scenario where the share price continues to rise to $1000, and we are still employed by the same company. Would we be further ahead financially if we had exercised our options at $100 and later sold the shares at $1000, or if we had just held on to the original share options and finally exercised and sold at $1000? The table below shows the difference between the two scenarios. The original formulas can be seen in the first tab of this spreadsheet.

Assumed income tax rate: 40%   <– Assumes that taxes are due on exercise (may be country specific)
Assumed capital gains tax rate: 18%   <– Even if zero, in the calculations below, holding options until sale is preferable.
Assumed number of share options: 1000

                                                      Cashless exercise     Hold and exercise at
                                                      and hold              time of final sale
Per share exercise price                                         10                    10
Per share value at exercise                                     100                 1,000
Per share value at sale                                       1,000                 1,000
Taxable income at exercise                                   90,000               990,000
Exercise cost (strike price * number of shares)              10,000                10,000
Tax on exercise (income tax rate * taxable income)           36,000               396,000
Cost of cashless exercise (tax + exercise price)             46,000               406,000
Number of shares to sell to pay cost of exercise                460                   406
Number of shares owned after exercise                           540                   594
Capital gain                                                486,000                     0
Tax on capital gain                                          87,480                     0
Pre-tax gain                                                576,000               990,000
Total tax (income tax + capital gains tax)                  123,480               396,000
Total profit after tax                                      452,520               594,000

Notice that holding onto the share options for as long as possible has resulted in a greater profit than exercising the share options earlier and benefitting from a lower capital gains rate. In fact, even if the capital gains rate is set to zero, in a scenario where the future value of the shares is $1000, it is still more profitable to hold on to share options rather than exercising to convert them to shares.
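
The two columns of the table can be re-derived with the short sketch below (again using the illustrative values from this example), which reproduces the 452,520 and 594,000 figures:

def after_tax_profit(options, strike, exercise_price, sale_price,
                     income_tax=0.40, capital_gains_tax=0.18):
    """After-tax profit of a cashless exercise at exercise_price with a later sale at sale_price."""
    taxable_income = options * (exercise_price - strike)
    exercise_cost = options * strike + income_tax * taxable_income
    shares_kept = options - exercise_cost / exercise_price
    capital_gain = shares_kept * (sale_price - exercise_price)
    return (taxable_income + capital_gain
            - income_tax * taxable_income
            - capital_gains_tax * capital_gain)


print(after_tax_profit(1000, 10, 100, 1000))   # cashless exercise at 100, then hold:  452,520
print(after_tax_profit(1000, 10, 1000, 1000))  # hold the options until the sale:      594,000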

This happens because in many countries, at the moment that one exercises share options, one incurs an immediate tax liability. Additionally, one also has to pay the strike price. In our example of a cashless exercise of 1000 shares at $100, the number of shares left after paying for exercise costs is only 540 – and therefore instead of enjoying growth on 1000 share options, one would only enjoy growth on 540 shares. Because of this reduction in the number of shares after a cashless exercise, the amount of growth is dramatically reduced versus the growth that would have been experienced if the original options had been held.

On the other hand, the tax has indeed been dramatically reduced by performing a cashless exercise at $100 rather than waiting and exercising at $1000. However, the amount of tax savings does not compensate for the lost growth.

The above may lead one to conclude that they should therefore use cash to exercise their share options. We will investigate this in the next section.

Should share options be exercised with cash

If one has extra cash around, they may believe it would be a good idea to use cash to pay the cost of exercising their share options, rather than executing a cashless exercise and hold. This is likely true, and if the share price continues to rise then it would result in more profit than a cashless exercise and hold.

However, if one believes in their company enough to wish to invest cash, then they should consider if it is best to use that cash to exercise their existing share options to convert them to shares, or if it would instead be better to just buy additional shares on the open market with that cash. We therefore compare these two scenarios in the table below. Original calculations can be seen in the second tab of this spreadsheet.

Assumed income tax rate: 40%   <– Assumes that taxes are due on exercise (may be country specific)
Assumed capital gains tax rate: 18%   <– Even if zero, in the calculations below, using cash to buy more is better than using it to exercise
Assumed number of share options: 1000

                                                                 Pay cash        Wait until sale to exercise,
                                                                 to exercise     and instead buy more shares
Per share exercise price                                                 10                   10
Per share value at exercise                                             100                1,000
Per share value at sale                                               1,000                1,000
Taxable income at exercise                                           90,000              990,000
Exercise cost (strike price * number of shares)                      10,000               10,000
Tax on exercise (income tax rate * taxable income)                   36,000              396,000
Cost of cash exercise (tax + exercise price)                         46,000                    0
Cash paid to purchase more shares                                         0               46,000
Sale value of additional shares (purchased in lieu of exercise)           0              460,000
Capital gain                                                        900,000              414,000
Tax on capital gain                                                 162,000               74,520
Pre-tax gain                                                        990,000            1,404,000
Total tax (income tax + capital gains tax)                          198,000              470,520
Total profit after tax                                              792,000              933,480

Notice that the above calculations demonstrate that it is more beneficial to buy shares on the open market rather than using that same cash to exercise and hold shares. This is true even if the capital gains rate is zero. Again, this is due to the loss of future growth on any amount that has been paid in tax as well as any future growth on the cash that was used to pay the strike price.

Caveats

In pre-IPO companies where it is not possible to allocate cash to buying additional shares, it is likely beneficial from a tax perspective to early exercise such share options to convert them to shares. This is because the alternative of buying additional shares on the open market is not an option.

Share options generally expire if one leaves their employer. Therefore share options should be exercised before they expire and become worthless.

Share options also typically expire a certain number of years after they are granted, and should be exercised before that happens. The following article provides additional information related to exercising stock options before they expire: https://kellblog.com/2019/08/18/avoiding-the-ten-year-stock-option-trap/.

Conclusions

Based on the above calculations, if one wants to maintain an investment in one's publicly listed company and expects the company's stock price to go up, then it is generally financially beneficial to hold on to stock options for as long as possible rather than exercising them to convert them to stock. If one wishes to invest additional cash, that cash is likely better allocated to buying additional shares than to exercising options.

Disclaimer: this should not be considered as financial or tax advice, and your individual tax circumstances may differ. In the above calculations we disregard any country-specific tax laws that may increase the attractiveness of exercising share options.

 

Counting unique beats agents sending data into Elasticsearch

Introduction

When using Beats with Elasticsearch, it may be useful to keep track of how many unique agents are sending data into an Elasticsearch cluster, and how many documents each agent is submitting. Such information could be useful, for example, for detecting whether Beats agents are behaving as expected.

In this blog post, I first discuss how to efficiently specify a filter for documents corresponding to a particular time range, followed by several methods for detecting how many beats agents are sending documents to Elasticsearch within the specified time range.

How to filter for documents in a specific time range

This section describes how to efficiently filter for documents from a particular time range. In the following example, we filter for documents that were received yesterday:

GET filebeat-*/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt" : "now/d"
          }
        }
      }
    }
  },
  "sort": [{"@timestamp": "desc"}]
}

Notice that we use the filter context for the range query. Using the filter context is efficient for two reasons:

  1. Operations inside a filter context must answer a yes/no question – either documents fall into the time range or they do not. Because this is a yes/no question, a _score is not computed when filtering documents like this.
  2. The data inside a filter can be efficiently cached by the Node Query Cache, which “caches queries which are being used in a filter context”.

It is worth highlighting that if the parameters inside the filter are different on each query, then the results of the filter cannot be efficiently cached. This would be the case if the range that is being queried is continually changing. This may unintentionally occur if “now” is used inside a range query without any rounding.

In the above example we ensure that the filter can be cached by using date math to round the range that we are searching to the nearest day (as indicated by the “/d”). Compare this to the following query, which would give us all documents in the 24 hours prior to the current moment.

GET filebeat-*/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d",
            "lt" : "now"
          }
        }
      }
    }
  },
  "sort": [{"@timestamp": "desc"}]
}

Note that the above filter cannot be cached because “now” changes every millisecond.

A middle-ground may be to round to the nearest hour to allow the filter to be cached most of the time, except once per hour when the range is modified. Rounding to the nearest hour could be done as follows:

GET filebeat-*/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/h",
            "lt" : "now/h"
          }
        }
      }
    }
  },
  "sort": [{"@timestamp": "desc"}]
}

Now that we have covered how to efficiently query for documents in a particular time range, we are ready to demonstrate how to count the number of unique Beats agents that are submitting documents to Elasticsearch.

A basic query to get a count of unique agents

To get a count of unique beats agents we can use a cardinality aggregation as shown below.

POST filebeat-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt" : "now/d"          
          }
        }
      }
    }
  },
  "aggs" : {
      "unique_agent_id_count" : {
          "cardinality" : {
              "field" : "agent.id.keyword",
              "precision_threshold": 500 
          }
      }
  }
}

Note that we first filter documents by time (in this case documents from yesterday), and then execute the cardinality aggregation on the filtered set of documents. Also notice that the size is set to 0 – this tells Elasticsearch that we are not interested in seeing the actual documents that match the range query; we just want to see the results of the cardinality aggregation done across those documents.
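
For reference, a trimmed response might look similar to the following (the count shown is purely illustrative):

{
  ...
  "aggregations" : {
    "unique_agent_id_count" : {
      "value" : 42
    }
  }
}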

Get an example document from each agent using field collapsing

The example below demonstrates how to use field collapsing to return the _source of a single document corresponding to each Beats agent that submitted a document yesterday. Be aware that by default a search will only return 10 hits. In order to see all documents that match a given query, the size should be increased, or if a large number of results are expected, then pagination techniques should be used. In the example below we have set the size to 100, which will return up to 100 unique agents.

GET filebeat-*/_search
{
  "size" : 100,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt": "now/d"
          }
        }
      }
    }
  },
  "collapse": {
    "field": "agent.id.keyword"
  },
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}
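
If more than one recent document per agent is desired, field collapsing also supports returning several documents per collapsed group via inner_hits. Below is a minimal sketch assuming the same index and time range as above; the inner_hits name and the size of 3 are arbitrary choices:

GET filebeat-*/_search
{
  "size" : 100,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt": "now/d"
          }
        }
      }
    }
  },
  "collapse": {
    "field": "agent.id.keyword",
    "inner_hits": {
      "name": "most_recent_docs",
      "size": 3,
      "sort": [{"@timestamp": "desc"}]
    }
  },
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}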

Get an example document from each agent using a terms aggregation and top hits

We can use a terms aggregation and top hits aggregation to get each unique agent as well as a count of the number of documents submitted from each unique agent. Be aware that this code is likely less efficient than the above and may not be practical if a very large number of agents are reporting into Elasticsearch.

GET filebeat-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt" : "now/d"
          }
        }
      }
    }
  },
  "aggs": {
    "unique_agents": {
      "terms": {
        "field": "agent.id.keyword",
        "size": 500,
      },
      "aggs": {
        "get_a_single_doc_for_each_unique_id": {
          "top_hits": {
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ],
            "size": 1
          }
        }
      }
    }
  }
}

There are three “size” settings in the code above:

  1. We have set the size to 0 for the query results – this just means that we don’t return the documents that match the range query, as we are only interested in the results of the aggregation.
  2. A terms aggregation by default will only return the top 10 buckets. In the example above we have increased the size of the terms aggregation to 500. Be careful, as setting this to a very large value to handle a very large number of agents may be slow. For a very large number of agents, a terms aggregation may become infeasible, in which case the unique agent ids can instead be paged through with a composite aggregation, as sketched after this list.
  3. Inside the top hits aggregation, we have specified a size of 1, meaning that a single document will be returned for each term.
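
The following is a minimal sketch of such a composite aggregation, assuming the same index, time range, and field names as the earlier examples; the source name “agent_id” and the page size of 500 are arbitrary:

GET filebeat-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt" : "now/d"
          }
        }
      }
    }
  },
  "aggs": {
    "unique_agents": {
      "composite": {
        "size": 500,
        "sources": [
          { "agent_id": { "terms": { "field": "agent.id.keyword" } } }
        ]
      }
    }
  }
}

Recent Elasticsearch versions return an “after_key” in each composite aggregation response, which can be passed back in the aggregation’s “after” parameter to retrieve the next page of agent ids.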

Conclusion

In this blog post, we demonstrated how to efficiently filter for documents in a given time range, followed by several methods for counting how many unique Beats agents are submitting documents into an Elasticsearch cluster.

Improving the performance of Logstash persistent queues

 

Now posted on the Elastic blog

November 14, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/using-parallel-logstash-pipelines-to-improve-persistent-queue-performance.

Introduction

By default, Logstash uses in-memory bounded queues between pipeline stages (inputs → pipeline workers) to buffer events. However, in order to protect against data loss during abnormal termination, Logstash has a persistent queue feature which can be enabled to store the message queue on disk. The queue sits between the input and filter stages as follows:

input → queue → filter + output
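
For reference, the persistent queue is enabled via the queue.type setting in logstash.yml (or per pipeline in pipelines.yml). A minimal sketch, with an illustrative size limit, is shown below:

# logstash.yml
queue.type: persisted
# Optional: cap the disk space the queue may use (illustrative value; the default is smaller)
queue.max_bytes: 4gb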

According to the following blog post, Logstash persistent queues should have a small impact on overall throughput. While this is likely true for use cases where the pipeline is CPU bound, it is not always the case.

Motivation

In a recent Logstash implementation, enabling Logstash persistent queues caused a slowdown of about 75%, from about 40K events/s down to about 10K events/s. Somewhat surprisingly, based on disk I/O metrics it was clear that the disks were not saturated. Additionally, standard Logstash tuning techniques such as testing different batch sizes and adding more worker threads were unable to remedy this slowdown.

Why persistent queues may impact Logstash performance

Investigations showed that the reason throughput was limited is that a single Logstash pipeline runs a single-threaded persistent queue – or to put it another way, a single Logstash pipeline only drives data to disk from a single thread. This is true even if that pipeline has multiple inputs, as additional inputs in a single pipeline do not increase the number of disk I/O threads. Furthermore, because enabling the persistent queue adds synchronous disk I/O (wait time) into the pipeline, it reduces throughput even if none of the resources on the system are maxed out.

Solution

Given that Logstash throughput was limited by synchronous disk I/O rather than by resource constraints, more threads running in parallel were needed to drive the disks harder and increase the overall throughput. This was accomplished by running multiple identical pipelines in parallel within a single Logstash process, and then load balancing the input data stream across those pipelines. If data is driven into Logstash by Filebeat, load balancing can be done by listing multiple Logstash endpoints in Filebeat’s Logstash output and enabling load balancing, as sketched below.
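
The exact layout will depend on the environment, but a minimal sketch of this setup might look as follows, assuming Filebeat as the data source and two parallel pipelines (the pipeline ids, config paths, and port numbers are illustrative):

# pipelines.yml – define several identical pipelines, each with its own persistent queue
- pipeline.id: parallel_pq_1
  path.config: "/etc/logstash/parallel_pq_1.conf"
  queue.type: persisted
- pipeline.id: parallel_pq_2
  path.config: "/etc/logstash/parallel_pq_2.conf"
  queue.type: persisted

# Each pipeline config is identical apart from the beats input port, e.g. in parallel_pq_1.conf:
input { beats { port => 5044 } }    # parallel_pq_2.conf would listen on 5045
filter { }                          # the same filters go into every parallel pipeline
output { elasticsearch { hosts => ["localhost:9200"] } }

# filebeat.yml – load balance events across the parallel pipelines
output.logstash:
  hosts: ["logstash-host:5044", "logstash-host:5045"]
  loadbalance: true

With this arrangement, each pipeline writes to its own persistent queue from its own thread, so disk writes occur in parallel and the single-threaded queue bottleneck described above is avoided.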

Result

After increasing the number of pipelines to 4 and splitting the input data across these 4 pipelines, Logstash performance with persistent queues increased up to about 30K events/s, or only 25% worse than without persistent queues. At this point the disks were saturated, and no further performance improvements were possible.

Feedback

As shown in the comments below, this approach has also helped other Logstash users with substantial performance gains. Did this solution help you? If so, please consider leaving a comment below!

 

 

Debugging Elasticsearch and Lucene with IntelliJ IDEA

Now posted on the Elastic blog

January 14, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/how-to-debug-elasticsearch-source-code-in-intellij-idea.

Introduction

IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software. In this blog post, I discuss how to set up an IntelliJ IDEA project that will allow interactive debugging of Elasticsearch and Lucene source code.

The instructions presented in this blog post have been tested on Mac OS X with IntelliJ IDEA 2018.3 (Community Edition) and OpenJDK 11.

Download Elasticsearch

Get a copy of the Elasticsearch source code from github as follows:

git clone https://github.com/elastic/elasticsearch.git

Check out the branch for the Elasticsearch release that you want to debug.

cd elasticsearch
git checkout --track origin/6.6

Review text files included with the distribution

Within the “elasticsearch” directory, there are several text files that should be reviewed. In particular, “CONTRIBUTING.md” includes a description of the process for importing Elasticsearch code into an IntelliJ IDEA project, and “TESTING.asciidoc” describes ways to build and debug the code. The remainder of this blog post is based on the instructions in these files.

Configure the code for use with IntelliJ IDEA

The build system used by Elasticsearch is Gradle, and at least Java 11 is required to build the Elasticsearch Gradle tools. Before executing gradlew, ensure that your JAVA_HOME environment variable is set correctly. For example, my JAVA_HOME (on OS X) is set as follows:

JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-11.0.2.jdk/Contents/Home
export JAVA_HOME 

Finally, execute the following command to configure Elasticsearch for use in an IntelliJ IDEA project.

./gradlew idea

The above command may take a few minutes to execute, and once it is complete, your project is ready to be imported into IntelliJ IDEA.

Import Elasticsearch into an IntelliJ IDEA project

1. Open IntelliJ IDEA. If you don’t have any other projects open, you will see a screen that looks like the image below. Click on “Import project”.

 

[Screenshot: IntelliJ IDEA welcome screen with the “Import project” option]

2. Open the “elasticsearch” directory that was created by the previously executed “git clone” command.

[Screenshot: selecting the “elasticsearch” directory to import]

3. Select “Import project from external model” -> “Gradle”, and then click on “Next”.

[Screenshot: “Import project from external model” with “Gradle” selected]

4. Select “Use default gradle wrapper (recommended)” and set “Gradle JVM” to version 11, as shown below. Then click on “Finish”.

[Screenshot: Gradle import settings with “Use default gradle wrapper (recommended)” selected and “Gradle JVM” set to version 11]

5. After completing the above steps, IntelliJ IDEA will start building the source code. The IntelliJ IDEA window should look similar to the image below once the build has completed.

[Screenshot: IntelliJ IDEA window after the build has completed]

Start Elasticsearch for debugging

One way to debug Elasticsearch is to start the project in debug mode from the command line with the following command:

./gradlew run --debug-jvm

It may take a few minutes for the above process to fully start, at which point you can connect to the process from IntelliJ IDEA by clicking on “Run” -> “Attach to Process”, as shown below:

[Screenshot: the “Run” -> “Attach to Process” menu]

This will allow you to select the process to attach to, which should look similar to the following:

[Screenshot: the list of processes available to attach to]

You should now be able to set breakpoints and step through both Elasticsearch and Lucene code.

Conclusion

In this blog post, I have demonstrated how to set up a project in IntelliJ IDEA that will allow interactive debugging of Elasticsearch and Lucene source code. You are now ready to dig deep into the internal workings of Elasticsearch!