用python3进行数据订阅

我先利用sql语句新建了一个主题,存放now()-60m间的所有数据,数据的更新频率是100ms一个。现在我利用下面的示例代码对该主题进行订阅,每一分钟调用一次subscribe(),并分别设置autoOffsetReset = "earliest",autoCommitState = "False",希望订阅后不要自动提交偏移量。出现了下面两个问题:
(1)第一次订阅(创建消费者再订阅),只能读到前10m的数据,而不是一个小时内的所有数据
(2)后面的订阅(利用第一次订阅创建好的消费者继续订阅)发现数据是从上次订阅结束的位置开始的,也就是还是自动提交了偏移量

案例代码如下,订阅时subscribe函数略有改动且没有手动提交偏置

db              =                    # 数据库名称
topic           =             # 主题名称
user            =                     # 数据库用户名
password        =                 # 数据库密码
host            =           # 数据库服务器地址
port            =                       # 数据库服务端口
groupId         = "group1"                  # 消费者组ID
clientId        = "1"                       # 消费者客户端ID
tdConnWsScheme  = "ws"                      # 连接协议(WebSocket)
autoOffsetReset = "latest"                  # 消费者起始偏移量,最新的消息
autoCommitState = "true"                    # 自动提交偏移量的状态
autoCommitIntv  = "1000"                    # 自动提交偏移量的时间间隔(毫秒)

# 导入 TMQ 消费者模块
from taos.tmq import Consumer

# 创建消费者
def create_consumer():
    try:
        # 创建一个TMQ消费者并配置消费者参数
        consumer = Consumer(
            {
                "group.id": groupId,                    # 消费者组ID
                "client.id": clientId,                  # 客户端ID
                "td.connect.user": user,                # 用户名
                "td.connect.pass": password,            # 密码
                "enable.auto.commit": autoCommitState,  # 是否自动提交偏移量
                "auto.commit.interval.ms": autoCommitIntv,  # 自动提交偏移量的间隔
                "auto.offset.reset": autoOffsetReset,   # 消费起始偏移量
                "td.connect.ip": host,                  # 服务器IP
                "td.connect.port": str(port),           # 服务器端口
            }
        )
        # 成功创建消费者后打印相关信息
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}")
        return consumer
    except Exception as err:
        # 捕获异常并打印错误信息
        print(f"Failed to create native consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err

# 消费者订阅主题并消费数据
def subscribe(consumer):
    try:
        # 订阅指定的主题
        consumer.subscribe(["topic_meters"])
        print("Subscribe topics successfully")
        # 循环轮询获取数据
        for i in range(50):
            records = consumer.poll(1)  # 每次获取一条记录
            if records:
                # 检查是否有错误
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err

                # 获取数据并打印
                val = records.value()
                if val:
                    for block in val:
                        data = block.fetchall()  # 获取所有数据行
                        print(f"data: {data}")

    except Exception as err:
        # 捕获异常并打印错误信息
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err

def commit_offset(consumer):
    try:
        # 轮询并处理50条数据
        for i in range(50):
            records = consumer.poll(1)
            if records:
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err

                val = records.value()
                if val:
                    for block in val:
                        print(block.fetchall())

                # 处理数据后手动提交偏移量
                consumer.commit(records)
                print("Commit offset manually successfully.");

    except Exception as err:
        # 捕获异常并打印错误信息
        print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err

# 设置消费者的偏移量,从流的开头消费数据
def seek_offset(consumer):
    try:
        # 获取当前分配的分区
        assignments = consumer.assignment()
        if assignments:
            # 将每个分区的偏移量设置为 0(从头开始)
            for partition in assignments:
                partition.offset = 0
                consumer.seek(partition)  # 设置偏移量
                print(f"Assignment seek to beginning successfully.")
    except Exception as err:
        # 捕获异常并打印错误信息
        print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err

# 取消消费者的订阅并关闭消费者
def unsubscribe(consumer):
    try:
        # 取消订阅
        consumer.unsubscribe()
        print("Consumer unsubscribed successfully.")
    except Exception as err:
        # 捕获异常并打印错误信息
        print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    finally:
        # 最终关闭消费者并释放资源
        if consumer:
            consumer.close()
            print("Consumer closed successfully.")

# 主函数
if __name__ == "__main__":
    consumer = None

    try:
        # 执行数据库准备操作
        prepareMeta()
        # 创建消费者
        consumer = create_consumer()
        # 订阅主题并消费数据
        subscribe(consumer)
        # 将偏移量调整到流的最开始
        seek_offset(consumer)
        # 手动提交偏移量
        commit_offset(consumer)
    except Exception as err:
        # 捕获异常并打印错误信息
        print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    finally:
        # 取消订阅并关闭消费者
        unsubscribe(consumer);

请补充一下数据库 以及驱动的版本 还有所选择用的连接方式。

TDE版本是3.3.2.0 community
taospy版本是2.7.13
使用的原生连接

实际应用代码如下:

参数设置部分

通过这些参数创建消费者并进行订阅,并调用subscribe()函数返回订阅的数据: