Large file processing (CSV) using AWS Lambda + Step Functions

Hi. I am currently coding a serverless email marketing tool that includes a feature to import "contacts" (email recipients) from a large CSV file. The key point is that I only want to use serverless services, and the AWS Lambda 5-minute timeout may be an issue if your CSV file has millions of rows. For those big files, a long-running serverless strategy is required.

My solution is to use AWS Step Functions to loop an AWS Lambda function (processing a chunk of records each time) until we finish (I know, it is quite simple, but Amazon just released Step Functions recently ^_^).

Let me show you some code.

The AWS Lambda processing side.

Suppose you have a large CSV file on S3. The AWS Lambda code for reading and processing each line looks like this (please note that error handling and some spaghetti code are left out for clarity).

var AWS = require('aws-sdk'),
    s3 = new AWS.S3(),
    es = require('event-stream');

var pipeline = s3.getObject({
  Bucket: '<MY-BUCKET>',
  Key: '<MY-CSV-FILE-KEY>'
})
.createReadStream()
.pipe(es.split(/\r|\r?\n/))
.pipe(es.mapSync(function (line) {
    pipeline.pause();
    process(line); // pipeline.resume() is called inside because
                   // I want to go one by one (optional).
})).on('end', function(){
    console.log('pipeline end');
});
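
Just as an illustration, here is a minimal sketch of what process(line) could look like (the parsing and the saveContact() call below are placeholders I made up; the only thing the pipeline above really needs is that pipeline.resume() gets called once the line has been handled):

// hypothetical example: parse one CSV line, store the contact,
// and resume the paused pipeline so the next line gets delivered
function process(line) {
    if (!line) {                  // skip empty lines (e.g. the trailing newline)
        pipeline.resume();
        return;
    }
    var fields = line.split(','), // naive CSV parsing, just for illustration
        contact = { email: fields[0], name: fields[1] };

    saveContact(contact, function (err) { // saveContact() is a placeholder
        pipeline.resume();                // (e.g. a DynamoDB put)
    });
}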

This code processes the file records sequentially, but it may be cut short by the AWS Lambda timeout if you want to process a big file!

Using AWS Step Functions to loop.

By using the AWS Step Functions modeling tool (some sort of "do this", then "do that", etc.) we only need two main steps: one step to "process a chunk of records", and a second step to "check whether we have already reached the end of the file" to continue looping or exit.

That looping workflow is very easy to implement if you take care of the AWS Step Functions parameter flow. To sum up, the output parameters from one step are included as the input for the next step. Our looping Lambda function will then receive its own output parameters as input (ouch!), meaning we only need to use some sort of input/output parameter values to manage the main workflow and update the global results.

var chunkSize = 500;

// our looping Lambda function looks like this now...
exports.handler = (event, context, callback) => {

    // use a global input/output parameter
    // (if not included in the input (first execution) we initialize the values)
    var result = event.hasOwnProperty('results') ?
        event.results : { processedRows: 0, importedRows: 0, errors: [] };

    ...

    // now we take care of chunks, and update the output param if EOF is reached
    var index = 0,
        processed = 0,
        end = true;

    var pipeline = s3.getObject({
        Bucket: '<MY-BUCKET>',
        Key: '<MY-CSV-FILE-KEY>'
    })
    .createReadStream()
    .pipe(es.split(/\r|\r?\n/))
    .pipe(es.mapSync(function (line) {
        index++;
        if ((result.processedRows + 1) < index) { // skip rows already processed in previous executions
            if (processed < chunkSize) {
                processed++; result.processedRows++;
                pipeline.pause();
                process(line);
            } else {
                end = false; // we already processed our chunk, but no EOF.
                pipeline.end();
            }
        }
    })).on('end', function () {
        result.finished = end; // output whether EOF was reached.
    });

    ...

}
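
The elided parts ("...") are not shown here, but given that the state machine below checks $.code and $.results, the handler presumably ends by handing the accumulated results back to Step Functions once the stream has finished. A hypothetical sketch of that last step (not part of the original code):

// whatever object is passed to callback() becomes the state output,
// which the Choice state below reads as $.code and $.results
pipeline.on('end', function () {
    callback(null, { code: 0, results: result }); // code 0 = no fatal error
});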

Step Functions then just needs to check the output parameters to manage the loop ($.code is used to check for errors, and $.results.finished tells us whether the end of the file has been reached):

{
  "Comment": "Imports contacts from CSV file.",
  "StartAt": "Import",
  "States": {
    "Import": {
      "Type": "Task",
      "Resource": "arn:...:function:CONTACT_import",
      "Next": "CheckResults"
    },
    "CheckResults": {
      "Type": "Choice",
      "Choices": [
        {
          "And": [
            { "Variable": "$.code", "NumericEquals": 0 },
            { "Variable": "$.results.finished", "BooleanEquals": false }
          ],
          "Next": "Import"
        },
        {
          "And": [
            { "Variable": "$.code", "NumericEquals": 0 },
            { "Variable": "$.results.finished", "BooleanEquals": true }
          ],
          "Next": "SuccessState"
        }
      ],
      "Default": "FailState"
    },
    "SuccessState": {
      "Type": "Succeed"
    },
    "FailState": {
      "Type": "Fail",
      "Cause": "$.results.errors"
    }
  }
}

That's basically how it works.
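
For completeness, executions can also be started from code; here is a minimal sketch using the Node.js SDK (the state machine ARN below is made up, and the empty input simply lets the Lambda initialize the results on its first run):

var AWS = require('aws-sdk'),
    stepfunctions = new AWS.StepFunctions();

stepfunctions.startExecution({
    stateMachineArn: 'arn:aws:states:<REGION>:<ACCOUNT-ID>:stateMachine:<MY-STATE-MACHINE>',
    input: '{}' // no 'results' yet, so the first Import run initializes them
}, function (err, data) {
    if (err) console.log(err);
    else console.log('started execution:', data.executionArn);
});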

But still a weak point!

If we take a look at our AWS Lambda code, before processing a chunk we need to fast-forward the stream past the last processed record. That consumes time and brings us closer to the function timeout!

I'm currently checking the numbers (I will update this post), but how much time is required to fast-forward through a big file stream? Do you know a better way to do that?

Please add your suggestions!


---------------------------

Update April / 03 - UX on sendmail.services/AWS Step Functions execution:

Update April / 07 - Take care with parameters size:

In the code I implemented, we use an input/output parameter to pass results along the looping calls, but the size of that parameter is limited. If you exceed a certain size you will get the following message:

error: States.DataLimitExceeded
cause: The state/task 'arn:aws:..:CONTACT_import' 
returned a result with a size exceeding the maximum number of 
characters service limit.

(In my case I tried to log every contact import error, but if you're uploading 5 million contacts you can't log all of the errors, even if only 0.1% of them fail!)
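
A simple way to stay under that limit, assuming you don't need every single error inside the state output, is to cap the errors array and only keep a counter past that point (a sketch; MAX_LOGGED_ERRORS and errorCount are made-up names):

var MAX_LOGGED_ERRORS = 100; // arbitrary cap to keep the state output small

function addError(result, rowIndex, message) {
    result.errorCount = (result.errorCount || 0) + 1;
    if (result.errors.length < MAX_LOGGED_ERRORS) {
        result.errors.push({ row: rowIndex, message: message });
    }
    // past the cap only the counter grows; the full error details could be
    // written to S3 or DynamoDB instead of being passed between states
}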

You can use AWS Batch for this.

I recently ran into the same problem. To avoid moving forward from the beginning to the chunk every time, I am using the S3 GET API with the Range parameter.
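
That would work along these lines (a sketch based on this comment, not on the original code): keep a byte offset in the results passed between executions and ask S3 only for the remaining part of the file, so each invocation starts reading where the previous one stopped:

// sketch: resume reading the CSV from a stored byte offset using the S3 Range
// header; result.byteOffset would have to be tracked while processing
// (here by adding the byte length of each consumed line, assuming '\n' endings)
var pipeline = s3.getObject({
    Bucket: '<MY-BUCKET>',
    Key: '<MY-CSV-FILE-KEY>',
    Range: 'bytes=' + result.byteOffset + '-' // from the last position to EOF
})
.createReadStream()
.pipe(es.split(/\r|\r?\n/))
.pipe(es.mapSync(function (line) {
    result.byteOffset += Buffer.byteLength(line, 'utf8') + 1; // +1 for the newline
    pipeline.pause();
    process(line);
}));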

How about using Fargate (ecs.run_task()) from Lambda (Python)?
