Skip to content

Dead code analysis

dead_code_analysis

CFGEdge dataclass

Bases: GenericLatticeAnchor

A lattice anchor representing a control-flow edge between two blocks.

Source code in xdsl/analysis/dead_code_analysis.py
19
20
21
22
23
24
25
26
27
28
29
@dataclass(frozen=True)
class CFGEdge(GenericLatticeAnchor):
    """A lattice anchor representing a control-flow edge between two blocks."""

    from_block: Block
    to_block: Block

    def __str__(self) -> str:
        from_name = self.from_block.name_hint or id(self.from_block)
        to_name = self.to_block.name_hint or id(self.to_block)
        return f"edge({from_name} -> {to_name})"

from_block: Block instance-attribute

to_block: Block instance-attribute

__init__(from_block: Block, to_block: Block) -> None

__str__() -> str

Source code in xdsl/analysis/dead_code_analysis.py
26
27
28
29
def __str__(self) -> str:
    from_name = self.from_block.name_hint or id(self.from_block)
    to_name = self.to_block.name_hint or id(self.to_block)
    return f"edge({from_name} -> {to_name})"

Executable

Bases: AnalysisState

A state that represents whether a block or CFG edge is live.

Source code in xdsl/analysis/dead_code_analysis.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Executable(AnalysisState):
    """A state that represents whether a block or CFG edge is live."""

    live: bool
    block_content_subscribers: set[DataFlowAnalysis]

    def __init__(self, anchor: LatticeAnchor):
        super().__init__(anchor)
        self.live = False
        self.block_content_subscribers = set()

    def set_to_live(self) -> ChangeResult:
        """Marks the anchor as live and returns whether the state changed."""
        if self.live:
            return ChangeResult.NO_CHANGE
        self.live = True
        return ChangeResult.CHANGE

    def on_update(self, solver: DataFlowSolver) -> None:
        super().on_update(solver)

        # If a block becomes live, enqueue its operations for subscribed analyses.
        if self.live and isinstance(self.anchor, ProgramPoint):
            point = self.anchor
            # Check if this is the start of a block
            if (
                isinstance(point.entity, Operation)
                and point.entity.parent
                and point.entity is point.entity.parent.first_op
            ):
                block = point.entity.parent
                for analysis in self.block_content_subscribers:
                    for op in block.ops:
                        solver.enqueue((ProgramPoint.before(op), analysis))

    def __str__(self) -> str:
        return "live" if self.live else "dead"

live: bool = False instance-attribute

block_content_subscribers: set[DataFlowAnalysis] = set() instance-attribute

__init__(anchor: LatticeAnchor)

Source code in xdsl/analysis/dead_code_analysis.py
38
39
40
41
def __init__(self, anchor: LatticeAnchor):
    super().__init__(anchor)
    self.live = False
    self.block_content_subscribers = set()

set_to_live() -> ChangeResult

Marks the anchor as live and returns whether the state changed.

Source code in xdsl/analysis/dead_code_analysis.py
43
44
45
46
47
48
def set_to_live(self) -> ChangeResult:
    """Marks the anchor as live and returns whether the state changed."""
    if self.live:
        return ChangeResult.NO_CHANGE
    self.live = True
    return ChangeResult.CHANGE

on_update(solver: DataFlowSolver) -> None

Source code in xdsl/analysis/dead_code_analysis.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def on_update(self, solver: DataFlowSolver) -> None:
    super().on_update(solver)

    # If a block becomes live, enqueue its operations for subscribed analyses.
    if self.live and isinstance(self.anchor, ProgramPoint):
        point = self.anchor
        # Check if this is the start of a block
        if (
            isinstance(point.entity, Operation)
            and point.entity.parent
            and point.entity is point.entity.parent.first_op
        ):
            block = point.entity.parent
            for analysis in self.block_content_subscribers:
                for op in block.ops:
                    solver.enqueue((ProgramPoint.before(op), analysis))

