Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions backends/nxp/backend/edge_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# LICENSE file in the root directory of this source tree.

import logging
import operator

import torch

Expand Down Expand Up @@ -367,3 +368,104 @@ def node_has_well_defined_shape(node: Node) -> bool:

def try_get_arg(node: Node, idx: int) -> Argument | None:
return node.args[idx] if idx < len(node.args) else None


def input_quantization_type(
    node: Node, input_index: int | tuple[int, int]
) -> torch.dtype | None:
    """Return the quantization input datatype of the QDQ quantized `node`.

    :param node: The compute node.
    :param input_index: The index into the `node.args`. If a tuple of 2 ints is provided,
                         `args[input_index[0]][input_index[1]]` is used instead.
    :return: The input quantization datatype of the QDQ quantized `node`, or `None` if the graph does not follow the
              QDQ pattern or some metadata is incomplete or an invalid input index is given.

              │ <returned type>
        ┌─────▼──────┐
        │ Dequantize │
        └─────┬──────┘
              │ float
          ┌───▼────┐
          │ `node` │
          └───┬────┘
    """
    try:
        if isinstance(input_index, int):
            dequantize_node = node.args[input_index]
        elif (
            isinstance(input_index, tuple)
            and len(input_index) == 2
            and all(isinstance(i, int) for i in input_index)
        ):
            dequantize_node = node.args[input_index[0]][input_index[1]]
        else:
            raise RuntimeError(
                "NXP backend: edge_helper.input_quantization_type(): Invalid input index."
            )
    except IndexError:
        return None  # Invalid input args index.
    except TypeError:
        # Tuple index was used but `node.args[input_index[0]]` is not
        # subscriptable — treat it as an invalid index, per the contract,
        # instead of propagating the TypeError.
        return None

    if not _is_dequantize(dequantize_node):
        return None  # Broken QDQ schema.

    # The dtype of the `Dequantize` input is the quantized representation type.
    if (dequantize_input_val := dequantize_node.args[0].meta.get("val")) is None:
        return None  # Invalid metadata.

    return dequantize_input_val.dtype


def output_quantization_type(
    node: Node, output_index: int | None = None
) -> torch.dtype | None:
    """Return the quantization output datatype of the QDQ quantized `node`.

    :param node: The compute node.
    :param output_index: If the `node` has multiple outputs and therefore multiple `getitem` nodes follow it, the
                          index selects the output.
    :return: The output quantization datatype of the QDQ quantized `node`, or `None` if the graph does not follow the
              QDQ pattern or some metadata is incomplete or an invalid output index is given.

                                     ┌───▼────┐
                                     │ `node` │
         ┌───▼────┐                  └───┬────┘
         │ `node` │                      │
         └───┬────┘            ┌──┴───────────────...──
             │ float     ┌─────────▼─────────────┐
        ┌────▼─────┐  or │ getitem(output_index) │ ...
        │ Quantize │     └─────────┬─────────────┘
        └────┬─────┘               │ float
             │ <returned type>┌────▼─────┐
                              │ Quantize │
                              └────┬─────┘
                                   │ <returned type>
    """
    users = list(node.users)
    if len(users) == 1:
        # Single output: the sole user must be the `Quantize` node.
        if not _is_quantize(quantize_node := users[0]):
            return None

    else:  # Multiple users
        if not isinstance(output_index, int):
            return None  # Invalid index.
        if not all(user.target == operator.getitem for user in users):
            # Broken QDQ schema (unexpected nodes). These nodes should be moved out by
            # `move_auxiliary_operator_into_separate_qdq_cluster_pass.py`.
            return None

        selected_getitems = list(
            filter(lambda getitem: getitem.args[1] == output_index, users)
        )
        if len(selected_getitems) != 1:
            return None  # Multiple getitems access the selected output -> broken QDQ schema.
        selected_getitem_users = list(selected_getitems[0].users)
        if not (
            len(selected_getitem_users) == 1
            and _is_quantize(quantize_node := selected_getitem_users[0])
        ):
            return None  # Broken QDQ schema.

    # The dtype of the `Quantize` output is the quantized representation type.
    if (quantize_val := quantize_node.meta.get("val")) is None:
        return None  # Invalid metadata.

    return quantize_val.dtype
