UDF 命名空间 / udf.py
udf.dispatch
为不同的输入分配不同的`Expr` 参数 - `f`: 闭包或者函数,这个函数输入 `pl.Schema`, 输出一个 `Expr` 作用 ✅ - 该方法用于执行 `dispatch` 对应能力;具体业务语义以上文原始说明为准。
接口示例returns: Exprudf
输入 / 输出
输入
- 函数签名:`dispatch(f: Callable[[pl.Schema], Expr])` - 参数约束和边界条件以上文描述为准;如无上文说明,按类型注解/默认值执行。
| 输入项 | 类型 | 示例 |
|---|---|---|
x | Float64 | 1.0 |
输出
| 项目 | 说明 |
|---|---|
| 返回类型 | Expr |
| 输出对象 | 表达式/执行计划/配置对象 |
| 输出语义 | 输出列由算子、alias 或底层实现决定;需要稳定列名时显式使用 alias。 |
| 执行方式 | 先构造对象,再放入 DataSet、Monitor、UDF 或真实执行上下文。 |
| 核心调用 | col("x").udf.dispatch(lambda schema: col("x") + col.lit(1.0)) |
打印输入 / 打印输出
下面内容来自本页示例代码真实执行后的 stdout,不是手写占位。
打印输入
shape: (20, 1) ┌──────┐ │ x │ │ --- │ │ f64 │ ╞══════╡ │ 1.0 │ │ 1.25 │ │ 1.5 │ │ 1.75 │ │ 2.0 │ │ … │ │ 4.75 │ │ 5.0 │ │ 5.25 │ │ 5.5 │ │ 5.75 │ └──────┘
打印输出
shape: (4, 2)
┌────────────────┬─────────────────────────────────┐
│ 项目 ┆ 内容 │
│ --- ┆ --- │
│ str ┆ str │
╞════════════════╪═════════════════════════════════╡
│ 调用 ┆ col("x").udf.dispatch(lambda s… │
│ 返回类型 ┆ Expr │
│ 状态 ┆ 表达式构造完成 │
│ 怎么得到业务表 ┆ 放进 │
│ ┆ col(...).runtime().calc_dat… │
└────────────────┴─────────────────────────────────┘调用
col("x").udf.dispatch(lambda schema: col("x") + col.lit(1.0))| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
f | Callable[[pl.Schema], Expr] | 必填 | 位置参数 |
源码参数说明
- `f`: 闭包或者函数,这个函数输入 `pl.Schema`, 输出一个 `Expr` 作用 ✅ - 该方法用于执行 `dispatch` 对应能力;具体业务语义以上文原始说明为准。
完整代码
这个算子页使用接口示例:不伪造计算结果。需要真实上下文、多输入源、Monitor session、UDF 回调、策略状态,或当前底层实现之后再执行。
展开可复制完整代码
import datetime as dt
import polars as pl
import qust as qs
from qust import col, pms
data = pl.DataFrame(
{
"x": [1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5, 4.75, 5.0, 5.25, 5.5, 5.75],
}
)
class AddOne(qs.UdfRow):
def output_schema(self, input_schema):
return [("x_plus_1", qs.dt.Float64)]
def update(self, x):
self.x = x
def calc(self):
return [None if self.x is None else self.x + 1.0]
class PassThroughBatch(qs.UdfBatch):
def calc_batch(self, packet):
return packet.get_dataframe()
pool = qs.DataPool("doc_pool")
monitor = qs.Monitor()
right = qs.DataSource("right")
schema = [("x", qs.dt.Float64)]
plot_expr = col("x")
df = col("x").runtime()
print("算子:")
print('udf.dispatch')
print("场景:")
print('UDF:接入 Python 自定义行算子或批算子。')
print("模式:")
print('接口示例:只构造表达式或对象,不伪造计算结果。')
print("输入列:")
print('x')
print("调用:")
print('col("x").udf.dispatch(lambda schema: col("x") + col.lit(1.0))')
print("输入数据模板:")
print(data)
try:
result = col("x").udf.dispatch(lambda schema: col("x") + col.lit(1.0))
except BaseException as err:
out = pl.DataFrame({
"项目": ["调用", "状态", "错误类型", "错误信息"],
"内容": ['col("x").udf.dispatch(lambda schema: col("x") + col.lit(1.0))', "未执行成数据表", type(err).__name__, str(err)[:200]],
})
print("输出:")
print(out)
else:
out = pl.DataFrame({
"项目": ["调用", "返回类型", "状态", "怎么得到业务表"],
"内容": ['col("x").udf.dispatch(lambda schema: col("x") + col.lit(1.0))', type(result).__name__, "表达式构造完成", "放进 col(...).runtime().calc_data(data)、col.with_cols(...).runtime().calc_data(data),或对应 Monitor/DataSet 运行上下文。"],
})
print("输出:")
print(out)改成业务代码
| 改哪里 | 怎么改 |
|---|---|
| 列名 | 把示例 DataFrame 里的列名换成你的真实列名,列顺序保持和用法一致。 |
| 参数 | 只改函数括号里的参数;不要随意改变 rolling/over/batch/select 的链式层级。 |
| 输出名 | 需要稳定输出列名时,在表达式尾部加 .alias("name")。 |
| 调试 | 先打印输入数据和调用字符串,再执行 calc_data;报 schema 错时先检查列数和 dtype。 |
注意事项
- 参数类型与预期不一致会导致运行时报错或返回空值。
来源
| 项目 | 位置 |
|---|---|
| 源码文件 | udf.py |
| 类/对象 | Udf |