How to write more than 25 items/rows into a DynamoDB table?
var _ = require('lodash');
var async = require('async');
// docClient (an AWS.DynamoDB.DocumentClient) and TABLES.historyTable are
// assumed to be defined elsewhere in the module.

function putInHistory(data, cb) {
    // BatchWriteItem accepts at most 25 put/delete requests per call,
    // so split the input into chunks of 25
    var arrayOfArray25 = _.chunk(data, 25);
    async.every(arrayOfArray25, function(arrayOf25, callback) {
        var params = {
            RequestItems: {
                [TABLES.historyTable]: []
            }
        };
        // Wrap each item in the PutRequest format BatchWriteItem expects
        arrayOf25.forEach(function(item) {
            params.RequestItems[TABLES.historyTable].push({
                PutRequest: {
                    Item: item
                }
            });
        });
        docClient.batchWrite(params, function(err, data) {
            if (err) {
                console.log(err);
                callback(err);
            } else {
                console.log(data);
                callback(null, true);
            }
        });
    }, function(err, result) {
        if (err) {
            cb(err);
        } else if (result) {
            cb(null, { allWritten: true });
        } else {
            cb(null, { allWritten: false });
        }
    });
}
You can use lodash to split the array into chunks of 25, and then use the async library's each/every method to run a batchWrite on each chunk.
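For reference, a hypothetical call might look like this (the items array is a placeholder; each element must be an object matching the table's schema):

// Hypothetical usage: "items" is an array of plain objects to store.
var items = [ /* ... your records ... */ ];
putInHistory(items, function(err, result) {
    if (err) return console.error('Batch write failed:', err);
    console.log('All chunks written?', result.allWritten);
});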
You can only send up to 25 items in a single BatchWriteItem request, but you can send as many BatchWriteItem requests as you want at one time. Assuming you've provisioned enough write throughput, you should be able to speed things up significantly by splitting those 20k rows between multiple threads/processes/hosts and pushing them to the database in parallel.
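As a rough sketch of that parallel approach with the JavaScript SDK (the table name and items array are placeholders, and UnprocessedItems are not retried here), you could fire one batchWrite per 25-item chunk and wait on all of them:

// Rough sketch: one concurrent batchWrite per 25-item chunk.
var AWS = require('aws-sdk');
var docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});

function writeAllParallel(tableName, items) {
    var chunks = [];
    for (var i = 0; i < items.length; i += 25) {
        chunks.push(items.slice(i, i + 25));
    }
    return Promise.all(chunks.map(function(chunk) {
        var params = { RequestItems: {} };
        params.RequestItems[tableName] = chunk.map(function(item) {
            return { PutRequest: { Item: item } };
        });
        return docClient.batchWrite(params).promise();
    }));
}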
It may be a bit heavyweight for a dataset that small, but you can use AWS Data Pipeline to ingest data from S3. It essentially automates the process of creating a Hadoop cluster that pulls your data down from S3 and sends it to DynamoDB in a bunch of parallel BatchWriteItem requests.
I was looking for some code to do this with the JavaScript SDK. I couldn't find any, so I put it together myself. I hope this helps someone else!
function multiWrite(table, data, cb) {
    var AWS = require('aws-sdk');
    var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});

    // Build the batches: BatchWriteItem allows at most 25 requests per call
    var batches = [];
    var current_batch = [];
    for (var i = 0; i < data.length; i++) {
        // Add the item to the current batch in the PutRequest format
        current_batch.push({
            PutRequest: {
                Item: data[i]
            }
        });
        // Once we've collected 25 items, close out this batch and start a new one
        if (current_batch.length === 25) {
            batches.push(current_batch);
            current_batch = [];
        }
    }
    // Add the final, partial batch if it has any records
    if (current_batch.length > 0) batches.push(current_batch);
    // Nothing to write: call back immediately
    if (batches.length === 0) return cb(null);

    // Handler for the database operations
    var completed_requests = 0;
    var errors = null;
    function handler(request) {
        return function(err, data) {
            // Increment the completed requests
            completed_requests++;
            // Remember the first error we see, and log it
            if (err) {
                errors = errors || err;
                console.error(JSON.stringify(err, null, 2));
                console.error("Request that caused database error:");
                console.error(JSON.stringify(request, null, 2));
            }
            // Make the callback once every request has finished
            if (completed_requests === batches.length) {
                cb(errors);
            }
        };
    }

    // Make the requests. Note that a "successful" batchWrite can still
    // return UnprocessedItems, which this code does not retry.
    for (var x = 0; x < batches.length; x++) {
        var params = { RequestItems: {} };
        params.RequestItems[table] = batches[x];
        db.batchWrite(params, handler(params));
    }
}
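A hypothetical call, with 'MyTable' and the records array as placeholders:

// Hypothetical usage: write an arbitrary-length array of records.
var records = [ /* ... objects matching the table's schema ... */ ];
multiWrite('MyTable', records, function(err) {
    if (err) return console.error('One or more batches failed:', err);
    console.log('All ' + records.length + ' items written');
});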