Невозможно обработать большой объем данных с помощью цикла for

Я загружаю OHLC за 2 года за 10 тысяч символов и записываю его в базу данных. Когда я пытаюсь вытащить весь список, он вылетает (но не вылетает, если я загружаю 20%):

import config 
from alpaca_trade_api.rest import REST, TimeFrame
import sqlite3
import pandas as pd
import datetime
from dateutil.relativedelta import relativedelta

start_date = (datetime.datetime.now() - relativedelta(years=2)).date()
start_date = pd.Timestamp(start_date, tz='America/New_York').isoformat()
end_date = pd.Timestamp(datetime.datetime.now(), tz='America/New_York').isoformat()
conn = sqlite3.connect('allStockData.db') 
api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)
origin_symbols = pd.read_sql_query("SELECT  symbol, name from stock", conn)
df = origin_symbols
df_dict = df.to_dict('records')
startTime = datetime.datetime.now()
api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)
temp_data = []

for key in df_dict:
    symbol = key['symbol']
    print(f"downloading ${symbol}")
    # stock_id =  key['id']

    barsets = api.get_bars_iter(symbol, TimeFrame.Day, start_date, end_date)
    barsets = list(barsets)

    for index, bar in enumerate(barsets):
            bars =  pd.DataFrame({'date': bar.t.date(), 'symbol': symbol, 'open': bar.o, 'high': bar.h, 'low': bar.l, 'close': bar.c, 'volume': bar.v, 'vwap': bar.vw}, index=[0])
            temp_data.append(bars)

print("loop complete")
data = pd.concat(temp_data)

# write df back to sql, replacing the previous table
data.to_sql('daily_ohlc_init', if_exists='replace', con=conn, index=True)

endTime = datetime.datetime.now()

print(f'time elapsed to pull data was {endTime - startTime}')

Чтобы это работало, я добавляю эту строку после df_dict, чтобы ограничить загружаемые символы:

df_dict = df_dict[0:2000]

Это позволит мне писать в базу данных, но мне нужен весь словарь (около 10 тысяч символов). Как мне записать в базу данных без сбоя?

Вам действительно нужно заменить предыдущую таблицу? Как насчет if_exists='append' и записи в таблицу кусками df_dict размером 2000?

Mercury 19.11.2022 10:06
14 Задание: Типы данных и структуры данных Python для DevOps
14 Задание: Типы данных и структуры данных Python для DevOps
проверить тип данных используемой переменной, мы можем просто написать: your_variable=100
Python PyPDF2 - запись метаданных PDF
Python PyPDF2 - запись метаданных PDF
Python скрипт, который будет записывать метаданные в PDF файл, для этого мы будем использовать PDF ридер из библиотеки PyPDF2 . PyPDF2 - это...
Переменные, типы данных и операторы в Python
Переменные, типы данных и операторы в Python
В Python переменные используются как место для хранения значений. Пример переменной формы:
Почему Python - идеальный выбор для проекта AI и ML
Почему Python - идеальный выбор для проекта AI и ML
Блог, которым поделился Harikrishna Kundariya в нашем сообществе Developer Nation Community.
Как автоматически добавлять котировки в заголовки запросов с помощью PyCharm
Как автоматически добавлять котировки в заголовки запросов с помощью PyCharm
Как автоматически добавлять котировки в заголовки запросов с помощью PyCharm
Анализ продукта магазина на Tokopedia
Анализ продукта магазина на Tokopedia
Tokopedia - это место, где продавцы могут продавать свои товары. Товар должен быть размещен на витрине, чтобы покупателям было легче найти товар...
2
1
55
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Поскольку вы упомянули, что можете заставить его работать с 2000 записями df_dict за раз, возможный простой подход может быть таким:

api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)

num_records = len(df_dict)
chunk_size = 2000
num_passes = num_records // chunk_size + int(num_records % chunk_size != 0)

for i in range(num_passes):
    start = i * chunk_size
    end = min((i + 1) * chunk_size, num_records)
    df_chunk = df_dict[start: end]

    temp_data = []
    for key in df_chunk:
        symbol = key['symbol']
        print(f"downloading ${symbol}")
        barsets = api.get_bars_iter(symbol, TimeFrame.Day, start_date, end_date)
        barsets = list(barsets)

        for index, bar in enumerate(barsets):
            bars =  [bar.t.date(), symbol, bar.o, bar.h, bar.l, bar.c, bar.v, bar.vw]
            temp_data.append(bars)
    
    # should be a bit more efficient to create a dataframe just once
    columns = ['date', 'symbol', 'open', 'high', 'low', 'close', 'volume', 'vwap']
    data = pd.DataFrame(temp_data, columns=columns)

    # should delete previous table when writing first chunk, then start appending from next passes through df_dict
    data.to_sql('daily_ohlc_init', if_exists='replace' if i == 0 else 'append', con=conn, index=True)

    print(f"Internal loop finished processing records {start} to {end} out of {num_records}.")

endTime = datetime.datetime.now()
print(f'time elapsed to pull data was {endTime - startTime}')

Привет, я считаю, что расчет num_passes был неправильным. Если быть точным, то должно быть num_passes = num_records // chunk_size + int(num_records % chunk_size != 0). Например, если записей 11К, num_passes должно быть 11000//2000 = 5 проходов... и затем +1, чтобы поймать оставшиеся 1000 записей; в основном потолок разделить. Я отредактировал фрагмент с исправлениями.

Mercury 19.11.2022 12:20

Другие вопросы по теме

Похожие вопросы