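I first used an SQL statement to create a topic covering all data from now()-60m onward (the past 60 minutes), with a new row arriving every 100 ms. I then subscribe to this topic with the example code below, calling subscribe() once per minute, with autoOffsetReset = "earliest" and autoCommitState = "False", so that offsets would not be committed automatically after subscribing. I ran into the following two problems: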
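(1) On the first subscription (create the consumer, then subscribe), I can only read the first 10 minutes of data, not all of the data from the past hour.
(2) On later subscriptions (reusing the consumer created for the first subscription), the data resumes from where the previous subscription stopped. In other words, the offset still appears to be committed automatically.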
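For reference, the settings described above map onto the consumer configuration keys used in the example code. The following is a minimal sketch that only builds that configuration dict and does not connect to TDengine. The helper name `build_consumer_config` is hypothetical, the connection values in the usage part are placeholders, and writing "false" in lowercase (mirroring the example's lowercase "true") is an assumption about the expected value format.

```python
def build_consumer_config(group_id: str, client_id: str, user: str,
                          password: str, host: str, port: int) -> dict:
    """Hypothetical helper: consumer settings matching the description above.

    It only builds the configuration dict; it does not connect to TDengine.
    The keys are the same ones used by the example's create_consumer().
    """
    return {
        "group.id": group_id,
        "client.id": client_id,
        "td.connect.user": user,
        "td.connect.pass": password,
        # Where to start reading when the group has no committed offset yet.
        "auto.offset.reset": "earliest",
        # Turn off automatic offset commits. Lowercase mirrors the example's "true";
        # the accepted letter case is an assumption, not verified here.
        "enable.auto.commit": "false",
        "td.connect.ip": host,
        "td.connect.port": str(port),
    }


if __name__ == "__main__":
    # Placeholder connection values, for illustration only.
    cfg = build_consumer_config("group1", "1", "user", "password", "127.0.0.1", 6030)
    print(cfg["auto.offset.reset"], cfg["enable.auto.commit"])
```

Passing this dict to `Consumer(...)` from `taos.tmq`, in place of the literal dict inside `create_consumer()`, would reproduce the configuration described in the question.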
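The example code is below. In my actual runs, the subscribe function was slightly modified, and no manual offset commit was performed.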
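db = ...  # database name (left blank in the original post)
topic = ...  # topic name (left blank)
user = ...  # database user name (left blank)
password = ...  # database password (left blank)
host = ...  # database server address (left blank)
port = ...  # database service port (left blank)
groupId = "group1"  # consumer group ID
clientId = "1"  # consumer client ID
tdConnWsScheme = "ws"  # connection scheme (WebSocket); not used by the native Consumer below
autoOffsetReset = "latest"  # start position when the group has no committed offset: newest message ("earliest" in my runs)
autoCommitState = "true"  # whether offsets are committed automatically ("False" in my runs)
autoCommitIntv = "1000"  # auto-commit interval (milliseconds)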
# Import the TMQ consumer module
from taos.tmq import Consumer


# Create a consumer
def create_consumer():
    try:
        # Create a TMQ consumer and set its configuration
        consumer = Consumer(
            {
                "group.id": groupId,  # consumer group ID
                "client.id": clientId,  # client ID
                "td.connect.user": user,  # user name
                "td.connect.pass": password,  # password
                "enable.auto.commit": autoCommitState,  # whether to commit offsets automatically
                "auto.commit.interval.ms": autoCommitIntv,  # auto-commit interval
                "auto.offset.reset": autoOffsetReset,  # start position when no offset is committed
                "td.connect.ip": host,  # server IP
                "td.connect.port": str(port),  # server port
            }
        )
        # Print connection details once the consumer is created
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}")
        return consumer
    except Exception as err:
        # Print the error and re-raise it
        print(f"Failed to create native consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# Subscribe the consumer to the topic and consume data
def subscribe(consumer):
    try:
        # Subscribe to the configured topic
        consumer.subscribe([topic])
        print("Subscribe topics successfully")
        # Poll at most 50 times; this caps how much data a single call can read
        for i in range(50):
            records = consumer.poll(1)  # wait up to 1 second; one message may hold several data blocks
            if records:
                # Check for errors
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err
                # Fetch and print the data
                val = records.value()
                if val:
                    for block in val:
                        data = block.fetchall()  # all rows in this block
                        print(f"data: {data}")
    except Exception as err:
        # Print the error and re-raise it
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# Poll data and commit offsets manually
def commit_offset(consumer):
    try:
        # Poll at most 50 times and process the returned data
        for i in range(50):
            records = consumer.poll(1)
            if records:
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err
                val = records.value()
                if val:
                    for block in val:
                        print(block.fetchall())
                # Commit the offset manually after processing the data
                consumer.commit(records)
                print("Commit offset manually successfully.")
    except Exception as err:
        # Print the error and re-raise it
        print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# Reset the consumer's offsets so it consumes from the beginning of the stream
def seek_offset(consumer):
    try:
        # Get the partitions currently assigned to this consumer
        assignments = consumer.assignment()
        if assignments:
            # Set each partition's offset to 0 (the beginning)
            for partition in assignments:
                partition.offset = 0
                consumer.seek(partition)  # apply the new offset
                print(f"Assignment seek to beginning successfully.")
    except Exception as err:
        # Print the error and re-raise it
        print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# Unsubscribe the consumer and close it
def unsubscribe(consumer):
    try:
        # Unsubscribe from all topics
        consumer.unsubscribe()
        print("Consumer unsubscribed successfully.")
    except Exception as err:
        # Print the error
        print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    finally:
        # Always close the consumer to release its resources
        if consumer:
            consumer.close()
            print("Consumer closed successfully.")
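# Main entry point
if __name__ == "__main__":
    consumer = None
    try:
        # Prepare the database and topic (prepareMeta() is defined in the full official example, not shown here)
        prepareMeta()
        # Create the consumer
        consumer = create_consumer()
        # Subscribe to the topic and consume data
        subscribe(consumer)
        # Move the offsets back to the beginning of the stream
        seek_offset(consumer)
        # Consume again and commit offsets manually
        commit_offset(consumer)
    except Exception as err:
        # Print the error
        print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    finally:
        # Unsubscribe and close the consumer
        unsubscribe(consumer)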
Please add your database and driver versions, as well as the connection type you are using.
TDengine (TDE) version: 3.3.2.0 Community
taospy version: 2.7.13
Connection type: native