69 changes: 69 additions & 0 deletions backends/nxp/backend/ir/converter/node_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from executorch.backends.nxp.backend.custom_delegation_options import (
CustomDelegationOptions,
)
from executorch.backends.nxp.backend.edge_helper import (
input_quantization_type,
output_quantization_type,
)
from executorch.backends.nxp.backend.ir.conversion_context import ConversionContext
from executorch.backends.nxp.backend.ir.converter.builder.aten_model_builder_director import (
AtenModelBuilderDirector,
Expand Down Expand Up @@ -308,3 +312,68 @@ def _create_tflite_op_with_io_tensors(self, node: Node) -> tflite_model.Operator
t_operator.tmp_outputs.append(self.builder.tensor_for_name(tensor_name))

return t_operator

@staticmethod
def uses_quantization_type_for_inputs(
node: Node,
supported_types: list[torch.dtype],
input_indices: list[int | tuple[int, int]],
) -> bool:
"""Check if `node` uses the QDQ quantization schema and inputs on the provided indices use a quantization type
that is in `supported_types`.

:param node: The compute node.
:param supported_types: List of supported quantization types.
:param input_indices: List of indices into the `node.args`, or tuples of 2 indices into `node.args[idx1][idx2]`.
:return: True, if the `node` is QDQ quantized and has quantization input types in `supported_types`.
"""
return all(
input_quantization_type(node, input_index) in supported_types
for input_index in input_indices
)

@staticmethod
def uses_quantization_type_for_outputs(
node: Node,
supported_types: list[torch.dtype],
output_indices: list[int] | None = None,
):
"""Check if `node` uses the QDQ quantization schema and outputs on the provided indices use a quantization type
that is in `supported_types`.

:param node: The compute node.
:param supported_types: List of supported quantization types.
:param output_indices: If the `node` has multiple outputs and therefore multiple `getitem` nodes follow it, the
indices select the outputs to be checked.
:return: True, if the `node` is QDQ quantized and has quantization output types in `supported_types`.
"""
if output_indices is None:
return output_quantization_type(node) in supported_types
else:
return all(
output_quantization_type(node, output_index) in supported_types
for output_index in output_indices
)

@staticmethod
def uses_quantization_type_for_io(
node: Node,
supported_types: list[torch.dtype],
input_indices: list[int | tuple[int, int]],
output_indices: list[int] | None = None,
):
"""Check if `node` uses the QDQ quantization schema and inputs and outputs on the provided indices use a
quantization type that is in `supported_types`.

:param node: The compute node.
:param supported_types: List of supported quantization types.
:param input_indices: List of indices into the `node.args`, or tuples of 2 indices into `node.args[idx1][idx2]`.
:param output_indices: If the `node` has multiple outputs and therefore multiple `getitem` nodes follow it, the
indices select the outputs to be checked.
:return: True, if the `node` is QDQ quantized and has quantization input types in `supported_types`.
"""
return NodeConverter.uses_quantization_type_for_inputs(
node, supported_types, input_indices
) and NodeConverter.uses_quantization_type_for_outputs(
node, supported_types, output_indices
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Copyright 2025 NXP
# Copyright 2025-2026 NXP
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import numpy as np
import torch

from executorch.backends.nxp.backend.ir.converter.conversion import (
aten_translator,
Expand All @@ -21,6 +22,8 @@
from executorch.backends.nxp.backend.ir.tflite_generator.builtin_options import (
average_pool_2d_options,
)

from executorch.backends.nxp.backend.neutron_target_spec import NeutronTargetSpec
from torch.fx import Node
from torch.nn import Parameter

Expand Down Expand Up @@ -53,6 +56,33 @@ def _is_supported_in_IR(

return True

@staticmethod
def _is_supported_on_target(
Comment thread
MartinPavella marked this conversation as resolved.
node: Node,
neutron_target_spec: NeutronTargetSpec,
parameters_mapping: dict[str, Parameter],
custom_delegation_options: CustomDelegationOptions,
) -> bool:
kernel = node.args[1]
stride = node.args[2]

if custom_delegation_options.use_new_flow_neutron_c:
# Requirements specified by the new Neutron flow documentation.

supported_types = [torch.int8, torch.uint8]
if not NodeConverter.uses_quantization_type_for_io(
node, supported_types, [0]
):
return False

if any(k > 4096 for k in kernel):
return False

if any(s > 4096 for s in stride):
return False

Comment thread
MartinPavella marked this conversation as resolved.
return True

# noinspection PyMethodMayBeStatic
def _convert_2d_avg_pool(
self, kernel_size, stride, padding, t_op: tflite_model.Operator
Expand Down Expand Up @@ -85,10 +115,19 @@ def _convert_2d_avg_pool(

return ops.flatten()

# AvgPool2d Node format: (Tensor self, int[2] kernel_size, int[2] stride=[], int[2] padding=0, bool ceil_mode=False
# bool count_include_pad=True, int? divisor_override=None)
def convert(self, node: Node):
"""Convert 'avg_pool2d' operator to TFLite 'AveragePool2D'."""
"""Convert the 'aten.avg_pool2d' operator to NeutronIR 'AveragePool2D'.
The ExecuTorch schema is:
aten.avg_pool2d(
Tensor self,
int[2] kernel_size,
int[2] stride=[],
int[2] padding=0,
bool ceil_mode=False
bool count_include_pad=True,
int? divisor_override=None
)
"""
self.assert_convertible(node)

kernel_size = node.args[1]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 NXP
# Copyright 2024,2026 NXP
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
Expand Down Expand Up @@ -28,18 +28,22 @@
ToNCHWPreprocess,
ToNHWCPreprocess,
)
from executorch.backends.nxp.tests.graph_verifier import BaseGraphVerifier
from executorch.backends.nxp.tests.models import AvgPool2dConvModule, AvgPool2dModule

from executorch.backends.nxp.tests.nsys_testing import lower_run_compare

from executorch.backends.nxp.tests.ops_aliases import (
AvgPool2D,
ExecutorchDelegateCall,
Squeeze,
SqueezeDim,
SqueezeDims,
Unsqueeze,
ViewCopy,
)
from torch.export import ExportedProgram
from executorch.backends.nxp.tests.use_qat import * # noqa F403
from executorch.exir.dialects._ops import ops as exir_ops

AvgPool2D = exir_ops.edge.aten.avg_pool2d.default
ExecutorchDelegateCall = torch.ops.higher_order.executorch_call_delegate
Squeeze = exir_ops.edge.aten.squeeze.default
SqueezeDim = exir_ops.edge.aten.squeeze.dim
SqueezeDims = exir_ops.edge.aten.squeeze.dims
Unsqueeze = exir_ops.edge.aten.unsqueeze.default
ViewCopy = exir_ops.edge.aten.view_copy.default


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -296,3 +300,73 @@ def test_from_avg_pool_1d(mocker):
tflite_input_preprocess=ToChannelLastPreprocess(),
tflite_output_preprocess=ToChannelFirstPreprocess(),
)


class TestAvgPool2DNewNeutronFlow:
    """Tests for `avg_pool2d` delegation under the new Neutron-C flow,
    including the 4096 kernel-size and stride limits."""

    def test__basic_nsys_inference(self):
        input_shape = (2, 4, 6, 7)
        model = AvgPool2dModule(False, 0)
        graph_verifier = BaseGraphVerifier(
            exp_num_delegate_call_nodes=1,  # Delegated AvgPool.
            exp_non_delegated_nodes=[],
        )

        lower_run_compare(
            model, input_shape, graph_verifier, use_new_flow_neutron_c=True
        )

    def test__kernel_size_limit(self):
        kernel_size = (1, 4096)  # Exactly at the kernel size limit.
        input_shape = (1, 4) + kernel_size
        model = AvgPool2dModule(False, 0, kernel_size)
        graph_verifier = BaseGraphVerifier(
            exp_num_delegate_call_nodes=1,  # Delegated AvgPool.
            exp_non_delegated_nodes=[],
        )

        lower_run_compare(
            model, input_shape, graph_verifier, use_new_flow_neutron_c=True
        )

    def test__kernel_size_limit_exceeded(self):
        kernel_size = (1, 4097)  # Exceeds the kernel size limit.
        input_shape = (1, 4) + kernel_size
        model = AvgPool2dModule(False, 0, kernel_size)

        delegated_ep = to_quantized_edge_program(
            model, input_shape, use_new_flow_neutron_c=True
        ).exported_program()

        # Make sure the `avg_pool2d` was NOT delegated.
        assert not graph_contains_any_of_ops(
            delegated_ep.graph, [ExecutorchDelegateCall]
        )
        assert graph_contains_any_of_ops(delegated_ep.graph, [AvgPool2D])

    def test__stride_limit(self):
        stride = 4096  # Exactly at the stride limit.
        input_shape = (1, 4, 1, 4096)
        model = AvgPool2dModule(False, 0, 1, stride)
        graph_verifier = BaseGraphVerifier(
            exp_num_delegate_call_nodes=1,  # Delegated AvgPool.
            exp_non_delegated_nodes=[],
        )

        lower_run_compare(
            model, input_shape, graph_verifier, use_new_flow_neutron_c=True
        )

    def test__stride_limit_exceeded(self):
        stride = 4097  # Exceeds the stride limit.
        input_shape = (1, 4, 1, 4096)
        model = AvgPool2dModule(False, 0, 1, stride)

        delegated_ep = to_quantized_edge_program(
            model, input_shape, use_new_flow_neutron_c=True
        ).exported_program()

        # Make sure the `avg_pool2d` was NOT delegated.
        assert not graph_contains_any_of_ops(
            delegated_ep.graph, [ExecutorchDelegateCall]
        )
        assert graph_contains_any_of_ops(delegated_ep.graph, [AvgPool2D])
6 changes: 3 additions & 3 deletions backends/nxp/tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,12 @@ def forward(self, x):


class AvgPool2dModule(torch.nn.Module):
def __init__(self, count_include_pad, padding=0):
def __init__(self, count_include_pad, padding=0, kernel_size=3, stride=2):
super().__init__()

self.avg_pool = torch.nn.AvgPool2d(
kernel_size=3,
stride=2,
kernel_size=kernel_size,
stride=stride,
padding=padding,
count_include_pad=count_include_pad,
)
Expand Down
Loading