@AlexZFX
2018-11-20T09:45:54.000000Z
字数 16022
阅读 248
数据库 NIO JAVA
之前六月份的时候有参加过阿里举办的第四届中间件性能大赛,学到了不少东西,所以之后经常会关注一下天池那边阿里举办的程序设计大赛,九月底的时候注意到了这一届的POLARDB数据库性能大赛,很早就报了名。预热赛10月25日开始了~,初赛11月5日正式开始,11月19日结束,我这篇文章发布的时候初赛就已经结束了。因为11月之前一直在找实习,所以一直没有做什么准备,11月10号左右开始编写第一版的代码,到11月18号晚上放弃继续尝试。最后初赛成绩是42名,时间是240.69秒,和大佬们比起来还是有很大差距的。写这篇博客,主要还是想分享一下这段时间参赛的思路,和一点一点慢慢提升的经历。
GitHub: https://github.com/AlexZFX/engine 当前只️更新了初赛代码,复赛结束后会继续更新。
PolarDB作为软硬件结合的代表, 充分使用新硬件, 榨干硬件的红利来为用户获取极致的数据性能, 其中在PolarDB 的设计中, 我们使用 Optane SSD作为所有热数据的写入缓冲区, 通过kernel bypass的方式, 实现了极致的性能。所以本次比赛就以Optane SSD盘为背景,参赛者在其基础之上探索实现一种高效的kv存储引擎
以上是阿里云官方给的比赛背景,具体的题目内容如下
初赛赛题(完整请点击查看): 实现一个简化、高效的kv存储引擎,支持Write、Read接口。
程序评测逻辑 评测程序分为2个阶段:
1)Recover正确性评测 此阶段评测程序会并发写入特定数据(key 8B、value
4KB)同时进行任意次kill
-9来模拟进程意外退出(参赛引擎需要保证进程意外退出时数据持久化不丢失),接着重新打开DB,调用Read接口来进行正确性校验2)性能评测
- 随机写入:64个线程并发随机写入,每个线程使用Write各写100万次随机数据(key 8B、value 4KB)
- 随机读取:64个线程并发随机读取,每个线程各使用Read读取100万次随机数据 注:2.2阶段会对所有读取的kv校验是否匹配,如不通过则终止,评测失败
总体说来我们能得到的要求和信息为以下几点:
完整的参赛过程大概是一周时间,这一周进行了非常多的尝试,成绩也从第一次跑通时的900多s到最后稳定在240s,接下来会细细的说一说每一版的思路和进阶过程。(下面的标题写的key value分别表示采用的文件数)
先做一些简单的计算,
key + offset = ( 8 + 8 ) * 64000000 / 1024 / 1024 = 977M
value = 4096 * 64000000 / 1024 / 1024 / 1024 = 245G
可见磁盘和内存的限制相对来说不会造成很大的影响,对合理的设计来说还是充足的。
因为key是一个8B的byte数组,故转化成一个long型的数字很简单并且非常有利于接下来计算的事情。所以下文讨论的key都是建立在long型的基础上的。
初始主体的思路是这样的
首先想的是要跑出成绩,把所有的key都写在了一个文件里,一开始忽略了一个小点,把key和offset分开写入了文件,导致出现了一些key和value不匹配的问题。很显然的问题是写key和写offset会出现线程问题,可能导致本来应该是KeyValueKeyValue形式的数据,被写成KeyKeyValueValue的形式,所以出错之后直接加了个synchronized关键字,得出第一次的成绩968s,很快修改了这个简单的小问题,得到一个明显有大幅提升的成绩381.79s,这时的代码主要是这样的。
@Overridepublic void open(String path) throws EngineException {File file = new File(path);// 创建目录if (!file.exists()) {if (!file.mkdir()) {throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);} else {logger.info("创建文件目录成功:" + path);}}//创建 FILE_COUNT个FileChannel 顺序写入RandomAccessFile randomAccessFile;if (file.isDirectory()) {for (int i = 0; i < FILE_COUNT; i++) {try {randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");FileChannel channel = randomAccessFile.getChannel();fileChannels[i] = channel;// 从 length处直接写入offsets[i] = new AtomicLong(randomAccessFile.length());} catch (IOException e) {e.printStackTrace();}}} else {throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");}File keyFile = new File(path + File.separator + "key");if (!keyFile.exists()) {try {keyFile.createNewFile();} catch (IOException e) {e.printStackTrace();}}// 从 index 文件建立 hashmaptry {randomAccessFile = new RandomAccessFile(keyFile, "rw");keyFileChannel = randomAccessFile.getChannel();ByteBuffer keyBuffer = ByteBuffer.allocate(KEY_LEN);ByteBuffer offBuffer = ByteBuffer.allocate(KEY_LEN);keyFileOffset = new AtomicLong(randomAccessFile.length());long temp = 0, maxOff = keyFileOffset.get();while (temp < maxOff) {keyBuffer.position(0);keyFileChannel.read(keyBuffer, temp);temp += KEY_LEN;offBuffer.position(0);keyFileChannel.read(offBuffer, temp);temp += KEY_LEN;keyBuffer.position(0);offBuffer.position(0);keyMap.put(keyBuffer.getLong(), offBuffer.getLong());}} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(byte[] key, byte[] value) throws EngineException {long numkey = Util.bytes2long(key);int hash = hash(numkey);long off = offsets[hash].getAndAdd(VALUE_LEN);keyMap.put(numkey, off + 1);try {//key和offset写入文件localKey.get().putLong(0, numkey).putLong(8, off + 1);localKey.get().position(0);keyFileChannel.write(localKey.get(), keyFileOffset.getAndAdd(KEY_AND_OFF_LEN));//将value写入bufferlocalBufferValue.get().position(0);localBufferValue.get().put(value, 0, VALUE_LEN);//buffer写入文件localBufferValue.get().position(0);fileChannels[hash].write(localBufferValue.get(), off);} catch (IOException e) {throw new EngineException(RetCodeEnum.IO_ERROR, "写入数据出错");}}@Overridepublic byte[] read(byte[] key) throws EngineException {long numkey = Util.bytes2long(key);int hash = hash(numkey);// key 不存在会返回0,避免跟位置0混淆,off写加一,读减一long off = keyMap.get(numkey);if (off == 0) {throw new EngineException(RetCodeEnum.NOT_FOUND, numkey + "不存在");}try {localBufferValue.get().position(0);fileChannels[hash].read(localBufferValue.get(), off - 1);} catch (IOException e) {throw new EngineException(RetCodeEnum.IO_ERROR, "读取数据出错");}localBufferValue.get().position(0);localBufferValue.get().get(localByteValue.get(), 0, VALUE_LEN);return localByteValue.get();}
这个时候open的时间将近90s,很显然是一个超出可承受范围的结果。所以接下来很快对这一部分进行了优化。
open时间过长,所以这成了我们关注的一个重点,这段时间我们做了很多改动,改动的过程主要是这样的。
这版的主要改动在open地方,下面贴出了这版的open方法。
@Overridepublic void open(String path) throws EngineException {File file = new File(path);// 创建目录if (!file.exists()) {if (!file.mkdir()) {throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);} else {logger.info("创建文件目录成功:" + path);}}RandomAccessFile randomAccessFile;// file是一个目录时进行接下来的操作if (file.isDirectory()) {try {//先构建keyFileChannel 和 初始化 mapfor (int i = 0; i < THREAD_NUM; i++) {randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");FileChannel channel = randomAccessFile.getChannel();keyFileChannels[i] = channel;keyOffsets[i] = new AtomicInteger((int) randomAccessFile.length());}ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);for (int i = 0; i < THREAD_NUM; i++) {if (!(keyOffsets[i].get() == 0)) {final long off = keyOffsets[i].get();final int finalI = i;executor.execute(() -> {int start = 0;long key;int keyHash;while (start < off) {try {localKey.get().position(0);keyFileChannels[finalI].read(localKey.get(), start);start += KEY_AND_OFF_LEN;localKey.get().position(0);key = localKey.get().getLong();keyHash = keyFileHash(key);keyMap[keyHash].put(key, localKey.get().getInt());} catch (IOException e) {e.printStackTrace();}}countDownLatch.countDown();});} else {countDownLatch.countDown();}}countDownLatch.await();executor.shutdownNow();} catch (IOException | InterruptedException e) {e.printStackTrace();}//创建 FILE_COUNT个FileChannel 供write顺序写入for (int i = 0; i < FILE_COUNT; i++) {try {randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");FileChannel channel = randomAccessFile.getChannel();fileChannels[i] = channel;// 从 length处直接写入valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));} catch (IOException e) {e.printStackTrace();}}} else {throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");}}
这一部分里我们还做了一些小事
之前一直考虑着用mmap,在java里面对应的就是MappedByteBuffer,因为我不确定mmap能不能在kill -9 被杀进程的情况保证数据的完整性,同时,如果都用mmap写入的话,会让我无法确定文件的大小(mmap映射时要预先指定文件大小),无法在kill之后能从指定的位置追加写入。所以打算一步一步,最后再考虑使用这个。
但是open的时候使用mmap读一定是没有风险的,所以又进行了一次对open的改动,这时还是64个key文件和128个value文件,得到的跑分是248.58,open过程被压缩在了1s以内,大约600ms左右,这个open速度我们就基本已经满足了。
后来改成了64个value文件,每次只进行一次hash就可以确定key和value文件的位置,并且读写速度似乎都有略微进步,达到了245.18s。
这时的open代码如下
@Overridepublic void open(String path) throws EngineException {File file = new File(path);// 创建目录if (!file.exists()) {if (!file.mkdir()) {throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);} else {logger.info("创建文件目录成功:" + path);}}RandomAccessFile randomAccessFile;// file是一个目录时进行接下来的操作if (file.isDirectory()) {try {//先构建keyFileChannel 和 初始化 mapfor (int i = 0; i < THREAD_NUM; i++) {randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");FileChannel channel = randomAccessFile.getChannel();keyFileChannels[i] = channel;keyOffsets[i] = new AtomicInteger((int) randomAccessFile.length());}ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);for (int i = 0; i < THREAD_NUM; i++) {if (!(keyOffsets[i].get() == 0)) {final long off = keyOffsets[i].get();final int finalI = i;executor.execute(() -> {int start = 0;try {MappedByteBuffer mappedByteBuffer = keyFileChannels[finalI].map(FileChannel.MapMode.READ_ONLY, 0, off);while (start < off) {start += KEY_AND_OFF_LEN;keyMap[finalI].put(mappedByteBuffer.getLong(), mappedByteBuffer.getInt());}unmap(mappedByteBuffer);countDownLatch.countDown();} catch (IOException e) {e.printStackTrace();}});} else {countDownLatch.countDown();}}countDownLatch.await();executor.shutdownNow();} catch (IOException | InterruptedException e) {e.printStackTrace();}//创建 FILE_COUNT个FileChannel 供write顺序写入for (int i = 0; i < FILE_COUNT; i++) {try {randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");FileChannel channel = randomAccessFile.getChannel();fileChannels[i] = channel;// 从 length处直接写入valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));} catch (IOException e) {e.printStackTrace();}}} else {throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");}}
这一版当中我们也发现了一些问题,阅读了许多文章,总结主要如下:
有第三版最后发现的内容,我们打算再对key的写入做一些改动,也就是将fileChannel写入key的方式改动为mmap写入。而mmap映射的文件大小选择一个稍大的值,open之后的写入offset通过value文件的大小来确定(valuelen / 4096 * 12),这一优化带来的大约2~3s的提升。
除此之外,还进行了简单的jvm调优工作,将新生代和老年代的比例进行了调整,将原来1:1的比例调整为了6:1,这部分优化带来了大约2s的性能提升。
最后完整的代码这一块我就直接贴在下面,对整个过程有兴趣的也可以去我的github上clone下来查看。
package com.alibabacloud.polar_race.engine.common;import com.alibabacloud.polar_race.engine.common.exceptions.EngineException;import com.alibabacloud.polar_race.engine.common.exceptions.RetCodeEnum;import com.carrotsearch.hppc.LongIntHashMap;import io.netty.util.concurrent.FastThreadLocal;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.File;import java.io.IOException;import java.io.RandomAccessFile;import java.nio.ByteBuffer;import java.nio.MappedByteBuffer;import java.nio.channels.FileChannel;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicInteger;public class EngineRace extends AbstractEngine {private static Logger logger = LoggerFactory.getLogger(EngineRace.class);// key+offset 长度 16Bprivate static final int KEY_AND_OFF_LEN = 12;// 线程数量private static final int THREAD_NUM = 64;// value 长度 4Kprivate static final int VALUE_LEN = 4096;//每个map存储的key数量private static final int PER_MAP_COUNT = 1024000;private static final int SHIFT_NUM = 12;// 存放 value 的文件数量 128private static final int FILE_COUNT = 64;private static final int HASH_VALUE = 0x3F;private static final LongIntHashMap[] keyMap = new LongIntHashMap[THREAD_NUM];static {for (int i = 0; i < THREAD_NUM; i++) {keyMap[i] = new LongIntHashMap(PER_MAP_COUNT, 0.98);}}//key 文件的fileChannelprivate static FileChannel[] keyFileChannels = new FileChannel[THREAD_NUM];private static AtomicInteger[] keyOffsets = new AtomicInteger[THREAD_NUM];private static MappedByteBuffer[] keyMappedByteBuffers = new MappedByteBuffer[THREAD_NUM];//value 文件的fileChannelprivate static FileChannel[] fileChannels = new FileChannel[FILE_COUNT];private static AtomicInteger[] valueOffsets = new AtomicInteger[FILE_COUNT];private static FastThreadLocal<ByteBuffer> localBufferValue = new FastThreadLocal<ByteBuffer>() {@Overrideprotected ByteBuffer initialValue() throws Exception {return ByteBuffer.allocate(VALUE_LEN);}};@Overridepublic void open(String path) throws EngineException {File file = new File(path);// 创建目录if (!file.exists()) {if (!file.mkdir()) {throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);} else {logger.info("创建文件目录成功:" + path);}}RandomAccessFile randomAccessFile;// file是一个目录时进行接下来的操作if (file.isDirectory()) {try {//先 创建 FILE_COUNT个FileChannel 供write顺序写入,并由此文件获取value文件的大小for (int i = 0; i < FILE_COUNT; i++) {try {randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");FileChannel channel = randomAccessFile.getChannel();fileChannels[i] = channel;// 从 length处直接写入valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));keyOffsets[i] = new AtomicInteger(valueOffsets[i].get() * KEY_AND_OFF_LEN);} catch (IOException e) {e.printStackTrace();}}//先构建keyFileChannel 和 初始化 mapfor (int i = 0; i < THREAD_NUM; i++) {randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");FileChannel channel = randomAccessFile.getChannel();keyFileChannels[i] = channel;keyMappedByteBuffers[i] = channel.map(FileChannel.MapMode.READ_WRITE, 0, PER_MAP_COUNT * 20);}CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);for (int i = 0; i < THREAD_NUM; i++) {if (!(keyOffsets[i].get() == 0)) {final long off = keyOffsets[i].get();final int finalI = i;final MappedByteBuffer buffer = keyMappedByteBuffers[i];new Thread(() -> {int start = 0;while (start < off) {start += KEY_AND_OFF_LEN;keyMap[finalI].put(buffer.getLong(), buffer.getInt());}countDownLatch.countDown();}).start();} else {countDownLatch.countDown();}}countDownLatch.await();} catch (IOException | InterruptedException e) {e.printStackTrace();}} else {throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");}}@Overridepublic void write(byte[] key, byte[] value) throws EngineException {long numkey = Util.bytes2long(key);int hash = valueFileHash(numkey);int off = valueOffsets[hash].getAndIncrement();try {ByteBuffer keyBuffer = keyMappedByteBuffers[hash].slice();keyBuffer.position(keyOffsets[hash].getAndAdd(KEY_AND_OFF_LEN));keyBuffer.putLong(numkey).putInt(off);//将value写入bufferByteBuffer valueBuffer = localBufferValue.get();valueBuffer.clear();valueBuffer.put(value);valueBuffer.flip();fileChannels[hash].write(valueBuffer, ((long) off) << SHIFT_NUM);} catch (IOException e) {throw new EngineException(RetCodeEnum.IO_ERROR, "写入数据出错");}}@Overridepublic byte[] read(byte[] key) throws EngineException {long numkey = Util.bytes2long(key);int hash = valueFileHash(numkey);long off = keyMap[hash].getOrDefault(numkey, -1);ByteBuffer buffer = localBufferValue.get();if (off == -1) {throw new EngineException(RetCodeEnum.NOT_FOUND, numkey + "不存在");}try {buffer.clear();fileChannels[hash].read(buffer, off << SHIFT_NUM);} catch (IOException e) {throw new EngineException(RetCodeEnum.IO_ERROR, "读取数据出错");}return buffer.array();}@Overridepublic void range(byte[] lower, byte[] upper, AbstractVisitor visitor) throws EngineException {}@Overridepublic void close() {for (int i = 0; i < FILE_COUNT; i++) {try {keyFileChannels[i].close();fileChannels[i].close();} catch (IOException e) {logger.error("close error");}}}private static int valueFileHash(long key) {return (int) (key & HASH_VALUE);}}
这一版写的代码和之前有点不同如下:
初赛刚写的时候有一点中间件性能大赛复赛类似的地方,不过相比来说还是多学会了许多知识。我其实也尝试了利用unsafe来实现内存拷贝的一部分,但是似乎并没有起到一个好的效果,感觉主要还是我的使用姿势有些不正确,我把这一部分的有关代码放在了github的unsafe分支中,有兴趣也可以简单查看一下。
正在进行的是复赛,相比初赛来说增加了一个全量顺序遍历的需求,难度更大,也更有意思了,感觉复赛更考的是一部分设计方面的东西了,接下来还是会使用Java继续参加,如果有所收获的话,还会再写一篇博客进行相应的总结。