@liyuj
2022-04-19T17:00:45.000000Z
字数 10299
阅读 5219
Apache-Ignite-2.3.0-中文开发手册
Ignite提供了一个JDBC驱动,它可以通过标准的SQL语句处理分布式数据,比如从JDBC端直接进行SELECT
、INSERT
、UPDATE
和DELETE
。
目前,Ignite支持两种类型的驱动,轻量易用的JDBC Thin模式驱动以及以客户端节点形式直接接入集群。
JDBC Thin模式驱动是默认的,是一个轻量级驱动,要使用这种驱动,只需要将`ignite-core-{version}.jar`放入应用的类路径即可。
驱动会接入集群节点然后将所有的请求转发给它进行处理。节点会处理分布式的查询以及结果集的汇总,然后将结果集反馈给客户端应用。
JDBC连接串如下所示:
jdbc:ignite:thin://host[:port][/schema][?<params>]
host
是必需的,它定义了要接入的集群节点主机地址;port
是接入的端口,如果不指定默认为10800
;schema
是要访问的模式名,默认是PUBLIC
,这个名字对应于SQL的ANSI-99标准,不加引号是大小写不敏感的,加引号是大小写敏感的;<params>
是可选的,形式如下:
param1=value1?param2=value2?...:paramN=valueN
驱动类名为org.apache.ignite.IgniteJdbcThinDriver
,比如,下面就是如何打开到集群节点的连接,监听地址为192.168.0.50:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
// Open the JDBC connection.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://192.168.0.50");
下表列出了JDBC连接串支持的所有参数:
属性名 | 描述 | 默认值 |
---|---|---|
distributedJoins |
对于非并置数据是否使用分布式关联 | false |
enforceJoinOrder |
是否在查询中强制表的关联顺序,如果配置为true ,查询优化器在关联中不会对表进行重新排序。 |
false |
collocated |
数据是否并置,当执行分布式查询时,它会将子查询发送给各个节点,如果事先知道要查询的数据在相同的节点是并置在一起的,那么Ignite会有显著的性能提升和网络优化。 | false |
replicatedOnly |
查询是否只包含复制表,这是一个潜在的可能提高性能的提示。 | false |
autoCloseServerCursor |
当拿到最后一个结果集时是否自动关闭服务端游标。开启之后,对ResultSet.close() 的调用就不需要网络访问,这样会改进性能。但是,如果服务端游标已经关闭,在调用ResultSet.getMetadata() 方法时会抛出异常,这时为什么默认值为false 的原因。 |
false |
socketSendBuffer |
发送套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 | 0 |
socketReceiveBuffer |
接收套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 | 0 |
tcpNoDelay |
是否使用TCP_NODELAY 选项。 |
true |
lazy |
查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError ,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 |
false |
skipReducerOnUpdate |
开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 | false |
连接串示例
jdbc:ignite:thin://myHost
:接入myHost
,其它比如端口为10800
等都是默认值;jdbc:ignite:thin://myHost:11900
:接入myHost
,自定义端口为11900
,其它为默认值;jdbc:ignite:thin://myHost:11900?distributedJoins=true&autoCloseServerCursor=true
:接入myHost
,自定义端口为11900
,开启了分布式关联和autoCloseServerCursor
优化;jdbc:ignite:thin://myHost:11900/myschema?lazy=true
:接入myHost
,自定义端口为11900
,模式为MYSCHEMA
,并且开启了查询的延迟执行;jdbc:ignite:thin://myHost:11900/"MySchema"?lazy=true
:接入myHost
,自定义端口为11900
,模式为MySchema
(模式名区分大小写),并且开启了查询的延迟执行。集群配置
为了接收和处理来自JDBC Thin驱动转发过来的请求,一个节点需要绑定到一个本地网络端口10800
,然后监听入站请求。
通过IgniteConfiguration
配置SqlConnectorConfiguration
,可以对参数进行修改:
Java:
IgniteConfiguration cfg = new IgniteConfiguration()
.setSqlConnectorConfiguration(new SqlConnectorConfiguration());
XML:
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="sqlConnectorConfiguration">
<bean class="org.apache.ignite.configuration.SqlConnectorConfiguration" />
</property>
</bean>
其支持如下的参数:
参数名 | 描述 | 默认值 |
---|---|---|
host |
绑定的主机名或者IP地址,如果配置为null ,会使用IgniteConfigiration.localHost 。 |
null |
port |
绑定的端口,如果指定的端口已被占用,Ignite会使用portRange 属性来查找其他可用的端口。 |
10800 |
portRange |
定义尝试绑定的端口数量,比如,如果端口配置为10800 并且端口范围为100 ,Ignite会从10800开始,在[10800,10900]范围内查找可用端口。 |
100 |
maxOpenCursorsPerConnection |
每个连接打开的服务端游标的最大数量,如果超过了,当试图打开另一个游标时会抛出异常。 | 128 |
threadPoolSize |
执行查询的线程数量。 | max(8,CPU核数) |
socketSendBufferSize |
发送套接字缓冲区大小,如果配置为0 ,会使用操作系统默认值。 |
0 |
socketReceiveBufferSize |
接收套接字缓冲区大小,如果配置为0 ,会使用操作系统默认值。 |
0 |
tcpNoDelay |
是否使用TCP_NODELAY 选项。 |
true |
要处理集群中的数据,需要使用下面的一种方式来创建一个JDBCConnection
对象:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
// Open the JDBC connection.
Connection conn = DriverManager.getConnection("`jdbc:ignite:thin://192.168.0.50");
之后就可以执行SELECT
SQL查询了:
// Query names of all people.
ResultSet rs = conn.createStatement().executeQuery("select name from Person");
while (rs.next()) {
String name = rs.getString(1);
...
}
// Query people with specific age using prepared statement.
PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
stmt.setInt(1, 30);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String name = rs.getString("name");
int age = rs.getInt("age");
...
}
此外,可以使用DML语句对数据进行修改。
INSERT
// Insert a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.execute();
MERGE
// Merge a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("MERGE INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.executeUpdate();
UPDATE
// Update a Person.
conn.createStatement().
executeUpdate("UPDATE Person SET age = age + 1 WHERE age = 25");
DELETE
conn.createStatement().execute("DELETE FROM Person WHERE age = 25");
JDBC客户端节点模式驱动使用自己的完整功能的客户端节点连接接入集群,这要求开发者提供一个完整的Spring XML配置作为JDBC连接串的一部分,然后拷贝下面所有的jar文件到应用或者SQL工具的类路径中:
{apache_ignite_release}\libs
目录下的所有jar文件;{apache_ignite_release}\ignite-indexing
和{apache_ignite_release}\ignite-spring
目录下的所有jar文件;这个驱动很重,而且可能不支持Ignite的最新SQL特性,但是因为它底层使用客户端节点连接,它可以执行分布式查询,然后在应用端直接对结果进行汇总。
JDBC连接URL的规则如下:
jdbc:ignite:cfg://[<params>@]<config_url>
<config_url>
是必需的,表示指向Ignite客户端节点配置文件的任意合法URL,当驱动试图建立到集群的连接时,这个节点会在Ignite JDBC客户端节点驱动中启动; <params>
是可选的,格式如下:
param1=value1:param2=value2:...:paramN=valueN
驱动类名为org.apache.ignite.IgniteJdbcDriver
,比如下面的代码,展示了如何打开一个到集群的连接:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");
它支持如下的参数:
属性 | 描述 | 默认值 |
---|---|---|
cache |
缓存名,如果未定义会使用默认的缓存,区分大小写 | |
nodeId |
要执行的查询所在节点的Id,对于在本地查询是有用的 | |
local |
查询只在本地节点执行,这个参数和nodeId 参数都是通过指定节点来限制数据集 |
false |
collocated |
优化标志,当Ignite执行一个分布式查询时,他会向单个的集群节点发送子查询,如果提前知道要查询的数据已经被并置到同一个节点,Ignite会有显著的性能提升和网络优化 | false |
distributedJoins |
可以在非并置的数据上使用分布式关联。 | false |
streaming |
通过INSERT 语句为本链接开启批量数据加载模式,具体可以参照后面的流模式 相关章节。 |
false |
streamingAllowOverwrite |
通知Ignite对于重复的已有键,覆写它的值而不是忽略他们,具体可以参照后面的流模式 相关章节。 |
false |
streamingFlushFrequency |
超时时间,毫秒,数据流处理器用于刷新数据,数据默认会在连接关闭时刷新,具体可以参照后面的流模式 相关章节。 |
0 |
streamingPerNodeBufferSize |
数据流处理器的每节点缓冲区大小,具体可以参照后面的流模式 相关章节。 |
1024 |
streamingPerNodeParallelOperations |
数据流处理器的每节点并行操作数。具体可以参照后面的流模式 相关章节。 |
16 |
transactionsAllowed |
目前已经支持了ACID事务,但是仅仅在键-值API层面,在SQL层面Ignite支持原子性,还不支持事务一致性,这意味着使用这个功能的时候驱动可能抛出不支持事务 这样的异常。但是,一些BI工具会一直强制事务行为,这时即使不需要事务,也需要将该参数配置为true 以满足需求。 |
false |
multipleStatementsAllowed |
JDBC驱动可以同时处理多个SQL语句并且返回多个ResultSet 对象,如果该参数为false,多个语句的查询会返回错误。 |
false |
lazy |
查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError ,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 |
false |
skipReducerOnUpdate |
开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 | false |
跨缓存查询
驱动连接到的缓存会被视为默认的模式,要跨越多个缓存进行查询,可以参照3.6.缓存查询
章节。
流模式
使用JDBC驱动,可以以流模式(批处理模式)将数据注入Ignite集群。这时驱动会在内部实例化IgniteDataStreamer
然后将数据传给它。要激活这个模式,可以在JDBC连接串中增加streaming
参数并且设置为true
:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Opening connection in the streaming mode.
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://streaming=true@file:///etc/config/ignite-jdbc.xml");
目前,流模式只支持INSERT操作,对于想更快地将数据预加载进缓存的场景非常有用。JDBC驱动定义了多个连接参数来影响流模式的行为,这些参数已经在上述的参数表中列出。
这些参数几乎覆盖了IgniteDataStreamer
的所有常规配置,这样就可以根据需要更好地调整流处理器。关于如何配置流处理器可以参考流处理器
的相关文档来了解更多的信息。
基于时间的刷新
默认情况下,当要么连接关闭,要么达到了streamingPerNodeBufferSize
,数据才会被刷新,如果希望按照时间的方式来刷新,那么可以调整streamingFlushFrequency
参数。
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Opening a connection in the streaming mode and time based flushing set.
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://streaming=true@streamingFlushFrequency=1000@file:///etc/config/ignite-jdbc.xml");
PreparedStatement stmt = conn.prepareStatement(
"INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
// Adding the data.
for (int i = 1; i < 100000; i++) {
// Inserting a Person object with a Long key.
stmt.setInt(1, i);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.execute();
}
conn.close();
// Beyond this point, all data is guaranteed to be flushed into the cache.
要处理集群中的数据,需要使用下面的一种方式来创建一个JDBCConnection
对象:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");
之后就可以执行SELECT
SQL查询了:
// Query names of all people.
ResultSet rs = conn.createStatement().executeQuery("select name from Person");
while (rs.next()) {
String name = rs.getString(1);
...
}
// Query people with specific age using prepared statement.
PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
stmt.setInt(1, 30);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String name = rs.getString("name");
int age = rs.getInt("age");
...
}
此外,可以使用DML语句对数据进行修改。
INSERT
// Insert a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.execute();
MERGE
// Merge a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("MERGE INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.executeUpdate();
UPDATE
// Update a Person.
conn.createStatement().
executeUpdate("UPDATE Person SET age = age + 1 WHERE age = 25");
DELETE
conn.createStatement().execute("DELETE FROM Person WHERE age = 25");
Ignite的JDBC驱动将错误码封装进了java.sql.SQLException
类,它简化了应用端的错误处理。要获得错误码,可以使用java.sql.SQLException.getSQLState()
方法,它会返回一个字符串:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
// Open JDBC connection.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1");
PreparedStatement ps;
try {
ps = conn.prepareStatement("INSERT INTO Person(id, name, age) values (1," +
"'John', 'unparseableString')");
}
catch (SQLException e) {
switch (e.getSQLState()) {
case "0700B":
System.out.println("Conversion failure");
break;
case "42000":
System.out.println("Parsing error");
break;
default:
System.out.println("Unprocessed error: " + e.getSQLState());
break;
}
}
下表中列出了Ignite目前支持的所有错误码,未来这个列表可能还会扩展:
代码 | 描述 |
---|---|
0700B |
转换失败(比如,一个字符串表达式无法解析成数值或者日期) |
0700E |
无效的事务隔离级别 |
08001 |
驱动接入集群失败 |
08003 |
连接意外地处于关闭状态 |
08004 |
连接被集群拒绝 |
08006 |
通信中发生I/O错误 |
22004 |
不允许的空值 |
22023 |
不支持的参数类型 |
23000 |
违反了数据完整性约束 |
24000 |
无效的结果集状态 |
0A000 |
不支持的操作 |
42000 |
查询解析异常 |
50000 |
Ignite内部错误,这个代码不是ANSI定义的,属于Ignite特有的错误,获取java.sql.SQLException 的错误信息可以了解更多的细节 |