如何通过memberlist库实现gossip管理集群及集群数据交互问题

2022-07-13,,,,

通过memberlist库实现gossip管理集群以及集群数据交互

概述

memberlist库的简单用法如下,注意下面使用for循环来执行 list.join ,原因是一开始各节点都没有runing,直接执行 join 会出现连接拒绝的错误。

package main
import (
	"fmt"
	"github.com/hashicorp/memberlist"
	"time"
)
func main() {
	/* create the initial memberlist from a safe configuration.
	   please reference the godoc for other default config types.
	   http://godoc.org/github.com/hashicorp/memberlist#config
	*/
	list, err := memberlist.create(memberlist.defaultlocalconfig())
	if err != nil {
		panic("failed to create memberlist: " + err.error())
	}
	t := time.newticker(time.second * 5)
	for {
		select {
		case <-t.c:
			// join an existing cluster by specifying at least one known member.
			n, err := list.join([]string{"192.168.80.129"})
			if err != nil {
				fmt.println("failed to join cluster: " + err.error())
				continue
			}
			fmt.println("member number is:", n)
			goto end
		}
	}
end:
	for {
		select {
		case <-t.c:
			// ask for members of the cluster
			for _, member := range list.members() {
				fmt.printf("member: %s %s\n", member.name, member.addr)
			}
		}
	}
	// continue doing whatever you need, memberlist will maintain membership
	// information in the background. delegates can be used for receiving
	// events when members join or leave.
}

memberlist的两个主要接口如下:

  • create:根据入参配置创建一个 memberlist ,初始化阶段 memberlist 仅包含本节点状态。注意此时并不会连接到其他节点,执行成功之后就可以允许其他节点加入该memberlist。

  • join:使用已有的 memberlist 来尝试连接给定的主机,并与之同步状态,以此来加入某个cluster。执行该操作可以让其他节点了解到本节点的存在。最后返回成功建立连接的节点数以及错误信息,如果没有与任何节点建立连接,则返回错误。

    注意当join一个cluster时,至少需要指定集群中的一个已知成员,后续会通过gossip同步整个集群的成员信息。

memberlist提供的功能主要分为两块:维护成员状态(gossip)以及数据同步(boardcast、sendreliable)。下面看几个相关接口。

接口

memberlist.create 的入参要求给出相应的 配置 信息, defaultlocalconfig() 给出了通用的配置信息,但还需要实现相关接口来实现成员状态的同步以及用户数据的收发。注意下面有些接口是必选的,有些则可选:

type config struct {
	// ...
	// delegate and events are delegates for receiving and providing
	// data to memberlist via callback mechanisms. for delegate, see
	// the delegate interface. for events, see the eventdelegate interface.
	//
	// the delegateprotocolmin/max are used to guarantee protocol-compatibility
	// for any custom messages that the delegate might do (broadcasts,
	// local/remote state, etc.). if you don't set these, then the protocol
	// versions will just be zero, and version compliance won't be done.
	delegate                delegate
	events                  eventdelegate
	conflict                conflictdelegate
	merge                   mergedelegate
	ping                    pingdelegate
	alive                   alivedelegate
	//...
}

memberlist使用如下 类型 的消息来同步集群状态和处理用户消息:

const (
	pingmsg messagetype = iota
	indirectpingmsg
	ackrespmsg
	suspectmsg
	alivemsg
	deadmsg
	pushpullmsg
	compoundmsg
	usermsg // user mesg, not handled by us
	compressmsg
	encryptmsg
	nackrespmsg
	hascrcmsg
	errmsg
)

delegate

如果要使用memberlist的gossip协议,则必须实现该接口。所有这些方法都必须是线程安全的。

type delegate interface {
	// nodemeta is used to retrieve meta-data about the current node
	// when broadcasting an alive message. it's length is limited to
	// the given byte size. this metadata is available in the node structure.
	nodemeta(limit int) []byte

	// notifymsg is called when a user-data message is received.
	// care should be taken that this method does not block, since doing
	// so would block the entire udp packet receive loop. additionally, the byte
	// slice may be modified after the call returns, so it should be copied if needed
	notifymsg([]byte)

	// getbroadcasts is called when user data messages can be broadcast.
	// it can return a list of buffers to send. each buffer should assume an
	// overhead as provided with a limit on the total byte size allowed.
	// the total byte size of the resulting data to send must not exceed
	// the limit. care should be taken that this method does not block,
	// since doing so would block the entire udp packet receive loop.
	getbroadcasts(overhead, limit int) [][]byte

	// localstate is used for a tcp push/pull. this is sent to
	// the remote side in addition to the membership information. any
	// data can be sent here. see mergeremotestate as well. the `join`
	// boolean indicates this is for a join instead of a push/pull.
	localstate(join bool) []byte

	// mergeremotestate is invoked after a tcp push/pull. this is the
	// state received from the remote side and is the result of the
	// remote side's localstate call. the 'join'
	// boolean indicates this is for a join instead of a push/pull.
	mergeremotestate(buf []byte, join bool)
}

