Skip to content

Speech Enhancement

Noise estimation and single-channel speech enhancement algorithms.

Noise estimation

v_estnoiseg

V_ESTNOISEG - Estimate MMSE noise spectrum (Gerkmann & Hendriks).

v_estnoiseg

v_estnoiseg(yf, tz, pp=None) -> tuple[ndarray, dict]

Estimate noise spectrum using MMSE method.

Parameters:

Name Type Description Default
yf ndarray

Input power spectra (one row per frame).

required
tz float or dict

Frame increment in seconds, or state from previous call.

required
pp dict

Algorithm parameters.

None

Returns:

Name Type Description
x ndarray

Estimated noise power spectra (one row per frame).

zo dict

Output state for subsequent calls.

Source code in pyvoicebox/v_estnoiseg.py
def v_estnoiseg(yf, tz, pp=None) -> tuple[np.ndarray, dict]:
    """Estimate the noise power spectrum with the MMSE method.

    Tracks the noise spectrum frame by frame using a speech-presence
    probability that is recursively smoothed over time; the noise
    estimate is updated only in proportion to the probability that
    speech is absent.

    Parameters
    ----------
    yf : ndarray
        Input power spectra (one row per frame).
    tz : float or dict
        Frame increment in seconds, or state from previous call.
    pp : dict, optional
        Algorithm parameters.

    Returns
    -------
    x : ndarray
        Estimated noise power spectra (one row per frame).
    zo : dict
        Output state for subsequent calls.
    """
    spec = np.asarray(yf, dtype=float)
    if spec.ndim == 1:
        spec = spec.reshape(1, -1)
    n_frames, n_bins = spec.shape
    noise_out = np.zeros((n_frames, n_bins))

    resuming = isinstance(tz, dict)

    # Nothing to process: hand the caller's state straight back.
    if n_frames == 0 and resuming:
        return noise_out, tz

    if resuming:
        frames_seen = tz['nrcum']
        noise_est = tz['xt']
        prob_smooth = tz['pslp']
        frame_inc = tz['tinc']
        params = tz['qq']
    else:
        frame_inc = tz
        frames_seen = 0
        params = dict(
            tax=0.0717,
            tap=0.152,
            psthr=0.99,
            pnsaf=0.01,
            pspri=0.5,
            asnr=15,
            psini=0.5,
            tavini=0.064,
        )
        if pp is not None:
            for name in params:
                if name in pp:
                    params[name] = pp[name]
        prob_smooth = np.full(n_bins, params['psini'])
        noise_est = None

    speech_thr = params['psthr']
    prob_floor = params['pnsaf']

    # Smoothing coefficients and speech-presence constants derived from
    # the frame increment and the parameter dict.
    alpha_x = np.exp(-frame_inc / params['tax'])
    alpha_p = np.exp(-frame_inc / params['tap'])
    snr_h1 = 10.0 ** (params['asnr'] / 10.0)
    snr_term = 1.0 / (1.0 + snr_h1) - 1.0
    prior_fac = (1.0 / params['pspri'] - 1.0) * (1.0 + snr_h1)

    # First-ever frames: seed the noise estimate from an initial average.
    if frames_seen == 0 and n_frames > 0:
        n_init = max(1, min(n_frames, round(1 + params['tavini'] / frame_inc)))
        noise_est = params['psini'] * np.mean(spec[:n_init, :], axis=0)

    for k in range(n_frames):
        frame = spec[k, :]
        # A-posteriori speech-presence probability for each bin.
        p_h1 = (1.0 + prior_fac * np.exp(snr_term * frame / noise_est)) ** (-1)
        prob_smooth = alpha_p * prob_smooth + (1.0 - alpha_p) * p_h1
        # Safety-net: cap the probability where smoothed presence is high.
        p_h1 = np.minimum(p_h1, 1.0 - prob_floor * (prob_smooth > speech_thr))
        raw_update = (1.0 - p_h1) * frame + p_h1 * noise_est
        noise_est = alpha_x * noise_est + (1.0 - alpha_x) * raw_update
        noise_out[k, :] = noise_est

    state_out = {
        'nrcum': frames_seen + n_frames,
        'xt': noise_est,
        'pslp': prob_smooth,
        'tinc': frame_inc,
        'qq': params,
    }
    return noise_out, state_out

v_estnoisem

V_ESTNOISEM - Estimate noise spectrum using minimum statistics (Martin).

v_estnoisem

v_estnoisem(
    yf, tz, pp=None
) -> tuple[ndarray, dict, ndarray]

Estimate noise spectrum using minimum statistics.

Parameters:

Name Type Description Default
yf ndarray

Input power spectra (one row per frame).

required
tz float or dict

Frame increment in seconds, or state from previous call.

required
pp dict

Algorithm parameters.

None

Returns:

Name Type Description
x ndarray

Estimated noise power spectra (one row per frame).

zo dict

Output state.

xs ndarray

Estimated std error of x.

