Skip to content

API Reference

File Security Module

A comprehensive file security system for validating uploads and preventing attacks.

BaseValidator

Bases: ABC

Abstract base class for file security validators.

Attributes:

Name Type Description
config

File security configuration parameters.

Source code in safeuploads/validators/base.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class BaseValidator(ABC):
    """
    Common parent for every file security validator.

    Concrete validators inherit the stored configuration and must
    provide their own ``validate`` implementation.

    Attributes:
        config: File security configuration parameters.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Keep a reference to the configuration on the instance.

        Args:
            config: File security settings to apply.
        """
        self.config = config

    @abstractmethod
    def validate(self, *args, **kwargs) -> Any:
        """
        Run the validator's check; subclasses define the logic.

        The base implementation does nothing and returns ``None``.

        Args:
            *args: Positional arguments for concrete validator.
            **kwargs: Keyword arguments for concrete validator.

        Returns:
            Validated result defined by subclass.
        """

__init__

__init__(config)

Initialize validator with configuration.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security settings to apply.

required
Source code in safeuploads/validators/base.py
22
23
24
25
26
27
28
29
def __init__(self, config: FileSecurityConfig):
    """
    Keep a reference to the configuration on the instance.

    Args:
        config: File security settings to apply.
    """
    self.config = config

validate abstractmethod

validate(*args, **kwargs)

Validate data using subclass-specific logic.

Parameters:

Name Type Description Default
*args

Positional arguments for concrete validator.

()
**kwargs

Keyword arguments for concrete validator.

{}

Returns:

Type Description
Any

Validated result defined by subclass.

Source code in safeuploads/validators/base.py
31
32
33
34
35
36
37
38
39
40
41
42
43
@abstractmethod
def validate(self, *args, **kwargs) -> Any:
    """
    Run the validator's check; subclasses define the logic.

    The base implementation does nothing and returns ``None``.

    Args:
        *args: Positional arguments for concrete validator.
        **kwargs: Keyword arguments for concrete validator.

    Returns:
        Validated result defined by subclass.
    """

CompoundExtensionCategory

Bases: Enum

Categorized compound file extensions that combine multiple suffixes.

Attributes:

Name Type Description
COMPRESSED_ARCHIVES

Multi-part archive formats.

JAVASCRIPT_VARIANTS

Specialized JavaScript files.

WEB_CONTENT

Minified static web assets.

Source code in safeuploads/enums.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class CompoundExtensionCategory(Enum):
    """
    Groups of compound file extensions (several suffixes combined).

    Every member's value is a ``set`` of extension strings, each one
    starting with a dot.

    Attributes:
        COMPRESSED_ARCHIVES: Multi-part archive formats.
        JAVASCRIPT_VARIANTS: Specialized JavaScript files.
        WEB_CONTENT: Minified static web assets.
    """

    # Archive extensions.
    # NOTE(review): ".tar.Z" is the only mixed-case entry; confirm that
    # callers do not lower-case file names before matching against it.
    COMPRESSED_ARCHIVES = {
        ".tar.gz",
        ".tar.bz2",
        ".tar.xz",
        ".tar.lz",
        ".tar.lzma",
        ".tar.Z",
        ".tgz",
        ".tbz2",
    }

    # JavaScript files carrying an extra qualifier before ".js"
    JAVASCRIPT_VARIANTS = {
        ".min.js",
        ".user.js",
        ".worker.js",
        ".backup.js",
    }

    # Minified web assets
    WEB_CONTENT = {
        ".min.css",
        ".min.html",
    }

CompressionSecurityError

Bases: FileValidationError

Compressed file security check failed.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename of compressed file.

None
error_code str | None

Optional error code (defaults to COMPRESSION_GENERIC).

None
Source code in safeuploads/exceptions.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class CompressionSecurityError(FileValidationError):
    """
    Raised when a compressed file fails a security check.

    This class adds no behaviour of its own; construction is handled
    entirely by FileValidationError.

    Args:
        message: Human-readable error description.
        filename: Optional filename of compressed file.
        error_code: Optional error code (defaults to
            COMPRESSION_GENERIC).

    Attributes:
        None beyond inherited FileValidationError attributes.
    """

CompressionSecurityValidator

Bases: BaseValidator

Validates ZIP uploads against zip bombs and compression attacks.

Attributes:

Name Type Description
config

Security configuration for validation limits.

Source code in safeuploads/validators/compression_validator.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
class CompressionSecurityValidator(BaseValidator):
    """
    Validates ZIP uploads against zip bombs and compression attacks.

    Attributes:
        config: Security configuration for validation limits.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Initialize the compression validator.

        Args:
            config: Security configuration with compression limits.
        """
        super().__init__(config)

    def validate_zip_compression_ratio(
        self, file_content: bytes, compressed_size: int
    ) -> None:
        """
        Validate ZIP archive against security limits.

        Only the archive's entry metadata is inspected; no entry is
        extracted. All limits are read from ``self.config.limits``.

        Args:
            file_content: Raw bytes of the ZIP archive.
            compressed_size: Size of the compressed archive in bytes.
                Used as the denominator of the overall compression
                ratio; that check is skipped when the value is not
                positive.

        Raises:
            ZipBombError: If a per-entry or the overall compression
                ratio exceeds the maximum allowed, the total
                uncompressed size is too large, the analysis exceeds
                the configured timeout, or a MemoryError occurs while
                processing the archive.
            CompressionSecurityError: If the ZIP is invalid, corrupted
                or too large to process, contains too many entries or
                nested archives, or an individual file is too large.
            FileProcessingError: If any other unexpected error occurs
                during validation.
        """
        try:
            # Suffixes (compared against the lower-cased entry name) that
            # mark an entry as a nested archive.
            nested_archive_suffixes = (
                ".zip",
                ".rar",
                ".7z",
                ".tar",
                ".gz",
                ".bz2",
            )

            # Track analysis metrics
            total_uncompressed_size = 0
            nested_archives = []
            max_compression_ratio = 0
            # Stays 0 in the debug log when compressed_size is not positive
            overall_compression_ratio = 0

            # Use a monotonic clock: wall-clock adjustments (NTP, DST, manual
            # changes) must not shorten or extend the analysis timeout.
            start_time = time.monotonic()

            with zipfile.ZipFile(io.BytesIO(file_content), "r") as zip_file:
                zip_entries = zip_file.infolist()
                file_count = len(zip_entries)
                limits = self.config.limits

                # Check for an excessive number of entries (the count
                # includes directory entries)
                if file_count > limits.max_zip_entries:
                    logger.warning(
                        "ZIP contains too many files",
                        extra={
                            "error_type": "zip_too_many_entries",
                            "file_count": file_count,
                            "max_entries": limits.max_zip_entries,
                        },
                    )
                    raise CompressionSecurityError(
                        message=f"ZIP contains too many files: {file_count}. "
                        f"Maximum allowed: {limits.max_zip_entries}",
                        error_code=ErrorCode.ZIP_TOO_MANY_ENTRIES,
                    )

                # Analyze each entry in the ZIP
                for entry in zip_entries:
                    # Check for timeout
                    elapsed = time.monotonic() - start_time
                    if elapsed > limits.zip_analysis_timeout:
                        logger.error(
                            "ZIP analysis timeout",
                            extra={
                                "error_type": "zip_analysis_timeout",
                                "timeout": limits.zip_analysis_timeout,
                            },
                        )
                        raise ZipBombError(
                            message=f"ZIP analysis timeout after {limits.zip_analysis_timeout}s - potential zip bomb",
                            compression_ratio=0,
                        )

                    # Skip directories
                    if entry.is_dir():
                        continue

                    # Track uncompressed size
                    uncompressed_size = entry.file_size
                    compressed_size_entry = entry.compress_size
                    total_uncompressed_size += uncompressed_size

                    # Check individual file compression ratio
                    if compressed_size_entry > 0:  # Avoid division by zero
                        compression_ratio = uncompressed_size / compressed_size_entry
                        max_compression_ratio = max(
                            max_compression_ratio, compression_ratio
                        )

                        if compression_ratio > limits.max_compression_ratio:
                            logger.error(
                                "Excessive compression ratio detected",
                                extra={
                                    "error_type": "compression_ratio_exceeded",
                                    "file_name": entry.filename,
                                    "compression_ratio": compression_ratio,
                                    "max_ratio": limits.max_compression_ratio,
                                },
                            )
                            # max_ratio is passed here as well so this error
                            # carries the same details as the overall-ratio
                            # error raised further down.
                            raise ZipBombError(
                                message=f"Excessive compression ratio detected: {compression_ratio:.1f}:1 for '{entry.filename}'. "
                                f"Maximum allowed: {limits.max_compression_ratio}:1",
                                compression_ratio=compression_ratio,
                                max_ratio=limits.max_compression_ratio,
                            )

                    # Record nested archives; they are rejected only after
                    # the size and ratio checks have run.
                    if entry.filename.lower().endswith(nested_archive_suffixes):
                        nested_archives.append(entry.filename)

                    # Check for excessively large individual files
                    if uncompressed_size > limits.max_individual_file_size:
                        logger.warning(
                            "Individual file too large",
                            extra={
                                "error_type": "file_too_large",
                                "file_name": entry.filename,
                                "size_mb": uncompressed_size // (1024 * 1024),
                                "max_size_mb": limits.max_individual_file_size
                                // (1024 * 1024),
                            },
                        )
                        raise CompressionSecurityError(
                            message=f"Individual file too large: '{entry.filename}' would expand to {uncompressed_size // (1024*1024)}MB. "
                            f"Maximum allowed: {limits.max_individual_file_size // (1024*1024)}MB",
                            error_code=ErrorCode.FILE_TOO_LARGE,
                        )

                # Check total uncompressed size
                if total_uncompressed_size > limits.max_uncompressed_size:
                    logger.warning(
                        "Total uncompressed size too large",
                        extra={
                            "error_type": "zip_too_large",
                            "total_size_mb": total_uncompressed_size // (1024 * 1024),
                            "max_size_mb": limits.max_uncompressed_size
                            // (1024 * 1024),
                        },
                    )
                    raise ZipBombError(
                        message=f"Total uncompressed size too large: {total_uncompressed_size // (1024*1024)}MB. "
                        f"Maximum allowed: {limits.max_uncompressed_size // (1024*1024)}MB",
                        compression_ratio=0,
                        uncompressed_size=total_uncompressed_size,
                        max_size=limits.max_uncompressed_size,
                    )

                # Check overall compression ratio
                if compressed_size > 0:
                    overall_compression_ratio = (
                        total_uncompressed_size / compressed_size
                    )
                    if overall_compression_ratio > limits.max_compression_ratio:
                        logger.error(
                            "Overall compression ratio too high",
                            extra={
                                "error_type": "compression_ratio_exceeded",
                                "overall_ratio": overall_compression_ratio,
                                "max_ratio": limits.max_compression_ratio,
                            },
                        )
                        raise ZipBombError(
                            message=f"Overall compression ratio too high: {overall_compression_ratio:.1f}:1. "
                            f"Maximum allowed: {limits.max_compression_ratio}:1",
                            compression_ratio=overall_compression_ratio,
                            max_ratio=limits.max_compression_ratio,
                        )

                # Reject nested archives (potential security risk)
                if nested_archives:
                    logger.warning(
                        "Nested archives detected",
                        extra={
                            "error_type": "zip_nested_archive",
                            "nested_archives": nested_archives,
                        },
                    )
                    raise CompressionSecurityError(
                        message=f"Nested archives are not allowed: {', '.join(nested_archives)}",
                        error_code=ErrorCode.ZIP_NESTED_ARCHIVE,
                    )

                # Log analysis results
                logger.debug(
                    "ZIP analysis: %s files, %sMB uncompressed, max ratio: %.1f:1, overall ratio: %.1f:1",
                    file_count,
                    total_uncompressed_size // (1024 * 1024),
                    max_compression_ratio,
                    overall_compression_ratio,
                )

        except zipfile.BadZipFile as err:
            logger.error("Invalid or corrupted ZIP file", exc_info=True)
            raise CompressionSecurityError(
                message="Invalid or corrupted ZIP file",
                error_code=ErrorCode.ZIP_CORRUPT,
            ) from err
        except zipfile.LargeZipFile as err:
            logger.error("ZIP file too large to process", exc_info=True)
            raise CompressionSecurityError(
                message="ZIP file too large to process safely",
                error_code=ErrorCode.ZIP_TOO_LARGE,
            ) from err
        except MemoryError as err:
            logger.error("ZIP requires excessive memory", exc_info=True)
            raise ZipBombError(
                message="ZIP file requires too much memory to process - potential zip bomb",
                compression_ratio=0,
            ) from err
        except (ZipBombError, CompressionSecurityError):
            # Re-raise our own exceptions untouched so the generic handler
            # below does not wrap them in FileProcessingError.
            raise
        except Exception as err:
            logger.error(
                "Unexpected error during ZIP compression validation",
                exc_info=True,
            )
            raise FileProcessingError(
                message=f"ZIP validation failed: {str(err)}",
            ) from err

    def validate(self, file_content: bytes, compressed_size: int) -> None:
        """
        Validate a ZIP file by delegating to
        ``validate_zip_compression_ratio``.

        Args:
            file_content: Raw bytes of the uploaded file.
            compressed_size: Size of the file after compression in bytes.

        Raises:
            ZipBombError: If compression ratio exceeds maximum allowed.
            CompressionSecurityError: If ZIP structure is invalid.
            FileProcessingError: If an unexpected error occurs.
        """
        return self.validate_zip_compression_ratio(file_content, compressed_size)

__init__

__init__(config)

Initialize the compression validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

Security configuration with compression limits.

required
Source code in safeuploads/validators/compression_validator.py
36
37
38
39
40
41
42
43
def __init__(self, config: FileSecurityConfig):
    """
    Set up the validator by handing the configuration to the parent
    class initializer.

    Args:
        config: Security configuration with compression limits.
    """
    super().__init__(config)

validate

validate(file_content, compressed_size)

Validate the compression ratio of a ZIP file.

Parameters:

Name Type Description Default
file_content bytes

Raw bytes of the uploaded file.

required
compressed_size int

Size of the file after compression in bytes.

required

Raises:

Type Description
ZipBombError

If compression ratio exceeds maximum allowed.

CompressionSecurityError

If ZIP structure is invalid.

FileProcessingError

If an unexpected error occurs.

Source code in safeuploads/validators/compression_validator.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def validate(self, file_content: bytes, compressed_size: int) -> None:
    """
    Check a ZIP upload by delegating to
    ``validate_zip_compression_ratio``.

    Args:
        file_content: Raw bytes of the uploaded file.
        compressed_size: Size of the file after compression in bytes.

    Raises:
        ZipBombError: If compression ratio exceeds maximum allowed.
        CompressionSecurityError: If ZIP structure is invalid.
        FileProcessingError: If unexpected error occurs.
    """
    run_zip_checks = self.validate_zip_compression_ratio
    return run_zip_checks(file_content, compressed_size)

validate_zip_compression_ratio

validate_zip_compression_ratio(file_content, compressed_size)

Validate ZIP archive against security limits.

Parameters:

Name Type Description Default
file_content bytes

Raw bytes of the ZIP archive.

required
compressed_size int

Size of the compressed archive in bytes.

required

Raises:

Type Description
ZipBombError

If compression ratio exceeds maximum allowed or total uncompressed size is too large.

CompressionSecurityError

If the ZIP structure is invalid, the archive contains too many entries, nested archives are detected, or an individual file is too large.

FileProcessingError

If an unexpected error occurs during validation, such as an I/O error. Memory errors are reported as ZipBombError rather than FileProcessingError.

