While Splunk is well equipped for ingesting large quantities of data without issue, it can be significantly more difficult to extract the original raw data from Splunk (if you ever need to). This blog post is based on a true “worst case scenario” story when an excessive amount of bad data was accidentally ingested into Splunk and then how it was eventually handled.
In the Splunk world, it’s normal to find yourself dealing with massive amounts of data - that’s what Splunk was designed for after all. While Splunk is well equipped for ingesting large quantities of data without issue, it can be significantly more difficult to extract the original raw data from Splunk (if you ever need to).
In many respects, this makes sense. Splunk is primarily designed to be a log archive and analysis platform. The true power of Splunk comes from being able to return the needle in the haystack with some cool visualizations along the way. But, what if you find yourself needing an inordinate amount of hay? Can Splunk be coerced to export a massive amount of data?
You might be asking yourself, “Hey Tom and Ryan, why would you ever want to do such a thing?” Well, we’re glad you asked. This blog is actually based on a true story where a very important Technology Add-on (TA) went missing from an Index Cluster. That’s not great in general, but especially when you think about the Splunk data pipeline. In our specific case, this missing TA equated to 4 days worth of data not being extracted properly. That resulted in 4 days worth of data that wasn’t CIM compliant, and subsequently 4 days of data that wasn’t populating a customer’s Splunk App for Enterprise Security.
The Palo Alto Networks Add-on for Splunk is the add-on that went missing in our case. This add-on requires that data is ingested via a very specific sourcetype. When it passes through the Indexing tier, it is broken out into different sourcetypes to be analyzed in Splunk. For version 5.x or later of this add-on, the incoming syslog must be configured to use the sourcetype pan:log. The add-on will automatically break this up into different sourcetypes, such as pan:config, pan:traffic, and pan:threat.
Unfortunately, if you don’t follow the add-on installation instructions and pick a different sourcetype, none of this magic will happen. Additionally, if you do pick the right sourcetype, but don’t have the add-on present (which is what happened in this case), you will wind up with a bunch of data that you can’t effectively use in Splunk. Garbage in, garbage out.
There is another issue that customers typically have with their data that made this problem truly difficult to tackle. That is: “How to store backups of syslog data”. Unfortunately for the customer we were working with, the Palo Alto logs were approximately 80-90% of their daily License of 300GB per day. It’s fairly unreasonable to expect a customer to pay for storage to not only Index all of that data to meet their retention policies, but to also store a raw copy of the data separately. One of the reasons that is so ludicrous, is because Splunk technically stores your data in its raw format in a default field called _raw - so there really is no need.
So, what do you do if you wind up in a similarly all-around bad situation? You find a way to export that data, despite the fact that you may have been told “it’s not possible” by some fairly reputable sources.
Fortunately, Splunk has several mechanisms available to return the raw events from a search. For a small dataset, this can be done through SplunkWeb when viewing the search results. For a larger dataset, this often will require the search to be run a second time (even if it was already completed), in order to ensure all the events are returned properly. When dealing with exports containing millions of events or hundreds of gigabytes of data (where a search to export this data could conceivably take days to run), this approach isn’t all that practical.
To address this issue, there are other methods available to export data from Splunk, the full list of which can be referenced here. According to Splunk, “for large exports, the most stable method of search data retrieval is the Command Line Interface (CLI)”. Since we were facing what appeared to be over a terabyte of exported events, this seemed to fit into the “large exports” category. When working with a Splunk Cloud deployment, you don’t technically have CLI access. But this is Splunk - there has to be a way to get this to work.
One of the most powerful Splunk features is the the Splunk CLI. While this is not available locally for Splunk Cloud, you can request access to the Splunk Cloud management port, which we do for all of our customers. With that level of access, we have the ability to run search commands on a remote Splunk instance - which means that the CLI export method was now available with Splunk Cloud.
We started by spinning up a VM in the lab with a lot of disk space. For this example, we calculated approximately 300gb of raw syslog, over the course of a 4 day window, which approximates to 1.2TB of space. If you need to do a similar calculation, you can use the following search:
index=_internal component=metrics series=<your_sourcetype_here> | stats sum(kb) as total_kb | eval total_gb=total_kb/1024/1024
In this lab instance, we installed a copy of Splunk to match the customer’s Splunk Cloud instance, which, at the time of this writing, was 6.4.x. (Note: we initially tried this with a 6.5 instance, but due to some SSL changes in that version, it was unsuccessful. Rather than troubleshooting that issue, matching the version was the simplest solution). From this system, we were able to test running searches against the remote Splunk instance using a sample command such as the following:
$SPLUNK_HOME/bin/splunk search "index::pan_logs sourcetype::pan:log" -output rawdata -maxout 0 -max_time 0 -uri https://customer.splunkcloud.com:8089 > output.csv
You can run this search to verify your basic connectivity and confirm you are getting data returned. However, you probably don’t want to run this as-is because by default it will return everything in that sourcetype/index combination. In the case of a massive amount of data, timeouts and bandwidth will be your enemy.
Once we had our CLI Access and a healthy amount of storage, that was just about all we needed to get off the ground and start exporting our data. All we needed was a little bit of python, which can be found below.
import datetime import os import subprocess import re Import getpass # STUFF THAT NEEDS TO BE DONE BEFORE RUNNING THIS SCRIPT# #Specify the location of your Splunk Home Directory splunk_home='/opt/splunk/bin/splunk' search_string='index::pan_logs sourcetype::pan:log' #Specify a local username and password so we don't need to do Two Factor #Replace this with a username for your environment. Python will prompt for a password user="admin" p = getpass.getpass() password=p ### Define Stopping Time # # datetime.time takes in 3 variables (Hour in 24 hour format, Minute, Second) # datetime.date takes in 3 variables (YYYY, Month, Day) # # Currently time needs to be an interval of 15 minutes. # Examples include 3:00, 3:15, 3:30, 3:45. This will hopefully be modified in future versions. t_stop = datetime.time(14, 30, 00) d_stop = datetime.date(2017, 4, 4) ### Define the Starting Time # # datetime.time takes in 3 variables (Hour in 24 hour format, Minute, Second) # datetime.date takes in 3 variables (YYYY, Month, Day) # ## Currently time needs to be an interval of 15 minutes. # Examples include 3:00, 3:15, 3:30, 3:45. This will hopefully be modified in future versions. t_start = datetime.time(17, 30, 00) d_start = datetime.date(2017, 4, 4) #--------------------------------------------------------------------# dt_stop = datetime.datetime.combine(d_stop, t_stop) dt_start = datetime.datetime.combine(d_start, t_start) format = "%m/%d/%Y:%T" i = 0 while True: #if the Start Time and Stop time are the same, end the loop if dt_start==dt_stop: print "This is the end" print dt_start.strftime(format) + " it the same as " + dt_stop.strftime(format) break else: #Find the "earliest" time for the search (15 minutes before the current start time) dt_start_early = dt_start - datetime.timedelta(minutes=15) #Convert times to a readable format dt_start_early_string = dt_start_early.strftime(format) dt_start_string=dt_start.strftime(format) #ensure the file exists that we'll export to. file = open(str(i)+"_"+str(i+1)+"export.csv","w+") file.close() #spawn a process to run a search p = subprocess.Popen([splunk_home + " search \""+search_string+" earliest=\""+str(dt_start_early_string)+"\" latest=\""+str(dt_start_string)+"\"\" -output rawdata -maxout 0 -max_time 0 -auth "+user+":"+ password +" -uri https://customer.splunkcloud.com:8089 > "+str(i)+"_"+str(i+1)+"export.csv"], stdout=subprocess.PIPE, shell=True) output, err = p.communicate() #Set the next start time to the earliest time of our last search dt_start = dt_start_early #increment our counter for next file name i=i+1
This situation was made extra exciting because we ended up running two Splunk instances on the data exfiltration node at the same time, one for outputting data to files and one to re-index data as those files were written. We don’t recommend this scenario long term, but it helped us set this up over a weekend and when we came back in on Monday, everything was solved. Theoretically, if you had everything in place right away, you could do this with one UF, or one HF. We were figuring this out in stages however, so we setup a UF to start downloading the data as we knew it would take a while to export the quantity of data we needed. We utilized a second Splunk instance to start testing ingesting the data to a test index to make sure it was working as expected and to fine-tune some hostname props/transforms settings. This was probably overkill in hindsight, but if you go to implement this, keep in mind you may be able to streamline things even more.
1.) This code runs a search for the data that was indexed incorrectly. In our case, anything that was indexed as “pan:log” was actually unusable. Data should have ended up in the indexes as pan:threat, pan:traffic, etc. So to identify the “bad” data that we needed to re-index, we set our search_string to “index::pan_logs sourcetype::pan:log”. The code also has two variables for “Start” and “End” times. So you can say “I want to look for this data over this span of time”. Both times should be in increments of 15 minutes. So if your most recent “bad” event is at June 4, 2017 at 10:01AM, you’ll set your start time to June 1, 2017 at 10:15AM. If the oldest bad event is June 1, 2017 at 9:37AM, you’ll set your end time to June 1, 2017 at 9:30AM. This way you make sure to capture all bad events.
2.) Next when you run the script, it will export data in 15 minute chunks and leave them in the same folder that you ran the script in. This helps alleviate any issues with timeouts that can result in trying to download 1.2 TB of data all at once. (Especially if you work from home and are plagued with a terrible internet connection.)
3.) You’ll need to decide how to re-index the data. For this script, you’ll setup a File Input to read in the files from whatever directory you exported them to. Ours looked like the following:
[monitor:///opt/splunk/export/*.csv] disabled = false followTail = 0 sourcetype=pan:log index=pan_logs blacklist = \.gz$
4.) Lastly you’ll need to decide if you’re going to have a Heavy Forwarder or Universal Forwarder do the work. We used a Heavy Forwarder for some very specific reasons that I won’t get into here. Especially because Heavy Forwarders are, in general, not recommended. The point being, we displayed a Heavy Forwarder in this diagram, but you could also do this with a single Universal Forwarder if you didn’t need any custom props/transforms (we did in this case).
Last but not least, we should at least show the results from our efforts. Below is a look at the Palo Alto logs from the 4 days in question:
Since the logs that we exported and re-indexed had a different source than the logs previously indexed, we could look across all Palo Alto logs and use a timechart command with a count to create a nice visualization. The search we used and graph are included here. The only sourcetype we filtered out was “pan:log” since we didn’t care about those events. You’ll notice that the blue line shows a count of events that came from this script. The yellow line is the count of events that came from the standard process. So what we can see is that we successfully backfilled this timeframe with all 1.2TB of data.
index::pan_logs sourcetype!=pan:log | fields source| eval source_location=case(LIKE(source,"/opt/splunk/%"),"script", 1=1, "standard") | timechart count by source_location
Hopefully this helps someone else in the future who ends up in a “worst case scenario” when they accidentally ingest an excessive amount of bad data that they don’t have a backup of. I know this is the first time we had to tackle this problem and in theory, though it seemed possible, it’s always nice to see theory meet practice. If anyone has tackled this problem before, finds this useful, or has any comments or questions feel free to comment below.
If you're looking for something different than the typical "one-size-fits-all" security mentality, you've come to the right place.