您现在的位置是:亿华云 > 域名

Go 实现分布式高可用后台:使用 gRPC 实现日志微服务

亿华云2025-10-03 18:17:44【域名】4人已围观

简介掌握了gRPC的基本原理后,我们可以借助它来实现日志的微服务功能。在构建高并发系统时,内部的服务组件通常使用gRPC来实现高效数据传输,因此我们把前面使用json完成的日志服务改成用gRPC来完成。第

掌握了gRPC的实现式高使用实现基本原理后,我们可以借助它来实现日志的分布微服务功能。在构建高并发系统时,可用内部的后台服务组件通常使用gRPC来实现高效数据传输,因此我们把前面使用json完成的日志日志服务改成用gRPC来完成。

第一步还是微服务要定义proto文件,修改proglog/api/v1下面的实现式高使用实现log.proto文件:

Go 实现分布式高可用后台:使用 gRPC 实现日志微服务

syntax = "proto3";

Go 实现分布式高可用后台:使用 gRPC 实现日志微服务

package log.v1;

Go 实现分布式高可用后台:使用 gRPC 实现日志微服务

option go_package = "api/log_v1";

service Log {

rpc Produce(ProduceRequest) returns (ProduceResponse) { }

rpc Consume(ConsumeRequest) returns (ConsumeResponse) { }

rpc ConsumeStream(ConsumeRequest) returns (stream ConsumeResponse){ }

rpc ProduceStream(stream ProduceRequest) returns (stream ProduceResponse) { }

}

message Record {

bytes value = 1;

uint64 offset = 2;

}

message ProduceRequest {

Record record = 1;

}

message ProduceResponse {

uint64 offset = 1;

}

message ConsumeRequest {

uint64 offset = 1;

}

message ConsumeResponse {

Record record = 2;

}

代码的逻辑跟前面几节我们尝试使用gRPC时的proto文件定义逻辑没什么不同,Produce接口是分布客户端向服务端提交一条日志信息,Consume是可用客户端向服务端提交日志编号,然后服务端返回日志信息,后台ConsumeStream是日志客户端向服务端提交一连串的日志编号,然后服务端返回一连串的微服务日志信息,ProduceStream是实现式高使用实现客户端向服务端提交一连串的日志信息,然后服务端返回日志添加后对应的分布编号。完成上面proto文件定义后,可用将它编译出对应的pb.go文件,这些文件会放置在api/v1/api_log_v1目录下,服务器租用然后我们看看服务端的逻辑设计。

在internal/server下新建server.go文件,首先我们添加依赖模块,同时生成gRPC服务器对象,并注册我们要实现的接口:

package server

import (

"context"

api "api/v1/api/log_v1"

"google.golang.org/grpc"

)

type commitLog interface {

Append(*api.Record) (uint64, error)

Read(uint64) (*api.Record, error)

}

type Config struct { //实现依赖注入

CommitLog commitLog

}

var _ api.LogServer = (*grpcServer)(nil) //gRPC服务器对象

func NewGRPCServer(config *Config) (*grpc.Server, error) {

gsrv := grpc.NewServer()

srv, err := newgrpcServer(config)

if err != nil {

return nil, err

}

api.RegisterLogServer(gsrv, srv)

return gsrv, nil

}

type grpcServer struct {

api.UnimplementedLogServer

*Config

}

func newgrpcServer(config *Config)(srv *grpcServer, err error) {

srv = &grpcServer { //grpcServer会实现proto文件里面定义的接口

Config: config,

}

return srv, nil

}

在上面代码中有一点需要注意,那就是它使用了常用的设计模式叫依赖注入,我们的服务需要使用到日志模块提供的功能,但是我们这里只需要知道日志模块提供的接口,也就是Append和Read,我们不需要关心它的具体实现,这样我们就能实现逻辑上的解耦合,在启动我们的服务程序时,只需要调用者将实现了commitLog接口的实例传给我们即可,至于接口的实现细节我们不需要关心,通过依赖注入这种设计模式能够使得系统设计的复杂度降低,灵活性提升。

接下来就是服务器托管对四个服务接口的实现,其逻辑跟我们前两节做的没有什么区别:

func (s *grpcServer) Produce(ctx context.Context,

req *api.ProduceRequest) (*api.ProduceResponse, error){

//收到客户端发来的日志添加请求,然后调用日志模块Append接口进行添加

offset, err := s.CommitLog.Append(req.Record)

if err != nil {

return nil, err

}

//添加完成后返回日志编号

return &api.ProduceResponse{ Offset: offset}, nil

}

func (s *grpcServer) Consume(ctx context.Context,

req *api.ConsumeRequest)(*api.ConsumeResponse, error) {

//收到客户端发来的日志编号,返回日志内容

record, err := s.CommitLog.Read(req.Offset)

if err != nil {

return nil, err

}

return &api.ConsumeResponse{ Record: record}, nil

}

func (s *grpcServer) ProduceStream (stream api.Log_ProduceStreamServer) error {

for {

//客户端发来一系列日志数据,服务端通过Recv()依次收取,然后将日志进行添加

req, err := stream.Recv()

if err != nil {

return err

}

res, err := s.Produce(stream.Context(), req)

if err != nil {

return err

}

if err = stream.Send(res); err != nil {

return err

}

}

}

