xref: /aosp_15_r20/external/tink/testing/python/jwt_service.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1*e7b1675dSTing-Kang Chang# Copyright 2021 Google LLC
2*e7b1675dSTing-Kang Chang#
3*e7b1675dSTing-Kang Chang# Licensed under the Apache License, Version 2.0 (the "License");
4*e7b1675dSTing-Kang Chang# you may not use this file except in compliance with the License.
5*e7b1675dSTing-Kang Chang# You may obtain a copy of the License at
6*e7b1675dSTing-Kang Chang#
7*e7b1675dSTing-Kang Chang#      http://www.apache.org/licenses/LICENSE-2.0
8*e7b1675dSTing-Kang Chang#
9*e7b1675dSTing-Kang Chang# Unless required by applicable law or agreed to in writing, software
10*e7b1675dSTing-Kang Chang# distributed under the License is distributed on an "AS-IS" BASIS,
11*e7b1675dSTing-Kang Chang# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*e7b1675dSTing-Kang Chang# See the License for the specific language governing permissions and
13*e7b1675dSTing-Kang Chang# limitations under the License.
14*e7b1675dSTing-Kang Chang"""JWT testing service API implementations in Python."""
15*e7b1675dSTing-Kang Chang
16*e7b1675dSTing-Kang Changimport datetime
17*e7b1675dSTing-Kang Changimport io
18*e7b1675dSTing-Kang Changimport json
19*e7b1675dSTing-Kang Chang
20*e7b1675dSTing-Kang Changfrom typing import Tuple
21*e7b1675dSTing-Kang Chang
22*e7b1675dSTing-Kang Changimport grpc
23*e7b1675dSTing-Kang Changimport tink
24*e7b1675dSTing-Kang Changfrom tink import cleartext_keyset_handle
25*e7b1675dSTing-Kang Chang
26*e7b1675dSTing-Kang Changfrom tink import jwt
27*e7b1675dSTing-Kang Chang
28*e7b1675dSTing-Kang Changfrom google.protobuf import duration_pb2
29*e7b1675dSTing-Kang Changfrom google.protobuf import timestamp_pb2
30*e7b1675dSTing-Kang Chang
31*e7b1675dSTing-Kang Changfrom protos import testing_api_pb2
32*e7b1675dSTing-Kang Changfrom protos import testing_api_pb2_grpc
33*e7b1675dSTing-Kang Chang
34*e7b1675dSTing-Kang Chang
35*e7b1675dSTing-Kang Changdef _to_timestamp_tuple(t: datetime.datetime) -> Tuple[int, int]:
36*e7b1675dSTing-Kang Chang  if not t.tzinfo:
37*e7b1675dSTing-Kang Chang    raise ValueError('datetime must have tzinfo')
38*e7b1675dSTing-Kang Chang  seconds = int(t.timestamp())
39*e7b1675dSTing-Kang Chang  nanos = int((t.timestamp() - seconds) * 1e9)
40*e7b1675dSTing-Kang Chang  return (seconds, nanos)
41*e7b1675dSTing-Kang Chang
42*e7b1675dSTing-Kang Chang
43*e7b1675dSTing-Kang Changdef _from_timestamp_proto(
44*e7b1675dSTing-Kang Chang    timestamp: timestamp_pb2.Timestamp) -> datetime.datetime:
45*e7b1675dSTing-Kang Chang  t = timestamp.seconds + (timestamp.nanos / 1e9)
46*e7b1675dSTing-Kang Chang  return datetime.datetime.fromtimestamp(t, datetime.timezone.utc)
47*e7b1675dSTing-Kang Chang
48*e7b1675dSTing-Kang Chang
49*e7b1675dSTing-Kang Changdef _from_duration_proto(
50*e7b1675dSTing-Kang Chang    duration: duration_pb2.Duration) -> datetime.timedelta:
51*e7b1675dSTing-Kang Chang  return datetime.timedelta(seconds=duration.seconds)
52*e7b1675dSTing-Kang Chang
53*e7b1675dSTing-Kang Chang
54*e7b1675dSTing-Kang Changdef raw_jwt_from_proto(proto_raw_jwt: testing_api_pb2.JwtToken) -> jwt.RawJwt:
55*e7b1675dSTing-Kang Chang  """Converts a proto JwtToken into a jwt.RawJwt."""
56*e7b1675dSTing-Kang Chang  type_header = None
57*e7b1675dSTing-Kang Chang  if proto_raw_jwt.HasField('type_header'):
58*e7b1675dSTing-Kang Chang    type_header = proto_raw_jwt.type_header.value
59*e7b1675dSTing-Kang Chang  issuer = None
60*e7b1675dSTing-Kang Chang  if proto_raw_jwt.HasField('issuer'):
61*e7b1675dSTing-Kang Chang    issuer = proto_raw_jwt.issuer.value
62*e7b1675dSTing-Kang Chang  subject = None
63*e7b1675dSTing-Kang Chang  if proto_raw_jwt.HasField('subject'):
64*e7b1675dSTing-Kang Chang    subject = proto_raw_jwt.subject.value
65*e7b1675dSTing-Kang Chang  audiences = list(proto_raw_jwt.audiences)
66*e7b1675dSTing-Kang Chang  if not audiences:
67*e7b1675dSTing-Kang Chang    audiences = None
68*e7b1675dSTing-Kang Chang  jwt_id = None
69*e7b1675dSTing-Kang Chang  if proto_raw_jwt.HasField('jwt_id'):
70*e7b1675dSTing-Kang Chang    jwt_id = proto_raw_jwt.jwt_id.value
71*e7b1675dSTing-Kang Chang  custom_claims = {}
72*e7b1675dSTing-Kang Chang  for name, claim in proto_raw_jwt.custom_claims.items():
73*e7b1675dSTing-Kang Chang    if claim.HasField('null_value'):
74*e7b1675dSTing-Kang Chang      custom_claims[name] = None
75*e7b1675dSTing-Kang Chang    elif claim.HasField('number_value'):
76*e7b1675dSTing-Kang Chang      custom_claims[name] = claim.number_value
77*e7b1675dSTing-Kang Chang    elif claim.HasField('string_value'):
78*e7b1675dSTing-Kang Chang      custom_claims[name] = claim.string_value
79*e7b1675dSTing-Kang Chang    elif claim.HasField('bool_value'):
80*e7b1675dSTing-Kang Chang      custom_claims[name] = claim.bool_value
81*e7b1675dSTing-Kang Chang    elif claim.HasField('json_object_value'):
82*e7b1675dSTing-Kang Chang      custom_claims[name] = json.loads(claim.json_object_value)
83*e7b1675dSTing-Kang Chang    elif claim.HasField('json_array_value'):
84*e7b1675dSTing-Kang Chang      custom_claims[name] = json.loads(claim.json_array_value)
85*e7b1675dSTing-Kang Chang    else:
86*e7b1675dSTing-Kang Chang      raise ValueError('claim %s has unknown type' % name)
87*e7b1675dSTing-Kang Chang  expiration = None
88*e7b1675dSTing-Kang Chang  if proto_raw_jwt.HasField('expiration'):
89*e7b1675dSTing-Kang Chang    expiration = _from_timestamp_proto(proto_raw_jwt.expiration)
90*e7b1675dSTing-Kang Chang  not_before = None
91*e7b1675dSTing-Kang Chang  if proto_raw_jwt.HasField('not_before'):
92*e7b1675dSTing-Kang Chang    not_before = _from_timestamp_proto(proto_raw_jwt.not_before)
93*e7b1675dSTing-Kang Chang  issued_at = None
94*e7b1675dSTing-Kang Chang  if proto_raw_jwt.HasField('issued_at'):
95*e7b1675dSTing-Kang Chang    issued_at = _from_timestamp_proto(proto_raw_jwt.issued_at)
96*e7b1675dSTing-Kang Chang  without_expiration = not expiration
97*e7b1675dSTing-Kang Chang  return jwt.new_raw_jwt(
98*e7b1675dSTing-Kang Chang      type_header=type_header,
99*e7b1675dSTing-Kang Chang      issuer=issuer,
100*e7b1675dSTing-Kang Chang      subject=subject,
101*e7b1675dSTing-Kang Chang      audiences=audiences,
102*e7b1675dSTing-Kang Chang      jwt_id=jwt_id,
103*e7b1675dSTing-Kang Chang      expiration=expiration,
104*e7b1675dSTing-Kang Chang      without_expiration=without_expiration,
105*e7b1675dSTing-Kang Chang      not_before=not_before,
106*e7b1675dSTing-Kang Chang      issued_at=issued_at,
107*e7b1675dSTing-Kang Chang      custom_claims=custom_claims)
108*e7b1675dSTing-Kang Chang
109*e7b1675dSTing-Kang Chang
110*e7b1675dSTing-Kang Changdef verifiedjwt_to_proto(
111*e7b1675dSTing-Kang Chang    verified_jwt: jwt.VerifiedJwt) -> testing_api_pb2.JwtToken:
112*e7b1675dSTing-Kang Chang  """Converts a jwt.VerifiedJwt into a proto JwtToken."""
113*e7b1675dSTing-Kang Chang  token = testing_api_pb2.JwtToken()
114*e7b1675dSTing-Kang Chang  if verified_jwt.has_type_header():
115*e7b1675dSTing-Kang Chang    token.type_header.value = verified_jwt.type_header()
116*e7b1675dSTing-Kang Chang  if verified_jwt.has_issuer():
117*e7b1675dSTing-Kang Chang    token.issuer.value = verified_jwt.issuer()
118*e7b1675dSTing-Kang Chang  if verified_jwt.has_subject():
119*e7b1675dSTing-Kang Chang    token.subject.value = verified_jwt.subject()
120*e7b1675dSTing-Kang Chang  if verified_jwt.has_audiences():
121*e7b1675dSTing-Kang Chang    token.audiences.extend(verified_jwt.audiences())
122*e7b1675dSTing-Kang Chang  if verified_jwt.has_jwt_id():
123*e7b1675dSTing-Kang Chang    token.jwt_id.value = verified_jwt.jwt_id()
124*e7b1675dSTing-Kang Chang  if verified_jwt.has_expiration():
125*e7b1675dSTing-Kang Chang    seconds, nanos = _to_timestamp_tuple(verified_jwt.expiration())
126*e7b1675dSTing-Kang Chang    token.expiration.seconds = seconds
127*e7b1675dSTing-Kang Chang    token.expiration.nanos = nanos
128*e7b1675dSTing-Kang Chang  if verified_jwt.has_not_before():
129*e7b1675dSTing-Kang Chang    seconds, nanos = _to_timestamp_tuple(verified_jwt.not_before())
130*e7b1675dSTing-Kang Chang    token.not_before.seconds = seconds
131*e7b1675dSTing-Kang Chang    token.not_before.nanos = nanos
132*e7b1675dSTing-Kang Chang  if verified_jwt.has_issued_at():
133*e7b1675dSTing-Kang Chang    seconds, nanos = _to_timestamp_tuple(verified_jwt.issued_at())
134*e7b1675dSTing-Kang Chang    token.issued_at.seconds = seconds
135*e7b1675dSTing-Kang Chang    token.issued_at.nanos = nanos
136*e7b1675dSTing-Kang Chang  for name in verified_jwt.custom_claim_names():
137*e7b1675dSTing-Kang Chang    value = verified_jwt.custom_claim(name)
138*e7b1675dSTing-Kang Chang    if value is None:
139*e7b1675dSTing-Kang Chang      token.custom_claims[name].null_value = testing_api_pb2.NULL_VALUE
140*e7b1675dSTing-Kang Chang    elif isinstance(value, bool):
141*e7b1675dSTing-Kang Chang      token.custom_claims[name].bool_value = value
142*e7b1675dSTing-Kang Chang    elif isinstance(value, (int, float)):
143*e7b1675dSTing-Kang Chang      token.custom_claims[name].number_value = value
144*e7b1675dSTing-Kang Chang    elif isinstance(value, str):
145*e7b1675dSTing-Kang Chang      token.custom_claims[name].string_value = value
146*e7b1675dSTing-Kang Chang    elif isinstance(value, dict):
147*e7b1675dSTing-Kang Chang      token.custom_claims[name].json_object_value = json.dumps(value)
148*e7b1675dSTing-Kang Chang    elif isinstance(value, list):
149*e7b1675dSTing-Kang Chang      token.custom_claims[name].json_array_value = json.dumps(value)
150*e7b1675dSTing-Kang Chang    else:
151*e7b1675dSTing-Kang Chang      raise ValueError('claim %s has unknown type' % name)
152*e7b1675dSTing-Kang Chang  return token
153*e7b1675dSTing-Kang Chang
154*e7b1675dSTing-Kang Chang
155*e7b1675dSTing-Kang Changdef validator_from_proto(
156*e7b1675dSTing-Kang Chang    proto_validator: testing_api_pb2.JwtValidator) -> jwt.JwtValidator:
157*e7b1675dSTing-Kang Chang  """Converts a proto JwtValidator into a JwtValidator."""
158*e7b1675dSTing-Kang Chang  expected_type_header = None
159*e7b1675dSTing-Kang Chang  if proto_validator.HasField('expected_type_header'):
160*e7b1675dSTing-Kang Chang    expected_type_header = proto_validator.expected_type_header.value
161*e7b1675dSTing-Kang Chang  expected_issuer = None
162*e7b1675dSTing-Kang Chang  if proto_validator.HasField('expected_issuer'):
163*e7b1675dSTing-Kang Chang    expected_issuer = proto_validator.expected_issuer.value
164*e7b1675dSTing-Kang Chang  expected_audience = None
165*e7b1675dSTing-Kang Chang  if proto_validator.HasField('expected_audience'):
166*e7b1675dSTing-Kang Chang    expected_audience = proto_validator.expected_audience.value
167*e7b1675dSTing-Kang Chang  fixed_now = None
168*e7b1675dSTing-Kang Chang  if proto_validator.HasField('now'):
169*e7b1675dSTing-Kang Chang    fixed_now = _from_timestamp_proto(proto_validator.now)
170*e7b1675dSTing-Kang Chang  clock_skew = None
171*e7b1675dSTing-Kang Chang  if proto_validator.HasField('clock_skew'):
172*e7b1675dSTing-Kang Chang    clock_skew = _from_duration_proto(proto_validator.clock_skew)
173*e7b1675dSTing-Kang Chang  return jwt.new_validator(
174*e7b1675dSTing-Kang Chang      expected_type_header=expected_type_header,
175*e7b1675dSTing-Kang Chang      expected_issuer=expected_issuer,
176*e7b1675dSTing-Kang Chang      expected_audience=expected_audience,
177*e7b1675dSTing-Kang Chang      ignore_type_header=proto_validator.ignore_type_header,
178*e7b1675dSTing-Kang Chang      ignore_issuer=proto_validator.ignore_issuer,
179*e7b1675dSTing-Kang Chang      ignore_audiences=proto_validator.ignore_audience,
180*e7b1675dSTing-Kang Chang      allow_missing_expiration=proto_validator.allow_missing_expiration,
181*e7b1675dSTing-Kang Chang      expect_issued_in_the_past=proto_validator.expect_issued_in_the_past,
182*e7b1675dSTing-Kang Chang      fixed_now=fixed_now,
183*e7b1675dSTing-Kang Chang      clock_skew=clock_skew)
184*e7b1675dSTing-Kang Chang
185*e7b1675dSTing-Kang Chang
186*e7b1675dSTing-Kang Changclass JwtServicer(testing_api_pb2_grpc.JwtServicer):
187*e7b1675dSTing-Kang Chang  """A service for signing and verifying JWTs."""
188*e7b1675dSTing-Kang Chang
189*e7b1675dSTing-Kang Chang  def CreateJwtMac(
190*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.CreationRequest,
191*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
192*e7b1675dSTing-Kang Chang    """Creates a JwtMac without using it."""
193*e7b1675dSTing-Kang Chang    try:
194*e7b1675dSTing-Kang Chang      keyset_handle = cleartext_keyset_handle.read(
195*e7b1675dSTing-Kang Chang          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
196*e7b1675dSTing-Kang Chang      keyset_handle.primitive(jwt.JwtMac)
197*e7b1675dSTing-Kang Chang      return testing_api_pb2.CreationResponse()
198*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
199*e7b1675dSTing-Kang Chang      return testing_api_pb2.CreationResponse(err=str(e))
200*e7b1675dSTing-Kang Chang
201*e7b1675dSTing-Kang Chang  def CreateJwtPublicKeySign(
202*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.CreationRequest,
203*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
204*e7b1675dSTing-Kang Chang    """Creates a JwtPublicKeySign without using it."""
205*e7b1675dSTing-Kang Chang    try:
206*e7b1675dSTing-Kang Chang      keyset_handle = cleartext_keyset_handle.read(
207*e7b1675dSTing-Kang Chang          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
208*e7b1675dSTing-Kang Chang      keyset_handle.primitive(jwt.JwtPublicKeySign)
209*e7b1675dSTing-Kang Chang      return testing_api_pb2.CreationResponse()
210*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
211*e7b1675dSTing-Kang Chang      return testing_api_pb2.CreationResponse(err=str(e))
212*e7b1675dSTing-Kang Chang
213*e7b1675dSTing-Kang Chang  def CreateJwtPublicKeyVerify(
214*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.CreationRequest,
215*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
216*e7b1675dSTing-Kang Chang    """Creates a JwtPublicKeyVerify without using it."""
217*e7b1675dSTing-Kang Chang    try:
218*e7b1675dSTing-Kang Chang      keyset_handle = cleartext_keyset_handle.read(
219*e7b1675dSTing-Kang Chang          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
220*e7b1675dSTing-Kang Chang      keyset_handle.primitive(jwt.JwtPublicKeyVerify)
221*e7b1675dSTing-Kang Chang      return testing_api_pb2.CreationResponse()
222*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
223*e7b1675dSTing-Kang Chang      return testing_api_pb2.CreationResponse(err=str(e))
224*e7b1675dSTing-Kang Chang
225*e7b1675dSTing-Kang Chang  def ComputeMacAndEncode(
226*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.JwtSignRequest,
227*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.JwtSignResponse:
228*e7b1675dSTing-Kang Chang    """Computes a MACed compact JWT."""
229*e7b1675dSTing-Kang Chang    try:
230*e7b1675dSTing-Kang Chang      keyset_handle = cleartext_keyset_handle.read(
231*e7b1675dSTing-Kang Chang          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
232*e7b1675dSTing-Kang Chang      p = keyset_handle.primitive(jwt.JwtMac)
233*e7b1675dSTing-Kang Chang      raw_jwt = raw_jwt_from_proto(request.raw_jwt)
234*e7b1675dSTing-Kang Chang      signed_compact_jwt = p.compute_mac_and_encode(raw_jwt)
235*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtSignResponse(
236*e7b1675dSTing-Kang Chang          signed_compact_jwt=signed_compact_jwt)
237*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
238*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtSignResponse(err=str(e))
239*e7b1675dSTing-Kang Chang
240*e7b1675dSTing-Kang Chang  def VerifyMacAndDecode(
241*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.JwtVerifyRequest,
242*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.JwtVerifyResponse:
243*e7b1675dSTing-Kang Chang    """Verifies a MAC value."""
244*e7b1675dSTing-Kang Chang    try:
245*e7b1675dSTing-Kang Chang      keyset_handle = cleartext_keyset_handle.read(
246*e7b1675dSTing-Kang Chang          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
247*e7b1675dSTing-Kang Chang      validator = validator_from_proto(request.validator)
248*e7b1675dSTing-Kang Chang      p = keyset_handle.primitive(jwt.JwtMac)
249*e7b1675dSTing-Kang Chang      verified_jwt = p.verify_mac_and_decode(request.signed_compact_jwt,
250*e7b1675dSTing-Kang Chang                                             validator)
251*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtVerifyResponse(
252*e7b1675dSTing-Kang Chang          verified_jwt=verifiedjwt_to_proto(verified_jwt))
253*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
254*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtVerifyResponse(err=str(e))
255*e7b1675dSTing-Kang Chang
256*e7b1675dSTing-Kang Chang  def PublicKeySignAndEncode(
257*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.JwtSignRequest,
258*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.JwtSignResponse:
259*e7b1675dSTing-Kang Chang    """Computes a signed compact JWT token."""
260*e7b1675dSTing-Kang Chang    try:
261*e7b1675dSTing-Kang Chang      keyset_handle = cleartext_keyset_handle.read(
262*e7b1675dSTing-Kang Chang          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
263*e7b1675dSTing-Kang Chang      p = keyset_handle.primitive(jwt.JwtPublicKeySign)
264*e7b1675dSTing-Kang Chang      raw_jwt = raw_jwt_from_proto(request.raw_jwt)
265*e7b1675dSTing-Kang Chang      signed_compact_jwt = p.sign_and_encode(raw_jwt)
266*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtSignResponse(
267*e7b1675dSTing-Kang Chang          signed_compact_jwt=signed_compact_jwt)
268*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
269*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtSignResponse(err=str(e))
270*e7b1675dSTing-Kang Chang
271*e7b1675dSTing-Kang Chang  def PublicKeyVerifyAndDecode(
272*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.JwtVerifyRequest,
273*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.JwtVerifyResponse:
274*e7b1675dSTing-Kang Chang    """Verifies the validity of the signed compact JWT token."""
275*e7b1675dSTing-Kang Chang    try:
276*e7b1675dSTing-Kang Chang      keyset_handle = cleartext_keyset_handle.read(
277*e7b1675dSTing-Kang Chang          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
278*e7b1675dSTing-Kang Chang      validator = validator_from_proto(request.validator)
279*e7b1675dSTing-Kang Chang      p = keyset_handle.primitive(jwt.JwtPublicKeyVerify)
280*e7b1675dSTing-Kang Chang      verified_jwt = p.verify_and_decode(request.signed_compact_jwt, validator)
281*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtVerifyResponse(
282*e7b1675dSTing-Kang Chang          verified_jwt=verifiedjwt_to_proto(verified_jwt))
283*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
284*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtVerifyResponse(err=str(e))
285*e7b1675dSTing-Kang Chang
286*e7b1675dSTing-Kang Chang  def ToJwkSet(
287*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.JwtToJwkSetRequest,
288*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.JwtToJwkSetResponse:
289*e7b1675dSTing-Kang Chang    """Converts a Tink Keyset with JWT keys into a JWK set."""
290*e7b1675dSTing-Kang Chang    try:
291*e7b1675dSTing-Kang Chang      keyset_handle = cleartext_keyset_handle.read(
292*e7b1675dSTing-Kang Chang          tink.BinaryKeysetReader(request.keyset))
293*e7b1675dSTing-Kang Chang      jwk_set = jwt.jwk_set_from_public_keyset_handle(keyset_handle)
294*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtToJwkSetResponse(jwk_set=jwk_set)
295*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
296*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtToJwkSetResponse(err=str(e))
297*e7b1675dSTing-Kang Chang
298*e7b1675dSTing-Kang Chang  def FromJwkSet(
299*e7b1675dSTing-Kang Chang      self, request: testing_api_pb2.JwtFromJwkSetRequest,
300*e7b1675dSTing-Kang Chang      context: grpc.ServicerContext) -> testing_api_pb2.JwtFromJwkSetResponse:
301*e7b1675dSTing-Kang Chang    """Converts a JWK set into a Tink Keyset."""
302*e7b1675dSTing-Kang Chang    try:
303*e7b1675dSTing-Kang Chang      keyset_handle = jwt.jwk_set_to_public_keyset_handle(request.jwk_set)
304*e7b1675dSTing-Kang Chang      keyset = io.BytesIO()
305*e7b1675dSTing-Kang Chang      cleartext_keyset_handle.write(
306*e7b1675dSTing-Kang Chang          tink.BinaryKeysetWriter(keyset), keyset_handle)
307*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtFromJwkSetResponse(keyset=keyset.getvalue())
308*e7b1675dSTing-Kang Chang    except tink.TinkError as e:
309*e7b1675dSTing-Kang Chang      return testing_api_pb2.JwtFromJwkSetResponse(err=str(e))
310