[关闭]
@liyuj 2017-11-26T21:14:36.000000Z 字数 27842 阅读 10411

Apache-Ignite-2.3.0-中文开发手册

2.集群化

2.1.集群化

Ignite具有非常先进的集群能力,包括逻辑集群组和自动发现。
Ignite节点之间会自动发现对方,这有助于必要时扩展集群,而不需要重启整个集群。开发者可以利用Ignite的混合云支持,允许公有云(比如AWS)和私有云之间建立连接,向他们提供两者的好处。

2.2.集群API

2.2.1.IgniteCluster

集群的功能是通过IgniteCluster接口提供的,可以像下面这样从Ignite中获得一个IgniteCluster的实例:

  1. Ignite ignite = Ignition.ignite();
  2. IgniteCluster cluster = ignite.cluster();

通过IgniteCluster接口可以:

2.2.2.ClusterNode

ClusterNode接口具有非常简洁的API,他只处理集群中的节点,把它视为网络中的逻辑端点,它有一个唯一的ID,节点的元数据信息,静态属性集以及一些其他的参数。

2.2.3.集群节点属性

所有的集群节点在启动时都会自动地注册环境和系统的参数,把他们作为节点的属性,也可以通过配置添加自定义的节点属性。

  1. <bean class="org.apache.ignite.IgniteConfiguration">
  2. ...
  3. <property name="userAttributes">
  4. <map>
  5. <entry key="ROLE" value="worker"/>
  6. </map>
  7. </property>
  8. ...
  9. </bean>

下面的代码显示了如何获得设置了“worker”属性的节点:

  1. ClusterGroup workers = ignite.cluster().forAttribute("ROLE", "worker");
  2. Collection<ClusterNode> nodes = workers.nodes();

所有的节点属性都是通过ClusterNode.attribute("propertyName")方法获得的。

2.2.4.集群节点指标数据

Ignite自动收集集群中所有节点的指标数据,指标数据是在后台收集的并且被集群节点之间的每一次心跳消息交换所更新。
节点的指标数据是通过ClusterMetrics接口体现的,他包括至少50种指标(注意,同样的指标也可以用于集群组)。
下面是一个获取一些指标数据的示例,包括本地节点的平均CPU负载,已用堆大小:

  1. // Local Ignite node.
  2. ClusterNode localNode = cluster.localNode();
  3. // Node metrics.
  4. ClusterMetrics metrics = localNode.metrics();
  5. // Get some metric values.
  6. double cpuLoad = metrics.getCurrentCpuLoad();
  7. long usedHeap = metrics.getHeapMemoryUsed();
  8. int numberOfCores = metrics.getTotalCpus();
  9. int activeJobs = metrics.getCurrentActiveJobs();

2.2.5.本地集群节点

本地集群节点是ClusterNode的一个实例,表示当前的Ignite节点。
下面的例子显示如何获得本地节点:

  1. ClusterNode localNode = ignite.cluster().localNode();

2.3.集群组

ClusterGroup表示集群内节点的一个逻辑组。
从设计上讲,所有集群节点都是平等的,所以没有必要以一个特定的顺序启动任何节点,或者给他们赋予特定的规则。然而,Ignite可以因为一些应用的特殊需求而创建集群节点的逻辑组,比如,可能希望只在远程节点上部署一个服务,或者给部分worker节点赋予一个叫做‘worker’的规则来做作业的执行。

注意IgniteCluster接口也是一个集群组,只不过包括集群内的所有节点。

可以限制作业执行、服务部署、消息、事件以及其他任务只在部分集群组内执行,比如,下面这个例子只把作业广播到远程节点(除了本地节点):
Java8:

  1. final Ignite ignite = Ignition.ignite();
  2. IgniteCluster cluster = ignite.cluster();
  3. // Get compute instance which will only execute
  4. // over remote nodes, i.e. not this node.
  5. IgniteCompute compute = ignite.compute(cluster.forRemotes());
  6. // Broadcast to all remote nodes and print the ID of the node
  7. // on which this closure is executing.
  8. compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id());

Java7:

  1. final Ignite ignite = Ignition.ignite();
  2. IgniteCluster cluster = ignite.cluster();
  3. // Get compute instance which will only execute
  4. // over remote nodes, i.e. not this node.
  5. IgniteCompute compute = ignite.compute(cluster.forRemotes());
  6. // Broadcast closure only to remote nodes.
  7. compute.broadcast(new IgniteRunnable() {
  8. @Override public void run() {
  9. // Print ID of the node on which this runnable is executing.
  10. System.out.println(">>> Hello Node: " + ignite.cluster().localNode().id());
  11. }
  12. });

2.3.1.预定义集群组

可以基于任何谓词创建集群组,为了方便Ignite也提供了一些预定义的集群组。
下面的示例显示了ClusterGroup接口中定义的部分集群组:
远程节点

  1. IgniteCluster cluster = ignite.cluster();
  2. // Cluster group with remote nodes, i.e. other than this node.
  3. ClusterGroup remoteGroup = cluster.forRemotes();

缓存节点

  1. IgniteCluster cluster = ignite.cluster();
  2. // All nodes on which cache with name "myCache" is deployed,
  3. // either in client or server mode.
  4. ClusterGroup cacheGroup = cluster.forCache("myCache");
  5. // All data nodes responsible for caching data for "myCache".
  6. ClusterGroup dataGroup = cluster.forDataNodes("myCache");
  7. // All client nodes that access "myCache".
  8. ClusterGroup clientGroup = cluster.forClientNodes("myCache");

