Skip to content

Governance API

Governance mechanisms for controlling multi-agent system behavior.

GovernanceEngine

Main governance controller.

swarm.governance.engine.GovernanceEngine

Aggregates all governance levers and provides unified interface.

Manages the lifecycle of governance application: - Epoch start hooks (reputation decay, unfreezes) - Interaction hooks (taxes, circuit breaker, audits) - Admission control (staking)

Source code in swarm/governance/engine.py
class GovernanceEngine:
    """
    Aggregates all governance levers and provides unified interface.

    Manages the lifecycle of governance application:
    - Epoch start hooks (reputation decay, unfreezes)
    - Interaction hooks (taxes, circuit breaker, audits)
    - Admission control (staking)
    """

    def __init__(
        self,
        config: Optional[GovernanceConfig] = None,
        seed: Optional[int] = None,
    ):
        """
        Initialize the governance engine.

        Args:
            config: Governance configuration (uses defaults if None)
            seed: Random seed for reproducible audits
        """
        self.config = config or GovernanceConfig()
        self.config.validate()

        levers: List[GovernanceLever] = [
            TransactionTaxLever(self.config),
            ReputationDecayLever(self.config),
            StakingLever(self.config),
            CircuitBreakerLever(self.config),
            RandomAuditLever(self.config, seed=seed),
            CollusionPenaltyLever(self.config),
            SecurityLever(self.config, seed=seed),
        ]
        # Variance-aware levers (scaffold registration; behavior in #35)
        if self.config.self_ensemble_enabled:
            levers.append(SelfEnsembleLever(self.config))
        if self.config.incoherence_breaker_enabled:
            levers.append(IncoherenceCircuitBreakerLever(self.config))
        if self.config.decomposition_enabled:
            levers.append(DecompositionLever(self.config))
        if self.config.incoherence_friction_enabled:
            levers.append(IncoherenceFrictionLever(self.config))

        # VAE paper levers
        levers.append(TransparencyLever(self.config))
        levers.append(ModeratorLever(self.config, seed=seed))
        levers.append(SybilDetectionLever(self.config))

        # Stored as a tuple so that external code cannot mutate in place.
        self._levers: tuple[GovernanceLever, ...] = tuple(levers)

        # Keep references to specific levers for direct access
        self._staking_lever: Optional[StakingLever] = None
        self._circuit_breaker_lever: Optional[CircuitBreakerLever] = None
        self._collusion_lever: Optional[CollusionPenaltyLever] = None
        self._security_lever: Optional[SecurityLever] = None
        self._vote_normalization_lever = VoteNormalizationLever(self.config)

        for lever in self._levers:
            if isinstance(lever, StakingLever):
                self._staking_lever = lever
            elif isinstance(lever, CircuitBreakerLever):
                self._circuit_breaker_lever = lever
            elif isinstance(lever, CollusionPenaltyLever):
                self._collusion_lever = lever
            elif isinstance(lever, SecurityLever):
                self._security_lever = lever

        # Adaptive governance state
        self._incoherence_forecaster: Optional[Any] = None
        self._adaptive_risk: float = 0.0
        self._adaptive_variance_active: bool = True

    def apply_epoch_start(
        self,
        state: EnvState,
        epoch: int,
    ) -> GovernanceEffect:
        """
        Apply all epoch-start governance hooks.

        Args:
            state: Current environment state
            epoch: The epoch number starting

        Returns:
            Aggregated governance effect
        """
        effects = []
        for lever in self._iter_active_levers():
            effect = lever.on_epoch_start(state, epoch)
            if effect.lever_name:  # Non-empty effect
                effects.append(effect)
        return GovernanceEffect.from_lever_effects(effects)

    def apply_interaction(
        self,
        interaction: SoftInteraction,
        state: EnvState,
    ) -> GovernanceEffect:
        """
        Apply all per-interaction governance hooks.

        Args:
            interaction: The completed interaction
            state: Current environment state

        Returns:
            Aggregated governance effect
        """
        effects = []
        for lever in self._iter_active_levers():
            effect = lever.on_interaction(interaction, state)
            if effect.lever_name:  # Non-empty effect
                effects.append(effect)
        return GovernanceEffect.from_lever_effects(effects)

    def apply_step(
        self,
        state: EnvState,
        step: int,
    ) -> GovernanceEffect:
        """
        Apply step-level governance hooks.

        Args:
            state: Current environment state
            step: Current step index within epoch

        Returns:
            Aggregated governance effect
        """
        effects = []
        for lever in self._iter_active_levers():
            effect = lever.on_step(state, step)
            if effect.lever_name:
                effects.append(effect)
        return GovernanceEffect.from_lever_effects(effects)

    def can_agent_act(
        self,
        agent_id: str,
        state: EnvState,
    ) -> bool:
        """
        Check if agent is allowed to act (all levers must approve).

        Args:
            agent_id: Agent attempting to act
            state: Current environment state

        Returns:
            True if all levers allow the agent to act
        """
        for lever in self._iter_active_levers():
            if not lever.can_agent_act(agent_id, state):
                return False
        return True

    def compute_vote_weight(
        self,
        agent_id: str,
        vote_count: int,
    ) -> float:
        """
        Compute normalized vote weight for an agent.

        Args:
            agent_id: The voting agent
            vote_count: Number of votes cast this epoch

        Returns:
            Vote weight in (0, 1]
        """
        return self._vote_normalization_lever.compute_vote_weight(agent_id, vote_count)

    def slash_agent_stake(
        self,
        agent_id: str,
        state: EnvState,
        reason: str = "violation",
    ) -> GovernanceEffect:
        """
        Slash an agent's stake.

        Args:
            agent_id: Agent to slash
            state: Current environment state
            reason: Reason for slashing

        Returns:
            Governance effect with resource delta
        """
        if self._staking_lever is None:
            return GovernanceEffect()
        effect = self._staking_lever.slash_stake(agent_id, state, reason)
        return GovernanceEffect.from_lever_effects([effect])

    def get_circuit_breaker_status(self, agent_id: str) -> Dict:
        """Get circuit breaker status for an agent."""
        if self._circuit_breaker_lever is None:
            return {}
        return self._circuit_breaker_lever.get_freeze_status(agent_id)

    def reset_circuit_breaker(self, agent_id: str) -> None:
        """Reset circuit breaker tracking for an agent."""
        if self._circuit_breaker_lever is not None:
            self._circuit_breaker_lever.reset_tracker(agent_id)

    def set_collusion_agent_ids(self, agent_ids: List[str]) -> None:
        """Set agent IDs for collusion detection."""
        if self._collusion_lever is not None:
            self._collusion_lever.set_agent_ids(agent_ids)

    def get_collusion_report(self):
        """Get the latest collusion detection report."""
        if self._collusion_lever is None:
            return None
        return self._collusion_lever.get_report()

    def clear_collusion_history(self) -> None:
        """Clear collusion detection interaction history."""
        if self._collusion_lever is not None:
            self._collusion_lever.clear_history()

    def set_security_agent_ids(self, agent_ids: List[str]) -> None:
        """Set agent IDs for security analysis."""
        if self._security_lever is not None:
            self._security_lever.set_agent_ids(agent_ids)

    def set_security_trust_scores(self, trust_scores: Dict[str, float]) -> None:
        """Set trust scores for security analysis (laundering detection)."""
        if self._security_lever is not None:
            self._security_lever.set_agent_trust_scores(trust_scores)

    def get_security_report(self):
        """Get the latest security analysis report."""
        if self._security_lever is None:
            return None
        return self._security_lever.get_report()

    def get_active_lever_names(self) -> List[str]:
        """Return registered lever names in execution order."""
        return [lever.name for lever in self._iter_active_levers()]

    def get_registered_lever_names(self) -> List[str]:
        """Return all registered lever names (ignores adaptive gating)."""
        return [lever.name for lever in self._levers]

    def set_incoherence_forecaster(self, forecaster: Any) -> None:
        """Attach a trained forecaster used for adaptive gating."""
        self._incoherence_forecaster = forecaster

    def update_adaptive_mode(self, features: Dict[str, float]) -> float:
        """
        Update adaptive gating state from current feature vector.

        Returns:
            Predicted incoherence risk in [0, 1].
        """
        if not self.config.adaptive_governance_enabled:
            self._adaptive_variance_active = True
            self._adaptive_risk = 0.0
            return self._adaptive_risk
        if self._incoherence_forecaster is None:
            # Fail open to avoid accidental disablement without model wiring.
            self._adaptive_variance_active = True
            self._adaptive_risk = 0.0
            return self._adaptive_risk

        risk = float(self._incoherence_forecaster.predict_proba(features))
        self._adaptive_risk = risk
        self._adaptive_variance_active = (
            risk >= self.config.adaptive_incoherence_threshold
        )
        return risk

    def get_adaptive_status(self) -> Dict[str, Any]:
        """Return current adaptive gating state."""
        return {
            "adaptive_enabled": self.config.adaptive_governance_enabled,
            "variance_levers_active": self._adaptive_variance_active,
            "predicted_risk": self._adaptive_risk,
            "threshold": self.config.adaptive_incoherence_threshold,
        }

    def _iter_active_levers(self) -> List[GovernanceLever]:
        """Iterate levers after adaptive gating."""
        if not self.config.adaptive_governance_enabled:
            return list(self._levers)
        if self._incoherence_forecaster is None:
            return list(self._levers)

        variance_names = {
            "self_ensemble",
            "incoherence_breaker",
            "decomposition",
            "incoherence_friction",
        }
        active: List[GovernanceLever] = []
        for lever in self._levers:
            if lever.name in variance_names and not self._adaptive_variance_active:
                continue
            active.append(lever)
        return active

    def get_quarantined_agents(self) -> frozenset[str]:
        """Get set of quarantined agents (immutable copy)."""
        if self._security_lever is None:
            return frozenset()
        return frozenset(self._security_lever.get_quarantined_agents())

    def release_from_quarantine(self, agent_id: str) -> bool:
        """Release an agent from security quarantine."""
        if self._security_lever is None:
            return False
        return self._security_lever.release_from_quarantine(agent_id)

    def get_security_containment_actions(self) -> List[Dict]:
        """Get history of security containment actions."""
        if self._security_lever is None:
            return []
        return self._security_lever.get_containment_actions()

    def clear_security_history(self) -> None:
        """Clear security analysis history and state."""
        if self._security_lever is not None:
            self._security_lever.clear_history()

__init__(config=None, seed=None)

Initialize the governance engine.

Parameters:

Name Type Description Default
config Optional[GovernanceConfig]

Governance configuration (uses defaults if None)

None
seed Optional[int]

Random seed for reproducible audits

None
Source code in swarm/governance/engine.py
def __init__(
    self,
    config: Optional[GovernanceConfig] = None,
    seed: Optional[int] = None,
):
    """
    Initialize the governance engine.

    Args:
        config: Governance configuration (uses defaults if None)
        seed: Random seed for reproducible audits
    """
    self.config = config or GovernanceConfig()
    self.config.validate()

    levers: List[GovernanceLever] = [
        TransactionTaxLever(self.config),
        ReputationDecayLever(self.config),
        StakingLever(self.config),
        CircuitBreakerLever(self.config),
        RandomAuditLever(self.config, seed=seed),
        CollusionPenaltyLever(self.config),
        SecurityLever(self.config, seed=seed),
    ]
    # Variance-aware levers (scaffold registration; behavior in #35)
    if self.config.self_ensemble_enabled:
        levers.append(SelfEnsembleLever(self.config))
    if self.config.incoherence_breaker_enabled:
        levers.append(IncoherenceCircuitBreakerLever(self.config))
    if self.config.decomposition_enabled:
        levers.append(DecompositionLever(self.config))
    if self.config.incoherence_friction_enabled:
        levers.append(IncoherenceFrictionLever(self.config))

    # VAE paper levers
    levers.append(TransparencyLever(self.config))
    levers.append(ModeratorLever(self.config, seed=seed))
    levers.append(SybilDetectionLever(self.config))

    # Stored as a tuple so that external code cannot mutate in place.
    self._levers: tuple[GovernanceLever, ...] = tuple(levers)

    # Keep references to specific levers for direct access
    self._staking_lever: Optional[StakingLever] = None
    self._circuit_breaker_lever: Optional[CircuitBreakerLever] = None
    self._collusion_lever: Optional[CollusionPenaltyLever] = None
    self._security_lever: Optional[SecurityLever] = None
    self._vote_normalization_lever = VoteNormalizationLever(self.config)

    for lever in self._levers:
        if isinstance(lever, StakingLever):
            self._staking_lever = lever
        elif isinstance(lever, CircuitBreakerLever):
            self._circuit_breaker_lever = lever
        elif isinstance(lever, CollusionPenaltyLever):
            self._collusion_lever = lever
        elif isinstance(lever, SecurityLever):
            self._security_lever = lever

    # Adaptive governance state
    self._incoherence_forecaster: Optional[Any] = None
    self._adaptive_risk: float = 0.0
    self._adaptive_variance_active: bool = True

