Using Logstash to drive filtered data from a single source into multiple output destinations

Overview

In this blog post we demonstrate how Logstash can be used to accomplish the following tasks:

  1. Create multiple copies of an input stream.
  2. Filter each unique copy of the input stream to only contain desired fields.
  3. Drive the modified copies of the input stream into different output destinations.

Note that in this blog post we do not make use of pipeline-to-pipeline communication (beta), which could likely also achieve much of the functionality described here.

Example input file

As input to Logstash, we use a CSV file that contains stock market trades. A few example CSV rows are given below.

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time”, “DAX”, “SMI”, “CAC”, and “FTSE”. You may wish to copy and paste the above lines into a CSV file called stocks.csv in order to execute the example Logstash pipeline.
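
For example, on a Unix-like system, one way to create this file in the location that the pipeline below expects is the following (a sketch; adjust the path if you store the file elsewhere):

cat > "${HOME}/stocks.csv" <<'EOF'
1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4
EOF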

Example Logstash pipeline

Below is a Logstash pipeline that should be stored in a file called ‘clones.conf’. This pipeline does the following:

  1. Reads stock market trades as CSV-formatted input from a file. Note that you should modify ‘clones.conf’ to use the correct path to your ‘stocks.csv’ file.
  2. Maps each row of the CSV input to a JSON document, where the CSV columns map to JSON fields.
  3. Uses the date filter to parse the Unix timestamp in the ‘time’ field and set each document’s @timestamp.
  4. Uses the clone filter plugin to create two copies of each document. The clone filter adds a new ‘type’ field to each copy, corresponding to the names given in the clones array. (Note that the original version of each document will still exist in addition to the copies, but will not have a ‘type’ field added to it.)
  5. For each copy:
    1. Adds a metadata field to each document corresponding to the ‘type’ that was added by the clone filter. This allows us to later remove the ‘type’ field while retaining the information required for routing different copies to different outputs.
    2. Uses the prune filter plugin to remove all fields except those which are whitelisted for that specific output.
  6. Removes the ‘type’ field that the clone filter inserted into the documents. This is not strictly necessary, but it prevents the ‘type’ value from being written to Elasticsearch.
  7. Writes the resulting documents to different outputs, depending on the value of the metadata field that we added in step 5.

input {
  file {
    path => "${HOME}/stocks.csv"
    start_position => "beginning"

    # The following line will ensure re-reading of input 
    # each time logstash executes.
    sincedb_path => "/dev/null"
  }
}

filter {
  csv {
    columns => ["time", "DAX", "SMI", "CAC", "FTSE"]
    separator => ","
    convert => {
      "DAX" => "float"
      "SMI" => "float"
      "CAC" => "float"
      "FTSE" => "float"
    }
  }
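
  # Parse the Unix timestamp in the "time" field 
  # and use it to set each event's @timestamp.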
  date {
    match => ['time', 'UNIX']
  }

  # The following line will create 2 additional 
  # copies of each document (i.e. including the 
  # original, 3 in total). 
  # Each copy will have a "type" field added 
  # corresponding to the name given in the array.
  clone {
    clones => ['copy_only_SMI', 'copy_only_FTSE']
  }

  if [type] == 'copy_only_SMI' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_SMI" } 
    }
    # Remove everything except "SMI"
    prune {
      whitelist_names => ["SMI"]
    }
  } 

  else if [type] == 'copy_only_FTSE' {
    mutate { 
      add_field => { "[@metadata][type]" => "copy_only_FTSE" } 
    }
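    # Remove everything except "FTSE"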
    prune {
      whitelist_names => ["FTSE"]
    }
  } 

  # Remove 'type' which was added in the clone
  mutate {
    remove_field => ['type']
  }
}

output {
  stdout { codec =>  "rubydebug" }

  if [@metadata][type] == 'copy_only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'copy_only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}
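
Note that none of the elasticsearch outputs above specify a ‘hosts’ value, so they will use the plugin’s default and attempt to write to an Elasticsearch instance running on the local machine. If your cluster is running elsewhere, each output can be pointed at it explicitly. A minimal sketch of the SMI output with an explicit host (the URL is a placeholder to be replaced with your own):

elasticsearch {
  hosts => ["http://localhost:9200"]
  index => "smi_data"
}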

Testing the Logstash pipeline

To test this pipeline with the example CSV data, you could execute something similar to the following command, modifying it to ensure that you use paths that are correct for your system. Note that specifying ‘config.reload.automatic’ is optional, but it allows us to automatically reload ‘clones.conf’ without restarting Logstash. Remember that the ‘clones.conf’ used below is the file that contains the pipeline described in the previous section.

./logstash -f ./clones.conf --config.reload.automatic

Once Logstash has read the stocks.csv file, we can check the various outputs that have been written. The pipeline writes to three indices: ‘smi_data’, ‘ftse_data’, and ‘stocks_original’.
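
One quick way to confirm that all three indices exist and have received documents is the cat indices API; a sketch, assuming access to the cluster’s REST API:

GET /_cat/indices/smi_data,ftse_data,stocks_original?v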

Check the SMI index

GET /smi_data/_search

This should display documents with the following structure. Notice that only the “SMI” field appears in documents in the ‘smi_data’ index.

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5
        }
      }

Check the FTSE index

GET /ftse_data/_search

This should display documents with the following structure. Notice that only the “FTSE” field appears in documents in the ‘ftse_data’ index.

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Check the original documents index

GET /stocks_original/_search

This should display documents with the following structure. Notice that the entire original version of each document appears in the ‘stocks_original’ index.

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }
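
The original copy also retains the bookkeeping fields added by the file input and by Logstash itself (‘message’, ‘host’, ‘path’, and ‘@version’). If you prefer not to store these in the ‘stocks_original’ index, one option (a sketch, not part of the pipeline above) is to drop them with an additional mutate filter at the end of the filter block:

mutate {
  remove_field => ["message", "host", "path", "@version"]
}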

Conclusion

In this blog post, we have demonstrated how to use Logstash to create multiple copies of an input stream, modify the documents in each copy so that they contain only the fields required for a given destination, and then drive each stream into a different output.

One thought on “Using Logstash to drive filtered data from a single source into multiple output destinations”

  1. A very useful example, and well explained. Although I found the prune filter a bit too aggressive: I had fields called ‘type’ and ‘eventtype’, and it matched both even though I only whitelisted ‘type’.
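
    This is likely because the patterns given to ‘whitelist_names’ are treated as regular expressions and are not anchored, so a pattern such as "type" also matches field names like "eventtype". Anchoring the pattern should restrict it to an exact field name; a minimal sketch using the ‘type’ field mentioned above (not part of the pipeline in this post):

    prune {
      whitelist_names => ["^type$"]
    }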
