Я сканирую все элементы из таблицы DynamoDB — в функции Lambda — с помощью DocumentClient
. Затем я перебираю каждый элемент и извлекаю полезную нагрузку, которая мне нужна. Я буду использовать этот элемент из полезной нагрузки в качестве параметра с ExpressionAttributeValues
в новом запросе.
Все работает денди независимо. Проблема связана с использованием асинхронной функции queryItems
, когда она вложена в метод массива forEach(). Я получаю ошибку синтаксического анализа с функцией queryItems. Я могу запросить таблицу, когда вызову функцию вне цикла, но как еще я буду запрашивать каждый элемент независимо?
Я не знаю, как с этим справиться.
'use strict';
const aws = require('aws-sdk');
const docClient = new aws.DynamoDB.DocumentClient();
var paramsAll = {
TableName: 'MyTable',
Select: "ALL_ATTRIBUTES"
};
exports.handler = async (event, context) => {
try {
let arr = [];
let sequence = '';
//scan all items in table
docClient.scan(paramsAll, function(err, data) {
if (err) {
//handle error
}
else {
//Loop through each item in the table:
let items = (data.Items);
items.forEach(function(Item) {
let p = (Item.payload);
//Extract sequence from the payload
sequence = (p.seq);
arr.push(sequence);
//perform other function with this array (not listed for brevity)
});
//Here is where I'm having the issue:
arr.forEach(function(Item) {
//Pass these items as a paramater within queryItems function but getting Parsing Error: unexpected token queryItems
const results = await queryItems(Item);
//do something with the results...
})
}
});
}
catch (err) {
return { error: err };
}
};
async function queryItems(p) {
try {
var params = {
TableName: 'MyTable',
KeyConditionExpression: '#seq = :value',
ExpressionAttributeValues: { ':value': p },
ExpressionAttributeNames: { '#seq': 'seq' }
};
const data = await docClient.query(params).promise();
return data;
}
catch (err) {
return err;
}
}
Я определенно столкнулся с подобной проблемой. Я считаю, что происходит просто проблема с синтаксисом Javascript, где await
ing queryItems
внутри синхронной функции, предоставленной forEach
, приведет к ошибке. (Хотя при запуске кода я получаю конкретную ошибку "SyntaxError: await is only valid in async functions and the top level bodies of modules"
, так что может быть что-то еще.)
Я не вижу ничего плохого в запросах DynamoDB, но предложения hoangdv точны. В частности, я бы также предложил использовать стиль обещания для scan
, и хотя for...loop
определенно будет работать, использование Promise.all
и map
будет намного быстрее для выполнения всех запросов. Вот как я бы изменил код:
'use strict';
const aws = require('aws-sdk');
const docClient = new aws.DynamoDB.DocumentClient();
// avoid var unless you specifically require it's hoisting behavior.
const paramsAll = {
TableName: 'MyTable',
Select: "ALL_ATTRIBUTES" // most likely not needed, I'd review this section of the docs: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html#DDB-Scan-request-Select
};
exports.handler = async (event, context) => {
try {
// unless you need to assign a new array to this variable, it is better practice to use const instead.
const arr = [];
// let sequence = ''; // see line 24 for why I commented this out.
// scan all items in table.
// Destructure Items out of the response.
// You may also need to continue scanning with the LastEvaluatedKey depending on the size of your table, and/or your use case.
// You'd continue scanning in a while loop, for example.
const { Items, LastEvaluatedKey } = await docClient.scan(paramsAll).promise();
// push the sequence to the arr.
// There is most likely a reason you omitted for brevity to have sequence defined above,
// but since this example doesn't need it above, I've omitted it entirely
Items.forEach(Item => {
const p = Item.payload;
arr.push(p.seq);
});
// use a for loop or map here instead. forEach will return undefined, which cannot be await'ed.
// instead, map will return a new array of Promises (since the callback is async).
// Then, you can use Promise.all to await until each Promise in the array is resolved.
// Keep in mind, depending on how many items you are iterating through, you may run into DynamoDB's ThrottlingException.
// You would have to batch the queries (in other words, split the arr into pieces, and iterate over each piece), which would have to be done before using map. Then, sleep for a few milliseconds before starting on the next piece.
// I doubt the queries will be quick enough to cause this when using a for loop, though.
await Promise.all(arr.map(async Item => {
const results = await queryItems(Item);
// do something with the results...
}));
}
catch (err) {
// Again, not sure what the use case is, but just FYI this is not a valid return value if this lambda function is intended for use with using API Gateway.
// See here :) https://docs.aws.amazon.com/lambda/latest/dg/services-apigateway.html#apigateway-types-transforms
return { error: err };
}
};
// Presumably, MyTable has a partitionKey of seq, otherwise this KeyConditionExpression is invalid.
async function queryItems(p) {
try {
var params = {
TableName: 'MyTable',
KeyConditionExpression: '#seq = :value',
ExpressionAttributeValues: { ':value': p },
ExpressionAttributeNames: { '#seq': 'seq' }
};
const data = await docClient.query(params).promise();
return data;
}
catch (err) {
return err;
}
}
Комментарий от @hoangvd заставил меня понять, что это, скорее всего, простая синтаксическая ошибка JavaScript, и я начал удалять этот пост. Ваши подробные комментарии и объяснения очень полезны! Это прекрасно работает и многое объясняет. Большое спасибо!
Ваша проблема заключается в том, как вы ожидаете в цикле for, лучше всего использовать Promise.all()
с map
для ожидания внутри цикла:
await Promise.all(arr.map(async Item => {
const results = await queryItems(Item);
// do something with the results...
}));
Однако, похоже, я не очень хорошо понимаю вашу логику.
MyTable
, но не разбиваете ее на страницы, то есть вы получаете только данные объемом до 1 МБ.seq
, а затем еще раз читаете каждый элемент из MyTable
, на этот раз используя Query
и seq
в качестве ключа?Я загружаю сообщения MQTT в таблицу и почти в реальном времени считываю сообщения, чтобы получить правильную последовательность событий, поскольку порядок MQTT не сохраняется. Поскольку он обрабатывается так быстро, я предполагаю, что лимита в 1 МБ будет достаточно? Каждая полезная нагрузка имеет связанную с ней последовательность, поэтому я беру то, что есть (сканирую все), в правильном порядке и удаляю по мере обработки каждого события, где в игру вступают queryItems. Я предполагаю, что есть более простой метод для этого безумия, но я еще не обнаружил его.
Во-первых, если все, нет гарантии заказа со сканом. Во-вторых, когда ваша таблица превышает 1 МБ, ваше сканирование будет возвращать только частичные результаты, все, что существует после чтения 1 МБ, будет проигнорировано.
Также преобразуйте
docClient.scan
в стиль обещаний и используйтеfor..loop
вместоforEach