Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
538fc13
feat: add MVP of propagating room downwards from room -> track -> aud…
1egoman May 18, 2026
7c7eaa4
feat: call _on_stream_info_updated with parent room reference on audi…
1egoman May 19, 2026
12718d1
feat: call _on_credentials_updated with token / server url extracted …
1egoman May 20, 2026
af26b3d
fix: remove debugging logs
1egoman May 20, 2026
5ecca5d
fix: address lint errors
1egoman May 20, 2026
af56d61
feat: only call frame processor handlers if room is set
1egoman May 20, 2026
f62c247
fix: properly intercept room refresh token events
1egoman May 20, 2026
e7ab10e
feat: add from __future__ import annotations to remove string types
1egoman May 26, 2026
f7f422d
fix: address incorrect docs
1egoman May 26, 2026
24f2b6e
refactor: centralize frame processor state logic into Track, not Audi…
1egoman May 26, 2026
ad32574
feat: add auto cleanup of FrameProcessor as opt-out
1egoman May 26, 2026
ce5e793
fix: disable no-op credentials push
1egoman May 26, 2026
b9f34d0
fix: move processor close from __del__ to aclose
1egoman May 26, 2026
4c73cc8
fix: proxy through noise_cancellation_leave_open into AudioStream.from…
1egoman May 26, 2026
22f4896
fix: include missed noise_cancellation_leave_open in from_track
1egoman May 26, 2026
2dbe350
fix: address type checker warning
1egoman May 26, 2026
07fec79
feat: add new _on_stream_info_cleared / _on_credentials_cleared Frame…
1egoman May 27, 2026
8d3f4fe
fix: apply devin suggestion
1egoman May 27, 2026
7743e6a
feat: add new frame processor tests
1egoman May 27, 2026
75d8874
fix: address type errors in tests
1egoman May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
noise_cancellation_leave_open: bool = False,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
noise_cancellation_leave_open: bool = False,

Can we move that inside NoiseCancellationOptions?

**kwargs: Any,
) -> None:
"""Initialize an `AudioStream` instance.
Expand All @@ -81,6 +82,9 @@ def __init__(
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
created by the noise cancellation module.
noise_cancellation_leave_open (bool):
When the audio stream closes, leaves the FrameProcessor in an unclosed state so it
can be used with another AudioStream.

Example:
```python
Expand Down Expand Up @@ -113,11 +117,13 @@ def __init__(
self._audio_filter_module: str | None = None
self._audio_filter_options: dict[str, Any] | None = None
self._processor: FrameProcessor[AudioFrame] | None = None
self._processor_leave_open = False
if isinstance(noise_cancellation, NoiseCancellationOptions):
self._audio_filter_module = noise_cancellation.module_id
self._audio_filter_options = noise_cancellation.options
elif isinstance(noise_cancellation, FrameProcessor):
self._processor = noise_cancellation
self._processor_leave_open = noise_cancellation_leave_open

self._task = self._loop.create_task(self._run())
self._task.add_done_callback(task_done_logger)
Expand All @@ -132,6 +138,9 @@ def __init__(
self._ffi_handle = FfiHandle(stream.handle.id)
self._info = stream.info

if self._track is not None:
self._track._register_audio_stream(self)

@classmethod
def from_participant(
cls,
Expand All @@ -144,6 +153,7 @@ def from_participant(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
noise_cancellation_leave_open: bool = False,
) -> AudioStream:
"""Create an `AudioStream` from a participant's audio track.

Expand Down Expand Up @@ -179,8 +189,9 @@ def from_participant(
track=None, # type: ignore
sample_rate=sample_rate,
num_channels=num_channels,
noise_cancellation=noise_cancellation,
frame_size_ms=frame_size_ms,
noise_cancellation=noise_cancellation,
noise_cancellation_leave_open=noise_cancellation_leave_open,
)

