跳到主要内容

Java

您可以通过为Java编程语言设计的原生接口,即Databend JDBC驱动,从各种客户端工具和应用程序连接并与Databend进行交互。

安装Databend JDBC驱动

本主题概述了下载和安装Databend JDBC驱动以用于基于Java的项目的步骤。该驱动要求Java LTS(长期支持)版本1.8或更高。如果您的客户端机器没有最低要求的Java版本,请安装Oracle JavaOpenJDK

要下载Databend JDBC驱动:

  1. 访问Maven中央仓库:https://repo1.maven.org/maven2/com/databend/databend-jdbc/
  2. 点击最新版本的目录。
  3. 下载jar文件,例如_databend-jdbc-0.1.1.jar_。

要验证Databend JDBC驱动的版本,例如_databend-jdbc-0.1.1.jar_,请在终端中运行以下命令:

java -jar databend-jdbc-0.2.1.jar --version

Databend JDBC驱动以JAR文件形式提供,可以直接集成到您的基于Java的项目中。或者,您可以在项目的pom.xml文件中声明Maven依赖,如下所示:

<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.2.1</version>
</dependency>
你知道吗?

您还可以通过Databend JDBC驱动从DBeaver连接到Databend。更多信息,请参见使用JDBC连接Databend

数据类型映射

下表展示了Databend数据类型与其对应的Java等效类型之间的对应关系:

DatabendJava
TINYINTByte
SMALLINTShort
INTInteger
BIGINTLong
UInt8Short
UInt16Integer
UInt32Long
UInt64BigInteger
Float32Float
Float64Double
StringString
DateString
TIMESTAMPString
Bitmapbyte[]
ArrayString
DecimalBigDecimal
TupleString
MapString
VARIANTString

Databend JDBC驱动行为摘要

Databend的JDBC驱动通常遵循JDBC规范。以下是一些常见基本行为、其关联的关键功能及其背后的原则列表。

基本行为关键函数原理
建立连接DriverManager.getConnection, Properties.setPropertygetConnection 使用提供的连接字符串与 Databend 建立连接。

Properties 对象用于构建连接参数,例如 userpassword,这些参数也可以在连接字符串中指定。
执行查询Statement.createStatement(), Statement.execute()Statement.execute() 使用 v1/query 接口执行查询。
批量插入Connection.prepareStatement(), PrepareStatement.setInt(), PrepareStatement.setString(), PrepareStatement.addBatch(), PrepareStatement.executeBatch(), 等.Databend 支持使用 PrepareStatement 对象进行批量插入和替换(INSERT INTOREPLACE INTO)。

PrepareStatement.setXXX() 方法用于将值绑定到语句的参数。

PrepareStatement.addBatch() 将尽可能多的数据添加到创建的语句对象的批次中。

PrepareStatement.executeBatch() 将数据上传到内置的 Stage 并执行插入/替换操作,利用 Stage Attachment
上传文件到内部 StageConnection.uploadStream数据将被上传到 Stage。默认情况下,使用 PRESIGN UPLOAD 获取 URL,如果禁用了 PRESIGN,则使用 v1/upload_to_stage API。
从内部 Stage 下载文件Connection.downloadStream数据将从 Stage 下载,使用 PRESIGN DOWNLOAD 获取 URL。

配置连接字符串

一旦驱动程序安装并集成到您的项目中,您可以使用以下 JDBC 连接字符串格式连接到 Databend:

jdbc:databend://<username>:<password>@<host_port>/<database>?<connection_params>

connection_params 指的是一系列一个或多个参数,格式为 param=value。每个参数应使用 & 字符分隔,连接字符串中不应有任何空格。这些参数可以在连接字符串中设置,也可以在传递给 DriverManager.getConnection() 方法的 Properties 对象中设置。例如:

Properties props = new Properties();
props.put("parameter1", parameter1Value);
props.put("parameter2", parameter2Value);
Connection con = DriverManager.getConnection("jdbc:databend://user:pass@host/database", props);

