[关闭]
@aloxc 2017-12-05T06:41:02.000000Z 字数 8043 阅读 561

ignite做流式计算的时候如何对自定义实体类结果进行排序?

ignite streaming


先看下一个官网的示例

  1. public class StreamTransformerExample {
  2. /** Random number generator. */
  3. private static final Random RAND = new Random();
  4. /** Range within which to generate numbers. */
  5. private static final int RANGE = 1000;
  6. public static void main(String[] args) throws Exception {
  7. // Mark this cluster member as client.
  8. Ignition.setClientMode(true);
  9. try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
  10. if (!ExamplesUtils.hasServerNodes(ignite))
  11. return;
  12. CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers");
  13. // Index key and value.
  14. cfg.setIndexedTypes(Integer.class, Long.class);
  15. // Auto-close cache at the end of the example.
  16. try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) {
  17. try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
  18. // Allow data updates.
  19. stmr.allowOverwrite(true);
  20. // Configure data transformation to count random numbers added to the stream.
  21. stmr.receiver(new StreamTransformer<Integer, Long>() {
  22. @Override public Object process(MutableEntry<Integer, Long> e, Object... args) {
  23. // Get current count.
  24. Long val = e.getValue();
  25. // Increment count by 1.
  26. e.setValue(val == null ? 1L : val + 1);
  27. return null;
  28. }
  29. });
  30. // Stream 10 million of random numbers into the streamer cache.
  31. for (int i = 1; i <= 10_000_000; i++) {
  32. stmr.addData(RAND.nextInt(RANGE), 1L);
  33. if (i % 500_000 == 0)
  34. System.out.println("Number of tuples streamed into Ignite: " + i);
  35. }
  36. }
  37. // Query top 10 most popular numbers every.
  38. SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10");
  39. // Execute queries.
  40. List<List<?>> top10 = stmCache.query(top10Qry).getAll();
  41. System.out.println("Top 10 most popular numbers:");
  42. // Print top 10 words.
  43. ExamplesUtils.printQueryResults(top10);
  44. }
  45. finally {
  46. // Distributed cache could be removed from cluster only by #destroyCache() call.
  47. ignite.destroyCache(cfg.getName());
  48. }
  49. }
  50. }
  51. }

这个示例是先写入若干随机的数字到流缓存中,然后通过sql查询出这些数字的次数,按次数从高到低排序。

那现在我想写入一个自定义的对象StreamPerson到流缓存中,我想通过sql查询这些对象按,对象中的age进行排序?如何做,经过本人试验,StreamPerson也实现了Comparable接口了,无法得到我想要的排序结果。

看下ignite中流相关的介绍

Ignite always stores cache keys and values as _key and _val fields, so we use this syntax in our SQL query.Ignite通常以_key和_val字段名来保存键和值。

我也使用StreamPerson的不正确排序结果(sql:*select * from StreamPerson order by _val desc limit 10*),select * 返回的每行只有两列,就是_key和_val。
我试着修改sql为

*select * from StreamPerson order by _val.age desc limit 10*

*select * from StreamPerson order by _val.getAge() desc limit 10*