有属性的节点

  1. IgniteCluster cluster = ignite.cluster();
  2. // All nodes with attribute "ROLE" equal to "worker".
  3. ClusterGroup attrGroup = cluster.forAttribute("ROLE", "worker");

随机节点

  1. IgniteCluster cluster = ignite.cluster();
  2. // Cluster group containing one random node.
  3. ClusterGroup randomGroup = cluster.forRandom();
  4. // First (and only) node in the random group.
  5. ClusterNode randomNode = randomGroup.node();

主机节点

  1. IgniteCluster cluster = ignite.cluster();
  2. // Pick random node.
  3. ClusterGroup randomNode = cluster.forRandom();
  4. // All nodes on the same physical host as the random node.
  5. ClusterGroup cacheNodes = cluster.forHost(randomNode);

最老的节点

  1. IgniteCluster cluster = ignite.cluster();
  2. // Dynamic cluster group representing the oldest cluster node.
  3. // Will automatically shift to the next oldest, if the oldest
  4. // node crashes.
  5. ClusterGroup oldestNode = cluster.forOldest();

本地节点

  1. IgniteCluster cluster = ignite.cluster();
  2. // Cluster group with only this (local) node in it.
  3. ClusterGroup localGroup = cluster.forLocal();
  4. // Local node.
  5. ClusterNode localNode = localGroup.node();

客户端和服务端

  1. IgniteCluster cluster = ignite.cluster();
  2. // All client nodes.
  3. ClusterGroup clientGroup = cluster.forClients();
  4. // All server nodes.
  5. ClusterGroup serverGroup = cluster.forServers();

2.3.2.带节点属性的集群组

Ignite的唯一特点是所有节点都是平等的。没有master节点或者server节点,也没有worker节点或者client节点,按照Ignite的观点所有节点都是平等的。但是,可以将节点配置成master,worker,或者client以及data节点。
所有集群节点启动时都会自动将所有的环境和系统属性注册为节点的属性,但是也可以通过配置自定义节点属性。
XML:

  1. <bean class="org.apache.ignite.IgniteConfiguration">
  2. ...
  3. <property name="userAttributes">
  4. <map>
  5. <entry key="ROLE" value="worker"/>
  6. </map>
  7. </property>
  8. ...
  9. </bean>

Java:

  1. IgniteConfiguration cfg = new IgniteConfiguration();
  2. Map<String, String> attrs = Collections.singletonMap("ROLE", "worker");
  3. cfg.setUserAttributes(attrs);
  4. // Start Ignite node.
  5. Ignite ignite = Ignition.start(cfg);

启动时,所有的环境变量和系统属性都会自动地注册为节点属性。
节点属性是通过ClusterNode.attribute("propertyName")属性获得的。

下面的例子显示了如何获得赋予了‘worker’属性值的节点:

  1. IgniteCluster cluster = ignite.cluster();
  2. ClusterGroup workerGroup = cluster.forAttribute("ROLE", "worker");
  3. Collection<GridNode> workerNodes = workerGroup.nodes();

2.3.3.自定义集群组

可以基于一些谓词定义动态集群组,这个集群组只会包含符合该谓词的节点。
下面是一个例子,一个集群组只会包括CPU利用率小于50%的节点,注意这个组里面的节点会随着CPU负载的变化而改变。

  1. IgniteCluster cluster = ignite.cluster();
  2. // Nodes with less than 50% CPU load.
  3. ClusterGroup readyNodes = cluster.forPredicate((node) -> node.metrics().getCurrentCpuLoad() < 0.5);

2.3.4.集群组组合

可以通过彼此之间的嵌套来组合集群组,比如,下面的代码片段显示了如何通过组合最老组和远程组来获得最老的远程节点:

  1. // Group containing oldest node out of remote nodes.
  2. ClusterGroup oldestGroup = cluster.forRemotes().forOldest();
  3. ClusterNode oldestNode = oldestGroup.node();

2.3.5.从集群组中获得节点

可以像下面这样获得各种集群组的节点:

  1. ClusterGroup remoteGroup = cluster.forRemotes();
  2. // All cluster nodes in the group.
  3. Collection<ClusterNode> grpNodes = remoteGroup.nodes();
  4. // First node in the group (useful for groups with one node).
  5. ClusterNode node = remoteGroup.node();
  6. // And if you know a node ID, get node by ID.
  7. UUID myID = ...;
  8. node = remoteGroup.node(myId);

2.3.6.集群组指标数据

Ignite自动收集所有集群节点的指标数据,很酷的事是集群组会自动地收集组内所有节点的指标数据,然后提供组内正确的平均值,最小值,最大值等信息。
集群组指标数据是通过ClusterMetrics接口体现的,他包括了超过50个指标(注意,同样的指标单独的集群节点也有)。
下面的例子是获取一些指标数据,包括所有远程节点的平均CPU利用率以及可用堆大小:

  1. // Cluster group with remote nodes, i.e. other than this node.
  2. ClusterGroup remoteGroup = ignite.cluster().forRemotes();
  3. // Cluster group metrics.
  4. ClusterMetrics metrics = remoteGroup.metrics();
  5. // Get some metric values.
  6. double cpuLoad = metrics.getCurrentCpuLoad();
  7. long usedHeap = metrics.getHeapMemoryUsed();
  8. int numberOfCores = metrics.getTotalCpus();
  9. int activeJobs = metrics.getCurrentActiveJobs();

2.4.领导者选举

2.4.1.摘要