有关可用的连接参数及其描述,请参见 https://github.com/databendcloud/databend-jdbc/blob/main/docs/Connection.md#connection-parameters

示例

示例:创建数据库和表

package com.example;

import java.sql.*;
import java.util.Properties;

public class Main {
// 以连接到本地 Databend 为例,使用 SQL 用户名为 'user1' 和密码 'abc123'。
// 请随意使用您自己的值,同时保持相同的格式。
static final String DB_URL = "jdbc:databend://127.0.0.1:8000";

public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("user", "user1");
properties.setProperty("password", "abc123");
properties.setProperty("SSL", "false");

Connection conn = DriverManager.getConnection(DB_URL, properties);

Statement stmt = conn.createStatement();
String create_sql = "CREATE DATABASE IF NOT EXISTS book_db";
stmt.execute(create_sql);

String use_sql = "USE book_db";
stmt.execute(use_sql);

String ct_sql = "CREATE TABLE IF NOT EXISTS books(title VARCHAR, author VARCHAR, date VARCHAR)";
stmt.execute(ct_sql);
stmt.close();
conn.close();
System.exit(0);
}
}

示例:复制或合并到表中

    public void copyInto(String tableName, List<String> files) throws Exception {
String filesStr = "'" + String.join("','", files) + "'";
String copyIntoSql = String.format("copy into %s from @~ files=(%s) file_format=(type=NDJSON) purge=true;", tableName, filesStr);
Connection connection = createConnection();
try (Statement statement = connection.createStatement()) {
Instant copyIntoStart = Instant.now();
statement.execute(copyIntoSql);
ResultSet r = statement.getResultSet();
while (r.next()) {
}
Instant copyIntoEnd = Instant.now();
System.out.println("Copied files into: " + files.size() + " , time elapsed: " + (copyIntoEnd.toEpochMilli() - copyIntoStart.toEpochMilli()) + "ms");
} catch (Exception e) {
e.printStackTrace();
} finally {
connection.close();
}
}
// 对于 merge into,只需替换 copyIntoSql。
提示
  1. 由于 SELECT、COPY INTO 和 MERGE INTO 等 SQL 命令返回一个 ResultSet 对象,因此在访问数据之前需要调用 rs.next()。如果不这样做,可能会导致查询被取消。如果你不打算检索结果,可以使用 while 循环(while (r.next()))来遍历 ResultSet,以避免此问题。
  2. 对于其他非查询类型的 SQL 命令,如 CREATE TABLE 或 DROP TABLE,你可以直接调用 statement.execute()。

示例:批量插入

在你的 Java 应用程序代码中,你可以通过在 INSERT 语句中绑定参数并调用 addBatch() 和 executeBatch() 来一次性插入多行数据。

例如,以下代码将两行数据插入到一个包含 INT 列和 VARCHAR 列的表中。该示例将值绑定到 INSERT 语句中的参数,并调用 addBatch() 和 executeBatch() 来执行批量插入。

Connection connection = DriverManager.getConnection(url, prop);

PreparedStatement pstmt = connection.prepareStatement("INSERT INTO t(c1, c2) VALUES(?, ?)");
pstmt.setInt(1, 101);
pstmt.setString(2, "test1");
pstmt.addBatch();

pstmt.setInt(1, 102);
pstmt.setString(2, "test2");
pstmt.addBatch();

int[] count = pstmt.executeBatch(); // 执行后,count[0]=1, count[1]=1
...
pstmt.close();

示例:上传文件到内部 Stage

 /**
* 将 inputStream 上传到 databend 内部 stage,数据将作为一个文件上传,不会拆分。
* 调用者应在上传完成后关闭输入流。
*
* @param stageName 接收上传文件的 stage
* @param destPrefix 文件名在 stage 中的前缀
* @param inputStream 文件的输入流
* @param destFileName 文件在 stage 中的目标文件名
* @param fileSize 文件大小
* @param compressData 是否压缩数据
* @throws SQLException 上传输入流失败
*/
public void uploadStream(String stageName, String destPrefix, InputStream inputStream, String destFileName, long fileSize, boolean compressData) throws SQLException;

