[关闭]
@liyuj 2022-04-19T09:00:45.000000Z 字数 10299 阅读 5055

Apache-Ignite-2.3.0-中文开发手册

4.JDBC

4.1.JDBC Thin模式驱动

Ignite提供了一个JDBC驱动,它可以通过标准的SQL语句处理分布式数据,比如从JDBC端直接进行SELECTINSERTUPDATEDELETE
目前,Ignite支持两种类型的驱动,轻量易用的JDBC Thin模式驱动以及以客户端节点形式直接接入集群。

4.1.1.JDBC Thin模式驱动

JDBC Thin模式驱动是默认的,是一个轻量级驱动,要使用这种驱动,只需要将`ignite-core-{version}.jar`放入应用的类路径即可。
驱动会接入集群节点然后将所有的请求转发给它进行处理。节点会处理分布式的查询以及结果集的汇总,然后将结果集反馈给客户端应用。
JDBC连接串如下所示:
  1. jdbc:ignite:thin://host[:port][/schema][?<params>]
  1. param1=value1?param2=value2?...:paramN=valueN

驱动类名为org.apache.ignite.IgniteJdbcThinDriver,比如,下面就是如何打开到集群节点的连接,监听地址为192.168.0.50:

  1. // Register JDBC driver.
  2. Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
  3. // Open the JDBC connection.
  4. Connection conn = DriverManager.getConnection("jdbc:ignite:thin://192.168.0.50");

下表列出了JDBC连接串支持的所有参数:

属性名 描述 默认值
distributedJoins 对于非并置数据是否使用分布式关联 false
enforceJoinOrder 是否在查询中强制表的关联顺序,如果配置为true,查询优化器在关联中不会对表进行重新排序。 false
collocated 数据是否并置,当执行分布式查询时,它会将子查询发送给各个节点,如果事先知道要查询的数据在相同的节点是并置在一起的,那么Ignite会有显著的性能提升和网络优化。 false
replicatedOnly 查询是否只包含复制表,这是一个潜在的可能提高性能的提示。 false
autoCloseServerCursor 当拿到最后一个结果集时是否自动关闭服务端游标。开启之后,对ResultSet.close()的调用就不需要网络访问,这样会改进性能。但是,如果服务端游标已经关闭,在调用ResultSet.getMetadata()方法时会抛出异常,这时为什么默认值为false的原因。 false
socketSendBuffer 发送套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 0
socketReceiveBuffer 接收套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 0
tcpNoDelay 是否使用TCP_NODELAY选项。 true
lazy 查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 false
skipReducerOnUpdate 开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 false

连接串示例

集群配置
为了接收和处理来自JDBC Thin驱动转发过来的请求,一个节点需要绑定到一个本地网络端口10800,然后监听入站请求。
通过IgniteConfiguration配置SqlConnectorConfiguration,可以对参数进行修改:
Java:

  1. IgniteConfiguration cfg = new IgniteConfiguration()
  2. .setSqlConnectorConfiguration(new SqlConnectorConfiguration());

XML:

  1. <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  2. <property name="sqlConnectorConfiguration">
  3. <bean class="org.apache.ignite.configuration.SqlConnectorConfiguration" />
  4. </property>
  5. </bean>

其支持如下的参数:

参数名 描述 默认值
host 绑定的主机名或者IP地址,如果配置为null,会使用IgniteConfigiration.localHost null
port 绑定的端口,如果指定的端口已被占用,Ignite会使用portRange属性来查找其他可用的端口。 10800
portRange 定义尝试绑定的端口数量,比如,如果端口配置为10800并且端口范围为100,Ignite会从10800开始,在[10800,10900]范围内查找可用端口。 100
maxOpenCursorsPerConnection 每个连接打开的服务端游标的最大数量,如果超过了,当试图打开另一个游标时会抛出异常。 128
threadPoolSize 执行查询的线程数量。 max(8,CPU核数)
socketSendBufferSize 发送套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 0
socketReceiveBufferSize 接收套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 0
tcpNoDelay 是否使用TCP_NODELAY选项。 true

4.1.2.示例

要处理集群中的数据,需要使用下面的一种方式来创建一个JDBCConnection对象:

  1. // Register JDBC driver.
  2. Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
  3. // Open the JDBC connection.
  4. Connection conn = DriverManager.getConnection("`jdbc:ignite:thin://192.168.0.50");

之后就可以执行SELECTSQL查询了:

  1. // Query names of all people.
  2. ResultSet rs = conn.createStatement().executeQuery("select name from Person");
  3. while (rs.next()) {
  4. String name = rs.getString(1);
  5. ...
  6. }
  7. // Query people with specific age using prepared statement.
  8. PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
  9. stmt.setInt(1, 30);
  10. ResultSet rs = stmt.executeQuery();
  11. while (rs.next()) {
  12. String name = rs.getString("name");
  13. int age = rs.getInt("age");
  14. ...
  15. }