当工作在分布式环境中时,有时需要确保有这么一个节点,不管网络是否发生变化,这个节点通常被叫做leader(领导者)
很多系统选举领导者通常要处理数据一致性,然后通常是通过收集集群成员的选票处理的。而在Ignite中,数据一致性是通过数据网格的类似功能处理的(Rendezvous Hashing或者HRW哈希),选择领导者在传统意义上的数据一致性,在数据网格以外就不是真的需要了。
然而,可能还是希望有一个协调员节点来处理某些任务,为了这个,Ignite允许在集群中自动地选择最老的或者最新的节点。

使用服务网格
注意对于大多数领导者或者类单例用例中,建议使用服务网格功能,他可以自动地部署各个集群单例服务,而且更易于使用。

2.4.2.最老的节点

每当新节点加入时,最老的节点都有一个保持不变的属性,集群中的最老节点唯一发生变化的时间点就是它从集群中退出或者该节点故障。
下面的例子显示了如何选择一个集群组,他只包含了最老的节点。

  1. IgniteCluster cluster = ignite.cluster();
  2. // Dynamic cluster group representing the oldest cluster node.
  3. // Will automatically shift to the next oldest, if the oldest
  4. // node crashes.
  5. ClusterGroup oldestNode = cluster.forOldest();

2.4.3.最新的节点

最新的节点,与最老的节点不同,每当新节点加入集群时都会不断发生变化,然而,有时他也会变得很灵活,尤其是如果希望只在最新的节点上执行一些任务时。
下面的例子显示了如何选择一个集群组,他只包含了最新的节点。

  1. IgniteCluster cluster = ignite.cluster();
  2. // Dynamic cluster group representing the youngest cluster node.
  3. // Will automatically shift to the next youngest, if the youngest
  4. // node crashes.
  5. ClusterGroup youngestNode = cluster.forYoungest();

一旦获得了集群组,就可以用它执行任务、部署服务、发送消息等。

2.5.集群配置

2.5.1.摘要

Ignite中,通过DiscoverySpi节点可以彼此发现对方,Ignite提供了TcpDiscoverySpi作为DiscoverySpi的默认实现,它使用TCP/IP来作为节点发现的实现,可以配置成基于组播的或者基于静态IP的。

2.5.2.基于组播的发现

TcpDiscoveryMulticastIpFinder使用组播来发现网格内的每个节点。他也是默认的IP搜索器。除非打算覆盖默认的设置否则不需要指定他。
下面的例子显示了如何通过Spring XML配置文件或者通过Java代码编程式地进行配置:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="discoverySpi">
  4. <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
  5. <property name="ipFinder">
  6. <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
  7. <property name="multicastGroup" value="228.10.10.157"/>
  8. </bean>
  9. </property>
  10. </bean>
  11. </property>
  12. </bean>

Java:

  1. TcpDiscoverySpi spi = new TcpDiscoverySpi();
  2. TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
  3. ipFinder.setMulticastGroup("228.10.10.157");
  4. spi.setIpFinder(ipFinder);
  5. IgniteConfiguration cfg = new IgniteConfiguration();
  6. // Override default discovery SPI.
  7. cfg.setDiscoverySpi(spi);
  8. // Start Ignite node.
  9. Ignition.start(cfg);

2.5.3.基于静态IP的发现

对于组播被禁用的情况,TcpDiscoveryVmIpFinder会使用预配置的IP地址列表。
唯一需要提供的就是至少一个远程节点的IP地址,但是为了保证冗余一个比较好的做法是在未来的某些时间点提供2-3个计划启动的网格节点的IP地址。只要建立了与任何一个已提供的IP地址的连接,Ignite就会自动地发现其他的所有节点。

TcpDiscoveryVmIpFinder默认用的是非共享模式,如果希望启动一个服务端节点,那么在该模式中的IP地址列表同时也要包含本地节点的一个IP地址。当其他节点加入集群过程中时,他会使该节点不用等待而是成为集群的第一个节点,并且正常运行。

下面的例子显示了如何通过Spring XML配置文件或者通过Java代码编程式地进行配置:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="discoverySpi">
  4. <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
  5. <property name="ipFinder">
  6. <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
  7. <property name="addresses">
  8. <list>
  9. <!--
  10. Explicitly specifying address of a local node to let it start and operate normally even if there is no more nodes in the cluster.
  11. You can also optionally specify an individual port or port range.
  12. -->
  13. <value>1.2.3.4</value>
  14. <!--
  15. IP Address and optional port range of a remote node.
  16. You can also optionally specify an individual port and don't set the port range at all.
  17. -->
  18. <value>1.2.3.5:47500..47509</value>
  19. </list>
  20. </property>
  21. </bean>
  22. </property>
  23. </bean>
  24. </property>
  25. </bean>

Java:

  1. TcpDiscoverySpi spi = new TcpDiscoverySpi();
  2. TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
  3. // Set initial IP addresses.
  4. // Note that you can optionally specify a port or a port range.
  5. ipFinder.setAddresses(Arrays.asList("1.2.3.4", "1.2.3.5:47500..47509"));
  6. spi.setIpFinder(ipFinder);
  7. IgniteConfiguration cfg = new IgniteConfiguration();
  8. // Override default discovery SPI.
  9. cfg.setDiscoverySpi(spi);
  10. // Start Ignite node.
  11. Ignition.start(cfg);

2.5.4.基于组播和静态IP的发现

可以同时使用基于组播和静态IP的发现,这种情况下,除了通过组播接受地址以外,如果有的话,TcpDiscoveryMulticastIpFinder也可以与预配置的静态IP地址列表一起工作,就像上面描述的基于静态IP的发现一样。
下面的例子显示了如何配置使用了静态IP地址的组播IP搜索器。
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="discoverySpi">
  4. <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
  5. <property name="ipFinder">
  6. <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
  7. <property name="multicastGroup" value="228.10.10.157"/>
  8. <!-- list of static IP addresses-->
  9. <property name="addresses">
  10. <list>
  11. <value>1.2.3.4</value>
  12. <!--
  13. IP Address and optional port range.
  14. You can also optionally specify an individual port.
  15. -->
  16. <value>1.2.3.5:47500..47509</value>
  17. </list>
  18. </property>
  19. </bean>
  20. </property>
  21. </bean>
  22. </property>
  23. </bean>

