Проблема Apache Flink с JOINS в Kinesis Streams Атрибуты Rowtime не должны быть во входных строках обычного соединения

я пытаюсь выполнить простое упражнение у меня есть два потока данных kinesis

  • поток заказов
  • поток отгрузки

Заказы SQL 1


%flink.ssql

CREATE TABLE orders (
    orderid VARCHAR(6),
    orders VARCHAR,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

)
WITH (
    'connector' = 'kinesis',
    'stream' = 'order-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
    );

Отгрузка SQL 2

CREATE TABLE shipment (
    orderid VARCHAR(6),
    shipments  VARCHAR(6),
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

)
WITH (
    'connector' = 'kinesis',
    'stream' = 'shipment-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
    );

Генерация поддельных данных в Kinesis с помощью Python

try:
    import datetime
    import json
    import random
    import boto3
    import os
    import uuid
    import time
    from dotenv import load_dotenv
    load_dotenv(".env")
except Exception as e:
    pass

STREAM_NAME_Order = "order-stream"
STREAM_NAME_Shipments = "shipment-stream"


def send_data(kinesis_client):

    order_items_number = random.randrange(1, 10000)

    order_items = {
        "orderid": order_items_number,
        "orders": "1",
        'ts': datetime.datetime.now().isoformat()

    }
    shipping_data = {
        "orderid": order_items_number,
        "shipments": random.randrange(1, 10000),
        'ts': datetime.datetime.now().isoformat()
    }

    partition_key = uuid.uuid4().__str__()
    res = kinesis_client.put_record(
        StreamName=STREAM_NAME_Order,
        Data=json.dumps(order_items),
        PartitionKey=partition_key)
    print(res)
    time.sleep(2)

    res = kinesis_client.put_record(
        StreamName=STREAM_NAME_Shipments,
        Data=json.dumps(shipping_data),
        PartitionKey=partition_key)
    print(res)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis',
                                  aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
                                  aws_secret_access_key=os.getenv("DEV_SECRET_KEY"),
                                  region_name = "us-east-1",
                                  )
    for i in range(1, 10):
        send_data(kinesis_client)

%flink.ssql(type=update)

SELECT DISTINCT oo.orderid , TUMBLE_START(oo.ts, INTERVAL '10' MINUTE) as event_time
FROM orders as oo
GROUP BY orderid , TUMBLE(oo.ts, INTERVAL '10' MINUTE); 

проблема с присоединением

%flink.ssql(type=update)

SELECT DISTINCT oo.orderid , TUMBLE_START(oo.ts, INTERVAL '10' MINUTE) as event_time , ss.shipments
FROM orders as oo
JOIN  shipment AS ss  ON oo.orderid = ss.orderid
GROUP BY oo.orderid , TUMBLE(oo.ts, INTERVAL '10' MINUTE) , ss.shipments

Сообщения об ошибках

TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

java.io.IOException: Fail to run stream sql job
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:503)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:266)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 


Также пробовал

%flink.ssql(type=update)

SELECT DISTINCT oo.orderid ,
                TUMBLE_START( oo.ts, INTERVAL '1' MINUTE),
                ss.shipments
FROM orders as oo
JOIN  shipment AS ss  ON oo.orderid = ss.orderid
GROUP BY oo.orderid ,
         TUMBLE(CAST(oo.ts AS TIME) ,INTERVAL '1' MINUTE) ,
         ss.shipments

Сообщение об ошибке : Проверка SQL не удалась. От строки 2, столбец 17, до строки 2, столбец 57: вызов вспомогательной групповой функции 'TUMBLE_START' должен иметь соответствующий вызов групповой функции '$TUMBLE' в предложении GROUP BY.

Я не уверен, что именно нужно сделать здесь, любая помощь будет отличной. жду ответа от эксперта

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
51
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
TableException: Rowtime attributes must not be in the input rows of a regular join.
As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

Обычное соединение не может иметь атрибуты времени в своих результатах, потому что атрибут времени не может быть четко определен. Это связано с тем, что строки в динамической таблице должны быть хотя бы приблизительно упорядочены по атрибуту времени, и нет никакого способа гарантировать это для результата обычного соединения (в отличие от интервального соединения, временного соединения или соединения поиска).

В версиях Flink до 1.14 реализация решала эту проблему, не позволяя обычным соединениям иметь атрибуты времени во входных таблицах. Хотя это позволило избежать проблемы, оно было чрезмерно ограничительным.

В вашем случае я предлагаю вам переписать соединение как интервальное соединение, чтобы выходные данные соединения имели атрибут времени, что позволяло применить к нему оконный режим.


Во втором запросе я не уверен на 100%, в чем проблема, но я подозреваю, что проблема в том, что в одном случае вы используете oo.ts, а в другом — CAST(oo.ts AS TIME). Я думаю, они должны быть одинаковыми. Я не думаю, что планировщик SQL Flink достаточно умен, чтобы понять, что здесь происходит.

Большое спасибо @David, я написал статью для людей, которым может быть интересно, как я реализую решение Ссылка linkedin.com/pulse/…

Soumil Nitin Shah 02.01.2023 01:25

Другие вопросы по теме