я пытаюсь выполнить простое упражнение у меня есть два потока данных kinesis
%flink.ssql
CREATE TABLE orders (
orderid VARCHAR(6),
orders VARCHAR,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'order-stream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
CREATE TABLE shipment (
orderid VARCHAR(6),
shipments VARCHAR(6),
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'shipment-stream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
try:
import datetime
import json
import random
import boto3
import os
import uuid
import time
from dotenv import load_dotenv
load_dotenv(".env")
except Exception as e:
pass
STREAM_NAME_Order = "order-stream"
STREAM_NAME_Shipments = "shipment-stream"
def send_data(kinesis_client):
order_items_number = random.randrange(1, 10000)
order_items = {
"orderid": order_items_number,
"orders": "1",
'ts': datetime.datetime.now().isoformat()
}
shipping_data = {
"orderid": order_items_number,
"shipments": random.randrange(1, 10000),
'ts': datetime.datetime.now().isoformat()
}
partition_key = uuid.uuid4().__str__()
res = kinesis_client.put_record(
StreamName=STREAM_NAME_Order,
Data=json.dumps(order_items),
PartitionKey=partition_key)
print(res)
time.sleep(2)
res = kinesis_client.put_record(
StreamName=STREAM_NAME_Shipments,
Data=json.dumps(shipping_data),
PartitionKey=partition_key)
print(res)
if __name__ == '__main__':
kinesis_client = boto3.client('kinesis',
aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
aws_secret_access_key=os.getenv("DEV_SECRET_KEY"),
region_name = "us-east-1",
)
for i in range(1, 10):
send_data(kinesis_client)
%flink.ssql(type=update)
SELECT DISTINCT oo.orderid , TUMBLE_START(oo.ts, INTERVAL '10' MINUTE) as event_time
FROM orders as oo
GROUP BY orderid , TUMBLE(oo.ts, INTERVAL '10' MINUTE);
%flink.ssql(type=update)
SELECT DISTINCT oo.orderid , TUMBLE_START(oo.ts, INTERVAL '10' MINUTE) as event_time , ss.shipments
FROM orders as oo
JOIN shipment AS ss ON oo.orderid = ss.orderid
GROUP BY oo.orderid , TUMBLE(oo.ts, INTERVAL '10' MINUTE) , ss.shipments
TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
java.io.IOException: Fail to run stream sql job
at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:503)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:266)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
%flink.ssql(type=update)
SELECT DISTINCT oo.orderid ,
TUMBLE_START( oo.ts, INTERVAL '1' MINUTE),
ss.shipments
FROM orders as oo
JOIN shipment AS ss ON oo.orderid = ss.orderid
GROUP BY oo.orderid ,
TUMBLE(CAST(oo.ts AS TIME) ,INTERVAL '1' MINUTE) ,
ss.shipments
Сообщение об ошибке : Проверка SQL не удалась. От строки 2, столбец 17, до строки 2, столбец 57: вызов вспомогательной групповой функции 'TUMBLE_START' должен иметь соответствующий вызов групповой функции '$TUMBLE' в предложении GROUP BY.
Я не уверен, что именно нужно сделать здесь, любая помощь будет отличной. жду ответа от эксперта
TableException: Rowtime attributes must not be in the input rows of a regular join.
As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Обычное соединение не может иметь атрибуты времени в своих результатах, потому что атрибут времени не может быть четко определен. Это связано с тем, что строки в динамической таблице должны быть хотя бы приблизительно упорядочены по атрибуту времени, и нет никакого способа гарантировать это для результата обычного соединения (в отличие от интервального соединения, временного соединения или соединения поиска).
В версиях Flink до 1.14 реализация решала эту проблему, не позволяя обычным соединениям иметь атрибуты времени во входных таблицах. Хотя это позволило избежать проблемы, оно было чрезмерно ограничительным.
В вашем случае я предлагаю вам переписать соединение как интервальное соединение, чтобы выходные данные соединения имели атрибут времени, что позволяло применить к нему оконный режим.
Во втором запросе я не уверен на 100%, в чем проблема, но я подозреваю, что проблема в том, что в одном случае вы используете oo.ts
, а в другом — CAST(oo.ts AS TIME)
. Я думаю, они должны быть одинаковыми. Я не думаю, что планировщик SQL Flink достаточно умен, чтобы понять, что здесь происходит.
Большое спасибо @David, я написал статью для людей, которым может быть интересно, как я реализую решение Ссылка linkedin.com/pulse/…