diff --git a/doc/code/scenarios/3_adaptive_scenarios.ipynb b/doc/code/scenarios/3_adaptive_scenarios.ipynb new file mode 100644 index 000000000..f52ddf1be --- /dev/null +++ b/doc/code/scenarios/3_adaptive_scenarios.ipynb @@ -0,0 +1,265 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "94e7f44a", + "metadata": {}, + "source": [ + "# Adaptive Scenarios\n", + "\n", + "An **adaptive scenario** doesn't run every attack technique against every objective.\n", + "Instead, it picks which technique to try next per-objective, learns from what worked,\n", + "and stops as soon as one technique succeeds. This concentrates spend on techniques\n", + "that actually work on your target.\n", + "\n", + "## How it works (high level)\n", + "\n", + "For each objective, the scenario tries up to `max_attempts_per_objective` techniques:\n", + "\n", + "- With probability `epsilon`, it **explores** — picks a random technique.\n", + "- Otherwise it **exploits** — picks the technique with the highest observed success\n", + " rate so far.\n", + "- It records the outcome and stops early on success.\n", + "\n", + "Unseen techniques are tried first, so the first few objectives effectively round-robin\n", + "through every technique before the scenario settles on the best performers.\n", + "\n", + "## Adaptive vs. static scenarios\n", + "\n", + "| Feature | Static scenarios | Adaptive scenarios |\n", + "|---------------------|-----------------------------------|------------------------------------|\n", + "| Technique selection | Run every selected technique | Pick per-objective from outcomes |\n", + "| Early stopping | No | Yes — stops on first success |\n", + "| Cost | O(techniques × objectives) | O(max_attempts × objectives) |\n", + "\n", + "`AdaptiveScenario` is the modality-agnostic base class.\n", + "[`TextAdaptive`](../../../pyrit/scenario/scenarios/adaptive/text_adaptive.py) is the\n", + "text subclass used in the examples below." 
+ ] + }, + { + "cell_type": "markdown", + "id": "cb716650", + "metadata": {}, + "source": [ + "## Setup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4b536900", + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "\n", + "from pyrit.registry import TargetRegistry\n", + "from pyrit.scenario import DatasetConfiguration\n", + "from pyrit.scenario.printer.console_printer import ConsoleScenarioResultPrinter\n", + "from pyrit.scenario.scenarios.adaptive import TextAdaptive\n", + "from pyrit.setup import initialize_from_config_async\n", + "\n", + "await initialize_from_config_async(config_path=Path(\"../../scanner/pyrit_conf.yaml\")) # type: ignore\n", + "\n", + "objective_target = TargetRegistry.get_registry_singleton().get_instance_by_name(\"openai_chat\")\n", + "printer = ConsoleScenarioResultPrinter()" + ] + }, + { + "cell_type": "markdown", + "id": "9f9ff786", + "metadata": {}, + "source": [ + "## Basic usage\n", + "\n", + "Defaults: `max_attempts_per_objective=3`, epsilon-greedy selector with `epsilon=0.2`,\n", + "the subclass's default datasets." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "33aa89d3", + "metadata": {}, + "outputs": [], + "source": [ + "scenario = TextAdaptive()\n", + "\n", + "await scenario.initialize_async( # type: ignore\n", + " objective_target=objective_target,\n", + ")\n", + "result = await scenario.run_async() # type: ignore\n", + "await printer.write_async(result) # type: ignore" + ] + }, + { + "cell_type": "markdown", + "id": "5083bbed", + "metadata": {}, + "source": [ + "## Configuring a run\n", + "\n", + "- **`max_attempts_per_objective`** — caps techniques tried per objective. Higher means\n", + " more chances to succeed and more API calls. Set via `set_params_from_args`.\n", + "- **`selector`** — a pre-built `TechniqueSelector` instance. 
Pass an\n", + " `EpsilonGreedyTechniqueSelector(epsilon=..., random_seed=...)`\n", + " to tune the selection algorithm. Defaults to an epsilon-greedy selector with\n", + " `epsilon=0.2`.\n", + "- **`scenario_strategies`** (on `initialize_async`) — restricts which techniques the\n", + " selector can pick from. Use `TextAdaptive.get_strategy_class()` to access the enum.\n", + "\n", + "The cell below exercises all of them at once." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "db966395", + "metadata": {}, + "outputs": [], + "source": [ + "from pyrit.scenario.scenarios.adaptive import EpsilonGreedyTechniqueSelector\n", + "\n", + "strategy_class = TextAdaptive.get_strategy_class()\n", + "\n", + "configured_scenario = TextAdaptive(\n", + " selector=EpsilonGreedyTechniqueSelector(\n", + " epsilon=0.3,\n", + " random_seed=42,\n", + " ),\n", + ")\n", + "configured_scenario.set_params_from_args(args={\"max_attempts_per_objective\": 5})\n", + "\n", + "await configured_scenario.initialize_async( # type: ignore\n", + " objective_target=objective_target,\n", + " scenario_strategies=[strategy_class(\"single_turn\")],\n", + " dataset_config=DatasetConfiguration(\n", + " dataset_names=[\"airt_hate\", \"airt_violence\"],\n", + " max_dataset_size=4,\n", + " ),\n", + ")\n", + "configured_result = await configured_scenario.run_async() # type: ignore\n", + "await printer.write_async(configured_result) # type: ignore" + ] + }, + { + "cell_type": "markdown", + "id": "ba7e7126", + "metadata": {}, + "source": [ + "## Resuming a run\n", + "\n", + "Adaptive scenarios are resumable — pass `scenario_result_id=...` to the `TextAdaptive`\n", + "constructor and the run picks up where it left off. Resume must use the same\n", + "configuration as the original run." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4857bace", + "metadata": {}, + "outputs": [], + "source": [ + "resumed_scenario = TextAdaptive(\n", + " selector=EpsilonGreedyTechniqueSelector(\n", + " epsilon=0.3,\n", + " random_seed=42,\n", + " ),\n", + " scenario_result_id=str(configured_result.id),\n", + ")\n", + "resumed_scenario.set_params_from_args(args={\"max_attempts_per_objective\": 5})\n", + "\n", + "await resumed_scenario.initialize_async( # type: ignore\n", + " objective_target=objective_target,\n", + " scenario_strategies=[strategy_class(\"single_turn\")],\n", + " dataset_config=DatasetConfiguration(\n", + " dataset_names=[\"airt_hate\", \"airt_violence\"],\n", + " max_dataset_size=4,\n", + " ),\n", + ")\n", + "resumed_result = await resumed_scenario.run_async() # type: ignore\n", + "await printer.write_async(resumed_result) # type: ignore" + ] + }, + { + "cell_type": "markdown", + "id": "e267467c", + "metadata": {}, + "source": [ + "## Inspecting which techniques were tried\n", + "\n", + "The dispatcher stamps every objective's `AttackResult.metadata` with:\n", + "\n", + "- `adaptive_attempts` — the ordered list of `{\"technique\", \"outcome\"}` dicts\n", + " recording exactly which techniques the selector picked and what happened.\n", + "\n", + "Walk that metadata to see the per-objective trail and aggregate counts." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3a95436b", + "metadata": {}, + "outputs": [], + "source": [ + "from collections import Counter\n", + "\n", + "# Per-objective trail\n", + "for results in resumed_result.attack_results.values():\n", + " for r in results:\n", + " attempts = r.metadata.get(\"adaptive_attempts\", [])\n", + " trail = \" → \".join(f\"{a['technique']}({a['outcome']})\" for a in attempts)\n", + " print(f\"[{r.outcome.value:7s}] {r.objective!r}: {trail}\")\n", + "\n", + "# Aggregate per-technique pick counts and success rate across the run\n", + "picks: Counter[str] = Counter()\n", + "wins: Counter[str] = Counter()\n", + "for results in resumed_result.attack_results.values():\n", + " for r in results:\n", + " for step in r.metadata.get(\"adaptive_attempts\", []):\n", + " picks[step[\"technique\"]] += 1\n", + " if step[\"outcome\"] == \"success\":\n", + " wins[step[\"technique\"]] += 1\n", + "\n", + "print(\"\\nTechnique wins / picks rate\")\n", + "for technique, n in picks.most_common():\n", + " print(f\"{technique:20s} {wins[technique]:>4} / {n:<4} {wins[technique] / n:.0%}\")" + ] + }, + { + "cell_type": "markdown", + "id": "37cd0756", + "metadata": {}, + "source": [ + "## Running from the scanner CLI\n", + "\n", + "You can run `TextAdaptive` directly from the `pyrit_scan` CLI without writing Python:\n", + "\n", + "```bash\n", + "# Basic run with defaults\n", + "pyrit_scan --scenario TextAdaptive --target openai_chat\n", + "\n", + "# Tune max attempts and restrict strategies\n", + "pyrit_scan --scenario TextAdaptive --target openai_chat \\\n", + " --params max_attempts_per_objective=5 \\\n", + " --strategies single_turn\n", + "\n", + "# Use specific datasets and limit size\n", + "pyrit_scan --scenario TextAdaptive --target openai_chat \\\n", + " --datasets airt_hate airt_violence \\\n", + " --max-dataset-size 10\n", + "```" + ] + } + ], + "metadata": { + "jupytext": { + "main_language": "python" + } + }, + "nbformat": 
4, + "nbformat_minor": 5 +} diff --git a/doc/code/scenarios/3_adaptive_scenarios.py b/doc/code/scenarios/3_adaptive_scenarios.py new file mode 100644 index 000000000..8826c4240 --- /dev/null +++ b/doc/code/scenarios/3_adaptive_scenarios.py @@ -0,0 +1,193 @@ +# --- +# jupyter: +# jupytext: +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.18.1 +# --- + +# %% [markdown] +# # Adaptive Scenarios +# +# An **adaptive scenario** doesn't run every attack technique against every objective. +# Instead, it picks which technique to try next per-objective, learns from what worked, +# and stops as soon as one technique succeeds. This concentrates spend on techniques +# that actually work on your target. +# +# ## How it works (high level) +# +# For each objective, the scenario tries up to `max_attempts_per_objective` techniques: +# +# - With probability `epsilon`, it **explores** — picks a random technique. +# - Otherwise it **exploits** — picks the technique with the highest observed success +# rate so far. +# - It records the outcome and stops early on success. +# +# Unseen techniques are tried first, so the first few objectives effectively round-robin +# through every technique before the scenario settles on the best performers. +# +# ## Adaptive vs. static scenarios +# +# | Feature | Static scenarios | Adaptive scenarios | +# |---------------------|-----------------------------------|------------------------------------| +# | Technique selection | Run every selected technique | Pick per-objective from outcomes | +# | Early stopping | No | Yes — stops on first success | +# | Cost | O(techniques × objectives) | O(max_attempts × objectives) | +# +# `AdaptiveScenario` is the modality-agnostic base class. +# [`TextAdaptive`](../../../pyrit/scenario/scenarios/adaptive/text_adaptive.py) is the +# text subclass used in the examples below. 
+ +# %% [markdown] +# ## Setup + +# %% +from pathlib import Path + +from pyrit.registry import TargetRegistry +from pyrit.scenario import DatasetConfiguration +from pyrit.scenario.printer.console_printer import ConsoleScenarioResultPrinter +from pyrit.scenario.scenarios.adaptive import TextAdaptive +from pyrit.setup import initialize_from_config_async + +await initialize_from_config_async(config_path=Path("../../scanner/pyrit_conf.yaml")) # type: ignore + +objective_target = TargetRegistry.get_registry_singleton().get_instance_by_name("openai_chat") +printer = ConsoleScenarioResultPrinter() + +# %% [markdown] +# ## Basic usage +# +# Defaults: `max_attempts_per_objective=3`, epsilon-greedy selector with `epsilon=0.2`, +# the subclass's default datasets. + +# %% +scenario = TextAdaptive() + +await scenario.initialize_async( # type: ignore + objective_target=objective_target, +) +result = await scenario.run_async() # type: ignore +await printer.write_async(result) # type: ignore + +# %% [markdown] +# ## Configuring a run +# +# - **`max_attempts_per_objective`** — caps techniques tried per objective. Higher means +# more chances to succeed and more API calls. Set via `set_params_from_args`. +# - **`selector`** — a pre-built `TechniqueSelector` instance. Pass an +# `EpsilonGreedyTechniqueSelector(epsilon=..., random_seed=...)` +# to tune the selection algorithm. Defaults to an epsilon-greedy selector with +# `epsilon=0.2`. +# - **`scenario_strategies`** (on `initialize_async`) — restricts which techniques the +# selector can pick from. Use `TextAdaptive.get_strategy_class()` to access the enum. +# +# The cell below exercises all of them at once. 
+ +# %% +from pyrit.scenario.scenarios.adaptive import EpsilonGreedyTechniqueSelector + +strategy_class = TextAdaptive.get_strategy_class() + +configured_scenario = TextAdaptive( + selector=EpsilonGreedyTechniqueSelector( + epsilon=0.3, + random_seed=42, + ), +) +configured_scenario.set_params_from_args(args={"max_attempts_per_objective": 5}) + +await configured_scenario.initialize_async( # type: ignore + objective_target=objective_target, + scenario_strategies=[strategy_class("single_turn")], + dataset_config=DatasetConfiguration( + dataset_names=["airt_hate", "airt_violence"], + max_dataset_size=4, + ), +) +configured_result = await configured_scenario.run_async() # type: ignore +await printer.write_async(configured_result) # type: ignore + +# %% [markdown] +# ## Resuming a run +# +# Adaptive scenarios are resumable — pass `scenario_result_id=...` to the `TextAdaptive` +# constructor and the run picks up where it left off. Resume must use the same +# configuration as the original run. + +# %% +resumed_scenario = TextAdaptive( + selector=EpsilonGreedyTechniqueSelector( + epsilon=0.3, + random_seed=42, + ), + scenario_result_id=str(configured_result.id), +) +resumed_scenario.set_params_from_args(args={"max_attempts_per_objective": 5}) + +await resumed_scenario.initialize_async( # type: ignore + objective_target=objective_target, + scenario_strategies=[strategy_class("single_turn")], + dataset_config=DatasetConfiguration( + dataset_names=["airt_hate", "airt_violence"], + max_dataset_size=4, + ), +) +resumed_result = await resumed_scenario.run_async() # type: ignore +await printer.write_async(resumed_result) # type: ignore + +# %% [markdown] +# ## Inspecting which techniques were tried +# +# The dispatcher stamps every objective's `AttackResult.metadata` with: +# +# - `adaptive_attempts` — the ordered list of `{"technique", "outcome"}` dicts +# recording exactly which techniques the selector picked and what happened. 
+# +# Walk that metadata to see the per-objective trail and aggregate counts. + +# %% +from collections import Counter + +# Per-objective trail +for results in resumed_result.attack_results.values(): + for r in results: + attempts = r.metadata.get("adaptive_attempts", []) + trail = " → ".join(f"{a['technique']}({a['outcome']})" for a in attempts) + print(f"[{r.outcome.value:7s}] {r.objective!r}: {trail}") + +# Aggregate per-technique pick counts and success rate across the run +picks: Counter[str] = Counter() +wins: Counter[str] = Counter() +for results in resumed_result.attack_results.values(): + for r in results: + for step in r.metadata.get("adaptive_attempts", []): + picks[step["technique"]] += 1 + if step["outcome"] == "success": + wins[step["technique"]] += 1 + +print("\nTechnique wins / picks rate") +for technique, n in picks.most_common(): + print(f"{technique:20s} {wins[technique]:>4} / {n:<4} {wins[technique] / n:.0%}") + +# %% [markdown] +# ## Running from the scanner CLI +# +# You can run `TextAdaptive` directly from the `pyrit_scan` CLI without writing Python: +# +# ```bash +# # Basic run with defaults +# pyrit_scan --scenario TextAdaptive --target openai_chat +# +# # Tune max attempts and restrict strategies +# pyrit_scan --scenario TextAdaptive --target openai_chat \ +# --params max_attempts_per_objective=5 \ +# --strategies single_turn +# +# # Use specific datasets and limit size +# pyrit_scan --scenario TextAdaptive --target openai_chat \ +# --datasets airt_hate airt_violence \ +# --max-dataset-size 10 +# ``` diff --git a/doc/myst.yml b/doc/myst.yml index d055e0c7b..604b05bde 100644 --- a/doc/myst.yml +++ b/doc/myst.yml @@ -194,6 +194,7 @@ project: children: - file: code/scenarios/1_common_scenario_parameters.ipynb - file: code/scenarios/2_custom_scenario_parameters.ipynb + - file: code/scenarios/3_adaptive_scenarios.ipynb - file: code/registry/0_registry.md children: - file: code/registry/1_class_registry.ipynb diff --git 
def compute_technique_success_rates(
    *,
    technique_hashes: Sequence[str],
    label_key: str,
    scenario_result_id: str | None = None,
) -> dict[str, AttackStats]:
    """
    Query memory for historical success rates grouped by technique eval hash.

    Fetches all ``AttackResult`` rows whose memory labels contain
    ``label_key`` matching one of ``technique_hashes``, then aggregates
    outcomes into per-technique :class:`AttackStats`.

    By default queries across all scenario runs. Pass ``scenario_result_id``
    to restrict to a single run.

    Args:
        technique_hashes (Sequence[str]): Technique eval hashes to query.
            When empty, no query is issued and an empty dict is returned.
        label_key (str): Memory-label key that stores the technique hash.
        scenario_result_id (str | None): If provided, restrict results to
            a single scenario run. Defaults to ``None`` (all runs).

    Returns:
        dict[str, AttackStats]: Stats per technique hash. Techniques with
        no history are omitted from the result.
    """
    # Build the lookup set once; testing membership against the Sequence
    # itself would rescan it for every returned row.
    wanted = set(technique_hashes)
    if not wanted:
        # Every row would be filtered out below, so skip the memory query.
        return {}

    memory = CentralMemory.get_memory_instance()
    results = memory.get_attack_results(
        labels={label_key: list(technique_hashes)},
        scenario_result_id=scenario_result_id,
    )

    # Per-technique tally laid out as [successes, failures, undetermined, errors].
    tallies: dict[str, list[int]] = {}
    for result in results:
        technique = result.labels.get(label_key)
        # Re-check the label locally: rows without the label, or carrying a
        # hash that was not requested, must not contribute to the stats.
        if not technique or technique not in wanted:
            continue

        tally = tallies.setdefault(technique, [0, 0, 0, 0])
        if result.outcome == AttackOutcome.SUCCESS:
            tally[0] += 1
        elif result.outcome == AttackOutcome.FAILURE:
            tally[1] += 1
        elif result.outcome == AttackOutcome.ERROR:
            tally[3] += 1
        else:
            # Any other outcome is counted as undetermined.
            tally[2] += 1

    return {
        technique: _compute_stats(successes=s, failures=f, undetermined=u, errors=e)
        for technique, (s, f, u, e) in tallies.items()
    }
benchmark = _benchmark_module garak = _garak_module @@ -59,6 +62,7 @@ "ScenarioStrategy", "ScenarioIdentifier", "ScenarioResult", + "adaptive", "airt", "benchmark", "garak", diff --git a/pyrit/scenario/scenarios/adaptive/__init__.py b/pyrit/scenario/scenarios/adaptive/__init__.py new file mode 100644 index 000000000..440e43a86 --- /dev/null +++ b/pyrit/scenario/scenarios/adaptive/__init__.py @@ -0,0 +1,26 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Adaptive scenario classes.""" + +from pyrit.scenario.scenarios.adaptive.adaptive_scenario import AdaptiveScenario +from pyrit.scenario.scenarios.adaptive.dispatcher import ( + AdaptiveDispatchAttack, + AdaptiveDispatchParams, +) +from pyrit.scenario.scenarios.adaptive.selectors import ( + EpsilonGreedyTechniqueSelector, + SelectorScope, + TechniqueSelector, +) +from pyrit.scenario.scenarios.adaptive.text_adaptive import TextAdaptive + +__all__ = [ + "AdaptiveDispatchAttack", + "AdaptiveDispatchParams", + "AdaptiveScenario", + "EpsilonGreedyTechniqueSelector", + "SelectorScope", + "TechniqueSelector", + "TextAdaptive", +] diff --git a/pyrit/scenario/scenarios/adaptive/adaptive_scenario.py b/pyrit/scenario/scenarios/adaptive/adaptive_scenario.py new file mode 100644 index 000000000..15d5fa6ad --- /dev/null +++ b/pyrit/scenario/scenarios/adaptive/adaptive_scenario.py @@ -0,0 +1,254 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +``AdaptiveScenario`` — modality-agnostic base for scenarios that pick attack +techniques per-objective using a ``TechniqueSelector``. + +Owns selector wiring, dispatcher construction, and per-dataset atomic-attack +emission. Concrete subclasses (``TextAdaptive``, future ``ImageAdaptive`` / +``AudioAdaptive``) only declare strategy class, default datasets, version, +and atomic-attack prefix. 
class AdaptiveScenario(Scenario):
    """
    Abstract base for adaptive (epsilon-greedy) scenarios.

    Subclasses must implement the standard ``Scenario`` class-method overrides
    and normally override ``_atomic_attack_prefix`` (it defaults to
    ``"adaptive"``). Selector wiring and dispatcher construction are handled
    here.
    """

    #: Scenario version for memory bookkeeping.
    VERSION: ClassVar[int] = 1

    #: Prefix for per-dataset atomic-attack names; each atomic attack is named
    #: ``f"{_atomic_attack_prefix}_{dataset_name}"`` (e.g. ``"adaptive_text"`` as prefix).
    _atomic_attack_prefix: ClassVar[str] = "adaptive"

    def __init__(
        self,
        *,
        objective_scorer: TrueFalseScorer | None = None,
        selector: TechniqueSelector | None = None,
        scenario_result_id: str | None = None,
    ) -> None:
        """
        Store the scorer and optional selector, then initialize the base scenario.

        Args:
            objective_scorer (TrueFalseScorer | None): Scorer used to judge each
                response. Defaults to the composite scorer from the base class.
            selector (TechniqueSelector | None): Pre-built selector. When ``None``
                (default) an :class:`EpsilonGreedyTechniqueSelector` is created
                with default settings.
            scenario_result_id (str | None): ID of an existing ``ScenarioResult`` to resume.
        """
        # Compare against None explicitly: a scorer object must never be
        # replaced by the default just because it happens to be falsy.
        if objective_scorer is None:
            objective_scorer = self._get_default_objective_scorer()
        self._objective_scorer: TrueFalseScorer = objective_scorer

        self._custom_selector = selector

        super().__init__(
            version=self.VERSION,
            strategy_class=self.get_strategy_class(),
            objective_scorer=objective_scorer,
            scenario_result_id=scenario_result_id,
        )

    async def _get_atomic_attacks_async(self) -> list[AtomicAttack]:
        """
        Build one ``AtomicAttack`` per dataset, each carrying every objective
        in that dataset as a separate ``SeedAttackGroup``.

        A single ``AdaptiveDispatchAttack`` is constructed per dataset and
        shared across its seed groups; per-call seed-group routing and
        per-call ``seed_technique`` compatibility filtering happen inside the
        dispatcher (driven by ``AdaptiveDispatchParams.seed_group``). All
        dispatchers across all datasets share one ``TechniqueSelector``
        instance so learning accumulates globally.

        Returns:
            list[AtomicAttack]: One ``AtomicAttack`` per dataset that has at
            least one compatible seed group.

        Raises:
            ValueError: If ``self._objective_target`` is not set, or if
                ``_build_techniques_dict`` finds no usable techniques.
        """
        if self._objective_target is None:
            raise ValueError("objective_target must be set before creating attacks")

        techniques = self._build_techniques_dict(objective_target=self._objective_target)

        # One selector instance is handed to every per-dataset dispatcher.
        selector: TechniqueSelector = (
            self._custom_selector if self._custom_selector is not None else EpsilonGreedyTechniqueSelector()
        )

        seed_groups_by_dataset = self._dataset_config.get_seed_attack_groups()
        atomic_attacks: list[AtomicAttack] = []
        for dataset_name, seed_groups in seed_groups_by_dataset.items():
            atomic = self._build_atomic_for_dataset(
                dataset_name=dataset_name,
                seed_groups=seed_groups,
                techniques=techniques,
                selector=selector,
            )
            # ``None`` means no seed group in this dataset was usable.
            if atomic is not None:
                atomic_attacks.append(atomic)

        return atomic_attacks

    def _build_techniques_dict(
        self,
        *,
        objective_target: PromptTarget,
    ) -> dict[str, TechniqueBundle]:
        """
        Resolve selected strategies into a ``{eval_hash: TechniqueBundle}`` map.

        Each bundle carries the inner attack strategy along with the factory's
        ``seed_technique`` and ``adversarial_chat`` so the dispatcher can
        reproduce the static ``AtomicAttack`` execution path per attempt.

        Technique keys are eval hashes derived from the ``AttackTechnique``
        identity (strategy + seed_technique configuration). This allows the
        selector and analytics to track techniques by their behavioral
        configuration rather than by name alone.

        If two selected strategies produce the same eval hash, the one that
        sorts later by name replaces the earlier one and a warning is logged.

        Args:
            objective_target (PromptTarget): Target passed to each technique factory.

        Returns:
            dict[str, TechniqueBundle]: Mapping from technique eval hash to its
            bundle, inserted in sorted order of the selected strategy values.

        Raises:
            ValueError: If no techniques remain after filtering. Includes the
                strategies skipped because no factory was registered for them.
        """
        # De-duplicate and sort so the resulting pool order is deterministic.
        selected_techniques = sorted({s.value for s in self._scenario_strategies})
        factories = self._get_attack_technique_factories()
        scoring_config = AttackScoringConfig(objective_scorer=self._objective_scorer)

        techniques: dict[str, TechniqueBundle] = {}
        skipped_no_factory: list[str] = []
        for technique_name in selected_techniques:
            factory = factories.get(technique_name)
            if factory is None:
                skipped_no_factory.append(technique_name)
                logger.warning("No factory for technique '%s', skipping.", technique_name)
                continue
            technique = factory.create(
                objective_target=objective_target,
                attack_scoring_config=scoring_config,
            )
            eval_hash = technique.get_identifier().hash
            if eval_hash in techniques:
                # Same key means the dict entry is about to be overwritten;
                # surface that instead of silently shrinking the pool.
                logger.warning(
                    "Technique '%s' has the same eval hash as '%s' and replaces it in the adaptive pool.",
                    technique_name,
                    techniques[eval_hash].name,
                )
            techniques[eval_hash] = TechniqueBundle(
                attack=technique.attack,
                name=technique_name,
                seed_technique=technique.seed_technique,
                adversarial_chat=factory.adversarial_chat,
            )

        if not techniques:
            suffix = f" (skipped, no factory registered: {sorted(skipped_no_factory)})" if skipped_no_factory else ""
            raise ValueError(
                f"{type(self).__name__}: no usable techniques after resolving strategies. "
                f"Check the --strategies selection.{suffix}"
            )

        return techniques

    def _build_atomic_for_dataset(
        self,
        *,
        dataset_name: str,
        seed_groups: list[SeedAttackGroup],
        techniques: dict[str, TechniqueBundle],
        selector: TechniqueSelector,
    ) -> AtomicAttack | None:
        """
        Build a single ``AtomicAttack`` for one dataset with all compatible
        seed groups attached.

        Seed groups for which no technique in the pool is compatible are
        dropped here with a warning so the dispatcher's per-call compatible
        pool is guaranteed non-empty.

        Args:
            dataset_name (str): Dataset name; used in the atomic-attack name
                and as its display group.
            seed_groups (list[SeedAttackGroup]): Candidate seed groups from the dataset.
            techniques (dict[str, TechniqueBundle]): Technique pool keyed by eval hash.
            selector (TechniqueSelector): Selector handed to the dispatcher.

        Returns:
            AtomicAttack | None: The constructed atomic attack, or ``None`` when
            every seed group is incompatible with every technique.

        Raises:
            ValueError: If ``self._objective_target`` is not set (defensive
                guard; ``_get_atomic_attacks_async`` enforces this earlier).
        """
        if self._objective_target is None:  # pragma: no cover - defensive
            raise ValueError("objective_target must be set before creating attacks")

        compatible_seed_groups: list[SeedAttackGroup] = []
        for seed_group in seed_groups:
            # A technique without a seed_technique is compatible with any seed group.
            has_compatible = any(
                bundle.seed_technique is None
                or seed_group.is_compatible_with_technique(technique=bundle.seed_technique)
                for bundle in techniques.values()
            )
            if has_compatible:
                compatible_seed_groups.append(seed_group)
            else:
                logger.warning(
                    "AdaptiveScenario: no compatible techniques for seed group in dataset '%s' "
                    "(objective=%r); skipping.",
                    dataset_name,
                    seed_group.objective.value,
                )

        if not compatible_seed_groups:
            return None

        dispatcher = AdaptiveDispatchAttack(
            objective_target=self._objective_target,
            techniques=techniques,
            selector=selector,
            objective_scorer=self._objective_scorer,
            max_attempts_per_objective=self.params["max_attempts_per_objective"],
            scenario_result_id=self._scenario_result_id,
        )

        return AtomicAttack(
            atomic_attack_name=f"{self._atomic_attack_prefix}_{dataset_name}",
            attack_technique=AttackTechnique(attack=dispatcher),
            seed_groups=compatible_seed_groups,
            objective_scorer=self._objective_scorer,
            # Copy so the atomic attack does not share the scenario's label dict.
            memory_labels=dict(self._memory_labels),
            display_group=dataset_name,
        )
from __future__ import annotations

import dataclasses
import logging
import uuid
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Optional

from pyrit.executor.attack.core.attack_executor import AttackExecutor
from pyrit.executor.attack.core.attack_parameters import AttackParameters
from pyrit.executor.attack.core.attack_strategy import AttackContext, AttackStrategy
from pyrit.models import AttackOutcome, AttackResult, SeedAttackGroup
from pyrit.scenario.scenarios.adaptive.selectors import (
    TechniqueSelector,
)
from pyrit.scenario.scenarios.adaptive.selectors.technique_selector import ADAPTIVE_TECHNIQUE_LABEL

if TYPE_CHECKING:
    from pyrit.models import SeedAttackTechniqueGroup
    from pyrit.prompt_target import PromptTarget
    from pyrit.score import TrueFalseScorer

logger = logging.getLogger(__name__)


# Memory-label key stamped onto each attempt's labels so adaptive attempts
# can be filtered/grouped after a run. The value is the 1-based attempt index
# within the per-objective loop (as a string).
ADAPTIVE_ATTEMPT_LABEL: str = "_adaptive_attempt"


@dataclass(frozen=True)
class TechniqueBundle:
    """
    Per-technique bundle consumed by the dispatcher.

    Carries the inner attack strategy alongside the factory-supplied
    ``seed_technique`` (if any) and ``adversarial_chat`` (required when the
    seed_technique contains a simulated-conversation config).
    """

    attack: AttackStrategy[Any, AttackResult]
    name: str = ""
    seed_technique: SeedAttackTechniqueGroup | None = None
    adversarial_chat: PromptTarget | None = None


@dataclass(frozen=True)
class AdaptiveDispatchParams(AttackParameters):
    """Attack parameters for adaptive dispatch, carrying the original seed group."""

    # The original SeedAttackGroup is preserved on the params so the
    # dispatcher can apply per-attempt seed_technique merging and derive
    # the per-call adaptive context. Captured by ``from_seed_group_async``;
    # not user-supplied via overrides.
    seed_group: Optional[SeedAttackGroup] = field(default=None, repr=False, compare=False)

    @classmethod
    async def from_seed_group_async(
        cls,
        *,
        seed_group: SeedAttackGroup,
        adversarial_chat: Optional[PromptTarget] = None,  # noqa: ARG003 — required by base class signature
        objective_scorer: Optional[TrueFalseScorer] = None,  # noqa: ARG003 — required by base class signature
        **overrides: Any,
    ) -> AdaptiveDispatchParams:
        """
        Build params for a single dispatch and capture the original seed_group.

        The dispatcher applies seed_technique merging itself per-attempt, so
        we deliberately bypass the base class's simulated-conversation
        expansion / next_message extraction: the inner technique runs through
        its own ``execute_attack_from_seed_groups_async`` call which performs
        that work using the technique-merged seed_group.

        Args:
            seed_group (SeedAttackGroup): Seed group for this dispatch; its
                objective text becomes ``objective``.
            adversarial_chat (Optional[PromptTarget]): Unused; accepted for
                signature compatibility.
            objective_scorer (Optional[TrueFalseScorer]): Unused; accepted for
                signature compatibility.
            **overrides (Any): Must name dataclass fields of this class other
                than ``seed_group``. Only ``memory_labels`` is consumed.
                NOTE(review): other accepted keys are validated but not
                forwarded to the constructor — confirm this is intended.

        Returns:
            AdaptiveDispatchParams: The constructed parameters with the seed group attached.

        Raises:
            ValueError: If the seed_group's objective is not initialized or invalid overrides are passed.
        """
        if seed_group.objective is None:
            raise ValueError("seed_group.objective is not initialized")
        seed_group.validate()

        valid_fields = {f.name for f in dataclasses.fields(cls)} - {"seed_group"}
        invalid = set(overrides.keys()) - valid_fields
        if invalid:
            raise ValueError(f"{cls.__name__} does not accept parameters: {invalid}. Accepted: {valid_fields}")

        return cls(
            objective=seed_group.objective.value,
            memory_labels=overrides.get("memory_labels") or {},
            seed_group=seed_group,
        )


@dataclass
class AdaptiveDispatchContext(AttackContext[AdaptiveDispatchParams]):
    """Execution context for ``AdaptiveDispatchAttack`` (no extra state)."""


class AdaptiveDispatchAttack(AttackStrategy[AdaptiveDispatchContext, AttackResult]):
    """
    Attack that delegates each objective to an ordered shortlist of inner
    techniques chosen by a ``TechniqueSelector``.

    For each objective the dispatcher:

    1. Filters the technique pool down to bundles whose ``seed_technique`` is
       absent or compatible with the seed group; raises if none remain.
    2. Asks the selector **once** for up to ``max_attempts_per_objective``
       techniques in priority order.
    3. Runs them in that order, stopping at the first ``SUCCESS``.

    The seed group for a given dispatch is read from
    ``context.params.seed_group`` (captured by
    ``AdaptiveDispatchParams.from_seed_group_async``). When a chosen
    technique declares a ``seed_technique``, that group is merged into the
    seed group before execution.

    The returned value is always a fresh ``AttackResult`` copy of the *last*
    inner result — the winning attempt, or the final failed one — with a new
    ``attack_result_id`` and ``timestamp`` and the dispatch trail stamped onto
    ``metadata["adaptive_attempts"]``. NOTE(review): per the original design
    notes the inner result is also persisted by its own post-execute hook, so
    an inner and an outer row would share one ``conversation_id``; that
    persistence is not visible from this module — confirm.
    """

    def __init__(
        self,
        *,
        objective_target: PromptTarget,
        techniques: dict[str, TechniqueBundle],
        selector: TechniqueSelector,
        objective_scorer: TrueFalseScorer | None = None,
        max_attempts_per_objective: int = 3,
        scenario_result_id: str | None = None,
    ) -> None:
        """
        Args:
            objective_target (PromptTarget): The target inner attacks run against.
            techniques (dict[str, TechniqueBundle]): Mapping from technique eval hash to
                its bundle (attack, name, seed_technique, adversarial_chat). Must be non-empty.
            selector (TechniqueSelector): Stateless technique selector.
            objective_scorer (TrueFalseScorer | None): Scorer passed through to
                the executor for each inner attack.
            max_attempts_per_objective (int): Max attempts per objective; >= 1.
                Defaults to 3.
            scenario_result_id (str | None): If provided, passed to the selector
                to scope memory queries to this scenario run. Defaults to ``None``.

        Raises:
            ValueError: If ``techniques`` is empty or ``max_attempts_per_objective`` < 1.
        """
        if not techniques:
            raise ValueError("techniques must contain at least one attack technique")
        if max_attempts_per_objective < 1:
            raise ValueError(f"max_attempts_per_objective must be >= 1, got {max_attempts_per_objective}")

        super().__init__(
            objective_target=objective_target,
            context_type=AdaptiveDispatchContext,
            params_type=AdaptiveDispatchParams,
            logger=logger,
        )
        self._techniques = techniques
        self._selector = selector
        self._objective_scorer = objective_scorer
        self._max_attempts = max_attempts_per_objective
        self._scenario_result_id = scenario_result_id
        # Attempts for one objective run strictly one after another.
        self._executor = AttackExecutor(max_concurrency=1)

    def _validate_context(self, *, context: AdaptiveDispatchContext) -> None:
        """
        Ensure the context carries a non-empty objective string.

        Raises:
            ValueError: If ``context.objective`` is empty or whitespace-only.
        """
        if not context.objective or context.objective.isspace():
            raise ValueError("Attack objective must be provided and non-empty")

    async def _setup_async(self, *, context: AdaptiveDispatchContext) -> None:
        """No-op: per-attempt setup is owned by the inner technique's executor."""

    async def _teardown_async(self, *, context: AdaptiveDispatchContext) -> None:
        """No-op: per-attempt teardown is owned by the inner technique's executor."""

    async def _run_inner_attack_async(
        self,
        *,
        bundle: TechniqueBundle,
        seed_group: SeedAttackGroup,
        attempt_labels: dict[str, str],
    ) -> AttackResult:
        """
        Execute the chosen technique against the per-call seed group.

        Merges ``bundle.seed_technique`` into ``seed_group`` (when present)
        and delegates execution to ``AttackExecutor``. Isolated as a method
        so tests can patch the inner-attack call surface.

        Args:
            bundle (TechniqueBundle): The chosen technique's attack + seeds + chat.
            seed_group (SeedAttackGroup): The seed group for this dispatch call.
            attempt_labels (dict[str, str]): Memory labels stamped onto this attempt.

        Returns:
            AttackResult: The single result produced for this attempt.

        Raises:
            Exception: Whatever exception the executor recorded for the
                objective when it did not complete.
            RuntimeError: If the executor returned no completed results and no
                propagated exception (should be unreachable).
        """
        if bundle.seed_technique is not None:
            execution_group = seed_group.with_technique(technique=bundle.seed_technique)
        else:
            execution_group = seed_group

        executor_result = await self._executor.execute_attack_from_seed_groups_async(
            attack=bundle.attack,
            seed_groups=[execution_group],
            adversarial_chat=bundle.adversarial_chat,
            objective_scorer=self._objective_scorer,
            memory_labels=attempt_labels,
        )

        if executor_result.completed_results:
            return executor_result.completed_results[0]
        if executor_result.incomplete_objectives:
            # Each entry is indexed as (…, exception); re-raise the recorded failure.
            raise executor_result.incomplete_objectives[0][1]
        raise RuntimeError(  # pragma: no cover - defensive
            "AttackExecutor returned neither completed nor incomplete results."
        )

    async def _perform_async(self, *, context: AdaptiveDispatchContext) -> AttackResult:
        """
        Run the per-objective adaptive loop.

        Pre-selects up to ``max_attempts_per_objective`` techniques via the
        selector, then iterates in priority order. Stops early on success.

        Args:
            context (AdaptiveDispatchContext): Execution context whose
                ``params.seed_group`` carries the seed group for this call.

        Returns:
            AttackResult: A fresh dispatcher-owned copy of the final inner
            result with the dispatch trail stamped onto ``metadata``.

        Raises:
            ValueError: If ``context.params.seed_group`` is missing, if no
                techniques in the pool are compatible with the seed group, or
                if the selector returns a technique outside the compatible pool.
            RuntimeError: If the selector returned no techniques, so the loop
                ran zero attempts.
        """
        seed_group = context.params.seed_group
        if seed_group is None:
            raise ValueError(
                "AdaptiveDispatchAttack requires AdaptiveDispatchParams.seed_group; "
                "build params via AdaptiveDispatchParams.from_seed_group_async."
            )

        compatible_names = [
            name
            for name, bundle in self._techniques.items()
            if bundle.seed_technique is None or seed_group.is_compatible_with_technique(technique=bundle.seed_technique)
        ]
        if not compatible_names:
            raise ValueError(
                f"AdaptiveDispatchAttack: no compatible techniques for seed group "
                f"(objective={seed_group.objective.value!r})."
            )

        selected = await self._selector.select_async(
            technique_identifiers=compatible_names,
            objective=context.objective,
            num_top_techniques=self._max_attempts,
            scenario_result_id=self._scenario_result_id,
        )

        # Selectors are pluggable, so enforce the contract here instead of
        # trusting it: never exceed the attempt cap, and fail with a clear
        # message (not a bare KeyError) on names outside the compatible pool.
        chosen_techniques = list(selected)[: self._max_attempts]
        compatible_pool = set(compatible_names)
        unexpected = [name for name in chosen_techniques if name not in compatible_pool]
        if unexpected:
            raise ValueError(
                f"AdaptiveDispatchAttack: selector returned techniques outside the compatible pool: {unexpected}"
            )

        last_result: AttackResult | None = None
        trail: list[dict[str, str]] = []

        for attempt_idx, chosen in enumerate(chosen_techniques):
            bundle = self._techniques[chosen]
            attempt_labels = {
                **context.memory_labels,
                ADAPTIVE_TECHNIQUE_LABEL: chosen,
                ADAPTIVE_ATTEMPT_LABEL: str(attempt_idx + 1),
            }

            logger.debug(
                "AdaptiveDispatchAttack: attempt %d/%d technique=%r (hash=%s)",
                attempt_idx + 1,
                len(chosen_techniques),
                bundle.name,
                chosen,
            )

            result = await self._run_inner_attack_async(
                bundle=bundle, seed_group=seed_group, attempt_labels=attempt_labels
            )

            trail.append({"technique": bundle.name, "technique_hash": chosen, "outcome": result.outcome.value})
            last_result = result

            if result.outcome == AttackOutcome.SUCCESS:
                break

        if last_result is None:
            # Reached only when the selector handed back an empty shortlist.
            raise RuntimeError("AdaptiveDispatchAttack ran zero attempts; the selector returned no techniques.")
        return replace(
            last_result,
            attack_result_id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc),
            metadata={
                **last_result.metadata,
                "adaptive_attempts": trail,
            },
        )
def _derive_rng(random_seed: int | None, decision_key: str) -> random.Random:
    """
    Derive a per-decision ``Random`` from ``(random_seed, decision_key)``.

    Args:
        random_seed (int | None): Base seed, or ``None`` for non-deterministic behaviour.
        decision_key (str): Text identifying the decision (the selector passes
            the objective), mixed into the derived seed.

    Returns:
        random.Random: A fresh ``random.Random`` seeded deterministically from the
        inputs when ``random_seed`` is not None, or an unseeded ``Random`` otherwise.
    """
    if random_seed is None:
        return random.Random()
    # Hash seed and key together so each decision key gets its own
    # reproducible stream.
    digest = hashlib.sha256(f"{random_seed}|{decision_key}".encode()).digest()
    # NOTE(review): the unpack format and the return statement were lost to
    # extraction garbling in the source; reconstructed as the first 8 digest
    # bytes read as a little-endian unsigned 64-bit integer — confirm upstream.
    derived_seed = struct.unpack("<Q", digest[:8])[0]
    return random.Random(derived_seed)
class EpsilonGreedyTechniqueSelector:
    """
    Epsilon-greedy technique selector driven by historical success rates.

    For every slot in the returned shortlist the selector explores (uniform
    random pick among the remaining techniques) with probability ``epsilon``
    and otherwise exploits (picks the remaining technique with the highest
    smoothed success estimate, breaking near-ties at random). Techniques are
    picked without replacement.

    NOTE(review): the class header, ``_TIE_TOL`` and the ``__init__``
    signature were lost to extraction garbling in the source and are
    reconstructed here from the surviving docstring, attribute usage and
    call sites — confirm against the upstream file.
    """

    # Estimates within this distance of the best one are treated as tied.
    # NOTE(review): reconstructed value — confirm.
    _TIE_TOL: float = 1e-9

    def __init__(
        self,
        *,
        epsilon: float = 0.2,
        scope: SelectorScope = SelectorScope.ALL_RUNS,
        random_seed: int | None = None,
    ) -> None:
        """
        Args:
            epsilon (float): Exploration probability in [0.0, 1.0]. Defaults to 0.2.
            scope (SelectorScope): Whether to use all historical data or only
                the current scenario run. Defaults to ``SelectorScope.ALL_RUNS``.
            random_seed (int | None): Base seed for deterministic per-decision RNG
                derivation. Defaults to ``None`` (non-deterministic).

        Raises:
            ValueError: If ``epsilon`` is outside [0.0, 1.0].
        """
        if not 0.0 <= epsilon <= 1.0:
            raise ValueError(f"epsilon must be in [0.0, 1.0], got {epsilon}")

        self._epsilon = epsilon
        self._scope = scope
        self._seed = random_seed

    async def select_async(
        self,
        *,
        technique_identifiers: Sequence[str],
        objective: str,
        num_top_techniques: int = 1,
        scenario_result_id: str | None = None,
    ) -> Sequence[str]:
        """
        Return up to ``num_top_techniques`` techniques in priority order.

        Args:
            technique_identifiers (Sequence[str]): Available technique identifiers;
                also used as the keys for the memory query.
            objective (str): The objective text for scoping the per-decision RNG.
            num_top_techniques (int): Max techniques to return. Defaults to 1.
            scenario_result_id (str | None): Forwarded to the memory query only
                when the selector's scope is ``SelectorScope.CURRENT_RUN``;
                ignored otherwise.

        Returns:
            Sequence[str]: Techniques in priority order. Fewer than
            ``num_top_techniques`` if not enough techniques are available;
            empty when ``num_top_techniques`` is zero or negative.

        Raises:
            ValueError: If ``technique_identifiers`` is empty.
        """
        technique_list = list(technique_identifiers)
        if not technique_list:
            raise ValueError("technique_identifiers must contain at least one entry")

        num_top_techniques = min(num_top_techniques, len(technique_list))

        # One RNG per objective: with a fixed seed, the same objective always
        # sees the same sequence of explore/exploit draws.
        decision_key = objective
        rng = _derive_rng(self._seed, decision_key)

        stats = compute_technique_success_rates(
            technique_hashes=technique_list,
            label_key=ADAPTIVE_TECHNIQUE_LABEL,
            scenario_result_id=scenario_result_id if self._scope == SelectorScope.CURRENT_RUN else None,
        )

        chosen: list[str] = []
        remaining = list(technique_list)

        for _ in range(num_top_techniques):
            if not remaining:
                break

            if rng.random() < self._epsilon:
                # Explore: uniform pick among what is left.
                pick = rng.choice(remaining)
            else:
                # Exploit: best estimate wins; near-ties are broken at random.
                estimates = {t: self._estimate(technique=t, stats=stats) for t in remaining}
                best = max(estimates.values())
                winners = [t for t, v in estimates.items() if v >= best - self._TIE_TOL]
                pick = rng.choice(winners)

            chosen.append(pick)
            remaining.remove(pick)

        return chosen

    @staticmethod
    def _estimate(*, technique: str, stats: dict[str, AttackStats]) -> float:
        """
        Optimistic add-one success-rate estimate for a technique.

        Computes ``(successes + 1) / (total_decided + 1)``. Unseen techniques
        (missing from ``stats`` or with no decided outcomes) get ``1.0``, so
        they tie with the best possible estimate.

        Args:
            technique (str): The technique identifier.
            stats (dict[str, AttackStats]): Pre-computed stats from memory.

        Returns:
            float: Estimated success rate in ``(0, 1]``.
        """
        technique_stats = stats.get(technique)
        if technique_stats is None or technique_stats.total_decided == 0:
            return 1.0
        return (technique_stats.successes + 1) / (technique_stats.total_decided + 1)
"""Technique selector protocol for adaptive scenarios."""

from __future__ import annotations

from collections.abc import Sequence
from enum import Enum
from typing import Protocol, runtime_checkable


# TODO: probably want to expand this to allow for more filtering options
# (e.g. filter by scenario parameters, attack labels, etc.)
class SelectorScope(str, Enum):
    """Controls which historical data a selector queries."""

    ALL_RUNS = "all_runs"
    """Use technique success rates from all historical scenario runs."""

    CURRENT_RUN = "current_run"
    """Use technique success rates only from the current scenario run."""


ADAPTIVE_TECHNIQUE_LABEL: str = "_adaptive_technique"
"""Memory-label key the dispatcher stamps on each attack result to record
which technique was used."""


@runtime_checkable
class TechniqueSelector(Protocol):
    """
    Protocol for adaptive technique selectors.

    Selectors are **stateless** — they query memory for historical success
    rates rather than maintaining internal counts. Repeatability is only
    expected from implementations whose choices are fully determined by
    their inputs and memory contents; an implementation that draws from an
    unseeded random source (as the epsilon-greedy selector does when no
    ``random_seed`` is given) may return different answers for the same
    arguments.

    The protocol is ``runtime_checkable``, so ``isinstance`` checks only that
    a ``select_async`` attribute exists; it does not verify the signature.
    """

    async def select_async(
        self,
        *,
        technique_identifiers: Sequence[str],
        objective: str,
        num_top_techniques: int = 1,
        scenario_result_id: str | None = None,
    ) -> Sequence[str]:
        """
        Return techniques in priority order (try first, try second, …).

        Args:
            technique_identifiers (Sequence[str]): Identifiers of the available
                techniques; the result should be drawn from these.
            objective (str): The objective text for this selection.
            num_top_techniques (int): Max techniques to return. Defaults to 1.
            scenario_result_id (str | None): The current scenario run ID,
                provided by the dispatcher. Selectors use this when their
                scope is ``SelectorScope.CURRENT_RUN``.

        Returns:
            Sequence[str]: Up to ``num_top_techniques`` technique names in
            priority order. Fewer if not enough techniques are available.
        """
        ...
"""
``TextAdaptive`` — text adaptive scenario.

Chooses attack techniques per objective with an epsilon-greedy selector fed by
observed success rates, trying at most ``max_attempts_per_objective``
techniques per objective and stopping at the first success.
``prompt_sending`` is kept out of the adaptive pool and serves as the
baseline comparison instead.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, ClassVar

from pyrit.common import apply_defaults
from pyrit.common.parameter import Parameter
from pyrit.registry.tag_query import TagQuery
from pyrit.scenario.core.dataset_configuration import DatasetConfiguration
from pyrit.scenario.scenarios.adaptive.adaptive_scenario import AdaptiveScenario
from pyrit.scenario.scenarios.adaptive.selectors import (
    TechniqueSelector,
)

if TYPE_CHECKING:
    from pyrit.scenario.core.scenario_strategy import ScenarioStrategy
    from pyrit.score import TrueFalseScorer

logger = logging.getLogger(__name__)

# Technique names kept out of the adaptive pool: they act as the baseline
# comparison, not as moves the selector may choose.
_EXCLUDED_TECHNIQUES = frozenset({"prompt_sending"})


def _build_text_adaptive_strategy() -> type[ScenarioStrategy]:
    """
    Assemble the strategy enum from the core scenario-techniques catalog,
    leaving out every technique named in ``_EXCLUDED_TECHNIQUES``.

    Returns:
        type[ScenarioStrategy]: The dynamically-built strategy enum class.
    """
    # Imported lazily, at call time rather than module import time.
    from pyrit.registry.object_registries.attack_technique_registry import (
        AttackTechniqueRegistry,
    )
    from pyrit.scenario.core.scenario_techniques import SCENARIO_TECHNIQUES

    pool_specs = []
    for candidate in SCENARIO_TECHNIQUES:
        if candidate.name in _EXCLUDED_TECHNIQUES:
            continue
        pool_specs.append(candidate)

    # Each aggregate is keyed by, and matches on, the same tag name.
    tag_aggregates = {tag: TagQuery.any_of(tag) for tag in ("default", "single_turn", "multi_turn")}

    return AttackTechniqueRegistry.build_strategy_class_from_specs(  # type: ignore[return-value, ty:invalid-return-type]
        class_name="TextAdaptiveStrategy",
        specs=pool_specs,
        aggregate_tags=tag_aggregates,
    )


class TextAdaptive(AdaptiveScenario):
    """
    Adaptive scenario for text attacks.

    Techniques are chosen per objective by an epsilon-greedy selector over the
    selected strategies. ``prompt_sending`` is not part of the adaptive pool;
    it runs as the baseline comparison.
    """

    _cached_strategy_class: ClassVar[type[ScenarioStrategy] | None] = None

    @classmethod
    def get_strategy_class(cls) -> type[ScenarioStrategy]:
        """Return this scenario's strategy enum, building and caching it on first use."""
        cached = cls._cached_strategy_class
        if cached is None:
            cached = _build_text_adaptive_strategy()
            cls._cached_strategy_class = cached
        return cached

    @classmethod
    def get_default_strategy(cls) -> ScenarioStrategy:
        """Return the ``default`` aggregate member of the strategy enum."""
        return cls.get_strategy_class()("default")

    @classmethod
    def required_datasets(cls) -> list[str]:
        """Return the dataset names used when the caller supplies no override."""
        dataset_names = (
            "airt_hate",
            "airt_fairness",
            "airt_violence",
            "airt_sexual",
            "airt_harassment",
            "airt_misinformation",
            "airt_leakage",
        )
        return list(dataset_names)

    @classmethod
    def default_dataset_config(cls) -> DatasetConfiguration:
        """Return the default :class:`DatasetConfiguration`: the required datasets with ``max_dataset_size=4``."""
        names = cls.required_datasets()
        return DatasetConfiguration(dataset_names=names, max_dataset_size=4)

    @classmethod
    def supported_parameters(cls) -> list[Parameter]:
        """
        List the custom parameters this scenario accepts from the CLI / config file.

        Returns:
            list[Parameter]: Parameters configurable per-run.
        """
        attempts_cap = Parameter(
            name="max_attempts_per_objective",
            description="Max techniques tried per objective.",
            param_type=int,
            default=3,
        )
        return [attempts_cap]

    @apply_defaults
    def __init__(
        self,
        *,
        objective_scorer: TrueFalseScorer | None = None,
        selector: TechniqueSelector | None = None,
        scenario_result_id: str | None = None,
    ) -> None:
        """
        Args:
            objective_scorer (TrueFalseScorer | None): Scorer that judges each
                response. ``None`` defers to the base class default.
            selector (TechniqueSelector | None): Pre-built selector. ``None``
                defers to the base class default; pass a custom instance to
                tune ``epsilon`` or ``random_seed``.
            scenario_result_id (str | None): ID of an existing ``ScenarioResult`` to resume.
        """
        super().__init__(
            objective_scorer=objective_scorer,
            selector=selector,
            scenario_result_id=scenario_result_id,
        )
from unittest.mock import MagicMock, patch

import pytest

from pyrit.analytics.scenario_analysis import compute_technique_success_rates
from pyrit.models import AttackOutcome


LABEL_KEY = "_adaptive_technique"


def _make_result(*, technique: str, outcome: AttackOutcome) -> MagicMock:
    """Build a stub attack result labelled with ``technique`` and carrying ``outcome``."""
    stub = MagicMock()
    stub.labels = {LABEL_KEY: technique}
    stub.outcome = outcome
    return stub


@pytest.fixture(autouse=True)
def _patch_memory():
    """Patch ``CentralMemory`` so every test sees a stub memory with no results by default."""
    memory_stub = MagicMock()
    memory_stub.get_attack_results.return_value = []
    with patch("pyrit.memory.CentralMemory") as central_memory_cls:
        central_memory_cls.get_memory_instance.return_value = memory_stub
        yield memory_stub


class TestComputeTechniqueSuccessRates:
    """Unit tests for ``compute_technique_success_rates`` against a stubbed memory."""

    def test_empty_results_returns_empty(self, _patch_memory):
        computed = compute_technique_success_rates(technique_hashes=["a", "b"], label_key=LABEL_KEY)
        assert computed == {}

    def test_counts_successes_and_failures(self, _patch_memory):
        history = [
            ("a", AttackOutcome.SUCCESS),
            ("a", AttackOutcome.SUCCESS),
            ("a", AttackOutcome.FAILURE),
            ("b", AttackOutcome.FAILURE),
        ]
        _patch_memory.get_attack_results.return_value = [
            _make_result(technique=name, outcome=outcome) for name, outcome in history
        ]

        computed = compute_technique_success_rates(technique_hashes=["a", "b"], label_key=LABEL_KEY)

        stats_a = computed["a"]
        stats_b = computed["b"]
        assert stats_a.successes == 2
        assert stats_a.failures == 1
        assert stats_a.total_decided == 3
        assert stats_b.successes == 0
        assert stats_b.failures == 1

    def test_counts_errors_and_undetermined(self, _patch_memory):
        _patch_memory.get_attack_results.return_value = [
            _make_result(technique="a", outcome=outcome)
            for outcome in (AttackOutcome.ERROR, AttackOutcome.UNDETERMINED)
        ]

        computed = compute_technique_success_rates(technique_hashes=["a"], label_key=LABEL_KEY)

        assert computed["a"].errors == 1
        assert computed["a"].undetermined == 1

    def test_ignores_techniques_not_in_requested_list(self, _patch_memory):
        _patch_memory.get_attack_results.return_value = [
            _make_result(technique=name, outcome=AttackOutcome.SUCCESS) for name in ("a", "c")
        ]

        computed = compute_technique_success_rates(technique_hashes=["a", "b"], label_key=LABEL_KEY)

        assert "a" in computed
        assert "c" not in computed

    def test_passes_label_key_to_memory_query(self, _patch_memory):
        custom_key = "my_custom_key"
        compute_technique_success_rates(technique_hashes=["x"], label_key=custom_key)

        query_kwargs = _patch_memory.get_attack_results.call_args.kwargs
        assert query_kwargs["labels"] == {custom_key: ["x"]}
        assert query_kwargs["scenario_result_id"] is None

    def test_passes_scenario_result_id_to_memory_query(self, _patch_memory):
        compute_technique_success_rates(
            technique_hashes=["x"],
            label_key=LABEL_KEY,
            scenario_result_id="run-123",
        )

        query_kwargs = _patch_memory.get_attack_results.call_args.kwargs
        assert query_kwargs["scenario_result_id"] == "run-123"

    def test_omits_techniques_with_no_history(self, _patch_memory):
        _patch_memory.get_attack_results.return_value = [
            _make_result(technique="a", outcome=AttackOutcome.SUCCESS),
        ]

        computed = compute_technique_success_rates(technique_hashes=["a", "b"], label_key=LABEL_KEY)

        assert "a" in computed
        assert "b" not in computed

    def test_success_rate_computed(self, _patch_memory):
        outcomes = [AttackOutcome.SUCCESS] * 2 + [AttackOutcome.FAILURE] * 2
        _patch_memory.get_attack_results.return_value = [
            _make_result(technique="a", outcome=outcome) for outcome in outcomes
        ]

        computed = compute_technique_success_rates(technique_hashes=["a"], label_key=LABEL_KEY)

        assert computed["a"].success_rate == pytest.approx(0.5)
from unittest.mock import AsyncMock, MagicMock

import pytest

from pyrit.models import AttackOutcome, AttackResult, SeedAttackGroup, SeedObjective
from pyrit.scenario.scenarios.adaptive.dispatcher import (
    ADAPTIVE_ATTEMPT_LABEL,
    ADAPTIVE_TECHNIQUE_LABEL,
    AdaptiveDispatchAttack,
    AdaptiveDispatchContext,
    AdaptiveDispatchParams,
    TechniqueBundle,
)
from pyrit.scenario.scenarios.adaptive.selectors import (
    EpsilonGreedyTechniqueSelector,
)


def _make_bundle(*, name: str, outcomes: list[AttackOutcome], seed_technique=None) -> TechniqueBundle:
    """Create a ``TechniqueBundle`` around a stub attack that carries its scripted outcomes."""
    stub_attack = MagicMock(name=f"attack-{name}")
    stub_attack._outcomes = outcomes
    stub_attack._name = name
    return TechniqueBundle(attack=stub_attack, name=name, seed_technique=seed_technique)


def _make_context(
    *,
    objective: str = "obj",
    labels: dict[str, str] | None = None,
    seed_group: SeedAttackGroup | None = None,
    harm_categories: list[str] | None = None,
) -> AdaptiveDispatchContext:
    """Create a dispatch context, synthesising a one-objective seed group when none is given."""
    if seed_group is None:
        seed_group = SeedAttackGroup(seeds=[SeedObjective(value=objective, harm_categories=harm_categories)])
    params = AdaptiveDispatchParams(
        objective=objective,
        memory_labels=labels or {},
        seed_group=seed_group,
    )
    return AdaptiveDispatchContext(params=params)


def _patch_inner(
    *,
    dispatcher: AdaptiveDispatchAttack,
    bundles: dict[str, TechniqueBundle],
) -> AsyncMock:
    """Swap ``_run_inner_attack_async`` for a stub that replays each bundle's scripted outcomes."""
    key_by_attack_id = {id(bundle.attack): key for key, bundle in bundles.items()}
    calls_so_far: dict[str, int] = dict.fromkeys(bundles, 0)

    async def _stub(*, bundle: TechniqueBundle, seed_group, attempt_labels: dict[str, str]) -> AttackResult:
        key = key_by_attack_id[id(bundle.attack)]
        position = calls_so_far[key]
        calls_so_far[key] = position + 1
        return AttackResult(
            conversation_id=f"conv-{key}-{position}",
            objective="obj",
            outcome=bundle.attack._outcomes[position],
        )

    replacement = AsyncMock(side_effect=_stub)
    dispatcher._run_inner_attack_async = replacement  # type: ignore[method-assign]
    return replacement


def _build_dispatcher(*, target, bundles, selector, **extra) -> AdaptiveDispatchAttack:
    """Construct the dispatcher under test; ``extra`` is forwarded to its constructor."""
    return AdaptiveDispatchAttack(
        objective_target=target,
        techniques=bundles,
        selector=selector,
        **extra,
    )


class _StubSelector:
    """Deterministic selector stub: returns the configured order, truncated to the request size."""

    def __init__(self, *, technique_order: list[str]):
        self._order = technique_order

    async def select_async(
        self,
        *,
        technique_identifiers,
        objective: str,
        num_top_techniques: int = 1,
        scenario_result_id: str | None = None,
    ):
        return self._order[:num_top_techniques]


@pytest.fixture
def selector():
    return _StubSelector(technique_order=["a", "b", "c"])


@pytest.fixture
def target() -> MagicMock:
    return MagicMock(name="objective_target")


@pytest.fixture
def seed_group() -> SeedAttackGroup:
    return SeedAttackGroup(seeds=[SeedObjective(value="obj")])


class TestInit:
    """Constructor argument validation."""

    @pytest.mark.usefixtures("patch_central_database")
    def test_init_rejects_empty_techniques(self, target, selector, seed_group):
        with pytest.raises(ValueError, match="techniques"):
            _build_dispatcher(target=target, bundles={}, selector=selector)

    @pytest.mark.parametrize("bad_max", [0, -1])
    @pytest.mark.usefixtures("patch_central_database")
    def test_init_rejects_invalid_max_attempts(self, target, selector, seed_group, bad_max):
        single = {"a": _make_bundle(name="a", outcomes=[AttackOutcome.SUCCESS])}
        with pytest.raises(ValueError, match="max_attempts_per_objective"):
            _build_dispatcher(
                target=target,
                bundles=single,
                selector=selector,
                max_attempts_per_objective=bad_max,
            )


@pytest.mark.usefixtures("patch_central_database")
class TestPerform:
    """Behaviour of the per-objective loop with the inner attack stubbed out."""

    async def test_stops_on_first_success(self, target, seed_group):
        bundles = {key: _make_bundle(name=key, outcomes=[AttackOutcome.SUCCESS]) for key in ("a", "b")}
        dispatcher = _build_dispatcher(
            target=target,
            bundles=bundles,
            selector=_StubSelector(technique_order=["a", "b"]),
            max_attempts_per_objective=5,
        )
        inner = _patch_inner(dispatcher=dispatcher, bundles=bundles)

        result = await dispatcher._perform_async(context=_make_context())

        assert result.outcome == AttackOutcome.SUCCESS
        assert inner.call_count == 1

    async def test_retries_until_max_attempts_on_failure(self, target, seed_group):
        bundles = {key: _make_bundle(name=key, outcomes=[AttackOutcome.FAILURE] * 3) for key in ("a", "b", "c")}
        dispatcher = _build_dispatcher(
            target=target,
            bundles=bundles,
            selector=_StubSelector(technique_order=["a", "b", "c"]),
            max_attempts_per_objective=3,
        )
        inner = _patch_inner(dispatcher=dispatcher, bundles=bundles)

        result = await dispatcher._perform_async(context=_make_context())

        assert result.outcome == AttackOutcome.FAILURE
        assert inner.call_count == 3

    async def test_passes_attempt_labels_to_inner(self, target, seed_group):
        bundles = {"a": _make_bundle(name="a", outcomes=[AttackOutcome.SUCCESS])}
        dispatcher = _build_dispatcher(
            target=target,
            bundles=bundles,
            selector=_StubSelector(technique_order=["a"]),
        )
        inner = _patch_inner(dispatcher=dispatcher, bundles=bundles)

        await dispatcher._perform_async(context=_make_context(labels={"foo": "bar"}))

        stamped = inner.call_args.kwargs["attempt_labels"]
        assert stamped["foo"] == "bar"
        assert stamped[ADAPTIVE_TECHNIQUE_LABEL] == "a"
        assert stamped[ADAPTIVE_ATTEMPT_LABEL] == "1"

    async def test_metadata_records_adaptive_trail(self, target, seed_group):
        bundles = {
            "a": _make_bundle(name="a", outcomes=[AttackOutcome.FAILURE]),
            "b": _make_bundle(name="b", outcomes=[AttackOutcome.SUCCESS]),
        }
        dispatcher = _build_dispatcher(
            target=target,
            bundles=bundles,
            selector=_StubSelector(technique_order=["a", "b"]),
            max_attempts_per_objective=3,
        )
        _patch_inner(dispatcher=dispatcher, bundles=bundles)

        result = await dispatcher._perform_async(context=_make_context())

        expected_trail = [
            {"technique": "a", "technique_hash": "a", "outcome": "failure"},
            {"technique": "b", "technique_hash": "b", "outcome": "success"},
        ]
        assert result.metadata["adaptive_attempts"] == expected_trail

    async def test_returns_fresh_result_distinct_from_inner(self, target, seed_group):
        bundles = {"a": _make_bundle(name="a", outcomes=[AttackOutcome.SUCCESS])}
        dispatcher = _build_dispatcher(
            target=target,
            bundles=bundles,
            selector=_StubSelector(technique_order=["a"]),
        )
        inner_ids: list[str] = []

        async def _spy(*, bundle, seed_group, attempt_labels):
            # Remember the inner result's id so it can be compared with the outer one.
            produced = AttackResult(
                conversation_id="conv-a-0",
                objective="obj",
                outcome=AttackOutcome.SUCCESS,
            )
            inner_ids.append(produced.attack_result_id)
            return produced

        dispatcher._run_inner_attack_async = AsyncMock(side_effect=_spy)  # type: ignore[method-assign]

        result = await dispatcher._perform_async(context=_make_context())

        assert len(inner_ids) == 1
        assert result.attack_result_id != inner_ids[0]
        assert result.outcome == AttackOutcome.SUCCESS
        assert result.metadata["adaptive_attempts"] == [{"technique": "a", "technique_hash": "a", "outcome": "success"}]


@pytest.mark.usefixtures("patch_central_database")
class TestValidate:
    """Objective validation performed by ``_validate_context``."""

    @pytest.mark.parametrize("bad_objective", ["", "   ", "\n\t"])
    def test_validate_rejects_empty_objective(self, target, selector, seed_group, bad_objective):
        dispatcher = _build_dispatcher(
            target=target,
            bundles={"a": _make_bundle(name="a", outcomes=[AttackOutcome.SUCCESS])},
            selector=selector,
        )
        with pytest.raises(ValueError, match="objective"):
            dispatcher._validate_context(context=_make_context(objective=bad_objective))

    def test_validate_accepts_normal_objective(self, target, selector, seed_group):
        dispatcher = _build_dispatcher(
            target=target,
            bundles={"a": _make_bundle(name="a", outcomes=[AttackOutcome.SUCCESS])},
            selector=selector,
        )
        dispatcher._validate_context(context=_make_context(objective="ok"))
def _rates_with_winner(winner: str, *, successes: int = 5, failures: int = 0):
    """Build a stats callback in which ``winner`` has the given record and all others only fail.

    The returned callable ignores its arguments, so it can stand in as the
    ``side_effect`` of a patched stats function. Each call produces a new
    mapping with the winner's entry first, followed by every other name in
    ``TECHNIQUES`` carrying a fixed 0-for-5 record.
    """

    def _compute(*args, **kwargs):
        decided = successes + failures
        if decided:
            winner_rate = successes / decided
        else:
            # No decided attempts: the rate is left undefined.
            winner_rate = None

        table = {
            winner: AttackStats(
                success_rate=winner_rate,
                total_decided=decided,
                successes=successes,
                failures=failures,
                undetermined=0,
                errors=0,
            )
        }
        table.update(
            (
                name,
                AttackStats(
                    success_rate=0.0,
                    total_decided=5,
                    successes=0,
                    failures=5,
                    undetermined=0,
                    errors=0,
                ),
            )
            for name in TECHNIQUES
            if name != winner
        )
        return table

    return _compute
class TestEpsilonGreedyEstimate:
    """Checks on ``EpsilonGreedyTechniqueSelector._estimate``."""

    def test_estimate_unseen_is_one(self):
        """A technique absent from the stats mapping is estimated at 1.0."""
        estimate = EpsilonGreedyTechniqueSelector._estimate(technique="a", stats={})
        assert estimate == pytest.approx(1.0)

    def test_estimate_with_data(self):
        """Three successes out of five decided attempts is estimated at 4/6."""
        observed = AttackStats(
            success_rate=0.6,
            total_decided=5,
            successes=3,
            failures=2,
            undetermined=0,
            errors=0,
        )
        estimate = EpsilonGreedyTechniqueSelector._estimate(technique="a", stats={"a": observed})
        # (3 + 1) / (5 + 1) = 4/6 ≈ 0.6667
        assert estimate == pytest.approx(4 / 6)
class TestTechniqueSelectorProtocol:
    """Conformance check between the concrete selector and ``TechniqueSelector``."""

    def test_implements_protocol(self):
        """A default-constructed epsilon-greedy selector passes the ``isinstance`` check."""
        assert isinstance(EpsilonGreedyTechniqueSelector(), TechniqueSelector)
def _make_fake_factory(*, seed_technique=None, adversarial_chat=None) -> MagicMock:
    """Build a mock attack-technique factory whose ``create`` yields a fake technique.

    Only the pieces that ``AdaptiveScenario._build_techniques_dict`` reads are
    set up: ``factory.create(...)`` and ``factory.adversarial_chat``. The fake
    technique exposes a mock ``attack`` and the given ``seed_technique``.
    """
    produced = MagicMock(
        attack=MagicMock(name="fake-attack-strategy"),
        seed_technique=seed_technique,
    )
    stub_factory = MagicMock(adversarial_chat=adversarial_chat)
    # Every call to create(), whatever its arguments, hands back the same fake.
    stub_factory.create.return_value = produced
    return stub_factory
async def _build_scenario_and_attacks(
    self,
    *,
    mock_objective_target,
    mock_objective_scorer,
    seed_groups: dict[str, list[SeedAttackGroup]],
    **scenario_kwargs,
):
    """Create and initialize a ``TextAdaptive`` scenario and collect its atomic attacks.

    ``DatasetConfiguration.get_seed_attack_groups`` is patched to return
    ``seed_groups`` for the whole build, the baseline is disabled, and any
    extra keyword arguments are forwarded to the ``TextAdaptive`` constructor.

    Returns:
        A ``(scenario, atomic_attacks)`` tuple.
    """
    dataset_patch = patch.object(DatasetConfiguration, "get_seed_attack_groups", return_value=seed_groups)
    with dataset_patch:
        scenario = TextAdaptive(objective_scorer=mock_objective_scorer, **scenario_kwargs)
        await scenario.initialize_async(objective_target=mock_objective_target, include_baseline=False)
        # Collected while the dataset patch is still active.
        atomic_attacks = await scenario._get_atomic_attacks_async()
        return scenario, atomic_attacks
async def test_atomics_share_one_selector_across_dispatchers(self, mock_objective_target, mock_objective_scorer):
    """Each dataset gets its own dispatcher, but all dispatchers hold the same selector."""
    objectives_by_dataset = {"violence": ["obj-v1", "obj-v2"], "hate": ["obj-h1"]}
    groups = {
        dataset: [_make_seed_group(value=value, harm_categories=[dataset]) for value in values]
        for dataset, values in objectives_by_dataset.items()
    }
    _, attacks = await self._build_scenario_and_attacks(
        mock_objective_target=mock_objective_target,
        mock_objective_scorer=mock_objective_scorer,
        seed_groups=groups,
    )
    dispatchers = [atomic._attack_technique.attack for atomic in attacks]

    # One dispatcher per dataset (atomic).
    distinct_dispatchers = {id(dispatcher) for dispatcher in dispatchers}
    assert len(distinct_dispatchers) == len(attacks)
    assert all(isinstance(dispatcher, AdaptiveDispatchAttack) for dispatcher in dispatchers)

    # All dispatchers share the same selector so learning is global.
    shared_selectors = {id(dispatcher._selector) for dispatcher in dispatchers}
    assert len(shared_selectors) == 1
async def test_no_usable_techniques_raises(self, mock_objective_target, mock_objective_scorer):
    """Building atomic attacks with an empty factory map raises ``ValueError``."""
    seed_groups_by_dataset = {"violence": [_make_seed_group(value="obj")]}
    with patch.object(DatasetConfiguration, "get_seed_attack_groups", return_value=seed_groups_by_dataset):
        scenario = TextAdaptive(objective_scorer=mock_objective_scorer)
        await scenario.initialize_async(objective_target=mock_objective_target, include_baseline=False)
        # Force the factory map to be empty.
        no_factories = patch.object(scenario, "_get_attack_technique_factories", return_value={})
        with no_factories, pytest.raises(ValueError, match="no usable techniques"):
            await scenario._get_atomic_attacks_async()
async def test_incompatible_seed_technique_is_filtered_per_objective(
    self, mock_objective_target, mock_objective_scorer
):
    """An incompatible seed_technique does not shrink the dispatcher's shared pool.

    With ``is_compatible_with_technique`` patched to return ``False``, the
    single atomic attack still carries both techniques and its one seed group.
    """
    seed_groups_by_dataset = {"violence": [_make_seed_group(value="obj")]}
    factories = {
        "role_play": _make_fake_factory(seed_technique=None),
        "many_shot": _make_fake_factory(seed_technique=MagicMock(name="incompatible_seed_technique")),
    }

    with (
        patch.object(DatasetConfiguration, "get_seed_attack_groups", return_value=seed_groups_by_dataset),
        patch.object(SeedAttackGroup, "is_compatible_with_technique", return_value=False),
    ):
        scenario = TextAdaptive(objective_scorer=mock_objective_scorer)
        strategy_class = scenario.get_strategy_class()
        with patch.object(scenario, "_get_attack_technique_factories", return_value=factories):
            await scenario.initialize_async(
                objective_target=mock_objective_target,
                include_baseline=False,
                scenario_strategies=[strategy_class(name) for name in ("role_play", "many_shot")],
            )
            attacks = scenario._atomic_attacks

    assert len(attacks) == 1
    only_attack = attacks[0]
    # Under the one-atomic-per-dataset design, the full technique pool is
    # shared by the dispatcher; per-call compatibility filtering now
    # happens inside ``AdaptiveDispatchAttack._perform_async``. The seed
    # group survived because the plain (no-seed_technique) factory keeps
    # the compatible pool non-empty. Keys are now eval hashes; check by bundle name.
    technique_names = {bundle.name for bundle in only_attack._attack_technique.attack._techniques.values()}
    assert {"role_play", "many_shot"} <= technique_names
    assert len(only_attack.seed_groups) == 1
async def test_initialize_async_accepts_explicit_baseline(self, mock_objective_target, mock_objective_scorer):
    """Passing ``include_baseline=True`` explicitly to ``initialize_async`` does not raise."""
    violence_group = _make_seed_group(value="obj", harm_categories=["violence"])
    dataset_patch = patch.object(
        DatasetConfiguration,
        "get_seed_attack_groups",
        return_value={"violence": [violence_group]},
    )
    with dataset_patch:
        scenario = TextAdaptive(objective_scorer=mock_objective_scorer)
        # Baseline is Enabled by default, so explicit include_baseline=True must not raise.
        await scenario.initialize_async(objective_target=mock_objective_target, include_baseline=True)