apply_epoch_start(state, epoch)

Apply all epoch-start governance hooks.

Parameters:

Name Type Description Default
state EnvState

Current environment state

required
epoch int

The epoch number starting

required

Returns:

Type Description
GovernanceEffect

Aggregated governance effect

Source code in swarm/governance/engine.py
def apply_epoch_start(
    self,
    state: EnvState,
    epoch: int,
) -> GovernanceEffect:
    """
    Apply all epoch-start governance hooks.

    Args:
        state: Current environment state
        epoch: The epoch number starting

    Returns:
        Aggregated governance effect
    """
    effects = []
    for lever in self._iter_active_levers():
        effect = lever.on_epoch_start(state, epoch)
        if effect.lever_name:  # Non-empty effect
            effects.append(effect)
    return GovernanceEffect.from_lever_effects(effects)

apply_interaction(interaction, state)

Apply all per-interaction governance hooks.

Parameters:

Name Type Description Default
interaction SoftInteraction

The completed interaction

required
state EnvState

Current environment state

required

Returns:

Type Description
GovernanceEffect

Aggregated governance effect

Source code in swarm/governance/engine.py
def apply_interaction(
    self,
    interaction: SoftInteraction,
    state: EnvState,
) -> GovernanceEffect:
    """
    Apply all per-interaction governance hooks.

    Args:
        interaction: The completed interaction
        state: Current environment state

    Returns:
        Aggregated governance effect
    """
    effects = []
    for lever in self._iter_active_levers():
        effect = lever.on_interaction(interaction, state)
        if effect.lever_name:  # Non-empty effect
            effects.append(effect)
    return GovernanceEffect.from_lever_effects(effects)

