https://github.com/tomwynn/coldToFrozenScripts/blob/master/coldToFrozenS3.py

https://github.com/tomwynn/coldToFrozenScripts/blob/master/coldToFrozenBlob.py

Data archiving is usually an afterthought. Most organizations only implement it when compliance requires it. But what happens when an incident requires data for forensics that goes back further than the current retention period?

This post walks through some simple scripts that archive data directly from Splunk to more cost-effective storage, such as AWS S3 or Azure Blob Storage, for long-term retention.
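
Splunk invokes a script like these automatically when a bucket rolls from cold to frozen, via the coldToFrozenScript setting in indexes.conf. A minimal sketch of that wiring (the index name, paths, and retention period below are placeholders, not values from these scripts):
[wineventlog]
# Run the archiving script instead of deleting buckets when they freeze
coldToFrozenScript = "$SPLUNK_HOME/bin/python" "$SPLUNK_HOME/bin/coldToFrozenS3.py"
# Buckets roll to frozen once their newest event is older than this (90 days here)
frozenTimePeriodInSecs = 7776000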

The two scripts behave the same in their first few functions and then differ in the calls they make to the two cloud service providers.

Note: the scripts also incorporate the instance ID so that objects are created with clean prefixes, which makes retrieving the data later much easier.
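
For example, the S3 variant builds its upload prefix from the bucket name, the instance ID, and the index name, so archives end up grouped per indexer and per index. A quick illustration with made-up values (the real script fetches the instance ID at runtime):
# Illustration only; all values here are hypothetical
s3bucket = 'splunkarchive'
instance_id = 'i-0abc123def4567890'
indexname = 'wineventlog'
remotepath = 's3://{0}/{1}/{2}/'.format(s3bucket, instance_id, indexname)
# -> s3://splunkarchive/i-0abc123def4567890/wineventlog/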

Walkthrough

  1. I always like to set up my Python scripts with some logging for errors and troubleshooting:
# Imports used by the snippets in this walkthrough
import datetime
import logging
import os
import random
import shutil
import socket
import subprocess
import sys
import tarfile
import time
import urllib.request

# Set up logging
log_file_path = '/opt/splunk/var/log/splunk/splunkArchive.log'
app_name = 'SplunkArchive'

def get_module_logger(app_name, file_path):
    logger = logging.getLogger(app_name)
    fh = logging.FileHandler(file_path)
    fh.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - process=%(name)s - status=%(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.setLevel(logging.INFO)
    return logger

# Values assumed by the later snippets (example values, adjust to your environment)
ARCHIVE_DIR = '/opt/splunk/frozenarchive'   # local staging area for frozen buckets
hostname = socket.gethostname()             # used when naming the tarballs

# Define the time boundaries (the cutoff is 120 days, despite the variable name)
today = round(time.mktime(datetime.datetime.today().timetuple()))
one_month_earlier = today - 120 * 86400

# Start the logger
logger = get_module_logger(app_name='SplunkArchive', file_path=log_file_path)
logger.info('Started on ' + str(datetime.datetime.today()))

  2. Each script then defines two helper functions:

The first is a quick function to handle a new bucket. It takes two parameters, base and files (set in main), and deletes the regular files at the top level of the bucket so that only the subdirectories, most importantly rawdata, remain to be archived:

def handleNewBucket(base, files):
    # Remove the loose files at the top level of the bucket directory;
    # subdirectories such as rawdata are left in place for archiving.
    print('Archiving bucket: ' + base)
    for f in files:
        full = os.path.join(base, f)
        if os.path.isfile(full):
            os.remove(full)

The second is a function that compresses a bucket directory into a gzipped tarball and, on success, removes the original directory:

def make_index_bucket_tarfile(output_filename, source_dir):
    with tarfile.open(output_filename, "w:gz") as tar:
        try:
            tar.add(source_dir, arcname=os.path.basename(source_dir))
            logger.info(output_filename + ' was created')
        except (OSError, tarfile.TarError) as e:
            logger.error('Error: tar archive creation failed ' + str(e))
        else:
            # Only remove the source bucket once the tarball was written successfully
            shutil.rmtree(source_dir)
            logger.info(os.path.basename(source_dir) + ' was removed')
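
A single call might look like the following (the paths are hypothetical; the main block below derives the real ones from the bucket being frozen):
# Hypothetical example: compress one staged bucket into <hostname>_<index>_<bucket>.tar.gz
make_index_bucket_tarfile(
    '/opt/splunk/frozenarchive/wineventlog/idx01_wineventlog_db_1609459200_1606867200_42.tar.gz',
    '/opt/splunk/frozenarchive/wineventlog/db_1609459200_1606867200_42')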

  3. The S3 script's main block processes the frozen bucket and makes the call to the S3 service:

class ColdToFrozenS3Error(Exception):
    """Raised below when the S3 upload fails (defined here so the snippet is self-contained)."""
    pass

if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.exit('usage: python coldToFrozenS3.py <bucket_dir_to_archive>')

    if not os.path.isdir(ARCHIVE_DIR):
        try:
            os.mkdir(ARCHIVE_DIR)
        except OSError:
            # Ignore already-exists errors, another concurrent invocation may have already created this dir
            sys.stderr.write("mkdir warning: Directory '" + ARCHIVE_DIR + "' already exists\n")

    bucket = sys.argv[1]
    if not os.path.isdir(bucket):
        sys.exit('Given bucket is not a valid directory: ' + bucket)

    rawdatadir = os.path.join(bucket, 'rawdata')
    if not os.path.isdir(rawdatadir):
        sys.exit('No rawdata directory, given bucket is likely invalid: ' + bucket)

    files = os.listdir(bucket)
    journal = os.path.join(rawdatadir, 'journal.gz')
    if os.path.isfile(journal):
        handleNewBucket(bucket, files)
    else:
        sys.exit('No journal file found, bucket invalid: ' + bucket)

    if bucket.endswith('/'):
        bucket = bucket[:-1]

    indexname = os.path.basename(os.path.dirname(os.path.dirname(bucket)))
    destdir = os.path.join(ARCHIVE_DIR, indexname, os.path.basename(bucket))
    container_name = indexname
    full_index_path = os.path.join(ARCHIVE_DIR, indexname)

    s3bucket = 'splunkarchive'
    # Query the EC2 instance metadata service (IMDSv1) for this indexer's instance ID;
    # it becomes part of the object prefix so each indexer's archives stay separate
    instance_id = urllib.request.urlopen('http://169.254.169.254/latest/meta-data/instance-id').read().decode()
    print('instance ID:', instance_id)
    remotepath = 's3://{s3bucket}/{instanceid}/{index}/'.format(
        s3bucket=s3bucket,
        instanceid=instance_id,
        index=indexname)

    while os.path.isdir(destdir):
        print('Warning: This bucket already exists in the archive directory')
        print('Adding a random extension to this directory...')
        destdir += '.' + str(random.randrange(10))

    shutil.copytree(bucket, destdir)

    # Tar up every staged bucket directory for this index
    for bucket_dir in os.listdir(full_index_path):
        bucket_path = os.path.join(full_index_path, bucket_dir)
        if os.path.isdir(bucket_path):
            output_filename = full_index_path + '/' + hostname + '_' + indexname + '_' + bucket_dir + '.tar.gz'
            make_index_bucket_tarfile(output_filename, bucket_path)

    for fname in os.listdir(full_index_path):
        file_path = os.path.join(full_index_path, fname)
        logger.info('file_path: ' + file_path)
        if fname.endswith('.gz'):
            # Hand the tarball to the AWS CLI for the actual upload
            command = ['/usr/local/bin/aws', 's3', 'cp', file_path, remotepath]
            try:
                subprocess.check_call(
                    command,
                    stdout=sys.stdout,
                    stderr=sys.stderr,
                    timeout=900)
            except subprocess.TimeoutExpired:
                raise ColdToFrozenS3Error('S3 upload timed out and was killed')
            except Exception:
                raise ColdToFrozenS3Error('Failed executing AWS CLI')
            if os.path.isfile(file_path):
                os.remove(file_path)
            print('Froze {0} OK'.format(sys.argv[1]))
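
When an investigation later needs the data back, the same prefix scheme makes it straightforward to pull an archive down again. A hedged sketch, with example object names and paths; after downloading, extract the tarball into the index's thaweddb directory and rebuild the bucket with splunk rebuild:
# Hypothetical retrieval: copy one archived bucket back down with the AWS CLI
import subprocess
subprocess.check_call([
    '/usr/local/bin/aws', 's3', 'cp',
    's3://splunkarchive/i-0abc123def4567890/wineventlog/idx01_wineventlog_db_1609459200_1606867200_42.tar.gz',
    '/tmp/'])
# Then extract into $SPLUNK_DB/wineventlog/thaweddb/ and run: splunk rebuild <bucket_dir>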

  4. The Blob script's main block does the same processing and then makes the call to Azure Blob Storage:

from azure.storage.blob import BlockBlobService  # legacy azure-storage SDK

if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.exit('usage: python coldToFrozenBlob.py <bucket_path>')
    if not os.path.isdir(ARCHIVE_DIR):
        try:
            os.mkdir(ARCHIVE_DIR)
        except OSError:
            # Ignore already-exists errors, another concurrent invocation may have already created this dir
            sys.stderr.write("mkdir warning: Directory '" + ARCHIVE_DIR + "' already exists\n")
    bucket = sys.argv[1]
    if not os.path.isdir(bucket):
        sys.exit('Given bucket is not a valid directory: ' + bucket)
    rawdatadir = os.path.join(bucket, 'rawdata')
    if not os.path.isdir(rawdatadir):
        sys.exit('No rawdata directory, given bucket is likely invalid: ' + bucket)
    files = os.listdir(bucket)
    journal = os.path.join(rawdatadir, 'journal.gz')
    if os.path.isfile(journal):
        archiveBucket(bucket, files)  # this script's equivalent of handleNewBucket above
    else:
        sys.exit('No journal file found, bucket invalid: ' + bucket)

    if bucket.endswith('/'):
        bucket = bucket[:-1]

    indexname = os.path.basename(os.path.dirname(os.path.dirname(bucket)))
    destdir = os.path.join(ARCHIVE_DIR, indexname, os.path.basename(bucket))
    container_name = indexname

    # Replace the placeholder credentials with your own storage account details
    block_blob_service = BlockBlobService(account_name='test', account_key='test')
    block_blob_service.create_container(indexname)

    full_index_path = os.path.join(ARCHIVE_DIR, indexname)

    blobs_generator = block_blob_service.list_blobs(container_name)
    logger.info("blobs_generator %s" % blobs_generator)
    for bucket_dir in os.listdir(full_index_path):
        bucket_path = os.path.join(full_index_path, bucket_dir)
        if os.path.isdir(bucket_path):
            logger.info('Working on the bucket ' + bucket_path)
            logger.info('The bucket is older than 120 days')
            output_filename = full_index_path + '/' + hostname + '_' + indexname + '_' + bucket_dir + '.tar.gz'
            make_index_bucket_tarfile(output_filename, bucket_path)
    for fname in os.listdir(full_index_path):
        file_path = os.path.join(full_index_path, fname)
        if fname.endswith('.gz'):
            block_blob_service.create_blob_from_path(container_name, fname, file_path)
            os.remove(file_path)
    while os.path.isdir(destdir):
        print('Warning: This bucket already exists in the archive directory')
        print('Adding a random extension to this directory...')
        destdir += '.' + str(random.randrange(10))
    shutil.copytree(bucket, destdir)
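
Retrieval from Azure works the same way; here is a minimal sketch using the same legacy SDK (the container and blob names are examples only):
# Hypothetical retrieval: list the archived tarballs for an index and download one
for blob in block_blob_service.list_blobs('wineventlog'):
    print(blob.name)
block_blob_service.get_blob_to_path(
    'wineventlog',
    'idx01_wineventlog_db_1609459200_1606867200_42.tar.gz',
    '/tmp/idx01_wineventlog_db_1609459200_1606867200_42.tar.gz')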