Debugging Elasticsearch and Lucene with IntelliJ IDEA

Now posted on the Elastic blog

January 14, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/how-to-debug-elasticsearch-source-code-in-intellij-idea.

Introduction

IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software. In this blog post, I discuss how to set up an IntelliJ IDEA project that will allow interactive debugging of Elasticsearch and Lucene source code.

The instructions presented in this blog have been tested on macOS with IntelliJ IDEA 2018.3 (Community Edition) and OpenJDK 11.

Download Elasticsearch

Get a copy of the Elasticsearch source code from GitHub as follows:

git clone https://github.com/elastic/elasticsearch.git

Check out the branch for the Elasticsearch release that you want to debug.

cd elasticsearch
git checkout --track origin/6.6

Review text files included with the distribution

Within the “elasticsearch” directory, there are several text files that should be reviewed. In particular, “CONTRIBUTING.md” includes a description of the process for importing Elasticsearch code into an IntelliJ IDEA project, and “TESTING.asciidoc” describes ways to build and debug the code. The remainder of this blog post is based on the instructions in these files.

Configure the code for use with IntelliJ IDEA

The build system used by Elasticsearch is Gradle, and at least Java 11 is required to build the Elasticsearch Gradle tools. Before executing gradlew, ensure that your JAVA_HOME environment variable is set correctly. For example, my JAVA_HOME (on macOS) is set as follows:

JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-11.0.2.jdk/Contents/Home
export JAVA_HOME 

Finally, execute the following command to configure Elasticsearch for use in an IntelliJ IDEA project.

./gradlew idea

The above command may take a few minutes to execute, and once it is complete, your project is ready to be imported into IntelliJ IDEA.

Import Elasticsearch into an IntelliJ IDEA project

1. Open IntelliJ IDEA, and if you don’t have any other projects open, you will see a screen that looks like the image below. Click on “Import project”.

 

[Screenshot]

2. Open the “elasticsearch” directory that was created by the previously executed “git clone” command.

[Screenshot]

3. Select “Import project from external model” -> “Gradle”, and then click on “Next”.

[Screenshot]

4. Select “Use default gradle wrapper (recommended)” and set “Gradle JVM” to version 11, as shown below. Then click on “Finish”.

[Screenshot]

5. After completing the above steps, IntelliJ IDEA will start building the source code. The IntelliJ IDEA window should look similar to the image below once the build has completed.

[Screenshot]

Start Elasticsearch for debugging

One way to debug Elasticsearch is to start the project in debug mode from the command line with the following command:

./gradlew run --debug-jvm

It may take a few minutes for the above process to fully start, at which point you can connect to the process from IntelliJ IDEA by clicking on “Run” -> “Attach to Process”, as shown below:

[Screenshot]

This will allow you to select the process to attach to, which should look similar to the following:

[Screenshot]

You should now be able to set breakpoints and step through both Elasticsearch and Lucene code.
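
For example, to confirm that the debugger is attached, set a breakpoint in a REST handler class and send any request to the development node. Below is a minimal sketch using the Python requests library; it assumes that the node started by “./gradlew run” is listening on the default port 9200.

import requests

# Send a simple request to the locally running development node.
# If a breakpoint is set on the corresponding request-handling code,
# execution will pause in IntelliJ IDEA and this call will block
# until the breakpoint is released.
response = requests.get("http://localhost:9200/", timeout=120)
print(response.json())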

Conclusion

In this blog post, I have demonstrated how to set up a project in IntelliJ IDEA that allows interactive debugging of Elasticsearch and Lucene source code. You are now ready to dig deep into the internal workings of Elasticsearch!

 

How to clean up HTML exported from Google Docs for WordPress

Introduction

When writing a blog post it is sometimes convenient to do the first draft in Google Docs, and then export the contents of the post to be used in WordPress. However, Google Docs adds a lot of formatting to the documents, which may override the default look and feel of your WordPress page.

In this blog post we show how to remove this extra formatting from the HTML created by Google Docs, by using a text editor that supports regex find-and-replace functionality.

We have tested the following instructions with Sublime.

Process

In the Google Doc, select the contents that you wish to paste into your WordPress document, paste it into the “Visual” editor tab in WordPress, then click on the HTML tab to see the HTML. Now select the HTML text and copy it into Sublime or another editor that supports regular expression find-and-replace functionality.

In Sublime, click on Find->Replace. This will bring up a box at the bottom of the screen. In this box, ensure that the regular expression and wrap buttons are selected. The following will remove all of the <span> tags that have been added by Google Docs, and will leave just the contents that are wrapped inside the <span> tags.

Find: (?s)<span.*?>(.*?)</span>
Replace: $1

After executing the above on the entire document with “Replace All”, select the contents of the Sublime document, copy it into the WordPress HTML tab, and then switch back to the Visual editor view. Your document should now look like the rest of your WordPress documents.
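
If you prefer to script this clean-up rather than use an editor, the same substitution can be applied programmatically. Below is a minimal Python sketch of this approach; the input and output file names are hypothetical.

import re

# Read the HTML that was copied out of the WordPress HTML tab
with open("exported.html", "r", encoding="utf-8") as f:
    html = f.read()

# Remove the <span ...> wrappers but keep their inner contents,
# mirroring the editor find-and-replace shown above
cleaned = re.sub(r"(?s)<span.*?>(.*?)</span>", r"\1", html)

with open("cleaned.html", "w", encoding="utf-8") as f:
    f.write(cleaned)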

Conclusion

Many text editors support built-in regular expression find-and-replace functionality. We have demonstrated how this functionality can be used to help clean up HTML created by Google Docs.

A step-by-step guide to enabling security, TLS/SSL, and PKI authentication in Elasticsearch

Now posted on the Elastic blog

December 12, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/elasticsearch-security-configure-tls-ssl-pki-authentication

Introduction

When Elasticsearch security is enabled for a cluster that is running with a production license, the use of TLS/SSL for transport communications is obligatory and must be correctly set up. Additionally, once security has been enabled, all communications to an Elasticsearch cluster must be authenticated, including communications from Kibana and/or application servers.

The simplest way that Kibana and/or application servers can authenticate to an Elasticsearch cluster is by embedding a username and password in their configuration files or source code. However, in many organizations, it is forbidden to store usernames and passwords in such locations. In this case, one alternative is to use Public Key Infrastructure (PKI) (client certificates) for authenticating to an Elasticsearch cluster.

Configuring security along with TLS/SSL and PKI can seem daunting at first, and so this blog gives step-by-step instructions on how to: enable security; configure TLS/SSL; set passwords for built-in users; use PKI for authentication; and finally, how to authenticate Kibana to an Elasticsearch cluster using PKI.

Enabling security

In order to enable security it is necessary to have either a Gold or Platinum subscription, or a trial license enabled via Kibana or API. For example, the following command would enable a trial license via the API:

curl -X POST "localhost:9200/_xpack/license/start_trial?acknowledge=true"

Here, localhost must be replaced with the name of a node in our Elasticsearch cluster.

After enabling a license, security can be enabled. We must modify the elasticsearch.yml file on each node in the cluster with the following line:

xpack.security.enabled: true

For a cluster that is running in production mode with a production license, once security is enabled, transport TLS/SSL must also be enabled. However, if we are running with a trial license, then transport TLS/SSL is not obligatory.

If we are running with a production license and we attempt to start the cluster with security enabled before we have enabled transport TLS/SSL, we will see the following error message:

Transport SSL must be enabled for setups with production licenses. Please set [xpack.security.transport.ssl.enabled] to [true] or disable security by setting [xpack.security.enabled] to [false]

Configuration of TLS/SSL is covered in the following sections.

TLS/SSL encryption

Elasticsearch has two levels of communications: transport communications and http communications. The transport protocol is used for internal communications between Elasticsearch nodes, and the http protocol is used for communications from clients to the Elasticsearch cluster. Securing these communications is discussed in the following sections.

Transport TLS/SSL encryption

The transport protocol is used for communication between nodes within an Elasticsearch cluster. Because each node in an Elasticsearch cluster is both a client and a server to other nodes in the cluster, all transport certificates must be both client and server certificates. If TLS/SSL certificates do not have Extended Key Usage defined, then they are already de facto client and server certificates. If transport certificates do have an Extended Key Usage section, which is usually the case for CA-signed certificates used in corporate environments, then they must explicitly enable both clientAuth and serverAuth.
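
If you want to check whether an existing PEM certificate declares both of these usages, the Extended Key Usage extension can be inspected programmatically. The following is a minimal sketch using the Python cryptography package; the certificate file name is hypothetical.

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import ExtendedKeyUsageOID

# Load a PEM-encoded certificate and inspect its Extended Key Usage section
with open("transport-cert.pem", "rb") as f:
    cert = x509.load_pem_x509_certificate(f.read(), default_backend())

try:
    eku = cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value
    print("clientAuth:", ExtendedKeyUsageOID.CLIENT_AUTH in eku)
    print("serverAuth:", ExtendedKeyUsageOID.SERVER_AUTH in eku)
except x509.ExtensionNotFound:
    # No Extended Key Usage section means the certificate is already
    # a de facto client and server certificate
    print("No Extended Key Usage extension found")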

Note that Elasticsearch comes with a utility called elasticsearch-certutil that can be used for generating self-signed certificates that can be used for encrypting internal communications within an Elasticsearch cluster.

The following commands can be used for generating certificates that can be used for transport communications, as described in this page on Encrypting Communications in Elasticsearch:

bin/elasticsearch-certutil ca
ENTER ENTER
bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12
ENTER ENTER ENTER

Once the above commands have been executed, we will have TLS/SSL certificates that can be used for encrypting communications.

The newly created certificates should be copied into a sub-directory called certs located within the config directory. The certificates will then be specified in the elasticsearch.yml file as follows:

xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

Now restart all of the nodes in our Elasticsearch cluster for the above changes to take effect.

Define built-in users’ passwords

We must now define passwords for the built-in users, as described in Setting built-in user passwords. Note that if we are running with a Gold or Platinum license, the previous steps to enable TLS/SSL for the transport communications must be executed before the cluster will start. Additionally, note that setting the built-in users’ passwords should be completed before we enable TLS/SSL for http communications, as the command for setting passwords will communicate with the cluster via unsecured http.

Built-in user passwords can be set up with the following command:

bin/elasticsearch-setup-passwords interactive

Be sure to remember the passwords that we have assigned for each of the built-in users. We will make use of the elastic superuser to help configure PKI authentication later in this blog.

Http TLS/SSL encryption

For http communications, the Elasticsearch nodes will only act as servers and can therefore use server certificates; in other words, http TLS/SSL certificates do not need to enable client authentication.

In many cases, certificates for http communications would be signed by a corporate CA. It is worth noting that the certificates used for encrypting http communications can be totally independent from the certificates that are used for transport communications.

To reduce the number of steps in this blog, we’ll use the same certificates for http communications as we have already used for the transport communications. These are specified in the elasticsearch.yml file as follows:

xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.truststore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.client_authentication: optional

Enabling PKI authentication

As discussed in Configuring a PKI Realm, the following must be added to the elasticsearch.yml file to allow PKI authentication.

xpack.security.authc.realms.pki1.type: pki

Combined changes to elasticsearch.yml

Once the above steps have been followed, we should have the following defined in our elasticsearch.yml configuration:

xpack.security.enabled: true

xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.truststore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.client_authentication: optional

xpack.security.authc.realms.pki1.type: pki

Note that once the above changes have been made to our elasticsearch.yml file, we will have to restart all of the Elasticsearch nodes in our cluster in order for the changes to take effect.

Creating a client certificate

Certificates that will be used for PKI authentication must be signed by the same CA as the certificates that are used for encrypting http communications. Normally, these would be signed by an official CA within an organization. However, because we have already used a self-signed CA, we also sign our http client certificates with that same self-signed CA, which we previously saved as elastic-stack-ca.p12. We can create a certificate for client authentication as follows:

bin/elasticsearch-certutil cert --ca \
config/certs/elastic-stack-ca.p12 \
-name "CN=something,OU=Consulting Team,DC=mydomain,DC=com"
ENTER
client.p12 ENTER
ENTER

The above will create a file called client.p12, which contains all of the information required for PKI authentication to our Elasticsearch cluster. However, in order to use this certificate it is helpful to break it into its private key, public certificate, and CA certificate. This can be done with the following commands:

Private Key

openssl pkcs12 -in client.p12 -nocerts -nodes | \
sed -ne '/-BEGIN PRIVATE KEY-/,/-END PRIVATE KEY-/p' > client.key

Public Certificate

openssl pkcs12 -in client.p12 -clcerts -nokeys | \
sed -ne '/-BEGIN CERTIFICATE-/,/-END CERTIFICATE-/p' > client.cer

CA Certificate

openssl pkcs12 -in client.p12 -cacerts -nokeys -chain | \
sed -ne '/-BEGIN CERTIFICATE-/,/-END CERTIFICATE-/p' > client-ca.cer

This should produce three files:

  1. client.key —  The private key
  2. client.cer —  The public certificate
  3. client-ca.cer — The CA that signed the public certificate

Create a directory called certs in Kibana’s config directory, and move all of the client certificates there.

Configure Kibana to authenticate to Elasticsearch

Now that we have enabled security on the Elasticsearch cluster, communications to the cluster must be authenticated. Therefore, if we plan on using Kibana to interact with the cluster, then we must enable security and configure Kibana to authenticate to the cluster as the kibana user over https. As we have not yet fully set up PKI authentication from Kibana to the Elasticsearch cluster, authentication can initially be done with the following lines in the kibana.yml file:

elasticsearch.url: "https://localhost:9200" #ensure https not http
xpack.security.enabled: true
elasticsearch.username: "kibana"
elasticsearch.password: "our new kibana password here"
elasticsearch.ssl.certificateAuthorities: config/certs/client-ca.cer
elasticsearch.ssl.verificationMode: certificate

Ensure that we change localhost to the name of one of our Elasticsearch nodes, and that the certificates are available in the config/certs directory within the Kibana folder.

Note that the kibana user is like a service account that works behind the scenes to authenticate the Kibana application to the Elasticsearch cluster. We will generally never directly log in to the Elasticsearch cluster or the Kibana UI as the kibana user.

Restart Kibana in order for it to authenticate to the Elasticsearch cluster as the kibana user. We should now be able to log in through the Kibana UI as the elastic built-in superuser.

PKI Authentication

We can use the three new client certificate files to test PKI authentication to the cluster with curl. Open a new terminal and cd to Kibana’s config/certs directory, and use curl to call the authenticate API as shown below.

curl https://localhost:9200/_xpack/security/_authenticate?pretty \
--key client.key --cert client.cer --cacert client-ca.cer -k -v

Be sure to replace localhost with the name of a node in our Elasticsearch cluster and be sure to use https (not http). Also note that the -k option is required as we did not create certificates with the hostnames specified, and therefore hostname verification must be turned off.

The above command should respond with something similar to the following:

{
 "username" : "something",
 "roles" : [ ],
 "full_name" : null,
 "email" : null,
 "metadata" : {
   "pki_dn" : "CN=something, OU=Consulting Team, DC=mydomain, DC=com"
 },
 "enabled" : true
}

Notice that the roles array is currently empty, which means that although we have authenticated to Elasticsearch, we are not authorized to perform any actions. Authentication is allowed because the client certificate that we sent to the cluster was signed by the same CA as the http TLS/SSL certificates used by the Elasticsearch nodes. Now that we are authenticated, we need to authorize this user to be able to do something.

The pki_dn value returned from the authenticate API will be used to configure the roles that will be assigned to this certificate.

Open the Kibana UI and if we have not already done so, login as the elastic user. As the elastic user has superuser privileges, this user can assign roles to the certificate. Execute the following command from Dev Tools in Kibana, ensuring that the previously returned pki_dn value is copied into the dn field as follows:

PUT _xpack/security/role_mapping/kibana_certificate_authorization
{
 "roles" : [ "kibana_system" ],
 "rules" : { "field" : { "dn" : "CN=something, OU=Consulting Team, DC=mydomain, DC=com" } },
 "enabled": true
}

Now that we have assigned the kibana_system role to this certificate, verify that it is set correctly with another call to the authenticate API:

curl https://localhost:9200/_xpack/security/_authenticate?pretty \
--key client.key --cert client.cer --cacert client-ca.cer -k -v

And we should see the following response, which indicates that we now have the “kibana_system” role assigned to this certificate.

{
 "username" : "something",
 "roles" : [
   "kibana_system"

 ],
 "full_name" : null,
 "email" : null,
 "metadata" : {
   "pki_dn" : "CN=something, OU=Consulting Team, DC=mydomain, DC=com"
 },
 "enabled" : true
}
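
An application server can make the same PKI-authenticated call programmatically instead of embedding a username and password. Below is a minimal sketch using the Python requests library; the hostname and certificate paths are assumptions, and verify=False mirrors the -k flag used with curl above, since our certificates were not created with hostnames.

import requests

# Authenticate to Elasticsearch with the client certificate and key
# that were extracted from client.p12 earlier in this post.
response = requests.get(
    "https://localhost:9200/_xpack/security/_authenticate",
    cert=("client.cer", "client.key"),
    verify=False,  # equivalent to curl's -k option
)
print(response.json())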

Using PKI to authenticate Kibana to the Elasticsearch cluster

Now that we have tested our client-side certificate and assigned the kibana_system role to the certificate, we can use this certificate instead of a username and password to authenticate Kibana to Elasticsearch.

Remove the following lines from our kibana.yml file:

elasticsearch.username: "kibana"
elasticsearch.password: "XXXXXX"

Ensure that all relevant certificates are copied to Kibana’s config/certs directory, and add the following lines to our kibana.yml file:

elasticsearch.url: "https://localhost:9200" #ensure https
xpack.security.enabled: true
elasticsearch.ssl.certificate: config/certs/client.cer
elasticsearch.ssl.key: config/certs/client.key
elasticsearch.ssl.certificateAuthorities: config/certs/client-ca.cer
elasticsearch.ssl.verificationMode: certificate

We can now restart Kibana, and it should authenticate to our Elasticsearch cluster, without any need for an embedded username and password!

Conclusion

In this blog post, I have demonstrated how to: enable security; configure TLS/SSL; set passwords for built-in users; use PKI for authentication; and finally, how to authenticate Kibana to an Elasticsearch cluster using PKI.

If you have any questions about PKI authentication with Elasticsearch, or any other Elasticsearch-related topics, have a look at our Discuss forums for valuable discussion, insights, and information.

Using Logstash to drive filtered data from a single source into multiple output destinations

Now posted on the Elastic blog

Jan 15, 2019 update: A newer version of this article has been published on Elastic’s website as: https://www.elastic.co/blog/using-logstash-to-split-data-and-send-it-to-multiple-outputs

Overview

In this blog post we demonstrate how Logstash can be used to accomplish the following tasks:

  1. Create multiple copies of an input stream.
  2. Filter each unique copy of the input stream to only contain desired fields.
  3. Drive the modified copies of the input stream into different output destinations.

Note that in this blog post, we do not make use of pipeline-to-pipeline communication (beta), which could likely also achieve much of the functionality described here.

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example Logstash pipeline.

Example Logstash pipeline

Below is a Logstash pipeline that should be stored in a file called ‘clones.conf’. This pipeline does the following:

  1. Reads stock market trades as CSV-formatted input from a CSV file. Note that you should modify ‘clones.conf’ to use the correct path to your ‘stocks.csv’ file.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Converts the time field to Unix format.
  4. Uses the clone filter plugin to create two copies of each document. The clone filter will add a new ‘type’ field to each copy that corresponds to the names given in the clones array. (Note that the original version of each document will still exist in addition to the copies, but will not have a ‘type’ field added to it).
  5. For each copy:
    1. Adds metadata to each document corresponding to the ‘type’ that was added by the clone function. This allows us to later remove the ‘type’ field, while retaining the information required for routing different copies to different outputs.
    2. Uses the prune filter plugin to remove all fields except those which are whitelisted for the specific output.
  6. Removes the ‘type’ field that the clone function inserted into the documents. This is not strictly necessary, but eliminates the ‘type’ data and prevents it from being written to Elasticsearch.
  7. Writes the resulting documents to different outputs, depending on the value defined in the metadata field that we added in step 5.

input {
  file {
    path => "${HOME}/stocks.csv"
    start_position => "beginning"

    # The following line will ensure re-reading of input 
    # each time logstash executes.
    sincedb_path => "/dev/null"
  }
}

filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 'DAX' => 'float'
    'SMI' => 'float'
    'CAC' => 'float'
    'FTSE' => 'float'}
  }
  date {
    match => ['time', 'UNIX']
  }

  # The following line will create 2 additional 
  # copies of each document (i.e. including the 
  # original, 3 in total). 
  # Each copy will have a "type" field added 
  # corresponding to the name given in the array.
  clone {
    clones => ['copy_only_SMI', 'copy_only_FTSE']
  }

  if [type] == 'copy_only_SMI' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_SMI" } 
    }
    # Remove everything except "SMI"
    prune {
       whitelist_names => [ "SMI"]
    }
  } 

  else if [type] == 'copy_only_FTSE' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_FTSE" } 
    }
    prune {
       whitelist_names => [ "FTSE"]
    }
  } 

  # Remove 'type' which was added in the clone
  mutate {
    remove_field => ['type']
  }
}

output {
  stdout { codec =>  "rubydebug" }

  if [@metadata][type] == 'copy_only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'copy_only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Testing the Logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system. Note that specifying ‘config.reload.automatic’ is optional, but allows us to automatically reload ‘clones.conf’ without restarting Logstash. Remember that the ‘clones.conf’ used below is the file that contains the pipeline described in the previous section.

./logstash -f ./clones.conf --config.reload.automatic

Once Logstash has read the stocks.csv file, we can check the various outputs that have been written. Three indices should have been created: ‘smi_data’, ‘ftse_data’, and ‘stocks_original’.

Check the SMI index

GET /smi_data/_search

Should display documents with the following structure. Notice that only “SMI” data appears in the ‘smi_data’ index.

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5
        }
      }

Check the FTSE index

GET /ftse_data/_search

Should display documents with the following structure. Notice that only “FTSE” field appears in documents in the ‘ftse_data’ index.

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Check the original documents index

GET /stocks_original/_search

Should display documents with the following structure. Notice that the entire original version of the documents appears in the ‘stocks_original’ index.

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Conclusion

In this blog post, we have demonstrated how to use Logstash to create multiple copies of an input stream, to then modify documents in each stream as required for different outputs, and to then drive the different streams into different outputs.

Using Logstash prune capabilities to whitelist sub-documents

Overview

Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash, and that all other fields are dropped. In this blog post we demonstrate the use of Logstash to whitelist desired fields and desired sub-documents before indexing into Elasticsearch.

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example command line given later in this blog post.

Example Logstash pipeline

Below is a Logstash pipeline that can be stored in a file called ‘stocks.conf’. This pipeline does the following:

  1. Reads stock market trades as CSV-formatted input from stdin.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Converts the time field to Unix format.
  4. Moves DAX and CAC fields into a nested structure called “my_nest”.
  5. Whitelists the “my_nest” field (which contains a sub-document) and the “SMI” field so that all other (non-whitelisted) fields will be removed.
  6. Writes the resulting documents to an Elasticsearch index called “stocks_whitelist_test”.

# For this simple example, pipe in data from stdin. 
input {
    stdin {}
}

filter {
    csv {
        columns => ["time","DAX","SMI","CAC","FTSE"]
        separator => ","
        convert => { 'DAX' => 'float'
        'SMI' => 'float'
        'CAC' => 'float'
        'FTSE' => 'float'}
    }
    date {
        match => ['time', 'UNIX']
    }
    mutate {
        # Move DAX and CAC into a sub-document 
        # called 'my_nest'
        rename => {
            "DAX" => "[my_nest][DAX]"
            "CAC" => "[my_nest][CAC]"
        }
    }
     
    # Remove everything except "SMI" and the 
    # "my_nest" sub-document 
    prune {
         whitelist_names => [ "SMI", "my_nest" ]
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_whitelist_test"
    }
}

Testing the Logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system:

cat ./stocks.csv | ./logstash -f ./stocks.conf

You can then check the data that you have stored in Elasticsearch by executing the following command from Kibana’s dev console:

GET /stocks_whitelist_test/_search

Which should display documents with the following structure:

      {
        "_index": "stocks_whitelist_test",
        "_type": "doc",
        "_id": "KANygWUBsYalOV9yOKsD",
        "_score": 1,
        "_source": {
          "my_nest": {
            "CAC": 1718,
            "DAX": 1606.51
          },
          "SMI": 1678.6
        }
      }

Notice that only “my_nest” and “SMI” have been indexed as indicated by the contents of the document’s “_source”. Also note that the “FTSE” and “time” fields have been removed as they were not in the prune filter’s whitelist.

Conclusion

In this blog post, we have demonstrated how Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash.

Deduplicating documents in Elasticsearch

Now posted on the Elastic blog

December 11, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/how-to-find-and-remove-duplicate-documents-in-elasticsearch

Overview

In this blog post we cover how to detect and remove duplicate documents from Elasticsearch, either by using Logstash or by using custom code written in Python.

Example document structure

For the purposes of this blog post, we assume that the documents in the Elasticsearch cluster have the following structure. This corresponds to a dataset that contains documents representing stock market trades.

    {
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

Given this example document structure, for the purposes of this blog we arbitrarily assume that if multiple documents have the same values for the [“CAC”, “FTSE”, “SMI”] fields, then they are duplicates of each other.

Using Logstash for deduplicating Elasticsearch documents

Logstash may be used for detecting and removing duplicate documents from an Elasticsearch index. This process is described in this blog post.

In the example below I have written a simple Logstash configuration that reads documents from an index on an Elasticsearch cluster, then uses the fingerprint filter to compute a unique _id value for each document based on a hash of the [“CAC”, “FTSE”, “SMI”] fields, and finally writes each document back to a new index on that same Elasticsearch cluster, such that duplicate documents will be written to the same _id and therefore eliminated.

Additionally, with minor modifications, the same Logstash filter could also be applied to future documents written into the newly created index in order to ensure that duplicates are removed in near real-time. This could be accomplished by changing the input section in the example below to accept documents from your real-time input source rather than pulling documents from an existing index.

Be aware that using custom _id values (i.e. an _id that is not generated by Elasticsearch) will have some impact on the write performance of your index operations. 

Also, it is worth noting that, depending on the hash algorithm used, this approach may theoretically result in a non-zero number of hash collisions for the _id value, which could result in two non-identical documents being mapped to the same _id. For most practical cases, the probability of a hash collision is likely very low.

A simple Logstash configuration to de-duplicate an existing index using the fingerprint filter is given below.

input {
  # Read all documents from Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}

filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

A custom Python script for deduplicating Elasticsearch documents

A memory-efficient approach

If Logstash is not used, then deduplication may be efficiently accomplished with a custom Python script. For this approach, we compute the hash of the [“CAC”, “FTSE”, “SMI”] fields that we have defined to uniquely identify a document. We then use this hash as a key in a Python dictionary, where the associated value of each dictionary entry will be an array of the _ids of the documents that map to the same hash.

If more than one document has the same hash, then the duplicate documents that map to the same hash can be deleted.  Alternatively, if you are concerned about the possibility of hash collisions, then the contents of documents that map to the same hash can be examined to see if the documents are really identical, and if so, then duplicates can be eliminated.

Detection algorithm analysis

For a 50GB index, if we assume that the index contains 0.4 kB documents, then there would be 125 million documents in the index. In this case, the amount of memory required for storing the deduplication data structures in memory when using a 128-bit md5 hash would be on the order of 128 bits x 125 million = 2GB, plus the 160-bit _ids will require another 160 bits x 125 million = 2.5GB of memory. This algorithm will therefore require on the order of 4.5GB of RAM to keep all relevant data structures in memory. This memory footprint can be dramatically reduced if the approach discussed in the following section can be applied.
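
The arithmetic behind this estimate can be sketched as follows; it is a rough calculation that ignores the overhead of the Python dictionary and arrays used later in this post.

# Rough estimate of the memory needed by the in-memory deduplication
# structures, using the same numbers as the text above.
num_docs = int(50e9 / 400)      # 50GB of 0.4 kB documents ~= 125 million docs
md5_bytes = 128 // 8            # 128-bit md5 hash per document
id_bytes = 160 // 8             # 160-bit _id per document

hash_gb = num_docs * md5_bytes / 1e9
id_gb = num_docs * id_bytes / 1e9
print(hash_gb, id_gb, hash_gb + id_gb)   # ~2.0 GB, ~2.5 GB, ~4.5 GB total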

Potential enhancement to the algorithm to reduce memory usage, as well as to continuously remove new duplicate documents

If you are storing time-series data, and you know that duplicate documents will only occur within some small amount of time of each other, then you may be able to improve this algorithm’s memory footprint by repeatedly executing the algorithm on a subset of the documents in the index, with each subset corresponding to a different time window. For example, if you have a year’s worth of data, then you could use range queries on your datetime field (inside a filter context for best performance) to step through your data set one week at a time. This would require that the algorithm is executed 52 times (once for each week), and this approach would reduce the worst-case memory footprint by a factor of 52.

In the above example, you may be concerned about not detecting duplicate documents that span between weeks. Let’s assume that you know that duplicate documents cannot occur more than 2 hours apart. Then you would need to ensure that each execution of the algorithm includes documents that overlap by 2 hours with the last set of documents analyzed by the previous execution of the algorithm. For the weekly example, you would therefore need to query 170 hours (1 week + 2 hours) worth of time-series documents to ensure that no duplicates are missed.
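
A sketch of this windowed approach is shown below, using the scan helper from the elasticsearch Python client, the “stocks” index, and the @timestamp field from the example documents; the start and end dates are assumptions, and the hits from each window would be fed into the same hashing logic used in the script given later in this post.

from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch(["localhost:9200"])

window = timedelta(weeks=1)
overlap = timedelta(hours=2)
current = datetime(2017, 1, 1)       # assumed start of the data set
end_of_data = datetime(2018, 1, 1)   # assumed end of the data set

while current < end_of_data:
    # Range query inside a filter context for best performance. Each
    # window overlaps the previous one by two hours so that duplicates
    # spanning a window boundary are not missed.
    query = {
        "query": {
            "bool": {
                "filter": {
                    "range": {
                        "@timestamp": {
                            "gte": (current - overlap).isoformat(),
                            "lt": (current + window).isoformat()
                        }
                    }
                }
            }
        }
    }
    for hit in scan(es, index="stocks", query=query):
        # Feed each hit into the same hashing logic as
        # populate_dict_of_duplicate_docs() in the script below
        pass
    current += window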

If you wish to periodically clear out duplicate documents from your indices on an ongoing basis, you can execute this algorithm on recently received documents. The same logic applies as above: ensure that recently received documents are included in the analysis, along with enough of an overlap with slightly older documents to ensure that duplicates are not inadvertently missed.

Python code to detect duplicate documents

The following code demonstrates how documents can be efficiently evaluated to see if they are identical, and then eliminated if desired. However, in order to prevent accidental deletion of documents, in this example we do not actually execute a delete operation. Such functionality would be straightforward to include.

The code to deduplicate documents from Elasticsearch can also be found on GitHub.

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}

# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]


# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])

        _id = item["_id"]

        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()

        # If the hashval is new, then we will create a new key
        # in the dict_of_duplicate_docs, which will be
        # assigned a value of an empty array.
        # We then immediately push the _id onto the array.
        # If hashval already exists, then
        # we will just push the new _id onto the existing array
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)


# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})

    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])

    # Before scroll, process current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])

    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')

        # Process current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])

        # Update the scroll ID
        sid = data['_scroll_id']

        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])


def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Get the documents that have mapped to the current hashval
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # In order to remove the possibility of hash collisions,
            # write code here to check all fields in the docs to
            # see if they are truly identical - if so, then execute a
            # DELETE operation on all except one.
            # In this example, we just print the docs.
            print("doc=%s\n" % doc)



def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()


main()

 

Alexander Marquardt – Patents

Method and system for estimating the reliability of blacklists of botnet-infected computers

Versatile logic element and logic array block

  • US patent 7,218,133
  • Issued May 15, 2007 
  • See patent.

Routing Architecture For A Programmable Logic Device

  • US patent 6,970,014 
  • Issued Nov 29, 2005
  • See patent.