
【Flutter】gRPCを使ってAPI通信を実装する【後編】
2022.11.11
はじめに
近々gRPCを使ってAPI通信するFlutterアプリを開発する機会があるので、そこに向けた事前準備&備忘録として対応内容を記録していきます。
本記事では前編記事に引き続き、gPRCの下記4つの通信方式すべてのパターンで実装した内容をまとめます。
- Unary RPC (単一リクエスト, 単一レスポンス)
- Server streaming RPC (単一リクエスト, 複数レスポンス)
- Client streaming RPC (複数リクエスト, 単一レスポンス)
- Bidirectional streaming RPC (複数リクエスト, 複数レスポンス)
環境構築については前編記事にまとめているのでそちらを御覧ください。
またgRPCとは?という部分の説明は省きますので、詳細を知りたい方は公式ドキュメントを御覧ください。
前編はこちら
開発環境
開発環境は以下の通りです。
Dart | 2.18.2 |
Flutter | 3.3.4 |
protoc | 3.21.7 |
Unary RPC (単一リクエスト, 単一レスポンス)
まずは、Unary RPCでの通信を実装してみます。
Unary RPCは単一リクエスト、単一レスポンスなので、下記のようなユースケースで利用できそうです。
- リクエストとレスポンスが時間経過や状態変化で変わることがない、または変更を通知する必要がない
前編記事で実装したHello, worldアプリもUnary RPCでしたが、おさらいということで今回は少し内容を変えて「10進数の数字を2進数へ変換する計算が正しいかどうかを判定する」機能を作成してみます。
サーバー側
サーバー側の準備を行います。
リクエストとして10進数の数字と2進数の数字文字列を受け取り、レスポンスとして正誤結果と正しい2進数の数字文字列を返却するようにします。
まずは、 server/protos に今回作るサービス用のprotoファイルを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | syntax = "proto3"; package decimal_to_binary; service Converter { rpc Convert (ConvertRequest) returns (ConvertResponse) {} } message ConvertRequest { int32 decimal = 1; string binary = 2; } message ConvertResponse { bool isCorrect = 1; string answer = 2; } |
次にgRPCのDart用コードを出力します。
前編同様に共通パッケージの grpc_gen の lib/src 内に出力します。
前編ではsrc直下に出力しましたが、今回は機能ごとにディレクトリを切って出力していきます。
1 2 | $ cd server $ protoc --dart_out=grpc:../grpc_gen/lib/src/decimal_to_binary -Iprotos protos/decimal_to_binary.proto |
利用する側が自動生成されたファイル郡を意識せずに扱えるようにするために、各ファイルのexportも設定します。
grpc_gen/lib/src/decimal_to_binary/decimal_to_binary.dart
1 2 3 4 5 6 | library decimal_to_binary; export 'decimal_to_binary.pb.dart'; export 'decimal_to_binary.pbenum.dart'; export 'decimal_to_binary.pbgrpc.dart'; export 'decimal_to_binary.pbjson.dart'; |
grpc_gen/lib/grpc_gen.dart
1 2 3 4 | library grpc_gen; ... export 'src/decimal_to_binary/decimal_to_binary.dart'; <--追加 |
最後に ConverterServiceBase を継承した ConverterService クラスを lib/converter_serevice.dart として作成し、 bin/server.dart内でサーバーアプリを起動するmain処理のサービス一覧に追加します。
server/lib/converter_service.dart
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | import 'package:grpc/grpc.dart'; import 'package:grpc_gen/grpc_gen.dart'; class ConverterService extends ConverterServiceBase { @override Future<ConvertResponse> convert( ServiceCall call, ConvertRequest request) async { // 2進数に変換 final result = request.decimal.toRadixString(2); return ConvertResponse( isCorrect: result == request.binary, answer: result, ); } } |
server/bin/server.dart
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import 'package:grpc/grpc.dart'; import 'package:server/converter_service.dart'; import 'package:server/greeter_service.dart'; Future<void> main(List<String> args) async { final server = Server( [ GreeterService(), ConverterService(), <-- 追加 ], const <Interceptor>[], CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), ); await server.serve(port: 50051); print('Server listening on port ${server.port}...'); } |
これでサーバー側の実装および準備は完了です。
アプリ側
サーバー側の準備が完了したので、次はアプリ側の実装を行っていきます。
前編記事にてHello, world機能のあるアプリを作成したので、依存パッケージやサーバーにアクセスするためのチャンネルを生成する Provider などはそのまま流用し、同じアプリ内に機能追加する形で新しい画面を追加していきます。
まずは、入力する10進数と2進数を保持する StateProvider を実装します。
ついでに数字が有効な場合のみ「答え合わせ」ボタンを活性にするために、変換可能かどうかを判定する Provider も作成しておきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | /// 10進数を保持するProvider final decimalProvider = StateProvider.autoDispose<String>((ref) { return ''; }); /// 2進数を保持するProvider final binaryProvider = StateProvider.autoDispose<String>((ref) { return ''; }); /// 変換可能かどうかを判定するProvider final convertibleProvider = Provider.autoDispose<bool>((ref) { final decimal = ref.watch(decimalProvider); final binary = ref.watch(binaryProvider); if (decimal.isEmpty || binary.isEmpty) { // 未入力の場合は不可 return false; } final decimalNumber = int.tryParse(decimal); if (decimalNumber == null) { // 入力された10進数が数字でない場合は不可 return false; } // それ以外の場合は可 return true; }); |
次に、入力された10進数と2進数をもとに答え合わせの結果を保持・更新する StateNotiferProvider を実装します。
ボタンが押されたときに check() 、数字が変更されたときには reset() が呼ばれる想定です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | final convertProvider = StateNotifierProvider.autoDispose<ConvertNotifier, ConvertResponse?>((ref) { return ConvertNotifier(channel: ref.watch(channelProvider)); }); class ConvertNotifier extends StateNotifier<ConvertResponse?> { ConvertNotifier({ required ClientChannel channel, }) : _channel = channel, super(null); final ClientChannel _channel; Future<void> check({required int decimal, required String binary}) async { // クライアント作成 final client = ConverterClient(_channel); // サービス実行 final response = await client.convert( ConvertRequest(decimal: decimal, binary: binary), ); // レスポンスの状態を更新 state = response; } void reset() { state = null; } } |
最後に、表示する Widget を作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | class DecimalToBinaryPage extends StatelessWidget { const DecimalToBinaryPage({super.key}); @override Widget build(BuildContext context) { return Scaffold( appBar: AppBar(title: const Text('Unary RPC')), body: SafeArea( child: Padding( padding: const EdgeInsets.symmetric(horizontal: 32), child: Column( mainAxisAlignment: MainAxisAlignment.center, children: [ Row( children: [ Expanded( child: Column( crossAxisAlignment: CrossAxisAlignment.start, children: [ const Text('10進数'), const SizedBox(height: 8), Consumer( builder: (context, ref, child) { return TextField( onChanged: (value) { ref.read(convertProvider.notifier).reset(); ref .read(decimalProvider.notifier) .update((state) => value); }, ); }, ), ], ), ), const Icon(Icons.arrow_right), Expanded( child: Column( crossAxisAlignment: CrossAxisAlignment.start, children: [ const Text('2進数'), const SizedBox(height: 8), Consumer( builder: (context, ref, child) { return TextField( onChanged: (value) { ref.read(convertProvider.notifier).reset(); ref .read(binaryProvider.notifier) .update((state) => value); }, ); }, ), ], ), ), ], ), const SizedBox(height: 16), Consumer( builder: (context, ref, child) { return ElevatedButton( onPressed: ref.watch(convertibleProvider) ? () { ref.read(convertProvider.notifier).check( decimal: int.parse(ref.watch(decimalProvider)), binary: ref.watch(binaryProvider), ); } : null, child: const Text('答え合わせ'), ); }, ), const SizedBox(height: 32), Consumer( builder: (context, ref, child) { final response = ref.watch(convertProvider); if (response == null) { return const SizedBox.shrink(); } return Column( children: [ Text(response.isCorrect ? '正解!' : '不正解!'), Text('答え:${response.answer}'), ], ); }, ) ], ), ), ), ); } } |
以上でアプリ側の実装は完了です。
動作確認
それでは動作確認してみます。
10進数と対応する2進数を入力して「答え合わせ」ボタンをタップすると…
無事答え合わせができました!
Server streaming RPC (単一リクエスト, 複数レスポンス)
次は、Server streaming RPCでの通信を実装していきます。
Server streaming RPCは単一リクエスト、複数レスポンスなので、下記のようなユースケースで利用できそうです。
- リクエストに対するレスポンスが時間経過や状態変化で変化する、かつ変化を監視する必要がある
上記を満たす機能として、「秒間隔をリクエストとして送信し、送信された間隔ごとに時刻を返す」機能を作成してみます。
サーバー側
サーバー側の実装をしていきます。
リクエストとして秒間隔を受け取り、レスポンスとして秒間隔ごとにUNIX時間を返却するものを定義します。
まずは、 server/protos に今回作るサービス用のprotoファイルを作成します。
レスポンスの前にstreamを付与するのがポイントです。
また、UNIX時間は int32 に収まらないので int64 で定義します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | syntax = "proto3"; package time_notification; service TimeNotifier { rpc notifier (TimeNotificationRequest) returns (TimeNotificationResponse) {} } message TimeNotificationRequest { int32 interval = 1; } message TimeNotificationResponse { int32 unixTime = 1; } |
次にgRPCのDart用コードを出力します。
1 2 | $ cd server $ protoc --dart_out=grpc:../grpc_gen/lib/src/time_notification -Iprotos protos/time_notification.proto |
利用する側が自動生成されたファイル郡を意識せずに扱えるようにするために、各ファイルのexportも設定します。
grpc_gen/lib/src/time_notification/time_notification.dart
1 2 3 4 5 6 | library time_notification; export 'time_notification.pb.dart'; export 'time_notification.pbenum.dart'; export 'time_notification.pbgrpc.dart'; export 'time_notification.pbjson.dart'; |
grpc_gen/lib/grpc_gen.dart
1 2 3 4 | library grpc_gen; ... export 'src/time_notification/time_notification.dart'; <--追加 |
最後に TimeNotifierServiceBase を継承した TimeNotifierService クラスを lib/time_notifier_service.dart に作成し、 bin/server.dart 内でサーバーアプリを起動するmain処理のサービス一覧に追加します。
server/lib/converter_service.dart
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | import 'package:grpc/grpc.dart'; import 'package:grpc_gen/grpc_gen.dart'; class TimeNotifierService extends TimeNotifierServiceBase { @override Stream<TimeNotificationResponse> notifier( ServiceCall call, TimeNotificationRequest request) { print('間隔: ${request.interval}秒'); var current = DateTime.now().toUtc(); final timer = Stream<TimeNotificationResponse>.periodic( Duration(seconds: request.interval), ((computationCount) { current = current.add( Duration(seconds: request.interval), ); print('時刻: $current'); return TimeNotificationResponse( unixTime: current.millisecondsSinceEpoch, ); }), ); return timer; } } |
server/bin/server.dart
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | import 'package:grpc/grpc.dart'; import 'package:server/converter_service.dart'; import 'package:server/greeter_service.dart'; Future<void> main(List<String> args) async { final server = Server( [ GreeterService(), ConverterService(), TimeNotifierService(), <-- 追加 ], const <Interceptor>[], CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), ); await server.serve(port: 50051); print('Server listening on port ${server.port}...'); } |
これでサーバー側の実装および準備は完了です。
アプリ側
サーバー側の準備が完了したので、次はアプリ側の実装を行っていきます。
まずは、入力した間隔を保持する Provider を実装します。
1 2 3 4 | // 入力した間隔を保持するProvider final intervalProvider = StateProvider.autoDispose<String>((ref) { return ''; }); |
次にサーバーから取得した時間をStreamで受け取る Provider を実装します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | /// サーバーから取得した時間をStreamで受け取るProvider final timeProvider = StreamProvider.autoDispose<DateTime>((ref) async* { // サーバーへアクセスするためのチャンネルを取得 final channel = ref.watch(channelProvider); final interval = ref.watch(intervalProvider); if (interval.isEmpty || int.tryParse(interval) == null) { return; } // クライアント作成 final client = TimeNotifierClient(channel); // ResponseStreamを取得 final responseStream = client.notifier( TimeNotificationRequest( interval: int.parse(interval), ), ); await for (final response in responseStream) { // Streamを受け取るごとに、ローカルの現在時刻を流す yield DateTime.fromMillisecondsSinceEpoch(response.unixTime.toInt()) .toLocal(); } }); |
最後に表示する Widget を作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | class TimeNotificationPage extends HookConsumerWidget { const TimeNotificationPage({super.key}); @override Widget build(BuildContext context, WidgetRef ref) { final textController = useTextEditingController( text: ref.watch(intervalProvider), ); return Scaffold( appBar: AppBar(title: const Text('Server streaming RPC')), body: SafeArea( child: Padding( padding: const EdgeInsets.symmetric(horizontal: 32), child: Column( mainAxisAlignment: MainAxisAlignment.center, children: [ Consumer( builder: (context, ref, child) { final time = ref.watch(timeProvider).valueOrNull; if (time == null) { return const SizedBox.shrink(); } return Text('時刻:$time'); }, ), const SizedBox(height: 32), Row( children: [ Expanded( child: TextFormField( controller: textController, ), ), const SizedBox(width: 16), const Text('秒'), const SizedBox(width: 32), Consumer( builder: (context, ref, child) { return ElevatedButton( onPressed: () { print(textController.text); ref .read(intervalProvider.notifier) .update((state) => textController.text); }, child: const Text( '時間取得開始', ), ); }, ), ], ) ], ), ), ), ); } } |
以上でアプリ側の実装は完了です。
動作確認
それでは動作確認をしてみます。
取得したい時間の間隔を入力して「時間取得開始」ボタンをタップすると…
無事指定した間隔ごとに時間が取得できていることが確認できました!
Client streaming RPC (複数リクエスト, 単一レスポンス)
続いては、Client streaming RPCでの通信を実装していきます。
Client streaming RPCは複数リクエスト、単一レスポンスなので、下記のようなユースケースで利用できそうです。
- レスポンスに対するリクエストが時間経過や状態変化で変化する、かつ変化を監視する必要がある
上記を満たす機能として、「aから始まる英単語を5秒以内に何個挙げられるかを計測する」機能を作成してみます。
サーバー側
サーバー側の実装をしていきます。
リクエストとして単語を受け取り、レスポンスとして送られてきた単語の数を返却するものを定義します。
まずは、 server/protos に今回作るサービス用のprotoファイルを作成します。
今回はリクエストの前にstreamを付与します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | syntax = "proto3"; package word_count; service WordCounter { rpc count (stream WordCountRequest) returns (WordCountResponse) {} } message WordCountRequest { string word = 1; } message WordCountResponse { int32 count = 1; } |
次にgRPCのDart用コードを出力します。
1 2 | $ cd server $ protoc --dart_out=grpc:../grpc_gen/lib/src/word_count -Iprotos protos/word_count.proto |
利用する側が自動生成されたファイル郡を意識せずに扱えるようにするために、各ファイルのexportも設定します。
grpc_gen/lib/src/word_count/word_count.dart
1 2 3 4 5 6 | library word_count; export 'word_count.pb.dart'; export 'word_count.pbenum.dart'; export 'word_count.pbgrpc.dart'; export 'word_count.pbjson.dart'; |
grpc_gen/lib/grpc_gen.dart
1 2 3 4 | library grpc_gen; ... export 'src/word_count/word_count.dart'; <-- 追加 |
最後に WordCounterServiceBase を継承した WordCounterService クラスを lib/word_counter_service.dart に作成し、 bin/server.dart 内でサーバーアプリを起動するmain処理のサービス一覧に追加します。
server/lib/word_counter_service.dart
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | import 'package:grpc/grpc.dart'; import 'package:grpc_gen/grpc_gen.dart'; class WordCounterService extends WordCounterServiceBase { final words = <String>[]; @override Future<WordCountResponse> count( ServiceCall call, Stream<WordCountRequest> request) async { await for (final req in request) { // リクエストごとにリストに追加 print('単語: ${req.word}'); words.add(req.word); } // aから始まる単語をカウント final count = [...words.where((element) => element.startsWith('a'))].length; // リセット words.clear(); // カウント数を返却 return WordCountResponse(count: count); } } |
server/bin/server.dart
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | import 'package:grpc/grpc.dart'; import 'package:server/converter_service.dart'; import 'package:server/greeter_service.dart'; import 'package:server/time_notifier_service.dart'; import 'package:server/word_counter_service.dart'; Future<void> main(List<String> args) async { final server = Server( [ GreeterService(), ConverterService(), TimeNotifierService(), WordCounterService()<span class="crayon-sy">,</span><span class="crayon-h"> </span><span class="crayon-o"><</span><span class="crayon-o">--</span><span class="crayon-h"> </span><span>追加</span> ], const <Interceptor>[], CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), ); await server.serve(port: 50051); print('Server listening on port ${server.port}...'); } |
これでサーバー側の実装および準備は完了です。
アプリ側
サーバー側の準備が完了したので、次はアプリ側の実装を行っていきます。
まずは、入力した単語をStreamに追加してくためのControllerを保持する Provider を実装します。
1 2 3 4 5 | /// 入力した単語をStreamで保持するためのController final wordStreamControllerProvider = StateProvider.autoDispose<StreamController<String>?>((ref) { return; }); |
次に、入力した単語の数を取得する Provider を実装します。
5秒後にstreamに対してclose処理を呼び出し、レスポンスが受け取れるようにしています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | /// 単語の数を取得するProvider final wordCountProvider = FutureProvider.autoDispose<int?>((ref) async { // サーバーへアクセスするためのチャンネルを取得 final channel = ref.watch(channelProvider); final streamController = ref.watch(wordStreamControllerProvider); if (streamController == null) { return null; } // 5秒後にStreamの終了を通知 Future.delayed(const Duration(seconds: 5)).then( (_) => streamController.close(), ); // クライアント作成 final client = WordCounterClient(channel); final response = await client.count( streamController.stream.map( (word) => WordCountRequest(word: word), ), ); return response.count; }); |
最後に表示する Widget を作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | class WordCountPage extends HookWidget { const WordCountPage({super.key}); @override Widget build(BuildContext context) { final textController = useTextEditingController(); return Scaffold( appBar: AppBar(title: const Text('Client streaming RPC')), body: SafeArea( child: Padding( padding: const EdgeInsets.symmetric(horizontal: 32), child: Center( child: Column( mainAxisAlignment: MainAxisAlignment.center, children: [ Consumer( builder: (context, ref, child) { final count = ref.watch(wordCountProvider).valueOrNull; if (count == null) { return const SizedBox.shrink(); } return Text('結果: $count個'); }, ), const SizedBox(height: 32), Consumer( builder: (context, ref, child) { final streamController = ref.watch(wordStreamControllerProvider); if (streamController == null) { return ElevatedButton( onPressed: () { ref .read(wordStreamControllerProvider.notifier) .update((state) => StreamController()); }, child: const Text('開始!'), ); } final count = ref.watch(wordCountProvider).valueOrNull; if (count != null) { return ElevatedButton( onPressed: () { textController.clear(); ref .read(wordStreamControllerProvider.notifier) .update((state) => null); }, child: const Text('もう一度!'), ); } return TextField( controller: textController, onEditingComplete: () { ref .read(wordStreamControllerProvider) ?.add(textController.text); textController.clear(); }, ); }, ), ], ), ), ), ), ); } } |
以上でアプリ側の実装は完了です。
動作確認
それでは動作確認をしてみます。
「開始!」ボタンをタップして単語を入力していくと…
無事5秒以内に入力した単語数がレスポンスとして返却されるような機能ができました!
Bidirectional streaming RPC (複数リクエスト, 複数レスポンス)
最後は、Bidirectional streaming RPCでの通信を実装していきます。
Client streaming RPCは複数リクエスト、複数レスポンスなので、下記のようなユースケースで利用できそうです。
- リクエストおよびレスポンスが時間経過や状態変化で変化する、かつ変化を監視する必要がある
上記を満たす機能として、複数端末を使って匿名チャットができる機能を作成してみます。
サーバー側
サーバー側の実装をしていきます。
リクエストとしてメッセージを受け取り、レスポンスとしてこれまでのメッセージ一覧(時刻付き)を返却するものを定義します。
まずは、 server/protos に今回作るサービス用のprotoファイルを作成します。
今回はリクエストとレスポンスの前にそれぞれstreamを付与します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | syntax = "proto3"; package chat; service ChatConnecter { rpc connect (stream ChatConnectRequest) returns (stream ChatConnectResponse) {} } message ChatConnectRequest { string message = 1; } message ChatConnectResponse { repeated Message messages = 1; } message Message { string message = 1; int64 unixTime = 2; } |
次にgRPCのDart用コードを出力します。
1 2 | cd server $ protoc --dart_out=grpc:../grpc_gen/lib/src/chat -Iprotos protos/chat.proto |
利用する側が自動生成されたファイル郡を意識せずに扱えるようにするために、各ファイルのexportも設定します。
grpc_gen/lib/src/time_notification/time_notification.dart
1 2 3 4 5 6 | library chat; export 'chat.pb.dart'; export 'chat.pbenum.dart'; export 'chat.pbgrpc.dart'; export 'chat.pbjson.dart'; |
grpc_gen/lib/grpc_gen.dart
1 2 3 4 | library grpc_gen; ... export 'src/chat/chat.dart'; <--追加 |
最後に ChatConnecterServiceBase を継承した ChatConnecterService クラスを lib/chat_connecter_service.dart に作成し、 bin/server.dart 内でサーバーアプリを起動するmain処理のサービス一覧に追加します。
server/lib/chat_connecter_service.dart
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import 'dart:async'; import 'package:fixnum/fixnum.dart'; import 'package:grpc/grpc.dart'; import 'package:grpc_gen/grpc_gen.dart'; class ChatConnecterService extends ChatConnecterServiceBase { final streamController = StreamController<List<Message>>.broadcast(); final messages = <Message>[]; @override Stream<ChatConnectResponse> connect( ServiceCall call, Stream<ChatConnectRequest> request) async* { // 現在のメッセージ一覧を設定 streamController.add(messages); // 接続直後に現在のメッセージ一覧を送信 yield ChatConnectResponse(messages: messages); request.listen( (event) { print('メッセージ: ${event.message}'); messages.add( Message( message: event.message, unixTime: Int64(DateTime.now().millisecondsSinceEpoch), ), ); streamController.add(messages); }, ); // Streamを返却 yield* streamController.stream .map((messages) => ChatConnectResponse(messages: messages)); } } |
server/bin/server.dart
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | import 'package:grpc/grpc.dart'; import 'package:server/chat_connecter_service.dart'; import 'package:server/converter_service.dart'; import 'package:server/greeter_service.dart'; import 'package:server/time_notifier_service.dart'; import 'package:server/word_counter_service.dart'; Future<void> main(List<String> args) async { final server = Server( [ GreeterService(), ConverterService(), TimeNotifierService(), WordCounterService(), ChatConnecterService(), <-- 追加 ], const <Interceptor>[], CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), ); await server.serve(port: 50051); print('Server listening on port ${server.port}...'); } |
これでサーバー側の実装および準備は完了です。
アプリ側
サーバー側の準備が完了したので、次はアプリ側の実装を行っていきます。
まずは、アプリ側からメッセージを流すためのStreamを管理するControllerを保持する Provider を実装します。
1 2 3 4 5 6 7 | /// メッセージを流すStreamを管理するController final messageStreamControllerProvider = Provider.autoDispose<StreamController<String>>((ref) { final controller = StreamController<String>(); ref.onDispose(controller.close); return controller; }); |
次に取得したメッセージ一覧のStreamを保持する Provider を実装します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | /// 取得したメッセージ一覧 final messagesProvider = StreamProvider.autoDispose<List<Message>>((ref) async* { // サーバーへアクセスするためのチャンネルを取得 final channel = ref.watch(channelProvider); final streamController = ref.watch(messageStreamControllerProvider); // クライアント作成 final client = ChatConnecterClient(channel); // ResponseStreamを取得 final responseStream = client.connect(streamController.stream .map((message) => ChatConnectRequest(message: message))); await for (final response in responseStream) { // Streamが流れるごとにメッセージ一覧を返却 yield response.messages; } }); |
最後に表示する Widget を作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | class ChatPage extends HookWidget { const ChatPage({super.key}); @override Widget build(BuildContext context) { final textController = useTextEditingController(); return Scaffold( appBar: AppBar(title: const Text('Bidirectional streaming RPC')), body: SafeArea( child: Padding( padding: const EdgeInsets.symmetric(horizontal: 32), child: Column( children: [ Expanded( child: Consumer(builder: (context, ref, child) { final messages = ref.watch(messagesProvider).valueOrNull ?? []; return ListView.separated( itemBuilder: ((context, index) { final message = messages[index]; return Column( crossAxisAlignment: CrossAxisAlignment.start, children: [ Row( children: [ const Expanded(child: Text('匿名さん')), Text( '${DateTime.fromMillisecondsSinceEpoch(message.unixTime.toInt())}', ), ], ), Text(message.message), ], ); }), separatorBuilder: ((context, index) { return const Padding( padding: EdgeInsets.symmetric(vertical: 8.0), child: Divider(), ); }), itemCount: messages.length, ); }), ), const Divider( color: Colors.blue, ), Consumer( builder: (context, ref, child) { return Row( children: [ Expanded( child: TextField( controller: textController, ), ), ElevatedButton( onPressed: () { ref .read(messageStreamControllerProvider) .add(textController.text); textController.clear(); }, child: const Text('送信'), ), ], ); }, ), ], ), ), ), ); } } |
以上でアプリ側の実装は完了です。
動作確認
それでは動作確認をしてみます。
それぞれの端末でメッセージを送信してみると…
それぞれの端末でメッセージの送受信が確認できました!
まとめ
本記事ではgRPCの下記4つの通信方式を使ってFlutterアプリ内でAPI通信を行う実装をまとめました。
- Unary RPC (単一リクエスト, 単一レスポンス)
- Server streaming RPC (単一リクエスト, 複数レスポンス)
- Client streaming RPC (複数リクエスト, 単一レスポンス)
- Bidirectional streaming RPC (複数リクエスト, 複数レスポンス)
各機能がシンプルなものだったのもありますが、Streamを使った機能がサクッと実装できました。
gRPCをFlutterアプリで導入しようと考えている方の参考になれば幸いです。
書いた人はこんな人

- 「好きを仕事にするエンジニア集団」の(株)ライトコードです!
ライトコードは、福岡、東京、大阪の3拠点で事業展開するIT企業です。
現在は、国内を代表する大手IT企業を取引先にもち、ITシステムの受託事業が中心。
いずれも直取引で、月間PV数1億を超えるWebサービスのシステム開発・運営、インフラの構築・運用に携わっています。
システム開発依頼・お見積もり大歓迎!
また、現在「WEBエンジニア」「モバイルエンジニア」「営業」「WEBデザイナー」「WEBディレクター」を積極採用中です!
インターンや新卒採用も行っております。
以下よりご応募をお待ちしております!
https://rightcode.co.jp/recruit
ライトコードの日常12月 1, 2023ライトコードクエスト〜東京オフィス歴史編〜
ITエンタメ10月 13, 2023Netflixの成功はレコメンドエンジン?
ライトコードの日常8月 30, 2023退職者の最終出社日に密着してみた!
ITエンタメ8月 3, 2023世界初の量産型ポータブルコンピュータを開発したのに倒産!?アダム・オズボーン