apply_step(state, step)

Apply step-level governance hooks.

Parameters:

Name Type Description Default
state EnvState

Current environment state

required
step int

Current step index within epoch

required

Returns:

Type Description
GovernanceEffect

Aggregated governance effect

Source code in swarm/governance/engine.py
def apply_step(
    self,
    state: EnvState,
    step: int,
) -> GovernanceEffect:
    """
    Apply step-level governance hooks.

    Args:
        state: Current environment state
        step: Current step index within epoch

    Returns:
        Aggregated governance effect
    """
    effects = []
    for lever in self._iter_active_levers():
        effect = lever.on_step(state, step)
        if effect.lever_name:
            effects.append(effect)
    return GovernanceEffect.from_lever_effects(effects)

can_agent_act(agent_id, state)

Check if agent is allowed to act (all levers must approve).

Parameters:

Name Type Description Default
agent_id str

Agent attempting to act

required
state EnvState

Current environment state

required

Returns:

Type Description
bool

True if all levers allow the agent to act

Source code in swarm/governance/engine.py
def can_agent_act(
    self,
    agent_id: str,
    state: EnvState,
) -> bool:
    """
    Check if agent is allowed to act (all levers must approve).

    Args:
        agent_id: Agent attempting to act
        state: Current environment state

    Returns:
        True if all levers allow the agent to act
    """
    for lever in self._iter_active_levers():
        if not lever.can_agent_act(agent_id, state):
            return False
    return True