都有异常
异常堆栈信息如下:

  1. Exception in thread "main" javax.cache.CacheException: Failed to parse query: select * from StreamPerson order by _val.age desc limit 10
  2. at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1137)
  3. at org.apache.ignite.internal.processors.query.GridQueryProcessor$2.applyx(GridQueryProcessor.java:732)
  4. at org.apache.ignite.internal.processors.query.GridQueryProcessor$2.applyx(GridQueryProcessor.java:730)
  5. at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
  6. at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:1666)
  7. at org.apache.ignite.internal.processors.query.GridQueryProcessor.queryTwoStep(GridQueryProcessor.java:730)
  8. at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:700)
  9. at com.github.aloxc.ignite.streaming.StreamTransformerExample1.main(StreamTransformerExample1.java:76)
  10. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  11. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  12. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  13. at java.lang.reflect.Method.invoke(Method.java:498)
  14. at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
  15. Caused by: org.h2.jdbc.JdbcSQLException: Column "_VAL.AGE" not found; SQL statement:
  16. select * from StreamPerson order by _val.age desc limit 10 [42122-191]
  17. at org.h2.message.DbException.getJdbcSQLException(DbException.java:345)
  18. at org.h2.message.DbException.get(DbException.java:179)
  19. at org.h2.message.DbException.get(DbException.java:155)
  20. at org.h2.expression.ExpressionColumn.optimize(ExpressionColumn.java:147)
  21. at org.h2.command.dml.Select.prepare(Select.java:852)
  22. at org.h2.command.Parser.prepareCommand(Parser.java:257)
  23. at org.h2.engine.Session.prepareLocal(Session.java:560)
  24. at org.h2.engine.Session.prepareCommand(Session.java:501)
  25. at org.h2.jdbc.JdbcConnection.prepareCommand(JdbcConnection.java:1188)
  26. at org.h2.jdbc.JdbcPreparedStatement.<init>(JdbcPreparedStatement.java:73)
  27. at org.h2.jdbc.JdbcConnection.prepareStatement(JdbcConnection.java:276)
  28. at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.prepareStatement(IgniteH2Indexing.java:406)
  29. at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1121)
  30. ... 12 more
  1. Exception in thread "main" javax.cache.CacheException: Failed to parse query: select * from StreamPerson order by _val.getAge() desc limit 10
  2. at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1137)
  3. at org.apache.ignite.internal.processors.query.GridQueryProcessor$2.applyx(GridQueryProcessor.java:732)
  4. at org.apache.ignite.internal.processors.query.GridQueryProcessor$2.applyx(GridQueryProcessor.java:730)
  5. at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
  6. at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:1666)
  7. at org.apache.ignite.internal.processors.query.GridQueryProcessor.queryTwoStep(GridQueryProcessor.java:730)
  8. at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:700)
  9. at com.github.aloxc.ignite.streaming.StreamTransformerExample1.main(StreamTransformerExample1.java:76)
  10. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  11. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  12. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  13. at java.lang.reflect.Method.invoke(Method.java:498)
  14. at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
  15. Caused by: org.h2.jdbc.JdbcSQLException: Function "GETAGE" not found; SQL statement:
  16. select * from StreamPerson order by _val.getAge() desc limit 10 [90022-191]
  17. at org.h2.message.DbException.getJdbcSQLException(DbException.java:345)
  18. at org.h2.message.DbException.get(DbException.java:179)
  19. at org.h2.message.DbException.get(DbException.java:155)
  20. at org.h2.command.Parser.readJavaFunction(Parser.java:2405)
  21. at org.h2.command.Parser.readFunction(Parser.java:2457)
  22. at org.h2.command.Parser.readTermObjectDot(Parser.java:2666)
  23. at org.h2.command.Parser.readTerm(Parser.java:2784)
  24. at org.h2.command.Parser.readFactor(Parser.java:2308)
  25. at org.h2.command.Parser.readSum(Parser.java:2295)
  26. at org.h2.command.Parser.readConcat(Parser.java:2265)
  27. at org.h2.command.Parser.readCondition(Parser.java:2115)
  28. at org.h2.command.Parser.readAnd(Parser.java:2087)
  29. at org.h2.command.Parser.readExpression(Parser.java:2079)
  30. at org.h2.command.Parser.parseEndOfQuery(Parser.java:1751)
  31. at org.h2.command.Parser.parseSelectUnionExtension(Parser.java:1731)
  32. at org.h2.command.Parser.parseSelectUnion(Parser.java:1700)
  33. at org.h2.command.Parser.parseSelect(Parser.java:1687)
  34. at org.h2.command.Parser.parsePrepared(Parser.java:443)
  35. at org.h2.command.Parser.parse(Parser.java:315)
  36. at org.h2.command.Parser.parse(Parser.java:287)
  37. at org.h2.command.Parser.prepareCommand(Parser.java:252)
  38. at org.h2.engine.Session.prepareLocal(Session.java:560)
  39. at org.h2.engine.Session.prepareCommand(Session.java:501)
  40. at org.h2.jdbc.JdbcConnection.prepareCommand(JdbcConnection.java:1188)
  41. at org.h2.jdbc.JdbcPreparedStatement.<init>(JdbcPreparedStatement.java:73)
  42. at org.h2.jdbc.JdbcConnection.prepareStatement(JdbcConnection.java:276)
  43. at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.prepareStatement(IgniteH2Indexing.java:406)
  44. at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1121)
  45. ... 12 more

这个问题解决了,想实现这个功能需要这样做

  • 实体类的相关字段增加QuerySqlField注解或者在配置文件中写明,或者在配置代码中写明
  • 调整sql语句,*select * from StreamPerson order by age desc limit 10* ,这个sql是按age排序,那在StreamPerson这个实体类的age字段上必须加上QuerySqlField注解或者在配置文件中写明,或者在配置代码中写明
  • 然后就是读取数据了。

>* 特别注意:当实体的n个字段配置了QuerySqlField后,使用select * from StreamPerson返回的记录集中会有n+2个字段,另外两个就是_key和_val,这两个是ignite保留返回的!

这些细节没在文档中体现出来,我猜是留给要用这些细节来做收费服务的!

x

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注