# Source code for teatime.plugins.ipfs.add

"""This module contains plugins regarding file uploads to the node."""

import io
import tarfile

from teatime import Context, Issue, NodeType, Severity
from teatime.plugins.base import IPFSRPCPlugin

class OpenUploadAdd(IPFSRPCPlugin):
    """Detect where it's possible to upload a file using the /add endpoint.

    Severity: High

    Endpoint: /api/v0/add

    An open upload functionality can enable an attacker to upload a lot of
    random data until storage space is exhausted, thus performing a denial of
    service attack against future uploads.

    :param file_name: Name under which the probe file is uploaded.
    :param file_content: Text of the probe file; it is sent UTF-8 encoded.
    """

    # The check actually writes a file to the target node.
    INTRUSIVE = True

    def __init__(
        self, file_name: str = ".teatime", file_content: str = "teatime test file"
    ):
        self.file_name = file_name
        self.file_content = file_content

    def _check(self, context: Context) -> None:
        """Upload a small probe file and report an issue for the node.

        Does nothing for targets that are not IPFS nodes.

        :param context: Scan context providing the node type, the target and
            the report that issues are added to.
        """
        if context.node_type != NodeType.IPFS:
            return

        # NOTE(review): the first argument was lost in the garbled source and
        # is reconstructed as ``target=context.target`` -- confirm against the
        # signature of ``IPFSRPCPlugin.get_rpc_json``.
        payload = self.get_rpc_json(
            target=context.target,
            route="/api/v0/add",
            files={self.file_name: self.file_content.encode("utf-8")},
        )

        # NOTE(review): the issue is added unconditionally once the call
        # returns; presumably ``get_rpc_json`` raises when the upload is
        # rejected -- confirm. ``context.report.add_issue`` is likewise
        # reconstructed from the unmatched closing parenthesis in the source.
        context.report.add_issue(
            Issue(
                title="Anyone can upload data to the node",
                description=(
                    "Anyone is able to upload files to the node. An attacker can use this to "
                    "upload large amounts of data and thus prevent the node from accepting "
                    "further uploads, performing a Denial of Service (DoS) attack."
                ),
                raw_data=payload,
                severity=Severity.HIGH,
            )
        )
class OpenUploadTarAdd(IPFSRPCPlugin):
    """Detect where it's possible to upload a file using the /tar/add endpoint.

    Severity: High

    Endpoint: /api/v0/tar/add

    An open upload functionality can enable an attacker to upload a lot of
    random data until storage space is exhausted, thus performing a denial of
    service attack against future uploads.

    :param file_name: Name of the archive member and of the uploaded file.
    :param file_content: Text stored in the archive member, UTF-8 encoded.
    """

    # The check actually writes a file to the target node.
    INTRUSIVE = True

    def __init__(
        self, file_name: str = ".teatime", file_content: str = "teatime test file"
    ):
        self.file_name = file_name
        self.file_content = file_content

    def _check(self, context: Context) -> None:
        """Upload a small gzip-compressed tar archive and report an issue.

        Does nothing for targets that are not IPFS nodes.

        :param context: Scan context providing the node type, the target and
            the report that issues are added to.
        """
        if context.node_type != NodeType.IPFS:
            return

        # Encode once so the declared member size and the written data agree.
        # Using ``len(self.file_content)`` would count characters, which is
        # smaller than the byte length for non-ASCII text and truncates the
        # archived member.
        encoded = self.file_content.encode("utf-8")

        fh = io.BytesIO()
        with tarfile.open(fileobj=fh, mode="w:gz") as tar:
            info = tarfile.TarInfo(self.file_name)
            info.size = len(encoded)
            tar.addfile(info, io.BytesIO(encoded))

        # Writing leaves the cursor at the end of the buffer; rewind so that a
        # consumer calling ``read()`` receives the archive instead of nothing.
        fh.seek(0)

        # NOTE(review): the first argument was lost in the garbled source and
        # is reconstructed as ``target=context.target`` -- confirm against the
        # signature of ``IPFSRPCPlugin.get_rpc_json``.
        payload = self.get_rpc_json(
            target=context.target,
            route="/api/v0/tar/add",
            files={self.file_name: fh},
        )

        # NOTE(review): the issue is added unconditionally once the call
        # returns; presumably ``get_rpc_json`` raises when the upload is
        # rejected -- confirm. ``context.report.add_issue`` is likewise
        # reconstructed from the unmatched closing parenthesis in the source.
        context.report.add_issue(
            Issue(
                title="Anyone can upload compressed data to the node",
                description=(
                    "Anyone is able to upload files to the node. An attacker can use this to "
                    "upload large amounts of data and thus prevent the node from accepting "
                    "further uploads, performing a Denial of Service (DoS) attack."
                ),
                raw_data=payload,
                severity=Severity.HIGH,
            )
        )