Source code for teatime.plugins.eth1.txpool

"""This module contains checks regarding a node's transaction pool."""

from teatime.plugins import Context, JSONRPCPlugin, NodeType
from teatime.reporting import Issue, Severity


class TxPoolContent(JSONRPCPlugin):
    """Try to fetch the transaction pool contents.

    Severity: Low

    For Geth targets the ``txpool_content`` RPC method is called, for Parity
    targets ``parity_pendingTransactions``. Once the call returns, an issue
    carrying the response as raw data is added to the report. Targets of any
    other node type are skipped without issuing a request.

    Parity/OpenEthereum: https://openethereum.github.io/wiki/JSONRPC-parity-module#parity_pendingtransactions
    Geth: https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content
    """

    INTRUSIVE = False

    def _check(self, context: Context) -> None:
        """Query the node-specific pool content method and report the result.

        :param context: Supplies the target, its node type and the report
            that receives the issue.
        """
        # Only the RPC method name differs between the supported node types,
        # so select it first and share the reporting code below.
        if context.node_type == NodeType.GETH:
            method = "txpool_content"
        elif context.node_type == NodeType.PARITY:
            method = "parity_pendingTransactions"
        else:
            return

        # NOTE(review): presumably get_rpc_json raises when the endpoint
        # rejects the call, so the issue is only recorded on success --
        # confirm against JSONRPCPlugin.
        payload = self.get_rpc_json(context.target, method=method)
        context.report.add_issue(
            Issue(
                title="TxPool Content",
                description=(
                    "Anyone can see the transaction pool contents "
                    "using the {} RPC call.".format(method)
                ),
                raw_data=payload,
                severity=Severity.LOW,
            )
        )
class GethTxPoolInspection(JSONRPCPlugin):
    """Try to inspect the transaction pool.

    Severity: Low

    Only Geth targets are checked. The ``txpool_inspect`` RPC method is
    called and, once it returns, an issue carrying the response as raw data
    is added to the report.

    Geth: https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_inspect
    """

    INTRUSIVE = False

    def _check(self, context: Context) -> None:
        """Call ``txpool_inspect`` on a Geth target and report the result.

        :param context: Supplies the target, its node type and the report
            that receives the issue.
        """
        if context.node_type == NodeType.GETH:
            inspection = self.get_rpc_json(
                context.target, method="txpool_inspect"
            )
            finding = Issue(
                title="TxPool Inspection",
                description=(
                    "Anyone can inspect the transaction pool "
                    "using the txpool_inspect RPC call."
                ),
                raw_data=inspection,
                severity=Severity.LOW,
            )
            context.report.add_issue(finding)
class GethTxPoolStatus(JSONRPCPlugin):
    """Try to fetch the transaction pool status.

    Severity: Low

    Only Geth targets are checked. The ``txpool_status`` RPC method is
    called and, once it returns, an issue carrying the response as raw data
    is added to the report.

    Geth: https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_status
    """

    INTRUSIVE = False

    def _check(self, context: Context) -> None:
        """Call ``txpool_status`` on a Geth target and report the result.

        :param context: Supplies the target, its node type and the report
            that receives the issue.
        """
        if context.node_type == NodeType.GETH:
            pool_status = self.get_rpc_json(
                context.target, method="txpool_status"
            )
            finding = Issue(
                title="TxPool Status",
                description=(
                    "Anyone can see the transaction pool status "
                    "using the txpool_status RPC call."
                ),
                raw_data=pool_status,
                severity=Severity.LOW,
            )
            context.report.add_issue(finding)
class ParityTxPoolStatistics(JSONRPCPlugin):
    """Try to fetch the transaction pool statistics.

    Severity: Low

    Only Parity targets are checked. The ``parity_pendingTransactionsStats``
    RPC method is called and, once it returns, an issue carrying the
    response as raw data is added to the report.

    Parity: https://openethereum.github.io/wiki/JSONRPC-parity-module#parity_pendingtransactionsstats
    """

    INTRUSIVE = False

    def _check(self, context: Context) -> None:
        """Call ``parity_pendingTransactionsStats`` and report the result.

        :param context: Supplies the target, its node type and the report
            that receives the issue.
        """
        if context.node_type == NodeType.PARITY:
            statistics = self.get_rpc_json(
                context.target, method="parity_pendingTransactionsStats"
            )
            finding = Issue(
                title="TxPool Statistics",
                description=(
                    "Anyone can see the transaction pool statistics "
                    "using the parity_pendingTransactionsStats RPC call."
                ),
                raw_data=statistics,
                severity=Severity.LOW,
            )
            context.report.add_issue(finding)