上传 CSV 文件到 Databend:

        File f = new File("test.csv");
try (InputStream fileInputStream = Files.newInputStream(f.toPath())) {
Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.ALL);
Connection connection = createConnection();
String stageName = "test_stage";
DatabendConnection databendConnection = connection.unwrap(DatabendConnection.class);
PresignContext.createStageIfNotExists(databendConnection, stageName);
databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), false);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
f.delete();
}

示例:从内部 Stage 下载文件

 /**
* 从 databend 内部 stage 下载文件,数据将作为一个文件下载,不会拆分。
*
* @param stageName 包含文件的 stage
* @param sourceFileName 文件在 stage 中的名称
* @param decompress 是否解压数据
* @return 文件的输入流
* @throws SQLException
*/
public InputStream downloadStream(String stageName, String sourceFileName, boolean decompress) throws SQLException;

从 Databend 下载 CSV 文件:

        File f = new File("test.csv");
try (InputStream fileInputStream = Files.newInputStream(f.toPath())) {
Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.ALL);
Connection connection = createConnection(true);
String stageName = "test_stage";
DatabendConnection databendConnection = connection.unwrap(DatabendConnection.class);
PresignContext.createStageIfNotExists(databendConnection, stageName);
databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), false);
InputStream downloaded = databendConnection.downloadStream(stageName, "jdbc/test/test.csv", false);
byte[] arr = streamToByteArray(downloaded);
System.out.println(arr);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
f.delete();
}

示例:与 Databend Cloud 集成

在开始之前,请确保你已成功创建计算集群并获取连接信息。有关如何操作,请参阅连接到计算集群

第一步:使用 Maven 添加依赖

<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.2.8</version>
</dependency>

第二步:使用 databend-jdbc 连接

创建一个名为 sample.java 的文件,内容如下:

package databend_cloud;

import java.sql.*;
import java.util.Properties;

public class sample {
public static void main(String[] args) throws Exception {

String url = "jdbc:databend://{WAREHOUSE_HOST}:443/{DATABASE}";
Properties properties = new Properties();
properties.setProperty("user", "{USER}");
properties.setProperty("password", "{PASSWORD}");
properties.setProperty("SSL", "true");
Connection connection = DriverManager.getConnection(url, properties);

// 执行
connection.createStatement().execute("CREATE TABLE IF NOT EXISTS sample_test(id TINYINT, obj VARIANT, d TIMESTAMP, s String, arr ARRAY(INT64)) Engine = Fuse");

// 查询
Statement statement = connection.createStatement();
statement.execute("SELECT number from numbers(200000) order by number");
ResultSet r = statement.getResultSet();
r.next();
for (int i = 1; i < 1000; i++) {
r.next();
System.out.println(r.getInt(1));
}

// 使用 executeBatch() 插入数据
connection.setAutoCommit(false);
PreparedStatement ps = connection.prepareStatement("insert into sample_test values");
ps.setInt(1, 1);
ps.setString(2, "{\"a\": 1,\"b\": 2}");
ps.setTimestamp(3, Timestamp.valueOf("1983-07-12 21:30:55.888"));
ps.setString(4, "hello world, 你好");
ps.setString(5, "[1,2,3,4,5]");
ps.addBatch();
int[] ans = ps.executeBatch();
Statement s = connection.createStatement();

System.out.println("execute select on table");
statement.execute("SELECT * from sample_test");
ResultSet r2 = statement.getResultSet();

while (r2.next()) {
System.out.println(r2.getInt(1));
System.out.println(r2.getString(2));
System.out.println(r2.getTimestamp(3).toString());
System.out.println(r2.getString(4));
System.out.println(r2.getString(5));
}
connection.close();
}
}
提示

将代码中的 {USER}, {PASSWORD}, {WAREHOUSE_HOST}, 和 {DATABASE} 替换为你的连接信息。有关如何获取连接信息,请参阅连接到计算集群

第三步:使用 Maven 运行示例

$ mvn compile
$ mvn exec:java -D exec.mainClass="databend_cloud.sample"