xref: /btstack/test/sbc/sbc_encoder.py (revision 1bd8157e8c1d5bbfe83038c2ea9e6b99d35fcb36)
1#!/usr/bin/env python
2import numpy as np
3import wave
4import struct
5import sys
6from sbc import *
7
# Per-channel input history for the analysis filter: 2 channels x 80 samples.
# Filled and shifted in place by the analysis routine.
X = np.zeros(shape=(2, 80), dtype=np.int16)
# Name of the analysis implementation selected by the dispatcher.
implementation = "SIG"
# Accumulated analysis time in milliseconds. The dispatcher increments it and
# the command-line driver reports it, so it has to exist before the first frame.
total_time_ms = 0
10
def fetch_samples_for_next_sbc_frame(fin, frame):
    """Read the PCM samples for one SBC frame from `fin` into `frame.pcm`.

    Requests `frame.nr_blocks * frame.nr_subbands` audio frames from the
    reader and decodes them as signed 16-bit samples (two bytes each).
    When `frame.nr_channels` is 2 the samples are treated as interleaved
    and split into `frame.pcm[0]` / `frame.pcm[1]`; otherwise everything
    goes to `frame.pcm[0]`.

    If the reader returns fewer samples than requested (e.g. at end of
    file) only the leading entries of `frame.pcm` are overwritten.

    Args:
        fin: object providing `readframes(n)` that returns raw bytes.
        frame: frame object exposing `nr_blocks`, `nr_subbands`,
            `nr_channels` and an indexable `pcm` buffer per channel.
    """
    raw_data = fin.readframes(frame.nr_blocks * frame.nr_subbands)
    # Floor division keeps the counts integral on Python 3, where "/" yields
    # a float that range() rejects.
    sample_count = len(raw_data) // 2
    data = struct.unpack("%ih" % sample_count, raw_data)

    if frame.nr_channels == 2:
        # Interleaved stereo: even positions -> channel 0, odd -> channel 1.
        for i in range(sample_count // 2):
            frame.pcm[0][i] = data[2*i]
            frame.pcm[1][i] = data[2*i+1]
    else:
        for i, sample in enumerate(data):
            frame.pcm[0][i] = sample
23
24
def sbc_frame_analysis_sig(frame, ch, blk, C):
    """Run the analysis filter for one block of one channel.

    This is the routine the `sbc_frame_analysis` dispatcher invokes for the
    "SIG" implementation. It must carry its own name: a second definition
    named `sbc_frame_analysis` later in the module would otherwise replace
    it and leave the dispatcher calling an undefined function.

    Steps, with M = frame.nr_subbands and L = 10 * M:
      1. shift the channel's history in the module-level buffer `X` by M
         positions and insert the M new samples from `frame.EX`, newest
         sample at index 0;
      2. multiply the first L history entries by the coefficients `C`;
      3. fold the L products into 2*M partial sums;
      4. cosine-modulate the partial sums into M subband samples.

    The result is stored in `frame.sb_sample[blk][ch][0:M]`.

    Args:
        frame: frame object exposing `nr_subbands`, `EX` and `sb_sample`.
        ch: channel index (row of `X`).
        blk: block index within the frame.
        C: indexable sequence with at least 10 * M window coefficients.
    """
    M = frame.nr_subbands
    L = 10 * M
    M2 = 2 * M

    # Walk downwards so every entry is moved before it gets overwritten.
    for i in range(L-1, M-1, -1):
        X[ch][i] = X[ch][i-M]
    # New samples enter in reversed order: EX[0] ends up at index M-1.
    for i in range(M):
        X[ch][i] = frame.EX[M-1-i]

    # Windowing by the prototype coefficients.
    Z = np.zeros(L)
    for i in range(L):
        Z[i] = X[ch][i] * C[i]

    # Partial sums: L == 5 * M2, so each Y[i] collects five strided terms.
    Y = np.zeros(M2)
    for i in range(M2):
        for k in range(5):
            Y[i] += Z[i+k*M2]

    # Cosine modulation. The accumulation order is kept explicit so results
    # stay bit-identical to the element-wise reference computation.
    S = np.zeros(M)
    for i in range(M):
        for k in range(M2):
            S[i] += np.cos((i+0.5)*(k-M/2)*np.pi/M) * Y[k]

    for sb in range(M):
        frame.sb_sample[blk][ch][sb] = S[sb]
56
57
def sbc_frame_analysis(frame, ch, blk, proto_table):
    """Dispatch one analysis step to the selected implementation and time it.

    Only the "SIG" implementation is available; any other value of the
    module-level `implementation` prints a message and terminates the
    process with exit status 1. The elapsed time reported by `time_ms()`
    is added to the module-level `total_time_ms` accumulator.

    Args:
        frame: frame object handed through to the analysis routine.
        ch: channel index.
        blk: block index within the frame.
        proto_table: window coefficients handed through to the routine.
    """
    global total_time_ms

    if implementation != "SIG":
        print ("Analysis %s not implemented" % implementation)
        # sys.exit is always available; the bare exit() helper is only
        # injected by the site module.
        sys.exit(1)

    t1 = time_ms()
    sbc_frame_analysis_sig(frame, ch, blk, proto_table)
    t2 = time_ms()

    # Start from 0 when the accumulator has not been initialised yet, so the
    # first call cannot fail with a NameError.
    total_time_ms = globals().get("total_time_ms", 0) + (t2 - t1)
70
71
def sbc_analysis(frame):
    """Compute the subband samples for every channel and block of `frame`.

    Picks the coefficient table matching `frame.nr_subbands` (4 or 8),
    allocates `frame.sb_sample` with shape (blocks, channels, subbands),
    then feeds the PCM data block by block through `sbc_frame_analysis`
    using `frame.EX` as the per-block input buffer.

    Returns:
        0 on success, -1 when the subband count is neither 4 nor 8.
    """
    subbands = frame.nr_subbands
    if subbands not in (4, 8):
        return -1
    proto_table = Proto_4_40 if subbands == 4 else Proto_8_80

    sample_shape = (frame.nr_blocks, frame.nr_channels, frame.nr_subbands)
    frame.sb_sample = np.ndarray(shape=sample_shape)

    for ch in range(frame.nr_channels):
        for blk in range(frame.nr_blocks):
            # Each block consumes the next run of nr_subbands PCM samples.
            first = blk * frame.nr_subbands
            for sb in range(frame.nr_subbands):
                frame.EX[sb] = frame.pcm[ch][first + sb]
            sbc_frame_analysis(frame, ch, blk, proto_table)
    return 0
89
def sbc_encode(frame, force_channel_mode):
    """Encode `frame`: subband analysis followed by quantization.

    Quantization is skipped when the analysis step reports an error.

    Returns:
        The negative analysis status on failure, otherwise the status
        returned by `sbc_quantization`.
    """
    status = sbc_analysis(frame)
    if status < 0:
        return status
    return sbc_quantization(frame, force_channel_mode)
95
def sbc_quantization(frame, force_channel_mode):
    """Quantize the subband samples of `frame` in place.

    Determines channel mode and scale factors, stores the bit allocation in
    `frame.bits`, derives `frame.levels` (2**bits - 1 per channel/subband),
    sets `frame.syncword` and `frame.crc_check`, and finally writes the
    quantized values to `frame.audio_sample`.

    Returns:
        Always 0.
    """
    calculate_channel_mode_and_scale_factors(frame, force_channel_mode)
    frame.bits = sbc_bit_allocation(frame)

    # Number of quantization levels for every (channel, subband) pair.
    level_shape = (frame.nr_channels, frame.nr_subbands)
    frame.levels = np.zeros(shape=level_shape, dtype=np.int32)
    for ch in range(frame.nr_channels):
        for sb in range(frame.nr_subbands):
            frame.levels[ch][sb] = (1 << frame.bits[ch][sb]) - 1

    frame.syncword = 0x9c
    frame.crc_check = calculate_crc(frame)

    for blk in range(frame.nr_blocks):
        for ch in range(frame.nr_channels):
            for sb in range(frame.nr_subbands):
                nr_levels = frame.levels[ch][sb]
                if nr_levels <= 0:
                    # Zero levels means zero bits were allocated here.
                    frame.audio_sample[blk][ch][sb] = 0
                    continue
                sample = frame.sb_sample[blk][ch][sb]
                scale = frame.scalefactor[ch][sb]
                quantized = ((sample * nr_levels / scale + nr_levels) - 1.0)/2.0
                frame.audio_sample[blk][ch][sb] = np.uint16(quantized)

    return 0
122
def sbc_write_frame(fout, sbc_encoder_frame):
    """Serialise one encoded frame and write it to `fout`.

    The frame is converted with `frame_to_bitstream` and the result is
    written out as a bytearray.
    """
    fout.write(bytearray(frame_to_bitstream(sbc_encoder_frame)))
127
if __name__ == "__main__":
    # Command-line driver: encode input.wav into input-encoded.sbc.
    usage = '''
    Usage:      ./sbc_encoder.py input.wav blocks subbands bitpool allocation_method[0-LOUDNESS,1-SNR] force_channel_mode[2-STEREO,3-JOINT_STEREO]
    Example:    ./sbc_encoder.py fanfare.wav 16 4 31 0
    '''

    if len(sys.argv) < 6:
        print(usage)
        sys.exit(1)

    infile = sys.argv[1]
    if not infile.endswith('.wav'):
        print(usage)
        sys.exit(1)
    # Swap only the trailing extension; str.replace() would also rewrite any
    # earlier ".wav" that happens to occur in the path.
    sbcfile = infile[:-len('.wav')] + '-encoded.sbc'

    # force_channel_mode is the optional 7th argv entry (index 6).
    has_forced_mode = len(sys.argv) > 6
    try:
        nr_blocks = int(sys.argv[2])
        nr_subbands = int(sys.argv[3])
        bitpool = int(sys.argv[4])
        allocation_method = int(sys.argv[5])
        force_channel_mode = int(sys.argv[6]) if has_forced_mode else 0
    except ValueError:
        # Non-numeric argument: show the usage text instead of a traceback.
        print(usage)
        sys.exit(1)

    if has_forced_mode and force_channel_mode not in (2, 3):
        print(usage)
        sys.exit(1)

    nr_samples = nr_blocks * nr_subbands
    if nr_samples <= 0:
        # A non-positive frame size would never advance the encode loop.
        print(usage)
        sys.exit(1)

    subband_frame_count = 0
    audio_frame_count = 0
    fin = None
    fout = None
    try:
        fin = wave.open(infile, 'rb')
        nr_channels = fin.getnchannels()
        sampling_frequency = fin.getframerate()
        nr_audio_frames = fin.getnframes()

        fout = open(sbcfile, 'wb')
        while audio_frame_count < nr_audio_frames:
            if subband_frame_count % 200 == 0:
                print("== Frame %d ==" % (subband_frame_count))

            sbc_encoder_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, bitpool, sampling_frequency, allocation_method)
            fetch_samples_for_next_sbc_frame(fin, sbc_encoder_frame)

            # A negative status means the frame could not be encoded, so
            # there is nothing valid to write.
            if sbc_encode(sbc_encoder_frame, force_channel_mode) < 0:
                print("SBC encoding failed at frame %d" % subband_frame_count)
                sys.exit(1)
            sbc_write_frame(fout, sbc_encoder_frame)

            audio_frame_count += nr_samples
            subband_frame_count += 1
    except IOError:
        print(usage)
        sys.exit(1)
    finally:
        # Release both files on every exit path, including sys.exit().
        if fout is not None:
            fout.close()
        if fin is not None:
            fin.close()

    print("DONE, WAV file %s encoded into SBC file %s " % (infile, sbcfile))
    if subband_frame_count > 0:
        # Fall back to 0 if no analysis time was ever accumulated.
        analysis_time_ms = globals().get("total_time_ms", 0)
        print ("Average analysis time per frame: %d ms/frame" % (analysis_time_ms/subband_frame_count))
    else:
        print ("No frame found")
192
193
194
195
196
197