Java:

  1. TcpDiscoverySpi spi = new TcpDiscoverySpi();
  2. TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
  3. // Set Multicast group.
  4. ipFinder.setMulticastGroup("228.10.10.157");
  5. // Set initial IP addresses.
  6. // Note that you can optionally specify a port or a port range.
  7. ipFinder.setAddresses(Arrays.asList("1.2.3.4", "1.2.3.5:47500..47509"));
  8. spi.setIpFinder(ipFinder);
  9. IgniteConfiguration cfg = new IgniteConfiguration();
  10. // Override default discovery SPI.
  11. cfg.setDiscoverySpi(spi);
  12. // Start Ignite node.
  13. Ignition.start(cfg);

2.5.5.在同一个机器组中隔离Ignite集群

Ignite可以在同一组机器中启动两个隔离的集群,对于TcpDiscoverySpiTcpCommunicationSpi,不同集群的节点使用不交叉的本地端口范围就可以了。
为了测试,假设需要在一台机器上启动两个互相隔离的集群,那么对于第一个集群的节点,需要使用如下的TcpDiscoverySpiTcpCommunicationSpi配置:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <!--
  4. Explicitly configure TCP discovery SPI to provide list of
  5. initial nodes from the first cluster.
  6. -->
  7. <property name="discoverySpi">
  8. <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
  9. <!-- Initial local port to listen to. -->
  10. <property name="localPort" value="48500"/>
  11. <!-- Changing local port range. This is an optional action. -->
  12. <property name="localPortRange" value="20"/>
  13. <!-- Setting up IP finder for this cluster -->
  14. <property name="ipFinder">
  15. <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
  16. <property name="addresses">
  17. <list>
  18. <!--
  19. Addresses and port range of nodes from
  20. the first cluster.
  21. 127.0.0.1 can be replaced with actual IP addresses
  22. or host names. Port range is optional.
  23. -->
  24. <value>127.0.0.1:48500..48520</value>
  25. </list>
  26. </property>
  27. </bean>
  28. </property>
  29. </bean>
  30. </property>
  31. <!--
  32. Explicitly configure TCP communication SPI changing local
  33. port number for the nodes from the first cluster.
  34. -->
  35. <property name="communicationSpi">
  36. <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
  37. <property name="localPort" value="48100"/>
  38. </bean>
  39. </property>
  40. </bean>

Java:

  1. IgniteConfiguration cfg = new IgniteConfiguration();
  2. // Explicitly configure TCP discovery SPI to provide list of initial nodes
  3. // from the first cluster.
  4. TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
  5. // Initial local port to listen to.
  6. discoverySpi.setLocalPort(48500);
  7. // Changing local port range. This is an optional action.
  8. discoverySpi.setLocalPortRange(20);
  9. TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
  10. // Addresses and port range of the nodes from the first cluster.
  11. // 127.0.0.1 can be replaced with actual IP addresses or host names.
  12. // The port range is optional.
  13. ipFinder.setAddresses(Arrays.asList("127.0.0.1:48500..48520"));
  14. // Overriding IP finder.
  15. discoverySpi.setIpFinder(ipFinder);
  16. // Explicitly configure TCP communication SPI by changing local port number for
  17. // the nodes from the first cluster.
  18. TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
  19. commSpi.setLocalPort(48100);
  20. // Overriding discovery SPI.
  21. cfg.setDiscoverySpi(discoverySpi);
  22. // Overriding communication SPI.
  23. cfg.setCommunicationSpi(commSpi);
  24. // Starting a node.
  25. Ignition.start(cfg);

而对于第二个集群的节点,配置看起来是这样的:
XML:

  1. <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  2. <!--
  3. Explicitly configure TCP discovery SPI to provide list of initial
  4. nodes from the second cluster.
  5. -->
  6. <property name="discoverySpi">
  7. <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
  8. <!-- Initial local port to listen to. -->
  9. <property name="localPort" value="49500"/>
  10. <!-- Changing local port range. This is an optional action. -->
  11. <property name="localPortRange" value="20"/>
  12. <!-- Setting up IP finder for this cluster -->
  13. <property name="ipFinder">
  14. <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
  15. <property name="addresses">
  16. <list>
  17. <!--
  18. Addresses and port range of the nodes from the second cluster.
  19. 127.0.0.1 can be replaced with actual IP addresses or host names. Port range is optional.
  20. -->
  21. <value>127.0.0.1:49500..49520</value>
  22. </list>
  23. </property>
  24. </bean>
  25. </property>
  26. </bean>
  27. </property>
  28. <!--
  29. Explicitly configure TCP communication SPI changing local port number
  30. for the nodes from the second cluster.
  31. -->
  32. <property name="communicationSpi">
  33. <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
  34. <property name="localPort" value="49100"/>
  35. </bean>
  36. </property>
  37. </bean>