Source code in safeuploads/validators/compression_validator.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def validate_zip_compression_ratio(
    self, file_content: bytes, compressed_size: int
) -> None:
    """
    Validate ZIP archive against security limits.

    Only the archive's entry metadata is inspected; no entry is
    extracted. All limits are read from ``self.config.limits``.

    Args:
        file_content: Raw bytes of the ZIP archive.
        compressed_size: Size of the compressed archive in bytes.
            Used as the denominator of the overall compression
            ratio; that check is skipped when the value is not
            positive.

    Raises:
        ZipBombError: If a per-entry or the overall compression
            ratio exceeds the maximum allowed, the total
            uncompressed size is too large, the analysis exceeds
            the configured timeout, or a MemoryError occurs while
            processing the archive.
        CompressionSecurityError: If the ZIP is invalid, corrupted
            or too large to process, contains too many entries or
            nested archives, or an individual file is too large.
        FileProcessingError: If any other unexpected error occurs
            during validation.
    """
    try:
        # Suffixes (compared against the lower-cased entry name) that
        # mark an entry as a nested archive.
        nested_archive_suffixes = (
            ".zip",
            ".rar",
            ".7z",
            ".tar",
            ".gz",
            ".bz2",
        )

        # Track analysis metrics
        total_uncompressed_size = 0
        nested_archives = []
        max_compression_ratio = 0
        # Stays 0 in the debug log when compressed_size is not positive
        overall_compression_ratio = 0

        # Use a monotonic clock: wall-clock adjustments (NTP, DST, manual
        # changes) must not shorten or extend the analysis timeout.
        start_time = time.monotonic()

        with zipfile.ZipFile(io.BytesIO(file_content), "r") as zip_file:
            zip_entries = zip_file.infolist()
            file_count = len(zip_entries)
            limits = self.config.limits

            # Check for an excessive number of entries (the count
            # includes directory entries)
            if file_count > limits.max_zip_entries:
                logger.warning(
                    "ZIP contains too many files",
                    extra={
                        "error_type": "zip_too_many_entries",
                        "file_count": file_count,
                        "max_entries": limits.max_zip_entries,
                    },
                )
                raise CompressionSecurityError(
                    message=f"ZIP contains too many files: {file_count}. "
                    f"Maximum allowed: {limits.max_zip_entries}",
                    error_code=ErrorCode.ZIP_TOO_MANY_ENTRIES,
                )

            # Analyze each entry in the ZIP
            for entry in zip_entries:
                # Check for timeout
                elapsed = time.monotonic() - start_time
                if elapsed > limits.zip_analysis_timeout:
                    logger.error(
                        "ZIP analysis timeout",
                        extra={
                            "error_type": "zip_analysis_timeout",
                            "timeout": limits.zip_analysis_timeout,
                        },
                    )
                    raise ZipBombError(
                        message=f"ZIP analysis timeout after {limits.zip_analysis_timeout}s - potential zip bomb",
                        compression_ratio=0,
                    )

                # Skip directories
                if entry.is_dir():
                    continue

                # Track uncompressed size
                uncompressed_size = entry.file_size
                compressed_size_entry = entry.compress_size
                total_uncompressed_size += uncompressed_size

                # Check individual file compression ratio
                if compressed_size_entry > 0:  # Avoid division by zero
                    compression_ratio = uncompressed_size / compressed_size_entry
                    max_compression_ratio = max(
                        max_compression_ratio, compression_ratio
                    )

                    if compression_ratio > limits.max_compression_ratio:
                        logger.error(
                            "Excessive compression ratio detected",
                            extra={
                                "error_type": "compression_ratio_exceeded",
                                "file_name": entry.filename,
                                "compression_ratio": compression_ratio,
                                "max_ratio": limits.max_compression_ratio,
                            },
                        )
                        # max_ratio is passed here as well so this error
                        # carries the same details as the overall-ratio
                        # error raised further down.
                        raise ZipBombError(
                            message=f"Excessive compression ratio detected: {compression_ratio:.1f}:1 for '{entry.filename}'. "
                            f"Maximum allowed: {limits.max_compression_ratio}:1",
                            compression_ratio=compression_ratio,
                            max_ratio=limits.max_compression_ratio,
                        )

                # Record nested archives; they are rejected only after
                # the size and ratio checks have run.
                if entry.filename.lower().endswith(nested_archive_suffixes):
                    nested_archives.append(entry.filename)

                # Check for excessively large individual files
                if uncompressed_size > limits.max_individual_file_size:
                    logger.warning(
                        "Individual file too large",
                        extra={
                            "error_type": "file_too_large",
                            "file_name": entry.filename,
                            "size_mb": uncompressed_size // (1024 * 1024),
                            "max_size_mb": limits.max_individual_file_size
                            // (1024 * 1024),
                        },
                    )
                    raise CompressionSecurityError(
                        message=f"Individual file too large: '{entry.filename}' would expand to {uncompressed_size // (1024*1024)}MB. "
                        f"Maximum allowed: {limits.max_individual_file_size // (1024*1024)}MB",
                        error_code=ErrorCode.FILE_TOO_LARGE,
                    )

            # Check total uncompressed size
            if total_uncompressed_size > limits.max_uncompressed_size:
                logger.warning(
                    "Total uncompressed size too large",
                    extra={
                        "error_type": "zip_too_large",
                        "total_size_mb": total_uncompressed_size // (1024 * 1024),
                        "max_size_mb": limits.max_uncompressed_size
                        // (1024 * 1024),
                    },
                )
                raise ZipBombError(
                    message=f"Total uncompressed size too large: {total_uncompressed_size // (1024*1024)}MB. "
                    f"Maximum allowed: {limits.max_uncompressed_size // (1024*1024)}MB",
                    compression_ratio=0,
                    uncompressed_size=total_uncompressed_size,
                    max_size=limits.max_uncompressed_size,
                )

            # Check overall compression ratio
            if compressed_size > 0:
                overall_compression_ratio = (
                    total_uncompressed_size / compressed_size
                )
                if overall_compression_ratio > limits.max_compression_ratio:
                    logger.error(
                        "Overall compression ratio too high",
                        extra={
                            "error_type": "compression_ratio_exceeded",
                            "overall_ratio": overall_compression_ratio,
                            "max_ratio": limits.max_compression_ratio,
                        },
                    )
                    raise ZipBombError(
                        message=f"Overall compression ratio too high: {overall_compression_ratio:.1f}:1. "
                        f"Maximum allowed: {limits.max_compression_ratio}:1",
                        compression_ratio=overall_compression_ratio,
                        max_ratio=limits.max_compression_ratio,
                    )

            # Reject nested archives (potential security risk)
            if nested_archives:
                logger.warning(
                    "Nested archives detected",
                    extra={
                        "error_type": "zip_nested_archive",
                        "nested_archives": nested_archives,
                    },
                )
                raise CompressionSecurityError(
                    message=f"Nested archives are not allowed: {', '.join(nested_archives)}",
                    error_code=ErrorCode.ZIP_NESTED_ARCHIVE,
                )

            # Log analysis results
            logger.debug(
                "ZIP analysis: %s files, %sMB uncompressed, max ratio: %.1f:1, overall ratio: %.1f:1",
                file_count,
                total_uncompressed_size // (1024 * 1024),
                max_compression_ratio,
                overall_compression_ratio,
            )

    except zipfile.BadZipFile as err:
        logger.error("Invalid or corrupted ZIP file", exc_info=True)
        raise CompressionSecurityError(
            message="Invalid or corrupted ZIP file",
            error_code=ErrorCode.ZIP_CORRUPT,
        ) from err
    except zipfile.LargeZipFile as err:
        logger.error("ZIP file too large to process", exc_info=True)
        raise CompressionSecurityError(
            message="ZIP file too large to process safely",
            error_code=ErrorCode.ZIP_TOO_LARGE,
        ) from err
    except MemoryError as err:
        logger.error("ZIP requires excessive memory", exc_info=True)
        raise ZipBombError(
            message="ZIP file requires too much memory to process - potential zip bomb",
            compression_ratio=0,
        ) from err
    except (ZipBombError, CompressionSecurityError):
        # Re-raise our own exceptions untouched so the generic handler
        # below does not wrap them in FileProcessingError.
        raise
    except Exception as err:
        logger.error(
            "Unexpected error during ZIP compression validation",
            exc_info=True,
        )
        raise FileProcessingError(
            message=f"ZIP validation failed: {str(err)}",
        ) from err

ConfigValidationError dataclass

Configuration validation issue with severity and recommendation.

Attributes:

Name Type Description
error_type str

Type of the validation error.

message str

Human-readable error message.

severity str

Error severity level ('error', 'warning', 'info').

component str

Component that failed validation.

recommendation str

Optional recommendation to fix the issue.

Source code in safeuploads/exceptions.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@dataclass
class ConfigValidationError:
    """
    One finding produced while validating the configuration.

    This is a plain data record; the visible definition neither
    validates its fields nor derives from an exception class.

    Attributes:
        error_type: Type of the validation error.
        message: Human-readable error message.
        severity: Error severity level ('error', 'warning', 'info').
        component: Component that failed validation.
        recommendation: Optional recommendation to fix the issue.
    """

    # Required fields; their order defines the positional constructor
    # arguments.
    error_type: str
    message: str
    # Documented values: 'error', 'warning', 'info'
    severity: str
    component: str
    # Optional; an empty string means no recommendation was supplied.
    recommendation: str = ""

DangerousExtensionCategory

Bases: Enum

File extension categories considered potentially dangerous for uploads.

Attributes:

Name Type Description
WINDOWS_EXECUTABLES

Traditional Windows executable formats.

SCRIPT_FILES

Script files that can execute code.

WEB_SCRIPTS

Web server and dynamic content scripts.

UNIX_EXECUTABLES

Unix/Linux executables and shell scripts.

MACOS_EXECUTABLES

macOS-specific executables and applications.

JAVA_EXECUTABLES

Java-related executables and bytecode.

MOBILE_APPS

Mobile application packages.

BROWSER_EXTENSIONS

Browser extensions and web applications.

PACKAGE_FORMATS

Modern package managers and distribution formats.

ARCHIVE_FORMATS

Archive formats that can contain executables.

VIRTUALIZATION_FORMATS

Virtualization and container formats.

OFFICE_MACROS

Office documents with macro capabilities.

SYSTEM_FILES

System shortcuts and configuration files.

SYSTEM_DRIVERS

System drivers and low-level components.

WINDOWS_THEMES

Windows theme and customization files.

HELP_FILES

Help and documentation files that can execute code.

Source code in safeuploads/enums.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class DangerousExtensionCategory(Enum):
    """
    Groups of file extensions treated as potentially dangerous uploads.

    Every member's value is a set of lowercase, dot-prefixed extension
    strings.

    Attributes:
        WINDOWS_EXECUTABLES: Traditional Windows executable formats.
        SCRIPT_FILES: Script files that can execute code.
        WEB_SCRIPTS: Web server and dynamic content scripts.
        UNIX_EXECUTABLES: Unix/Linux executables and shell scripts.
        MACOS_EXECUTABLES: macOS-specific executables and applications.
        JAVA_EXECUTABLES: Java-related executables and bytecode.
        MOBILE_APPS: Mobile application packages.
        BROWSER_EXTENSIONS: Browser extensions and web applications.
        PACKAGE_FORMATS: Modern package managers and distribution formats.
        ARCHIVE_FORMATS: Archive formats that can contain executables.
        VIRTUALIZATION_FORMATS: Virtualization and container formats.
        OFFICE_MACROS: Office documents with macro capabilities.
        SYSTEM_FILES: System shortcuts and configuration files.
        SYSTEM_DRIVERS: System drivers and low-level components.
        WINDOWS_THEMES: Windows theme and customization files.
        HELP_FILES: Help and documentation files that can execute code.
    """

    # --- Windows: traditional executables ---
    WINDOWS_EXECUTABLES = {
        ".exe", ".bat", ".cmd", ".com", ".pif", ".scr", ".msi", ".dll",
    }

    # --- Scripts that can execute code ---
    SCRIPT_FILES = {
        ".vbs", ".js", ".jse", ".wsf", ".wsh", ".hta",
        ".ps1", ".psm1", ".ps1xml", ".psc1", ".psd1", ".pssc",
        ".cdxml", ".xaml",
    }

    # --- Web server / dynamic content scripts ---
    WEB_SCRIPTS = {
        ".jsp", ".php", ".php3", ".php4", ".php5", ".phtml",
        ".asp", ".aspx", ".cer", ".cgi",
        ".pl", ".py", ".rb", ".go", ".lua",
    }

    # --- Unix/Linux executables and shell scripts ---
    UNIX_EXECUTABLES = {
        ".sh", ".bash", ".zsh", ".fish", ".csh", ".ksh", ".tcsh",
        ".run", ".bin", ".out", ".elf", ".so", ".a",
    }

    # --- macOS-specific executables and applications ---
    MACOS_EXECUTABLES = {
        ".app", ".dmg", ".pkg", ".mpkg", ".command", ".tool",
        ".workflow", ".action", ".dylib", ".bundle", ".framework",
    }

    # --- Java-related executables and bytecode ---
    JAVA_EXECUTABLES = {".jar", ".war", ".ear", ".jnlp", ".class"}

    # --- Mobile application packages ---
    MOBILE_APPS = {".apk", ".aab", ".ipa", ".appx", ".msix", ".xap"}

    # --- Browser extensions and web applications ---
    BROWSER_EXTENSIONS = {
        ".crx", ".xpi", ".safariextz", ".oex", ".nex", ".gadget",
    }

    # --- Package manager and distribution formats ---
    PACKAGE_FORMATS = {
        ".deb", ".rpm", ".snap", ".flatpak", ".appimage",
        ".vsix", ".nupkg", ".gem", ".whl", ".egg",
    }

    # --- Archives that can contain executables ---
    ARCHIVE_FORMATS = {
        ".7z", ".rar", ".cab", ".ace", ".arj", ".lzh", ".lha", ".zoo",
    }

    # --- Virtualization and container images ---
    VIRTUALIZATION_FORMATS = {
        ".ova", ".ovf", ".vmdk", ".vdi", ".vhd", ".vhdx", ".qcow2", ".docker",
    }

    # --- Office documents with macro capabilities ---
    OFFICE_MACROS = {
        ".docm", ".dotm", ".xlsm", ".xltm", ".xlam",
        ".pptm", ".potm", ".ppam", ".sldm",
    }

    # --- System shortcuts and configuration files ---
    SYSTEM_FILES = {
        ".url", ".website", ".webloc", ".desktop", ".lnk",
        ".application", ".manifest", ".deploy", ".msu",
        ".patch", ".diff", ".reg", ".inf",
    }

    # --- System drivers and low-level components ---
    SYSTEM_DRIVERS = {".sys", ".drv", ".ocx", ".cpl"}

    # --- Windows theme and customization files ---
    WINDOWS_THEMES = {
        ".theme", ".themepack", ".scf", ".shs", ".shb",
        ".sct", ".ws", ".job", ".msc",
    }

    # --- Help/documentation files that can execute code ---
    HELP_FILES = {".chm", ".hlp"}

ErrorCode

Machine-readable error codes for file validation failures.

Source code in safeuploads/exceptions.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class ErrorCode:
    """
    Namespace of machine-readable codes for file validation failures.

    Attributes:
        Each code is a class-level string constant whose value is
        identical to its own name, grouped below by failure type.
    """

    # --- Filename validation ---
    FILENAME_EMPTY = "FILENAME_EMPTY"
    FILENAME_INVALID = "FILENAME_INVALID"
    FILENAME_TOO_LONG = "FILENAME_TOO_LONG"

    # --- Unicode security ---
    UNICODE_SECURITY = "UNICODE_SECURITY"
    UNICODE_DANGEROUS_CHARS = "UNICODE_DANGEROUS_CHARS"
    UNICODE_NORMALIZATION_ERROR = "UNICODE_NORMALIZATION_ERROR"

    # --- Extension validation ---
    EXTENSION_BLOCKED = "EXTENSION_BLOCKED"
    EXTENSION_NOT_ALLOWED = "EXTENSION_NOT_ALLOWED"
    COMPOUND_EXTENSION_BLOCKED = "COMPOUND_EXTENSION_BLOCKED"
    EXTENSION_MISSING = "EXTENSION_MISSING"

    # --- Windows security ---
    WINDOWS_RESERVED_NAME = "WINDOWS_RESERVED_NAME"

    # --- File size ---
    FILE_TOO_LARGE = "FILE_TOO_LARGE"
    FILE_EMPTY = "FILE_EMPTY"
    FILE_SIZE_UNKNOWN = "FILE_SIZE_UNKNOWN"

    # --- MIME type ---
    MIME_TYPE_INVALID = "MIME_TYPE_INVALID"
    MIME_TYPE_MISMATCH = "MIME_TYPE_MISMATCH"
    MIME_DETECTION_FAILED = "MIME_DETECTION_FAILED"

    # --- File signature ---
    FILE_SIGNATURE_INVALID = "FILE_SIGNATURE_INVALID"
    FILE_SIGNATURE_MISSING = "FILE_SIGNATURE_MISSING"
    FILE_SIGNATURE_MISMATCH = "FILE_SIGNATURE_MISMATCH"

    # --- Compression and ZIP ---
    ZIP_BOMB_DETECTED = "ZIP_BOMB_DETECTED"
    ZIP_CONTENT_THREAT = "ZIP_CONTENT_THREAT"
    COMPRESSION_RATIO_EXCEEDED = "COMPRESSION_RATIO_EXCEEDED"
    ZIP_TOO_MANY_ENTRIES = "ZIP_TOO_MANY_ENTRIES"
    ZIP_INVALID_STRUCTURE = "ZIP_INVALID_STRUCTURE"
    ZIP_CORRUPT = "ZIP_CORRUPT"
    ZIP_TOO_LARGE = "ZIP_TOO_LARGE"
    ZIP_NESTED_ARCHIVE = "ZIP_NESTED_ARCHIVE"
    ZIP_DIRECTORY_TRAVERSAL = "ZIP_DIRECTORY_TRAVERSAL"
    ZIP_SYMLINK_DETECTED = "ZIP_SYMLINK_DETECTED"
    ZIP_ABSOLUTE_PATH = "ZIP_ABSOLUTE_PATH"
    ZIP_ANALYSIS_TIMEOUT = "ZIP_ANALYSIS_TIMEOUT"

    # --- Generic processing ---
    PROCESSING_ERROR = "PROCESSING_ERROR"
    IO_ERROR = "IO_ERROR"
    MEMORY_ERROR = "MEMORY_ERROR"

ExtensionSecurityError

Bases: FilenameSecurityError

Dangerous file extension detected.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename with dangerous extension.

None
extension str | None

Optional specific extension that was blocked.

None
error_code str | None

Optional error code (defaults to EXTENSION_BLOCKED).

None

Attributes:

Name Type Description
extension

The specific extension that was blocked.

