流计算滑动触发,历史数据覆盖

【TDengine 使用环境】
测试环境

【TDengine 版本】3.3.8.8 oss

【操作系统以及版本】linux

【部署方式】容器部署

【集群节点数】1

【集群副本数】1

【描述业务影响】采用SLIDING(0s),无窗口,实时触发,计算结果会覆盖历史计算结果

【遇到的问题:问题现象及影响】

  1. 根据触发表写入事件的时间实时触发流计算,计算结果会被最新的计算结果覆盖。是什么原因?需要实时触发,且记录触发时间,应该如何修改?
    完整创建流SQL语句如下:
    CREATE STREAM IF NOT EXISTS energy_storage.stream_pcs_sliding_calc
    SLIDING(0s)
    FROM energy_storage.meas_pcs
    STREAM_OPTIONS(
    IGNORE_DISORDER |
    LOW_LATENCY_CALC
    )
    INTO energy_storage.pcs_realtime_result
    AS
    SELECT
    CAST(_tlocaltime/1000000 AS TIMESTAMP) AS ts,
    SUM(latest_v) AS total_power,
    COUNT(latest_v) AS device_count,
    AVG(latest_v) AS avg_power
    FROM (
    SELECT
    tbname,
    LAST(v) AS latest_v
    FROM energy_storage.meas_pcs
    GROUP BY tbname
    );

  2. 采用period(5m)每5分钟定时触发不会数据覆盖问题。请问每个月天数不同,如何在每月结束时定时触发,实现月结算?

【资源配置】16G 32C

【报错完整截图】

001

image

002

image

003

image

result:

SLIDING(0s) : 0s 滑动?这个是什么业务场景需要?

这种情况就应该不能创建流成功才对。我们试试。

场景是系统中有数据不定时变化并写入,变化周期在数秒到数十秒之间,当该数据变化时,立即触发统计计算。

SLIDING(0s)确实建流成功了。

采用COUNT_WINDOW 触发方式时,必须显式使用 PARTITION BY tbname,无法拿到超表下所有子表的最新值,所以不适用。

采用INTERVAL(10s) SLIDING(10s)窗口触发方式,担心需要等待10s窗口结束之后才更新结果,所以之前没使用。但后续试验发现,这一个流并不会等待10s窗口结束之后才更新结果,而是有数据变化可以立即更新(0.2s-1.5s后)。我不清楚窗口触发的工作原理,是不是使用有问题?

如果还有更好的实现方式,还请您说明一下。

INTERVAL(10s) SLIDING(10s): 这个是事件的时间窗口,而不是系统的时间间隔。比如达到的记录的主键时间依次是 08:00:00, 08:00:11, 08:0022。如果他们是在1s内全部达到了,那么就会产生两次计算,输出2条结果记录。而并不会等待10s。

当然,如果主键时间 就是按照采样时间到达,中间相隔10s, 那就是 每隔10s 才会进行计算。

所以,实际的计算时延,取决于 记录达到的时间。

好的,我还想问period(1d)可以在系统时间每天结束时计算,那么有没有方法在自然月或自然年结束时定时计算?

同问这个,有没有自带的按月计算和按年计算的。

目前还不支持,正在研发排期开发中,大概下季度初会发布。

采用INTERVAL(10s) SLIDING(10s),运行一段时间之后,依然会存在历史数据覆盖的问题,如何解决?

请将原始数据 和 计算的结果 发出来看看。

历史数据覆盖,只能是流计算出的结果的主键 重复了,才会覆盖。

原始数据(INSERT数据时间,值,质量位):

pcs_001

image

pcs_002

pcs_003

image

计算结果(_tlocaltime/1000000,和,计数,平均):

没重复吧,刚开始流计算正常,几天后出现历史数据覆盖问题,什么原因啊?

哪些数据被覆盖了?

2026-03-02T13:31:35.324+08:00时,执行操作:INSERT INTO energy_storage.pcs_002 USING energy_storage.meas_pcs TAGS(1,1,1,1,1,2) VALUES (now , 200, 1);

2026-03-02T13:31:36.040+08:00结果更新.

2026-03-02T13:32:06.510+08:00 时,执行操作:INSERT INTO energy_storage.pcs_002 USING energy_storage.meas_pcs TAGS(1,1,1,1,1,2) VALUES (now , 80, 1);

2026-03-02T13:32:08.150+08:00结果更新,并覆盖上1条结果的total_power、device_count、avg_power.

2026-03-02T13:35:44.081+08:00时,执行操作:INSERT INTO energy_storage.pcs_002 USING energy_storage.meas_pcs TAGS(1,1,1,1,1,2) VALUES (now , 200, 1);

2026-03-02T13:35:45.198+08:00 结果更新,并覆盖上2条结果的total_power、device_count、avg_power.

2026-03-02T13:31:36.040+08:00结果更新.

2026-03-02T13:32:08.150+08:00结果更新.

2026-03-02T13:35:45.198+08:00 结果更新.

是流计算按时输出了新的结果记录,同时也将 上面的结果中的 total_power、device_count、avg_power 字段值覆盖了,但没有覆盖 结果的主键ts字段。

是这样吗?

是这样的。

如果能复现的话,那就请我们的研发 @Kane 帮忙看看吧,需要能够远程。

可以复现,可以腾讯会议

这是因为 _tlocaltime/1000000 作为时间戳主键吧,这是窗口触发计算时的系统时间,那么每次计算结果的 ts 主键肯定是不相同,不会覆盖

可以再完整提供一次以下信息吗?我先本地看一看:

  1. 建表语句;
  2. 建流语句
  3. 写入语句
  4. 期望行为

1、建表语句

CREATE STABLE IF NOT EXISTS energy_storage.meas_pcs (ts TIMESTAMP, v DOUBLE, q SMALLINT) TAGS (
st_id SMALLINT,
v_id SMALLINT,
b_id SMALLINT,
p_id SMALLINT,
t_id SMALLINT,
m_id INT
);

2、建流语句

CREATE STREAM IF NOT EXISTS energy_storage.stream_pcs_sliding_calc
INTERVAL(1s) SLIDING(1s)
FROM energy_storage.meas_pcs
STREAM_OPTIONS(
IGNORE_DISORDER |
LOW_LATENCY_CALC
)
INTO energy_storage.pcs_realtime_result
AS
SELECT
CAST(_tlocaltime/1000000 AS TIMESTAMP) AS ts,
SUM(latest_v) AS total_power,
COUNT(latest_v) AS device_count,
AVG(latest_v) AS avg_power
FROM (
SELECT
tbname,
LAST(v) AS latest_v
FROM energy_storage.meas_pcs
GROUP BY tbname
);

3、写入语句

INSERT INTO energy_storage.pcs_001 USING energy_storage.meas_pcs TAGS(1,1,1,1,1,1) VALUES (now , 100.0, 1);

INSERT INTO energy_storage.pcs_002 USING energy_storage.meas_pcs TAGS(1,1,1,1,1,2) VALUES (now , 200.0, 1);

INSERT INTO energy_storage.pcs_003 USING energy_storage.meas_pcs TAGS(1,1,1,1,1,3) VALUES (now , 300.0, 1);

4、期望行为

对超表energy_storage.meas_pcs下的所有子表(例如pcs_001,pcs_002,pcs_003…)的最新v值进行求和、计数、求平均等计算,每写入一条数据,结果实时更新,以计算时间为主键,写入到普通表pcs_realtime_result中。

5、备注

新建流时可能不会存在问题(2026-02-26的结果正常),过一段时间之后(2026-03-02),同样的写入方式,就会出现历史数据覆盖的问题。