跳到主要内容

Databend 中的用户自定义函数(UDF)

用户自定义函数(UDF)允许您根据特定的数据处理需求创建自定义操作。Databend 支持两种主要类型的 UDF:

UDF 类型描述语言使用场景
Lambda UDF (Lambda UDFs)使用 SQL 语法的简单表达式SQL快速转换和计算
嵌入式 UDF (Embedded UDF)完整的编程语言支持Python (企业版), JavaScript, WASM复杂逻辑和算法

Lambda UDF

Lambda UDF 允许您直接在查询中使用 SQL 表达式定义自定义操作。它们非常适合可以用单个 SQL 表达式表示的简单转换。

语法

CREATE [OR REPLACE] FUNCTION <function_name> AS (<parameter_list>) -> <expression>;

参数

参数描述
function_name要创建的 Lambda UDF 的名称
parameter_list以逗号分隔的参数名称列表
expression定义函数逻辑的 SQL 表达式

使用说明

  • Lambda UDF 使用 SQL 编写,并在 Databend 查询引擎(Query Engine)内部执行
  • 它们可以接受多个参数,但必须返回单个值
  • 参数类型在运行时根据输入数据推断
  • 您可以使用显式类型转换(例如 ::FLOAT)来确保正确的数据类型处理
  • Lambda UDF 可用于 SELECT 语句、WHERE 子句和其他 SQL 表达式
  • 它们存储在数据库中,可以使用 SHOW USER FUNCTIONS 命令查看
  • 可以使用 DROP FUNCTION 命令删除 Lambda UDF

示例:计算年龄

-- Create a Lambda UDF to calculate age in years
CREATE OR REPLACE FUNCTION age AS (dt) ->
date_diff(year, dt, now());

-- Create a table with birthdates
CREATE TABLE persons (
id INT,
name VARCHAR,
birthdate DATE
);

-- Insert sample data
INSERT INTO persons VALUES
(1, 'Alice', '1990-05-15'),
(2, 'Bob', '2000-10-20');

-- Use the Lambda UDF to calculate ages
SELECT
name,
birthdate,
age(birthdate) AS age_in_years
FROM persons;

-- Expected output (results will vary based on current date):
-- +-------+------------+-------------+
-- | name | birthdate | age_in_years|
-- +-------+------------+-------------+
-- | Alice | 1990-05-15 | 35 |
-- | Bob | 2000-10-20 | 24 |
-- +-------+------------+-------------+

嵌入式 UDF (Embedded UDF)

嵌入式 UDF(Embedded UDF),也称为脚本 UDF,允许您使用完整的编程语言编写函数,比 Lambda UDF 具有更大的灵活性和更强的功能。

支持的语言

语言描述需要企业版
Python带有标准库的 Python 3
JavaScript现代 JavaScript (ES6+)

语法

CREATE [OR REPLACE] FUNCTION <function_name>([<parameter_type>, ...])
RETURNS <return_type>
LANGUAGE <language_name>
(IMPORTS = ("<import_path>", ...))
(PACKAGES = ("<package_path>", ...))
HANDLER = '<handler_name>'
AS $$
<function_code>
$$;

参数

参数描述
function_nameUDF 的名称
parameter_type每个输入参数的数据类型
return_type函数返回值的数据类型
language_name编程语言(python 或 javascript)
imports暂存区(Stage)文件列表,例如 @s_udf/your_file.zip。文件将从暂存区下载到路径 sys._xoptions['databend_import_directory'],您可以在 Python 代码中读取并解压它。
packages要从 PyPI 安装的包列表,例如 numpypandas 等。
handler_name代码中将被调用的函数的名称
function_code实现函数的实际代码

Python

Python UDF 允许您在 SQL 查询中利用 Python 丰富的标准库和语法。此功能需要 Databend 企业版(Enterprise)。

备注

Python UDF 只能使用 Python 的标准库;不允许导入第三方库。

数据类型映射

Databend 类型Python 类型
NULLNone
BOOLEANbool
INTint
FLOAT/DOUBLEfloat
DECIMALdecimal.Decimal
VARCHARstr
BINARYbytes
LISTlist
MAPdict
STRUCTobject
JSONdict/list

示例:计算年龄

-- Create a Python UDF to calculate age in years
CREATE OR REPLACE FUNCTION calculate_age_py(VARCHAR)
RETURNS INT
LANGUAGE python HANDLER = 'calculate_age'
AS $$
from datetime import datetime

def calculate_age(birth_date_str):
# Parse the date string into a datetime object
birth_date = datetime.strptime(birth_date_str, '%Y-%m-%d')
today = datetime.now()
age = today.year - birth_date.year
if (today.month, today.day) < (birth_date.month, birth_date.day):
age -= 1
return age
$$;

-- Use the Python UDF
SELECT calculate_age_py('1990-05-15') AS age_result;

-- Expected output (will vary based on current date):
-- +------------+
-- | age_result |
-- +------------+
-- | 35 |
-- +------------+

