При записи данных временных рядов из кадра данных pandas в корзину InfluxDB, чтобы проверить, существует ли уже определенная строка данных в корзине (и, таким образом, предотвратить повторную запись данных).
Формат данных временного ряда, который существует в фрейме данных pandas (пример):
epoch,open,high,low,close,volume
1332374520.0,2.341,2.341,2.341,2.341,1.0
1332374700.0,2.343,2.343,2.343,2.343,1.0
1332374940.0,2.344,2.344,2.344,2.344,1.0
1332375420.0,2.344,2.344,2.344,2.344,2.0
1332375660.0,2.344,2.344,2.344,2.344,2.0
1332376080.0,2.344,2.344,2.344,2.344,1.0
Текущая программа Python, как показано ниже, не обнаруживает, что те же данные уже записаны в корзину database. Если программа запускается снова и снова, вывод оператора print должен быть виден, уведомляя об обнаружении повторяющихся данных.
import os
import pandas as pd
from tqdm import tqdm
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
# Example OHLCV data
data = {
"epoch": [1330902000, 1330902060, 1330902120],
"open": [2.55, 2.532, 2.537],
"high": [2.55, 2.538, 2.549],
"low": [2.521, 2.531, 2.537],
"close": [2.534, 2.538, 2.548],
"volume": [150, 69, 38]
}
concat_of_all_dfs = pd.DataFrame(data)
def data_point_exists(epoch, bucket, org):
query = f'''
from(bucket: "{bucket}")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "ohlcv")
|> filter(fn: (r) => r["epoch"] == {epoch})
'''
result = query_api.query(org=org, query=query)
return len(result) > 0
if __name__ == "__main__":
# Database credentials
token = os.getenv('INFLUXDB_TOKEN')
bucket = "bucket_test"
org = "organisation_test"
url = "http://localhost:8086"
# Initialize InfluxDB Client
client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
# Write data points one by one
for index, row in tqdm(concat_of_all_dfs.iterrows(), total=len(concat_of_all_dfs)):
epoch = row['epoch']
if not data_point_exists(epoch, bucket, org):
point = Point("ohlcv") \
.field("epoch", row['epoch']) \
.field("open", row['open']) \
.field("high", row['high']) \
.field("low", row['low']) \
.field("close", row['close']) \
.field("volume", row['volume'])
write_api.write(bucket=bucket, org=org, record=point)
else:
print(f"Data point for epoch {epoch} already exists. Skipping...")
client.close()
Есть ли какие-либо ошибки в приведенном выше коде, которые могли бы помешать обнаружению повторяющихся данных (возможно, в функции запроса, видимой через скрипт Flux, или где-либо еще)?
извините, я не вижу в коде ничего, что могло бы предотвратить/обнаружить дубликаты, посмотрите, нужна ли вам функциональность pandas drop_duplications pandas.pydata.org/pandas-docs/stable/reference/api/…
@ticktalk Спасибо за ваше предложение. Если интересно, я добавил ниже ответ о том, как я решил проблему.






Я решил проблему, изменив код запроса для поиска epoch в таблице для каждой итерации. Было бы хорошо оптимизировать эту программу для пакетной записи в базу данных, сохранив при этом функциональность проверки дубликатов, так как это довольно медленно, но пока это работает.
Обратите внимание, что некоторые функции, относящиеся к фрейму данных pandas, опущены.
import os
import pandas as pd
from tqdm import tqdm
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
import datetime
def data_point_exists(epoch, bucket, org):
# Define the Flux query to fetch all fields
flux_query = f'''
from(bucket: "{bucket}")
|> range(start: 0) // adjust the range as needed
|> filter(fn: (r) => r["_measurement"] == "ohlcv")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time", "epoch", "open", "high", "low", "close", "volume"])
'''
# Execute the query
result = query_api.query(org=org, query=flux_query)
# Search to see if epoch exists
for table in result:
for record in table.records:
if record["epoch"] == epoch:
return True
return False
if __name__ == "__main__":
# Database credentials
token = os.getenv('INFLUXDB_TOKEN')
bucket = "bucket"
org = "org"
url = "http://localhost:8086"
# Initialize InfluxDB Client
client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
# Write data points one by one
for index, row in tqdm(concat_of_all_dfs.iterrows(), total=len(concat_of_all_dfs)):
epoch = row['epoch']
flag = data_point_exists(epoch, bucket, org)
if flag:
print("Repeat detected")
else:
point = Point("ohlcv") \
.field("epoch", row['epoch']) \
.field("open", row['open']) \
.field("high", row['high']) \
.field("low", row['low']) \
.field("close", row['close']) \
.field("volume", row['volume'])
write_api.write(bucket=bucket, org=org, record=point)
client.close()
Слишком большое количество заголовков раздражает читателей. Старайтесь избегать жирного текста и заголовков, когда в этом нет необходимости.