[关闭]
@liyuj 2017-05-31T14:18:24.000000Z 字数 34765 阅读 4116

Apache-Ignite-2.0.0-中文开发手册

4.SQL网格

4.1.SQL网格

内存SQL网格为Apache Ignite提供了分布式内存数据库的功能,它水平可扩展,容错并且兼容SQL的ANSI-99标准。
SQL网格支持完整的DML命令,包括SELECT, UPDATE, INSERT, MERGE以及DELETE。

内存SQL网格使得开发者与Ignite的交互不仅仅可以使用原生的,面向Java、C++和.NET的开发API,还可以通过JDBC或者ODBC API使用标准的SQL命令,这提供了真正的语言层面的跨平台连接性,比如PHP,Ruby以及其他的。

4.2.分布式查询

4.2.1.摘要

Ignite支持任意的SQL查询,没有任何限制。SQL语法是ANSI-99兼容的,也就意味着作为SQL查询的一部分,规范定义的任何SQL函数、聚合、分组以及关联,都是可以使用的。
此外,查询是完全分布式的。SQL引擎的功能不仅仅是将查询映射到特定的节点然后将结果汇总为最终的结果集,它还可以将存储在不同缓存甚至是不同节点上的数据进行关联。此外,引擎是以容错的方式保证,不会因为新节点加入集群或者旧节点离开而获得不完整或者错误的结果。

4.2.2.SQL查询如何工作

Ignite的SQL网格组件是与H2数据库紧紧绑定在一起的,简而言之,H2是一个Java写的,遵循一组开源许可证,基于内存和磁盘的数据库。
ignite-indexing模块加入节点的类路径之后,一个嵌入式的H2数据库实例就会作为Ignite节点进程的一部分被启动。如果节点是在终端中通过ignite.sh{bat}脚本启动的,那么需要将{apache_ignite}\libs\optional\ignite-indexing目录拷贝到{apache_ignite}\libs\,如果使用的是maven,那么需要将如下的依赖加入pom.xml文件:

  1. <dependency>
  2. <groupId>org.apache.ignite</groupId>
  3. <artifactId>ignite-indexing</artifactId>
  4. <version>${ignite.version}</version>
  5. </dependency>

Ignite借用了H2的SQL查询解析器以及优化器还有执行计划器。最后,届时H2会在一个特定的节点执行本地化的查询(一个分布式查询会被映射到节点或者查询是以本地模式执行的),然后会将本地的结果集传递给分布式SQL引擎用于后续处理。
然而,数据和索引,通常是存储于Ignite数据网格端的,而Ignite以分布式以及容错的方式执行SQL查询,这个是H2不支持的。
Ignite SQL网格执行查询有两种方式:
首先,如果查询在一个部署有REPLICATED模式缓存的节点上执行,那么Ignite会假定所有的数据都是本地化的,然后将其直接传递给H2数据库引擎执行一个简单的本地化SQL查询,对于LOCAL模式的缓存,也是同样的执行流程。
第二,如果查询执行于PARTITIONED模式缓存,那么执行流程如下:

跨缓存查询的执行流程
跨缓存或者关联查询的执行流程与上面描述的分区缓存查询执行流程没什么不同,后面文档还会提到。
处理带有ORDER BY以及GROUP BY的结果集
带有ORDER BY语句的SQL查询不需要将所有结果集都加载到查询发起(汇总)节点来完成排序。而是查询映射的每个节点都会对自己那部分数据进行排序然后汇总节点以流的方式进行合并。
对于有序的GROUP BY查询也是同样的优化方式,不需要在将其返回给应用之前将所有数据加载到汇总节点用于分组。在Ignite中,来自单独节点的部分结果集可以被逐步地流化、合并、聚合以及返回给应用。

4.2.3.查询类型

在Java API层,通常有两种类型的SQL查询,分别为SqlQuerySqlFieldsQuery

替代APIs
Ignite内存SQL网格并不绑定到Java API,可以从.NET, C++通过 ODBC或者JDBC驱动连接到Ignite集群然后执行SQL查询。

