Logstash Aggregate Filter Plugin

The Aggregate filter plugin is used to aggregate information across multiple events that are logically grouped together. It's particularly useful when you need to process events that are part of a larger transaction or when you want to combine data from related events that arrive at different times.

Syntax

aggregate {
  task_id => "..."
  code => "..."
  map_action => "..."
  end_of_task => bool
  timeout => number
  timeout_task_id_field => "..."
  push_map_as_event_on_timeout => bool
  push_previous_map_as_event => bool
  timeout_code => "..."
  inactivity_timeout => number
  aggregate_maps_path => "..."
}

For detailed information, refer to the official Logstash Aggregate filter plugin documentation.

Example Use Case and Usage

Consider a scenario where you're processing web server logs and want to aggregate information for each user session:

filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      map['start_time'] ||= event.get('@timestamp').to_i
      map['page_count'] ||= 0
      map['page_count'] += 1
      map['duration'] = event.get('@timestamp').to_i - map['start_time']
    "
    map_action => "create_or_update"
    end_of_task => false
    timeout => 1800
    timeout_task_id_field => "user_id"
    push_map_as_event_on_timeout => true
  }
}

This configuration counts the pages each user visits and keeps a running session duration, initializing start_time from the first event seen for that user. Thirty minutes after the first event (timeout => 1800), the aggregated map is pushed as a new event whose user_id field is filled in from the task_id; use inactivity_timeout instead if sessions should expire relative to the user's last event. push_previous_map_as_event is omitted because it pushes the current map as soon as an event with a different task_id arrives, which only works well when each task's events are contiguous in the stream rather than interleaved across users.
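
If downstream filters or outputs need to distinguish the timeout-generated session event from ordinary page-view events, the timeout_code option runs a Ruby block against that generated event. A minimal sketch, where the session_ended field name is illustrative rather than part of the plugin:

filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      map['page_count'] ||= 0
      map['page_count'] += 1
    "
    timeout => 1800
    timeout_task_id_field => "user_id"
    push_map_as_event_on_timeout => true
    # timeout_code executes against the event created from the map on timeout
    timeout_code => "event.set('session_ended', true)"
  }
}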

Common Issues and Best Practices

  1. Memory usage: Be cautious with the number of concurrent aggregations to avoid excessive memory consumption.
  2. Task timeouts: Set appropriate timeout values to prevent stale data from persisting.
  3. Data persistence: Use aggregate_maps_path to persist in-flight maps across Logstash restarts (see the sketch after this list).
  4. Complex aggregations: For very complex aggregations, consider using external databases or caches.
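
For data persistence (point 3), a minimal sketch follows; the file path is illustrative and must be writable by the Logstash user. On a clean shutdown, in-flight maps are written to this file and reloaded at the next startup, and the option should be defined in only one aggregate filter per Logstash instance, since aggregate maps are shared:

filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      map['page_count'] ||= 0
      map['page_count'] += 1
    "
    timeout => 1800
    # illustrative path; maps are saved here on shutdown and restored on startup
    aggregate_maps_path => "/var/lib/logstash/aggregate_maps"
  }
}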

Frequently Asked Questions

Q: How does the Aggregate filter handle out-of-order events?
A: The Aggregate filter processes events as they arrive. It doesn't inherently handle out-of-order events, so you may need to implement custom logic in your code block to manage this scenario.
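
For example, instead of assuming that the first event to arrive starts the session, the code block can track the earliest and latest timestamps seen so far. A minimal sketch:

filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      ts = event.get('@timestamp').to_i
      # keep the earliest and latest timestamps regardless of arrival order
      map['start_time'] = [map['start_time'] || ts, ts].min
      map['end_time'] = [map['end_time'] || ts, ts].max
      map['duration'] = map['end_time'] - map['start_time']
    "
    timeout => 1800
  }
}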

Q: Can I use the Aggregate filter with multiple pipelines?
A: Yes, but aggregations are pipeline-specific. If you need to share aggregations across pipelines, you'll need to use an external data store.

Q: How can I debug my Aggregate filter?
A: Use Logstash's debug logging and consider adding temporary output events in your code block to trace the aggregation process.
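
One simple tracing technique is to copy the aggregation state onto each passing event in a temporary field (debug_page_count below is an illustrative name), then watch it in a stdout output:

filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      map['page_count'] ||= 0
      map['page_count'] += 1
      # temporary: expose the map's state on the event for debugging
      event.set('debug_page_count', map['page_count'])
    "
  }
}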

Q: What happens to aggregated data when Logstash restarts?
A: By default, aggregated data is lost on restart. Use the aggregate_maps_path option to persist data across restarts.

Q: Can the Aggregate filter work with data from different sources?
A: Yes, as long as you can define a common task_id to correlate events from different sources within the same Logstash pipeline.
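
For instance, if an application log and an access log both carry a shared correlation field (transaction_id below is illustrative), the same task_id pattern ties their events together. A minimal sketch:

filter {
  aggregate {
    # any field present in events from both sources can serve as the task_id
    task_id => "%{transaction_id}"
    code => "
      map['sources'] ||= []
      map['sources'] << event.get('type')  # 'type' is an illustrative field
    "
    timeout => 120
  }
}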
