#!/usr/bin/env python

## This file is part of Scapy
## This program is published under a GPLv2 license

"""
TLS server used in unit tests.

When some expected_data is provided, a TLS client (e.g. openssl s_client)
should send some application data after the handshake. If this data matches our
expected_data, then we leave with exit code 0. Else we leave with exit code 1.
If no expected_data was provided and the handshake was ok, we exit with 0.
"""

from ast import literal_eval
import os
import sys
from contextlib import contextmanager
from io import BytesIO, StringIO

from scapy.modules import six

basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.path.pardir, os.path.pardir))
sys.path = [basedir] + sys.path

from scapy.layers.tls.automaton_srv import TLSServerAutomaton


@contextmanager
def captured_output():
    """Temporarily replace sys.stdout and sys.stderr with in-memory buffers.

    Yields the (stdout, stderr) replacement buffers. The buffers are text
    streams (StringIO) when six.PY3 is true and byte streams (BytesIO)
    otherwise. The previous streams are restored on exit, even if the body
    of the ``with`` statement raises.
    """
    if six.PY3:
        new_out, new_err = StringIO(), StringIO()
    else:
        new_out, new_err = BytesIO(), BytesIO()
    old_out, old_err = sys.stdout, sys.stderr
    try:
        sys.stdout, sys.stderr = new_out, new_err
        yield sys.stdout, sys.stderr
    finally:
        sys.stdout, sys.stderr = old_out, old_err


def check_output_for_data(out, err, expected_data):
    """Decide whether the captured output contains the expected data.

    :param out: buffer holding the captured standard output.
    :param err: buffer holding the captured standard error.
    :param expected_data: line to look for in the received payloads; when
        falsy, nothing is searched and the check passes as long as nothing
        was written to ``err``.
    :return: a ``(success, details)`` tuple. ``details`` is the content of
        ``err`` when it is not empty, ``None`` when there was no expected
        data, and the stripped content of ``out`` otherwise.
    """
    errored = err.getvalue()
    if errored:
        # Anything written to stderr counts as a failure.
        return (False, errored)
    if not expected_data:
        return (True, None)
    output = out.getvalue().strip()
    # Every chunk following a '> Received: ' marker starts with the repr()
    # of a received payload.
    for data in output.split('> Received: ')[1:]:
        # repr() never emits a raw newline, so the literal sits on the first
        # line of the chunk. Whatever is printed afterwards must not reach
        # literal_eval(), which would reject it.
        literal = data.strip().split('\n', 1)[0].rstrip()
        try:
            lines = literal_eval(literal).split(b'\n')
        except (ValueError, SyntaxError, TypeError, AttributeError):
            # Empty, truncated or non-bytes payload: this chunk cannot match.
            # Skipping it (rather than raising) lets the caller still receive
            # the full output for diagnosis.
            continue
        if expected_data in lines:
            return (True, output)
    return (False, output)


def run_tls_test_server(expected_data, q):
    """Run the TLS test server once and exit with the outcome as status.

    :param expected_data: data forwarded to check_output_for_data().
    :param q: object exposing ``put()``. It first receives ``True`` once the
        automaton has been created, then the details returned by
        check_output_for_data() once the automaton has finished.

    The function does not return: it calls sys.exit(0) when the check
    succeeded and sys.exit(1) otherwise.
    """
    correct = False
    with captured_output() as (out, err):
        # Prepare automaton
        crt_basedir = os.path.join(basedir, 'test', 'tls', 'pki')
        t = TLSServerAutomaton(mycert=os.path.join(crt_basedir, 'srv_cert.pem'),
                               mykey=os.path.join(crt_basedir, 'srv_key.pem'))
        # Sync threads
        q.put(True)
        # Run server automaton
        t.run()
        # Return correct answer
        correct, out_e = check_output_for_data(out, err, expected_data)
    # Return data
    q.put(out_e)
    if correct:
        sys.exit(0)
    else:
        sys.exit(1)