Spark读取taos中的数据查询到的数据被替换为列名

TDengine中的数据:

spark读取到的数据:

TDengine版本:3.3.6.13,驱动版本:3.6.3

这个应该是代码解析处理的问题,请参考一下官网的示例代码,检查一下查询结果解析部分的处理。

package com.chint.tdengine.test;

import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DemoRead {
// read
public static void main(String args) {
// create spark
SparkSession spark = SparkSession.builder()
.appName(“appSparkTest”)
.master(“local[*]”)
.getOrCreate();

    // connect
    String url      = "jdbc:TAOS-WS://10.*.*.132:6041/spv?user=root&password=taosdata";
    String driver   = "com.taosdata.jdbc.ws.WebSocketDriver";
    int    timeout  = 60; // seconds
    try {
        // query sql
        DataFrameReader reader = spark.read()
                .format("jdbc")
                .option("url", url)
                .option("driver", driver)
                .option("queryTimeout", timeout);

        // map table
        String dbtable  = "spv_history_day";
        Dataset<Row> df = reader.option("dbtable", dbtable).load();
        String log      = String.*format*("------------ show dbtable read:%s -----------", dbtable);
        System.*out*.println(log);

        // show schema
        df.printSchema();

        // show data
        df.show(Integer.*MAX_VALUE*, 40, false);

        // out succ log
        System.*out*.println("test read successfully!");

    } catch (Exception ex) {
        System.*out*.println("Failed to read error Message: " + ex.getMessage());
        ex.printStackTrace();
    }

    // stop
    spark.stop();
}

}

Tdengine中建表语句:

CREATE

STABLE IF NOT EXISTS spv_history_day (

collect_time TIMESTAMP,

point_value DOUBLE

) TAGS (

site_code BINARY(32),

device_code BINARY(32),

point_code BINARY(32),

tenant_id BINARY(20)

);

spark执行结果报错,类型转化错误:

即使是我在代码中直接指定schema也还是不行:

String customSchema = “collect_time STRING, point_value STRING, site_code STRING, device_code STRING, point_code STRING, tenant_id STRING”;

String query = "SELECT " +

"CAST(collect_time AS TIMESTAMP) as collect_time, " +

"CAST(point_value AS DOUBLE) as point_value, " +

"site_code, device_code, point_code, tenant_id " +

“FROM (SELECT * FROM spv_history_day WHERE 1=1) t”;

Dataset df = spark.read()

.format(“jdbc”)

.option(“url”, url)

.option(“driver”, “com.taosdata.jdbc.ws.WebSocketDriver”)

.option(“query”, query) // 使用自定义查询

.option(“customSchema”, customSchema) // 强制指定 Schema

.option(“queryTimeout”, 60)

.load();

这样也是一样的报错

@TDuser_sheyj 帮忙指导一下吧。

之前遇到过,应该是 sql 方言问题。 TDengine 不认识 spark 加的引号 “ 方言,只认识反引号 `

请先参考代码示例,为 TDengine 加一个方言 tdengine-eco/spark/src/main/java/com/taosdata/java/TDengineDialect.java at main · taosdata/tdengine-eco · GitHub

感谢,通过这种方式问题已解决