Python unit tests for S3 reading and writing functions

I have 5 functions I need unit tests written for. The functions read and write files to an S3 bucket. The unit tests should check 1) that each function works as expected given correct inputs and 2) that each function raises an error when given wrong inputs. The answer doesn't have to include mocking Amazon S3.
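
To illustrate the second requirement, a wrong-input test could look roughly like this (a minimal pytest sketch; 's3_io' is just a placeholder name for whatever module the functions below live in, and pytest/unittest.mock are my choice of tooling, not a requirement):

# Minimal sketch -- 's3_io' is a placeholder module name, not part of my code.
import pytest
from unittest import mock
import s3_io

@pytest.mark.parametrize('bad_filename', [None, 123, ['not', 'a', 'string']])
def test_write_parquet_file_rejects_non_string_filenames(bad_filename):
    # filename.endswith() should fail fast on anything that is not a string
    with pytest.raises(AttributeError):
        s3_io.write_parquet_file(mock.MagicMock(), bad_filename, 'prefix', 'development', 'div1', 'cat1')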

def write_parquet_file(final_df, filename, prefix, environment, div, cat):
    '''
    Function to write parquet files with staging architecture
    Input:
    Spark DataFrame final_df: the data frame to be written
    String filename: the file name to write to
    String prefix: the prefix for all output files
    String environment: production or development
    String div: division
    String cat: category (None if not applicable)
    '''

    if not filename.endswith('.parquet'):
        filename = f'{filename}.parquet'

    staging_path, destination = generate_file_path(prefix=prefix, environment=environment, filename=filename, cat=cat, div=div)

    try:
        logger.info(f'Attempting to write {filename} to {staging_path}')
        final_df.write.parquet(staging_path, mode='overwrite')
        logger.info(f'Successfully wrote {filename} to staging')
    except Exception as e:
        logger.error(f'Failed to write {filename} to staging')
        logger.error(e, exc_info=True)
        raise

    remove_s3_file(destination)
    move_s3_file(staging_path, destination)
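
A rough sketch of what tests for write_parquet_file might look like, assuming the function lives in a hypothetical module s3_io together with generate_file_path, remove_s3_file, move_s3_file and logger, and that a MagicMock can stand in for the Spark DataFrame; the bucket paths are placeholders:

# Sketch only: s3_io, the bucket paths and the patched helper names are assumptions.
import pytest
from unittest import mock
import s3_io

def test_write_parquet_file_writes_to_staging_then_promotes(monkeypatch):
    # Stub the path helper and the S3 move helpers so no real bucket is touched.
    monkeypatch.setattr(s3_io, 'generate_file_path',
                        lambda **kw: ('s3://bucket/staging/out.parquet', 's3://bucket/final/out.parquet'))
    remove = mock.MagicMock()
    move = mock.MagicMock()
    monkeypatch.setattr(s3_io, 'remove_s3_file', remove)
    monkeypatch.setattr(s3_io, 'move_s3_file', move)

    fake_df = mock.MagicMock()  # stands in for the Spark DataFrame
    s3_io.write_parquet_file(fake_df, 'out', 'prefix', 'development', 'div1', 'cat1')

    fake_df.write.parquet.assert_called_once_with('s3://bucket/staging/out.parquet', mode='overwrite')
    remove.assert_called_once_with('s3://bucket/final/out.parquet')
    move.assert_called_once_with('s3://bucket/staging/out.parquet', 's3://bucket/final/out.parquet')

def test_write_parquet_file_propagates_write_failure(monkeypatch):
    monkeypatch.setattr(s3_io, 'generate_file_path',
                        lambda **kw: ('s3://bucket/staging/out.parquet', 's3://bucket/final/out.parquet'))
    fake_df = mock.MagicMock()
    fake_df.write.parquet.side_effect = RuntimeError('write failed')
    with pytest.raises(RuntimeError):
        s3_io.write_parquet_file(fake_df, 'out', 'prefix', 'development', 'div1', 'cat1')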



def read_partitioned_csv(filepath, file_schema, partition_column, cat, spark, sep='|'):
    '''
    Function to read partitioned csv
    Input:
    String filepath: path to the csv
    String file_schema: the file schema
    String partition_column: the partition column
    String cat: the partition value
    Spark Session Object spark
    String sep: the csv separator

    Output:
    Spark dataframe containing the data from the csv (within the partition)
    '''

    # Create the full path for the csv by appending the partition column and value to the path key.
    #
    # If either partition_column or cat is an empty string, then set both to empty, set the connector to empty
    # and retrieve the entire csv. If both are non-empty, set the connector to '=' and only retrieve a
    # subset of the csv.

    if (partition_column == '') or (cat == ''):  # if either is '', then set both to '' and set the connector to ''
        connector = ''
        partition_column = ''
        cat = ''
    else:
        connector = '='

    path = f'{filepath}/{partition_column}{connector}{cat}'  # construct the path to the csv

    file_schema = extract_schema(file_schema)  # convert the schema string into a structure type

    try:
        df = spark.read.csv(path, header=True, inferSchema=False, schema=file_schema, sep=sep)
        logger.info(f'Successfully read {filepath} for partition {partition_column}={cat}')
        return df

    except Exception as e:
        logger.error(f'Failed to read {filepath} for partition {partition_column}={cat}')
        logger.error(e, exc_info=True)
        raise
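
Something similar for read_partitioned_csv, with a MagicMock standing in for the SparkSession and extract_schema patched to a pass-through (same hypothetical s3_io module and placeholder paths as above):

# Sketch only: s3_io is a placeholder and extract_schema is patched to return its input.
import pytest
from unittest import mock
import s3_io

def test_read_partitioned_csv_builds_partition_path(monkeypatch):
    monkeypatch.setattr(s3_io, 'extract_schema', lambda s: s)
    spark = mock.MagicMock()
    df = s3_io.read_partitioned_csv('s3://bucket/data', 'a STRING', 'div', 'cat1', spark)
    spark.read.csv.assert_called_once_with('s3://bucket/data/div=cat1', header=True,
                                           inferSchema=False, schema='a STRING', sep='|')
    assert df is spark.read.csv.return_value

def test_read_partitioned_csv_raises_on_read_failure(monkeypatch):
    monkeypatch.setattr(s3_io, 'extract_schema', lambda s: s)
    spark = mock.MagicMock()
    spark.read.csv.side_effect = ValueError('bad path')
    with pytest.raises(ValueError):
        s3_io.read_partitioned_csv('s3://bucket/data', 'a STRING', 'div', 'cat1', spark)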



def read_pyarrow_file(filename, prefix, environment, div, cat):
    '''
    Function to read parquet files in a partitioned directory structure

    Input:
    String filename: file name
    String prefix: the prefix for all output files
    String environment: production or development
    String div: division
    String cat: category (partition value)

    Output:
    Pandas dataframe containing the data from the parquet (from the partitioned directory)
    '''
    import s3fs
    import pyarrow.parquet as pq

    if not filename.endswith('.parquet'):
        filename = f'{filename}.parquet'

    _, path = generate_file_path(prefix=prefix, environment=environment, filename=filename, cat=cat, div=div)

    s3 = s3fs.S3FileSystem()

    try:
        df = pq.ParquetDataset(path, filesystem=s3).read_pandas().to_pandas()
        logger.info(f'Successfully read {path}')
        return df
    except Exception as e:
        logger.error(f'Failed to read {path}')
        logger.error(e, exc_info=True)
        raise
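
For read_pyarrow_file the s3fs/pyarrow calls would need to be stubbed instead; a rough sketch (this assumes s3fs and pyarrow are importable, and again uses the s3_io placeholder and made-up paths):

# Sketch only: requires s3fs and pyarrow installed; s3_io and the paths are placeholders.
import pytest
from unittest import mock
import s3_io

def test_read_pyarrow_file_returns_pandas_frame(monkeypatch):
    monkeypatch.setattr(s3_io, 'generate_file_path',
                        lambda **kw: ('unused', 's3://bucket/final/out.parquet'))
    with mock.patch('s3fs.S3FileSystem'), mock.patch('pyarrow.parquet.ParquetDataset') as dataset:
        expected = dataset.return_value.read_pandas.return_value.to_pandas.return_value
        result = s3_io.read_pyarrow_file('out', 'prefix', 'development', 'div1', 'cat1')
    assert result is expected

def test_read_pyarrow_file_raises_on_missing_object(monkeypatch):
    monkeypatch.setattr(s3_io, 'generate_file_path',
                        lambda **kw: ('unused', 's3://bucket/final/missing.parquet'))
    with mock.patch('s3fs.S3FileSystem'), \
         mock.patch('pyarrow.parquet.ParquetDataset', side_effect=FileNotFoundError):
        with pytest.raises(FileNotFoundError):
            s3_io.read_pyarrow_file('missing', 'prefix', 'development', 'div1', 'cat1')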


def read_parquet_file(filename, prefix, environment, div, cat, spark):
    '''
    Function to read parquet files from a partitioned directory structure

    Input:
    String filename: file name (without .parquet)
    String prefix: the prefix for all output files
    String environment: either 'development' or 'production'
    String div: division
    String cat: category (partition value)
    Spark Session Object spark

    Output:
    Spark dataframe containing the data from the parquet (from the partitioned directory)
    '''

    if not filename.endswith('.parquet'):
        filename = f'{filename}.parquet'

    _, path = generate_file_path(prefix=prefix, environment=environment, filename=filename, cat=cat, div=div)

    try:
        df = spark.read.parquet(path)
        logger.info(f'Successfully read {path}')
        return df
    except Exception as e:
        logger.error(f'Failed to read {path}')
        logger.error(e, exc_info=True)
        raise
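
read_parquet_file is the simplest of the lot; a sketch along the same lines (same s3_io placeholder as above):

# Sketch only: s3_io and the paths are placeholders.
import pytest
from unittest import mock
import s3_io

def test_read_parquet_file_reads_from_generated_path(monkeypatch):
    monkeypatch.setattr(s3_io, 'generate_file_path',
                        lambda **kw: ('unused', 's3://bucket/final/out.parquet'))
    spark = mock.MagicMock()
    df = s3_io.read_parquet_file('out', 'prefix', 'development', 'div1', 'cat1', spark)
    spark.read.parquet.assert_called_once_with('s3://bucket/final/out.parquet')
    assert df is spark.read.parquet.return_value

def test_read_parquet_file_raises_on_read_failure(monkeypatch):
    monkeypatch.setattr(s3_io, 'generate_file_path',
                        lambda **kw: ('unused', 's3://bucket/final/out.parquet'))
    spark = mock.MagicMock()
    spark.read.parquet.side_effect = IOError('no such key')
    with pytest.raises(IOError):
        s3_io.read_parquet_file('out', 'prefix', 'development', 'div1', 'cat1', spark)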


def write_standard_csv(df, filename, prefix, environment, cat, div, sep=',', header=True, mode='overwrite'):
    '''Function to write standard csv files to S3.
       Input:
       Spark Dataframe df
       String filename: file name
       String prefix: the prefix for all output files
       String environment: production or development
       String cat: category
       String div: division

       Output: Human readable csv file on S3'''
    import os
    import shutil
    import pathlib

    def renamer(dirname):
        '''Moves the single Spark part file out of the output directory, dropping the
        alphanumeric part-xxxxx name that Spark generates at write time and replacing
        it with <dirname>.csv alongside the directory.'''
        for pathname in os.listdir(dirname):
            basename = os.path.basename(pathname)
            new_filename = '{}.csv'.format(dirname)
            if new_filename != basename:
                os.rename(os.path.join(dirname, basename), new_filename)

    if not filename.endswith('.csv'):
        filename = f'{filename}.csv'

    # generate_file_path will make sure filename, prefix, environment, div and cat are valid inputs
    staging_path, destination = generate_file_path(prefix=prefix,
                                                   environment=environment,
                                                   filename=filename,
                                                   cat=cat,
                                                   div=div)

    emr_staging_prefix = pathlib.Path(__file__).parents[1].joinpath('staging').resolve()

    logger.info(f'Attempting to write {filename} to {staging_path}')
    df.coalesce(1).write.csv(staging_path, mode=mode, header=header, sep=sep)
    if os.system(f"""aws s3 ls '{staging_path}'""") == 0:
        logger.info(f'Successfully wrote {filename} to {staging_path}')
    else:
        logger.error(f'Failed writing {filename} to {staging_path}')
        raise RuntimeError(f'Failed writing {filename} to {staging_path}')

    remove_s3_file(destination)
    move_s3_file(staging_path, destination)

    emr_dir = os.path.splitext(filename)[0]
    logger.info(f'Attempting to bring {filename} back to the EMR ({emr_staging_prefix}/{emr_dir})')
    if os.system(f"""aws s3 cp '{destination}' '{emr_staging_prefix}/{emr_dir}' --sse --recursive --exclude '*' --include '*.csv'""") == 0:
        logger.info(f'Successfully brought {filename} back to the EMR ({emr_staging_prefix}/{emr_dir})')
    else:
        logger.error(f'Failed to bring {filename} back to EMR ({emr_staging_prefix}/{emr_dir})')
        raise RuntimeError(f'Failed to bring {filename} back to EMR ({emr_staging_prefix}/{emr_dir})')

    renamer(f'{emr_staging_prefix}/{emr_dir}')

    logger.info(f'Attempting to write {emr_staging_prefix}/{filename} to {destination}')
    if os.system(f"""aws s3 cp '{emr_staging_prefix}/{filename}' '{destination}' --sse""") == 0:
        logger.info(f'Successfully wrote {emr_staging_prefix}/{filename} to {destination} as human readable csv')
    else:
        logger.error(f'Failed to write {emr_staging_prefix}/{filename} to {destination} as human readable csv')
        raise RuntimeError(f'Failed to write {emr_staging_prefix}/{filename} to {destination} as human readable csv')

    shutil.rmtree(f'{emr_staging_prefix}/{emr_dir}')
    os.system(f"""aws s3 rm '{destination}' --recursive""")
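
write_standard_csv shells out to the aws CLI, so a quick wrong-path test could patch os.system and rely on the RuntimeError raised above when the staging check fails (this only covers the failure branch; s3_io and the paths are placeholders, and os.system is patched globally for the duration of the test):

# Sketch only: covers the failure branch of the `aws s3 ls` staging check.
import pytest
from unittest import mock
import s3_io

def test_write_standard_csv_raises_when_staging_check_fails(monkeypatch):
    monkeypatch.setattr(s3_io, 'generate_file_path',
                        lambda **kw: ('s3://bucket/staging/out.csv', 's3://bucket/final/out.csv'))
    fake_df = mock.MagicMock()
    with mock.patch('os.system', return_value=1):  # simulate the aws CLI reporting no staged file
        with pytest.raises(RuntimeError):
            s3_io.write_standard_csv(fake_df, 'out', 'prefix', 'development', 'cat1', 'div1')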