@classmethod
Expand All @@ -194,6 +205,7 @@ def from_track(
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
noise_cancellation_leave_open: bool = False,
) -> AudioStream:
"""Create an `AudioStream` from an existing audio track.

Expand All @@ -203,9 +215,12 @@ def from_track(
capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels (int, optional): The number of audio channels. Defaults to 1.
noise_cancellation (Optional[NoiseCancellationOptions], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` instance
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
created by the noise cancellation module.
noise_cancellation_leave_open (bool):
When the audio stream closes, leaves the FrameProcessor in an unclosed state so it
can be used with another AudioStream.

Returns:
AudioStream: An instance of `AudioStream` that can be used to receive audio frames.
Expand All @@ -225,8 +240,9 @@ def from_track(
capacity=capacity,
sample_rate=sample_rate,
num_channels=num_channels,
noise_cancellation=noise_cancellation,
frame_size_ms=frame_size_ms,
noise_cancellation=noise_cancellation,
noise_cancellation_leave_open=noise_cancellation_leave_open,
)

def __del__(self) -> None:
Expand Down Expand Up @@ -303,8 +319,12 @@ async def aclose(self) -> None:
This method cleans up resources associated with the audio stream and waits for
any pending operations to complete.
"""
if self._track is not None:
self._track._unregister_audio_stream(self)
self._ffi_handle.dispose()
await self._task
if self._processor is not None and not self._processor_leave_open:
self._processor._close()

def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
return e.audio_stream_event.stream_handle == self._ffi_handle.handle
Expand Down
4 changes: 4 additions & 0 deletions livekit-rtc/livekit/rtc/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ def _on_stream_info_updated(
publication_sid: str,
) -> None: ...

# Hook with an empty default body. Track invokes it on a stream's processor
# when the track's room is set to None (the track left its room).
def _on_stream_info_cleared(self) -> None: ...

# Hook with an empty default body. Track invokes it with the room's current
# token and server URL, both passed as keyword-only arguments.
def _on_credentials_updated(self, *, token: str, url: str) -> None: ...

# Hook with an empty default body. Track invokes it right after
# `_on_stream_info_cleared` when the track's room is set to None.
def _on_credentials_cleared(self) -> None: ...

# Declared abstract: concrete processors must provide the implementation.
# Takes a frame of type T and returns a frame of the same type T.
@abstractmethod
def _process(self, frame: T) -> T: ...

Expand Down
8 changes: 8 additions & 0 deletions livekit-rtc/livekit/rtc/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,10 +733,14 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None:
sid = event.local_track_published.track_sid
lpublication = self.local_participant.track_publications[sid]
ltrack = lpublication.track
if ltrack is not None:
ltrack._set_room(self)
self.emit("local_track_published", lpublication, ltrack)
elif which == "local_track_unpublished":
sid = event.local_track_unpublished.publication_sid
lpublication = self.local_participant.track_publications[sid]
if lpublication.track is not None:
lpublication.track._set_room(None)
self.emit("local_track_unpublished", lpublication)
elif which == "local_track_republished":
# The SDK auto-republished a local track during a full
Expand Down Expand Up @@ -777,17 +781,21 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None:
rpublication._subscribed = True
if track_info.kind == TrackKind.KIND_VIDEO:
remote_video_track = RemoteVideoTrack(owned_track_info)
remote_video_track._set_room(self)
rpublication._track = remote_video_track
self.emit("track_subscribed", remote_video_track, rpublication, rparticipant)
elif track_info.kind == TrackKind.KIND_AUDIO:
remote_audio_track = RemoteAudioTrack(owned_track_info)
remote_audio_track._set_room(self)
rpublication._track = remote_audio_track
self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant)
elif which == "track_unsubscribed":
identity = event.track_unsubscribed.participant_identity
rparticipant = self._remote_participants[identity]
rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid]
rtrack = rpublication.track
if rtrack is not None:
rtrack._set_room(None)
rpublication._track = None
rpublication._subscribed = False
self.emit("track_unsubscribed", rtrack, rpublication, rparticipant)
Expand Down
78 changes: 77 additions & 1 deletion livekit-rtc/livekit/rtc/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,97 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, List, Union
from __future__ import annotations

