create stream sm2 interval(5m) sliding(5m) from `ori_data`.`ori_eq_realtime_data` partition by psid,eqtype,eqid,datapoint,tbname stream_options(max_delay(30s)|watermark(3s)) into `lnzh_data_cloud`.`eq_realtime_data` as select _twstart as ts, first(doublevalue) as doublevalue, first(strvalue) as strvalue, avg(doublevalue) as avgvalue, max(doublevalue) as maxvalue, min(doublevalue) as minvalue from %%tbname where ts >= _twstart and ts <= _twend;
create stream if not exists lute_iot_device.t31_daily_stats_stream_min_max_avg interval(1d) sliding (1d) from lute_iot_device.product_property_3yhuya partition by tbname stream_options (watermark (2m) | fill_history (1)) into lute_iot_device.t31_daily_stats output_subtable (concat(‘t31_daily_’, lower(tbname))) ( ts, uid composite key, calculate_time, window_end, avg_temperature, max_temperature, min_temperature ) tags ( device_name varchar(64) as replace (tbname, ‘device_property_’, ‘’) ) as select _twstart as ts, uid as uid, cast(_tlocaltime / 1000000 as timestamp) as calculate_time, _twend as window_end, round(avg(temperature), 2) as avg_temperature, max( case when identifier = ‘high_temperature_alarm’ then temperature else null end ) as max_temperature, min( case when identifier = ‘low_temperature_alarm’ then temperature else null end ) as min_temperature from %%tbname where ts >= _twstart and ts <= _twend partition by device_name, uid;