1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <condition_variable>
20 #include <iostream>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <vector>
25
26 #include "caching_interceptor.h"
27
28 #include <grpcpp/grpcpp.h>
29
30 #ifdef BAZEL_BUILD
31 #include "examples/protos/keyvaluestore.grpc.pb.h"
32 #else
33 #include "keyvaluestore.grpc.pb.h"
34 #endif
35
36 using grpc::Channel;
37 using grpc::ClientContext;
38 using grpc::Status;
39 using keyvaluestore::KeyValueStore;
40 using keyvaluestore::Request;
41 using keyvaluestore::Response;
42
43 // Requests each key in the vector and displays the key and its corresponding
44 // value as a pair.
45 class KeyValueStoreClient : public grpc::ClientBidiReactor<Request, Response> {
46 public:
KeyValueStoreClient(std::shared_ptr<Channel> channel,std::vector<std::string> keys)47 KeyValueStoreClient(std::shared_ptr<Channel> channel,
48 std::vector<std::string> keys)
49 : stub_(KeyValueStore::NewStub(channel)), keys_(std::move(keys)) {
50 stub_->async()->GetValues(&context_, this);
51 assert(!keys_.empty());
52 request_.set_key(keys_[0]);
53 StartWrite(&request_);
54 StartCall();
55 }
56
OnReadDone(bool ok)57 void OnReadDone(bool ok) override {
58 if (ok) {
59 std::cout << request_.key() << " : " << response_.value() << std::endl;
60 if (++counter_ < keys_.size()) {
61 request_.set_key(keys_[counter_]);
62 StartWrite(&request_);
63 } else {
64 StartWritesDone();
65 }
66 }
67 }
68
OnWriteDone(bool ok)69 void OnWriteDone(bool ok) override {
70 if (ok) {
71 StartRead(&response_);
72 }
73 }
74
OnDone(const grpc::Status & status)75 void OnDone(const grpc::Status& status) override {
76 if (!status.ok()) {
77 std::cout << status.error_code() << ": " << status.error_message()
78 << std::endl;
79 std::cout << "RPC failed";
80 }
81 std::unique_lock<std::mutex> l(mu_);
82 done_ = true;
83 cv_.notify_all();
84 }
85
Await()86 void Await() {
87 std::unique_lock<std::mutex> l(mu_);
88 while (!done_) {
89 cv_.wait(l);
90 }
91 }
92
93 private:
94 std::unique_ptr<KeyValueStore::Stub> stub_;
95 std::vector<std::string> keys_;
96 size_t counter_ = 0;
97 ClientContext context_;
98 bool done_ = false;
99 Request request_;
100 Response response_;
101 std::mutex mu_;
102 std::condition_variable cv_;
103 };
104
main(int argc,char ** argv)105 int main(int argc, char** argv) {
106 // Instantiate the client. It requires a channel, out of which the actual RPCs
107 // are created. This channel models a connection to an endpoint (in this case,
108 // localhost at port 50051). We indicate that the channel isn't authenticated
109 // (use of InsecureChannelCredentials()).
110 // In this example, we are using a cache which has been added in as an
111 // interceptor.
112 grpc::ChannelArguments args;
113 std::vector<
114 std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
115 interceptor_creators;
116 interceptor_creators.push_back(std::make_unique<CachingInterceptorFactory>());
117 auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
118 "localhost:50051", grpc::InsecureChannelCredentials(), args,
119 std::move(interceptor_creators));
120 std::vector<std::string> keys = {"key1", "key2", "key3", "key4",
121 "key5", "key1", "key2", "key4"};
122 KeyValueStoreClient client(channel, keys);
123 client.Await();
124 return 0;
125 }
126