主要方法如下:

  • notifymsg:用于接收用户消息( usermsg )。注意不能阻塞该方法,否则会阻塞整个udp/tcp报文接收循环。此外由于数据可能在方法调用时被修改,因此应该事先拷贝数据。

    该方法用于接收通过udp/tcp方式发送的用户消息( usermsg ):

    注意udp方式并不是立即发送的,它会随gossip周期性发送或在处理 pingmsg 等消息时发送从getbroadcasts获取到的用户消息。

    //使用udp方式将用户消息传输到给定节点,消息大小受限于memberlist的udpbuffersize配置。没有使用gossip机制
    func (m *memberlist) sendbesteffort(to *node, msg []byte) error
    //与sendbesteffort机制相同,只不过一个指定了node,一个指定了node地址
    func (m *memberlist) sendtoaddress(a address, msg []byte) error
    //使用tcp方式将用户消息传输到给定节点,消息没有大小限制。没有使用gossip机制
    func (m *memberlist) sendreliable(to *node, msg []byte) error
  • getbroadcasts:用于在gossip周期性调度或处理处理 pingmsg 等消息时携带用户消息,因此并不是即时的。通常会把需要发送的消息通过 transmitlimitedqueue.queuebroadcast 保存起来,然后在发送时通过 transmitlimitedqueue.getbroadcasts 获取需要发送的消息。见下面 transmitlimitedqueue 的描述。

  • localstate:用于tcp push/pull,用于向远端发送除成员之外的信息(可以发送任意数据),用于定期同步成员状态。参数 join 用于表示将该方法用于join阶段,而非push/pull。

  • mergeremotestate:tcp push/pull之后调用,接收到远端的状态(即远端调用localstate的结果)。参数 join 用于表示将该方法用于join阶段,而非push/pull。

定期(pushpullinterval)调用pushpull来随机执行一次完整的状态交互。但由于pushpull会与其他节点同步本节点的所有状态,因此代价也比较大。

eventdelegate

仅用于接收成员的joining 和leaving通知,可以用于更新本地的成员状态信息。

type eventdelegate interface {
	// notifyjoin is invoked when a node is detected to have joined.
	// the node argument must not be modified.
	notifyjoin(*node)

	// notifyleave is invoked when a node is detected to have left.
	// the node argument must not be modified.
	notifyleave(*node)

	// notifyupdate is invoked when a node is detected to have
	// updated, usually involving the meta data. the node argument
	// must not be modified.
	notifyupdate(*node)
}

channeleventdelegate 实现了简单的 eventdelegate 接口:

type channeleventdelegate struct {
  ch chan<- nodeevent
}

conflictdelegate

用于通知某个client在执行join时产生了命名冲突。通常是因为两个client配置了相同的名称,但使用了不同的地址。可以用于统计错误信息。

type conflictdelegate interface {
	// notifyconflict is invoked when a name conflict is detected
	notifyconflict(existing, other *node)
}

mergedelegate

在集群执行merge操作时调用。 notifymerge 方法的参数 peers 提供了对端成员信息。 可以不实现该接口。

type mergedelegate interface {
	// notifymerge is invoked when a merge could take place.
	// provides a list of the nodes known by the peer. if
	// the return value is non-nil, the merge is canceled.
	notifymerge(peers []*node) error
}

pingdelegate

用于通知观察者完成一个ping消息( pingmsg )要花费多长时间。可以在 notifypingcomplete 中(使用histogram)统计ping的执行时间。

type pingdelegate interface {
	// ackpayload is invoked when an ack is being sent; the returned bytes will be appended to the ack
	ackpayload() []byte
	// notifyping is invoked when an ack for a ping is received
	notifypingcomplete(other *node, rtt time.duration, payload []byte)
}

alivedelegate

当接收到 alivemsg 消息时调用的接口,可以用于添加日志和指标等信息。

type alivedelegate interface {
	// notifyalive is invoked when a message about a live
	// node is received from the network.  returning a non-nil
	// error prevents the node from being considered a peer.
	notifyalive(peer *node) error
}