func (s *grpcServer) ConsumeStream(req *api.ConsumeRequest, stream api.Log_ConsumeStreamServer) error {

for {

//客户端发来一系列日志编号,服务端返回一系列与编号对应的日志内容

select {

case <-stream.Context().Done():

//进入这里表明客户端终端了连接

return nil

default:

res, err := s.Consume(stream.Context(), req)

switch err.(type){

case nil:

case api.ErrorOffsetOutOfRange:

continue

default:

return err

}

//将获得的日志信息发送给客户端

if err = stream.Send(res); err != nil {

return err

}

req.Offset++

}

}

}

上面代码的实现逻辑与我们前面描述的一模一样,因此没有多少可以探究的地方,最后我们测试一下上面代码的实现,新建server_test.go,添加内容如下:

package server

import (

"context"

"io/ioutil"

"net"

"testing"

"github.com/stretchr/testify/require"

api "api/v1/api/log_v1"

"internal/log"

"google.golang.org/grpc"

)

func TestServer(t *testing.T) {

for scenario, fn := range map[string]func(

t *testing.T,

client api.LogClient,

config *Config,

) {

"produce/consume a meesage to/from the log success": testProduceConsume ,

"produce/consume stream success": testProduceConsumeStream,

"consume past log boundary fails: ": testConsumePastBoundary,

} {

t.Run(scenario, func(t *testing.T) {

//在运行测试用例前先创建服务端对象

client, config, teardown := setupTest(t, nil)

defer teardown() //关闭服务端

fn(t, client, config)

})

}

}

func setupTest(t *testing.T, fn func(*Config)) (client api.LogClient, cfg*Config, teardown func()) {

t.Helper()

//生成tcp连接,使用0意味着使用随机端口

l, err := net.Listen("tcp", ":0")

require.NoError(t, err)

clientOptions := []grpc.DialOption{ grpc.WithInsecure()}

cc, err := grpc.Dial(l.Addr().String() , clientOptions...)

require.NoError(t, err)

dir, err := ioutil.TempDir("", "server-test")

require.NoError(t, err)

clog, err := log.NewLog(dir, log.Config{ })

require.NoError(t, err)

cfg = &Config{

CommitLog: clog,

}

if fn != nil {

fn(cfg)

}

//创建服务端对象

server, err := NewGRPCServer(cfg)

require.NoError(t ,err)

go func() {

//启动服务端

server.Serve(l)

}()

//创建客户端对象

client = api.NewLogClient(cc)

return client, cfg, func() {

server.Stop()

cc.Close()

l.Close()

clog.Remove()

}

}

func testProduceConsume(t *testing.T, client api.LogClient, config*Config) {

ctx := context.Background()

want := &api.Record{

Value: []byte("hello world"),

}

//客户端提交一条日志,然后拿到日志编号后再用于请求日志内容,检验服务端返回的日志内容与提交的是否一致

produce, err := client.Produce(ctx, &api.ProduceRequest{

Record: want,

})

require.NoError(t, err)

consume, err := client.Consume(ctx, &api.ConsumeRequest{

Offset: produce.Offset,

})

require.NoError(t, err)

require.Equal(t, want.Value, consume.Record.Value)

require.Equal(t, want.Offset, consume.Record.Offset)

}

func testConsumePastBoundary(t *testing.T, client api.LogClient, config *Config) {

ctx := context.Background()

produce, err := client.Produce(ctx, &api.ProduceRequest{

Record: &api.Record {

Value: []byte("hello world"),

},

})

//使用不存在的日志编号进行请求,服务端应该返回相应错误

require.NoError(t, err)

consume, err := client.Consume(ctx, &api.ConsumeRequest{

Offset: produce.Offset + 1,

})

if consume != nil {

t.Fatal("consume not nil")

}

got := grpc.Code(err)

want := grpc.Code(api.ErrorOffsetOutOfRange{ }.GRPCStatus().Err())

if got != want {

t.Fatalf("got err: %v, want %v", got, want)

}

}

func testProduceConsumeStream(t *testing.T, client api.LogClient, config *Config) {

ctx := context.Background()

records := []*api.Record{ {

Value: []byte("first message"),

Offset: 0,

},

{

Value: []byte("second message"),

Offset: 0,

},

}

//客户端向服务端提交多个日志,获得多个日志编号,然后再提交获得的编号,从而让服务端返回一系列日志数据

//接着比对服务端返回的日志内容和服务端是高防服务器否一致

{

stream, err := client.ProduceStream(ctx)

require.NoError(t, err)

for offset, record := range records {

err = stream.Send(&api.ProduceRequest{

Record: record,

})

require.NoError(t, err)

res, err := stream.Recv()

require.NoError(t, err)

if res.Offset != uint64(offset) {

t.Fatalf("got offset: %d, want: %d", res.Offset, offset,)

}

}

}

{

stream, err := client.ConsumeStream(ctx, &api.ConsumeRequest{ Offset: 0},)

require.NoError(t, err)

for i, record := range records{

res, err := stream.Recv()

require.NoError(t, err)

require.Equal(t, res.Record, &api.Record{

Value: record.Value,

Offset: uint64(i),

})

}

}

}

测试代码的逻辑通过注释就能理解,在测试用例中,客户端的创建,数据的发送和接收跟我们前面描述的没什么区别,由此我们依靠gRPC框架就完成了日志服务,下一节我们看看gRPC框架提供的数据安全功能。

很赞哦!(1449)