Source code in pyvoicebox/v_estnoisem.py
def v_estnoisem(yf, tz, pp=None) -> tuple[np.ndarray, dict, np.ndarray]:
    """Estimate noise spectrum using minimum statistics.

    Minimum-statistics noise tracker: the noisy power spectrum is
    recursively smoothed with a time-varying coefficient, and the
    bias-compensated minimum over a circular buffer of ``nu``
    sub-windows of ``nv`` frames each is taken as the noise estimate.
    Presumably a port of the VOICEBOX implementation of Martin's
    algorithm -- confirm against the original toolbox documentation.

    Parameters
    ----------
    yf : ndarray
        Input power spectra (one row per frame).
    tz : float or dict
        Frame increment in seconds, or state from previous call.
    pp : dict, optional
        Algorithm parameters.

    Returns
    -------
    x : ndarray
        Estimated noise power spectra (one row per frame).
    zo : dict
        Output state.
    xs : ndarray
        Estimated std error of x.
    """
    yf = np.asarray(yf, dtype=float)
    if yf.ndim == 1:
        yf = yf.reshape(1, -1)
    nr, nrf = yf.shape  # number of frames, number of frequency bins
    x = np.zeros((nr, nrf))
    xs = np.zeros((nr, nrf))

    # No frames to process: return the caller's state unchanged.
    if nr == 0 and isinstance(tz, dict):
        return x, tz, xs

    if isinstance(tz, dict):
        # Resume from state saved by a previous call.
        nrcum = tz['nrcum']          # total frames processed before this call
        p_state = tz['p']            # smoothed power spectrum
        ac = tz['ac']                # smoothed correction factor
        sn2 = tz['sn2']              # current noise estimate
        pb = tz['pb']                # smoothed power (first moment)
        pb2 = tz['pb2']              # smoothed squared power (second moment)
        pminu = tz['pminu']          # minimum over the whole buffer
        actmin = tz['actmin']        # running minimum within current sub-window
        actminsub = tz['actminsub']  # sub-window-biased running minimum
        subwc = tz['subwc']          # frame counter within current sub-window
        actbuf = tz['actbuf']        # circular buffer of sub-window minima
        ibuf = tz['ibuf']            # circular-buffer write index (1-based)
        lminflag = tz['lminflag']    # per-bin local-minimum flags
        tinc = tz['tinc']
        qq = tz['qq']
    else:
        tinc = tz
        nrcum = 0
        # Default parameters; time constants are in seconds.  Names follow
        # the original VOICEBOX toolbox -- see its documentation for details.
        qq = {
            'taca': 0.0449,
            'tamax': 0.392,
            'taminh': 0.0133,
            'tpfall': 0.064,
            'tbmax': 0.0717,
            'qeqmin': 2,
            'qeqmax': 14,
            'av': 2.12,
            'td': 1.536,
            'nu': 8,
            'qith': np.array([0.03, 0.05, 0.06, np.inf]),
            'nsmdb': np.array([47, 31.4, 15.7, 4.1]),
        }
        if pp is not None:
            for key in qq:
                if key in pp:
                    qq[key] = pp[key]

    # Unpack parameters
    taca = qq['taca']
    tamax = qq['tamax']
    taminh = qq['taminh']
    tpfall = qq['tpfall']
    tbmax = qq['tbmax']
    qeqmin = qq['qeqmin']
    qeqmax = qq['qeqmax']
    av = qq['av']
    td = qq['td']
    nu = qq['nu']
    qith = np.asarray(qq['qith'])
    nsmdb = np.asarray(qq['nsmdb'])

    # Derived constants: each ta* time constant becomes a per-frame
    # exponential smoothing coefficient exp(-tinc/ta*).
    aca = np.exp(-tinc / taca)
    acmax = aca
    amax = np.exp(-tinc / tamax)
    aminh = np.exp(-tinc / taminh)
    bmax = np.exp(-tinc / tbmax)
    snrexp = -tinc / tpfall
    # Split the td-second analysis window into nu sub-windows of nv frames,
    # enforcing a minimum sub-window length of 4 frames.
    nv = round(td / (tinc * nu))
    if nv < 4:
        nv = 4
        nu = max(round(td / (tinc * nv)), 1)
    nd = nu * nv  # total frames spanned by the minimum-statistics window

    # Bias-compensation table values for the full window and sub-window.
    md, hd = _mhvals(nd)
    mv, hv = _mhvals(nv)
    nsms = 10.0 ** (nsmdb * nv * tinc / 10.0)
    qeqimax = 1.0 / qeqmin
    qeqimin = 1.0 / qeqmax

    if not isinstance(tz, dict):
        # First call: initialize all tracker state.
        if nr == 0:
            # No input yet -- zero-size state consistent with x's shape.
            ac = 1.0
            subwc = nv
            ibuf = 0
            p_state = x.copy()
            sn2 = p_state.copy()
            pb = p_state.copy()
            pb2 = pb ** 2
            pminu = p_state.copy()
            actmin = np.full(nrf, np.inf)
            actminsub = actmin.copy()
            actbuf = np.full((nu, nrf), np.inf)
            lminflag = np.zeros(nrf, dtype=bool)
        else:
            # Seed the smoothed power and noise estimate from the first frame.
            p_state = yf[0, :].copy()
            ac = 1.0
            sn2 = p_state.copy()
            pb = p_state.copy()
            pb2 = pb ** 2
            pminu = p_state.copy()
            actmin = np.full(nrf, np.inf)
            actminsub = actmin.copy()
            subwc = nv
            actbuf = np.full((nu, nrf), np.inf)
            ibuf = 0
            lminflag = np.zeros(nrf, dtype=bool)

    for t in range(nr):
        yft = yf[t, :]
        # Smoothed correction factor based on overall power mismatch.
        acb = (1.0 + (np.sum(p_state) / np.sum(yft) - 1.0) ** 2) ** (-1)
        ac = aca * ac + (1.0 - aca) * max(acb, acmax)
        # Time- and frequency-dependent smoothing coefficient, floored by an
        # SNR-dependent lower bound so the estimate can fall after speech.
        ah = amax * ac * (1.0 + (p_state / sn2 - 1.0) ** 2) ** (-1)
        snr = np.sum(p_state) / np.sum(sn2)
        ah = np.maximum(ah, min(aminh, snr ** snrexp))

        # Recursively smooth the power spectrum and its first two moments.
        p_state = ah * p_state + (1.0 - ah) * yft
        b_coeff = np.minimum(ah ** 2, bmax)
        pb = b_coeff * pb + (1.0 - b_coeff) * p_state
        pb2 = b_coeff * pb2 + (1.0 - b_coeff) * p_state ** 2

        # Inverse equivalent degrees of freedom (variance / (2*mean^2)),
        # clipped; the lower clip shrinks as more frames accumulate.
        qeqi = np.clip((pb2 - pb ** 2) / (2.0 * sn2 ** 2), qeqimin / (t + nrcum + 1), qeqimax)
        qiav = np.sum(qeqi) / nrf
        bc = 1.0 + av * np.sqrt(qiav)
        # Bias factors for the full window (bmind) and sub-window (bminv).
        bmind = 1.0 + 2.0 * (nd - 1) * (1.0 - md) / (1.0 / qeqi - 2.0 * md)
        bminv = 1.0 + 2.0 * (nv - 1) * (1.0 - mv) / (1.0 / qeqi - 2.0 * mv)
        # Bins where the bias-compensated smoothed power sets a new minimum.
        kmod = bc * p_state * bmind < actmin
        if np.any(kmod):
            actmin[kmod] = bc * p_state[kmod] * bmind[kmod]
            actminsub[kmod] = bc * p_state[kmod] * bminv[kmod]

        if subwc > 1 and subwc < nv:
            # Middle of a sub-window: track local minima and update the
            # noise estimate from the running sub-window minimum.
            lminflag = lminflag | kmod
            pminu = np.minimum(actminsub, pminu)
            sn2 = pminu.copy()
        else:
            if subwc >= nv:
                # End of a sub-window: push the sub-window minimum into the
                # circular buffer and recompute the overall minimum.
                ibuf = 1 + (ibuf % nu)  # 1-based circular index (MATLAB convention)
                actbuf[ibuf - 1, :] = actmin  # convert to 0-based for array access
                pminu = np.min(actbuf, axis=0)
                # find first qith > qiav
                ii = np.where(qiav < qith)[0]
                if len(ii) > 0:
                    nsm = nsms[ii[0]]
                else:
                    nsm = nsms[-1]
                # Accept isolated local minima that lie within the allowed
                # range above the buffer minimum.
                lmin = lminflag & ~kmod & (actminsub < nsm * pminu) & (actminsub > pminu)
                if np.any(lmin):
                    pminu[lmin] = actminsub[lmin]
                    actbuf[:, lmin] = pminu[lmin]
                lminflag[:] = False
                actmin[:] = np.inf
                subwc = 0

        subwc += 1
        x[t, :] = sn2
        # Approximate standard error of the noise estimate.
        qisq = np.sqrt(qeqi)
        xs[t, :] = sn2 * np.sqrt(0.266 * (nd + 100 * qisq) * qisq / (1.0 + 0.005 * nd + 6.0 / nd) / (0.5 / qeqi + nd - 1))

    zo = {
        'nrcum': nrcum + nr,
        'p': p_state,
        'ac': ac,
        'sn2': sn2,
        'pb': pb,
        'pb2': pb2,
        'pminu': pminu,
        'actmin': actmin,
        'actminsub': actminsub,
        'subwc': subwc,
        'actbuf': actbuf,
        'ibuf': ibuf,
        'lminflag': lminflag,
        'tinc': tinc,
        'qq': qq,
    }
    return x, zo, xs

