Understanding gRPC Load Balancing Inside Out

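Through hands-on examples, this post explores gRPC's load-balancing policies: the default pick_first, round_robin enabled through configuration, and a hand-written weighted round-robin policy.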

What is load balancing

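Goals of load balancing:

  • Distributing requests: spread client requests across multiple backend servers.
  • Preventing overload: keep any single server from being overwhelmed.
  • Scalability: the system can scale horizontally by adding servers.

Responsibilities of a load balancer:

  • Input: receive a list of server IP addresses from the name resolver.
  • Maintaining connections (subchannels): create and maintain the gRPC connections, called subchannels, between the client and those server addresses.
  • Choosing a connection (picker): when an RPC is sent, the load-balancing policy's picker decides which subchannel, i.e. which backend server, carries it.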

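Load balancing in gRPC

Load balancing is a key gRPC feature: it lets requests from a client be distributed across multiple servers. This keeps any one server from being overloaded and lets the system scale by adding more servers.

A gRPC load-balancing policy is given a list of server IP addresses by the name resolver. The policy is responsible for maintaining connections (subchannels) to those servers and for choosing which connection to use when an RPC is sent.

By default, gRPC uses the pick_first policy. This policy does no real load balancing: it tries the addresses from the name resolver one by one and uses the first one it can connect to. By updating the gRPC service config you can switch to the round_robin policy, which connects to every address it receives and rotates RPCs across the connected backends. Other load-balancing policies are available, but how to set them up varies by language. If the built-in policies do not fit your needs, you can also implement a custom policy.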
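The policy switch mentioned above is driven by the service config JSON. As a stdlib-only sketch (not grpc-go's actual parser), the hypothetical policyName helper below reads the policy name from the legacy loadBalancingPolicy field used in this article and from the loadBalancingConfig list, which the gRPC service config definition prefers (it marks loadBalancingPolicy as deprecated). Selection is simplified to "first list entry, else the legacy field, else pick_first"; real gRPC takes the first list entry whose policy is registered.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serviceConfig models only the two LB-related fields of a gRPC service
// config (hypothetical helper type, not grpc-go's internal parser).
type serviceConfig struct {
	LoadBalancingPolicy string                       `json:"loadBalancingPolicy"`
	LoadBalancingConfig []map[string]json.RawMessage `json:"loadBalancingConfig"`
}

// policyName returns the policy a client would ask for. Simplification:
// the first loadBalancingConfig entry wins over loadBalancingPolicy.
func policyName(raw string) (string, error) {
	var sc serviceConfig
	if err := json.Unmarshal([]byte(raw), &sc); err != nil {
		return "", err
	}
	if len(sc.LoadBalancingConfig) > 0 {
		// Each list entry is a single-key object: {"<policy>": {<config>}}.
		for name := range sc.LoadBalancingConfig[0] {
			return name, nil
		}
	}
	if sc.LoadBalancingPolicy != "" {
		return sc.LoadBalancingPolicy, nil
	}
	return "pick_first", nil // gRPC's default when nothing is configured
}

func main() {
	for _, cfg := range []string{
		`{"loadBalancingPolicy":"round_robin"}`,        // form used in this article
		`{"loadBalancingConfig":[{"round_robin":{}}]}`, // list form
		`{}`, // nothing configured
	} {
		name, err := policyName(cfg)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%-48s -> %s\n", cfg, name)
	}
}
```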

Next, we use concrete examples to look at pick_first, round_robin, and a custom gRPC load-balancing policy. The project layout is:

grpc-lb-demo/
├── go.mod
├── proto/
│   ├── greet.pb.go
│   ├── greet.proto
│   └── greet_grpc.pb.go
├── server/
│   └── server.go
├── client_pick_first/
│   └── client.go
├── client_round_robin/
│   └── client.go
├── client_custom_lb/
│   ├── client.go
│   └── weighted_round_robin_lb/
│       ├── weighted_round_robin_lb.go
│       └── weighted_resolver.go
└── resolver/
    └── resolver.go

pick_first

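We start by testing the default pick_first policy. The test should tell us whether the client automatically picks an available address and then keeps using that address for all subsequent requests.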

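The proto definition

This proto defines two messages, EchoRequest for the client's request and EchoResponse for the server's response, plus an Echo method that the server must implement.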

syntax = "proto3";

option go_package = "./proto";

package proto;

message EchoRequest {
  string message = 1;
}

message EchoResponse {
  string message = 1;
}

service Echo {

  rpc Echo(EchoRequest) returns (EchoResponse) {}
}

Generate the corresponding Go code with protoc:

protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    proto/greet.proto

Server

The server starts two listeners, on ports 50051 and 50052. Each server struct carries the addr it listens on, and Echo includes that addr in its response, so the client can tell which server handled each request.

The code is as follows:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	pb "grpc-lb-example/proto"

	"google.golang.org/grpc"
)

var addrs = []string{"localhost:50051", "localhost:50052"} // the two backends the resolvers point at

type server struct {
	pb.UnimplementedEchoServer
	addr string
}

func (s *server) Echo(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil
}

func main() {
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	for _, addr := range addrs {
		go startServer(addr)
	}

	<-stop
	log.Println("Shutting down servers...")

}

func startServer(addr string) {
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Printf("Server listening at %v", lis.Addr())

	s := grpc.NewServer()
	pb.RegisterEchoServer(s, &server{addr: addr})

	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

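Resolver implementation

For the client to get multiple addresses, we implement a name resolver. It returns three addresses: backendAddr1, backendAddr2, and backendAddr3. Only backendAddr2 and backendAddr3 are reachable; nothing listens on backendAddr1 (127.0.0.1:1). Let's see whether the client automatically picks a reachable address to connect to the server and then uses only that one.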

package custom_resolver

import (
	"log"

	"google.golang.org/grpc/resolver"
)

const (
	myScheme      = "example"
	myServiceName = "my-custom-service:1234"
	backendAddr1  = "127.0.0.1:1"
	backendAddr2  = "127.0.0.1:50051"
	backendAddr3  = "127.0.0.1:50052"
)

type myResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
}

type myResolverBuilder struct{}

func (*myResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	r := &myResolver{
		target: target,
		cc:     cc,
		addrsStore: map[string][]string{
			myServiceName: {backendAddr1, backendAddr2, backendAddr3},
		},
	}
	r.start()
	return r, nil
}

func (*myResolverBuilder) Scheme() string { return myScheme }

func (r *myResolver) start() {
	addrStrs := r.addrsStore[r.target.Endpoint()]
	addrs := make([]resolver.Address, len(addrStrs))
	for i, s := range addrStrs {
		addrs[i] = resolver.Address{Addr: s}
	}
	err := r.cc.UpdateState(resolver.State{Addresses: addrs})
	if err != nil {
		log.Fatalf("UpdateState failed: %v", err)
	}
}

func (r *myResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (r *myResolver) Close()                                {}

func init() {
	resolver.Register(&myResolverBuilder{})
}
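The resolver finds its address list with r.target.Endpoint(), and gRPC routes the target to this resolver because Scheme() returns "example". As a rough stdlib-only illustration (the endpointOf helper is hypothetical, not grpc-go code), parsing the dial target as a URL and trimming the leading slash from its path yields exactly the map key "my-custom-service:1234" used in addrsStore:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// endpointOf splits a dial target such as "example:///my-custom-service:1234"
// into the resolver scheme and the endpoint string. Simplified sketch: the
// authority between "//" and the next "/" is empty here, so the endpoint is
// the URL path with its leading slash removed.
func endpointOf(target string) (scheme, endpoint string, err error) {
	u, err := url.Parse(target)
	if err != nil {
		return "", "", err
	}
	return u.Scheme, strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	scheme, ep, err := endpointOf("example:///my-custom-service:1234")
	if err != nil {
		panic(err)
	}
	fmt.Println(scheme, ep) // example my-custom-service:1234
}
```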

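Client implementation

The target the client dials, example:///my-custom-service:1234, uses the scheme and service name registered by the resolver above.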

package main

import (
	"context"
	"log"
	"time"

	pb "grpc-lb-example/proto"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	// Important: blank-import the custom resolver package so that its init() runs and registers the resolver
	_ "grpc-lb-example/resolver"
)

const (
	address    = "example:///my-custom-service:1234"
	clientName = "Colin"
)

func main() {
	log.Println("Client: Dialing with custom resolver address:", address)

	conn, err := grpc.NewClient(
		address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("Client: Did not connect: %v", err)
	}
	defer conn.Close()
	// grpc.NewClient does not connect eagerly: name resolution and
	// connection setup happen when the first RPC is issued.
	log.Println("Client: Connected successfully!")

	c := pb.NewEchoClient(conn)

	for range 10 {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		r, err := c.Echo(ctx, &pb.EchoRequest{Message: clientName})
		cancel() // release the timer now instead of deferring until main returns
		if err != nil {
			log.Fatalf("Client: Could not greet: %v", err)
		}
		log.Printf("Client: Echo from server: %s", r.GetMessage())
		time.Sleep(500 * time.Millisecond)
	}
}

Testing

Running the client shows that it skipped backendAddr1 and connected to backendAddr2:

[root@debian client]# go run client.go
2025/05/18 17:22:55 Client: Dialing with custom resolver address: example:///my-custom-service:1234
2025/05/18 17:22:55 Client: Connected successfully!
2025/05/18 17:22:55 Client: Echo from server: Colin (from :50051)
2025/05/18 17:22:56 Client: Echo from server: Colin (from :50051)
2025/05/18 17:22:56 Client: Echo from server: Colin (from :50051)
2025/05/18 17:22:57 Client: Echo from server: Colin (from :50051)
2025/05/18 17:22:57 Client: Echo from server: Colin (from :50051)

Next, swap the order of the addresses in the resolver by exchanging the values of backendAddr2 and backendAddr3:

const (
	myScheme      = "example"
	myServiceName = "my-custom-service:1234"
	backendAddr1  = "127.0.0.1:1"
	backendAddr2  = "127.0.0.1:50052"
	backendAddr3  = "127.0.0.1:50051"
)

Run it again:

[root@debian client]# go run client.go
2025/05/18 17:27:05 Client: Dialing with custom resolver address: example:///my-custom-service:1234
2025/05/18 17:27:05 Client: Connected successfully!
2025/05/18 17:27:05 Client: Echo from server: Colin (from :50052)
2025/05/18 17:27:05 Client: Echo from server: Colin (from :50052)
2025/05/18 17:27:06 Client: Echo from server: Colin (from :50052)
2025/05/18 17:27:06 Client: Echo from server: Colin (from :50052)
2025/05/18 17:27:07 Client: Echo from server: Colin (from :50052)

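This time the client again connected to backendAddr2, which now points to port 50052. So pick_first connects to the first reachable address in list order and keeps using that address for subsequent requests.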
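A toy model of the behaviour just observed: the hypothetical pickFirst function below is not grpc-go's implementation, and a reachable map stands in for real connection attempts. It walks the resolved addresses in order and settles on the first reachable one; running it on both orderings from the experiment reproduces the two results.

```go
package main

import "fmt"

// pickFirst models pick_first: return the first reachable address in
// resolver order. Every later RPC would reuse that address.
func pickFirst(addrs []string, reachable map[string]bool) (string, bool) {
	for _, a := range addrs {
		if reachable[a] {
			return a, true
		}
	}
	return "", false // nothing reachable
}

func main() {
	up := map[string]bool{"127.0.0.1:50051": true, "127.0.0.1:50052": true}
	original := []string{"127.0.0.1:1", "127.0.0.1:50051", "127.0.0.1:50052"}
	swapped := []string{"127.0.0.1:1", "127.0.0.1:50052", "127.0.0.1:50051"}
	a, _ := pickFirst(original, up)
	b, _ := pickFirst(swapped, up)
	fmt.Println(a, b) // 127.0.0.1:50051 127.0.0.1:50052
}
```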

round_robin

To test round_robin, we only need to add one option to the client code that selects the policy:

conn, err := grpc.NewClient(
	address,
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	// Select the round_robin policy via the default service config
	grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)

Run it again:

[root@debian client]# go run client.go
2025/05/18 17:30:56 Client: Dialing with custom resolver address: example:///my-custom-service:1234
2025/05/18 17:30:56 Client: Connected successfully!
2025/05/18 17:30:56 Client: Echo from server: Colin (from :50051)
2025/05/18 17:30:57 Client: Echo from server: Colin (from :50052)
2025/05/18 17:30:57 Client: Echo from server: Colin (from :50051)
2025/05/18 17:30:58 Client: Echo from server: Colin (from :50052)
2025/05/18 17:30:58 Client: Echo from server: Colin (from :50051)
2025/05/18 17:30:59 Client: Echo from server: Colin (from :50052)
2025/05/18 17:30:59 Client: Echo from server: Colin (from :50051)
2025/05/18 17:31:00 Client: Echo from server: Colin (from :50052)
2025/05/18 17:31:00 Client: Echo from server: Colin (from :50051)
2025/05/18 17:31:01 Client: Echo from server: Colin (from :50052)

The client now alternates between the reachable servers and never uses the unreachable backendAddr1: this is round-robin.

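Custom load balancing

round_robin spreads requests evenly, so it cannot account for servers with different hardware that should carry different amounts of load. Next we show how to implement a custom weighted round-robin load-balancing policy.

Weighted round-robin assigns each server a weight according to its processing capacity, so servers with more capacity handle more requests and weaker servers handle fewer.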
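The simplest form of weighted round-robin repeats each server in the rotation as many times as its weight. The sketch below (hypothetical weighted type and expand helper, stdlib only) shows that naive idea; note that it sends all of a heavy server's turns back to back, which the smooth variant used later in this article avoids.

```go
package main

import "fmt"

// weighted pairs an address with its static weight.
type weighted struct {
	addr   string
	weight int
}

// expand builds a naive weighted rotation: each address appears `weight`
// times in a row. Cycling through the result gives every server a share of
// requests proportional to its weight.
func expand(servers []weighted) []string {
	var rotation []string
	for _, s := range servers {
		for i := 0; i < s.weight; i++ {
			rotation = append(rotation, s.addr)
		}
	}
	return rotation
}

func main() {
	rot := expand([]weighted{{"A", 5}, {"B", 1}, {"C", 1}})
	fmt.Println(rot) // [A A A A A B C]
}
```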

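Resolver implementation

First we implement the name resolver. Unlike the previous resolver, it attaches metadata to each address, and that metadata is the weight. Here backendAddr1 gets weight 1 and backendAddr2 gets weight 3, so if everything is configured correctly, requests should be split 1:3. backendAddr3 (127.0.0.1:50053) has weight 0 and no server listening on it, so it never becomes ready and is never picked.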

package weighted_round_robin_lb

import (
	"log"

	"google.golang.org/grpc/attributes"
	"google.golang.org/grpc/resolver"
)

func init() {
	// Register the weighted resolver for the "example" scheme
	resolver.Register(&weightedResolverBuilder{})
}

type AddrMetadata struct {
	Weight int
}

const (
	myScheme      = "example"
	myServiceName = "my-custom-service:1234"
	backendAddr1  = "127.0.0.1:50051"
	backendAddr2  = "127.0.0.1:50052"
	backendAddr3  = "127.0.0.1:50053"
)

// Weight per address
var weightMap = map[string]int{
	backendAddr1: 1, // weight 1
	backendAddr2: 3, // weight 3
	backendAddr3: 0, // <= 0: the picker falls back to DefaultWeight
}

type weightedResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
}

type weightedResolverBuilder struct{}

// RegisterResolver registers the weighted resolver. init() already does this, so calling it is optional.
func RegisterResolver() {
	resolver.Register(&weightedResolverBuilder{})
}

func (*weightedResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	r := &weightedResolver{
		target: target,
		cc:     cc,
		addrsStore: map[string][]string{
			myServiceName: {backendAddr1, backendAddr2, backendAddr3},
		},
	}
	r.start()
	return r, nil
}

func (*weightedResolverBuilder) Scheme() string { return myScheme }

func (r *weightedResolver) start() {
	addrStrs := r.addrsStore[r.target.Endpoint()]
	addrs := make([]resolver.Address, len(addrStrs))
	for i, s := range addrStrs {
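		// Look up the weight
		weight := weightMap[s]
		// Wrap the weight in metadata
		meta := AddrMetadata{Weight: weight}
		// Store it in the address attributes, keyed by the AddrMetadata{} value
		attrs := attributes.New(AddrMetadata{}, meta)
		// Address plus attributes
		addrs[i] = resolver.Address{
			Addr:       s,
			Attributes: attrs,
		}
		log.Printf("weighted resolver: adding address %s with weight %d, Attributes: %v", s, weight, attrs)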
	}

	err := r.cc.UpdateState(resolver.State{Addresses: addrs})
	if err != nil {
		log.Fatalf("UpdateState failed: %v", err)
	}
}

func (r *weightedResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (r *weightedResolver) Close()                                {}
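The resolver uses the zero value AddrMetadata{} itself as the attribute key, and the picker later calls Value(AddrMetadata{}) and type-asserts the result. This works because a struct with only an int field is comparable, so two zero values are equal keys. Below is a stdlib-only model of that lookup plus the picker's fallback rules; a plain map[any]any stands in for *attributes.Attributes, and weightOf is a hypothetical helper. It also shows that backendAddr3's weight of 0 would be treated as 1 if a server ever came up there.

```go
package main

import "fmt"

// AddrMetadata mirrors the type used by the weighted resolver.
type AddrMetadata struct {
	Weight int
}

const DefaultWeight = 1

// weightOf reproduces the picker's rules: no attributes, missing key, wrong
// value type, or a weight <= 0 all fall back to DefaultWeight.
func weightOf(attrs map[any]any) int {
	val, found := attrs[AddrMetadata{}] // zero-value struct used as the key
	if !found {
		return DefaultWeight
	}
	md, ok := val.(AddrMetadata)
	if !ok || md.Weight <= 0 {
		return DefaultWeight
	}
	return md.Weight
}

func main() {
	fmt.Println(weightOf(map[any]any{AddrMetadata{}: AddrMetadata{Weight: 3}})) // 3
	fmt.Println(weightOf(map[any]any{AddrMetadata{}: AddrMetadata{Weight: 0}})) // 1
	fmt.Println(weightOf(nil))                                                  // 1
}
```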

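Balancer implementation

  1. In init we call balancer.Register(), which adds our builder to gRPC's global registry of load-balancer builders. From then on, when a gRPC client's service config selects the policy named "weighted_round_robin", gRPC calls this registered Builder's Build method to create a load-balancer instance.

  2. Register takes a balancer.Builder. We create one with the NewBalancerBuilder helper from the base package, which greatly simplifies implementing the balancer.Builder interface.

  3. NewBalancerBuilder takes a base.PickerBuilder, which our wrrPickerBuilder type implements. Whenever gRPC needs a new Picker (for example, when the set of READY subchannels changes), it calls wrrPickerBuilder's Build method. The Pick method of the resulting wrrPicker implements the Smooth Weighted Round-Robin algorithm.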

package weighted_round_robin_lb

import (
	"log"
	"math"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
)

const (
	Name          = "weighted_round_robin"
	DefaultWeight = 1
)

func init() {
	log.Println("WeightedRoundRobinLB: Registering balancer builder with name:", Name)
	balancer.Register(newBuilder())
}

func newBuilder() balancer.Builder {
	return base.NewBalancerBuilder(Name, &wrrPickerBuilder{}, base.Config{HealthCheck: false})
}

type wrrPickerBuilder struct{}

func (pb *wrrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): Build called. Have %d Ready SubConns.", len(info.ReadySCs))

	if len(info.ReadySCs) == 0 {
		log.Println("WeightedRoundRobinLB (wrrPickerBuilder): No ready SubConns. Returning ErrNoSubConnAvailable.")
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}

	var subConns []*weightedSubConn
	totalWeight := 0

	for sc, scInfo := range info.ReadySCs {
		weight := DefaultWeight
		var fetchedMetadata AddrMetadata
		var metadataOk bool
		if scInfo.Address.Attributes != nil {
			if val := scInfo.Address.Attributes.Value((AddrMetadata{})); val != nil {
				fetchedMetadata, metadataOk = val.(AddrMetadata)
			}
		}

		if metadataOk {
			log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, successfully retrieved AddrMetadata: %+v", scInfo.Address.Addr, fetchedMetadata)
			if fetchedMetadata.Weight > 0 {
				weight = fetchedMetadata.Weight
				log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, using metadata weight: %d", scInfo.Address.Addr, weight)
			} else {
				log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, metadata weight is %d (<=0). Using default weight: %d", scInfo.Address.Addr, fetchedMetadata.Weight, DefaultWeight)
			}
		} else {
			// Log why metadata wasn't used
			if scInfo.Address.Attributes == nil {
				log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, no attributes found. Using default weight: %d", scInfo.Address.Addr, DefaultWeight)
			} else {
				val := scInfo.Address.Attributes.Value(AddrMetadata{})
				if val == nil {
					log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, attributes present, but key 'AddrMetadata{}' (or its equivalent) not found. Using default weight: %d", scInfo.Address.Addr, DefaultWeight)
				} else {
					log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, attributes present and key 'AddrMetadata{}' (or its equivalent) found, but value is of WRONG TYPE (got %T, expected AddrMetadata). Using default weight: %d", scInfo.Address.Addr, val, DefaultWeight)
				}
			}
		}

		subConns = append(subConns, &weightedSubConn{
			sc:            sc,
			address:       scInfo.Address.Addr,
			weight:        weight,
			currentWeight: 0, // Initialized to 0 as per smooth WRR
		})
		totalWeight += weight
	}

	return &wrrPicker{
		subConns:    subConns,
		totalWeight: totalWeight,
	}
}

// weightedSubConn holds a SubConn and its associated weight information for WRR.
type weightedSubConn struct {
	sc            balancer.SubConn
	address       string
	weight        int // Static weight assigned to this SubConn
	currentWeight int // Dynamic weight, updated during picking
}

// wrrPicker is a Picker that implements the weighted round-robin algorithm.
type wrrPicker struct {
	subConns    []*weightedSubConn
	mu          sync.Mutex
	totalWeight int
}

// Pick implements the Picker interface.
func (p *wrrPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if len(p.subConns) == 0 {
		// log.Println("WeightedRoundRobinLB (wrrPicker): Pick - No SubConns available. Returning ErrNoSubConnAvailable.")
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	var bestSc *weightedSubConn
	maxCurrentWeight := math.MinInt

	for _, wsc := range p.subConns {
		wsc.currentWeight += wsc.weight
		if wsc.currentWeight > maxCurrentWeight {
			maxCurrentWeight = wsc.currentWeight
			bestSc = wsc
		}
	}

	if bestSc == nil {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	bestSc.currentWeight -= p.totalWeight
	return balancer.PickResult{SubConn: bestSc.sc, Done: nil}, nil
}

Client implementation

Finally, the client only needs to select the weighted_round_robin policy:

package main

import (
	"context"
	"log"
	"time"

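	// Blank import: its init() functions register both the weighted resolver and the weighted_round_robin balancer
	_ "grpc-lb-example/client_custom_lb/weighted_round_robin_lb"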
	pb "grpc-lb-example/proto"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const (
	address    = "example:///my-custom-service:1234"
	clientName = "Colin"
)

func main() {
	log.Println("Client: connecting with the weighted round-robin balancer:", address)

	// Create the gRPC client and select the weighted_round_robin policy
	conn, err := grpc.NewClient(
		address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"weighted_round_robin"}`),
	)
	if err != nil {
		log.Fatalf("Client: failed to create client: %v", err)
	}
	defer conn.Close()
	log.Println("Client: connected!")

	// Create the Echo client stub
	c := pb.NewEchoClient(conn)

	// Send 10 requests
	for range 10 {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		r, err := c.Echo(ctx, &pb.EchoRequest{Message: clientName})
		cancel() // release the timer right away rather than at the end of main
		if err != nil {
			log.Fatalf("Client: Could not greet: %v", err)
		}
		log.Printf("Client: Echo from server: %s", r.GetMessage())
		time.Sleep(500 * time.Millisecond)
	}
}

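Test results

Finally, run the client. Requests follow the repeating pattern 50052, 50051, 50052, 50052, i.e. 50051 : 50052 = 1 : 3, which matches the configured weights.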

2025/05/19 12:57:06 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:06 Client: Echo from server: Colin (from localhost:50051)
2025/05/19 12:57:07 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:07 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:08 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:08 Client: Echo from server: Colin (from localhost:50051)
2025/05/19 12:57:09 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:09 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:10 Client: Echo from server: Colin (from localhost:50052)

The full code for the custom load-balancing policy is available at:
