xref: /aosp_15_r20/external/pigweed/pw_transfer/client_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_transfer/client.h"
16 
17 #include <cstring>
18 
19 #include "pw_assert/check.h"
20 #include "pw_bytes/array.h"
21 #include "pw_rpc/raw/client_testing.h"
22 #include "pw_rpc/test_helpers.h"
23 #include "pw_status/status.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread_stl/options.h"
26 #include "pw_transfer/internal/config.h"
27 #include "pw_transfer_private/chunk_testing.h"
28 #include "pw_unit_test/framework.h"
29 
30 namespace pw::transfer::test {
31 namespace {
32 
33 using internal::Chunk;
34 using pw_rpc::raw::Transfer;
35 
36 using namespace std::chrono_literals;
37 
TransferThreadOptions()38 thread::Options& TransferThreadOptions() {
39   static thread::stl::Options options;
40   return options;
41 }
42 
43 class ReadTransfer : public ::testing::Test {
44  protected:
ReadTransfer(size_t max_bytes_to_receive=0)45   ReadTransfer(size_t max_bytes_to_receive = 0)
46       : transfer_thread_(chunk_buffer_, encode_buffer_),
47         legacy_client_(context_.client(),
48                        context_.channel().id(),
49                        transfer_thread_,
50                        max_bytes_to_receive > 0
51                            ? max_bytes_to_receive
52                            : transfer_thread_.max_chunk_size()),
53         client_(context_.client(),
54                 context_.channel().id(),
55                 transfer_thread_,
56                 max_bytes_to_receive > 0 ? max_bytes_to_receive
57                                          : transfer_thread_.max_chunk_size()),
58         system_thread_(TransferThreadOptions(), transfer_thread_) {
59     legacy_client_.set_protocol_version(ProtocolVersion::kLegacy);
60   }
61 
~ReadTransfer()62   ~ReadTransfer() override {
63     transfer_thread_.Terminate();
64     system_thread_.join();
65   }
66 
67   rpc::RawClientTestContext<> context_;
68 
69   Thread<1, 1> transfer_thread_;
70   Client legacy_client_;
71   Client client_;
72 
73   std::array<std::byte, 64> chunk_buffer_;
74   std::array<std::byte, 64> encode_buffer_;
75 
76   pw::Thread system_thread_;
77 };
78 
__anonfefc08470202(size_t i) 79 constexpr auto kData32 = bytes::Initialized<32>([](size_t i) { return i; });
__anonfefc08470302(size_t i) 80 constexpr auto kData64 = bytes::Initialized<64>([](size_t i) { return i; });
__anonfefc08470402(size_t i) 81 constexpr auto kData256 = bytes::Initialized<256>([](size_t i) { return i; });
82 
// Legacy read of resource 3 in which the server delivers all of kData32 in a
// single data chunk marked with remaining_bytes == 0.
TEST_F(ReadTransfer, SingleChunk) {
  stream::MemoryWriterBuffer<64> writer;
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(
      OkStatus(),
      legacy_client_
          .Read(3,
                writer,
                [&transfer_status](Status status) { transfer_status = status; })
          .status());

  transfer_thread_.WaitUntilEventIsProcessed();

  // The client has sent exactly one chunk (its opening parameters) and the
  // completion callback has not run yet.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads[0]);
  EXPECT_EQ(c0.session_id(), 3u);
  EXPECT_EQ(c0.resource_id(), 3u);
  EXPECT_EQ(c0.offset(), 0u);
  EXPECT_EQ(c0.window_end_offset(), 37u);
  EXPECT_EQ(c0.type(), Chunk::Type::kStart);

  // Server replies with the complete payload.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(3)
                      .set_offset(0)
                      .set_payload(kData32)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client responds with one more chunk that carries an OK status.
  ASSERT_EQ(payloads.size(), 2u);

  Chunk c1 = DecodeChunk(payloads.back());
  EXPECT_EQ(c1.session_id(), 3u);
  ASSERT_TRUE(c1.status().has_value());
  EXPECT_EQ(c1.status().value(), OkStatus());

  EXPECT_EQ(transfer_status, OkStatus());

  // Pin the byte count before comparing contents: a memcmp over
  // bytes_written() bytes succeeds trivially when nothing was written and
  // would read past kData32 if more than its size had been written.
  ASSERT_EQ(writer.bytes_written(), kData32.size());
  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
            0);
}
129 
// Legacy read of resource 4 in which the server delivers kData32 as two
// 16-byte data chunks; only the second one sets remaining_bytes == 0.
TEST_F(ReadTransfer, MultiChunk) {
  stream::MemoryWriterBuffer<64> writer;
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(
      OkStatus(),
      legacy_client_
          .Read(4,
                writer,
                [&transfer_status](Status status) { transfer_status = status; })
          .status());

  transfer_thread_.WaitUntilEventIsProcessed();

  // The client has sent exactly one chunk (its opening parameters) and the
  // completion callback has not run yet.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads[0]);
  EXPECT_EQ(c0.session_id(), 4u);
  EXPECT_EQ(c0.resource_id(), 4u);
  EXPECT_EQ(c0.offset(), 0u);
  EXPECT_EQ(c0.window_end_offset(), 37u);
  EXPECT_EQ(c0.type(), Chunk::Type::kStart);

  // First half of the payload.
  constexpr ConstByteSpan data(kData32);
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(4)
                      .set_offset(0)
                      .set_payload(data.first(16))));
  transfer_thread_.WaitUntilEventIsProcessed();

  // No response is expected from the client after the first half.
  ASSERT_EQ(payloads.size(), 1u);

  // Second half of the payload, flagged as the end of the data.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(4)
                      .set_offset(16)
                      .set_payload(data.subspan(16))
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client responds with one more chunk that carries an OK status.
  ASSERT_EQ(payloads.size(), 2u);

  Chunk c1 = DecodeChunk(payloads[1]);
  EXPECT_EQ(c1.session_id(), 4u);
  ASSERT_TRUE(c1.status().has_value());
  EXPECT_EQ(c1.status().value(), OkStatus());

  EXPECT_EQ(transfer_status, OkStatus());

  // Pin the byte count before comparing contents: a memcmp over
  // bytes_written() bytes succeeds trivially when nothing was written and
  // would read past kData32 if more than its size had been written.
  ASSERT_EQ(writer.bytes_written(), kData32.size());
  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
            0);
}
186 
// Runs two consecutive legacy reads of resource 3 into the same writer and
// expects each of them to finish with an OK status.
TEST_F(ReadTransfer, MultipleTransfers) {
  stream::MemoryWriterBuffer<64> output;
  Status completion = Status::Unknown();

  // The server answers both reads with the same single, final data chunk.
  const Chunk whole_payload =
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
          .set_session_id(3)
          .set_offset(0)
          .set_payload(kData32)
          .set_remaining_bytes(0);

  // --- First read ---
  const Status first_start =
      legacy_client_
          .Read(3, output, [&completion](Status result) { completion = result; })
          .status();
  ASSERT_EQ(OkStatus(), first_start);
  transfer_thread_.WaitUntilEventIsProcessed();

  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(whole_payload));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(completion, OkStatus());

  // Reset the recorded status so the second completion can be observed.
  completion = Status::Unknown();

  // --- Second read of the same resource ---
  const Status second_start =
      legacy_client_
          .Read(3, output, [&completion](Status result) { completion = result; })
          .status();
  ASSERT_EQ(OkStatus(), second_start);
  transfer_thread_.WaitUntilEventIsProcessed();

  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(whole_payload));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(completion, OkStatus());
}
230 
231 class ReadTransferMaxBytes32 : public ReadTransfer {
232  protected:
ReadTransferMaxBytes32()233   ReadTransferMaxBytes32() : ReadTransfer(/*max_bytes_to_receive=*/32) {}
234 };
235 
// With a 64-byte writer and a fixture limit of 32 bytes, the opening chunk is
// expected to advertise a window that ends at offset 32.
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
  stream::MemoryWriterBuffer<64> output;
  const Status start_status =
      legacy_client_.Read(5, output, [](Status) {}).status();
  EXPECT_EQ(OkStatus(), start_status);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one chunk, the opening parameters, has been sent.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);

  Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.session_id(), 5u);
  EXPECT_EQ(opening.resource_id(), 5u);
  EXPECT_EQ(opening.offset(), 0u);
  EXPECT_EQ(opening.window_end_offset(), 32u);
  EXPECT_EQ(opening.type(), Chunk::Type::kStart);
}
253 
// With a writer that only holds 16 bytes (less than the fixture's 32-byte
// limit), the opening chunk is expected to advertise a window ending at 16.
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
  stream::MemoryWriterBuffer<16> tiny_output;
  const Status start_status =
      legacy_client_.Read(5, tiny_output, [](Status) {}).status();
  EXPECT_EQ(OkStatus(), start_status);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one chunk, the opening parameters, has been sent.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);

  Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.session_id(), 5u);
  EXPECT_EQ(opening.resource_id(), 5u);
  EXPECT_EQ(opening.offset(), 0u);
  EXPECT_EQ(opening.window_end_offset(), 16u);
  EXPECT_EQ(opening.type(), Chunk::Type::kStart);
}
272 
TEST_F(ReadTransferMaxBytes32,MultiParameters)273 TEST_F(ReadTransferMaxBytes32, MultiParameters) {
274   stream::MemoryWriterBuffer<64> writer;
275   Status transfer_status = Status::Unknown();
276 
277   ASSERT_EQ(
278       OkStatus(),
279       legacy_client_
280           .Read(6,
281                 writer,
282                 [&transfer_status](Status status) { transfer_status = status; })
283           .status());
284   transfer_thread_.WaitUntilEventIsProcessed();
285 
286   // First transfer parameters chunk is sent.
287   rpc::PayloadsView payloads =
288       context_.output().payloads<Transfer::Read>(context_.channel().id());
289   ASSERT_EQ(payloads.size(), 1u);
290   EXPECT_EQ(transfer_status, Status::Unknown());
291 
292   Chunk c0 = DecodeChunk(payloads[0]);
293   EXPECT_EQ(c0.session_id(), 6u);
294   EXPECT_EQ(c0.resource_id(), 6u);
295   EXPECT_EQ(c0.offset(), 0u);
296   ASSERT_EQ(c0.window_end_offset(), 32u);
297 
298   constexpr ConstByteSpan data(kData64);
299   context_.server().SendServerStream<Transfer::Read>(
300       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
301                       .set_session_id(6)
302                       .set_offset(0)
303                       .set_payload(data.first(32))));
304   transfer_thread_.WaitUntilEventIsProcessed();
305 
306   ASSERT_EQ(payloads.size(), 2u);
307   EXPECT_EQ(transfer_status, Status::Unknown());
308 
309   // Second parameters chunk.
310   Chunk c1 = DecodeChunk(payloads[1]);
311   EXPECT_EQ(c1.session_id(), 6u);
312   EXPECT_EQ(c1.offset(), 32u);
313   ASSERT_EQ(c1.window_end_offset(), 64u);
314 
315   context_.server().SendServerStream<Transfer::Read>(
316       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
317                       .set_session_id(6)
318                       .set_offset(32)
319                       .set_payload(data.subspan(32))
320                       .set_remaining_bytes(0)));
321   transfer_thread_.WaitUntilEventIsProcessed();
322 
323   ASSERT_EQ(payloads.size(), 3u);
324 
325   Chunk c2 = DecodeChunk(payloads[2]);
326   EXPECT_EQ(c2.session_id(), 6u);
327   ASSERT_TRUE(c2.status().has_value());
328   EXPECT_EQ(c2.status().value(), OkStatus());
329 
330   EXPECT_EQ(transfer_status, OkStatus());
331   EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
332 }
333 
TEST_F(ReadTransfer,UnexpectedOffset)334 TEST_F(ReadTransfer, UnexpectedOffset) {
335   stream::MemoryWriterBuffer<64> writer;
336   Status transfer_status = Status::Unknown();
337 
338   ASSERT_EQ(
339       OkStatus(),
340       legacy_client_
341           .Read(7,
342                 writer,
343                 [&transfer_status](Status status) { transfer_status = status; })
344           .status());
345   transfer_thread_.WaitUntilEventIsProcessed();
346 
347   // First transfer parameters chunk is sent.
348   rpc::PayloadsView payloads =
349       context_.output().payloads<Transfer::Read>(context_.channel().id());
350   ASSERT_EQ(payloads.size(), 1u);
351   EXPECT_EQ(transfer_status, Status::Unknown());
352 
353   Chunk c0 = DecodeChunk(payloads[0]);
354   EXPECT_EQ(c0.session_id(), 7u);
355   EXPECT_EQ(c0.resource_id(), 7u);
356   EXPECT_EQ(c0.offset(), 0u);
357   EXPECT_EQ(c0.window_end_offset(), 37u);
358 
359   constexpr ConstByteSpan data(kData32);
360   context_.server().SendServerStream<Transfer::Read>(
361       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
362                       .set_session_id(7)
363                       .set_offset(0)
364                       .set_payload(data.first(16))));
365   transfer_thread_.WaitUntilEventIsProcessed();
366 
367   ASSERT_EQ(payloads.size(), 1u);
368   EXPECT_EQ(transfer_status, Status::Unknown());
369 
370   // Send a chunk with an incorrect offset. The client should resend parameters.
371   context_.server().SendServerStream<Transfer::Read>(
372       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
373                       .set_session_id(7)
374                       .set_offset(8)  // wrong!
375                       .set_payload(data.subspan(16))
376                       .set_remaining_bytes(0)));
377   transfer_thread_.WaitUntilEventIsProcessed();
378 
379   ASSERT_EQ(payloads.size(), 2u);
380   EXPECT_EQ(transfer_status, Status::Unknown());
381 
382   Chunk c1 = DecodeChunk(payloads[1]);
383   EXPECT_EQ(c1.session_id(), 7u);
384   EXPECT_EQ(c1.offset(), 16u);
385   EXPECT_EQ(c1.window_end_offset(), 53u);
386 
387   // Send the correct chunk, completing the transfer.
388   context_.server().SendServerStream<Transfer::Read>(
389       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
390                       .set_session_id(7)
391                       .set_offset(16)
392                       .set_payload(data.subspan(16))
393                       .set_remaining_bytes(0)));
394   transfer_thread_.WaitUntilEventIsProcessed();
395 
396   ASSERT_EQ(payloads.size(), 3u);
397 
398   Chunk c2 = DecodeChunk(payloads[2]);
399   EXPECT_EQ(c2.session_id(), 7u);
400   ASSERT_TRUE(c2.status().has_value());
401   EXPECT_EQ(c2.status().value(), OkStatus());
402 
403   EXPECT_EQ(transfer_status, OkStatus());
404   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
405             0);
406 }
407 
TEST_F(ReadTransferMaxBytes32,TooMuchData_EntersRecovery)408 TEST_F(ReadTransferMaxBytes32, TooMuchData_EntersRecovery) {
409   stream::MemoryWriterBuffer<32> writer;
410   Status transfer_status = Status::Unknown();
411 
412   ASSERT_EQ(
413       OkStatus(),
414       legacy_client_
415           .Read(8,
416                 writer,
417                 [&transfer_status](Status status) { transfer_status = status; })
418           .status());
419   transfer_thread_.WaitUntilEventIsProcessed();
420 
421   // First transfer parameters chunk is sent.
422   rpc::PayloadsView payloads =
423       context_.output().payloads<Transfer::Read>(context_.channel().id());
424   ASSERT_EQ(payloads.size(), 1u);
425   EXPECT_EQ(transfer_status, Status::Unknown());
426 
427   Chunk c0 = DecodeChunk(payloads[0]);
428   EXPECT_EQ(c0.session_id(), 8u);
429   EXPECT_EQ(c0.resource_id(), 8u);
430   EXPECT_EQ(c0.offset(), 0u);
431   ASSERT_EQ(c0.window_end_offset(), 32u);
432 
433   constexpr ConstByteSpan data(kData64);
434 
435   // pending_bytes == 32
436   context_.server().SendServerStream<Transfer::Read>(
437       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
438                       .set_session_id(8)
439                       .set_offset(0)
440                       .set_payload(data.first(16))));
441 
442   // pending_bytes == 16
443   context_.server().SendServerStream<Transfer::Read>(
444       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
445                       .set_session_id(8)
446                       .set_offset(16)
447                       .set_payload(data.subspan(16, 8))));
448 
449   // pending_bytes == 8, send 16 instead.
450   context_.server().SendServerStream<Transfer::Read>(
451       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
452                       .set_session_id(8)
453                       .set_offset(24)
454                       .set_payload(data.subspan(24, 16))));
455   transfer_thread_.WaitUntilEventIsProcessed();
456 
457   ASSERT_EQ(payloads.size(), 4u);
458 
459   // The device should resend a parameters chunk.
460   Chunk c1 = DecodeChunk(payloads[3]);
461   EXPECT_EQ(c1.session_id(), 8u);
462   EXPECT_EQ(c1.type(), Chunk::Type::kParametersRetransmit);
463   EXPECT_EQ(c1.offset(), 24u);
464   EXPECT_EQ(c1.window_end_offset(), 32u);
465 
466   context_.server().SendServerStream<Transfer::Read>(
467       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
468                       .set_session_id(8)
469                       .set_offset(24)
470                       .set_payload(data.subspan(24, 8))
471                       .set_remaining_bytes(0)));
472   transfer_thread_.WaitUntilEventIsProcessed();
473 
474   EXPECT_EQ(transfer_status, OkStatus());
475 }
476 
TEST_F(ReadTransferMaxBytes32,TooMuchData_HitsLifetimeRetries)477 TEST_F(ReadTransferMaxBytes32, TooMuchData_HitsLifetimeRetries) {
478   stream::MemoryWriterBuffer<32> writer;
479   Status transfer_status = Status::Unknown();
480 
481   constexpr int kLowMaxLifetimeRetries = 3;
482   legacy_client_.set_max_lifetime_retries(kLowMaxLifetimeRetries).IgnoreError();
483 
484   ASSERT_EQ(
485       OkStatus(),
486       legacy_client_
487           .Read(8,
488                 writer,
489                 [&transfer_status](Status status) { transfer_status = status; })
490           .status());
491   transfer_thread_.WaitUntilEventIsProcessed();
492 
493   // First transfer parameters chunk is sent.
494   rpc::PayloadsView payloads =
495       context_.output().payloads<Transfer::Read>(context_.channel().id());
496   ASSERT_EQ(payloads.size(), 1u);
497   EXPECT_EQ(transfer_status, Status::Unknown());
498 
499   Chunk c0 = DecodeChunk(payloads[0]);
500   EXPECT_EQ(c0.session_id(), 8u);
501   EXPECT_EQ(c0.resource_id(), 8u);
502   EXPECT_EQ(c0.offset(), 0u);
503   ASSERT_EQ(c0.window_end_offset(), 32u);
504 
505   constexpr ConstByteSpan data(kData64);
506 
507   // pending_bytes == 32
508   context_.server().SendServerStream<Transfer::Read>(
509       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
510                       .set_session_id(8)
511                       .set_offset(0)
512                       .set_payload(data.first(16))));
513 
514   // pending_bytes == 16
515   context_.server().SendServerStream<Transfer::Read>(
516       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
517                       .set_session_id(8)
518                       .set_offset(16)
519                       .set_payload(data.subspan(16, 8))));
520 
521   // pending_bytes == 8, but send 16 several times.
522   for (int i = 0; i < kLowMaxLifetimeRetries; ++i) {
523     context_.server().SendServerStream<Transfer::Read>(
524         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
525                         .set_session_id(8)
526                         .set_offset(24)
527                         .set_payload(data.subspan(24, 16))));
528     transfer_thread_.WaitUntilEventIsProcessed();
529 
530     ASSERT_EQ(payloads.size(), 4u + i);
531 
532     // The device should resend a parameters chunk.
533     Chunk c = DecodeChunk(payloads.back());
534     EXPECT_EQ(c.session_id(), 8u);
535     EXPECT_EQ(c.type(), Chunk::Type::kParametersRetransmit);
536   }
537   EXPECT_EQ(transfer_status, Status::Unknown());
538 
539   // Send one more incorrectly-sized chunk. The transfer should fail.
540   context_.server().SendServerStream<Transfer::Read>(
541       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
542                       .set_session_id(8)
543                       .set_offset(24)
544                       .set_payload(data.subspan(24, 16))));
545   transfer_thread_.WaitUntilEventIsProcessed();
546 
547   ASSERT_EQ(payloads.size(), 7u);
548   Chunk error = DecodeChunk(payloads.back());
549   EXPECT_EQ(error.session_id(), 8u);
550   EXPECT_EQ(error.type(), Chunk::Type::kCompletion);
551   EXPECT_EQ(error.status(), Status::Internal());
552 
553   EXPECT_EQ(transfer_status, Status::Internal());
554 }
555 
// Legacy read of resource 9 that the server terminates with NOT_FOUND. The
// client is expected to report that status and to send nothing in response.
TEST_F(ReadTransfer, ServerError) {
  stream::MemoryWriterBuffer<64> output;
  Status completion = Status::Unknown();

  const Status start_status =
      legacy_client_
          .Read(9, output, [&completion](Status result) { completion = result; })
          .status();
  ASSERT_EQ(OkStatus(), start_status);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one chunk, the opening parameters, has been sent and the
  // completion callback has not run.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(completion, Status::Unknown());

  Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.session_id(), 9u);
  EXPECT_EQ(opening.resource_id(), 9u);
  EXPECT_EQ(opening.offset(), 0u);
  ASSERT_EQ(opening.window_end_offset(), 37u);

  // The server ends the session with an error status.
  const Chunk server_failure =
      Chunk::Final(ProtocolVersion::kLegacy, 9, Status::NotFound());
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(server_failure));
  transfer_thread_.WaitUntilEventIsProcessed();

  // No further chunk was sent, and the callback received the server's status.
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(completion, Status::NotFound());
}
590 
TEST_F(ReadTransfer,OnlySendsParametersOnceAfterDrop)591 TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
592   stream::MemoryWriterBuffer<64> writer;
593   Status transfer_status = Status::Unknown();
594 
595   ASSERT_EQ(
596       OkStatus(),
597       legacy_client_
598           .Read(10,
599                 writer,
600                 [&transfer_status](Status status) { transfer_status = status; })
601           .status());
602   transfer_thread_.WaitUntilEventIsProcessed();
603 
604   // First transfer parameters chunk is sent.
605   rpc::PayloadsView payloads =
606       context_.output().payloads<Transfer::Read>(context_.channel().id());
607   ASSERT_EQ(payloads.size(), 1u);
608   EXPECT_EQ(transfer_status, Status::Unknown());
609 
610   Chunk c0 = DecodeChunk(payloads[0]);
611   EXPECT_EQ(c0.session_id(), 10u);
612   EXPECT_EQ(c0.resource_id(), 10u);
613   EXPECT_EQ(c0.offset(), 0u);
614   ASSERT_EQ(c0.window_end_offset(), 37u);
615 
616   constexpr ConstByteSpan data(kData32);
617 
618   // Send the first 8 bytes of the transfer.
619   context_.server().SendServerStream<Transfer::Read>(
620       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
621                       .set_session_id(10)
622                       .set_offset(0)
623                       .set_payload(data.first(8))));
624 
625   // Skip offset 8, send the rest starting from 16.
626   for (uint32_t offset = 16; offset < data.size(); offset += 8) {
627     context_.server().SendServerStream<Transfer::Read>(
628         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
629                         .set_session_id(10)
630                         .set_offset(offset)
631                         .set_payload(data.subspan(offset, 8))));
632   }
633   transfer_thread_.WaitUntilEventIsProcessed();
634 
635   // Only one parameters update should be sent, with the offset of the initial
636   // dropped packet.
637   ASSERT_EQ(payloads.size(), 2u);
638 
639   Chunk c1 = DecodeChunk(payloads[1]);
640   EXPECT_EQ(c1.session_id(), 10u);
641   EXPECT_EQ(c1.offset(), 8u);
642   ASSERT_EQ(c1.window_end_offset(), 45u);
643 
644   // Send the remaining data to complete the transfer.
645   context_.server().SendServerStream<Transfer::Read>(
646       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
647                       .set_session_id(10)
648                       .set_offset(8)
649                       .set_payload(data.subspan(8))
650                       .set_remaining_bytes(0)));
651   transfer_thread_.WaitUntilEventIsProcessed();
652 
653   ASSERT_EQ(payloads.size(), 3u);
654 
655   Chunk c2 = DecodeChunk(payloads[2]);
656   EXPECT_EQ(c2.session_id(), 10u);
657   ASSERT_TRUE(c2.status().has_value());
658   EXPECT_EQ(c2.status().value(), OkStatus());
659 
660   EXPECT_EQ(transfer_status, OkStatus());
661 }
662 
TEST_F(ReadTransfer,ResendsParametersIfSentRepeatedChunkDuringRecovery)663 TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
664   stream::MemoryWriterBuffer<64> writer;
665   Status transfer_status = Status::Unknown();
666 
667   ASSERT_EQ(
668       OkStatus(),
669       legacy_client_
670           .Read(11,
671                 writer,
672                 [&transfer_status](Status status) { transfer_status = status; })
673           .status());
674   transfer_thread_.WaitUntilEventIsProcessed();
675 
676   // First transfer parameters chunk is sent.
677   rpc::PayloadsView payloads =
678       context_.output().payloads<Transfer::Read>(context_.channel().id());
679   ASSERT_EQ(payloads.size(), 1u);
680   EXPECT_EQ(transfer_status, Status::Unknown());
681 
682   Chunk c0 = DecodeChunk(payloads[0]);
683   EXPECT_EQ(c0.session_id(), 11u);
684   EXPECT_EQ(c0.resource_id(), 11u);
685   EXPECT_EQ(c0.offset(), 0u);
686   ASSERT_EQ(c0.window_end_offset(), 37u);
687 
688   constexpr ConstByteSpan data(kData32);
689 
690   // Send the first 8 bytes of the transfer.
691   context_.server().SendServerStream<Transfer::Read>(
692       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
693                       .set_session_id(11)
694                       .set_offset(0)
695                       .set_payload(data.first(8))));
696 
697   // Skip offset 8, send the rest starting from 16.
698   for (uint32_t offset = 16; offset < data.size(); offset += 8) {
699     context_.server().SendServerStream<Transfer::Read>(
700         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
701                         .set_session_id(11)
702                         .set_offset(offset)
703                         .set_payload(data.subspan(offset, 8))));
704   }
705   transfer_thread_.WaitUntilEventIsProcessed();
706 
707   // Only one parameters update should be sent, with the offset of the initial
708   // dropped packet.
709   ASSERT_EQ(payloads.size(), 2u);
710 
711   const Chunk last_chunk = Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
712                                .set_session_id(11)
713                                .set_offset(24)
714                                .set_payload(data.subspan(24));
715 
716   // Re-send the final chunk of the block.
717   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
718   transfer_thread_.WaitUntilEventIsProcessed();
719 
720   // The original drop parameters should be re-sent.
721   ASSERT_EQ(payloads.size(), 3u);
722   Chunk c2 = DecodeChunk(payloads[2]);
723   EXPECT_EQ(c2.session_id(), 11u);
724   EXPECT_EQ(c2.offset(), 8u);
725   ASSERT_EQ(c2.window_end_offset(), 45u);
726 
727   // Do it again.
728   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
729   transfer_thread_.WaitUntilEventIsProcessed();
730 
731   ASSERT_EQ(payloads.size(), 4u);
732   Chunk c3 = DecodeChunk(payloads[3]);
733   EXPECT_EQ(c3.session_id(), 11u);
734   EXPECT_EQ(c3.offset(), 8u);
735   ASSERT_EQ(c3.window_end_offset(), 45u);
736 
737   // Finish the transfer normally.
738   context_.server().SendServerStream<Transfer::Read>(
739       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
740                       .set_session_id(11)
741                       .set_offset(8)
742                       .set_payload(data.subspan(8))
743                       .set_remaining_bytes(0)));
744   transfer_thread_.WaitUntilEventIsProcessed();
745 
746   ASSERT_EQ(payloads.size(), 5u);
747 
748   Chunk c4 = DecodeChunk(payloads[4]);
749   EXPECT_EQ(c4.session_id(), 11u);
750   ASSERT_TRUE(c4.status().has_value());
751   EXPECT_EQ(c4.status().value(), OkStatus());
752 
753   EXPECT_EQ(transfer_status, OkStatus());
754 }
755 
756 // Use a long timeout to avoid accidentally triggering timeouts.
757 constexpr chrono::SystemClock::duration kTestTimeout = std::chrono::seconds(30);
758 constexpr uint8_t kTestRetries = 3;
759 
TEST_F(ReadTransfer,Timeout_ResendsCurrentParameters)760 TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
761   stream::MemoryWriterBuffer<64> writer;
762   Status transfer_status = Status::Unknown();
763 
764   ASSERT_EQ(
765       OkStatus(),
766       legacy_client_
767           .Read(
768               12,
769               writer,
770               [&transfer_status](Status status) { transfer_status = status; },
771               kTestTimeout,
772               kTestTimeout)
773           .status());
774   transfer_thread_.WaitUntilEventIsProcessed();
775 
776   // First transfer parameters chunk is sent.
777   rpc::PayloadsView payloads =
778       context_.output().payloads<Transfer::Read>(context_.channel().id());
779   ASSERT_EQ(payloads.size(), 1u);
780   EXPECT_EQ(transfer_status, Status::Unknown());
781 
782   Chunk c0 = DecodeChunk(payloads.back());
783   EXPECT_EQ(c0.session_id(), 12u);
784   EXPECT_EQ(c0.resource_id(), 12u);
785   EXPECT_EQ(c0.offset(), 0u);
786   EXPECT_EQ(c0.window_end_offset(), 37u);
787   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
788 
789   // Wait for the timeout to expire without doing anything. The client should
790   // resend its initial parameters chunk.
791   transfer_thread_.SimulateClientTimeout(12);
792   ASSERT_EQ(payloads.size(), 2u);
793 
794   Chunk c = DecodeChunk(payloads.back());
795   EXPECT_EQ(c.session_id(), 12u);
796   EXPECT_EQ(c.offset(), 0u);
797   EXPECT_EQ(c.window_end_offset(), 37u);
798   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
799 
800   // Transfer has not yet completed.
801   EXPECT_EQ(transfer_status, Status::Unknown());
802 
803   // Finish the transfer following the timeout.
804   context_.server().SendServerStream<Transfer::Read>(
805       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
806                       .set_session_id(12)
807                       .set_offset(0)
808                       .set_payload(kData32)
809                       .set_remaining_bytes(0)));
810   transfer_thread_.WaitUntilEventIsProcessed();
811 
812   ASSERT_EQ(payloads.size(), 3u);
813 
814   Chunk c4 = DecodeChunk(payloads.back());
815   EXPECT_EQ(c4.session_id(), 12u);
816   ASSERT_TRUE(c4.status().has_value());
817   EXPECT_EQ(c4.status().value(), OkStatus());
818 
819   EXPECT_EQ(transfer_status, OkStatus());
820 }
821 
TEST_F(ReadTransfer,Timeout_ResendsUpdatedParameters)822 TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
823   stream::MemoryWriterBuffer<64> writer;
824   Status transfer_status = Status::Unknown();
825 
826   ASSERT_EQ(
827       OkStatus(),
828       legacy_client_
829           .Read(
830               13,
831               writer,
832               [&transfer_status](Status status) { transfer_status = status; },
833               kTestTimeout)
834           .status());
835   transfer_thread_.WaitUntilEventIsProcessed();
836 
837   // First transfer parameters chunk is sent.
838   rpc::PayloadsView payloads =
839       context_.output().payloads<Transfer::Read>(context_.channel().id());
840   ASSERT_EQ(payloads.size(), 1u);
841   EXPECT_EQ(transfer_status, Status::Unknown());
842 
843   Chunk c0 = DecodeChunk(payloads.back());
844   EXPECT_EQ(c0.session_id(), 13u);
845   EXPECT_EQ(c0.resource_id(), 13u);
846   EXPECT_EQ(c0.offset(), 0u);
847   EXPECT_EQ(c0.window_end_offset(), 37u);
848   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
849 
850   constexpr ConstByteSpan data(kData32);
851 
852   // Send some data, but not everything.
853   context_.server().SendServerStream<Transfer::Read>(
854       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
855                       .set_session_id(13)
856                       .set_offset(0)
857                       .set_payload(data.first(16))));
858   transfer_thread_.WaitUntilEventIsProcessed();
859 
860   ASSERT_EQ(payloads.size(), 1u);
861 
862   // Wait for the timeout to expire without sending more data. The client should
863   // send an updated parameters chunk, accounting for the data already received.
864   transfer_thread_.SimulateClientTimeout(13);
865   ASSERT_EQ(payloads.size(), 2u);
866 
867   Chunk c = DecodeChunk(payloads.back());
868   EXPECT_EQ(c.session_id(), 13u);
869   EXPECT_EQ(c.offset(), 16u);
870   EXPECT_EQ(c.window_end_offset(), 53u);
871   EXPECT_EQ(c.type(), Chunk::Type::kParametersRetransmit);
872 
873   // Transfer has not yet completed.
874   EXPECT_EQ(transfer_status, Status::Unknown());
875 
876   // Send the rest of the data, finishing the transfer.
877   context_.server().SendServerStream<Transfer::Read>(
878       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
879                       .set_session_id(13)
880                       .set_offset(16)
881                       .set_payload(data.subspan(16))
882                       .set_remaining_bytes(0)));
883   transfer_thread_.WaitUntilEventIsProcessed();
884 
885   ASSERT_EQ(payloads.size(), 3u);
886 
887   Chunk c4 = DecodeChunk(payloads.back());
888   EXPECT_EQ(c4.session_id(), 13u);
889   ASSERT_TRUE(c4.status().has_value());
890   EXPECT_EQ(c4.status().value(), OkStatus());
891 
892   EXPECT_EQ(transfer_status, OkStatus());
893 }
894 
// Exhausts the retry budget of a legacy read that never hears from the server:
// each of the first kTestRetries timeouts resends the opening parameters, the
// next timeout ends the transfer with DEADLINE_EXCEEDED without sending
// anything, and later timeouts have no further effect.
TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
  static constexpr uint32_t kId = 14;

  stream::MemoryWriterBuffer<64> destination;
  Status final_status = Status::Unknown();

  Result<Client::Handle> transfer = legacy_client_.Read(
      kId,
      destination,
      [&final_status](Status result) { final_status = result; },
      kTestTimeout,
      kTestTimeout);
  ASSERT_EQ(OkStatus(), transfer.status());
  transfer_thread_.WaitUntilEventIsProcessed();

  // Starting the transfer emits exactly one chunk: the opening parameters.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  Chunk opening = DecodeChunk(sent.back());
  EXPECT_EQ(opening.session_id(), kId);
  EXPECT_EQ(opening.resource_id(), kId);
  EXPECT_EQ(opening.offset(), 0u);
  EXPECT_EQ(opening.window_end_offset(), 37u);
  EXPECT_EQ(opening.type(), Chunk::Type::kStart);

  for (unsigned attempt = 1; attempt <= kTestRetries; ++attempt) {
    // Each timeout with no server activity adds one resent parameters chunk.
    transfer_thread_.SimulateClientTimeout(kId);
    ASSERT_EQ(sent.size(), attempt + 1);

    Chunk resent = DecodeChunk(sent.back());
    EXPECT_EQ(resent.session_id(), kId);
    EXPECT_EQ(resent.offset(), 0u);
    EXPECT_EQ(resent.window_end_offset(), 37u);

    // The completion callback has not fired yet.
    EXPECT_EQ(final_status, Status::Unknown());
  }

  // One timeout beyond the last retry: the client gives up. The server never
  // responded, so no final status chunk is expected and the packet count stays
  // where it was.
  transfer_thread_.SimulateClientTimeout(kId);
  ASSERT_EQ(sent.size(), 4u);

  EXPECT_EQ(final_status, Status::DeadlineExceeded());

  // Timeouts for the finished transfer must not emit any more packets.
  for (int extra = 0; extra < 3; ++extra) {
    transfer_thread_.SimulateClientTimeout(kId);
  }
  ASSERT_EQ(sent.size(), 4u);
}
950 
TEST_F(ReadTransfer,Timeout_ReceivingDataResetsRetryCount)951 TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
952   stream::MemoryWriterBuffer<64> writer;
953   Status transfer_status = Status::Unknown();
954 
955   constexpr ConstByteSpan data(kData32);
956 
957   Result<Client::Handle> handle = legacy_client_.Read(
958       14,
959       writer,
960       [&transfer_status](Status status) { transfer_status = status; },
961       kTestTimeout,
962       kTestTimeout);
963   ASSERT_EQ(OkStatus(), handle.status());
964   transfer_thread_.WaitUntilEventIsProcessed();
965 
966   // First transfer parameters chunk is sent.
967   rpc::PayloadsView payloads =
968       context_.output().payloads<Transfer::Read>(context_.channel().id());
969   ASSERT_EQ(payloads.size(), 1u);
970   EXPECT_EQ(transfer_status, Status::Unknown());
971 
972   Chunk c0 = DecodeChunk(payloads.back());
973   EXPECT_EQ(c0.session_id(), 14u);
974   EXPECT_EQ(c0.resource_id(), 14u);
975   EXPECT_EQ(c0.offset(), 0u);
976   EXPECT_EQ(c0.window_end_offset(), 37u);
977 
978   // Simulate one less timeout than the maximum amount of retries.
979   for (unsigned retry = 1; retry <= kTestRetries - 1; ++retry) {
980     transfer_thread_.SimulateClientTimeout(14);
981     ASSERT_EQ(payloads.size(), retry + 1);
982 
983     Chunk c = DecodeChunk(payloads.back());
984     EXPECT_EQ(c.session_id(), 14u);
985     EXPECT_EQ(c.offset(), 0u);
986     EXPECT_EQ(c.window_end_offset(), 37u);
987 
988     // Transfer has not yet completed.
989     EXPECT_EQ(transfer_status, Status::Unknown());
990   }
991 
992   // Send some data.
993   context_.server().SendServerStream<Transfer::Read>(
994       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
995                       .set_session_id(14)
996                       .set_offset(0)
997                       .set_payload(data.first(16))));
998   transfer_thread_.WaitUntilEventIsProcessed();
999   ASSERT_EQ(payloads.size(), 3u);
1000 
1001   // Time out a couple more times. The context's retry count should have been
1002   // reset, so it should go through the standard retry flow instead of
1003   // terminating the transfer.
1004   transfer_thread_.SimulateClientTimeout(14);
1005   ASSERT_EQ(payloads.size(), 4u);
1006 
1007   Chunk c = DecodeChunk(payloads.back());
1008   EXPECT_FALSE(c.status().has_value());
1009   EXPECT_EQ(c.session_id(), 14u);
1010   EXPECT_EQ(c.offset(), 16u);
1011   EXPECT_EQ(c.window_end_offset(), 53u);
1012 
1013   transfer_thread_.SimulateClientTimeout(14);
1014   ASSERT_EQ(payloads.size(), 5u);
1015 
1016   c = DecodeChunk(payloads.back());
1017   EXPECT_FALSE(c.status().has_value());
1018   EXPECT_EQ(c.session_id(), 14u);
1019   EXPECT_EQ(c.offset(), 16u);
1020   EXPECT_EQ(c.window_end_offset(), 53u);
1021 
1022   // Ensure we don't leave a dangling reference to transfer_status.
1023   handle->Cancel();
1024   transfer_thread_.WaitUntilEventIsProcessed();
1025 }
1026 
// Starts a legacy read while the fake channel output is configured with a
// failing send status, and expects the completion callback to be invoked
// exactly once with INTERNAL.
// NOTE(review): the test name mentions DataLoss but the assertion expects
// Status::Internal() -- confirm which one is intended.
TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
  stream::MemoryWriterBuffer<64> destination;
  Status final_status = Status::Unknown();

  // Have the test output report Unauthenticated as its send status.
  context_.output().set_send_status(Status::Unauthenticated());

  const Status start_status =
      legacy_client_
          .Read(
              14,
              destination,
              [&final_status](Status result) {
                // Still Unknown here means this is the first (and only)
                // invocation of the callback.
                ASSERT_EQ(final_status, Status::Unknown());
                final_status = result;
              },
              kTestTimeout)
          .status();
  ASSERT_EQ(OkStatus(), start_status);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(final_status, Status::Internal());
}
1049 
1050 class WriteTransfer : public ::testing::Test {
1051  protected:
WriteTransfer()1052   WriteTransfer()
1053       : transfer_thread_(chunk_buffer_, encode_buffer_),
1054         legacy_client_(context_.client(),
1055                        context_.channel().id(),
1056                        transfer_thread_,
1057                        transfer_thread_.max_chunk_size()),
1058         client_(context_.client(),
1059                 context_.channel().id(),
1060                 transfer_thread_,
1061                 transfer_thread_.max_chunk_size()),
1062         system_thread_(TransferThreadOptions(), transfer_thread_) {
1063     legacy_client_.set_protocol_version(ProtocolVersion::kLegacy);
1064   }
1065 
~WriteTransfer()1066   ~WriteTransfer() override {
1067     transfer_thread_.Terminate();
1068     system_thread_.join();
1069   }
1070 
1071   rpc::RawClientTestContext<> context_;
1072 
1073   Thread<1, 1> transfer_thread_;
1074   Client legacy_client_;
1075   Client client_;
1076 
1077   std::array<std::byte, 64> chunk_buffer_;
1078   std::array<std::byte, 64> encode_buffer_;
1079 
1080   pw::Thread system_thread_;
1081 };
1082 
TEST_F(WriteTransfer,SingleChunk)1083 TEST_F(WriteTransfer, SingleChunk) {
1084   stream::MemoryReader reader(kData32);
1085   Status transfer_status = Status::Unknown();
1086 
1087   ASSERT_EQ(OkStatus(),
1088             legacy_client_
1089                 .Write(3,
1090                        reader,
1091                        [&transfer_status](Status status) {
1092                          transfer_status = status;
1093                        })
1094                 .status());
1095   transfer_thread_.WaitUntilEventIsProcessed();
1096 
1097   // The client begins by sending the ID of the resource to transfer.
1098   rpc::PayloadsView payloads =
1099       context_.output().payloads<Transfer::Write>(context_.channel().id());
1100   ASSERT_EQ(payloads.size(), 1u);
1101   EXPECT_EQ(transfer_status, Status::Unknown());
1102 
1103   Chunk c0 = DecodeChunk(payloads[0]);
1104   EXPECT_EQ(c0.session_id(), 3u);
1105   EXPECT_EQ(c0.resource_id(), 3u);
1106   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1107 
1108   // Send transfer parameters. Client should send a data chunk and the final
1109   // chunk.
1110   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1111     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1112         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1113             .set_session_id(3)
1114             .set_offset(0)
1115             .set_window_end_offset(64)
1116             .set_max_chunk_size_bytes(32)));
1117   });
1118 
1119   ASSERT_EQ(payloads.size(), 3u);
1120 
1121   Chunk c1 = DecodeChunk(payloads[1]);
1122   EXPECT_EQ(c1.session_id(), 3u);
1123   EXPECT_EQ(c1.offset(), 0u);
1124   EXPECT_TRUE(c1.has_payload());
1125   EXPECT_EQ(
1126       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1127 
1128   Chunk c2 = DecodeChunk(payloads[2]);
1129   EXPECT_EQ(c2.session_id(), 3u);
1130   ASSERT_TRUE(c2.remaining_bytes().has_value());
1131   EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1132 
1133   EXPECT_EQ(transfer_status, Status::Unknown());
1134 
1135   // Send the final status chunk to complete the transfer.
1136   context_.server().SendServerStream<Transfer::Write>(
1137       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
1138   transfer_thread_.WaitUntilEventIsProcessed();
1139 
1140   EXPECT_EQ(payloads.size(), 3u);
1141   EXPECT_EQ(transfer_status, OkStatus());
1142 }
1143 
TEST_F(WriteTransfer,MultiChunk)1144 TEST_F(WriteTransfer, MultiChunk) {
1145   stream::MemoryReader reader(kData32);
1146   Status transfer_status = Status::Unknown();
1147 
1148   ASSERT_EQ(OkStatus(),
1149             legacy_client_
1150                 .Write(4,
1151                        reader,
1152                        [&transfer_status](Status status) {
1153                          transfer_status = status;
1154                        })
1155                 .status());
1156   transfer_thread_.WaitUntilEventIsProcessed();
1157 
1158   // The client begins by sending the ID of the resource to transfer.
1159   rpc::PayloadsView payloads =
1160       context_.output().payloads<Transfer::Write>(context_.channel().id());
1161   ASSERT_EQ(payloads.size(), 1u);
1162   EXPECT_EQ(transfer_status, Status::Unknown());
1163 
1164   Chunk c0 = DecodeChunk(payloads[0]);
1165   EXPECT_EQ(c0.session_id(), 4u);
1166   EXPECT_EQ(c0.resource_id(), 4u);
1167   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1168 
1169   // Send transfer parameters with a chunk size smaller than the data.
1170 
1171   // Client should send two data chunks and the final chunk.
1172   rpc::test::WaitForPackets(context_.output(), 3, [this] {
1173     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1174         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1175             .set_session_id(4)
1176             .set_offset(0)
1177             .set_window_end_offset(64)
1178             .set_max_chunk_size_bytes(16)));
1179   });
1180 
1181   ASSERT_EQ(payloads.size(), 4u);
1182 
1183   Chunk c1 = DecodeChunk(payloads[1]);
1184   EXPECT_EQ(c1.session_id(), 4u);
1185   EXPECT_EQ(c1.offset(), 0u);
1186   EXPECT_TRUE(c1.has_payload());
1187   EXPECT_EQ(
1188       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1189 
1190   Chunk c2 = DecodeChunk(payloads[2]);
1191   EXPECT_EQ(c2.session_id(), 4u);
1192   EXPECT_EQ(c2.offset(), 16u);
1193   EXPECT_TRUE(c2.has_payload());
1194   EXPECT_EQ(std::memcmp(c2.payload().data(),
1195                         kData32.data() + c2.offset(),
1196                         c2.payload().size()),
1197             0);
1198 
1199   Chunk c3 = DecodeChunk(payloads[3]);
1200   EXPECT_EQ(c3.session_id(), 4u);
1201   ASSERT_TRUE(c3.remaining_bytes().has_value());
1202   EXPECT_EQ(c3.remaining_bytes().value(), 0u);
1203 
1204   EXPECT_EQ(transfer_status, Status::Unknown());
1205 
1206   // Send the final status chunk to complete the transfer.
1207   context_.server().SendServerStream<Transfer::Write>(
1208       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 4, OkStatus())));
1209   transfer_thread_.WaitUntilEventIsProcessed();
1210 
1211   EXPECT_EQ(payloads.size(), 4u);
1212   EXPECT_EQ(transfer_status, OkStatus());
1213 }
1214 
TEST_F(WriteTransfer,OutOfOrder_SeekSupported)1215 TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
1216   stream::MemoryReader reader(kData32);
1217   Status transfer_status = Status::Unknown();
1218 
1219   ASSERT_EQ(OkStatus(),
1220             legacy_client_
1221                 .Write(5,
1222                        reader,
1223                        [&transfer_status](Status status) {
1224                          transfer_status = status;
1225                        })
1226                 .status());
1227   transfer_thread_.WaitUntilEventIsProcessed();
1228 
1229   // The client begins by sending the ID of the resource to transfer.
1230   rpc::PayloadsView payloads =
1231       context_.output().payloads<Transfer::Write>(context_.channel().id());
1232   ASSERT_EQ(payloads.size(), 1u);
1233   EXPECT_EQ(transfer_status, Status::Unknown());
1234 
1235   Chunk c0 = DecodeChunk(payloads[0]);
1236   EXPECT_EQ(c0.session_id(), 5u);
1237   EXPECT_EQ(c0.resource_id(), 5u);
1238   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1239 
1240   // Send transfer parameters with a nonzero offset, requesting a seek.
1241   // Client should send a data chunk and the final chunk.
1242   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1243     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1244         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1245             .set_session_id(5)
1246             .set_offset(16)
1247             .set_window_end_offset(64)
1248             .set_max_chunk_size_bytes(32)));
1249   });
1250 
1251   ASSERT_EQ(payloads.size(), 3u);
1252 
1253   Chunk c1 = DecodeChunk(payloads[1]);
1254   EXPECT_EQ(c1.session_id(), 5u);
1255   EXPECT_EQ(c1.offset(), 16u);
1256   EXPECT_TRUE(c1.has_payload());
1257   EXPECT_EQ(std::memcmp(c1.payload().data(),
1258                         kData32.data() + c1.offset(),
1259                         c1.payload().size()),
1260             0);
1261 
1262   Chunk c2 = DecodeChunk(payloads[2]);
1263   EXPECT_EQ(c2.session_id(), 5u);
1264   ASSERT_TRUE(c2.remaining_bytes().has_value());
1265   EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1266 
1267   EXPECT_EQ(transfer_status, Status::Unknown());
1268 
1269   // Send the final status chunk to complete the transfer.
1270   context_.server().SendServerStream<Transfer::Write>(
1271       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 5, OkStatus())));
1272   transfer_thread_.WaitUntilEventIsProcessed();
1273 
1274   EXPECT_EQ(payloads.size(), 3u);
1275   EXPECT_EQ(transfer_status, OkStatus());
1276 }
1277 
1278 class FakeNonSeekableReader final : public stream::NonSeekableReader {
1279  public:
FakeNonSeekableReader(ConstByteSpan data)1280   FakeNonSeekableReader(ConstByteSpan data) : data_(data), position_(0) {}
1281 
1282  private:
DoRead(ByteSpan out)1283   StatusWithSize DoRead(ByteSpan out) final {
1284     if (position_ == data_.size()) {
1285       return StatusWithSize::OutOfRange();
1286     }
1287 
1288     size_t to_copy = std::min(out.size(), data_.size() - position_);
1289     std::memcpy(out.data(), data_.data() + position_, to_copy);
1290     position_ += to_copy;
1291 
1292     return StatusWithSize(to_copy);
1293   }
1294 
1295   ConstByteSpan data_;
1296   size_t position_;
1297 };
1298 
// Legacy write from a reader that cannot seek: when the server's parameters
// request a nonzero offset, the client must answer with a completion chunk
// carrying UNIMPLEMENTED and report the same status to the caller.
TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
  static constexpr uint32_t kId = 6;

  FakeNonSeekableReader source(kData32);
  Status final_status = Status::Unknown();

  const Status start_status =
      legacy_client_
          .Write(kId,
                 source,
                 [&final_status](Status result) { final_status = result; })
          .status();
  ASSERT_EQ(OkStatus(), start_status);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The first packet identifies the resource to transfer.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  Chunk start = DecodeChunk(sent[0]);
  EXPECT_EQ(start.session_id(), kId);
  EXPECT_EQ(start.resource_id(), kId);
  EXPECT_EQ(start.type(), Chunk::Type::kStart);

  // A nonzero offset in the parameters asks the client to seek.
  Chunk parameters(ProtocolVersion::kLegacy,
                   Chunk::Type::kParametersRetransmit);
  parameters.set_session_id(kId)
      .set_offset(16)
      .set_window_end_offset(64)
      .set_max_chunk_size_bytes(32);
  context_.server().SendServerStream<Transfer::Write>(EncodeChunk(parameters));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The only response is a status chunk that ends the transfer.
  ASSERT_EQ(sent.size(), 2u);

  Chunk completion = DecodeChunk(sent[1]);
  EXPECT_EQ(completion.session_id(), kId);
  EXPECT_EQ(completion.type(), Chunk::Type::kCompletion);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), Status::Unimplemented());

  EXPECT_EQ(final_status, Status::Unimplemented());
}
1344 
// Legacy write that the server rejects right after the start chunk: a final
// chunk carrying NOT_FOUND must end the transfer with that status, and the
// client must not send any reply.
TEST_F(WriteTransfer, ServerError) {
  static constexpr uint32_t kId = 7;

  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  const Status start_status =
      legacy_client_
          .Write(kId,
                 source,
                 [&final_status](Status result) { final_status = result; })
          .status();
  ASSERT_EQ(OkStatus(), start_status);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The first packet identifies the resource to transfer.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  Chunk start = DecodeChunk(sent[0]);
  EXPECT_EQ(start.session_id(), kId);
  EXPECT_EQ(start.resource_id(), kId);
  EXPECT_EQ(start.type(), Chunk::Type::kStart);

  // The server terminates the transfer with an error status.
  Chunk server_final =
      Chunk::Final(ProtocolVersion::kLegacy, kId, Status::NotFound());
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk(server_final));
  transfer_thread_.WaitUntilEventIsProcessed();

  // No new packet from the client; the caller sees the server's error.
  EXPECT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::NotFound());
}
1379 
// Legacy write where the server's parameters leave an empty window (offset 0,
// window end 0): the client must abort by sending a status chunk carrying
// RESOURCE_EXHAUSTED and report the same status to the caller.
TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
  static constexpr uint32_t kId = 9;

  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  const Status start_status =
      legacy_client_
          .Write(kId,
                 source,
                 [&final_status](Status result) { final_status = result; })
          .status();
  ASSERT_EQ(OkStatus(), start_status);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The first packet identifies the resource to transfer.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  Chunk start = DecodeChunk(sent[0]);
  EXPECT_EQ(start.session_id(), kId);
  EXPECT_EQ(start.resource_id(), kId);
  EXPECT_EQ(start.type(), Chunk::Type::kStart);

  // Invalid parameters: the window ends where it begins, so zero bytes are
  // being requested.
  Chunk parameters(ProtocolVersion::kLegacy,
                   Chunk::Type::kParametersRetransmit);
  parameters.set_session_id(kId)
      .set_offset(0)
      .set_window_end_offset(0)
      .set_max_chunk_size_bytes(32);
  context_.server().SendServerStream<Transfer::Write>(EncodeChunk(parameters));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The only response is a status chunk that ends the transfer.
  ASSERT_EQ(sent.size(), 2u);

  Chunk completion = DecodeChunk(sent[1]);
  EXPECT_EQ(completion.session_id(), kId);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), Status::ResourceExhausted());

  EXPECT_EQ(final_status, Status::ResourceExhausted());
}
1424 
// Legacy write that times out before the server answers the start chunk: the
// client must send the same kStart chunk again and leave the transfer
// running.
TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
  static constexpr uint32_t kId = 10;

  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  Result<Client::Handle> transfer = legacy_client_.Write(
      kId,
      source,
      [&final_status](Status result) { final_status = result; },
      kTestTimeout,
      kTestTimeout);
  ASSERT_EQ(OkStatus(), transfer.status());
  transfer_thread_.WaitUntilEventIsProcessed();

  // The first packet identifies the resource to transfer.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  Chunk start = DecodeChunk(sent.back());
  EXPECT_EQ(start.session_id(), kId);
  EXPECT_EQ(start.resource_id(), kId);
  EXPECT_EQ(start.type(), Chunk::Type::kStart);

  // Let the timeout expire with no server activity; the start chunk should be
  // sent a second time.
  transfer_thread_.SimulateClientTimeout(kId);
  ASSERT_EQ(sent.size(), 2u);

  Chunk resent = DecodeChunk(sent.back());
  EXPECT_EQ(resent.session_id(), kId);
  EXPECT_EQ(resent.resource_id(), kId);
  EXPECT_EQ(resent.type(), Chunk::Type::kStart);

  // The completion callback has not fired yet.
  EXPECT_EQ(final_status, Status::Unknown());

  // Cancel the still-active transfer so the completion callback cannot be left
  // holding a dangling reference to final_status.
  transfer->Cancel();
  transfer_thread_.WaitUntilEventIsProcessed();
}
1466 
TEST_F(WriteTransfer,Timeout_RetriesWithMostRecentChunk)1467 TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
1468   stream::MemoryReader reader(kData32);
1469   Status transfer_status = Status::Unknown();
1470 
1471   Result<Client::Handle> handle = legacy_client_.Write(
1472       11,
1473       reader,
1474       [&transfer_status](Status status) { transfer_status = status; },
1475       kTestTimeout);
1476   ASSERT_EQ(OkStatus(), handle.status());
1477   transfer_thread_.WaitUntilEventIsProcessed();
1478 
1479   // The client begins by sending the ID of the resource to transfer.
1480   rpc::PayloadsView payloads =
1481       context_.output().payloads<Transfer::Write>(context_.channel().id());
1482   ASSERT_EQ(payloads.size(), 1u);
1483   EXPECT_EQ(transfer_status, Status::Unknown());
1484 
1485   Chunk c0 = DecodeChunk(payloads.back());
1486   EXPECT_EQ(c0.session_id(), 11u);
1487   EXPECT_EQ(c0.resource_id(), 11u);
1488   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1489 
1490   // Send the first parameters chunk.
1491   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1492     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1493         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1494             .set_session_id(11)
1495             .set_offset(0)
1496             .set_window_end_offset(16)
1497             .set_max_chunk_size_bytes(8)));
1498   });
1499   ASSERT_EQ(payloads.size(), 3u);
1500 
1501   EXPECT_EQ(transfer_status, Status::Unknown());
1502 
1503   Chunk c1 = DecodeChunk(payloads[1]);
1504   EXPECT_EQ(c1.session_id(), 11u);
1505   EXPECT_EQ(c1.offset(), 0u);
1506   EXPECT_EQ(c1.payload().size(), 8u);
1507   EXPECT_EQ(
1508       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1509 
1510   Chunk c2 = DecodeChunk(payloads[2]);
1511   EXPECT_EQ(c2.session_id(), 11u);
1512   EXPECT_EQ(c2.offset(), 8u);
1513   EXPECT_EQ(c2.payload().size(), 8u);
1514   EXPECT_EQ(std::memcmp(c2.payload().data(),
1515                         kData32.data() + c2.offset(),
1516                         c2.payload().size()),
1517             0);
1518 
1519   // Wait for the timeout to expire without doing anything. The client should
1520   // resend the most recently sent chunk.
1521   transfer_thread_.SimulateClientTimeout(11);
1522   ASSERT_EQ(payloads.size(), 4u);
1523 
1524   Chunk c3 = DecodeChunk(payloads[3]);
1525   EXPECT_EQ(c3.session_id(), c2.session_id());
1526   EXPECT_EQ(c3.offset(), c2.offset());
1527   EXPECT_EQ(c3.payload().size(), c2.payload().size());
1528   EXPECT_EQ(std::memcmp(
1529                 c3.payload().data(), c2.payload().data(), c3.payload().size()),
1530             0);
1531 
1532   // Transfer has not yet completed.
1533   EXPECT_EQ(transfer_status, Status::Unknown());
1534 
1535   // Ensure we don't leave a dangling reference to transfer_status.
1536   handle->Cancel();
1537   transfer_thread_.WaitUntilEventIsProcessed();
1538 }
1539 
TEST_F(WriteTransfer,Timeout_RetriesWithSingleChunkTransfer)1540 TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
1541   stream::MemoryReader reader(kData32);
1542   Status transfer_status = Status::Unknown();
1543 
1544   Result<Client::Handle> handle = legacy_client_.Write(
1545       12,
1546       reader,
1547       [&transfer_status](Status status) { transfer_status = status; },
1548       kTestTimeout);
1549   ASSERT_EQ(OkStatus(), handle.status());
1550   transfer_thread_.WaitUntilEventIsProcessed();
1551 
1552   // The client begins by sending the ID of the resource to transfer.
1553   rpc::PayloadsView payloads =
1554       context_.output().payloads<Transfer::Write>(context_.channel().id());
1555   ASSERT_EQ(payloads.size(), 1u);
1556   EXPECT_EQ(transfer_status, Status::Unknown());
1557 
1558   Chunk c0 = DecodeChunk(payloads.back());
1559   EXPECT_EQ(c0.session_id(), 12u);
1560   EXPECT_EQ(c0.resource_id(), 12u);
1561   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1562 
1563   // Send the first parameters chunk, requesting all the data. The client should
1564   // respond with one data chunk and a remaining_bytes = 0 chunk.
1565   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1566     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1567         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1568             .set_session_id(12)
1569             .set_offset(0)
1570             .set_window_end_offset(64)
1571             .set_max_chunk_size_bytes(64)));
1572   });
1573   ASSERT_EQ(payloads.size(), 3u);
1574 
1575   EXPECT_EQ(transfer_status, Status::Unknown());
1576 
1577   Chunk c1 = DecodeChunk(payloads[1]);
1578   EXPECT_EQ(c1.session_id(), 12u);
1579   EXPECT_EQ(c1.offset(), 0u);
1580   EXPECT_EQ(c1.payload().size(), 32u);
1581   EXPECT_EQ(
1582       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1583 
1584   Chunk c2 = DecodeChunk(payloads[2]);
1585   EXPECT_EQ(c2.session_id(), 12u);
1586   ASSERT_TRUE(c2.remaining_bytes().has_value());
1587   EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1588 
1589   // Wait for the timeout to expire without doing anything. The client should
1590   // resend the data chunk.
1591   transfer_thread_.SimulateClientTimeout(12);
1592   ASSERT_EQ(payloads.size(), 4u);
1593 
1594   Chunk c3 = DecodeChunk(payloads[3]);
1595   EXPECT_EQ(c3.session_id(), c1.session_id());
1596   EXPECT_EQ(c3.offset(), c1.offset());
1597   EXPECT_EQ(c3.payload().size(), c1.payload().size());
1598   EXPECT_EQ(std::memcmp(
1599                 c3.payload().data(), c1.payload().data(), c3.payload().size()),
1600             0);
1601 
1602   // The remaining_bytes = 0 chunk should be resent on the next parameters.
1603   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1604       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1605           .set_session_id(12)
1606           .set_offset(32)
1607           .set_window_end_offset(64)));
1608   transfer_thread_.WaitUntilEventIsProcessed();
1609 
1610   ASSERT_EQ(payloads.size(), 5u);
1611 
1612   Chunk c4 = DecodeChunk(payloads[4]);
1613   EXPECT_EQ(c4.session_id(), 12u);
1614   ASSERT_TRUE(c4.remaining_bytes().has_value());
1615   EXPECT_EQ(c4.remaining_bytes().value(), 0u);
1616 
1617   context_.server().SendServerStream<Transfer::Write>(
1618       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 12, OkStatus())));
1619   transfer_thread_.WaitUntilEventIsProcessed();
1620 
1621   EXPECT_EQ(transfer_status, OkStatus());
1622 }
1623 
// A legacy write that never hears back from the server. Each simulated timeout
// is expected to re-send the opening chunk; one more timeout after the last
// retry should complete the transfer with DEADLINE_EXCEEDED, after which no
// further chunks are sent.
TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
  stream::MemoryReader reader(kData32);
  Status transfer_status = Status::Unknown();

  Result<Client::Handle> handle = legacy_client_.Write(
      13,
      reader,
      [&transfer_status](Status status) { transfer_status = status; },
      kTestTimeout,
      kTestTimeout);
  ASSERT_EQ(OkStatus(), handle.status());
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client begins by sending the ID of the resource to transfer.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads.back());
  EXPECT_EQ(c0.session_id(), 13u);
  EXPECT_EQ(c0.resource_id(), 13u);
  EXPECT_EQ(c0.type(), Chunk::Type::kStart);

  for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
    // Wait for the timeout to expire without doing anything. The client should
    // resend the initial transmit chunk.
    transfer_thread_.SimulateClientTimeout(13);
    ASSERT_EQ(payloads.size(), retry + 1);

    Chunk c = DecodeChunk(payloads.back());
    EXPECT_EQ(c.session_id(), 13u);
    EXPECT_EQ(c.resource_id(), 13u);
    EXPECT_EQ(c.type(), Chunk::Type::kStart);

    // Transfer has not yet completed.
    EXPECT_EQ(transfer_status, Status::Unknown());
  }

  // The opening chunk plus one re-send per retry. Derived from kTestRetries so
  // the expectations below stay in step with the loop above instead of
  // silently assuming a particular retry count.
  const size_t total_start_chunks = static_cast<size_t>(kTestRetries) + 1u;

  // Time out one more time after the final retry. The client should cancel the
  // transfer at this point. As no packets were received from the server, no
  // final status chunk should be sent.
  transfer_thread_.SimulateClientTimeout(13);
  ASSERT_EQ(payloads.size(), total_start_chunks);

  EXPECT_EQ(transfer_status, Status::DeadlineExceeded());

  // After finishing the transfer, nothing else should be sent.
  transfer_thread_.SimulateClientTimeout(13);
  transfer_thread_.SimulateClientTimeout(13);
  transfer_thread_.SimulateClientTimeout(13);
  ASSERT_EQ(payloads.size(), total_start_chunks);

  // Ensure we don't leave a dangling reference to transfer_status.
  handle->Cancel();
  transfer_thread_.WaitUntilEventIsProcessed();
}
1681 
TEST_F(WriteTransfer,Timeout_NonSeekableReaderEndsTransfer)1682 TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
1683   FakeNonSeekableReader reader(kData32);
1684   Status transfer_status = Status::Unknown();
1685 
1686   Result<Client::Handle> handle = legacy_client_.Write(
1687       14,
1688       reader,
1689       [&transfer_status](Status status) { transfer_status = status; },
1690       kTestTimeout);
1691   ASSERT_EQ(OkStatus(), handle.status());
1692   transfer_thread_.WaitUntilEventIsProcessed();
1693 
1694   // The client begins by sending the ID of the resource to transfer.
1695   rpc::PayloadsView payloads =
1696       context_.output().payloads<Transfer::Write>(context_.channel().id());
1697   ASSERT_EQ(payloads.size(), 1u);
1698   EXPECT_EQ(transfer_status, Status::Unknown());
1699 
1700   Chunk c0 = DecodeChunk(payloads.back());
1701   EXPECT_EQ(c0.session_id(), 14u);
1702   EXPECT_EQ(c0.resource_id(), 14u);
1703   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1704 
1705   // Send the first parameters chunk.
1706   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1707     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1708         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1709             .set_session_id(14)
1710             .set_offset(0)
1711             .set_window_end_offset(16)
1712             .set_max_chunk_size_bytes(8)));
1713   });
1714   ASSERT_EQ(payloads.size(), 3u);
1715 
1716   EXPECT_EQ(transfer_status, Status::Unknown());
1717 
1718   Chunk c1 = DecodeChunk(payloads[1]);
1719   EXPECT_EQ(c1.session_id(), 14u);
1720   EXPECT_EQ(c1.offset(), 0u);
1721   EXPECT_TRUE(c1.has_payload());
1722   EXPECT_EQ(c1.payload().size(), 8u);
1723   EXPECT_EQ(
1724       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1725 
1726   Chunk c2 = DecodeChunk(payloads[2]);
1727   EXPECT_EQ(c2.session_id(), 14u);
1728   EXPECT_EQ(c2.offset(), 8u);
1729   EXPECT_TRUE(c2.has_payload());
1730   EXPECT_EQ(c2.payload().size(), 8u);
1731   EXPECT_EQ(std::memcmp(c2.payload().data(),
1732                         kData32.data() + c2.offset(),
1733                         c2.payload().size()),
1734             0);
1735 
1736   // Wait for the timeout to expire without doing anything. The client should
1737   // fail to seek back and end the transfer.
1738   transfer_thread_.SimulateClientTimeout(14);
1739   ASSERT_EQ(payloads.size(), 4u);
1740 
1741   Chunk c3 = DecodeChunk(payloads[3]);
1742   EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kLegacy);
1743   EXPECT_EQ(c3.session_id(), 14u);
1744   ASSERT_TRUE(c3.status().has_value());
1745   EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded());
1746 
1747   EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1748 
1749   // Ensure we don't leave a dangling reference to transfer_status.
1750   handle->Cancel();
1751   transfer_thread_.WaitUntilEventIsProcessed();
1752 }
1753 
// Cancels a legacy write after the server has responded once. The client is
// expected to send a completion chunk carrying CANCELLED and to report
// CANCELLED to the completion callback.
TEST_F(WriteTransfer, ManualCancel) {
  stream::MemoryReader reader(kData32);
  Status transfer_status = Status::Unknown();

  Result<Client::Handle> handle = legacy_client_.Write(
      15,
      reader,
      [&transfer_status](Status status) { transfer_status = status; },
      kTestTimeout);
  ASSERT_EQ(OkStatus(), handle.status());
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client begins by sending the ID of the resource to transfer.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk chunk = DecodeChunk(payloads.back());
  EXPECT_EQ(chunk.session_id(), 15u);
  EXPECT_EQ(chunk.resource_id(), 15u);
  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);

  // Get a response from the server, then cancel the transfer.
  // This must request a smaller chunk than the entire available write data to
  // prevent the client from trying to send an additional finish chunk.
  context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(15)
          .set_offset(0)
          .set_window_end_offset(16)
          .set_max_chunk_size_bytes(16)));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(payloads.size(), 2u);

  handle->Cancel();
  transfer_thread_.WaitUntilEventIsProcessed();

  // Client should send a cancellation chunk to the server.
  ASSERT_EQ(payloads.size(), 3u);
  chunk = DecodeChunk(payloads.back());
  EXPECT_EQ(chunk.session_id(), 15u);
  ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
  // Guard the optional before dereferencing it, so a missing status is
  // reported as a test failure rather than an invalid access.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::Cancelled());

  EXPECT_EQ(transfer_status, Status::Cancelled());
}
1801 
// Cancels a legacy write before the server has sent anything. The callback
// should still observe CANCELLED, but no chunk beyond the opening one is
// expected on the wire.
TEST_F(WriteTransfer, ManualCancel_NoContact) {
  stream::MemoryReader source(kData32);
  Status result = Status::Unknown();

  Result<Client::Handle> write_handle = legacy_client_.Write(
      15,
      source,
      [&result](Status s) { result = s; },
      kTestTimeout,
      kTestTimeout);
  ASSERT_EQ(write_handle.status(), OkStatus());
  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one chunk so far: the opening chunk naming resource 15.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  Chunk start = DecodeChunk(sent.back());
  EXPECT_EQ(start.session_id(), 15u);
  EXPECT_EQ(start.resource_id(), 15u);
  EXPECT_EQ(start.type(), Chunk::Type::kStart);

  // Cancel with no server response received; the client is expected to stay
  // silent rather than emit a final chunk.
  write_handle->Cancel();
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(sent.size(), 1u);

  EXPECT_EQ(result, Status::Cancelled());
}
1834 
// Cancels the same legacy write twice. The first cancel should produce a
// CANCELLED completion chunk and callback; the second should neither send
// another chunk nor invoke the callback again.
TEST_F(WriteTransfer, ManualCancel_Duplicate) {
  stream::MemoryReader reader(kData32);
  Status transfer_status = Status::Unknown();

  Result<Client::Handle> handle = legacy_client_.Write(
      16,
      reader,
      [&transfer_status](Status status) { transfer_status = status; },
      kTestTimeout);
  ASSERT_EQ(OkStatus(), handle.status());
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client begins by sending the ID of the resource to transfer.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk chunk = DecodeChunk(payloads.back());
  EXPECT_EQ(chunk.session_id(), 16u);
  EXPECT_EQ(chunk.resource_id(), 16u);
  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);

  // Get a response from the server, then cancel the transfer.
  context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(16)
          .set_offset(0)
          .set_window_end_offset(16)  // Request only a single chunk.
          .set_max_chunk_size_bytes(16)));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(payloads.size(), 2u);

  handle->Cancel();
  transfer_thread_.WaitUntilEventIsProcessed();

  // Client should send a cancellation chunk to the server.
  ASSERT_EQ(payloads.size(), 3u);
  chunk = DecodeChunk(payloads.back());
  EXPECT_EQ(chunk.session_id(), 16u);
  ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
  // Guard the optional before dereferencing it, so a missing status is
  // reported as a test failure rather than an invalid access.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::Cancelled());

  EXPECT_EQ(transfer_status, Status::Cancelled());

  // Attempt to cancel the transfer again. transfer_status is reset first so
  // that a second callback invocation would be observable.
  transfer_status = Status::Unknown();
  handle->Cancel();
  transfer_thread_.WaitUntilEventIsProcessed();

  // No further chunks should be sent.
  EXPECT_EQ(payloads.size(), 3u);
  EXPECT_EQ(transfer_status, Status::Unknown());
}
1889 
TEST_F(ReadTransfer,Version2_SingleChunk)1890 TEST_F(ReadTransfer, Version2_SingleChunk) {
1891   stream::MemoryWriterBuffer<64> writer;
1892   Status transfer_status = Status::Unknown();
1893 
1894   ASSERT_EQ(
1895       OkStatus(),
1896       client_
1897           .Read(
1898               3,
1899               writer,
1900               [&transfer_status](Status status) { transfer_status = status; },
1901               cfg::kDefaultClientTimeout,
1902               cfg::kDefaultClientTimeout)
1903           .status());
1904 
1905   transfer_thread_.WaitUntilEventIsProcessed();
1906 
1907   // Initial chunk of the transfer is sent. This chunk should contain all the
1908   // fields from both legacy and version 2 protocols for backwards
1909   // compatibility.
1910   rpc::PayloadsView payloads =
1911       context_.output().payloads<Transfer::Read>(context_.channel().id());
1912   ASSERT_EQ(payloads.size(), 1u);
1913   EXPECT_EQ(transfer_status, Status::Unknown());
1914 
1915   Chunk chunk = DecodeChunk(payloads[0]);
1916   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1917   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1918   EXPECT_EQ(chunk.desired_session_id(), 1u);
1919   EXPECT_EQ(chunk.resource_id(), 3u);
1920   EXPECT_EQ(chunk.offset(), 0u);
1921   EXPECT_EQ(chunk.window_end_offset(), 37u);
1922   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1923 
1924   // The server responds with a START_ACK, continuing the version 2 handshake.
1925   context_.server().SendServerStream<Transfer::Read>(
1926       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
1927                       .set_session_id(1)
1928                       .set_resource_id(3)));
1929   transfer_thread_.WaitUntilEventIsProcessed();
1930 
1931   ASSERT_EQ(payloads.size(), 2u);
1932 
1933   // Client should accept the session_id with a START_ACK_CONFIRMATION,
1934   // additionally containing the initial parameters for the read transfer.
1935   chunk = DecodeChunk(payloads.back());
1936   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1937   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1938   EXPECT_FALSE(chunk.desired_session_id().has_value());
1939   EXPECT_EQ(chunk.session_id(), 1u);
1940   EXPECT_FALSE(chunk.resource_id().has_value());
1941   EXPECT_EQ(chunk.offset(), 0u);
1942   EXPECT_EQ(chunk.window_end_offset(), 37u);
1943   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1944 
1945   // Send all the transfer data. Client should accept it and complete the
1946   // transfer.
1947   context_.server().SendServerStream<Transfer::Read>(
1948       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
1949                       .set_session_id(1)
1950                       .set_offset(0)
1951                       .set_payload(kData32)
1952                       .set_remaining_bytes(0)));
1953   transfer_thread_.WaitUntilEventIsProcessed();
1954 
1955   ASSERT_EQ(payloads.size(), 3u);
1956 
1957   chunk = DecodeChunk(payloads.back());
1958   EXPECT_EQ(chunk.session_id(), 1u);
1959   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1960   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1961   ASSERT_TRUE(chunk.status().has_value());
1962   EXPECT_EQ(chunk.status().value(), OkStatus());
1963 
1964   EXPECT_EQ(transfer_status, OkStatus());
1965   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1966             0);
1967 
1968   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
1969       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
1970           .set_session_id(1)));
1971 }
1972 
TEST_F(ReadTransfer,Version2_ServerRunsLegacy)1973 TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
1974   stream::MemoryWriterBuffer<64> writer;
1975   Status transfer_status = Status::Unknown();
1976 
1977   ASSERT_EQ(
1978       OkStatus(),
1979       client_
1980           .Read(
1981               3,
1982               writer,
1983               [&transfer_status](Status status) { transfer_status = status; },
1984               cfg::kDefaultClientTimeout,
1985               cfg::kDefaultClientTimeout)
1986           .status());
1987 
1988   transfer_thread_.WaitUntilEventIsProcessed();
1989 
1990   // Initial chunk of the transfer is sent. This chunk should contain all the
1991   // fields from both legacy and version 2 protocols for backwards
1992   // compatibility.
1993   rpc::PayloadsView payloads =
1994       context_.output().payloads<Transfer::Read>(context_.channel().id());
1995   ASSERT_EQ(payloads.size(), 1u);
1996   EXPECT_EQ(transfer_status, Status::Unknown());
1997 
1998   Chunk chunk = DecodeChunk(payloads[0]);
1999   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2000   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2001   EXPECT_EQ(chunk.desired_session_id(), 1u);
2002   EXPECT_EQ(chunk.resource_id(), 3u);
2003   EXPECT_EQ(chunk.offset(), 0u);
2004   EXPECT_EQ(chunk.window_end_offset(), 37u);
2005   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2006 
2007   // Instead of a START_ACK to continue the handshake, the server responds with
2008   // an immediate data chunk, indicating that it is running the legacy protocol
2009   // version. Client should revert to legacy, using the resource_id of 3 as the
2010   // session_id, and complete the transfer.
2011   context_.server().SendServerStream<Transfer::Read>(
2012       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
2013                       .set_session_id(3)
2014                       .set_offset(0)
2015                       .set_payload(kData32)
2016                       .set_remaining_bytes(0)));
2017   transfer_thread_.WaitUntilEventIsProcessed();
2018 
2019   ASSERT_EQ(payloads.size(), 2u);
2020 
2021   chunk = DecodeChunk(payloads.back());
2022   EXPECT_FALSE(chunk.desired_session_id().has_value());
2023   EXPECT_EQ(chunk.session_id(), 3u);
2024   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2025   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2026   ASSERT_TRUE(chunk.status().has_value());
2027   EXPECT_EQ(chunk.status().value(), OkStatus());
2028 
2029   EXPECT_EQ(transfer_status, OkStatus());
2030   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2031             0);
2032 }
2033 
TEST_F(ReadTransfer,Version2_TimeoutDuringHandshake)2034 TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
2035   stream::MemoryWriterBuffer<64> writer;
2036   Status transfer_status = Status::Unknown();
2037 
2038   ASSERT_EQ(
2039       OkStatus(),
2040       client_
2041           .Read(
2042               3,
2043               writer,
2044               [&transfer_status](Status status) { transfer_status = status; },
2045               cfg::kDefaultClientTimeout,
2046               cfg::kDefaultClientTimeout)
2047           .status());
2048 
2049   transfer_thread_.WaitUntilEventIsProcessed();
2050 
2051   // Initial chunk of the transfer is sent. This chunk should contain all the
2052   // fields from both legacy and version 2 protocols for backwards
2053   // compatibility.
2054   rpc::PayloadsView payloads =
2055       context_.output().payloads<Transfer::Read>(context_.channel().id());
2056   ASSERT_EQ(payloads.size(), 1u);
2057   EXPECT_EQ(transfer_status, Status::Unknown());
2058 
2059   Chunk chunk = DecodeChunk(payloads.back());
2060   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2061   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2062   EXPECT_EQ(chunk.desired_session_id(), 1u);
2063   EXPECT_EQ(chunk.resource_id(), 3u);
2064   EXPECT_EQ(chunk.offset(), 0u);
2065   EXPECT_EQ(chunk.window_end_offset(), 37u);
2066   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2067 
2068   // Wait for the timeout to expire without doing anything. The client should
2069   // resend the initial chunk.
2070   transfer_thread_.SimulateClientTimeout(1);
2071   ASSERT_EQ(payloads.size(), 2u);
2072 
2073   chunk = DecodeChunk(payloads.back());
2074   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2075   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2076   EXPECT_EQ(chunk.session_id(), 1u);
2077   EXPECT_EQ(chunk.resource_id(), 3u);
2078   EXPECT_EQ(chunk.offset(), 0u);
2079   EXPECT_EQ(chunk.window_end_offset(), 37u);
2080   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2081 
2082   // This time, the server responds, continuing the handshake and transfer.
2083   context_.server().SendServerStream<Transfer::Read>(
2084       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2085                       .set_session_id(1)
2086                       .set_resource_id(3)));
2087   transfer_thread_.WaitUntilEventIsProcessed();
2088 
2089   ASSERT_EQ(payloads.size(), 3u);
2090 
2091   chunk = DecodeChunk(payloads.back());
2092   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2093   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2094   EXPECT_EQ(chunk.session_id(), 1u);
2095   EXPECT_FALSE(chunk.resource_id().has_value());
2096   EXPECT_EQ(chunk.offset(), 0u);
2097   EXPECT_EQ(chunk.window_end_offset(), 37u);
2098   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2099 
2100   context_.server().SendServerStream<Transfer::Read>(
2101       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2102                       .set_session_id(1)
2103                       .set_offset(0)
2104                       .set_payload(kData32)
2105                       .set_remaining_bytes(0)));
2106   transfer_thread_.WaitUntilEventIsProcessed();
2107 
2108   ASSERT_EQ(payloads.size(), 4u);
2109 
2110   chunk = DecodeChunk(payloads.back());
2111   EXPECT_EQ(chunk.session_id(), 1u);
2112   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2113   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2114   ASSERT_TRUE(chunk.status().has_value());
2115   EXPECT_EQ(chunk.status().value(), OkStatus());
2116 
2117   EXPECT_EQ(transfer_status, OkStatus());
2118   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2119             0);
2120 
2121   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2122       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2123           .set_session_id(1)));
2124 }
2125 
TEST_F(ReadTransfer,Version2_TimeoutAfterHandshake)2126 TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
2127   stream::MemoryWriterBuffer<64> writer;
2128   Status transfer_status = Status::Unknown();
2129 
2130   ASSERT_EQ(
2131       OkStatus(),
2132       client_
2133           .Read(
2134               3,
2135               writer,
2136               [&transfer_status](Status status) { transfer_status = status; },
2137               cfg::kDefaultClientTimeout,
2138               cfg::kDefaultClientTimeout)
2139           .status());
2140 
2141   transfer_thread_.WaitUntilEventIsProcessed();
2142 
2143   // Initial chunk of the transfer is sent. This chunk should contain all the
2144   // fields from both legacy and version 2 protocols for backwards
2145   // compatibility.
2146   rpc::PayloadsView payloads =
2147       context_.output().payloads<Transfer::Read>(context_.channel().id());
2148   ASSERT_EQ(payloads.size(), 1u);
2149   EXPECT_EQ(transfer_status, Status::Unknown());
2150 
2151   Chunk chunk = DecodeChunk(payloads.back());
2152   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2153   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2154   EXPECT_EQ(chunk.desired_session_id(), 1u);
2155   EXPECT_EQ(chunk.resource_id(), 3u);
2156   EXPECT_EQ(chunk.offset(), 0u);
2157   EXPECT_EQ(chunk.window_end_offset(), 37u);
2158   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2159 
2160   // The server responds with a START_ACK, continuing the version 2 handshake
2161   // and assigning a session_id to the transfer.
2162   context_.server().SendServerStream<Transfer::Read>(
2163       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2164                       .set_session_id(1)
2165                       .set_resource_id(3)));
2166   transfer_thread_.WaitUntilEventIsProcessed();
2167 
2168   ASSERT_EQ(payloads.size(), 2u);
2169 
2170   // Client should accept the session_id with a START_ACK_CONFIRMATION,
2171   // additionally containing the initial parameters for the read transfer.
2172   chunk = DecodeChunk(payloads.back());
2173   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2174   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2175   EXPECT_EQ(chunk.session_id(), 1u);
2176   EXPECT_FALSE(chunk.resource_id().has_value());
2177   EXPECT_EQ(chunk.offset(), 0u);
2178   EXPECT_EQ(chunk.window_end_offset(), 37u);
2179   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2180 
2181   // Wait for the timeout to expire without doing anything. The client should
2182   // resend the confirmation chunk.
2183   transfer_thread_.SimulateClientTimeout(1);
2184   ASSERT_EQ(payloads.size(), 3u);
2185 
2186   chunk = DecodeChunk(payloads.back());
2187   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2188   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2189   EXPECT_EQ(chunk.session_id(), 1u);
2190   EXPECT_FALSE(chunk.resource_id().has_value());
2191   EXPECT_EQ(chunk.offset(), 0u);
2192   EXPECT_EQ(chunk.window_end_offset(), 37u);
2193   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2194 
2195   // The server responds and the transfer should continue normally.
2196   context_.server().SendServerStream<Transfer::Read>(
2197       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2198                       .set_session_id(1)
2199                       .set_offset(0)
2200                       .set_payload(kData32)
2201                       .set_remaining_bytes(0)));
2202   transfer_thread_.WaitUntilEventIsProcessed();
2203 
2204   ASSERT_EQ(payloads.size(), 4u);
2205 
2206   chunk = DecodeChunk(payloads.back());
2207   EXPECT_EQ(chunk.session_id(), 1u);
2208   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2209   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2210   ASSERT_TRUE(chunk.status().has_value());
2211   EXPECT_EQ(chunk.status().value(), OkStatus());
2212 
2213   EXPECT_EQ(transfer_status, OkStatus());
2214   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2215             0);
2216 
2217   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2218       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2219           .set_session_id(1)));
2220 }
2221 
// Version 2 read whose START is answered with a final UNAUTHENTICATED chunk.
// The error should reach the completion callback and the client should send
// nothing further.
TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  const Status started = client_
                             .Read(
                                 3,
                                 sink,
                                 [&result](Status s) { result = s; },
                                 cfg::kDefaultClientTimeout,
                                 cfg::kDefaultClientTimeout)
                             .status();
  ASSERT_EQ(OkStatus(), started);

  transfer_thread_.WaitUntilEventIsProcessed();

  // The opening chunk goes out carrying both the legacy and the version 2
  // fields, so that servers of either protocol version can interpret it.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  Chunk start = DecodeChunk(sent.back());
  EXPECT_EQ(start.type(), Chunk::Type::kStart);
  EXPECT_EQ(start.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start.desired_session_id(), 1u);
  EXPECT_EQ(start.resource_id(), 3u);
  EXPECT_EQ(start.offset(), 0u);
  EXPECT_EQ(start.window_end_offset(), 37u);
  EXPECT_EQ(start.max_chunk_size_bytes(), 37u);

  // Server rejects the start request outright with an error status.
  Chunk rejection =
      Chunk::Final(ProtocolVersion::kVersionTwo, 1, Status::Unauthenticated());
  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(rejection));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Nothing new was sent, and the callback saw the server's error.
  EXPECT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unauthenticated());
}
2264 
TEST_F(ReadTransfer,Version2_TimeoutWaitingForCompletionAckRetries)2265 TEST_F(ReadTransfer, Version2_TimeoutWaitingForCompletionAckRetries) {
2266   stream::MemoryWriterBuffer<64> writer;
2267   Status transfer_status = Status::Unknown();
2268 
2269   ASSERT_EQ(
2270       OkStatus(),
2271       client_
2272           .Read(
2273               3,
2274               writer,
2275               [&transfer_status](Status status) { transfer_status = status; },
2276               cfg::kDefaultClientTimeout,
2277               cfg::kDefaultClientTimeout)
2278           .status());
2279 
2280   transfer_thread_.WaitUntilEventIsProcessed();
2281 
2282   // Initial chunk of the transfer is sent. This chunk should contain all the
2283   // fields from both legacy and version 2 protocols for backwards
2284   // compatibility.
2285   rpc::PayloadsView payloads =
2286       context_.output().payloads<Transfer::Read>(context_.channel().id());
2287   ASSERT_EQ(payloads.size(), 1u);
2288   EXPECT_EQ(transfer_status, Status::Unknown());
2289 
2290   Chunk chunk = DecodeChunk(payloads[0]);
2291   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2292   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2293   EXPECT_EQ(chunk.desired_session_id(), 1u);
2294   EXPECT_EQ(chunk.resource_id(), 3u);
2295   EXPECT_EQ(chunk.offset(), 0u);
2296   EXPECT_EQ(chunk.window_end_offset(), 37u);
2297   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2298 
2299   // The server responds with a START_ACK, continuing the version 2 handshake.
2300   context_.server().SendServerStream<Transfer::Read>(
2301       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2302                       .set_session_id(1)
2303                       .set_resource_id(3)));
2304   transfer_thread_.WaitUntilEventIsProcessed();
2305 
2306   ASSERT_EQ(payloads.size(), 2u);
2307 
2308   // Client should accept the session_id with a START_ACK_CONFIRMATION,
2309   // additionally containing the initial parameters for the read transfer.
2310   chunk = DecodeChunk(payloads.back());
2311   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2312   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2313   EXPECT_EQ(chunk.session_id(), 1u);
2314   EXPECT_FALSE(chunk.resource_id().has_value());
2315   EXPECT_EQ(chunk.offset(), 0u);
2316   EXPECT_EQ(chunk.window_end_offset(), 37u);
2317   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2318 
2319   // Send all the transfer data. Client should accept it and complete the
2320   // transfer.
2321   context_.server().SendServerStream<Transfer::Read>(
2322       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2323                       .set_session_id(1)
2324                       .set_offset(0)
2325                       .set_payload(kData32)
2326                       .set_remaining_bytes(0)));
2327   transfer_thread_.WaitUntilEventIsProcessed();
2328 
2329   ASSERT_EQ(payloads.size(), 3u);
2330 
2331   chunk = DecodeChunk(payloads.back());
2332   EXPECT_EQ(chunk.session_id(), 1u);
2333   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2334   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2335   ASSERT_TRUE(chunk.status().has_value());
2336   EXPECT_EQ(chunk.status().value(), OkStatus());
2337 
2338   EXPECT_EQ(transfer_status, OkStatus());
2339   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2340             0);
2341 
2342   // Time out instead of sending a completion ACK. THe transfer should resend
2343   // its completion chunk.
2344   transfer_thread_.SimulateClientTimeout(1);
2345   ASSERT_EQ(payloads.size(), 4u);
2346 
2347   // Reset transfer_status to check whether the handler is called again.
2348   transfer_status = Status::Unknown();
2349 
2350   chunk = DecodeChunk(payloads.back());
2351   EXPECT_EQ(chunk.session_id(), 1u);
2352   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2353   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2354   ASSERT_TRUE(chunk.status().has_value());
2355   EXPECT_EQ(chunk.status().value(), OkStatus());
2356 
2357   // Transfer handler should not be called a second time in response to the
2358   // re-sent completion chunk.
2359   EXPECT_EQ(transfer_status, Status::Unknown());
2360 
2361   // Send a completion ACK to end the transfer.
2362   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2363       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2364           .set_session_id(1)));
2365   transfer_thread_.WaitUntilEventIsProcessed();
2366 
2367   // No further chunks should be sent following the ACK.
2368   transfer_thread_.SimulateClientTimeout(1);
2369   ASSERT_EQ(payloads.size(), 4u);
2370 }
2371 
TEST_F(ReadTransfer,Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries)2372 TEST_F(ReadTransfer,
2373        Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries) {
2374   stream::MemoryWriterBuffer<64> writer;
2375   Status transfer_status = Status::Unknown();
2376 
2377   ASSERT_EQ(
2378       OkStatus(),
2379       client_
2380           .Read(
2381               3,
2382               writer,
2383               [&transfer_status](Status status) { transfer_status = status; },
2384               cfg::kDefaultClientTimeout,
2385               cfg::kDefaultClientTimeout)
2386           .status());
2387 
2388   transfer_thread_.WaitUntilEventIsProcessed();
2389 
2390   // Initial chunk of the transfer is sent. This chunk should contain all the
2391   // fields from both legacy and version 2 protocols for backwards
2392   // compatibility.
2393   rpc::PayloadsView payloads =
2394       context_.output().payloads<Transfer::Read>(context_.channel().id());
2395   ASSERT_EQ(payloads.size(), 1u);
2396   EXPECT_EQ(transfer_status, Status::Unknown());
2397 
2398   Chunk chunk = DecodeChunk(payloads[0]);
2399   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2400   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2401   EXPECT_EQ(chunk.desired_session_id(), 1u);
2402   EXPECT_EQ(chunk.resource_id(), 3u);
2403   EXPECT_EQ(chunk.offset(), 0u);
2404   EXPECT_EQ(chunk.window_end_offset(), 37u);
2405   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2406 
2407   // The server responds with a START_ACK, continuing the version 2 handshake
2408   // and assigning a session_id to the transfer.
2409   context_.server().SendServerStream<Transfer::Read>(
2410       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2411                       .set_session_id(1)
2412                       .set_resource_id(3)));
2413   transfer_thread_.WaitUntilEventIsProcessed();
2414 
2415   ASSERT_EQ(payloads.size(), 2u);
2416 
2417   // Client should accept the session_id with a START_ACK_CONFIRMATION,
2418   // additionally containing the initial parameters for the read transfer.
2419   chunk = DecodeChunk(payloads.back());
2420   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2421   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2422   EXPECT_EQ(chunk.session_id(), 1u);
2423   EXPECT_FALSE(chunk.resource_id().has_value());
2424   EXPECT_EQ(chunk.offset(), 0u);
2425   EXPECT_EQ(chunk.window_end_offset(), 37u);
2426   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2427 
2428   // Send all the transfer data. Client should accept it and complete the
2429   // transfer.
2430   context_.server().SendServerStream<Transfer::Read>(
2431       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2432                       .set_session_id(1)
2433                       .set_offset(0)
2434                       .set_payload(kData32)
2435                       .set_remaining_bytes(0)));
2436   transfer_thread_.WaitUntilEventIsProcessed();
2437 
2438   ASSERT_EQ(payloads.size(), 3u);
2439 
2440   chunk = DecodeChunk(payloads.back());
2441   EXPECT_EQ(chunk.session_id(), 1u);
2442   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2443   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2444   ASSERT_TRUE(chunk.status().has_value());
2445   EXPECT_EQ(chunk.status().value(), OkStatus());
2446 
2447   EXPECT_EQ(transfer_status, OkStatus());
2448   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2449             0);
2450 
2451   // Time out instead of sending a completion ACK. THe transfer should resend
2452   // its completion chunk at first, then terminate after the maximum number of
2453   // retries.
2454   transfer_thread_.SimulateClientTimeout(1);
2455   ASSERT_EQ(payloads.size(), 4u);  // Retry 1.
2456 
2457   transfer_thread_.SimulateClientTimeout(1);
2458   ASSERT_EQ(payloads.size(), 5u);  // Retry 2.
2459 
2460   transfer_thread_.SimulateClientTimeout(1);
2461   ASSERT_EQ(payloads.size(), 6u);  // Retry 3.
2462 
2463   transfer_thread_.SimulateClientTimeout(1);
2464   ASSERT_EQ(payloads.size(), 6u);  // No more retries; transfer has ended.
2465 }
2466 
TEST_F(WriteTransfer,Version2_SingleChunk)2467 TEST_F(WriteTransfer, Version2_SingleChunk) {
2468   stream::MemoryReader reader(kData32);
2469   Status transfer_status = Status::Unknown();
2470 
2471   ASSERT_EQ(
2472       OkStatus(),
2473       client_
2474           .Write(
2475               3,
2476               reader,
2477               [&transfer_status](Status status) { transfer_status = status; },
2478               cfg::kDefaultClientTimeout,
2479               cfg::kDefaultClientTimeout)
2480           .status());
2481   transfer_thread_.WaitUntilEventIsProcessed();
2482 
2483   // The client begins by sending the ID of the resource to transfer.
2484   rpc::PayloadsView payloads =
2485       context_.output().payloads<Transfer::Write>(context_.channel().id());
2486   ASSERT_EQ(payloads.size(), 1u);
2487   EXPECT_EQ(transfer_status, Status::Unknown());
2488 
2489   Chunk chunk = DecodeChunk(payloads.back());
2490   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2491   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2492   EXPECT_EQ(chunk.desired_session_id(), 1u);
2493   EXPECT_EQ(chunk.resource_id(), 3u);
2494 
2495   // The server responds with a START_ACK, continuing the version 2 handshake.
2496   context_.server().SendServerStream<Transfer::Write>(
2497       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2498                       .set_session_id(1)
2499                       .set_resource_id(3)));
2500   transfer_thread_.WaitUntilEventIsProcessed();
2501 
2502   ASSERT_EQ(payloads.size(), 2u);
2503 
2504   // Client should accept the session_id with a START_ACK_CONFIRMATION.
2505   chunk = DecodeChunk(payloads.back());
2506   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2507   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2508   EXPECT_EQ(chunk.session_id(), 1u);
2509   EXPECT_FALSE(chunk.resource_id().has_value());
2510 
2511   // The server can then begin the data transfer by sending its transfer
2512   // parameters. Client should respond with a data chunk and the final chunk.
2513   rpc::test::WaitForPackets(context_.output(), 2, [this] {
2514     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2515         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2516             .set_session_id(1)
2517             .set_offset(0)
2518             .set_window_end_offset(64)
2519             .set_max_chunk_size_bytes(32)));
2520   });
2521 
2522   ASSERT_EQ(payloads.size(), 4u);
2523 
2524   chunk = DecodeChunk(payloads[2]);
2525   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2526   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2527   EXPECT_EQ(chunk.session_id(), 1u);
2528   EXPECT_EQ(chunk.offset(), 0u);
2529   EXPECT_TRUE(chunk.has_payload());
2530   EXPECT_EQ(std::memcmp(
2531                 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2532             0);
2533 
2534   chunk = DecodeChunk(payloads[3]);
2535   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2536   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2537   EXPECT_EQ(chunk.session_id(), 1u);
2538   ASSERT_TRUE(chunk.remaining_bytes().has_value());
2539   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2540 
2541   EXPECT_EQ(transfer_status, Status::Unknown());
2542 
2543   // Send the final status chunk to complete the transfer.
2544   context_.server().SendServerStream<Transfer::Write>(
2545       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
2546   transfer_thread_.WaitUntilEventIsProcessed();
2547 
2548   // Client should acknowledge the completion of the transfer.
2549   EXPECT_EQ(payloads.size(), 5u);
2550 
2551   chunk = DecodeChunk(payloads[4]);
2552   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2553   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2554   EXPECT_EQ(chunk.session_id(), 1u);
2555 
2556   EXPECT_EQ(transfer_status, OkStatus());
2557 }
2558 
TEST_F(WriteTransfer,Version2_ServerRunsLegacy)2559 TEST_F(WriteTransfer, Version2_ServerRunsLegacy) {
2560   stream::MemoryReader reader(kData32);
2561   Status transfer_status = Status::Unknown();
2562 
2563   ASSERT_EQ(
2564       OkStatus(),
2565       client_
2566           .Write(
2567               3,
2568               reader,
2569               [&transfer_status](Status status) { transfer_status = status; },
2570               cfg::kDefaultClientTimeout,
2571               cfg::kDefaultClientTimeout)
2572           .status());
2573   transfer_thread_.WaitUntilEventIsProcessed();
2574 
2575   // The client begins by sending the ID of the resource to transfer.
2576   rpc::PayloadsView payloads =
2577       context_.output().payloads<Transfer::Write>(context_.channel().id());
2578   ASSERT_EQ(payloads.size(), 1u);
2579   EXPECT_EQ(transfer_status, Status::Unknown());
2580 
2581   Chunk chunk = DecodeChunk(payloads.back());
2582   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2583   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2584   EXPECT_EQ(chunk.desired_session_id(), 1u);
2585   EXPECT_EQ(chunk.resource_id(), 3u);
2586 
2587   // Instead of continuing the handshake with a START_ACK, the server
2588   // immediately sends parameters, indicating that it only supports the legacy
2589   // protocol. Client should switch over to legacy and continue the transfer.
2590   rpc::test::WaitForPackets(context_.output(), 2, [this] {
2591     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2592         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
2593             .set_session_id(3)
2594             .set_offset(0)
2595             .set_window_end_offset(64)
2596             .set_max_chunk_size_bytes(32)));
2597   });
2598 
2599   ASSERT_EQ(payloads.size(), 3u);
2600 
2601   chunk = DecodeChunk(payloads[1]);
2602   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2603   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2604   EXPECT_EQ(chunk.session_id(), 3u);
2605   EXPECT_EQ(chunk.offset(), 0u);
2606   EXPECT_TRUE(chunk.has_payload());
2607   EXPECT_EQ(std::memcmp(
2608                 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2609             0);
2610 
2611   chunk = DecodeChunk(payloads[2]);
2612   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2613   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2614   EXPECT_EQ(chunk.session_id(), 3u);
2615   ASSERT_TRUE(chunk.remaining_bytes().has_value());
2616   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2617 
2618   EXPECT_EQ(transfer_status, Status::Unknown());
2619 
2620   // Send the final status chunk to complete the transfer.
2621   context_.server().SendServerStream<Transfer::Write>(
2622       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
2623   transfer_thread_.WaitUntilEventIsProcessed();
2624 
2625   EXPECT_EQ(payloads.size(), 3u);
2626   EXPECT_EQ(transfer_status, OkStatus());
2627 }
2628 
TEST_F(WriteTransfer,Version2_RetryDuringHandshake)2629 TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
2630   stream::MemoryReader reader(kData32);
2631   Status transfer_status = Status::Unknown();
2632 
2633   ASSERT_EQ(
2634       OkStatus(),
2635       client_
2636           .Write(
2637               3,
2638               reader,
2639               [&transfer_status](Status status) { transfer_status = status; },
2640               cfg::kDefaultClientTimeout,
2641               cfg::kDefaultClientTimeout)
2642           .status());
2643   transfer_thread_.WaitUntilEventIsProcessed();
2644 
2645   // The client begins by sending the ID of the resource to transfer.
2646   rpc::PayloadsView payloads =
2647       context_.output().payloads<Transfer::Write>(context_.channel().id());
2648   ASSERT_EQ(payloads.size(), 1u);
2649   EXPECT_EQ(transfer_status, Status::Unknown());
2650 
2651   Chunk chunk = DecodeChunk(payloads.back());
2652   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2653   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2654   EXPECT_EQ(chunk.desired_session_id(), 1u);
2655   EXPECT_EQ(chunk.resource_id(), 3u);
2656 
2657   // Time out waiting for a server response. The client should resend the
2658   // initial packet.
2659   transfer_thread_.SimulateClientTimeout(1);
2660   ASSERT_EQ(payloads.size(), 2u);
2661 
2662   chunk = DecodeChunk(payloads.back());
2663   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2664   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2665   EXPECT_EQ(chunk.desired_session_id(), 1u);
2666   EXPECT_EQ(chunk.resource_id(), 3u);
2667 
2668   // This time, respond with the correct continuation packet. The transfer
2669   // should resume and complete normally.
2670   context_.server().SendServerStream<Transfer::Write>(
2671       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2672                       .set_session_id(1)
2673                       .set_resource_id(3)));
2674   transfer_thread_.WaitUntilEventIsProcessed();
2675 
2676   ASSERT_EQ(payloads.size(), 3u);
2677 
2678   chunk = DecodeChunk(payloads.back());
2679   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2680   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2681   EXPECT_EQ(chunk.session_id(), 1u);
2682   EXPECT_FALSE(chunk.resource_id().has_value());
2683 
2684   rpc::test::WaitForPackets(context_.output(), 2, [this] {
2685     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2686         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2687             .set_session_id(1)
2688             .set_offset(0)
2689             .set_window_end_offset(64)
2690             .set_max_chunk_size_bytes(32)));
2691   });
2692 
2693   ASSERT_EQ(payloads.size(), 5u);
2694 
2695   chunk = DecodeChunk(payloads[3]);
2696   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2697   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2698   EXPECT_EQ(chunk.session_id(), 1u);
2699   EXPECT_EQ(chunk.offset(), 0u);
2700   EXPECT_TRUE(chunk.has_payload());
2701   EXPECT_EQ(std::memcmp(
2702                 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2703             0);
2704 
2705   chunk = DecodeChunk(payloads[4]);
2706   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2707   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2708   EXPECT_EQ(chunk.session_id(), 1u);
2709   ASSERT_TRUE(chunk.remaining_bytes().has_value());
2710   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2711 
2712   EXPECT_EQ(transfer_status, Status::Unknown());
2713 
2714   context_.server().SendServerStream<Transfer::Write>(
2715       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
2716   transfer_thread_.WaitUntilEventIsProcessed();
2717 
2718   // Client should acknowledge the completion of the transfer.
2719   EXPECT_EQ(payloads.size(), 6u);
2720 
2721   chunk = DecodeChunk(payloads[5]);
2722   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2723   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2724   EXPECT_EQ(chunk.session_id(), 1u);
2725 
2726   EXPECT_EQ(transfer_status, OkStatus());
2727 }
2728 
TEST_F(WriteTransfer,Version2_RetryAfterHandshake)2729 TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
2730   stream::MemoryReader reader(kData32);
2731   Status transfer_status = Status::Unknown();
2732 
2733   ASSERT_EQ(
2734       OkStatus(),
2735       client_
2736           .Write(
2737               3,
2738               reader,
2739               [&transfer_status](Status status) { transfer_status = status; },
2740               cfg::kDefaultClientTimeout,
2741               cfg::kDefaultClientTimeout)
2742           .status());
2743   transfer_thread_.WaitUntilEventIsProcessed();
2744 
2745   // The client begins by sending the ID of the resource to transfer.
2746   rpc::PayloadsView payloads =
2747       context_.output().payloads<Transfer::Write>(context_.channel().id());
2748   ASSERT_EQ(payloads.size(), 1u);
2749   EXPECT_EQ(transfer_status, Status::Unknown());
2750 
2751   Chunk chunk = DecodeChunk(payloads.back());
2752   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2753   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2754   EXPECT_EQ(chunk.desired_session_id(), 1u);
2755   EXPECT_EQ(chunk.resource_id(), 3u);
2756 
2757   // The server responds with a START_ACK, continuing the version 2 handshake.
2758   context_.server().SendServerStream<Transfer::Write>(
2759       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2760                       .set_session_id(1)
2761                       .set_resource_id(3)));
2762   transfer_thread_.WaitUntilEventIsProcessed();
2763 
2764   ASSERT_EQ(payloads.size(), 2u);
2765 
2766   // Client should accept the session_id with a START_ACK_CONFIRMATION.
2767   chunk = DecodeChunk(payloads.back());
2768   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2769   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2770   EXPECT_EQ(chunk.session_id(), 1u);
2771   EXPECT_FALSE(chunk.resource_id().has_value());
2772 
2773   // Time out waiting for a server response. The client should resend the
2774   // initial packet.
2775   transfer_thread_.SimulateClientTimeout(1);
2776   ASSERT_EQ(payloads.size(), 3u);
2777 
2778   chunk = DecodeChunk(payloads.back());
2779   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2780   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2781   EXPECT_EQ(chunk.session_id(), 1u);
2782   EXPECT_FALSE(chunk.resource_id().has_value());
2783 
2784   // This time, respond with the first transfer parameters chunk. The transfer
2785   // should resume and complete normally.
2786   rpc::test::WaitForPackets(context_.output(), 2, [this] {
2787     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2788         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2789             .set_session_id(1)
2790             .set_offset(0)
2791             .set_window_end_offset(64)
2792             .set_max_chunk_size_bytes(32)));
2793   });
2794 
2795   ASSERT_EQ(payloads.size(), 5u);
2796 
2797   chunk = DecodeChunk(payloads[3]);
2798   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2799   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2800   EXPECT_EQ(chunk.session_id(), 1u);
2801   EXPECT_EQ(chunk.offset(), 0u);
2802   EXPECT_TRUE(chunk.has_payload());
2803   EXPECT_EQ(std::memcmp(
2804                 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2805             0);
2806 
2807   chunk = DecodeChunk(payloads[4]);
2808   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2809   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2810   EXPECT_EQ(chunk.session_id(), 1u);
2811   ASSERT_TRUE(chunk.remaining_bytes().has_value());
2812   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2813 
2814   EXPECT_EQ(transfer_status, Status::Unknown());
2815 
2816   context_.server().SendServerStream<Transfer::Write>(
2817       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
2818   transfer_thread_.WaitUntilEventIsProcessed();
2819 
2820   // Client should acknowledge the completion of the transfer.
2821   EXPECT_EQ(payloads.size(), 6u);
2822 
2823   chunk = DecodeChunk(payloads[5]);
2824   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2825   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2826   EXPECT_EQ(chunk.session_id(), 1u);
2827 
2828   EXPECT_EQ(transfer_status, OkStatus());
2829 }
2830 
// Checks that when the server answers the START chunk of a version 2 write
// transfer with a NOT_FOUND final chunk, the completion callback receives
// NOT_FOUND and the client sends no further packets.
TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
  stream::MemoryReader source(kData32);
  Status completion_status = Status::Unknown();

  const Status start_status =
      client_
          .Write(
              3,
              source,
              [&completion_status](Status status) {
                completion_status = status;
              },
              cfg::kDefaultClientTimeout,
              cfg::kDefaultClientTimeout)
          .status();
  ASSERT_EQ(OkStatus(), start_status);
  transfer_thread_.WaitUntilEventIsProcessed();

  // First packet out: the START chunk naming the resource to transfer.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(completion_status, Status::Unknown());

  Chunk start = DecodeChunk(sent[0]);
  EXPECT_EQ(start.type(), Chunk::Type::kStart);
  EXPECT_EQ(start.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start.desired_session_id(), 1u);
  EXPECT_EQ(start.resource_id(), 3u);

  // The server rejects the start request with an error status.
  context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
      Chunk::Final(ProtocolVersion::kVersionTwo, 1, Status::NotFound())));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Still only the START chunk on the wire; the error reached the callback.
  EXPECT_EQ(sent.size(), 1u);
  EXPECT_EQ(completion_status, Status::NotFound());
}
2867 
2868 class ReadTransferMaxBytes256 : public ReadTransfer {
2869  protected:
ReadTransferMaxBytes256()2870   ReadTransferMaxBytes256() : ReadTransfer(/*max_bytes_to_receive=*/256) {}
2871 };
2872 
TEST_F(ReadTransferMaxBytes256,Version2_AdapativeWindow_SlowStart)2873 TEST_F(ReadTransferMaxBytes256, Version2_AdapativeWindow_SlowStart) {
2874   stream::MemoryWriterBuffer<256> writer;
2875   Status transfer_status = Status::Unknown();
2876 
2877   constexpr size_t kExpectedMaxChunkSize = 37;
2878 
2879   ASSERT_EQ(
2880       OkStatus(),
2881       client_
2882           .Read(
2883               3,
2884               writer,
2885               [&transfer_status](Status status) { transfer_status = status; },
2886               cfg::kDefaultClientTimeout,
2887               cfg::kDefaultClientTimeout)
2888           .status());
2889   transfer_thread_.WaitUntilEventIsProcessed();
2890 
2891   // Initial chunk of the transfer is sent. This chunk should contain all the
2892   // fields from both legacy and version 2 protocols for backwards
2893   // compatibility.
2894   rpc::PayloadsView payloads =
2895       context_.output().payloads<Transfer::Read>(context_.channel().id());
2896 
2897   ASSERT_EQ(payloads.size(), 1u);
2898   EXPECT_EQ(transfer_status, Status::Unknown());
2899 
2900   Chunk chunk = DecodeChunk(payloads[0]);
2901   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2902   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2903   EXPECT_EQ(chunk.desired_session_id(), 1u);
2904   EXPECT_EQ(chunk.resource_id(), 3u);
2905   EXPECT_EQ(chunk.offset(), 0u);
2906   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
2907   EXPECT_EQ(chunk.max_chunk_size_bytes(), kExpectedMaxChunkSize);
2908 
2909   // The server responds with a START_ACK, continuing the version 2 handshake.
2910   context_.server().SendServerStream<Transfer::Read>(
2911       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2912                       .set_session_id(1)
2913                       .set_resource_id(3)));
2914   transfer_thread_.WaitUntilEventIsProcessed();
2915 
2916   ASSERT_EQ(payloads.size(), 2u);
2917 
2918   // Client should accept the session_id with a START_ACK_CONFIRMATION,
2919   // additionally containing the initial parameters for the read transfer.
2920   chunk = DecodeChunk(payloads.back());
2921   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2922   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2923   EXPECT_FALSE(chunk.desired_session_id().has_value());
2924   EXPECT_EQ(chunk.session_id(), 1u);
2925   EXPECT_FALSE(chunk.resource_id().has_value());
2926   EXPECT_EQ(chunk.offset(), 0u);
2927   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
2928   EXPECT_EQ(chunk.max_chunk_size_bytes(), kExpectedMaxChunkSize);
2929 
2930   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2931       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2932           .set_session_id(1)
2933           .set_offset(0)
2934           .set_payload(span(kData256).first(kExpectedMaxChunkSize))));
2935   transfer_thread_.WaitUntilEventIsProcessed();
2936 
2937   ASSERT_EQ(payloads.size(), 3u);
2938 
2939   // Window size should double in response to successful receipt.
2940   chunk = DecodeChunk(payloads.back());
2941   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2942   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2943   EXPECT_FALSE(chunk.desired_session_id().has_value());
2944   EXPECT_EQ(chunk.session_id(), 1u);
2945   EXPECT_FALSE(chunk.resource_id().has_value());
2946   EXPECT_EQ(chunk.offset(), kExpectedMaxChunkSize);
2947   EXPECT_EQ(chunk.window_end_offset(),
2948             chunk.offset() + 2 * kExpectedMaxChunkSize);
2949 
2950   // Send the next chunk.
2951   context_.server().SendServerStream<Transfer::Read>(
2952       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2953                       .set_session_id(1)
2954                       .set_offset(chunk.offset())
2955                       .set_payload(span(kData256).subspan(
2956                           chunk.offset(), kExpectedMaxChunkSize))));
2957   transfer_thread_.WaitUntilEventIsProcessed();
2958 
2959   ASSERT_EQ(payloads.size(), 4u);
2960 
2961   // Window size should double in response to successful receipt.
2962   chunk = DecodeChunk(payloads.back());
2963   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2964   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2965   EXPECT_FALSE(chunk.desired_session_id().has_value());
2966   EXPECT_EQ(chunk.session_id(), 1u);
2967   EXPECT_FALSE(chunk.resource_id().has_value());
2968   EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
2969   EXPECT_EQ(chunk.window_end_offset(),
2970             chunk.offset() + 4 * kExpectedMaxChunkSize);
2971 
2972   // Finish the transfer.
2973   context_.server().SendServerStream<Transfer::Read>(
2974       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2975                       .set_session_id(1)
2976                       .set_offset(chunk.offset())
2977                       .set_payload(span(kData256).subspan(
2978                           chunk.offset(), kExpectedMaxChunkSize))
2979                       .set_remaining_bytes(0)));
2980   transfer_thread_.WaitUntilEventIsProcessed();
2981 
2982   ASSERT_EQ(payloads.size(), 5u);
2983 
2984   chunk = DecodeChunk(payloads.back());
2985   EXPECT_EQ(chunk.session_id(), 1u);
2986   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2987   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2988   ASSERT_TRUE(chunk.status().has_value());
2989   EXPECT_EQ(chunk.status().value(), OkStatus());
2990 
2991   EXPECT_EQ(transfer_status, OkStatus());
2992   EXPECT_EQ(std::memcmp(writer.data(), kData256.data(), writer.bytes_written()),
2993             0);
2994 
2995   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2996       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2997           .set_session_id(1)));
2998 }
2999 
TEST_F(ReadTransferMaxBytes256,Version2_AdapativeWindow_CongestionAvoidance)3000 TEST_F(ReadTransferMaxBytes256, Version2_AdapativeWindow_CongestionAvoidance) {
3001   stream::MemoryWriterBuffer<256> writer;
3002   Status transfer_status = Status::Unknown();
3003 
3004   constexpr size_t kExpectedMaxChunkSize = 37;
3005 
3006   ASSERT_EQ(
3007       OkStatus(),
3008       client_
3009           .Read(
3010               3,
3011               writer,
3012               [&transfer_status](Status status) { transfer_status = status; },
3013               cfg::kDefaultClientTimeout,
3014               cfg::kDefaultClientTimeout)
3015           .status());
3016 
3017   transfer_thread_.WaitUntilEventIsProcessed();
3018 
3019   // Initial chunk of the transfer is sent. This chunk should contain all the
3020   // fields from both legacy and version 2 protocols for backwards
3021   // compatibility.
3022   rpc::PayloadsView payloads =
3023       context_.output().payloads<Transfer::Read>(context_.channel().id());
3024   ASSERT_EQ(payloads.size(), 1u);
3025   EXPECT_EQ(transfer_status, Status::Unknown());
3026 
3027   Chunk chunk = DecodeChunk(payloads[0]);
3028   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3029   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3030   EXPECT_EQ(chunk.desired_session_id(), 1u);
3031   EXPECT_EQ(chunk.resource_id(), 3u);
3032   EXPECT_EQ(chunk.offset(), 0u);
3033   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
3034   EXPECT_EQ(chunk.max_chunk_size_bytes(), kExpectedMaxChunkSize);
3035 
3036   // The server responds with a START_ACK, continuing the version 2 handshake.
3037   context_.server().SendServerStream<Transfer::Read>(
3038       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3039                       .set_session_id(1)
3040                       .set_resource_id(3)));
3041   transfer_thread_.WaitUntilEventIsProcessed();
3042 
3043   ASSERT_EQ(payloads.size(), 2u);
3044 
3045   // Client should accept the session_id with a START_ACK_CONFIRMATION,
3046   // additionally containing the initial parameters for the read transfer.
3047   chunk = DecodeChunk(payloads.back());
3048   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
3049   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3050   EXPECT_FALSE(chunk.desired_session_id().has_value());
3051   EXPECT_EQ(chunk.session_id(), 1u);
3052   EXPECT_FALSE(chunk.resource_id().has_value());
3053   EXPECT_EQ(chunk.offset(), 0u);
3054   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
3055   EXPECT_EQ(chunk.max_chunk_size_bytes(), kExpectedMaxChunkSize);
3056 
3057   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
3058       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3059           .set_session_id(1)
3060           .set_offset(0)
3061           .set_payload(span(kData256).first(kExpectedMaxChunkSize))));
3062   transfer_thread_.WaitUntilEventIsProcessed();
3063 
3064   ASSERT_EQ(payloads.size(), 3u);
3065 
3066   // Window size should double in response to successful receipt.
3067   chunk = DecodeChunk(payloads.back());
3068   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3069   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3070   EXPECT_FALSE(chunk.desired_session_id().has_value());
3071   EXPECT_EQ(chunk.session_id(), 1u);
3072   EXPECT_FALSE(chunk.resource_id().has_value());
3073   EXPECT_EQ(chunk.offset(), kExpectedMaxChunkSize);
3074   EXPECT_EQ(chunk.window_end_offset(),
3075             chunk.offset() + 2 * kExpectedMaxChunkSize);
3076 
3077   // Send the next chunk.
3078   context_.server().SendServerStream<Transfer::Read>(
3079       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3080                       .set_session_id(1)
3081                       .set_offset(chunk.offset())
3082                       .set_payload(span(kData256).subspan(
3083                           chunk.offset(), kExpectedMaxChunkSize))));
3084   transfer_thread_.WaitUntilEventIsProcessed();
3085 
3086   ASSERT_EQ(payloads.size(), 4u);
3087 
3088   // Window size should double in response to successful receipt.
3089   chunk = DecodeChunk(payloads.back());
3090   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3091   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3092   EXPECT_FALSE(chunk.desired_session_id().has_value());
3093   EXPECT_EQ(chunk.session_id(), 1u);
3094   EXPECT_FALSE(chunk.resource_id().has_value());
3095   EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
3096   EXPECT_EQ(chunk.window_end_offset(),
3097             chunk.offset() + 4 * kExpectedMaxChunkSize);
3098 
3099   // Time out instead of sending another chunk.
3100   transfer_thread_.SimulateClientTimeout(1);
3101 
3102   ASSERT_EQ(payloads.size(), 5u);
3103 
3104   // Window size should halve following data loss.
3105   chunk = DecodeChunk(payloads.back());
3106   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
3107   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3108   EXPECT_FALSE(chunk.desired_session_id().has_value());
3109   EXPECT_EQ(chunk.session_id(), 1u);
3110   EXPECT_FALSE(chunk.resource_id().has_value());
3111   EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
3112   EXPECT_EQ(chunk.window_end_offset(),
3113             chunk.offset() + 2 * (kExpectedMaxChunkSize - 1));
3114 
3115   // Send another chunk.
3116   context_.server().SendServerStream<Transfer::Read>(
3117       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3118                       .set_session_id(1)
3119                       .set_offset(chunk.offset())
3120                       .set_payload(span(kData256).subspan(
3121                           chunk.offset(), kExpectedMaxChunkSize - 1))));
3122   transfer_thread_.WaitUntilEventIsProcessed();
3123 
3124   ASSERT_EQ(payloads.size(), 6u);
3125 
3126   // Window size should now only increase by 1 instead of doubling.
3127   chunk = DecodeChunk(payloads.back());
3128   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3129   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3130   EXPECT_FALSE(chunk.desired_session_id().has_value());
3131   EXPECT_EQ(chunk.session_id(), 1u);
3132   EXPECT_FALSE(chunk.resource_id().has_value());
3133   EXPECT_EQ(chunk.offset(), 3 * kExpectedMaxChunkSize - 1);
3134   EXPECT_EQ(chunk.window_end_offset(),
3135             chunk.offset() + 3 * (kExpectedMaxChunkSize - 1));
3136 
3137   // Finish the transfer.
3138   context_.server().SendServerStream<Transfer::Read>(
3139       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3140                       .set_session_id(1)
3141                       .set_offset(chunk.offset())
3142                       .set_payload(span(kData256).subspan(
3143                           chunk.offset(), kExpectedMaxChunkSize - 1))
3144                       .set_remaining_bytes(0)));
3145   transfer_thread_.WaitUntilEventIsProcessed();
3146 
3147   ASSERT_EQ(payloads.size(), 7u);
3148 
3149   chunk = DecodeChunk(payloads.back());
3150   EXPECT_EQ(chunk.session_id(), 1u);
3151   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
3152   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3153   ASSERT_TRUE(chunk.status().has_value());
3154   EXPECT_EQ(chunk.status().value(), OkStatus());
3155 
3156   EXPECT_EQ(transfer_status, OkStatus());
3157   EXPECT_EQ(std::memcmp(writer.data(), kData256.data(), writer.bytes_written()),
3158             0);
3159 
3160   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
3161       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
3162           .set_session_id(1)));
3163 }
3164 
TEST_F(WriteTransfer,Write_UpdateTransferSize)3165 TEST_F(WriteTransfer, Write_UpdateTransferSize) {
3166   FakeNonSeekableReader reader(kData32);
3167   Status transfer_status = Status::Unknown();
3168 
3169   Result<Client::Handle> result = client_.Write(
3170       91,
3171       reader,
3172       [&transfer_status](Status status) { transfer_status = status; },
3173       kTestTimeout);
3174   ASSERT_EQ(OkStatus(), result.status());
3175   transfer_thread_.WaitUntilEventIsProcessed();
3176 
3177   Client::Handle handle = *result;
3178   handle.SetTransferSize(kData32.size());
3179   transfer_thread_.WaitUntilEventIsProcessed();
3180 
3181   // Initial chunk of the transfer is sent. This chunk should contain all the
3182   // fields from both legacy and version 2 protocols for backwards
3183   // compatibility.
3184   rpc::PayloadsView payloads =
3185       context_.output().payloads<Transfer::Write>(context_.channel().id());
3186   ASSERT_EQ(payloads.size(), 1u);
3187   EXPECT_EQ(transfer_status, Status::Unknown());
3188 
3189   Chunk chunk = DecodeChunk(payloads[0]);
3190   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3191   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3192   EXPECT_EQ(chunk.desired_session_id(), 1u);
3193   EXPECT_EQ(chunk.resource_id(), 91u);
3194   EXPECT_EQ(chunk.offset(), 0u);
3195 
3196   // The server responds with a START_ACK, continuing the version 2 handshake.
3197   context_.server().SendServerStream<Transfer::Write>(
3198       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3199                       .set_session_id(1)
3200                       .set_resource_id(91)));
3201   transfer_thread_.WaitUntilEventIsProcessed();
3202 
3203   ASSERT_EQ(payloads.size(), 2u);
3204 
3205   // Client should accept the session_id with a START_ACK_CONFIRMATION.
3206   chunk = DecodeChunk(payloads.back());
3207   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
3208   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3209   EXPECT_EQ(chunk.session_id(), 1u);
3210   EXPECT_FALSE(chunk.resource_id().has_value());
3211 
3212   // The server can then begin the data transfer by sending its transfer
3213   // parameters. Client should respond with data chunks.
3214   rpc::test::WaitForPackets(context_.output(), 4, [this] {
3215     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
3216         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
3217             .set_session_id(1)
3218             .set_offset(0)
3219             .set_window_end_offset(64)
3220             .set_max_chunk_size_bytes(8)));
3221   });
3222 
3223   ASSERT_EQ(payloads.size(), 6u);
3224 
3225   // Each 8-byte chunk of the 32-byte transfer should have an appropriate
3226   // `remaining_bytes` value set.
3227   chunk = DecodeChunk(payloads[2]);
3228   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3229   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3230   EXPECT_EQ(chunk.session_id(), 1u);
3231   EXPECT_EQ(chunk.offset(), 0u);
3232   ASSERT_TRUE(chunk.remaining_bytes().has_value());
3233   EXPECT_EQ(chunk.remaining_bytes().value(), 24u);
3234 
3235   chunk = DecodeChunk(payloads[3]);
3236   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3237   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3238   EXPECT_EQ(chunk.session_id(), 1u);
3239   EXPECT_EQ(chunk.offset(), 8u);
3240   ASSERT_TRUE(chunk.remaining_bytes().has_value());
3241   EXPECT_EQ(chunk.remaining_bytes().value(), 16u);
3242 
3243   chunk = DecodeChunk(payloads[4]);
3244   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3245   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3246   EXPECT_EQ(chunk.session_id(), 1u);
3247   EXPECT_EQ(chunk.offset(), 16u);
3248   ASSERT_TRUE(chunk.remaining_bytes().has_value());
3249   EXPECT_EQ(chunk.remaining_bytes().value(), 8u);
3250 
3251   chunk = DecodeChunk(payloads[5]);
3252   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3253   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3254   EXPECT_EQ(chunk.session_id(), 1u);
3255   EXPECT_EQ(chunk.offset(), 24u);
3256   ASSERT_TRUE(chunk.remaining_bytes().has_value());
3257   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
3258 
3259   EXPECT_EQ(transfer_status, Status::Unknown());
3260 
3261   // Send the final status chunk to complete the transfer.
3262   context_.server().SendServerStream<Transfer::Write>(
3263       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
3264   transfer_thread_.WaitUntilEventIsProcessed();
3265 
3266   // Client should acknowledge the completion of the transfer.
3267   ASSERT_EQ(payloads.size(), 7u);
3268 
3269   chunk = DecodeChunk(payloads[6]);
3270   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
3271   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3272   EXPECT_EQ(chunk.session_id(), 1u);
3273 
3274   EXPECT_EQ(transfer_status, OkStatus());
3275 
3276   // Ensure we don't leave a dangling reference to transfer_status.
3277   handle.Cancel();
3278   transfer_thread_.WaitUntilEventIsProcessed();
3279 }
3280 
TEST_F(WriteTransfer,Write_TransferSize_Large)3281 TEST_F(WriteTransfer, Write_TransferSize_Large) {
3282   FakeNonSeekableReader reader(kData64);
3283   Status transfer_status = Status::Unknown();
3284 
3285   Result<Client::Handle> result = client_.Write(
3286       91,
3287       reader,
3288       [&transfer_status](Status status) { transfer_status = status; },
3289       kTestTimeout);
3290   ASSERT_EQ(OkStatus(), result.status());
3291   transfer_thread_.WaitUntilEventIsProcessed();
3292 
3293   // Set a large transfer size that will encode to a multibyte varint.
3294   constexpr size_t kLargeRemainingBytes = 1u << 28;
3295   Client::Handle handle = *result;
3296   handle.SetTransferSize(kLargeRemainingBytes);
3297   transfer_thread_.WaitUntilEventIsProcessed();
3298 
3299   // Initial chunk of the transfer is sent. This chunk should contain all the
3300   // fields from both legacy and version 2 protocols for backwards
3301   // compatibility.
3302   rpc::PayloadsView payloads =
3303       context_.output().payloads<Transfer::Write>(context_.channel().id());
3304   ASSERT_EQ(payloads.size(), 1u);
3305   EXPECT_EQ(transfer_status, Status::Unknown());
3306 
3307   Chunk chunk = DecodeChunk(payloads[0]);
3308   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3309   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3310   EXPECT_EQ(chunk.desired_session_id(), 1u);
3311   EXPECT_EQ(chunk.resource_id(), 91u);
3312   EXPECT_EQ(chunk.offset(), 0u);
3313 
3314   // The server responds with a START_ACK, continuing the version 2 handshake.
3315   context_.server().SendServerStream<Transfer::Write>(
3316       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3317                       .set_session_id(1)
3318                       .set_resource_id(91)));
3319   transfer_thread_.WaitUntilEventIsProcessed();
3320 
3321   ASSERT_EQ(payloads.size(), 2u);
3322 
3323   // Client should accept the session_id with a START_ACK_CONFIRMATION.
3324   chunk = DecodeChunk(payloads.back());
3325   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
3326   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3327   EXPECT_EQ(chunk.session_id(), 1u);
3328   EXPECT_FALSE(chunk.resource_id().has_value());
3329 
3330   // The server can then begin the data transfer by sending its transfer
3331   // parameters. Client should respond with data chunks.
3332   rpc::test::WaitForPackets(context_.output(), 2, [this] {
3333     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
3334         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
3335             .set_session_id(1)
3336             .set_offset(0)
3337             .set_window_end_offset(64)  // Only request one chunk.
3338             .set_max_chunk_size_bytes(64)));
3339   });
3340 
3341   ASSERT_EQ(payloads.size(), 4u);
3342 
3343   // The transfer should reserve appropriate space for the `remaining_bytes`
3344   // value and not fail to encode.
3345   chunk = DecodeChunk(payloads[2]);
3346   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3347   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3348   EXPECT_EQ(chunk.session_id(), 1u);
3349   EXPECT_EQ(chunk.offset(), 0u);
3350   EXPECT_EQ(chunk.payload().size_bytes(), 48u);
3351   ASSERT_TRUE(chunk.remaining_bytes().has_value());
3352   EXPECT_EQ(chunk.remaining_bytes().value(),
3353             kLargeRemainingBytes - chunk.payload().size_bytes());
3354 
3355   EXPECT_EQ(transfer_status, Status::Unknown());
3356 
3357   // Send the final status chunk to complete the transfer.
3358   context_.server().SendServerStream<Transfer::Write>(
3359       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
3360   transfer_thread_.WaitUntilEventIsProcessed();
3361 
3362   // Client should acknowledge the completion of the transfer.
3363   EXPECT_EQ(payloads.size(), 5u);
3364 
3365   chunk = DecodeChunk(payloads[4]);
3366   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
3367   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3368   EXPECT_EQ(chunk.session_id(), 1u);
3369 
3370   EXPECT_EQ(transfer_status, OkStatus());
3371 
3372   // Ensure we don't leave a dangling reference to transfer_status.
3373   handle.Cancel();
3374   transfer_thread_.WaitUntilEventIsProcessed();
3375 }
3376 
TEST_F(WriteTransfer,Write_TransferSize_SmallerThanResource)3377 TEST_F(WriteTransfer, Write_TransferSize_SmallerThanResource) {
3378   // 64 byte data, but only set a 16 byte size.
3379   constexpr size_t kSmallerTransferSize = 16;
3380   FakeNonSeekableReader reader(kData64);
3381   Status transfer_status = Status::Unknown();
3382 
3383   Result<Client::Handle> result = client_.Write(
3384       92,
3385       reader,
3386       [&transfer_status](Status status) { transfer_status = status; },
3387       kTestTimeout);
3388   ASSERT_EQ(OkStatus(), result.status());
3389   transfer_thread_.WaitUntilEventIsProcessed();
3390 
3391   Client::Handle handle = *result;
3392   handle.SetTransferSize(kSmallerTransferSize);
3393   transfer_thread_.WaitUntilEventIsProcessed();
3394 
3395   // Initial chunk of the transfer is sent. This chunk should contain all the
3396   // fields from both legacy and version 2 protocols for backwards
3397   // compatibility.
3398   rpc::PayloadsView payloads =
3399       context_.output().payloads<Transfer::Write>(context_.channel().id());
3400   ASSERT_EQ(payloads.size(), 1u);
3401   EXPECT_EQ(transfer_status, Status::Unknown());
3402 
3403   Chunk chunk = DecodeChunk(payloads[0]);
3404   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3405   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3406   EXPECT_EQ(chunk.desired_session_id(), 1u);
3407   EXPECT_EQ(chunk.resource_id(), 92u);
3408   EXPECT_EQ(chunk.offset(), 0u);
3409 
3410   // The server responds with a START_ACK, continuing the version 2 handshake.
3411   context_.server().SendServerStream<Transfer::Write>(
3412       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3413                       .set_session_id(1)
3414                       .set_resource_id(92)));
3415   transfer_thread_.WaitUntilEventIsProcessed();
3416 
3417   ASSERT_EQ(payloads.size(), 2u);
3418 
3419   // Client should accept the session_id with a START_ACK_CONFIRMATION.
3420   chunk = DecodeChunk(payloads.back());
3421   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
3422   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3423   EXPECT_EQ(chunk.session_id(), 1u);
3424   EXPECT_FALSE(chunk.resource_id().has_value());
3425 
3426   // The server can then begin the data transfer by sending its transfer
3427   // parameters. Client should respond with data chunks.
3428   rpc::test::WaitForPackets(context_.output(), 2, [this] {
3429     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
3430         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
3431             .set_session_id(1)
3432             .set_offset(0)
3433             .set_window_end_offset(64)
3434             .set_max_chunk_size_bytes(8)));
3435   });
3436 
3437   ASSERT_EQ(payloads.size(), 4u);
3438 
3439   // Each 8-byte chunk of the transfer should have an appropriate
3440   // `remaining_bytes` value set.
3441   chunk = DecodeChunk(payloads[2]);
3442   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3443   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3444   EXPECT_EQ(chunk.session_id(), 1u);
3445   EXPECT_EQ(chunk.offset(), 0u);
3446   ASSERT_TRUE(chunk.remaining_bytes().has_value());
3447   EXPECT_EQ(chunk.remaining_bytes().value(), 8u);
3448 
3449   chunk = DecodeChunk(payloads[3]);
3450   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3451   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3452   EXPECT_EQ(chunk.session_id(), 1u);
3453   EXPECT_EQ(chunk.offset(), 8u);
3454   ASSERT_TRUE(chunk.remaining_bytes().has_value());
3455   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
3456 
3457   EXPECT_EQ(transfer_status, Status::Unknown());
3458 
3459   // Send the final status chunk to complete the transfer.
3460   context_.server().SendServerStream<Transfer::Write>(
3461       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
3462   transfer_thread_.WaitUntilEventIsProcessed();
3463 
3464   // Client should acknowledge the completion of the transfer.
3465   ASSERT_EQ(payloads.size(), 5u);
3466 
3467   chunk = DecodeChunk(payloads[4]);
3468   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
3469   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3470   EXPECT_EQ(chunk.session_id(), 1u);
3471 
3472   EXPECT_EQ(transfer_status, OkStatus());
3473 
3474   // Ensure we don't leave a dangling reference to transfer_status.
3475   handle.Cancel();
3476   transfer_thread_.WaitUntilEventIsProcessed();
3477 }
3478 
// Cancels a version 2 read before the server has answered the opening START
// chunk, then checks that a late START_ACK from the server is answered with a
// COMPLETION chunk carrying a CANCELLED status.
TEST_F(ReadTransfer, Version2_CancelBeforeServerResponse) {
  stream::MemoryWriterBuffer<64> writer;
  Status transfer_status = Status::Unknown();

  Result<Client::Handle> transfer = client_.Read(
      3,
      writer,
      [&transfer_status](Status status) { transfer_status = status; },
      cfg::kDefaultClientTimeout,
      cfg::kDefaultClientTimeout);
  ASSERT_EQ(transfer.status(), OkStatus());

  transfer_thread_.WaitUntilEventIsProcessed();

  // Initial chunk of the transfer is sent. This chunk should contain all the
  // fields from both legacy and version 2 protocols for backwards
  // compatibility.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  // NOTE(review): the expected 37-byte window and max chunk size presumably
  // come from the ReadTransfer fixture's buffer sizes; confirm there.
  Chunk chunk = DecodeChunk(payloads[0]);
  EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.desired_session_id(), 1u);
  EXPECT_EQ(chunk.resource_id(), 3u);
  EXPECT_EQ(chunk.offset(), 0u);
  EXPECT_EQ(chunk.window_end_offset(), 37u);
  EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);

  // Cancel the transfer before the server responds. Since no contact was made,
  // no cancellation chunk should be sent.
  transfer->Cancel();
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 1u);

  // The server responds after the cancellation. The client should notify it
  // that the transfer is no longer active.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
                      .set_session_id(1)
                      .set_resource_id(3)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one new chunk is expected in reply. Asserting the count first
  // keeps the checks below from decoding the stale START chunk if the client
  // sent nothing.
  ASSERT_EQ(payloads.size(), 2u);

  chunk = DecodeChunk(payloads.back());
  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.status(), Status::Cancelled());
}
3530 
3531 }  // namespace
3532 }  // namespace pw::transfer::test
3533