Source code in safeuploads/exceptions.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
class ExtensionSecurityError(FilenameSecurityError):
    """
    Raised when a filename carries a dangerous extension.

    Args:
        message: Human-readable error description.
        filename: Optional filename with dangerous extension.
        extension: Optional specific extension that was blocked.
        error_code: Optional error code; when omitted (or otherwise
            falsy) ``ErrorCode.EXTENSION_BLOCKED`` is used.

    Attributes:
        extension: The specific extension that was blocked.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        extension: str | None = None,
        error_code: str | None = None,
    ):
        self.extension = extension
        # Any falsy code (None or an empty string) falls back to the
        # generic "extension blocked" code.
        effective_code = (
            error_code if error_code else ErrorCode.EXTENSION_BLOCKED
        )
        super().__init__(
            message,
            filename=filename,
            error_code=effective_code,
        )

ExtensionSecurityValidator

Bases: BaseValidator

Validates filenames against configured forbidden extensions.

Attributes:

Name Type Description
config

File security configuration settings.

Source code in safeuploads/validators/extension_validator.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class ExtensionSecurityValidator(BaseValidator):
    """
    Rejects filenames that carry a configured forbidden extension.

    Attributes:
        config: File security configuration settings.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Set up the validator with its configuration.

        Args:
            config: File security configuration settings.
        """
        super().__init__(config)

    def validate_extensions(self, filename: str) -> None:
        """
        Check a filename for blocked compound and single extensions.

        The lowercased filename is first tested with ``endswith``
        against each entry of ``config.COMPOUND_BLOCKED_EXTENSIONS``.
        Then every dot-separated segment after the first dot is
        lowercased, prefixed with ".", and looked up in
        ``config.BLOCKED_EXTENSIONS``.

        Args:
            filename: Name of the file to validate.

        Raises:
            ExtensionSecurityError: If blocked compound or single
                extension detected in filename.
        """
        # Compound extensions first (e.g., .tar.xz, .user.js)
        lowered_name = filename.lower()
        for compound_ext in self.config.COMPOUND_BLOCKED_EXTENSIONS:
            if not lowered_name.endswith(compound_ext):
                continue

            compound_context = {
                "error_type": "compound_extension_blocked",
                "file_name": filename,
                "extension": compound_ext,
            }
            logger.warning(
                "Dangerous compound extension detected",
                extra=compound_context,
            )
            raise ExtensionSecurityError(
                message=f"Dangerous compound file extension '{compound_ext}' detected in filename. "
                f"Upload rejected for security.",
                filename=filename,
                extension=compound_ext,
                error_code=ErrorCode.COMPOUND_EXTENSION_BLOCKED,
            )

        # Inspect every segment after the first dot, not just the last
        # one, so an inner dangerous extension is caught as well.
        for segment in filename.split(".")[1:]:
            ext = "." + segment.lower()
            if ext not in self.config.BLOCKED_EXTENSIONS:
                continue

            single_context = {
                "error_type": "extension_blocked",
                "file_name": filename,
                "extension": ext,
            }
            logger.warning(
                "Dangerous extension detected",
                extra=single_context,
            )
            raise ExtensionSecurityError(
                message=f"Dangerous file extension '{ext}' detected in filename. "
                f"Upload rejected for security.",
                filename=filename,
                extension=ext,
                error_code=ErrorCode.EXTENSION_BLOCKED,
            )

    def validate(self, filename: str) -> None:
        """
        Validate the given filename.

        Entry point that forwards to ``validate_extensions``.

        Args:
            filename: Name of the file to validate.

        Raises:
            ExtensionSecurityError: If filename extension is not
                permitted.
        """
        outcome = self.validate_extensions(filename)
        return outcome

__init__

__init__(config)

Initialize the validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security configuration settings.

required
Source code in safeuploads/validators/extension_validator.py
24
25
26
27
28
29
30
31
def __init__(self, config: FileSecurityConfig) -> None:
    """
    Initialize the validator.

    Forwards ``config`` unchanged to the base-class initializer.

    Args:
        config: File security configuration settings.
    """
    super().__init__(config)

validate

validate(filename)

Validate the given filename.

Parameters:

Name Type Description Default
filename str

Name of the file to validate.

required

Raises:

Type Description
ExtensionSecurityError

If filename extension is not permitted.

Source code in safeuploads/validators/extension_validator.py
86
87
88
89
90
91
92
93
94
95
96
97
def validate(self, filename: str) -> None:
    """
    Run the extension checks for a filename.

    Entry point that forwards to ``validate_extensions`` and hands
    back whatever that call returns.

    Args:
        filename: Name of the file to validate.

    Raises:
        ExtensionSecurityError: If filename extension is not
            permitted.
    """
    outcome = self.validate_extensions(filename)
    return outcome

validate_extensions

validate_extensions(filename)

Validate filename against blocked extensions.

Parameters:

Name Type Description Default
filename str

Name of the file to validate.

required

Raises:

Type Description
ExtensionSecurityError

If blocked compound or single extension detected in filename.

Source code in safeuploads/validators/extension_validator.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def validate_extensions(self, filename: str) -> None:
    """
    Check a filename for blocked compound and single extensions.

    The lowercased filename is first tested with ``endswith`` against
    each entry of ``config.COMPOUND_BLOCKED_EXTENSIONS``. Then every
    dot-separated segment after the first dot is lowercased, prefixed
    with ".", and looked up in ``config.BLOCKED_EXTENSIONS``.

    Args:
        filename: Name of the file to validate.

    Raises:
        ExtensionSecurityError: If blocked compound or single
            extension detected in filename.
    """
    # Compound extensions first (e.g., .tar.xz, .user.js)
    lowered_name = filename.lower()
    for compound_ext in self.config.COMPOUND_BLOCKED_EXTENSIONS:
        if not lowered_name.endswith(compound_ext):
            continue

        compound_context = {
            "error_type": "compound_extension_blocked",
            "file_name": filename,
            "extension": compound_ext,
        }
        logger.warning(
            "Dangerous compound extension detected",
            extra=compound_context,
        )
        raise ExtensionSecurityError(
            message=f"Dangerous compound file extension '{compound_ext}' detected in filename. "
            f"Upload rejected for security.",
            filename=filename,
            extension=compound_ext,
            error_code=ErrorCode.COMPOUND_EXTENSION_BLOCKED,
        )

    # Inspect every segment after the first dot, not just the last one,
    # so an inner dangerous extension is caught as well.
    for segment in filename.split(".")[1:]:
        ext = "." + segment.lower()
        if ext not in self.config.BLOCKED_EXTENSIONS:
            continue

        single_context = {
            "error_type": "extension_blocked",
            "file_name": filename,
            "extension": ext,
        }
        logger.warning(
            "Dangerous extension detected",
            extra=single_context,
        )
        raise ExtensionSecurityError(
            message=f"Dangerous file extension '{ext}' detected in filename. "
            f"Upload rejected for security.",
            filename=filename,
            extension=ext,
            error_code=ErrorCode.EXTENSION_BLOCKED,
        )

FileProcessingError

Bases: FileSecurityError

Unexpected processing error during file validation.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
original_error Exception | None

Optional original exception that was caught.

None

Attributes:

Name Type Description
original_error

The original exception that was caught.

Source code in safeuploads/exceptions.py
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
class FileProcessingError(FileSecurityError):
    """
    Signals an unexpected failure while a file was being validated.

    The error code passed to the base class is always
    ``ErrorCode.PROCESSING_ERROR``.

    Args:
        message: Human-readable error description.
        original_error: Optional original exception that was caught.

    Attributes:
        original_error: The original exception that was caught.
    """

    def __init__(
        self,
        message: str,
        original_error: Exception | None = None,
    ):
        self.original_error = original_error
        super().__init__(
            message,
            error_code=ErrorCode.PROCESSING_ERROR,
        )

FileSecurityConfig

Centralizes file upload security settings and validation.

Attributes:

Name Type Description
limits

Security limits configuration instance.

ALLOWED_IMAGE_MIMES set[str]

Permitted MIME types for images.

ALLOWED_ZIP_MIMES set[str]

Permitted MIME types for ZIP files.

ALLOWED_IMAGE_EXTENSIONS set[str]

Permitted image file extensions.

ALLOWED_ZIP_EXTENSIONS set[str]

Permitted ZIP file extensions.

BLOCKED_EXTENSIONS set[str]

Dangerous file extensions to block.

COMPOUND_BLOCKED_EXTENSIONS set[str]

Multi-part extensions to block.

DANGEROUS_UNICODE_CHARS set[int]

Unicode characters for filename attacks.

WINDOWS_RESERVED_NAMES set[str]

Platform-specific reserved filenames.

Source code in safeuploads/config.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
class FileSecurityConfig:
    """
    Centralizes file upload security settings and validation.

    Every setting is a class attribute and every helper is a class or
    static method. Defining a subclass triggers a non-strict validation
    pass through ``__init_subclass__``.

    Attributes:
        limits: Security limits configuration instance.
        ALLOWED_IMAGE_MIMES: Permitted MIME types for images.
        ALLOWED_ZIP_MIMES: Permitted MIME types for ZIP files.
        ALLOWED_IMAGE_EXTENSIONS: Permitted image file extensions.
        ALLOWED_ZIP_EXTENSIONS: Permitted ZIP file extensions.
        BLOCKED_EXTENSIONS: Dangerous file extensions to block.
        COMPOUND_BLOCKED_EXTENSIONS: Multi-part extensions to block.
        DANGEROUS_UNICODE_CHARS: Unicode characters for filename attacks.
        WINDOWS_RESERVED_NAMES: Platform-specific reserved filenames.
    """

    # Security limits configuration
    limits = SecurityLimits()

    # Allowed MIME types for images
    ALLOWED_IMAGE_MIMES: set[str] = {"image/jpeg", "image/jpg", "image/png"}

    # Allowed MIME types for ZIP files
    ALLOWED_ZIP_MIMES: set[str] = {
        "application/zip",
        "application/x-zip-compressed",
        "multipart/x-zip",
    }

    # Allowed file extensions
    ALLOWED_IMAGE_EXTENSIONS: set[str] = {".jpg", ".jpeg", ".png"}
    ALLOWED_ZIP_EXTENSIONS: set[str] = {".zip"}

    # NOTE: the three generator helpers below are called further down while
    # the class body is still executing, i.e. on the raw ``staticmethod``
    # objects. Those are only directly callable on Python 3.10+.

    # Generate dangerous file extensions from categorized enums
    @staticmethod
    def _generate_blocked_extensions() -> set[str]:
        """
        Aggregate all dangerous extension categories.

        Returns:
            Combined set of blocked file extensions.
        """
        blocked_extensions: set[str] = set()

        # Combine all dangerous extension categories
        for category in DangerousExtensionCategory:
            blocked_extensions.update(category.value)

        return blocked_extensions

    # Generate compound dangerous file extensions from categorized enums
    @staticmethod
    def _generate_compound_blocked_extensions() -> set[str]:
        """
        Aggregate all compound extension categories.

        Returns:
            Combined set of blocked compound file extensions.
        """
        compound_extensions: set[str] = set()

        # Combine all compound extension categories
        for category in CompoundExtensionCategory:
            compound_extensions.update(category.value)

        return compound_extensions

    # Generate dangerous Unicode characters from categorized enums
    @staticmethod
    def _generate_dangerous_unicode_chars() -> set[int]:
        """
        Aggregate all dangerous Unicode code points.

        Returns:
            Combined set of dangerous Unicode code points.
        """
        dangerous_chars: set[int] = set()

        # Combine all Unicode attack categories
        for category in UnicodeAttackCategory:
            dangerous_chars.update(category.value)

        return dangerous_chars

    # Dangerous file extensions to explicitly block (generated from enums)
    BLOCKED_EXTENSIONS: set[str] = _generate_blocked_extensions()

    # Compound dangerous file extensions (multi-part extensions)
    # These are checked as complete strings, not individual parts
    COMPOUND_BLOCKED_EXTENSIONS: set[str] = _generate_compound_blocked_extensions()

    # Dangerous Unicode characters that can be used for filename attacks
    # These characters can disguise file extensions or cause rendering issues
    DANGEROUS_UNICODE_CHARS: set[int] = _generate_dangerous_unicode_chars()

    # Windows reserved names that cannot be used as filenames
    # These names are reserved by Windows regardless of extension
    WINDOWS_RESERVED_NAMES: set[str] = {
        "con",
        "prn",
        "aux",
        "nul",
        "com1",
        "com2",
        "com3",
        "com4",
        "com5",
        "com6",
        "com7",
        "com8",
        "com9",
        "lpt1",
        "lpt2",
        "lpt3",
        "lpt4",
        "lpt5",
        "lpt6",
        "lpt7",
        "lpt8",
        "lpt9",
    }

    # Configuration validation trigger
    @classmethod
    def __init_subclass__(cls, **kwargs):
        """
        Hook for subclass creation to validate configuration.

        Validation runs in non-strict mode, so detected issues are only
        logged. Any unexpected exception raised while validating is
        downgraded to a warning log entry instead of propagating.

        Args:
            **kwargs: Subclass initialization arguments.
        """
        super().__init_subclass__(**kwargs)
        # Perform validation with warnings allowed (non-strict mode)
        try:
            cls.validate_and_report(strict=False)
        except Exception as err:
            # Deliberate best-effort: defining a subclass must not fail
            # because the validation machinery itself misbehaved.
            logger.warning("Configuration validation failed: %s", err)

    @classmethod
    def get_extensions_by_category(
        cls, category: DangerousExtensionCategory
    ) -> set[str]:
        """
        Return extensions for a dangerous extension category.

        Args:
            category: The dangerous extension category.

        Returns:
            Copy of extensions in the specified category.
        """
        return category.value.copy()

    @classmethod
    def get_compound_extensions_by_category(
        cls, category: CompoundExtensionCategory
    ) -> set[str]:
        """
        Return compound extensions for a category.

        Args:
            category: The compound extension category.

        Returns:
            Copy of compound extensions in the specified category.
        """
        return category.value.copy()

    @classmethod
    def get_unicode_chars_by_category(cls, category: UnicodeAttackCategory) -> set[int]:
        """
        Return Unicode code points for an attack category.

        Args:
            category: The Unicode attack category.

        Returns:
            Copy of code points in the specified category.
        """
        return category.value.copy()

    @classmethod
    def is_extension_in_category(
        cls, extension: str, category: DangerousExtensionCategory
    ) -> bool:
        """
        Check if extension belongs to a dangerous category.

        The extension is lowercased before the membership test.

        Args:
            extension: File extension to evaluate.
            category: Category to check against.

        Returns:
            True if extension is in the category, False otherwise.
        """
        return extension.lower() in category.value

    @classmethod
    def get_extension_category(
        cls, extension: str
    ) -> DangerousExtensionCategory | None:
        """
        Return the dangerous extension category for an extension.

        Categories are checked in enum definition order and the first
        match wins.

        Args:
            extension: The file extension to evaluate.

        Returns:
            Matching category if dangerous, None otherwise.
        """
        extension_lower = extension.lower()
        for category in DangerousExtensionCategory:
            if extension_lower in category.value:
                return category
        return None

    @classmethod
    def validate_configuration(cls, strict: bool = True) -> list[ConfigValidationError]:
        """
        Run all configuration validation routines.

        Args:
            strict: Reserved for future behavior adjustments; currently
                not used by this method.

        Returns:
            List of detected validation errors, in the order the
            individual checks ran.
        """
        # Order matters only for the order of the reported issues.
        checks = (
            cls._validate_file_size_limits,
            cls._validate_mime_configurations,
            cls._validate_extension_configurations,
            cls._validate_compression_settings,
            cls._validate_enum_consistency,
            cls._validate_cross_dependencies,
        )

        errors: list[ConfigValidationError] = []
        for run_check in checks:
            errors.extend(run_check())

        return errors

    @classmethod
    def _validate_file_size_limits(cls) -> list[ConfigValidationError]:
        """
        Validate configured file size limits.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check image size limits
        if cls.limits.max_image_size <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_size_limit",
                    message="max_image_size must be greater than 0",
                    severity="error",
                    component="file_sizes",
                    recommendation="Set max_image_size to a positive value (e.g., 20MB)",
                )
            )

        if cls.limits.max_image_size > 100 * 1024 * 1024:  # 100MB
            errors.append(
                ConfigValidationError(
                    error_type="excessive_size_limit",
                    message=f"max_image_size ({cls.limits.max_image_size // (1024*1024)}MB) is very large",
                    severity="warning",
                    component="file_sizes",
                    recommendation="Consider reducing image size limit to prevent resource exhaustion",
                )
            )

        # Check ZIP size limits
        if cls.limits.max_zip_size <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_size_limit",
                    message="max_zip_size must be greater than 0",
                    severity="error",
                    component="file_sizes",
                    recommendation="Set max_zip_size to a positive value (e.g., 500MB)",
                )
            )

        if cls.limits.max_zip_size > 2 * 1024 * 1024 * 1024:  # 2GB
            errors.append(
                ConfigValidationError(
                    error_type="excessive_size_limit",
                    message=f"max_zip_size ({cls.limits.max_zip_size // (1024*1024)}MB) is very large",
                    severity="warning",
                    component="file_sizes",
                    recommendation="Consider reducing ZIP size limit to prevent resource exhaustion",
                )
            )

        # Validate size relationship
        if cls.limits.max_zip_size <= cls.limits.max_image_size:
            errors.append(
                ConfigValidationError(
                    error_type="inconsistent_size_limits",
                    message="max_zip_size should typically be larger than max_image_size",
                    severity="warning",
                    component="file_sizes",
                    recommendation="ZIP files usually contain multiple files and should have higher limits",
                )
            )

        return errors

    @classmethod
    def _validate_mime_configurations(cls) -> list[ConfigValidationError]:
        """
        Validate MIME type configurations.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check image MIME types
        if not cls.ALLOWED_IMAGE_MIMES:
            errors.append(
                ConfigValidationError(
                    error_type="empty_mime_set",
                    message="ALLOWED_IMAGE_MIMES cannot be empty",
                    severity="error",
                    component="mime_types",
                    recommendation="Add at least one allowed image MIME type",
                )
            )

        # Validate image MIME type format
        for mime_type in cls.ALLOWED_IMAGE_MIMES:
            if not mime_type.startswith("image/"):
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_image_mime",
                        message=f"Image MIME type '{mime_type}' should start with 'image/'",
                        severity="warning",
                        component="mime_types",
                        recommendation="Use standard image MIME types like 'image/jpeg', 'image/png'",
                    )
                )

        # Check ZIP MIME types
        if not cls.ALLOWED_ZIP_MIMES:
            errors.append(
                ConfigValidationError(
                    error_type="empty_mime_set",
                    message="ALLOWED_ZIP_MIMES cannot be empty",
                    severity="error",
                    component="mime_types",
                    recommendation="Add at least one allowed ZIP MIME type",
                )
            )

        # Check for duplicate MIME types across both collections in a
        # single pass (a MIME type seen a second time is a duplicate).
        seen: set[str] = set()
        duplicates: set[str] = set()
        for mime in (*cls.ALLOWED_IMAGE_MIMES, *cls.ALLOWED_ZIP_MIMES):
            if mime in seen:
                duplicates.add(mime)
            else:
                seen.add(mime)

        if duplicates:
            errors.append(
                ConfigValidationError(
                    error_type="duplicate_mime_types",
                    message=f"Duplicate MIME types found: {duplicates}",
                    severity="warning",
                    component="mime_types",
                    recommendation="Remove duplicate MIME types to avoid confusion",
                )
            )

        return errors

    @classmethod
    def _validate_extension_configurations(cls) -> list[ConfigValidationError]:
        """
        Validate file extension configurations.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check extension format
        for ext_set_name, ext_set in [
            ("ALLOWED_IMAGE_EXTENSIONS", cls.ALLOWED_IMAGE_EXTENSIONS),
            ("ALLOWED_ZIP_EXTENSIONS", cls.ALLOWED_ZIP_EXTENSIONS),
        ]:
            if not ext_set:
                errors.append(
                    ConfigValidationError(
                        error_type="empty_extension_set",
                        message=f"{ext_set_name} cannot be empty",
                        severity="error",
                        component="extensions",
                        recommendation=f"Add at least one extension to {ext_set_name}",
                    )
                )

            for ext in ext_set:
                if not ext.startswith("."):
                    errors.append(
                        ConfigValidationError(
                            error_type="invalid_extension_format",
                            message=f"Extension '{ext}' in {ext_set_name} should start with '.'",
                            severity="error",
                            component="extensions",
                            recommendation="Use format '.ext' for file extensions",
                        )
                    )

        # Check blocked extensions
        if not cls.BLOCKED_EXTENSIONS:
            errors.append(
                ConfigValidationError(
                    error_type="empty_blocked_extensions",
                    message="BLOCKED_EXTENSIONS is empty - security risk",
                    severity="error",
                    component="extensions",
                    recommendation="Ensure dangerous extensions are properly blocked",
                )
            )

        # Check for overlap between allowed and blocked extensions
        for label, allowed in (
            ("Image", cls.ALLOWED_IMAGE_EXTENSIONS),
            ("ZIP", cls.ALLOWED_ZIP_EXTENSIONS),
        ):
            conflict = allowed.intersection(cls.BLOCKED_EXTENSIONS)
            if conflict:
                errors.append(
                    ConfigValidationError(
                        error_type="extension_conflict",
                        message=f"{label} extensions {conflict} are both allowed and blocked",
                        severity="error",
                        component="extensions",
                        recommendation="Remove conflicts between allowed and blocked extensions",
                    )
                )

        # Check compound extension consistency
        compound_overlap = cls.BLOCKED_EXTENSIONS.intersection(
            cls.COMPOUND_BLOCKED_EXTENSIONS
        )
        if compound_overlap:
            errors.append(
                ConfigValidationError(
                    error_type="compound_extension_overlap",
                    message=f"Extensions {compound_overlap} appear in both blocked and compound blocked lists",
                    severity="warning",
                    component="extensions",
                    recommendation="Compound extensions should only be in COMPOUND_BLOCKED_EXTENSIONS",
                )
            )

        return errors

    @classmethod
    def _validate_compression_settings(cls) -> list[ConfigValidationError]:
        """
        Validate compression-related limits.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Validate compression ratio
        if cls.limits.max_compression_ratio <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_compression_ratio",
                    message="max_compression_ratio must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable compression ratio limit (e.g., 100:1)",
                )
            )

        if cls.limits.max_compression_ratio < 10:
            errors.append(
                ConfigValidationError(
                    error_type="too_strict_compression",
                    message=f"max_compression_ratio ({cls.limits.max_compression_ratio}) is very strict",
                    severity="warning",
                    component="compression",
                    recommendation="Consider allowing higher compression ratios for legitimate files",
                )
            )

        if cls.limits.max_compression_ratio > 1000:
            errors.append(
                ConfigValidationError(
                    error_type="too_permissive_compression",
                    message=f"max_compression_ratio ({cls.limits.max_compression_ratio}) may allow zip bombs",
                    severity="warning",
                    component="compression",
                    recommendation="Reduce compression ratio limit to prevent zip bomb attacks",
                )
            )

        # Validate uncompressed size limit
        if cls.limits.max_uncompressed_size <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_uncompressed_size",
                    message="max_uncompressed_size must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable uncompressed size limit",
                )
            )

        # Validate individual file size limit
        if cls.limits.max_individual_file_size <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_individual_file_size",
                    message="max_individual_file_size must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable individual file size limit",
                )
            )

        # Check individual file size doesn't exceed total uncompressed size
        if cls.limits.max_individual_file_size > cls.limits.max_uncompressed_size:
            errors.append(
                ConfigValidationError(
                    error_type="inconsistent_size_limits",
                    message=f"max_individual_file_size ({cls.limits.max_individual_file_size // (1024*1024)}MB) "
                    f"exceeds max_uncompressed_size ({cls.limits.max_uncompressed_size // (1024*1024)}MB)",
                    severity="warning",
                    component="compression",
                    recommendation="Individual file size limit should not exceed total uncompressed size limit",
                )
            )

        # Validate ZIP entry limits
        if cls.limits.max_zip_entries <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_zip_entries",
                    message="max_zip_entries must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable limit for ZIP file entries",
                )
            )

        if cls.limits.max_zip_entries > 100000:
            errors.append(
                ConfigValidationError(
                    error_type="excessive_zip_entries",
                    message=f"max_zip_entries ({cls.limits.max_zip_entries}) is very high",
                    severity="warning",
                    component="compression",
                    recommendation="High entry limits may impact performance",
                )
            )

        # Validate timeout settings
        if cls.limits.zip_analysis_timeout <= 0:
            errors.append(
                ConfigValidationError(
                    error_type="invalid_timeout",
                    message="zip_analysis_timeout must be greater than 0",
                    severity="error",
                    component="compression",
                    recommendation="Set a reasonable timeout for ZIP analysis",
                )
            )

        if cls.limits.zip_analysis_timeout > 30:
            errors.append(
                ConfigValidationError(
                    error_type="excessive_timeout",
                    message=f"zip_analysis_timeout ({cls.limits.zip_analysis_timeout}s) is very long",
                    severity="warning",
                    component="compression",
                    recommendation="Long timeouts may impact user experience",
                )
            )

        return errors

    @classmethod
    def _validate_enum_consistency(cls) -> list[ConfigValidationError]:
        """
        Validate enum categories for emptiness and overlaps.

        Each pair of overlapping dangerous-extension categories is
        reported once.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check for empty enum categories. Each entry is
        # (enum class, message prefix, noun used in the recommendation).
        enum_checks = (
            (DangerousExtensionCategory, "Extension category", "extensions"),
            (CompoundExtensionCategory, "Compound extension category", "extensions"),
            (UnicodeAttackCategory, "Unicode attack category", "Unicode characters"),
        )
        for enum_cls, label, noun in enum_checks:
            for category in enum_cls:
                if not category.value:
                    errors.append(
                        ConfigValidationError(
                            error_type="empty_enum_category",
                            message=f"{label} {category.name} is empty",
                            severity="warning",
                            component="enums",
                            recommendation=f"Add {noun} to {category.name} or remove unused category",
                        )
                    )

        # Check for overlapping extensions between categories. Only
        # unordered pairs are visited so that an overlap between A and B
        # is not reported a second time as B and A.
        categories = list(DangerousExtensionCategory)
        for index, first in enumerate(categories):
            for second in categories[index + 1:]:
                overlap = first.value.intersection(second.value)
                if overlap:
                    errors.append(
                        ConfigValidationError(
                            error_type="category_overlap",
                            message=f"Categories {first.name} and {second.name} share extensions: {overlap}",
                            severity="info",
                            component="enums",
                            recommendation="Consider if extensions should belong to multiple categories",
                        )
                    )

        return errors

    @classmethod
    def _validate_cross_dependencies(cls) -> list[ConfigValidationError]:
        """
        Validate cross-field configuration constraints.

        Returns:
            List of detected configuration issues.
        """
        errors: list[ConfigValidationError] = []

        # Check Windows reserved names format. Compare against the
        # lowercased form rather than using str.islower(), which is False
        # for names that contain no cased characters at all.
        for name in cls.WINDOWS_RESERVED_NAMES:
            if name != name.lower():
                errors.append(
                    ConfigValidationError(
                        error_type="case_sensitive_reserved_name",
                        message=f"Windows reserved name '{name}' should be lowercase",
                        severity="warning",
                        component="reserved_names",
                        recommendation="Use lowercase for consistent case-insensitive matching",
                    )
                )

        # Validate Unicode character ranges
        for char_code in cls.DANGEROUS_UNICODE_CHARS:
            if not isinstance(char_code, int):
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_unicode_char",
                        message=f"Unicode character code {char_code} is not an integer",
                        severity="error",
                        component="unicode",
                        recommendation="Use integer Unicode code points",
                    )
                )
            elif char_code < 0 or char_code > 0x10FFFF:
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_unicode_range",
                        message=f"Unicode character code {char_code} is outside valid range",
                        severity="error",
                        component="unicode",
                        recommendation="Use valid Unicode code points (0-0x10FFFF)",
                    )
                )

        return errors

    @classmethod
    def validate_and_report(cls, strict: bool = True) -> None:
        """
        Validate configuration and log outcomes.

        Every detected issue is logged at the level matching its
        severity (error, warning or info). Info-level issues never cause
        an exception.

        Args:
            strict: If True, raise on errors/warnings.

        Raises:
            FileSecurityConfigurationError: If strict and issues found.
                Carries the error-severity issues when there are any,
                otherwise the warning-severity issues.
        """
        errors = cls.validate_configuration(strict=strict)

        if not errors:
            logger.info("File security configuration validation passed")
            return

        # Separate errors by severity
        error_list = [e for e in errors if e.severity == "error"]
        warning_list = [e for e in errors if e.severity == "warning"]
        info_list = [e for e in errors if e.severity == "info"]

        # Log validation results
        for error in error_list:
            logger.error(
                "Configuration error in %s: %s. %s",
                error.component,
                error.message,
                error.recommendation,
            )

        for warning in warning_list:
            logger.warning(
                "Configuration warning in %s: %s. %s",
                warning.component,
                warning.message,
                warning.recommendation,
            )

        for info in info_list:
            logger.info(
                "Configuration info in %s: %s. %s",
                info.component,
                info.message,
                info.recommendation,
            )

        if not strict:
            return

        # Strict mode: errors take precedence; warnings alone also raise.
        if error_list:
            raise FileSecurityConfigurationError(error_list)
        if warning_list:
            raise FileSecurityConfigurationError(warning_list)

