xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16cdef class Operation:
17
18  cdef void c(self) except *:
19    raise NotImplementedError()
20
21  cdef void un_c(self) except *:
22    raise NotImplementedError()
23
24
25cdef class SendInitialMetadataOperation(Operation):
26
27  def __cinit__(self, initial_metadata, flags):
28    self._initial_metadata = initial_metadata
29    self._flags = flags
30
31  def type(self):
32    return GRPC_OP_SEND_INITIAL_METADATA
33
34  cdef void c(self) except *:
35    self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
36    self.c_op.flags = self._flags
37    _store_c_metadata(
38        self._initial_metadata, &self._c_initial_metadata,
39        &self._c_initial_metadata_count)
40    self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata
41    self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count
42    self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0
43
44  cdef void un_c(self) except *:
45    _release_c_metadata(
46        self._c_initial_metadata, self._c_initial_metadata_count)
47
48
49cdef class SendMessageOperation(Operation):
50
51  def __cinit__(self, bytes message, int flags):
52    if message is None:
53      self._message = b''
54    else:
55      self._message = message
56    self._flags = flags
57
58  def type(self):
59    return GRPC_OP_SEND_MESSAGE
60
61  cdef void c(self) except *:
62    self.c_op.type = GRPC_OP_SEND_MESSAGE
63    self.c_op.flags = self._flags
64    cdef grpc_slice message_slice = grpc_slice_from_copied_buffer(
65        self._message, len(self._message))
66    self._c_message_byte_buffer = grpc_raw_byte_buffer_create(
67        &message_slice, 1)
68    grpc_slice_unref(message_slice)
69    self.c_op.data.send_message.send_message = self._c_message_byte_buffer
70
71  cdef void un_c(self) except *:
72    grpc_byte_buffer_destroy(self._c_message_byte_buffer)
73
74
75cdef class SendCloseFromClientOperation(Operation):
76
77  def __cinit__(self, int flags):
78    self._flags = flags
79
80  def type(self):
81    return GRPC_OP_SEND_CLOSE_FROM_CLIENT
82
83  cdef void c(self) except *:
84    self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
85    self.c_op.flags = self._flags
86
87  cdef void un_c(self) except *:
88    pass
89
90
91cdef class SendStatusFromServerOperation(Operation):
92
93  def __cinit__(self, trailing_metadata, code, object details, int flags):
94    self._trailing_metadata = trailing_metadata
95    self._code = code
96    self._details = details
97    self._flags = flags
98
99  def type(self):
100    return GRPC_OP_SEND_STATUS_FROM_SERVER
101
102  cdef void c(self) except *:
103    self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
104    self.c_op.flags = self._flags
105    _store_c_metadata(
106        self._trailing_metadata, &self._c_trailing_metadata,
107        &self._c_trailing_metadata_count)
108    self.c_op.data.send_status_from_server.trailing_metadata = (
109        self._c_trailing_metadata)
110    self.c_op.data.send_status_from_server.trailing_metadata_count = (
111        self._c_trailing_metadata_count)
112    self.c_op.data.send_status_from_server.status = self._code
113    self._c_details = _slice_from_bytes(_encode(self._details))
114    self.c_op.data.send_status_from_server.status_details = &self._c_details
115
116  cdef void un_c(self) except *:
117    grpc_slice_unref(self._c_details)
118    _release_c_metadata(
119        self._c_trailing_metadata, self._c_trailing_metadata_count)
120
121
122cdef class ReceiveInitialMetadataOperation(Operation):
123
124  def __cinit__(self, flags):
125    self._flags = flags
126
127  def type(self):
128    return GRPC_OP_RECV_INITIAL_METADATA
129
130  cdef void c(self) except *:
131    self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
132    self.c_op.flags = self._flags
133    grpc_metadata_array_init(&self._c_initial_metadata)
134    self.c_op.data.receive_initial_metadata.receive_initial_metadata = (
135        &self._c_initial_metadata)
136
137  cdef void un_c(self) except *:
138    self._initial_metadata = _metadata(&self._c_initial_metadata)
139    grpc_metadata_array_destroy(&self._c_initial_metadata)
140
141  def initial_metadata(self):
142    return self._initial_metadata
143
144
145cdef class ReceiveMessageOperation(Operation):
146
147  def __cinit__(self, flags):
148    self._flags = flags
149
150  def type(self):
151    return GRPC_OP_RECV_MESSAGE
152
153  cdef void c(self) except *:
154    self.c_op.type = GRPC_OP_RECV_MESSAGE
155    self.c_op.flags = self._flags
156    self.c_op.data.receive_message.receive_message = (
157        &self._c_message_byte_buffer)
158
159  cdef void un_c(self) except *:
160    cdef grpc_byte_buffer_reader message_reader
161    cdef bint message_reader_status
162    cdef grpc_slice message_slice
163    cdef size_t message_slice_length
164    cdef void *message_slice_pointer
165    if self._c_message_byte_buffer != NULL:
166      message_reader_status = grpc_byte_buffer_reader_init(
167          &message_reader, self._c_message_byte_buffer)
168      if message_reader_status:
169        message = bytearray()
170        while grpc_byte_buffer_reader_next(&message_reader, &message_slice):
171          message_slice_pointer = grpc_slice_start_ptr(message_slice)
172          message_slice_length = grpc_slice_length(message_slice)
173          message += (<char *>message_slice_pointer)[:message_slice_length]
174          grpc_slice_unref(message_slice)
175        grpc_byte_buffer_reader_destroy(&message_reader)
176        self._message = bytes(message)
177      else:
178        self._message = None
179      grpc_byte_buffer_destroy(self._c_message_byte_buffer)
180    else:
181      self._message = None
182
183  def message(self):
184    return self._message
185
186
187cdef class ReceiveStatusOnClientOperation(Operation):
188
189  def __cinit__(self, flags):
190    self._flags = flags
191
192  def type(self):
193    return GRPC_OP_RECV_STATUS_ON_CLIENT
194
195  cdef void c(self) except *:
196    self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
197    self.c_op.flags = self._flags
198    grpc_metadata_array_init(&self._c_trailing_metadata)
199    self.c_op.data.receive_status_on_client.trailing_metadata = (
200        &self._c_trailing_metadata)
201    self.c_op.data.receive_status_on_client.status = (
202        &self._c_code)
203    self.c_op.data.receive_status_on_client.status_details = (
204        &self._c_details)
205    self.c_op.data.receive_status_on_client.error_string = (
206        &self._c_error_string)
207
208  cdef void un_c(self) except *:
209    self._trailing_metadata = _metadata(&self._c_trailing_metadata)
210    grpc_metadata_array_destroy(&self._c_trailing_metadata)
211    self._code = self._c_code
212    self._details = _decode(_slice_bytes(self._c_details))
213    grpc_slice_unref(self._c_details)
214    if self._c_error_string != NULL:
215      self._error_string = _decode(self._c_error_string)
216      gpr_free(<void*>self._c_error_string)
217    else:
218      self._error_string = ""
219
220  def trailing_metadata(self):
221    return self._trailing_metadata
222
223  def code(self):
224    return self._code
225
226  def details(self):
227    return self._details
228
229  def error_string(self):
230    return self._error_string
231
232
233cdef class ReceiveCloseOnServerOperation(Operation):
234
235  def __cinit__(self, flags):
236    self._flags = flags
237
238  def type(self):
239    return GRPC_OP_RECV_CLOSE_ON_SERVER
240
241  cdef void c(self) except *:
242    self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
243    self.c_op.flags = self._flags
244    self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled
245
246  cdef void un_c(self) except *:
247    self._cancelled = bool(self._c_cancelled)
248
249  def cancelled(self):
250    return self._cancelled
251