clear_collusion_history()

Clear collusion detection interaction history.

Source code in swarm/governance/engine.py
def clear_collusion_history(self) -> None:
    """Clear collusion detection interaction history."""
    if self._collusion_lever is not None:
        self._collusion_lever.clear_history()

clear_security_history()

Clear security analysis history and state.

Source code in swarm/governance/engine.py
def clear_security_history(self) -> None:
    """Clear security analysis history and state."""
    if self._security_lever is not None:
        self._security_lever.clear_history()

compute_vote_weight(agent_id, vote_count)

Compute normalized vote weight for an agent.

Parameters:

Name Type Description Default
agent_id str

The voting agent

required
vote_count int

Number of votes cast this epoch

required

Returns:

Type Description
float

Vote weight in (0, 1]

Source code in swarm/governance/engine.py
def compute_vote_weight(
    self,
    agent_id: str,
    vote_count: int,
) -> float:
    """
    Compute normalized vote weight for an agent.

    Args:
        agent_id: The voting agent
        vote_count: Number of votes cast this epoch

    Returns:
        Vote weight in (0, 1]
    """
    return self._vote_normalization_lever.compute_vote_weight(agent_id, vote_count)

get_active_lever_names()

Return registered lever names in execution order.

Source code in swarm/governance/engine.py
def get_active_lever_names(self) -> List[str]:
    """Return registered lever names in execution order."""
    return [lever.name for lever in self._iter_active_levers()]

get_adaptive_status()

Return current adaptive gating state.

Source code in swarm/governance/engine.py
def get_adaptive_status(self) -> Dict[str, Any]:
    """Return current adaptive gating state."""
    return {
        "adaptive_enabled": self.config.adaptive_governance_enabled,
        "variance_levers_active": self._adaptive_variance_active,
        "predicted_risk": self._adaptive_risk,
        "threshold": self.config.adaptive_incoherence_threshold,
    }

get_circuit_breaker_status(agent_id)

Get circuit breaker status for an agent.

Source code in swarm/governance/engine.py
def get_circuit_breaker_status(self, agent_id: str) -> Dict:
    """Get circuit breaker status for an agent."""
    if self._circuit_breaker_lever is None:
        return {}
    return self._circuit_breaker_lever.get_freeze_status(agent_id)

get_collusion_report()

Get the latest collusion detection report.

Source code in swarm/governance/engine.py
def get_collusion_report(self):
    """Get the latest collusion detection report."""
    if self._collusion_lever is None:
        return None
    return self._collusion_lever.get_report()

get_quarantined_agents()

Get set of quarantined agents (immutable copy).

Source code in swarm/governance/engine.py
def get_quarantined_agents(self) -> frozenset[str]:
    """Get set of quarantined agents (immutable copy)."""
    if self._security_lever is None:
        return frozenset()
    return frozenset(self._security_lever.get_quarantined_agents())

get_registered_lever_names()

Return all registered lever names (ignores adaptive gating).

Source code in swarm/governance/engine.py
def get_registered_lever_names(self) -> List[str]:
    """Return all registered lever names (ignores adaptive gating)."""
    return [lever.name for lever in self._levers]

get_security_containment_actions()

Get history of security containment actions.

