Amazon Web Services Kinesis support for Node.js


Introduction

Amazon Kinesis is a family of services provided by Amazon Web Services for processing and analyzing real-time streaming data at large scale.

Objects

Description
NodeJS AWS Kinesis Producer
NodeJS AWS Kinesis Consumer
NodeJS AWS Unknown Kinesis Producer
NodeJS AWS Unknown Kinesis Consumer

Supported Libraries

The following libraries are supported:

| Library | Require | Supported Producer APIs | Supported Consumer APIs |
|---|---|---|---|
| AWS SDK V2 | aws-sdk | putRecord, putRecords | getShardIterator, getRecords |
| AWS SDK V3 | @aws-sdk/client-kinesis | PutRecordCommand, PutRecordsCommand | GetShardIteratorCommand, GetRecordsCommand |
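The batch producer APIs listed above (putRecords in SDK V2, PutRecordsCommand in SDK V3) take a Records array instead of a single Data/PartitionKey pair. The following is a minimal sketch of building such a request. The helper buildPutRecordsParams, the stream name, and the partition key scheme are illustrative assumptions and are not part of the AWS SDK or of this extension.

```javascript
// Hypothetical helper (not part of the AWS SDK): builds a request object
// in the shape expected by putRecords (V2) and PutRecordsCommand (V3).
function buildPutRecordsParams(streamName, messages) {
  return {
    StreamName: streamName,
    Records: messages.map((message, index) => ({
      // Data is sent as bytes; a Buffer is accepted by both SDK versions.
      Data: Buffer.from(JSON.stringify(message)),
      // Illustrative key scheme: partition1, partition2, ...
      PartitionKey: `partition${index + 1}`
    }))
  };
}

const batchParams = buildPutRecordsParams("data-stream-v2", [
  { message: "first" },
  { message: "second" }
]);

// With SDK V2: await kinesis.putRecords(batchParams).promise();
// With SDK V3: await kinesisClient.send(new PutRecordsCommand(batchParams));
console.log(batchParams.Records.length, batchParams.Records[0].PartitionKey);
```

The request object is built separately from the client call so that the same parameters can be passed to either SDK version.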

The com.castsoftware.wbslinker extension will create a callLink between a Kinesis Producer and a Kinesis Consumer that share the same stream name.
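To illustrate the matching rule, the sketch below pairs producers and consumers by stream name. It is a conceptual illustration only and is not the wbslinker implementation. The function linkByStreamName and the object shape ({ type, streamName }) are hypothetical.

```javascript
// Illustrative only: pairs every producer with every consumer
// that has an identical stream name.
function linkByStreamName(objects) {
  const producers = objects.filter((o) => o.type === "producer");
  const consumers = objects.filter((o) => o.type === "consumer");
  const links = [];
  for (const producer of producers) {
    for (const consumer of consumers) {
      if (producer.streamName === consumer.streamName) {
        links.push({ type: "callLink", producer, consumer });
      }
    }
  }
  return links;
}

const links = linkByStreamName([
  { type: "producer", streamName: "data-stream-v2" },
  { type: "consumer", streamName: "data-stream-v2" },
  { type: "consumer", streamName: "data-stream-v3" }
]);

// Only the "data-stream-v2" pair matches; the "data-stream-v3" consumer
// has no producer with the same stream name in this input.
console.log(links.length);
```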

Examples

API V2

When analyzing the following source code:

const AWS = require("aws-sdk");

const REGION = "us-east-1";
const kinesis = new AWS.Kinesis({ region: REGION });

const streamName = "data-stream-v2";

const params = {
  StreamName: streamName,
  Data: JSON.stringify({ message: "test data" }),
  PartitionKey: "partition1"
};

const putRecord = async () => {
  try {
    const data = await kinesis.putRecord(params).promise();
    console.log("Success", data.SequenceNumber);
  } catch (err) {
    console.error("Error", err);
  }
};

const getRecords = async () => {
  try {
    const iteratorParams = {
      StreamName: streamName,
      ShardId: "shardId-000000000000",
      ShardIteratorType: "LATEST"
    };
    const iterator = await kinesis.getShardIterator(iteratorParams).promise();
    const recordsParams = {
      ShardIterator: iterator.ShardIterator
    };
    const data = await kinesis.getRecords(recordsParams).promise();
    console.log("Success", data.Records.length);
  } catch (err) {
    console.error("Error", err);
  }
};

putRecord();
getRecords();

The following results will be produced:

API V3

When analyzing the following source code:

const {
  KinesisClient,
  PutRecordCommand,
  GetShardIteratorCommand,
  GetRecordsCommand
} = require("@aws-sdk/client-kinesis");

const REGION = "us-east-1";
const kinesisClient = new KinesisClient({ region: REGION });

const streamName = "data-stream-v3";

const params = {
  StreamName: streamName,
  Data: Buffer.from(JSON.stringify({ message: "test data" })),
  PartitionKey: "partition1"
};

const putRecord = async () => {
  try {
    const data = await kinesisClient.send(new PutRecordCommand(params));
    console.log("Success", data.SequenceNumber);
  } catch (err) {
    console.error("Error", err);
  }
};

const getRecords = async () => {
  try {
    const iteratorParams = {
      StreamName: streamName,
      ShardId: "shardId-000000000000",
      ShardIteratorType: "LATEST"
    };
    const iterator = await kinesisClient.send(new GetShardIteratorCommand(iteratorParams));
    const recordsParams = {
      ShardIterator: iterator.ShardIterator
    };
    const data = await kinesisClient.send(new GetRecordsCommand(recordsParams));
    console.log("Success", data.Records.length);
  } catch (err) {
    console.error("Error", err);
  }
};

putRecord();
getRecords();

The following results will be produced: