Using Logstash to drive filtered data from a single source into multiple output destinations

Overview

In this blog post we demonstrate how Logstash can be used to accomplish the following tasks:

  1. Create multiple copies of an input stream.
  2. Filter each unique copy of the input stream to only contain desired fields.
  3. Drive the modified copies of the input stream into different output destinations.

Note that this blog post does not make use of pipeline-to-pipeline communication (beta), which could likely achieve much of the functionality described here.

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example Logstash pipeline.
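To make the column mapping concrete, the following minimal Python sketch parses two of the example rows into named, typed fields. It only approximates what the csv and date filters do in the pipeline; the helper name `parse_trade` and the `timestamp` key are illustrative assumptions, not part of Logstash.

```python
import csv
import io
from datetime import datetime, timezone

COLUMNS = ["time", "DAX", "SMI", "CAC", "FTSE"]

SAMPLE = """1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
"""


def parse_trade(row):
    """Map one CSV row onto the named columns (hypothetical helper)."""
    doc = dict(zip(COLUMNS, row))
    # Convert the four index values to floats; "time" stays a string.
    for name in COLUMNS[1:]:
        doc[name] = float(doc[name])
    # Interpret "time" as seconds since the Unix epoch (UTC).
    doc["timestamp"] = datetime.fromtimestamp(int(doc["time"]), tz=timezone.utc)
    return doc


trades = [parse_trade(row) for row in csv.reader(io.StringIO(SAMPLE))]
for trade in trades:
    print(trade["timestamp"].isoformat(), trade["SMI"], trade["FTSE"])
```

The original “time” string is kept alongside the parsed timestamp, which matches the shape of the documents shown later for the ‘stocks_original’ index.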

Example Logstash pipeline

Below is a Logstash pipeline that should be stored in a file called ‘clones.conf’. This pipeline does the following:

  1. Reads stock market trades as CSV-formatted input from a CSV file. Note that you should modify ‘clones.conf’ to use the correct path to your ‘stocks.csv’ file.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Parses the “time” field as a Unix epoch timestamp and uses it to set the document’s “@timestamp” field.
  4. Uses the clone filter plugin to create two copies of each document. The clone filter will add a new ‘type’ field to each copy that corresponds to the names given in the clones array. (Note that the original version of each document will still exist in addition to the copies, but will not have a ‘type’ field added to it).
  5. For each copy:
    1. Adds metadata to each document corresponding to the ‘type’ that was added by the clone filter. This allows us to later remove the ‘type’ field, while retaining the information required for routing different copies to different outputs.
    2. Uses the prune filter plugin to remove all fields except those which are whitelisted for the specific output.
  6. Removes the ‘type’ field that the clone filter inserted into the documents. This is not strictly necessary, but eliminates the ‘type’ data and prevents it from being written to Elasticsearch.
  7. Writes the resulting documents to different outputs, depending on the value defined in the metadata field that we added in step 5.
input {
  file {
    path => "${HOME}/stocks.csv"
    start_position => "beginning"

    # The following line will ensure re-reading of input 
    # each time logstash executes.
    sincedb_path => "/dev/null"
  }
}

filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 'DAX' => 'float'
    'SMI' => 'float'
    'CAC' => 'float'
    'FTSE' => 'float'}
  }
  date {
    match => ['time', 'UNIX']
  }

  # The following line will create 2 additional 
  # copies of each document (i.e. including the 
  # original, 3 in total). 
  # Each copy will have a "type" field added 
  # corresponding to the name given in the array.
  clone {
    clones => ['copy_only_SMI', 'copy_only_FTSE']
  }

  if [type] == 'copy_only_SMI' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_SMI" } 
    }
    # Remove everything except "SMI"
    prune {
       whitelist_names => [ "SMI"]
    }
  } 

  else if [type] == 'copy_only_FTSE' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_FTSE" } 
    }
    prune {
       whitelist_names => [ "FTSE"]
    }
  } 

  # Remove 'type' which was added in the clone
  mutate {
    remove_field => ['type']
  }
}

output {
  stdout { codec =>  "rubydebug" }

  if [@metadata][type] == 'copy_only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'copy_only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}
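The clone, prune, and route logic of the pipeline above can be approximated in a few lines of Python. This is only a sketch of the idea, not how Logstash is implemented: the names `ROUTES`, `WHITELISTS`, `prune`, and `fan_out` are hypothetical, and the routing tag is kept outside the document to play the role of the “[@metadata][type]” field.

```python
import re

# Index chosen for each routing tag; None stands for the untagged original.
ROUTES = {
    "copy_only_SMI": "smi_data",
    "copy_only_FTSE": "ftse_data",
    None: "stocks_original",
}

# Field-name patterns that each copy is allowed to keep.
WHITELISTS = {"copy_only_SMI": ["SMI"], "copy_only_FTSE": ["FTSE"]}


def prune(doc, patterns):
    """Keep only top-level fields whose name matches one of the patterns."""
    return {k: v for k, v in doc.items() if any(re.search(p, k) for p in patterns)}


def fan_out(doc):
    """Yield (index, document) pairs: the original plus one pruned copy per tag."""
    yield ROUTES[None], dict(doc)
    for tag, patterns in WHITELISTS.items():
        yield ROUTES[tag], prune(doc, patterns)


event = {"time": "1483230600", "DAX": 1628.75, "SMI": 1678.1, "CAC": 1772.8, "FTSE": 2443.6}
routed = list(fan_out(event))
for index, doc in routed:
    print(index, doc)
```

Because the tag never becomes part of the document, nothing needs to be removed before indexing, which is the same reason the pipeline copies ‘type’ into metadata.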

Testing the logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, adjusting the paths so that they are correct for your system. Specifying ‘config.reload.automatic’ is optional, but allows Logstash to automatically reload ‘clones.conf’ without being restarted. The ‘clones.conf’ file used below is the file that contains the pipeline described in the previous section.

./logstash -f ./clones.conf --config.reload.automatic

Once Logstash has read the stocks.csv file, we can check the various outputs that have been written. The pipeline writes to three indices called ‘smi_data’, ‘ftse_data’, and ‘stocks_original’.

Check the SMI index

GET /smi_data/_search

This should display documents with the following structure. Notice that only the “SMI” field appears in documents in the ‘smi_data’ index.

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5
        }
      }

Check the FTSE index

GET /ftse_data/_search

This should display documents with the following structure. Notice that only the “FTSE” field appears in documents in the ‘ftse_data’ index.

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Check the original documents index

GET /stocks_original/_search

This should display documents with the following structure. Notice that the entire original version of each document appears in the ‘stocks_original’ index.

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Conclusion

In this blog post, we have demonstrated how to use Logstash to create multiple copies of an input stream, to then modify documents in each stream as required for different outputs, and to then drive the different streams into different outputs.

Using Logstash prune capabilities to whitelist sub-documents

Overview

Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash, and that all other fields are dropped. In this blog post we demonstrate the use of Logstash to whitelist desired fields and desired sub-documents before indexing into Elasticsearch.

Example input file

As an input to Logstash, we use a CSV file that contains stock market trades. A few example CSV stock market trades are given below. 

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example command line given later in this blog post.

Example Logstash pipeline

Below is a Logstash pipeline that can be stored in a file called ‘stocks.conf’. This pipeline does the following:

  1. Reads stock market trades as CSV-formatted input from stdin.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Parses the “time” field as a Unix epoch timestamp and uses it to set the document’s “@timestamp” field.
  4. Moves DAX and CAC fields into a nested structure called “my_nest”.
  5. Whitelists the “my_nest” field (which contains a sub-document) and the “SMI” field so that all other (non-whitelisted) fields will be removed.
  6. Writes the resulting documents to an Elasticsearch index called “stocks_whitelist_test”.
# For this simple example, pipe in data from stdin. 
input {
    stdin {}
}

filter {
    csv {
        columns => ["time","DAX","SMI","CAC","FTSE"]
        separator => ","
        convert => { 'DAX' => 'float'
        'SMI' => 'float'
        'CAC' => 'float'
        'FTSE' => 'float'}
    }
    date {
        match => ['time', 'UNIX']
    }
    mutate {
        # Move DAX and CAC into a sub-document 
        # called 'my_nest'
        rename => {
            "DAX" => "[my_nest][DAX]"
            "CAC" => "[my_nest][CAC]"
        }
    }
     
    # Remove everything except "SMI" and the 
    # "my_nest" sub-document 
    prune {
         whitelist_names => [ "SMI", "my_nest" ]
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_whitelist_test"
    }
}
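The effect of the rename and prune steps can be sketched in Python as follows. This is a simplified model that assumes the whitelist is matched against top-level field names only, which is why the whole “my_nest” sub-document survives; the helper names `rename_into_nest` and `prune_whitelist` are hypothetical.

```python
import re


def rename_into_nest(doc, fields, nest="my_nest"):
    """Move the given top-level fields under a nested dict (hypothetical helper)."""
    result = {k: v for k, v in doc.items() if k not in fields}
    nested = {k: doc[k] for k in fields if k in doc}
    if nested:
        result[nest] = nested
    return result


def prune_whitelist(doc, patterns):
    """Keep only top-level fields whose name matches one of the patterns."""
    return {k: v for k, v in doc.items() if any(re.search(p, k) for p in patterns)}


event = {"time": "1483234200", "DAX": 1606.51, "SMI": 1678.6, "CAC": 1718.0, "FTSE": 2448.2}
nested = rename_into_nest(event, ["DAX", "CAC"])
pruned = prune_whitelist(nested, ["SMI", "my_nest"])
print(pruned)
```

Since the patterns here are treated as regular expressions, a short pattern such as "SMI" would also match any other field whose name contains that text; anchoring the pattern (for example "^SMI$") would avoid this in a document with similarly named fields.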

Testing the logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system:

cat ./stocks.csv | ./logstash -f ./stocks.conf

You can then check the data that you have stored in Elasticsearch by executing the following command from Kibana’s dev console:

GET /stocks_whitelist_test/_search

This should display documents with the following structure:

      {
        "_index": "stocks_whitelist_test",
        "_type": "doc",
        "_id": "KANygWUBsYalOV9yOKsD",
        "_score": 1,
        "_source": {
          "my_nest": {
            "CAC": 1718,
            "DAX": 1606.51
          },
          "SMI": 1678.6
        }
      }

Notice that only “my_nest” and “SMI” have been indexed, as indicated by the contents of the document’s “_source”. All other fields, including “FTSE” and “time”, have been removed because they were not in the prune filter’s whitelist.

Conclusion

In this blog post, we have demonstrated how Logstash’s prune filter plugin can make use of whitelists to ensure that only specific desired fields are output from Logstash.

Deduplicating documents in Elasticsearch

Now posted on the Elastic blog

December 11, 2018 update: This article has been published on Elastic’s website as: https://www.elastic.co/blog/how-to-find-and-remove-duplicate-documents-in-elasticsearch

Overview

In this blog post we cover how to detect and remove duplicate documents from Elasticsearch by using either Logstash or custom code written in Python.

Example document structure

For the purposes of this blog post, we assume that the documents in the Elasticsearch cluster have the following structure. This corresponds to a dataset that contains documents representing stock market trades.

    {
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

Given this example document structure, for the purposes of this blog post we arbitrarily assume that documents with the same values for the [“CAC”, “FTSE”, “SMI”] fields are duplicates of each other.

Using Logstash for deduplicating Elasticsearch documents

Logstash may be used for detecting and removing duplicate documents from an Elasticsearch index. This process is described in this blogpost.

In the example below I have written a simple Logstash configuration that reads documents from an index on an Elasticsearch cluster, then uses the fingerprint filter to compute an _id value for each document based on a hash of the [“CAC”, “FTSE”, “SMI”] fields, and finally writes each document back to a new index on that same Elasticsearch cluster. Because duplicate documents produce the same hash, they are written to the same _id and are therefore eliminated.

Additionally, with minor modifications, the same Logstash filter could also be applied to future documents written into the newly created index in order to ensure that duplicates are removed in near real-time. This could be accomplished by changing the input section in the example below to accept documents from your real-time input source rather than pulling documents from an existing index.

Be aware that using custom _id values (i.e. an _id that is not generated by Elasticsearch) will have some impact on the write performance of your index operations. 

Also, it is worth noting that depending on the hash algorithm used, this approach may theoretically result in a non-zero number of hash collisions for the _id value, which could theoretically result in two non-identical documents being mapped to the same _id. For most practical cases, the probability of a hash collision is likely very low.
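To get a rough feel for how low that probability is, the standard birthday-bound approximation can be evaluated in Python. This is a back-of-the-envelope sketch: the document count of 100 million is an arbitrary assumption, the function name is hypothetical, and the approximation is only meaningful when the result is much smaller than 1.

```python
def collision_probability(num_docs, hash_bits):
    """Birthday-bound approximation: p ~= n * (n - 1) / 2 / 2**bits."""
    pairs = num_docs * (num_docs - 1) // 2
    return pairs / 2 ** hash_bits


# Assumed document count; compare a 128-bit and a 256-bit hash.
for bits in (128, 256):
    print(bits, collision_probability(100_000_000, bits))
```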

A simple Logstash configuration to de-duplicate an existing index using the fingerprint filter is given below.

input {
  # Read all documents from Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}

filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
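        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        # Hash all source fields together. Without this setting,
        # only the last source field determines the fingerprint.
        concatenate_sources => true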
    }
}

output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}
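The idea behind the configuration above can be illustrated with a small Python sketch that derives an identifier from the three key fields using a keyed hash and lets later writes overwrite earlier ones. The payload layout is an assumption made for illustration and is not meant to reproduce the identifiers that Logstash would generate; `generated_id` and `new_index` are hypothetical names, and the second and third documents are made up.

```python
import hashlib
import hmac

KEY = b"1234ABCD"
FIELDS = ["CAC", "FTSE", "SMI"]


def generated_id(doc):
    """HMAC-SHA256 over the names and values of the key fields (illustrative layout)."""
    payload = "|".join(f"{name}={doc[name]}" for name in FIELDS)
    return hmac.new(KEY, payload.encode("utf-8"), hashlib.sha256).hexdigest()


docs = [
    {"CAC": 1854.6, "FTSE": 2827.5, "SMI": 2061.7, "DAX": 1527.06},
    {"CAC": 1854.6, "FTSE": 2827.5, "SMI": 2061.7, "DAX": 9999.0},  # same key fields
    {"CAC": 1718.0, "FTSE": 2448.2, "SMI": 1678.6, "DAX": 1606.51},
]

# A dict keyed by the generated id stands in for the new index:
# a document with an existing id replaces the earlier one.
new_index = {}
for doc in docs:
    new_index[generated_id(doc)] = doc

print(len(docs), "documents in,", len(new_index), "documents out")
```

Note that the surviving document for a given identifier is the last one written, so any differences in fields outside the key set (such as “DAX” here) are silently resolved in favor of the later document.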

A custom Python script for deduplicating Elasticsearch documents

A memory-efficient approach

If Logstash is not used, then deduplication may be efficiently accomplished with a custom Python script. For this approach, we compute the hash of the [“CAC”, “FTSE”, “SMI”] fields that we have defined to uniquely identify a document. We then use this hash as a key in a Python dictionary, where the associated value of each dictionary entry will be an array of the _ids of the documents that map to the same hash.

If more than one document has the same hash, then the duplicate documents that map to the same hash can be deleted.  Alternatively, if you are concerned about the possibility of hash collisions, then the contents of documents that map to the same hash can be examined to see if the documents are really identical, and if so, then duplicates can be eliminated.

Detection algorithm analysis

For a 50GB index, if we assume that the index contains 0.4 kB documents, then there would be 125 million documents in the index. In this case the amount of memory required for storing the deduplication data structures when using a 128-bit MD5 hash would be on the order of 128 bits x 125 million = 2GB, and the 160-bit _ids would require another 160 bits x 125 million = 2.5GB. This algorithm will therefore require on the order of 4.5GB of RAM to keep all relevant data structures in memory. This memory footprint can be dramatically reduced if the approach discussed in the following section can be applied.
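The estimate above can be reproduced with a few lines of Python. The figures are the ones assumed in the text; the variable names are illustrative, and the calculation counts raw payload bytes only, so the real footprint of a Python dictionary holding this data would be higher because of per-object overhead.

```python
index_bytes = 50 * 10**9   # 50 GB index
doc_bytes = 400            # 0.4 kB per document
hash_bytes = 128 // 8      # size of an MD5 digest
id_bytes = 160 // 8        # _id size assumed in the text

num_docs = index_bytes // doc_bytes
hash_total = num_docs * hash_bytes
id_total = num_docs * id_bytes

print(f"documents: {num_docs:,}")
print(f"hashes: {hash_total / 1e9:.1f} GB, ids: {id_total / 1e9:.1f} GB, "
      f"total: {(hash_total + id_total) / 1e9:.1f} GB")
```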

Potential enhancement to the algorithm to reduce memory usage, as well as to continuously remove new duplicate documents

If you are storing time-series data, and you know that duplicate documents will only occur within some small amount of time of each other, then you may be able to improve this algorithm’s memory footprint by repeatedly executing the algorithm on a subset of the documents in the index, with each subset corresponding to a different time window. For example, if you have a year’s worth of data then you could use range queries on your datetime field (inside a filter context for best performance) to step through your data set one week at a time. This would require that the algorithm is executed 52 times (once for each week) and, if documents are evenly distributed over time, would reduce the worst-case memory footprint by a factor of 52.

In the above example, you may be concerned about not detecting duplicate documents that span between weeks. Let’s assume that you know that duplicate documents cannot occur more than 2 hours apart. Then you would need to ensure that each execution of the algorithm includes documents that overlap by 2 hours with the last set of documents analyzed by the previous execution of the algorithm. For the weekly example, you would therefore need to query 170 hours (1 week + 2 hours) worth of time-series documents to ensure that no duplicates are missed.
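One way to generate such overlapping windows is sketched below in Python, together with a search body that places the range clause in a filter context. The function names are hypothetical, the date range is an arbitrary example, and the use of “@timestamp” as the datetime field is an assumption based on the example document structure.

```python
from datetime import datetime, timedelta


def overlapping_windows(start, end, step=timedelta(weeks=1), overlap=timedelta(hours=2)):
    """Yield (window_start, window_end) pairs covering [start, end)."""
    current = start
    while current < end:
        # Reach back by `overlap` so duplicates straddling a boundary are seen together.
        yield max(start, current - overlap), min(current + step, end)
        current += step


def range_query(window_start, window_end, field="@timestamp"):
    """Build a search body with the range clause in filter context."""
    return {"query": {"bool": {"filter": [{"range": {field: {
        "gte": window_start.isoformat(),
        "lt": window_end.isoformat(),
    }}}]}}}


windows = list(overlapping_windows(datetime(2017, 1, 1), datetime(2018, 1, 1)))
print(len(windows), "windows")
print(range_query(*windows[1]))
```

Each window after the first spans 170 hours. Because a 365-day year is 52 full weeks plus one day, this particular date range also produces one shorter final window.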

If you wish to periodically clear out duplicate documents from your indices on an on-going basis, you can execute this algorithm on recently received documents. The same logic applies as above: ensure that recently received documents are included in the analysis along with enough of an overlap with slightly older documents to ensure that duplicates are not inadvertently missed.

Python code to detect duplicate documents

The following code demonstrates how documents can be efficiently evaluated to see if they are identical, and then eliminated if desired. However, in order to prevent accidental deletion of documents, in this example we do not actually execute a delete operation. Such functionality would be straightforward to include.

The code to deduplicate documents from Elasticsearch can also be found on GitHub.

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}

# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]


# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])

        _id = item["_id"]

        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()

        # If the hashval is new, then we will create a new key
        # in the dict_of_duplicate_docs, which will be
        # assigned a value of an empty array.
        # We then immediately push the _id onto the array.
        # If hashval already exists, then
        # we will just push the new _id onto the existing array
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)


# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})

    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])

    # Before scroll, process current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])

    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')

        # Process current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])

        # Update the scroll ID
        sid = data['_scroll_id']

        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])


def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            print("********** Duplicate docs hash=%s **********" % hashval)
            # Get the documents that have mapped to the current hashval
            matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
            for doc in matching_docs['docs']:
                # In order to remove the possibility of hash collisions,
                # write code here to check all fields in the docs to
                # see if they are truly identical - if so, then execute a
                # DELETE operation on all except one.
                # In this example, we just print the docs.
                print("doc=%s\n" % doc)



def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()


main()
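The collision check that the script leaves as a comment could look something like the following sketch. It takes a list shaped like the ‘docs’ array printed by the script, compares the actual key field values rather than the hash, and returns the _ids that would be safe to delete while keeping the first occurrence. The function name and the sample _ids are hypothetical, and no delete request is issued.

```python
KEY_FIELDS = ["CAC", "FTSE", "SMI"]


def ids_to_delete(matching_docs, key_fields=KEY_FIELDS):
    """Return the _ids that duplicate an earlier document in the same hash group."""
    seen = set()
    duplicates = []
    for doc in matching_docs:
        # Compare real field values so that a hash collision between
        # non-identical documents never causes a deletion.
        signature = tuple(doc["_source"][field] for field in key_fields)
        if signature in seen:
            duplicates.append(doc["_id"])
        else:
            seen.add(signature)  # first occurrence is kept
    return duplicates


group = [
    {"_id": "a1", "_source": {"CAC": 1854.6, "FTSE": 2827.5, "SMI": 2061.7}},
    {"_id": "b2", "_source": {"CAC": 1854.6, "FTSE": 2827.5, "SMI": 2061.7}},
    # Pretend this document collided with the two above despite different values.
    {"_id": "c3", "_source": {"CAC": 1718.0, "FTSE": 2448.2, "SMI": 1678.6}},
]
print(ids_to_delete(group))
```

The returned list could then be passed to whatever delete mechanism you prefer, ideally after reviewing it against a copy of the data.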