__init_subclass__ classmethod

__init_subclass__(**kwargs)

Hook for subclass creation to validate configuration.

Parameters:

Name Type Description Default
**kwargs

Subclass initialization arguments.

{}
Source code in safeuploads/config.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@classmethod
def __init_subclass__(cls, **kwargs):
    """
    Validate the configuration whenever a subclass is created.

    Args:
        **kwargs: Subclass initialization arguments, forwarded to the
            parent hook.
    """
    super().__init_subclass__(**kwargs)
    # Non-strict validation; an exception raised while validating is
    # reported as a warning instead of propagating.
    try:
        cls.validate_and_report(strict=False)
    except Exception as exc:
        logger.warning("Configuration validation failed: %s", exc)

get_compound_extensions_by_category classmethod

get_compound_extensions_by_category(category)

Return compound extensions for a category.

Parameters:

Name Type Description Default
category CompoundExtensionCategory

The compound extension category.

required

Returns:

Type Description
set[str]

Copy of compound extensions in the specified category.

Source code in safeuploads/config.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
@classmethod
def get_compound_extensions_by_category(
    cls, category: CompoundExtensionCategory
) -> set[str]:
    """
    Look up the compound extensions that belong to a category.

    Args:
        category: The compound extension category.

    Returns:
        A copy of the category's extensions, so callers can modify the
        result without touching the category's own value.
    """
    members = category.value
    return members.copy()

get_extension_category classmethod

get_extension_category(extension)

Return the dangerous extension category for an extension.

Parameters:

Name Type Description Default
extension str

The file extension to evaluate.

required

Returns:

Type Description
DangerousExtensionCategory | None

Matching category if dangerous, None otherwise.

Source code in safeuploads/config.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
@classmethod
def get_extension_category(
    cls, extension: str
) -> DangerousExtensionCategory | None:
    """
    Find the dangerous extension category an extension belongs to.

    The extension is lowercased first; categories are scanned in enum
    order and the first one containing it is returned.

    Args:
        extension: The file extension to evaluate.

    Returns:
        Matching category if dangerous, None otherwise.
    """
    wanted = extension.lower()
    return next(
        (
            candidate
            for candidate in DangerousExtensionCategory
            if wanted in candidate.value
        ),
        None,
    )

get_extensions_by_category classmethod

get_extensions_by_category(category)

Return extensions for a dangerous extension category.

Parameters:

Name Type Description Default
category DangerousExtensionCategory

The dangerous extension category.

required

Returns:

Type Description
set[str]

Copy of extensions in the specified category.

Source code in safeuploads/config.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
@classmethod
def get_extensions_by_category(
    cls, category: DangerousExtensionCategory
) -> set[str]:
    """
    Look up the extensions that belong to a dangerous category.

    Args:
        category: The dangerous extension category.

    Returns:
        A copy of the category's extensions, so callers can modify the
        result without touching the category's own value.
    """
    members = category.value
    return members.copy()

get_unicode_chars_by_category classmethod

get_unicode_chars_by_category(category)

Return Unicode code points for an attack category.

Parameters:

Name Type Description Default
category UnicodeAttackCategory

The Unicode attack category.

required

Returns:

Type Description
set[int]

Copy of code points in the specified category.

Source code in safeuploads/config.py
235
236
237
238
239
240
241
242
243
244
245
246
@classmethod
def get_unicode_chars_by_category(cls, category: UnicodeAttackCategory) -> set[int]:
    """
    Look up the Unicode code points that belong to one attack category.

    Args:
        category: Category member whose ``value`` holds the code points.

    Returns:
        A copy of ``category.value``; the category's own collection is
        not handed out directly.
    """
    code_points = category.value
    return code_points.copy()

is_extension_in_category classmethod

is_extension_in_category(extension, category)

Check if extension belongs to a dangerous category.

Parameters:

Name Type Description Default
extension str

File extension to evaluate.

required
category DangerousExtensionCategory

Category to check against.

required

Returns:

Type Description
bool

True if extension is in the category, False otherwise.

Source code in safeuploads/config.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@classmethod
def is_extension_in_category(
    cls, extension: str, category: DangerousExtensionCategory
) -> bool:
    """
    Tell whether an extension is a member of a dangerous category.

    The extension is lowercased before the membership test.

    Args:
        extension: File extension to look up.
        category: Category whose ``value`` is searched.

    Returns:
        True when the lowercased extension is in ``category.value``,
        False otherwise.
    """
    normalized = extension.lower()
    return normalized in category.value

validate_and_report classmethod

validate_and_report(strict=True)

Validate configuration and log outcomes.

Parameters:

Name Type Description Default
strict bool

If True, raise on errors/warnings.

True

Raises:

Type Description
FileSecurityConfigurationError

If strict and issues found.

Source code in safeuploads/config.py
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
@classmethod
def validate_and_report(cls, strict: bool = True) -> None:
    """
    Run configuration validation, log every finding, and optionally raise.

    Findings are grouped by their ``severity`` ("error", "warning",
    "info") and logged at the matching log level. Findings with any other
    severity are neither logged nor raised.

    Args:
        strict: If True, raise when errors or warnings were found.

    Raises:
        FileSecurityConfigurationError: In strict mode. Carries the
            errors when any exist; otherwise carries the warnings.
    """
    findings = cls.validate_configuration(strict=strict)

    if not findings:
        logger.info("File security configuration validation passed")
        return

    # Bucket the findings by severity
    error_list = [item for item in findings if item.severity == "error"]
    warning_list = [item for item in findings if item.severity == "warning"]
    info_list = [item for item in findings if item.severity == "info"]

    # Log each bucket at its own level: errors first, then warnings, then info
    reports = (
        (error_list, logger.error, "Configuration error in %s: %s. %s"),
        (warning_list, logger.warning, "Configuration warning in %s: %s. %s"),
        (info_list, logger.info, "Configuration info in %s: %s. %s"),
    )
    for entries, emit, template in reports:
        for entry in entries:
            emit(
                template,
                entry.component,
                entry.message,
                entry.recommendation,
            )

    # Nothing is raised outside strict mode
    if not strict:
        return

    # Errors take precedence; warnings are only raised when no error exists
    if error_list:
        raise FileSecurityConfigurationError(error_list)
    if warning_list:
        raise FileSecurityConfigurationError(warning_list)

validate_configuration classmethod

validate_configuration(strict=True)

Run all configuration validation routines.

Parameters:

Name Type Description Default
strict bool

Reserved for future behavior adjustments.

True

Returns:

Type Description
list[ConfigValidationError]

List of detected validation errors.

Source code in safeuploads/config.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
@classmethod
def validate_configuration(cls, strict: bool = True) -> list[ConfigValidationError]:
    """
    Collect the findings of every configuration validation routine.

    Args:
        strict: Not read by this method; reserved for future behavior
            adjustments.

    Returns:
        Findings from all routines, concatenated in the order the
        routines run.
    """
    # Order matters for the resulting list: size limits, MIME types,
    # extensions, compression settings, enum consistency, cross-dependencies.
    checks = (
        cls._validate_file_size_limits,
        cls._validate_mime_configurations,
        cls._validate_extension_configurations,
        cls._validate_compression_settings,
        cls._validate_enum_consistency,
        cls._validate_cross_dependencies,
    )

    errors: list[ConfigValidationError] = []
    for check in checks:
        errors.extend(check())
    return errors

FileSecurityConfigurationError

Bases: Exception

Configuration validation failed with aggregated errors.

Parameters:

Name Type Description Default
errors list[ConfigValidationError]

List of ConfigValidationError instances.

required

Attributes:

Name Type Description
errors

List of validation errors that caused failure.

Source code in safeuploads/exceptions.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class FileSecurityConfigurationError(Exception):
    """
    Raised when configuration validation fails; aggregates the findings.

    The exception message is "Configuration validation failed: " followed
    by one "SEVERITY: message" entry per finding, joined with "; ".

    Args:
        errors: List of ConfigValidationError instances.

    Attributes:
        errors: The list of validation errors passed to the constructor.
    """

    def __init__(self, errors: list[ConfigValidationError]):
        self.errors = errors
        summary = "; ".join(
            f"{error.severity.upper()}: {error.message}" for error in errors
        )
        super().__init__(f"Configuration validation failed: {summary}")

FileSecurityError

Bases: Exception

Base exception for all file security validation failures.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
error_code str | None

Optional machine-readable error code.

None

Attributes:

Name Type Description
message

Human-readable error message.

error_code

Machine-readable error code from ErrorCode.

Source code in safeuploads/exceptions.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class FileSecurityError(Exception):
    """
    Base exception for all file security validation failures.

    Args:
        message: Human-readable error description.
        error_code: Optional machine-readable error code.

    Attributes:
        message: Human-readable error message.
        error_code: Machine-readable error code from ErrorCode.
    """

    def __init__(self, message: str, error_code: str | None = None):
        self.message = message
        self.error_code = error_code
        super().__init__(message)

FileSignatureError

Bases: FileValidationError

File header signature invalid or mismatched.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename with signature issue.

None
expected_type str | None

Optional expected file type based on extension.

None

Attributes:

Name Type Description
expected_type

The expected file type based on extension.

Source code in safeuploads/exceptions.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
class FileSignatureError(FileValidationError):
    """
    Raised when a file's header signature is invalid or mismatched.

    The error code is always ``ErrorCode.FILE_SIGNATURE_MISMATCH``.

    Args:
        message: Human-readable error description.
        filename: Optional filename with signature issue.
        expected_type: Optional expected file type based on extension.

    Attributes:
        expected_type: The expected file type passed to the constructor.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        expected_type: str | None = None,
    ):
        self.expected_type = expected_type
        base_kwargs = {
            "filename": filename,
            "error_code": ErrorCode.FILE_SIGNATURE_MISMATCH,
        }
        super().__init__(message, **base_kwargs)

FileSizeError

Bases: FileValidationError

File exceeds configured size limits.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename that exceeded size limits.

None
size int | None

Optional actual file size in bytes.

None
max_size int | None

Optional maximum allowed size in bytes.

None

Attributes:

Name Type Description
size

The actual file size in bytes.

max_size

The maximum allowed size in bytes.

Source code in safeuploads/exceptions.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
class FileSizeError(FileValidationError):
    """
    Raised when a file violates the configured size limits.

    The error code is always ``ErrorCode.FILE_TOO_LARGE``.

    Args:
        message: Human-readable error description.
        filename: Optional filename that exceeded size limits.
        size: Optional actual file size in bytes.
        max_size: Optional maximum allowed size in bytes.

    Attributes:
        size: The file size passed to the constructor.
        max_size: The size limit passed to the constructor.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        size: int | None = None,
        max_size: int | None = None,
    ):
        self.size, self.max_size = size, max_size
        base_kwargs = {
            "filename": filename,
            "error_code": ErrorCode.FILE_TOO_LARGE,
        }
        super().__init__(message, **base_kwargs)

