Daily Pipecat

Voice Isolation

⚠️

The Krisp SDK integration guide published at https://docs.pipecat.ai/guides/features/krisp is not compatible with the latest Krisp Server SDK and models.

Integrating the Krisp Server SDK with Pipecat

  • Download the latest Python SDK from the SDK Portal.
  • Install the Python wheel file into the virtual environment of your Pipecat installation, then verify it as shown below:
pip install <path to the Krisp wheel file>
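
To confirm the wheel installed correctly, import the SDK and print its version (these are the same calls the filter class below makes at class load):

import krisp_audio

krisp_audio.globalInit("")
version = krisp_audio.getVersion()
print(f"Krisp Audio Python SDK Version: {version.major}.{version.minor}.{version.patch}")

With the SDK in place, pass the custom filter (implemented below) to the transport's inbound audio: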
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport

transport = DailyTransport(
    room_url,
    token,
    "Respond bot",
    DailyParams(
        audio_in_filter=NoiseFilterFromKrisp(model_path="<path to the .kef model file>"),  # Krisp noise reduction
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
)

The NoiseFilterFromKrisp class used above is implemented in the following snippet:

import os
import numpy as np
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame

import krisp_audio


class NoiseFilterFromKrisp(BaseAudioFilter):
    # Initialize the Krisp SDK once, at class-definition time, and log the SDK version.
    krisp_audio.globalInit("")
    SDK_VERSION = krisp_audio.getVersion()
    print(f"Krisp Audio Python SDK Version: {SDK_VERSION.major}."
          f"{SDK_VERSION.minor}.{SDK_VERSION.patch}")

    # Map integer sample rates to the Krisp SDK's SamplingRate enum values.
    SAMPLE_RATES = {
        8000: krisp_audio.SamplingRate.Sr8000Hz,
        16000: krisp_audio.SamplingRate.Sr16000Hz,
        24000: krisp_audio.SamplingRate.Sr24000Hz,
        32000: krisp_audio.SamplingRate.Sr32000Hz,
        44100: krisp_audio.SamplingRate.Sr44100Hz,
        48000: krisp_audio.SamplingRate.Sr48000Hz,
    }

    def __init__(self, model_path: str):
        super().__init__()
        if not model_path:
            raise ValueError("Model path is not set")
        if not model_path.endswith(".kef"):
            raise ValueError(f"Expected a model file with a .kef extension, got: {model_path}")
        if not os.path.isfile(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")
        self._model_path = model_path
        self._filtering = True
        self._session = None
        self._samples_per_frame = None
        self._noise_suppression_level = 100  # Suppression strength passed to process()

    def _int_to_sample_rate(self, sample_rate):
        if sample_rate not in self.SAMPLE_RATES:
            raise ValueError("Unsupported sample rate")
        return self.SAMPLE_RATES[sample_rate]

    async def start(self, sample_rate: int):
        # Create a noise-cancellation session for 10 ms frames at the transport's sample rate.
        model_info = krisp_audio.ModelInfo()
        model_info.path = self._model_path
        nc_cfg = krisp_audio.NcSessionConfig()
        nc_cfg.inputSampleRate = self._int_to_sample_rate(sample_rate)
        nc_cfg.inputFrameDuration = krisp_audio.FrameDuration.Fd10ms
        nc_cfg.outputSampleRate = nc_cfg.inputSampleRate
        nc_cfg.modelInfo = model_info
        # Samples in one 10 ms frame, e.g. 160 at 16 kHz.
        self._samples_per_frame = int((sample_rate * 10) / 1000)
        self._session = krisp_audio.NcInt16.create(nc_cfg)

    async def stop(self):
        self._session = None

    async def process_frame(self, frame: FilterControlFrame):
        # Allow filtering to be toggled at runtime via FilterEnableFrame.
        if isinstance(frame, FilterEnableFrame):
            self._filtering = frame.enable

    async def filter(self, audio: bytes) -> bytes:
        if not self._filtering:
            return audio
        samples = np.frombuffer(audio, dtype=np.int16)
        # The Krisp session consumes fixed-size 10 ms frames, so the buffer must align.
        if samples.size % self._samples_per_frame:
            raise ValueError(
                f"Audio length {samples.size} samples is not a multiple of {self._samples_per_frame}")
        frames = samples.reshape(-1, self._samples_per_frame)
        processed_samples = np.empty_like(samples)
        # Run noise cancellation frame by frame and reassemble the output buffer.
        for i, frame in enumerate(frames):
            cleaned_frame = self._session.process(frame, self._noise_suppression_level)
            processed_samples[i * self._samples_per_frame:(i + 1) * self._samples_per_frame] = cleaned_frame
        return processed_samples.tobytes()

Unlike the published KrispFilter, NoiseFilterFromKrisp does not read the KRISP_SDK_PATH and KRISP_MODEL_PATH environment variables; the model path is passed through the mandatory model_path constructor argument.
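
Because process_frame handles FilterEnableFrame, noise reduction can also be toggled while the pipeline is running. A minimal sketch, assuming a running PipelineTask named task:

from pipecat.frames.frames import FilterEnableFrame

# Temporarily bypass Krisp processing...
await task.queue_frame(FilterEnableFrame(enable=False))

# ...and re-enable it later.
await task.queue_frame(FilterEnableFrame(enable=True))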


Turn-Taking

The Krisp Server SDK also ships a turn-taking (TT) model that returns the probability that the current speaker has finished their turn. The BaseKrispTurn class below wraps it as a Pipecat turn analyzer:

import time
from typing import Optional, Tuple

import numpy as np

import krisp_audio
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.metrics.metrics import MetricsData


def log_callback(log_message, log_level):
    print(f"[{log_level}] {log_message}", flush=True)


class BaseKrispTurn(BaseTurnAnalyzer):
    # Initialize the Krisp SDK once, at class-definition time, and log the SDK version.
    krisp_audio.globalInit("", log_callback, krisp_audio.LogLevel.Off)
    SDK_VERSION = krisp_audio.getVersion()
    print(f"Krisp Audio Python SDK Version: {SDK_VERSION.major}."
          f"{SDK_VERSION.minor}.{SDK_VERSION.patch}")

    # Map integer sample rates to the Krisp SDK's SamplingRate enum values.
    SAMPLE_RATES = {
        8000: krisp_audio.SamplingRate.Sr8000Hz,
        16000: krisp_audio.SamplingRate.Sr16000Hz,
        24000: krisp_audio.SamplingRate.Sr24000Hz,
        32000: krisp_audio.SamplingRate.Sr32000Hz,
        44100: krisp_audio.SamplingRate.Sr44100Hz,
        48000: krisp_audio.SamplingRate.Sr48000Hz,
    }

    def __init__(
        self,
        *,
        sample_rate: Optional[int] = None,
        device: str = "cpu",
        silence_duration: float = 5.0,
    ):
        # The Krisp turn-taking model operates on 16 kHz input.
        super().__init__(sample_rate=16000)
        self.threshold = 0.5  # End-of-turn probability threshold

        self.model_path = "model/path.kef"  # Path to the turn-taking (.kef) model
        self.tt_session = self._create_tt_session()

        self._speech_triggered = False
        self._speech_start_time = None
        self._eot_start_time = None
        self._silence_ms = 0

    def _create_tt_session(self):
        # Configure a turn-taking session for 20 ms float frames at 16 kHz.
        model_info = krisp_audio.ModelInfo()
        model_info.path = self.model_path

        tt_cfg = krisp_audio.TtSessionConfig()
        tt_cfg.inputSampleRate = krisp_audio.SamplingRate.Sr16000Hz
        tt_cfg.inputFrameDuration = krisp_audio.FrameDuration.Fd20ms
        tt_cfg.modelInfo = model_info
        return krisp_audio.TtFloat.create(tt_cfg)

    @property
    def speech_triggered(self) -> bool:
        return self._speech_triggered

    def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
        # Convert raw 16-bit PCM to normalized float32 for the turn-taking model.
        audio_int16 = np.frombuffer(buffer, dtype=np.int16)
        audio_float32 = audio_int16.astype(np.float32) / 32768.0

        state = EndOfTurnState.INCOMPLETE

        if is_speech:
            # Reset silence tracking on speech
            self._silence_ms = 0
            self._speech_triggered = True
            self._eot_start_time = None
            if self._speech_start_time is None:
                self._speech_start_time = time.time()
        else:
            if self._eot_start_time is None:
                self._eot_start_time = time.time()

        # Probability that the current turn has ended.
        prob = self.tt_session.process(audio_float32.tolist())

        if self._speech_triggered and prob >= self.threshold:
            # End of turn detected: report COMPLETE and reset tracking state.
            state = EndOfTurnState.COMPLETE
            self._silence_ms = 0
            self._speech_triggered = False
            self._speech_start_time = None
            self._eot_start_time = None

        return state

    async def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
        # Turn completion is decided synchronously in append_audio, so there is nothing to analyze here.
        return EndOfTurnState.INCOMPLETE, None

    def _clear(self, turn_state: EndOfTurnState):
        # If the state is still incomplete, keep the _speech_triggered as True
        self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
        self._speech_start_time = None
        self._eot_start_time = None
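
A minimal wiring sketch for the analyzer, assuming the DailyTransport setup from the noise-reduction example and Pipecat's turn_analyzer transport parameter:

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport

transport = DailyTransport(
    room_url,
    token,
    "Respond bot",
    DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),  # VAD supplies the is_speech flag to append_audio
        turn_analyzer=BaseKrispTurn(),     # Krisp end-of-turn detection
    ),
)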