#!/usr/bin/env python
"""Encode a WAV file into an SBC bitstream.

Reads PCM samples from a WAV file frame by frame, runs the SBC analysis
filter and quantization on each frame and appends the resulting bitstream
to ``<input>-encoded.sbc``.

The frame type, prototype tables, bit allocation, CRC and bitstream packing
are provided by the ``sbc`` module.
"""
from contextlib import closing
import struct
import sys
import wave

import numpy as np

from sbc import *

# Per-channel history of input samples used by the analysis filter
# (2 channels, 80 entries = 10 * 8 subbands).
X = np.zeros(shape=(2, 80), dtype=np.int16)
implementation = "SIG"
# Accumulated time spent in sbc_frame_analysis(), in milliseconds.
total_time_ms = 0


def fetch_samples_for_next_sbc_frame(fin, frame):
    """Read the PCM samples for one SBC frame from ``fin`` into ``frame.pcm``.

    Args:
        fin: an open ``wave`` reader.
        frame: frame object providing ``nr_blocks``, ``nr_subbands``,
            ``nr_channels`` and a writable ``pcm`` buffer per channel.

    For two channels the interleaved samples are split into ``pcm[0]`` and
    ``pcm[1]``; otherwise all samples go to ``pcm[0]``. If the file ends
    early, only the samples that were read are written.
    """
    raw_data = fin.readframes(frame.nr_blocks * frame.nr_subbands)
    # "h" is a 2-byte signed sample, so this assumes 16-bit PCM input.
    # Integer division is required: range() rejects floats on Python 3.
    nr_values = len(raw_data) // 2
    data = struct.unpack("%ih" % nr_values, raw_data)

    if frame.nr_channels == 2:
        for i in range(nr_values // 2):
            frame.pcm[0][i] = data[2 * i]
            frame.pcm[1][i] = data[2 * i + 1]
    else:
        for i in range(nr_values):
            frame.pcm[0][i] = data[i]


def sbc_frame_analysis_sig(frame, ch, blk, C):
    """Compute the subband samples of one block for one channel.

    Shifts the ``frame.nr_subbands`` new input samples from ``frame.EX``
    into the channel history ``X[ch]``, windows the history with the
    prototype coefficients ``C``, folds the result by partial summation and
    applies the cosine matrix. The output is stored in
    ``frame.sb_sample[blk][ch]``.

    Args:
        frame: frame object providing ``nr_subbands``, ``EX`` and
            ``sb_sample``.
        ch: channel index.
        blk: block index within the frame.
        C: prototype filter coefficients, at least ``10 * nr_subbands`` long.
    """
    M = frame.nr_subbands
    L = 10 * M
    M2 = 2 * M

    Z = np.zeros(L)
    Y = np.zeros(M2)
    W = np.zeros(shape=(M, M2))
    S = np.zeros(M)

    # Age the history by M samples, then insert the new samples newest-first.
    for i in range(L - 1, M - 1, -1):
        X[ch][i] = X[ch][i - M]
    for i in range(M - 1, -1, -1):
        X[ch][i] = frame.EX[M - 1 - i]

    # Windowing with the prototype filter.
    for i in range(L):
        Z[i] = X[ch][i] * C[i]

    # Partial summation: fold the L windowed values into 2*M values.
    for i in range(M2):
        for k in range(5):
            Y[i] += Z[i + k * M2]

    # Cosine matrixing down to M subband samples.
    for i in range(M):
        for k in range(M2):
            W[i][k] = np.cos((i + 0.5) * (k - M / 2) * np.pi / M)
            S[i] += W[i][k] * Y[k]

    for sb in range(M):
        frame.sb_sample[blk][ch][sb] = S[sb]


def sbc_frame_analysis(frame, ch, blk, proto_table):
    """Run the selected analysis implementation and account for its run time.

    Dispatches on the module-level ``implementation`` name and adds the
    elapsed time to the module-level ``total_time_ms``. Exits the process
    with status 1 if the implementation name is unknown.
    """
    global total_time_ms

    # NOTE(review): time_ms() is not defined in this file; it is expected to
    # be provided by ``from sbc import *`` -- confirm.
    t1 = time_ms()
    if implementation == "SIG":
        sbc_frame_analysis_sig(frame, ch, blk, proto_table)
    else:
        print("Analysis %s not implemented" % implementation)
        sys.exit(1)

    t2 = time_ms()
    total_time_ms += t2 - t1


def sbc_analysis(frame):
    """Compute ``frame.sb_sample`` for all blocks and channels of a frame.

    Returns:
        0 on success, -1 if ``frame.nr_subbands`` is neither 4 nor 8.
    """
    if frame.nr_subbands == 4:
        C = Proto_4_40
    elif frame.nr_subbands == 8:
        C = Proto_8_80
    else:
        return -1

    frame.sb_sample = np.ndarray(
        shape=(frame.nr_blocks, frame.nr_channels, frame.nr_subbands))
    for ch in range(frame.nr_channels):
        index = 0
        for blk in range(frame.nr_blocks):
            # Hand the next nr_subbands PCM samples to the analysis filter.
            for sb in range(frame.nr_subbands):
                frame.EX[sb] = frame.pcm[ch][index]
                index += 1
            sbc_frame_analysis(frame, ch, blk, C)
    return 0


def sbc_encode(frame, force_channel_mode):
    """Analyse and quantize one frame.

    Returns:
        A negative value if the analysis failed (quantization is then
        skipped), otherwise the result of ``sbc_quantization``.
    """
    err = sbc_analysis(frame)
    if err >= 0:
        err = sbc_quantization(frame, force_channel_mode)
    return err


def sbc_quantization(frame, force_channel_mode):
    """Quantize the subband samples of a frame into ``frame.audio_sample``.

    Determines channel mode and scale factors, allocates bits per subband,
    derives the number of quantization levels, fills in the sync word and
    CRC, and finally quantizes every subband sample. Subbands that received
    no bits are written as 0.

    Returns:
        Always 0.
    """
    calculate_channel_mode_and_scale_factors(frame, force_channel_mode)
    frame.bits = sbc_bit_allocation(frame)

    # Number of quantization levels per subband: 2^bits - 1.
    frame.levels = np.zeros(
        shape=(frame.nr_channels, frame.nr_subbands), dtype=np.int32)
    for ch in range(frame.nr_channels):
        for sb in range(frame.nr_subbands):
            frame.levels[ch][sb] = (1 << frame.bits[ch][sb]) - 1

    frame.syncword = 0x9c
    frame.crc_check = calculate_crc(frame)

    for blk in range(frame.nr_blocks):
        for ch in range(frame.nr_channels):
            for sb in range(frame.nr_subbands):
                if frame.levels[ch][sb] > 0:
                    SB = frame.sb_sample[blk][ch][sb]
                    L = frame.levels[ch][sb]
                    SF = frame.scalefactor[ch][sb]
                    frame.audio_sample[blk][ch][sb] = np.uint16(
                        ((SB * L / SF + L) - 1.0) / 2.0)
                else:
                    frame.audio_sample[blk][ch][sb] = 0

    return 0


def sbc_write_frame(fout, sbc_encoder_frame):
    """Serialize a frame with ``frame_to_bitstream`` and write it to ``fout``."""
    stream = frame_to_bitstream(sbc_encoder_frame)
    barray = bytearray(stream)
    fout.write(barray)


def main(argv):
    """Command-line entry point: encode ``argv[1]`` into an SBC file.

    Expected arguments: input.wav blocks subbands bitpool allocation_method
    and an optional force_channel_mode (2 or 3). Prints the usage text and
    exits with status 1 on invalid arguments or if a file cannot be opened.
    """
    usage = '''
    Usage:      ./sbc_encoder.py input.wav blocks subbands bitpool allocation_method[0-LOUDNESS,1-SNR] force_channel_mode[2-STEREO,3-JOINT_STEREO]
    Example:    ./sbc_encoder.py fanfare.wav 16 4 31 0
    '''

    if len(argv) < 6:
        print(usage)
        sys.exit(1)

    infile = argv[1]
    if not infile.endswith('.wav'):
        print(usage)
        sys.exit(1)
    # Replace only the extension; str.replace would also rewrite any
    # '.wav' appearing earlier in the path.
    sbcfile = infile[:-len('.wav')] + '-encoded.sbc'

    try:
        nr_blocks = int(argv[2])
        nr_subbands = int(argv[3])
        bitpool = int(argv[4])
        allocation_method = int(argv[5])
        force_channel_mode = 0
        # The channel mode is the optional seventh element of argv.
        if len(argv) > 6:
            force_channel_mode = int(argv[6])
            if force_channel_mode not in (2, 3):
                print(usage)
                sys.exit(1)
    except ValueError:
        print(usage)
        sys.exit(1)

    # sbc_analysis only supports 4 or 8 subbands, and a non-positive block
    # count would keep the encoding loop from ever advancing.
    if nr_blocks <= 0 or nr_subbands not in (4, 8):
        print(usage)
        sys.exit(1)

    try:
        # closing() is used because older wave readers are not context
        # managers; both files are released even if encoding fails.
        with closing(wave.open(infile, 'rb')) as fin, \
                open(sbcfile, 'wb') as fout:
            nr_channels = fin.getnchannels()
            sampling_frequency = fin.getframerate()
            nr_audio_frames = fin.getnframes()

            subband_frame_count = 0
            audio_frame_count = 0
            nr_samples = nr_blocks * nr_subbands
            while audio_frame_count < nr_audio_frames:
                if subband_frame_count % 200 == 0:
                    print("== Frame %d ==" % (subband_frame_count))

                sbc_encoder_frame = SBCFrame(
                    nr_blocks, nr_subbands, nr_channels, bitpool,
                    sampling_frequency, allocation_method)
                fetch_samples_for_next_sbc_frame(fin, sbc_encoder_frame)

                # Do not emit a frame whose encoding failed.
                if sbc_encode(sbc_encoder_frame, force_channel_mode) < 0:
                    print("Encoding of frame %d failed" % subband_frame_count)
                    sys.exit(1)
                sbc_write_frame(fout, sbc_encoder_frame)

                audio_frame_count += nr_samples
                subband_frame_count += 1
    except (IOError, wave.Error) as e:
        print("Error: %s" % e)
        print(usage)
        sys.exit(1)

    print("DONE, WAV file %s encoded into SBC file %s " % (infile, sbcfile))
    if subband_frame_count > 0:
        print("Average analysis time per frame: %d ms/frame"
              % (total_time_ms / subband_frame_count))
    else:
        print("No frame found")


if __name__ == "__main__":
    main(sys.argv)