Airflow: можно ли повторно использовать и выполнять один и тот же экземпляр оператора много раз, сохраняя состояние между запусками?

Не могли бы вы объяснить, возможно ли при определенных обстоятельствах, что экземпляр оператора может быть повторно использован, и метод execute() будет выполняться много раз, а состояние сохраняется между запусками execute()?

Другими словами, возможен ли такой сценарий в Airflow:

  1. переменная self в операторе инициализируется в init.

  2. Метод execute() считывает собственную переменную и изменяет ее.

  3. execute() запускается еще раз для одного и того же экземпляра оператора, например, из-за перезапуска или чего-то еще и может ли прочитать переменную self, измененную предыдущим запуском?

    class MyOperator(BaseOperator):
    
      def __init__(self,
                 param_1
                 ...
                 param_n):
    
          self.var1=param_1
    
      def execute(self, context):
          #do some logic with self variable
          self.var1  += 1 #
    

Вы не можете получить доступ к предыдущим запускам таким образом. Можете ли вы объяснить о самой проблеме? Вы описали здесь свой подход к решению, однако могут быть и другие идеи, если вы поделитесь самой проблемой.

Elad Kalif 23.12.2020 10:55

Я просто хочу знать, безопасно ли изменять собственные переменные в методе выполнения и иметь некоторую логику, основанную на этом, или экземпляр можно повторно использовать в воздушном потоке, тогда это небезопасно, и я не должен трогать их и использовать локальные переменные, полученные из переменных экземпляра

leftjoin 23.12.2020 11:00

@Elad Я не пытаюсь таким образом получить доступ к предыдущему состоянию экземпляра. Я просто хочу подтверждения, что такой сценарий вообще невозможен.

leftjoin 23.12.2020 11:05
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
3
209
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Описанный вами сценарий невозможен по следующей причине.

Когда Airflow Scheduler отправляет ваш экземпляр задачи в очередь, задача инициализируется каждый такт в рабочем потоке.

Это связано с тем, что при каждом такте заполнения DagBag экземпляры оператора инициализируются.

Любое значение, хранящееся там между запусками, сбрасывается при повторной инициализации.

Если вам нужно хранить значения между прогонами, вы можете использовать модель Variable для хранения таких значений.

from airflow.models import Variable

def execute(self, context):
    #do some logic with self variable 
    var1 = Variable.get(
            "count", 
            deserialize_json=True,
            default_var=0
        )
    var1 += 1    
    Variable.set("count", var1)

Другие вопросы по теме