The Aggregate filter plugin is used to aggregate information across multiple events that are logically grouped together. It's particularly useful when you need to process events that are part of a larger transaction or when you want to combine data from related events that arrive at different times.
Syntax
aggregate {
  task_id => "..."                        # expression identifying the task, e.g. "%{transaction_id}"
  code => "..."                           # Ruby code run for each matching event; 'map' and 'event' are available
  map_action => "..."                     # "create", "update", or "create_or_update"
  end_of_task => bool                     # true marks the task finished and deletes its map
  timeout => number                       # seconds since the task's first event after which the map expires
  timeout_task_id_field => "..."          # field that receives the task id on a timeout-generated event
  push_map_as_event_on_timeout => bool    # push the map as a new event when the timeout fires
  push_previous_map_as_event => bool      # push the previous map as an event when a new task id appears
  timeout_code => "..."                   # Ruby code run on the timeout-generated event
  inactivity_timeout => number            # seconds since the task's last event after which the map expires (must be lower than timeout)
  aggregate_maps_path => "..."            # file used to persist maps across restarts
}
For detailed information, refer to the official Logstash Aggregate filter plugin documentation.
Example Use Case and Usage
Consider a scenario where you're processing web server logs and want to aggregate information for each user session:
filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      map['start_time'] ||= event.get('@timestamp').to_i   # record the session's first timestamp
      map['page_count'] ||= 0
      map['page_count'] += 1
      map['duration'] = event.get('@timestamp').to_i - map['start_time']
    "
    map_action => "create_or_update"
    end_of_task => false
    timeout => 1800
    timeout_task_id_field => "user_id"
    push_map_as_event_on_timeout => true
    # Note: push_previous_map_as_event assumes each user's events arrive in one
    # contiguous block; with interleaved users it flushes sessions prematurely.
    push_previous_map_as_event => true
  }
}
This configuration counts the pages each user visits and tracks the session duration; when the timeout fires, the accumulated map is pushed as a new event, with the user id restored into the user_id field.
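The syntax block above also lists timeout_code, which this example leaves unused. As a hedged extension, it could mark timeout-generated events so downstream filters can tell a flushed session apart from ordinary events (the session_ended_by field name is an assumption, not part of the plugin):

filter {
  aggregate {
    task_id => "%{user_id}"
    code => "map['page_count'] ||= 0; map['page_count'] += 1"
    timeout => 1800
    timeout_task_id_field => "user_id"
    push_map_as_event_on_timeout => true
    # Runs on the event generated from the map when the timeout fires
    timeout_code => "event.set('session_ended_by', 'timeout')"
  }
}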
Common Issues and Best Practices
- Memory usage: Every in-flight task holds a map in memory until it ends or times out, so high-cardinality task_id values can consume significant heap.
- Task timeouts: Set timeout (and, where appropriate, inactivity_timeout) low enough that stale maps do not persist indefinitely.
- Data persistence: Use aggregate_maps_path for data persistence across Logstash restarts (see the sketch after this list).
- Complex aggregations: For very complex aggregations, consider using external databases or caches.
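A minimal persistence sketch, assuming a writable path (the transaction_id field is illustrative). Maps are written out only on a clean shutdown and reloaded at the next startup; note also that the official plugin documentation requires a single pipeline worker (-w 1 or pipeline.workers: 1) for the filter to work correctly:

filter {
  aggregate {
    task_id => "%{transaction_id}"
    code => "map['events'] ||= 0; map['events'] += 1"
    # Maps are dumped to this file when Logstash stops
    # and loaded from it when Logstash starts.
    aggregate_maps_path => "/var/lib/logstash/aggregate_maps"
  }
}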
Frequently Asked Questions
Q: How does the Aggregate filter handle out-of-order events?
A: The Aggregate filter processes events in arrival order; it doesn't inherently handle out-of-order events, so you may need to implement custom logic in your code block to manage this scenario (see the sketch below).
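One possible pattern, sketched here with the session example's fields, is to track the earliest and latest timestamps seen so far rather than assuming the first event to arrive is the oldest:

filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      ts = event.get('@timestamp').to_i
      # Keep min/max so a late-arriving early event cannot corrupt the duration
      map['first_seen'] = ts if map['first_seen'].nil? || ts < map['first_seen']
      map['last_seen']  = ts if map['last_seen'].nil?  || ts > map['last_seen']
      map['duration']   = map['last_seen'] - map['first_seen']
    "
  }
}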
Q: Can I use the Aggregate filter with multiple pipelines?
A: Yes, but aggregations are pipeline-specific. If you need to share aggregations across pipelines, you'll need to use an external data store.
Q: How can I debug my Aggregate filter?
A: Use Logstash's debug logging and consider adding temporary fields or output events in your code block to trace the aggregation process, as in the sketch below.
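As one sketch (the debug_map field is a temporary aid, not part of the plugin), the current map can be copied onto each event and printed with the stdout output's rubydebug codec:

filter {
  aggregate {
    task_id => "%{user_id}"
    code => "
      map['page_count'] ||= 0
      map['page_count'] += 1
      event.set('debug_map', map.clone)   # expose current map state for inspection
    "
  }
}
output {
  stdout { codec => rubydebug }   # prints every event, including debug_map
}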
Q: What happens to aggregated data when Logstash restarts?
A: By default, aggregated data is lost on restart. Use the aggregate_maps_path option to persist data across restarts (see the persistence sketch under Common Issues and Best Practices).
Q: Can the Aggregate filter work with data from different sources?
A: Yes, as long as you can define a common task_id to correlate events from different sources within the same Logstash pipeline, as sketched below.
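For illustration (the "app" and "db" tags and the shared request_id field are assumptions), an application event can open the task and a database event can close it, copying the correlated data onto the final event:

filter {
  if "app" in [tags] {
    aggregate {
      task_id => "%{request_id}"
      code => "map['app_status'] = event.get('status')"
      map_action => "create_or_update"
    }
  }
  if "db" in [tags] {
    aggregate {
      task_id => "%{request_id}"
      code => "
        event.set('app_status', map['app_status'])   # enrich the db event with app data
      "
      map_action => "update"    # only runs if the app event already created the map
      end_of_task => true       # correlation complete; delete the map
      timeout => 120
    }
  }
}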