Enhancement

v_specsub

V_SPECSUB - Speech enhancement using spectral subtraction.

v_specsub

v_specsub(si, fs, pp=None) -> ndarray

Perform speech enhancement using spectral subtraction.

Parameters:

Name Type Description Default
si array_like

Input speech signal.

required
fs float

Sample frequency in Hz.

required
pp dict

Algorithm parameters.

None

Returns:

Name Type Description
ss ndarray

Enhanced speech output.

Source code in pyvoicebox/v_specsub.py
def v_specsub(si, fs, pp=None) -> np.ndarray:
    """Perform speech enhancement using spectral subtraction.

    The signal is framed with a root-Hamming window, a noise power
    spectrum is estimated per frame, and a spectral-subtraction gain is
    applied in a configurable spectral domain before overlap-add
    resynthesis.

    Parameters
    ----------
    si : array_like
        Input speech signal.
    fs : float
        Sample frequency in Hz.
    pp : dict, optional
        Algorithm parameters.  Keys not listed in the defaults below are
        forwarded unchanged to the noise estimator.  Parameter names
        follow the original VOICEBOX toolbox -- confirm semantics against
        its documentation.

    Returns
    -------
    ss : ndarray
        Enhanced speech output (truncated to the input length).
    """
    s = np.asarray(si, dtype=float).ravel()

    # Default parameters (overridden by matching keys in pp).
    qq = {
        'of': 2, 'ti': 16e-3, 'ri': 0, 'g': 1, 'e': 1,
        'am': 3, 'b': 0.01, 'al': -5, 'ah': 20,
        'bt': -1, 'ne': 0, 'mx': 0, 'gh': 1, 'tf': 'g', 'rf': 0,
    }
    qp = {}
    if pp is not None:
        for key in qq:
            if key in pp:
                qq[key] = pp[key]
        qp = pp  # full caller dict is forwarded to the noise estimator

    # Frame increment in samples: nearest power of 2 when 'ri' is set,
    # otherwise ti * fs rounded.
    if qq['ri']:
        ni = int(2 ** np.ceil(np.log2(qq['ti'] * fs * np.sqrt(0.5))))
    else:
        ni = round(qq['ti'] * fs)
    tinc = ni / fs  # actual frame increment in seconds
    ne = qq['ne']

    # Root-Hamming analysis window, normalized for unity overlap-add gain.
    no = round(qq['of'])  # overlap factor
    nf = ni * no          # frame length in samples
    w = np.sqrt(np.hamming(nf + 1)[:nf])
    w = w / np.sqrt(np.sum(w[::ni] ** 2))

    # Frame the signal and compute the power spectrum of each frame.
    y, _tt, *_ = v_enframe(s, w, ni, 'r')
    yf = v_rfft(y, nf, 1)  # axis=1 for row-wise FFT
    yp = np.real(yf * np.conj(yf))
    nr, nf2 = yp.shape

    # Per-frame noise power estimate: MMSE (ne > 0) or minimum statistics.
    if ne > 0:
        dp, ze = v_estnoiseg(yp, tinc, qp)
    else:
        dp, ze, _ = v_estnoisem(yp, tinc, qp)

    if nr == 0:
        return np.array([])

    mz = yp == 0  # zero-power bins: guard divisions below
    if qq['al'] < np.inf:
        # SNR-dependent oversubtraction factor per frame: 'am' at/below
        # 'al' dB overall SNR, falling linearly to 1 at/above 'ah' dB.
        ypf = np.sum(yp, axis=1)
        dpf = np.sum(dp, axis=1)
        mzf = dpf == 0
        af = 1.0 + (qq['am'] - 1.0) * (np.clip(10.0 * np.log10(ypf / (dpf + mzf)), qq['al'], qq['ah']) - qq['ah']) / (qq['al'] - qq['ah'])
        af[mzf] = 1.0  # no oversubtraction where the noise estimate is zero
    else:
        af = np.full(nr, qq['am'])

    # v = (noise/signal)^(g/2); af and bf are raised to the matching power
    # so the subtraction operates in the chosen spectral domain 'g'.
    if qq['g'] == 1:  # magnitude domain
        v = np.sqrt(dp / (yp + mz))
        af = np.sqrt(af)
        bf = np.sqrt(qq['b'])
    elif qq['g'] == 2:  # power domain (af is already in this domain)
        v = dp / (yp + mz)
        bf = qq['b']
    else:
        v = (dp / (yp + mz)) ** (0.5 * qq['g'])
        af = af ** (0.5 * qq['g'])
        bf = qq['b'] ** (0.5 * qq['g'])

    af = af[:, np.newaxis] * np.ones((1, nf2))  # broadcast over frequency bins
    mf = v >= 1.0 / (af + bf)  # bins where the spectral floor dominates
    g = np.zeros_like(v)
    eg = qq['e'] / qq['g']     # gain exponent relative to subtraction domain
    gh = qq['gh']

    # Gain: spectral floor (bf*v) where mf, subtraction (1 - af*v) elsewhere;
    # the eg == 1 and eg == 0.5 cases avoid a general power computation.
    if eg == 1:
        g[mf] = np.minimum(bf * v[mf], gh)
        g[~mf] = 1.0 - af[~mf] * v[~mf]
    elif eg == 0.5:
        g[mf] = np.minimum(np.sqrt(bf * v[mf]), gh)
        g[~mf] = np.sqrt(1.0 - af[~mf] * v[~mf])
    else:
        g[mf] = np.minimum((bf * v[mf]) ** eg, gh)
        g[~mf] = (1.0 - af[~mf] * v[~mf]) ** eg

    if qq['bt'] >= 0:
        g = (g > qq['bt']).astype(float)  # binary gain

    g = qq['mx'] + (1.0 - qq['mx']) * g  # mix in a fraction of the input

    # Inverse FFT and weighted overlap-add resynthesis.  Frames i, i+no,
    # i+2*no, ... are exactly nf samples apart, so each stride-no subset can
    # be concatenated and added into the output at offset i*ni.
    se = np.real(v_irfft((yf * g).T, nf).T) * w[np.newaxis, :]
    ss = np.zeros(ni * (nr + no - 1))
    for i in range(no):
        frames = np.arange(i, nr, no)
        nm = nf * len(frames)
        ss[i * ni:i * ni + nm] += se[frames, :].reshape(-1)

    return ss[:len(s)]