FileValidationError

Bases: FileSecurityError

File validation failed.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional name of the file that failed validation.

None
error_code str | None

Optional machine-readable error code.

None

Attributes:

Name Type Description
filename

Name of the file that failed validation.

Source code in safeuploads/exceptions.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class FileValidationError(FileSecurityError):
    """
    Raised when validation of a specific file fails.

    Args:
        message: Human-readable error description.
        filename: Optional name of the file that failed validation.
        error_code: Optional machine-readable error code, forwarded to
            the base class.

    Attributes:
        filename: Name of the file that failed validation, or None.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        error_code: str | None = None,
    ):
        self.filename = filename
        super().__init__(message, error_code=error_code)

FileValidator

Coordinated security validation for uploaded files.

Attributes:

Name Type Description
config

Active security configuration.

unicode_validator

Validator for Unicode-related checks.

extension_validator

Validator for file extension rules.

windows_validator

Validator enforcing Windows-specific constraints.

compression_validator

Validator handling compressed file limits.

zip_inspector

Inspector for ZIP archive contents.

magic_mime

MIME type detector based on python-magic.

magic_available

Whether python-magic was successfully initialized.

Source code in safeuploads/file_validator.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
class FileValidator:
    """
    Coordinated security validation for uploaded files.

    Attributes:
        config: Active security configuration.
        unicode_validator: Validator for Unicode-related checks.
        extension_validator: Validator for file extension rules.
        windows_validator: Validator enforcing Windows-specific constraints.
        compression_validator: Validator handling compressed file limits.
        zip_inspector: Inspector for ZIP archive contents.
        magic_mime: MIME type detector based on python-magic.
        magic_available: Whether python-magic was successfully initialized.
    """

    def __init__(self, config: FileSecurityConfig | None = None):
        """
        Initialize file validator with configuration and detection utilities.

        Args:
            config: Optional configuration object defining file security
                rules. Defaults to new FileSecurityConfig instance.

        Attributes:
            config: Active security configuration.
            unicode_validator: Validator for Unicode-related checks.
            extension_validator: Validator for file extension rules.
            windows_validator: Validator enforcing Windows constraints.
            compression_validator: Validator for compressed file limits.
            zip_inspector: Inspector for ZIP archive contents.
            magic_mime: MIME type detector based on python-magic, or None
                when python-magic could not be initialized.
            magic_available: Whether python-magic initialized successfully.
        """
        self.config = config or FileSecurityConfig()

        # Initialize specialized validators
        self.unicode_validator = UnicodeSecurityValidator(self.config)
        self.extension_validator = ExtensionSecurityValidator(self.config)
        self.windows_validator = WindowsSecurityValidator(self.config)
        self.compression_validator = CompressionSecurityValidator(self.config)
        self.zip_inspector = ZipContentInspector(self.config)

        # Define magic_mime up front so the documented attribute always
        # exists, even when python-magic fails to initialize below.
        self.magic_mime = None

        # Initialize python-magic for content-based detection
        try:
            self.magic_mime = magic.Magic(mime=True)
            self.magic_available = True
            logger.debug("File content detection (python-magic) initialized")
        except Exception as err:
            # Best effort: detection falls back to filename-based guessing
            self.magic_available = False
            logger.warning(
                "python-magic not available for content detection: %s",
                err,
            )

    def _detect_mime_type(self, file_content: bytes, filename: str) -> str:
        """
        Determine MIME type for file content.

        Args:
            file_content: Raw bytes of the file to inspect.
            filename: Original filename for fallback MIME detection.

        Returns:
            Detected MIME type or "application/octet-stream" if detection
            fails.
        """
        detected_mime = None

        # Content-based detection using python-magic (most reliable)
        if self.magic_available:
            try:
                detected_mime = self.magic_mime.from_buffer(file_content)
            except Exception as err:
                logger.warning("Magic MIME detection failed: %s", err)

        # Fallback to filename-based detection
        if not detected_mime:
            logger.info("Fallback to filename-based MIME detection")
            detected_mime, _ = mimetypes.guess_type(filename)

        return detected_mime or "application/octet-stream"

    def _validate_file_signature(self, file_content: bytes, expected_type: str) -> None:
        """
        Verify file content begins with known signature for expected type.

        Args:
            file_content: Raw bytes of the uploaded file.
            expected_type: Logical file category ("image" or "zip").

        Raises:
            FileSignatureError: File header doesn't match expected type
                signatures.
        """
        if len(file_content) < 4:
            raise FileSignatureError(
                f"File too small to verify {expected_type} signature",
                expected_type=expected_type,
            )

        # Common file signatures
        signatures = {
            "image": [
                b"\xff\xd8\xff",  # JPEG
                b"\xff\xd8\xff\xe1",  # JPEG EXIF (additional JPEG variant)
                b"\x89PNG\r\n\x1a\n",  # PNG
            ],
            "zip": [
                b"PK\x03\x04",  # ZIP file
                b"PK\x05\x06",  # Empty ZIP
                b"PK\x07\x08",  # ZIP with spanning
            ],
        }

        expected_signatures = signatures.get(expected_type, [])

        for signature in expected_signatures:
            if file_content.startswith(signature):
                return  # Signature matched

        # No matching signature found
        raise FileSignatureError(
            f"File content does not match expected {expected_type} format",
            expected_type=expected_type,
        )

    def _sanitize_filename(self, filename: str) -> str:
        """
        Sanitize user-provided filename to prevent security risks.

        Steps, in order: Unicode security validation, path component
        removal, control character removal, replacement of dangerous
        characters with "_", Windows reserved-name check, extension
        validation, truncation of the name part to 100 characters, and a
        generated "file_<timestamp>" name when nothing usable remains.

        Args:
            filename: Original filename supplied by the user.

        Returns:
            Sanitized filename safe for storage and processing.

        Raises:
            UnicodeSecurityError: Filename contains dangerous Unicode
                characters or fails normalization checks.
            WindowsReservedNameError: Filename uses Windows reserved
                device names.
            ExtensionSecurityError: Filename contains blocked or
                dangerous file extensions.
            ValueError: Filename is empty string.
        """
        if not filename:
            raise ValueError("Filename cannot be empty")

        # Keep the caller-supplied value so the debug log below can report
        # the real original name rather than the already-sanitized one.
        original_filename = filename

        # Unicode security validation (must be first)
        # This detects and blocks Unicode-based attacks before any other processing
        filename = self.unicode_validator.validate_unicode_security(filename)

        # Remove path components to prevent directory traversal
        filename = os.path.basename(filename)

        # Remove null bytes and control characters
        filename = "".join(
            char for char in filename if ord(char) >= 32 and char != "\x7f"
        )

        # Replace dangerous characters that could be used for path traversal
        # or command injection, in a single pass.
        dangerous_chars = '<>:"/\\|?*\x00'
        filename = filename.translate(
            str.maketrans({char: "_" for char in dangerous_chars})
        )

        # Check for Windows reserved names before any other processing
        # This must be done early to prevent reserved names from being created
        self.windows_validator.validate_windows_reserved_names(filename)

        # Handle compound and double extensions security risk
        # This also checks all dangerous extensions
        self.extension_validator.validate_extensions(filename)

        # Limit filename length (preserve extension)
        name_part, ext_part = os.path.splitext(filename)
        if len(name_part) > 100:
            name_part = name_part[:100]
            filename = name_part + ext_part

        # Ensure we don't end up with just an extension or empty name
        if not name_part or name_part.strip() == "":
            filename = f"file_{int(time.time())}{ext_part}"

        # Final check: ensure the sanitized filename doesn't become a reserved name
        self.windows_validator.validate_windows_reserved_names(filename)

        logger.debug(
            "Filename sanitized: original='%s' -> sanitized='%s'",
            os.path.basename(original_filename),
            filename,
        )

        return filename

    def _validate_filename(self, file: UploadFile) -> None:
        """
        Check an upload's filename and replace it with its sanitized form.

        Args:
            file: Uploaded file whose filename should be validated and
                sanitized. Its ``filename`` attribute is overwritten.

        Raises:
            FilenameSecurityError: Filename is empty, invalid, or fails
                sanitization.
            FileProcessingError: Unexpected error during filename
                validation.
        """
        if not file.filename:
            raise FilenameSecurityError(
                "Filename is required",
                error_code=ErrorCode.FILENAME_EMPTY,
            )

        try:
            cleaned = self._sanitize_filename(file.filename)

            # The file object is updated before the emptiness check, so a
            # failure below still leaves the sanitized name on the file.
            file.filename = cleaned

            if not cleaned or not cleaned.strip():
                raise FilenameSecurityError(
                    "Invalid filename after sanitization",
                    filename=file.filename,
                    error_code=ErrorCode.FILENAME_INVALID,
                )
        except FileValidationError:
            # Validation failures reach the caller unchanged
            raise
        except Exception as err:
            # Anything else is reported as an internal processing error
            logger.exception("Unexpected error during filename validation: %s", err)
            raise FileProcessingError(
                "Filename validation failed due to internal error",
                original_error=err,
            ) from err

    def _validate_file_extension(
        self, file: UploadFile, allowed_extensions: set[str]
    ) -> None:
        """
        Validate extension of uploaded file against allowed and blocked lists.

        Args:
            file: File whose extension will be validated.
            allowed_extensions: Set of allowed file extensions.

        Raises:
            FilenameSecurityError: Filename is missing.
            ExtensionSecurityError: Extension is not allowed or is blocked.
        """
        # Check file extension
        if not file.filename:
            raise FilenameSecurityError(
                "Filename is required for extension validation",
                error_code=ErrorCode.FILENAME_EMPTY,
            )

        _, ext = os.path.splitext(file.filename.lower())
        if ext not in allowed_extensions:
            raise ExtensionSecurityError(
                f"Invalid file extension. Allowed: {', '.join(allowed_extensions)}",
                filename=file.filename,
                extension=ext,
                error_code=ErrorCode.EXTENSION_NOT_ALLOWED,
            )

        # Check for blocked extensions
        if ext in self.config.BLOCKED_EXTENSIONS:
            raise ExtensionSecurityError(
                f"File extension {ext} is blocked for security reasons",
                filename=file.filename,
                extension=ext,
                error_code=ErrorCode.EXTENSION_BLOCKED,
            )

    async def _validate_file_size(
        self, file: UploadFile, max_file_size: int
    ) -> tuple[bytes, int]:
        """
        Validate uploaded file size by sampling content and determining total bytes.

        Args:
            file: Uploaded file supporting asynchronous read and seek.
            max_file_size: Maximum allowed file size in bytes.

        Returns:
            Tuple containing first 8 KB of file content and detected file
            size in bytes.

        Raises:
            FileSizeError: File size exceeds maximum or file is empty.
        """
        # Read first chunk for content analysis
        file_content = await file.read(8192)  # Read first 8KB

        # Reset file position
        await file.seek(0)

        # Check file size
        file_size = len(file_content)
        if hasattr(file, "size") and file.size:
            file_size = file.size
        else:
            # Estimate size by reading the rest
            remaining = await file.read()
            file_size = len(file_content) + len(remaining)
            await file.seek(0)

        if file_size > max_file_size:
            raise FileSizeError(
                f"File too large. File size: {file_size // (1024*1024)}MB, maximum: {max_file_size // (1024*1024)}MB",
                size=file_size,
                max_size=max_file_size,
            )

        if file_size == 0:
            raise FileSizeError(
                "Empty file not allowed",
                size=0,
                max_size=max_file_size,
            )

        return file_content, file_size

    async def validate_image_file(self, file: UploadFile) -> None:
        """
        Run the full image validation pipeline on an upload.

        Checks run in this order: filename, extension, size, MIME type,
        file signature. The first failing check raises.

        Args:
            file: Uploaded file to validate.

        Raises:
            FilenameSecurityError: Filename is empty, invalid, or fails
                security checks.
            ExtensionSecurityError: File extension is not allowed or is
                blocked.
            FileSizeError: File size exceeds maximum or file is empty.
            MimeTypeError: MIME type is not in allowed image types.
            FileSignatureError: File signature doesn't match expected image
                format.
            FileProcessingError: Unexpected error during validation.
        """
        try:
            # Filename, then extension; both raise on failure
            self._validate_filename(file)
            self._validate_file_extension(file, self.config.ALLOWED_IMAGE_EXTENSIONS)

            # Size check also hands back the leading bytes and the size
            sample, total_bytes = await self._validate_file_size(
                file, self.config.limits.max_image_size
            )

            # MIME type must be one of the allowed image types
            filename = file.filename or "unknown"
            mime = self._detect_mime_type(sample, filename)
            allowed_mimes = self.config.ALLOWED_IMAGE_MIMES

            if mime not in allowed_mimes:
                raise MimeTypeError(
                    f"Invalid file type. Detected: {mime}. Allowed: {', '.join(allowed_mimes)}",
                    filename=filename,
                    detected_mime=mime,
                    allowed_mimes=list(allowed_mimes),
                )

            # Leading bytes must match a known image signature
            self._validate_file_signature(sample, "image")

            logger.debug(
                "Image file validation passed: %s (%s, %s bytes)",
                filename,
                mime,
                total_bytes,
            )
        except FileValidationError:
            # Validation failures reach the caller unchanged
            raise
        except Exception as err:
            # Anything else is reported as an internal processing error
            logger.exception("Error during image file validation: %s", err)
            raise FileProcessingError(
                "File validation failed due to internal error",
                original_error=err,
            ) from err

    async def validate_zip_file(self, file: UploadFile) -> None:
        """
        Run the full ZIP validation pipeline on an upload.

        Checks run in this order: filename, extension, size, file
        signature, MIME type, compression ratio, and (when enabled in the
        configuration) archive content inspection. The whole file is read
        into memory for the last two checks, and the file position is
        reset to 0 afterwards.

        Args:
            file: Incoming ZIP file-like object to validate.

        Raises:
            FilenameSecurityError: Filename is empty, invalid, or fails
                security checks.
            ExtensionSecurityError: File extension is not allowed or is
                blocked.
            FileSizeError: File size exceeds maximum or file is empty.
            MimeTypeError: MIME type is not in allowed ZIP types.
            FileSignatureError: File signature doesn't match expected ZIP
                format.
            CompressionSecurityError: ZIP compression validation failed
                (zip bomb detected).
            FileProcessingError: Unexpected error during validation.
        """
        try:
            # Filename, then extension; both raise on failure
            self._validate_filename(file)
            self._validate_file_extension(file, self.config.ALLOWED_ZIP_EXTENSIONS)

            # Size check also hands back the leading bytes of the file
            sample, _sampled_size = await self._validate_file_size(
                file, self.config.limits.max_zip_size
            )

            # Detect MIME type from the leading bytes
            filename = file.filename or "unknown"
            mime = self._detect_mime_type(sample, filename)

            # Signature is checked before the MIME type; a failure is
            # re-raised with a ZIP-specific message and the filename.
            try:
                self._validate_file_signature(sample, "zip")
            except FileSignatureError as err:
                raise FileSignatureError(
                    "File content does not match ZIP format",
                    filename=filename,
                    expected_type="zip",
                ) from err

            # A generic binary MIME type is tolerated because the signature
            # has already been confirmed; any other unexpected type is not.
            if mime not in self.config.ALLOWED_ZIP_MIMES:
                if mime != "application/octet-stream":
                    raise MimeTypeError(
                        f"Invalid file type. Detected: {mime}. Expected ZIP file.",
                        filename=filename,
                        detected_mime=mime,
                        allowed_mimes=list(self.config.ALLOWED_ZIP_MIMES),
                    )
                logger.debug(
                    "ZIP file detected as application/octet-stream, but signature is valid: %s",
                    filename,
                )

            # Compression ratio and content inspection need the full file
            await file.seek(0)
            archive_bytes = await file.read()
            archive_size = len(archive_bytes)

            # Reset file position for any subsequent operations
            await file.seek(0)

            # Validate ZIP compression ratio to detect zip bombs
            self.compression_validator.validate_zip_compression_ratio(
                archive_bytes, archive_size
            )

            # Perform ZIP content inspection if enabled
            if self.config.limits.scan_zip_content:
                self.zip_inspector.inspect_zip_content(archive_bytes)

            logger.debug(
                "ZIP file validation passed: %s (%s, %s bytes)",
                filename,
                mime,
                archive_size,
            )
        except FileValidationError:
            # Validation failures reach the caller unchanged
            raise
        except Exception as err:
            # Anything else is reported as an internal processing error
            logger.exception("Error during ZIP file validation: %s", err)
            raise FileProcessingError(
                "File validation failed due to internal error",
                original_error=err,
            ) from err

__init__

__init__(config=None)

Initialize file validator with configuration and detection utilities.

Parameters:

Name Type Description Default
config FileSecurityConfig | None

Optional configuration object defining file security rules. Defaults to new FileSecurityConfig instance.

None

Attributes:

Name Type Description
config

Active security configuration.

unicode_validator

Validator for Unicode-related checks.

extension_validator

Validator for file extension rules.

windows_validator

Validator enforcing Windows constraints.

compression_validator

Validator for compressed file limits.

zip_inspector

Inspector for ZIP archive contents.

magic_mime

MIME type detector based on python-magic.

magic_available

Whether python-magic initialized successfully.

Source code in safeuploads/file_validator.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(self, config: FileSecurityConfig | None = None):
    """
    Initialize file validator with configuration and detection utilities.

    Args:
        config: Optional configuration object defining file security
            rules. Defaults to new FileSecurityConfig instance.

    Attributes:
        config: Active security configuration.
        unicode_validator: Validator for Unicode-related checks.
        extension_validator: Validator for file extension rules.
        windows_validator: Validator enforcing Windows constraints.
        compression_validator: Validator for compressed file limits.
        zip_inspector: Inspector for ZIP archive contents.
        magic_mime: MIME type detector based on python-magic, or None
            when python-magic could not be initialized.
        magic_available: Whether python-magic initialized successfully.
    """
    self.config = config or FileSecurityConfig()

    # Initialize specialized validators; all of them share the same config
    self.unicode_validator = UnicodeSecurityValidator(self.config)
    self.extension_validator = ExtensionSecurityValidator(self.config)
    self.windows_validator = WindowsSecurityValidator(self.config)
    self.compression_validator = CompressionSecurityValidator(self.config)
    self.zip_inspector = ZipContentInspector(self.config)

    # Initialize python-magic for content-based detection. A failure here is
    # tolerated on purpose: it is recorded in magic_available and logged.
    try:
        self.magic_mime = magic.Magic(mime=True)
        self.magic_available = True
        logger.debug("File content detection (python-magic) initialized")
    except Exception as err:
        # Always define magic_mime so reading the attribute can never raise
        # AttributeError on an instance whose detector failed to load.
        self.magic_mime = None
        self.magic_available = False
        logger.warning(
            "python-magic not available for content detection: %s",
            err,
        )

validate_image_file async

validate_image_file(file)

Validate uploaded image by checking filename, extension, size, MIME type, and signature.

Parameters:

Name Type Description Default
file UploadFileProtocol

Uploaded file to validate.

required

Raises:

Type Description
FilenameSecurityError

Filename is empty, invalid, or fails security checks.

ExtensionSecurityError

File extension is not allowed or is blocked.

FileSizeError

File size exceeds maximum or file is empty.

MimeTypeError

MIME type is not in allowed image types.

FileSignatureError

File signature doesn't match expected image format.

FileProcessingError

Unexpected error during validation.

Source code in safeuploads/file_validator.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
async def validate_image_file(self, file: UploadFile) -> None:
    """
    Run the image validation pipeline on an uploaded file.

    The checks run in a fixed order and the first failure raises:
    filename, extension, size, detected MIME type, file signature.

    Args:
        file: Uploaded file to validate.

    Raises:
        FilenameSecurityError: Filename is empty, invalid, or fails
            security checks.
        ExtensionSecurityError: File extension is not allowed or is
            blocked.
        FileSizeError: File size exceeds maximum or file is empty.
        MimeTypeError: MIME type is not in allowed image types.
        FileSignatureError: File signature doesn't match expected image
            format.
        FileProcessingError: Unexpected error during validation.
    """
    try:
        # Filename and extension checks; both raise on failure
        self._validate_filename(file)
        self._validate_file_extension(file, self.config.ALLOWED_IMAGE_EXTENSIONS)

        # The size check raises on failure and otherwise returns the
        # content it read together with the measured size
        content, size_in_bytes = await self._validate_file_size(
            file, self.config.limits.max_image_size
        )

        # Content-based MIME detection; the filename falls back to a placeholder
        filename = file.filename or "unknown"
        detected_mime = self._detect_mime_type(content, filename)

        mime_is_allowed = detected_mime in self.config.ALLOWED_IMAGE_MIMES
        if not mime_is_allowed:
            message = f"Invalid file type. Detected: {detected_mime}. Allowed: {', '.join(self.config.ALLOWED_IMAGE_MIMES)}"
            raise MimeTypeError(
                message,
                filename=filename,
                detected_mime=detected_mime,
                allowed_mimes=list(self.config.ALLOWED_IMAGE_MIMES),
            )

        # Signature check; raises on failure
        self._validate_file_signature(content, "image")

        logger.debug(
            "Image file validation passed: %s (%s, %s bytes)",
            filename,
            detected_mime,
            size_in_bytes,
        )
    except Exception as err:
        # Validation errors (FileValidationError and subclasses) pass
        # through untouched; anything else is wrapped as a processing error
        if isinstance(err, FileValidationError):
            raise
        logger.exception("Error during image file validation: %s", err)
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=err,
        ) from err

validate_zip_file async

validate_zip_file(file)

Validate uploaded ZIP archive against service configuration.

Parameters:

Name Type Description Default
file UploadFileProtocol

Incoming ZIP file-like object to validate.

required

Raises:

Type Description
FilenameSecurityError

Filename is empty, invalid, or fails security checks.

ExtensionSecurityError

File extension is not allowed or is blocked.

FileSizeError

File size exceeds maximum or file is empty.

MimeTypeError

MIME type is not in allowed ZIP types.

FileSignatureError

File signature doesn't match expected ZIP format.

CompressionSecurityError

ZIP compression validation failed (zip bomb detected).

FileProcessingError

Unexpected error during validation.

Source code in safeuploads/file_validator.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
async def validate_zip_file(self, file: UploadFile) -> None:
    """
    Validate uploaded ZIP archive against service configuration.

    Checks run in order: filename, extension, size, MIME detection, ZIP
    signature, MIME allow-list, compression ratio and (when enabled)
    content inspection. The file is rewound to position 0 after the full
    read so later consumers start from the beginning.

    Args:
        file: Incoming ZIP file-like object to validate.

    Raises:
        FilenameSecurityError: Filename is empty, invalid, or fails
            security checks.
        ExtensionSecurityError: File extension is not allowed or is
            blocked.
        FileSizeError: File size exceeds maximum or file is empty.
        MimeTypeError: MIME type is not in allowed ZIP types.
        FileSignatureError: File signature doesn't match expected ZIP
            format.
        CompressionSecurityError: ZIP compression validation failed
            (zip bomb detected).
        FileProcessingError: Unexpected error during validation.
    """
    try:
        # Validate filename (raises exceptions on failure)
        self._validate_filename(file)

        # Validate file extension (raises exceptions on failure)
        self._validate_file_extension(file, self.config.ALLOWED_ZIP_EXTENSIONS)

        # Validate file size (raises exceptions on failure). Only the returned
        # content is kept: the size is recomputed from the full read below.
        file_content, _ = await self._validate_file_size(
            file, self.config.limits.max_zip_size
        )

        # Detect MIME type from the content returned by the size check
        filename = file.filename or "unknown"
        detected_mime = self._detect_mime_type(file_content, filename)

        # Validate ZIP file signature first (most reliable check)
        # This will raise FileSignatureError if signature doesn't match
        try:
            self._validate_file_signature(file_content, "zip")
        except FileSignatureError as err:
            # Re-raise with more specific message
            raise FileSignatureError(
                "File content does not match ZIP format",
                filename=filename,
                expected_type="zip",
            ) from err

        # Check MIME type, but allow application/octet-stream if signature is valid
        # Some ZIP files are detected as octet-stream, but signature check ensures it's really a ZIP
        if detected_mime not in self.config.ALLOWED_ZIP_MIMES:
            if detected_mime == "application/octet-stream":
                # Valid ZIP file, just detected as generic binary
                logger.debug(
                    "ZIP file detected as application/octet-stream, but signature is valid: %s",
                    filename,
                )
            else:
                raise MimeTypeError(
                    f"Invalid file type. Detected: {detected_mime}. Expected ZIP file.",
                    filename=filename,
                    detected_mime=detected_mime,
                    allowed_mimes=list(self.config.ALLOWED_ZIP_MIMES),
                )

        # For ZIP validation (compression ratio and content inspection), we need the full file
        # Read the entire file content for proper ZIP analysis
        await file.seek(0)
        full_file_content = await file.read()
        file_size = len(full_file_content)

        # Reset file position for any subsequent operations
        await file.seek(0)

        # Validate ZIP compression ratio to detect zip bombs. file_size is
        # always an int here (it comes from len()), so no None guard is needed.
        self.compression_validator.validate_zip_compression_ratio(
            full_file_content, file_size
        )

        # Perform ZIP content inspection if enabled
        if self.config.limits.scan_zip_content:
            self.zip_inspector.inspect_zip_content(full_file_content)

        logger.debug(
            "ZIP file validation passed: %s (%s, %s bytes)",
            filename,
            detected_mime,
            file_size,
        )
    except FileValidationError:
        # Let FileValidationError and subclasses propagate
        raise
    except Exception as err:
        logger.exception("Error during ZIP file validation: %s", err)
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=err,
        ) from err

FilenameSecurityError

Bases: FileValidationError

Filename failed security checks.

Source code in safeuploads/exceptions.py
177
178
179
180
class FilenameSecurityError(FileValidationError):
    """
    Filename failed security checks.

    Defines no attributes or behavior of its own beyond what it inherits
    from FileValidationError.
    """

MimeTypeError

Bases: FileValidationError

File MIME type not allowed or mismatches extension.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename with MIME type issue.

None
detected_mime str | None

Optional detected MIME type string.

None
allowed_mimes list[str] | None

Optional list of allowed MIME types.

None
error_code str | None

Optional error code (defaults to MIME_TYPE_INVALID).

None

Attributes:

Name Type Description
detected_mime

The detected MIME type string.

allowed_mimes

List of allowed MIME types.

Source code in safeuploads/exceptions.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
class MimeTypeError(FileValidationError):
    """
    Raised when a file's MIME type is not allowed or mismatches its extension.

    Args:
        message: Human-readable error description.
        filename: Optional filename with MIME type issue.
        detected_mime: Optional detected MIME type string.
        allowed_mimes: Optional list of allowed MIME types.
        error_code: Optional error code (defaults to
            MIME_TYPE_INVALID).

    Attributes:
        detected_mime: The detected MIME type string.
        allowed_mimes: List of allowed MIME types (empty list when none
            was supplied).
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        detected_mime: str | None = None,
        allowed_mimes: list[str] | None = None,
        error_code: str | None = None,
    ):
        # Store the MIME details before handing off to the base class
        self.detected_mime = detected_mime
        self.allowed_mimes = allowed_mimes if allowed_mimes else []

        # Use the MIME-specific code unless the caller supplied one
        resolved_code = error_code if error_code else ErrorCode.MIME_TYPE_INVALID
        super().__init__(message, filename=filename, error_code=resolved_code)

