Я пытаюсь адаптировать следующий код (из этого ответа):
from sqlalchemy import create_engine, select
conn = create_engine("DB URL...").connect()
q = select([huge_table])
proxy = conn.execution_options(stream_results=True).execute(q)
while 'batch not empty': # equivalent of 'while True', but clearer
batch = proxy.fetchmany(100000) # 100,000 rows at a time
if not batch:
break
for row in batch:
# Do your stuff here...
proxy.close()
использовать сеанс вместо прямого соединения. Я в основном создаю такие сеансы:
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
где
engine = create_engine(get_db_url(), **SQLALCHEMY_ENGINE_OPTIONS)
create_database(engine.url)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Есть идеи, как использовать параметры выполнения и выбирать их эквиваленты, но с сеансами?
Это работает:
db.execute(select(EventConfigurationDB)).fetchmany(100)
Но это не так:
db.execution_options(stream_results=True).execute(select(EventConfigurationDB)).fetchmany(100)
Вы можете использовать yield_per.
Вот полный пример.
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class Something(Base):
__tablename__ = 'something'
id: Mapped[int] = mapped_column(primary_key=True)
engine = create_engine('sqlite:///temp.db', echo=True)
Base.metadata.create_all(engine)
with Session(engine) as session:
session.add_all(Something() for _ in range(100))
session.commit()
with Session(engine) as session:
statement = select(Something).execution_options(yield_per=10)
result = session.execute(statement)
while 'batch not empty':
batch = result.fetchmany(10)
if not batch:
break
print(len(batch))
Чтобы ответить на прямой вопрос. Вы можете сделать session.execute(select(Something).execution_options(stream_results=True)).fetchmany(100)
.