Я хотел бы создать сервис FastAPI
с одной конечной точкой /get
, которая будет возвращать результат вывода модели ML. Реализовать это довольно легко, но загвоздка в том, что мне периодически нужно обновлять модель на более новую версию (через запрос на другой сервер с моделями, но это не имеет значения), и здесь я вижу проблему!
Что произойдет, если один запрос вызовет старую модель, но в данный момент старая модель заменяется более новой?? Как я могу реализовать такой механизм блокировки с помощью asyncio
?
Вот код:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI, Request
from sentence_transformers import SentenceTransformer
app = FastAPI()
sbertmodel = None
def create_model():
global sbertmodel
sbertmodel = SentenceTransformer('multi-qa-MiniLM-L6-cos-v1')
# if you try to run all predicts concurrently, it will result in CPU trashing.
pool = ProcessPoolExecutor(max_workers=1, initializer=create_model)
def model_predict():
ts = time.time()
vector = sbertmodel.encode('How big is London')
return vector
async def vector_search(vector):
# simulate I/O call (e.g. Vector Similarity Search using a VectorDB)
await asyncio.sleep(0.005)
@app.get("/")
async def entrypoint(request: Request):
loop = asyncio.get_event_loop()
ts = time.time()
# worker should be initialized outside endpoint to avoid cold start
vector = await loop.run_in_executor(pool, model_predict)
print(f"Model : {int((time.time() - ts) * 1000)}ms")
ts = time.time()
await vector_search(vector)
print(f"io task: {int((time.time() - ts) * 1000)}ms")
return "ok"
Обновление моей модели будет реализовано с помощью повторяющихся задач (но сейчас это не важно): https://fastapi-utils.davidmontague.xyz/user-guide/repeated-tasks/
В этом идея сервировки модели: https://luis-sena.medium.com/how-to-optimize-fastapi-for-ml-model-serving-6f75fb9e040d
Обновлено: что важно для одновременного выполнения нескольких запросов, и пока модель обновляется, получите блокировку, чтобы запросы не терпели неудачу, им следует просто подождать немного дольше, потому что это небольшая модель.
Это будет зависеть от того, как вы в данный момент получаете доступ к модели, но общий способ обновления ресурсов во время работы — не заменять исходную ссылку до того, как вы все загрузите и обработаете — поэтому, если у вас есть зависимость или глобальное состояние в вашем приложение, не заменяйте эту часть до загрузки нового ресурса — обслуживайте старый, пока не будет готов новый. Если вам нужна блокировка, вам также может понадобиться, чтобы она была перекрестной (т. е. несколькими рабочими), поэтому вы можете посмотреть что-то вроде Redis (или совместимое), чтобы создать межпроцессную блокировку.
@MatsLindh, если мой API подключен к сети и постоянно обслуживает модели, мне нужно реализовать какую-то фоновую задачу, которая заблокирует текущий ресурс и заменит его старым. Я думал выполнять блокировку только после завершения загрузки модели. Тогда мне нужно было бы каким-то образом заблокировать (модели маленькие) функции, обслуживающей модель. Я просто не знаю, что произойдет, если несколько запросов захотят получить доступ к модели во время обновления??
В примере, который вы связали, замена переменной sbertmodel
обновленной моделью не должна вызвать никаких проблем, поскольку существует только один вызов модели - вы либо вызываете ее, либо нет, и нет последующих вызовов, где можно использовать две разные переменные - поэтому в этом случае блокировка не требуется (именно поэтому я сказал, что это зависит от вашего текущего кода). Если у вас несколько вызовов, вы можете присвоить ссылку на свою собственную переменную, которая не будет заменена во время вызова модели (или использования зависимости) (т. е. скопировать ее из старой).
«поскольку есть только один вызов модели». Извините, я этого не понимаю. В моем блоге вызывается ProcessPoolExecutor, который позволяет (насколько я вижу) одновременному доступу к модели для нескольких запросов. Допустим, есть 10 сопрограмм, желающих получить доступ к модели, и одна сопрограмма хочет ее обновить, что произойдет??? Я думал, что мне нужно реализовать блокировку чтения и записи. Метод обновления должен иметь блокировку записи.... Кстати, спасибо за ответ.
Можно дать прямой ответ с кодом, если в вашем вопросе есть код, с минимальным воспроизводимым примером. Py-файл из 20–30 строк с настройкой модели, заменяющей конечной точкой модели и общей конечной точкой, которая будет использовать модель. При этом я или другой человек мог бы добавить в ваш пример пару строк, показывающих, как добавить asyncio.Lock (если он вообще нужен). Без кода ответ: «Может быть, вы могли бы использовать asyncio.Lock» — это настолько полная вещь, насколько вы получите. Извини.
Спасибо за ваш фрагмент. Когда оно видно, можно написать предложение. для чего там нужно - как оказалось, нужно обновить модель в подпроцессе, и в асинхронной части кода основного процесса не о чем беспокоиться. Сигнализация однако рабочие процессы обновлений требуют некоторого внимания.
Поскольку вы используете рабочие ProcessPool, вам нужен способ предоставить переменные из корневой процесс, который могут «видеть» работники процесса —
Python имеет это в форме
multiprocessing.Manager
объекты -
Ниже я выберу ваш код и добавлю части. необходимо для вашего реквизита «не сразу, но и без противоречий» обновление используемой модели. Оказывается, когда у нас есть переменные, которые можно увидеть в рабочий, все, что нужно, это проверка в модель-раннере сам метод, чтобы узнать, нужно ли обновить модель.\
Я не запускал этот фрагмент — поэтому в именах переменных может быть какая-то опечатка или даже недостающая скобка — используйте в качестве модели,
не "копировать+вставить" (но я тестировал "подвижные части"
объектов Manager.Namespace()
и передать их в качестве параметров
как initargs
в ProcessPoolExecutor
)
import asyncio
import time
import threading
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from fastapi import FastAPI, Request
from sentence_transformers import SentenceTransformer
sbertmodel = None
local_model_iteration = -1
shared_namespace = None
# pool, and other multi-processing objects can`t simply
# be started in the top level of the body, or they't be re
# created in each subprocess!!
# check https://fastapi.tiangolo.com/advanced/events/#lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
global pool, root_namespace
manager = Manager()
root_namespace = manager.NameSpace()
# Values assigned to the "namespace" object are
# visible on the subprocess created by the pool
root_namspace.model_iteration = 0
root_namespace.model_parameters = "multi-qa-MiniLM-L6-cos-v1"
# (as long as we send the namespace object to each subprocess
# and store it there)
pool = ProcessPoolExecutor(max_workers=1, initializer=initialize_subprocess, initargs=(root_namespace,))
with pool, manager:
# pass control to fastapi: all the app is executed
yield
# end of "with" block:
# both the pool and manager are shutdown when fastapi server exits!
app = FastAPI(lifespan=lifespan)
# if you try to run all predicts concurrently, it will result in CPU trashing.
def initialize_subprocess(shared_namespace_arg):
global shared_namespace
# Store the shared namespace in _this_ process:
shared_namespace = shared_namespac_arg
update_model()
def update_model():
"called on worker subprocess start, and at any time the model is outdated"
global local_model_iteration, sbertmodel
local_model_iteration = shared_namespace.model_iteration
# retrieve parameter posted by root process:
sbertmodel = SentenceTransformer(shared_namespace.model_parameters)
def model_predict():
ts = time.time()
# verify if model was updatd from the root process
if shared_namespace.model_iteration > local_model_iteration:
# if so, just update the model
update_model()
# model is synchronied, just do our job:
vector = sbertmodel.encode('How big is London')
return vector
async def vector_search(vector):
# simulate I/O call (e.g. Vector Similarity Search using a VectorDB)
await asyncio.sleep(0.005)
@app.get("/")
async def entrypoint(request: Request):
loop = asyncio.get_event_loop()
ts = time.time()
# worker should be initialized outside endpoint to avoid cold start
vector = await loop.run_in_executor(pool, model_predict)
print(f"Model : {int((time.time() - ts) * 1000)}ms")
ts = time.time()
await vector_search(vector)
print(f"io task: {int((time.time() - ts) * 1000)}ms")
return "ok"
@app.get("/update_model")
async def update_model_endpoint(request: Request):
# extract from the request the needed paramters for the new model
...
new_model_parameters = ...
# uodate the model parameters and model iteration so they are visible
# in the worker(s)
root_namespace.model_parameters = new_model_parameters
# This increment taking place _after_ the "model_parameters" are set
# is all that is needed to keep things running in order here:
root_namespace.model_iteration += 1
return {} # whatever response needed by the endpoint
Пробовали
asyncio.Lock
?