Java:

  1. IgniteConfiguration cfg = new IgniteConfiguration();
  2. // Explicitly configure TCP discovery SPI to provide list of initial nodes
  3. // from the second cluster.
  4. TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
  5. // Initial local port to listen to.
  6. discoverySpi.setLocalPort(49500);
  7. // Changing local port range. This is an optional action.
  8. discoverySpi.setLocalPortRange(20);
  9. TcpDiscoveryVmIpFinder ipFinder=new TcpDiscoveryVmIpFinder();
  10. // Addresses and port range of the nodes from the second cluster.
  11. // 127.0.0.1 can be replaced with actual IP addresses or host names.
  12. // The port range is optional.
  13. ipFinder.setAddresses(Arrays.asList("127.0.0.1:49500..49520"));
  14. // Overriding IP finder.
  15. discoverySpi.setIpFinder(ipFinder);
  16. // Explicitly configure TCP communication SPI by changing local port number for
  17. // the nodes from the second cluster.
  18. TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
  19. commSpi.setLocalPort(49100);
  20. // Overriding discovery SPI.
  21. cfg.setDiscoverySpi(discoverySpi);
  22. // Overriding communication SPI.
  23. cfg.setCommunicationSpi(commSpi);
  24. // Starting a node.
  25. Ignition.start(cfg);

从配置中可以看到,他们的差别是很小的 - 只是SPI的端口号和IP搜索器不同。

如果希望不同集群的节点之间可以互相检索到,可以使用组播协议然后将TcpDiscoveryVmIpFinder替换为TcpDiscoveryMulticastIpFinder并且在上面的每个配置中设置唯一的TcpDiscoveryMulticastIpFinder.multicastGroups

Ignite持久化的文件位置
如果隔离的集群使用了Ignite持久化,那么在文件系统中每个集群会将持久化文件保存在不同的路径中。通过DataStorageConfiguration中的setStoragePath(...)setWalPath(...)setWalArchivePath(...)方法可以针对每个单独的集群进行修改。

2.5.6.基于JDBC的发现

可以用数据库作为通用共享存储来保存初始的IP地址,通过这个搜索器这些节点会在启动时将IP地址写入数据库,这是通过TcpDiscoveryJdbcIpFinder实现的。
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="discoverySpi">
  4. <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
  5. <property name="ipFinder">
  6. <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder">
  7. <property name="dataSource" ref="ds"/>
  8. </bean>
  9. </property>
  10. </bean>
  11. </property>
  12. </bean>
  13. <!-- Configured data source instance. -->
  14. <bean id="ds" class="some.Datasource">
  15. ...
  16. </bean>

Java:

  1. TcpDiscoverySpi spi = new TcpDiscoverySpi();
  2. // Configure your DataSource.
  3. DataSource someDs = MySampleDataSource(...);
  4. TcpDiscoveryJdbcIpFinder ipFinder = new TcpDiscoveryJdbcIpFinder();
  5. ipFinder.setDataSource(someDs);
  6. spi.setIpFinder(ipFinder);
  7. IgniteConfiguration cfg = new IgniteConfiguration();
  8. // Override default discovery SPI.
  9. cfg.setDiscoverySpi(spi);
  10. // Start Ignite node.
  11. Ignition.start(cfg);

2.5.7.基于共享文件系统的发现

一个共享文件系统可以用于节点IP地址的存储,节点会在启动时将他们的IP地址写入文件系统,这样的行为是由TcpDiscoverySharedFsIpFinder支持的。
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="discoverySpi">
  4. <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
  5. <property name="ipFinder">
  6. <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder">
  7. <property name="path" value="/var/ignite/addresses"/>
  8. </bean>
  9. </property>
  10. </bean>
  11. </property>
  12. </bean>

Java:

  1. // Configuring discovery SPI.
  2. TcpDiscoverySpi spi = new TcpDiscoverySpi();
  3. // Configuring IP finder.
  4. TcpDiscoverySharedFsIpFinder ipFinder = new TcpDiscoverySharedFsIpFinder();
  5. ipFinder.setPath("/var/ignite/addresses");
  6. spi.setIpFinder(ipFinder);
  7. IgniteConfiguration cfg = new IgniteConfiguration();
  8. // Override default discovery SPI.
  9. cfg.setDiscoverySpi(spi);
  10. // Start Ignite node.
  11. Ignition.start(cfg);

2.5.8.基于ZooKeeper的发现

如果使用ZooKeeper 来整合分布式环境,也可以利用它进行Ignite节点的发现,这是通过TcpDiscoveryZookeeperIpFinder实现的(注意需要启用ignite-zookeeper模块)。
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="discoverySpi">
  4. <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
  5. <property name="ipFinder">
  6. <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.zk.TcpDiscoveryZookeeperIpFinder">
  7. <property name="zkConnectionString" value="127.0.0.1:2181"/>
  8. </bean>
  9. </property>
  10. </bean>
  11. </property>
  12. </bean>

Java:

  1. TcpDiscoverySpi spi = new TcpDiscoverySpi();
  2. TcpDiscoveryZookeeperIpFinder ipFinder = new TcpDiscoveryZookeeperIpFinder();
  3. // Specify ZooKeeper connection string.
  4. ipFinder.setZkConnectionString("127.0.0.1:2181");
  5. spi.setIpFinder(ipFinder);
  6. IgniteConfiguration cfg = new IgniteConfiguration();
  7. // Override default discovery SPI.
  8. cfg.setDiscoverySpi(spi);
  9. // Start Ignite node.
  10. Ignition.start(cfg);

2.5.9.故障检测超时

故障检测超时用于确定一个集群节点在与远程节点连接失败时可以等待多长时间。
集群中的每个节点都是与其他节点连接在一起的,在发现SPI这个层级,NodeA会向NodeB发送心跳消息(还有其它在集群内传输的系统消息),如果后者在failureDetectionTimeout指定的时间范围内没有反馈,那么NodeB会被从集群中踢出。
根据集群的硬件和网络条件,这个超时时间是调整发现SPI的故障检测功能的最简单的方式。