Source code in swarm/governance/engine.py
def get_security_containment_actions(self) -> List[Dict]:
    """Get history of security containment actions."""
    if self._security_lever is None:
        return []
    return self._security_lever.get_containment_actions()

get_security_report()

Get the latest security analysis report.

Source code in swarm/governance/engine.py
def get_security_report(self):
    """Get the latest security analysis report."""
    if self._security_lever is None:
        return None
    return self._security_lever.get_report()

release_from_quarantine(agent_id)

Release an agent from security quarantine.

Source code in swarm/governance/engine.py
def release_from_quarantine(self, agent_id: str) -> bool:
    """Release an agent from security quarantine."""
    if self._security_lever is None:
        return False
    return self._security_lever.release_from_quarantine(agent_id)

reset_circuit_breaker(agent_id)

Reset circuit breaker tracking for an agent.

Source code in swarm/governance/engine.py
def reset_circuit_breaker(self, agent_id: str) -> None:
    """Reset circuit breaker tracking for an agent."""
    if self._circuit_breaker_lever is not None:
        self._circuit_breaker_lever.reset_tracker(agent_id)

set_collusion_agent_ids(agent_ids)

Set agent IDs for collusion detection.

Source code in swarm/governance/engine.py
def set_collusion_agent_ids(self, agent_ids: List[str]) -> None:
    """Set agent IDs for collusion detection."""
    if self._collusion_lever is not None:
        self._collusion_lever.set_agent_ids(agent_ids)

set_incoherence_forecaster(forecaster)

Attach a trained forecaster used for adaptive gating.

Source code in swarm/governance/engine.py
def set_incoherence_forecaster(self, forecaster: Any) -> None:
    """Attach a trained forecaster used for adaptive gating."""
    self._incoherence_forecaster = forecaster

set_security_agent_ids(agent_ids)

Set agent IDs for security analysis.

Source code in swarm/governance/engine.py
def set_security_agent_ids(self, agent_ids: List[str]) -> None:
    """Set agent IDs for security analysis."""
    if self._security_lever is not None:
        self._security_lever.set_agent_ids(agent_ids)

set_security_trust_scores(trust_scores)

Set trust scores for security analysis (laundering detection).

Source code in swarm/governance/engine.py
def set_security_trust_scores(self, trust_scores: Dict[str, float]) -> None:
    """Set trust scores for security analysis (laundering detection)."""
    if self._security_lever is not None:
        self._security_lever.set_agent_trust_scores(trust_scores)

slash_agent_stake(agent_id, state, reason='violation')

Slash an agent's stake.

Parameters:

Name Type Description Default
agent_id str

Agent to slash

required
state EnvState

Current environment state

required
reason str

Reason for slashing

'violation'

Returns:

Type Description
GovernanceEffect

Governance effect with resource delta

Source code in swarm/governance/engine.py
def slash_agent_stake(
    self,
    agent_id: str,
    state: EnvState,
    reason: str = "violation",
) -> GovernanceEffect:
    """
    Slash an agent's stake.

    Args:
        agent_id: Agent to slash
        state: Current environment state
        reason: Reason for slashing

    Returns:
        Governance effect with resource delta
    """
    if self._staking_lever is None:
        return GovernanceEffect()
    effect = self._staking_lever.slash_stake(agent_id, state, reason)
    return GovernanceEffect.from_lever_effects([effect])

update_adaptive_mode(features)

Update adaptive gating state from current feature vector.

Returns:

Type Description
float

Predicted incoherence risk in [0, 1].

