Counting unique beats agents sending data into Elasticsearch

Introduction

When using Beats with Elasticsearch, it may be useful to keep track of how many unique agents are sending data into an Elasticsearch cluster, and how many documents each agent is submitting. Such information could be useful, for example, for detecting whether beats agents are behaving as expected.

In this blog post, I first discuss how to efficiently specify a filter for documents corresponding to a particular time range, followed by several methods for detecting how many beats agents are sending documents to Elasticsearch within the specified time range.

How to filter for documents in a specific time range

This section describes how to efficiently filter for documents from a particular time range. In the following example, we filter for documents that were received yesterday:

GET filebeat-*/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt" : "now/d"
          }
        }
      }
    }
  },
  "sort": [{"@timestamp": "desc"}]
}

Notice that we use the filter context for the range query. Using the filter context is efficient for two reasons:

  1. Operations inside a filter context must answer a yes/no question – either documents fall into the time range or they do not. Because this is a yes/no question, a _score is not computed when filtering documents like this.
  2. The data inside a filter can be efficiently cached by the Node Query Cache, which “caches queries which are being used in a filter context”.

It is worth highlighting that if the parameters inside the filter are different on each query, then the results of the filter cannot be efficiently cached. This would be the case if the range that is being queried is continually changing. This may unintentionally occur if “now” is used inside a range query without any rounding.

In the above example, we ensure that the filter can be cached by using date math to round the range that we are searching to the nearest day (as indicated by the “/d”). Compare this to the following query, which would give us all documents in the 24 hours prior to the current moment.

GET filebeat-*/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d",
            "lt" : "now"
          }
        }
      }
    }
  },
  "sort": [{"@timestamp": "desc"}]
}

Note that the above filter cannot be cached, because “now” changes every millisecond.

A middle ground may be to round to the nearest hour, which allows the filter to be cached most of the time, except once per hour when the range changes. Rounding to the nearest hour could be done as follows:

GET filebeat-*/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/h",
            "lt" : "now/h"
          }
        }
      }
    }
  },
  "sort": [{"@timestamp": "desc"}]
}

Now that we have covered how to efficiently query for documents in a particular time range, we are ready to demonstrate how to count the number of unique beats agents that are submitting documents to Elasticsearch.

A basic query to get a count of unique agents

To get a count of unique beats agents we can use a cardinality aggregation as shown below.

POST filebeat-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt" : "now/d"          
          }
        }
      }
    }
  },
  "aggs" : {
      "unique_agent_id_count" : {
          "cardinality" : {
              "field" : "agent.id.keyword",
              "precision_threshold": 500 
          }
      }
  }
}

Note that we first filter documents by time (in this case, documents from yesterday), and then execute the cardinality aggregation on the filtered set of documents. Also notice that the size is set to 0 – this tells Elasticsearch that we are not interested in seeing the actual documents that match the range query; we just want to see the results of the cardinality aggregation computed across those documents.
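If we want to retrieve this count programmatically rather than through Kibana’s Dev Tools, something along the following lines could be used. This is a minimal sketch that assumes the elasticsearch Python client and a cluster reachable at localhost:9200.

from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

# The same filtered cardinality aggregation as shown above
body = {
    "size": 0,
    "query": {
        "bool": {
            "filter": {
                "range": {"@timestamp": {"gte": "now-1d/d", "lt": "now/d"}}
            }
        }
    },
    "aggs": {
        "unique_agent_id_count": {
            "cardinality": {
                "field": "agent.id.keyword",
                "precision_threshold": 500
            }
        }
    }
}

response = es.search(index="filebeat-*", body=body)
print("Approximate number of unique agents:",
      response["aggregations"]["unique_agent_id_count"]["value"])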

Get an example document from each agent using field collapsing

The example below demonstrates how to use field collapsing to return the _source of a single document corresponding to each beats agent that submitted a document yesterday. Be aware that by default a search will only return 10 hits. In order to see all documents that match a given query, the size should be increased, or, if a large number of results are expected, pagination techniques should be used. In the example below we have set the size to 100, which will return up to 100 unique agents.

GET filebeat-*/_search
{
  "size" : 100,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt": "now/d"
          }
        }
      }
    }
  },
  "collapse": {
    "field": "agent.id.keyword"
  },
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}

Get an example document from each agent using a terms aggregation and top hits

We can use a terms aggregation and a top hits aggregation to get each unique agent as well as a count of the number of documents submitted by each unique agent; a short sketch showing how to read these per-agent counts programmatically is given at the end of this section. Be aware that this approach is likely less efficient than the above and may not be practical if a very large number of agents are reporting into Elasticsearch.

GET filebeat-*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt" : "now/d"
          }
        }
      }
    }
  },
  "aggs": {
    "unique_agents": {
      "terms": {
        "field": "agent.id.keyword",
        "size": 500,
      },
      "aggs": {
        "get_a_single_doc_for_each_unique_id": {
          "top_hits": {
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ],
            "size": 1
          }
        }
      }
    }
  }
}

There are three “size” settings in the code above:

  1. We have set the size to 0 for the query results – this just means that we don’t return the documents that match the range query, as we are only interested in the results of the aggregation.
  2. A terms aggregation by default will only return the top 10 terms. In the example above we have increased the size of the terms aggregation to 500. Be careful, as setting this to a very large value in order to handle a very large number of agents may be slow, and at some point a terms aggregation may become infeasible.
  3. Inside the top hits aggregations, we have specified a size of 1, meaning that a single document will be returned for each term.
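Each bucket returned by the terms aggregation contains the agent id as its key and a doc_count field, so the per-agent counts can also be read programmatically. Below is a minimal sketch that assumes the elasticsearch Python client and a cluster at localhost:9200; the top hits sub-aggregation is omitted because only the counts are needed here.

from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

body = {
    "size": 0,
    "query": {
        "bool": {
            "filter": {
                "range": {"@timestamp": {"gte": "now-1d/d", "lt": "now/d"}}
            }
        }
    },
    "aggs": {
        "unique_agents": {
            "terms": {"field": "agent.id.keyword", "size": 500}
        }
    }
}

response = es.search(index="filebeat-*", body=body)

# Each bucket key is an agent id, and doc_count is the number of documents
# that agent submitted within the time range.
for bucket in response["aggregations"]["unique_agents"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])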

Conclusion

In this blog post, we first demonstrated how to efficiently filter for documents in a particular time range, and then presented several methods for counting how many unique beats agents are submitting documents into an Elasticsearch cluster.

Debugging Elasticsearch and Lucene with IntelliJ IDEA

Now posted on the Elastic blog

January 14, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/how-to-debug-elasticsearch-source-code-in-intellij-idea.

Introduction

IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software. In this blog post, I discuss how to set up an IntelliJ IDEA project that will allow interactive debugging of Elasticsearch and Lucene source code.

The instructions presented in this blog have been tested on Mac OSX with IntelliJ IDEA 2018.3 (Community Edition), and OpenJDK 11.

Download Elasticsearch

Get a copy of the Elasticsearch source code from GitHub as follows:

git clone https://github.com/elastic/elasticsearch.git

Check out the branch for the Elasticsearch release that you want to debug.

cd elasticsearch
git checkout --track origin/6.6

Review text files included with the distribution

Within the “elasticsearch” directory, there are several text files that should be reviewed. In particular, “CONTRIBUTING.md” includes a description of the process for importing Elasticsearch code into an IntelliJ IDEA project, and “TESTING.asciidoc” describes ways to build and debug the code. The remainder of this blog post is based on the instructions in these files.

Configure the code for use with IntelliJ IDEA

The build system used by Elasticsearch is Gradle, and at least Java 11 is required to build the Elasticsearch Gradle tools. Before executing gradlew, ensure that your JAVA_HOME environment variable is set correctly. For example, my JAVA_HOME (on OSX) is set as follows:

JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-11.0.2.jdk/Contents/Home
export JAVA_HOME 

Finally, execute the following command to configure Elasticsearch for use in an IntelliJ IDEA project.

./gradlew idea

The above command may take a few minutes to execute, and once it is complete, your project is ready to be imported into IntelliJ IDEA.

Import Elasticsearch into an IntelliJ IDEA project

1. Open IntelliJ IDEA, and if you don’t have any other projects open, you will see a screen that looks like the image below. Click on “Import project”.

 

[Screenshot: Screen Shot 2019-02-02 at 8.56.52 PM]

2. Open the “elasticsearch” directory that was created by the previously executed “git clone” command.

[Screenshot: Screen Shot 2019-02-02 at 9.22.19 PM]

3. Select “Import project from external model” -> “Gradle”, and then click on “Next”.

[Screenshot: Screen Shot 2019-02-02 at 9.28.06 PM]

4. Select “Use default gradle wrapper (recommended)” and set “Gradle JVM” to version 11, as shown below. Then click on “Finish”.

[Screenshot: Screen Shot 2019-02-02 at 9.28.36 PM]

5. After completing the above steps, IntelliJ IDEA will start building the source code. The IntelliJ IDEA window should look similar to the image below once the build has completed.

[Screenshot: Screen Shot 2019-02-02 at 9.36.27 PM]

Start Elasticsearch for debugging

One way to debug Elasticsearch is to start the project in debug mode from the command line with the following command:

./gradlew run --debug-jvm

It may take a few minutes for the above process to fully start, at which point you can connect to the process from IntelliJ IDEA by clicking on “Run” -> “Attach to Process” as shown below:

[Screenshot: Screen Shot 2019-02-02 at 9.49.47 PM]

This will allow you to select the process to attach to, which should look similar to the following:

[Screenshot: Screen Shot 2019-02-02 at 9.53.27 PM]

You should now be able to set breakpoints and step through both Elasticsearch and Lucene code.

Conclusion

In this blog post, I have demonstrated how to set up a project in IntelliJ IDEA that will allow interactive debugging of Elasticsearch and Lucene source code. You are now ready to dig deep into the internal workings of Elasticsearch!

 

A step-by-step guide to enabling security, TLS/SSL, and PKI authentication in Elasticsearch

Now posted on the Elastic blog

December 12, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/elasticsearch-security-configure-tls-ssl-pki-authentication

Introduction

When Elasticsearch security is enabled for a cluster that is running with a production license, the use of TLS/SSL for transport communications is obligatory and must be correctly set up. Additionally, once security has been enabled, all communications to an Elasticsearch cluster must be authenticated, including communications from Kibana and/or application servers.

The simplest way that Kibana and/or application servers can authenticate to an Elasticsearch cluster is by embedding a username and password in their configuration files or source code. However, in many organizations, it is forbidden to store usernames and passwords in such locations. In this case, one alternative is to use Public Key Infrastructure (PKI) (client certificates) for authenticating to an Elasticsearch cluster.

Configuring security along with TLS/SSL and PKI can seem daunting at first, and so this blog gives step-by-step instructions on how to: enable security; configure TLS/SSL; set passwords for built-in users; use PKI for authentication; and finally, how to authenticate Kibana to an Elasticsearch cluster using PKI.

Enabling security

In order to enable security it is necessary to have either a Gold or Platinum subscription, or a trial license enabled via Kibana or API. For example, the following command would enable a trial license via the API:

curl -X POST "localhost:9200/_xpack/license/start_trial?acknowledge=true"

In the above command, localhost must be replaced with the name of a node in our Elasticsearch cluster.

After enabling a license, security can be enabled. We must modify the elasticsearch.yml file on each node in the cluster with the following line:

xpack.security.enabled: true

For a cluster that is running in production mode with a production license, once security is enabled, transport TLS/SSL must also be enabled. However, if we are running with a trial license, then transport TLS/SSL is not obligatory.

If we are running with a production license and we attempt to start the cluster with security enabled before we have enabled transport TLS/SSL, we will see the following error message:

Transport SSL must be enabled for setups with production licenses. Please set [xpack.security.transport.ssl.enabled] to [true] or disable security by setting [xpack.security.enabled] to [false]

Configuration of TLS/SSL is covered in the following sections.

TLS/SSL encryption

Elasticsearch has two levels of communications, transport communications and http communications. The transport protocol is used for internal communications between Elasticsearch nodes, and the http protocol is used for communications from clients to the Elasticsearch cluster. Securing these communications will be discussed in the following paragraphs.

Transport TLS/SSL encryption

The transport protocol is used for communication between nodes within an Elasticsearch cluster. Because each node in an Elasticsearch cluster is both a client and a server to other nodes in the cluster, all transport certificates must be both client and server certificates. If TLS/SSL certificates do not have Extended Key Usage defined, then they are already de facto client and server certificates. If transport certificates do have an Extended Key Usage section, which is usually the case for CA-signed certificates used in corporate environments, then they must explicitly enable both clientAuth and serverAuth.
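If we are unsure whether a given certificate enables both usages, its Extended Key Usage section can be inspected. Below is a minimal sketch that uses the third-party Python cryptography package; the file name is hypothetical and the certificate is assumed to be PEM-encoded.

from cryptography import x509
from cryptography.x509.oid import ExtendedKeyUsageOID

# Hypothetical path to a PEM-encoded transport certificate
with open("transport-cert.pem", "rb") as f:
    cert = x509.load_pem_x509_certificate(f.read())

try:
    eku = cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value
    print("clientAuth:", ExtendedKeyUsageOID.CLIENT_AUTH in eku)
    print("serverAuth:", ExtendedKeyUsageOID.SERVER_AUTH in eku)
except x509.ExtensionNotFound:
    # No Extended Key Usage section, so the certificate can act as both
    # a client and a server certificate.
    print("No Extended Key Usage defined")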

Note that Elasticsearch comes with a utility called elasticsearch-certutil that can be used for generating self-signed certificates that can be used for encrypting internal communications within an Elasticsearch cluster.

The following commands can be used for generating certificates that can be used for transport communications, as described in this page on Encrypting Communications in Elasticsearch:

bin/elasticsearch-certutil ca
(press ENTER twice to accept the default output file and an empty password)
bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12
(press ENTER three times to accept the empty CA password, the default output file, and an empty certificate password)

Once the above commands have been executed, we will have TLS/SSL certificates that can be used for encrypting communications.

The newly created certificates should be copied into a sub-directory called certs located within the config directory. The certificates will then be specified in the elasticsearch.yml file as follows:

xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

Now restart all of the nodes in our Elasticsearch cluster for the above changes to take effect.

Define built-in users’ passwords

We must now define passwords for the built-in users as described in Setting built-in user passwords. Note that if we are running with a Gold or Platinum license, the previous steps to enable TLS/SSL for the transport communications must be executed before the cluster will start. Additionally, note that setting the built-in users’ passwords should be completed before we enable TLS/SSL for http communications, as the command for setting passwords will communicate with the cluster via unsecured http.

Built-in user passwords can be set up with the following command:

bin/elasticsearch-setup-passwords interactive

Be sure to remember the passwords that we have assigned for each of the built-in users. We will make use of the elastic superuser to help configure PKI authentication later in this blog.

Http TLS/SSL encryption

For http communications, the Elasticsearch nodes will only act as servers and can therefore use server certificates — i.e. http TLS/SSL certificates do not need to enable client authentication.

In many cases, certificates for http communications would be signed by a corporate CA. It is worth noting that the certificates used for encrypting http communications can be totally independent from the certificates that are used for transport communications.

To reduce the number of steps in this blog, we’ll use the same certificates for http communications as we have already used for the transport communications. These are specified in the elasticsearch.yml file as follows:

xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.truststore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.client_authentication: optional

Enabling PKI authentication

As discussed in Configuring a PKI Realm, the following must be added to the elasticsearch.yml file to allow PKI authentication.

xpack.security.authc.realms.pki1.type: pki

Combined changes to elasticsearch.yml

Once the above steps have been followed, we should have the following defined in our elasticsearch.yml configuration:

xpack.security.enabled: true

xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.truststore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.client_authentication: optional

xpack.security.authc.realms.pki1.type: pki

Note that once the above changes have been made to our elasticsearch.yml file, we will have to restart all of the Elasticsearch nodes in our cluster in order for the changes to take effect.

Creating a client certificate

Certificates that will be used for PKI authentication must be signed by the same CA as the certificates that are used for encrypting http communications. Normally, these would be signed by an official CA within an organization. However, because we have already used a self signed CA, we also sign our http client certificates with that same self-signed CA which we previously saved as elastic-stack-ca.p12. We can create a certificate for client authentication as follows:

bin/elasticsearch-certutil cert --ca \
config/certs/elastic-stack-ca.p12 \
-name "CN=something,OU=Consulting Team,DC=mydomain,DC=com"
(press ENTER for the empty CA password, type client.p12 as the desired output file and press ENTER, then press ENTER again to leave the certificate password empty)

The above will create a file called client.p12, which contains all of the information required for PKI authentication to our Elasticsearch cluster. However, in order to use this certificate it is helpful to break it into its private key, public certificate, and CA certificate. This can be done with the following commands:

Private Key

openssl pkcs12 -in client.p12 -nocerts -nodes  > client.key

Public Certificate

openssl pkcs12 -in client.p12 -clcerts -nokeys  > client.cer

CA Certificate

openssl pkcs12 -in client.p12 -cacerts -nokeys -chain > client-ca.cer

This should produce three files:

  1. client.key —  The private key
  2. client.cer —  The public certificate
  3. client-ca.cer — The CA that signed the public certificate

Create a directory called certs in Kibana’s config directory, and move all of the client certificates there.

Configure Kibana to authenticate to Elasticsearch

Now that we have enabled security on the Elasticsearch cluster, communications to the cluster must be authenticated. Therefore, if we plan on using Kibana to interact with the cluster, then we must enable security and configure Kibana to authenticate to the cluster as the kibana user over https. As we have not yet fully set up PKI authentication from Kibana to the Elasticsearch cluster, authentication can initially be done with the following lines in the kibana.yml file:

elasticsearch.url: "https://localhost:9200" #ensure https not http
xpack.security.enabled: true
elasticsearch.username: "kibana"
elasticsearch.password: "our new kibana password here"
elasticsearch.ssl.certificateAuthorities: config/certs/client-ca.cer
elasticsearch.ssl.verificationMode: certificate

Ensure that we change localhost to the name of one of our Elasticsearch nodes, and that the certificates are available in the config/certs directory within the Kibana folder.

Note that the kibana user is like a service account that works behind the scenes to authenticate the Kibana application to the Elasticsearch cluster. We will generally never directly login to the Elasticsearch cluster or into the Kibana UI as the kibana user.

Restart Kibana in order for it to authenticate to the Elasticsearch cluster as the kibana user. We should now be able to log in through the Kibana UI as the elastic built-in superuser.

PKI Authentication

We can use the three new client certificate files to test PKI authentication to the cluster with curl. Open a new terminal and cd to Kibana’s config/certs directory, and use curl to call the authenticate API as shown below.

curl https://localhost:9200/_xpack/security/_authenticate?pretty \
--key client.key --cert client.cer --cacert client-ca.cer -k -v

Be sure to replace localhost with the name of a node in our Elasticsearch cluster and be sure to use https (not http). Also note that the -k option is required as we did not create certificates with the hostnames specified, and therefore hostname verification must be turned off.

The above command should respond with something similar to the following:

{
 "username" : "something",
 "roles" : [ ],
 "full_name" : null,
 "email" : null,
 "metadata" : {
   "pki_dn" : "CN=something, OU=Consulting Team, DC=mydomain, DC=com"
 },
 "enabled" : true
}

Notice that the roles array is currently empty, which means that although we have authenticated to Elasticsearch, we are not authorized to perform any actions. Authentication is allowed because the client certificate that we sent to the cluster was signed by the same CA as the http TLS/SSL certificates used by the Elasticsearch nodes. Now that we are authenticated, we need to authorize this user to be able to do something.

The pki_dn value returned from the authenticate API will be used to configure the roles that will be assigned to this certificate.

Open the Kibana UI and if we have not already done so, login as the elastic user. As the elastic user has superuser privileges, this user can assign roles to the certificate. Execute the following command from Dev Tools in Kibana, ensuring that the previously returned pki_dn value is copied into the dn field as follows:

PUT _xpack/security/role_mapping/kibana_certificate_authorization
{
 "roles" : [ "kibana_system" ],
 "rules" : { "field" : { "dn" : "CN=something, OU=Consulting Team, DC=mydomain, DC=com" } },
 "enabled": true
}

Now that we have assigned the kibana_system role to this certificate, verify that it is set correctly with another call to the authenticate API:

curl https://localhost:9200/_xpack/security/_authenticate?pretty \
--key client.key --cert client.cer --cacert client-ca.cer -k -v

We should see the following response, which indicates that we now have the “kibana_system” role assigned to this certificate.

{
 "username" : "something",
 "roles" : [
   "kibana_system"

 ],
 "full_name" : null,
 "email" : null,
 "metadata" : {
   "pki_dn" : "CN=something, OU=Consulting Team, DC=mydomain, DC=com"
 },
 "enabled" : true
}

Using PKI to authenticate Kibana to the Elasticsearch cluster

Now that we have tested our client-side certificate and assigned the kibana_system role to the certificate, we can use this certificate instead of a username and password to authenticate Kibana to Elasticsearch.

Remove the following lines from our kibana.yml file:

elasticsearch.username: "kibana"
elasticsearch.password: "XXXXXX"

Ensure that all relevant certificates are copied to Kibana’s config/certs directory, and add the following lines to our kibana.yml file:

elasticsearch.url: "https://localhost:9200" #ensure https
xpack.security.enabled: true
elasticsearch.ssl.certificate: config/certs/client.cer
elasticsearch.ssl.key: config/certs/client.key
elasticsearch.ssl.certificateAuthorities: config/certs/client-ca.cer
elasticsearch.ssl.verificationMode: certificate

We can now restart Kibana, and it should authenticate to our Elasticsearch cluster, without any need for an embedded username and password!

Conclusion

In this blog post, I have demonstrated how to: enable security; configure TLS/SSL; set passwords for built-in users; use PKI for authentication; and finally, how to authenticate Kibana to an Elasticsearch cluster using PKI.

If you have any questions about PKI authentication with Elasticsearch, or any other Elasticsearch-related topics, have a look at our Discuss forums for valuable discussion, insights, and information.

Using Logstash to drive filtered data from a single source into multiple output destinations

Now posted on the Elastic blog

Jan 15, 2019 update: A newer version of this article has been published on Elastic’s website as: https://www.elastic.co/blog/using-logstash-to-split-data-and-send-it-to-multiple-outputs

Overview

In this blog post we demonstrate how Logstash can be used to accomplish the following tasks:

  1. Create multiple copies of an input stream.
  2. Filter each unique copy of the input stream to only contain desired fields.
  3. Drive the modified copies of the input stream into different output destinations.

Note that in this blog post, we do not make use of pipeline-to-pipeline communication (beta), which could also likely achieve much of the functionality described here.

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example Logstash pipeline.

Example Logstash pipeline

Below is a Logstash pipeline that should be stored in a file called ‘clones.conf’. This pipeline does the following:

  1. Reads stock market trades as CSV-formatted input from a CSV file. Note that you should modify ‘clones.conf’ to use the correct path to your ‘stocks.csv’ file.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Converts the time field to Unix format.
  4. Uses the clone filter plugin to create two copies of each document. The clone filter will add a new ‘type’ field to each copy that corresponds to the names given in the clones array. (Note that the original version of each document will still exist in addition to the copies, but will not have a ‘type’ field added to it).
  5. For each copy:
    1. Adds metadata to each document corresponding to the ‘type’ that was added by the clone function. This allows us to later remove the ‘type’ field, while retaining the information required for routing different copies to different outputs.
    2. Uses the prune filter plugin to remove all fields except those which are whitelisted for the specific output.
  6. Removes the ‘type’ field that the clone function inserted into the documents. This is not strictly necessary, but eliminates the ‘type’ data and prevents it from being written to Elasticsearch.
  7. Writes the resulting documents to different outputs, depending on the value defined in the metadata field that we added in step 5.
input {
  file {
    path => "${HOME}/stocks.csv"
    start_position => "beginning"

    # The following line will ensure re-reading of input 
    # each time logstash executes.
    sincedb_path => "/dev/null"
  }
}

filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 'DAX' => 'float'
    'SMI' => 'float'
    'CAC' => 'float'
    'FTSE' => 'float'}
  }
  date {
    match => ['time', 'UNIX']
  }

  # The following line will create 2 additional 
  # copies of each document (i.e. including the 
  # original, 3 in total). 
  # Each copy will have a "type" field added 
  # corresponding to the name given in the array.
  clone {
    clones => ['copy_only_SMI', 'copy_only_FTSE']
  }

  if [type] == 'copy_only_SMI' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_SMI" } 
    }
    # Remove everything except "SMI"
    prune {
       whitelist_names => [ "SMI"]
    }
  } 

  else if [type] == 'copy_only_FTSE' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_FTSE" } 
    }
    prune {
       whitelist_names => [ "FTSE"]
    }
  } 

  # Remove 'type' which was added in the clone
  mutate {
    remove_field => ['type']
  }
}

output {
  stdout { codec =>  "rubydebug" }

  if [@metadata][type] == 'copy_only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'copy_only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Testing the logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system. Note that specifying ‘config.reload.automatic’ is optional, but it allows us to automatically reload ‘clones.conf’ without restarting Logstash. Remember that the ‘clones.conf’ used below is the file that contains the pipeline described in the previous section.

./logstash -f ./clones.conf --config.reload.automatic

Once Logstash has read the stocks.csv file, we can check the various outputs that have been written. We have written to three indices called ‘smi_data’, ‘ftse_data’, and ‘stocks_original’.

Check the SMI index

GET /smi_data/_search

This should display documents with the following structure. Notice that only “SMI” data appears in the ‘smi_data’ index.

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5
        }
      }

Check the FTSE index

GET /ftse_data/_search

This should display documents with the following structure. Notice that only the “FTSE” field appears in documents in the ‘ftse_data’ index.

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Check the original documents index

GET /stocks_original/_search

This should display documents with the following structure. Notice that the entire original version of the documents appears in the ‘stocks_original’ index.

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Conclusion

In this blog post, we have demonstrated how to use Logstash to create multiple copies of an input stream, to then modify documents in each stream as required for different outputs, and to then drive the different streams into different outputs.

Using Logstash prune capabilities to whitelist sub-documents

Overview

Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash, and that all other fields are dropped. In this blog post we demonstrate the use of Logstash to whitelist desired fields and desired sub-documents before indexing into Elasticsearch.

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example command line given later in this blog post.

Example Logstash pipeline

Below is a Logstash pipeline that can be stored in a file called ‘stocks.conf’ and that does the following:

  1. Reads stock market trades as CSV-formatted input from stdin.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Converts the time field to Unix format.
  4. Moves DAX and CAC fields into a nested structure called “my_nest”.
  5. Whitelists the “my_nest” field (which contains a sub-document) and the “SMI” field so that all other (non-whitelisted) fields will be removed.
  6. Writes the resulting documents to an Elasticsearch index called “stocks_whitelist_test”.
# For this simple example, pipe in data from stdin. 
input {
    stdin {}
}

filter {
    csv {
        columns => ["time","DAX","SMI","CAC","FTSE"]
        separator => ","
        convert => { 'DAX' => 'float'
        'SMI' => 'float'
        'CAC' => 'float'
        'FTSE' => 'float'}
    }
    date {
        match => ['time', 'UNIX']
    }
    mutate {
        # Move DAX and CAC into a sub-document 
        # called 'my_nest'
        rename => {
            "DAX" => "[my_nest][DAX]"
            "CAC" => "[my_nest][CAC]"
        }
    }
     
    # Remove everything except "SMI" and the 
    # "my_nest" sub-document 
    prune {
         whitelist_names => [ "SMI", "my_nest" ]
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_whitelist_test"
    }
}

Testing the logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system:

cat ./stocks.csv | ./logstash -f ./stocks.conf

You can then check the data that you have stored in Elasticsearch by executing the following command from Kibana’s dev console:

GET /stocks_whitelist_test/_search

This should display documents with the following structure:

      {
        "_index": "stocks_whitelist_test",
        "_type": "doc",
        "_id": "KANygWUBsYalOV9yOKsD",
        "_score": 1,
        "_source": {
          "my_nest": {
            "CAC": 1718,
            "DAX": 1606.51
          },
          "SMI": 1678.6
        }
      }

Notice that only “my_nest” and “SMI” have been indexed as indicated by the contents of the document’s “_source”. Also note that the “FTSE” and “time” fields have been removed as they were not in the prune filter’s whitelist.

Conclusion

In this blog post, we have demonstrated how Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash.

Deduplicating documents in Elasticsearch

Now posted on the Elastic blog

December 11, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/how-to-find-and-remove-duplicate-documents-in-elasticsearch

Overview

In this blog post we cover how to detect and remove duplicate documents from Elasticsearch by using either Logstash or, alternatively, custom code written in Python.

Example document structure

For the purposes of this blog post, we assume that the documents in the Elasticsearch cluster have the following structure. This corresponds to a dataset that contains documents representing stock market trades.

    {
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

Given this example document structure, for the purposes of this blog we arbitrarily assume that if multiple documents have the same values for the [“CAC”, “FTSE”, “SMI”] fields, then they are duplicates of each other.

Using Logstash for deduplicating Elasticsearch documents

Logstash may be used for detecting and removing duplicate documents from an Elasticsearch index. This process is described in this blog post.

In the example below I have written a simple Logstash configuration that reads documents from an index on an Elasticsearch cluster, then uses the fingerprint filter to compute a unique _id value for each document based on a hash of the [“CAC”, “FTSE”, “SMI”] fields, and finally writes each document back to a new index on that same Elasticsearch cluster such that duplicate documents will be written to the same _id and therefore eliminated.

Additionally, with minor modifications, the same Logstash filter could also be applied to future documents written into the newly created index in order to ensure that duplicates are removed in near real-time. This could be accomplished by changing the input section in the example below to accept documents from your real-time input source rather than pulling documents from an existing index.

Be aware that using custom _id values (i.e. an _id that is not generated by Elasticsearch) will have some impact on the write performance of your index operations. 

Also, it is worth noting that depending on the hash algorithm used, this approach may theoretically result in a non-zero number of hash collisions for the _id value, which could result in two non-identical documents being mapped to the same _id. For most practical cases, the probability of a hash collision is likely very low.
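To build some intuition for what the fingerprint filter produces, the sketch below computes a keyed SHA256 hash over the concatenated source fields in Python. This illustrates the idea only; the exact string that the Logstash plugin hashes (and therefore the resulting _id) may differ.

import hashlib
import hmac

def fingerprint(doc, fields, key):
    # Concatenate the values of the selected fields and compute a keyed
    # SHA256 hash, conceptually similar to the Logstash fingerprint filter
    # configuration shown below.
    combined = "".join(str(doc[f]) for f in fields)
    return hmac.new(key.encode(), combined.encode(), hashlib.sha256).hexdigest()

doc = {"CAC": 1854.6, "FTSE": 2827.5, "SMI": 2061.7}
print(fingerprint(doc, ["CAC", "FTSE", "SMI"], "1234ABCD"))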

A simple Logstash configuration to de-duplicate an existing index using the fingerprint filter is given below.

input {
  # Read all documents from Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}

filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

A custom Python script for deduplicating Elasticsearch documents

A memory-efficient approach

If Logstash is not used, then deduplication may be efficiently accomplished with a custom Python script. For this approach, we compute the hash of the [“CAC”, “FTSE”, “SMI”] fields that we have defined to uniquely identify a document. We then use this hash as a key in a Python dictionary, where the associated value of each dictionary entry will be an array of the _ids of the documents that map to the same hash.

If more than one document has the same hash, then the duplicate documents that map to the same hash can be deleted.  Alternatively, if you are concerned about the possibility of hash collisions, then the contents of documents that map to the same hash can be examined to see if the documents are really identical, and if so, then duplicates can be eliminated.

Detection algorithm analysis

For a 50GB index, if we assume that the index contains 0.4 kB documents, then there would be 125 million documents in the index. In this case, the amount of memory required for storing the deduplication data structures in memory when using a 128-bit md5 hash would be on the order of 128 bits x 125 million = 2GB of memory, plus the 160-bit _ids will require another 160 bits x 125 million = 2.5GB of memory. This algorithm will therefore require on the order of 4.5GB of RAM to keep all relevant data structures in memory. This memory footprint can be dramatically reduced if the approach discussed in the following section can be applied.
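As a quick sanity check on those numbers, the arithmetic can be written out as follows. The document count and the hash and _id sizes are the assumptions stated above, and Python container overhead is ignored.

num_docs = 125_000_000        # 50 GB / ~0.4 kB per document
hash_bytes = 128 // 8         # 128-bit md5 digest
id_bytes = 160 // 8           # 160-bit _id values

hash_memory_gb = num_docs * hash_bytes / 1e9   # ~2.0 GB
id_memory_gb = num_docs * id_bytes / 1e9       # ~2.5 GB
print(hash_memory_gb + id_memory_gb)           # ~4.5 GB in total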

Potential enhancement to the algorithm to reduce memory usage, as well as to continuously remove new duplicate documents

If you are storing time-series data, and you know that duplicate documents will only occur within some small amount of time of each other, then you may be able to improve this algorithm’s memory footprint by repeatedly executing the algorithm on a subset of the documents in the index, with each subset corresponding to a different time window. For example, if you have a year’s worth of data, then you could use range queries on your datetime field (inside a filter context for best performance) to step through your data set one week at a time. This would require that the algorithm is executed 52 times (once for each week) — and in this case, this approach would reduce the worst-case memory footprint by a factor of 52.

In the above example, you may be concerned about not detecting duplicate documents that span between weeks. Let’s assume that you know that duplicate documents cannot occur more than 2 hours apart. Then you would need to ensure that each execution of the algorithm includes documents that overlap by 2 hours with the last set of documents analyzed by the previous execution of the algorithm. For the weekly example, you would therefore need to query 170 hours (1 week + 2 hours) worth of time-series documents to ensure that no duplicates are missed.

If you wish to periodically clear out duplicate documents from your indices on an on-going basis, you can execute this algorithm on recently received documents. The same logic applies as above — ensure that recently received documents are included in the analysis along with enough of an overlap with slightly older documents to ensure that duplicates are not inadvertently missed.
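A minimal sketch of this windowed approach is given below. It assumes the elasticsearch Python client (and its scan helper), an @timestamp field, and the same index and fields used in the script in the next section; the window and overlap sizes are illustrative, and no documents are actually deleted.

import hashlib
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch(["localhost:9200"])
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]

WEEK_HOURS = 7 * 24
OVERLAP_HOURS = 2     # duplicates are assumed to occur within 2 hours
WEEKS_OF_DATA = 52    # one year of data, processed one window at a time

for week in range(WEEKS_OF_DATA):
    # Each window covers one week plus a 2-hour overlap with the previous
    # window, so duplicates that straddle a window boundary are still seen
    # together by at least one execution.
    newest = "now/h" if week == 0 else "now-%dh/h" % (week * WEEK_HOURS)
    oldest = "now-%dh/h" % ((week + 1) * WEEK_HOURS + OVERLAP_HOURS)

    window_query = {
        "query": {
            "bool": {
                "filter": {
                    "range": {"@timestamp": {"gte": oldest, "lt": newest}}
                }
            }
        }
    }

    docs_by_hash = {}
    for hit in scan(es, index="stocks", query=window_query):
        combined_key = "".join(str(hit["_source"][k]) for k in keys_to_include_in_hash)
        hashval = hashlib.md5(combined_key.encode("utf-8")).digest()
        docs_by_hash.setdefault(hashval, []).append(hit["_id"])

    duplicate_groups = {h: ids for h, ids in docs_by_hash.items() if len(ids) > 1}
    print("window %d: %d groups of potential duplicates" % (week, len(duplicate_groups)))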

Python code to detect duplicate documents

The following code demonstrates how documents can be efficiently evaluated to see if they are identical, and then eliminated if desired. However, in order to prevent accidental deletion of documents, in this example we do not actually execute a delete operation. Such functionality would be straightforward to include.

The code to deduplicate documents from Elasticsearch can also be found on GitHub.

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}

# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]


# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])

        _id = item["_id"]

        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()

        # If the hashval is new, then we will create a new key
        # in the dict_of_duplicate_docs, which will be
        # assigned a value of an empty array.
        # We then immediately push the _id onto the array.
        # If hashval already exists, then
        # we will just push the new _id onto the existing array
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)


# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})

    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])

    # Before scroll, process current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])

    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')

        # Process current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])

        # Update the scroll ID
        sid = data['_scroll_id']

        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])


def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Get the documents that have mapped to the current hashval
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # In order to remove the possibility of hash collisions,
            # write code here to check all fields in the docs to
            # see if they are truly identical - if so, then execute a
            # DELETE operation on all except one.
            # In this example, we just print the docs.
            print("doc=%s\n" % doc)



def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()


main()