Storing ingest time and calculating ingest lag in Elasticsearch

Introduction

When viewing and analysing data with Elasticsearch, visualisations, monitoring, and alerting solutions commonly rely on timestamps that were generated on remote/monitored systems. However, relying on remote-generated timestamps can be risky.

If there is a delay between the occurrence of an event on a remote system and its arrival at Elasticsearch, or if a malicious user changes the time on a remote system, then important events could fly under the radar. Therefore, when ingesting documents into Elasticsearch it is often helpful to store the ingest time of each document, as well as to monitor how long each event takes to arrive at the Elasticsearch cluster. A larger-than-normal lag may indicate a problem with the ingest process or with the time setting on a remote system.

In this blog I will show how to use an ingest node with the Set Processor to add an ingest timestamp to documents when they arrive at an Elasticsearch cluster. This timestamp can be used in visualisations, monitoring, and alerting.

Additionally, I will show how to use the Script Processor to calculate the ingest lag, which is the difference between the time an event occurred on a remote/monitored system and the time the corresponding document arrived at the Elasticsearch cluster. This lag can be used to ensure that the ingest process is not causing excessive delay and to verify that remote timestamps are set correctly.

Adding an ingest timestamp and calculating ingest lag

Below I give an example of an ingest pipeline that adds an ingest timestamp to a field called “ingest_time”. It also calculates the lag between the remote event timestamp and the time that the event arrives at Elasticsearch, and stores this in a field called “lag_in_seconds”.

The “ingest_time” field serves two purposes: (1) it can, and likely should, be used as the time field in Kibana visualisations and for monitoring and alerting, as discussed later in this blog, and (2) it is used in the lag calculation.

Note that we assume that each document arrives at the Elasticsearch cluster with a field called “event_timestamp” that corresponds to when the event occurred on the remote/monitored system. The name of the event timestamp field will likely be different for your data and should be modified accordingly.

We write our pipeline to Elasticsearch as follows:

PUT _ingest/pipeline/calculate_lag
{
  "description": "Add an ingest timestamp and calculate ingest lag",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
            // Both fields are ISO-8601 strings; compute the difference in milliseconds and convert to seconds
            if (ctx.containsKey("ingest_time") && ctx.containsKey("event_timestamp")) {
              ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['event_timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;
            }
        """
      }
    }
  ]
}

And we can then test the pipeline with the simulate API as follows:

POST _ingest/pipeline/calculate_lag/_simulate
{
  "docs": [
    {
      "_source": {
        "event_timestamp": "2019-11-07T20:39:00.000Z"
      }
    }
  ]
}

This should return a response similar to the following, which includes the “lag_in_seconds” and “ingest_time” fields:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "lag_in_seconds" : 17950223,
          "ingest_time" : "2020-06-02T14:49:23.236Z",
          "event_timestamp" : "2019-11-07T20:39:00.000Z"
        },
        "_ingest" : {
          "timestamp" : "2020-06-02T14:49:23.236Z"
        }
      }
    }
  ]
}
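
Before writing real documents, it may also be worth considering how the new fields will be mapped. Dynamic mapping will normally detect “ingest_time” as a date and “lag_in_seconds” as a number, but if you prefer explicit control you could create the index with mappings up front. The following is a minimal sketch using this blog's field names; adapt it to your own data:

PUT test_index
{
  "mappings": {
    "properties": {
      "event_timestamp": { "type": "date" },
      "ingest_time": { "type": "date" },
      "lag_in_seconds": { "type": "long" }
    }
  }
}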

Finally, we can write a real document to Elasticsearch with the pipeline as follows:

PUT test_index/_doc/1?pipeline=calculate_lag
{
  "event_timestamp": "2019-11-07T20:39:00.000Z",
  "other_field": "whatever"
}

And we can retrieve the document as follows:

GET test_index/_doc/1

Which should respond with the following:

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "lag_in_seconds" : 17950358,
    "ingest_time" : "2020-06-02T14:51:38.068Z",
    "event_timestamp" : "2019-11-07T20:39:00.000Z",
    "other_field" : "whatever"
  }
}

Specifying the pipeline in index settings

When using an ingest pipeline in a production deployment, it may be preferable to specify the pipeline in the index settings rather than in the URL of each request. This can be done by adding index.default_pipeline to the index settings as follows:

PUT test_index/_settings
{
  "index.default_pipeline": "calculate_lag"
}
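
In a production deployment you may also want this setting to be applied automatically to new indices as they are created. One way to do this is with an index template; the template name and index pattern below are just examples for illustration:

PUT _template/calculate_lag_template
{
  "index_patterns": ["test_index*"],
  "settings": {
    "index.default_pipeline": "calculate_lag"
  }
}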

Now any document that is sent into test_index will pass through the calculate_lag pipeline without the need for ?pipeline=calculate_lag in the URL. We can verify this is working with the following PUT command.

PUT test_index/_doc/2
{
  "event_timestamp": "2019-11-07T20:39:00.000Z",
  "other_field": "This is a new doc"
}

Execute the following command to see the document that we have just ingested:

GET test_index/_doc/2

Which should return an enriched document that looks like the following:

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "lag_in_seconds" : 17951461,
    "ingest_time" : "2020-06-02T15:10:01.670Z",
    "event_timestamp" : "2019-11-07T20:39:00.000Z",
    "other_field" : "This is a new doc"
  }
}

How to use ingest lag

Now that the ingest lag has been calculated, it can be used to trigger alerts if the lag exceeds a certain threshold, fed into a machine learning job to detect unexpected deviations in ingest processing time, or displayed in Kibana dashboards.

Detailed examples of these use cases are beyond the scope of this blog, but a simple sketch is given below.
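
For example, a query such as the following could surface recently ingested documents whose lag exceeds a threshold. The 15-minute window and 60-second threshold are arbitrary values chosen for illustration:

GET test_index/_search
{
  "query": {
    "bool": {
      "filter": [
        { "range": { "ingest_time": { "gte": "now-15m" } } },
        { "range": { "lag_in_seconds": { "gte": 60 } } }
      ]
    }
  }
}

A watch or Kibana alerting rule could run a similar query on a schedule and send a notification whenever it returns hits.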

Using the ingest timestamp

When looking at visualizations in Kibana or watching for anomalies, we often consider events that occurred in the last day or the last week. However, if we depend on the remotely generated event timestamp rather than the ingest timestamp, then any lag in the ingest process may cause those documents to never be viewed or monitored. For example, if an event occurred yesterday but only arrived at the cluster today, it would not show up in the dashboard for today’s events because its timestamp is from yesterday. Furthermore, it would not have been available when we looked at dashboards yesterday because it was not yet stored in Elasticsearch.

On the other hand, if we visualize data and set up alerts using the ingest timestamp, we are guaranteed to be considering the most recent events to arrive at Elasticsearch, regardless of when the events occurred. This ensures that events are not missed if the ingest process gets temporarily backed up.

Another advantage of using the ingest timestamp is that the event timestamp could be tampered with. For example, imagine that we are monitoring a remote system and a hacker sets its internal clock to some date in the 1980s. If we depend on the remote event timestamp, we might miss all of the activity that a malicious user is performing on that system, unless we specifically look for events that are stored as having occurred in the 1980s. On the other hand, if we depend on the ingest timestamp, we are guaranteed that when we look at recent events, we will consider all of the events that have recently arrived at the Elasticsearch cluster, regardless of the timestamp given to the event by the remote system.
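
To make this concrete, a query on “ingest_time” will always include the documents that arrived most recently, no matter what timestamp the remote system reported. As a simple illustration (the 24-hour window is arbitrary):

GET test_index/_search
{
  "query": {
    "range": {
      "ingest_time": {
        "gte": "now-24h"
      }
    }
  }
}

This query would still return documents from the tampered system described above, because “ingest_time” reflects when they arrived at the cluster, even if their “event_timestamp” is decades old.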

Conclusion

In this blog I have shown how to use an ingest pipeline to store the ingest time and calculate the lag of the ingest process. I have also outlined the advantages of using the ingest time for monitoring and in visualizations, as well as the risks of relying on remote event timestamps.