Kinesis Firehose putting JSON objects in S3 without seperator comma
One approach you could consider is to configure data processing for your Kinesis Firehose delivery stream by adding a Lambda function as its data processor, which would be executed before finally delivering the data to the S3 bucket.
DeliveryStream:
...
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
...
BucketARN: !GetAtt MyDeliveryBucket.Arn
ProcessingConfiguration:
Enabled: true
Processors:
- Parameters:
- ParameterName: LambdaArn
ParameterValue: !GetAtt MyTransformDataLambdaFunction.Arn
Type: Lambda
...
And in the Lambda function, ensure that '\n'
is appended to the record's JSON string, see below the Lambda function myTransformData.ts
in Node.js:
import {
FirehoseTransformationEvent,
FirehoseTransformationEventRecord,
FirehoseTransformationHandler,
FirehoseTransformationResult,
FirehoseTransformationResultRecord,
} from 'aws-lambda';
const createDroppedRecord = (
recordId: string
): FirehoseTransformationResultRecord => {
return {
recordId,
result: 'Dropped',
data: Buffer.from('').toString('base64'),
};
};
const processData = (
payloadStr: string,
record: FirehoseTransformationEventRecord
) => {
let jsonRecord;
// ...
// Process the orginal payload,
// And create the record in JSON
return jsonRecord;
};
const transformRecord = (
record: FirehoseTransformationEventRecord
): FirehoseTransformationResultRecord => {
try {
const payloadStr = Buffer.from(record.data, 'base64').toString();
const jsonRecord = processData(payloadStr, record);
if (!jsonRecord) {
console.error('Error creating json record');
return createDroppedRecord(record.recordId);
}
return {
recordId: record.recordId,
result: 'Ok',
// Ensure that '\n' is appended to the record's JSON string.
data: Buffer.from(JSON.stringify(jsonRecord) + '\n').toString('base64'),
};
} catch (error) {
console.error('Error processing record ${record.recordId}: ', error);
return createDroppedRecord(record.recordId);
}
};
const transformRecords = (
event: FirehoseTransformationEvent
): FirehoseTransformationResult => {
let records: FirehoseTransformationResultRecord[] = [];
for (const record of event.records) {
const transformed = transformRecord(record);
records.push(transformed);
}
return { records };
};
export const handler: FirehoseTransformationHandler = async (
event,
_context
) => {
const transformed = transformRecords(event);
return transformed;
};
Once the newline delimiter is in place, AWS services such as Athena will be able to work properly with the JSON record data in the S3 bucket, not just seeing the first JSON record only.
I had this same problem recently, and the only answers I was able to find were basically just to add line breaks ("\n") to the end of every JSON message whenever you posted them to the Kinesis stream, or to use a raw JSON decoder method of some sort that can process concatenated JSON objects without delimiters.
I posted a python code solution which can be found over here on a related Stack Overflow post: https://stackoverflow.com/a/49417680/1546785