Skip to content

observable_block

Implements the wrapper for logic blocks with output port management.

ObservableBlock

Bases: ObservableInterface

A wrapper class that encapsulates a logic block.

Manages both mathematical results (via the wrapped block) and visual output ports (via observers).

Source code in src/ecc_analyzer/core/observable_block.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class ObservableBlock(ObservableInterface):
    """Observable wrapper around a single logic block.

    The wrapped block is asked to compute the outgoing rates; every attached
    observer is then informed of the computation and may hand back the visual
    output ports it created.
    """

    def __init__(self, logic_block: BlockInterface):
        """Stores the block to wrap and starts with an empty observer list.

        Args:
            logic_block (BlockInterface): The block whose ``compute_fit`` is
                called by ``run``.
        """
        self._observers = []
        self.block = logic_block

    def attach(self, observer: SafetyObserver):
        """Adds an observer, ignoring one that is already registered.

        Args:
            observer (SafetyObserver): The observer to add to the notification list.
        """
        if observer in self._observers:
            return
        self._observers.append(observer)

    def run(
        self,
        spfm_in: dict[FaultType, float],
        lfm_in: dict[FaultType, float],
        input_ports: dict,
    ) -> tuple[dict[FaultType, float], dict[FaultType, float], dict]:
        """Computes the outgoing rates, then notifies the observers.

        Args:
            spfm_in (dict[FaultType, float]): Incoming SPFM fault rates.
            lfm_in (dict[FaultType, float]): Incoming LFM fault rates.
            input_ports (dict): Incoming ports, forwarded unchanged to the observers.

        Returns:
            tuple[dict[FaultType, float], dict[FaultType, float], dict]: The SPFM
            rates and LFM rates returned by the wrapped block, followed by the
            output ports returned by ``notify``.
        """
        # The wrapped block is expected to return exactly two values.
        spfm_out, lfm_out = self.block.compute_fit(spfm_in, lfm_in)
        return (
            spfm_out,
            lfm_out,
            self.notify(input_ports, spfm_in, lfm_in, spfm_out, lfm_out),
        )

    def notify(
        self,
        input_ports: dict,
        spfm_in: dict[FaultType, float],
        lfm_in: dict[FaultType, float],
        spfm_out: dict[FaultType, float],
        lfm_out: dict[FaultType, float],
    ) -> dict:
        """Calls every attached observer and returns the last non-empty port result.

        Args:
            input_ports (dict): Incoming visual ports.
            spfm_in (dict[FaultType, float]): Incoming SPFM rates.
            lfm_in (dict[FaultType, float]): Incoming LFM rates.
            spfm_out (dict[FaultType, float]): Outgoing SPFM rates.
            lfm_out (dict[FaultType, float]): Outgoing LFM rates.

        Returns:
            dict: The ports returned by the last observer that produced a truthy
            result, or an empty dict when no observer did.
        """
        newest_ports = {}
        for listener in self._observers:
            created = listener.on_block_computed(
                self.block,
                input_ports,
                spfm_in,
                lfm_in,
                spfm_out,
                lfm_out,
            )
            # Falsy results (None, empty dict) never override an earlier result.
            if not created:
                continue
            newest_ports = created

        return newest_ports

__init__(logic_block)

Initializes the observable wrapper.

Parameters:

Name Type Description Default
logic_block BlockInterface

The pure mathematical block to be wrapped.

required
Source code in src/ecc_analyzer/core/observable_block.py
15
16
17
18
19
20
21
22
def __init__(self, logic_block: BlockInterface):
    """Stores the block to wrap and starts with an empty observer list.

    Args:
        logic_block (BlockInterface): The block to be wrapped; kept as ``self.block``.
    """
    self._observers = []
    self.block = logic_block

attach(observer)

Registers an observer.

Parameters:

Name Type Description Default
observer SafetyObserver

The SafetyObserver instance to be registered.

required
Source code in src/ecc_analyzer/core/observable_block.py
24
25
26
27
28
29
30
31
def attach(self, observer: SafetyObserver):
    """Adds an observer, ignoring one that is already registered.

    Args:
        observer (SafetyObserver): The observer to add to the notification list.
    """
    if observer in self._observers:
        return
    self._observers.append(observer)

notify(input_ports, spfm_in, lfm_in, spfm_out, lfm_out)

Broadcasts results and returns the visual ports created by the observer.

Parameters:

Name Type Description Default
input_ports dict

Incoming visual ports.

required
spfm_in dict[FaultType, float]

Incoming SPFM rates.

required
lfm_in dict[FaultType, float]

Incoming LFM rates.

required
spfm_out dict[FaultType, float]

Outgoing SPFM rates.

required
lfm_out dict[FaultType, float]

Outgoing LFM rates.

required

Returns:

Name Type Description
dict dict

The visual output ports created by the observers.

Source code in src/ecc_analyzer/core/observable_block.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def notify(
    self,
    input_ports: dict,
    spfm_in: dict[FaultType, float],
    lfm_in: dict[FaultType, float],
    spfm_out: dict[FaultType, float],
    lfm_out: dict[FaultType, float],
) -> dict:
    """Calls every attached observer and returns the last non-empty port result.

    Args:
        input_ports (dict): Incoming visual ports.
        spfm_in (dict[FaultType, float]): Incoming SPFM rates.
        lfm_in (dict[FaultType, float]): Incoming LFM rates.
        spfm_out (dict[FaultType, float]): Outgoing SPFM rates.
        lfm_out (dict[FaultType, float]): Outgoing LFM rates.

    Returns:
        dict: The ports returned by the last observer that produced a truthy
        result, or an empty dict when no observer did.
    """
    newest_ports = {}
    for listener in self._observers:
        created = listener.on_block_computed(
            self.block, input_ports, spfm_in, lfm_in, spfm_out, lfm_out
        )
        # A falsy result (None, empty dict) keeps the previously seen ports.
        newest_ports = created or newest_ports

    return newest_ports

run(spfm_in, lfm_in, input_ports)

Executes calculation and collects output ports from the observer.

Parameters:

Name Type Description Default
spfm_in dict[FaultType, float]

Incoming SPFM fault rates.

required
lfm_in dict[FaultType, float]

Incoming LFM fault rates.

required
input_ports dict

Mapping of incoming node IDs for visualization.

required

Returns:

Type Description
tuple[dict[FaultType, float], dict[FaultType, float], dict]

A tuple containing, in order: the updated SPFM rates, the updated LFM rates, and the output ports dictionary from the observer.

Source code in src/ecc_analyzer/core/observable_block.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def run(
    self,
    spfm_in: dict[FaultType, float],
    lfm_in: dict[FaultType, float],
    input_ports: dict,
) -> tuple[dict[FaultType, float], dict[FaultType, float], dict]:
    """Computes the outgoing rates, then notifies the observers.

    Args:
        spfm_in (dict[FaultType, float]): Incoming SPFM fault rates.
        lfm_in (dict[FaultType, float]): Incoming LFM fault rates.
        input_ports (dict): Incoming ports, forwarded unchanged to the observers.

    Returns:
        tuple[dict[FaultType, float], dict[FaultType, float], dict]: The SPFM
        rates and LFM rates returned by the wrapped block, followed by the
        output ports returned by ``notify``.
    """
    # The wrapped block is expected to return exactly two values.
    spfm_out, lfm_out = self.block.compute_fit(spfm_in, lfm_in)
    return (
        spfm_out,
        lfm_out,
        self.notify(input_ports, spfm_in, lfm_in, spfm_out, lfm_out),
    )