Skip to content

[bugfix] add optional MEDIA_DECODE_TIMEOUT to prevent silent media-decode hang#9541

Open
HaozheZhang6 wants to merge 1 commit into
modelscope:mainfrom
HaozheZhang6:fix/media-decode-timeout
Open

[bugfix] add optional MEDIA_DECODE_TIMEOUT to prevent silent media-decode hang#9541
HaozheZhang6 wants to merge 1 commit into
modelscope:mainfrom
HaozheZhang6:fix/media-decode-timeout

Conversation

@HaozheZhang6

Copy link
Copy Markdown
Contributor

PR type

  • Bug Fix
  • New Feature
  • Document Updates
  • More Models or Datasets Support

PR information

Fixes #9507.

During multimodal SFT a corrupt or unsupported clip can make the native media decoders deadlock in C while holding the GIL — librosa.load falling back to audioread.ffdec.FFmpegAudioFile (the known ffmpeg pipe-starvation hang), and decord.get_batch. The decode runs in-loop in the data pipeline with no bound, so a single bad sample silently hangs a DataLoader worker forever: process alive, GPUs at 0%, no exception, no log line. A signal-based timeout can't help here — the decoder holds the GIL, so SIGALRM never fires.

This adds an opt-in hard wall-clock timeout. When MEDIA_DECODE_TIMEOUT (seconds) > 0, the decode runs in a forked worker that is killed on overrun and raises TimeoutError, so one bad clip can't freeze the run. Unset/0 (default) keeps the original in-process path with zero overhead — non-breaking.

  • _decode_with_timeout(func, *args, **kwargs) — generic, env-gated via get_env_args('media_decode_timeout', ...) to match the existing env-arg convention.
  • load_audio decodes through it (body split into _load_audio so it's picklable for the worker).
  • Uses the fork context on purpose: load_audio runs inside the data pipeline where fork is already the norm (the DataLoader itself forks workers), and unlike forkserver/spawn it doesn't re-import the training entrypoint per call. Falls back to the default context where fork is unavailable.

Scope is the audio path (the one reproduced in the issue). The helper is reusable for the decord load_video_* paths — glad to extend it to them in this PR if you'd prefer one change.

One note: on a multi-threaded host process Python prints a fork() DeprecationWarning, the same one the DataLoader triggers; kept fork deliberately for the reasons above, but happy to switch the mechanism if you'd rather.

Experiment results

tests/utils/test_vision_utils.py (CPU-only, no GPU/model/network):

test_decode_with_timeout_kills_hung_decode PASSED
test_decode_with_timeout_returns_result_when_fast PASSED
test_decode_with_timeout_propagates_decode_error PASSED
test_decode_with_timeout_disabled_calls_directly PASSED
4 passed

End-to-end on a real 16 kHz WAV, load_audio returns an identical array (shape (3200,)) with the timeout off (in-process) and on (forked decode), confirming the wrapped path decodes correctly. isort + flake8 + yapf (repo config) clean on the changed files.

…code hang

Corrupt or unsupported audio/video clips can make the native decoders
(librosa->audioread/ffmpeg, decord) deadlock in C while holding the GIL, which
silently hangs a DataLoader worker forever with GPUs idle and no error logged.

Add an opt-in hard wall-clock timeout: when MEDIA_DECODE_TIMEOUT (seconds) > 0,
the decode runs in a forked worker that is killed on overrun and raises
TimeoutError so one bad clip cannot freeze the whole run. Default (unset/0)
keeps the original in-process path with zero overhead. Applied to load_audio
(the reproduced hang path); the helper is reusable for the decord video paths.

Fixes modelscope#9507

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a subprocess-based timeout mechanism (_decode_with_timeout) for media decoding to prevent silent hangs in DataLoader workers, along with corresponding unit tests. A critical review comment points out that using multiprocessing.SimpleQueue can lead to a deadlock if the decoded payload exceeds the OS pipe buffer limit, as the parent process joins the child before reading from the queue. It is recommended to use multiprocessing.Pipe and poll(timeout) instead to avoid this issue.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +299 to +336
def _decode_worker(queue, func, args, kwargs):
try:
queue.put((True, func(*args, **kwargs)))
except Exception as e:
queue.put((False, e))


def _decode_with_timeout(func: Callable[..., _T], *args, **kwargs) -> _T:
# Native media decoders (audioread/ffmpeg, decord) can deadlock in C while holding the GIL on a
# corrupt/unsupported clip, silently hanging a DataLoader worker forever; a signal-based timeout
# can't interrupt them. When `MEDIA_DECODE_TIMEOUT` (seconds) > 0, decode in a killable subprocess.
timeout = get_env_args('media_decode_timeout', float, 0)
if not timeout or timeout <= 0:
return func(*args, **kwargs)
import multiprocessing as mp

# Fork the decode worker: load_audio runs inside the data pipeline where fork is already the
# norm (PyTorch DataLoader), and unlike forkserver/spawn it does not re-import the training
# entrypoint per call. Fall back to the default context where fork is unavailable.
try:
ctx = mp.get_context('fork')
except ValueError:
ctx = mp.get_context()
queue = ctx.SimpleQueue()
process = ctx.Process(target=_decode_worker, args=(queue, func, args, kwargs))
process.start()
process.join(timeout)
if process.is_alive():
process.terminate()
process.join()
raise TimeoutError(f'Media decode exceeded MEDIA_DECODE_TIMEOUT={timeout}s and was killed '
'(likely a corrupt or unsupported clip).')
if process.exitcode != 0:
raise RuntimeError(f'Media decode subprocess exited abnormally (exitcode={process.exitcode}).')
ok, payload = queue.get()
if not ok:
raise payload
return payload

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Using multiprocessing.SimpleQueue can lead to a silent deadlock when the decoded payload is large.

The Issue

SimpleQueue is backed by an OS pipe. If the serialized payload (e.g., decoded audio arrays or video frames) exceeds the OS pipe buffer limit (typically 64KB on Linux), the child process's queue.put() call will block indefinitely until the parent process reads from the queue. However, the parent process calls process.join(timeout) before reading from the queue. This creates a classic deadlock: the child is blocked waiting for the parent to read, and the parent is blocked waiting for the child to finish. This will cause a false TimeoutError and terminate the child process for any large media file.

The Solution

Use multiprocessing.Pipe and poll(timeout) instead. This allows the parent process to detect if data is available to read (or if the child has exited/errored) and read from the pipe, which unblocks the child process if the buffer fills up.

def _decode_worker(conn, func, args, kwargs):
    try:
        conn.send((True, func(*args, **kwargs)))
    except Exception as e:
        conn.send((False, e))
    finally:
        conn.close()


def _decode_with_timeout(func: Callable[..., _T], *args, **kwargs) -> _T:
    # Native media decoders (audioread/ffmpeg, decord) can deadlock in C while holding the GIL on a
    # corrupt/unsupported clip, silently hanging a DataLoader worker forever; a signal-based timeout
    # can't interrupt them. When `MEDIA_DECODE_TIMEOUT` (seconds) > 0, decode in a killable subprocess.
    timeout = get_env_args('media_decode_timeout', float, 0)
    if not timeout or timeout <= 0:
        return func(*args, **kwargs)
    import multiprocessing as mp

    # Fork the decode worker: load_audio runs inside the data pipeline where fork is already the
    # norm (PyTorch DataLoader), and unlike forkserver/spawn it does not re-import the training
    # entrypoint per call. Fall back to the default context where fork is unavailable.
    try:
        ctx = mp.get_context('fork')
    except ValueError:
        ctx = mp.get_context()
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    process = ctx.Process(target=_decode_worker, args=(child_conn, func, args, kwargs))
    process.start()
    child_conn.close()
    if parent_conn.poll(timeout):
        try:
            ok, payload = parent_conn.recv()
        except EOFError:
            process.join()
            raise RuntimeError(f'Media decode subprocess exited abnormally (exitcode={process.exitcode}).')
        process.join()
        if not ok:
            raise payload
        return payload
    else:
        process.terminate()
        process.join()
        raise TimeoutError(f'Media decode exceeded MEDIA_DECODE_TIMEOUT={timeout}s and was killed '
                           '(likely a corrupt or unsupported clip).')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Multimodal SFT silently hangs forever on certain clips: load_audio (librosa→audioread) and decord video decode have no timeout (DataLoader deadlock)

1 participant