示例:在 Python UDF 中使用 imports/packages

CREATE OR REPLACE FUNCTION package_udf()
RETURNS FLOAT
LANGUAGE PYTHON
IMPORTS = ('@s1/a.zip')
PACKAGES = ('scikit-learn')
HANDLER = 'udf'
AS
$$
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

import fcntl
import os
import sys
import threading
import zipfile

# File lock class for synchronizing write access to /tmp.
class FileLock:
def __enter__(self):
self._lock = threading.Lock()
self._lock.acquire()
self._fd = open('/tmp/lockfile.LOCK', 'w+')
fcntl.lockf(self._fd, fcntl.LOCK_EX)

def __exit__(self, type, value, traceback):
self._fd.close()
self._lock.release()

import_dir = sys._xoptions['databend_import_directory']

zip_file_path = import_dir + "/a.zip"
extracted = '/tmp'

# extract the zip to directory `/tmp/a`
with FileLock():
if not os.path.isdir(extracted + '/a'):
with zipfile.ZipFile(zip_file_path, 'r') as myzip:
myzip.extractall(extracted)

def udf():
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

model = RandomForestClassifier()
model.fit(X_train, y_train)
return model.score(X_test, y_test)
$$;

SELECT package_udf();

╭───────────────────╮
│ package_udf()
│ Nullable(Float32)
├───────────────────┤
1
╰───────────────────╯

JavaScript

JavaScript UDF 允许您在 SQL 查询中使用现代 JavaScript (ES6+) 特性,为 Web 开发人员提供了熟悉的语法。

数据类型映射

Databend 类型JavaScript 类型
NULLnull
BOOLEANBoolean
INTNumber
FLOAT/DOUBLENumber
DECIMALBigDecimal
VARCHARString
BINARYUint8Array
DATE/TIMESTAMPDate
ARRAYArray
MAPObject
STRUCTObject
JSONObject/Array

示例:计算年龄

-- Create a JavaScript UDF to calculate age in years
CREATE OR REPLACE FUNCTION calculate_age_js(VARCHAR)
RETURNS INT
LANGUAGE javascript HANDLER = 'calculateAge'
AS $$
export function calculateAge(birthDateStr) {
// Parse the date string into a Date object
const birthDate = new Date(birthDateStr);
const today = new Date();

let age = today.getFullYear() - birthDate.getFullYear();

// Adjust age if birthday hasn't occurred yet this year
const monthDiff = today.getMonth() - birthDate.getMonth();
if (monthDiff < 0 || (monthDiff === 0 && today.getDate() < birthDate.getDate())) {
age--;
}

return age;
}
$$;

-- Use the JavaScript UDF
SELECT calculate_age_js('1990-05-15') AS age_result;

-- Expected output (will vary based on current date):
-- +------------+
-- | age_result |
-- +------------+
-- | 35 |
-- +------------+

WASM UDF

WASM UDF 允许您使用 Rust 定义函数并将其构建为 WASM 模块,然后加载到 Databend 中。

示例:斐波那契计算

  1. 创建一个名为 arrow-udf-example 的新项目
cargo new arrow-udf-example
  1. 将以下依赖项添加到 Cargo.toml
[package]
name = "arrow-udf-example"
version = "0.1.0"

[lib]
crate-type = ["cdylib"]

[dependencies]
arrow-udf = "0.8"
  1. src/lib.rs 中实现 UDF
use arrow_udf::function;

#[function("fib(int) -> int")]
fn fib(n: i32) -> i32 {
let (mut a, mut b) = (0, 1);
for _ in 0..n {
let c = a + b;
a = b;
b = c;
}
a
}
  1. 构建项目
cargo build --release --target wasm32-wasip1
  1. 将 wasm 模块加载到 Databend
cp /target/wasm32-wasip1/release/arrow_udf_example.wasm  /tmp

并通过 bendsql 创建暂存区(Stage)并将 wasm 模块放入暂存区

🐳 root@default:) create stage s_udf;
🐳 root@default:) put fs:///tmp/arrow_udf_example.wasm @s_udf/;

🐳 root@default:) CREATE OR REPLACE FUNCTION fib_wasm (INT) RETURNS INT LANGUAGE wasm HANDLER = 'fib' AS $$@s_udf/arrow_udf_example.wasm$$;


🐳 root@default:) select fib_wasm(10::Int32);
╭─────────────────────╮
│ fib_wasm(10::Int32)
│ Nullable(Int32)
├─────────────────────┤
55
╰─────────────────────╯

管理 UDF

Databend 提供了几个命令来帮助您管理 UDF:

命令描述示例
SHOW USER FUNCTIONS列出当前数据库中的所有 UDFSHOW USER FUNCTIONS;
DROP FUNCTION删除一个 UDFDROP FUNCTION age;
ALTER FUNCTION修改一个 UDFALTER FUNCTION age RENAME TO calculate_age;

有关 UDF 管理命令的完整文档,请参阅用户自定义函数