У меня есть вложенный запрос, который мне нужно написать в проекте Pyhton с использованием SQLAlchemy, но все, что я пробовал, похоже, не работает, и я продолжаю получать ошибки. Это SQL-запрос (я использую Snowflake), и он возвращает все учетные записи с максимальным количеством заказов (есть несколько учетных записей с одинаковым максимальным количеством):
SELECT ACCOUNT_ID, COUNT(*) as MAX_ORDERS
FROM ORDERS
GROUP BY ACCOUNT_ID
HAVING COUNT(*) = (
SELECT MAX(COUNT_ORDERS)
FROM (
SELECT ACCOUNT_ID, COUNT(ORDER_ID) as COUNT_ORDERS
FROM ORDERS
GROUP BY ACCOUNT_ID
ORDER BY COUNT(ORDER_ID) DESC
)
);
Это класс Orders, написанный на Python:
class Order(Base):
__tablename__ = "ORDERS"
order_id = db.Column(db.INTEGER, primary_key=True)
account_id = db.Column(db.INTEGER)
bank_to = db.Column(db.String(16777216))
account_to = db.Column(db.INTEGER)
amount = db.Column(db.Float)
k_symbol = db.Column(db.String(16777216))
Я пробовал разделить на 3 запроса, связанных друг с другом, но перехожу от одной ошибки к другой. Кто-нибудь, пожалуйста, помогите мне разобраться в этом вложенном запросе?






Вы можете использовать два подзапроса, чтобы делать то, что хотите.
from sqlalchemy import create_engine, String, select, func, text, column
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
class Base(DeclarativeBase):
pass
class Order(Base):
__tablename__ = "ORDERS"
order_id: Mapped[int] = mapped_column(primary_key=True)
account_id: Mapped[int]
bank_to: Mapped[str] = mapped_column(String(16777216))
account_to: Mapped[int]
amount: Mapped[float]
k_symbol: Mapped[str] = mapped_column(String(16777216))
engine = create_engine("sqlite:///temp.sqlite", echo=True)
Base.metadata.create_all(engine)
with Session(engine) as session:
subq_1 = (
select(Order.account_id, func.count(Order.order_id).label("COUNT_ORDERS"))
.group_by(Order.account_id)
.order_by(func.count(Order.order_id).desc())
)
subq_2 = select(func.max(column("COUNT_ORDERS"))).select_from(subq_1.subquery())
statement = (
select(Order.account_id, func.count(text("*")).label("MAX_ORDERS"))
.group_by(Order.account_id)
.having(func.count(text("*")) == subq_2.scalar_subquery())
)
for i in session.scalars(statement):
print(i)
Это генерирует запрос
SELECT "ORDERS".account_id, count(*) AS "MAX_ORDERS"
FROM "ORDERS" GROUP BY "ORDERS".account_id
HAVING count(*) = (SELECT max("COUNT_ORDERS") AS max_1
FROM (SELECT "ORDERS".account_id AS account_id, count("ORDERS".order_id) AS "COUNT_ORDERS"
FROM "ORDERS" GROUP BY "ORDERS".account_id ORDER BY count("ORDERS".order_id) DESC) AS anon_1)
Отлично. Большое спасибо. Он работает отлично.