Как обеспечить быструю и надежную миграцию наборов данных на серверную часть fiware?

Передача очень больших наборов данных на fiware серверную часть: SourceDB ->Orion -> Cygnus -> Postgres. Для этого я пишу скрипт Python, извлекающий строки, и для каждой извлеченной строки отправляю полезную нагрузку в Orion.

Скрипт запускается с интервалом 150 мс (0,15 с), но, к моему удивлению, после примерно 10 итераций только два значения (первое и последнее отправленные полезные данные) были сохранены в приемнике Postgres. Это означает, что 80% наборов данных не сохраняются в приемнике.

Скрипт:

import psycopg2
from time import sleep
from config import config
from tqdm import tqdm
import requests
import json

def val_json():
    db = "select to_json(d) from (  select \
        n.noise_data as measurand, \
        n.factor as \"sonometerClass\", \
        to_timestamp(n.seconds) as \"dateObserved\", \
        l.description as name, \
            json_build_object( \
                'coordinates', \
                json_build_array(l.node_lon, l.node_lat) \
            ) as location \
        from noise as n \
            inner join deployment as d on \
                d.deployment_id = n.deployment_id \
            inner join location as l on \
                l.location_id = d.location_id \
        order by n.seconds asc \
    ) as d"
    return db

def main():

    url = 'http://localhost:1026/v2/entities/002/attrs?options=keyValues'
    headers = {"Content-Type": "application/json", \
               "fiware-service": "urbansense",  \
               "fiware-servicepath": "/basic"}

    conn = None
    try:
        params = config()
        with psycopg2.connect(**params) as conn:
            with conn.cursor(name='my_cursor') as cur:
                cur.itersize = 5000
                cur.execute(val_json())

                for row in tqdm(cur):
                    jsonData = json.dumps(row)
                    if jsonData.startswith('[') and jsonData.endswith(']'):
                        jsonData = jsonData[1:-1]
                        print()
                        print(jsonData)
                    requests.post(url, data= jsonData, headers=headers)
                    sleep(0.15)

                cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

if __name__ == '__main__':
    main()

Первые десять итераций (полезные нагрузки):

