Enriching CloudTrail logs

Enriching CloudTrail logs can help make your AWS environment more secure and understandable.

Dan Moore · Oct 17, 2019


What is CloudTrail

CloudTrail is an AWS service that records every API call made in your AWS account (well, almost every call, since there are a few unsupported services). It writes the call records to an S3 bucket where you can process them further. Capturing all these log files helps with security analysis, auditability and debugging. But it can be hard to find the nuggets of useful information in the torrent of data.

A CloudTrail record looks like this:

{
  "Records": [{
    "eventVersion": "1.05",
    "userIdentity": { ... },
    "eventTime": "2019-10-08T17:19:36Z",
    "eventSource": "s3.amazonaws.com",
    "eventName": "GetBucketObjectLockConfiguration",
    "awsRegion": "us-east-2",
    "eventID": "dddddddd-dddd-dddd-dddd-dddddddddd",
    "sourceIPAddress": "11.11.111.111",
    "userAgent": "[S3Console/0.4, aws-internal/3 aws-sdk-java/1.11.633 Linux/4.9.184-0.1.ac.235.83.329.metal1.x86_64 OpenJDK_64-Bit_Server_VM/25.222-b10 java/1.8.0_222 vendor/Oracle_Corporation]",
    "errorCode": "ObjectLockConfigurationNotFoundError",
    "errorMessage": "Object Lock configuration does not exist for this bucket",
    "requestParameters": { ... },
    "responseElements": null,
    "additionalEventData": { ... },
    "requestID": "XXXXXXXXXXXXXX",
    "eventType": "AwsApiCall",
    "recipientAccountId": "111111111111",
    "vpcEndpointId": "vpce-xxxxxxxxxx"
  }]
}

The specific contents of each event differ depending on the service, but for an S3 call there were over forty keys in the JSON object. That's a lot of data. What you're looking for will vary depending on what you are trying to do, but a few attributes are worth highlighting:

All this data is dropped into your S3 bucket. More details on CloudTrail.

The log files are stored on S3 with a key that looks like this: AWSLogs/1111111111/CloudTrail/us-east-2/2019/10/08/11111111111_CloudTrail_us-east-2_20191008T0250Z_A3fA0ZBQgkhsIIXO.json. It stores the files in a structured way, including the account number, region, year, month and day.
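Because the key layout is predictable, it can be split back into its parts. The following is a minimal sketch, assuming keys follow exactly the layout shown above; parseCloudTrailKey is a hypothetical helper name, not part of CloudTrail or Transposit.

```javascript
// Hypothetical helper: split a CloudTrail log object key into the parts
// described above. Assumes the layout
// AWSLogs/<account>/CloudTrail/<region>/<yyyy>/<mm>/<dd>/<file>.
function parseCloudTrailKey(key) {
  const parts = key.split("/");
  if (parts.length !== 8 || parts[0] !== "AWSLogs" || parts[2] !== "CloudTrail") {
    return null; // not a key in the layout shown above
  }
  const [, accountId, , region, year, month, day, fileName] = parts;
  return { accountId, region, year, month, day, fileName };
}

const parsed = parseCloudTrailKey(
  "AWSLogs/1111111111/CloudTrail/us-east-2/2019/10/08/11111111111_CloudTrail_us-east-2_20191008T0250Z_A3fA0ZBQgkhsIIXO.json"
);
console.log(parsed);
```

Returning null for anything that does not match keeps the helper safe to run over a bucket that also holds other objects.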

How to query

Once this data is in your S3 bucket, you need to decide what to do with it.

Don't do anything with it

If you just want to know that you could go back in time and investigate API calls if needed, create a destination bucket, and then turn on and configure CloudTrail. Make sure your bucket has appropriate security and possibly lifecycle rules. For instance, if you want to save money, you can push older CloudTrail logs to a different storage class.
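As a rough illustration of such a lifecycle rule, here is a sketch of a configuration object in the shape the S3 PutBucketLifecycleConfiguration API accepts. The rule ID, the 90-day threshold and the GLACIER storage class are assumptions chosen for the example, not recommendations.

```javascript
// Sketch of an S3 lifecycle configuration for the CloudTrail bucket.
// The ID, the 90-day threshold and the storage class are illustrative
// assumptions; pick values that match your retention needs.
const lifecycleConfiguration = {
  Rules: [
    {
      ID: "archive-old-cloudtrail-logs", // hypothetical rule name
      Status: "Enabled",
      Filter: { Prefix: "AWSLogs/" }, // only applies to CloudTrail log objects
      Transitions: [{ Days: 90, StorageClass: "GLACIER" }]
    }
  ]
};

console.log(JSON.stringify(lifecycleConfiguration, null, 2));
```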

Set up querying

If you want to be able to understand the CloudTrail data, you'll need to pull it into an analytics/querying engine for all but the smallest accounts.

You have a number of options. You could do this with Athena since it can read JSON documents. (If you use Athena, you can query it from Slack.) You could pull the data into Elasticsearch and use Kibana. Here's a sample architecture. You can also pull the logs into any other log ingestion service that can read from S3.

Alert on the data

You may want to alert on CloudTrail events. GuardDuty is one of many possible solutions. When something abnormal happens in your AWS infrastructure, you should investigate and/or take action. Depending on the severity of the issue and the maturity of your organization, you could even set up automatic actions: if we see any files being read from super-secret-bucket, alert security and shut down service-that-depends-on-bucket.
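The super-secret-bucket rule could be expressed as a small predicate over a CloudTrail record. This is only a sketch: shouldAlert is a hypothetical function name, and it assumes object reads show up as GetObject events with the bucket name in requestParameters.bucketName, which requires S3 data events to be enabled on the trail.

```javascript
// Hypothetical alert rule: flag reads from a sensitive bucket.
// Assumes S3 data events are enabled so GetObject calls are logged.
const SENSITIVE_BUCKET = "super-secret-bucket";

function shouldAlert(record) {
  const params = record.requestParameters || {};
  return (
    record.eventSource === "s3.amazonaws.com" &&
    record.eventName === "GetObject" &&
    params.bucketName === SENSITIVE_BUCKET
  );
}

// Example records (made up for illustration).
const readOfSecret = {
  eventSource: "s3.amazonaws.com",
  eventName: "GetObject",
  requestParameters: { bucketName: "super-secret-bucket", key: "plans.txt" }
};
const readOfOther = {
  eventSource: "s3.amazonaws.com",
  eventName: "GetObject",
  requestParameters: { bucketName: "public-assets", key: "logo.png" }
};
console.log(shouldAlert(readOfSecret), shouldAlert(readOfOther));
```

What happens after a match (paging security, disabling a dependent service) depends on your own tooling and is left out here.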

Why enrich the log files

CloudTrail logs give you a lot of information about API access: IP address, date and time, service accessed. But adding even more information can help make unexpected behavior obvious and increase your understanding of the system. You can create a set of rules which examine log files and add external information. Below are examples of data you can layer onto your CloudTrail logs.

Abnormal behavior

Outside events

Events within your company or execution environment

What all of these have in common is that they may affect the security, performance or operation of your system, and they either could never be determined from the CloudTrail logs alone (this user is on vacation) or would require additional analysis to be noticed (this API call was at 3am). You may want to assign a priority level for further investigation or action. Applying these business rules makes the logs more valuable, and you can do this before you push the data into your querying or alerting infrastructure.
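The two parenthetical examples can be sketched as small checks. Everything here is an assumption made for illustration: the function names, the midnight-to-6am UTC window, and the vacations object standing in for whatever calendar or HR system you would actually query.

```javascript
// Hypothetical external data: who is on vacation and when (inclusive dates).
const vacations = { alice: { from: "2019-10-07", to: "2019-10-11" } };

// True when the event happened inside the off-hours window (UTC).
function isOffHours(eventTime, startHour = 0, endHour = 6) {
  const hour = new Date(eventTime).getUTCHours();
  return hour >= startHour && hour < endHour;
}

// True when the user has a vacation entry covering the event's date.
function isOnVacation(userName, eventTime, calendar) {
  const entry = calendar[userName];
  if (!entry) return false;
  const day = eventTime.slice(0, 10); // "YYYY-MM-DD" from the ISO timestamp
  return day >= entry.from && day <= entry.to;
}

console.log(isOffHours("2019-10-08T03:00:00Z"));
console.log(isOnVacation("alice", "2019-10-08T17:19:36Z", vacations));
```

The first check needs only the log itself plus a policy decision; the second cannot work without data from outside CloudTrail.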

How to enrich the log files

To add this additional data, you need to read the log files, apply business rules to the events, and then store the new data. Then, point the logging or alerting services at the enriched logs. Transposit can help. The below code provides two business rules:

The first step to enriching the files is to pull them from the CloudTrail bucket. Note that this operation will time out if there are too many objects, but for a proof of concept this is fine. For a production system, append the region and date to the key prefix (us-east-2/2019/10/08, for example) to limit the objects listed.

const bucket_name = 'my-cloudtrail';
const processed_prefix = 'processed/';
const stash_suffix = "-processed";
const results = api.run("this.list_objects", {
  bucket_name: bucket_name,
  log_path: 'AWSLogs/444444444444/CloudTrail/'
});

this.list_objects looks like this (we're filtering out empty objects):

SELECT * FROM aws_s3.list_objects_v2
WHERE Bucket=@bucket_name
AND prefix=@log_path
AND Size > 0
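For the production case mentioned earlier, the log_path could be narrowed to one region and one day. A minimal sketch follows; buildLogPath is a hypothetical helper, and it assumes the date folders in the key follow UTC.

```javascript
// Hypothetical helper: build a log_path limited to one region and one day.
// Assumes the year/month/day folders in the key are in UTC.
function buildLogPath(accountId, region, date) {
  const yyyy = date.getUTCFullYear();
  const mm = String(date.getUTCMonth() + 1).padStart(2, "0");
  const dd = String(date.getUTCDate()).padStart(2, "0");
  return `AWSLogs/${accountId}/CloudTrail/${region}/${yyyy}/${mm}/${dd}/`;
}

// Months are zero-based in Date.UTC, so 9 is October.
const narrow_log_path = buildLogPath(
  "444444444444",
  "us-east-2",
  new Date(Date.UTC(2019, 9, 8))
);
console.log(narrow_log_path);
```

The result can be passed as log_path in place of the account-wide prefix so that far fewer objects are listed per run.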

Below, we skip files that have already been processed by checking whether there is an entry in the stash (we'll see later where we add the key). In addition, we use the free ipstack service to map IP addresses to physical locations, so we want to minimize the number of calls by caching the lookups in memory.

const ip_address_to_country = {};
results.forEach((keyObj) => {
  const result_records = [];
  const key = keyObj.Key;
  if (stash.get(key+stash_suffix)) {
    return;
  }

Load each CloudTrail log object's contents into memory.

  const content = api.query("SELECT * FROM aws_s3.get_object WHERE Bucket=@bucket_name AND Key=@key",{key:key, bucket_name: bucket_name});
  content.forEach((record_obj) => {
    const records = record_obj.Records;
    const record_keys = Object.keys(records);
    record_keys.forEach((rk) => {
      const entry = records[rk];

Now we have the file contents and can start enriching them according to our business rules. The first rule: if the IP address is from outside of the US, we mark the priority of the record as high. (We use the xpriority key to avoid namespace collisions with AWS services.) The country code is also stored.

      if (entry.sourceIPAddress) {
        const ip = entry.sourceIPAddress;
        if (ip_address_to_country[ip] === undefined) {
          const country_code = api.run("this.get_country_from_ip",{ipaddress:ip})[0].country_code;
          ip_address_to_country[ip] = country_code;
        }
        if (ip_address_to_country[ip] != 'US') {
          entry.xpriority = "HIGH";
          entry.xcountry_code = ip_address_to_country[ip];
        }
      }

The second rule: if the eventSource is the IAM service, mark the priority as high.

      if (entry.eventSource == "iam.amazonaws.com") {
        entry.xpriority = "HIGH";
      }

Now that the rules have been applied, save off the resulting object to an array.

      result_records.push(entry);
    }); // end of processing one record
  }); // end of processing one log object's records

The record has been enriched, but we need to store it. We store it with the same keyname, but add a prefix so that it's easy to point other systems at the enriched records. You can also perform any other data transformation needed. For example, since we're using Athena, which needs each record to be on a separate line, we use the join method to do so. We also can't gzip the file within Transposit (yet) so we change the keyname to end in .json.

  // athena wants json with each record on a different line
  const body = result_records.map(r => JSON.stringify(r)).join("\n");
  // can't gzip it just yet
  const processed_key = (processed_prefix + key).replace(".gz","");
  const res = api.query("SELECT * FROM aws_s3.put_object WHERE Bucket=@bucket_name AND Key=@key AND $body=@body", {
    bucket_name: bucket_name,
    key: processed_key,
    body: body
  });
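To make the output format concrete, here is a standalone sketch of the same two transformations on made-up data: one JSON document per line, and a key with the processed prefix added and the .gz extension dropped. The sample records and sampleKey are hypothetical.

```javascript
// Made-up records standing in for enriched CloudTrail entries.
const sampleRecords = [
  { eventName: "GetObject" },
  { eventName: "CreateUser", xpriority: "HIGH" }
];

// One JSON document per line, which is the layout Athena expects.
const ndjson = sampleRecords.map((r) => JSON.stringify(r)).join("\n");

// Each line parses back into the original record.
const roundTrip = ndjson.split("\n").map((line) => JSON.parse(line));

// Hypothetical source key; the prefix is added and a trailing .gz removed.
const sampleKey = "AWSLogs/444444444444/CloudTrail/us-east-2/2019/10/08/example.json.gz";
const sampleProcessedKey = ("processed/" + sampleKey).replace(/\.gz$/, "");

console.log(ndjson);
console.log(sampleProcessedKey);
```

JSON.stringify without an indent argument never emits raw newlines, which is what makes the one-record-per-line split safe.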

When the log object is successfully processed, we record that in the stash (as first mentioned above) so it won't be reprocessed in the future. The stash has size limits; if this were a production system, we would use something like DynamoDB instead.

  if (res != "success") {
    console.log("error processing: "+key);
  } else {
    stash.put(key+stash_suffix,true);
  }
});
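The two rules can also be pulled out into a pure function, which makes them easy to test without S3 or ipstack. This is a sketch rather than the code above: enrichEntry is a hypothetical name, and lookupCountry is a caller-supplied callback standing in for this.get_country_from_ip.

```javascript
// Sketch of the two rules as a pure function. lookupCountry is a
// hypothetical callback that maps an IP address to a country code;
// cache holds earlier lookups so each IP is resolved only once.
function enrichEntry(entry, lookupCountry, cache = {}) {
  const enriched = Object.assign({}, entry); // leave the input untouched
  const ip = enriched.sourceIPAddress;
  if (ip) {
    if (cache[ip] === undefined) {
      cache[ip] = lookupCountry(ip);
    }
    if (cache[ip] !== "US") {
      enriched.xpriority = "HIGH";
      enriched.xcountry_code = cache[ip];
    }
  }
  if (enriched.eventSource === "iam.amazonaws.com") {
    enriched.xpriority = "HIGH";
  }
  return enriched;
}

// Fake lookup for illustration; counts how often it is called.
let lookups = 0;
const fakeLookup = (ip) => {
  lookups += 1;
  return ip === "11.11.111.111" ? "DE" : "US";
};

const sharedCache = {};
const first = enrichEntry({ sourceIPAddress: "11.11.111.111", eventSource: "s3.amazonaws.com" }, fakeLookup, sharedCache);
const second = enrichEntry({ sourceIPAddress: "11.11.111.111", eventSource: "s3.amazonaws.com" }, fakeLookup, sharedCache);
const third = enrichEntry({ sourceIPAddress: "22.22.22.22", eventSource: "iam.amazonaws.com" }, fakeLookup, sharedCache);
console.log(first, second, third, lookups);
```

Passing the lookup in as a parameter keeps the rule logic separate from the network call, so new rules can be added and checked against sample records before they touch real logs.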

Conclusion

Enriching your CloudTrail events gives you more insight into how your AWS infrastructure is used. You can write whatever business rules make sense to highlight AWS usage that is of interest or concern. With Transposit, you can also call out to other APIs, including your own, to enrich the data further. Whether you call external services to inject outside data or simply tag behaviors for further investigation, Transposit makes it easy to add more data into your CloudTrail logs.