import weakref
from typing import TYPE_CHECKING, List, Optional, Union
from ._ffi_client import FfiHandle, FfiClient
from ._proto import ffi_pb2 as proto_ffi
from ._proto import track_pb2 as proto_track
from ._proto import stats_pb2 as proto_stats

if TYPE_CHECKING:
from .audio_source import AudioSource
from .audio_stream import AudioStream
from .room import Room
from .video_source import VideoSource


class Track:
def __init__(self, owned_info: proto_track.OwnedTrack):
    """Build a track from an owned FFI track message.

    Keeps the track info, wraps the handle id in an `FfiHandle`, and starts
    with no room reference and an empty weak set of audio streams.
    """
    handle_id = owned_info.handle.id
    self._info = owned_info.info
    self._ffi_handle = FfiHandle(handle_id)
    # Weak containers only: the track must not keep its room or its audio
    # streams alive on its own.
    self._audio_streams: weakref.WeakSet[AudioStream] = weakref.WeakSet()
    self._room_ref: Optional[weakref.ref[Room]] = None

def _resolve_room(self) -> Optional[Room]:
return self._room_ref() if self._room_ref is not None else None

# Point this track at `room`, or detach it when `room` is None.
#
# When the new room differs from the one currently referenced, the
# "token_refreshed" listener is removed from the old room (if any) and
# attached to the new room (if any). The room is then stored as a weak
# reference and every registered audio stream is handed the new room (or
# None) via `_push_processor_metadata_to_stream`.
#
# NOTE(review): indentation is not preserved in this view, so it cannot be
# confirmed here whether the weakref assignment and the stream loop run on
# every call or only when the room actually changes - verify against the file.
def _set_room(self, room: Optional[Room]) -> None:
old_room = self._resolve_room()
if old_room is not room:
if old_room is not None:
# Stop receiving token refreshes from the room being left.
old_room.off("token_refreshed", self._on_room_token_refreshed)
if room is not None:
# Start receiving token refreshes from the room being joined.
room.on("token_refreshed", self._on_room_token_refreshed)

# Only a weak reference to the room is kept.
self._room_ref = weakref.ref(room) if room is not None else None

# Propagate the new room context (or its absence) to each audio stream.
for stream in self._audio_streams:
self._push_processor_metadata_to_stream(stream, room)

def _on_room_token_refreshed(self) -> None:
room = self._resolve_room()
if room is None or room._token is None or room._server_url is None:
return
for stream in self._audio_streams:
if not stream._processor:
continue
stream._processor._on_credentials_updated(token=room._token, url=room._server_url)

def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional[Room]) -> None:
if not stream._processor:
return

if room is None:
# track left a room - clear processor's room context
stream._processor._on_stream_info_cleared()
stream._processor._on_credentials_cleared()
return

identity = ""
pub_sid = ""
track_sid = self.sid
if track_sid:
for participant in room.remote_participants.values():
publication = participant.track_publications.get(track_sid)
if publication is not None:
identity, pub_sid = participant.identity, publication.sid
break
else:
local = room._local_participant
if local is not None:
for local_publication in local.track_publications.values():
if local_publication.sid == track_sid:
identity, pub_sid = local.identity, local_publication.sid
break

stream._processor._on_stream_info_updated(
room_name=room.name,
participant_identity=identity,
publication_sid=pub_sid,
)
if room._token is not None and room._server_url is not None:
stream._processor._on_credentials_updated(token=room._token, url=room._server_url)

def _register_audio_stream(self, stream: AudioStream) -> None:
self._audio_streams.add(stream)
room = self._resolve_room()
if room is not None:
self._push_processor_metadata_to_stream(stream, room)

def _unregister_audio_stream(self, stream: AudioStream) -> None:
self._audio_streams.discard(stream)

@property
def sid(self) -> str:
Expand Down
Loading
Loading