$python3 noiselevelObserved.py
0it [00:00, ?it/s]
{"measurand": 64.8, "sonometerClass": 1, "dateObserved": "1970-01-01T01:00:15+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
1it [00:00,  1.75it/s]
{"measurand": 58.8, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:16+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
2it [00:00,  2.23it/s]
{"measurand": 56.5, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:17+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
3it [00:00,  2.76it/s]
{"measurand": 61.1, "sonometerClass": 1, "dateObserved": "1970-01-01T01:00:18+01:00", "name": "Casa da Musica", "location": {"coordinates": [-8.63041, 41.158091]}}
4it [00:01,  3.31it/s]
{"measurand": 108.5, "sonometerClass": 2, "dateObserved": "1970-01-01T01:00:18+01:00", "name": "Pr. Liberdade Cardosas", "location": {"coordinates": [-8.611119, 41.146023]}}
5it [00:01,  3.86it/s]
{"measurand": 56.5, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:18+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
6it [00:01,  4.35it/s]
{"measurand": 59.9, "sonometerClass": 1, "dateObserved": "1970-01-01T01:00:19+01:00", "name": "Casa da Musica", "location": {"coordinates": [-8.63041, 41.158091]}}
7it [00:01,  4.78it/s]
{"measurand": 97.2, "sonometerClass": 2, "dateObserved": "1970-01-01T01:00:19+01:00", "name": "D. Manuel II", "location": {"coordinates": [-8.625192, 41.148558]}}
8it [00:01,  5.12it/s]
{"measurand": 108.6, "sonometerClass": 2, "dateObserved": "1970-01-01T01:00:19+01:00", "name": "Pr. Liberdade Cardosas", "location": {"coordinates": [-8.611119, 41.146023]}}
9it [00:01,  5.41it/s]
{"measurand": 57.1, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:19+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
10it [00:02,  5.63it/s]
{"measurand": 53.9, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:20+01:00", "name": "Casa da Musica", "location": {"coordinates": [-8.63041, 41.158091]}}

Чтение значений атрибутов сохраняется в postgres:

postgres=# select  * from urbansense.basic_002_noiselevelobserved ;
  recvtimets   |         recvtime         | fiwareservicepath | entityid |     entitytype     |    attrname    |    attrtype     |               attrvalue               | attrmd
---------------+--------------------------+-------------------+----------+--------------------+----------------+-----------------+---------------------------------------+--------
 1559045918129 | 2019-05-28T12:18:38.129Z | /basic            | 002      | NoiseLevelObserved | dateObserved   | Text            | 1970-01-01T01:00:15+01:00             | []
 1559045918129 | 2019-05-28T12:18:38.129Z | /basic            | 002      | NoiseLevelObserved | latitude       | Number          | 41.1591                               | []
 1559045918129 | 2019-05-28T12:18:38.129Z | /basic            | 002      | NoiseLevelObserved | location       | StructuredValue | {"coordinates":[-8.609973,41.151943]} | []
 1559045918129 | 2019-05-28T12:18:38.129Z | /basic            | 002      | NoiseLevelObserved | longitude      | Number          | -8.65915                              | []
 1559045918129 | 2019-05-28T12:18:38.129Z | /basic            | 002      | NoiseLevelObserved | measurand      | Number          | 64.8                                  | []
 1559045918129 | 2019-05-28T12:18:38.129Z | /basic            | 002      | NoiseLevelObserved | name           | Text            | Trindade                              | []
 1559045918129 | 2019-05-28T12:18:38.129Z | /basic            | 002      | NoiseLevelObserved | sonometerClass | Number          | 1                                     | []
 1559045919723 | 2019-05-28T12:18:39.723Z | /basic            | 002      | NoiseLevelObserved | dateObserved   | Text            | 1970-01-01T01:00:20+01:00             | []
 1559045919723 | 2019-05-28T12:18:39.723Z | /basic            | 002      | NoiseLevelObserved | latitude       | Number          | 41.1591                               | []
 1559045919723 | 2019-05-28T12:18:39.723Z | /basic            | 002      | NoiseLevelObserved | location       | StructuredValue | {"coordinates":[-8.63041,41.158091]}  | []
 1559045919723 | 2019-05-28T12:18:39.723Z | /basic            | 002      | NoiseLevelObserved | longitude      | Number          | -8.65915                              | []
 1559045919723 | 2019-05-28T12:18:39.723Z | /basic            | 002      | NoiseLevelObserved | measurand      | Number          | 53.9                                  | []
 1559045919723 | 2019-05-28T12:18:39.723Z | /basic            | 002      | NoiseLevelObserved | name           | Text            | Casa da Musica                        | []
 1559045919723 | 2019-05-28T12:18:39.723Z | /basic            | 002      | NoiseLevelObserved | sonometerClass | Number          | 0                                     | []
(14 rows)

Изменение частоты стрельбы на 1-секундный интервал не дает большого улучшения, только 3 набора результатов (полезные нагрузки) были сохранены (потеряно 70%):

postgres=# select  * from urbansense.basic_002_noiselevelobserved ;
  recvtimets   |         recvtime         | fiwareservicepath | entityid |     entitytype     |    attrname    |    attrtype     |               attrvalue               | attrmd
---------------+--------------------------+-------------------+----------+--------------------+----------------+-----------------+---------------------------------------+--------
 1559046840569 | 2019-05-28T12:34:00.569Z | /basic            | 002      | NoiseLevelObserved | dateObserved   | Text            | 1970-01-01T01:00:15+01:00             | []
 1559046840569 | 2019-05-28T12:34:00.569Z | /basic            | 002      | NoiseLevelObserved | latitude       | Number          | 41.1591                               | []
 1559046840569 | 2019-05-28T12:34:00.569Z | /basic            | 002      | NoiseLevelObserved | location       | StructuredValue | {"coordinates":[-8.609973,41.151943]} | []
 1559046840569 | 2019-05-28T12:34:00.569Z | /basic            | 002      | NoiseLevelObserved | longitude      | Number          | -8.65915                              | []
 1559046840569 | 2019-05-28T12:34:00.569Z | /basic            | 002      | NoiseLevelObserved | measurand      | Number          | 64.8                                  | []
 1559046840569 | 2019-05-28T12:34:00.569Z | /basic            | 002      | NoiseLevelObserved | name           | Text            | Trindade                              | []
 1559046840569 | 2019-05-28T12:34:00.569Z | /basic            | 002      | NoiseLevelObserved | sonometerClass | Number          | 1                                     | []
 1559046845620 | 2019-05-28T12:34:05.620Z | /basic            | 002      | NoiseLevelObserved | dateObserved   | Text            | 1970-01-01T01:00:18+01:00             | []
 1559046845620 | 2019-05-28T12:34:05.620Z | /basic            | 002      | NoiseLevelObserved | latitude       | Number          | 41.1591                               | []
 1559046845620 | 2019-05-28T12:34:05.620Z | /basic            | 002      | NoiseLevelObserved | location       | StructuredValue | {"coordinates":[-8.609973,41.151943]} | []
 1559046845620 | 2019-05-28T12:34:05.620Z | /basic            | 002      | NoiseLevelObserved | longitude      | Number          | -8.65915                              | []
 1559046845620 | 2019-05-28T12:34:05.620Z | /basic            | 002      | NoiseLevelObserved | measurand      | Number          | 56.5                                  | []
 1559046845620 | 2019-05-28T12:34:05.620Z | /basic            | 002      | NoiseLevelObserved | name           | Text            | Trindade                              | []
 1559046845620 | 2019-05-28T12:34:05.620Z | /basic            | 002      | NoiseLevelObserved | sonometerClass | Number          | 0                                     | []
 1559046850679 | 2019-05-28T12:34:10.679Z | /basic            | 002      | NoiseLevelObserved | dateObserved   | Text            | 1970-01-01T01:00:20+01:00             | []
 1559046850679 | 2019-05-28T12:34:10.679Z | /basic            | 002      | NoiseLevelObserved | latitude       | Number          | 41.1591                               | []
 1559046850679 | 2019-05-28T12:34:10.679Z | /basic            | 002      | NoiseLevelObserved | location       | StructuredValue | {"coordinates":[-8.63041,41.158091]}  | []
 1559046850679 | 2019-05-28T12:34:10.679Z | /basic            | 002      | NoiseLevelObserved | longitude      | Number          | -8.65915                              | []
 1559046850679 | 2019-05-28T12:34:10.679Z | /basic            | 002      | NoiseLevelObserved | measurand      | Number          | 53.9                                  | []
 1559046850679 | 2019-05-28T12:34:10.679Z | /basic            | 002      | NoiseLevelObserved | name           | Text            | Casa da Musica                        | []
 1559046850679 | 2019-05-28T12:34:10.679Z | /basic            | 002      | NoiseLevelObserved | sonometerClass | Number          | 0                                     | []
(21 rows)

Честно говоря, я бы не хотел, чтобы частота стрельбы достигала Второй из-за размера данных, которые необходимо перенести для продолжения моих исследований. Когда я впервые попробовал это с интервалом в 1 секунду, я понял, что на это уйдут месяцы (может быть, 4).

Вопрос:Orion CB (или, может быть, Cygnus) не создан для такой мощности получения значений объектов/атрибутов с такой скоростью (150 мс / 0,15 с), или, может быть, Cygnus не является достаточно умен для получения уведомлений от Orion с такой скоростью?.

Я буду признателен за любое предложение, чтобы обеспечить сохранение всех значений в кратчайшие сроки.

Брокер был протестирован на пропускную способность и способен обрабатывать тысячи запросов в секунду. Конечно, никогда не тестировал с вашими точными запросами. Однако я не могу представить, что узким местом здесь является orion... Я очень мало знаю о Cygnus, и на вашем месте я бы написал скрипт на python для получения уведомлений и сохранения их в Postgres. Если повезет, вы сможете использовать скорость стрельбы не менее сотни запросов в секунду...

kzangeli 28.05.2019 16:23

@kzangeli «... Я бы написал скрипт на Python для получения уведомлений и сохранения их в Postgres ...» Я не понимаю, что вы здесь имеете в виду. Чтобы обойти Cygnus, что-то вроде: SourceDB ->Orion -> Postgres ?

arilwan 28.05.2019 16:34

Как-то так да. Может быть вариант, если все остальное не работает. Очень легко (если вы свободно владеете Python) написать скрипт для получения уведомлений от брокера. Вы можете взглянуть на аккумулятор в репозитории Orion. Мы используем его для функциональных тестов. Я не разработчик Python, но я полагаю, что довольно легко писать в Postgres из того же скрипта Python.

kzangeli 29.05.2019 11:07

@kzangeli А, просто проверяю журналы Cygnus и брокера. Для брокера «скорость», с которой он запускает из журнала, подтверждает, что он получает полезные данные с входящей скоростью. Однако для Cygnus есть небольшая задержка (около 4 секунд) в «скорости стрельбы». Может быть, для операций записи в базу данных может быть проблема с памятью?

arilwan 29.05.2019 13:28

«Или, может быть, Cygnus недостаточно умен, чтобы получать уведомления от Orion с такой скоростью» -> уведомление каждые 0,15 с составляет около 6,66 TPS. Это слишком мало. И Orion, и Cygnus должны с этим справиться. Возможно, в Cygnus есть неправильная конфигурация.

fgalan 29.05.2019 14:02

Чтобы подтвердить мое предположение о 6,66 уведомлениях в секунду... сколько сущностей ваш скрипт обновляет в каждом кадре (т.е. сколько сущностей обновляется каждые 0,15 секунды)?

fgalan 29.05.2019 14:04

@fgalan 3 объекта для каждого уведомления. Относится ли это к значению "throttling": 5 в подписке Orion-Cygnus, которую я установил?

arilwan 29.05.2019 14:05

Таким образом, фактическая нагрузка составляет 3 x 6,66 TPS = 20 TPS. Еще до малого. Да, "throttling": 5 может повлиять. Я бы рекомендовал изменить вашу подписку, чтобы удалить ее.

fgalan 29.05.2019 14:09

@fgalan Да!. Именно в этом причина. Теперь он работает отлично. Большое спасибо за ваше время.

arilwan 29.05.2019 14:33
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
9
41
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Судя по обсуждению в комментариях к вопросу, решение этой проблемы заключалось в удалении параметра throttling в подписке. Это логично: при троттлинге некоторые уведомления не отправляются (в данном конкретном случае 80% всех уведомлений).

Другие вопросы по теме