xref: /aosp_15_r20/external/scapy/test/tls/travis_test_server.py (revision 7dc08ffc4802948ccbc861daaf1e81c405c2c4bd)
1*7dc08ffcSJunyu Lai#!/usr/bin/env python
2*7dc08ffcSJunyu Lai
3*7dc08ffcSJunyu Lai## This file is part of Scapy
4*7dc08ffcSJunyu Lai## This program is published under a GPLv2 license
5*7dc08ffcSJunyu Lai
6*7dc08ffcSJunyu Lai"""
7*7dc08ffcSJunyu LaiTLS server used in unit tests.
8*7dc08ffcSJunyu Lai
9*7dc08ffcSJunyu LaiWhen some expected_data is provided, a TLS client (e.g. openssl s_client)
10*7dc08ffcSJunyu Laishould send some application data after the handshake. If this data matches our
11*7dc08ffcSJunyu Laiexpected_data, then we leave with exit code 0. Else we leave with exit code 1.
12*7dc08ffcSJunyu LaiIf no expected_data was provided and the handshake was ok, we exit with 0.
13*7dc08ffcSJunyu Lai"""
14*7dc08ffcSJunyu Lai
15*7dc08ffcSJunyu Laifrom ast import literal_eval
16*7dc08ffcSJunyu Laiimport os
17*7dc08ffcSJunyu Laiimport sys
18*7dc08ffcSJunyu Laifrom contextlib import contextmanager
19*7dc08ffcSJunyu Laifrom io import BytesIO, StringIO
20*7dc08ffcSJunyu Lai
21*7dc08ffcSJunyu Laifrom scapy.modules import six
22*7dc08ffcSJunyu Lai
23*7dc08ffcSJunyu Laibasedir = os.path.abspath(os.path.join(os.path.dirname(__file__),
24*7dc08ffcSJunyu Lai                                       os.path.pardir, os.path.pardir))
25*7dc08ffcSJunyu Laisys.path = [basedir] + sys.path
26*7dc08ffcSJunyu Lai
27*7dc08ffcSJunyu Laifrom scapy.layers.tls.automaton_srv import TLSServerAutomaton
28*7dc08ffcSJunyu Lai
29*7dc08ffcSJunyu Lai
@contextmanager
def captured_output():
    """Temporarily redirect sys.stdout and sys.stderr to in-memory buffers.

    Yields a ``(stdout, stderr)`` pair of buffer objects (``StringIO`` when
    ``six.PY3`` is true, ``BytesIO`` otherwise). The previous streams are
    put back when the ``with`` body finishes, even if it raised.
    """
    # Text buffers on Python 3, byte buffers on Python 2.
    buffer_cls = StringIO if six.PY3 else BytesIO
    fake_stdout, fake_stderr = buffer_cls(), buffer_cls()
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout = fake_stdout
        sys.stderr = fake_stderr
        yield sys.stdout, sys.stderr
    finally:
        # Always restore the streams that were active on entry.
        sys.stdout, sys.stderr = saved_streams
39*7dc08ffcSJunyu Lai
def check_output_for_data(out, err, expected_data):
    """Decide whether the captured server output contains expected_data.

    :param out: buffer holding captured stdout (must provide ``getvalue()``).
    :param err: buffer holding captured stderr (must provide ``getvalue()``).
    :param expected_data: bytes line the client is expected to have sent;
        a falsy value means no application data needs to be checked.
    :return: a ``(ok, detail)`` tuple:
        ``(False, <stderr text>)`` if anything was written to ``err``;
        ``(True, None)`` if there is no ``expected_data`` to look for;
        ``(True, <stdout text>)`` if one line of a received payload equals
        ``expected_data``; ``(False, <stdout text>)`` otherwise.
    """
    errored = err.getvalue()
    if errored:
        return (False, errored)
    output = out.getvalue().strip()
    if not expected_data:
        return (True, None)
    for chunk in output.split('> Received: ')[1:]:
        # The payload is printed as a literal directly after the marker and
        # such a literal never spans several lines, so only the first line
        # is parsed. Feeding the whole chunk to literal_eval would raise as
        # soon as any other log line follows the payload.
        literal = chunk.split('\n', 1)[0]
        try:
            received_lines = literal_eval(literal).split(b'\n')
        except (ValueError, SyntaxError, TypeError, AttributeError):
            # Not a parsable bytes literal: treat it as "no match" rather
            # than crashing, so the caller still gets the output to report.
            continue
        if expected_data in received_lines:
            return (True, output)
    return (False, output)
53*7dc08ffcSJunyu Lai
def run_tls_test_server(expected_data, q):
    """Run the TLS server automaton once and report the outcome.

    :param expected_data: data passed on to check_output_for_data() to be
        searched for in the server's captured output (may be falsy).
    :param q: queue-like object; ``True`` is put on it once the automaton
        has been created (just before it runs), and the detail value
        returned by check_output_for_data() is put on it afterwards.

    Ends by calling ``sys.exit(0)`` when the check succeeded and
    ``sys.exit(1)`` otherwise.
    """
    with captured_output() as (captured_out, captured_err):
        # Server certificate and key live under <basedir>/test/tls/pki.
        pki_dir = os.path.join(basedir, 'test', 'tls', 'pki')
        cert_path = os.path.join(pki_dir, 'srv_cert.pem')
        key_path = os.path.join(pki_dir, 'srv_key.pem')
        server = TLSServerAutomaton(mycert=cert_path, mykey=key_path)
        # Signal the other side that the automaton is about to run.
        q.put(True)
        server.run()
        # Evaluate what the automaton printed while it was running.
        succeeded, report = check_output_for_data(captured_out, captured_err,
                                                  expected_data)
    # Hand the collected output (or None) back through the queue.
    q.put(report)
    sys.exit(0 if succeeded else 1)
73