Go で実装しながら gRPC を理解する

Written by @ryysud

Jul 10, 2019 00:00 · 5125 words · 11 minutes read #gRPC

はじめに

gRPC の理解が浅く gRPC を利用するプロダクトの開発で困ったので調べてまとめてみました。

gRPC について

Google が開発した RPC フレームワークで、gRPC を使うと異なる言語で書かれたアプリケーション同士が gRPC により自動生成されたインターフェースを通じて通信することが可能になる。Go で書かれたサーバーに Java で書いたクライアントが接続可能になるイメージ。通信プロトコルには HTTP/2 が使われ、データはバイナリデータでやりとりする仕様。

gRPC の前身は Google 社内で10年以上運用されていた Stubby というもので、多くの機能が標準規格に基づいていないことから長い間 OSS として公開されていなかったが、HTTP/2, QUIC の登場で Stubby の機能の多くが標準化(標準化には Google も参加)されたので、そのタイミングで gPRC として作り直されて 2015年に OSS として公開 された。

gRPC は、同じく Google が開発した Protocol Buffers により型付けされたインターフェース(メソッドの引数や戻り値、やりとりするデータなどを強力に静的型付け)を持っていて、型の定義ファイルをコンパイルすることで色々な言語のコードを自動生成してくれる。

gRPC では Java, Objective-C, Swift でのサポートもされていて、HTTP/2 による通信の効率化もあり、モバイルアプリでの活用も多いとのこと。最近は gRPC-Web でブラウザから gRPC サーバーへの通信が可能になった様子。

gRPC の通信プロトコルには HTTP/2 が使われているので、クライアントとサーバー間のリクエストとレスポンスは 1:1 だけでなく 1:* や *:* などの通信が可能。クライアントからのリクエストで複数のメッセージを投げられて、サーバーから複数メッセージを返すなどの通信が可能になるイメージ。

これもすべて HTTP/2 の機能の1つであるストリーム(同一の TCP 接続でクライアントとサーバーが双方向にデータをやりとり出来る仕組み)のおかげ。

また、型付けされたメッセージの他に key-value(value は複数設定可能)で定義可能なメタデータというものも存在しており、HTTP ヘッダのように扱うことができるとのこと。利用例としては、不正なリクエストかの判断を行うための認証情報や、複数のサービスを横断する処理をトレースする ID などをハンドリングするケースが挙げられるとのこと。

Protocol Buffers について

Protocol Buffers(protobuf)は構造化データをバイト列に変換(シリアライズ/デシリアライズ)するためソフトウェアで、構造化データを変換するためのスキーマ言語という捉え方もできる。

Protocol Buffers でデータのスキーマを定義して自動生成したコードを利用することで、言語やプラットフォームに依存することなく、アプリケーション間で構造化データを簡単に読み書きすることが可能になる。

Go で gRPC を触っていく

grpc.io から提供される Go で gRPC を触るチュートリアルをやっていきます。
https://grpc.io/docs/tutorials/basic/go/

前準備

gRPC では Go 1.6 or higher が必要なのでバージョン確認。

$ go version
go version go1.12.6 darwin/amd64

gRPC のインストール。

$ go get -u google.golang.org/grpc

$ cat $GOPATH/src/google.golang.org/grpc/version.go | grep Version
// Version is the current grpc version.
const Version = "1.23.0-dev"

Protocol Buffers v3 をインストール。

$ brew install protobuf

$ protoc --version
libprotoc 3.7.1

Protocol Buffers の Go プラグインをインストール。

go get -u github.com/golang/protobuf/protoc-gen-go

Protocol Buffers ファイル(.proto)にサービスとメッセージを定義

サービスとそのサービス内に rpc メソッド(サービスメソッドとも呼ばれるっぽい)を定義する前に補足しておくと、gRPC では以下の4種類のサービスメソッドを定義することが可能になっていて、チュートリアルではその4種類全てが網羅されている。

