Aggregate event data using Ingest Processor
Learn how to aggregate event data using Ingest Processor to optimize data flow and reduce log volume by processing partial aggregations in batches.
You can create a pipeline that aggregates your incoming event data to reduce the volume of raw logs being sent to your destination.
For example, when working with network flow record data, you can compute the sum of total bytes transferred from each source IP address. You can then send each aggregation as a single event to your data destination to be indexed.
Data is aggregated using the stats command in an SPL2 pipeline. See the stats command documentation in the SPL2 Search Reference manual for more information.
Differences between ingest-time aggregations and search-time aggregations
Because data streams through Ingest Processor pipelines continuously, the stats command performs partial aggregations in batches. A batch can contain one or many events, depending on the rate of data flow through your pipelines and on your data source configurations. If aggregable data spans multiple batches, each batch is processed independently, and each batch of partial aggregations is routed as events to your pipeline destination. To retrieve a complete, finalized aggregation of your data, you must run a search using the stats command in Splunk platform after your data is indexed. See Processing aggregations at search-time for more information.
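The batching behavior described above can be illustrated with a minimal Python sketch (illustrative only, not Ingest Processor code, with made-up flow events): each batch is aggregated on its own, and a final pass over the partial results, analogous to the search-time stats command, recovers the complete totals.

```python
from collections import defaultdict

def partial_sums(batch, key, value):
    """Aggregate one batch independently, as the pipeline's stats command does."""
    out = defaultdict(int)
    for event in batch:
        out[event[key]] += event[value]
    return dict(out)

# Three independent batches of flow events (illustrative data).
batches = [
    [{"src_ip": "10.0.0.1", "bytes": 500}, {"src_ip": "10.0.0.1", "bytes": 500}],
    [{"src_ip": "10.0.0.1", "bytes": 250}, {"src_ip": "10.0.0.2", "bytes": 100}],
    [{"src_ip": "10.0.0.2", "bytes": 900}],
]

# Each batch is aggregated on its own, so the same key can appear in
# the output of several batches.
partials = [partial_sums(b, "src_ip", "bytes") for b in batches]

# The search-time stats pass merges the partial results into final totals.
final = defaultdict(int)
for p in partials:
    for key, value in p.items():
        final[key] += value

print(dict(final))
# {'10.0.0.1': 1250, '10.0.0.2': 1000}
```

Because the partial sums are merged with another sum, the final totals are identical to aggregating all events at once, which is why the batched output can be finalized at search time.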
The following table shows which optional arguments of the stats command can be used in each context:

| Optional argument | Can be used in an Ingest Processor pipeline | Can be used in Splunk platform search |
|---|---|---|
| all-num | Yes | No |
| batch_id | Yes | No |
| batch_time | Yes | No |
| by-clause | Yes | Yes |
| delim | Yes | No |
| instance_id | Yes | No |
| partitions | Yes | No |
| span | Yes | Yes |
Optimizing aggregation efficiency
Using the stats command in an Ingest Processor pipeline produces partial aggregations in batches, reducing the number of events sent to your indexers. You can make these partial aggregations more efficient to maximize event reduction. Efficiency depends on batch size: larger batches yield greater event reduction. How you configure batch size depends on your data source. See the following table for notes on each source protocol.
| Source protocol | Configuration information |
|---|---|
| HTTP Event Collector (HEC) | Adjust the batch size directly in the upstream HEC client. The maximum batch time span and batch size for an Ingest Processor pipeline aggregation are 4 seconds and 30MB, respectively. Raising the batch timeout or maximum batch size on an upstream HEC client beyond these limits does not further improve aggregation efficiency. |
| S2S | No configuration is available to adjust batch size for aggregation efficiency. |
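The relationship between batch size and event reduction can be sketched in Python (a simplified model with made-up numbers, not pipeline behavior): each aggregated batch emits at most one event per distinct group, so fewer, larger batches emit fewer total events.

```python
import math

def emitted_events(num_events, batch_size, distinct_groups):
    """Events emitted after per-batch aggregation, assuming each batch
    contains events from every distinct group (illustrative worst case)."""
    num_batches = math.ceil(num_events / batch_size)
    # One output event per group per batch, never more than the input count.
    return min(num_batches * distinct_groups, num_events)

# 10,000 incoming events with 10 distinct BY-clause values (made-up numbers).
for batch_size in (10, 100, 1000):
    print(batch_size, emitted_events(10_000, batch_size, 10))
# 10 10000    <- tiny batches: one event per group per batch, no reduction
# 100 1000    <- 100 batches x 10 groups
# 1000 100    <- 10 batches x 10 groups
```

Under this model, increasing the batch size by 10x cuts the emitted event count by 10x until the per-group floor is reached, which is why larger batches are more efficient.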
Managing aggregations
Aggregation patterns
Different types of aggregations can be created with the stats command in an Ingest Processor pipeline depending on how many fields you want your indexed events to retain.
- Summary pattern: Choosing a limited number of fields to aggregate in your pipeline filters out the unselected fields and outputs a distilled summary of your data. See Create a pipeline to aggregate your data for an example of the summary pattern.
- Passthrough pattern: If you want to retain all fields from your events while still reducing the overall number of events being indexed, include every field that you want to keep in the BY clause of your stats command. See below for an example of the passthrough pattern in practice.
For example, suppose the following events enter the pipeline:

| server_name | server_ip | file_requested | bytes_out | sourcetype |
|---|---|---|---|---|
| web-01 | 10.1.2.3 | /index.html | 500 | web |
| web-01 | 10.1.2.3 | /index.html | 500 | web |
| web-02 | 10.1.2.4 | /css/main.css | 1200 | web |
The pipeline aggregates these events using a statement like the following:

```
... | stats sum(bytes_out) AS bytes_out BY server_name, server_ip, file_requested, sourcetype
| rename orig_sourcetype AS sourcetype
| eval _raw=json_delete(_raw, "orig_sourcetype")
```
The pipeline outputs the aggregated results as the following events:

| _raw | server_name | server_ip | file_requested | bytes_out | sourcetype |
|---|---|---|---|---|---|
| {"server_name": "web-01", "server_ip": "10.1.2.3", "file_requested": "/index.html", "bytes_out": 1000} | web-01 | 10.1.2.3 | /index.html | 1000 | web |
| {"server_name": "web-02", "server_ip": "10.1.2.4", "file_requested": "/css/main.css", "bytes_out": 1200} | web-02 | 10.1.2.4 | /css/main.css | 1200 | web |
Processing aggregations at search-time
Once an applied pipeline is aggregating your data in batches and sending those batches to your Splunk platform destination, you can finalize the batched aggregations by running a search in Splunk platform using the stats command. Each statistical function run in an Ingest Processor pipeline has a corresponding SPL1 query that finalizes the aggregation. See the following examples for these search statements.
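The pairing of pipeline function and finalizing search follows from how each statistic merges across batches: partial sums and partial counts are summed, the minimum of partial minimums gives the global minimum, the maximum of partial maximums gives the global maximum, and an average must be rebuilt at search time from a merged sum and a merged count. A minimal Python sketch of the merge rules (illustrative values only, matching the web-01 sample events below split across two batches):

```python
# Partial results from two independently aggregated batches.
partials = [
    {"sum": 500, "count": 1, "min": 500, "max": 500},
    {"sum": 8500, "count": 1, "min": 8500, "max": 8500},
]

merged_sum = sum(p["sum"] for p in partials)      # finalize sum(x) with sum()
merged_count = sum(p["count"] for p in partials)  # finalize count() with sum()
merged_min = min(p["min"] for p in partials)      # finalize min(x) with min()
merged_max = max(p["max"] for p in partials)      # finalize max(x) with max()
merged_avg = merged_sum / merged_count            # avg(x) needs both sum and count

print(merged_sum, merged_count, merged_min, merged_max, merged_avg)
# 9000 2 500 8500 4500.0
```

Note that an average of partial averages would be wrong whenever batches hold different numbers of events, which is why the average example further below indexes a partial sum and a partial count instead.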
The following examples use this set of incoming events:

| server_name | server_ip | file_requested | bytes_out | sourcetype |
|---|---|---|---|---|
| web-01 | 10.1.2.3 | /index.html | 500 | web |
| web-01 | 10.1.2.3 | /images/logo.png | 8500 | web |
| web-02 | 10.1.2.4 | /css/main.css | 1200 | web |
To sum bytes_out for each server in your pipeline:

```
... | stats sum(bytes_out) as bytes_out BY server_name
| eval sourcetype="web:summary"
```
The pipeline sends the following events to be indexed:

| server_name | bytes_out | _raw | sourcetype |
|---|---|---|---|
| web-01 | 9000 | {"server_name": "web-01", "bytes_out":9000} | web:summary |
| web-02 | 1200 | {"server_name": "web-02", "bytes_out":1200} | web:summary |
To finalize the sum at search time, sum the partial sums:

```
index=<myindex> sourcetype="web:summary" | stats sum(bytes_out) BY server_name
```
To count events for each server in your pipeline:

```
... | stats count() as event_count BY server_name
| eval sourcetype="web:summary"
```
The pipeline sends the following events to be indexed:

| server_name | event_count | _raw | sourcetype |
|---|---|---|---|
| web-01 | 2 | {"server_name": "web-01", "event_count": 2} | web:summary |
| web-02 | 1 | {"server_name": "web-02", "event_count": 1} | web:summary |
To finalize the count at search time, sum the partial counts:

```
index=<myindex> sourcetype="web:summary" | stats sum(event_count) BY server_name
```
To get the minimum bytes_out for each server in your pipeline:

```
... | stats min(bytes_out) as min_bytes_out BY server_name
| eval sourcetype="web:summary"
```

The pipeline sends the following events to be indexed:

| server_name | min_bytes_out | _raw | sourcetype |
|---|---|---|---|
| web-01 | 500 | {"server_name": "web-01", "min_bytes_out":500} | web:summary |
| web-02 | 1200 | {"server_name": "web-02", "min_bytes_out":1200} | web:summary |

To finalize the minimum at search time, take the minimum of the partial minimums:

```
index=<myindex> sourcetype="web:summary" | stats min(min_bytes_out) BY server_name
```
To get the maximum bytes_out for each server in your pipeline:

```
... | stats max(bytes_out) as max_bytes_out BY server_name
| eval sourcetype="web:summary"
```
The pipeline sends the following events to be indexed:

| server_name | max_bytes_out | _raw | sourcetype |
|---|---|---|---|
| web-01 | 8500 | {"server_name": "web-01", "max_bytes_out":8500} | web:summary |
| web-02 | 1200 | {"server_name": "web-02", "max_bytes_out":1200} | web:summary |
To finalize the maximum at search time, take the maximum of the partial maximums:

```
index=<myindex> sourcetype="web:summary" | stats max(max_bytes_out) BY server_name
```
Averages cannot be finalized from partial averages, so emit a partial sum and a partial count from your pipeline:

```
... | stats sum(bytes_out) as bytes_out, count(bytes_out) as event_count BY server_name
| eval sourcetype="web:summary"
```
The pipeline sends the following events to be indexed:

| server_name | bytes_out | event_count | _raw | sourcetype |
|---|---|---|---|---|
| web-01 | 9000 | 2 | {"server_name": "web-01", "bytes_out":9000, "event_count":2} | web:summary |
| web-02 | 1200 | 1 | {"server_name": "web-02", "bytes_out":1200, "event_count":1} | web:summary |
At search time, merge the sums and counts, then divide to finalize the average:

```
index=<myindex> sourcetype="web:summary"
| stats sum(bytes_out) AS bytes_out, sum(event_count) AS event_count BY server_name
| eval bytes_avg=bytes_out/event_count
```
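The arithmetic behind this average example can be checked with a short Python sketch (illustrative only, reusing the bytes_out values from the sample events above): the indexed partial sum and event count per server are merged and divided, exactly as the eval step does.

```python
# Raw bytes_out values per server, taken from the sample input events.
raw = {"web-01": [500, 8500], "web-02": [1200]}

summary = {}
for server, values in raw.items():
    # The pipeline indexes the partial sum and event count per server...
    bytes_out, event_count = sum(values), len(values)
    # ...and the search divides the merged sum by the merged count.
    summary[server] = (bytes_out, event_count, bytes_out / event_count)

print(summary)
# {'web-01': (9000, 2, 4500.0), 'web-02': (1200, 1, 1200.0)}
```

Averaging the raw events directly gives the same result, confirming that no information needed for the average is lost by indexing only the partial sum and count.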
To aggregate by time span as well as by field, suppose the following events enter the pipeline:

| _time | server_name | server_ip | file_requested | bytes_out | sourcetype |
|---|---|---|---|---|---|
| 2025-01-01 12:00:05 | web-01 | 10.1.2.3 | /index.html | 500 | web |
| 2025-01-01 12:01:22 | web-01 | 10.1.2.3 | /images/logo.png | 8500 | web |
| 2025-01-01 12:00:28 | web-02 | 10.1.2.4 | /css/main.css | 1200 | web |
Aggregate bytes_out for each server over one-minute spans in your pipeline:

```
… | stats sum(bytes_out) as bytes_out BY span(_time, 1m), server_name
| eval sourcetype="web:summary"
```
The pipeline sends the following events to be indexed:

| _time | server_name | bytes_out | _raw | sourcetype |
|---|---|---|---|---|
| 2025-01-01 12:00:00 | web-01 | 500 | {"server_name": "web-01", "bytes_out": 500, "_time":1735732800} | web:summary |
| 2025-01-01 12:01:00 | web-01 | 8500 | {"server_name": "web-01", "bytes_out": 8500, "_time":1735732860} | web:summary |
| 2025-01-01 12:00:00 | web-02 | 1200 | {"server_name": "web-02", "bytes_out": 1200, "_time":1735732800} | web:summary |
To finalize at search time, sum the partial sums within each time bucket:

```
index=<myindex> sourcetype="web:summary" | stats sum(bytes_out) BY _time, server_name
```
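The span bucketing in this example can be sketched in Python (an illustrative model, not SPL2 internals): flooring each event's epoch timestamp to the start of its minute reproduces the one-minute buckets shown in the output table above.

```python
from datetime import datetime, timezone

def minute_bucket(epoch):
    """Floor an epoch timestamp to the start of its minute, like span(_time, 1m)."""
    return epoch - (epoch % 60)

# The sample events above as (epoch, server, bytes_out) tuples.
events = [
    (1735732805, "web-01", 500),   # 2025-01-01 12:00:05
    (1735732882, "web-01", 8500),  # 2025-01-01 12:01:22
    (1735732828, "web-02", 1200),  # 2025-01-01 12:00:28
]

# Sum bytes_out grouped by (minute bucket, server), like BY span(_time, 1m), server_name.
totals = {}
for epoch, server, bytes_out in events:
    key = (minute_bucket(epoch), server)
    totals[key] = totals.get(key, 0) + bytes_out

for (bucket, server), total in sorted(totals.items()):
    stamp = datetime.fromtimestamp(bucket, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    print(stamp, server, total)
# 2025-01-01 12:00:00 web-01 500
# 2025-01-01 12:00:00 web-02 1200
# 2025-01-01 12:01:00 web-01 8500
```

Because each emitted event carries its bucket start time in _time, the finalizing search can group BY _time and still merge partial sums that fall in the same minute.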