1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "TRN"
16 #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17
18 #include "pw_transfer/client.h"
19
20 #include "pw_log/log.h"
21
22 namespace pw::transfer {
23
Read(uint32_t resource_id,stream::Writer & output,CompletionFunc && on_completion,ProtocolVersion protocol_version,chrono::SystemClock::duration timeout,chrono::SystemClock::duration initial_chunk_timeout,uint32_t initial_offset)24 Result<Client::Handle> Client::Read(
25 uint32_t resource_id,
26 stream::Writer& output,
27 CompletionFunc&& on_completion,
28 ProtocolVersion protocol_version,
29 chrono::SystemClock::duration timeout,
30 chrono::SystemClock::duration initial_chunk_timeout,
31 uint32_t initial_offset) {
32 if (on_completion == nullptr ||
33 protocol_version == ProtocolVersion::kUnknown) {
34 return Status::InvalidArgument();
35 }
36
37 if (protocol_version < ProtocolVersion::kVersionTwo && initial_offset != 0) {
38 return Status::InvalidArgument();
39 }
40
41 if (!has_read_stream_) {
42 rpc::RawClientReaderWriter read_stream =
43 client_.Read(nullptr, // on_next will be set by the transfer_thread.
44 [this](Status status) {
45 OnRpcError(status, internal::TransferType::kReceive);
46 });
47 transfer_thread_.SetClientReadStream(
48 read_stream, [this](ConstByteSpan chunk) {
49 transfer_thread_.ProcessClientChunk(chunk);
50 });
51 has_read_stream_ = true;
52 }
53
54 Handle handle = AssignHandle();
55
56 transfer_thread_.StartClientTransfer(internal::TransferType::kReceive,
57 protocol_version,
58 resource_id,
59 handle.id(),
60 &output,
61 max_parameters_,
62 std::move(on_completion),
63 timeout,
64 initial_chunk_timeout,
65 max_retries_,
66 max_lifetime_retries_,
67 initial_offset);
68 return handle;
69 }
70
Write(uint32_t resource_id,stream::Reader & input,CompletionFunc && on_completion,ProtocolVersion protocol_version,chrono::SystemClock::duration timeout,chrono::SystemClock::duration initial_chunk_timeout,uint32_t initial_offset)71 Result<Client::Handle> Client::Write(
72 uint32_t resource_id,
73 stream::Reader& input,
74 CompletionFunc&& on_completion,
75 ProtocolVersion protocol_version,
76 chrono::SystemClock::duration timeout,
77 chrono::SystemClock::duration initial_chunk_timeout,
78 uint32_t initial_offset) {
79 if (on_completion == nullptr ||
80 protocol_version == ProtocolVersion::kUnknown) {
81 return Status::InvalidArgument();
82 }
83
84 if (protocol_version < ProtocolVersion::kVersionTwo && initial_offset != 0) {
85 return Status::InvalidArgument();
86 }
87
88 if (!has_write_stream_) {
89 rpc::RawClientReaderWriter write_stream =
90 client_.Write(nullptr, // on_next will be set by the transfer thread.
91 [this](Status status) {
92 OnRpcError(status, internal::TransferType::kTransmit);
93 });
94 transfer_thread_.SetClientWriteStream(
95 write_stream, [this](ConstByteSpan chunk) {
96 transfer_thread_.ProcessClientChunk(chunk);
97 });
98 has_write_stream_ = true;
99 }
100
101 Handle handle = AssignHandle();
102
103 transfer_thread_.StartClientTransfer(internal::TransferType::kTransmit,
104 protocol_version,
105 resource_id,
106 handle.id(),
107 &input,
108 max_parameters_,
109 std::move(on_completion),
110 timeout,
111 initial_chunk_timeout,
112 max_retries_,
113 max_lifetime_retries_,
114 initial_offset);
115
116 return handle;
117 }
118
AssignHandle()119 Client::Handle Client::AssignHandle() {
120 uint32_t handle_id = next_handle_id_++;
121 if (handle_id == Handle::kUnassignedHandleId) {
122 handle_id = next_handle_id_++;
123 }
124
125 return Handle(this, handle_id);
126 }
127
OnRpcError(Status status,internal::TransferType type)128 void Client::OnRpcError(Status status, internal::TransferType type) {
129 bool is_write_error = type == internal::TransferType::kTransmit;
130
131 PW_LOG_ERROR("Client %s stream terminated with status %d",
132 is_write_error ? "Write()" : "Read()",
133 status.code());
134
135 if (is_write_error) {
136 has_write_stream_ = false;
137 } else {
138 has_read_stream_ = false;
139 }
140 }
141
142 } // namespace pw::transfer
143