这些TcpDiscoverySpi的超时配置参数是自动控制的,比如套接字超时,消息确认超时以及其他的,如果显式地设置了这些参数中的任意一个,故障超时设置都会被忽略掉。

关于故障检测超时的配置,对于服务端节点是通过IgniteConfiguration.setFailureDetectionTimeout(long)方法配置的,对于客户端节点是通过IgniteConfiguration.setClientFailureDetectionTimeout(long)方法配置的。关于默认值,服务端节点为10秒,客户端节点为30秒,这个时间可以使发现SPI在大多数的私有和虚拟化环境下可靠地工作,但是对于一个稳定的低延迟网络来说,这个参数设置成大约200毫秒会更有助于快速地进行故障的检测和响应。

2.5.10.配置

下面的配置参数可以对TcpDiscoverySpi进行可选的配置,在TcpDiscoverySpi的javadoc中还可以看到完整的配置参数列表:

setter方法 描述 默认值
setIpFinder(TcpDiscoveryIpFinder) 用于节点IP地址信息共享的IP搜索器 TcpDiscoveryMulticastIpFinder,部分实现如下:TcpDiscoverySharedFsIpFinder,TcpDiscoveryS3IpFinder,TcpDiscoveryJdbcIpFinder,TcpDiscoveryVmIpFinder
setLocalAddress(String) 设置发现SPI使用的本地主机IP地址 如果未提供,默认会使用发现的第一个非loopback地址,如果没有可用的非loopback地址,那么会使用java.net.InetAddress.getLocalHost()
setLocalPort(int) SPI监听端口 47500
setLocalPortRange(int) 本地端口范围,本地节点会试图绑定从localPort开始的第一个可用的端口,直到localPort+localPortRange 100
setReconnectCount(int) 节点与其他节点试图(重新)建立连接的次数 2
setNetworkTimeout(long) 用于网络操作的最大超时时间 5000
setSocketTimeout(long) 设置Socket操作超时时间,这个超时时间用于限制连接时间以及写Socket时间 2000
setAckTimeout(long) 设置收到发送消息的确认的超时时间,如果在这个时间段内未收到确认,发送会被认为失败然后SPI会试图重新发送消息 2000
setJoinTimeout(long) 设置加入超时时间,如果使用了非共享的IP搜索器然后节点通过IP搜索器无法与任何地址建立连接,节点会在这个时间段内仍然试图加入集群。如果所有地址仍然无响应,会抛出异常然后节点启动失败,0意味着一直等待 0
setThreadPriority(int) SPI启动的线程的线程优先级 0
setStatisticsPrintFrequency(int) 统计输出的频率(毫秒),0意味着不需要输出。如果值大于0那么日志就会激活,然后每隔一段时间就会以INFO级别输出一个状态,这对于跟踪网络拓扑的问题非常有用。 0

2.6.零部署

2.6.1.摘要

计算所需的闭包和任务可能是任意自定义的类,也包括匿名类。Ignite中,远程节点会自动感知这些类,不需要显式地将任何jar文件部署或者移动到任何远程节点上。

2.6.2.对等类加载

这个行为是通过对等类加载(P2P类加载)实现的,他是Ignite中的一个特别的分布式类加载器,实现了节点间的字节码交换。当对等类加载启用时,不需要在网格内的每个节点上手工地部署Java或者Scala代码,也不需要每次在发生变化时重新部署。
下面的代码由于对等类加载会在所有的远程节点上运行,不需要任何的显式部署步骤:

  1. IgniteCluster cluster = ignite.cluster();
  2. // Compute instance over remote nodes.
  3. IgniteCompute compute = ignite.compute(cluster.forRemotes());
  4. // Print hello message on all remote nodes.
  5. compute.broadcast(() -> System.out.println("Hello node: " + cluster.localNode().id()));

下面是对等类加载如何配置:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <!-- Explicitly enable peer class loading. -->
  4. <property name="peerClassLoadingEnabled" value="true"/>
  5. ...
  6. </bean>

Java:

  1. IgniteConfiguration cfg = new IgniteConfiguration();
  2. cfg.setPeerClassLoadingEnabled(true);
  3. // Start Ignite node.
  4. Ignite ignite = Ignition.start(cfg);

对等类加载的工作步骤如下:

  1. Ignite会检查类是否在本地CLASSPATH中有效(是否在系统启动时加载),如果有效,就会被返回。这时不会发生从对等节点加载类的行为。
  2. 如果类本地不可用,会向初始节点发送一个提供类定义的请求,初始节点会发送类字节码定义然后类会在工作节点上加载。这个每个类只会发生一次-一旦一个节点上一个类定义被加载了,他就不会再次加载了。

第三方库
当使用对等类加载时,会发现可能从对等节点加载库,还可能本地类路径就已经存在库。建议在每个节点的类路径里包含所有的第三方库,这可以通过将jar文件复制到Ignite的libs文件夹实现,这样就可以避免每次只改变了一行代码然后还需要向远程节点上传输若干M的第三方库文件。

2.6.3.显式部署

要在Ignite中显式地部署jar文件,可以将他们拷贝进每个集群节点的libs文件夹,Ignite会在启动时自动加载所有的libs文件夹中的jar文件。

2.7.部署模式

2.7.1.摘要