SqlQuery
SqlQuery适用于查询执行完毕后需要获得存储于缓存(键和值)中的整个对象的场景,然后返回最终的结果集,下面的代码片段显示了在实践中如何实现:

  1. IgniteCache<Long, Person> cache = ignite.cache("personCache");
  2. SqlQuery sql = new SqlQuery(Person.class, "salary > ?");
  3. // Find all persons earning more than 1,000.
  4. try (QueryCursor<Entry<Long, Person>> cursor = cache.query(sql.setArgs(1000))) {
  5. for (Entry<Long, Person> e : cursor)
  6. System.out.println(e.getValue().toString());
  7. }

SqlFieldsQuery
不需要查询整个对象,只需要指定几个特定的字段即可,这样可以最小化网络和序列化的开销。为此,Ignite实现了一个字段查询的概念。SqlFieldsQuery接受一个常规的ANSI-99 SQL查询作为它的构造器参数,然后像下面的示例那样立即执行:

  1. IgniteCache<Long, Person> cache = ignite.cache("personCache");
  2. // Execute query to get names of all employees.
  3. SqlFieldsQuery sql = new SqlFieldsQuery(
  4. "select concat(firstName, ' ', lastName) from Person");
  5. // Iterate over the result set.
  6. try (QueryCursor<List<?>> cursor = cache.query(sql) {
  7. for (List<?> row : cursor)
  8. System.out.println("personName=" + row.get(0));
  9. }

可查询字段定义
SqlQuerySqlFieldsQuery中的指定字段可以被访问之前,他们需要在POJO层面加上注解,或者在QueryEntity中进行定义,以便SQL引擎可以感知到它们,后续章节还会详述。
访问条目的键和值
在SQL查询中使用_key_val关键字,可以指向条目的整个键和值,而不用写每个字段,如果要在SQL查询执行的结果中返回键或值,也可以使用这两个关键字。
另外,如果键和值是基本类型(int, String, Date等),那么它会被自动地添加到查询的结果集中,比如:SELECT * FROM ...

4.2.4.跨缓存查询

作为单个SqlQuerySqlFieldsQuery查询的一部分,查询的数据可以来自多个缓存。这时,缓存名会扮演类似传统RDBMS中SQL查询的模式名的角色。缓存的名字,用于创建IgniteCache的实例,如果用于查询的话,会作为默认的模式名并且不需要显式地指定。其余的存储于不同缓存中的对象,也会被查询,但是需要加上它的缓存名(额外的模式名)作为前缀。

  1. // In this example, suppose Person objects are stored in a
  2. // cache named 'personCache' and Organization objects
  3. // are stored in a cache named 'orgCache'.
  4. IgniteCache<Long, Person> personCache = ignite.cache("personCache");
  5. // Select with join between Person and Organization to
  6. // get the names of all the employees of a specific organization.
  7. SqlFieldsQuery sql = new SqlFieldsQuery(
  8. "select Person.name "
  9. + "from Person as p, \"orgCache\".Organization as org where "
  10. + "p.orgId = org.id "
  11. + "and org.name = ?");
  12. // Execute the query and obtain the query result cursor.
  13. try (QueryCursor<List<?>> cursor = personCache.query(sql.setArgs("Ignite"))) {
  14. for (List<?> row : cursor)
  15. System.out.println("Person name=" + row.get(0));
  16. }

上面的示例中,会从personCache创建一个SqlFieldsQuery的实例,之后personCache会作为默认的模式名,这就是Person对象没有通过显式指定的模式名(from Person as p)就能访问的原因。而Organization对象,因为它存储于一个单独的名为orgCache的缓存中,所以在该查询中这个缓存的名字作为模式名必须显式地指定("orgCache".Organization as org)。

修改缓存名
如果希望使用不同于缓存名的模式名,可以通过调用CacheConfiguration.setSqlSchema(...)方法解决。

4.2.5.分布式关联

Ignite支持并置和非并置的分布式SQL关联,此外,如果数据位于不同的缓存,Ignite可以进行跨缓存的关联。

  1. IgniteCache<Long, Person> cache = ignite.cache("personCache");
  2. // SQL join on Person and Organization.
  3. SqlQuery sql = new SqlQuery(Person.class,
  4. "from Person as p, \"orgCache\".Organization as org"
  5. + "where p.orgId = org.id "
  6. + "and lower(org.name) = lower(?)");
  7. // Find all persons working for Ignite organization.
  8. try (QueryCursor<Entry<Long, Person>> cursor = cache.query(sql.setArgs("Ignite"))) {
  9. for (Entry<Long, Person> e : cursor)
  10. System.out.println(e.getValue().toString());
  11. }

分区复制模式缓存之间的关联也可以无限制地进行。
然而,如果在至少两个分区模式的数据集之间进行关联,那么一定要确保要么关联的键是并置的,要么为查询开启了非并置关联参数,两种类型的分布式关联模式下面会详述。
分布式并置关联
默认情况下,如果一个SQL关联需要跨越多个Ignite缓存,那么所有的缓存都需要是并置的,否则,查询完成后会得到一个不完整的结果集,这是因为在关联阶段一个节点的可用数据只是本地的,如图1所示,首先,一个SQL查询会被发送到待关联数据所在的节点(Q),然后查询在每个节点的本地数据上立即执行(E(Q)),最后,所有的执行结果都会在客户端进行聚合(R)。

分布式非并置关联
虽然关系并置是一个强大的概念,即一旦配置了应用的业务实体(缓存),就可以以最优的方式执行跨缓存的关联,并且返回一个完整且一致的结果集。但还有一种可能就是,无法并置所有的数据,这时,就可能无法执行满足需求的所有SQL查询了。

在实践中不要过度使用基于非并置的分布式关联的方式,因为这种关联方式的性能差于基于关系并置的关联,因为要完成这个查询,要有更多的网络开销和节点间的数据移动。

当通过SqlQuery.setDistributedJoins(boolean)参数为一个SQL查询启用了非并置的分布式关联之后,查询映射的节点就会从远程节点通过发送广播或者单播请求的方式获取缺失的数据(本地不存在的数据),正如图2所示,有一个潜在的数据移动步骤(D(Q))。潜在的单播请求只会在关联在主键(缓存键)或者关系键上完成之后才会发送,因为执行关联的节点知道缺失数据的位置,其他所有的情况都会发送广播请求。

不管是广播还是单播请求,都是由一个节点发送到另一个节点来获取缺失的数据,是按照顺序执行的。SQL引擎会将所有的请求组成若干批量,这个批量的大小是由SqlQuery.setPageSize(int)参数管理的。

下面的代码片段是从Ignite的发行版的CacheQueryExample中提取的:

  1. IgniteCache<AffinityKey<Long>, Person> cache = ignite.cache("personCache");
  2. // SQL clause query with join over non-collocated data.
  3. String joinSql =
  4. "from Person, \"orgCache\".Organization as org " +
  5. "where Person.orgId = org.id " +
  6. "and lower(org.name) = lower(?)";
  7. SqlQuery qry = new SqlQuery<AffinityKey<Long>, Person>(Person.class, joinSql).setArgs("ApacheIgnite");
  8. // Enable distributed joins for the query.
  9. qry.setDistributedJoins(true);
  10. // Execute the query to find out employees for specified organization.
  11. System.out.println("Following people are 'ApacheIgnite' employees (distributed join): ", cache.query(qry).getAll());

要了解详细信息,可以参照非并置的分布式关联

查询复制缓存
如果只在复制缓存所在的数据上执行SQL查询,那么可以设置SqlQuery.setReplicatedOnly(...)true,这个给SQL引擎的特别提示会为查询产生更高效的执行计划。

4.2.6.已知的限制

事务性SQL
目前,SQL查询仅仅支持原子模式,意味着如果有一个事务已经提交了值A而值B正在提交过程中,然后如果有一个并行的SQL查询的话,会看到A而看不到B。

多版本并发控制(MVCC)
一旦Ignite SQL网格使用MVCC进行控制,SQL网格也会支持事务模式。

4.2.7.示例

关于本文描述的分布式关联如何使用的完整示例,会作为Ignite发行版的一部分进行分发,名为CacheQueryExampleGitHub上也有。

4.3.本地查询

有时,SQL网格中查询的执行会从分布式模式回落至本地模式,在本地模式中,查询会简单地传递至底层的H2引擎,他只会处理本地节点的数据集。
这些场景包括:

即使查询执行时网络拓扑发生变化(新节点加入集群或者老节点离开集群),前两个场景也会一直提供完整而一致的结果集。
然而,在应用显式开启本地模式的第三个场景中需要注意,原因是如果希望在部分节点的分区缓存上执行本地查询时网络还发生了变化,那么可能得到结果集的一部分,因为这时会触发一个并行的数据再平衡过程。SQL引擎无法处理这个特殊情况。如果仍然希望在分区缓存上执行本地查询,那么需要将查询作为affinityRun(...)或者affinityCall(...)方法的一部分。

4.4.分布式DML

4.4.1.摘要

Ignite SQL网格不仅仅可以使用ANSI-99语法的SQL在数据网格上查询数据,还可以使用众所周知的DML语句,比如INSERT、UPDATE或者DELETE修改数据。利用这个优势,依赖Ignite的SQL能力完全可以将其当做分布式内存数据库。

ANSI-99 SQL兼容
DML查询,和所有的SELECT查询一样,都是兼容ANSI-99 SQL标准的。

Ignite在内存中的数据都是以键-值对的形式存储的,因此所有和DML相关的操作都会被转换为相对应的基于键-值的缓存操作命令,比如cache.put(...)或者cache.invokeAll(...)。下面会深入地了解这些DML语句是如何实现的。

4.4.2.DML API

通常来说,所有的DML语句会被拆分为两组,一个是往缓存中添加条目(INSERTMERGE),还有就是修改已有的数据(UPDATEDELETE)。
要在Java中执行这些语句需要使用已有的用于SELECT查询的API - SqlFieldsQueryAPI,DML操作使用的API与只读查询是一致的,返回结果也是QueryCursor<List<?>>。唯一的不同是作为DML语句执行的结果,QueryCursor<List<?>>是只有一个long类型的单条目的List<?>,这个数值表示该DML语句影响的缓存条目的数量。而作为SELECT语句的结果,QueryCursor<List<?>>会包含一个从缓存获得的条目列表。

其他的API
DML API不受限于Java,也可以使用ODBC或者JDBC驱动接入Ignite集群,然后执行DML语句。

4.4.3.基本配置

在Ignite中要进行DML操作,需要使用基于QueryEntity的方式或者使用@QuerySqlField注解来配置所有可查询的字段,比如:
使用@QuerySqlField注解:

  1. public class Person {
  2. /** Field will be accessible from DML statements. */
  3. @QuerySqlField
  4. private final String firstName;
  5. /** Indexed field that will be accessible from DML statements. */
  6. @QuerySqlField (index = true)
  7. private final String lastName;
  8. /** Field will NOT be accessible from DML statements. */
  9. private int age;
  10. public Person(String firstName, String lastName) {
  11. this.firstName = firstName;
  12. this.lastName = lastName;
  13. }
  14. }

使用QueryEntity:

  1. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  2. <property name="name" value="personCache"/>
  3. <!-- Configure query entities -->
  4. <property name="queryEntities">
  5. <list>
  6. <bean class="org.apache.ignite.cache.QueryEntity">
  7. <!-- Registering key's class. -->
  8. <property name="keyType" value="java.lang.Long"/>
  9. <!-- Registering value's class. -->
  10. <property name="valueType"
  11. value="org.apache.ignite.examples.Person"/>
  12. <!--
  13. Defining fields that will be accessible from DML side
  14. -->
  15. <property name="fields">
  16. <map>
  17. <entry key="firstName" value="java.lang.String"/>
  18. <entry key="lastName" value="java.lang.String"/>
  19. </map>
  20. </property>
  21. <!--
  22. Defining which fields, listed above, will be treated as
  23. indexed fields as well.
  24. -->
  25. <property name="indexes">
  26. <list>
  27. <!-- Single field (aka. column) index -->
  28. <bean class="org.apache.ignite.cache.QueryIndex">
  29. <constructor-arg value="lastName"/>
  30. </bean>
  31. </list>
  32. </property>
  33. </bean>
  34. </list>
  35. </property>
  36. </bean>

除了通过@QuerySqlField加注的或者通过QueryEntity定义的所有字段,还有两个为每个在SQL网格中注册的对象类型预定义的字段_key_val,这几个预定义字段指向缓存中存储的对象的整个键和值,他们可以像下面这样在DML中直接使用:

  1. //Preparing cache configuration.
  2. CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>
  3. ("personCache");
  4. //Registering indexed/queryable types.
  5. cacheCfg.setIndexedTypes(Long.class, Person.class);
  6. //Starting the cache.
  7. IgniteCache<Long, Person> cache = ignite.cache(cacheCfg);
  8. // Inserting a new key-value pair referring to prefedined `_key` and `_value`
  9. // fields for Person type.
  10. cache.query(new SqlFieldsQuery("INSERT INTO Person(_key, _val) VALUES(?, ?)")
  11. .setArgs(1L, new Person("John", "Smith")));

如果倾向于处理具体的字段,而不是通过执行查询处理整个对象的值,可以执行下面这样的查询:

  1. IgniteCache<Long, Person> cache = ignite.cache(cacheCfg);
  2. cache.query(new SqlFieldsQuery(
  3. "INSERT INTO Person(_key, firstName, lastName) VALUES(?, ?, ?)").
  4. setArgs(1L, "John", "Smith"));

注意DML引擎会根据firstNamelastName重新创建一个Person对象,然后将其注入缓存,但是这些字段是需要通过QueryEntity或者@QuerySqlField注解进行定义的,就像上面描述的那样。

4.4.4.高级配置

自定义键
如果只使用预定义的SQL数据类型作为缓存键,那么就没必要对和DML相关的配置做额外的操作,这些数据类型在GridQueryProcessor#SQL_TYPES常量中进行定义,列举如下:

预定义SQL数据类型
1.所有的基本类型及其包装器,除了charCharacter
2.String;
3.BigDecimal;
4.byte[];
5.java.util.Date, java.sql.Date, java.sql.Timestamp;
6.java.util.UUID

然而,如果决定引入复杂的自定义缓存键,那么在DML语句中要指向这些字段就需要:

下面的例子展示了如何实现:
Java:

  1. // Preparing cache configuration.
  2. CacheConfiguration cacheCfg = new CacheConfiguration<>("personCache");
  3. // Creating the query entity.
  4. QueryEntity entity = new QueryEntity("CustomKey", "Person");
  5. // Listing all the queryable fields.
  6. LinkedHashMap<String, String> flds = new LinkedHashMap<>();
  7. flds.put("intKeyField", Integer.class.getName());
  8. flds.put("strKeyField", String.class.getName());
  9. flds.put("firstName", String.class.getName());
  10. flds.put("lastName", String.class.getName());
  11. entity.setFields(flds);
  12. // Listing a subset of the fields that belong to the key.
  13. Set<String> keyFlds = new HashSet<>();
  14. keyFlds.add("intKeyField");
  15. keyFlds.add("strKeyField");
  16. entity.setKeyFields(keyFlds);
  17. // End of new settings, nothing else here is DML related
  18. entity.setIndexes(Collections.<QueryIndex>emptyList());
  19. cacheCfg.setQueryEntities(Collections.singletonList(entity));
  20. ignite.createCache(cacheCfg);

XML:

  1. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  2. <property name="name" value="personCache"/>
  3. <!-- Configure query entities -->
  4. <property name="queryEntities">
  5. <list>
  6. <bean class="org.apache.ignite.cache.QueryEntity">
  7. <!-- Registering key's class. -->
  8. <property name="keyType" value="CustomKey"/>
  9. <!-- Registering value's class. -->
  10. <property name="valueType"
  11. value="org.apache.ignite.examples.Person"/>
  12. <!--
  13. Defining all the fields that will be accessible from DML.
  14. -->
  15. <property name="fields">
  16. <map>
  17. <entry key="firstName" value="java.lang.String"/>
  18. <entry key="lastName" value="java.lang.String"/>
  19. <entry key="intKeyField" value="java.lang.Integer"/>
  20. <entry key="strKeyField" value="java.lang.String"/>
  21. </map>
  22. </property>
  23. <!-- Defining the subset of key's fields -->
  24. <property name="keyFields">
  25. <set>
  26. <value>intKeyField<value/>
  27. <value>strKeyField<value/>
  28. </set>
  29. </property>
  30. </bean>
  31. </list>
  32. </property>
  33. </bean>

4.4.5.DML操作

MERGE
MERGE是一个非常简单的操作,因为它会被翻译成cache.put(...)或者cache.putAll(...),具体是哪一个,取决于MERGE语句涉及的要插入或者要更新的记录的数量。
下面的示例显示如何通过MERGE命令来更新数据集。一个是提供了条目列表,一个是通过执行子查询注入一个结果集。
MERGE(条目列表):

  1. cache.query(new SqlFieldsQuery("MERGE INTO Person(_key, firstName, lastName)" + "values (1, 'John', 'Smith'), (5, 'Mary', 'Jones')"));

MERGE(子查询):

  1. cache.query(new SqlFieldsQuery("MERGE INTO someCache.Person(_key, firstName, lastName) (SELECT _key + 1000, firstName, lastName " +
  2. "FROM anotherCache.Person WHERE _key > ? AND _key < ?)").setArgs(100, 200);

INSERT
MERGEINSERT命令的不同在于,后者添加的条目必须是缓存中不存在的。
如果要把一个键值对插入缓存,那么最后,INSERT语句会被转换为cache.putIfAbsent(...)操作,否则,如果插入的是多个键值对,那么DML引擎会为每个对创建一个EntryProcessor,然后使用cache.invokeAll(...)将数据注入缓存。
下面的示例显示如何通过INSERT命令插入一个数据集,一个是提供了条目列表,一个是通过执行子查询注入一个结果集。
INSERT(条目列表):

  1. cache.query(new SqlFieldsQuery("INSERT INTO Person(_key, firstName, " +
  2. "lastName) values (1, 'John', 'Smith'), (5, 'Mary', 'Jones')"));

INSERT(子查询):

  1. cache.query(new SqlFieldsQuery("INSERT INTO someCache.Person(_key, firstName, lastName) (SELECT _key + 1000, firstName, secondName " +
  2. "FROM anotherCache.Person WHERE _key > ? AND _key < ?)").setArgs(100, 200);

UPDATE
这个操作会更新缓存中的值的每个字段。
开始时,SQL引擎会根据UPDATE语句的WHERE条件生成并且执行一个SELECT查询,然后会修改满足条件的已有值。
修改的执行是利用cache.invokeAll(...)实现的。基本上来说,这意味着一旦SELECT查询的结果准备好,SQL引擎就会准备一定数量的EntryProcessors然后执行cache.invokeAll(...)操作,下一步,EntryProcessors修改完数据之后,会进行额外的检查来确保在SELECT和数据实际更新之间没有其他干扰。
下面这个简单示例显示了如何执行UPDATE语句。

  1. cache.query(new SqlFieldsQuery("UPDATE Person set lastName = ? " +
  2. "WHERE _key >= ?").setArgs("Jones", 2L));

UPDATE语句无法更新缓存键及其字段
原因是缓存键的状态决定了内部数据的布局及其一致性(键的哈希及其关系,索引完整性),所以目前除非先将其删除,否则无法更新缓存键。比如下面的查询:
UPDATE _key = 11 where _key = 10;
会导致下面的缓存操作:
val = get(10);
put(11, val);
remove(10);

DELETE
DELETE语句的执行也会被拆分为两个阶段,与UPDATE语句的执行类似。
首先,SQL引擎会使用SELECT语句来收集满足WHERE条件并且要被删除的缓存键,下一步,拿到这些键后,会准备一定数量的EntryProcessors然后执行cache.invokeAll(...)操作,当数据将被删除时,会进行额外的检查来确保在SELECT和数据实际删除之间没有其他干扰。
下面这个简单示例显示了如何执行DELETE语句。

  1. cache.query(new SqlFieldsQuery("DELETE FROM Person " +
  2. "WHERE _key >= ?").setArgs(2L));