Skip to content

Governance API

Governance mechanisms for controlling multi-agent system behavior.

GovernanceEngine

Main governance controller.

swarm.governance.engine.GovernanceEngine

Aggregates all governance levers and provides unified interface.

Manages the lifecycle of governance application:

- Epoch start hooks (reputation decay, unfreezes)
- Interaction hooks (taxes, circuit breaker, audits)
- Admission control (staking)

Source code in swarm/governance/engine.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
class GovernanceEngine:
    """
    Aggregates all governance levers and provides unified interface.

    Manages the lifecycle of governance application:
    - Epoch start hooks (reputation decay, unfreezes)
    - Interaction hooks (taxes, circuit breaker, audits)
    - Admission control (staking)

    Levers are invoked in the order they were registered in ``__init__``.
    When adaptive governance is enabled and a forecaster is attached, the
    levers named in ``_VARIANCE_LEVER_NAMES`` are skipped while the last
    predicted risk is below ``config.adaptive_incoherence_threshold``.
    """

    # Names of the levers that adaptive gating may switch off. Kept at class
    # level so the set is built once rather than on every hook invocation.
    _VARIANCE_LEVER_NAMES = frozenset(
        {
            "self_ensemble",
            "incoherence_breaker",
            "decomposition",
            "incoherence_friction",
        }
    )

    def __init__(
        self,
        config: Optional[GovernanceConfig] = None,
        seed: Optional[int] = None,
        council: Optional[Any] = None,
    ):
        """
        Initialize the governance engine.

        Args:
            config: Governance configuration (uses defaults if None)
            seed: Random seed for reproducible audits; also forwarded to the
                security, moderator and council levers
            council: External council instance handed to
                CouncilGovernanceLever. The lever is only registered when
                this is not None and ``config.council_lever_enabled`` is set.
        """
        self.config = GovernanceConfig() if config is None else config
        # Pydantic auto-validates

        # Registration order below is the execution order of every hook.
        levers: List[GovernanceLever] = [
            RefineryLever(self.config),
            TransactionTaxLever(self.config),
            ReputationDecayLever(self.config),
            StakingLever(self.config),
            CircuitBreakerLever(self.config),
            RandomAuditLever(self.config, seed=seed),
            CollusionPenaltyLever(self.config),
            SecurityLever(self.config, seed=seed),
            PairCapLever(self.config),
            PageCooldownLever(self.config),
            DailyPointCapLever(self.config),
            NoSelfFixLever(self.config),
        ]
        if self.config.moltbook_rate_limit_enabled:
            levers.append(MoltbookRateLimitLever(self.config))
        if self.config.moltbook_challenge_enabled:
            levers.append(ChallengeVerificationLever(self.config))
        # Memory tier levers
        levers.append(PromotionGateLever(self.config))
        levers.append(WriteRateLimitLever(self.config))
        levers.append(CrossVerificationLever(self.config))
        levers.append(ProvenanceLever(self.config))
        # Variance-aware levers (scaffold registration; behavior in #35)
        if self.config.self_ensemble_enabled:
            levers.append(SelfEnsembleLever(self.config))
        if self.config.incoherence_breaker_enabled:
            levers.append(IncoherenceCircuitBreakerLever(self.config))
        if self.config.decomposition_enabled:
            levers.append(DecompositionLever(self.config))
        if self.config.incoherence_friction_enabled:
            levers.append(IncoherenceFrictionLever(self.config))

        # VAE paper levers
        levers.append(TransparencyLever(self.config))
        levers.append(ModeratorLever(self.config, seed=seed))
        levers.append(SybilDetectionLever(self.config))

        # Council governance lever (requires external council instance)
        if self.config.council_lever_enabled and council is not None:
            levers.append(CouncilGovernanceLever(self.config, council=council, seed=seed))

        # Diversity as Defense lever
        levers.append(DiversityDefenseLever(self.config))

        # Loop detector lever
        if self.config.loop_detector_enabled:
            levers.append(LoopDetectorLever(self.config))

        # Self-modification governance lever
        if self.config.self_modification_enabled:
            levers.append(SelfModificationLever(self.config))

        # RBAC lever
        if self.config.rbac_enabled:
            levers.append(RBACLever(self.config))

        # Stored as a tuple so that external code cannot mutate in place.
        self._levers: tuple[GovernanceLever, ...] = tuple(levers)

        # Keep references to specific levers for direct access
        self._staking_lever: Optional[StakingLever] = None
        self._circuit_breaker_lever: Optional[CircuitBreakerLever] = None
        self._collusion_lever: Optional[CollusionPenaltyLever] = None
        self._security_lever: Optional[SecurityLever] = None
        # Vote normalization is held separately and is not part of _levers,
        # so it never takes part in the epoch/interaction/step hooks.
        self._vote_normalization_lever = VoteNormalizationLever(self.config)
        self._moltbook_rate_limit_lever: Optional[MoltbookRateLimitLever] = None
        self._moltbook_challenge_lever: Optional[ChallengeVerificationLever] = None
        self._diversity_lever: Optional[DiversityDefenseLever] = None
        self._loop_detector_lever: Optional[LoopDetectorLever] = None
        self._self_modification_lever: Optional[SelfModificationLever] = None
        self._rbac_lever: Optional[RBACLever] = None

        # First matching branch wins for each lever.
        # NOTE(review): if any lever class in this chain subclasses another
        # one listed earlier, it would be captured by the earlier branch --
        # confirm against the lever class hierarchy.
        for lever in self._levers:
            if isinstance(lever, StakingLever):
                self._staking_lever = lever
            elif isinstance(lever, CircuitBreakerLever):
                self._circuit_breaker_lever = lever
            elif isinstance(lever, CollusionPenaltyLever):
                self._collusion_lever = lever
            elif isinstance(lever, SecurityLever):
                self._security_lever = lever
            elif isinstance(lever, MoltbookRateLimitLever):
                self._moltbook_rate_limit_lever = lever
            elif isinstance(lever, ChallengeVerificationLever):
                self._moltbook_challenge_lever = lever
            elif isinstance(lever, DiversityDefenseLever):
                self._diversity_lever = lever
            elif isinstance(lever, LoopDetectorLever):
                self._loop_detector_lever = lever
            elif isinstance(lever, SelfModificationLever):
                self._self_modification_lever = lever
            elif isinstance(lever, RBACLever):
                self._rbac_lever = lever

        # Adaptive governance state
        self._incoherence_forecaster: Optional[Any] = None
        self._adaptive_risk: float = 0.0
        self._adaptive_variance_active: bool = True

    def get_moltbook_rate_limit_lever(self) -> Optional[MoltbookRateLimitLever]:
        """Return Moltbook rate limit lever if registered."""
        return self._moltbook_rate_limit_lever

    def get_moltbook_challenge_lever(self) -> Optional[ChallengeVerificationLever]:
        """Return Moltbook challenge lever if registered."""
        return self._moltbook_challenge_lever

    def apply_epoch_start(
        self,
        state: EnvState,
        epoch: int,
    ) -> GovernanceEffect:
        """
        Apply all epoch-start governance hooks.

        Args:
            state: Current environment state
            epoch: The epoch number starting

        Returns:
            Aggregated governance effect
        """
        effects = []
        for lever in self._iter_active_levers():
            effect = lever.on_epoch_start(state, epoch)
            if effect.lever_name:  # Non-empty effect
                effects.append(effect)
        return GovernanceEffect.from_lever_effects(effects)

    def apply_interaction(
        self,
        interaction: SoftInteraction,
        state: EnvState,
    ) -> GovernanceEffect:
        """
        Apply all per-interaction governance hooks.

        Args:
            interaction: The completed interaction
            state: Current environment state

        Returns:
            Aggregated governance effect
        """
        effects = []
        for lever in self._iter_active_levers():
            effect = lever.on_interaction(interaction, state)
            if effect.lever_name:  # Non-empty effect
                effects.append(effect)
        return GovernanceEffect.from_lever_effects(effects)

    def apply_step(
        self,
        state: EnvState,
        step: int,
    ) -> GovernanceEffect:
        """
        Apply step-level governance hooks.

        Args:
            state: Current environment state
            step: Current step index within epoch

        Returns:
            Aggregated governance effect
        """
        effects = []
        for lever in self._iter_active_levers():
            effect = lever.on_step(state, step)
            if effect.lever_name:  # Non-empty effect
                effects.append(effect)
        return GovernanceEffect.from_lever_effects(effects)

    def can_agent_act(
        self,
        agent_id: str,
        state: EnvState,
    ) -> bool:
        """
        Check if agent is allowed to act (all levers must approve).

        Args:
            agent_id: Agent attempting to act
            state: Current environment state

        Returns:
            True if all active levers allow the agent to act
        """
        for lever in self._iter_active_levers():
            if not lever.can_agent_act(agent_id, state):
                return False
        return True

    def compute_vote_weight(
        self,
        agent_id: str,
        vote_count: int,
    ) -> float:
        """
        Compute normalized vote weight for an agent.

        Args:
            agent_id: The voting agent
            vote_count: Number of votes cast this epoch

        Returns:
            Vote weight in (0, 1]
        """
        return float(self._vote_normalization_lever.compute_vote_weight(agent_id, vote_count))

    def slash_agent_stake(
        self,
        agent_id: str,
        state: EnvState,
        reason: str = "violation",
    ) -> GovernanceEffect:
        """
        Slash an agent's stake.

        Args:
            agent_id: Agent to slash
            state: Current environment state
            reason: Reason for slashing

        Returns:
            Governance effect with resource delta (an empty effect when no
            staking lever is registered)
        """
        if self._staking_lever is None:
            return GovernanceEffect()
        effect = self._staking_lever.slash_stake(agent_id, state, reason)
        return GovernanceEffect.from_lever_effects([effect])

    def get_circuit_breaker_status(self, agent_id: str) -> Dict[str, Any]:
        """Get circuit breaker status for an agent ({} if no lever)."""
        if self._circuit_breaker_lever is None:
            return {}
        return dict(self._circuit_breaker_lever.get_freeze_status(agent_id))

    def reset_circuit_breaker(self, agent_id: str) -> None:
        """Reset circuit breaker tracking for an agent."""
        if self._circuit_breaker_lever is not None:
            self._circuit_breaker_lever.reset_tracker(agent_id)

    def set_collusion_agent_ids(self, agent_ids: List[str]) -> None:
        """Set agent IDs for collusion detection."""
        if self._collusion_lever is not None:
            self._collusion_lever.set_agent_ids(agent_ids)

    def get_collusion_report(self):
        """Get the latest collusion detection report (None if no lever)."""
        if self._collusion_lever is None:
            return None
        return self._collusion_lever.get_report()

    def clear_collusion_history(self) -> None:
        """Clear collusion detection interaction history."""
        if self._collusion_lever is not None:
            self._collusion_lever.clear_history()

    def set_security_agent_ids(self, agent_ids: List[str]) -> None:
        """Set agent IDs for security analysis."""
        if self._security_lever is not None:
            self._security_lever.set_agent_ids(agent_ids)

    def set_security_trust_scores(self, trust_scores: Dict[str, float]) -> None:
        """Set trust scores for security analysis (laundering detection)."""
        if self._security_lever is not None:
            self._security_lever.set_agent_trust_scores(trust_scores)

    def get_security_report(self):
        """Get the latest security analysis report (None if no lever)."""
        if self._security_lever is None:
            return None
        return self._security_lever.get_report()

    def get_active_lever_names(self) -> List[str]:
        """Return names of currently active levers (after adaptive gating) in execution order."""
        return [lever.name for lever in self._iter_active_levers()]

    def get_registered_lever_names(self) -> List[str]:
        """Return all registered lever names (ignores adaptive gating)."""
        return [lever.name for lever in self._levers]

    def set_incoherence_forecaster(self, forecaster: Any) -> None:
        """Attach a trained forecaster used for adaptive gating."""
        self._incoherence_forecaster = forecaster

    def update_adaptive_mode(self, features: Dict[str, float]) -> float:
        """
        Update adaptive gating state from current feature vector.

        The variance levers stay active whenever adaptive governance is
        disabled, no forecaster is attached, or the forecaster yields NaN.

        Args:
            features: Feature vector passed to the forecaster's
                ``predict_proba``

        Returns:
            Predicted incoherence risk as reported by the forecaster
            (expected to lie in [0, 1]; the value is not clamped here).
            0.0 when adaptive governance is off or no forecaster is attached.
        """
        import math

        forecaster = self._incoherence_forecaster
        if not self.config.adaptive_governance_enabled or forecaster is None:
            # Fail open to avoid accidental disablement without model wiring.
            self._adaptive_variance_active = True
            self._adaptive_risk = 0.0
            return self._adaptive_risk

        risk = float(forecaster.predict_proba(features))
        self._adaptive_risk = risk
        # NaN compares False against any threshold, which would silently
        # switch the variance levers off; treat an unusable forecast the
        # same way as a missing forecaster and keep them active.
        self._adaptive_variance_active = (
            math.isnan(risk) or risk >= self.config.adaptive_incoherence_threshold
        )
        return risk

    def get_adaptive_status(self) -> Dict[str, Any]:
        """Return current adaptive gating state."""
        return {
            "adaptive_enabled": self.config.adaptive_governance_enabled,
            "variance_levers_active": self._adaptive_variance_active,
            "predicted_risk": self._adaptive_risk,
            "threshold": self.config.adaptive_incoherence_threshold,
        }

    def _iter_active_levers(self) -> List[GovernanceLever]:
        """Return the levers to run, in order, after adaptive gating.

        Every registered lever is returned unless adaptive governance is
        enabled, a forecaster is attached, and the last update marked the
        variance levers inactive; in that case the levers named in
        ``_VARIANCE_LEVER_NAMES`` are left out.
        """
        if (
            not self.config.adaptive_governance_enabled
            or self._incoherence_forecaster is None
            or self._adaptive_variance_active
        ):
            return list(self._levers)

        gated = self._VARIANCE_LEVER_NAMES
        return [lever for lever in self._levers if lever.name not in gated]

    def get_quarantined_agents(self) -> frozenset[str]:
        """Get set of quarantined agents (immutable copy)."""
        if self._security_lever is None:
            return frozenset()
        return frozenset(self._security_lever.get_quarantined_agents())

    def release_from_quarantine(self, agent_id: str) -> bool:
        """Release an agent from security quarantine (False if no lever)."""
        if self._security_lever is None:
            return False
        return bool(self._security_lever.release_from_quarantine(agent_id))

    def get_security_containment_actions(self) -> List[Dict[str, Any]]:
        """Get history of security containment actions."""
        if self._security_lever is None:
            return []
        return list(self._security_lever.get_containment_actions())

    def clear_security_history(self) -> None:
        """Clear security analysis history and state."""
        if self._security_lever is not None:
            self._security_lever.clear_history()

    def get_diversity_metrics(self) -> Optional[Any]:
        """Return the latest diversity metrics snapshot."""
        if self._diversity_lever is None:
            return None
        return self._diversity_lever.get_metrics()

    def get_diversity_lever(self) -> Optional[DiversityDefenseLever]:
        """Return the diversity defense lever if registered."""
        return self._diversity_lever

    def get_loop_detector_lever(self) -> Optional[LoopDetectorLever]:
        """Return the loop detector lever if registered."""
        return self._loop_detector_lever

    def get_self_modification_lever(self) -> Optional[SelfModificationLever]:
        """Return the self-modification lever if registered."""
        return self._self_modification_lever

    def get_rbac_lever(self) -> Optional[RBACLever]:
        """Return the RBAC lever if registered."""
        return self._rbac_lever

    def set_rbac_agent_roles(self, agent_roles: Dict[str, List[str]]) -> None:
        """Set agent roles for RBAC enforcement."""
        if self._rbac_lever is not None:
            self._rbac_lever.set_agent_roles(agent_roles)

    def set_rbac_security_clearances(self, clearances: Dict[str, int]) -> None:
        """Set security clearances for RBAC enforcement."""
        if self._rbac_lever is not None:
            self._rbac_lever.set_security_clearances(clearances)

__init__(config=None, seed=None, council=None)

Initialize the governance engine.

Parameters:

Name Type Description Default
config Optional[GovernanceConfig]

Governance configuration (uses defaults if None)

None
seed Optional[int]

Random seed for reproducible audits

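None
council Optional[Any]

External council instance passed to CouncilGovernanceLever; the lever is only registered when this is not None and config.council_lever_enabled is set

None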
Source code in swarm/governance/engine.py
def __init__(
    self,
    config: Optional[GovernanceConfig] = None,
    seed: Optional[int] = None,
    council: Optional[Any] = None,
):
    """
    Initialize the governance engine.

    Builds the ordered lever list from the configuration, freezes it into
    a tuple, caches direct references to selected levers, and resets the
    adaptive-gating state.

    Args:
        config: Governance configuration (uses defaults if None)
        seed: Random seed for reproducible audits; also forwarded to the
            security, moderator and council levers
        council: External council instance handed to CouncilGovernanceLever.
            The lever is only registered when this is not None and
            ``config.council_lever_enabled`` is set.
    """
    self.config = GovernanceConfig() if config is None else config
    # Pydantic auto-validates

    # Levers registered unconditionally; list order is preserved in
    # self._levers below.
    levers: List[GovernanceLever] = [
        RefineryLever(self.config),
        TransactionTaxLever(self.config),
        ReputationDecayLever(self.config),
        StakingLever(self.config),
        CircuitBreakerLever(self.config),
        RandomAuditLever(self.config, seed=seed),
        CollusionPenaltyLever(self.config),
        SecurityLever(self.config, seed=seed),
        PairCapLever(self.config),
        PageCooldownLever(self.config),
        DailyPointCapLever(self.config),
        NoSelfFixLever(self.config),
    ]
    # Moltbook levers are opt-in via their config flags.
    if self.config.moltbook_rate_limit_enabled:
        levers.append(MoltbookRateLimitLever(self.config))
    if self.config.moltbook_challenge_enabled:
        levers.append(ChallengeVerificationLever(self.config))
    # Memory tier levers
    levers.append(PromotionGateLever(self.config))
    levers.append(WriteRateLimitLever(self.config))
    levers.append(CrossVerificationLever(self.config))
    levers.append(ProvenanceLever(self.config))
    # Variance-aware levers (scaffold registration; behavior in #35)
    if self.config.self_ensemble_enabled:
        levers.append(SelfEnsembleLever(self.config))
    if self.config.incoherence_breaker_enabled:
        levers.append(IncoherenceCircuitBreakerLever(self.config))
    if self.config.decomposition_enabled:
        levers.append(DecompositionLever(self.config))
    if self.config.incoherence_friction_enabled:
        levers.append(IncoherenceFrictionLever(self.config))

    # VAE paper levers
    levers.append(TransparencyLever(self.config))
    levers.append(ModeratorLever(self.config, seed=seed))
    levers.append(SybilDetectionLever(self.config))

    # Council governance lever (requires external council instance)
    if self.config.council_lever_enabled and council is not None:
        levers.append(CouncilGovernanceLever(self.config, council=council, seed=seed))

    # Diversity as Defense lever
    levers.append(DiversityDefenseLever(self.config))

    # Loop detector lever
    if self.config.loop_detector_enabled:
        levers.append(LoopDetectorLever(self.config))

    # Self-modification governance lever
    if self.config.self_modification_enabled:
        levers.append(SelfModificationLever(self.config))

    # RBAC lever
    if self.config.rbac_enabled:
        levers.append(RBACLever(self.config))

    # Stored as a tuple so that external code cannot mutate in place.
    self._levers: tuple[GovernanceLever, ...] = tuple(levers)

    # Keep references to specific levers for direct access
    self._staking_lever: Optional[StakingLever] = None
    self._circuit_breaker_lever: Optional[CircuitBreakerLever] = None
    self._collusion_lever: Optional[CollusionPenaltyLever] = None
    self._security_lever: Optional[SecurityLever] = None
    # Built here but never added to `levers`, so it is not in self._levers.
    self._vote_normalization_lever = VoteNormalizationLever(self.config)
    self._moltbook_rate_limit_lever: Optional[MoltbookRateLimitLever] = None
    self._moltbook_challenge_lever: Optional[ChallengeVerificationLever] = None
    self._diversity_lever: Optional[DiversityDefenseLever] = None
    self._loop_detector_lever: Optional[LoopDetectorLever] = None
    self._self_modification_lever: Optional[SelfModificationLever] = None
    self._rbac_lever: Optional[RBACLever] = None

    # Populate the direct references; for each lever only the first
    # matching branch applies.
    # NOTE(review): if any lever class in this chain subclasses another one
    # listed earlier, it would be captured by the earlier branch -- confirm
    # against the lever class hierarchy.
    for lever in self._levers:
        if isinstance(lever, StakingLever):
            self._staking_lever = lever
        elif isinstance(lever, CircuitBreakerLever):
            self._circuit_breaker_lever = lever
        elif isinstance(lever, CollusionPenaltyLever):
            self._collusion_lever = lever
        elif isinstance(lever, SecurityLever):
            self._security_lever = lever
        elif isinstance(lever, MoltbookRateLimitLever):
            self._moltbook_rate_limit_lever = lever
        elif isinstance(lever, ChallengeVerificationLever):
            self._moltbook_challenge_lever = lever
        elif isinstance(lever, DiversityDefenseLever):
            self._diversity_lever = lever
        elif isinstance(lever, LoopDetectorLever):
            self._loop_detector_lever = lever
        elif isinstance(lever, SelfModificationLever):
            self._self_modification_lever = lever
        elif isinstance(lever, RBACLever):
            self._rbac_lever = lever

    # Adaptive governance state
    # No forecaster attached yet; variance levers start out active.
    self._incoherence_forecaster: Optional[Any] = None
    self._adaptive_risk: float = 0.0
    self._adaptive_variance_active: bool = True

apply_epoch_start(state, epoch)

Apply all epoch-start governance hooks.

Parameters:

Name Type Description Default
state EnvState

Current environment state

required
epoch int

The epoch number starting

required

Returns:

Type Description
GovernanceEffect

Aggregated governance effect

Source code in swarm/governance/engine.py
def apply_epoch_start(
    self,
    state: EnvState,
    epoch: int,
) -> GovernanceEffect:
    """
    Run the epoch-start hook of every active lever and merge the results.

    Args:
        state: Current environment state
        epoch: The epoch number starting

    Returns:
        Aggregated governance effect built from the non-empty lever effects
    """
    # An effect without a lever name is treated as "nothing happened".
    reported = [
        outcome
        for lever in self._iter_active_levers()
        if (outcome := lever.on_epoch_start(state, epoch)).lever_name
    ]
    return GovernanceEffect.from_lever_effects(reported)

apply_interaction(interaction, state)

Apply all per-interaction governance hooks.

Parameters:

Name Type Description Default
interaction SoftInteraction

The completed interaction

required
state EnvState

Current environment state

required

Returns:

Type Description
GovernanceEffect

Aggregated governance effect

Source code in swarm/governance/engine.py
def apply_interaction(
    self,
    interaction: SoftInteraction,
    state: EnvState,
) -> GovernanceEffect:
    """
    Run the per-interaction hook of every active lever and merge the results.

    Args:
        interaction: The completed interaction
        state: Current environment state

    Returns:
        Aggregated governance effect built from the non-empty lever effects
    """
    # An effect without a lever name is treated as "nothing happened".
    reported = [
        outcome
        for lever in self._iter_active_levers()
        if (outcome := lever.on_interaction(interaction, state)).lever_name
    ]
    return GovernanceEffect.from_lever_effects(reported)

apply_step(state, step)

Apply step-level governance hooks.

Parameters:

Name Type Description Default
state EnvState

Current environment state

required
step int

Current step index within epoch

required

Returns:

Type Description
GovernanceEffect

Aggregated governance effect

Source code in swarm/governance/engine.py
def apply_step(
    self,
    state: EnvState,
    step: int,
) -> GovernanceEffect:
    """
    Run the step-level hook of every active lever and merge the results.

    Args:
        state: Current environment state
        step: Current step index within epoch

    Returns:
        Aggregated governance effect built from the non-empty lever effects
    """
    # An effect without a lever name is treated as "nothing happened".
    reported = [
        outcome
        for lever in self._iter_active_levers()
        if (outcome := lever.on_step(state, step)).lever_name
    ]
    return GovernanceEffect.from_lever_effects(reported)

can_agent_act(agent_id, state)

Check if agent is allowed to act (all levers must approve).

Parameters:

Name Type Description Default
agent_id str

Agent attempting to act

required
state EnvState

Current environment state

required

Returns:

Type Description
bool

True if all levers allow the agent to act

Source code in swarm/governance/engine.py
def can_agent_act(
    self,
    agent_id: str,
    state: EnvState,
) -> bool:
    """
    Decide whether an agent may act; a single lever veto is enough to deny.

    Args:
        agent_id: Agent attempting to act
        state: Current environment state

    Returns:
        True if every active lever allows the agent to act
    """
    # all() stops at the first veto, so later levers are not consulted.
    return all(
        lever.can_agent_act(agent_id, state)
        for lever in self._iter_active_levers()
    )

clear_collusion_history()

Clear collusion detection interaction history.

Source code in swarm/governance/engine.py
def clear_collusion_history(self) -> None:
    """Wipe the collusion lever's interaction history; no-op without a lever."""
    lever = self._collusion_lever
    if lever is None:
        return
    lever.clear_history()

clear_security_history()

Clear security analysis history and state.

Source code in swarm/governance/engine.py
def clear_security_history(self) -> None:
    """Wipe the security lever's history and state; no-op without a lever."""
    lever = self._security_lever
    if lever is None:
        return
    lever.clear_history()

compute_vote_weight(agent_id, vote_count)

Compute normalized vote weight for an agent.

Parameters:

Name Type Description Default
agent_id str

The voting agent

required
vote_count int

Number of votes cast this epoch

required

Returns:

Type Description
float

Vote weight in (0, 1]

Source code in swarm/governance/engine.py
def compute_vote_weight(
    self,
    agent_id: str,
    vote_count: int,
) -> float:
    """
    Delegate to the vote-normalization lever and coerce the result to float.

    Args:
        agent_id: The voting agent
        vote_count: Number of votes cast this epoch

    Returns:
        Vote weight in (0, 1]
    """
    normalizer = self._vote_normalization_lever
    raw_weight = normalizer.compute_vote_weight(agent_id, vote_count)
    return float(raw_weight)

get_active_lever_names()

Return the names of the currently active levers (after adaptive gating) in execution order.

Source code in swarm/governance/engine.py
def get_active_lever_names(self) -> List[str]:
    """Return names of the currently active levers in execution order.

    Only the levers yielded by ``_iter_active_levers`` are listed, so the
    result can be shorter than the full set of registered levers.
    """
    return [lever.name for lever in self._iter_active_levers()]

get_adaptive_status()

Return current adaptive gating state.

Source code in swarm/governance/engine.py
def get_adaptive_status(self) -> Dict[str, Any]:
    """Snapshot the adaptive gating flags, last predicted risk and threshold."""
    cfg = self.config
    status: Dict[str, Any] = {}
    status["adaptive_enabled"] = cfg.adaptive_governance_enabled
    status["variance_levers_active"] = self._adaptive_variance_active
    status["predicted_risk"] = self._adaptive_risk
    status["threshold"] = cfg.adaptive_incoherence_threshold
    return status

get_circuit_breaker_status(agent_id)

Get circuit breaker status for an agent.

Source code in swarm/governance/engine.py
def get_circuit_breaker_status(self, agent_id: str) -> Dict[str, Any]:
    """Return a dict copy of the agent's freeze status, or {} without a lever."""
    breaker = self._circuit_breaker_lever
    return {} if breaker is None else dict(breaker.get_freeze_status(agent_id))

get_collusion_report()

Get the latest collusion detection report.

Source code in swarm/governance/engine.py
def get_collusion_report(self):
    """Return the collusion lever's latest report, or None without a lever."""
    lever = self._collusion_lever
    return None if lever is None else lever.get_report()

get_diversity_lever()

Return the diversity defense lever if registered.

Source code in swarm/governance/engine.py
def get_diversity_lever(self) -> Optional[DiversityDefenseLever]:
    """Expose the cached diversity defense lever (None when absent)."""
    diversity_lever = self._diversity_lever
    return diversity_lever

get_diversity_metrics()

Return the latest diversity metrics snapshot.

Source code in swarm/governance/engine.py
def get_diversity_metrics(self) -> Optional[Any]:
    """Return the diversity lever's latest metrics, or None without a lever."""
    lever = self._diversity_lever
    return None if lever is None else lever.get_metrics()

get_loop_detector_lever()

Return the loop detector lever if registered.

Source code in swarm/governance/engine.py
def get_loop_detector_lever(self) -> Optional[LoopDetectorLever]:
    """Expose the cached loop detector lever (None when absent)."""
    loop_lever = self._loop_detector_lever
    return loop_lever

get_moltbook_challenge_lever()

Return Moltbook challenge lever if registered.

Source code in swarm/governance/engine.py
def get_moltbook_challenge_lever(self) -> Optional[ChallengeVerificationLever]:
    """Expose the cached Moltbook challenge lever (None when absent)."""
    challenge_lever = self._moltbook_challenge_lever
    return challenge_lever

get_moltbook_rate_limit_lever()

Return Moltbook rate limit lever if registered.

Source code in swarm/governance/engine.py
def get_moltbook_rate_limit_lever(self) -> Optional[MoltbookRateLimitLever]:
    """Expose the cached Moltbook rate limit lever (None when absent)."""
    rate_limit_lever = self._moltbook_rate_limit_lever
    return rate_limit_lever

get_quarantined_agents()

Get set of quarantined agents (immutable copy).

Source code in swarm/governance/engine.py
def get_quarantined_agents(self) -> frozenset[str]:
    """Return a frozenset copy of the quarantined agents (empty without a lever)."""
    lever = self._security_lever
    return frozenset() if lever is None else frozenset(lever.get_quarantined_agents())

get_rbac_lever()

Return the RBAC lever if registered.

Source code in swarm/governance/engine.py
def get_rbac_lever(self) -> Optional[RBACLever]:
    """Expose the cached RBAC lever (None when absent)."""
    rbac_lever = self._rbac_lever
    return rbac_lever

get_registered_lever_names()

Return all registered lever names (ignores adaptive gating).

Source code in swarm/governance/engine.py
def get_registered_lever_names(self) -> List[str]:
    """List the ``name`` of every lever held in ``self._levers``.

    Adaptive gating is not consulted: every stored lever is reported, in
    the order it appears in ``self._levers``.

    Returns:
        A new list with one name per lever.
    """
    return [registered.name for registered in self._levers]

get_security_containment_actions()

Get history of security containment actions.

Source code in swarm/governance/engine.py
def get_security_containment_actions(self) -> List[Dict[str, Any]]:
    """Return the recorded security containment actions.

    Returns:
        A new list copied from the security lever's
        ``get_containment_actions()`` result, or an empty list when no
        security lever is set on the engine.
    """
    security = self._security_lever
    if security is not None:
        return list(security.get_containment_actions())
    return []

get_security_report()

Get the latest security analysis report.

Source code in swarm/governance/engine.py
def get_security_report(self):
    """Fetch the most recent security analysis report.

    Returns:
        The value produced by the security lever's ``get_report()``, or
        ``None`` when no security lever is set on the engine.
    """
    security = self._security_lever
    return None if security is None else security.get_report()

get_self_modification_lever()

Return the self-modification lever if registered.

Source code in swarm/governance/engine.py
def get_self_modification_lever(self) -> Optional[SelfModificationLever]:
    """Return the self-modification lever if registered.

    Returns:
        The engine's ``_self_modification_lever`` attribute; the return
        annotation allows this to be ``None`` (no lever registered).
    """
    return self._self_modification_lever

release_from_quarantine(agent_id)

Release an agent from security quarantine.

Source code in swarm/governance/engine.py
def release_from_quarantine(self, agent_id: str) -> bool:
    """Ask the security lever to release an agent from quarantine.

    Args:
        agent_id: Identifier of the agent to release.

    Returns:
        The security lever's ``release_from_quarantine`` result coerced to
        ``bool``; ``False`` when no security lever is set on the engine.
    """
    security = self._security_lever
    return security is not None and bool(
        security.release_from_quarantine(agent_id)
    )

reset_circuit_breaker(agent_id)

Reset circuit breaker tracking for an agent.

Source code in swarm/governance/engine.py
def reset_circuit_breaker(self, agent_id: str) -> None:
    """Clear the circuit breaker lever's tracking for one agent.

    Does nothing when no circuit breaker lever is set on the engine.

    Args:
        agent_id: Identifier forwarded to the lever's ``reset_tracker``.
    """
    breaker = self._circuit_breaker_lever
    if breaker is None:
        return
    breaker.reset_tracker(agent_id)

set_collusion_agent_ids(agent_ids)

Set agent IDs for collusion detection.

Source code in swarm/governance/engine.py
def set_collusion_agent_ids(self, agent_ids: List[str]) -> None:
    """Hand the agent IDs used for collusion detection to the collusion lever.

    Does nothing when no collusion lever is set on the engine.

    Args:
        agent_ids: IDs forwarded unchanged to the lever's ``set_agent_ids``.
    """
    collusion = self._collusion_lever
    if collusion is None:
        return
    collusion.set_agent_ids(agent_ids)

set_incoherence_forecaster(forecaster)

Attach a trained forecaster used for adaptive gating.

Source code in swarm/governance/engine.py
def set_incoherence_forecaster(self, forecaster: Any) -> None:
    """Attach a trained forecaster used for adaptive gating.

    Args:
        forecaster: Object stored as ``_incoherence_forecaster``. No
            validation is performed here; any previously attached
            forecaster is replaced.
    """
    self._incoherence_forecaster = forecaster

set_rbac_agent_roles(agent_roles)

Set agent roles for RBAC enforcement.

Source code in swarm/governance/engine.py
def set_rbac_agent_roles(self, agent_roles: Dict[str, List[str]]) -> None:
    """Hand the agent-to-roles mapping to the RBAC lever.

    Does nothing when no RBAC lever is set on the engine.

    Args:
        agent_roles: Mapping forwarded unchanged to the lever's
            ``set_agent_roles``.
    """
    rbac = self._rbac_lever
    if rbac is None:
        return
    rbac.set_agent_roles(agent_roles)

set_rbac_security_clearances(clearances)

Set security clearances for RBAC enforcement.

Source code in swarm/governance/engine.py
def set_rbac_security_clearances(self, clearances: Dict[str, int]) -> None:
    """Hand the security clearance mapping to the RBAC lever.

    Does nothing when no RBAC lever is set on the engine.

    Args:
        clearances: Mapping forwarded unchanged to the lever's
            ``set_security_clearances``.
    """
    rbac = self._rbac_lever
    if rbac is None:
        return
    rbac.set_security_clearances(clearances)

set_security_agent_ids(agent_ids)

Set agent IDs for security analysis.

Source code in swarm/governance/engine.py
def set_security_agent_ids(self, agent_ids: List[str]) -> None:
    """Hand the agent IDs used for security analysis to the security lever.

    Does nothing when no security lever is set on the engine.

    Args:
        agent_ids: IDs forwarded unchanged to the lever's ``set_agent_ids``.
    """
    security = self._security_lever
    if security is None:
        return
    security.set_agent_ids(agent_ids)

set_security_trust_scores(trust_scores)

Set trust scores for security analysis (laundering detection).

Source code in swarm/governance/engine.py
def set_security_trust_scores(self, trust_scores: Dict[str, float]) -> None:
    """Hand trust scores for security analysis (laundering detection) to the lever.

    Does nothing when no security lever is set on the engine.

    Args:
        trust_scores: Mapping forwarded unchanged to the security lever's
            ``set_agent_trust_scores``.
    """
    security = self._security_lever
    if security is None:
        return
    security.set_agent_trust_scores(trust_scores)

slash_agent_stake(agent_id, state, reason='violation')

Slash an agent's stake.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `agent_id` | `str` | Agent to slash | required |
| `state` | `EnvState` | Current environment state | required |
| `reason` | `str` | Reason for slashing | `'violation'` |

Returns:

| Type | Description |
| --- | --- |
| `GovernanceEffect` | Governance effect with resource delta |

Source code in swarm/governance/engine.py
def slash_agent_stake(
    self,
    agent_id: str,
    state: EnvState,
    reason: str = "violation",
) -> GovernanceEffect:
    """
    Slash the stake of one agent through the staking lever.

    Args:
        agent_id: Agent whose stake is slashed
        state: Current environment state, passed through to the lever
        reason: Reason recorded for the slash

    Returns:
        The lever's slash effect wrapped via
        ``GovernanceEffect.from_lever_effects``; a default-constructed
        ``GovernanceEffect`` when no staking lever is set on the engine.
    """
    staking = self._staking_lever
    if staking is not None:
        slash_effect = staking.slash_stake(agent_id, state, reason)
        return GovernanceEffect.from_lever_effects([slash_effect])
    # No staking lever: nothing to slash, so report a default effect.
    return GovernanceEffect()

update_adaptive_mode(features)

Update adaptive gating state from current feature vector.

Returns:

| Type | Description |
| --- | --- |
| `float` | Predicted incoherence risk in [0, 1]. |

Source code in swarm/governance/engine.py
def update_adaptive_mode(self, features: Dict[str, float]) -> float:
    """
    Refresh the adaptive gating flag and risk estimate from ``features``.

    When adaptive governance is disabled in the config, or no forecaster
    has been attached, ``_adaptive_variance_active`` is set to ``True`` and
    the risk to 0.0 (fail open, so missing model wiring cannot cause an
    accidental disablement). Otherwise the forecaster's
    ``predict_proba(features)`` becomes the stored risk, and the flag is
    ``True`` only when that risk is at least
    ``config.adaptive_incoherence_threshold``.

    Args:
        features: Current feature vector handed to the forecaster.

    Returns:
        The risk stored in ``_adaptive_risk``: 0.0 on the fail-open paths,
        otherwise the forecaster's prediction as a float (expected to lie
        in [0, 1]; it is not clamped here).
    """
    forecaster_ready = (
        self.config.adaptive_governance_enabled
        and self._incoherence_forecaster is not None
    )
    if forecaster_ready:
        predicted = float(self._incoherence_forecaster.predict_proba(features))
        self._adaptive_risk = predicted
        threshold_reached = predicted >= self.config.adaptive_incoherence_threshold
        self._adaptive_variance_active = threshold_reached
        return predicted

    # Disabled or unwired: keep the gate open and report zero risk.
    self._adaptive_variance_active = True
    self._adaptive_risk = 0.0
    return self._adaptive_risk

Usage

from swarm.governance import GovernanceEngine, GovernanceConfig

# Only the settings being tuned are passed; the remaining GovernanceConfig
# fields keep their defaults.
config = GovernanceConfig(
    transaction_tax=0.02,
    reputation_decay=0.1,
    circuit_breaker_threshold=0.3,
)

engine = GovernanceEngine(config)

# NOTE: `agent_id` and `interaction` below are placeholders that the
# surrounding simulation code is expected to provide.

# Check if agent can act
if engine.can_agent_act(agent_id):
    # Process action
    pass

# Record interaction
engine.on_interaction(interaction)

# Get governance costs
costs = engine.compute_costs(interaction)

GovernanceConfig

Configuration for all governance levers.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `transaction_tax` | `float` | `0.0` | Tax per interaction |
| `reputation_decay` | `float` | `0.0` | Decay rate per epoch |
| `initial_reputation` | `float` | `1.0` | Starting reputation |
| `circuit_breaker_threshold` | `float` | `1.0` | Toxicity threshold |
| `circuit_breaker_window` | `int` | `10` | Lookback window |
| `circuit_breaker_cooldown` | `int` | `5` | Freeze duration |
| `audit_probability` | `float` | `0.0` | Audit frequency |
| `audit_penalty` | `float` | `0.0` | Failed audit cost |
| `staking_requirement` | `float` | `0.0` | Minimum stake |
| `stake_slash_rate` | `float` | `0.0` | Slash fraction |
| `collusion_detection` | `bool` | `False` | Enable detection |
| `collusion_threshold` | `float` | `0.8` | Correlation threshold |

Security YAML Keys

Security detection can be configured via scenario YAML using the security_* governance keys below:

governance:
  security_enabled: true
  security_injection_threshold: 0.35
  security_manipulation_threshold: 0.6
  security_laundering_trust_gap: 0.3
  security_contagion_velocity: 2.0
  security_min_chain_length: 3
  security_min_interactions: 5
  security_penalty_threshold: 0.35
  security_quarantine_threshold: 0.7
  security_penalty_multiplier: 1.2
  security_realtime_penalty: true
  security_realtime_threshold: 0.6
  security_realtime_rate: 0.2
  security_clear_history_on_epoch: false

Individual Levers

TransactionTax

from swarm.governance.levers import TransactionTax

# `interaction` is a placeholder supplied by the surrounding code.
tax = TransactionTax(rate=0.02)
cost = tax.compute(interaction)

ReputationDecay

from swarm.governance.levers import ReputationDecay

# `current_rep` is a placeholder for the reputation value being decayed.
decay = ReputationDecay(rate=0.1)
new_rep = decay.apply(current_rep, epoch_delta=1)

CircuitBreaker

from swarm.governance.levers import CircuitBreaker

breaker = CircuitBreaker(
    threshold=0.3,
    window=10,
    cooldown=5,
)

# NOTE: the fragment below uses a bare `return`, so it is meant to sit
# inside a function body; `agent_id` is supplied by that function.

# Check status
if breaker.is_frozen(agent_id):
    return  # Agent cannot act

# Record interaction
breaker.record(agent_id, toxicity=0.4)

# Check if triggered
if breaker.should_freeze(agent_id):
    breaker.freeze(agent_id)

RandomAudit

from swarm.governance.levers import RandomAudit

audit = RandomAudit(probability=0.05, penalty=0.5)

# `interaction` and `apply_penalty` are placeholders: neither is imported
# or defined in this snippet, so the caller must provide them.
if audit.should_audit():
    result = audit.execute(interaction)
    if not result.passed:
        apply_penalty(interaction.initiator, audit.penalty)

Staking

from swarm.governance.levers import Staking

staking = Staking(
    requirement=10.0,
    slash_rate=0.1,
)

# NOTE: the fragment below uses a bare `return`, so it is meant to sit
# inside a function body; `agent_id` is supplied by that function.

# Check eligibility
if not staking.can_participate(agent_id):
    return  # Insufficient stake

# Slash on bad behavior
staking.slash(agent_id, reason="failed_audit")

Collusion Detection

from swarm.governance.collusion import CollusionDetector

detector = CollusionDetector(
    threshold=0.8,
    window=20,
)

# `agent_a`, `agent_b`, and `interaction` are placeholders supplied by the
# surrounding code.

# Record interactions
detector.record(agent_a, agent_b, interaction)

# Check for collusion
# Each item yielded by detect() is unpacked below as a (pair, score) tuple.
colluding_pairs = detector.detect()
for pair, score in colluding_pairs:
    print(f"Potential collusion: {pair} (score: {score:.2f})")

Hooks

Governance integrates with the orchestrator via hooks:

# `Orchestrator`, `config`, and `governance_engine` are not imported or
# defined in this snippet; the surrounding code is expected to provide them.
orchestrator = Orchestrator(
    config=config,
    governance=governance_engine,
)

# Hooks are called automatically:
# - on_epoch_start
# - on_interaction
# - on_epoch_end

Custom Levers

Create custom governance mechanisms:

from swarm.governance.levers import GovernanceLever

class CustomLever(GovernanceLever):
    """Example lever whose cost scales with a single tunable parameter.

    NOTE(review): the base-class initializer is not called and no ``name``
    attribute is set here; confirm ``GovernanceLever`` requires neither
    before copying this pattern.
    """

    def __init__(self, param: float) -> None:
        # Multiplier applied to ``interaction.p`` in ``compute_cost``.
        self.param = param

    def compute_cost(self, interaction) -> float:
        """Return ``interaction.p`` multiplied by ``param``."""
        # Custom cost computation
        return interaction.p * self.param

    def should_block(self, agent_id: str) -> bool:
        """Return ``False``: this example never blocks an agent."""
        # Custom blocking logic
        return False

See also