SecurityLimits dataclass

Security constraints for file submissions.

Attributes:

Name Type Description
max_image_size int

Maximum size in bytes for image files.

max_zip_size int

Maximum size in bytes for ZIP archives.

max_compression_ratio int

Maximum expansion ratio for ZIP files.

max_uncompressed_size int

Maximum cumulative size of ZIP contents.

max_individual_file_size int

Maximum size of single file in ZIP.

max_zip_entries int

Maximum number of file entries in ZIP.

zip_analysis_timeout float

Maximum seconds for ZIP analysis.

max_zip_depth int

Maximum directory nesting depth in ZIP.

max_filename_length int

Maximum length for filenames in ZIP.

max_path_length int

Maximum length for full paths in ZIP.

max_number_files_same_type int

Maximum number of files of the same type.

allow_nested_archives bool

Whether nested archives are permitted.

allow_symlinks bool

Whether symbolic links are permitted.

allow_absolute_paths bool

Whether absolute paths are permitted.

scan_zip_content bool

Whether deep content inspection is enabled.

Source code in safeuploads/config.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class SecurityLimits:
    """
    Security constraints for file submissions.

    Attributes:
        max_image_size: Maximum size in bytes for image files.
        max_zip_size: Maximum size in bytes for ZIP archives.
        max_compression_ratio: Maximum expansion ratio for ZIP files.
        max_uncompressed_size: Maximum cumulative size of ZIP contents.
        max_individual_file_size: Maximum size of single file in ZIP.
        max_zip_entries: Maximum number of file entries in ZIP.
        zip_analysis_timeout: Maximum seconds for ZIP analysis.
        max_zip_depth: Maximum directory nesting depth in ZIP.
        max_filename_length: Maximum length for filenames in ZIP.
        max_path_length: Maximum length for full paths in ZIP.
        max_number_files_same_type: Maximum number of files of the same
            type.
        allow_nested_archives: Whether nested archives are permitted.
        allow_symlinks: Whether symbolic links are permitted.
        allow_absolute_paths: Whether absolute paths are permitted.
        scan_zip_content: Whether deep content inspection is enabled.
    """

    # File size limits (in bytes)
    max_image_size: int = 20 * 1024 * 1024  # 20MB for images
    max_zip_size: int = 500 * 1024 * 1024  # 500MB for ZIP files

    # ZIP compression security settings
    max_compression_ratio: int = 100  # Maximum allowed expansion ratio (e.g., 100:1)
    max_uncompressed_size: int = 1024 * 1024 * 1024  # 1GB max uncompressed size
    # 500MB max per individual file in ZIP
    max_individual_file_size: int = 500 * 1024 * 1024
    max_zip_entries: int = 10000  # Maximum number of files in ZIP archive
    # Maximum seconds to spend analyzing ZIP structure
    zip_analysis_timeout: float = 5.0

    # ZIP content inspection settings
    max_zip_depth: int = 10  # Maximum nesting depth for directories in ZIP
    max_filename_length: int = 255  # Maximum length for individual file names
    max_path_length: int = 1024  # Maximum length for full file paths
    max_number_files_same_type: int = 1000  # Maximum number of files of the same type
    allow_nested_archives: bool = False  # Whether to allow nested archive files
    allow_symlinks: bool = False  # Whether to allow symbolic links in ZIP
    allow_absolute_paths: bool = False  # Whether to allow absolute paths in ZIP
    scan_zip_content: bool = True  # Whether to perform deep content inspection

SuspiciousFilePattern

Bases: Enum

Categorized patterns used to flag potentially malicious uploads.

Attributes:

Name Type Description
DIRECTORY_TRAVERSAL

Directory traversal attack patterns.

SUSPICIOUS_NAMES

Suspicious filename patterns.

EXECUTABLE_SIGNATURES

Dangerous file content signatures.

SUSPICIOUS_PATHS

Suspicious path components.

Source code in safeuploads/enums.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
class SuspiciousFilePattern(Enum):
    """
    Pattern groups used to flag potentially malicious uploads.

    Every member's value is a set of literal patterns (text, or bytes for
    the content signatures).

    Attributes:
        DIRECTORY_TRAVERSAL: Directory traversal attack patterns.
        SUSPICIOUS_NAMES: Suspicious filename patterns.
        EXECUTABLE_SIGNATURES: Dangerous file content signatures.
        SUSPICIOUS_PATHS: Suspicious path components.
    """

    # --- Directory traversal attack patterns ---
    DIRECTORY_TRAVERSAL = {
        # Plain dot sequences, forward-slash and backslash variants
        "../", "..\\",
        ".../", "...\\",
        "....//", "....\\\\",
        # URL encoded dot-dot-slash and dot-dot-backslash
        "%2e%2e%2f", "%2e%2e%5c",
        # Double URL encoded
        "%252e%252e%252f", "%252e%252e%255c",
    }

    # --- Suspicious filename patterns ---
    SUSPICIOUS_NAMES = {
        # Windows system files that shouldn't be in user uploads
        "autorun.inf", "desktop.ini", "thumbs.db", ".ds_store",
        # Common malware names
        "install.exe", "setup.exe", "update.exe", "patch.exe",
        "crack.exe", "keygen.exe", "loader.exe", "activator.exe",
        # Hidden or system-like files
        ".htaccess", ".htpasswd", "web.config", "robots.txt",
    }

    # --- Dangerous file content signatures (magic bytes) ---
    EXECUTABLE_SIGNATURES = {
        # Windows PE executables
        b"MZ", b"PE\x00\x00",
        # ELF executables (Linux)
        b"\x7fELF",
        # Mach-O executables (macOS)
        b"\xfe\xed\xfa\xce", b"\xfe\xed\xfa\xcf",
        b"\xce\xfa\xed\xfe", b"\xcf\xfa\xed\xfe",
        # Java class files
        b"\xca\xfe\xba\xbe",
        # Windows shortcuts (.lnk)
        b"L\x00\x00\x00",
    }

    # --- Suspicious path components ---
    SUSPICIOUS_PATHS = {
        # Windows system directories
        "windows/", "system32/", "syswow64/", "programfiles/",
        # Unix system directories
        "/bin/", "/sbin/", "/usr/bin/", "/usr/sbin/", "/etc/",
        # Web server directories
        "cgi-bin/", "htdocs/", "www/", "wwwroot/",
        # Development/build directories
        ".git/", ".svn/", "node_modules/", "__pycache__/",
    }

UnicodeAttackCategory

Bases: Enum

Categorized Unicode code points used in obfuscation attacks.

Attributes:

Name Type Description
DIRECTIONAL_OVERRIDES

Right-to-left and directional controls.

ZERO_WIDTH_CHARACTERS

Zero-width and invisible characters.

LANGUAGE_MARKS

Language and format specific characters.

CONFUSING_PUNCTUATION

Punctuation that can disguise extensions.

Source code in safeuploads/enums.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
class UnicodeAttackCategory(Enum):
    """
    Groups of Unicode code points used in obfuscation attacks.

    Every member's value is a set of integer code points.

    Attributes:
        DIRECTIONAL_OVERRIDES: Right-to-left and directional controls.
        ZERO_WIDTH_CHARACTERS: Zero-width and invisible characters.
        LANGUAGE_MARKS: Language and format specific characters.
        CONFUSING_PUNCTUATION: Punctuation that can disguise extensions.
    """

    # Right-to-Left and directional override characters
    DIRECTIONAL_OVERRIDES = {
        0x202A,  # LEFT-TO-RIGHT EMBEDDING
        0x202B,  # RIGHT-TO-LEFT EMBEDDING
        0x202C,  # POP DIRECTIONAL FORMATTING
        0x202D,  # LEFT-TO-RIGHT OVERRIDE
        0x202E,  # RIGHT-TO-LEFT OVERRIDE
        0x2066,  # LEFT-TO-RIGHT ISOLATE
        0x2067,  # RIGHT-TO-LEFT ISOLATE
        0x2068,  # FIRST STRONG ISOLATE
        0x2069,  # POP DIRECTIONAL ISOLATE
    }

    # Zero-width and invisible characters
    ZERO_WIDTH_CHARACTERS = {
        0x034F,  # COMBINING GRAPHEME JOINER
        0x200B,  # ZERO WIDTH SPACE
        0x200C,  # ZERO WIDTH NON-JOINER
        0x200D,  # ZERO WIDTH JOINER
        0x2060,  # WORD JOINER
        0xFEFF,  # ZERO WIDTH NO-BREAK SPACE (BOM)
    }

    # Language and format specific characters
    LANGUAGE_MARKS = {
        0x061C,  # ARABIC LETTER MARK
        0x180E,  # MONGOLIAN VOWEL SEPARATOR
    }

    # Confusing punctuation that can disguise extensions
    CONFUSING_PUNCTUATION = {
        0x2024,  # ONE DOT LEADER
        0x2025,  # TWO DOT LEADER
        0x2026,  # HORIZONTAL ELLIPSIS
        0xFF0E,  # FULLWIDTH FULL STOP
    }

UnicodeSecurityError

Bases: FilenameSecurityError

Dangerous Unicode characters detected in filename.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename containing dangerous Unicode.

None
dangerous_chars list[tuple[str, int, int]] | None

Optional list of (char, code_point, position) tuples for each dangerous character found.

None

Attributes:

Name Type Description
dangerous_chars

List of dangerous character tuples.