① シンプル RPC

gRPC クライアントからの単一メッセージに gRPC サーバーが単一メッセージを返すもの。(チュートリアルだと GetFeature に相当)

② サーバーサイドストリーミング RPC

gRPC クライアントからの単一メッセージに gRPC サーバーがストリーミングで複数メッセージを返すもの。(チュートリアルだと ListFeatures に相当)

③ クライアントサイドストリーミング RPC

gRPC クライアントからストリーミングで受けた複数メッセージに gRPC サーバーが単一メッセージを返すもの。(チュートリアルだと RecordRoute に相当)

④ 双方向ストリーミング RPC

gRPC クライアントからストリーミングで受けた複数メッセージに gRPC サーバーがストリーミングで複数メッセージを返すもの。(チュートリアルだと RouteChat に相当)

上記を踏まえてサービスとサービスメソッドの定義

チュートリアルでは route_guide.proto に定義を行っている。

「gRPC クライアントからのリクエスト = サービスメソッドの引数」で「gRPC サーバーからのレスポンス = サービスメソッドの返り値」となるので、サービスメソッドでストリーミングを使う場合には、引数や返り値の変数の前に stream と宣言する必要がある。

クライアントサイドストリーミング RPC である RecordRoute だと、クライアントからはストリーミングでメッセージを受けたいので引数に stream を付与している形。

