Splunk How-To: Optimizing Database Storage with Splunk DB Connect

What if your database was smart enough to know what data Splunk has already indexed and what it hasn’t? Splunk’s DB Connect app allows Splunk administrators to index data from databases in Splunk. Find out what the benefits are and how you can do this in Splunk.



Introduction

Splunk’s DB Connect app allows Splunk administrators to index data from databases in Splunk. This can be extremely useful for both the DBA and Splunk user. The benefits include saving space on the database server, using Splunk as a long-term storage solution (to avoid having to grow databases to tremendous sizes), and even using that database data in your Splunk searches. With these abilities comes the chore of purging database records - but only after Splunk has indexed them.

What if your database was smart enough to know what data Splunk has already indexed, and what it hasn’t? If Splunk could tell the database what data has been indexed, we could remove any possibility of data being purged from the database before it has been indexed. Splunk pulls database records through the “DB input” function. This exercise requires Splunk DB Connect v3.1.0 or later.

Database rising inputs look something like:

SELECT * FROM my_table WHERE my_rising_column > ? ORDER BY my_rising_column ASC

The magic of this query is that it only pulls data that is newer than the last value Splunk’s DB input read from this source. That value is called the checkpoint value, and it is stored in $SPLUNK_HOME/var/lib/splunk/modinputs/server/splunk_app_db_connect/. Every time Splunk pulls data from a database rising input, it writes the checkpoint value here, along with writing it to the _internal index.

So what happens if your database feed breaks for whatever reason? Is your DBA kind enough to keep storage around for a day, a week, or even a month? What happens when your Splunk admin goes on vacation and can’t fix the input quickly enough to avoid losing data? The answer is simple: build in a failsafe so your database knows not to automatically delete data before Splunk has indexed it.

So how does it work?

Recently I’ve been working on this very use case: finding a technique for Splunk to communicate back to the database where it stopped reading the data - or, now that you know some Splunk lingo, how to share Splunk’s checkpoint value with the database purging mechanism. To accomplish this we will need the following:

      - A method to find the DB input’s checkpoint value automatically
      - A way for Splunk to communicate this value to the database

Obtaining the checkpoint value

Fortunately for us, Splunk is a logging and analytics tool at heart. This means we can find the rising column value for a DB input fairly easily. For our test case, the DB input in question will be called “my_test_input”, which happens to be an input reading from an Oracle database. The Splunk server running DB Connect is called “heavyforwarder1” and our search head is called “searchhead1”. So let’s start by finding that checkpoint value (hint: if you’re in a distributed environment, run these searches from your search head):

index=_internal host=heavyforwarder1 sourcetype=dbx_server action=dump_checkpoint file="/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/my_test_input" 

2017-08-08 03:10:00.020 -0400 [QuartzScheduler_Worker-18] INFO c.s.d.s.dbinput.task.DbInputCheckpointRepository - 
action=dump_checkpoint 
file=/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/my_test_input 
value={"value":"6602","appVersion":"3.1.0","columnType":2,"timestamp":"2017-08-08T03:10:00.020-04:00"}

Now that we have the log we need, let’s table it out to make it a bit easier to read (and work with later):

index=_internal host=heavyforwarder1 sourcetype=dbx_server action=dump_checkpoint 
file="/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/my_test_input"
| rex field=value "\{\"value\":\"(?<checkpoint_value>\d+)\"" 
| rex field=file "/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/(?<db_input_name>\w+)" 
| table _time db_input_name checkpoint_value 
| sort - _time 
| head 1 

Our search now lets us know the time, DB input name, and latest checkpoint value.
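
The tabled-out result looks something like this (illustrative, using the values from the example checkpoint log above):

_time                      db_input_name     checkpoint_value
2017-08-08 03:10:00.020    my_test_input     6602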

Getting our checkpoint value to the database

Now that we have this checkpoint value, we need a way to send it to the database. Over the years I’ve watched DB Connect evolve from version 1 to what it is now, and conveniently, DB Connect v3.1.0 allows us to run stored procedures against connected databases. The goal here is for Splunk to kick off a stored procedure against the database to insert the checkpoint value. For reference, this is the stored procedure I’m using:

CREATE OR REPLACE PROCEDURE SPLUNKT.TEST_ORASP(
  p_ref_cursor  OUT SYS_REFCURSOR,
  p_var_in      IN  varchar)
AS
BEGIN
  -- Echo the passed-in value back to the caller via a ref cursor
  OPEN p_ref_cursor FOR
    SELECT to_char(sysdate,'mm/dd/yyyy hh24:mi') || ', you passed-in: "' || p_var_in || '".' out_var FROM dual;
  -- Uncomment to persist the checkpoint value in the database:
  -- INSERT INTO splunkt.SplunkFwlog (seq) VALUES (to_number(p_var_in));
  -- commit;
END;
/

The Splunk setup I am currently working with is a distributed environment. Long story short, that means internal Splunk logs from all of the Splunk infrastructure are viewable on searchhead1 after they get written to the indexers. Since we are running this search on the search head, we will also need DB Connect installed there so we can initiate the stored procedure against the database. Ensure that a DBX connection to the database server is enabled on the search head.
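
Before wiring in the stored procedure, it is worth confirming the connection works at all with a simple ad hoc query. A minimal sketch, assuming the connection is named my_test_database (as used throughout this article) and the target is Oracle (hence dual):

| dbxquery connection=my_test_database query="SELECT sysdate FROM dual"

If this returns a single row with the database’s current time, the search head can reach the database.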

Combining DB stored procedures with Splunk searches

Now that we have a live connection to the database, know the stored procedure we need to run, and have the checkpoint value, we can initiate all the magic. Here is our Splunk SPL syntax for calling the stored procedure (with test data):

| dbxquery connection=my_test_database procedure="{call test_storedproc_splunk.send_rc(?,?) }" params="1"

The above Splunk search string will run the stored procedure "test_storedproc_splunk.send_rc" and pass a test value of "1" into the database.

If that returns without an error, we’re in good shape knowing that we can write to the DB without issue. Now let’s actually make this useful by grabbing the current Splunk DB input checkpoint value and sending it to the database all in one shot:

index=_internal host=heavyforwarder1 sourcetype=dbx_server action=dump_checkpoint file="/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/my_test_input" 
| rex field=value "\{\"value\":\"(?<checkpoint_value>\d+)\"" 
| rex field=file "/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/(?<db_input_name>\w+)" 
| table _time db_input_name checkpoint_value 
| sort - _time 
| head 1 
| map search=" | dbxquery connection=my_test_database procedure=\"{call test_storedproc_splunk.send_rc(?,?) }\" params=\"$checkpoint_value$\" "

Success! We were able to pass the latest checkpoint value of this input to the database:
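
With the reference stored procedure shown earlier, the returned ref cursor simply echoes the value back, so the dbxquery result looks something like this (illustrative):

OUT_VAR
08/08/2017 00:00, you passed-in: "6602".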

Ready to Purge the DB

We know that our Splunk DB input has received data up to database table record 6602. Our database is now also aware of this, thanks to our Splunk search and stored procedure call. This means the database can safely purge records 0-6602 without any risk of Splunk missing data. The only remaining item in this (so-far-manual) process is to configure the database to purge data older than the value it received, 6602 (or whatever Splunk sends next). Once your DBA has this set up, we are ready to automate the saved search that will power this beast.
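
How the purge itself is implemented is entirely up to your DBA, but as a rough sketch: assuming the stored procedure writes the checkpoint into splunkt.SplunkFwlog.seq (as in the commented-out INSERT above), and that my_table and my_rising_column are the source table and rising column from the DB input, the purge job could be as simple as:

-- Hypothetical purge job: remove everything Splunk has confirmed it indexed
DELETE FROM my_table
 WHERE my_rising_column <= (SELECT MAX(seq) FROM splunkt.SplunkFwlog);
COMMIT;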

There are not many requirements for how often you send the checkpoint value to your database, but in my case, the DB owner wants to be able to purge data at least once a day. My input runs every 15 minutes so I can have up-to-date data in Splunk. This means that some already-indexed data will sit in the database until my daily saved search kicks off and sends the checkpoint value to the database. I chose to configure this to run at midnight every night. Here’s my savedsearches.conf (this can be done by saving your ad hoc search as a report and scheduling it accordingly):

[Test_Database_checkpoint_population_return]
action.email.useNSSubject = 1
alert.track = 0
cron_schedule = 0 0 * * *
dispatch.earliest_time = -24h@h
dispatch.latest_time = now
display.events.fields = ["host","source","sourcetype","tag","index","result","time_result","timezone_result","eventtype","action","vendor_action","MESSAGE_CODE","size","rule_name","user","dest","src_ip","src_mac","value","file"]
display.general.type = statistics
display.page.search.mode = verbose
display.page.search.tab = statistics
display.statistics.rowNumbers = 1
display.visualizations.charting.chart = line
display.visualizations.show = 0
enableSched = 1
request.ui_dispatch_app = search
request.ui_dispatch_view = search
schedule_window = 15
search = index=_internal host=heavyforwarder1 sourcetype=dbx_server action=dump_checkpoint file="/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/my_test_input" \
| rex field=value "\{\"value\":\"(?<checkpoint_value>\d+)\"" \
| rex field=file "/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/(?<db_input_name>\w+)" \
| table _time db_input_name checkpoint_value \
| sort - _time \
| head 1 \
| map search=" | dbxquery connection=my_test_database procedure=\"{call test_storedproc_splunk.send_rc(?,?) }\" params=\"$checkpoint_value$\" "

That’s it - you now have a fully-functional failsafe for purging your database while knowing Splunk has already indexed what it needs to. There will be times this may need some adjusting due to the flexibility of Splunk database inputs, but the foundation will remain the same. This method can easily be duplicated for other database inputs as well by replacing the checkpoint file name in the “file” field of your Splunk search.
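
For example, for a hypothetical second input named my_other_input, only the file filter changes (plus, inside the map, whichever connection and stored procedure belong to that database):

index=_internal host=heavyforwarder1 sourcetype=dbx_server action=dump_checkpoint file="/opt/splunk/var/lib/splunk/modinputs/server/splunk_app_db_connect/my_other_input"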