broadcast

可以随gossip将数据广播到memberlist集群。

// broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
type broadcast interface {
	// invalidates checks if enqueuing the current broadcast
	// invalidates a previous broadcast
	invalidates(b broadcast) bool

	// returns a byte form of the message
	message() []byte

	// finished is invoked when the message will no longer
	// be broadcast, either due to invalidation or to the
	// transmit limit being reached
	finished()
}

broadcast 接口通常作为 transmitlimitedqueue.queuebroadcast 的入参:

func (q *transmitlimitedqueue) queuebroadcast(b broadcast) {
	q.queuebroadcast(b, 0)
}

alertmanager中的实现如下:

type simplebroadcast []byte

func (b simplebroadcast) message() []byte                       { return []byte(b) }
func (b simplebroadcast) invalidates(memberlist.broadcast) bool { return false }
func (b simplebroadcast) finished()

transmitlimitedqueue

transmitlimitedqueue主要用于处理广播消息。有两个主要的方法: queuebroadcast 和 getbroadcasts ,前者用于保存广播消息,后者用于在发送的时候获取需要广播的消息。随gossip周期性调度或在处理 pingmsg 等消息时调用 getbroadcasts 方法。

// transmitlimitedqueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. it also prioritizes messages with lower transmit counts
// (hence newer messages).
type transmitlimitedqueue struct {
	// numnodes returns the number of nodes in the cluster. this is
	// used to determine the retransmit count, which is calculated
	// based on the log of this.
	numnodes func() int

	// retransmitmult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	retransmitmult int

	mu    sync.mutex
	tq    *btree.btree // stores *limitedbroadcast as btree.item
	tm    map[string]*limitedbroadcast
	idgen int64
}

小结

memberlist中的消息分为两种,一种是内部用于同步集群状态的消息,另一种是用户消息。

gossipinterval 周期性调度的有两个方法:

  • gossip :用于同步 alivemsg 、 deadmsg 、 suspectmsg 消息
  • probe :用于使用 pingmsg 消息探测节点状态
// gossipinterval and gossipnodes are used to configure the gossip
	// behavior of memberlist.
	//
	// gossipinterval is the interval between sending messages that need
	// to be gossiped that haven't been able to piggyback on probing messages.
	// if this is set to zero, non-piggyback gossip is disabled. by lowering
	// this value (more frequent) gossip messages are propagated across
	// the cluster more quickly at the expense of increased bandwidth.
	//
	// gossipnodes is the number of random nodes to send gossip messages to
	// per gossipinterval. increasing this number causes the gossip messages
	// to propagate across the cluster more quickly at the expense of
	// increased bandwidth.
	//
	// gossiptothedeadtime is the interval after which a node has died that
	// we will still try to gossip to it. this gives it a chance to refute.
	gossipinterval      time.duration
	gossipnodes         int
	gossiptothedeadtime time.duration

用户消息又分为两种:

  • 周期性同步:
    • 以 pushpullinterval 为周期,使用 delegate.localstate 和 delegate.mergeremotestate 以tcp方式同步用户信息;
    • 使用 delegate.getbroadcasts 随gossip发送用户信息。
  • 主动发送:使用 sendreliable 等方法实现主动发送用户消息。

alertmanager的处理

alertmanager通过两种方式发送用户消息,即udp方式和tcp方式。在alertmanager中,当要发送的数据大于 maxgossippacketsize/2 将采用tcp方式( sendreliable 方法),否则使用udp方式( broadcast 接口)。

func (c *channel) broadcast(b []byte) {
	b, err := proto.marshal(&clusterpb.part{key: c.key, data: b})
	if err != nil {
		return
	}

	if oversizedmessage(b) {
		select {
		case c.msgc <- b: //从c.msgc 接收数据,并使用sendreliable发送
		default:
			level.debug(c.logger).log("msg", "oversized gossip channel full")
			c.oversizegossipmessagedroppedtotal.inc()
		}
	} else {
		c.send(b)
	}
}

func oversizedmessage(b []byte) bool {
	return len(b) > maxgossippacketsize/2
}

demo

这里 实现了一个简单的基于gossip管理集群信息,并通过tcp给集群成员发送信息的例子。

到此这篇关于通过memberlist库实现gossip管理集群以及集群数据交互的文章就介绍到这了,更多相关memberlist库gossip集群内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

《如何通过memberlist库实现gossip管理集群及集群数据交互问题.doc》

下载本文的Word格式文档,以方便收藏与打印。