service RouteGuide {
  rpc GetFeature(Point) returns (Feature) {}
  rpc ListFeatures(Rectangle) returns (stream Feature) {}
  rpc RecordRoute(stream Point) returns (RouteSummary) {}
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

次にメッセージの定義

サービスメソッドで使うメッセージの型を定義する。

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

message Rectangle {
  Point lo = 1;
  Point hi = 2;
}

message Feature {
  string name = 1;
  Point location = 2;
}

message RouteNote {
  Point location = 1;
  string message = 2;
}

message RouteSummary {
  int32 point_count = 1;
  int32 feature_count = 2;
  int32 distance = 3;
  int32 elapsed_time = 4;
}

Protocol Buffers ファイル(.proto)から Go のファイル(.pb.go)を自動生成

Protocol Buffers ファイルをコンパイルすることで、gRPC クライアントコードと gRPC サーバーを実装するインターフェースが含まれる Go のファイルが自動生成される。

protoc コマンドを実行することで route_guide.proto の定義を元に route_guide.pb.go が自動生成される。--proto_path でコンパイル対象の Protocol Buffers ファイルのディレクトリを指定して、--go_out で自動生成する Go のファイルの出力ディレクトリ(gRPC をプラグインとして指定するために plugins=grpc を付与)を指定する仕様。

$ protoc --proto_path routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide

自動生成された Go のファイル(route_guide.pb.go)に含まれるもの

① 定義されたメッセージに対応する構造体

構造体内のデータを操作するメソッド(一部抜粋)も含まれる。

type Point struct {
	Latitude             int32    `protobuf:"varint,1,opt,name=latitude,proto3" json:"latitude,omitempty"`
	Longitude            int32    `protobuf:"varint,2,opt,name=longitude,proto3" json:"longitude,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *Point) GetLatitude() int32 {
	if m != nil {
		return m.Latitude
	}
	return 0
}

func (m *Point) GetLongitude() int32 {
	if m != nil {
		return m.Longitude
	}
	return 0
}

② サービスメソッドを呼び出すための gRPC クライアントコード

以下は、gRPC クライアントのインタフェースと実処理を含むクライアントコードを返すコード。

type RouteGuideClient interface {
	GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error)
	ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error)
	RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error)
	RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error)
}

type routeGuideClient struct {
	cc *grpc.ClientConn
}

func NewRouteGuideClient(cc *grpc.ClientConn) RouteGuideClient {
	return &routeGuideClient{cc}
}

以下は、上記のインタフェースの実装でサービスメソッド GetFeature を呼び出すためのメソッド。c.cc.Invoke の第2引数で渡しているのは gRPC のサービスメソッドのエンドポイント?ぽい。なんか少しわかってきた。

func (c *routeGuideClient) GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error) {
	out := new(Feature)
	err := c.cc.Invoke(ctx, "/routeguide.RouteGuide/GetFeature", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

上記は、シンプルRPC なサービスメソッド GetFeature を呼び出すものなので簡素だが、ストリーミングが絡んでくると少し変わってくるので、双方向ストリーミング RPC である RouteChat を呼び出すクライアントコードを見ていく。

双方向ストリーミング RPC である RouteChat を呼び出すクライアントコードなので、サービスメソッドの呼び出し結果(gRPC サーバーからのレスポンス)はストリーミングで受けることになる。

それを踏まえて以下のクライアントコードを確認してみると、サービスメソッドの呼び出し結果である返り値 RouteGuide_RouteChatClient には、ストリーミングでメッセージを送るための Send(gRPC クライアント => gRPC サーバー)、ストリーミングでメッセージを受け取るための Recv(gRPC サーバー => gRPC クライアント)が定義されていることがわかる。

func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
	stream, err := c.cc.NewStream(ctx, &_RouteGuide_serviceDesc.Streams[2], "/routeguide.RouteGuide/RouteChat", opts...)
	if err != nil {
		return nil, err
	}
	x := &routeGuideRouteChatClient{stream}
	return x, nil
}

type RouteGuide_RouteChatClient interface {
	Send(*RouteNote) error
	Recv() (*RouteNote, error)
	grpc.ClientStream
}

type routeGuideRouteChatClient struct {
	grpc.ClientStream
}

func (x *routeGuideRouteChatClient) Send(m *RouteNote) error {
	return x.ClientStream.SendMsg(m)
}

func (x *routeGuideRouteChatClient) Recv() (*RouteNote, error) {
	m := new(RouteNote)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

これらが定義されているので、gRPC サーバーからのレスポンスを gRPC クライアントがストリーミングで受け取ることが可能になる。

③ サービスメソッドを実装するための gRPC サーバー用インタフェース

クライアントコードと異なり、gRPC サーバーは自前で実装する必要があるのでインタフェースのみの定義となっている。

type RouteGuideServer interface {
	GetFeature(context.Context, *Point) (*Feature, error)
	ListFeatures(*Rectangle, RouteGuide_ListFeaturesServer) error
	RecordRoute(RouteGuide_RecordRouteServer) error
	RouteChat(RouteGuide_RouteChatServer) error
}

シンプル RPC である GetFeature の引数の型は、Protocol Buffers ファイルでの定義から予想できたものだが、ストリーミングが絡むサービスメソッドの引数だけ毛色が違うので、双方向ストリーミング RPC である RouteChat を対象に深掘りしていく。

RouteChat の引数である RouteGuide_RouteChatServer は以下のようになっていて、クライアントコードとは向き先が入れ替わる形で、ストリーミングでメッセージを送るための Send(gRPC サーバー => gRPC クライアント)と、ストリーミングでメッセージを受け取るための Recv(gRPC クライアント => gRPC サーバー)が定義されていることがわかる。

type RouteGuide_RouteChatServer interface {
	Send(*RouteNote) error
	Recv() (*RouteNote, error)
	grpc.ServerStream
}

type routeGuideRouteChatServer struct {
	grpc.ServerStream
}

func (x *routeGuideRouteChatServer) Send(m *RouteNote) error {
	return x.ServerStream.SendMsg(m)
}

func (x *routeGuideRouteChatServer) Recv() (*RouteNote, error) {
	m := new(RouteNote)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

gRPC サーバーの実装

gRPC サーバーの実装に必要な作業は、以下2つ。

① 自動生成された Go のファイル内の gRPC サーバーのインターフェースの実装
② gRPC クライアントからのリクエストを待ち受けて gRPC サーバーにリクエスト流す処理の実装

gRPC サーバーのインターフェースの実装

以下のように実装を行う必要がある。

type routeGuideServer struct {
    ...
}
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
    ...
}
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
    ...
}
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
    ...
}
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
    ...
}

全てを追いかけてもあれなので、双方向ストリーミング RPC である RouteChat の実装だけを見ていく。

引数には先述の gRPC サーバーからストリーミングでメッセージを送るための Send とストリーミングでメッセージを受け取るための Recv が実装された RouteGuide_RouteChatServer が使われていて、stream.Recv() でクライアントからストリーミングで流れてくるメッセージを取得して実処理に流し、処理の結果を stream.Send() でクライアントにストリーミングでメッセージを送信していることがわかる。

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		key := serialize(in.Location)

		s.mu.Lock()
		s.routeNotes[key] = append(s.routeNotes[key], in)
		// Note: this copy prevents blocking other clients while serving this one.
		// We don't need to do a deep copy, because elements in the slice are
		// insert-only and never modified.
		rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
		copy(rn, s.routeNotes[key])
		s.mu.Unlock()

		for _, note := range rn {
			if err := stream.Send(note); err != nil {
				return err
			}
		}
	}
}

上記を除くサービスメソッドの実装も、以下の4種類のサービスメソッドそれぞれに合わせて、自動生成された Go のファイルのメソッドを駆使して実装している印象でした。

  • シンプル RPC(GetFeature に相当)
  • サーバーサイドストリーミング RPC(ListFeatures に相当)
  • クライアントサイドストリーミング RPC(RecordRoute に相当)
  • 双方向ストリーミング RPC(RouteChat に相当)

gRPC クライアントからのリクエストを待ち受けて gRPC サーバーにリクエスト流す処理の実装

こちらは単純で、初期化された gRPC サーバーに前のステップで実装したものを登録して起動するだけでした。

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
        log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
... // determine whether to use TLS
grpcServer.Serve(lis)

gRPC クライアントの実装

全てを追いかけてもあれなので、双方向ストリーミング RPC である RouteChat にリクエストを投げる実装だけを見ていく。

サービスにリクエストを投げるために、自動生成された Go のコードに含まれるクライアントコードを使って gRPC クライアントを初期化する。

conn, err := grpc.Dial(*serverAddr)
if err != nil {
    ...
}
defer conn.Close()

client := pb.NewRouteGuideClient(conn)

RouteChat にリクエストを投げるためのコードを見てみると、stream.Send() で gRPC サーバーにストリーミングでメッセージを送信して、stream.Recv() で gRPC サーバーからストリーミングで流れてくるメッセージを取得して、実処理に流していることがわかりました。

func runRouteChat(client pb.RouteGuideClient) {
	notes := []*pb.RouteNote{
		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.RouteChat(ctx)
	if err != nil {
		log.Fatalf("%v.RouteChat(_) = _, %v", client, err)
	}
	waitc := make(chan struct{})
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				// read done.
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("Failed to receive a note : %v", err)
			}
			log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
		}
	}()
	for _, note := range notes {
		if err := stream.Send(note); err != nil {
			log.Fatalf("Failed to send a note: %v", err)
		}
	}
	stream.CloseSend()
	<-waitc
}

さいごに

公式のチュートリアルを通して gRPC の仕組みや実装を理解することができました。これで gRPC を利用するプロダクトの開発にも参加していけそうな気がするので、引き続きやっていきたいと思います。

Protocol Buffers でインタフェースやデータをガッチリと型で縛れたり、ストリーミングで通信するなどは便利で感動しました…。無知ってこわい…。

yak shaving 感がありますが、gRPC を支える HTTP/2 に(ストリーミングのところが特に)興味が出てきたので、空いた時間で深く調べていきたいと思います。

おしまい。

参考資料