开发者

Python实现音频添加数字水印的示例详解

目录
  • 方法一:LSB (最低有效位) 水印
  • 方法二:频域水印 (DCT变换)
  • 方法三:扩频水印
  • 注意事项
  • 扩展建议

数字水印技术可以将隐藏信息嵌入到音频文件中而不明显影响音频质量。下面我将介绍几种在python中实现音频数字水印的方法。

方法一:LSB (最低有效位) 水印

import numpy as np
from scipy.io import wavfile
def embed_watermark_lsb(audio_path, watermark, output_path):
    # 读取音频文件
    sample_rate, audio_data = wavfile.read(audio_path)    
    # 确保是立体声,如果是单声道则转换为立体声
    if len(audio_data.shape) == 1:
        audio_data = np.column_stack((audio_data, audio_data))   
    # 将水印转换为二进制
    watermark_bin = ''.join(format(ord(c), '08b') for c in watermark)
    watermark_bin += '00000000' # 添加结束标记   
    # 检查水印是否适合音频
    if len(watermark_bin) > audio_data.size:
        raise ValueError("水印太大,无法嵌入到音频中")  
    # 嵌入水印到最低有效位
    watermark_index = 0
    for i in range(len(audio_data)):
        for j in range(len(audio_data[i])):
            if watermark_index < len(watermark_bin):
                # 替换最低有效位
                audio_data[i][j] = (audio_data[i][j] & 0xFE) | int(watermark_bin[watermark_index])
                watermark_index += 1
            else:
                break   
    # 保存带水印的音频
    wavfile.write(output_path, sample_rate, audio_data)
def extract_watermark_lsb(audio_path, watermark_length):
    # 读取音频文件
    sample_rate, audio_data = wavfile.read(audio_path)    
    # 提取最低有效位
    watermark_bits = []
    for i in range(len(audio_data)):
        for j in range(len(audio_data[i])):
            watermark_bits.append(str(audio_data[i][jwww.devze.com] & 1))   
    # 将比特转换为字节
    watermark = ''
    for i in range(0, len(watermark_bits), 8):
        byte = ''.join(watermark_bits[i:i+8])
        if byte == '00000000': # 遇到结束标记
            break
        watermark += chr(int(byte, 2))   
    return watermark[:watermark_length]
# 使用示例
embed_watermark_lsb('original.wav', '秘密消息', 'watermarked.wav')
extracted = extract_watermark_lsb('watermarked.wav', 4)
print("提取的水印:", extracted)

方法二:频域水印 (DCT变换)

import numpy as np
from scipy.fftpack import dct, idct
from scipy.io import wavfile
def embed_watermark_dct(audio_path, watermark, output_path, alpha=0.01):
    # 读取音频
    sample_rate, audio_data = wavfile.read(audio_path)  
    # 如果是立体声,只使用一个声道
    if len(audio_data.shape) > 1:
        audio_data = audio_data[:, 0]   
    # 将水印转换为二进制
    watermark_bin = ''.join(format(ord(c), '08b') for c in watermark)
    watermark_bin = [int(b) for b in watermark_bin]    
    # 分段处理音频
    segment_size = 1024
    num_segments = len(audio_data) // segment_size
    watermark_length = len(watermark_bin)   
    if num_segments < watermark_length:
        raise ValueError("音频太短,无法嵌入水印")   
    # 嵌入水印
    watermarked_audio = np.copy(audio_data)
    for i in range(watermark_length):
        start = i * segment_size
        end = start + segment_size    js    
        segment = audio_data[start:end]
        dct_coeffs = dct(segment, norm='ortho')       
        # 修改中频系数嵌入水印
        coeff_index = 100 # 选择一个中频系数
        if watermark_bin[i] == 1:
            dct_coeffs[coeff_index] += alpha * np.abs(dct_coeffs[coeff_index])
        else:
            dct_coeffs[coeff_index] -= alpha * np.abs(dct_coeffs[coeff_index])      
        # 逆DCT变换
        watermarked_segment = idct(dct_coeffs, norm='ortho')
        watermarked_audio[start:end] = watermarked_segment   
    # 保存带水印的音频
    wavfile.write(output_path, sample_rate, watermarked_audio.astype(np.int16))
