У меня так много проблем, пытаясь заставить это работать.
Я разрабатываю API, в котором есть действия (называемые в модели «Сессия»), которые имеют два атрибута, связанных со временем: ДАТА (начальная дата и время самого действия) и ПРОДОЛЖИТЕЛЬНОСТЬ (продолжительность самого действия).
Когда создается новое действие, я хочу проверить, не будет ли персонал, который будет работать над новым действием, пересекаться с другим действием.
Я использую Python (fastAPI) в качестве платформы BE и базу данных MySQL. На данный момент я сделал следующие подходы:
def get_overlapping_staffSession(db: Session, session: schemas.SessionCreate) -> list[models.StaffSession]:
staff_ids = []
new_session_start = session.date
new_session_end = (session.date + timedelta(hours=session.duration))
for staff in session.staff:
staff_ids.append(staff.id)
overlapping_sessions = (
db.query(models.StaffSession)
.join(models.Session, models.StaffSession.id_session == models.Session.id)
.filter(
and_(
models.StaffSession.id_staff.in_(staff_ids),
models.Session.date <= new_session_end,
models.Session.date + (models.Session.duration * timedelta(hours=1)) >= new_session_start
)
)
).all()
return overlapping_sessions
Это должно вернуть массив с моделями StaffSession, которые находятся в новом временном диапазоне и имеют сотрудников, которые будут работать в новом действии.
Значение session.date имеет тип datetime , а дата столбца в модели Session имеет тип datetime. Я видел в Интернете разные примеры, в которых не возникло проблем с использованием этого.
Чтобы убедиться, что это работает, я попробовал реализовать тот же запрос в MySQL. В следующем примере используются те же значения (идентификатор и диапазон времени), что и для тестирования приведенного выше кода.
SELECT
staffsession.*
FROM
staffsession
JOIN
session ON staffsession.id_session = session.id
WHERE
staffsession.id_staff IN (12)
AND session.date <= '2024-04-15T16:30:00+01:00'
AND session.date + INTERVAL session.duration SECOND >= '2024-04-15T13:30:00+01:00';
При выполнении этого запроса он отлично работает в MySQL, в API он возвращает все StaffSession, содержащие идентификаторы в Staff_ids, но без фильтрации по дате.
Любая помощь будет более чем желательна и спасибо за ваше время.
Там Session.duration*timedelta(hours=1) и INTERVAL session.duration SECOND. Это правда?






Согласно документации SQLAlchemy для типа INTERVAL:
Тип объектов datetime.timedelta().
Тип Interval имеет дело с объектами datetime.timedelta. В PostgreSQL и Oracle используется собственный тип INTERVAL; для других значение сохраняется как дата, относящаяся к «эпохе» (1 января 1970 г.).
Обратите внимание, что тип Interval в настоящее время не обеспечивает арифметические операции с датами на платформах, которые не поддерживают типы интервалов изначально. Такие операции обычно требуют преобразования обеих частей выражения (например, сначала преобразование обеих сторон в целочисленные значения эпохи), что в настоящее время является ручной процедурой (например, через выражение.func).
Таким образом, арифметика даты и времени с использованием интервалов напрямую не поддерживается в MySQL. Мы можем проверить это, выполнив интервальную арифметику из запроса в вопросе в некоторые выборочные даты:
import sqlalchemy as sa
engine = sa.create_engine(...)
sessions = sa.Table(...)
# (create the table, insert some dates...)
QUERY = """
SELECT
date + INTERVAL duration SECOND
FROM
sessions
ORDER BY id
"""
with engine.connect() as conn:
rows = conn.execute(sa.text(QUERY))
for row in rows:
print(row)
результаты
(datetime.datetime(2024, 4, 15, 17, 30),)
(datetime.datetime(2024, 4, 15, 17, 0),)
(datetime.datetime(2024, 4, 15, 13, 0),)
но запрос SQLAlchemy
with engine.connect() as conn:
q = sa.select(
sessions.c.date + (sessions.c.duration * dt.timedelta(hours=1))
).order_by(sessions.c.id)
rows = conn.execute(q)
for row in rows:
print(row)
результаты
(20240422255000.0,)
(20240422252000.0,)
(20240422212000.0,)
Есть как минимум два способа обойти это. Мы можем преобразовать все элементы в временные метки UNIX и обратно или использовать sqlalchemy.text для вставки требуемого SQL.
with engine.connect() as conn:
q = sa.select(
sa.func.from_unixtime(
sa.func.unix_timestamp(sessions.c.date) + (sessions.c.duration)
),
sessions.c.date + sa.text('INTERVAL sessions.duration SECOND')
).order_by(sessions.c.id)
rows = conn.execute(q)
for row in rows:
print(row)
давая нам
(datetime.datetime(2024, 4, 15, 17, 30), datetime.datetime(2024, 4, 15, 17, 30))
(datetime.datetime(2024, 4, 15, 17, 0), datetime.datetime(2024, 4, 15, 17, 0))
(datetime.datetime(2024, 4, 15, 13, 0), datetime.datetime(2024, 4, 15, 13, 0))
В приведенном выше примере используется основной синтаксис SQLAlchemy. Эквивалентный синтаксис ORM будет
overlapping_sessions = (
db.query(models.StaffSession)
.join(models.Session, models.StaffSession.id_session == models.Session.id)
.filter(
and_(
models.StaffSession.id_staff.in_(staff_ids),
models.Session.date <= new_session_end,
sa.func.from_unixtime(sa.func.unix_timestamp(models.Sessions.date) + (model.Session.duration)) >= new_session_start
)
)
).all()
Применение обходного пути преобразования временной метки в unix сработало без проблем. Спасибо за ваш развернутый ответ!
Если ваш SQL-запрос работает должным образом, а SQLAlchemy — нет, это может означать только одно: SQL-запрос, отправляемый SQLAlchemy, отличается, если вы установите параметр engine
echo=TruePython зарегистрирует оператор отправки, что, вероятно, поможет решить вашу проблему. Я бы также посоветовал вам внимательно следить за версией SQLAlchemy, которую вы используете, многие исторические ответы относятся к <2.0 - поскольку вы используетеquery()иfilter(), а неselect()иwhere(). Я предполагаю, что вы ищете на устаревшие ответы - сам был там.