Gossip In Cassandra

This article describes the implementation details of the Gossip protocol in Cassandra.

What is the Gossip protocol?

Gossip is a widely used protocol in distributed systems, and is mainly used to implement information exchange between distributed nodes or processes.The Gossip protocol uses a random way to spread information across the network.As Gossip itself means, the workflow of the Gossip protocol is similar to the spread of anecdotes or the spread of epidemics.
More detailed information can be found on the Wikipedia page Gossip_protocol

Implementation in Cassandra

The gossip-related processing in cassandra is concentrated in the Gossiper class.The method of initiating gossip is in the Gossiper.gossipTask.run method, which is a runnable and executed once per second by a scheduled task in the upper layer.

Node State ata structure

Gossiper.endpointStateMap<InetAddress, EndPointState> holds the state of the entire cluster known to the current node. The key is the IP address of the corresponding node, and the value is EndPointState.

EndPointState consists of two parts: HeartBeatState and ApplicationState.

There are two members in HeartBeatState, generation and version.The generation can be understood as the major version number of the current node state defaults to the timestamp in seconds of start time, and the version is the minor version number starting from 0.

ApplicationState is actually a Map<ApplicationState, VersionedValue>, which stores a variety of application information and the version corresponding to the information.ApplicationState is an enumeration.

The Gossip protocol in Cassandra have three phases.SYN, ACK, ACK2.Consider sending a gossip message from A to B as an example: A sends a SYN message to B. After receiving the SYN message, B replies with an ACK message. After receiving the ACK message from B, A replies with ACK2 message to B to complete a message exchange. .

The specific implementation details of each phase are described below.

SYN

  1. Update the HeartBeatState.version of the local node , increase it by 1.

  2. Construct a SYN message.The SYN message contains a List< GossipDigest > and cluster name as well as the partitioner of the cluster.GossipDigest is a summary of node information, including three fields: the address of the node (InetAddress), Generation (which is the generation in heartbeatstate), maxVersion (take the maximum value of the version in heartBeat and the versionedValue in the applicationState )
    Take out all the endpointStates in the endpointStateMap to construct the List.

  3. Randomly select one from the living nodes (Gossiper.liveEndPoints), the so-called sending object, and send a SYN message.If liveEndPoints is empty, nothing is done.

  4. An attempt is made to send a SYN message to an unreachable node by generating a random number rnd. If rnd is less than the number of unreachable nodes / (number of surviving nodes + 1), randomly select one of the unreachable nodes to send a SYN message.
  5. If no message is sent in step 3, or the number of nodes that are alive is less than the number of seed nodes, then the seed node is sent a message, the algorithm is similar to step 4, rnd < number of seed nodes / (survival node + unreachable node)

ACK

After receiving the SYN from A, B replies an ACK message to A, as follows:

  1. Sort the list received from A. The ordering is based on the comparison between the elements in the list and the local endStateMap. According to the order of the differences of the version, the more the versions differs the more element is in the front.(That is, the element in the front differs most between A and B).

  2. Comparing the status sent by A with the existing state of B, yielding two data: List< GossipDigest > deltaGossipDigestList and Map<InetAddress, EndPointState> deltaEpStateMap.

    The deltaGossipDigestList stores the GossipDigest that A has while B hasn’t , or A has the newer version, and will be placed in the following cases:
    I. A has B don’t has
    II. A’s Generation is greater than B’s
    III. A’s version is greater than B

    deltaEpStateMap stores infomation that B hold while A not , or B holds the newer version of EndPointState, using the similar conditions described above.

  3. Constructs an ACK message. The ACK message only contains the deltaGossipDigestList and deltaEpStateMap, reply it to A

ACK2

After A receives the ACK message from B:

  1. The local state is updated according to deltaEpStateMap when genration or version is greater than local.
  2. According to the deltaGossipDigestList, the corresponding latest node state is picked , and put it into an epStateMap<InetAddress, EndPointState> (because the first SYN only sends the digest and does not send the complete EndPointState)
  3. Constructs an ACK2 message. ACK2 contains only the epStateMap from step 2 and sends it to B.
  4. B updates the local status after receiving ACK2.

At this point, a round of Gossip information exchange between A and B ends.The whole process can be described as follows:

cassandra_gossip