__str__() -> str

Source code in xdsl/analysis/dead_code_analysis.py
67
68
def __str__(self) -> str:
    return "live" if self.live else "dead"

PredecessorState

Bases: AnalysisState

A state representing the set of live control-flow predecessors of a program point (e.g., a region entry or a call operation).

Source code in xdsl/analysis/dead_code_analysis.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class PredecessorState(AnalysisState):
    """
    A state representing the set of live control-flow predecessors of a
    program point (e.g., a region entry or a call operation).
    """

    all_known: bool
    known_predecessors: set[Operation]
    successor_inputs: dict[Operation, Sequence[SSAValue]]

    def __init__(self, anchor: LatticeAnchor):
        super().__init__(anchor)
        self.all_known = True
        self.known_predecessors = set()
        self.successor_inputs = {}

    def set_has_unknown_predecessors(self) -> ChangeResult:
        """Marks that not all predecessors can be known."""
        if not self.all_known:
            return ChangeResult.NO_CHANGE
        self.all_known = False
        return ChangeResult.CHANGE

    def join(
        self, predecessor: Operation, inputs: Sequence[SSAValue] | None = None
    ) -> ChangeResult:
        """Add a known predecessor and its successor inputs."""
        changed = predecessor not in self.known_predecessors
        if changed:
            self.known_predecessors.add(predecessor)

        if inputs is not None:
            if self.successor_inputs.get(predecessor) != inputs:
                self.successor_inputs[predecessor] = inputs
                changed = True

        return ChangeResult.CHANGE if changed else ChangeResult.NO_CHANGE

    def __str__(self) -> str:
        preds = ", ".join(p.name for p in self.known_predecessors)
        status = "all known" if self.all_known else "unknown predecessors"
        return f"Predecessors: [{preds}] ({status})"

all_known: bool = True instance-attribute

known_predecessors: set[Operation] = set() instance-attribute

successor_inputs: dict[Operation, Sequence[SSAValue]] = {} instance-attribute

__init__(anchor: LatticeAnchor)

Source code in xdsl/analysis/dead_code_analysis.py
81
82
83
84
85
def __init__(self, anchor: LatticeAnchor):
    super().__init__(anchor)
    self.all_known = True
    self.known_predecessors = set()
    self.successor_inputs = {}

set_has_unknown_predecessors() -> ChangeResult

Marks that not all predecessors can be known.

Source code in xdsl/analysis/dead_code_analysis.py
87
88
89
90
91
92
def set_has_unknown_predecessors(self) -> ChangeResult:
    """Marks that not all predecessors can be known."""
    if not self.all_known:
        return ChangeResult.NO_CHANGE
    self.all_known = False
    return ChangeResult.CHANGE

join(predecessor: Operation, inputs: Sequence[SSAValue] | None = None) -> ChangeResult

Add a known predecessor and its successor inputs.

Source code in xdsl/analysis/dead_code_analysis.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def join(
    self, predecessor: Operation, inputs: Sequence[SSAValue] | None = None
) -> ChangeResult:
    """Add a known predecessor and its successor inputs."""
    changed = predecessor not in self.known_predecessors
    if changed:
        self.known_predecessors.add(predecessor)

    if inputs is not None:
        if self.successor_inputs.get(predecessor) != inputs:
            self.successor_inputs[predecessor] = inputs
            changed = True

    return ChangeResult.CHANGE if changed else ChangeResult.NO_CHANGE

__str__() -> str

Source code in xdsl/analysis/dead_code_analysis.py
109
110
111
112
def __str__(self) -> str:
    preds = ", ".join(p.name for p in self.known_predecessors)
    status = "all known" if self.all_known else "unknown predecessors"
    return f"Predecessors: [{preds}] ({status})"

DeadCodeAnalysis

Bases: DataFlowAnalysis

This analysis identifies executable code paths. It is a foundational analysis that other dataflow analyses can depend on to correctly handle control flow.