Source code in safeuploads/exceptions.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class UnicodeSecurityError(FilenameSecurityError):
    """
    Raised when a filename contains dangerous Unicode characters.

    The error code passed to the base class is always
    ErrorCode.UNICODE_DANGEROUS_CHARS.

    Args:
        message: Human-readable error description.
        filename: Optional filename containing dangerous Unicode.
        dangerous_chars: Optional list of (char, code_point, position)
            tuples for each dangerous character found.

    Attributes:
        dangerous_chars: List of dangerous character tuples (empty list
            when none was supplied).
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        dangerous_chars: list[tuple[str, int, int]] | None = None,
    ):
        # A missing (or empty) argument is stored as a fresh empty list
        self.dangerous_chars = dangerous_chars if dangerous_chars else []

        unicode_error_code = ErrorCode.UNICODE_DANGEROUS_CHARS
        super().__init__(message, filename=filename, error_code=unicode_error_code)

UnicodeSecurityValidator

Bases: BaseValidator

Validates filenames for Unicode security threats.

Attributes:

Name Type Description
config

Runtime configuration for file security rules.

Source code in safeuploads/validators/unicode_validator.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class UnicodeSecurityValidator(BaseValidator):
    """
    Validates filenames for Unicode security threats.

    Attributes:
        config: Runtime configuration for file security rules.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Initialize the Unicode validator.

        Args:
            config: Runtime configuration that controls file security rules.
        """
        super().__init__(config)

    def _find_dangerous_chars(self, text: str) -> list[tuple[str, int, int]]:
        """
        Locate every configured dangerous character in a string.

        Args:
            text: The string to scan.

        Returns:
            A list of (char, code_point, position) tuples, one per
            character whose code point is in
            config.DANGEROUS_UNICODE_CHARS, in order of appearance.
        """
        dangerous_codes = self.config.DANGEROUS_UNICODE_CHARS
        return [
            (char, ord(char), position)
            for position, char in enumerate(text)
            if ord(char) in dangerous_codes
        ]

    def validate_unicode_security(self, filename: str) -> str:
        """
        Validate filename for unsafe Unicode characters.

        Args:
            filename: The filename to validate and normalize.

        Returns:
            The NFC-normalized filename. An empty filename is returned
            unchanged.

        Raises:
            UnicodeSecurityError: If dangerous Unicode characters are
                detected in the filename or result from normalization.
                Its dangerous_chars attribute carries the position of
                each offending character.
        """
        if not filename:
            return filename

        # Check for dangerous Unicode characters
        dangerous_chars_found = self._find_dangerous_chars(filename)

        # If dangerous characters found, reject the file entirely
        if dangerous_chars_found:
            char_details = []
            for char, code, pos in dangerous_chars_found:
                char_name = unicodedata.name(char, f"U+{code:04X}")
                char_details.append(
                    f"'{char}' (U+{code:04X}: {char_name}) at position {pos}"
                )

            logger.warning(
                "Dangerous Unicode characters detected",
                extra={
                    "error_type": "unicode_security",
                    "file_name": filename,
                    "char_codes": [code for _, code, _ in dangerous_chars_found],
                    "positions": [pos for _, _, pos in dangerous_chars_found],
                },
            )
            raise UnicodeSecurityError(
                message=f"Dangerous Unicode characters detected in filename: {', '.join(char_details)}. "
                f"These characters can be used to disguise file extensions or create security vulnerabilities.",
                filename=filename,
                dangerous_chars=dangerous_chars_found,
            )

        # Normalize Unicode to prevent normalization attacks
        # Use NFC (Canonical Decomposition, followed by Canonical Composition)
        # This prevents attacks where different Unicode representations of the same text are used
        normalized_filename = unicodedata.normalize("NFC", filename)

        # Unchanged input was already verified clean above, so a second
        # scan could not find anything
        if normalized_filename == filename:
            return normalized_filename

        logger.info(
            "Unicode normalization applied: '%s' -> '%s'",
            filename,
            normalized_filename,
        )

        # Additional check: ensure normalized filename doesn't contain dangerous chars
        # (some normalization attacks might introduce them)
        introduced_chars = self._find_dangerous_chars(normalized_filename)
        if introduced_chars:
            # The message and log describe the first offender; the exception
            # carries all of them with their real positions in the
            # normalized filename (previously the position was always 0)
            char, char_code, _ = introduced_chars[0]
            char_name = unicodedata.name(char, f"U+{char_code:04X}")
            logger.error(
                "Unicode normalization resulted in dangerous character",
                extra={
                    "error_type": "unicode_normalization_error",
                    "file_name": filename,
                    "normalized_filename": normalized_filename,
                    "char_code": char_code,
                },
            )
            raise UnicodeSecurityError(
                message=f"Unicode normalization resulted in dangerous character: "
                f"'{char}' (U+{char_code:04X}: {char_name})",
                filename=filename,
                dangerous_chars=introduced_chars,
            )

        return normalized_filename

    def validate(self, filename: str) -> str:
        """
        Validate a filename for Unicode security issues.

        Delegates to validate_unicode_security().

        Args:
            filename: The name of the file to assess.

        Returns:
            The validated and normalized filename.

        Raises:
            UnicodeSecurityError: If dangerous Unicode characters are
                detected in the filename or result from normalization.
        """
        return self.validate_unicode_security(filename)

__init__

__init__(config)

Initialize the Unicode validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

Runtime configuration that controls file security rules.

required
Source code in safeuploads/validators/unicode_validator.py
27
28
29
30
31
32
33
34
def __init__(self, config: FileSecurityConfig):
    """
    Set up the Unicode validator.

    Only forwards the configuration to the base-class initializer.

    Args:
        config: Runtime configuration that controls file security rules.
    """
    super().__init__(config=config)

validate

validate(filename)

Validate a filename for Unicode security issues.

Parameters:

Name Type Description Default
filename str

The name of the file to assess.

required

Returns:

Type Description
str

The validated and normalized filename.

Source code in safeuploads/validators/unicode_validator.py
122
123
124
125
126
127
128
129
130
131
132
def validate(self, filename: str) -> str:
    """
    Check a filename for Unicode security problems.

    Delegates to validate_unicode_security() and returns its result
    unchanged.

    Args:
        filename: The name of the file to assess.

    Returns:
        The validated and normalized filename.
    """
    normalized_filename = self.validate_unicode_security(filename)
    return normalized_filename

validate_unicode_security

validate_unicode_security(filename)

Validate filename for unsafe Unicode characters.

Parameters:

Name Type Description Default
filename str

The filename to validate and normalize.

required

Returns:

Type Description
str

The NFC-normalized filename.

Raises:

Type Description
UnicodeSecurityError

If dangerous Unicode characters are detected in the filename or result from normalization.

Source code in safeuploads/validators/unicode_validator.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def validate_unicode_security(self, filename: str) -> str:
    """
    Validate filename for unsafe Unicode characters.

    The raw filename is scanned first and rejected outright if it
    contains any code point listed in
    ``self.config.DANGEROUS_UNICODE_CHARS``. A clean filename is then
    NFC-normalized; if normalization changed the text, the normalized
    form is scanned again before it is returned.

    Args:
        filename: The filename to validate and normalize.

    Returns:
        The NFC-normalized filename. A falsy filename (for example an
        empty string) is returned unchanged without any checks.

    Raises:
        UnicodeSecurityError: If dangerous Unicode characters are
            detected in the filename or result from normalization. The
            ``dangerous_chars`` payload holds ``(char, code_point,
            position)`` tuples; in the post-normalization case the
            position is an index into the normalized filename.
    """
    if not filename:
        return filename

    dangerous_codes = self.config.DANGEROUS_UNICODE_CHARS

    # Collect every offending character so the error can list all of them.
    dangerous_chars_found = [
        (char, ord(char), position)
        for position, char in enumerate(filename)
        if ord(char) in dangerous_codes
    ]

    # If dangerous characters found, reject the file entirely
    if dangerous_chars_found:
        char_details = []
        for char, code, pos in dangerous_chars_found:
            # Fall back to the code point when the character has no name.
            char_name = unicodedata.name(char, f"U+{code:04X}")
            char_details.append(
                f"'{char}' (U+{code:04X}: {char_name}) at position {pos}"
            )

        logger.warning(
            "Dangerous Unicode characters detected",
            extra={
                "error_type": "unicode_security",
                "file_name": filename,
                "char_codes": [code for _, code, _ in dangerous_chars_found],
                "positions": [pos for _, _, pos in dangerous_chars_found],
            },
        )
        raise UnicodeSecurityError(
            message=f"Dangerous Unicode characters detected in filename: {', '.join(char_details)}. "
            f"These characters can be used to disguise file extensions or create security vulnerabilities.",
            filename=filename,
            dangerous_chars=dangerous_chars_found,
        )

    # Normalize to NFC so that different Unicode representations of the
    # same text collapse to a single form.
    normalized_filename = unicodedata.normalize("NFC", filename)

    if normalized_filename == filename:
        # Normalization was a no-op and this exact text already passed
        # the scan above, so a second scan could not find anything.
        return normalized_filename

    logger.info(
        "Unicode normalization applied: '%s' -> '%s'",
        filename,
        normalized_filename,
    )

    # Normalization rewrote the text; make sure it did not introduce a
    # dangerous character. Report the real index of the offender.
    for position, char in enumerate(normalized_filename):
        char_code = ord(char)
        if char_code in dangerous_codes:
            char_name = unicodedata.name(char, f"U+{char_code:04X}")
            logger.error(
                "Unicode normalization resulted in dangerous character",
                extra={
                    "error_type": "unicode_normalization_error",
                    "file_name": filename,
                    "normalized_filename": normalized_filename,
                    "char_code": char_code,
                },
            )
            raise UnicodeSecurityError(
                message=f"Unicode normalization resulted in dangerous character: "
                f"'{char}' (U+{char_code:04X}: {char_name})",
                filename=filename,
                dangerous_chars=[(char, char_code, position)],
            )

    return normalized_filename

WindowsReservedNameError

Bases: FilenameSecurityError

Windows reserved device name used.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename using a reserved name.

None
reserved_name str | None

Optional specific reserved name detected.

None

Attributes:

Name Type Description
reserved_name

The specific reserved name that was detected.

Source code in safeuploads/exceptions.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class WindowsReservedNameError(FilenameSecurityError):
    """
    Error raised when a filename uses a Windows reserved device name.

    Args:
        message: Human-readable error description.
        filename: Filename that used the reserved name, if available.
        reserved_name: The specific reserved name detected, if available.

    Attributes:
        reserved_name: The specific reserved name that was detected
            (``None`` when not supplied).
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        reserved_name: str | None = None,
    ):
        # Keep the subclass-specific detail on the instance, then pass the
        # shared fields and this error's fixed code to the parent.
        self.reserved_name = reserved_name
        super().__init__(
            message, filename=filename, error_code=ErrorCode.WINDOWS_RESERVED_NAME
        )

WindowsSecurityValidator

Bases: BaseValidator

Validator for Windows reserved device names.

Attributes:

Name Type Description
config

File security configuration settings.

Source code in safeuploads/validators/windows_validator.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class WindowsSecurityValidator(BaseValidator):
    """
    Validator for Windows reserved device names.

    Attributes:
        config: File security configuration settings.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Initialize the validator.

        Args:
            config: File security configuration settings.
        """
        super().__init__(config)

    def validate_windows_reserved_names(self, filename: str) -> None:
        """
        Validate filename against Windows reserved device names.

        Extensions are peeled off one at a time so compound extensions
        are covered (``"CON.tar.gz"`` is checked as ``"con.tar"`` and then
        ``"con"``). Each candidate is lowercased and has surrounding
        whitespace and dots removed before it is looked up in
        ``self.config.WINDOWS_RESERVED_NAMES``.

        Args:
            filename: The filename to validate. An empty filename passes
                without any check.

        Raises:
            WindowsReservedNameError: If filename matches a Windows
                reserved device name.
        """
        reserved_names = self.config.WINDOWS_RESERVED_NAMES
        current_name = filename

        while current_name:
            # Split off the last extension, if any.
            name_without_ext, ext = os.path.splitext(current_name)

            # Normalize: lowercase, then strip whitespace and dots until
            # nothing more comes off. A single pass in a fixed order would
            # miss interleaved padding such as "con . " (from
            # "con . .txt"), which would otherwise slip through as "con ".
            name_to_check = name_without_ext.lower()
            while True:
                trimmed = name_to_check.strip().strip(".")
                if trimmed == name_to_check:
                    break
                name_to_check = trimmed

            if name_to_check in reserved_names:
                reserved = name_to_check.upper()
                logger.warning(
                    "Windows reserved name detected",
                    extra={
                        "error_type": "windows_reserved_name",
                        "file_name": filename,
                        "reserved_name": reserved,
                    },
                )
                raise WindowsReservedNameError(
                    message=f"Filename '{filename}' uses Windows reserved name '{reserved}'. "
                    f"Reserved names: {', '.join(sorted(reserved_names)).upper()}",
                    filename=filename,
                    reserved_name=reserved,
                )

            # No extension left to remove: every candidate has been checked.
            if not ext:
                break

            current_name = name_without_ext

    def validate(self, filename: str) -> None:
        """
        Validate filename against Windows reserved naming rules.

        Thin entry point that forwards to
        ``validate_windows_reserved_names``.

        Args:
            filename: The filename to validate.

        Raises:
            WindowsReservedNameError: If filename matches a Windows
                reserved device name.
        """
        return self.validate_windows_reserved_names(filename)

__init__

__init__(config)

Initialize the validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security configuration settings.

required
Source code in safeuploads/validators/windows_validator.py
27
28
29
30
31
32
33
34
def __init__(self, config: FileSecurityConfig):
    """
    Initialize the validator.

    This initializer adds no state of its own; the configuration is
    forwarded unchanged to the parent class initializer.

    Args:
        config: File security configuration settings.
    """
    # Delegate entirely to the parent initializer.
    super().__init__(config)

validate

validate(filename)

Validate filename against Windows reserved naming rules.

Parameters:

Name Type Description Default
filename str

The filename to validate.

required

Raises:

Type Description
WindowsReservedNameError

If filename matches a Windows reserved device name.

Source code in safeuploads/validators/windows_validator.py
84
85
86
87
88
89
90
91
92
93
94
95
def validate(self, filename: str) -> None:
    """
    Validate filename against Windows reserved naming rules.

    Thin entry point: forwards to ``validate_windows_reserved_names``
    and returns whatever that call returns.

    Args:
        filename: The filename to validate.

    Raises:
        WindowsReservedNameError: If filename matches a Windows
            reserved device name.
    """
    return self.validate_windows_reserved_names(filename)

validate_windows_reserved_names

validate_windows_reserved_names(filename)

Validate filename against Windows reserved device names.

Parameters:

Name Type Description Default
filename str

The filename to validate.

required

Raises:

Type Description
WindowsReservedNameError

If filename matches a Windows reserved device name.

Source code in safeuploads/validators/windows_validator.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def validate_windows_reserved_names(self, filename: str) -> None:
    """
    Validate filename against Windows reserved device names.

    Extensions are peeled off one at a time so compound extensions are
    covered (``"CON.tar.gz"`` is checked as ``"con.tar"`` and then
    ``"con"``). Each candidate is lowercased and has surrounding
    whitespace and dots removed before it is looked up in
    ``self.config.WINDOWS_RESERVED_NAMES``.

    Args:
        filename: The filename to validate. An empty filename passes
            without any check.

    Raises:
        WindowsReservedNameError: If filename matches a Windows
            reserved device name.
    """
    reserved_names = self.config.WINDOWS_RESERVED_NAMES
    current_name = filename

    while current_name:
        # Split off the last extension, if any.
        name_without_ext, ext = os.path.splitext(current_name)

        # Normalize: lowercase, then strip whitespace and dots until
        # nothing more comes off. A single pass in a fixed order would
        # miss interleaved padding such as "con . " (from "con . .txt"),
        # which would otherwise slip through as "con ".
        name_to_check = name_without_ext.lower()
        while True:
            trimmed = name_to_check.strip().strip(".")
            if trimmed == name_to_check:
                break
            name_to_check = trimmed

        if name_to_check in reserved_names:
            reserved = name_to_check.upper()
            logger.warning(
                "Windows reserved name detected",
                extra={
                    "error_type": "windows_reserved_name",
                    "file_name": filename,
                    "reserved_name": reserved,
                },
            )
            raise WindowsReservedNameError(
                message=f"Filename '{filename}' uses Windows reserved name '{reserved}'. "
                f"Reserved names: {', '.join(sorted(reserved_names)).upper()}",
                filename=filename,
                reserved_name=reserved,
            )

        # No extension left to remove: every candidate has been checked.
        if not ext:
            break

        current_name = name_without_ext

ZipBombError

Bases: CompressionSecurityError

Zip archive exceeds compression ratio or uncompressed size limits.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename of zip bomb.

None
compression_ratio float | None

Optional actual compression ratio detected.

None
uncompressed_size int | None

Optional total uncompressed size in bytes.

None
max_ratio float | None

Optional maximum allowed compression ratio.

None
max_size int | None

Optional maximum allowed uncompressed size in bytes.

None

Attributes:

Name Type Description
compression_ratio

Actual compression ratio detected.

uncompressed_size

Total uncompressed size in bytes.

max_ratio

Maximum allowed compression ratio.

max_size

Maximum allowed uncompressed size in bytes.

Source code in safeuploads/exceptions.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
class ZipBombError(CompressionSecurityError):
    """
    Error for a ZIP archive that exceeds compression-ratio or
    uncompressed-size limits.

    Args:
        message: Human-readable error description.
        filename: Filename of the offending archive, if available.
        compression_ratio: Actual compression ratio detected, if known.
        uncompressed_size: Total uncompressed size in bytes, if known.
        max_ratio: Maximum allowed compression ratio, if known.
        max_size: Maximum allowed uncompressed size in bytes, if known.

    Attributes:
        compression_ratio: Actual compression ratio detected.
        uncompressed_size: Total uncompressed size in bytes.
        max_ratio: Maximum allowed compression ratio.
        max_size: Maximum allowed uncompressed size in bytes.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        compression_ratio: float | None = None,
        uncompressed_size: int | None = None,
        max_ratio: float | None = None,
        max_size: int | None = None,
    ):
        # What was measured on the archive.
        self.compression_ratio = compression_ratio
        self.uncompressed_size = uncompressed_size
        # The limits those measurements were compared against.
        self.max_ratio = max_ratio
        self.max_size = max_size
        # Shared fields plus this error's fixed code go to the parent.
        super().__init__(
            message, filename=filename, error_code=ErrorCode.ZIP_BOMB_DETECTED
        )

ZipContentError

Bases: CompressionSecurityError

Zip archive contains dangerous content or structure.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename of problematic archive.

None
threats list[str] | None

Optional list of detected threat descriptions.

None
error_code str | None

Optional error code (defaults to ZIP_CONTENT_THREAT).

None

Attributes:

Name Type Description
threats

List of detected threat descriptions.

Source code in safeuploads/exceptions.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
class ZipContentError(CompressionSecurityError):
    """
    Error for a ZIP archive that contains dangerous content or structure.

    Args:
        message: Human-readable error description.
        filename: Filename of the problematic archive, if available.
        threats: Detected threat descriptions, if any.
        error_code: Error code to report; when omitted (or otherwise
            falsy) ``ErrorCode.ZIP_CONTENT_THREAT`` is used.

    Attributes:
        threats: List of detected threat descriptions (an empty list
            when none were supplied).
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        threats: list[str] | None = None,
        error_code: str | None = None,
    ):
        # Always expose a list so callers can iterate without a None check.
        self.threats = threats if threats else []
        # Fall back to the generic content-threat code when none is given.
        resolved_code = error_code if error_code else ErrorCode.ZIP_CONTENT_THREAT
        super().__init__(message, filename=filename, error_code=resolved_code)

