Using Athena To Process CSV Files

With Athena, you can easily process large CSV files in Transposit.

Dan Moore · Oct 4, 2019

Order ledger

Athena is a serverless query engine you can run against structured data on S3; you can run queries without running a database (see the AWS Athena docs). Athena is powerful when paired with Transposit, which lets you enrich the query results and take action on them.

Here's a previous post about using Transposit to access CloudWatch data with Athena. In this post, I'll cover parsing CSV files on S3 and making the data available to Transposit applications.

Before you start

If you want to follow along with the code here, you'll need a Transposit account and an AWS account, with permission to use S3 and Athena.

Setting up Athena

Assume you have a set of CSV files on S3. These files are order files placed on S3 by another system, and you want to further process them.

The files are located at: s3://dan-test-bucket-athena/orders/ and have the following structure:

Email,name,city,sku,full address,amount
test@example.com,Joe Dumas,Denver,widget,"1/2 2nd st, Denver",500
test2@example.com,Jane Ekon,Seattle,widget2,"500 3rd st, Seattle",100

Note that some columns have embedded commas and are surrounded by double quotes. If following along, you'll need to create your own bucket and upload this sample CSV file. You can have as many of these files as you want, and everything under one S3 path will be considered part of the same table.

You'll need to create a table in Athena. You can do this in Transposit via a query, but I did it manually.

CREATE EXTERNAL TABLE IF NOT EXISTS default.orders (
  email string,
  name string,
  city string,
  sku string,
  fulladdress string,
  amount string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'escapeChar' = '\\',
  'separatorChar' = ','
)
LOCATION 's3://dan-test-bucket-athena/orders/'
TBLPROPERTIES (
  'has_encrypted_data' = 'false',
  'skip.header.line.count' = '1'
);

Because some fields contain commas, we use OpenCSVSerde, which parses quoted fields correctly (its default quote character is the double quote). We point the Athena table at the S3 location. I have found it easiest to treat every field as a string, unless it will always be a number or boolean and is always present. The trade-off is that if you declare a numeric field as a string, you can't use aggregate functions like SUM on it directly, though you can cast in the query, e.g. SUM(CAST(amount AS integer)).

After creating a table, we can now run an Athena query in the AWS console: SELECT email FROM orders will return test@example.com and test2@example.com.

Running the query

Now we can create a Transposit application and Athena data connector.

You'll need to authorize the data connector. Set the region to the one you used when creating the table (us-west-2, for example), then provide the access key and secret key of an IAM user you have created (preferably one with limited S3 and Athena privileges).

We then can run an Athena query, like SELECT * FROM orders WHERE city = 'Denver'. We need to do this in two phases, which require at least two operations. Phase one, start the query (we pass the query in to the operation using a parameter):

(params) => {
  let queryId = api.run("aws_athena.start_query_execution", {
    $body: {
      QueryString: params.query,
      ClientRequestToken: api.run("this.makeId")[0],
      ResultConfiguration: {
        OutputLocation: params.resultlocation
      }
    }
  })[0]["QueryExecutionId"];

  stash.put("query-id", queryId);
}

The resultlocation parameter is a writable S3 location. We also store the query id in the stash (the ClientRequestToken we pass in is a random string created by the makeId operation; the query id itself is returned by Athena). If you have multiple queries running at the same time, you'll want to avoid key collisions in the stash. The stash is required because Athena doesn't guarantee when a query will be done, so a separate operation has to pick up the results later.
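One way to avoid those collisions is to derive the stash key from something unique to each run. Here is a sketch; the correlationId parameter and the key prefix are hypothetical naming for illustration, not part of the original operation:

```javascript
// Sketch: key stash entries off a caller-supplied correlation id so
// concurrent queries don't overwrite each other's "query-id" entry.
// `correlationId` is a hypothetical parameter for illustration.
function stashKeyFor(correlationId) {
  return "query-id-" + correlationId;
}

// In the start operation: stash.put(stashKeyFor(params.correlationId), queryId);
// In the results operation: stash.get(stashKeyFor(params.correlationId));
```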

During phase two, we need to get the query id from the stash and see if it is done. We could handle this asynchronicity a few ways.

setInterval

Call setInterval, run get_query_execution, and check the state of your query, which will be one of the documented Athena query states. If the status is a terminal one (SUCCEEDED, FAILED, or CANCELLED), the query has completed. Otherwise, wait for a while and check again.

SELECT QueryExecution.Status.State FROM aws_athena.get_query_execution
WHERE $body=(SELECT {
'QueryExecutionId' : @queryId
})

The issue with this option is that your entire Transposit request must complete under the execution timeout limits. Athena makes no promises about how long it will take, so you may run out of time.
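The check-and-wait logic itself can be sketched independently of Transposit specifics; here getState stands in for the get_query_execution call above, and the terminal states are the documented Athena ones:

```javascript
// Terminal states per the Athena documentation; anything else means
// the query is still queued or running.
const TERMINAL_STATES = ["SUCCEEDED", "FAILED", "CANCELLED"];

function isDone(state) {
  return TERMINAL_STATES.indexOf(state) !== -1;
}

// Poll until the query reaches a terminal state. `getState` stands in
// for a call to aws_athena.get_query_execution.
function pollUntilDone(getState, onDone, intervalMs) {
  const timer = setInterval(function () {
    const state = getState();
    if (isDone(state)) {
      clearInterval(timer);
      onDone(state);
    }
  }, intervalMs);
}
```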

Scheduled tasks

If your Athena query takes a consistent amount of time, use a scheduled task. You can schedule the results processing operation five or more minutes after the query start operation. If you know your data processing duration, this is the simplest solution.

Dynamic scheduled tasks

Another option is to use a dynamic scheduled task. Similar to the setInterval solution, you call a task, check to see if Athena is done, and if it is successful, process the results. If not, you wait again. This solution isn't limited to the duration of the request execution timeout, but is more complicated to reason about.

Results, results, results

In all cases, you'll want to read your data when it is ready (unless the query failed, in which case you'll want to look at the failure and diagnose it). You can use an operation such as this to read the results (where queryId is pulled from the stash):

(params) => {
  let results = "";
  try {
    results = api.run('aws_athena.get_query_results', {
      $body: {
        QueryExecutionId: params.queryId
      }
    });
  } catch (e) {
    console.log(e);
    return ["error, check logs"];
  }

  // The first row holds the column names; the rest hold the data.
  const cols = results[0].Data;
  const data = results.slice(1).map(d => d.Data);

  // Turn each row into an object keyed by column name.
  const processed_data = data.map((row) => {
    let obj = {};
    row.forEach((item, index) => {
      obj[cols[index].VarCharValue] = item.VarCharValue;
    });
    return obj;
  });

  return processed_data;
}

This operation postprocesses the data to make the results easier to use. If we ran the Athena query SELECT * FROM orders WHERE city = 'Denver', here are the raw results (column names are always lowercased):

[
  {
    "Data": [
      { "VarCharValue": "email" },
      { "VarCharValue": "name" },
      { "VarCharValue": "city" },
      { "VarCharValue": "sku" },
      { "VarCharValue": "fulladdress" },
      { "VarCharValue": "amount" }
    ]
  },
  {
    "Data": [
      { "VarCharValue": "test@example.com" },
      { "VarCharValue": "Joe Dumas" },
      { "VarCharValue": "Denver" },
      { "VarCharValue": "widget" },
      { "VarCharValue": "1/2 2nd st, Denver" },
      { "VarCharValue": "500" }
    ]
  }
]

The results after processing:

[
  {
    "email": "test@example.com",
    "name": "Joe Dumas",
    "city": "Denver",
    "sku": "widget",
    "fulladdress": "1/2 2nd st, Denver",
    "amount": "500"
  }
]
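The transformation from the raw shape to the processed shape is just a matter of zipping the header row with each data row. Here is the same logic as a standalone function:

```javascript
// Convert Athena's get_query_results shape (first row = column names)
// into an array of plain objects keyed by column name.
function toObjects(results) {
  const cols = results[0].Data;
  return results.slice(1).map(function (row) {
    const obj = {};
    row.Data.forEach(function (cell, i) {
      obj[cols[i].VarCharValue] = cell.VarCharValue;
    });
    return obj;
  });
}
```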

Now we could do something else with the retrieved order data. We can use any of the connectors Transposit provides, or we can build our own if there's a custom API that we need to access.

There are many possible options for this sample data.

Considerations

What if I have embedded newlines in my CSV file?

You'll need to clean them up via preprocessing. If the CSV file isn't too large, you can cleanse it using JavaScript in a Transposit operation like so:

// JavaScript string literals can't span lines, so the SQL is built
// with concatenation.
const get_object_sql = "SELECT * FROM aws_s3.get_object " +
  "WHERE Bucket='dan-test-bucket-athena' AND Key='source/path/filename.csv'";

const fullorderscsv_raw = api.query(get_object_sql)[0];

// Replace newlines inside quoted fields (including fields that use
// doubled "" escapes) with spaces.
const fullorderscsv_removed_embedded_newlines =
  fullorderscsv_raw.replace(/"[^"]*(?:""[^"]*)*"/g, function(m) {
    return m.replace(/\n/g, ' ');
  });

If the CSV file is too large, you'll see an out-of-memory error. At that point you can preprocess the files with another tool, or consume the file by byte range and clean each section.
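To see what the regex does, here it is applied to a small fragment: newlines inside quoted fields become spaces, while the newline separating records is left alone.

```javascript
// The same regex as above: match a quoted field (doubled "" escapes
// allowed) and replace any newlines inside it with spaces.
const raw = 'a,"line one\nline two",b\nc,plain,d';
const cleaned = raw.replace(/"[^"]*(?:""[^"]*)*"/g, function (m) {
  return m.replace(/\n/g, ' ');
});
// cleaned === 'a,"line one line two",b\nc,plain,d'
```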

What if I have timestamped data files?

I ran into a situation where a timestamped orders file was dropped into an S3 bucket every day by a different process (so orders-2019-10-10.csv, orders-2019-10-11.csv, etc., all in the same folder). To process only the latest orders in this situation, copy the file into a subfolder. You can do that with this SQL operation:

SELECT * FROM aws_s3.copy_object
WHERE Bucket='dan-test-bucket-athena'
AND x-amz-copy-source='/dan-test-bucket-athena/orders/orders-2019-10-10.csv'
AND Key='orders/forathena/destfile.csv'

In this case, when running the Athena CREATE TABLE statement, set the LOCATION to s3://dan-test-bucket-athena/orders/forathena. You can replace the source file for Athena repeatedly and it will pick up the new data.
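If the timestamp in the filename is the current date, the copy-source key can be computed rather than hardcoded. A sketch, assuming the bucket and naming convention from the example above:

```javascript
// Build the x-amz-copy-source key for a given day's orders file.
function sourceKeyFor(date) {
  const stamp = date.toISOString().slice(0, 10); // "YYYY-MM-DD"
  return '/dan-test-bucket-athena/orders/orders-' + stamp + '.csv';
}

// e.g. sourceKeyFor(new Date()) for today's file
```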

How do I evolve this?

Any data pipeline of any complexity will change over time. To make sure you can test changes, you'll want two separate Transposit applications: one for staging and one for production.

To handle changes after production and staging applications are running, I found having two git remotes helpful.

origin	https://console.transposit.com/git/mooredstransposit/athena_staging_app (fetch)
origin	https://console.transposit.com/git/mooredstransposit/athena_staging_app (push)
production	https://console.transposit.com/git/mooredstransposit/athena_production_app (fetch)
production	https://console.transposit.com/git/mooredstransposit/athena_production_app (push)

Do all your development on the master branch using the console or git, as you see fit. When you first fork the application, create a production branch too. Then you can promote a change like so:

git checkout master
git pull origin master # make sure we have the latest from staging
git checkout production
git merge master # merge in our changes, dealing with any conflicts
git push production production:master # push to production

You can also create a third Transposit application and set up a test environment with a known set of inputs and a golden output. Build the harness by adding a few endpoints to start your query and process the results, perhaps with different inputs and outputs. Compare the end results with the golden data to ensure no regressions have occurred.
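The comparison step can be as simple as a deep-equality check against the stored golden output. A minimal sketch, assuming the processing is deterministic so keys come out in the same order:

```javascript
// Compare processed results against known-good golden data.
// JSON.stringify is sufficient when both sides are produced by the
// same deterministic code path (same keys, same order).
function matchesGolden(actual, golden) {
  return JSON.stringify(actual) === JSON.stringify(golden);
}
```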

Create a README.md for your application with instructions for deployment and documentation.

Conclusion

Athena is a great way to pull large amounts of data from S3 where it might be otherwise stranded. Combining Athena with Transposit lets you easily enrich the data and take actions based on it.
