Data in TDengine:
Data as read by Spark:
TDengine version: 3.3.6.13; driver version: 3.6.3
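This looks like a problem in how the code parses the results. Please refer to the sample code on the official website and check the part that handles the query result.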
package com.chint.tdengine.test;

import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DemoRead {
    // Read a TDengine super table through Spark's JDBC data source
    public static void main(String[] args) {
        // Create the Spark session
        SparkSession spark = SparkSession.builder()
                .appName("appSparkTest")
                .master("local[*]")
                .getOrCreate();
        // Connection settings (WebSocket driver)
        String url = "jdbc:TAOS-WS://10.*.*.132:6041/spv?user=root&password=taosdata";
        String driver = "com.taosdata.jdbc.ws.WebSocketDriver";
        int timeout = 60; // seconds
        try {
            // Configure the JDBC reader
            DataFrameReader reader = spark.read()
                    .format("jdbc")
                    .option("url", url)
                    .option("driver", driver)
                    .option("queryTimeout", timeout);
            // Map the super table to a DataFrame
            String dbtable = "spv_history_day";
            Dataset<Row> df = reader.option("dbtable", dbtable).load();
            String log = String.format("------------ show dbtable read:%s -----------", dbtable);
            System.out.println(log);
            // Show the schema
            df.printSchema();
            // Show all rows, truncating each cell to 40 characters
            df.show(Integer.MAX_VALUE, 40, false);
            // Log success
            System.out.println("test read successfully!");
        } catch (Exception ex) {
            System.out.println("Failed to read error Message: " + ex.getMessage());
            ex.printStackTrace();
        }
        // Stop the Spark session
        spark.stop();
    }
}
Table creation statement in TDengine:
CREATE STABLE IF NOT EXISTS spv_history_day (
    collect_time TIMESTAMP,
    point_value DOUBLE
) TAGS (
    site_code BINARY(32),
    device_code BINARY(32),
    point_code BINARY(32),
    tenant_id BINARY(20)
);
Running the Spark job fails with a type conversion error:
Even specifying the schema directly in the code does not help:
String customSchema = "collect_time STRING, point_value STRING, site_code STRING, device_code STRING, point_code STRING, tenant_id STRING";
String query = "SELECT " +
        "CAST(collect_time AS TIMESTAMP) as collect_time, " +
        "CAST(point_value AS DOUBLE) as point_value, " +
        "site_code, device_code, point_code, tenant_id " +
        "FROM (SELECT * FROM spv_history_day WHERE 1=1) t";
Dataset<Row> df = spark.read()
        .format("jdbc")
        .option("url", url)
        .option("driver", "com.taosdata.jdbc.ws.WebSocketDriver")
        .option("query", query) // use a custom query
        .option("customSchema", customSchema) // force the schema
        .option("queryTimeout", 60)
        .load();
This produces the same error.
@TDuser_sheyj, could you please take a look and advise?
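I have run into this before; it is most likely a SQL dialect problem. TDengine does not recognize the double quotes (") that Spark's default dialect puts around identifiers; it only recognizes backticks (`).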
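To make the quoting issue concrete, here is a minimal plain-Java sketch (no Spark required) of how a column list gets wrapped in quote characters when a SELECT statement is assembled. The class and method names (QuoteDemo, buildSelect) are hypothetical and only imitate the idea; this is not Spark's actual code. The assumption, consistent with the explanation above, is that TDengine reads a double-quoted name as a string literal, so each row comes back as the column name itself rather than the stored value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that imitates how a JDBC reader might quote column names.
class QuoteDemo {
    // Wraps every column name in the given quote character and builds a SELECT.
    static String buildSelect(List<String> columns, String table, char quote) {
        String cols = columns.stream()
                .map(c -> quote + c + quote)
                .collect(Collectors.joining(","));
        return "SELECT " + cols + " FROM " + table;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("collect_time", "point_value");
        // Double quotes: assumed to be parsed by TDengine as string literals.
        System.out.println(buildSelect(columns, "spv_history_day", '"'));
        // Backticks: parsed by TDengine as identifiers.
        System.out.println(buildSelect(columns, "spv_history_day", '`'));
    }
}
```

If this assumption holds, it would also explain why a customSchema or explicit CAST in the query does not help: the quoting is applied to the outer column list, after the user-supplied parts.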
Please start from this code example and add a dialect for TDengine: tdengine-eco/spark/src/main/java/com/taosdata/java/TDengineDialect.java at main · taosdata/tdengine-eco · GitHub
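For orientation, a dialect of this kind can be sketched roughly as follows. This is a minimal illustration, not a copy of the linked file: the class name MinimalTDengineDialect and the register() helper are hypothetical, and only identifier quoting is overridden. It relies on Spark's JdbcDialect and JdbcDialects.registerDialect, so it needs spark-sql on the classpath to compile.

```java
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

// Hypothetical minimal dialect; see the linked TDengineDialect.java for the reference version.
public class MinimalTDengineDialect extends JdbcDialect {
    @Override
    public boolean canHandle(String url) {
        // Applies to any TDengine JDBC URL, including jdbc:TAOS-WS://
        return url.startsWith("jdbc:TAOS");
    }

    @Override
    public String quoteIdentifier(String colName) {
        // Quote identifiers with backticks instead of Spark's default double quotes
        return "`" + colName + "`";
    }

    // Convenience helper (an assumption of this sketch): register once per JVM.
    public static void register() {
        JdbcDialects.registerDialect(new MinimalTDengineDialect());
    }
}
```

Calling MinimalTDengineDialect.register() before spark.read() should be enough for the reader in DemoRead to pick the dialect up, since Spark selects a dialect by matching the JDBC URL.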
Thanks, the problem is solved with this approach.