1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_transfer/client.h"
16
17 #include <cstring>
18
19 #include "pw_assert/check.h"
20 #include "pw_bytes/array.h"
21 #include "pw_rpc/raw/client_testing.h"
22 #include "pw_rpc/test_helpers.h"
23 #include "pw_status/status.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread_stl/options.h"
26 #include "pw_transfer/internal/config.h"
27 #include "pw_transfer_private/chunk_testing.h"
28 #include "pw_unit_test/framework.h"
29
30 namespace pw::transfer::test {
31 namespace {
32
33 using internal::Chunk;
34 using pw_rpc::raw::Transfer;
35
36 using namespace std::chrono_literals;
37
TransferThreadOptions()38 thread::Options& TransferThreadOptions() {
39 static thread::stl::Options options;
40 return options;
41 }
42
43 class ReadTransfer : public ::testing::Test {
44 protected:
ReadTransfer(size_t max_bytes_to_receive=0)45 ReadTransfer(size_t max_bytes_to_receive = 0)
46 : transfer_thread_(chunk_buffer_, encode_buffer_),
47 legacy_client_(context_.client(),
48 context_.channel().id(),
49 transfer_thread_,
50 max_bytes_to_receive > 0
51 ? max_bytes_to_receive
52 : transfer_thread_.max_chunk_size()),
53 client_(context_.client(),
54 context_.channel().id(),
55 transfer_thread_,
56 max_bytes_to_receive > 0 ? max_bytes_to_receive
57 : transfer_thread_.max_chunk_size()),
58 system_thread_(TransferThreadOptions(), transfer_thread_) {
59 legacy_client_.set_protocol_version(ProtocolVersion::kLegacy);
60 }
61
~ReadTransfer()62 ~ReadTransfer() override {
63 transfer_thread_.Terminate();
64 system_thread_.join();
65 }
66
67 rpc::RawClientTestContext<> context_;
68
69 Thread<1, 1> transfer_thread_;
70 Client legacy_client_;
71 Client client_;
72
73 std::array<std::byte, 64> chunk_buffer_;
74 std::array<std::byte, 64> encode_buffer_;
75
76 pw::Thread system_thread_;
77 };
78
__anonfefc08470202(size_t i) 79 constexpr auto kData32 = bytes::Initialized<32>([](size_t i) { return i; });
__anonfefc08470302(size_t i) 80 constexpr auto kData64 = bytes::Initialized<64>([](size_t i) { return i; });
__anonfefc08470402(size_t i) 81 constexpr auto kData256 = bytes::Initialized<256>([](size_t i) { return i; });
82
TEST_F(ReadTransfer,SingleChunk)83 TEST_F(ReadTransfer, SingleChunk) {
84 stream::MemoryWriterBuffer<64> writer;
85 Status transfer_status = Status::Unknown();
86
87 ASSERT_EQ(
88 OkStatus(),
89 legacy_client_
90 .Read(3,
91 writer,
92 [&transfer_status](Status status) { transfer_status = status; })
93 .status());
94
95 transfer_thread_.WaitUntilEventIsProcessed();
96
97 // First transfer parameters chunk is sent.
98 rpc::PayloadsView payloads =
99 context_.output().payloads<Transfer::Read>(context_.channel().id());
100 ASSERT_EQ(payloads.size(), 1u);
101 EXPECT_EQ(transfer_status, Status::Unknown());
102
103 Chunk c0 = DecodeChunk(payloads[0]);
104 EXPECT_EQ(c0.session_id(), 3u);
105 EXPECT_EQ(c0.resource_id(), 3u);
106 EXPECT_EQ(c0.offset(), 0u);
107 EXPECT_EQ(c0.window_end_offset(), 37u);
108 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
109
110 context_.server().SendServerStream<Transfer::Read>(
111 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
112 .set_session_id(3)
113 .set_offset(0)
114 .set_payload(kData32)
115 .set_remaining_bytes(0)));
116 transfer_thread_.WaitUntilEventIsProcessed();
117
118 ASSERT_EQ(payloads.size(), 2u);
119
120 Chunk c1 = DecodeChunk(payloads.back());
121 EXPECT_EQ(c1.session_id(), 3u);
122 ASSERT_TRUE(c1.status().has_value());
123 EXPECT_EQ(c1.status().value(), OkStatus());
124
125 EXPECT_EQ(transfer_status, OkStatus());
126 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
127 0);
128 }
129
TEST_F(ReadTransfer,MultiChunk)130 TEST_F(ReadTransfer, MultiChunk) {
131 stream::MemoryWriterBuffer<64> writer;
132 Status transfer_status = Status::Unknown();
133
134 ASSERT_EQ(
135 OkStatus(),
136 legacy_client_
137 .Read(4,
138 writer,
139 [&transfer_status](Status status) { transfer_status = status; })
140 .status());
141
142 transfer_thread_.WaitUntilEventIsProcessed();
143
144 // First transfer parameters chunk is sent.
145 rpc::PayloadsView payloads =
146 context_.output().payloads<Transfer::Read>(context_.channel().id());
147 ASSERT_EQ(payloads.size(), 1u);
148 EXPECT_EQ(transfer_status, Status::Unknown());
149
150 Chunk c0 = DecodeChunk(payloads[0]);
151 EXPECT_EQ(c0.session_id(), 4u);
152 EXPECT_EQ(c0.resource_id(), 4u);
153 EXPECT_EQ(c0.offset(), 0u);
154 EXPECT_EQ(c0.window_end_offset(), 37u);
155 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
156
157 constexpr ConstByteSpan data(kData32);
158 context_.server().SendServerStream<Transfer::Read>(
159 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
160 .set_session_id(4)
161 .set_offset(0)
162 .set_payload(data.first(16))));
163 transfer_thread_.WaitUntilEventIsProcessed();
164
165 ASSERT_EQ(payloads.size(), 1u);
166
167 context_.server().SendServerStream<Transfer::Read>(
168 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
169 .set_session_id(4)
170 .set_offset(16)
171 .set_payload(data.subspan(16))
172 .set_remaining_bytes(0)));
173 transfer_thread_.WaitUntilEventIsProcessed();
174
175 ASSERT_EQ(payloads.size(), 2u);
176
177 Chunk c1 = DecodeChunk(payloads[1]);
178 EXPECT_EQ(c1.session_id(), 4u);
179 ASSERT_TRUE(c1.status().has_value());
180 EXPECT_EQ(c1.status().value(), OkStatus());
181
182 EXPECT_EQ(transfer_status, OkStatus());
183 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
184 0);
185 }
186
TEST_F(ReadTransfer,MultipleTransfers)187 TEST_F(ReadTransfer, MultipleTransfers) {
188 stream::MemoryWriterBuffer<64> writer;
189 Status transfer_status = Status::Unknown();
190
191 ASSERT_EQ(
192 OkStatus(),
193 legacy_client_
194 .Read(3,
195 writer,
196 [&transfer_status](Status status) { transfer_status = status; })
197 .status());
198 transfer_thread_.WaitUntilEventIsProcessed();
199
200 context_.server().SendServerStream<Transfer::Read>(
201 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
202 .set_session_id(3)
203 .set_offset(0)
204 .set_payload(kData32)
205 .set_remaining_bytes(0)));
206 transfer_thread_.WaitUntilEventIsProcessed();
207
208 ASSERT_EQ(transfer_status, OkStatus());
209 transfer_status = Status::Unknown();
210
211 ASSERT_EQ(
212 OkStatus(),
213 legacy_client_
214 .Read(3,
215 writer,
216 [&transfer_status](Status status) { transfer_status = status; })
217 .status());
218 transfer_thread_.WaitUntilEventIsProcessed();
219
220 context_.server().SendServerStream<Transfer::Read>(
221 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
222 .set_session_id(3)
223 .set_offset(0)
224 .set_payload(kData32)
225 .set_remaining_bytes(0)));
226 transfer_thread_.WaitUntilEventIsProcessed();
227
228 EXPECT_EQ(transfer_status, OkStatus());
229 }
230
231 class ReadTransferMaxBytes32 : public ReadTransfer {
232 protected:
ReadTransferMaxBytes32()233 ReadTransferMaxBytes32() : ReadTransfer(/*max_bytes_to_receive=*/32) {}
234 };
235
TEST_F(ReadTransferMaxBytes32,SetsPendingBytesFromConstructorArg)236 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
237 stream::MemoryWriterBuffer<64> writer;
238 EXPECT_EQ(OkStatus(), legacy_client_.Read(5, writer, [](Status) {}).status());
239 transfer_thread_.WaitUntilEventIsProcessed();
240
241 // First transfer parameters chunk is sent.
242 rpc::PayloadsView payloads =
243 context_.output().payloads<Transfer::Read>(context_.channel().id());
244 ASSERT_EQ(payloads.size(), 1u);
245
246 Chunk c0 = DecodeChunk(payloads[0]);
247 EXPECT_EQ(c0.session_id(), 5u);
248 EXPECT_EQ(c0.resource_id(), 5u);
249 EXPECT_EQ(c0.offset(), 0u);
250 EXPECT_EQ(c0.window_end_offset(), 32u);
251 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
252 }
253
TEST_F(ReadTransferMaxBytes32,SetsPendingBytesFromWriterLimit)254 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
255 stream::MemoryWriterBuffer<16> small_writer;
256 EXPECT_EQ(OkStatus(),
257 legacy_client_.Read(5, small_writer, [](Status) {}).status());
258 transfer_thread_.WaitUntilEventIsProcessed();
259
260 // First transfer parameters chunk is sent.
261 rpc::PayloadsView payloads =
262 context_.output().payloads<Transfer::Read>(context_.channel().id());
263 ASSERT_EQ(payloads.size(), 1u);
264
265 Chunk c0 = DecodeChunk(payloads[0]);
266 EXPECT_EQ(c0.session_id(), 5u);
267 EXPECT_EQ(c0.resource_id(), 5u);
268 EXPECT_EQ(c0.offset(), 0u);
269 EXPECT_EQ(c0.window_end_offset(), 16u);
270 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
271 }
272
TEST_F(ReadTransferMaxBytes32,MultiParameters)273 TEST_F(ReadTransferMaxBytes32, MultiParameters) {
274 stream::MemoryWriterBuffer<64> writer;
275 Status transfer_status = Status::Unknown();
276
277 ASSERT_EQ(
278 OkStatus(),
279 legacy_client_
280 .Read(6,
281 writer,
282 [&transfer_status](Status status) { transfer_status = status; })
283 .status());
284 transfer_thread_.WaitUntilEventIsProcessed();
285
286 // First transfer parameters chunk is sent.
287 rpc::PayloadsView payloads =
288 context_.output().payloads<Transfer::Read>(context_.channel().id());
289 ASSERT_EQ(payloads.size(), 1u);
290 EXPECT_EQ(transfer_status, Status::Unknown());
291
292 Chunk c0 = DecodeChunk(payloads[0]);
293 EXPECT_EQ(c0.session_id(), 6u);
294 EXPECT_EQ(c0.resource_id(), 6u);
295 EXPECT_EQ(c0.offset(), 0u);
296 ASSERT_EQ(c0.window_end_offset(), 32u);
297
298 constexpr ConstByteSpan data(kData64);
299 context_.server().SendServerStream<Transfer::Read>(
300 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
301 .set_session_id(6)
302 .set_offset(0)
303 .set_payload(data.first(32))));
304 transfer_thread_.WaitUntilEventIsProcessed();
305
306 ASSERT_EQ(payloads.size(), 2u);
307 EXPECT_EQ(transfer_status, Status::Unknown());
308
309 // Second parameters chunk.
310 Chunk c1 = DecodeChunk(payloads[1]);
311 EXPECT_EQ(c1.session_id(), 6u);
312 EXPECT_EQ(c1.offset(), 32u);
313 ASSERT_EQ(c1.window_end_offset(), 64u);
314
315 context_.server().SendServerStream<Transfer::Read>(
316 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
317 .set_session_id(6)
318 .set_offset(32)
319 .set_payload(data.subspan(32))
320 .set_remaining_bytes(0)));
321 transfer_thread_.WaitUntilEventIsProcessed();
322
323 ASSERT_EQ(payloads.size(), 3u);
324
325 Chunk c2 = DecodeChunk(payloads[2]);
326 EXPECT_EQ(c2.session_id(), 6u);
327 ASSERT_TRUE(c2.status().has_value());
328 EXPECT_EQ(c2.status().value(), OkStatus());
329
330 EXPECT_EQ(transfer_status, OkStatus());
331 EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
332 }
333
TEST_F(ReadTransfer,UnexpectedOffset)334 TEST_F(ReadTransfer, UnexpectedOffset) {
335 stream::MemoryWriterBuffer<64> writer;
336 Status transfer_status = Status::Unknown();
337
338 ASSERT_EQ(
339 OkStatus(),
340 legacy_client_
341 .Read(7,
342 writer,
343 [&transfer_status](Status status) { transfer_status = status; })
344 .status());
345 transfer_thread_.WaitUntilEventIsProcessed();
346
347 // First transfer parameters chunk is sent.
348 rpc::PayloadsView payloads =
349 context_.output().payloads<Transfer::Read>(context_.channel().id());
350 ASSERT_EQ(payloads.size(), 1u);
351 EXPECT_EQ(transfer_status, Status::Unknown());
352
353 Chunk c0 = DecodeChunk(payloads[0]);
354 EXPECT_EQ(c0.session_id(), 7u);
355 EXPECT_EQ(c0.resource_id(), 7u);
356 EXPECT_EQ(c0.offset(), 0u);
357 EXPECT_EQ(c0.window_end_offset(), 37u);
358
359 constexpr ConstByteSpan data(kData32);
360 context_.server().SendServerStream<Transfer::Read>(
361 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
362 .set_session_id(7)
363 .set_offset(0)
364 .set_payload(data.first(16))));
365 transfer_thread_.WaitUntilEventIsProcessed();
366
367 ASSERT_EQ(payloads.size(), 1u);
368 EXPECT_EQ(transfer_status, Status::Unknown());
369
370 // Send a chunk with an incorrect offset. The client should resend parameters.
371 context_.server().SendServerStream<Transfer::Read>(
372 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
373 .set_session_id(7)
374 .set_offset(8) // wrong!
375 .set_payload(data.subspan(16))
376 .set_remaining_bytes(0)));
377 transfer_thread_.WaitUntilEventIsProcessed();
378
379 ASSERT_EQ(payloads.size(), 2u);
380 EXPECT_EQ(transfer_status, Status::Unknown());
381
382 Chunk c1 = DecodeChunk(payloads[1]);
383 EXPECT_EQ(c1.session_id(), 7u);
384 EXPECT_EQ(c1.offset(), 16u);
385 EXPECT_EQ(c1.window_end_offset(), 53u);
386
387 // Send the correct chunk, completing the transfer.
388 context_.server().SendServerStream<Transfer::Read>(
389 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
390 .set_session_id(7)
391 .set_offset(16)
392 .set_payload(data.subspan(16))
393 .set_remaining_bytes(0)));
394 transfer_thread_.WaitUntilEventIsProcessed();
395
396 ASSERT_EQ(payloads.size(), 3u);
397
398 Chunk c2 = DecodeChunk(payloads[2]);
399 EXPECT_EQ(c2.session_id(), 7u);
400 ASSERT_TRUE(c2.status().has_value());
401 EXPECT_EQ(c2.status().value(), OkStatus());
402
403 EXPECT_EQ(transfer_status, OkStatus());
404 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
405 0);
406 }
407
TEST_F(ReadTransferMaxBytes32,TooMuchData_EntersRecovery)408 TEST_F(ReadTransferMaxBytes32, TooMuchData_EntersRecovery) {
409 stream::MemoryWriterBuffer<32> writer;
410 Status transfer_status = Status::Unknown();
411
412 ASSERT_EQ(
413 OkStatus(),
414 legacy_client_
415 .Read(8,
416 writer,
417 [&transfer_status](Status status) { transfer_status = status; })
418 .status());
419 transfer_thread_.WaitUntilEventIsProcessed();
420
421 // First transfer parameters chunk is sent.
422 rpc::PayloadsView payloads =
423 context_.output().payloads<Transfer::Read>(context_.channel().id());
424 ASSERT_EQ(payloads.size(), 1u);
425 EXPECT_EQ(transfer_status, Status::Unknown());
426
427 Chunk c0 = DecodeChunk(payloads[0]);
428 EXPECT_EQ(c0.session_id(), 8u);
429 EXPECT_EQ(c0.resource_id(), 8u);
430 EXPECT_EQ(c0.offset(), 0u);
431 ASSERT_EQ(c0.window_end_offset(), 32u);
432
433 constexpr ConstByteSpan data(kData64);
434
435 // pending_bytes == 32
436 context_.server().SendServerStream<Transfer::Read>(
437 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
438 .set_session_id(8)
439 .set_offset(0)
440 .set_payload(data.first(16))));
441
442 // pending_bytes == 16
443 context_.server().SendServerStream<Transfer::Read>(
444 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
445 .set_session_id(8)
446 .set_offset(16)
447 .set_payload(data.subspan(16, 8))));
448
449 // pending_bytes == 8, send 16 instead.
450 context_.server().SendServerStream<Transfer::Read>(
451 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
452 .set_session_id(8)
453 .set_offset(24)
454 .set_payload(data.subspan(24, 16))));
455 transfer_thread_.WaitUntilEventIsProcessed();
456
457 ASSERT_EQ(payloads.size(), 4u);
458
459 // The device should resend a parameters chunk.
460 Chunk c1 = DecodeChunk(payloads[3]);
461 EXPECT_EQ(c1.session_id(), 8u);
462 EXPECT_EQ(c1.type(), Chunk::Type::kParametersRetransmit);
463 EXPECT_EQ(c1.offset(), 24u);
464 EXPECT_EQ(c1.window_end_offset(), 32u);
465
466 context_.server().SendServerStream<Transfer::Read>(
467 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
468 .set_session_id(8)
469 .set_offset(24)
470 .set_payload(data.subspan(24, 8))
471 .set_remaining_bytes(0)));
472 transfer_thread_.WaitUntilEventIsProcessed();
473
474 EXPECT_EQ(transfer_status, OkStatus());
475 }
476
TEST_F(ReadTransferMaxBytes32,TooMuchData_HitsLifetimeRetries)477 TEST_F(ReadTransferMaxBytes32, TooMuchData_HitsLifetimeRetries) {
478 stream::MemoryWriterBuffer<32> writer;
479 Status transfer_status = Status::Unknown();
480
481 constexpr int kLowMaxLifetimeRetries = 3;
482 legacy_client_.set_max_lifetime_retries(kLowMaxLifetimeRetries).IgnoreError();
483
484 ASSERT_EQ(
485 OkStatus(),
486 legacy_client_
487 .Read(8,
488 writer,
489 [&transfer_status](Status status) { transfer_status = status; })
490 .status());
491 transfer_thread_.WaitUntilEventIsProcessed();
492
493 // First transfer parameters chunk is sent.
494 rpc::PayloadsView payloads =
495 context_.output().payloads<Transfer::Read>(context_.channel().id());
496 ASSERT_EQ(payloads.size(), 1u);
497 EXPECT_EQ(transfer_status, Status::Unknown());
498
499 Chunk c0 = DecodeChunk(payloads[0]);
500 EXPECT_EQ(c0.session_id(), 8u);
501 EXPECT_EQ(c0.resource_id(), 8u);
502 EXPECT_EQ(c0.offset(), 0u);
503 ASSERT_EQ(c0.window_end_offset(), 32u);
504
505 constexpr ConstByteSpan data(kData64);
506
507 // pending_bytes == 32
508 context_.server().SendServerStream<Transfer::Read>(
509 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
510 .set_session_id(8)
511 .set_offset(0)
512 .set_payload(data.first(16))));
513
514 // pending_bytes == 16
515 context_.server().SendServerStream<Transfer::Read>(
516 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
517 .set_session_id(8)
518 .set_offset(16)
519 .set_payload(data.subspan(16, 8))));
520
521 // pending_bytes == 8, but send 16 several times.
522 for (int i = 0; i < kLowMaxLifetimeRetries; ++i) {
523 context_.server().SendServerStream<Transfer::Read>(
524 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
525 .set_session_id(8)
526 .set_offset(24)
527 .set_payload(data.subspan(24, 16))));
528 transfer_thread_.WaitUntilEventIsProcessed();
529
530 ASSERT_EQ(payloads.size(), 4u + i);
531
532 // The device should resend a parameters chunk.
533 Chunk c = DecodeChunk(payloads.back());
534 EXPECT_EQ(c.session_id(), 8u);
535 EXPECT_EQ(c.type(), Chunk::Type::kParametersRetransmit);
536 }
537 EXPECT_EQ(transfer_status, Status::Unknown());
538
539 // Send one more incorrectly-sized chunk. The transfer should fail.
540 context_.server().SendServerStream<Transfer::Read>(
541 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
542 .set_session_id(8)
543 .set_offset(24)
544 .set_payload(data.subspan(24, 16))));
545 transfer_thread_.WaitUntilEventIsProcessed();
546
547 ASSERT_EQ(payloads.size(), 7u);
548 Chunk error = DecodeChunk(payloads.back());
549 EXPECT_EQ(error.session_id(), 8u);
550 EXPECT_EQ(error.type(), Chunk::Type::kCompletion);
551 EXPECT_EQ(error.status(), Status::Internal());
552
553 EXPECT_EQ(transfer_status, Status::Internal());
554 }
555
TEST_F(ReadTransfer,ServerError)556 TEST_F(ReadTransfer, ServerError) {
557 stream::MemoryWriterBuffer<64> writer;
558 Status transfer_status = Status::Unknown();
559
560 ASSERT_EQ(
561 OkStatus(),
562 legacy_client_
563 .Read(9,
564 writer,
565 [&transfer_status](Status status) { transfer_status = status; })
566 .status());
567 transfer_thread_.WaitUntilEventIsProcessed();
568
569 // First transfer parameters chunk is sent.
570 rpc::PayloadsView payloads =
571 context_.output().payloads<Transfer::Read>(context_.channel().id());
572 ASSERT_EQ(payloads.size(), 1u);
573 EXPECT_EQ(transfer_status, Status::Unknown());
574
575 Chunk c0 = DecodeChunk(payloads[0]);
576 EXPECT_EQ(c0.session_id(), 9u);
577 EXPECT_EQ(c0.resource_id(), 9u);
578 EXPECT_EQ(c0.offset(), 0u);
579 ASSERT_EQ(c0.window_end_offset(), 37u);
580
581 // Server sends an error. Client should not respond and terminate the
582 // transfer.
583 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
584 Chunk::Final(ProtocolVersion::kLegacy, 9, Status::NotFound())));
585 transfer_thread_.WaitUntilEventIsProcessed();
586
587 ASSERT_EQ(payloads.size(), 1u);
588 EXPECT_EQ(transfer_status, Status::NotFound());
589 }
590
TEST_F(ReadTransfer,OnlySendsParametersOnceAfterDrop)591 TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
592 stream::MemoryWriterBuffer<64> writer;
593 Status transfer_status = Status::Unknown();
594
595 ASSERT_EQ(
596 OkStatus(),
597 legacy_client_
598 .Read(10,
599 writer,
600 [&transfer_status](Status status) { transfer_status = status; })
601 .status());
602 transfer_thread_.WaitUntilEventIsProcessed();
603
604 // First transfer parameters chunk is sent.
605 rpc::PayloadsView payloads =
606 context_.output().payloads<Transfer::Read>(context_.channel().id());
607 ASSERT_EQ(payloads.size(), 1u);
608 EXPECT_EQ(transfer_status, Status::Unknown());
609
610 Chunk c0 = DecodeChunk(payloads[0]);
611 EXPECT_EQ(c0.session_id(), 10u);
612 EXPECT_EQ(c0.resource_id(), 10u);
613 EXPECT_EQ(c0.offset(), 0u);
614 ASSERT_EQ(c0.window_end_offset(), 37u);
615
616 constexpr ConstByteSpan data(kData32);
617
618 // Send the first 8 bytes of the transfer.
619 context_.server().SendServerStream<Transfer::Read>(
620 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
621 .set_session_id(10)
622 .set_offset(0)
623 .set_payload(data.first(8))));
624
625 // Skip offset 8, send the rest starting from 16.
626 for (uint32_t offset = 16; offset < data.size(); offset += 8) {
627 context_.server().SendServerStream<Transfer::Read>(
628 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
629 .set_session_id(10)
630 .set_offset(offset)
631 .set_payload(data.subspan(offset, 8))));
632 }
633 transfer_thread_.WaitUntilEventIsProcessed();
634
635 // Only one parameters update should be sent, with the offset of the initial
636 // dropped packet.
637 ASSERT_EQ(payloads.size(), 2u);
638
639 Chunk c1 = DecodeChunk(payloads[1]);
640 EXPECT_EQ(c1.session_id(), 10u);
641 EXPECT_EQ(c1.offset(), 8u);
642 ASSERT_EQ(c1.window_end_offset(), 45u);
643
644 // Send the remaining data to complete the transfer.
645 context_.server().SendServerStream<Transfer::Read>(
646 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
647 .set_session_id(10)
648 .set_offset(8)
649 .set_payload(data.subspan(8))
650 .set_remaining_bytes(0)));
651 transfer_thread_.WaitUntilEventIsProcessed();
652
653 ASSERT_EQ(payloads.size(), 3u);
654
655 Chunk c2 = DecodeChunk(payloads[2]);
656 EXPECT_EQ(c2.session_id(), 10u);
657 ASSERT_TRUE(c2.status().has_value());
658 EXPECT_EQ(c2.status().value(), OkStatus());
659
660 EXPECT_EQ(transfer_status, OkStatus());
661 }
662
TEST_F(ReadTransfer,ResendsParametersIfSentRepeatedChunkDuringRecovery)663 TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
664 stream::MemoryWriterBuffer<64> writer;
665 Status transfer_status = Status::Unknown();
666
667 ASSERT_EQ(
668 OkStatus(),
669 legacy_client_
670 .Read(11,
671 writer,
672 [&transfer_status](Status status) { transfer_status = status; })
673 .status());
674 transfer_thread_.WaitUntilEventIsProcessed();
675
676 // First transfer parameters chunk is sent.
677 rpc::PayloadsView payloads =
678 context_.output().payloads<Transfer::Read>(context_.channel().id());
679 ASSERT_EQ(payloads.size(), 1u);
680 EXPECT_EQ(transfer_status, Status::Unknown());
681
682 Chunk c0 = DecodeChunk(payloads[0]);
683 EXPECT_EQ(c0.session_id(), 11u);
684 EXPECT_EQ(c0.resource_id(), 11u);
685 EXPECT_EQ(c0.offset(), 0u);
686 ASSERT_EQ(c0.window_end_offset(), 37u);
687
688 constexpr ConstByteSpan data(kData32);
689
690 // Send the first 8 bytes of the transfer.
691 context_.server().SendServerStream<Transfer::Read>(
692 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
693 .set_session_id(11)
694 .set_offset(0)
695 .set_payload(data.first(8))));
696
697 // Skip offset 8, send the rest starting from 16.
698 for (uint32_t offset = 16; offset < data.size(); offset += 8) {
699 context_.server().SendServerStream<Transfer::Read>(
700 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
701 .set_session_id(11)
702 .set_offset(offset)
703 .set_payload(data.subspan(offset, 8))));
704 }
705 transfer_thread_.WaitUntilEventIsProcessed();
706
707 // Only one parameters update should be sent, with the offset of the initial
708 // dropped packet.
709 ASSERT_EQ(payloads.size(), 2u);
710
711 const Chunk last_chunk = Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
712 .set_session_id(11)
713 .set_offset(24)
714 .set_payload(data.subspan(24));
715
716 // Re-send the final chunk of the block.
717 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
718 transfer_thread_.WaitUntilEventIsProcessed();
719
720 // The original drop parameters should be re-sent.
721 ASSERT_EQ(payloads.size(), 3u);
722 Chunk c2 = DecodeChunk(payloads[2]);
723 EXPECT_EQ(c2.session_id(), 11u);
724 EXPECT_EQ(c2.offset(), 8u);
725 ASSERT_EQ(c2.window_end_offset(), 45u);
726
727 // Do it again.
728 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
729 transfer_thread_.WaitUntilEventIsProcessed();
730
731 ASSERT_EQ(payloads.size(), 4u);
732 Chunk c3 = DecodeChunk(payloads[3]);
733 EXPECT_EQ(c3.session_id(), 11u);
734 EXPECT_EQ(c3.offset(), 8u);
735 ASSERT_EQ(c3.window_end_offset(), 45u);
736
737 // Finish the transfer normally.
738 context_.server().SendServerStream<Transfer::Read>(
739 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
740 .set_session_id(11)
741 .set_offset(8)
742 .set_payload(data.subspan(8))
743 .set_remaining_bytes(0)));
744 transfer_thread_.WaitUntilEventIsProcessed();
745
746 ASSERT_EQ(payloads.size(), 5u);
747
748 Chunk c4 = DecodeChunk(payloads[4]);
749 EXPECT_EQ(c4.session_id(), 11u);
750 ASSERT_TRUE(c4.status().has_value());
751 EXPECT_EQ(c4.status().value(), OkStatus());
752
753 EXPECT_EQ(transfer_status, OkStatus());
754 }
755
756 // Use a long timeout to avoid accidentally triggering timeouts.
757 constexpr chrono::SystemClock::duration kTestTimeout = std::chrono::seconds(30);
758 constexpr uint8_t kTestRetries = 3;
759
TEST_F(ReadTransfer,Timeout_ResendsCurrentParameters)760 TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
761 stream::MemoryWriterBuffer<64> writer;
762 Status transfer_status = Status::Unknown();
763
764 ASSERT_EQ(
765 OkStatus(),
766 legacy_client_
767 .Read(
768 12,
769 writer,
770 [&transfer_status](Status status) { transfer_status = status; },
771 kTestTimeout,
772 kTestTimeout)
773 .status());
774 transfer_thread_.WaitUntilEventIsProcessed();
775
776 // First transfer parameters chunk is sent.
777 rpc::PayloadsView payloads =
778 context_.output().payloads<Transfer::Read>(context_.channel().id());
779 ASSERT_EQ(payloads.size(), 1u);
780 EXPECT_EQ(transfer_status, Status::Unknown());
781
782 Chunk c0 = DecodeChunk(payloads.back());
783 EXPECT_EQ(c0.session_id(), 12u);
784 EXPECT_EQ(c0.resource_id(), 12u);
785 EXPECT_EQ(c0.offset(), 0u);
786 EXPECT_EQ(c0.window_end_offset(), 37u);
787 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
788
789 // Wait for the timeout to expire without doing anything. The client should
790 // resend its initial parameters chunk.
791 transfer_thread_.SimulateClientTimeout(12);
792 ASSERT_EQ(payloads.size(), 2u);
793
794 Chunk c = DecodeChunk(payloads.back());
795 EXPECT_EQ(c.session_id(), 12u);
796 EXPECT_EQ(c.offset(), 0u);
797 EXPECT_EQ(c.window_end_offset(), 37u);
798 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
799
800 // Transfer has not yet completed.
801 EXPECT_EQ(transfer_status, Status::Unknown());
802
803 // Finish the transfer following the timeout.
804 context_.server().SendServerStream<Transfer::Read>(
805 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
806 .set_session_id(12)
807 .set_offset(0)
808 .set_payload(kData32)
809 .set_remaining_bytes(0)));
810 transfer_thread_.WaitUntilEventIsProcessed();
811
812 ASSERT_EQ(payloads.size(), 3u);
813
814 Chunk c4 = DecodeChunk(payloads.back());
815 EXPECT_EQ(c4.session_id(), 12u);
816 ASSERT_TRUE(c4.status().has_value());
817 EXPECT_EQ(c4.status().value(), OkStatus());
818
819 EXPECT_EQ(transfer_status, OkStatus());
820 }
821
TEST_F(ReadTransfer,Timeout_ResendsUpdatedParameters)822 TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
823 stream::MemoryWriterBuffer<64> writer;
824 Status transfer_status = Status::Unknown();
825
826 ASSERT_EQ(
827 OkStatus(),
828 legacy_client_
829 .Read(
830 13,
831 writer,
832 [&transfer_status](Status status) { transfer_status = status; },
833 kTestTimeout)
834 .status());
835 transfer_thread_.WaitUntilEventIsProcessed();
836
837 // First transfer parameters chunk is sent.
838 rpc::PayloadsView payloads =
839 context_.output().payloads<Transfer::Read>(context_.channel().id());
840 ASSERT_EQ(payloads.size(), 1u);
841 EXPECT_EQ(transfer_status, Status::Unknown());
842
843 Chunk c0 = DecodeChunk(payloads.back());
844 EXPECT_EQ(c0.session_id(), 13u);
845 EXPECT_EQ(c0.resource_id(), 13u);
846 EXPECT_EQ(c0.offset(), 0u);
847 EXPECT_EQ(c0.window_end_offset(), 37u);
848 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
849
850 constexpr ConstByteSpan data(kData32);
851
852 // Send some data, but not everything.
853 context_.server().SendServerStream<Transfer::Read>(
854 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
855 .set_session_id(13)
856 .set_offset(0)
857 .set_payload(data.first(16))));
858 transfer_thread_.WaitUntilEventIsProcessed();
859
860 ASSERT_EQ(payloads.size(), 1u);
861
862 // Wait for the timeout to expire without sending more data. The client should
863 // send an updated parameters chunk, accounting for the data already received.
864 transfer_thread_.SimulateClientTimeout(13);
865 ASSERT_EQ(payloads.size(), 2u);
866
867 Chunk c = DecodeChunk(payloads.back());
868 EXPECT_EQ(c.session_id(), 13u);
869 EXPECT_EQ(c.offset(), 16u);
870 EXPECT_EQ(c.window_end_offset(), 53u);
871 EXPECT_EQ(c.type(), Chunk::Type::kParametersRetransmit);
872
873 // Transfer has not yet completed.
874 EXPECT_EQ(transfer_status, Status::Unknown());
875
876 // Send the rest of the data, finishing the transfer.
877 context_.server().SendServerStream<Transfer::Read>(
878 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
879 .set_session_id(13)
880 .set_offset(16)
881 .set_payload(data.subspan(16))
882 .set_remaining_bytes(0)));
883 transfer_thread_.WaitUntilEventIsProcessed();
884
885 ASSERT_EQ(payloads.size(), 3u);
886
887 Chunk c4 = DecodeChunk(payloads.back());
888 EXPECT_EQ(c4.session_id(), 13u);
889 ASSERT_TRUE(c4.status().has_value());
890 EXPECT_EQ(c4.status().value(), OkStatus());
891
892 EXPECT_EQ(transfer_status, OkStatus());
893 }
894
TEST_F(ReadTransfer,Timeout_EndsTransferAfterMaxRetries)895 TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
896 stream::MemoryWriterBuffer<64> writer;
897 Status transfer_status = Status::Unknown();
898
899 Result<Client::Handle> handle = legacy_client_.Read(
900 14,
901 writer,
902 [&transfer_status](Status status) { transfer_status = status; },
903 kTestTimeout,
904 kTestTimeout);
905 ASSERT_EQ(OkStatus(), handle.status());
906 transfer_thread_.WaitUntilEventIsProcessed();
907
908 // First transfer parameters chunk is sent.
909 rpc::PayloadsView payloads =
910 context_.output().payloads<Transfer::Read>(context_.channel().id());
911 ASSERT_EQ(payloads.size(), 1u);
912 EXPECT_EQ(transfer_status, Status::Unknown());
913
914 Chunk c0 = DecodeChunk(payloads.back());
915 EXPECT_EQ(c0.session_id(), 14u);
916 EXPECT_EQ(c0.resource_id(), 14u);
917 EXPECT_EQ(c0.offset(), 0u);
918 EXPECT_EQ(c0.window_end_offset(), 37u);
919 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
920
921 for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
922 // Wait for the timeout to expire without doing anything. The client should
923 // resend its parameters chunk.
924 transfer_thread_.SimulateClientTimeout(14);
925 ASSERT_EQ(payloads.size(), retry + 1);
926
927 Chunk c = DecodeChunk(payloads.back());
928 EXPECT_EQ(c.session_id(), 14u);
929 EXPECT_EQ(c.offset(), 0u);
930 EXPECT_EQ(c.window_end_offset(), 37u);
931
932 // Transfer has not yet completed.
933 EXPECT_EQ(transfer_status, Status::Unknown());
934 }
935
936 // Time out one more time after the final retry. The client should cancel the
937 // transfer at this point. As no packets were received from the server, no
938 // final status chunk should be sent.
939 transfer_thread_.SimulateClientTimeout(14);
940 ASSERT_EQ(payloads.size(), 4u);
941
942 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
943
944 // After finishing the transfer, nothing else should be sent.
945 transfer_thread_.SimulateClientTimeout(14);
946 transfer_thread_.SimulateClientTimeout(14);
947 transfer_thread_.SimulateClientTimeout(14);
948 ASSERT_EQ(payloads.size(), 4u);
949 }
950
TEST_F(ReadTransfer,Timeout_ReceivingDataResetsRetryCount)951 TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
952 stream::MemoryWriterBuffer<64> writer;
953 Status transfer_status = Status::Unknown();
954
955 constexpr ConstByteSpan data(kData32);
956
957 Result<Client::Handle> handle = legacy_client_.Read(
958 14,
959 writer,
960 [&transfer_status](Status status) { transfer_status = status; },
961 kTestTimeout,
962 kTestTimeout);
963 ASSERT_EQ(OkStatus(), handle.status());
964 transfer_thread_.WaitUntilEventIsProcessed();
965
966 // First transfer parameters chunk is sent.
967 rpc::PayloadsView payloads =
968 context_.output().payloads<Transfer::Read>(context_.channel().id());
969 ASSERT_EQ(payloads.size(), 1u);
970 EXPECT_EQ(transfer_status, Status::Unknown());
971
972 Chunk c0 = DecodeChunk(payloads.back());
973 EXPECT_EQ(c0.session_id(), 14u);
974 EXPECT_EQ(c0.resource_id(), 14u);
975 EXPECT_EQ(c0.offset(), 0u);
976 EXPECT_EQ(c0.window_end_offset(), 37u);
977
978 // Simulate one less timeout than the maximum amount of retries.
979 for (unsigned retry = 1; retry <= kTestRetries - 1; ++retry) {
980 transfer_thread_.SimulateClientTimeout(14);
981 ASSERT_EQ(payloads.size(), retry + 1);
982
983 Chunk c = DecodeChunk(payloads.back());
984 EXPECT_EQ(c.session_id(), 14u);
985 EXPECT_EQ(c.offset(), 0u);
986 EXPECT_EQ(c.window_end_offset(), 37u);
987
988 // Transfer has not yet completed.
989 EXPECT_EQ(transfer_status, Status::Unknown());
990 }
991
992 // Send some data.
993 context_.server().SendServerStream<Transfer::Read>(
994 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
995 .set_session_id(14)
996 .set_offset(0)
997 .set_payload(data.first(16))));
998 transfer_thread_.WaitUntilEventIsProcessed();
999 ASSERT_EQ(payloads.size(), 3u);
1000
1001 // Time out a couple more times. The context's retry count should have been
1002 // reset, so it should go through the standard retry flow instead of
1003 // terminating the transfer.
1004 transfer_thread_.SimulateClientTimeout(14);
1005 ASSERT_EQ(payloads.size(), 4u);
1006
1007 Chunk c = DecodeChunk(payloads.back());
1008 EXPECT_FALSE(c.status().has_value());
1009 EXPECT_EQ(c.session_id(), 14u);
1010 EXPECT_EQ(c.offset(), 16u);
1011 EXPECT_EQ(c.window_end_offset(), 53u);
1012
1013 transfer_thread_.SimulateClientTimeout(14);
1014 ASSERT_EQ(payloads.size(), 5u);
1015
1016 c = DecodeChunk(payloads.back());
1017 EXPECT_FALSE(c.status().has_value());
1018 EXPECT_EQ(c.session_id(), 14u);
1019 EXPECT_EQ(c.offset(), 16u);
1020 EXPECT_EQ(c.window_end_offset(), 53u);
1021
1022 // Ensure we don't leave a dangling reference to transfer_status.
1023 handle->Cancel();
1024 transfer_thread_.WaitUntilEventIsProcessed();
1025 }
1026
TEST_F(ReadTransfer,InitialPacketFails_OnCompletedCalledWithDataLoss)1027 TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
1028 stream::MemoryWriterBuffer<64> writer;
1029 Status transfer_status = Status::Unknown();
1030
1031 context_.output().set_send_status(Status::Unauthenticated());
1032
1033 ASSERT_EQ(OkStatus(),
1034 legacy_client_
1035 .Read(
1036 14,
1037 writer,
1038 [&transfer_status](Status status) {
1039 ASSERT_EQ(transfer_status,
1040 Status::Unknown()); // Must only call once
1041 transfer_status = status;
1042 },
1043 kTestTimeout)
1044 .status());
1045 transfer_thread_.WaitUntilEventIsProcessed();
1046
1047 EXPECT_EQ(transfer_status, Status::Internal());
1048 }
1049
1050 class WriteTransfer : public ::testing::Test {
1051 protected:
WriteTransfer()1052 WriteTransfer()
1053 : transfer_thread_(chunk_buffer_, encode_buffer_),
1054 legacy_client_(context_.client(),
1055 context_.channel().id(),
1056 transfer_thread_,
1057 transfer_thread_.max_chunk_size()),
1058 client_(context_.client(),
1059 context_.channel().id(),
1060 transfer_thread_,
1061 transfer_thread_.max_chunk_size()),
1062 system_thread_(TransferThreadOptions(), transfer_thread_) {
1063 legacy_client_.set_protocol_version(ProtocolVersion::kLegacy);
1064 }
1065
~WriteTransfer()1066 ~WriteTransfer() override {
1067 transfer_thread_.Terminate();
1068 system_thread_.join();
1069 }
1070
1071 rpc::RawClientTestContext<> context_;
1072
1073 Thread<1, 1> transfer_thread_;
1074 Client legacy_client_;
1075 Client client_;
1076
1077 std::array<std::byte, 64> chunk_buffer_;
1078 std::array<std::byte, 64> encode_buffer_;
1079
1080 pw::Thread system_thread_;
1081 };
1082
TEST_F(WriteTransfer,SingleChunk)1083 TEST_F(WriteTransfer, SingleChunk) {
1084 stream::MemoryReader reader(kData32);
1085 Status transfer_status = Status::Unknown();
1086
1087 ASSERT_EQ(OkStatus(),
1088 legacy_client_
1089 .Write(3,
1090 reader,
1091 [&transfer_status](Status status) {
1092 transfer_status = status;
1093 })
1094 .status());
1095 transfer_thread_.WaitUntilEventIsProcessed();
1096
1097 // The client begins by sending the ID of the resource to transfer.
1098 rpc::PayloadsView payloads =
1099 context_.output().payloads<Transfer::Write>(context_.channel().id());
1100 ASSERT_EQ(payloads.size(), 1u);
1101 EXPECT_EQ(transfer_status, Status::Unknown());
1102
1103 Chunk c0 = DecodeChunk(payloads[0]);
1104 EXPECT_EQ(c0.session_id(), 3u);
1105 EXPECT_EQ(c0.resource_id(), 3u);
1106 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1107
1108 // Send transfer parameters. Client should send a data chunk and the final
1109 // chunk.
1110 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1111 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1112 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1113 .set_session_id(3)
1114 .set_offset(0)
1115 .set_window_end_offset(64)
1116 .set_max_chunk_size_bytes(32)));
1117 });
1118
1119 ASSERT_EQ(payloads.size(), 3u);
1120
1121 Chunk c1 = DecodeChunk(payloads[1]);
1122 EXPECT_EQ(c1.session_id(), 3u);
1123 EXPECT_EQ(c1.offset(), 0u);
1124 EXPECT_TRUE(c1.has_payload());
1125 EXPECT_EQ(
1126 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1127
1128 Chunk c2 = DecodeChunk(payloads[2]);
1129 EXPECT_EQ(c2.session_id(), 3u);
1130 ASSERT_TRUE(c2.remaining_bytes().has_value());
1131 EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1132
1133 EXPECT_EQ(transfer_status, Status::Unknown());
1134
1135 // Send the final status chunk to complete the transfer.
1136 context_.server().SendServerStream<Transfer::Write>(
1137 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
1138 transfer_thread_.WaitUntilEventIsProcessed();
1139
1140 EXPECT_EQ(payloads.size(), 3u);
1141 EXPECT_EQ(transfer_status, OkStatus());
1142 }
1143
TEST_F(WriteTransfer,MultiChunk)1144 TEST_F(WriteTransfer, MultiChunk) {
1145 stream::MemoryReader reader(kData32);
1146 Status transfer_status = Status::Unknown();
1147
1148 ASSERT_EQ(OkStatus(),
1149 legacy_client_
1150 .Write(4,
1151 reader,
1152 [&transfer_status](Status status) {
1153 transfer_status = status;
1154 })
1155 .status());
1156 transfer_thread_.WaitUntilEventIsProcessed();
1157
1158 // The client begins by sending the ID of the resource to transfer.
1159 rpc::PayloadsView payloads =
1160 context_.output().payloads<Transfer::Write>(context_.channel().id());
1161 ASSERT_EQ(payloads.size(), 1u);
1162 EXPECT_EQ(transfer_status, Status::Unknown());
1163
1164 Chunk c0 = DecodeChunk(payloads[0]);
1165 EXPECT_EQ(c0.session_id(), 4u);
1166 EXPECT_EQ(c0.resource_id(), 4u);
1167 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1168
1169 // Send transfer parameters with a chunk size smaller than the data.
1170
1171 // Client should send two data chunks and the final chunk.
1172 rpc::test::WaitForPackets(context_.output(), 3, [this] {
1173 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1174 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1175 .set_session_id(4)
1176 .set_offset(0)
1177 .set_window_end_offset(64)
1178 .set_max_chunk_size_bytes(16)));
1179 });
1180
1181 ASSERT_EQ(payloads.size(), 4u);
1182
1183 Chunk c1 = DecodeChunk(payloads[1]);
1184 EXPECT_EQ(c1.session_id(), 4u);
1185 EXPECT_EQ(c1.offset(), 0u);
1186 EXPECT_TRUE(c1.has_payload());
1187 EXPECT_EQ(
1188 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1189
1190 Chunk c2 = DecodeChunk(payloads[2]);
1191 EXPECT_EQ(c2.session_id(), 4u);
1192 EXPECT_EQ(c2.offset(), 16u);
1193 EXPECT_TRUE(c2.has_payload());
1194 EXPECT_EQ(std::memcmp(c2.payload().data(),
1195 kData32.data() + c2.offset(),
1196 c2.payload().size()),
1197 0);
1198
1199 Chunk c3 = DecodeChunk(payloads[3]);
1200 EXPECT_EQ(c3.session_id(), 4u);
1201 ASSERT_TRUE(c3.remaining_bytes().has_value());
1202 EXPECT_EQ(c3.remaining_bytes().value(), 0u);
1203
1204 EXPECT_EQ(transfer_status, Status::Unknown());
1205
1206 // Send the final status chunk to complete the transfer.
1207 context_.server().SendServerStream<Transfer::Write>(
1208 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 4, OkStatus())));
1209 transfer_thread_.WaitUntilEventIsProcessed();
1210
1211 EXPECT_EQ(payloads.size(), 4u);
1212 EXPECT_EQ(transfer_status, OkStatus());
1213 }
1214
TEST_F(WriteTransfer,OutOfOrder_SeekSupported)1215 TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
1216 stream::MemoryReader reader(kData32);
1217 Status transfer_status = Status::Unknown();
1218
1219 ASSERT_EQ(OkStatus(),
1220 legacy_client_
1221 .Write(5,
1222 reader,
1223 [&transfer_status](Status status) {
1224 transfer_status = status;
1225 })
1226 .status());
1227 transfer_thread_.WaitUntilEventIsProcessed();
1228
1229 // The client begins by sending the ID of the resource to transfer.
1230 rpc::PayloadsView payloads =
1231 context_.output().payloads<Transfer::Write>(context_.channel().id());
1232 ASSERT_EQ(payloads.size(), 1u);
1233 EXPECT_EQ(transfer_status, Status::Unknown());
1234
1235 Chunk c0 = DecodeChunk(payloads[0]);
1236 EXPECT_EQ(c0.session_id(), 5u);
1237 EXPECT_EQ(c0.resource_id(), 5u);
1238 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1239
1240 // Send transfer parameters with a nonzero offset, requesting a seek.
1241 // Client should send a data chunk and the final chunk.
1242 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1243 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1244 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1245 .set_session_id(5)
1246 .set_offset(16)
1247 .set_window_end_offset(64)
1248 .set_max_chunk_size_bytes(32)));
1249 });
1250
1251 ASSERT_EQ(payloads.size(), 3u);
1252
1253 Chunk c1 = DecodeChunk(payloads[1]);
1254 EXPECT_EQ(c1.session_id(), 5u);
1255 EXPECT_EQ(c1.offset(), 16u);
1256 EXPECT_TRUE(c1.has_payload());
1257 EXPECT_EQ(std::memcmp(c1.payload().data(),
1258 kData32.data() + c1.offset(),
1259 c1.payload().size()),
1260 0);
1261
1262 Chunk c2 = DecodeChunk(payloads[2]);
1263 EXPECT_EQ(c2.session_id(), 5u);
1264 ASSERT_TRUE(c2.remaining_bytes().has_value());
1265 EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1266
1267 EXPECT_EQ(transfer_status, Status::Unknown());
1268
1269 // Send the final status chunk to complete the transfer.
1270 context_.server().SendServerStream<Transfer::Write>(
1271 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 5, OkStatus())));
1272 transfer_thread_.WaitUntilEventIsProcessed();
1273
1274 EXPECT_EQ(payloads.size(), 3u);
1275 EXPECT_EQ(transfer_status, OkStatus());
1276 }
1277
1278 class FakeNonSeekableReader final : public stream::NonSeekableReader {
1279 public:
FakeNonSeekableReader(ConstByteSpan data)1280 FakeNonSeekableReader(ConstByteSpan data) : data_(data), position_(0) {}
1281
1282 private:
DoRead(ByteSpan out)1283 StatusWithSize DoRead(ByteSpan out) final {
1284 if (position_ == data_.size()) {
1285 return StatusWithSize::OutOfRange();
1286 }
1287
1288 size_t to_copy = std::min(out.size(), data_.size() - position_);
1289 std::memcpy(out.data(), data_.data() + position_, to_copy);
1290 position_ += to_copy;
1291
1292 return StatusWithSize(to_copy);
1293 }
1294
1295 ConstByteSpan data_;
1296 size_t position_;
1297 };
1298
TEST_F(WriteTransfer,OutOfOrder_SeekNotSupported)1299 TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
1300 FakeNonSeekableReader reader(kData32);
1301 Status transfer_status = Status::Unknown();
1302
1303 ASSERT_EQ(OkStatus(),
1304 legacy_client_
1305 .Write(6,
1306 reader,
1307 [&transfer_status](Status status) {
1308 transfer_status = status;
1309 })
1310 .status());
1311 transfer_thread_.WaitUntilEventIsProcessed();
1312
1313 // The client begins by sending the ID of the resource to transfer.
1314 rpc::PayloadsView payloads =
1315 context_.output().payloads<Transfer::Write>(context_.channel().id());
1316 ASSERT_EQ(payloads.size(), 1u);
1317 EXPECT_EQ(transfer_status, Status::Unknown());
1318
1319 Chunk c0 = DecodeChunk(payloads[0]);
1320 EXPECT_EQ(c0.session_id(), 6u);
1321 EXPECT_EQ(c0.resource_id(), 6u);
1322 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1323
1324 // Send transfer parameters with a nonzero offset, requesting a seek.
1325 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1326 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1327 .set_session_id(6)
1328 .set_offset(16)
1329 .set_window_end_offset(64)
1330 .set_max_chunk_size_bytes(32)));
1331 transfer_thread_.WaitUntilEventIsProcessed();
1332
1333 // Client should send a status chunk and end the transfer.
1334 ASSERT_EQ(payloads.size(), 2u);
1335
1336 Chunk c1 = DecodeChunk(payloads[1]);
1337 EXPECT_EQ(c1.session_id(), 6u);
1338 EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
1339 ASSERT_TRUE(c1.status().has_value());
1340 EXPECT_EQ(c1.status().value(), Status::Unimplemented());
1341
1342 EXPECT_EQ(transfer_status, Status::Unimplemented());
1343 }
1344
TEST_F(WriteTransfer,ServerError)1345 TEST_F(WriteTransfer, ServerError) {
1346 stream::MemoryReader reader(kData32);
1347 Status transfer_status = Status::Unknown();
1348
1349 ASSERT_EQ(OkStatus(),
1350 legacy_client_
1351 .Write(7,
1352 reader,
1353 [&transfer_status](Status status) {
1354 transfer_status = status;
1355 })
1356 .status());
1357 transfer_thread_.WaitUntilEventIsProcessed();
1358
1359 // The client begins by sending the ID of the resource to transfer.
1360 rpc::PayloadsView payloads =
1361 context_.output().payloads<Transfer::Write>(context_.channel().id());
1362 ASSERT_EQ(payloads.size(), 1u);
1363 EXPECT_EQ(transfer_status, Status::Unknown());
1364
1365 Chunk c0 = DecodeChunk(payloads[0]);
1366 EXPECT_EQ(c0.session_id(), 7u);
1367 EXPECT_EQ(c0.resource_id(), 7u);
1368 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1369
1370 // Send an error from the server.
1371 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1372 Chunk::Final(ProtocolVersion::kLegacy, 7, Status::NotFound())));
1373 transfer_thread_.WaitUntilEventIsProcessed();
1374
1375 // Client should not respond and terminate the transfer.
1376 EXPECT_EQ(payloads.size(), 1u);
1377 EXPECT_EQ(transfer_status, Status::NotFound());
1378 }
1379
TEST_F(WriteTransfer,AbortIfZeroBytesAreRequested)1380 TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
1381 stream::MemoryReader reader(kData32);
1382 Status transfer_status = Status::Unknown();
1383
1384 ASSERT_EQ(OkStatus(),
1385 legacy_client_
1386 .Write(9,
1387 reader,
1388 [&transfer_status](Status status) {
1389 transfer_status = status;
1390 })
1391 .status());
1392 transfer_thread_.WaitUntilEventIsProcessed();
1393
1394 // The client begins by sending the ID of the resource to transfer.
1395 rpc::PayloadsView payloads =
1396 context_.output().payloads<Transfer::Write>(context_.channel().id());
1397 ASSERT_EQ(payloads.size(), 1u);
1398 EXPECT_EQ(transfer_status, Status::Unknown());
1399
1400 Chunk c0 = DecodeChunk(payloads[0]);
1401 EXPECT_EQ(c0.session_id(), 9u);
1402 EXPECT_EQ(c0.resource_id(), 9u);
1403 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1404
1405 // Send an invalid transfer parameters chunk with 0 pending bytes.
1406 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1407 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1408 .set_session_id(9)
1409 .set_offset(0)
1410 .set_window_end_offset(0)
1411 .set_max_chunk_size_bytes(32)));
1412 transfer_thread_.WaitUntilEventIsProcessed();
1413
1414 // Client should send a status chunk and end the transfer.
1415 ASSERT_EQ(payloads.size(), 2u);
1416
1417 Chunk c1 = DecodeChunk(payloads[1]);
1418 EXPECT_EQ(c1.session_id(), 9u);
1419 ASSERT_TRUE(c1.status().has_value());
1420 EXPECT_EQ(c1.status().value(), Status::ResourceExhausted());
1421
1422 EXPECT_EQ(transfer_status, Status::ResourceExhausted());
1423 }
1424
TEST_F(WriteTransfer,Timeout_RetriesWithInitialChunk)1425 TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
1426 stream::MemoryReader reader(kData32);
1427 Status transfer_status = Status::Unknown();
1428
1429 Result<Client::Handle> handle = legacy_client_.Write(
1430 10,
1431 reader,
1432 [&transfer_status](Status status) { transfer_status = status; },
1433 kTestTimeout,
1434 kTestTimeout);
1435 ASSERT_EQ(OkStatus(), handle.status());
1436 transfer_thread_.WaitUntilEventIsProcessed();
1437
1438 // The client begins by sending the ID of the resource to transfer.
1439 rpc::PayloadsView payloads =
1440 context_.output().payloads<Transfer::Write>(context_.channel().id());
1441 ASSERT_EQ(payloads.size(), 1u);
1442 EXPECT_EQ(transfer_status, Status::Unknown());
1443
1444 Chunk c0 = DecodeChunk(payloads.back());
1445 EXPECT_EQ(c0.session_id(), 10u);
1446 EXPECT_EQ(c0.resource_id(), 10u);
1447 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1448
1449 // Wait for the timeout to expire without doing anything. The client should
1450 // resend the initial transmit chunk.
1451 transfer_thread_.SimulateClientTimeout(10);
1452 ASSERT_EQ(payloads.size(), 2u);
1453
1454 Chunk c = DecodeChunk(payloads.back());
1455 EXPECT_EQ(c.session_id(), 10u);
1456 EXPECT_EQ(c.resource_id(), 10u);
1457 EXPECT_EQ(c.type(), Chunk::Type::kStart);
1458
1459 // Transfer has not yet completed.
1460 EXPECT_EQ(transfer_status, Status::Unknown());
1461
1462 // Ensure we don't leave a dangling reference to transfer_status.
1463 handle->Cancel();
1464 transfer_thread_.WaitUntilEventIsProcessed();
1465 }
1466
TEST_F(WriteTransfer,Timeout_RetriesWithMostRecentChunk)1467 TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
1468 stream::MemoryReader reader(kData32);
1469 Status transfer_status = Status::Unknown();
1470
1471 Result<Client::Handle> handle = legacy_client_.Write(
1472 11,
1473 reader,
1474 [&transfer_status](Status status) { transfer_status = status; },
1475 kTestTimeout);
1476 ASSERT_EQ(OkStatus(), handle.status());
1477 transfer_thread_.WaitUntilEventIsProcessed();
1478
1479 // The client begins by sending the ID of the resource to transfer.
1480 rpc::PayloadsView payloads =
1481 context_.output().payloads<Transfer::Write>(context_.channel().id());
1482 ASSERT_EQ(payloads.size(), 1u);
1483 EXPECT_EQ(transfer_status, Status::Unknown());
1484
1485 Chunk c0 = DecodeChunk(payloads.back());
1486 EXPECT_EQ(c0.session_id(), 11u);
1487 EXPECT_EQ(c0.resource_id(), 11u);
1488 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1489
1490 // Send the first parameters chunk.
1491 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1492 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1493 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1494 .set_session_id(11)
1495 .set_offset(0)
1496 .set_window_end_offset(16)
1497 .set_max_chunk_size_bytes(8)));
1498 });
1499 ASSERT_EQ(payloads.size(), 3u);
1500
1501 EXPECT_EQ(transfer_status, Status::Unknown());
1502
1503 Chunk c1 = DecodeChunk(payloads[1]);
1504 EXPECT_EQ(c1.session_id(), 11u);
1505 EXPECT_EQ(c1.offset(), 0u);
1506 EXPECT_EQ(c1.payload().size(), 8u);
1507 EXPECT_EQ(
1508 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1509
1510 Chunk c2 = DecodeChunk(payloads[2]);
1511 EXPECT_EQ(c2.session_id(), 11u);
1512 EXPECT_EQ(c2.offset(), 8u);
1513 EXPECT_EQ(c2.payload().size(), 8u);
1514 EXPECT_EQ(std::memcmp(c2.payload().data(),
1515 kData32.data() + c2.offset(),
1516 c2.payload().size()),
1517 0);
1518
1519 // Wait for the timeout to expire without doing anything. The client should
1520 // resend the most recently sent chunk.
1521 transfer_thread_.SimulateClientTimeout(11);
1522 ASSERT_EQ(payloads.size(), 4u);
1523
1524 Chunk c3 = DecodeChunk(payloads[3]);
1525 EXPECT_EQ(c3.session_id(), c2.session_id());
1526 EXPECT_EQ(c3.offset(), c2.offset());
1527 EXPECT_EQ(c3.payload().size(), c2.payload().size());
1528 EXPECT_EQ(std::memcmp(
1529 c3.payload().data(), c2.payload().data(), c3.payload().size()),
1530 0);
1531
1532 // Transfer has not yet completed.
1533 EXPECT_EQ(transfer_status, Status::Unknown());
1534
1535 // Ensure we don't leave a dangling reference to transfer_status.
1536 handle->Cancel();
1537 transfer_thread_.WaitUntilEventIsProcessed();
1538 }
1539
TEST_F(WriteTransfer,Timeout_RetriesWithSingleChunkTransfer)1540 TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
1541 stream::MemoryReader reader(kData32);
1542 Status transfer_status = Status::Unknown();
1543
1544 Result<Client::Handle> handle = legacy_client_.Write(
1545 12,
1546 reader,
1547 [&transfer_status](Status status) { transfer_status = status; },
1548 kTestTimeout);
1549 ASSERT_EQ(OkStatus(), handle.status());
1550 transfer_thread_.WaitUntilEventIsProcessed();
1551
1552 // The client begins by sending the ID of the resource to transfer.
1553 rpc::PayloadsView payloads =
1554 context_.output().payloads<Transfer::Write>(context_.channel().id());
1555 ASSERT_EQ(payloads.size(), 1u);
1556 EXPECT_EQ(transfer_status, Status::Unknown());
1557
1558 Chunk c0 = DecodeChunk(payloads.back());
1559 EXPECT_EQ(c0.session_id(), 12u);
1560 EXPECT_EQ(c0.resource_id(), 12u);
1561 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1562
1563 // Send the first parameters chunk, requesting all the data. The client should
1564 // respond with one data chunk and a remaining_bytes = 0 chunk.
1565 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1566 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1567 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1568 .set_session_id(12)
1569 .set_offset(0)
1570 .set_window_end_offset(64)
1571 .set_max_chunk_size_bytes(64)));
1572 });
1573 ASSERT_EQ(payloads.size(), 3u);
1574
1575 EXPECT_EQ(transfer_status, Status::Unknown());
1576
1577 Chunk c1 = DecodeChunk(payloads[1]);
1578 EXPECT_EQ(c1.session_id(), 12u);
1579 EXPECT_EQ(c1.offset(), 0u);
1580 EXPECT_EQ(c1.payload().size(), 32u);
1581 EXPECT_EQ(
1582 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1583
1584 Chunk c2 = DecodeChunk(payloads[2]);
1585 EXPECT_EQ(c2.session_id(), 12u);
1586 ASSERT_TRUE(c2.remaining_bytes().has_value());
1587 EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1588
1589 // Wait for the timeout to expire without doing anything. The client should
1590 // resend the data chunk.
1591 transfer_thread_.SimulateClientTimeout(12);
1592 ASSERT_EQ(payloads.size(), 4u);
1593
1594 Chunk c3 = DecodeChunk(payloads[3]);
1595 EXPECT_EQ(c3.session_id(), c1.session_id());
1596 EXPECT_EQ(c3.offset(), c1.offset());
1597 EXPECT_EQ(c3.payload().size(), c1.payload().size());
1598 EXPECT_EQ(std::memcmp(
1599 c3.payload().data(), c1.payload().data(), c3.payload().size()),
1600 0);
1601
1602 // The remaining_bytes = 0 chunk should be resent on the next parameters.
1603 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1604 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1605 .set_session_id(12)
1606 .set_offset(32)
1607 .set_window_end_offset(64)));
1608 transfer_thread_.WaitUntilEventIsProcessed();
1609
1610 ASSERT_EQ(payloads.size(), 5u);
1611
1612 Chunk c4 = DecodeChunk(payloads[4]);
1613 EXPECT_EQ(c4.session_id(), 12u);
1614 ASSERT_TRUE(c4.remaining_bytes().has_value());
1615 EXPECT_EQ(c4.remaining_bytes().value(), 0u);
1616
1617 context_.server().SendServerStream<Transfer::Write>(
1618 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 12, OkStatus())));
1619 transfer_thread_.WaitUntilEventIsProcessed();
1620
1621 EXPECT_EQ(transfer_status, OkStatus());
1622 }
1623
TEST_F(WriteTransfer,Timeout_EndsTransferAfterMaxRetries)1624 TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
1625 stream::MemoryReader reader(kData32);
1626 Status transfer_status = Status::Unknown();
1627
1628 Result<Client::Handle> handle = legacy_client_.Write(
1629 13,
1630 reader,
1631 [&transfer_status](Status status) { transfer_status = status; },
1632 kTestTimeout,
1633 kTestTimeout);
1634 ASSERT_EQ(OkStatus(), handle.status());
1635 transfer_thread_.WaitUntilEventIsProcessed();
1636
1637 // The client begins by sending the ID of the resource to transfer.
1638 rpc::PayloadsView payloads =
1639 context_.output().payloads<Transfer::Write>(context_.channel().id());
1640 ASSERT_EQ(payloads.size(), 1u);
1641 EXPECT_EQ(transfer_status, Status::Unknown());
1642
1643 Chunk c0 = DecodeChunk(payloads.back());
1644 EXPECT_EQ(c0.session_id(), 13u);
1645 EXPECT_EQ(c0.resource_id(), 13u);
1646 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1647
1648 for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
1649 // Wait for the timeout to expire without doing anything. The client should
1650 // resend the initial transmit chunk.
1651 transfer_thread_.SimulateClientTimeout(13);
1652 ASSERT_EQ(payloads.size(), retry + 1);
1653
1654 Chunk c = DecodeChunk(payloads.back());
1655 EXPECT_EQ(c.session_id(), 13u);
1656 EXPECT_EQ(c.resource_id(), 13u);
1657 EXPECT_EQ(c.type(), Chunk::Type::kStart);
1658
1659 // Transfer has not yet completed.
1660 EXPECT_EQ(transfer_status, Status::Unknown());
1661 }
1662
1663 // Time out one more time after the final retry. The client should cancel the
1664 // transfer at this point. As no packets were received from the server, no
1665 // final status chunk should be sent.
1666 transfer_thread_.SimulateClientTimeout(13);
1667 ASSERT_EQ(payloads.size(), 4u);
1668
1669 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1670
1671 // After finishing the transfer, nothing else should be sent.
1672 transfer_thread_.SimulateClientTimeout(13);
1673 transfer_thread_.SimulateClientTimeout(13);
1674 transfer_thread_.SimulateClientTimeout(13);
1675 ASSERT_EQ(payloads.size(), 4u);
1676
1677 // Ensure we don't leave a dangling reference to transfer_status.
1678 handle->Cancel();
1679 transfer_thread_.WaitUntilEventIsProcessed();
1680 }
1681
TEST_F(WriteTransfer,Timeout_NonSeekableReaderEndsTransfer)1682 TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
1683 FakeNonSeekableReader reader(kData32);
1684 Status transfer_status = Status::Unknown();
1685
1686 Result<Client::Handle> handle = legacy_client_.Write(
1687 14,
1688 reader,
1689 [&transfer_status](Status status) { transfer_status = status; },
1690 kTestTimeout);
1691 ASSERT_EQ(OkStatus(), handle.status());
1692 transfer_thread_.WaitUntilEventIsProcessed();
1693
1694 // The client begins by sending the ID of the resource to transfer.
1695 rpc::PayloadsView payloads =
1696 context_.output().payloads<Transfer::Write>(context_.channel().id());
1697 ASSERT_EQ(payloads.size(), 1u);
1698 EXPECT_EQ(transfer_status, Status::Unknown());
1699
1700 Chunk c0 = DecodeChunk(payloads.back());
1701 EXPECT_EQ(c0.session_id(), 14u);
1702 EXPECT_EQ(c0.resource_id(), 14u);
1703 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1704
1705 // Send the first parameters chunk.
1706 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1707 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1708 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1709 .set_session_id(14)
1710 .set_offset(0)
1711 .set_window_end_offset(16)
1712 .set_max_chunk_size_bytes(8)));
1713 });
1714 ASSERT_EQ(payloads.size(), 3u);
1715
1716 EXPECT_EQ(transfer_status, Status::Unknown());
1717
1718 Chunk c1 = DecodeChunk(payloads[1]);
1719 EXPECT_EQ(c1.session_id(), 14u);
1720 EXPECT_EQ(c1.offset(), 0u);
1721 EXPECT_TRUE(c1.has_payload());
1722 EXPECT_EQ(c1.payload().size(), 8u);
1723 EXPECT_EQ(
1724 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1725
1726 Chunk c2 = DecodeChunk(payloads[2]);
1727 EXPECT_EQ(c2.session_id(), 14u);
1728 EXPECT_EQ(c2.offset(), 8u);
1729 EXPECT_TRUE(c2.has_payload());
1730 EXPECT_EQ(c2.payload().size(), 8u);
1731 EXPECT_EQ(std::memcmp(c2.payload().data(),
1732 kData32.data() + c2.offset(),
1733 c2.payload().size()),
1734 0);
1735
1736 // Wait for the timeout to expire without doing anything. The client should
1737 // fail to seek back and end the transfer.
1738 transfer_thread_.SimulateClientTimeout(14);
1739 ASSERT_EQ(payloads.size(), 4u);
1740
1741 Chunk c3 = DecodeChunk(payloads[3]);
1742 EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kLegacy);
1743 EXPECT_EQ(c3.session_id(), 14u);
1744 ASSERT_TRUE(c3.status().has_value());
1745 EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded());
1746
1747 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1748
1749 // Ensure we don't leave a dangling reference to transfer_status.
1750 handle->Cancel();
1751 transfer_thread_.WaitUntilEventIsProcessed();
1752 }
1753
TEST_F(WriteTransfer,ManualCancel)1754 TEST_F(WriteTransfer, ManualCancel) {
1755 stream::MemoryReader reader(kData32);
1756 Status transfer_status = Status::Unknown();
1757
1758 Result<Client::Handle> handle = legacy_client_.Write(
1759 15,
1760 reader,
1761 [&transfer_status](Status status) { transfer_status = status; },
1762 kTestTimeout);
1763 ASSERT_EQ(OkStatus(), handle.status());
1764 transfer_thread_.WaitUntilEventIsProcessed();
1765
1766 // The client begins by sending the ID of the resource to transfer.
1767 rpc::PayloadsView payloads =
1768 context_.output().payloads<Transfer::Write>(context_.channel().id());
1769 ASSERT_EQ(payloads.size(), 1u);
1770 EXPECT_EQ(transfer_status, Status::Unknown());
1771
1772 Chunk chunk = DecodeChunk(payloads.back());
1773 EXPECT_EQ(chunk.session_id(), 15u);
1774 EXPECT_EQ(chunk.resource_id(), 15u);
1775 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1776
1777 // Get a response from the server, then cancel the transfer.
1778 // This must request a smaller chunk than the entire available write data to
1779 // prevent the client from trying to send an additional finish chunk.
1780 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1781 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1782 .set_session_id(15)
1783 .set_offset(0)
1784 .set_window_end_offset(16)
1785 .set_max_chunk_size_bytes(16)));
1786 transfer_thread_.WaitUntilEventIsProcessed();
1787 ASSERT_EQ(payloads.size(), 2u);
1788
1789 handle->Cancel();
1790 transfer_thread_.WaitUntilEventIsProcessed();
1791
1792 // Client should send a cancellation chunk to the server.
1793 ASSERT_EQ(payloads.size(), 3u);
1794 chunk = DecodeChunk(payloads.back());
1795 EXPECT_EQ(chunk.session_id(), 15u);
1796 ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
1797 EXPECT_EQ(chunk.status().value(), Status::Cancelled());
1798
1799 EXPECT_EQ(transfer_status, Status::Cancelled());
1800 }
1801
TEST_F(WriteTransfer,ManualCancel_NoContact)1802 TEST_F(WriteTransfer, ManualCancel_NoContact) {
1803 stream::MemoryReader reader(kData32);
1804 Status transfer_status = Status::Unknown();
1805
1806 Result<Client::Handle> handle = legacy_client_.Write(
1807 15,
1808 reader,
1809 [&transfer_status](Status status) { transfer_status = status; },
1810 kTestTimeout,
1811 kTestTimeout);
1812 ASSERT_EQ(handle.status(), OkStatus());
1813 transfer_thread_.WaitUntilEventIsProcessed();
1814
1815 // The client begins by sending the ID of the resource to transfer.
1816 rpc::PayloadsView payloads =
1817 context_.output().payloads<Transfer::Write>(context_.channel().id());
1818 ASSERT_EQ(payloads.size(), 1u);
1819 EXPECT_EQ(transfer_status, Status::Unknown());
1820
1821 Chunk chunk = DecodeChunk(payloads.back());
1822 EXPECT_EQ(chunk.session_id(), 15u);
1823 EXPECT_EQ(chunk.resource_id(), 15u);
1824 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1825
1826 // Cancel transfer without a server response. No final chunk should be sent.
1827 handle->Cancel();
1828 transfer_thread_.WaitUntilEventIsProcessed();
1829
1830 ASSERT_EQ(payloads.size(), 1u);
1831
1832 EXPECT_EQ(transfer_status, Status::Cancelled());
1833 }
1834
TEST_F(WriteTransfer,ManualCancel_Duplicate)1835 TEST_F(WriteTransfer, ManualCancel_Duplicate) {
1836 stream::MemoryReader reader(kData32);
1837 Status transfer_status = Status::Unknown();
1838
1839 Result<Client::Handle> handle = legacy_client_.Write(
1840 16,
1841 reader,
1842 [&transfer_status](Status status) { transfer_status = status; },
1843 kTestTimeout);
1844 ASSERT_EQ(OkStatus(), handle.status());
1845 transfer_thread_.WaitUntilEventIsProcessed();
1846
1847 // The client begins by sending the ID of the resource to transfer.
1848 rpc::PayloadsView payloads =
1849 context_.output().payloads<Transfer::Write>(context_.channel().id());
1850 ASSERT_EQ(payloads.size(), 1u);
1851 EXPECT_EQ(transfer_status, Status::Unknown());
1852
1853 Chunk chunk = DecodeChunk(payloads.back());
1854 EXPECT_EQ(chunk.session_id(), 16u);
1855 EXPECT_EQ(chunk.resource_id(), 16u);
1856 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1857
1858 // Get a response from the server, then cancel the transfer.
1859 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1860 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1861 .set_session_id(16)
1862 .set_offset(0)
1863 .set_window_end_offset(16) // Request only a single chunk.
1864 .set_max_chunk_size_bytes(16)));
1865 transfer_thread_.WaitUntilEventIsProcessed();
1866 ASSERT_EQ(payloads.size(), 2u);
1867
1868 handle->Cancel();
1869 transfer_thread_.WaitUntilEventIsProcessed();
1870
1871 // Client should send a cancellation chunk to the server.
1872 ASSERT_EQ(payloads.size(), 3u);
1873 chunk = DecodeChunk(payloads.back());
1874 EXPECT_EQ(chunk.session_id(), 16u);
1875 ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
1876 EXPECT_EQ(chunk.status().value(), Status::Cancelled());
1877
1878 EXPECT_EQ(transfer_status, Status::Cancelled());
1879
1880 // Attempt to cancel the transfer again.
1881 transfer_status = Status::Unknown();
1882 handle->Cancel();
1883 transfer_thread_.WaitUntilEventIsProcessed();
1884
1885 // No further chunks should be sent.
1886 EXPECT_EQ(payloads.size(), 3u);
1887 EXPECT_EQ(transfer_status, Status::Unknown());
1888 }
1889
TEST_F(ReadTransfer,Version2_SingleChunk)1890 TEST_F(ReadTransfer, Version2_SingleChunk) {
1891 stream::MemoryWriterBuffer<64> writer;
1892 Status transfer_status = Status::Unknown();
1893
1894 ASSERT_EQ(
1895 OkStatus(),
1896 client_
1897 .Read(
1898 3,
1899 writer,
1900 [&transfer_status](Status status) { transfer_status = status; },
1901 cfg::kDefaultClientTimeout,
1902 cfg::kDefaultClientTimeout)
1903 .status());
1904
1905 transfer_thread_.WaitUntilEventIsProcessed();
1906
1907 // Initial chunk of the transfer is sent. This chunk should contain all the
1908 // fields from both legacy and version 2 protocols for backwards
1909 // compatibility.
1910 rpc::PayloadsView payloads =
1911 context_.output().payloads<Transfer::Read>(context_.channel().id());
1912 ASSERT_EQ(payloads.size(), 1u);
1913 EXPECT_EQ(transfer_status, Status::Unknown());
1914
1915 Chunk chunk = DecodeChunk(payloads[0]);
1916 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1917 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1918 EXPECT_EQ(chunk.desired_session_id(), 1u);
1919 EXPECT_EQ(chunk.resource_id(), 3u);
1920 EXPECT_EQ(chunk.offset(), 0u);
1921 EXPECT_EQ(chunk.window_end_offset(), 37u);
1922 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1923
1924 // The server responds with a START_ACK, continuing the version 2 handshake.
1925 context_.server().SendServerStream<Transfer::Read>(
1926 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
1927 .set_session_id(1)
1928 .set_resource_id(3)));
1929 transfer_thread_.WaitUntilEventIsProcessed();
1930
1931 ASSERT_EQ(payloads.size(), 2u);
1932
1933 // Client should accept the session_id with a START_ACK_CONFIRMATION,
1934 // additionally containing the initial parameters for the read transfer.
1935 chunk = DecodeChunk(payloads.back());
1936 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1937 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1938 EXPECT_FALSE(chunk.desired_session_id().has_value());
1939 EXPECT_EQ(chunk.session_id(), 1u);
1940 EXPECT_FALSE(chunk.resource_id().has_value());
1941 EXPECT_EQ(chunk.offset(), 0u);
1942 EXPECT_EQ(chunk.window_end_offset(), 37u);
1943 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1944
1945 // Send all the transfer data. Client should accept it and complete the
1946 // transfer.
1947 context_.server().SendServerStream<Transfer::Read>(
1948 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
1949 .set_session_id(1)
1950 .set_offset(0)
1951 .set_payload(kData32)
1952 .set_remaining_bytes(0)));
1953 transfer_thread_.WaitUntilEventIsProcessed();
1954
1955 ASSERT_EQ(payloads.size(), 3u);
1956
1957 chunk = DecodeChunk(payloads.back());
1958 EXPECT_EQ(chunk.session_id(), 1u);
1959 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1960 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1961 ASSERT_TRUE(chunk.status().has_value());
1962 EXPECT_EQ(chunk.status().value(), OkStatus());
1963
1964 EXPECT_EQ(transfer_status, OkStatus());
1965 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1966 0);
1967
1968 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
1969 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
1970 .set_session_id(1)));
1971 }
1972
TEST_F(ReadTransfer,Version2_ServerRunsLegacy)1973 TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
1974 stream::MemoryWriterBuffer<64> writer;
1975 Status transfer_status = Status::Unknown();
1976
1977 ASSERT_EQ(
1978 OkStatus(),
1979 client_
1980 .Read(
1981 3,
1982 writer,
1983 [&transfer_status](Status status) { transfer_status = status; },
1984 cfg::kDefaultClientTimeout,
1985 cfg::kDefaultClientTimeout)
1986 .status());
1987
1988 transfer_thread_.WaitUntilEventIsProcessed();
1989
1990 // Initial chunk of the transfer is sent. This chunk should contain all the
1991 // fields from both legacy and version 2 protocols for backwards
1992 // compatibility.
1993 rpc::PayloadsView payloads =
1994 context_.output().payloads<Transfer::Read>(context_.channel().id());
1995 ASSERT_EQ(payloads.size(), 1u);
1996 EXPECT_EQ(transfer_status, Status::Unknown());
1997
1998 Chunk chunk = DecodeChunk(payloads[0]);
1999 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2000 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2001 EXPECT_EQ(chunk.desired_session_id(), 1u);
2002 EXPECT_EQ(chunk.resource_id(), 3u);
2003 EXPECT_EQ(chunk.offset(), 0u);
2004 EXPECT_EQ(chunk.window_end_offset(), 37u);
2005 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2006
2007 // Instead of a START_ACK to continue the handshake, the server responds with
2008 // an immediate data chunk, indicating that it is running the legacy protocol
2009 // version. Client should revert to legacy, using the resource_id of 3 as the
2010 // session_id, and complete the transfer.
2011 context_.server().SendServerStream<Transfer::Read>(
2012 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
2013 .set_session_id(3)
2014 .set_offset(0)
2015 .set_payload(kData32)
2016 .set_remaining_bytes(0)));
2017 transfer_thread_.WaitUntilEventIsProcessed();
2018
2019 ASSERT_EQ(payloads.size(), 2u);
2020
2021 chunk = DecodeChunk(payloads.back());
2022 EXPECT_FALSE(chunk.desired_session_id().has_value());
2023 EXPECT_EQ(chunk.session_id(), 3u);
2024 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2025 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2026 ASSERT_TRUE(chunk.status().has_value());
2027 EXPECT_EQ(chunk.status().value(), OkStatus());
2028
2029 EXPECT_EQ(transfer_status, OkStatus());
2030 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2031 0);
2032 }
2033
TEST_F(ReadTransfer,Version2_TimeoutDuringHandshake)2034 TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
2035 stream::MemoryWriterBuffer<64> writer;
2036 Status transfer_status = Status::Unknown();
2037
2038 ASSERT_EQ(
2039 OkStatus(),
2040 client_
2041 .Read(
2042 3,
2043 writer,
2044 [&transfer_status](Status status) { transfer_status = status; },
2045 cfg::kDefaultClientTimeout,
2046 cfg::kDefaultClientTimeout)
2047 .status());
2048
2049 transfer_thread_.WaitUntilEventIsProcessed();
2050
2051 // Initial chunk of the transfer is sent. This chunk should contain all the
2052 // fields from both legacy and version 2 protocols for backwards
2053 // compatibility.
2054 rpc::PayloadsView payloads =
2055 context_.output().payloads<Transfer::Read>(context_.channel().id());
2056 ASSERT_EQ(payloads.size(), 1u);
2057 EXPECT_EQ(transfer_status, Status::Unknown());
2058
2059 Chunk chunk = DecodeChunk(payloads.back());
2060 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2061 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2062 EXPECT_EQ(chunk.desired_session_id(), 1u);
2063 EXPECT_EQ(chunk.resource_id(), 3u);
2064 EXPECT_EQ(chunk.offset(), 0u);
2065 EXPECT_EQ(chunk.window_end_offset(), 37u);
2066 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2067
2068 // Wait for the timeout to expire without doing anything. The client should
2069 // resend the initial chunk.
2070 transfer_thread_.SimulateClientTimeout(1);
2071 ASSERT_EQ(payloads.size(), 2u);
2072
2073 chunk = DecodeChunk(payloads.back());
2074 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2075 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2076 EXPECT_EQ(chunk.session_id(), 1u);
2077 EXPECT_EQ(chunk.resource_id(), 3u);
2078 EXPECT_EQ(chunk.offset(), 0u);
2079 EXPECT_EQ(chunk.window_end_offset(), 37u);
2080 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2081
2082 // This time, the server responds, continuing the handshake and transfer.
2083 context_.server().SendServerStream<Transfer::Read>(
2084 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2085 .set_session_id(1)
2086 .set_resource_id(3)));
2087 transfer_thread_.WaitUntilEventIsProcessed();
2088
2089 ASSERT_EQ(payloads.size(), 3u);
2090
2091 chunk = DecodeChunk(payloads.back());
2092 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2093 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2094 EXPECT_EQ(chunk.session_id(), 1u);
2095 EXPECT_FALSE(chunk.resource_id().has_value());
2096 EXPECT_EQ(chunk.offset(), 0u);
2097 EXPECT_EQ(chunk.window_end_offset(), 37u);
2098 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2099
2100 context_.server().SendServerStream<Transfer::Read>(
2101 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2102 .set_session_id(1)
2103 .set_offset(0)
2104 .set_payload(kData32)
2105 .set_remaining_bytes(0)));
2106 transfer_thread_.WaitUntilEventIsProcessed();
2107
2108 ASSERT_EQ(payloads.size(), 4u);
2109
2110 chunk = DecodeChunk(payloads.back());
2111 EXPECT_EQ(chunk.session_id(), 1u);
2112 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2113 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2114 ASSERT_TRUE(chunk.status().has_value());
2115 EXPECT_EQ(chunk.status().value(), OkStatus());
2116
2117 EXPECT_EQ(transfer_status, OkStatus());
2118 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2119 0);
2120
2121 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2122 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2123 .set_session_id(1)));
2124 }
2125
TEST_F(ReadTransfer,Version2_TimeoutAfterHandshake)2126 TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
2127 stream::MemoryWriterBuffer<64> writer;
2128 Status transfer_status = Status::Unknown();
2129
2130 ASSERT_EQ(
2131 OkStatus(),
2132 client_
2133 .Read(
2134 3,
2135 writer,
2136 [&transfer_status](Status status) { transfer_status = status; },
2137 cfg::kDefaultClientTimeout,
2138 cfg::kDefaultClientTimeout)
2139 .status());
2140
2141 transfer_thread_.WaitUntilEventIsProcessed();
2142
2143 // Initial chunk of the transfer is sent. This chunk should contain all the
2144 // fields from both legacy and version 2 protocols for backwards
2145 // compatibility.
2146 rpc::PayloadsView payloads =
2147 context_.output().payloads<Transfer::Read>(context_.channel().id());
2148 ASSERT_EQ(payloads.size(), 1u);
2149 EXPECT_EQ(transfer_status, Status::Unknown());
2150
2151 Chunk chunk = DecodeChunk(payloads.back());
2152 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2153 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2154 EXPECT_EQ(chunk.desired_session_id(), 1u);
2155 EXPECT_EQ(chunk.resource_id(), 3u);
2156 EXPECT_EQ(chunk.offset(), 0u);
2157 EXPECT_EQ(chunk.window_end_offset(), 37u);
2158 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2159
2160 // The server responds with a START_ACK, continuing the version 2 handshake
2161 // and assigning a session_id to the transfer.
2162 context_.server().SendServerStream<Transfer::Read>(
2163 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2164 .set_session_id(1)
2165 .set_resource_id(3)));
2166 transfer_thread_.WaitUntilEventIsProcessed();
2167
2168 ASSERT_EQ(payloads.size(), 2u);
2169
2170 // Client should accept the session_id with a START_ACK_CONFIRMATION,
2171 // additionally containing the initial parameters for the read transfer.
2172 chunk = DecodeChunk(payloads.back());
2173 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2174 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2175 EXPECT_EQ(chunk.session_id(), 1u);
2176 EXPECT_FALSE(chunk.resource_id().has_value());
2177 EXPECT_EQ(chunk.offset(), 0u);
2178 EXPECT_EQ(chunk.window_end_offset(), 37u);
2179 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2180
2181 // Wait for the timeout to expire without doing anything. The client should
2182 // resend the confirmation chunk.
2183 transfer_thread_.SimulateClientTimeout(1);
2184 ASSERT_EQ(payloads.size(), 3u);
2185
2186 chunk = DecodeChunk(payloads.back());
2187 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2188 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2189 EXPECT_EQ(chunk.session_id(), 1u);
2190 EXPECT_FALSE(chunk.resource_id().has_value());
2191 EXPECT_EQ(chunk.offset(), 0u);
2192 EXPECT_EQ(chunk.window_end_offset(), 37u);
2193 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2194
2195 // The server responds and the transfer should continue normally.
2196 context_.server().SendServerStream<Transfer::Read>(
2197 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2198 .set_session_id(1)
2199 .set_offset(0)
2200 .set_payload(kData32)
2201 .set_remaining_bytes(0)));
2202 transfer_thread_.WaitUntilEventIsProcessed();
2203
2204 ASSERT_EQ(payloads.size(), 4u);
2205
2206 chunk = DecodeChunk(payloads.back());
2207 EXPECT_EQ(chunk.session_id(), 1u);
2208 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2209 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2210 ASSERT_TRUE(chunk.status().has_value());
2211 EXPECT_EQ(chunk.status().value(), OkStatus());
2212
2213 EXPECT_EQ(transfer_status, OkStatus());
2214 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2215 0);
2216
2217 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2218 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2219 .set_session_id(1)));
2220 }
2221
TEST_F(ReadTransfer,Version2_ServerErrorDuringHandshake)2222 TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
2223 stream::MemoryWriterBuffer<64> writer;
2224 Status transfer_status = Status::Unknown();
2225
2226 ASSERT_EQ(
2227 OkStatus(),
2228 client_
2229 .Read(
2230 3,
2231 writer,
2232 [&transfer_status](Status status) { transfer_status = status; },
2233 cfg::kDefaultClientTimeout,
2234 cfg::kDefaultClientTimeout)
2235 .status());
2236
2237 transfer_thread_.WaitUntilEventIsProcessed();
2238
2239 // Initial chunk of the transfer is sent. This chunk should contain all the
2240 // fields from both legacy and version 2 protocols for backwards
2241 // compatibility.
2242 rpc::PayloadsView payloads =
2243 context_.output().payloads<Transfer::Read>(context_.channel().id());
2244 ASSERT_EQ(payloads.size(), 1u);
2245 EXPECT_EQ(transfer_status, Status::Unknown());
2246
2247 Chunk chunk = DecodeChunk(payloads.back());
2248 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2249 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2250 EXPECT_EQ(chunk.desired_session_id(), 1u);
2251 EXPECT_EQ(chunk.resource_id(), 3u);
2252 EXPECT_EQ(chunk.offset(), 0u);
2253 EXPECT_EQ(chunk.window_end_offset(), 37u);
2254 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2255
2256 // The server responds to the start request with an error.
2257 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(Chunk::Final(
2258 ProtocolVersion::kVersionTwo, 1, Status::Unauthenticated())));
2259 transfer_thread_.WaitUntilEventIsProcessed();
2260
2261 EXPECT_EQ(payloads.size(), 1u);
2262 EXPECT_EQ(transfer_status, Status::Unauthenticated());
2263 }
2264
TEST_F(ReadTransfer,Version2_TimeoutWaitingForCompletionAckRetries)2265 TEST_F(ReadTransfer, Version2_TimeoutWaitingForCompletionAckRetries) {
2266 stream::MemoryWriterBuffer<64> writer;
2267 Status transfer_status = Status::Unknown();
2268
2269 ASSERT_EQ(
2270 OkStatus(),
2271 client_
2272 .Read(
2273 3,
2274 writer,
2275 [&transfer_status](Status status) { transfer_status = status; },
2276 cfg::kDefaultClientTimeout,
2277 cfg::kDefaultClientTimeout)
2278 .status());
2279
2280 transfer_thread_.WaitUntilEventIsProcessed();
2281
2282 // Initial chunk of the transfer is sent. This chunk should contain all the
2283 // fields from both legacy and version 2 protocols for backwards
2284 // compatibility.
2285 rpc::PayloadsView payloads =
2286 context_.output().payloads<Transfer::Read>(context_.channel().id());
2287 ASSERT_EQ(payloads.size(), 1u);
2288 EXPECT_EQ(transfer_status, Status::Unknown());
2289
2290 Chunk chunk = DecodeChunk(payloads[0]);
2291 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2292 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2293 EXPECT_EQ(chunk.desired_session_id(), 1u);
2294 EXPECT_EQ(chunk.resource_id(), 3u);
2295 EXPECT_EQ(chunk.offset(), 0u);
2296 EXPECT_EQ(chunk.window_end_offset(), 37u);
2297 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2298
2299 // The server responds with a START_ACK, continuing the version 2 handshake.
2300 context_.server().SendServerStream<Transfer::Read>(
2301 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2302 .set_session_id(1)
2303 .set_resource_id(3)));
2304 transfer_thread_.WaitUntilEventIsProcessed();
2305
2306 ASSERT_EQ(payloads.size(), 2u);
2307
2308 // Client should accept the session_id with a START_ACK_CONFIRMATION,
2309 // additionally containing the initial parameters for the read transfer.
2310 chunk = DecodeChunk(payloads.back());
2311 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2312 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2313 EXPECT_EQ(chunk.session_id(), 1u);
2314 EXPECT_FALSE(chunk.resource_id().has_value());
2315 EXPECT_EQ(chunk.offset(), 0u);
2316 EXPECT_EQ(chunk.window_end_offset(), 37u);
2317 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2318
2319 // Send all the transfer data. Client should accept it and complete the
2320 // transfer.
2321 context_.server().SendServerStream<Transfer::Read>(
2322 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2323 .set_session_id(1)
2324 .set_offset(0)
2325 .set_payload(kData32)
2326 .set_remaining_bytes(0)));
2327 transfer_thread_.WaitUntilEventIsProcessed();
2328
2329 ASSERT_EQ(payloads.size(), 3u);
2330
2331 chunk = DecodeChunk(payloads.back());
2332 EXPECT_EQ(chunk.session_id(), 1u);
2333 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2334 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2335 ASSERT_TRUE(chunk.status().has_value());
2336 EXPECT_EQ(chunk.status().value(), OkStatus());
2337
2338 EXPECT_EQ(transfer_status, OkStatus());
2339 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2340 0);
2341
2342 // Time out instead of sending a completion ACK. THe transfer should resend
2343 // its completion chunk.
2344 transfer_thread_.SimulateClientTimeout(1);
2345 ASSERT_EQ(payloads.size(), 4u);
2346
2347 // Reset transfer_status to check whether the handler is called again.
2348 transfer_status = Status::Unknown();
2349
2350 chunk = DecodeChunk(payloads.back());
2351 EXPECT_EQ(chunk.session_id(), 1u);
2352 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2353 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2354 ASSERT_TRUE(chunk.status().has_value());
2355 EXPECT_EQ(chunk.status().value(), OkStatus());
2356
2357 // Transfer handler should not be called a second time in response to the
2358 // re-sent completion chunk.
2359 EXPECT_EQ(transfer_status, Status::Unknown());
2360
2361 // Send a completion ACK to end the transfer.
2362 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2363 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2364 .set_session_id(1)));
2365 transfer_thread_.WaitUntilEventIsProcessed();
2366
2367 // No further chunks should be sent following the ACK.
2368 transfer_thread_.SimulateClientTimeout(1);
2369 ASSERT_EQ(payloads.size(), 4u);
2370 }
2371
TEST_F(ReadTransfer,Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries)2372 TEST_F(ReadTransfer,
2373 Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries) {
2374 stream::MemoryWriterBuffer<64> writer;
2375 Status transfer_status = Status::Unknown();
2376
2377 ASSERT_EQ(
2378 OkStatus(),
2379 client_
2380 .Read(
2381 3,
2382 writer,
2383 [&transfer_status](Status status) { transfer_status = status; },
2384 cfg::kDefaultClientTimeout,
2385 cfg::kDefaultClientTimeout)
2386 .status());
2387
2388 transfer_thread_.WaitUntilEventIsProcessed();
2389
2390 // Initial chunk of the transfer is sent. This chunk should contain all the
2391 // fields from both legacy and version 2 protocols for backwards
2392 // compatibility.
2393 rpc::PayloadsView payloads =
2394 context_.output().payloads<Transfer::Read>(context_.channel().id());
2395 ASSERT_EQ(payloads.size(), 1u);
2396 EXPECT_EQ(transfer_status, Status::Unknown());
2397
2398 Chunk chunk = DecodeChunk(payloads[0]);
2399 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2400 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2401 EXPECT_EQ(chunk.desired_session_id(), 1u);
2402 EXPECT_EQ(chunk.resource_id(), 3u);
2403 EXPECT_EQ(chunk.offset(), 0u);
2404 EXPECT_EQ(chunk.window_end_offset(), 37u);
2405 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2406
2407 // The server responds with a START_ACK, continuing the version 2 handshake
2408 // and assigning a session_id to the transfer.
2409 context_.server().SendServerStream<Transfer::Read>(
2410 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2411 .set_session_id(1)
2412 .set_resource_id(3)));
2413 transfer_thread_.WaitUntilEventIsProcessed();
2414
2415 ASSERT_EQ(payloads.size(), 2u);
2416
2417 // Client should accept the session_id with a START_ACK_CONFIRMATION,
2418 // additionally containing the initial parameters for the read transfer.
2419 chunk = DecodeChunk(payloads.back());
2420 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2421 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2422 EXPECT_EQ(chunk.session_id(), 1u);
2423 EXPECT_FALSE(chunk.resource_id().has_value());
2424 EXPECT_EQ(chunk.offset(), 0u);
2425 EXPECT_EQ(chunk.window_end_offset(), 37u);
2426 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2427
2428 // Send all the transfer data. Client should accept it and complete the
2429 // transfer.
2430 context_.server().SendServerStream<Transfer::Read>(
2431 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2432 .set_session_id(1)
2433 .set_offset(0)
2434 .set_payload(kData32)
2435 .set_remaining_bytes(0)));
2436 transfer_thread_.WaitUntilEventIsProcessed();
2437
2438 ASSERT_EQ(payloads.size(), 3u);
2439
2440 chunk = DecodeChunk(payloads.back());
2441 EXPECT_EQ(chunk.session_id(), 1u);
2442 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2443 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2444 ASSERT_TRUE(chunk.status().has_value());
2445 EXPECT_EQ(chunk.status().value(), OkStatus());
2446
2447 EXPECT_EQ(transfer_status, OkStatus());
2448 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2449 0);
2450
2451 // Time out instead of sending a completion ACK. THe transfer should resend
2452 // its completion chunk at first, then terminate after the maximum number of
2453 // retries.
2454 transfer_thread_.SimulateClientTimeout(1);
2455 ASSERT_EQ(payloads.size(), 4u); // Retry 1.
2456
2457 transfer_thread_.SimulateClientTimeout(1);
2458 ASSERT_EQ(payloads.size(), 5u); // Retry 2.
2459
2460 transfer_thread_.SimulateClientTimeout(1);
2461 ASSERT_EQ(payloads.size(), 6u); // Retry 3.
2462
2463 transfer_thread_.SimulateClientTimeout(1);
2464 ASSERT_EQ(payloads.size(), 6u); // No more retries; transfer has ended.
2465 }
2466
TEST_F(WriteTransfer,Version2_SingleChunk)2467 TEST_F(WriteTransfer, Version2_SingleChunk) {
2468 stream::MemoryReader reader(kData32);
2469 Status transfer_status = Status::Unknown();
2470
2471 ASSERT_EQ(
2472 OkStatus(),
2473 client_
2474 .Write(
2475 3,
2476 reader,
2477 [&transfer_status](Status status) { transfer_status = status; },
2478 cfg::kDefaultClientTimeout,
2479 cfg::kDefaultClientTimeout)
2480 .status());
2481 transfer_thread_.WaitUntilEventIsProcessed();
2482
2483 // The client begins by sending the ID of the resource to transfer.
2484 rpc::PayloadsView payloads =
2485 context_.output().payloads<Transfer::Write>(context_.channel().id());
2486 ASSERT_EQ(payloads.size(), 1u);
2487 EXPECT_EQ(transfer_status, Status::Unknown());
2488
2489 Chunk chunk = DecodeChunk(payloads.back());
2490 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2491 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2492 EXPECT_EQ(chunk.desired_session_id(), 1u);
2493 EXPECT_EQ(chunk.resource_id(), 3u);
2494
2495 // The server responds with a START_ACK, continuing the version 2 handshake.
2496 context_.server().SendServerStream<Transfer::Write>(
2497 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2498 .set_session_id(1)
2499 .set_resource_id(3)));
2500 transfer_thread_.WaitUntilEventIsProcessed();
2501
2502 ASSERT_EQ(payloads.size(), 2u);
2503
2504 // Client should accept the session_id with a START_ACK_CONFIRMATION.
2505 chunk = DecodeChunk(payloads.back());
2506 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2507 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2508 EXPECT_EQ(chunk.session_id(), 1u);
2509 EXPECT_FALSE(chunk.resource_id().has_value());
2510
2511 // The server can then begin the data transfer by sending its transfer
2512 // parameters. Client should respond with a data chunk and the final chunk.
2513 rpc::test::WaitForPackets(context_.output(), 2, [this] {
2514 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2515 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2516 .set_session_id(1)
2517 .set_offset(0)
2518 .set_window_end_offset(64)
2519 .set_max_chunk_size_bytes(32)));
2520 });
2521
2522 ASSERT_EQ(payloads.size(), 4u);
2523
2524 chunk = DecodeChunk(payloads[2]);
2525 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2526 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2527 EXPECT_EQ(chunk.session_id(), 1u);
2528 EXPECT_EQ(chunk.offset(), 0u);
2529 EXPECT_TRUE(chunk.has_payload());
2530 EXPECT_EQ(std::memcmp(
2531 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2532 0);
2533
2534 chunk = DecodeChunk(payloads[3]);
2535 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2536 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2537 EXPECT_EQ(chunk.session_id(), 1u);
2538 ASSERT_TRUE(chunk.remaining_bytes().has_value());
2539 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2540
2541 EXPECT_EQ(transfer_status, Status::Unknown());
2542
2543 // Send the final status chunk to complete the transfer.
2544 context_.server().SendServerStream<Transfer::Write>(
2545 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
2546 transfer_thread_.WaitUntilEventIsProcessed();
2547
2548 // Client should acknowledge the completion of the transfer.
2549 EXPECT_EQ(payloads.size(), 5u);
2550
2551 chunk = DecodeChunk(payloads[4]);
2552 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2553 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2554 EXPECT_EQ(chunk.session_id(), 1u);
2555
2556 EXPECT_EQ(transfer_status, OkStatus());
2557 }
2558
TEST_F(WriteTransfer,Version2_ServerRunsLegacy)2559 TEST_F(WriteTransfer, Version2_ServerRunsLegacy) {
2560 stream::MemoryReader reader(kData32);
2561 Status transfer_status = Status::Unknown();
2562
2563 ASSERT_EQ(
2564 OkStatus(),
2565 client_
2566 .Write(
2567 3,
2568 reader,
2569 [&transfer_status](Status status) { transfer_status = status; },
2570 cfg::kDefaultClientTimeout,
2571 cfg::kDefaultClientTimeout)
2572 .status());
2573 transfer_thread_.WaitUntilEventIsProcessed();
2574
2575 // The client begins by sending the ID of the resource to transfer.
2576 rpc::PayloadsView payloads =
2577 context_.output().payloads<Transfer::Write>(context_.channel().id());
2578 ASSERT_EQ(payloads.size(), 1u);
2579 EXPECT_EQ(transfer_status, Status::Unknown());
2580
2581 Chunk chunk = DecodeChunk(payloads.back());
2582 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2583 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2584 EXPECT_EQ(chunk.desired_session_id(), 1u);
2585 EXPECT_EQ(chunk.resource_id(), 3u);
2586
2587 // Instead of continuing the handshake with a START_ACK, the server
2588 // immediately sends parameters, indicating that it only supports the legacy
2589 // protocol. Client should switch over to legacy and continue the transfer.
2590 rpc::test::WaitForPackets(context_.output(), 2, [this] {
2591 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2592 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
2593 .set_session_id(3)
2594 .set_offset(0)
2595 .set_window_end_offset(64)
2596 .set_max_chunk_size_bytes(32)));
2597 });
2598
2599 ASSERT_EQ(payloads.size(), 3u);
2600
2601 chunk = DecodeChunk(payloads[1]);
2602 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2603 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2604 EXPECT_EQ(chunk.session_id(), 3u);
2605 EXPECT_EQ(chunk.offset(), 0u);
2606 EXPECT_TRUE(chunk.has_payload());
2607 EXPECT_EQ(std::memcmp(
2608 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2609 0);
2610
2611 chunk = DecodeChunk(payloads[2]);
2612 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2613 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2614 EXPECT_EQ(chunk.session_id(), 3u);
2615 ASSERT_TRUE(chunk.remaining_bytes().has_value());
2616 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2617
2618 EXPECT_EQ(transfer_status, Status::Unknown());
2619
2620 // Send the final status chunk to complete the transfer.
2621 context_.server().SendServerStream<Transfer::Write>(
2622 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
2623 transfer_thread_.WaitUntilEventIsProcessed();
2624
2625 EXPECT_EQ(payloads.size(), 3u);
2626 EXPECT_EQ(transfer_status, OkStatus());
2627 }
2628
TEST_F(WriteTransfer,Version2_RetryDuringHandshake)2629 TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
2630 stream::MemoryReader reader(kData32);
2631 Status transfer_status = Status::Unknown();
2632
2633 ASSERT_EQ(
2634 OkStatus(),
2635 client_
2636 .Write(
2637 3,
2638 reader,
2639 [&transfer_status](Status status) { transfer_status = status; },
2640 cfg::kDefaultClientTimeout,
2641 cfg::kDefaultClientTimeout)
2642 .status());
2643 transfer_thread_.WaitUntilEventIsProcessed();
2644
2645 // The client begins by sending the ID of the resource to transfer.
2646 rpc::PayloadsView payloads =
2647 context_.output().payloads<Transfer::Write>(context_.channel().id());
2648 ASSERT_EQ(payloads.size(), 1u);
2649 EXPECT_EQ(transfer_status, Status::Unknown());
2650
2651 Chunk chunk = DecodeChunk(payloads.back());
2652 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2653 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2654 EXPECT_EQ(chunk.desired_session_id(), 1u);
2655 EXPECT_EQ(chunk.resource_id(), 3u);
2656
2657 // Time out waiting for a server response. The client should resend the
2658 // initial packet.
2659 transfer_thread_.SimulateClientTimeout(1);
2660 ASSERT_EQ(payloads.size(), 2u);
2661
2662 chunk = DecodeChunk(payloads.back());
2663 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2664 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2665 EXPECT_EQ(chunk.desired_session_id(), 1u);
2666 EXPECT_EQ(chunk.resource_id(), 3u);
2667
2668 // This time, respond with the correct continuation packet. The transfer
2669 // should resume and complete normally.
2670 context_.server().SendServerStream<Transfer::Write>(
2671 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2672 .set_session_id(1)
2673 .set_resource_id(3)));
2674 transfer_thread_.WaitUntilEventIsProcessed();
2675
2676 ASSERT_EQ(payloads.size(), 3u);
2677
2678 chunk = DecodeChunk(payloads.back());
2679 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2680 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2681 EXPECT_EQ(chunk.session_id(), 1u);
2682 EXPECT_FALSE(chunk.resource_id().has_value());
2683
2684 rpc::test::WaitForPackets(context_.output(), 2, [this] {
2685 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2686 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2687 .set_session_id(1)
2688 .set_offset(0)
2689 .set_window_end_offset(64)
2690 .set_max_chunk_size_bytes(32)));
2691 });
2692
2693 ASSERT_EQ(payloads.size(), 5u);
2694
2695 chunk = DecodeChunk(payloads[3]);
2696 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2697 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2698 EXPECT_EQ(chunk.session_id(), 1u);
2699 EXPECT_EQ(chunk.offset(), 0u);
2700 EXPECT_TRUE(chunk.has_payload());
2701 EXPECT_EQ(std::memcmp(
2702 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2703 0);
2704
2705 chunk = DecodeChunk(payloads[4]);
2706 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2707 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2708 EXPECT_EQ(chunk.session_id(), 1u);
2709 ASSERT_TRUE(chunk.remaining_bytes().has_value());
2710 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2711
2712 EXPECT_EQ(transfer_status, Status::Unknown());
2713
2714 context_.server().SendServerStream<Transfer::Write>(
2715 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
2716 transfer_thread_.WaitUntilEventIsProcessed();
2717
2718 // Client should acknowledge the completion of the transfer.
2719 EXPECT_EQ(payloads.size(), 6u);
2720
2721 chunk = DecodeChunk(payloads[5]);
2722 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2723 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2724 EXPECT_EQ(chunk.session_id(), 1u);
2725
2726 EXPECT_EQ(transfer_status, OkStatus());
2727 }
2728
TEST_F(WriteTransfer,Version2_RetryAfterHandshake)2729 TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
2730 stream::MemoryReader reader(kData32);
2731 Status transfer_status = Status::Unknown();
2732
2733 ASSERT_EQ(
2734 OkStatus(),
2735 client_
2736 .Write(
2737 3,
2738 reader,
2739 [&transfer_status](Status status) { transfer_status = status; },
2740 cfg::kDefaultClientTimeout,
2741 cfg::kDefaultClientTimeout)
2742 .status());
2743 transfer_thread_.WaitUntilEventIsProcessed();
2744
2745 // The client begins by sending the ID of the resource to transfer.
2746 rpc::PayloadsView payloads =
2747 context_.output().payloads<Transfer::Write>(context_.channel().id());
2748 ASSERT_EQ(payloads.size(), 1u);
2749 EXPECT_EQ(transfer_status, Status::Unknown());
2750
2751 Chunk chunk = DecodeChunk(payloads.back());
2752 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2753 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2754 EXPECT_EQ(chunk.desired_session_id(), 1u);
2755 EXPECT_EQ(chunk.resource_id(), 3u);
2756
2757 // The server responds with a START_ACK, continuing the version 2 handshake.
2758 context_.server().SendServerStream<Transfer::Write>(
2759 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2760 .set_session_id(1)
2761 .set_resource_id(3)));
2762 transfer_thread_.WaitUntilEventIsProcessed();
2763
2764 ASSERT_EQ(payloads.size(), 2u);
2765
2766 // Client should accept the session_id with a START_ACK_CONFIRMATION.
2767 chunk = DecodeChunk(payloads.back());
2768 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2769 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2770 EXPECT_EQ(chunk.session_id(), 1u);
2771 EXPECT_FALSE(chunk.resource_id().has_value());
2772
2773 // Time out waiting for a server response. The client should resend the
2774 // initial packet.
2775 transfer_thread_.SimulateClientTimeout(1);
2776 ASSERT_EQ(payloads.size(), 3u);
2777
2778 chunk = DecodeChunk(payloads.back());
2779 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2780 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2781 EXPECT_EQ(chunk.session_id(), 1u);
2782 EXPECT_FALSE(chunk.resource_id().has_value());
2783
2784 // This time, respond with the first transfer parameters chunk. The transfer
2785 // should resume and complete normally.
2786 rpc::test::WaitForPackets(context_.output(), 2, [this] {
2787 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2788 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2789 .set_session_id(1)
2790 .set_offset(0)
2791 .set_window_end_offset(64)
2792 .set_max_chunk_size_bytes(32)));
2793 });
2794
2795 ASSERT_EQ(payloads.size(), 5u);
2796
2797 chunk = DecodeChunk(payloads[3]);
2798 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2799 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2800 EXPECT_EQ(chunk.session_id(), 1u);
2801 EXPECT_EQ(chunk.offset(), 0u);
2802 EXPECT_TRUE(chunk.has_payload());
2803 EXPECT_EQ(std::memcmp(
2804 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2805 0);
2806
2807 chunk = DecodeChunk(payloads[4]);
2808 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2809 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2810 EXPECT_EQ(chunk.session_id(), 1u);
2811 ASSERT_TRUE(chunk.remaining_bytes().has_value());
2812 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2813
2814 EXPECT_EQ(transfer_status, Status::Unknown());
2815
2816 context_.server().SendServerStream<Transfer::Write>(
2817 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
2818 transfer_thread_.WaitUntilEventIsProcessed();
2819
2820 // Client should acknowledge the completion of the transfer.
2821 EXPECT_EQ(payloads.size(), 6u);
2822
2823 chunk = DecodeChunk(payloads[5]);
2824 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2825 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2826 EXPECT_EQ(chunk.session_id(), 1u);
2827
2828 EXPECT_EQ(transfer_status, OkStatus());
2829 }
2830
TEST_F(WriteTransfer,Version2_ServerErrorDuringHandshake)2831 TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
2832 stream::MemoryReader reader(kData32);
2833 Status transfer_status = Status::Unknown();
2834
2835 ASSERT_EQ(
2836 OkStatus(),
2837 client_
2838 .Write(
2839 3,
2840 reader,
2841 [&transfer_status](Status status) { transfer_status = status; },
2842 cfg::kDefaultClientTimeout,
2843 cfg::kDefaultClientTimeout)
2844 .status());
2845 transfer_thread_.WaitUntilEventIsProcessed();
2846
2847 // The client begins by sending the ID of the resource to transfer.
2848 rpc::PayloadsView payloads =
2849 context_.output().payloads<Transfer::Write>(context_.channel().id());
2850 ASSERT_EQ(payloads.size(), 1u);
2851 EXPECT_EQ(transfer_status, Status::Unknown());
2852
2853 Chunk chunk = DecodeChunk(payloads.back());
2854 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2855 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2856 EXPECT_EQ(chunk.desired_session_id(), 1u);
2857 EXPECT_EQ(chunk.resource_id(), 3u);
2858
2859 // The server responds to the start request with an error.
2860 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2861 Chunk::Final(ProtocolVersion::kVersionTwo, 1, Status::NotFound())));
2862 transfer_thread_.WaitUntilEventIsProcessed();
2863
2864 EXPECT_EQ(payloads.size(), 1u);
2865 EXPECT_EQ(transfer_status, Status::NotFound());
2866 }
2867
2868 class ReadTransferMaxBytes256 : public ReadTransfer {
2869 protected:
ReadTransferMaxBytes256()2870 ReadTransferMaxBytes256() : ReadTransfer(/*max_bytes_to_receive=*/256) {}
2871 };
2872
TEST_F(ReadTransferMaxBytes256,Version2_AdapativeWindow_SlowStart)2873 TEST_F(ReadTransferMaxBytes256, Version2_AdapativeWindow_SlowStart) {
2874 stream::MemoryWriterBuffer<256> writer;
2875 Status transfer_status = Status::Unknown();
2876
2877 constexpr size_t kExpectedMaxChunkSize = 37;
2878
2879 ASSERT_EQ(
2880 OkStatus(),
2881 client_
2882 .Read(
2883 3,
2884 writer,
2885 [&transfer_status](Status status) { transfer_status = status; },
2886 cfg::kDefaultClientTimeout,
2887 cfg::kDefaultClientTimeout)
2888 .status());
2889 transfer_thread_.WaitUntilEventIsProcessed();
2890
2891 // Initial chunk of the transfer is sent. This chunk should contain all the
2892 // fields from both legacy and version 2 protocols for backwards
2893 // compatibility.
2894 rpc::PayloadsView payloads =
2895 context_.output().payloads<Transfer::Read>(context_.channel().id());
2896
2897 ASSERT_EQ(payloads.size(), 1u);
2898 EXPECT_EQ(transfer_status, Status::Unknown());
2899
2900 Chunk chunk = DecodeChunk(payloads[0]);
2901 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2902 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2903 EXPECT_EQ(chunk.desired_session_id(), 1u);
2904 EXPECT_EQ(chunk.resource_id(), 3u);
2905 EXPECT_EQ(chunk.offset(), 0u);
2906 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
2907 EXPECT_EQ(chunk.max_chunk_size_bytes(), kExpectedMaxChunkSize);
2908
2909 // The server responds with a START_ACK, continuing the version 2 handshake.
2910 context_.server().SendServerStream<Transfer::Read>(
2911 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2912 .set_session_id(1)
2913 .set_resource_id(3)));
2914 transfer_thread_.WaitUntilEventIsProcessed();
2915
2916 ASSERT_EQ(payloads.size(), 2u);
2917
2918 // Client should accept the session_id with a START_ACK_CONFIRMATION,
2919 // additionally containing the initial parameters for the read transfer.
2920 chunk = DecodeChunk(payloads.back());
2921 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2922 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2923 EXPECT_FALSE(chunk.desired_session_id().has_value());
2924 EXPECT_EQ(chunk.session_id(), 1u);
2925 EXPECT_FALSE(chunk.resource_id().has_value());
2926 EXPECT_EQ(chunk.offset(), 0u);
2927 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
2928 EXPECT_EQ(chunk.max_chunk_size_bytes(), kExpectedMaxChunkSize);
2929
2930 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2931 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2932 .set_session_id(1)
2933 .set_offset(0)
2934 .set_payload(span(kData256).first(kExpectedMaxChunkSize))));
2935 transfer_thread_.WaitUntilEventIsProcessed();
2936
2937 ASSERT_EQ(payloads.size(), 3u);
2938
2939 // Window size should double in response to successful receipt.
2940 chunk = DecodeChunk(payloads.back());
2941 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2942 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2943 EXPECT_FALSE(chunk.desired_session_id().has_value());
2944 EXPECT_EQ(chunk.session_id(), 1u);
2945 EXPECT_FALSE(chunk.resource_id().has_value());
2946 EXPECT_EQ(chunk.offset(), kExpectedMaxChunkSize);
2947 EXPECT_EQ(chunk.window_end_offset(),
2948 chunk.offset() + 2 * kExpectedMaxChunkSize);
2949
2950 // Send the next chunk.
2951 context_.server().SendServerStream<Transfer::Read>(
2952 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2953 .set_session_id(1)
2954 .set_offset(chunk.offset())
2955 .set_payload(span(kData256).subspan(
2956 chunk.offset(), kExpectedMaxChunkSize))));
2957 transfer_thread_.WaitUntilEventIsProcessed();
2958
2959 ASSERT_EQ(payloads.size(), 4u);
2960
2961 // Window size should double in response to successful receipt.
2962 chunk = DecodeChunk(payloads.back());
2963 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2964 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2965 EXPECT_FALSE(chunk.desired_session_id().has_value());
2966 EXPECT_EQ(chunk.session_id(), 1u);
2967 EXPECT_FALSE(chunk.resource_id().has_value());
2968 EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
2969 EXPECT_EQ(chunk.window_end_offset(),
2970 chunk.offset() + 4 * kExpectedMaxChunkSize);
2971
2972 // Finish the transfer.
2973 context_.server().SendServerStream<Transfer::Read>(
2974 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2975 .set_session_id(1)
2976 .set_offset(chunk.offset())
2977 .set_payload(span(kData256).subspan(
2978 chunk.offset(), kExpectedMaxChunkSize))
2979 .set_remaining_bytes(0)));
2980 transfer_thread_.WaitUntilEventIsProcessed();
2981
2982 ASSERT_EQ(payloads.size(), 5u);
2983
2984 chunk = DecodeChunk(payloads.back());
2985 EXPECT_EQ(chunk.session_id(), 1u);
2986 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2987 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2988 ASSERT_TRUE(chunk.status().has_value());
2989 EXPECT_EQ(chunk.status().value(), OkStatus());
2990
2991 EXPECT_EQ(transfer_status, OkStatus());
2992 EXPECT_EQ(std::memcmp(writer.data(), kData256.data(), writer.bytes_written()),
2993 0);
2994
2995 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2996 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2997 .set_session_id(1)));
2998 }
2999
TEST_F(ReadTransferMaxBytes256,Version2_AdapativeWindow_CongestionAvoidance)3000 TEST_F(ReadTransferMaxBytes256, Version2_AdapativeWindow_CongestionAvoidance) {
3001 stream::MemoryWriterBuffer<256> writer;
3002 Status transfer_status = Status::Unknown();
3003
3004 constexpr size_t kExpectedMaxChunkSize = 37;
3005
3006 ASSERT_EQ(
3007 OkStatus(),
3008 client_
3009 .Read(
3010 3,
3011 writer,
3012 [&transfer_status](Status status) { transfer_status = status; },
3013 cfg::kDefaultClientTimeout,
3014 cfg::kDefaultClientTimeout)
3015 .status());
3016
3017 transfer_thread_.WaitUntilEventIsProcessed();
3018
3019 // Initial chunk of the transfer is sent. This chunk should contain all the
3020 // fields from both legacy and version 2 protocols for backwards
3021 // compatibility.
3022 rpc::PayloadsView payloads =
3023 context_.output().payloads<Transfer::Read>(context_.channel().id());
3024 ASSERT_EQ(payloads.size(), 1u);
3025 EXPECT_EQ(transfer_status, Status::Unknown());
3026
3027 Chunk chunk = DecodeChunk(payloads[0]);
3028 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3029 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3030 EXPECT_EQ(chunk.desired_session_id(), 1u);
3031 EXPECT_EQ(chunk.resource_id(), 3u);
3032 EXPECT_EQ(chunk.offset(), 0u);
3033 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
3034 EXPECT_EQ(chunk.max_chunk_size_bytes(), kExpectedMaxChunkSize);
3035
3036 // The server responds with a START_ACK, continuing the version 2 handshake.
3037 context_.server().SendServerStream<Transfer::Read>(
3038 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3039 .set_session_id(1)
3040 .set_resource_id(3)));
3041 transfer_thread_.WaitUntilEventIsProcessed();
3042
3043 ASSERT_EQ(payloads.size(), 2u);
3044
3045 // Client should accept the session_id with a START_ACK_CONFIRMATION,
3046 // additionally containing the initial parameters for the read transfer.
3047 chunk = DecodeChunk(payloads.back());
3048 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
3049 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3050 EXPECT_FALSE(chunk.desired_session_id().has_value());
3051 EXPECT_EQ(chunk.session_id(), 1u);
3052 EXPECT_FALSE(chunk.resource_id().has_value());
3053 EXPECT_EQ(chunk.offset(), 0u);
3054 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
3055 EXPECT_EQ(chunk.max_chunk_size_bytes(), kExpectedMaxChunkSize);
3056
3057 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
3058 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3059 .set_session_id(1)
3060 .set_offset(0)
3061 .set_payload(span(kData256).first(kExpectedMaxChunkSize))));
3062 transfer_thread_.WaitUntilEventIsProcessed();
3063
3064 ASSERT_EQ(payloads.size(), 3u);
3065
3066 // Window size should double in response to successful receipt.
3067 chunk = DecodeChunk(payloads.back());
3068 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3069 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3070 EXPECT_FALSE(chunk.desired_session_id().has_value());
3071 EXPECT_EQ(chunk.session_id(), 1u);
3072 EXPECT_FALSE(chunk.resource_id().has_value());
3073 EXPECT_EQ(chunk.offset(), kExpectedMaxChunkSize);
3074 EXPECT_EQ(chunk.window_end_offset(),
3075 chunk.offset() + 2 * kExpectedMaxChunkSize);
3076
3077 // Send the next chunk.
3078 context_.server().SendServerStream<Transfer::Read>(
3079 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3080 .set_session_id(1)
3081 .set_offset(chunk.offset())
3082 .set_payload(span(kData256).subspan(
3083 chunk.offset(), kExpectedMaxChunkSize))));
3084 transfer_thread_.WaitUntilEventIsProcessed();
3085
3086 ASSERT_EQ(payloads.size(), 4u);
3087
3088 // Window size should double in response to successful receipt.
3089 chunk = DecodeChunk(payloads.back());
3090 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3091 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3092 EXPECT_FALSE(chunk.desired_session_id().has_value());
3093 EXPECT_EQ(chunk.session_id(), 1u);
3094 EXPECT_FALSE(chunk.resource_id().has_value());
3095 EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
3096 EXPECT_EQ(chunk.window_end_offset(),
3097 chunk.offset() + 4 * kExpectedMaxChunkSize);
3098
3099 // Time out instead of sending another chunk.
3100 transfer_thread_.SimulateClientTimeout(1);
3101
3102 ASSERT_EQ(payloads.size(), 5u);
3103
3104 // Window size should half following data loss.
3105 chunk = DecodeChunk(payloads.back());
3106 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
3107 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3108 EXPECT_FALSE(chunk.desired_session_id().has_value());
3109 EXPECT_EQ(chunk.session_id(), 1u);
3110 EXPECT_FALSE(chunk.resource_id().has_value());
3111 EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
3112 EXPECT_EQ(chunk.window_end_offset(),
3113 chunk.offset() + 2 * (kExpectedMaxChunkSize - 1));
3114
3115 // Send another chunk.
3116 context_.server().SendServerStream<Transfer::Read>(
3117 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3118 .set_session_id(1)
3119 .set_offset(chunk.offset())
3120 .set_payload(span(kData256).subspan(
3121 chunk.offset(), kExpectedMaxChunkSize - 1))));
3122 transfer_thread_.WaitUntilEventIsProcessed();
3123
3124 ASSERT_EQ(payloads.size(), 6u);
3125
3126 // Window size should now only increase by 1 instead of doubling.
3127 chunk = DecodeChunk(payloads.back());
3128 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3129 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3130 EXPECT_FALSE(chunk.desired_session_id().has_value());
3131 EXPECT_EQ(chunk.session_id(), 1u);
3132 EXPECT_FALSE(chunk.resource_id().has_value());
3133 EXPECT_EQ(chunk.offset(), 3 * kExpectedMaxChunkSize - 1);
3134 EXPECT_EQ(chunk.window_end_offset(),
3135 chunk.offset() + 3 * (kExpectedMaxChunkSize - 1));
3136
3137 // Finish the transfer.
3138 context_.server().SendServerStream<Transfer::Read>(
3139 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3140 .set_session_id(1)
3141 .set_offset(chunk.offset())
3142 .set_payload(span(kData256).subspan(
3143 chunk.offset(), kExpectedMaxChunkSize - 1))
3144 .set_remaining_bytes(0)));
3145 transfer_thread_.WaitUntilEventIsProcessed();
3146
3147 ASSERT_EQ(payloads.size(), 7u);
3148
3149 chunk = DecodeChunk(payloads.back());
3150 EXPECT_EQ(chunk.session_id(), 1u);
3151 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
3152 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3153 ASSERT_TRUE(chunk.status().has_value());
3154 EXPECT_EQ(chunk.status().value(), OkStatus());
3155
3156 EXPECT_EQ(transfer_status, OkStatus());
3157 EXPECT_EQ(std::memcmp(writer.data(), kData256.data(), writer.bytes_written()),
3158 0);
3159
3160 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
3161 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
3162 .set_session_id(1)));
3163 }
3164
TEST_F(WriteTransfer,Write_UpdateTransferSize)3165 TEST_F(WriteTransfer, Write_UpdateTransferSize) {
3166 FakeNonSeekableReader reader(kData32);
3167 Status transfer_status = Status::Unknown();
3168
3169 Result<Client::Handle> result = client_.Write(
3170 91,
3171 reader,
3172 [&transfer_status](Status status) { transfer_status = status; },
3173 kTestTimeout);
3174 ASSERT_EQ(OkStatus(), result.status());
3175 transfer_thread_.WaitUntilEventIsProcessed();
3176
3177 Client::Handle handle = *result;
3178 handle.SetTransferSize(kData32.size());
3179 transfer_thread_.WaitUntilEventIsProcessed();
3180
3181 // Initial chunk of the transfer is sent. This chunk should contain all the
3182 // fields from both legacy and version 2 protocols for backwards
3183 // compatibility.
3184 rpc::PayloadsView payloads =
3185 context_.output().payloads<Transfer::Write>(context_.channel().id());
3186 ASSERT_EQ(payloads.size(), 1u);
3187 EXPECT_EQ(transfer_status, Status::Unknown());
3188
3189 Chunk chunk = DecodeChunk(payloads[0]);
3190 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3191 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3192 EXPECT_EQ(chunk.desired_session_id(), 1u);
3193 EXPECT_EQ(chunk.resource_id(), 91u);
3194 EXPECT_EQ(chunk.offset(), 0u);
3195
3196 // The server responds with a START_ACK, continuing the version 2 handshake.
3197 context_.server().SendServerStream<Transfer::Write>(
3198 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3199 .set_session_id(1)
3200 .set_resource_id(91)));
3201 transfer_thread_.WaitUntilEventIsProcessed();
3202
3203 ASSERT_EQ(payloads.size(), 2u);
3204
3205 // Client should accept the session_id with a START_ACK_CONFIRMATION.
3206 chunk = DecodeChunk(payloads.back());
3207 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
3208 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3209 EXPECT_EQ(chunk.session_id(), 1u);
3210 EXPECT_FALSE(chunk.resource_id().has_value());
3211
3212 // The server can then begin the data transfer by sending its transfer
3213 // parameters. Client should respond with data chunks.
3214 rpc::test::WaitForPackets(context_.output(), 4, [this] {
3215 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
3216 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
3217 .set_session_id(1)
3218 .set_offset(0)
3219 .set_window_end_offset(64)
3220 .set_max_chunk_size_bytes(8)));
3221 });
3222
3223 ASSERT_EQ(payloads.size(), 6u);
3224
3225 // Each 8-byte chunk of the 32-byte transfer should have an appropriate
3226 // `remaining_bytes` value set.
3227 chunk = DecodeChunk(payloads[2]);
3228 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3229 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3230 EXPECT_EQ(chunk.session_id(), 1u);
3231 EXPECT_EQ(chunk.offset(), 0u);
3232 ASSERT_TRUE(chunk.remaining_bytes().has_value());
3233 EXPECT_EQ(chunk.remaining_bytes().value(), 24u);
3234
3235 chunk = DecodeChunk(payloads[3]);
3236 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3237 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3238 EXPECT_EQ(chunk.session_id(), 1u);
3239 EXPECT_EQ(chunk.offset(), 8u);
3240 ASSERT_TRUE(chunk.remaining_bytes().has_value());
3241 EXPECT_EQ(chunk.remaining_bytes().value(), 16u);
3242
3243 chunk = DecodeChunk(payloads[4]);
3244 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3245 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3246 EXPECT_EQ(chunk.session_id(), 1u);
3247 EXPECT_EQ(chunk.offset(), 16u);
3248 ASSERT_TRUE(chunk.remaining_bytes().has_value());
3249 EXPECT_EQ(chunk.remaining_bytes().value(), 8u);
3250
3251 chunk = DecodeChunk(payloads[5]);
3252 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3253 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3254 EXPECT_EQ(chunk.session_id(), 1u);
3255 EXPECT_EQ(chunk.offset(), 24u);
3256 ASSERT_TRUE(chunk.remaining_bytes().has_value());
3257 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
3258
3259 EXPECT_EQ(transfer_status, Status::Unknown());
3260
3261 // Send the final status chunk to complete the transfer.
3262 context_.server().SendServerStream<Transfer::Write>(
3263 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
3264 transfer_thread_.WaitUntilEventIsProcessed();
3265
3266 // Client should acknowledge the completion of the transfer.
3267 ASSERT_EQ(payloads.size(), 7u);
3268
3269 chunk = DecodeChunk(payloads[6]);
3270 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
3271 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3272 EXPECT_EQ(chunk.session_id(), 1u);
3273
3274 EXPECT_EQ(transfer_status, OkStatus());
3275
3276 // Ensure we don't leave a dangling reference to transfer_status.
3277 handle.Cancel();
3278 transfer_thread_.WaitUntilEventIsProcessed();
3279 }
3280
TEST_F(WriteTransfer,Write_TransferSize_Large)3281 TEST_F(WriteTransfer, Write_TransferSize_Large) {
3282 FakeNonSeekableReader reader(kData64);
3283 Status transfer_status = Status::Unknown();
3284
3285 Result<Client::Handle> result = client_.Write(
3286 91,
3287 reader,
3288 [&transfer_status](Status status) { transfer_status = status; },
3289 kTestTimeout);
3290 ASSERT_EQ(OkStatus(), result.status());
3291 transfer_thread_.WaitUntilEventIsProcessed();
3292
3293 // Set a large transfer size that will encode to a multibyte varint.
3294 constexpr size_t kLargeRemainingBytes = 1u << 28;
3295 Client::Handle handle = *result;
3296 handle.SetTransferSize(kLargeRemainingBytes);
3297 transfer_thread_.WaitUntilEventIsProcessed();
3298
3299 // Initial chunk of the transfer is sent. This chunk should contain all the
3300 // fields from both legacy and version 2 protocols for backwards
3301 // compatibility.
3302 rpc::PayloadsView payloads =
3303 context_.output().payloads<Transfer::Write>(context_.channel().id());
3304 ASSERT_EQ(payloads.size(), 1u);
3305 EXPECT_EQ(transfer_status, Status::Unknown());
3306
3307 Chunk chunk = DecodeChunk(payloads[0]);
3308 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3309 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3310 EXPECT_EQ(chunk.desired_session_id(), 1u);
3311 EXPECT_EQ(chunk.resource_id(), 91u);
3312 EXPECT_EQ(chunk.offset(), 0u);
3313
3314 // The server responds with a START_ACK, continuing the version 2 handshake.
3315 context_.server().SendServerStream<Transfer::Write>(
3316 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3317 .set_session_id(1)
3318 .set_resource_id(91)));
3319 transfer_thread_.WaitUntilEventIsProcessed();
3320
3321 ASSERT_EQ(payloads.size(), 2u);
3322
3323 // Client should accept the session_id with a START_ACK_CONFIRMATION.
3324 chunk = DecodeChunk(payloads.back());
3325 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
3326 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3327 EXPECT_EQ(chunk.session_id(), 1u);
3328 EXPECT_FALSE(chunk.resource_id().has_value());
3329
3330 // The server can then begin the data transfer by sending its transfer
3331 // parameters. Client should respond with data chunks.
3332 rpc::test::WaitForPackets(context_.output(), 2, [this] {
3333 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
3334 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
3335 .set_session_id(1)
3336 .set_offset(0)
3337 .set_window_end_offset(64) // Only request one chunk.
3338 .set_max_chunk_size_bytes(64)));
3339 });
3340
3341 ASSERT_EQ(payloads.size(), 4u);
3342
3343 // The transfer should reserve appropriate space for the `remaining_bytes`
3344 // value and not fail to encode.
3345 chunk = DecodeChunk(payloads[2]);
3346 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3347 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3348 EXPECT_EQ(chunk.session_id(), 1u);
3349 EXPECT_EQ(chunk.offset(), 0u);
3350 EXPECT_EQ(chunk.payload().size_bytes(), 48u);
3351 ASSERT_TRUE(chunk.remaining_bytes().has_value());
3352 EXPECT_EQ(chunk.remaining_bytes().value(),
3353 kLargeRemainingBytes - chunk.payload().size_bytes());
3354
3355 EXPECT_EQ(transfer_status, Status::Unknown());
3356
3357 // Send the final status chunk to complete the transfer.
3358 context_.server().SendServerStream<Transfer::Write>(
3359 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
3360 transfer_thread_.WaitUntilEventIsProcessed();
3361
3362 // Client should acknowledge the completion of the transfer.
3363 EXPECT_EQ(payloads.size(), 5u);
3364
3365 chunk = DecodeChunk(payloads[4]);
3366 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
3367 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3368 EXPECT_EQ(chunk.session_id(), 1u);
3369
3370 EXPECT_EQ(transfer_status, OkStatus());
3371
3372 // Ensure we don't leave a dangling reference to transfer_status.
3373 handle.Cancel();
3374 transfer_thread_.WaitUntilEventIsProcessed();
3375 }
3376
TEST_F(WriteTransfer,Write_TransferSize_SmallerThanResource)3377 TEST_F(WriteTransfer, Write_TransferSize_SmallerThanResource) {
3378 // 64 byte data, but only set a 16 byte size.
3379 constexpr size_t kSmallerTransferSize = 16;
3380 FakeNonSeekableReader reader(kData64);
3381 Status transfer_status = Status::Unknown();
3382
3383 Result<Client::Handle> result = client_.Write(
3384 92,
3385 reader,
3386 [&transfer_status](Status status) { transfer_status = status; },
3387 kTestTimeout);
3388 ASSERT_EQ(OkStatus(), result.status());
3389 transfer_thread_.WaitUntilEventIsProcessed();
3390
3391 Client::Handle handle = *result;
3392 handle.SetTransferSize(kSmallerTransferSize);
3393 transfer_thread_.WaitUntilEventIsProcessed();
3394
3395 // Initial chunk of the transfer is sent. This chunk should contain all the
3396 // fields from both legacy and version 2 protocols for backwards
3397 // compatibility.
3398 rpc::PayloadsView payloads =
3399 context_.output().payloads<Transfer::Write>(context_.channel().id());
3400 ASSERT_EQ(payloads.size(), 1u);
3401 EXPECT_EQ(transfer_status, Status::Unknown());
3402
3403 Chunk chunk = DecodeChunk(payloads[0]);
3404 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3405 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3406 EXPECT_EQ(chunk.desired_session_id(), 1u);
3407 EXPECT_EQ(chunk.resource_id(), 92u);
3408 EXPECT_EQ(chunk.offset(), 0u);
3409
3410 // The server responds with a START_ACK, continuing the version 2 handshake.
3411 context_.server().SendServerStream<Transfer::Write>(
3412 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3413 .set_session_id(1)
3414 .set_resource_id(92)));
3415 transfer_thread_.WaitUntilEventIsProcessed();
3416
3417 ASSERT_EQ(payloads.size(), 2u);
3418
3419 // Client should accept the session_id with a START_ACK_CONFIRMATION.
3420 chunk = DecodeChunk(payloads.back());
3421 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
3422 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3423 EXPECT_EQ(chunk.session_id(), 1u);
3424 EXPECT_FALSE(chunk.resource_id().has_value());
3425
3426 // The server can then begin the data transfer by sending its transfer
3427 // parameters. Client should respond with data chunks.
3428 rpc::test::WaitForPackets(context_.output(), 2, [this] {
3429 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
3430 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
3431 .set_session_id(1)
3432 .set_offset(0)
3433 .set_window_end_offset(64)
3434 .set_max_chunk_size_bytes(8)));
3435 });
3436
3437 ASSERT_EQ(payloads.size(), 4u);
3438
3439 // Each 8-byte chunk of the transfer should have an appropriate
3440 // `remaining_bytes` value set.
3441 chunk = DecodeChunk(payloads[2]);
3442 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3443 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3444 EXPECT_EQ(chunk.session_id(), 1u);
3445 EXPECT_EQ(chunk.offset(), 0u);
3446 ASSERT_TRUE(chunk.remaining_bytes().has_value());
3447 EXPECT_EQ(chunk.remaining_bytes().value(), 8u);
3448
3449 chunk = DecodeChunk(payloads[3]);
3450 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
3451 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3452 EXPECT_EQ(chunk.session_id(), 1u);
3453 EXPECT_EQ(chunk.offset(), 8u);
3454 ASSERT_TRUE(chunk.remaining_bytes().has_value());
3455 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
3456
3457 EXPECT_EQ(transfer_status, Status::Unknown());
3458
3459 // Send the final status chunk to complete the transfer.
3460 context_.server().SendServerStream<Transfer::Write>(
3461 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
3462 transfer_thread_.WaitUntilEventIsProcessed();
3463
3464 // Client should acknowledge the completion of the transfer.
3465 ASSERT_EQ(payloads.size(), 5u);
3466
3467 chunk = DecodeChunk(payloads[4]);
3468 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
3469 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3470 EXPECT_EQ(chunk.session_id(), 1u);
3471
3472 EXPECT_EQ(transfer_status, OkStatus());
3473
3474 // Ensure we don't leave a dangling reference to transfer_status.
3475 handle.Cancel();
3476 transfer_thread_.WaitUntilEventIsProcessed();
3477 }
3478
TEST_F(ReadTransfer,Version2_CancelBeforeServerResponse)3479 TEST_F(ReadTransfer, Version2_CancelBeforeServerResponse) {
3480 stream::MemoryWriterBuffer<64> writer;
3481 Status transfer_status = Status::Unknown();
3482
3483 Result<Client::Handle> transfer = client_.Read(
3484 3,
3485 writer,
3486 [&transfer_status](Status status) { transfer_status = status; },
3487 cfg::kDefaultClientTimeout,
3488 cfg::kDefaultClientTimeout);
3489 ASSERT_EQ(transfer.status(), OkStatus());
3490
3491 transfer_thread_.WaitUntilEventIsProcessed();
3492
3493 // Initial chunk of the transfer is sent. This chunk should contain all the
3494 // fields from both legacy and version 2 protocols for backwards
3495 // compatibility.
3496 rpc::PayloadsView payloads =
3497 context_.output().payloads<Transfer::Read>(context_.channel().id());
3498 ASSERT_EQ(payloads.size(), 1u);
3499 EXPECT_EQ(transfer_status, Status::Unknown());
3500
3501 Chunk chunk = DecodeChunk(payloads[0]);
3502 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
3503 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3504 EXPECT_EQ(chunk.desired_session_id(), 1u);
3505 EXPECT_EQ(chunk.resource_id(), 3u);
3506 EXPECT_EQ(chunk.offset(), 0u);
3507 EXPECT_EQ(chunk.window_end_offset(), 37u);
3508 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
3509
3510 // Cancel the transfer before the server responds. Since no contact was made,
3511 // no cancellation chunk should be sent.
3512 transfer->Cancel();
3513 transfer_thread_.WaitUntilEventIsProcessed();
3514
3515 ASSERT_EQ(payloads.size(), 1u);
3516
3517 // The server responds after the cancellation. The client should notify it
3518 // that the transfer is no longer active.
3519 context_.server().SendServerStream<Transfer::Read>(
3520 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
3521 .set_session_id(1)
3522 .set_resource_id(3)));
3523 transfer_thread_.WaitUntilEventIsProcessed();
3524
3525 chunk = DecodeChunk(payloads.back());
3526 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
3527 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3528 EXPECT_EQ(chunk.status(), Status::Cancelled());
3529 }
3530
3531 } // namespace
3532 } // namespace pw::transfer::test
3533