v_specsubm

V_SPECSUBM - Spectral subtraction (Martin's method).

v_specsubm

v_specsubm(s, fs, p=None) -> tuple[ndarray, ndarray]

Spectral subtraction using minimum statistics (Martin).

Parameters:

Name Type Description Default
s array_like

Input speech signal.

required
fs float

Sample frequency in Hz.

required
p array_like

Algorithm parameters (11 elements).

None

Returns:

Name Type Description
ss ndarray

Enhanced speech output.

po ndarray

Parameters used.

Source code in pyvoicebox/v_specsubm.py
def v_specsubm(s, fs, p=None) -> tuple[np.ndarray, np.ndarray]:
    """Spectral subtraction using minimum statistics (Martin).

    Frames the signal, tracks the noise floor as the minimum of a
    smoothed power spectrum over a short buffer, and applies a
    frequency-dependent oversubtraction gain before overlap-add.

    Parameters
    ----------
    s : array_like
        Input speech signal.
    fs : float
        Sample frequency in Hz.
    p : array_like, optional
        Algorithm parameters (11 elements).  Index meanings inferred from
        use below (confirm against the original VOICEBOX v_specsubm docs):
        p[0]/p[1]/p[4] smoothing time constants (s) for the signal power,
        noise power, and oversubtraction factor; p[2] nominal frame
        increment (s); p[3] minimum-search buffer length (s); p[5]
        frequency scale (Hz) of the oversubtraction roll-off; p[6] number
        of minimum-buffer columns; p[7] window/increment ratio; p[8]
        oversubtraction applied to the tracked minimum; p[9] spectral
        floor factor; p[10] oversubtraction strength at 0 Hz.

    Returns
    -------
    ss : ndarray
        Enhanced speech output.
    po : ndarray
        Parameters used.
    """
    s = np.asarray(s, dtype=float).ravel()
    if p is None:
        po = np.array([0.04, 0.1, 0.032, 1.5, 0.08, 400, 4, 4, 1.5, 0.02, 4])
    else:
        po = np.asarray(p, dtype=float).ravel()

    ns = len(s)
    ts = 1.0 / fs
    ss = np.zeros(ns)

    # Frame increment: power of 2 so the window length nw = ni*po[7] is too.
    ni = int(2 ** np.ceil(np.log2(fs * po[2] / po[7])))
    ti = ni / fs
    nw = int(ni * po[7])
    nf_count = 1 + int(np.floor((ns - nw) / ni))          # number of frames
    nm_count = int(np.ceil(fs * po[3] / (ni * po[6])))     # frames per buffer column

    # Hamming window scaled for overlap-add reconstruction.
    win = 0.5 * np.hamming(nw + 1)[:nw] / 1.08
    # Per-frame exponential smoothing coefficients (zo here is a smoothing
    # constant, unrelated to the state dict naming used elsewhere).
    zg = np.exp(-ti / po[0])
    za = np.exp(-ti / po[1])
    zo = np.exp(-ti / po[4])

    px = np.zeros(1 + nw // 2)    # smoothed signal power
    pxn = px.copy()               # smoothed power for noise tracking
    os_val = px.copy()            # smoothed oversubtraction factor
    mb = np.ones((1 + nw // 2, int(po[6]))) * nw / 2  # minimum buffer
    im = 0                        # frame counter modulo nm_count
    # Oversubtraction roll-off: strongest at DC, decaying with frequency.
    osf = po[10] * (1.0 + np.arange(1 + nw // 2) * fs / (nw * po[5])) ** (-1)

    for i_s in range(nf_count):
        idx = np.arange(nw) + i_s * ni
        x = v_rfft(s[idx] * win)
        x2 = np.real(x * np.conj(x))  # frame power spectrum

        pxn = za * pxn + (1.0 - za) * x2
        im = (im + 1) % nm_count
        if im:
            # Within a buffer period: update the current minimum column.
            mb[:, 0] = np.minimum(mb[:, 0], pxn)
        else:
            # Period complete: shift the buffer, starting a fresh column.
            mb = np.column_stack([pxn, mb[:, :int(po[6]) - 1]])
        pn = po[8] * np.min(mb, axis=1)  # oversubtracted noise estimate

        # Smoothed oversubtraction factor, larger where noise dominates.
        os_val = zo * os_val + (1.0 - zo) * (1.0 + osf * pn / (pn + pxn))

        px = zg * px + (1.0 - zg) * x2
        # Gain: subtraction result floored at po[9]*sqrt(pn/x2).
        q = np.maximum(po[9] * np.sqrt(pn / x2), 1.0 - np.sqrt(os_val * pn / px))
        ss[idx] += np.real(v_irfft(x * q, nw))

    return ss, po

v_spendred

V_SPENDRED - Speech enhancement using spectral subtraction with decision-directed approach (stub).

v_spendred

v_spendred(*args, **kwargs) -> None

Speech enhancement using spectral subtraction.

This is a complex speech enhancement function. A stub is provided.

Raises:

Type Description
NotImplementedError

Full implementation pending.

Source code in pyvoicebox/v_spendred.py
def v_spendred(*args, **kwargs) -> None:
    """Speech enhancement using spectral subtraction (unimplemented stub).

    This function is a placeholder: calling it always fails.

    Raises
    ------
    NotImplementedError
        Full implementation pending.
    """
    message = (
        "v_spendred is a complex speech enhancement function. "
        "Consider using v_specsub or v_ssubmmse instead."
    )
    raise NotImplementedError(message)

v_ssubmmse

V_SSUBMMSE - Speech enhancement using MMSE spectral amplitude estimator.

v_ssubmmse

v_ssubmmse(si, fs, pp=None) -> ndarray

Speech enhancement using MMSE spectral amplitude estimator.

Parameters:

Name Type Description Default
si array_like

Input speech signal (1-D vector).

required
fs float

Sample frequency in Hz.

required
pp dict

Algorithm parameters.

None

Returns:

Name Type Description
ss ndarray

Enhanced speech output.

Source code in pyvoicebox/v_ssubmmse.py
def v_ssubmmse(si, fs, pp=None) -> np.ndarray:
    """Speech enhancement using MMSE spectral amplitude estimator.

    Frames the signal, estimates the noise power spectrum, computes a
    per-bin gain from a decision-directed a priori SNR estimate, and
    resynthesizes by overlap-add.  The 'lg' parameter selects the gain
    rule: 0 = amplitude-domain, 2 = a perceptually-motivated variant,
    otherwise the log-spectral-amplitude rule (default).  These are
    presumably the Ephraim-Malah family of estimators -- confirm
    against the original VOICEBOX documentation.

    Parameters
    ----------
    si : array_like
        Input speech signal (1-D vector).
    fs : float
        Sample frequency in Hz.
    pp : dict, optional
        Algorithm parameters.  Keys not in the defaults are forwarded to
        the noise estimator.

    Returns
    -------
    ss : ndarray
        Enhanced speech output.
    """
    kk = np.sqrt(2 * np.pi)   # sqrt(2*pi), used in the Bessel-based gain
    cc = np.sqrt(2.0 / np.pi)

    s = np.asarray(si, dtype=float).ravel()

    # Default parameters (overridden by matching keys in pp); names follow
    # the original VOICEBOX toolbox.
    qq = {
        'of': 2, 'ti': 16e-3, 'ri': 0, 'ta': 0.396,
        'gx': 1000, 'gn': 1, 'gz': 0.001, 'xn': 0, 'xb': 1,
        'lg': 1, 'ne': 1, 'bt': -1, 'mx': 0, 'gc': 10,
        'tf': 'g', 'rf': 0,
    }
    qp = {}
    if pp is not None:
        for key in qq:
            if key in pp:
                qq[key] = pp[key]
        qp = pp  # full caller dict forwarded to the noise estimator

    # Frame increment in samples (power of 2 when 'ri' is set).
    if qq['ri']:
        ni = int(2 ** np.ceil(np.log2(qq['ti'] * fs * np.sqrt(0.5))))
    else:
        ni = round(qq['ti'] * fs)
    tinc = ni / fs
    a = np.exp(-tinc / qq['ta'])  # decision-directed smoothing coefficient
    gx = qq['gx']                 # upper clip on a posteriori SNR
    gz = qq['gz']                 # lower clip on a posteriori SNR
    xn = qq['xn']                 # floor on a priori SNR
    ne = qq['ne']
    gn1 = max(qq['gn'] - 1, 0)    # floor for the instantaneous SNR term
    xb = qq['xb']                 # bias factor on the carried-over SNR

    # Root-Hamming analysis window, normalized for unity overlap-add gain.
    no = round(qq['of'])
    nf = ni * no
    w = np.sqrt(np.hamming(nf + 1)[:nf])
    w = w / np.sqrt(np.sum(w[::ni] ** 2))

    # Frame the signal and compute per-frame power spectra.
    y, tt, *_ = v_enframe(s, w, ni, 'r')
    tt = tt / fs
    yf = v_rfft(y, nf, 1)
    yp = np.real(yf * np.conj(yf))
    nr, nf2 = yp.shape

    # Noise estimate: MMSE tracker (ne > 0) or minimum statistics.
    if ne > 0:
        dp, ze = v_estnoiseg(yp, tinc, qp)
    else:
        dp, ze, _ = v_estnoisem(yp, tinc, qp)

    xu = 1.0  # decision-directed carry-over from the previous frame

    if nr == 0:
        return np.array([])

    gam = np.clip(yp / dp, gz, gx)  # a posteriori SNR, clipped
    g = np.zeros((nr, nf2))
    # NOTE(review): x_snr is filled below but never read afterwards --
    # presumably kept to mirror an optional output of the MATLAB original.
    x_snr = np.zeros((nr, nf2))

    if qq['lg'] == 0:  # amplitude domain
        for i in range(nr):
            gami = gam[i, :]
            # Decision-directed a priori SNR estimate.
            xi = np.maximum(a * xb * xu + (1 - a) * np.maximum(gami - 1, gn1), xn)
            v = 0.5 * xi * gami / (1 + xi)
            # Large-v asymptotic form; exact Bessel form used where v < 0.5.
            gi = (0.277 + 2 * v) / gami
            mv = v < 0.5
            if np.any(mv):
                vmv = v[mv]
                gi[mv] = kk * np.sqrt(vmv) * ((0.5 + vmv) * besseli(0, vmv) + vmv * besseli(1, vmv)) / (gami[mv] * np.exp(vmv))
            g[i, :] = gi
            x_snr[i, :] = xi
            xu = gami * gi ** 2  # carry gain-scaled posterior to next frame
    elif qq['lg'] == 2:  # perceptual
        for i in range(nr):
            gami = gam[i, :]
            xi = np.maximum(a * xb * xu + (1 - a) * np.maximum(gami - 1, gn1), xn)
            v = 0.5 * xi * gami / (1 + xi)
            gi = cc * np.sqrt(v) * np.exp(v) / (gami * besseli(0, v))
            g[i, :] = gi
            x_snr[i, :] = xi
            xu = gami * gi ** 2
    else:  # log domain (default)
        for i in range(nr):
            gami = gam[i, :]
            xi = np.maximum(a * xb * xu + (1 - a) * np.maximum(gami - 1, gn1), xn)
            xir = xi / (1 + xi)
            # Log-MMSE gain: Wiener gain times exp of half the exponential
            # integral of the scaled posterior.
            gi = xir * np.exp(0.5 * _expint(xir * gami))
            g[i, :] = gi
            x_snr[i, :] = xi
            xu = gami * gi ** 2

    # Mix in a fraction 'mx' of the input and cap the gain at 'gc'.
    g = np.minimum(qq['mx'] + (1 - qq['mx']) * g, qq['gc'])
    if qq['bt'] >= 0:
        g = (g > qq['bt']).astype(float)  # binary gain

    # Inverse FFT and weighted overlap-add resynthesis (stride-no frame
    # subsets are contiguous in time, so they can be concatenated).
    se = np.real(v_irfft((yf * g).T, nf).T) * w[np.newaxis, :]
    ss = np.zeros(ni * (nr + no - 1))
    for i in range(no):
        frames = np.arange(i, nr, no)
        nm = nf * len(frames)
        seg = se[frames, :].reshape(-1)
        ss[i * ni:i * ni + nm] += seg

    ss = ss[:len(s)]
    return ss

v_ssubmmsev

V_SSUBMMSEV - Speech enhancement using MMSE with VAD-based noise estimation.

v_ssubmmsev

v_ssubmmsev(si, fs, pp=None) -> ndarray

Speech enhancement using MMSE with VAD-based noise estimation.

Parameters:

Name Type Description Default
si array_like

Input speech signal (1-D vector).

required
fs float

Sample frequency in Hz.

required
pp dict

Algorithm parameters.

None

Returns:

Name Type Description
ss ndarray

Enhanced speech output.

Source code in pyvoicebox/v_ssubmmsev.py
def v_ssubmmsev(si, fs, pp=None) -> np.ndarray:
    """Speech enhancement using MMSE with VAD-based noise estimation.

    Same MMSE gain family as v_ssubmmse (selected by 'lg'), but instead
    of a separate noise tracker the noise spectrum is initialized from
    the first few frames and then updated inside the gain loop whenever
    a likelihood-ratio statistic classifies a frame as noise-only.

    Parameters
    ----------
    si : array_like
        Input speech signal (1-D vector).
    fs : float
        Sample frequency in Hz.
    pp : dict, optional
        Algorithm parameters ('tn' noise-update time constant, 'le'
        likelihood threshold, 'tx' initial noise-estimate duration; the
        rest as in v_ssubmmse).

    Returns
    -------
    ss : ndarray
        Enhanced speech output.
    """
    kk = np.sqrt(2 * np.pi)   # sqrt(2*pi), used in the Bessel-based gain
    cc = np.sqrt(2.0 / np.pi)

    s = np.asarray(si, dtype=float).ravel()

    # Default parameters (overridden by matching keys in pp).
    qq = {
        'of': 2, 'ti': 16e-3, 'ri': 0, 'ta': 0.396,
        'gx': 1000, 'gn': 1, 'gz': 0.001, 'xn': 0, 'xb': 1,
        'lg': 1, 'ne': 0, 'bt': -1, 'mx': 0,
        'tf': 'g', 'rf': 0,
        'tn': 0.5, 'le': 0.15, 'tx': 0.06,
    }
    if pp is not None:
        for key in qq:
            if key in pp:
                qq[key] = pp[key]

    # Frame increment in samples (power of 2 when 'ri' is set).
    if qq['ri']:
        ni = int(2 ** np.ceil(np.log2(qq['ti'] * fs * np.sqrt(0.5))))
    else:
        ni = round(qq['ti'] * fs)
    tinc = ni / fs
    a_coeff = np.exp(-tinc / qq['ta'])  # decision-directed smoothing
    gx = qq['gx']                       # upper clip on a posteriori SNR
    gz = qq['gz']                       # lower clip on a posteriori SNR
    xn = qq['xn']                       # floor on a priori SNR
    gn1 = max(qq['gn'] - 1, 0)          # floor for instantaneous SNR term
    le = qq['le']                       # noise-classification threshold
    xb = qq['xb']                       # bias on the carried-over SNR
    nd = max(1, round(qq['tx'] / tinc))  # frames for the initial noise estimate
    an = np.exp(-tinc / qq['tn'])       # noise-update smoothing coefficient

    # Root-Hamming analysis window, normalized for unity overlap-add gain.
    no = round(qq['of'])
    nf = ni * no
    w = np.sqrt(np.hamming(nf + 1)[:nf])
    w = w / np.sqrt(np.sum(w[::ni] ** 2))

    # Frame the signal and compute per-frame power spectra.
    y, tt, *_ = v_enframe(s, w, ni, 'r')
    tt = tt / fs
    yf = v_rfft(y, nf, 1)
    yp = np.real(yf * np.conj(yf))
    nr, nf2 = yp.shape

    dpi = np.zeros(nf2)  # running noise power estimate
    ndp = 0
    xu = 1.0             # decision-directed carry-over

    if nr == 0:
        return np.array([])

    # Initialize noise estimate from the first nd frames (or fewer).
    # NOTE(review): ndp is assigned but never read afterwards -- presumably
    # a remnant of incremental-state bookkeeping in the original.
    ndx = min(nr, nd - ndp)
    dpi = np.mean(yp[:ndx, :], axis=0)
    ndp = ndx

    g = np.zeros((nr, nf2))

    if qq['lg'] == 0:
        # Amplitude-domain gain (Bessel form where v < 0.5).
        for i in range(nr):
            ypi = yp[i, :]
            gami = np.clip(ypi / dpi, gz, gx)  # a posteriori SNR
            # Decision-directed a priori SNR estimate.
            xi = np.maximum(a_coeff * xb * xu + (1 - a_coeff) * np.maximum(gami - 1, gn1), xn)
            # Likelihood-ratio VAD: low statistic => noise-only frame,
            # so fold this frame into the noise estimate.
            if np.sum(gami * xi / (1 + xi) - np.log(1 + xi)) < le * nf2:
                dpi = dpi * an + (1 - an) * ypi
            v = 0.5 * xi * gami / (1 + xi)
            gi = (0.277 + 2 * v) / gami
            mv = v < 0.5
            if np.any(mv):
                vmv = v[mv]
                gi[mv] = kk * np.sqrt(vmv) * ((0.5 + vmv) * besseli(0, vmv) + vmv * besseli(1, vmv)) / (gami[mv] * np.exp(vmv))
            g[i, :] = gi
            xu = gami * gi ** 2  # carry gain-scaled posterior forward
    elif qq['lg'] == 2:
        # Perceptually-motivated gain variant.
        for i in range(nr):
            ypi = yp[i, :]
            gami = np.clip(ypi / dpi, gz, gx)
            xi = np.maximum(a_coeff * xb * xu + (1 - a_coeff) * np.maximum(gami - 1, gn1), xn)
            if np.sum(gami * xi / (1 + xi) - np.log(1 + xi)) < le * nf2:
                dpi = dpi * an + (1 - an) * ypi
            v = 0.5 * xi * gami / (1 + xi)
            gi = cc * np.sqrt(v) * np.exp(v) / (gami * besseli(0, v))
            g[i, :] = gi
            xu = gami * gi ** 2
    else:
        # Log-spectral-amplitude gain (default).
        for i in range(nr):
            ypi = yp[i, :]
            gami = np.clip(ypi / dpi, gz, gx)
            xi = np.maximum(a_coeff * xb * xu + (1 - a_coeff) * np.maximum(gami - 1, gn1), xn)
            xir = xi / (1 + xi)
            if np.sum(gami * xir - np.log(1 + xi)) < le * nf2:
                dpi = dpi * an + (1 - an) * ypi
            gi = xir * np.exp(0.5 * _expint(xir * gami))
            g[i, :] = gi
            xu = gami * gi ** 2

    if qq['bt'] >= 0:
        g = (g > qq['bt']).astype(float)  # binary gain
    g = qq['mx'] + (1 - qq['mx']) * g    # mix in a fraction of the input

    # Inverse FFT and weighted overlap-add resynthesis.
    se = np.real(v_irfft((yf * g).T, nf).T) * w[np.newaxis, :]
    ss = np.zeros(ni * (nr + no - 1))
    for i in range(no):
        frames = np.arange(i, nr, no)
        nm = nf * len(frames)
        seg = se[frames, :].reshape(-1)
        ss[i * ni:i * ni + nm] += seg

    ss = ss[:len(s)]
    return ss