NOTE: Due to the lack of control flow interfaces in xDSL (e.g., BranchOpInterface, RegionBranchOpInterface, CallOpInterface), the analysis does not handle control flow at all. Essentially, only the entry block of the top-level op's first region is marked as live.

Source code in xdsl/analysis/dead_code_analysis.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class DeadCodeAnalysis(DataFlowAnalysis):
    """
    This analysis identifies executable code paths. It is a foundational analysis
    that other dataflow analyses can depend on to correctly handle control flow.

    NOTE: Due to the lack of control flow interfaces in xDSL (e.g., BranchOpInterface,
    RegionBranchOpInterface, CallOpInterface), the analysis does not handle control flow at all.
    Essentially, only the entry block of the top-level op's first region is marked as live.
    """

    def initialize(self, op: Operation) -> None:
        # Mark the entry block of the top-level op's first region as live.
        if op.regions and op.regions[0].first_block:
            entry_point = ProgramPoint.at_start_of_block(op.regions[0].first_block)
            executable = self.get_or_create_state(entry_point, Executable)
            self.propagate_if_changed(executable, executable.set_to_live())

    def visit(self, point: ProgramPoint) -> None:
        op = point.op
        if op is None:
            # This analysis only triggers on operations.
            return
        # special cased for the typical case where the analysis is run on a ModuleOp:
        assert isinstance(op, ModuleOp) or not op.regions, (
            "Cannot yet handle operations with regions"
        )

        # If parent block is not live, do nothing.
        parent_block = op.parent
        if parent_block:
            assert parent_block.last_op
            assert not parent_block.last_op.successors, (
                "Block has successor blocks, which are not supported yet"
            )
            block_start_point = ProgramPoint.at_start_of_block(parent_block)
            parent_executable = self.get_or_create_state(block_start_point, Executable)
            # Create dependency: if block liveness changes, re-visit this op.
            self.add_dependency(parent_executable, point)
            if not parent_executable.live:
                return
            # Subscribe to block liveness to visit all ops if it becomes live.
            parent_executable.block_content_subscribers.add(self)

        # Assert that we don't have nested regions or control flow yet
        assert not op.regions, (
            f"Operation {op.name} has nested regions, which are not supported yet"
        )

initialize(op: Operation) -> None

Source code in xdsl/analysis/dead_code_analysis.py
125
126
127
128
129
130
def initialize(self, op: Operation) -> None:
    # Mark the entry block of the top-level op's first region as live.
    if op.regions and op.regions[0].first_block:
        entry_point = ProgramPoint.at_start_of_block(op.regions[0].first_block)
        executable = self.get_or_create_state(entry_point, Executable)
        self.propagate_if_changed(executable, executable.set_to_live())

visit(point: ProgramPoint) -> None

Source code in xdsl/analysis/dead_code_analysis.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def visit(self, point: ProgramPoint) -> None:
    op = point.op
    if op is None:
        # This analysis only triggers on operations.
        return
    # special cased for the typical case where the analysis is run on a ModuleOp:
    assert isinstance(op, ModuleOp) or not op.regions, (
        "Cannot yet handle operations with regions"
    )

    # If parent block is not live, do nothing.
    parent_block = op.parent
    if parent_block:
        assert parent_block.last_op
        assert not parent_block.last_op.successors, (
            "Block has successor blocks, which are not supported yet"
        )
        block_start_point = ProgramPoint.at_start_of_block(parent_block)
        parent_executable = self.get_or_create_state(block_start_point, Executable)
        # Create dependency: if block liveness changes, re-visit this op.
        self.add_dependency(parent_executable, point)
        if not parent_executable.live:
            return
        # Subscribe to block liveness to visit all ops if it becomes live.
        parent_executable.block_content_subscribers.add(self)

    # Assert that we don't have nested regions or control flow yet
    assert not op.regions, (
        f"Operation {op.name} has nested regions, which are not supported yet"
    )