Cassandra源码阅读(2)-写流程解析

Cassandra源码阅读(2):写流程解析

本文以简单的写操作为例来分析,这里介绍的不包括CQL解析,Trigger,counter,view的更新,对于tokenRing的具体选择方法这里也暂时不涉及,只涉及最基础的写流程。上面说到的这些后续会有分析文章。

文章更新日志:

2019-03-03 介绍到CommitLog

2019-03-18 介绍Memtable结构以及写入路径

整体执行流程

  1. 客户端发起写请求到服务端解析成一个QueryMessage,然后通过对应的QueryHandler.process处理,里面会包括CQL的解析,已经具体变更的执行,也就是ProcessStatement

  2. Processstatment会先根据consistent level来判断是否需要把写发送到其他副本上,这里以non-local为例子分析。

  3. 判断是不是LWT(light weight transaction,通过hasCondition判断),如果是会调用StorageProxy.cas起一个Paxos同步流程,这里以非LWT为例,LWT后面再说。

  4. 判断是否是virtualTable或者是Counter,这两个的更新处理逻辑不一样,这里以标准的更新为例,暂时不管这两种特殊处理。

  5. 调用storageProxy.mutateWithTrigger,接着进入storageproxy.mutate,在mutate里面会调用storageProxy.performWite 这个方法里面会通过ReplicaPlans.forWrite生成对应的副本写入的replicaPlan。
    接着修改被代理到StandardWritePerformor的Apply中。在Apply里面会根据ReplicaPlan把修改发送到对应的节点上去。
    发送的具体内容是一个MessageOut对象,里面包含了一个Verb(操作类型)和对应的Mutation对象,mutation对象里面包含了对于表所有需要变更的内容。

  6. 副本机器接收到需要修改的对象之后,根据Verb来选择具体的操作,这里因为是写,所以以Verb=mutation为例。

  7. 写最后会被代理到Keyspace.applyInternal。下面详细说明Keyspace.applyInternal的流程

  8. 和其他的数据库一样首先会写CommitLog,具体代码是getWriteHandler().beginWrite(mutation, makeDurable)里面调用CommitLog.instance.add(Mutation),先把mutation序列化到一个内存ByteBuffer,然后把这个ByteBuffer写到一个基于文件的buffer(在代码里是 CommitLogSegment.Allocation 里面的buffer)里,这个基于文件的buffer根据配置可以是加密/压缩/无特殊配置,对于无特殊配置,cassandra是用的MappedByteBuffer,这个buffer默认是每隔10s调用一次同步(buffer.force())

  9. 接着根据mutation里面的更新内容(mutation.getPartitionUpdates),找到对应的表(ColumnFamilyStore),调用ColumnFamilyStore的apply方法。apply方法里面会去找到当前的Memtable,然后调用memtable的put方法,把对应的修改写入到memtable里面。

  10. 根据partitionkey失效缓存(CacheService.instance.rowCache.remove(key)),至此memtable写入流程完毕。

Memtable与PartitionUpdate结构详解

Memtable

Cassandra的Memtable(org.apache.cassandra.db.Memtable)结构中由一个SkipList来索引整个Memtable中的内容。

1
2
3
4
5

// We index the memtable by PartitionPosition only for the purpose of being able
// to select key range using Token.KeyBound. However put() ensures that we
// actually only store DecoratedKey.
private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();

PartitionPosition 的实现了RingPosition接口。RingPosition接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Interface representing a position on the ring.
* Both Token and DecoratedKey represent a position in the ring, a token being
* less precise than a DecoratedKey (a token is really a range of keys).
*/
public interface RingPosition<C extends RingPosition<C>> extends Comparable<C>
{
public Token getToken();
public IPartitioner getPartitioner();
public boolean isMinimum();
public C minValue();
}

对于Murmur3Partitionerl来说,Token就是partitionKey被hash之后的包装类(LongToken)。

也就是说Memtable里面维护了一个有序Map,key的顺序是token的顺序(也就是hash之后的long的顺序),value则是一个”Wide Row”.

具体来说就是AtomicBTreePartition里面包含了一个BTree,Btree的每一个叶子节点都是一行(org.apache.cassandra.db.rows.Row),而Row只有一个唯一的实现类是BTreeRow,BTreeRow内部包含了由Cell组成的BTree结构。Cell则包含了列名以及Value。

PartitionUpdate

PartitionUpdate是AbstractBTreePartition的子类,里面以BTREE的结构包含了需要更新的Row。

这么说可能比较绕,下面的图可以比较清晰的看到具体的结构。

结构示意

Memtable代码结构示意图

CassandraMemtableStructure

Memtable代码与表概念对照图。红色是代码名称,黑色是Cassandra中通用概念
CassandraMemtableStructure

总结

本文介绍了cassandra写入memtable的基本流程,以及memtable的结构。下一篇文章将介绍memtable flush到sstable的过程