Переменные модуля концентратора TF, используемые в предварительной обработке, не экспортируются в контрольные точки во время обучения

Я использую tensorflow_transform для предварительной обработки текстовых данных с помощью Модуль концентратора TF, а затем использую производные функции для обучения модели. Ниже я попытался привести минимальный рабочий пример.

конвейер.py

1) встраивает два текста, используя ННЛМ
2) вычисляет косинусное расстояние между ними
3) записывает предварительно обработанные данные в .csv файл.
4) экспортирует transform_fn функцию/граф предварительной обработки для последующего использования для обслуживания
5) бежать python pipeline.py

    import tensorflow as tf

    import apache_beam as beam
    from tensorflow_transform.beam.tft_beam_io import transform_fn_io
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io import WriteToText

    import tensorflow_transform.beam.impl as beam_impl
    from tensorflow_transform.coders.csv_coder import CsvCoder
    from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema

    import tensorflow_hub as hub

    tf_input_raw_feature_spec = {
        'text_1': tf.FixedLenFeature([], tf.string),
        'text_2': tf.FixedLenFeature([], tf.string),
        'y': tf.FixedLenFeature([], tf.float32),
    }

    SAMPLE_INPUT = [({
        'text_1': 'Help me embed this!',
        'text_2': 'Help me embed this!',
        'y': 1
    }), ({
        'text_1': 'And this as well',
        'text_2': 'Lunch Lunch Lunch',
        'y': 0
    })]

    tf_input_metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(tf_input_raw_feature_spec))


    def tf_transform_preprocessing(inputs):
        outputs = {}

        module = hub.Module("https://tfhub.dev/google/nnlm-de-dim128-with-normalization/1")

        text_1_embed = module(inputs['text_1'])
        text_2_embed = module(inputs['text_2'])

        # Calculate Cosine Similarity
        question_normalized = tf.nn.l2_normalize(text_1_embed, 1)
        content_normalized = tf.nn.l2_normalize(text_2_embed, 1)
        outputs['cosine_similarity'] = tf.reduce_sum(tf.multiply(question_normalized, content_normalized),
                                                     keepdims=True,
                                                     axis=1)
        outputs['y'] = inputs['y']

        return outputs


    def run():
        pipeline_options = PipelineOptions()
        pipeline_options.view_as(SetupOptions).save_main_session = True

        with beam.Pipeline(options=pipeline_options) as p,\
                beam_impl.Context(temp_dir='./tmp'):

            pcoll_text = p | beam.Create(SAMPLE_INPUT)

            transformed_dataset, transform_fn = (
                (pcoll_text, tf_input_metadata)
                | 'Analyze and Transform' >> beam_impl.AnalyzeAndTransformDataset(tf_transform_preprocessing))

            transformed_data, transformed_metadata = transformed_dataset

            column_names = transformed_metadata.schema.as_feature_spec().keys()

            (transformed_data | ' Write PCollection to GCS, csv' >> WriteToText(
                file_path_prefix='./preprocessed_output',
                num_shards=1,
                coder=CsvCoder(column_names=column_names, schema=transformed_metadata.schema),
                compression_type='uncompressed',
                header=','.join(column_names)))

            transform_fn | 'Write transformFn' >> transform_fn_io.WriteTransformFn('./metadata')


    if __name__ == '__main__':
        run()

Вход:

SAMPLE_INPUT = [({
    'text_1': 'Help me embed this!',
    'text_2': 'Help me embed this!',
    'y': 1
}), ({
    'text_1': 'And this as well',
    'text_2': 'Lunch Lunch Lunch',
    'y': 0
})]

Предварительно обработанный вывод в preprocessed_output-00000-of-00001.csv:

y,cosine_similarity
1.0,1.0000001
0.0,0.1290714

поезд.py

1) обучает tf.estimator.LinearRegressor предварительно обработанным данным
2) Периодически оценивает и экспортирует модель с помощью Checkpoints
3) Во время этой оценки он также экспортирует serving_input_receiver_fn, который я позже хочу использовать для его обслуживания в производстве. Так как я хочу кормить сырой данных в модель во время обслуживания, я применяю экспортированные tf-transform преобразования в serving_input_fn.
4) бежать python train.py

from sys import argv
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

tf_input_raw_feature_spec = {
    'text_1': tf.FixedLenFeature([], tf.string),
    'text_2': tf.FixedLenFeature([], tf.string),
    'y': tf.FixedLenFeature([], tf.float32),
}

tf_input_metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(tf_input_raw_feature_spec))


