xref: /btstack/test/sbc/sbc_encoder.py (revision 1522543de84c04e141f181431af0e80f00b6b718)
1#!/usr/bin/env python
2import numpy as np
3import wave
4import struct
5import sys
6from sbc import *
7
8X = np.zeros(80, dtype = np.int16)
9
10
11def fetch_samples_for_next_sbc_frame(fin, frame):
12    nr_samples = frame.nr_blocks * frame.nr_subbands * frame.nr_channels
13    raw_data = fin.readframes(nr_samples) # Returns byte data
14    len_raw_data =  len(raw_data) / 2
15
16    fmt = "%ih" % len_raw_data # read signed 2 byte shorts
17
18    data = struct.unpack(fmt, raw_data)
19    len_data = len(data)
20
21    for i in range(frame.nr_blocks * frame.nr_subbands):
22        for ch in range(frame.nr_channels):
23            index = i*2 + ch
24            if index < len_data:
25                frame.pcm[ch][i] = data[i*2 + ch]
26            else:
27                frame.pcm[ch][i] = 0
28
29
30def sbc_frame_analysis(frame, ch, blk, C):
31    global X
32
33    M = frame.nr_subbands
34    L = 10 * M
35    M2 = 2*M
36    L2 = 2*L
37
38    Z = np.zeros(L)
39    Y = np.zeros(M2)
40    W = np.zeros(shape=(M, M2))
41    S = np.zeros(M)
42
43    for i in range(L-1, M-1, -1):
44        X[i] = X[i-M]
45    for i in range(M-1, -1, -1):
46        X[i] = frame.EX[M-1-i]
47
48    for i in range(L):
49        Z[i] = X[i] * C[i]
50
51    for i in range(M2):
52        for k in range(5):
53            Y[i] += Z[i+k*M2]
54
55    for i in range(M):
56        for k in range(M2):
57            W[i][k] = np.cos((i+0.5)*(k-M/2)*np.pi/M)
58            S[i] += W[i][k] * Y[k]
59
60    for sb in range(M):
61        frame.sb_sample[blk][ch][sb] = S[sb]
62
63def sbc_analysis(frame):
64    if frame.nr_subbands == 4:
65        C = Proto_4_40
66    elif frame.nr_subbands == 8:
67        C = Proto_8_80
68    else:
69        return -1
70
71    frame.sb_sample = np.ndarray(shape=(frame.nr_blocks, frame.nr_channels, frame.nr_subbands))
72    index = 0
73    for ch in range(frame.nr_channels):
74        for blk in range(frame.nr_blocks):
75            for sb in range(frame.nr_subbands):
76                frame.EX[sb] = np.int16(frame.pcm[ch][index])
77                index+=1
78            sbc_frame_analysis(frame, ch, blk, C)
79    return 0
80
81def sbc_encode(frame):
82    err = sbc_analysis(frame)
83    if err >= 0:
84        err = sbc_quantization(frame)
85    return err
86
87def calculate_joint_stereo_signal(frame):
88    sb_sample = np.zeros(shape = (frame.nr_blocks,frame.nr_channels,frame.nr_subbands), dtype = np.uint32)
89    scale_factor = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
90    scalefactor = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
91
92    for sb in range(frame.nr_subbands-1):
93        for blk in range(frame.nr_blocks):
94             sb_sample[blk][0][sb] = (frame.sb_sample_f[blk][0][sb] +  frame.sb_sample_f[blk][1][sb]) >> 1
95             sb_sample[blk][1][sb] = (frame.sb_sample_f[blk][0][sb] -  frame.sb_sample_f[blk][1][sb]) >> 1
96
97    for ch in range(frame.nr_channels):
98        for sb in range(frame.nr_subbands-1):
99            frame.scale_factor[ch][sb] = 0
100            frame.scalefactor[ch][sb] = 2
101            for blk in range(frame.nr_blocks):
102                while frame.scalefactor[ch][sb] < abs(frame.sb_sample[blk][ch][sb]):
103                    frame.scale_factor[ch][sb]+=1
104                    frame.scalefactor[ch][sb] *= 2
105
106    for sb in range(frame.nr_subbands-1):
107        if (frame.scalefactor[0][sb] + frame.scalefactor[1][sb]) > (scalefactor[0][sb] + scalefactor[1][sb]):
108            frame.join[sb] = 1
109            frame.scale_factor[0][sb] = scale_factor[0][sb]
110            frame.scale_factor[1][sb] = scale_factor[1][sb]
111            frame.scalefactor[0][sb]  = scalefactor[0][sb]
112            frame.scalefactor[1][sb]  = scalefactor[1][sb]
113            for blk in range(frame.nr_blocks):
114                frame.sb_sample[blk][0][sb] = sb_sample[blk][0][sb]
115                frame.sb_sample[blk][1][sb] = sb_sample[blk][1][sb]
116
117
118def sbc_quantization(frame):
119    calculate_scalefactors_and_channel_mode(frame)
120    frame.bits = sbc_bit_allocation(frame)
121
122    # Reconstruct the Audio Samples
123    frame.levels = np.zeros(shape=(frame.nr_channels, frame.nr_subbands), dtype = np.int32)
124    for ch in range(frame.nr_channels):
125        for sb in range(frame.nr_subbands):
126            frame.levels[ch][sb] = (1 << frame.bits[ch][sb]) - 1 #pow(2.0, frame.bits[ch][sb]) - 1
127
128    frame.syncword = 156
129    frame.crc_check = calculate_crc(frame)
130
131    frame.join = np.zeros(frame.nr_subbands, dtype = np.uint8)
132
133
134    if frame.channel_mode == JOINT_STEREO:
135        calculate_joint_stereo_signal(frame)
136
137    for blk in range(frame.nr_blocks):
138        for ch in range(frame.nr_channels):
139            for sb in range(frame.nr_subbands):
140                if frame.levels[ch][sb] > 0:
141                    SB = frame.sb_sample[blk][ch][sb]
142                    L  = frame.levels[ch][sb]
143                    SF = frame.scalefactor[ch][sb]
144                    frame.audio_sample[blk][ch][sb] = np.uint16(((SB * L / SF    + L) - 1.0)/2.0)
145                else:
146                    frame.audio_sample[blk][ch][sb] = 0
147
148    return 0
149
150def sbc_write_frame(fout, sbc_encoder_frame):
151    stream = frame_to_bitstream(sbc_encoder_frame)
152    barray = bytearray(stream)
153    fout.write(barray)
154
155if __name__ == "__main__":
156    usage = '''
157    Usage:      ./sbc_encoder.py input.wav blocks subbands bitpool allocation_method[0-LOUDNESS,1-SNR]
158    Example:    ./sbc_encoder.py fanfare.wav 16 4 31 0
159    '''
160    nr_blocks = 0
161    nr_subbands = 0
162
163
164    if (len(sys.argv) < 6):
165        print(usage)
166        sys.exit(1)
167    try:
168        infile = sys.argv[1]
169        if not infile.endswith('.wav'):
170            print(usage)
171            sys.exit(1)
172        sbcfile = infile.replace('.wav', '-encoded.sbc')
173
174        nr_blocks = int(sys.argv[2])
175        nr_subbands = int(sys.argv[3])
176        bitpool = int(sys.argv[4])
177        allocation_method = int(sys.argv[5])
178
179        fin = wave.open(infile, 'rb')
180        nr_channels = fin.getnchannels()
181        sampling_frequency = fin.getframerate()
182        nr_audio_frames = fin.getnframes()
183
184        subband_frame_count = 0
185        audio_frame_count = 0
186        nr_samples = nr_blocks * nr_subbands
187        fout = open(sbcfile, 'wb')
188        while audio_frame_count < nr_audio_frames:
189            if subband_frame_count % 200 == 0:
190                print("== Frame %d ==" % (subband_frame_count))
191
192            sbc_encoder_frame = SBCFrame(nr_blocks, nr_subbands, nr_channels, bitpool, sampling_frequency, allocation_method)
193            fetch_samples_for_next_sbc_frame(fin, sbc_encoder_frame)
194
195            sbc_encode(sbc_encoder_frame)
196            sbc_write_frame(fout, sbc_encoder_frame)
197
198            audio_frame_count += nr_samples
199            subband_frame_count += 1
200
201        fin.close()
202        fout.close()
203        print("DONE, WAV file %s encoded into SBC file %s " % (infile, sbcfile))
204
205
206    except IOError as e:
207        print(usage)
208        sys.exit(1)
209
210
211
212
213
214