ZipContentInspector

Inspects ZIP archive contents for security threats.

Attributes:

Name Type Description
config

File security configuration.

Source code in safeuploads/inspectors/zip_inspector.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
class ZipContentInspector:
    """
    Inspects ZIP archive contents for security threats.

    Attributes:
        config: File security configuration.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Initialize ZIP inspector with configuration.

        Args:
            config: File security configuration.
        """
        self.config = config

    def inspect_zip_content(self, file_content: bytes) -> None:
        """
        Inspect ZIP archive for potential security threats.

        Every entry is inspected individually, then the archive as a
        whole is checked for structural anomalies. All findings are
        collected and reported together in a single error.

        Args:
            file_content: Raw bytes of ZIP archive.

        Raises:
            ZipContentError: If security threats are detected in ZIP
                content such as directory traversal, symlinks, nested
                archives, or suspicious patterns, or if the analysis
                exceeds ``config.limits.zip_analysis_timeout``.
            FileProcessingError: If ZIP structure is invalid or
                unexpected error occurs during inspection.
        """
        try:
            zip_bytes = io.BytesIO(file_content)
            threats_found = []
            timeout = self.config.limits.zip_analysis_timeout

            # Use a monotonic clock: elapsed-time checks must not be
            # affected by wall-clock adjustments during the analysis.
            start_time = time.monotonic()

            with zipfile.ZipFile(zip_bytes, "r") as zip_file:
                zip_entries = zip_file.infolist()

                # Analyze each entry in the ZIP
                for entry in zip_entries:
                    # The timeout is checked once per entry, before it is
                    # inspected.
                    if time.monotonic() - start_time > timeout:
                        logger.error(
                            "ZIP content inspection timeout",
                            extra={
                                "error_type": "zip_analysis_timeout",
                                "timeout": timeout,
                            },
                        )
                        raise ZipContentError(
                            message=f"ZIP content inspection timeout after {timeout}s",
                            threats=["Analysis timeout - potential zip bomb"],
                            error_code=ErrorCode.ZIP_ANALYSIS_TIMEOUT,
                        )

                    threats_found.extend(self._inspect_zip_entry(entry, zip_file))

                # Check for ZIP structure threats
                threats_found.extend(self._inspect_zip_structure(zip_entries))

                if threats_found:
                    logger.warning(
                        "ZIP content threats detected",
                        extra={
                            "error_type": "zip_content_threat",
                            "threats": threats_found,
                            "threat_count": len(threats_found),
                        },
                    )
                    raise ZipContentError(
                        message=f"ZIP content threats detected: {'; '.join(threats_found)}",
                        threats=threats_found,
                    )

                logger.debug(
                    "ZIP content inspection passed: %s entries analyzed",
                    len(zip_entries),
                )

        except ZipContentError:
            # Re-raise our own exceptions so the generic handler below
            # does not wrap them.
            raise
        except zipfile.BadZipFile as err:
            logger.error("Invalid or corrupted ZIP file structure", exc_info=True)
            raise FileProcessingError(
                message="Invalid or corrupted ZIP file structure",
                original_error=err,
            ) from err
        except Exception as err:
            logger.error(
                "Unexpected error during ZIP content inspection",
                exc_info=True,
            )
            raise FileProcessingError(
                message=f"ZIP content inspection failed: {str(err)}",
                original_error=err,
            ) from err

    def _inspect_zip_entry(
        self, entry: zipfile.ZipInfo, zip_file: zipfile.ZipFile
    ) -> list[str]:
        """
        Inspect single ZIP entry for security threats.

        Args:
            entry: ZIP entry metadata.
            zip_file: Parent ZIP archive.

        Returns:
            List of threat descriptions (empty when nothing was found).
        """
        threats = []
        limits = self.config.limits
        filename = entry.filename

        # 1. Check for directory traversal attacks
        if self._has_directory_traversal(filename):
            threats.append(f"Directory traversal attack in '{filename}'")

        # 2. Check for absolute paths
        if not limits.allow_absolute_paths and self._has_absolute_path(filename):
            threats.append(f"Absolute path detected in '{filename}'")

        # 3. Check for symbolic links
        if not limits.allow_symlinks and self._is_symlink(entry):
            threats.append(f"Symbolic link detected: '{filename}'")

        # 4. Check filename length limits (final path component only)
        basename_length = len(os.path.basename(filename))
        if basename_length > limits.max_filename_length:
            threats.append(
                f"Filename too long: '{filename}' ({basename_length} chars)"
            )

        # 5. Check path length limits (the whole entry name)
        if len(filename) > limits.max_path_length:
            threats.append(f"Path too long: '{filename}' ({len(filename)} chars)")

        # 6. Check for suspicious filename patterns
        threats.extend(self._check_suspicious_patterns(filename))

        # 7. Check for nested archives
        if not limits.allow_nested_archives and self._is_nested_archive(filename):
            threats.append(f"Nested archive detected: '{filename}'")

        # 8. Check file content if enabled and the entry's declared size
        #    is under 1 MB
        content_scan_limit = 1024 * 1024
        if (
            limits.scan_zip_content
            and not entry.is_dir()
            and entry.file_size < content_scan_limit
        ):
            threats.extend(self._inspect_entry_content(entry, zip_file))

        return threats

    def _inspect_zip_structure(self, entries: list[zipfile.ZipInfo]) -> list[str]:
        """
        Inspect ZIP structure for anomalies.

        Checks the deepest directory nesting and the number of files per
        extension against the configured limits.

        Args:
            entries: All ZIP entries to analyze.

        Returns:
            List of structural threat descriptions.
        """
        threats = []
        limits = self.config.limits

        # Depth is approximated by counting path separators of either kind.
        max_depth = 0
        for entry in entries:
            depth = entry.filename.count("/") + entry.filename.count("\\")
            max_depth = max(max_depth, depth)

        if max_depth > limits.max_zip_depth:
            threats.append(
                f"Excessive directory depth: {max_depth} (max: {limits.max_zip_depth})"
            )

        # Count files per (lowercased) extension; directories are ignored.
        file_types = {}
        for entry in entries:
            if not entry.is_dir():
                ext = os.path.splitext(entry.filename)[1].lower()
                file_types[ext] = file_types.get(ext, 0) + 1

        # Check for excessive number of same-type files (potential spam/bomb).
        # Report the observed count alongside the limit, matching the
        # format of the depth message above.
        max_same_type = limits.max_number_files_same_type
        for ext, count in file_types.items():
            if count > max_same_type:
                threats.append(
                    f"Excessive number of {ext} files: {count} (max: {max_same_type})"
                )

        return threats

    def _has_directory_traversal(self, filename: str) -> bool:
        """
        Check for directory traversal indicators.

        Args:
            filename: Filename to check.

        Returns:
            True if traversal detected.
        """
        filename_lower = filename.lower()

        # Case-insensitive substring match against the known patterns.
        for pattern in SuspiciousFilePattern.DIRECTORY_TRAVERSAL.value:
            if pattern.lower() in filename_lower:
                return True

        # Additional check on the normalized path: a ".." that survives
        # normalization still points outside the extraction root.
        normalized = os.path.normpath(filename)
        return (
            normalized.startswith("..")
            or "/.." in normalized
            or "\\.." in normalized
        )

    def _has_absolute_path(self, filename: str) -> bool:
        """
        Check if filename is an absolute path.

        Args:
            filename: Path to check.

        Returns:
            True if absolute path detected.
        """
        return (
            filename.startswith("/")  # Unix absolute path
            or filename.startswith("\\")  # Windows UNC path
            or (len(filename) > 1 and filename[1] == ":")  # Windows drive path
        )

    def _is_symlink(self, entry: zipfile.ZipInfo) -> bool:
        """
        Check if entry is a symbolic link.

        Args:
            entry: ZIP entry to check.

        Returns:
            True if entry is a symlink.
        """
        # The upper 16 bits of external_attr are read as a file mode and
        # tested for the symlink type bits (0o120000).
        return (entry.external_attr >> 16) & 0o120000 == 0o120000

    def _check_suspicious_patterns(self, filename: str) -> list[str]:
        """
        Check filename for suspicious patterns.

        At most one warning is produced per category (name, path).

        Args:
            filename: Filename to check.

        Returns:
            List of pattern warnings.
        """
        threats = []
        filename_lower = filename.lower()
        basename = os.path.basename(filename_lower)

        # Suspicious names must match the final path component exactly.
        for pattern in SuspiciousFilePattern.SUSPICIOUS_NAMES.value:
            if basename == pattern.lower():
                threats.append(f"Suspicious filename pattern: '{filename}'")
                break

        # Suspicious path components may appear anywhere in the entry name.
        for pattern in SuspiciousFilePattern.SUSPICIOUS_PATHS.value:
            if pattern.lower() in filename_lower:
                threats.append(
                    f"Suspicious path component: '{filename}' contains '{pattern}'"
                )
                break

        return threats

    def _is_nested_archive(self, filename: str) -> bool:
        """
        Check if filename represents a nested archive.

        Args:
            filename: Filename to check.

        Returns:
            True if nested archive detected.
        """
        ext = os.path.splitext(filename)[1].lower()
        return ext in ZipThreatCategory.NESTED_ARCHIVES.value

    def _inspect_entry_content(
        self, entry: zipfile.ZipInfo, zip_file: zipfile.ZipFile
    ) -> list[str]:
        """
        Inspect ZIP entry content for malicious signatures.

        Only the first 512 bytes of the entry are read. Inspection is
        best-effort: if the entry cannot be read, a warning is logged and
        the threats collected so far are returned.

        Args:
            entry: ZIP entry to inspect.
            zip_file: Parent ZIP archive.

        Returns:
            List of content threat descriptions.
        """
        threats = []

        try:
            with zip_file.open(entry, "r") as file:
                content_sample = file.read(512)  # Read first 512 bytes

                # Check for executable signatures
                for signature in SuspiciousFilePattern.EXECUTABLE_SIGNATURES.value:
                    if content_sample.startswith(signature):
                        threats.append(
                            f"Executable content detected in '{entry.filename}'"
                        )
                        break

                # Entries with a known binary extension are not scanned
                # for script text.
                binary_exts = set()
                for category in BinaryFileCategory:
                    binary_exts.update(category.value)

                ext = os.path.splitext(entry.filename)[1].lower()
                if ext not in binary_exts and self._contains_script_patterns(
                    content_sample, entry.filename
                ):
                    threats.append(f"Script content detected in '{entry.filename}'")

        except Exception as err:
            # Deliberately best-effort: an unreadable entry must not abort
            # the inspection of the rest of the archive.
            logger.warning(
                "Could not inspect content of '%s': %s",
                entry.filename,
                err,
            )

        return threats

    def _contains_script_patterns(self, content: bytes, filename: str) -> bool:
        """
        Check content for malicious script patterns.

        Args:
            content: Raw bytes to inspect.
            filename: Filename for context (currently unused).

        Returns:
            True if script patterns found.
        """
        # Undecodable bytes are dropped rather than raising, so binary
        # content simply yields text that matches no pattern.
        text_content = content.decode("utf-8", errors="ignore").lower()

        # Common script markers, all lowercase to match the lowered text.
        script_patterns = (
            "#!/bin/",
            "#!/usr/bin/",
            "powershell",
            "cmd.exe",
            "eval(",
            "exec(",
            "system(",
            "shell_exec(",
            "<script",
            "<?php",
            "<%",
            "import os",
            "import subprocess",
        )

        return any(pattern in text_content for pattern in script_patterns)

__init__

__init__(config)

Initialize ZIP inspector with configuration.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security configuration.

required
Source code in safeuploads/inspectors/zip_inspector.py
30
31
32
33
34
35
36
37
def __init__(self, config: FileSecurityConfig):
    """
    Initialize ZIP inspector with configuration.

    The configuration object is stored on the instance as-is; nothing
    else is set up here.

    Args:
        config: File security configuration.
    """
    self.config = config

inspect_zip_content

inspect_zip_content(file_content)

Inspect ZIP archive for potential security threats.

Parameters:

Name Type Description Default
file_content bytes

Raw bytes of ZIP archive.

required

Raises:

Type Description
ZipContentError

If security threats are detected in ZIP content such as directory traversal, symlinks, nested archives, or suspicious patterns.

FileProcessingError

If ZIP structure is invalid or unexpected error occurs during inspection.

Source code in safeuploads/inspectors/zip_inspector.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def inspect_zip_content(self, file_content: bytes) -> None:
    """
    Inspect ZIP archive for potential security threats.

    Each archive entry is passed to ``_inspect_zip_entry`` and the
    complete entry list to ``_inspect_zip_structure``. All reported
    threats are collected and raised together in a single error.
    Before every entry, the elapsed time is compared against
    ``config.limits.zip_analysis_timeout``.

    Args:
        file_content: Raw bytes of ZIP archive.

    Raises:
        ZipContentError: If security threats are detected in ZIP
            content such as directory traversal, symlinks, nested
            archives, or suspicious patterns, or if the analysis
            exceeds the configured timeout.
        FileProcessingError: If the ZIP structure is invalid or an
            unexpected error occurs during inspection.
    """
    try:
        threats_found = []

        # Use a monotonic clock for the elapsed-time check: unlike
        # time.time(), it cannot jump when the system clock is
        # adjusted, so the timeout neither fires early nor is missed.
        start_time = time.monotonic()

        with zipfile.ZipFile(io.BytesIO(file_content), "r") as zip_file:
            zip_entries = zip_file.infolist()

            # Analyze each entry in the ZIP
            for entry in zip_entries:
                # Abort before inspecting the next entry once the
                # time budget is exhausted.
                timeout = self.config.limits.zip_analysis_timeout
                if time.monotonic() - start_time > timeout:
                    logger.error(
                        "ZIP content inspection timeout",
                        extra={
                            "error_type": "zip_analysis_timeout",
                            "timeout": timeout,
                        },
                    )
                    raise ZipContentError(
                        message=(
                            "ZIP content inspection timeout after "
                            f"{timeout}s"
                        ),
                        threats=["Analysis timeout - potential zip bomb"],
                        error_code=ErrorCode.ZIP_ANALYSIS_TIMEOUT,
                    )

                # Inspect individual entry
                threats_found.extend(
                    self._inspect_zip_entry(entry, zip_file)
                )

            # Check for threats that concern the archive as a whole
            threats_found.extend(self._inspect_zip_structure(zip_entries))

            # Report every collected threat in one error
            if threats_found:
                logger.warning(
                    "ZIP content threats detected",
                    extra={
                        "error_type": "zip_content_threat",
                        "threats": threats_found,
                        "threat_count": len(threats_found),
                    },
                )
                raise ZipContentError(
                    message=(
                        "ZIP content threats detected: "
                        f"{'; '.join(threats_found)}"
                    ),
                    threats=threats_found,
                )

            logger.debug(
                "ZIP content inspection passed: %s entries analyzed",
                len(zip_entries),
            )

    except ZipContentError:
        # Re-raise our own exceptions untouched so they are not
        # wrapped by the generic handler below.
        raise
    except zipfile.BadZipFile as err:
        logger.error("Invalid or corrupted ZIP file structure", exc_info=True)
        raise FileProcessingError(
            message="Invalid or corrupted ZIP file structure",
            original_error=err,
        ) from err
    except Exception as err:
        # Boundary handler: convert anything unexpected into the
        # package's own error type while keeping the original cause.
        logger.error(
            "Unexpected error during ZIP content inspection",
            exc_info=True,
        )
        raise FileProcessingError(
            message=f"ZIP content inspection failed: {err}",
            original_error=err,
        ) from err

ZipThreatCategory

Bases: Enum

Categories of potentially harmful contents within ZIP archives.

Attributes:

Name Type Description
NESTED_ARCHIVES

Archive format threats.

EXECUTABLE_FILES

Executable content threats.

SCRIPT_FILES

Script and code threats.

SYSTEM_FILES

System and configuration threats.

Source code in safeuploads/enums.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
class ZipThreatCategory(Enum):
    """
    Categories of potentially harmful contents within ZIP archives.

    Each member's value is an immutable ``frozenset`` of lowercase,
    dot-prefixed file extensions. The sets are frozen so that the
    shared, module-level extension lists cannot be modified in place
    by callers; they still compare equal to a plain ``set`` holding
    the same extensions and support the same membership, iteration
    and set-algebra operations.

    Attributes:
        NESTED_ARCHIVES: Archive format threats.
        EXECUTABLE_FILES: Executable content threats.
        SCRIPT_FILES: Script and code threats.
        SYSTEM_FILES: System and configuration threats.
    """

    # Archive format threats
    NESTED_ARCHIVES = frozenset(
        {
            ".zip",
            ".rar",
            ".7z",
            ".tar",
            ".gz",
            ".bz2",
            ".xz",
            ".tar.gz",
            ".tar.bz2",
            ".tar.xz",
            ".tgz",
            ".tbz2",
        }
    )

    # Executable content threats
    EXECUTABLE_FILES = frozenset(
        {
            ".exe",
            ".com",
            ".bat",
            ".cmd",
            ".scr",
            ".pif",
            ".bin",
            ".run",
            ".app",
            ".deb",
            ".rpm",
            ".msi",
        }
    )

    # Script and code threats
    SCRIPT_FILES = frozenset(
        {
            ".js",
            ".vbs",
            ".ps1",
            ".sh",
            ".bash",
            ".py",
            ".php",
            ".pl",
            ".rb",
            ".lua",
            ".asp",
            ".jsp",
        }
    )

    # System and configuration threats
    SYSTEM_FILES = frozenset(
        {
            ".dll",
            ".so",
            ".dylib",
            ".sys",
            ".drv",
            ".inf",
            ".reg",
            ".cfg",
            ".conf",
            ".ini",
        }
    )