对等类加载行为的特性是由不同的部署模式控制的。特别地,当发起节点离开网格时的卸载行为也会依赖于部署模式。另一方面,通过部署模式,是用户资源管理和类版本管理。在下面的章节中会更详细地描述每个部署模式。
PRIVATE和ISOLATED
在主节点,同一个类加载器部署的类,还会在worker节点远程共享同一个类加载器。然而,从不同主节点部署的任务不会在worker节点共享同一个类加载器,这对于开发很有用,这时不同的开发者可以工作于同一个类的不同版本上。
自从@UserResource注解删除后,PRIVATE和ISOLATED部署模式就没有不同了。这两个常量都因为后向兼容的原因保留了,并且这两个之一可能在未来的大版本中被删除。
这个模式中,当主节点离开集群时,类会卸载。
SHARED
这是默认的部署模式。这个模式中,来自不同主节点的、用户版本相同的类会在worker节点上共享同一个类加载器。当所有主节点离开网格或者用户版本发生变化时,类会卸载。这个模式可以使来自不同主节点的类在远程节点上共享用户资源的同一个实例(见下面)。这个模式对于产品环境特别有用,与ISOLATED模式相比,它在单一主节点上有一个单一类加载器的作用域,SHARED模式会向所有主节点扩展部署作用域。
这个模式中,当所有的主节点离开集群时,类会卸载。
CONTINUOUS
CONTINUOUS模式中,当主节点离开网格时类不会卸载。卸载只会发生于类的用户版本发生变化时。这个方式的优势是可以使来自不同主节点的任务在worker节点共享同一个用户资源的实例(参见资源注入),这使得在worker节点上执行的所有任务可以复用,比如,连接池或者缓存的同一个实例。当用这个模式时,可以启动多个独立的worker节点,在主节点定义用户资源并且在worker节点上初始化一次,不管他们来自那个主节点。与ISOLATED部署模式相比,它在单一主节点上有一个单一类加载器的作用域,CONTINUOUS模式会向所有主节点扩展部署作用域,这对于产品模式非常有用。
这个模式中,即使所有的主节点离开集群,类都不会卸载。

2.7.2.卸载和用户版本

通过对等类加载获得的类定义,有他们自己的生命周期。在特定的事件中(当主节点离开或者用户版本变化,依赖于部署模式),类信息会从集群中卸载,类定义会从网格中的所有节点和用户资源抹掉,与该类链接的,也会有选择地抹掉(还是依赖于部署模式)。对于内存数据网格,还意味着一个卸载的类的所有缓存条目都会从缓存删除。然而,如果使用了二进制编组器,后者并不适用,它允许以二进制的形式存储缓存数据来避免从一个主节点加载条目的必要性。
当部署于SHAREDCONTINUOUS模式时,如果想重新部署类,用户版本来了。Ignite默认会自动检测类加载器是否改变或者一个节点是否重新启动。然而,如果希望在节点的一个子集上改变或者重新部署代码,或者在CONTINUOUS模式中,杀掉所有的现存部署,那么需要修改用户版本。
用户版本是在类路径的META-INF/ignite.xml中指定的,像下面这样:

  1. <!-- User version. -->
  2. <bean id="userVersion" class="java.lang.String">
  3. <constructor-arg value="0"/>
  4. </bean>

所有的Ignite启动脚本(ignite.sh或者ignite.bat)默认都会从IGNITE_HOME/config/userversion文件夹获取用户版本。通常,在这个文件夹中更新用户版本就够了,然而,当使用GAR或者JAR部署时,需要记得提供一个META-INF/ignite.xml文件,里面有期望的用户版本。

2.7.3.配置

下面的对于对等类加载的配置参数可以在IgniteConfiguration中进行可选的配置:

setter方法 描述 默认值
setPeerClassLoadingEnabled(boolean) 启用/禁用对等类加载 false
setPeerClassLoadingExecutorService(ExecutorService) 配置对等类加载使用的线程池,如果未配置,会使用一个默认的。 null
setPeerClassLoadingExecutorServiceShutdown(boolean) 对等类加载ExecutorService关闭标志,如果该标志设置为true,对等类加载线程池当节点停止时会强制关闭。 true
setPeerClassLoadingLocalClassPathExclude(String...) 系统类路径的包列表,即使他们在本地存在,P2P也不会加载。 null
setPeerClassLoadingMissedResourcesCacheSize(int) 错过的资源缓存的大小,设为0会避免错过的资源缓存。 100
setDeploymentMode(DeploymentMode) 为部署的类和任务设置部署模式。 SHARED

XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. <!--
  3. Explicitly enable peer class loading. Set to false
  4. to disable the feature.
  5. -->
  6. <property name="peerClassLoadingEnabled" value="true"/>
  7. <!-- Set deployment mode. -->
  8. <property name="deploymentMode" value="CONTINUOUS"/>
  9. <!-- Disable missed resources caching. -->
  10. <property name="peerClassLoadingMissedResourcesCacheSize" value="0"/>
  11. <!--
  12. Exclude force peer class loading of a class,
  13. even if exists locally.
  14. -->
  15. <property name="peerClassLoadingLocalClassPathExclude">
  16. <list>
  17. <value>com.mycompany.MyChangingClass</value>
  18. </list>
  19. </property>
  20. </bean>

Java:

  1. IgniteConfiguration cfg=new IgniteConfiguration();
  2. // Explicitly enable peer class loading.
  3. cfg.setPeerClassLoadingEnabled(true);
  4. // Set deployment mode.
  5. cfg.setDeploymentMode(DeploymentMode.CONTINUOUS);
  6. // Disable missed resource caching.
  7. cfg.setPeerClassLoadingMissedResourcesCacheSize(0);
  8. // Exclude force peer class loading of a class,
  9. // even if it exists locally.
  10. cfg.setPeerClassLoadingLocalClassPathExclude("com.mcompany.MyChangingClass");
  11. // Start a node.
  12. Ignition.start(cfg);

2.8.网络配置