def extract_watermark_dct(audio_path, original_path, watermark_length):
    #www.devze.com 读取带水印音频和原始音频
    sample_rate, watermarked = wavfile.read(audio_path)
    _, original = wavfile.read(original_path)    
    # 如果是立体声,只使用一个声道
    if len(watermarked.shape) > 1:
        watermarked = watermarked[:, 0]
        original = original[:, 0]   
    segment_size = 1024
    watermark_bits = []    
    for i in range(watermark_length):
        start = i * segment_size
        end = start + segment_size       
        wm_segment = watermarked[start:end]
        orig_segment = original[start:end]        
        wm_dct = dct(wm_segment, norm='ortho')
        orig_dct = dct(orig_segment, norm='ortho')        
        coeff_index = 100
        if wm_dct[coeff_index] > orig_dct[coeff_index]:
            watermark_bits.append('1')
        else:
            watermark_bits.append('0')    
    # 将比特转换为字符串
    watermark = ''
    for i in range(0, len(watermark_bits), 8):
        byte = ''.join(watermark_bits[i:i+8])
        watermark += chr(int(byte, 2))  
    return watermark
# 使用示例
embed_watermark_dct('original.wav', '秘密', 'watermarked_dct.wav', 0.02)
extracted = extract_watermark_dct('watermarked_dct.wav', 'original.wav', 16)
print("提取的水印:", extracted)

方法三:扩频水印

import numpy as np
from scipy.io import wavfile
def generate_pn_sequence(length, seed=42):
    np.random.seed(seed)
    return np.random.choice([-1, 1], size=length)
def embed_watermark_spread_spectrum(audio_path, watermark, output_path, alpha=0.01):
    # 读取音频
    sample_rate, audio_data = wavfile.read(audio_path)    
    # 如果是立体声,只使用一个声道
    if len(audi编程o_data.shape) > 1:
        audio_data = audio_data[:, 0]    
    # 将水印转换为二进制
    watermark_bin = ''.join(format(ord(c), '08b') for c in watermark)
    watermark_bits = np.array([int(b) for b in watermark_bin])
    watermark_bits = 2 * watermark_bits - 1 # 转换为1   
    # 生成伪随机序列
    pn_length = len(audio_data) // len(watermark_bits)
    pn_sequence = generate_pn_sequence(pn_length)   
    # 创建扩频水印
    spread_watermark = np.repeat(watermark_bits, pn_length)
    spread_watermark = spread_watermark[:len(audio_data)] * pn_sequence[:len(audio_data)]   
    # 嵌入水印
    watermarked_audio = audio_data + alpha * spread_watermark * np.abs(audio_data)
    watermarked_audio = np.clip(watermarked_audio, -32768, 32767) # 确保在16位范围内 
    # 保存带水印的音频
    wavfile.write(output_path, sample_rate, watermarked_audio.astype(np.int16))
def extract_watermark_spread_spectrum(audio_path, original_path, watermark_length, pn_length):
    # 读取音频
    sample_rate, watermarked = wavfile.read(audio_path)
    _, original = wavfile.read(original_path)   
    # 如果是立体声,只使用一个声道
    if len(watermarked.shape) > 1:
        watermarked = watermarked[:, 0]
        original = original[:, 0]    
    # 计算差异
    diff = watermarked - original   
    # 生成相同的伪随机序列
    num_bits = watermark_length * 8
    pn_sequence = generate_pn_sequence(pn_length)   
    extracted_bits = []
    for i in range(num_bits):
        start = i * pn_length
        end = start + pn_length       
        segment_diff = diff[start:end]
        segment_pn = pn_sequence[:len(segment_diff)]        
        correlation = np.sum(segment_diff * segment_pn)
        extracted_bits.append('1' if correlation > 0 else '0')    
    # 将比特转换为字符串
    watermark = ''
    for i in range(0, len(extracted_bits), 8):
        byte = ''.join(extracted_bits[i:i+8])
        watermark += chr(int(byte, 2))    
    return watermark
# 使用示例
embed_watermark_spread_spectrum('original.wav', '秘密', 'watermarked_ss.wav', 0.01)
extracted = extract_watermark_spread_spectrum('watermarked_ss.wav', 'original.wav', 2, 1000)
print("提取的水印:", extracted)

注意事项

1. **音频质量**:水印嵌入会影响音频质量,需要平衡水印强度和音频质量。

2. **鲁棒性**:不同方法对音频处理的抵抗能力不同:

- LSB方法脆弱但容量大

- DCT方法对压缩有一定抵抗能力

- 扩频方法鲁棒性最强但容量小

3. **安全性**:可以考虑加密水印内容提高安全性

4. **格式支持**:示例中使用WAV格式,因编程客栈其是无损格式,其他格式可能需要先解码

扩展建议

1. 添加错误校正码提高水印提取的可靠性

2. 实现盲水印提取(不需要原始音频)

3. 添加同步信号提高对裁剪、时间拉伸的抵抗能力

4. 结合多种技术提高水印的鲁棒性和隐蔽

这些方法可以根据具体需求进行调整和组合,以实现不同场景下的音频数字水印需求。

以上就是Python实现音频添加数字水印的示例详解的详细内容,更多关于Python音频添加数字水印的资料请关注编程客栈(www.devze.com)其它相关文章!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