Source code in swarm/governance/engine.py
def update_adaptive_mode(self, features: Dict[str, float]) -> float:
    """
    Update adaptive gating state from current feature vector.

    Returns:
        Predicted incoherence risk in [0, 1].
    """
    if not self.config.adaptive_governance_enabled:
        self._adaptive_variance_active = True
        self._adaptive_risk = 0.0
        return self._adaptive_risk
    if self._incoherence_forecaster is None:
        # Fail open to avoid accidental disablement without model wiring.
        self._adaptive_variance_active = True
        self._adaptive_risk = 0.0
        return self._adaptive_risk

    risk = float(self._incoherence_forecaster.predict_proba(features))
    self._adaptive_risk = risk
    self._adaptive_variance_active = (
        risk >= self.config.adaptive_incoherence_threshold
    )
    return risk

Usage

from swarm.governance import GovernanceEngine, GovernanceConfig

config = GovernanceConfig(
    transaction_tax=0.02,
    reputation_decay=0.1,
    circuit_breaker_threshold=0.3,
)

engine = GovernanceEngine(config)

# Check if agent can act
if engine.can_agent_act(agent_id):
    # Process action
    pass

# Record interaction
engine.on_interaction(interaction)

# Get governance costs
costs = engine.compute_costs(interaction)

GovernanceConfig

Configuration for all governance levers.

Parameter Type Default Description
transaction_tax float 0.0 Tax per interaction
reputation_decay float 0.0 Decay rate per epoch
initial_reputation float 1.0 Starting reputation
circuit_breaker_threshold float 1.0 Toxicity threshold
circuit_breaker_window int 10 Lookback window
circuit_breaker_cooldown int 5 Freeze duration
audit_probability float 0.0 Audit frequency
audit_penalty float 0.0 Failed audit cost
staking_requirement float 0.0 Minimum stake
stake_slash_rate float 0.0 Slash fraction
collusion_detection bool False Enable detection
collusion_threshold float 0.8 Correlation threshold

Individual Levers

TransactionTax

from swarm.governance.levers import TransactionTax

tax = TransactionTax(rate=0.02)
cost = tax.compute(interaction)

ReputationDecay

from swarm.governance.levers import ReputationDecay

decay = ReputationDecay(rate=0.1)
new_rep = decay.apply(current_rep, epoch_delta=1)

CircuitBreaker

from swarm.governance.levers import CircuitBreaker

breaker = CircuitBreaker(
    threshold=0.3,
    window=10,
    cooldown=5,
)

# Check status
if breaker.is_frozen(agent_id):
    return  # Agent cannot act

# Record interaction
breaker.record(agent_id, toxicity=0.4)

# Check if triggered
if breaker.should_freeze(agent_id):
    breaker.freeze(agent_id)

RandomAudit

from swarm.governance.levers import RandomAudit

audit = RandomAudit(probability=0.05, penalty=0.5)

if audit.should_audit():
    result = audit.execute(interaction)
    if not result.passed:
        apply_penalty(interaction.initiator, audit.penalty)

Staking

from swarm.governance.levers import Staking

staking = Staking(
    requirement=10.0,
    slash_rate=0.1,
)

# Check eligibility
if not staking.can_participate(agent_id):
    return  # Insufficient stake

# Slash on bad behavior
staking.slash(agent_id, reason="failed_audit")

Collusion Detection

from swarm.governance.collusion import CollusionDetector

detector = CollusionDetector(
    threshold=0.8,
    window=20,
)

# Record interactions
detector.record(agent_a, agent_b, interaction)

# Check for collusion
colluding_pairs = detector.detect()
for pair, score in colluding_pairs:
    print(f"Potential collusion: {pair} (score: {score:.2f})")

Hooks

Governance integrates with the orchestrator via hooks:

orchestrator = Orchestrator(
    config=config,
    governance=governance_engine,
)

# Hooks are called automatically:
# - on_epoch_start
# - on_interaction
# - on_epoch_end

Custom Levers

Create custom governance mechanisms:

from swarm.governance.levers import GovernanceLever

class CustomLever(GovernanceLever):
    def __init__(self, param: float):
        self.param = param

    def compute_cost(self, interaction) -> float:
        # Custom cost computation
        return interaction.p * self.param

    def should_block(self, agent_id: str) -> bool:
        # Custom blocking logic
        return False