此外,可以使用DML语句对数据进行修改。
INSERT

  1. // Insert a Person with a Long key.
  2. PreparedStatement stmt = conn.prepareStatement("INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
  3. stmt.setInt(1, 1);
  4. stmt.setString(2, "John Smith");
  5. stmt.setInt(3, 25);
  6. stmt.execute();

MERGE

  1. // Merge a Person with a Long key.
  2. PreparedStatement stmt = conn.prepareStatement("MERGE INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
  3. stmt.setInt(1, 1);
  4. stmt.setString(2, "John Smith");
  5. stmt.setInt(3, 25);
  6. stmt.executeUpdate();

UPDATE

  1. // Update a Person.
  2. conn.createStatement().
  3. executeUpdate("UPDATE Person SET age = age + 1 WHERE age = 25");

DELETE

  1. conn.createStatement().execute("DELETE FROM Person WHERE age = 25");

4.2.JDBC客户端模式驱动

4.2.1.JDBC客户端模式驱动

JDBC客户端节点模式驱动使用自己的完整功能的客户端节点连接接入集群,这要求开发者提供一个完整的Spring XML配置作为JDBC连接串的一部分,然后拷贝下面所有的jar文件到应用或者SQL工具的类路径中:

这个驱动很重,而且可能不支持Ignite的最新SQL特性,但是因为它底层使用客户端节点连接,它可以执行分布式查询,然后在应用端直接对结果进行汇总。
JDBC连接URL的规则如下:

  1. jdbc:ignite:cfg://[<params>@]<config_url>
  1. param1=value1:param2=value2:...:paramN=valueN

驱动类名为org.apache.ignite.IgniteJdbcDriver,比如下面的代码,展示了如何打开一个到集群的连接:

  1. // Register JDBC driver.
  2. Class.forName("org.apache.ignite.IgniteJdbcDriver");
  3. // Open JDBC connection (cache name is not specified, which means that we use default cache).
  4. Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");

它支持如下的参数:

属性 描述 默认值
cache 缓存名,如果未定义会使用默认的缓存,区分大小写
nodeId 要执行的查询所在节点的Id,对于在本地查询是有用的
local 查询只在本地节点执行,这个参数和nodeId参数都是通过指定节点来限制数据集 false
collocated 优化标志,当Ignite执行一个分布式查询时,他会向单个的集群节点发送子查询,如果提前知道要查询的数据已经被并置到同一个节点,Ignite会有显著的性能提升和网络优化 false
distributedJoins 可以在非并置的数据上使用分布式关联。 false
streaming 通过INSERT语句为本链接开启批量数据加载模式,具体可以参照后面的流模式相关章节。 false
streamingAllowOverwrite 通知Ignite对于重复的已有键,覆写它的值而不是忽略他们,具体可以参照后面的流模式相关章节。 false
streamingFlushFrequency 超时时间,毫秒,数据流处理器用于刷新数据,数据默认会在连接关闭时刷新,具体可以参照后面的流模式相关章节。 0
streamingPerNodeBufferSize 数据流处理器的每节点缓冲区大小,具体可以参照后面的流模式相关章节。 1024
streamingPerNodeParallelOperations 数据流处理器的每节点并行操作数。具体可以参照后面的流模式相关章节。 16
transactionsAllowed 目前已经支持了ACID事务,但是仅仅在键-值API层面,在SQL层面Ignite支持原子性,还不支持事务一致性,这意味着使用这个功能的时候驱动可能抛出不支持事务这样的异常。但是,一些BI工具会一直强制事务行为,这时即使不需要事务,也需要将该参数配置为true以满足需求。 false
multipleStatementsAllowed JDBC驱动可以同时处理多个SQL语句并且返回多个ResultSet对象,如果该参数为false,多个语句的查询会返回错误。 false
lazy 查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 false
skipReducerOnUpdate 开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 false

跨缓存查询
驱动连接到的缓存会被视为默认的模式,要跨越多个缓存进行查询,可以参照3.6.缓存查询章节。

流模式
使用JDBC驱动,可以以流模式(批处理模式)将数据注入Ignite集群。这时驱动会在内部实例化IgniteDataStreamer然后将数据传给它。要激活这个模式,可以在JDBC连接串中增加streaming参数并且设置为true

  1. // Register JDBC driver.
  2. Class.forName("org.apache.ignite.IgniteJdbcDriver");
  3. // Opening connection in the streaming mode.
  4. Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://streaming=true@file:///etc/config/ignite-jdbc.xml");

目前,流模式只支持INSERT操作,对于想更快地将数据预加载进缓存的场景非常有用。JDBC驱动定义了多个连接参数来影响流模式的行为,这些参数已经在上述的参数表中列出。
这些参数几乎覆盖了IgniteDataStreamer的所有常规配置,这样就可以根据需要更好地调整流处理器。关于如何配置流处理器可以参考流处理器的相关文档来了解更多的信息。

基于时间的刷新
默认情况下,当要么连接关闭,要么达到了streamingPerNodeBufferSize,数据才会被刷新,如果希望按照时间的方式来刷新,那么可以调整streamingFlushFrequency参数。

  1. // Register JDBC driver.
  2. Class.forName("org.apache.ignite.IgniteJdbcDriver");
  3. // Opening a connection in the streaming mode and time based flushing set.
  4. Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://streaming=true@streamingFlushFrequency=1000@file:///etc/config/ignite-jdbc.xml");
  5. PreparedStatement stmt = conn.prepareStatement(
  6. "INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
  7. // Adding the data.
  8. for (int i = 1; i < 100000; i++) {
  9. // Inserting a Person object with a Long key.
  10. stmt.setInt(1, i);
  11. stmt.setString(2, "John Smith");
  12. stmt.setInt(3, 25);
  13. stmt.execute();
  14. }
  15. conn.close();
  16. // Beyond this point, all data is guaranteed to be flushed into the cache.

4.2.2.示例

要处理集群中的数据,需要使用下面的一种方式来创建一个JDBCConnection对象:

  1. // Register JDBC driver.
  2. Class.forName("org.apache.ignite.IgniteJdbcDriver");
  3. // Open JDBC connection (cache name is not specified, which means that we use default cache).
  4. Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");

之后就可以执行SELECTSQL查询了:

  1. // Query names of all people.
  2. ResultSet rs = conn.createStatement().executeQuery("select name from Person");
  3. while (rs.next()) {
  4. String name = rs.getString(1);
  5. ...
  6. }
  7. // Query people with specific age using prepared statement.
  8. PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
  9. stmt.setInt(1, 30);
  10. ResultSet rs = stmt.executeQuery();
  11. while (rs.next()) {
  12. String name = rs.getString("name");
  13. int age = rs.getInt("age");
  14. ...
  15. }

此外,可以使用DML语句对数据进行修改。
INSERT

  1. // Insert a Person with a Long key.
  2. PreparedStatement stmt = conn.prepareStatement("INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
  3. stmt.setInt(1, 1);
  4. stmt.setString(2, "John Smith");
  5. stmt.setInt(3, 25);
  6. stmt.execute();

MERGE

  1. // Merge a Person with a Long key.
  2. PreparedStatement stmt = conn.prepareStatement("MERGE INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
  3. stmt.setInt(1, 1);
  4. stmt.setString(2, "John Smith");
  5. stmt.setInt(3, 25);
  6. stmt.executeUpdate();

UPDATE

  1. // Update a Person.
  2. conn.createStatement().
  3. executeUpdate("UPDATE Person SET age = age + 1 WHERE age = 25");

DELETE

  1. conn.createStatement().execute("DELETE FROM Person WHERE age = 25");

4.3.错误码

Ignite的JDBC驱动将错误码封装进了java.sql.SQLException类,它简化了应用端的错误处理。要获得错误码,可以使用java.sql.SQLException.getSQLState()方法,它会返回一个字符串:

  1. // Register JDBC driver.
  2. Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
  3. // Open JDBC connection.
  4. Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1");
  5. PreparedStatement ps;
  6. try {
  7. ps = conn.prepareStatement("INSERT INTO Person(id, name, age) values (1," +
  8. "'John', 'unparseableString')");
  9. }
  10. catch (SQLException e) {
  11. switch (e.getSQLState()) {
  12. case "0700B":
  13. System.out.println("Conversion failure");
  14. break;
  15. case "42000":
  16. System.out.println("Parsing error");
  17. break;
  18. default:
  19. System.out.println("Unprocessed error: " + e.getSQLState());
  20. break;
  21. }
  22. }

下表中列出了Ignite目前支持的所有错误码,未来这个列表可能还会扩展:

代码 描述
0700B 转换失败(比如,一个字符串表达式无法解析成数值或者日期)
0700E 无效的事务隔离级别
08001 驱动接入集群失败
08003 连接意外地处于关闭状态
08004 连接被集群拒绝
08006 通信中发生I/O错误
22004 不允许的空值
22023 不支持的参数类型
23000 违反了数据完整性约束
24000 无效的结果集状态
0A000 不支持的操作
42000 查询解析异常
50000 Ignite内部错误,这个代码不是ANSI定义的,属于Ignite特有的错误,获取java.sql.SQLException的错误信息可以了解更多的细节
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注