def make_input_fn(input_file_pattern, num_epochs, batch_size, label_variable, shuffle=False):
    return tf.contrib.data.make_csv_dataset(file_pattern=input_file_pattern,
                                            batch_size=batch_size,
                                            label_name=label_variable,
                                            num_epochs=num_epochs,
                                            shuffle=shuffle)


def make_serving_input_fn(tf_transform_output):
    tf_transform_output.load_transform_graph()
    raw_feature_spec = tf_input_metadata.schema.as_feature_spec()
    raw_feature_spec.pop('y')

    def serving_input_fn():
        raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(raw_feature_spec,
                                                                                   default_batch_size=None)
        serving_input_receiver = raw_input_fn()

        # Apply the transform function on raw input
        raw_features = serving_input_receiver.features
        transformed_features = tf_transform_output.transform_raw_features(raw_features)
        return tf.estimator.export.ServingInputReceiver(transformed_features, serving_input_receiver.receiver_tensors)

    return serving_input_fn


def train(args):
    tf.logging.set_verbosity(tf.logging.INFO)
    tf_transform_output = tft.TFTransformOutput(args['tf_transform'])

    # model and all outputs under this relative path
    model_dir = './logs/'

    train_input_files = ['preprocessed_output-00000-of-00001']

    tf.logging.info(train_input_files)

    def train_input_fn():
        return make_input_fn(input_file_pattern=train_input_files,
                             num_epochs=args['num_epochs'],
                             batch_size=args['batch_size'],
                             label_variable=args['label_variable'],
                             shuffle=True)

    eval_input_files = ['preprocessed_output-00000-of-00001']

    tf.logging.info(eval_input_files)

    def eval_input_fn():
        return make_input_fn(input_file_pattern=eval_input_files,
                             num_epochs=1,
                             batch_size=args['batch_size'],
                             label_variable=args['label_variable'])

    feature_columns = [tf.feature_column.numeric_column(key='cosine_similarity')]

    estimator = tf.estimator.LinearRegressor(feature_columns=feature_columns, model_dir=model_dir)

    train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=args['train_max_steps'])

    serving_input_receiver_fn = make_serving_input_fn(tf_transform_output)

    exporter = tf.estimator.LatestExporter(name='model_export', serving_input_receiver_fn=serving_input_receiver_fn)

    eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=None, exporters=[exporter], throttle_secs=150)

    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)


if __name__ == '__main__':
    args = {
        'tf_transform': './metadata',
        'num_epochs': 10,
        'batch_size': 1,
        'label_variable': 'y',
        'train_max_steps': 1000
    }
    train(args)

Проблема

Всякий раз, когда я бегу train.py, это успешно

  • загружает тренировочные данные
  • строит модель
  • поезда до первого Checkpoint,

но всегда терпит неудачу, когда пытается восстановить из Checkpoint и продолжить обучение со следующим сообщением об ошибке:

NotFoundError (see above for traceback): Restoring from checkpoint failed. This is most likely due to a Variable name or other graph key that is missing from the checkpoint. Please ensure that you have not altered the graph expected based on the checkpoint. Original error:

Key transform/module/embeddings not found in checkpoint
         [[node save/RestoreV2_1 (defined at /.../env/lib/python2.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:924) ]]

Насколько я понимаю, он не может восстановить части графа модуля TF Hub, используемого на этапе предварительной обработки (transform/module/embeddings). Удаление exporter из eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=None, exporters=[exporter], throttle_secs=150) позволяет успешно завершить обучение, но, очевидно, не экспортирует saved_model.

TLDR

Как использовать TF Hub модуль в tf-transform предварительной обработке и применять эти преобразования данных в serving среде в сочетании с обученной моделью?

Приложение

требования.txt

apache-beam[gcp]==2.11
tensorflow-transform==0.13
tensorflow==1.13.1
tensorflow-hub==0.4.0

Заранее большое спасибо!

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
0
296
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Ответил на гитхабе. Ниже приведена ссылка https://github.com/tensorflow/transform/issues/125#issuecomment-514558533.

Публикация ответа здесь на благо сообщества.

Добавление tftransform_output.load_transform_graph() к train_input_fn решит проблему. Это относится к тому, как работает tf.Learn. В вашем serving graph он пытается читать из обучения checkpoint, но, поскольку вы используете материализованные данные, ваш обучающий график не содержит встраивания.

Ниже приведен код для того же самого:

def train_input_fn():
        tf_transform_output.load_transform_graph()
        return make_input_fn(input_file_pattern=train_input_files,
                             num_epochs=args['num_epochs'],
                             batch_size=args['batch_size'],
                             label_variable=args['label_variable'],
                             shuffle=True)

Другие вопросы по теме