Qust Docs

DataFrame 执行计划 / dataframe.py

dataframe.calc_stream

显式流式 update 执行入口。

可执行示例returns: pl.DataFramedataframe

输入 / 输出

输入

按下表列名和类型准备输入。

输入项类型示例
当前对象Expr / DataFrame / Params由调用链左侧对象提供

输出

项目说明
返回类型pl.DataFrame
输出对象DataFrame 执行计划或 Polars DataFrame
输出语义输出列由算子、alias 或底层实现决定;需要稳定列名时显式使用 alias。
执行方式调用后用真实 Polars DataFrame 执行。
核心调用df.calc_stream(data)

打印输入 / 打印输出

下面内容来自本页示例代码真实执行后的 stdout,不是手写占位。

调用

Python
df.calc_stream(data)
参数类型默认值说明
data未标注必填位置参数

完整代码

本页完整例子会执行真实的 calc_data 或对象调用。
展开可复制完整代码
Python
import polars as pl
import qust as qs
from qust import col

data = pl.DataFrame(
    {
        "x": [1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5, 4.75, 5.0, 5.25, 5.5, 5.75],
    }
)
df = col((col("x") + col("y")).alias("sum_xy")).runtime()

print("算子:")
print('dataframe.calc_stream')
print("场景:")
print('DataFrame 执行计划:组装、保存、执行 Qust plan。')
print("模式:")
print('执行计划示例:calc_stream 保留逐批 update 语义。')
print("输入列:")
print('无固定表格输入列')
print("调用:")
print('df.calc_stream(data)')
print("输入数据:")
print(data)
try:
    out = df.calc_stream(data)
except BaseException as err:
    out = pl.DataFrame({
        "项目": ["调用", "状态", "提示"],
        "内容": ['df.calc_stream(data)', type(err).__name__, str(err)[:200]],
    })
print("输出:")
print(out)

改成业务代码

改哪里怎么改
列名把示例 DataFrame 里的列名换成你的真实列名,列顺序保持和用法一致。
参数只改函数括号里的参数;不要随意改变 rolling/over/batch/select 的链式层级。
输出名需要稳定输出列名时,在表达式尾部加 .alias("name")
调试先打印输入数据和调用字符串,再执行 calc_data;报 schema 错时先检查列数和 dtype。

注意事项

  • 先确认输入列名、顺序、类型和本页一致。
  • 输出列名不符合业务语义时,显式追加 .alias(...)
  • 窗口和分组类算子要确认 rolling/expanding/over/batch 的链式层级。

来源

项目位置
源码文件dataframe.py
类/对象DataFrame