Amazon Web Services Kinesis support for Node.js
Introduction
Amazon Kinesis is a family of services provided by Amazon Web Services for processing and analyzing real-time streaming data at a large scale.
Objects
| Icon | Description |
|---|---|
| ![]() | NodeJS AWS Kinesis Producer |
| ![]() | NodeJS AWS Kinesis Consumer |
| ![]() | NodeJS AWS Unknown Kinesis Producer |
| ![]() | NodeJS AWS Unknown Kinesis Consumer |
Supported Libraries
The following libraries are supported:
| Library | Require | Supported Producer APIs | Supported Consumer APIs |
|---|---|---|---|
| AWS SDK V2 | aws-sdk | putRecord, putRecords | getShardIterator, getRecords |
| AWS SDK V3 | @aws-sdk/client-kinesis | PutRecordCommand, PutRecordsCommand | GetShardIteratorCommand, GetRecordsCommand |
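Both SDK versions also expose a batch producer API (putRecords in V2, PutRecordsCommand in V3). A minimal sketch of building its input is given here, assuming the usual request shape of a StreamName plus a Records array whose entries carry Data and PartitionKey. The helper name buildPutRecordsParams, the stream name and the partition keys are hypothetical, and the SDK calls appear only as comments so that the snippet runs without the SDK installed.

```javascript
// Hypothetical helper: builds the input object for putRecords (V2)
// or PutRecordsCommand (V3) from plain JavaScript messages.
function buildPutRecordsParams(streamName, messages) {
  return {
    StreamName: streamName,
    Records: messages.map((message, index) => ({
      // Serialize each message to bytes, as in the single-record case.
      Data: Buffer.from(JSON.stringify(message)),
      // Illustrative partition key; real code would pick a meaningful one.
      PartitionKey: `partition${index + 1}`
    }))
  };
}

const batchParams = buildPutRecordsParams("data-stream-batch", [
  { message: "first" },
  { message: "second" }
]);

// With SDK V2 (assumed client variable `kinesis`):
//   await kinesis.putRecords(batchParams).promise();
// With SDK V3 (assumed client variable `kinesisClient`):
//   await kinesisClient.send(new PutRecordsCommand(batchParams));
console.log(batchParams.Records.length);
```

Keeping the parameter construction separate from the SDK call means the same object can be passed to either SDK version.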
Supported links
The com.castsoftware.wbslinker creates a callLink between a Kinesis Producer and a Kinesis Consumer that have the same stream name.
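The matching rule can be pictured with a small sketch. This is not the linker's implementation: the object shapes, the object names and the function name linkByStreamName are assumptions made purely for illustration.

```javascript
// Illustrative only: pairs each producer with every consumer that has
// the same stream name, mimicking the rule described above.
function linkByStreamName(producers, consumers) {
  const links = [];
  for (const producer of producers) {
    for (const consumer of consumers) {
      if (producer.streamName === consumer.streamName) {
        links.push({ type: "callLink", from: producer.name, to: consumer.name });
      }
    }
  }
  return links;
}

// Hypothetical objects: one producer, two consumers on different streams.
const links = linkByStreamName(
  [{ name: "producerV2", streamName: "data-stream-v2" }],
  [
    { name: "consumerV2", streamName: "data-stream-v2" },
    { name: "consumerV3", streamName: "data-stream-v3" }
  ]
);
console.log(links);
```

In this sketch only the consumer reading data-stream-v2 is linked to the producer; the consumer on data-stream-v3 is left unlinked because the stream names differ.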
Examples
API V2
When analyzing the following source code:
const AWS = require("aws-sdk");

const REGION = "us-east-1";
const kinesis = new AWS.Kinesis({ region: REGION });
const streamName = "data-stream-v2";

// Producer side: parameters for a single record
const params = {
  StreamName: streamName,
  Data: JSON.stringify({ message: "test data" }),
  PartitionKey: "partition1"
};

const putRecord = async () => {
  try {
    const data = await kinesis.putRecord(params).promise();
    console.log("Success", data.SequenceNumber);
  } catch (err) {
    console.error("Error", err);
  }
};

// Consumer side: obtain a shard iterator, then read records from it
const getRecords = async () => {
  try {
    const iteratorParams = {
      StreamName: streamName,
      ShardId: "shardId-000000000000",
      ShardIteratorType: "LATEST"
    };
    const iterator = await kinesis.getShardIterator(iteratorParams).promise();
    const recordsParams = {
      ShardIterator: iterator.ShardIterator
    };
    const data = await kinesis.getRecords(recordsParams).promise();
  } catch (err) {
    console.log("Error", err);
  }
};

putRecord();
getRecords();
The following results will be produced:

API V3
When analyzing the following source code:
const {
  KinesisClient,
  PutRecordCommand,
  GetShardIteratorCommand,
  GetRecordsCommand
} = require("@aws-sdk/client-kinesis");

const REGION = "us-east-1";
const kinesisClient = new KinesisClient({ region: REGION });
const streamName = "data-stream-v3";

// Producer side: parameters for a single record
const params = {
  StreamName: streamName,
  Data: Buffer.from(JSON.stringify({ message: "test data" })),
  PartitionKey: "partition1"
};

const putRecord = async () => {
  try {
    const data = await kinesisClient.send(new PutRecordCommand(params));
    console.log("Success", data.SequenceNumber);
  } catch (err) {
    console.error("Error", err);
  }
};

// Consumer side: obtain a shard iterator, then read records from it
const getRecords = async () => {
  try {
    const iteratorParams = {
      StreamName: streamName,
      ShardId: "shardId-000000000000",
      ShardIteratorType: "LATEST"
    };
    const iterator = await kinesisClient.send(new GetShardIteratorCommand(iteratorParams));
    const recordsParams = {
      ShardIterator: iterator.ShardIterator
    };
    const data = await kinesisClient.send(new GetRecordsCommand(recordsParams));
  } catch (err) {
    console.log("Error", err);
  }
};

putRecord();
getRecords();
The following results will be produced:

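In both examples the response from getRecords is fetched but not used. A minimal sketch of decoding it follows, assuming each record's Data holds the UTF-8 JSON written by the producer (a Buffer with SDK V2, a Uint8Array with SDK V3). The helper name decodeRecords and the sample response are hypothetical, so the snippet runs without calling AWS.

```javascript
// Hypothetical helper: turns a getRecords / GetRecordsCommand response
// into an array of parsed JSON payloads.
function decodeRecords(response) {
  return (response.Records || []).map((record) =>
    // Buffer.from accepts both a Buffer (V2) and a Uint8Array (V3).
    JSON.parse(Buffer.from(record.Data).toString("utf8"))
  );
}

// Hand-built sample response standing in for a real SDK result.
const sampleResponse = {
  Records: [
    { Data: Buffer.from(JSON.stringify({ message: "test data" })) },
    { Data: new Uint8Array(Buffer.from(JSON.stringify({ message: "more data" }))) }
  ]
};

const decoded = decodeRecords(sampleResponse);
console.log(decoded);
```

Inside the getRecords functions above, such a helper could be applied to the `data` variable once the call has resolved.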



