Implement data handling logic using SPL2 data types
Use SPL2 data types to implement data handling logic that lets you selectively process and route different subsets of data.
Data typing allows you to differentiate between the various kinds of data available in your system and identify data subsets of interest. For example, if you define a sales_log data type for logs that pertain to sales transactions, you can then distinguish those sales logs from all of the other logs being sent to the same Splunk Cloud Platform deployment by checking whether a given log matches the sales_log data type.
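For example, a minimal sketch of such a type might look like the following. The sales_log name and the regular expression are hypothetical placeholders; in practice, the match condition would need to describe the actual format of your sales transaction logs.

// Hypothetical custom data type: a string counts as a sales log only if it
// contains a sale ID and a currency amount in the expected format.
type sales_log = string WHERE match($value, /sale_id=\d+\s+amount=\d+\.\d{2}/);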
When ingesting data using an Edge Processor or Ingest Processor pipeline, you can use data typing to identify specific subsets of data in the overall stream of incoming data, and then implement handling logic to process and route those subsets of data in different ways. Defining this handling logic based on data types allows you to identify and select data based on its schema instead of its exact literal value.
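For instance, the difference between filtering on a literal value and filtering on a data type can be sketched as follows. This is illustrative only and assumes the hypothetical sales_log type from the previous example.

// Literal matching: keeps any event that happens to contain this substring,
// even if the rest of the event doesn't resemble a sales log at all.
| where like(_raw, "%sale_id=%")

// Type matching: keeps only events that satisfy the sales_log type definition,
// so selection is based on the data's schema rather than one literal value.
| where _raw IS sales_log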
To implement data handling logic, you can follow this high-level pattern:
- For each subset of data that you want to process differently, choose a data type that uniquely matches the data. If the built-in data types in SPL2 do not meet your needs, then define custom data types that do. For more information, see Built-in data types and Custom data types.
- For each subset of data, create a separate processing path in your SPL2 statement using the branch command. For more information, see the branch command chapter in the SPL2 Search Reference.
- Configure each processing path to filter for a specific subset of data and then transform and route the data as needed. To filter the data, use the IS operator in a where command to test if the incoming data matches one of the chosen data types, and only allow data that returns TRUE to continue down the processing path. For more information, see IS operator in the current manual and the where command chapter in the SPL2 Search Reference. A minimal sketch of this pattern appears after this list.
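The following is a minimal sketch of this pattern, assuming the hypothetical sales_log type defined earlier. The $source parameter and the ... placeholders stand in for your own data source and processing commands.

// Sketch: one processing path per data type of interest, plus a catch-all
// path for data that doesn't match any of the chosen types.
$pipeline = | from $source
    | branch
        [ | where _raw IS sales_log | ... ],
        [ | where NOT (_raw IS sales_log) | ... ];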
The following example featuring the fictitious Buttercup Games company demonstrates how to implement data handling logic using data types. This example uses an Ingest Processor pipeline. Similar logic also applies to Edge Processor pipelines, but be aware that the logs_to_metrics command featured in this example is not supported by Edge Processors at this time.
For information about creating pipelines, see the following:
- The Working with pipelines chapter in the Use Ingest Processors manual
- The Working with pipelines chapter in the Use Edge Processors manual
Example: Selectively process and route Buttercup Games website data
The Buttercup Games company hosts a website that provides information about their products and allows customers to make purchases online. The website generates a variety of logs that are passed through the Ingest Processor in Splunk Cloud Platform before they are indexed for storage and downstream operations. The logs generated by the website include the following kinds:
- Network traffic logs, which look like this:

  175.44.24.82 - - [11/Jan/2023:03:07:00] "ingress from 10.1.0.1" - 3786 363
  209.160.24.63 - - [11/Jan/2023:03:07:50] "ingress from 10.100.5.16" - 2980 352
  112.111.162.4 - - [11/Jan/2023:03:08:10] "egress to 10.1.0.200" - 2014 355
- Audit logs, which look like this:

  Wed Jan 11 2023 00:15:06 authsv1 sshd[60445]: pam_unix(sshd:session): session opened for user mdubios by (uid=0)
  Wed Jan 11 2023 00:15:06 authsv1 sshd[3759]: Failed password for djohnson from 194.8.74.23 port 3769 ssh2
  Wed Jan 11 2023 00:15:08 authsv1 sshd[5276]: Failed password for invalid user appserver from 194.8.74.23 port 3351
Each type of log is used by different teams in the company for different purposes, and each team has unique requirements for how the logs need to be formatted and stored:
- The Performance team wants the network traffic logs formatted as metric data points and stored in an index named traffic_metrics, so that they can create dashboards to monitor fluctuations in website performance over time. The team also wants an unprocessed copy of the network traffic logs to be sent to an Amazon S3 bucket for cold storage.
- The Security team wants the audit logs to be formatted into events and then stored in an index named web_audit that only individuals with elevated permissions can access.
As a data administrator working at Buttercup Games, you are tasked with configuring an Ingest Processor pipeline that processes and routes the website logs as requested by each team.
To implement the required data handling logic, you do the following:
Define custom data types that describe the website log formats
Start by defining these 2 custom data types:
- The traffic type, which describes the format of the network traffic logs.
- The audit type, which describes the format of the audit logs.
You can then check the incoming data against these types to distinguish between network traffic logs, audit logs, and other logs from the Buttercup Games website.
The following SPL2 expressions define the traffic data type. The regular expression that describes the network traffic log format is returned by a custom function named traffic_regex instead of being included literally in the type definition, so that the regular expression can be easily reused in other parts of the module.
type traffic = string WHERE match($value, traffic_regex());
function traffic_regex(): regex {
return /(?P<src_ip>(((?:2(?:5[0-5]|[0-4][0-9])|[0-1][0-9][0-9]|[0-9][0-9]?)(?:\.(?:2(?:5[0-5]|[0-4][0-9])|[0-1][0-9][0-9]|[0-9][0-9]?)){3}))(?:)?)\s\-\s\-\s\[(?P<timestamp>(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])[.\/-](?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y|i)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)[.\/-](?:\d\d){1,2}:(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?))\]\s\"(?P<type>(ingress|egress)).*\s(?P<dest_ip>(((?:2(?:5[0-5]|[0-4][0-9])|[0-1][0-9][0-9]|[0-9][0-9]?)(?:\.(?:2(?:5[0-5]|[0-4][0-9])|[0-1][0-9][0-9]|[0-9][0-9]?)){3}))(?:)?)\"\s\-\s(?P<bytes>\d+)\s(?P<interval>\d+)/;
}
The following SPL2 expressions define the audit data type. The regular expression that describes the format of the audit logs is returned by a custom function named audit_regex.
type audit = string WHERE match($value, audit_regex());
function audit_regex(): regex {
return /(?P<timestamp>(?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)\s(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y|i)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\s(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])\s(?:\d\d){1,2}\s(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?))\s(?P<host>\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b))\ssshd\[(?P<sshd>[^\]]*)\]\:\s(?P<msg>.*)/;
}
Create a branched pipeline
Create an Ingest Processor pipeline that uses the branch command to provide 3 different processing paths: one for data of type traffic, another for data of type audit, and one more for data that doesn't match either of those types.
Then, configure each path to do the following:
- Filter the incoming data based on whether it matches the traffic or audit data types.
- If the data matches the log type that the particular pipeline path is intended to handle, then allow that data to continue downstream for further processing.
The following example shows a partial SPL2 statement for the pipeline. It creates the 3 processing paths and filters each type of log into the appropriate path, but does not include the commands for actually transforming and routing each log type.
$pipeline = | from $source
| branch
[ | where _raw IS traffic | ... ],
[ | where _raw IS audit | ... ],
[ | where NOT (_raw IS traffic) AND NOT (_raw IS audit) | ... ];
Transform and route the network traffic logs
The first branch in the pipeline is for processing data of type traffic. The Performance team wants these logs to be formatted as metric data points and stored in an index named traffic_metrics. They also want an unprocessed copy of the logs to be sent to an Amazon S3 bucket for cold storage.
To process the network traffic logs as requested, configure the first branch of the pipeline as follows:
- Extract the relevant pieces of information from the _raw field of the log into top-level event fields. You can use the rex command to extract most of the fields, and then use the strptime function to create event timestamps in UNIX time.
- Then, use the logs_to_metrics command to generate metric data points from the extracted information.
  - You'll need to use this command twice: once to generate the bytes metric data points, and a second time to generate the interval metric data points.
  - Each logs_to_metrics command is enclosed by a thru expression that ensures the metrics are routed to a destination that supports metrics, such as Splunk Observability Cloud or a metrics index in Splunk Cloud Platform. The destination platform is determined by the into command at the end of the thru expression.
  - You can include an eval command in the thru expression to specify that the data needs to be sent to an index named traffic_metrics.
- Finally, use the fields command to drop any fields that were only needed to support the generation of the metric data points, and then use the into command at the end of the pipeline branch to send an unprocessed copy of the network traffic logs to Amazon S3 for cold storage.
To make the logs_to_metrics command available in the pipeline, you'll need to include the following import statement:
import logs_to_metrics from /splunk/ingest/commands
The following is a partial SPL2 statement that shows the configuration of the first branch of the pipeline:
$pipeline = | from $source
| branch
[
| where _raw IS traffic
| rex field=_raw traffic_regex()
| eval _time = strptime(timestamp, "%d/%b/%Y:%H:%M:%S")
| thru
[
| logs_to_metrics name="bytes" metrictype="counter" value=bytes time=_time dimensions={"src_ip": src_ip, "dest_ip": dest_ip, "type": 'type'}
| eval index="traffic_metrics"
| into $metrics_destination
]
| thru
[
| logs_to_metrics name="interval" metrictype="counter" value=interval time=_time dimensions={"src_ip": src_ip, "dest_ip": dest_ip, "type": 'type'}
| eval index="traffic_metrics"
| into $metrics_destination
]
| fields - src_ip, bytes, dest_ip, interval, type, timestamp, _time
| into $s3_destination
],
[ | where _raw IS audit | ... ],
[ | where NOT (_raw IS traffic) AND NOT (_raw IS audit) | ... ];
The resulting data sent to the traffic_metrics index looks like this:
| _time | metric_name | metric_type | metric_value | dimensions |
|---|---|---|---|---|
| 7:07:00 PM 10 Jan 2023 | bytes | counter | 3786 | {"src_ip":"175.44.24.82","dest_ip":"10.1.0.1","type":"ingress"} |
| 7:07:50 PM 10 Jan 2023 | bytes | counter | 2980 | {"src_ip":"209.160.24.63","dest_ip":"10.100.5.16","type":"ingress"} |
| 7:08:10 PM 10 Jan 2023 | bytes | counter | 2014 | {"src_ip":"112.111.162.4","dest_ip":"10.1.0.200","type":"egress"} |
| 7:07:00 PM 10 Jan 2023 | interval | counter | 363 | {"src_ip":"175.44.24.82","dest_ip":"10.1.0.1","type":"ingress"} |
| 7:07:50 PM 10 Jan 2023 | interval | counter | 352 | {"src_ip":"209.160.24.63","dest_ip":"10.100.5.16","type":"ingress"} |
| 7:08:10 PM 10 Jan 2023 | interval | counter | 355 | {"src_ip":"112.111.162.4","dest_ip":"10.1.0.200","type":"egress"} |
Transform and route the audit logs
The second branch in the pipeline is for processing data of type audit. The Security team wants these logs to be formatted into events and then stored in an index named web_audit.
To process the audit logs as requested, configure the second branch of the pipeline as follows:
- Extract the relevant pieces of information from the _raw field of the log into top-level event fields. You can use the rex command to extract most of the fields, and then use the strptime function to create event timestamps in UNIX time.
- Then, use the fields command to drop the _raw and timestamp fields, which are made redundant by the field and timestamp extractions.
- Finally, use the eval command to specify that the data needs to be sent to an index named web_audit, and use the into command to send the processed data to Splunk Cloud Platform.
The following is a partial SPL2 statement that shows the configuration of the second branch of the pipeline:
$pipeline = | from $source
| branch
[ | where _raw IS traffic | ... ],
[
| where _raw IS audit
| rex field=_raw audit_regex()
| eval _time = strptime(timestamp, "%a %b %d %Y %H:%M:%S")
| fields - _raw, timestamp
| eval index = "web_audit"
| into $destination
],
[ | where NOT (_raw IS traffic) AND NOT (_raw IS audit) | ... ];
The resulting data sent to the web_audit index looks like this:
| _time | msg | host | sshd |
|---|---|---|---|
| 4:15:06 PM 10 Jan 2023 | pam_unix(sshd:session): session opened for user mdubios by (uid=0) | authsv1 | 60445 |
| 4:15:06 PM 10 Jan 2023 | Failed password for djohnson from 194.8.74.23 port 3769 ssh2 | authsv1 | 3759 |
| 4:15:08 PM 10 Jan 2023 | Failed password for invalid user appserver from 194.8.74.23 port 3351 | authsv1 | 5276 |
Send all other log types to an index
The third branch in the pipeline is for processing data that does not match the traffic type or the audit type. Since you currently don't have specific requirements for this data, you want to send it unchanged to an index for storage and later assessment.
Use an eval command in the third pipeline branch to specify that the data needs to be sent to an index named main, and then use the into command to send the data to Splunk Cloud Platform. The following is a partial SPL2 statement that shows the configuration of the third branch of the pipeline:
$pipeline = | from $source
| branch
[ | where _raw IS traffic | ... ],
[ | where _raw IS audit | ... ],
[
| where NOT (_raw IS traffic) AND NOT (_raw IS audit)
| eval index = "main"
| into $destination
];
Results
The complete Ingest Processor pipeline for selectively processing and routing different types of logs from the Buttercup Games website is as follows:
import logs_to_metrics from /splunk/ingest/commands
type traffic = string WHERE match($value, traffic_regex());
type audit = string WHERE match($value, audit_regex());
function traffic_regex(): regex {
return /(?P<src_ip>(((?:2(?:5[0-5]|[0-4][0-9])|[0-1][0-9][0-9]|[0-9][0-9]?)(?:\.(?:2(?:5[0-5]|[0-4][0-9])|[0-1][0-9][0-9]|[0-9][0-9]?)){3}))(?:)?)\s\-\s\-\s\[(?P<timestamp>(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])[.\/-](?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y|i)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)[.\/-](?:\d\d){1,2}:(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?))\]\s\"(?P<type>(ingress|egress)).*\s(?P<dest_ip>(((?:2(?:5[0-5]|[0-4][0-9])|[0-1][0-9][0-9]|[0-9][0-9]?)(?:\.(?:2(?:5[0-5]|[0-4][0-9])|[0-1][0-9][0-9]|[0-9][0-9]?)){3}))(?:)?)\"\s\-\s(?P<bytes>\d+)\s(?P<interval>\d+)/;
}
function audit_regex(): regex {
return /(?P<timestamp>(?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)\s(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y|i)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\s(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])\s(?:\d\d){1,2}\s(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?))\s(?P<host>\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b))\ssshd\[(?P<sshd>[^\]]*)\]\:\s(?P<msg>.*)/;
}
$pipeline = | from $source
| branch
[
| where _raw IS traffic
| rex field=_raw traffic_regex()
| eval _time = strptime(timestamp, "%d/%b/%Y:%H:%M:%S")
| thru
[
| logs_to_metrics name="bytes" metrictype="counter" value=bytes time=_time dimensions={"src_ip": src_ip, "dest_ip": dest_ip, "type": 'type'}
| eval index="traffic_metrics"
| into $metrics_destination
]
| thru
[
| logs_to_metrics name="interval" metrictype="counter" value=interval time=_time dimensions={"src_ip": src_ip, "dest_ip": dest_ip, "type": 'type'}
| eval index="traffic_metrics"
| into $metrics_destination
]
| fields - src_ip, bytes, dest_ip, interval, type, timestamp, _time
| into $s3_destination
],
[
| where _raw IS audit
| rex field=_raw audit_regex()
| eval _time = strptime(timestamp, "%a %b %d %Y %H:%M:%S")
| fields - _raw, timestamp
| eval index = "web_audit"
| into $destination
],
[
| where NOT (_raw IS traffic) AND NOT (_raw IS audit)
| eval index = "main"
| into $destination
];
Your data ingestion workflow now includes handling logic that identifies network traffic logs and audit logs from the Buttercup Games website based on their contents, and then processes and routes each type of log according to the requirements from the Performance and Security teams. Any incoming data that is not a network traffic log or an audit log gets routed to another index for storage and later assessment.
See also
Related reference
Creating and using data schemas with SPL2 data types
The Route data using pipelines chapter in the Use Ingest Processors manual
The Route data using pipelines chapter in the Use Edge Processors manual