@aloxc
2017-12-05T06:41:02.000000Z
字数 8043
阅读 561
ignite
streaming
先看下一个官网的示例
/**
 * Streams random integers into an Ignite cache while counting how many times each
 * number was streamed, then prints the ten numbers with the highest counts.
 */
public class StreamTransformerExample {
    /** Random number generator. */
    private static final Random RAND = new Random();

    /** Range within which to generate numbers. */
    private static final int RANGE = 1000;

    /**
     * Starts a client node, streams the random numbers and prints the query result.
     *
     * @param args Command line arguments (not used).
     * @throws Exception If the example fails.
     */
    public static void main(String[] args) throws Exception {
        // This cluster member joins as a client.
        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
            if (!ExamplesUtils.hasServerNodes(ignite)) {
                return;
            }

            CacheConfiguration<Integer, Long> cacheCfg = new CacheConfiguration<>("randomNumbers");

            // Index both the key and the value type.
            cacheCfg.setIndexedTypes(Integer.class, Long.class);

            // The cache proxy is closed automatically when this block ends.
            try (IgniteCache<Integer, Long> cache = ignite.getOrCreateCache(cacheCfg)) {
                try (IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer(cache.getName())) {
                    // Allow data updates.
                    streamer.allowOverwrite(true);

                    // Count occurrences instead of storing the streamed value.
                    streamer.receiver(countingReceiver());

                    streamRandomNumbers(streamer);
                }

                printTopTen(cache);
            }
            finally {
                // Distributed cache could be removed from cluster only by #destroyCache() call.
                ignite.destroyCache(cacheCfg.getName());
            }
        }
    }

    /**
     * Creates the transformer that turns every streamed entry into a counter increment.
     *
     * @return Receiver that sets a missing value to 1 and otherwise adds 1 to it.
     */
    private static StreamTransformer<Integer, Long> countingReceiver() {
        return new StreamTransformer<Integer, Long>() {
            @Override public Object process(MutableEntry<Integer, Long> e, Object... args) {
                Long current = e.getValue();

                if (current == null) {
                    // First occurrence of this number.
                    e.setValue(1L);
                }
                else {
                    e.setValue(current + 1);
                }

                return null;
            }
        };
    }

    /**
     * Streams 10 million random numbers from {@code [0, RANGE)} and reports progress
     * after every 500 thousand entries.
     *
     * @param streamer Streamer that receives the numbers.
     */
    private static void streamRandomNumbers(IgniteDataStreamer<Integer, Long> streamer) {
        final int total = 10_000_000;
        final int reportEvery = 500_000;

        for (int streamed = 1; streamed <= total; streamed++) {
            streamer.addData(RAND.nextInt(RANGE), 1L);

            if (streamed % reportEvery == 0) {
                System.out.println("Number of tuples streamed into Ignite: " + streamed);
            }
        }
    }

    /**
     * Queries the ten entries with the largest values and prints them.
     *
     * @param cache Cache holding the per-number counters.
     */
    private static void printTopTen(IgniteCache<Integer, Long> cache) {
        // Select the 10 entries with the highest counts.
        SqlFieldsQuery qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10");

        List<List<?>> rows = cache.query(qry).getAll();

        System.out.println("Top 10 most popular numbers:");

        // Print the top 10 numbers.
        ExamplesUtils.printQueryResults(rows);
    }
}
这个示例是先写入若干随机的数字到流缓存中,然后通过sql查询出这些数字的次数,按次数从高到低排序。
那现在我想写入一个自定义的对象StreamPerson到流缓存中,并通过sql查询这些对象,按对象中的age字段进行排序,该如何做?经过本人试验,即使StreamPerson实现了Comparable接口,也无法得到我想要的排序结果。
看下ignite中流相关的介绍
Ignite always stores cache keys and values as _key and _val fields, so we use this syntax in our SQL query. 即:Ignite总是以_key和_val这两个字段名来保存键和值,所以在sql中要使用这种写法。
我对StreamPerson也使用了同样的写法(sql:*select * from StreamPerson order by _val desc limit 10*),但得到的排序结果不正确,而且select * 返回的每行只有两列,就是_key和_val。
我试着修改sql为
*select * from StreamPerson order by _val.age desc limit 10*
*select * from StreamPerson order by _val.getAge() desc limit 10*
都有异常
异常堆栈信息如下:
Exception in thread "main" javax.cache.CacheException: Failed to parse query: select * from StreamPerson order by _val.age desc limit 10
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1137)
at org.apache.ignite.internal.processors.query.GridQueryProcessor$2.applyx(GridQueryProcessor.java:732)
at org.apache.ignite.internal.processors.query.GridQueryProcessor$2.applyx(GridQueryProcessor.java:730)
at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:1666)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.queryTwoStep(GridQueryProcessor.java:730)
at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:700)
at com.github.aloxc.ignite.streaming.StreamTransformerExample1.main(StreamTransformerExample1.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.h2.jdbc.JdbcSQLException: Column "_VAL.AGE" not found; SQL statement:
select * from StreamPerson order by _val.age desc limit 10 [42122-191]
at org.h2.message.DbException.getJdbcSQLException(DbException.java:345)
at org.h2.message.DbException.get(DbException.java:179)
at org.h2.message.DbException.get(DbException.java:155)
at org.h2.expression.ExpressionColumn.optimize(ExpressionColumn.java:147)
at org.h2.command.dml.Select.prepare(Select.java:852)
at org.h2.command.Parser.prepareCommand(Parser.java:257)
at org.h2.engine.Session.prepareLocal(Session.java:560)
at org.h2.engine.Session.prepareCommand(Session.java:501)
at org.h2.jdbc.JdbcConnection.prepareCommand(JdbcConnection.java:1188)
at org.h2.jdbc.JdbcPreparedStatement.<init>(JdbcPreparedStatement.java:73)
at org.h2.jdbc.JdbcConnection.prepareStatement(JdbcConnection.java:276)
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.prepareStatement(IgniteH2Indexing.java:406)
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1121)
... 12 more
Exception in thread "main" javax.cache.CacheException: Failed to parse query: select * from StreamPerson order by _val.getAge() desc limit 10
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1137)
at org.apache.ignite.internal.processors.query.GridQueryProcessor$2.applyx(GridQueryProcessor.java:732)
at org.apache.ignite.internal.processors.query.GridQueryProcessor$2.applyx(GridQueryProcessor.java:730)
at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:1666)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.queryTwoStep(GridQueryProcessor.java:730)
at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:700)
at com.github.aloxc.ignite.streaming.StreamTransformerExample1.main(StreamTransformerExample1.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.h2.jdbc.JdbcSQLException: Function "GETAGE" not found; SQL statement:
select * from StreamPerson order by _val.getAge() desc limit 10 [90022-191]
at org.h2.message.DbException.getJdbcSQLException(DbException.java:345)
at org.h2.message.DbException.get(DbException.java:179)
at org.h2.message.DbException.get(DbException.java:155)
at org.h2.command.Parser.readJavaFunction(Parser.java:2405)
at org.h2.command.Parser.readFunction(Parser.java:2457)
at org.h2.command.Parser.readTermObjectDot(Parser.java:2666)
at org.h2.command.Parser.readTerm(Parser.java:2784)
at org.h2.command.Parser.readFactor(Parser.java:2308)
at org.h2.command.Parser.readSum(Parser.java:2295)
at org.h2.command.Parser.readConcat(Parser.java:2265)
at org.h2.command.Parser.readCondition(Parser.java:2115)
at org.h2.command.Parser.readAnd(Parser.java:2087)
at org.h2.command.Parser.readExpression(Parser.java:2079)
at org.h2.command.Parser.parseEndOfQuery(Parser.java:1751)
at org.h2.command.Parser.parseSelectUnionExtension(Parser.java:1731)
at org.h2.command.Parser.parseSelectUnion(Parser.java:1700)
at org.h2.command.Parser.parseSelect(Parser.java:1687)
at org.h2.command.Parser.parsePrepared(Parser.java:443)
at org.h2.command.Parser.parse(Parser.java:315)
at org.h2.command.Parser.parse(Parser.java:287)
at org.h2.command.Parser.prepareCommand(Parser.java:252)
at org.h2.engine.Session.prepareLocal(Session.java:560)
at org.h2.engine.Session.prepareCommand(Session.java:501)
at org.h2.jdbc.JdbcConnection.prepareCommand(JdbcConnection.java:1188)
at org.h2.jdbc.JdbcPreparedStatement.<init>(JdbcPreparedStatement.java:73)
at org.h2.jdbc.JdbcConnection.prepareStatement(JdbcConnection.java:276)
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.prepareStatement(IgniteH2Indexing.java:406)
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1121)
... 12 more
这个问题解决了,想实现这个功能需要这样做
- 实体类的相关字段增加QuerySqlField注解或者在配置文件中写明,或者在配置代码中写明
- 调整sql语句为 *select * from StreamPerson order by age desc limit 10* 。这个sql是按age排序的,因此StreamPerson这个实体类的age字段必须加上QuerySqlField注解,或者在配置文件中写明,或者在配置代码中写明。
- 然后就是读取数据了。
这些细节没在文档中体现出来,我猜是留给要用这些细节来做收费服务的!