自定义函数 大概需求是获取数据按时间排序只获取递增的数据并且点与点直接不能超过2000 使用的是C语言版本
TD版本是3.3.5.0
gcc是9.3.0 一开始4.几版编译不通过后换的
以下是函数代码
#include “taosudf.h”
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#define MAX_POINTS 10000
// 全局状态变量
static int64_t g_timestamps[MAX_POINTS];
static double g_values[MAX_POINTS];
static int32_t g_count = 0;
static int32_t g_process_called = 0;
// UDF初始化函数
DLL_EXPORT int32_t filtered_max_init() {
g_count = 0;
g_process_called = 0;
return 0;
}
// UDF聚合开始函数
DLL_EXPORT int32_t filtered_max_start(SUdfInterBuf *interBuf) {
return 0;
}
// UDF聚合处理函数
DLL_EXPORT int32_t filtered_max(SUdfDataBlock *inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
// 基本的缓冲区设置
if (newInterBuf != NULL && interBuf != NULL) {
newInterBuf->buf = interBuf->buf;
newInterBuf->bufLen = interBuf->bufLen;
newInterBuf->numOfResult = interBuf->numOfResult;
}
g_process_called = 1;
// 安全检查
if (inputBlock == NULL) {
return 0;
}
if (inputBlock->numOfCols < 2) {
return 0;
}
// 检查是否还有空间
if (g_count >= MAX_POINTS) {
return 0;
}
// 获取列
SUdfColumn* ts_col = inputBlock->udfCols[0];
SUdfColumn* val_col = inputBlock->udfCols[1];
if (ts_col == NULL || val_col == NULL) {
return 0;
}
// 处理每一行
for (int32_t i = 0; i < inputBlock->numOfRows && g_count < MAX_POINTS; i++) {
// 跳过空值
if (udfColDataIsNull(ts_col, i) || udfColDataIsNull(val_col, i)) {
continue;
}
// 提取时间戳
void* ts_data = udfColDataGetData(ts_col, i);
if (ts_data == NULL) {
continue;
}
int64_t timestamp = 0;
if (ts_col->colMeta.type == TSDB_DATA_TYPE_TIMESTAMP) {
timestamp = *(int64_t*)ts_data;
} else {
continue;
}
// 提取数值
void* val_data = udfColDataGetData(val_col, i);
if (val_data == NULL) {
continue;
}
double value = 0.0;
switch (val_col->colMeta.type) {
case TSDB_DATA_TYPE_TINYINT:
value = (double)(*(int8_t*)val_data); break;
case TSDB_DATA_TYPE_SMALLINT:
value = (double)(*(int16_t*)val_data); break;
case TSDB_DATA_TYPE_INT:
value = (double)(*(int32_t*)val_data); break;
case TSDB_DATA_TYPE_BIGINT:
value = (double)(*(int64_t*)val_data); break;
case TSDB_DATA_TYPE_FLOAT:
value = (double)(*(float*)val_data); break;
case TSDB_DATA_TYPE_DOUBLE:
value = *(double*)val_data; break;
default:
continue;
}
// 存储数据点
g_timestamps[g_count] = timestamp;
g_values[g_count] = value;
g_count++;
}
return 0;
}
// UDF聚合完成函数
DLL_EXPORT int32_t filtered_max_finish(SUdfInterBuf *interBuf, SUdfInterBuf *result) {
if (result == NULL || result->bufLen < sizeof(double)) {
return -1;
}
double final_result = 0.0;
// 如果没有调用process或没有数据
if (g_process_called == 0 || g_count == 0) {
final_result = 0.0;
} else if (g_count == 1) {
// 只有一个点
final_result = g_values[0];
} else {
// 多个点:排序并过滤
// 创建索引数组
int* indices = (int*)malloc(g_count * sizeof(int));
if (indices == NULL) {
final_result = 0.0;
goto finish;
}
// 初始化索引
for (int i = 0; i < g_count; i++) {
indices[i] = i;
}
// 按时间戳排序(冒泡排序)
for (int i = 0; i < g_count - 1; i++) {
for (int j = 0; j < g_count - 1 - i; j++) {
if (g_timestamps[indices[j]] > g_timestamps[indices[j + 1]]) {
int temp = indices[j];
indices[j] = indices[j + 1];
indices[j + 1] = temp;
}
}
}
// 构建递增序列并过滤差值超过2000的点
int32_t filtered_count = 0;
double* filtered_values = (double*)malloc(g_count * sizeof(double));
if (filtered_values == NULL) {
free(indices);
final_result = 0.0;
goto finish;
}
// 保留第一个点
filtered_values[filtered_count++] = g_values[indices[0]];
// 处理剩余点
for (int32_t i = 1; i < g_count; i++) {
double prev = filtered_values[filtered_count - 1];
double current = g_values[indices[i]];
// 检查是否递增且差值不超过2000
if (current >= prev && fabs(current - prev) <= 2000.0) {
filtered_values[filtered_count++] = current;
}
}
// 计算最大值
if (filtered_count > 0) {
final_result = filtered_values[0];
for (int32_t i = 1; i < filtered_count; i++) {
if (filtered_values[i] > final_result) {
final_result = filtered_values[i];
}
}
}
free(filtered_values);
free(indices);
}
finish:
(double)result->buf = final_result;
result->bufLen = sizeof(double);
result->numOfResult = 1;
return 0;
}
// UDF销毁函数
DLL_EXPORT int32_t filtered_max_destroy() {
// 重置全局状态
g_count = 0;
g_process_called = 0;
return 0;
}
现在问题是频繁调用会报错 一开始报错udf pipe read error
现在又报错udf function execution failure
好像是调用11次就会报错一次