2.8.1.TcpCommunicationSpi

CommunicationSpi为发送和接收网格消息提供了基本的管道,他也被用于所有的分布式网格操作,比如执行任务,监控数据交换,分布式事件查询以及其他的。Ignite提供了TcpCommunicationSpi作为CommunicationSpi的默认实现,它使用TCP/IP协议来进行节点间的通信。
要启用节点间的通信,TcpCommunicationSpi增加了TcpCommuncationSpi.ATTR_ADDRSTcpCommuncationSpi.ATTR_PORT本地节点属性。启动时,这个SPI会监听由TcpCommuncationSpi.setLocalPort(int)方法设置的本地端口。如果端口被占用,SPI会自动增加端口号直到成功绑定监听。
TcpCommuncationSpi.setLocalPortRange(int)配置参数控制了SPI可以尝试的最大端口数量。

本地端口范围
当在一台机器上甚至是在同一个JVM上启动多个网格节点时,端口范围会非常方便,这样的话所有的节点都会启动而不用一个个地进行单独的配置。

2.8.2.配置

下面TcpCommunicationSpi中的配置参数都是可选的:

方法 描述 默认值
setLocalAddress(String) 设置套接字绑定的本地主机地址 任意有效的本地主机地址
setLocalPort(int) 设置套接字绑定的本地主机端口 47100
setLocalPortRange(int) 当之前尝试的所有端口都被占用时,控制尝试的本地端口的最大数量。 100
setTcpNoDelay(boolean) 设置套接字选项TCP_NODELAY的值,每个创建或者接收的套接字都会使用这个值,它应该设置为true(默认),以减少通过TCP协议进行通讯期间请求/响应的时间。大多数情况下不建议改变这个选项 true
setConnectTimeout(long) 设置当与远程节点建立连接时使用的连接超时时间。 1000
setIdleConnectionTimeout(long) 设置当与客户端的连接将要关闭时,最大空闲连接超时时间。 30000
setBufferSizeRatio(double) 设置这个SPI的缓冲区大小比率,当发送消息时,缓冲区大小会使用这个比率进行调整。 0.8,或者设置了IGNITE_COMMUNICATION_BUF_RESIZE_RATIO系统属性值
setMinimumBufferedMessageCount(int) 设置这个SPI的最小消息数量,它们在发送之前被缓冲。 512,或者设置了IGNITE_MIN_BUFFERED_COMMUNICATION_MSG_CNT系统属性
setDualSocketConnection(boolean) 设置节点间是否要强制双向套接字连接的标志,如果设置为true,通信的节点间会建立两个独立的连接,一个用于输出消息,一个用于输入,如果设置为false,只会建立一个TCP连接用于双向通信,这个标志对于某些操作系统非常有用,比如当TCP_NODELAY被禁用并且消息的传递花费太长时间时。 false
setConnectionBufferSize(int) 只有当setAsyncSend(boolean)设置为false时,这个参数才有用。设置同步连接时的缓冲区大小,当同步地发送和接收大量的小消息时,可以增加缓冲区大小。然而,大多数情况下这个值应该设置为0(默认)。 0
setSelectorsCount(int) 设置TCP服务器使用的选择器数量。 默认的选择器数量等于Math.min(4, Runtime.getRuntime() .availableProcessors())这个表达式的结果
setConnectionBufferFlushFrequency(long) 只有当setAsyncSend(boolean)设置为false时,这个参数才有用。设置连接缓冲区刷新频率(毫秒),这个参数只有当同步发送并且连接缓冲区大小非0时才有意义。一旦在指定的时间段内如果没有足够的消息让其自动刷新时,缓冲区会被刷新。 100
setDirectBuffer(boolean) 在使用NIO Direct以及NIO Heap分配缓冲区之间进行切换。虽然Direct Buffer执行的更好,但有时(尤其在Windows)可能会造成JVM崩溃,如果在自己的环境中发生了,需要将这个属性设置为false。 true
setDirectSendBuffer(boolean) 当使用异步模式进行消息发送时,在使用NIO Direct以及NIO Heap分配缓冲区之间进行切换。 false
setAsyncSend(boolean) 在同步或者异步消息发送之间进行切换。当节点间通过网络以多线程的方式发送大量的数据时,这个值应该设为true(默认),但是这个依赖于环境以及应用,因此建议对应用针对这两种模式进行基准测试。 true
setSharedMemoryPort(int) 当在同一台主机上启动了IpcSharedMemoryServerEndpoint节点时,通过IPC共享内存进行通信的端口(只针对Linux和MacOS主机),设置为-1可以禁用IPC共享内存通信。 48100
setSocketReceiveBuffer(int) 设置这个SPI创建或者接收的套接字的接收缓冲区大小,如果未指定,默认值为0,它会导致套接字创建之后缓冲区无法交换(即使用操作系统默认值)。 0
setSocketSendBuffer(int) 设置这个SPI创建或者接收的套接字的发送缓冲区大小,如果未指定,默认值为0,它会导致套接字创建之后缓冲区无法交换(即使用操作系统默认值)。 0

示例
下面的例子显示了如何调整TcpCommunicationSpi的参数:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="communicationSpi">
  4. <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
  5. <!-- Override local port. -->
  6. <property name="localPort" value="4321"/>
  7. </bean>
  8. </property>
  9. ...
  10. </bean>

Java:

  1. TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
  2. // Override local port.
  3. commSpi.setLocalPort(4321);
  4. IgniteConfiguration cfg = new IgniteConfiguration();
  5. // Override default communication SPI.
  6. cfg.setCommunicationSpi(commSpi);
  7. // Start grid.
  8. Ignition.start(cfg);
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注