I'm having trouble finding examples of what I'm trying to do...

I'd like to create a Lambda function in Java. I had expected to always use JavaScript for Lambda functions, but in this case I'll be reusing application logic already written in Java, so Java makes sense.

In the past I've written JavaScript Lambda functions that are triggered by Kinesis events. It was very simple: the function receives the events as a parameter and does something with them. I'd like to do the same thing in Java:

Kinesis Event(s) -> Trigger Function -> (Java) Receive Kinesis Events, do something with them

Does anyone have experience with this kind of use case?

3 Comments
  • Your question is not specific. As general as it is, the answer is the same regardless of the language the Lambda runs. Commented May 3, 2017 at 20:40
  • I gave some code for a Java Lambda for S3 events here. They should be similar. Commented May 3, 2017 at 20:48
  • johni, as shown in the answer, module.exports = function(event, context){} is definitely not the same as public PutRecordsResult eventHandler(KinesisEvent event, Context context){}. Commented May 6, 2017 at 2:15

1 Answer

Here is some sample code I wrote to demonstrate the same concept internally. This code forwards events from one stream to another.

Note that this code does not retry records that fail to forward, and it is not tuned for production performance, but it does show how to handle the records from the publishing stream.

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KinesisToKinesis {

    private LambdaLogger logger;
    final private AmazonKinesisClient kinesisClient = new AmazonKinesisClient();

    public PutRecordsResult eventHandler(KinesisEvent event, Context context) {
        logger = context.getLogger();
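        // Also guard against an empty list: get(0) below would throw on it
        if (event == null || event.getRecords() == null || event.getRecords().isEmpty()) {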
            logger.log("Event contains no data" + System.lineSeparator());
            return null;
        } else {
            logger.log("Received " + event.getRecords().size() +
                " records from " + event.getRecords().get(0).getEventSourceARN() + System.lineSeparator());
        }

        final Long startTime = System.currentTimeMillis();

        // set up the client
        Region region;
        final Map<String, String> environmentVariables = System.getenv();
        if (environmentVariables.containsKey("AWS_REGION")) {
            region = Region.getRegion(Regions.fromName(environmentVariables.get("AWS_REGION")));
        } else {
            region = Region.getRegion(Regions.US_WEST_2);
            logger.log("Using default region: " + region.toString() + System.lineSeparator());
        }
        kinesisClient.setRegion(region);

        Long elapsed = System.currentTimeMillis() - startTime;
        logger.log("Finished setup in " + elapsed + " ms" + System.lineSeparator());

        PutRecordsRequest putRecordsRequest = new PutRecordsRequest().withStreamName("usagecounters-global");
        List<PutRecordsRequestEntry> putRecordsRequestEntryList = event.getRecords().parallelStream()
            .map(r -> new PutRecordsRequestEntry()
                    .withData(ByteBuffer.wrap(r.getKinesis().getData().array()))
                    .withPartitionKey(r.getKinesis().getPartitionKey()))
            .collect(Collectors.toList());

        putRecordsRequest.setRecords(putRecordsRequestEntryList);

        elapsed = System.currentTimeMillis() - startTime;
        logger.log("Processed " + putRecordsRequest.getRecords().size() +
            " records in " + elapsed + " ms" + System.lineSeparator());

        PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
        elapsed = System.currentTimeMillis() - startTime;
        logger.log("Forwarded " + putRecordsRequest.getRecords().size() +
                " records to Kinesis " + putRecordsRequest.getStreamName() +
                " in " + elapsed + " ms" + System.lineSeparator());
        return putRecordsResult;
    }
}
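
The forwarder above returns the PutRecordsResult without inspecting it. As a rough sketch of the missing retry step, the helper below picks out the entries worth resending. It relies on Kinesis returning result entries in the same order as the request entries, with an error code set only on the ones that failed. To stay runnable without the AWS SDK it works on plain lists; FailedEntrySelector and selectFailed are hypothetical names, not part of any library. With the SDK, the error codes would come from calling getErrorCode() on each entry of putRecordsResult.getRecords().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the AWS SDK.
class FailedEntrySelector {

    /**
     * Returns the request entries whose matching result carries an error code.
     * Assumes both lists have the same length and the same order.
     */
    static <T> List<T> selectFailed(List<T> requestEntries, List<String> errorCodes) {
        if (requestEntries.size() != errorCodes.size()) {
            throw new IllegalArgumentException("request and result sizes differ");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < requestEntries.size(); i++) {
            // A null error code means the entry at this position was accepted
            if (errorCodes.get(i) != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Stand-ins for request entries and their per-entry error codes
        List<String> entries = Arrays.asList("a", "b", "c");
        List<String> codes = Arrays.asList(null, "ProvisionedThroughputExceededException", null);
        System.out.println(selectFailed(entries, codes)); // prints [b]
    }
}
```

A caller would resend only the returned entries, ideally with a backoff and a cap on attempts, and could skip the check entirely when getFailedRecordCount() is 0.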

1 Comment

Thanks devonlazarus, this is a helpful example to start with! It was particularly the entry point and how the event parameter is typed that I was confused with, since in JS it's the same regardless of the incoming event. Cheers!
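
On the typing point: each record's payload is a ByteBuffer (the answer reads it with r.getKinesis().getData()), so doing something with a record usually starts with decoding it. A minimal sketch, assuming the producer wrote UTF-8 text; PayloadDecoder and decodeUtf8 are hypothetical names, and the sample payload is made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the AWS SDK.
class PayloadDecoder {

    /** Decodes a record payload as UTF-8 text without consuming the caller's buffer. */
    static String decodeUtf8(ByteBuffer data) {
        // asReadOnlyBuffer() shares the bytes but has its own position,
        // so the original buffer can still be forwarded afterwards
        return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString();
    }

    public static void main(String[] args) {
        // Stand-in for the ByteBuffer a Kinesis record would carry
        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeUtf8(payload));  // prints hello
        System.out.println(payload.remaining());  // prints 5, the buffer is untouched
    }
}
```

If the stream carries JSON or a binary format instead, the decoded string or the raw buffer would be handed to the matching parser at this point.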
