я использую sqlalchemy[asyncio], sqlalchemy, fastapi, alembic, asyncmy. моя база данных — mysql и имеет следующую структуру проекта:
core/
├─ database.py
├─ settings.py
alemibc/
├─ versions/
├─ env.py
├─ script.py.mako
models/
├─ users/
│ ├─ user_model.py
├─ notes/
│ ├─ note_model.py
alembic.ini
это моя база данных.py:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from .settings import settings
# Create the async engine
engine = create_async_engine(
settings.DATABASE_CONNECTION_URL,
pool_size=50, # Start with 50, adjust based on testing
max_overflow=100, # Allow up to 100 additional connections
pool_timeout=30, # 30 seconds timeout for waiting for a connection
pool_recycle=1800, # Recycle connections every 30 minutes
echo=True # Set to False in production
)
# Create an async session factory
AsyncSessionLocal = sessionmaker(
bind=engine,
class_=AsyncSession,
autocommit=False,
autoflush=False,
)
# Base class for our models
Base = declarative_base()
metadata = Base.metadata
# Dependency to get DB session in FastAPI
async def get_db():
async with AsyncSessionLocal() as session:
yield session
моя пользовательская модель:
from sqlalchemy import Column, Integer, String
from core.database import Base
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(30), unique=False, nullable=False)
email = Column(String(50), unique=True, index=True, nullable=False)
password = Column(String(200), nullable=False)
обратите внимание на модель:
from sqlalchemy import Column, Integer, String, ForeignKey
from core.database import Base
class Note(Base):
__tablename__ = 'notes'
id = Column(Integer, primary_key=True)
title = Column(String(100), unique=False, nullable=False)
content = Column(String(30000), unique=True, index=True, nullable=False)
author_id = Column(Integer, ForeignKey('users.id', ondelete = "CASCADE"), nullable=False)
окр.py:
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from core.database import Base
from models.notes.note_model import Note
from models.users.user_model import User
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts = {"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = AsyncEngine(
engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix = "sqlalchemy.",
poolclass=pool.NullPool,
)
)
with await connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with await context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
когда я запускаю миграцию, я получаю пустые функции обновления и понижения версии
"""Initial migration
Revision ID: ae6c8d3aeccc
Revises:
Create Date: 2024-07-13 00:35:37.358247
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'ae6c8d3aeccc'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
pass
def downgrade() -> None:
pass
у меня также есть проблемы с асинхронностью и ожиданием в функции run_migrations_online(), когда я запускаю ее как синхронную по умолчанию, она выдает ошибку, поэтому я нашел решение, чтобы поставить ожидание после ключевого слова «с».
функция run_migrations_online() по умолчанию:
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix = "sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
ошибка возникает, если я запускаю приведенный выше код функции по умолчанию:
(notes-app-env) PS F:\Coding\test-python-backends\fastapi-blog> alembic revision --autogenerate -m "Initial migration4"
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Scripts\alembic.exe\__main__.py", line 7, in <module>
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\alembic\config.py", line 636, in main
CommandLine(prog=prog).main(argv=argv)
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\alembic\config.py", line 626, in main
self.run_cmd(cfg, options)
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\alembic\config.py", line 603, in run_cmd
fn(
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\alembic\command.py", line 236, in revision
script_directory.run_env()
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\alembic\script\base.py", line 582, in run_env
util.load_python_file(self.dir, "env.py")
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\alembic\util\pyfiles.py", line 95, in load_python_file
module = load_module_py(module_id, path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\alembic\util\pyfiles.py", line 113, in load_module_py
spec.loader.exec_module(module) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap_external>", line 995, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "F:\Coding\test-python-backends\fastapi-blog\alembic\env.py", line 81, in <module>
run_migrations_online()
File "F:\Coding\test-python-backends\fastapi-blog\alembic\env.py", line 69, in run_migrations_online
with connectable.connect() as connection:
^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\engine\base.py", line 3276, in connect
return self._connection_cls(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\engine\base.py", line 146, in __init__
self._dbapi_connection = engine.raw_connection()
^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\engine\base.py", line 3300, in raw_connection
return self.pool.connect()
^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\pool\base.py", line 449, in connect
return _ConnectionFairy._checkout(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\pool\base.py", line 1263, in _checkout
fairy = _ConnectionRecord.checkout(pool)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\pool\base.py", line 712, in checkout
rec = pool._do_get()
^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\pool\impl.py", line 308, in _do_get
return self._create_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\pool\base.py", line 390, in _create_connection
return _ConnectionRecord(self)
^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\pool\base.py", line 674, in __init__
self.__connect()
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\pool\base.py", line 900, in __connect
with util.safe_reraise():
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
raise exc_value.with_traceback(exc_tb)
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\pool\base.py", line 896, in __connect
self.dbapi_connection = connection = pool._invoke_creator(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\engine\create.py", line 643, in connect
return dialect.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\engine\default.py", line 620, in connect
return self.loaded_dbapi.connect(*cargs, **cparams)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\dialects\mysql\asyncmy.py", line 284, in connect
await_only(creator_fn(*arg, **kw)),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\Coding\test-python-backends\fastapi-blog\notes-app-env\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 123, in await_only
raise exc.MissingGreenlet(
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/20/xd2s)
Я отвечаю почти на каждый вопрос по stackoverflow и github, но не могу понять.
я также сейчас создал только «notes-app-db» в рабочей среде MySQL, и кроме этого в нем нет таблиц или чего-то еще
Хорошо, я обнаружил, что перегонный куб, если он используется с асинхронными драйверами БД, такими как asyncpg, aiomysql, asyncmy, требует изменения файла env, чтобы он также был асинхронным. поэтому нам нужно изменить весь файл env.py на приведенный ниже, а затем добавить ваши модели, базу и другие материалы.
вот асинхронный шаблон ниже:
import asyncio
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts = {"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = AsyncEngine(
engine_from_config(
config.get_section(config.config_ini_section),
prefix = "sqlalchemy.",
poolclass=pool.NullPool,
future=True,
)
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())
и запустите миграцию, и все работает нормально, ничего больше не нужно менять, поскольку это похоже на мои файлы, которые я упоминал в вопросе.
вот как вы можете изменить шаблон, который будет использоваться более чем для одной из ваших моделей или только для одной:
import asyncio
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
from alembic import context
from core.database import Base
from models.notes.note_model import Note
from models.users.user_model import User
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts = {"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = AsyncEngine(
engine_from_config(
config.get_section(config.config_ini_section),
prefix = "sqlalchemy.",
poolclass=pool.NullPool,
future=True,
)
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())
надеюсь, что это поможет, и это проблема github, из которой я получил шаблон
Из любопытства - есть ли какая-либо польза (помимо удобства, я полагаю) от использования асинхронных драйверов для миграции?