Skip to content

API Reference

File Security Module.

A comprehensive file security system for validating uploads and preventing attacks.

AuditEvent dataclass

A structured security audit event.

Attributes:

Name Type Description
event_type AuditEventType

Category of the audit event.

correlation_id str

Unique ID linking related events.

filename str

Name of the file being validated.

result str

Outcome description.

details str

Additional context information.

duration_ms float

Elapsed time in milliseconds.

source_ip str | None

Optional client IP address.

timestamp float

Monotonic time when event was created.

Source code in safeuploads/audit.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@dataclass
class AuditEvent:
    """
    A structured security audit event.

    Only ``event_type`` and ``correlation_id`` are required; every other
    field has a default, so callers supply just the details they have.

    Attributes:
        event_type: Category of the audit event.
        correlation_id: Unique ID linking related events.
        filename: Name of the file being validated (default ``""``).
        result: Outcome description (default ``""``).
        details: Additional context information (default ``""``).
        duration_ms: Elapsed time in milliseconds (default ``0.0``).
        source_ip: Optional client IP address (default ``None``).
        timestamp: ``time.monotonic()`` reading taken when the event is
            created. A monotonic reading is only meaningful relative to
            other readings of the same clock; it is not a wall-clock
            (calendar) time.
    """

    # Required fields: these have no default and must be passed in.
    event_type: AuditEventType
    correlation_id: str
    # Optional descriptive fields.
    filename: str = ""
    result: str = ""
    details: str = ""
    duration_ms: float = 0.0
    source_ip: str | None = None
    # default_factory makes every instance read the clock at creation
    # time instead of sharing one value captured at class definition.
    timestamp: float = field(default_factory=time.monotonic)

AuditEventType

Bases: Enum

Types of security audit events.

Attributes:

Name Type Description
VALIDATION_START

Validation process started.

VALIDATION_SUCCESS

Validation completed successfully.

VALIDATION_FAILURE

Validation failed with error.

THREAT_DETECTED

Security threat detected.

RESOURCE_LIMIT

Resource limit exceeded.

Source code in safeuploads/audit.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class AuditEventType(Enum):
    """
    Types of security audit events.

    Each member's value is the lowercase, snake_case form of its name.

    Attributes:
        VALIDATION_START: Validation process started.
        VALIDATION_SUCCESS: Validation completed successfully.
        VALIDATION_FAILURE: Validation failed with error.
        THREAT_DETECTED: Security threat detected.
        RESOURCE_LIMIT: Resource limit exceeded.
    """

    # Validation lifecycle events
    VALIDATION_START = "validation_start"
    VALIDATION_SUCCESS = "validation_success"
    VALIDATION_FAILURE = "validation_failure"
    # Security-relevant findings
    THREAT_DETECTED = "threat_detected"
    RESOURCE_LIMIT = "resource_limit"

BaseValidator

Bases: ABC

Abstract base class for file security validators.

Attributes:

Name Type Description
config

File security configuration parameters.

Source code in safeuploads/validators/base.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class BaseValidator(ABC):
    """
    Abstract base class for file security validators.

    The base class only stores the shared configuration; concrete
    validators must implement ``validate``.

    Attributes:
        config: File security configuration parameters.
    """

    def __init__(self, config: FileSecurityConfig) -> None:
        """
        Initialize validator with configuration.

        Args:
            config: File security settings to apply.
        """
        # Kept on the instance so subclasses can read their limits.
        self.config = config

    @abstractmethod
    def validate(self, *args: Any, **kwargs: Any) -> Any:
        """
        Validate data using subclass-specific logic.

        The signature is deliberately open-ended: each concrete
        validator defines its own parameters and return value.

        Args:
            *args: Positional arguments for concrete validator.
            **kwargs: Keyword arguments for concrete validator.

        Returns:
            Validated result defined by subclass.
        """

__init__

__init__(config)

Initialize validator with configuration.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security settings to apply.

required
Source code in safeuploads/validators/base.py
20
21
22
23
24
25
26
27
def __init__(self, config: FileSecurityConfig) -> None:
    """
    Initialize validator with configuration.

    Args:
        config: File security settings to apply.
    """
    # Kept on the instance so subclasses can read their limits.
    self.config = config

validate abstractmethod

validate(*args, **kwargs)

Validate data using subclass-specific logic.

Parameters:

Name Type Description Default
*args

Positional arguments for concrete validator.

()
**kwargs

Keyword arguments for concrete validator.

{}

Returns:

Type Description
Any

Validated result defined by subclass.

Source code in safeuploads/validators/base.py
29
30
31
32
33
34
35
36
37
38
39
40
@abstractmethod
def validate(self, *args: Any, **kwargs: Any) -> Any:
    """
    Validate data using subclass-specific logic.

    The signature is deliberately open-ended: each concrete validator
    defines its own parameters and return value.

    Args:
        *args: Positional arguments for concrete validator.
        **kwargs: Keyword arguments for concrete validator.

    Returns:
        Validated result defined by subclass.
    """

BinaryFileCategory

Bases: Enum

Enumeration of binary file categories.

Attributes:

Name Type Description
FITNESS_FILES

Fitness/activity binary file extensions (e.g., '.fit').

Source code in safeuploads/enums.py
 6
 7
 8
 9
10
11
12
13
14
15
16
class BinaryFileCategory(Enum):
    """
    Enumeration of binary file categories.

    Each member's value is an immutable ``frozenset`` of file
    extensions (including the leading dot). Using ``frozenset`` rather
    than ``set`` prevents callers from mutating the shared constant
    through ``member.value``; membership tests, iteration and equality
    with plain sets behave as before.

    Attributes:
        FITNESS_FILES: Fitness/activity binary file
            extensions (e.g., '.fit').
    """

    # Fitness/activity binary files
    FITNESS_FILES = frozenset({".fit"})

CompoundExtensionCategory

Bases: Enum

Categorized compound file extensions that combine multiple suffixes.

Attributes:

Name Type Description
COMPRESSED_ARCHIVES

Multi-part archive formats.

JAVASCRIPT_VARIANTS

Specialized JavaScript files.

WEB_CONTENT

Minified static web assets.

Source code in safeuploads/enums.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
class CompoundExtensionCategory(Enum):
    """
    Categorized compound file extensions that combine multiple suffixes.

    Each member's value is an immutable ``frozenset`` of extensions
    (including the leading dot). Using ``frozenset`` rather than ``set``
    prevents callers from mutating the shared constants through
    ``member.value``; membership tests, iteration and equality with
    plain sets behave as before.

    Attributes:
        COMPRESSED_ARCHIVES: Multi-part archive formats.
        JAVASCRIPT_VARIANTS: Specialized JavaScript files.
        WEB_CONTENT: Minified static web assets.
    """

    # Compressed archive formats
    # NOTE(review): ".tar.Z" is the only entry containing an uppercase
    # letter; if consumers lowercase filenames before matching, it can
    # never match - confirm against the callers.
    COMPRESSED_ARCHIVES = frozenset(
        {
            ".tar.xz",
            ".tar.gz",
            ".tar.bz2",
            ".tar.lz",
            ".tar.lzma",
            ".tar.Z",
            ".tgz",
            ".tbz2",
        }
    )

    # JavaScript related compound extensions
    JAVASCRIPT_VARIANTS = frozenset(
        {".user.js", ".backup.js", ".min.js", ".worker.js"}
    )

    # Web content compound extensions
    WEB_CONTENT = frozenset({".min.css", ".min.html"})

CompressionSecurityError

Bases: FileValidationError

Compressed file security check failed.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename of compressed file.

None
error_code str | None

Optional error code (defaults to COMPRESSION_GENERIC).

None
Source code in safeuploads/exceptions.py
382
383
384
385
386
387
388
389
390
391
392
393
394
class CompressionSecurityError(FileValidationError):
    """
    Compressed file security check failed.

    Args:
        message: Human-readable error description.
        filename: Optional filename of compressed file.
        error_code: Optional error code (defaults to
            COMPRESSION_GENERIC).

    Attributes:
        None beyond inherited FileValidationError attributes.
    """

    # NOTE(review): this class body defines no __init__, so the arguments
    # above are handled by FileValidationError. The documented
    # COMPRESSION_GENERIC default is not set here - confirm where (or
    # whether) it is applied.

CompressionSecurityValidator

Bases: BaseValidator

Validates ZIP uploads against zip bombs and compression attacks.

Attributes:

Name Type Description
config

Security configuration for validation limits.

Source code in safeuploads/validators/compression_validator.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
class CompressionSecurityValidator(BaseValidator):
    """
    Validates ZIP uploads against zip bombs and compression attacks.

    Inspection relies on the archive's entry metadata (``infolist()``);
    no entry is extracted or decompressed by this class.

    Attributes:
        config: Security configuration for validation limits.
    """

    def __init__(self, config: FileSecurityConfig) -> None:
        """
        Initialize the compression validator.

        Args:
            config: Security configuration with compression limits.
        """
        super().__init__(config)
        # Audit logging is switched on or off by the configuration.
        self._audit = SecurityAuditLogger(
            enabled=config.limits.enable_audit_logging
        )

    def validate_zip_compression_ratio(
        self, file_obj: SeekableFile, compressed_size: int
    ) -> None:
        """
        Validate ZIP archive against security limits.

        Checks run in this order, and the first failure raises: entry
        count, then per entry (time budget, compression ratio,
        individual size), then total uncompressed size, overall
        compression ratio, nested archives, and finally the
        "recursive" entry-count limit.

        Args:
            file_obj: Seekable file-like object containing ZIP data.
            compressed_size: Size of the compressed archive in bytes.
                Used as the denominator of the overall ratio check.

        Raises:
            ZipBombError: If a single entry's or the overall compression
                ratio exceeds the maximum allowed, the total
                uncompressed size is too large, the analysis exceeds
                its time budget, or a MemoryError is raised.
            CompressionSecurityError: If the ZIP is invalid/corrupted or
                too large to process, has too many entries, contains
                nested archives that are not allowed, or contains an
                individual file that is too large.
            FileProcessingError: If any other unexpected error occurs
                during validation (for example an I/O error).
        """
        try:
            # Seek to start for zipfile analysis
            file_obj.seek(0)

            # Track analysis metrics
            total_uncompressed_size = 0
            # Caller-supplied archive size; denominator of the overall
            # compression ratio computed after the loop.
            total_compressed_size = compressed_size
            file_count = 0
            nested_archives = []
            max_compression_ratio = 0
            overall_compression_ratio = (
                0  # Initialize to avoid unbound variable
            )

            # Analyze ZIP file structure with timeout protection
            start_time = time.monotonic()

            with zipfile.ZipFile(file_obj, "r") as zip_file:
                # Check for excessive number of files
                # (the count includes directory entries)
                zip_entries = zip_file.infolist()
                file_count = len(zip_entries)

                if file_count > self.config.limits.max_zip_entries:
                    logger.warning(
                        "ZIP contains too many files",
                        extra=log_extra(
                            {
                                "error_type": "zip_too_many_entries",
                                "file_count": file_count,
                                "max_entries": (
                                    self.config.limits.max_zip_entries
                                ),
                            }
                        ),
                    )
                    raise CompressionSecurityError(
                        message=(
                            "ZIP contains too many"
                            f" files: {file_count}."
                            " Maximum allowed:"
                            f" {self.config.limits.max_zip_entries}"
                        ),
                        error_code=ErrorCode.ZIP_TOO_MANY_ENTRIES,
                    )

                # Analyze each entry in the ZIP
                for entry in zip_entries:
                    # Check for timeout; the time budget is only
                    # evaluated once per entry, at the top of the loop.
                    if (
                        time.monotonic() - start_time
                        > self.config.limits.zip_analysis_timeout
                    ):
                        logger.error(
                            "ZIP analysis timeout",
                            extra=log_extra(
                                {
                                    "error_type": "zip_analysis_timeout",
                                    "timeout": (
                                        self.config.limits.zip_analysis_timeout
                                    ),
                                }
                            ),
                        )
                        raise ZipBombError(
                            message=(
                                "ZIP analysis timeout"
                                " after"
                                f" {self.config.limits.zip_analysis_timeout}s"
                                " - potential zip bomb"
                            ),
                            compression_ratio=0,
                        )

                    # Skip directories
                    if entry.is_dir():
                        continue

                    # Track uncompressed size (sizes come from the
                    # archive's own metadata, not from decompression)
                    uncompressed_size = entry.file_size
                    compressed_size_entry = entry.compress_size
                    total_uncompressed_size += uncompressed_size

                    # Check individual file compression ratio.
                    # Entries reporting a compressed size of 0 skip this
                    # per-entry check but still count toward the total.
                    if compressed_size_entry > 0:  # Avoid division by zero
                        compression_ratio = (
                            uncompressed_size / compressed_size_entry
                        )
                        max_compression_ratio = max(
                            max_compression_ratio, compression_ratio
                        )

                        if (
                            compression_ratio
                            > self.config.limits.max_compression_ratio
                        ):
                            logger.error(
                                "Excessive compression ratio",
                                extra=log_extra(
                                    {
                                        "error_type": (
                                            "compression_ratio_exceeded"
                                        ),
                                        "file_name": entry.filename,
                                        "compression_ratio": (
                                            compression_ratio
                                        ),
                                        "max_ratio": (
                                            self.config.limits.max_compression_ratio
                                        ),
                                    }
                                ),
                            )
                            # The audit record is only written when a
                            # correlation id is available.
                            cid = get_correlation_id()
                            if cid:
                                self._audit.threat(
                                    entry.filename,
                                    cid,
                                    "Zip bomb — excessive compression ratio",
                                )
                            max_ratio = (
                                self.config.limits.max_compression_ratio
                            )
                            raise ZipBombError(
                                message=(
                                    "Excessive compression"
                                    " ratio detected:"
                                    f" {compression_ratio:.1f}:1"
                                    f" for '{entry.filename}'."
                                    " Maximum allowed:"
                                    f" {max_ratio}:1"
                                ),
                                compression_ratio=compression_ratio,
                            )

                    # Check for nested archive files. Detection is by
                    # filename suffix only; matches are collected here
                    # and rejected (if disallowed) after the loop.
                    # NOTE(review): suffixes such as ".tgz" or ".tar.xz"
                    # are not in this list - confirm that is intended.
                    filename_lower = entry.filename.lower()
                    if any(
                        filename_lower.endswith(ext)
                        for ext in [
                            ".zip",
                            ".rar",
                            ".7z",
                            ".tar",
                            ".gz",
                            ".bz2",
                        ]
                    ):
                        nested_archives.append(entry.filename)

                    # Check for excessively large individual files
                    # Use the configurable max_individual_file_size limit
                    if (
                        uncompressed_size
                        > self.config.limits.max_individual_file_size
                    ):
                        logger.warning(
                            "Individual file too large",
                            extra=log_extra(
                                {
                                    "error_type": "file_too_large",
                                    "file_name": entry.filename,
                                    "size_mb": uncompressed_size
                                    // (1024 * 1024),
                                    "max_size_mb": (
                                        self.config.limits.max_individual_file_size
                                        // (1024 * 1024)
                                    ),
                                }
                            ),
                        )
                        max_file_mb = (
                            self.config.limits.max_individual_file_size
                            // (1024 * 1024)
                        )
                        raise CompressionSecurityError(
                            message=(
                                "Individual file too"
                                f" large: '{entry.filename}'"
                                " would expand to"
                                f" {uncompressed_size // (1024 * 1024)}MB."
                                " Maximum allowed:"
                                f" {max_file_mb}MB"
                            ),
                            error_code=ErrorCode.FILE_TOO_LARGE,
                        )

                # Check total uncompressed size
                if (
                    total_uncompressed_size
                    > self.config.limits.max_uncompressed_size
                ):
                    logger.warning(
                        "Total uncompressed size too large",
                        extra=log_extra(
                            {
                                "error_type": "zip_too_large",
                                "total_size_mb": total_uncompressed_size
                                // (1024 * 1024),
                                "max_size_mb": (
                                    self.config.limits.max_uncompressed_size
                                    // (1024 * 1024)
                                ),
                            }
                        ),
                    )
                    max_uncomp_mb = (
                        self.config.limits.max_uncompressed_size
                        // (1024 * 1024)
                    )
                    raise ZipBombError(
                        message=(
                            "Total uncompressed size"
                            " too large:"
                            f" {total_uncompressed_size // (1024 * 1024)}MB."
                            " Maximum allowed:"
                            f" {max_uncomp_mb}MB"
                        ),
                        compression_ratio=0,
                        uncompressed_size=total_uncompressed_size,
                        max_size=self.config.limits.max_uncompressed_size,
                    )

                # Check overall compression ratio (skipped when the
                # caller-supplied compressed size is not positive)
                if total_compressed_size > 0:
                    overall_compression_ratio = (
                        total_uncompressed_size / total_compressed_size
                    )
                    if (
                        overall_compression_ratio
                        > self.config.limits.max_compression_ratio
                    ):
                        logger.error(
                            "Overall compression ratio too high",
                            extra=log_extra(
                                {
                                    "error_type": (
                                        "compression_ratio_exceeded"
                                    ),
                                    "overall_ratio": (
                                        overall_compression_ratio
                                    ),
                                    "max_ratio": (
                                        self.config.limits.max_compression_ratio
                                    ),
                                }
                            ),
                        )
                        raise ZipBombError(
                            message=(
                                "Overall compression ratio"
                                " too high:"
                                f" {overall_compression_ratio:.1f}:1."
                                " Maximum allowed:"
                                f" {self.config.limits.max_compression_ratio}"
                                ":1"
                            ),
                            compression_ratio=(overall_compression_ratio),
                            max_ratio=(
                                self.config.limits.max_compression_ratio
                            ),
                        )

                # Reject nested archives (potential security risk)
                if (
                    nested_archives
                    and not self.config.limits.allow_nested_archives
                ):
                    logger.warning(
                        "Nested archives detected",
                        extra=log_extra(
                            {
                                "error_type": "zip_nested_archive",
                                "nested_archives": nested_archives,
                            }
                        ),
                    )
                    raise CompressionSecurityError(
                        message=(
                            "Nested archives are not"
                            " allowed:"
                            f" {', '.join(nested_archives)}"
                        ),
                        error_code=ErrorCode.ZIP_NESTED_ARCHIVE,
                    )

                # Cumulative entry count check for
                # complexity attack prevention
                # NOTE(review): file_count is this archive's own entry
                # count; nothing in this method descends into nested
                # archives, so no cumulative total is computed here.
                max_recursive = self.config.limits.max_total_entries_recursive
                if file_count > max_recursive:
                    logger.error(
                        "ZIP entry count exceeds recursive limit",
                        extra=log_extra(
                            {
                                "file_count": file_count,
                                "max_recursive": max_recursive,
                            }
                        ),
                    )
                    raise CompressionSecurityError(
                        message=(
                            "ZIP entry count"
                            f" ({file_count})"
                            " exceeds recursive limit"
                            f" ({max_recursive})"
                        ),
                        error_code=(ErrorCode.ZIP_COMPLEXITY_ATTACK),
                    )

                # Log analysis results
                logger.debug(
                    "ZIP analysis: %s files, %sMB uncompressed,"
                    " max ratio: %.1f:1,"
                    " overall ratio: %.1f:1",
                    file_count,
                    total_uncompressed_size // (1024 * 1024),
                    max_compression_ratio,
                    overall_compression_ratio,
                )

        except zipfile.BadZipFile as err:
            logger.error("Invalid or corrupted ZIP file", exc_info=True)
            raise CompressionSecurityError(
                message="Invalid or corrupted ZIP file",
                error_code=ErrorCode.ZIP_CORRUPT,
            ) from err
        except zipfile.LargeZipFile as err:
            logger.error("ZIP file too large to process", exc_info=True)
            raise CompressionSecurityError(
                message="ZIP file too large to process safely",
                error_code=ErrorCode.ZIP_TOO_LARGE,
            ) from err
        except MemoryError as err:
            # Memory exhaustion is reported as a suspected zip bomb.
            logger.error("ZIP requires excessive memory", exc_info=True)
            raise ZipBombError(
                message=(
                    "ZIP file requires too much"
                    " memory to process"
                    " - potential zip bomb"
                ),
                compression_ratio=0,
            ) from err
        except (ZipBombError, CompressionSecurityError):
            # Re-raise our own exceptions; this clause must come before
            # the generic handler below so they are not wrapped in
            # FileProcessingError.
            raise
        except Exception as err:
            logger.error(
                "Unexpected error during ZIP compression validation",
                exc_info=True,
            )
            raise FileProcessingError(
                message="ZIP validation failed due to an internal error",
            ) from err

    def validate(self, file_obj: SeekableFile, compressed_size: int) -> None:
        """
        Validate the compression ratio of a ZIP file.

        Thin wrapper that delegates to
        ``validate_zip_compression_ratio``.

        Args:
            file_obj: Seekable file-like object of the ZIP.
            compressed_size: Size of the file after compression
                in bytes.

        Raises:
            ZipBombError: If compression ratio exceeds maximum.
            CompressionSecurityError: If ZIP structure is invalid.
            FileProcessingError: If unexpected error occurs.
        """
        return self.validate_zip_compression_ratio(file_obj, compressed_size)

__init__

__init__(config)

Initialize the compression validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

Security configuration with compression limits.

required
Source code in safeuploads/validators/compression_validator.py
35
36
37
38
39
40
41
42
43
44
45
def __init__(self, config: FileSecurityConfig):
    """
    Set up the validator together with its audit logger.

    Args:
        config: Security configuration with compression limits. Its
            ``limits.enable_audit_logging`` flag decides whether the
            audit logger is enabled.
    """
    super().__init__(config)
    audit_enabled = config.limits.enable_audit_logging
    self._audit = SecurityAuditLogger(enabled=audit_enabled)

validate

validate(file_obj, compressed_size)

Validate the compression ratio of a ZIP file.

Parameters:

Name Type Description Default
file_obj SeekableFile

Seekable file-like object of the ZIP.

required
compressed_size int

Size of the file after compression in bytes.

required

Raises:

Type Description
ZipBombError

If compression ratio exceeds maximum.

CompressionSecurityError

If ZIP structure is invalid.

FileProcessingError

If an unexpected error occurs.

Source code in safeuploads/validators/compression_validator.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def validate(self, file_obj: SeekableFile, compressed_size: int) -> None:
    """
    Run the ZIP compression checks on an uploaded archive.

    This is the generic validator entry point; all work is delegated
    to ``validate_zip_compression_ratio``.

    Args:
        file_obj: Seekable file-like object of the ZIP.
        compressed_size: Size of the file after compression
            in bytes.

    Raises:
        ZipBombError: If compression ratio exceeds maximum.
        CompressionSecurityError: If ZIP structure is invalid.
        FileProcessingError: If unexpected error occurs.
    """
    outcome = self.validate_zip_compression_ratio(file_obj, compressed_size)
    return outcome

validate_zip_compression_ratio

validate_zip_compression_ratio(file_obj, compressed_size)

Validate ZIP archive against security limits.

Parameters:

Name Type Description Default
file_obj SeekableFile

Seekable file-like object containing ZIP data.

required
compressed_size int

Size of the compressed archive in bytes.

required

Raises:

Type Description
ZipBombError

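If a single entry's or the overall compression ratio exceeds the maximum allowed, the total uncompressed size is too large, the analysis exceeds its time limit, or a memory error occurs while processing the archive.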

CompressionSecurityError

If the ZIP is invalid, corrupted, or too large to process; contains too many entries; contains nested archives when these are not allowed; or contains an individual file that is too large.

FileProcessingError

If any other unexpected error occurs during validation, such as an I/O error. (Memory errors are reported as ZipBombError.)

Source code in safeuploads/validators/compression_validator.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
def validate_zip_compression_ratio(
    self, file_obj: SeekableFile, compressed_size: int
) -> None:
    """
    Validate ZIP archive against security limits.

    Only the entry metadata returned by ``ZipFile.infolist()`` is
    inspected here; this method does not extract or read any entry
    payload. Checks run in the following order, and the first
    violation raises:

    1. Entry count against ``max_zip_entries``.
    2. Per entry: elapsed-time budget (``zip_analysis_timeout``),
       compression ratio (``max_compression_ratio``) and declared
       size (``max_individual_file_size``). Directories are skipped.
    3. Sum of declared uncompressed sizes against
       ``max_uncompressed_size``.
    4. Overall ratio: total declared uncompressed size divided by
       the ``compressed_size`` argument.
    5. Nested archives (detected by file extension), unless
       ``allow_nested_archives`` is set.
    6. Entry count against ``max_total_entries_recursive``.

    Args:
        file_obj: Seekable file-like object containing ZIP data.
            It is rewound to offset 0 before analysis.
        compressed_size: Size of the compressed archive in bytes.
            Used as the denominator of the overall ratio.

    Raises:
        ZipBombError: If compression ratio exceeds maximum allowed,
            total uncompressed size is too large, the analysis
            time budget is exceeded, or a MemoryError occurs.
        CompressionSecurityError: If ZIP structure is invalid, too
            many entries, nested archives detected, or individual
            file too large.
        FileProcessingError: If any other unexpected error occurs
            during validation (for example I/O errors).
    """
    try:
        # Seek to start for zipfile analysis
        file_obj.seek(0)

        # Track analysis metrics
        total_uncompressed_size = 0
        total_compressed_size = compressed_size
        file_count = 0
        nested_archives = []
        max_compression_ratio = 0
        overall_compression_ratio = (
            0  # Initialize to avoid unbound variable
        )

        # Analyze ZIP file structure with timeout protection.
        # The monotonic clock is used so wall-clock adjustments
        # cannot shorten or lengthen the budget.
        start_time = time.monotonic()

        with zipfile.ZipFile(file_obj, "r") as zip_file:
            # Check for excessive number of files.
            # NOTE: file_count is the length of infolist(), so it
            # includes directory entries, which are skipped below.
            zip_entries = zip_file.infolist()
            file_count = len(zip_entries)

            if file_count > self.config.limits.max_zip_entries:
                logger.warning(
                    "ZIP contains too many files",
                    extra=log_extra(
                        {
                            "error_type": "zip_too_many_entries",
                            "file_count": file_count,
                            "max_entries": (
                                self.config.limits.max_zip_entries
                            ),
                        }
                    ),
                )
                raise CompressionSecurityError(
                    message=(
                        "ZIP contains too many"
                        f" files: {file_count}."
                        " Maximum allowed:"
                        f" {self.config.limits.max_zip_entries}"
                    ),
                    error_code=ErrorCode.ZIP_TOO_MANY_ENTRIES,
                )

            # Analyze each entry in the ZIP
            for entry in zip_entries:
                # Check for timeout. The check is cooperative: it
                # only fires at the start of an iteration.
                if (
                    time.monotonic() - start_time
                    > self.config.limits.zip_analysis_timeout
                ):
                    logger.error(
                        "ZIP analysis timeout",
                        extra=log_extra(
                            {
                                "error_type": "zip_analysis_timeout",
                                "timeout": (
                                    self.config.limits.zip_analysis_timeout
                                ),
                            }
                        ),
                    )
                    raise ZipBombError(
                        message=(
                            "ZIP analysis timeout"
                            " after"
                            f" {self.config.limits.zip_analysis_timeout}s"
                            " - potential zip bomb"
                        ),
                        compression_ratio=0,
                    )

                # Skip directories
                if entry.is_dir():
                    continue

                # Track uncompressed size. Both sizes are the values
                # declared in the ZipInfo record, not measured ones.
                uncompressed_size = entry.file_size
                compressed_size_entry = entry.compress_size
                total_uncompressed_size += uncompressed_size

                # Check individual file compression ratio.
                # Entries declaring a compressed size of 0 are not
                # ratio-checked here; they still count toward the
                # total-size and overall-ratio checks below.
                if compressed_size_entry > 0:  # Avoid division by zero
                    compression_ratio = (
                        uncompressed_size / compressed_size_entry
                    )
                    max_compression_ratio = max(
                        max_compression_ratio, compression_ratio
                    )

                    if (
                        compression_ratio
                        > self.config.limits.max_compression_ratio
                    ):
                        logger.error(
                            "Excessive compression ratio",
                            extra=log_extra(
                                {
                                    "error_type": (
                                        "compression_ratio_exceeded"
                                    ),
                                    "file_name": entry.filename,
                                    "compression_ratio": (
                                        compression_ratio
                                    ),
                                    "max_ratio": (
                                        self.config.limits.max_compression_ratio
                                    ),
                                }
                            ),
                        )
                        # An audit threat event is emitted only when
                        # a correlation ID is available.
                        cid = get_correlation_id()
                        if cid:
                            self._audit.threat(
                                entry.filename,
                                cid,
                                "Zip bomb — excessive compression ratio",
                            )
                        max_ratio = (
                            self.config.limits.max_compression_ratio
                        )
                        raise ZipBombError(
                            message=(
                                "Excessive compression"
                                " ratio detected:"
                                f" {compression_ratio:.1f}:1"
                                f" for '{entry.filename}'."
                                " Maximum allowed:"
                                f" {max_ratio}:1"
                            ),
                            compression_ratio=compression_ratio,
                        )

                # Check for nested archive files. Detection is by
                # case-insensitive filename suffix only; matches are
                # collected and judged after the loop.
                filename_lower = entry.filename.lower()
                if any(
                    filename_lower.endswith(ext)
                    for ext in [
                        ".zip",
                        ".rar",
                        ".7z",
                        ".tar",
                        ".gz",
                        ".bz2",
                    ]
                ):
                    nested_archives.append(entry.filename)

                # Check for excessively large individual files
                # Use the configurable max_individual_file_size limit
                if (
                    uncompressed_size
                    > self.config.limits.max_individual_file_size
                ):
                    logger.warning(
                        "Individual file too large",
                        extra=log_extra(
                            {
                                "error_type": "file_too_large",
                                "file_name": entry.filename,
                                "size_mb": uncompressed_size
                                // (1024 * 1024),
                                "max_size_mb": (
                                    self.config.limits.max_individual_file_size
                                    // (1024 * 1024)
                                ),
                            }
                        ),
                    )
                    max_file_mb = (
                        self.config.limits.max_individual_file_size
                        // (1024 * 1024)
                    )
                    raise CompressionSecurityError(
                        message=(
                            "Individual file too"
                            f" large: '{entry.filename}'"
                            " would expand to"
                            f" {uncompressed_size // (1024 * 1024)}MB."
                            " Maximum allowed:"
                            f" {max_file_mb}MB"
                        ),
                        error_code=ErrorCode.FILE_TOO_LARGE,
                    )

            # Check total uncompressed size
            if (
                total_uncompressed_size
                > self.config.limits.max_uncompressed_size
            ):
                logger.warning(
                    "Total uncompressed size too large",
                    extra=log_extra(
                        {
                            "error_type": "zip_too_large",
                            "total_size_mb": total_uncompressed_size
                            // (1024 * 1024),
                            "max_size_mb": (
                                self.config.limits.max_uncompressed_size
                                // (1024 * 1024)
                            ),
                        }
                    ),
                )
                max_uncomp_mb = (
                    self.config.limits.max_uncompressed_size
                    // (1024 * 1024)
                )
                raise ZipBombError(
                    message=(
                        "Total uncompressed size"
                        " too large:"
                        f" {total_uncompressed_size // (1024 * 1024)}MB."
                        " Maximum allowed:"
                        f" {max_uncomp_mb}MB"
                    ),
                    compression_ratio=0,
                    uncompressed_size=total_uncompressed_size,
                    max_size=self.config.limits.max_uncompressed_size,
                )

            # Check overall compression ratio. The denominator is the
            # caller-supplied archive size, not the sum of per-entry
            # compressed sizes.
            if total_compressed_size > 0:
                overall_compression_ratio = (
                    total_uncompressed_size / total_compressed_size
                )
                if (
                    overall_compression_ratio
                    > self.config.limits.max_compression_ratio
                ):
                    logger.error(
                        "Overall compression ratio too high",
                        extra=log_extra(
                            {
                                "error_type": (
                                    "compression_ratio_exceeded"
                                ),
                                "overall_ratio": (
                                    overall_compression_ratio
                                ),
                                "max_ratio": (
                                    self.config.limits.max_compression_ratio
                                ),
                            }
                        ),
                    )
                    raise ZipBombError(
                        message=(
                            "Overall compression ratio"
                            " too high:"
                            f" {overall_compression_ratio:.1f}:1."
                            " Maximum allowed:"
                            f" {self.config.limits.max_compression_ratio}"
                            ":1"
                        ),
                        compression_ratio=(overall_compression_ratio),
                        max_ratio=(
                            self.config.limits.max_compression_ratio
                        ),
                    )

            # Reject nested archives (potential security risk)
            if (
                nested_archives
                and not self.config.limits.allow_nested_archives
            ):
                logger.warning(
                    "Nested archives detected",
                    extra=log_extra(
                        {
                            "error_type": "zip_nested_archive",
                            "nested_archives": nested_archives,
                        }
                    ),
                )
                raise CompressionSecurityError(
                    message=(
                        "Nested archives are not"
                        " allowed:"
                        f" {', '.join(nested_archives)}"
                    ),
                    error_code=ErrorCode.ZIP_NESTED_ARCHIVE,
                )

            # Cumulative entry count check for
            # complexity attack prevention.
            # NOTE(review): only this archive's own entry count is
            # compared here; nothing in this method descends into
            # nested archives — confirm whether callers accumulate.
            max_recursive = self.config.limits.max_total_entries_recursive
            if file_count > max_recursive:
                logger.error(
                    "ZIP entry count exceeds recursive limit",
                    extra=log_extra(
                        {
                            "file_count": file_count,
                            "max_recursive": max_recursive,
                        }
                    ),
                )
                raise CompressionSecurityError(
                    message=(
                        "ZIP entry count"
                        f" ({file_count})"
                        " exceeds recursive limit"
                        f" ({max_recursive})"
                    ),
                    error_code=(ErrorCode.ZIP_COMPLEXITY_ATTACK),
                )

            # Log analysis results
            logger.debug(
                "ZIP analysis: %s files, %sMB uncompressed,"
                " max ratio: %.1f:1,"
                " overall ratio: %.1f:1",
                file_count,
                total_uncompressed_size // (1024 * 1024),
                max_compression_ratio,
                overall_compression_ratio,
            )

    # Translate zipfile/stdlib failures into the package's own
    # exception types; the original error is chained as the cause.
    except zipfile.BadZipFile as err:
        logger.error("Invalid or corrupted ZIP file", exc_info=True)
        raise CompressionSecurityError(
            message="Invalid or corrupted ZIP file",
            error_code=ErrorCode.ZIP_CORRUPT,
        ) from err
    except zipfile.LargeZipFile as err:
        logger.error("ZIP file too large to process", exc_info=True)
        raise CompressionSecurityError(
            message="ZIP file too large to process safely",
            error_code=ErrorCode.ZIP_TOO_LARGE,
        ) from err
    except MemoryError as err:
        logger.error("ZIP requires excessive memory", exc_info=True)
        raise ZipBombError(
            message=(
                "ZIP file requires too much"
                " memory to process"
                " - potential zip bomb"
            ),
            compression_ratio=0,
        ) from err
    except (ZipBombError, CompressionSecurityError):
        # Re-raise our own exceptions unchanged. This clause must
        # precede the generic handler below, which would otherwise
        # wrap them in FileProcessingError (presumably both derive
        # from Exception — verify against the exceptions module).
        raise
    except Exception as err:
        logger.error(
            "Unexpected error during ZIP compression validation",
            exc_info=True,
        )
        raise FileProcessingError(
            message="ZIP validation failed due to an internal error",
        ) from err

ConfigValidationError dataclass

Configuration validation issue with severity and recommendation.

Attributes:

Name Type Description
error_type str

Type of the validation error.

message str

Human-readable error message.

severity str

Error severity level ('error', 'warning', 'info').

component str

Component that failed validation.

recommendation str

Optional recommendation to fix the issue.

Source code in safeuploads/exceptions.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@dataclass
class ConfigValidationError:
    """
    One finding produced while validating a configuration.

    Each finding carries a severity and may carry advice on how to
    resolve it.

    Attributes:
        error_type: Type of the validation error.
        message: Human-readable error message.
        severity: Severity level; one of 'error', 'warning' or
            'info'.
        component: Component that failed validation.
        recommendation: Optional recommendation to fix the issue.
            Defaults to an empty string.
    """

    error_type: str
    message: str
    # Expected values: 'error', 'warning', 'info'
    severity: str
    component: str
    recommendation: str = ""

ContentSecurityInspector

Scans file content for embedded malware and scripts.

Three detection layers:

1. Executable signatures — PE, ELF, Mach-O, Java class headers in files that should not be executables.
2. Script injection — Common web shell and script markers in non-script files.
3. Polyglot detection — Secondary format signatures (ZIP/JAR, RAR, Java class) embedded after a valid image or document header.

Attributes:

Name Type Description
config

File security configuration.

Source code in safeuploads/inspectors/content_inspector.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
class ContentSecurityInspector:
    """
    Inspect raw file bytes for embedded malicious payloads.

    Detection runs in three passes:
    1. **Executable signatures** — PE, ELF, Mach-O, Java class
       and Windows shortcut signatures anywhere in the content.
    2. **Script injection** — web shell byte signatures and
       text-level script markers.
    3. **Polyglot detection** — secondary format signatures
       found after the first 8 bytes of image or activity
       files.

    Each pass reports at most one finding per signature set: the
    search stops at the first match.

    Attributes:
        config: File security configuration.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Set up the inspector and cache its signature tuples.

        Args:
            config: File security configuration. Its
                ``limits.enable_audit_logging`` flag controls
                the audit logger.
        """
        self.config = config
        self._audit = SecurityAuditLogger(
            enabled=config.limits.enable_audit_logging
        )

        # Flatten every executable-type category into one tuple,
        # keeping the category order listed here.
        executable_categories = (
            MalwareSignatureCategory.PE_EXECUTABLE,
            MalwareSignatureCategory.ELF_EXECUTABLE,
            MalwareSignatureCategory.MACHO_EXECUTABLE,
            MalwareSignatureCategory.JAVA_CLASS,
            MalwareSignatureCategory.WINDOWS_SHORTCUT,
        )
        collected: list[bytes] = []
        for category in executable_categories:
            collected.extend(category.value)
        self._executable_sigs: tuple[bytes, ...] = tuple(collected)

        webshell_category = MalwareSignatureCategory.WEBSHELL_PATTERNS
        self._webshell_sigs: tuple[bytes, ...] = tuple(
            webshell_category.value
        )

        polyglot_category = MalwareSignatureCategory.POLYGLOT_SIGNATURES
        self._polyglot_sigs: tuple[bytes, ...] = tuple(
            polyglot_category.value
        )

    def scan_content(
        self,
        content: bytes,
        filename: str = "",
        expected_type: str = "",
    ) -> list[str]:
        """
        Run all detection passes over ``content``.

        Args:
            content: Raw bytes to inspect (typically the
                first ``content_scan_max_size`` bytes).
            filename: Original filename, used in messages.
            expected_type: Logical file type the content
                should represent (e.g. "image", "zip").

        Returns:
            Threat descriptions in pass order (executable,
            script, polyglot). An empty list means nothing
            was detected.
        """
        findings: list[str] = [
            *self._check_executable_signatures(content, filename),
            *self._check_script_patterns(content, filename),
            *self._check_polyglot(content, filename, expected_type),
        ]
        if not findings:
            return findings

        summary = "; ".join(findings)
        logger.warning(
            "Content analysis threats detected in '%s': %s",
            filename,
            summary,
            extra=log_extra(),
        )
        # The audit event is only recorded when a correlation ID
        # is available.
        correlation_id = get_correlation_id()
        if correlation_id:
            self._audit.threat(filename, correlation_id, summary)

        return findings

    def _check_executable_signatures(
        self, content: bytes, filename: str
    ) -> list[str]:
        """
        Look for any executable signature inside the content.

        Args:
            content: Raw bytes to inspect.
            filename: Filename used in the message.

        Returns:
            A one-element list describing the first matching
            signature, or an empty list.
        """
        hit = next(
            (sig for sig in self._executable_sigs if sig in content),
            None,
        )
        if hit is None:
            return []
        return [f"Executable signature detected in '{filename}': {hit!r}"]

    def _check_script_patterns(
        self, content: bytes, filename: str
    ) -> list[str]:
        """
        Look for web shell signatures and script markers.

        Args:
            content: Raw bytes to inspect.
            filename: Filename used in the messages.

        Returns:
            Up to two descriptions: the first byte-level web
            shell match, then the first text-level script
            pattern match.
        """
        findings: list[str] = []

        # Byte-level pass over the raw content
        shell_hit = next(
            (sig for sig in self._webshell_sigs if sig in content),
            None,
        )
        if shell_hit is not None:
            findings.append(
                f"Web shell signature detected in '{filename}': {shell_hit!r}"
            )

        # Text-level pass over a lower-cased, lossy UTF-8 decode
        try:
            lowered = content.decode("utf-8", errors="ignore").lower()
            text_hit = next(
                (marker for marker in _SCRIPT_PATTERNS if marker in lowered),
                None,
            )
            if text_hit is not None:
                findings.append(
                    f"Script pattern detected in '{filename}': '{text_hit}'"
                )
        except Exception:  # noqa: S110
            pass  # Best effort: a failure here is non-critical

        return findings

    def _check_polyglot(
        self,
        content: bytes,
        filename: str,
        expected_type: str,
    ) -> list[str]:
        """
        Detect a second file format hidden behind the header.

        Only "image" and "activity" types are examined; every
        other type yields no findings.

        Args:
            content: Raw bytes to inspect.
            filename: Filename used in the message.
            expected_type: Expected file type ("image", etc.)

        Returns:
            A one-element list describing the first secondary
            signature found, or an empty list.
        """
        if expected_type != "image" and expected_type != "activity":
            return []

        # Ignore the first 8 bytes (the longest common header,
        # PNG, is 8 bytes) so the primary signature itself is
        # never reported as a secondary one.
        after_header = content[8:]
        hit = next(
            (sig for sig in self._polyglot_sigs if sig in after_header),
            None,
        )
        if hit is None:
            return []
        return [
            f"Polyglot file detected in '{filename}':"
            f" secondary signature {hit!r} found after header"
        ]

__init__

__init__(config)

Initialize the content inspector.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security configuration.

required
Source code in safeuploads/inspectors/content_inspector.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(self, config: FileSecurityConfig):
    """
    Set up the inspector and cache its signature tuples.

    Args:
        config: File security configuration. Its
            ``limits.enable_audit_logging`` flag controls
            the audit logger.
    """
    self.config = config
    self._audit = SecurityAuditLogger(
        enabled=config.limits.enable_audit_logging
    )

    # Flatten every executable-type category into one tuple,
    # keeping the category order listed here.
    executable_categories = (
        MalwareSignatureCategory.PE_EXECUTABLE,
        MalwareSignatureCategory.ELF_EXECUTABLE,
        MalwareSignatureCategory.MACHO_EXECUTABLE,
        MalwareSignatureCategory.JAVA_CLASS,
        MalwareSignatureCategory.WINDOWS_SHORTCUT,
    )
    collected: list[bytes] = []
    for category in executable_categories:
        collected.extend(category.value)
    self._executable_sigs: tuple[bytes, ...] = tuple(collected)

    webshell_category = MalwareSignatureCategory.WEBSHELL_PATTERNS
    self._webshell_sigs: tuple[bytes, ...] = tuple(webshell_category.value)

    polyglot_category = MalwareSignatureCategory.POLYGLOT_SIGNATURES
    self._polyglot_sigs: tuple[bytes, ...] = tuple(polyglot_category.value)

scan_content

scan_content(content, filename='', expected_type='')

Scan file content for embedded threats.

Parameters:

Name Type Description Default
content bytes

Raw bytes to inspect (typically the first content_scan_max_size bytes).

required
filename str

Original filename for context.

''
expected_type str

Logical file type the content should represent (e.g. "image", "zip").

''

Returns:

Type Description
list[str]

List of threat descriptions found. Empty list means content is clean.

Source code in safeuploads/inspectors/content_inspector.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def scan_content(
    self,
    content: bytes,
    filename: str = "",
    expected_type: str = "",
) -> list[str]:
    """
    Run all detection passes over ``content``.

    Args:
        content: Raw bytes to inspect (typically the
            first ``content_scan_max_size`` bytes).
        filename: Original filename, used in messages.
        expected_type: Logical file type the content
            should represent (e.g. "image", "zip").

    Returns:
        Threat descriptions in pass order (executable,
        script, polyglot). An empty list means nothing
        was detected.
    """
    findings: list[str] = [
        *self._check_executable_signatures(content, filename),
        *self._check_script_patterns(content, filename),
        *self._check_polyglot(content, filename, expected_type),
    ]
    if not findings:
        return findings

    summary = "; ".join(findings)
    logger.warning(
        "Content analysis threats detected in '%s': %s",
        filename,
        summary,
        extra=log_extra(),
    )
    # The audit event is only recorded when a correlation ID
    # is available.
    correlation_id = get_correlation_id()
    if correlation_id:
        self._audit.threat(filename, correlation_id, summary)

    return findings

DangerousExtensionCategory

Bases: Enum

File extension categories considered potentially dangerous for uploads.

Attributes:

Name Type Description
WINDOWS_EXECUTABLES

Traditional Windows executable formats.

SCRIPT_FILES

Script files that can execute code.

WEB_SCRIPTS

Web server and dynamic content scripts.

UNIX_EXECUTABLES

Unix/Linux executables and shell scripts.

MACOS_EXECUTABLES

macOS specific executables and applications.

JAVA_EXECUTABLES

Java related executables and bytecode.

MOBILE_APPS

Mobile application packages.

BROWSER_EXTENSIONS

Browser extensions and web applications.

PACKAGE_FORMATS

Modern package managers and distribution formats.

ARCHIVE_FORMATS

Archive formats that can contain executables.

VIRTUALIZATION_FORMATS

Virtualization and container formats.

OFFICE_MACROS

Office documents with macro capabilities.

SYSTEM_FILES

System shortcuts and configuration files.

SYSTEM_DRIVERS

System drivers and low-level components.

WINDOWS_THEMES

Windows theme and customization files.

HELP_FILES

Help and documentation files that can execute code.

Source code in safeuploads/enums.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
class DangerousExtensionCategory(Enum):
    """
    Groups of file extensions treated as risky for uploads.

    Every member's value is a set of lower-case extensions, each
    including the leading dot.

    Attributes:
        WINDOWS_EXECUTABLES: Traditional Windows executable formats.
        SCRIPT_FILES: Script files that can execute code.
        WEB_SCRIPTS: Web server and dynamic content scripts.
        UNIX_EXECUTABLES: Unix/Linux executables and shell scripts.
        MACOS_EXECUTABLES: macOS specific executables and applications.
        JAVA_EXECUTABLES: Java related executables and bytecode.
        MOBILE_APPS: Mobile application packages.
        BROWSER_EXTENSIONS: Browser extensions and web applications.
        PACKAGE_FORMATS: Modern package managers and distribution formats.
        ARCHIVE_FORMATS: Archive formats that can contain executables.
        VIRTUALIZATION_FORMATS: Virtualization and container formats.
        OFFICE_MACROS: Office documents with macro capabilities.
        SYSTEM_FILES: System shortcuts and configuration files.
        SYSTEM_DRIVERS: System drivers and low-level components.
        WINDOWS_THEMES: Windows theme and customization files.
        HELP_FILES: Help and documentation files that can execute code.
    """

    # Extensions inside each set are listed alphabetically.

    # Classic Windows executable formats
    WINDOWS_EXECUTABLES = {
        ".bat", ".cmd", ".com", ".dll", ".exe", ".msi", ".pif", ".scr",
    }

    # Scripts that a host can run directly
    SCRIPT_FILES = {
        ".cdxml", ".hta", ".js", ".jse", ".ps1", ".ps1xml", ".psc1",
        ".psd1", ".psm1", ".pssc", ".vbs", ".wsf", ".wsh", ".xaml",
    }

    # Server-side and dynamic web content
    WEB_SCRIPTS = {
        ".asp", ".aspx", ".cer", ".cgi", ".go", ".jsp", ".lua", ".php",
        ".php3", ".php4", ".php5", ".phtml", ".pl", ".py", ".rb",
    }

    # Unix/Linux binaries, libraries and shell scripts
    UNIX_EXECUTABLES = {
        ".a", ".bash", ".bin", ".csh", ".elf", ".fish", ".ksh", ".out",
        ".run", ".sh", ".so", ".tcsh", ".zsh",
    }

    # macOS applications, installers and libraries
    MACOS_EXECUTABLES = {
        ".action", ".app", ".bundle", ".command", ".dmg", ".dylib",
        ".framework", ".mpkg", ".pkg", ".tool", ".workflow",
    }

    # Java archives and bytecode
    JAVA_EXECUTABLES = {".class", ".ear", ".jar", ".jnlp", ".war"}

    # Mobile application packages
    MOBILE_APPS = {".aab", ".apk", ".appx", ".ipa", ".msix", ".xap"}

    # Browser extensions and web applications
    BROWSER_EXTENSIONS = {
        ".crx", ".gadget", ".nex", ".oex", ".safariextz", ".xpi",
    }

    # Package manager and distribution formats
    PACKAGE_FORMATS = {
        ".appimage", ".deb", ".egg", ".flatpak", ".gem", ".nupkg",
        ".rpm", ".snap", ".vsix", ".whl",
    }

    # Archives that may carry executables
    ARCHIVE_FORMATS = {
        ".7z", ".ace", ".arj", ".cab", ".lha", ".lzh", ".rar", ".zoo",
    }

    # Virtual machine and container images
    VIRTUALIZATION_FORMATS = {
        ".docker", ".ova", ".ovf", ".qcow2", ".vdi", ".vhd", ".vhdx",
        ".vmdk",
    }

    # Macro-enabled Office documents
    OFFICE_MACROS = {
        ".docm", ".dotm", ".potm", ".ppam", ".pptm", ".sldm", ".xlam",
        ".xlsm", ".xltm",
    }

    # Shortcuts and system configuration files
    SYSTEM_FILES = {
        ".application", ".deploy", ".desktop", ".diff", ".inf", ".lnk",
        ".manifest", ".msu", ".patch", ".reg", ".url", ".webloc",
        ".website",
    }

    # Drivers and low-level system components
    SYSTEM_DRIVERS = {".cpl", ".drv", ".ocx", ".sys"}

    # Windows theme and customization files
    WINDOWS_THEMES = {
        ".job", ".msc", ".scf", ".sct", ".shb", ".shs", ".theme",
        ".themepack", ".ws",
    }

    # Help files that can execute code
    HELP_FILES = {".chm", ".hlp"}

ErrorCode

Machine-readable error codes for file validation failures.

Source code in safeuploads/exceptions.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class ErrorCode:
    """
    Machine-readable error codes for file validation failures.

    Every code is a plain class-level string constant whose value is
    identical to its attribute name. The constants are grouped below
    by the kind of validation failure they describe.

    Attributes:
        Error codes are class-level string constants for various
        validation failure types.
    """

    # Filename validation errors
    FILENAME_EMPTY = "FILENAME_EMPTY"
    FILENAME_INVALID = "FILENAME_INVALID"
    FILENAME_TOO_LONG = "FILENAME_TOO_LONG"

    # Unicode security errors
    UNICODE_SECURITY = "UNICODE_SECURITY"
    UNICODE_DANGEROUS_CHARS = "UNICODE_DANGEROUS_CHARS"
    UNICODE_NORMALIZATION_ERROR = "UNICODE_NORMALIZATION_ERROR"

    # Extension validation errors
    EXTENSION_BLOCKED = "EXTENSION_BLOCKED"
    EXTENSION_NOT_ALLOWED = "EXTENSION_NOT_ALLOWED"
    COMPOUND_EXTENSION_BLOCKED = "COMPOUND_EXTENSION_BLOCKED"
    EXTENSION_MISSING = "EXTENSION_MISSING"

    # Windows security errors
    WINDOWS_RESERVED_NAME = "WINDOWS_RESERVED_NAME"

    # File size errors
    FILE_TOO_LARGE = "FILE_TOO_LARGE"
    FILE_EMPTY = "FILE_EMPTY"
    FILE_SIZE_UNKNOWN = "FILE_SIZE_UNKNOWN"

    # MIME type errors
    MIME_TYPE_INVALID = "MIME_TYPE_INVALID"
    MIME_TYPE_MISMATCH = "MIME_TYPE_MISMATCH"
    MIME_DETECTION_FAILED = "MIME_DETECTION_FAILED"

    # File signature errors
    FILE_SIGNATURE_INVALID = "FILE_SIGNATURE_INVALID"
    FILE_SIGNATURE_MISSING = "FILE_SIGNATURE_MISSING"
    FILE_SIGNATURE_MISMATCH = "FILE_SIGNATURE_MISMATCH"

    # Compression and ZIP errors
    ZIP_BOMB_DETECTED = "ZIP_BOMB_DETECTED"
    ZIP_CONTENT_THREAT = "ZIP_CONTENT_THREAT"
    COMPRESSION_RATIO_EXCEEDED = "COMPRESSION_RATIO_EXCEEDED"
    ZIP_TOO_MANY_ENTRIES = "ZIP_TOO_MANY_ENTRIES"
    ZIP_INVALID_STRUCTURE = "ZIP_INVALID_STRUCTURE"
    ZIP_CORRUPT = "ZIP_CORRUPT"
    ZIP_TOO_LARGE = "ZIP_TOO_LARGE"
    ZIP_NESTED_ARCHIVE = "ZIP_NESTED_ARCHIVE"
    ZIP_DIRECTORY_TRAVERSAL = "ZIP_DIRECTORY_TRAVERSAL"
    ZIP_SYMLINK_DETECTED = "ZIP_SYMLINK_DETECTED"
    ZIP_ABSOLUTE_PATH = "ZIP_ABSOLUTE_PATH"
    ZIP_ANALYSIS_TIMEOUT = "ZIP_ANALYSIS_TIMEOUT"
    ZIP_RECURSIVE_STRUCTURE = "ZIP_RECURSIVE_STRUCTURE"
    ZIP_QUINE_DETECTED = "ZIP_QUINE_DETECTED"
    ZIP_COMPLEXITY_ATTACK = "ZIP_COMPLEXITY_ATTACK"

    # Resource limit errors
    RESOURCE_LIMIT_EXCEEDED = "RESOURCE_LIMIT_EXCEEDED"
    RESOURCE_TIME_EXCEEDED = "RESOURCE_TIME_EXCEEDED"
    RESOURCE_MEMORY_EXCEEDED = "RESOURCE_MEMORY_EXCEEDED"

    # Processing errors (PROCESSING_ERROR is the code used by
    # FileProcessingError)
    PROCESSING_ERROR = "PROCESSING_ERROR"
    IO_ERROR = "IO_ERROR"
    MEMORY_ERROR = "MEMORY_ERROR"

ExtensionSecurityError

Bases: FilenameSecurityError

Dangerous file extension detected.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename with dangerous extension.

None
extension str | None

Optional specific extension that was blocked.

None
error_code str | None

Optional error code (defaults to EXTENSION_BLOCKED).

None

Attributes:

Name Type Description
extension

The specific extension that was blocked.

Source code in safeuploads/exceptions.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class ExtensionSecurityError(FilenameSecurityError):
    """
    Raised when a filename carries a dangerous extension.

    Args:
        message: Human-readable error description.
        filename: Optional name of the offending file.
        extension: Optional extension that triggered the rejection.
        error_code: Optional error code; any falsy value falls back
            to ErrorCode.EXTENSION_BLOCKED.

    Attributes:
        extension: The specific extension that was blocked.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        extension: str | None = None,
        error_code: str | None = None,
    ):
        """Record the blocked extension, then initialize the base error."""
        self.extension = extension
        # A missing (or empty) code defaults to EXTENSION_BLOCKED.
        effective_code = error_code or ErrorCode.EXTENSION_BLOCKED
        super().__init__(message, filename=filename, error_code=effective_code)

__init__

__init__(
    message, filename=None, extension=None, error_code=None
)

Initialize with blocked extension.

Source code in safeuploads/exceptions.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def __init__(
    self,
    message: str,
    filename: str | None = None,
    extension: str | None = None,
    error_code: str | None = None,
):
    """Record the blocked extension, then initialize the base error."""
    self.extension = extension
    # A missing (or empty) code defaults to EXTENSION_BLOCKED.
    effective_code = error_code or ErrorCode.EXTENSION_BLOCKED
    super().__init__(message, filename=filename, error_code=effective_code)

ExtensionSecurityValidator

Bases: BaseValidator

Validates filenames against configured forbidden extensions.

Attributes:

Name Type Description
config

File security configuration settings.

Source code in safeuploads/validators/extension_validator.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class ExtensionSecurityValidator(BaseValidator):
    """
    Reject filenames that carry a blocked extension.

    Attributes:
        config: File security configuration settings.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Cache the blocked-extension collections from the config.

        Args:
            config: File security configuration settings.
        """
        super().__init__(config)
        # Single extensions are tested by membership: keep a frozenset.
        self._blocked: frozenset[str] = frozenset(config.BLOCKED_EXTENSIONS)
        # Compound extensions are matched as suffixes: a tuple is enough.
        self._compound_blocked: tuple[str, ...] = tuple(
            config.COMPOUND_BLOCKED_EXTENSIONS
        )

    def validate_extensions(self, filename: str) -> None:
        """
        Check a filename for blocked compound and single extensions.

        The compound check compares the end of the lower-cased
        filename against each blocked compound extension. The single
        check then inspects every dot-separated segment after the
        first one, not only the final segment.

        Args:
            filename: Name of the file to validate.

        Raises:
            ExtensionSecurityError: If blocked compound or single
                extension detected in filename.
        """
        lowered = filename.lower()

        # Compound extensions first (e.g., .tar.xz, .user.js)
        compound_hit = next(
            (
                suffix
                for suffix in self._compound_blocked
                if lowered.endswith(suffix)
            ),
            None,
        )
        if compound_hit is not None:
            logger.warning(
                "Dangerous compound extension detected",
                extra={
                    "error_type": "compound_extension_blocked",
                    "file_name": filename,
                    "extension": compound_hit,
                },
            )
            raise ExtensionSecurityError(
                message=(
                    "Dangerous compound file extension "
                    f"'{compound_hit}' detected in filename."
                    " Upload rejected for security."
                ),
                filename=filename,
                extension=compound_hit,
                error_code=ErrorCode.COMPOUND_EXTENSION_BLOCKED,
            )

        # Every segment after the first dot counts as an extension.
        for segment in filename.split(".")[1:]:
            candidate = "." + segment.lower()
            if candidate not in self._blocked:
                continue
            logger.warning(
                "Dangerous extension detected",
                extra={
                    "error_type": "extension_blocked",
                    "file_name": filename,
                    "extension": candidate,
                },
            )
            raise ExtensionSecurityError(
                message=(
                    "Dangerous file extension "
                    f"'{candidate}' detected in filename."
                    " Upload rejected for security."
                ),
                filename=filename,
                extension=candidate,
                error_code=ErrorCode.EXTENSION_BLOCKED,
            )

    def validate(self, filename: str) -> None:
        """
        Validate the given filename by delegating to validate_extensions.

        Args:
            filename: Name of the file to validate.

        Raises:
            ExtensionSecurityError: If filename extension is not
                permitted.
        """
        outcome = self.validate_extensions(filename)
        return outcome

__init__

__init__(config)

Initialize the validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security configuration settings.

required
Source code in safeuploads/validators/extension_validator.py
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(self, config: FileSecurityConfig):
    """
    Cache the blocked-extension collections from the config.

    Args:
        config: File security configuration settings.
    """
    super().__init__(config)
    # Single extensions are tested by membership: keep a frozenset.
    self._blocked: frozenset[str] = frozenset(config.BLOCKED_EXTENSIONS)
    # Compound extensions are matched as suffixes: a tuple is enough.
    self._compound_blocked: tuple[str, ...] = tuple(
        config.COMPOUND_BLOCKED_EXTENSIONS
    )

validate

validate(filename)

Validate the given filename.

Parameters:

Name Type Description Default
filename str

Name of the file to validate.

required

Raises:

Type Description
ExtensionSecurityError

If filename extension is not permitted.

Source code in safeuploads/validators/extension_validator.py
102
103
104
105
106
107
108
109
110
111
112
113
def validate(self, filename: str) -> None:
    """
    Validate the given filename by delegating to validate_extensions.

    Args:
        filename: Name of the file to validate.

    Raises:
        ExtensionSecurityError: If filename extension is not
            permitted.
    """
    outcome = self.validate_extensions(filename)
    return outcome

validate_extensions

validate_extensions(filename)

Validate filename against blocked extensions.

Parameters:

Name Type Description Default
filename str

Name of the file to validate.

required

Raises:

Type Description
ExtensionSecurityError

If blocked compound or single extension detected in filename.

Source code in safeuploads/validators/extension_validator.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def validate_extensions(self, filename: str) -> None:
    """
    Check a filename for blocked compound and single extensions.

    The compound check compares the end of the lower-cased filename
    against each blocked compound extension. The single check then
    inspects every dot-separated segment after the first one, not
    only the final segment.

    Args:
        filename: Name of the file to validate.

    Raises:
        ExtensionSecurityError: If blocked compound or single
            extension detected in filename.
    """
    lowered = filename.lower()

    # Compound extensions first (e.g., .tar.xz, .user.js)
    compound_hit = next(
        (
            suffix
            for suffix in self._compound_blocked
            if lowered.endswith(suffix)
        ),
        None,
    )
    if compound_hit is not None:
        logger.warning(
            "Dangerous compound extension detected",
            extra={
                "error_type": "compound_extension_blocked",
                "file_name": filename,
                "extension": compound_hit,
            },
        )
        raise ExtensionSecurityError(
            message=(
                "Dangerous compound file extension "
                f"'{compound_hit}' detected in filename."
                " Upload rejected for security."
            ),
            filename=filename,
            extension=compound_hit,
            error_code=ErrorCode.COMPOUND_EXTENSION_BLOCKED,
        )

    # Every segment after the first dot counts as an extension.
    for segment in filename.split(".")[1:]:
        candidate = "." + segment.lower()
        if candidate not in self._blocked:
            continue
        logger.warning(
            "Dangerous extension detected",
            extra={
                "error_type": "extension_blocked",
                "file_name": filename,
                "extension": candidate,
            },
        )
        raise ExtensionSecurityError(
            message=(
                "Dangerous file extension "
                f"'{candidate}' detected in filename."
                " Upload rejected for security."
            ),
            filename=filename,
            extension=candidate,
            error_code=ErrorCode.EXTENSION_BLOCKED,
        )

FileProcessingError

Bases: FileSecurityError

Unexpected processing error during file validation.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
original_error Exception | None

Optional original exception that was caught.

None

Attributes:

Name Type Description
original_error

The original exception that was caught.

Source code in safeuploads/exceptions.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
class FileProcessingError(FileSecurityError):
    """
    Raised when file validation hits an unexpected processing error.

    Args:
        message: Human-readable error description.
        original_error: Optional exception that was caught and led
            to this error.

    Attributes:
        original_error: The original exception that was caught.
    """

    def __init__(self, message: str, original_error: Exception | None = None):
        """Keep the caught exception, then initialize the base error."""
        self.original_error = original_error
        # This error type always reports the PROCESSING_ERROR code.
        super().__init__(
            message,
            error_code=ErrorCode.PROCESSING_ERROR,
        )

__init__

__init__(message, original_error=None)

Initialize with original error.

Source code in safeuploads/exceptions.py
480
481
482
483
def __init__(self, message: str, original_error: Exception | None = None):
    """Keep the caught exception, then initialize the base error."""
    self.original_error = original_error
    # This error type always reports the PROCESSING_ERROR code.
    super().__init__(
        message,
        error_code=ErrorCode.PROCESSING_ERROR,
    )

FileSecurityConfig

Centralizes file upload security settings and validation.

Attributes:

Name Type Description
limits

Security limits configuration instance.

ALLOWED_IMAGE_MIMES set[str]

Permitted MIME types for images.

ALLOWED_ZIP_MIMES set[str]

Permitted MIME types for ZIP files.

ALLOWED_IMAGE_EXTENSIONS set[str]

Permitted image file extensions.

ALLOWED_ZIP_EXTENSIONS set[str]

Permitted ZIP file extensions.

BLOCKED_EXTENSIONS frozenset[str]

Dangerous file extensions to block.

COMPOUND_BLOCKED_EXTENSIONS frozenset[str]

Multi-part extensions to block.

DANGEROUS_UNICODE_CHARS frozenset[int]

Unicode characters for filename attacks.

WINDOWS_RESERVED_NAMES frozenset[str]

Platform-specific reserved filenames.

Source code in safeuploads/config.py
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
class FileSecurityConfig:
    """
    Centralizes file upload security settings and validation.

    Attributes:
        limits: Security limits configuration instance.
        ALLOWED_IMAGE_MIMES: Permitted MIME types for images.
        ALLOWED_ZIP_MIMES: Permitted MIME types for ZIP files.
        ALLOWED_IMAGE_EXTENSIONS: Permitted image file extensions.
        ALLOWED_ZIP_EXTENSIONS: Permitted ZIP file extensions.
        BLOCKED_EXTENSIONS: Dangerous file extensions to block.
        COMPOUND_BLOCKED_EXTENSIONS: Multi-part extensions to block.
        DANGEROUS_UNICODE_CHARS: Unicode characters for filename attacks.
        WINDOWS_RESERVED_NAMES: Platform-specific reserved filenames.
    """

    # Security limits configuration
    limits = SecurityLimits()

    # Allowed MIME types for images
    ALLOWED_IMAGE_MIMES: set[str] = {"image/jpeg", "image/jpg", "image/png"}

    # Allowed MIME types for ZIP files
    ALLOWED_ZIP_MIMES: set[str] = {
        "application/zip",
        "application/x-zip-compressed",
        "multipart/x-zip",
    }

    # Allowed MIME types for activity files (GPX/TCX/FIT)
    ALLOWED_ACTIVITY_MIMES: set[str] = {
        "application/gpx+xml",
        "application/xml",
        "text/xml",
        "application/octet-stream",  # FIT files detected as binary
    }

    # Allowed MIME types for gzip files
    ALLOWED_GZIP_MIMES: set[str] = {
        "application/gzip",
        "application/x-gzip",
    }

    # Allowed file extensions
    ALLOWED_IMAGE_EXTENSIONS: set[str] = {".jpg", ".jpeg", ".png"}
    ALLOWED_ZIP_EXTENSIONS: set[str] = {".zip"}
    ALLOWED_ACTIVITY_EXTENSIONS: set[str] = {
        ".gpx",
        ".tcx",
        ".fit",
    }
    ALLOWED_GZIP_EXTENSIONS: set[str] = {".gz"}

    # Generate dangerous file extensions from categorized enums
    @staticmethod
    def _generate_blocked_extensions() -> frozenset[str]:
        """
        Build the full set of blocked single extensions.

        Returns:
            Frozenset holding the union of the values of every
            DangerousExtensionCategory member.
        """
        # Flatten each category's value into one immutable set.
        return frozenset(
            extension
            for category in DangerousExtensionCategory
            for extension in category.value
        )

    # Generate compound dangerous file extensions
    @staticmethod
    def _generate_compound_blocked_extensions() -> frozenset[str]:
        """
        Build the full set of blocked compound extensions.

        Returns:
            Frozenset holding the union of the values of every
            CompoundExtensionCategory member.
        """
        # Flatten each category's value into one immutable set.
        return frozenset(
            extension
            for category in CompoundExtensionCategory
            for extension in category.value
        )

    # Generate dangerous Unicode characters from categorized enums
    @staticmethod
    def _generate_dangerous_unicode_chars() -> frozenset[int]:
        """
        Build the full set of dangerous Unicode code points.

        Returns:
            Frozenset holding the union of the values of every
            UnicodeAttackCategory member.
        """
        # Flatten each category's value into one immutable set.
        return frozenset(
            code_point
            for category in UnicodeAttackCategory
            for code_point in category.value
        )

    # Dangerous file extensions (generated from enums)
    BLOCKED_EXTENSIONS: frozenset[str] = _generate_blocked_extensions()

    # Compound dangerous extensions (multi-part)
    COMPOUND_BLOCKED_EXTENSIONS: frozenset[str] = (
        _generate_compound_blocked_extensions()
    )

    # Dangerous Unicode characters for filename attacks
    DANGEROUS_UNICODE_CHARS: frozenset[int] = (
        _generate_dangerous_unicode_chars()
    )

    # Windows reserved names
    WINDOWS_RESERVED_NAMES: frozenset[str] = frozenset(
        {
            "con",
            "prn",
            "aux",
            "nul",
            "com1",
            "com2",
            "com3",
            "com4",
            "com5",
            "com6",
            "com7",
            "com8",
            "com9",
            "lpt1",
            "lpt2",
            "lpt3",
            "lpt4",
            "lpt5",
            "lpt6",
            "lpt7",
            "lpt8",
            "lpt9",
        }
    )

    # Configuration validation trigger
    @classmethod
    def __init_subclass__(cls, **kwargs):
        """
        Run a non-strict configuration check for each new subclass.

        An Exception raised by the check is logged as a warning
        instead of being propagated.

        Args:
            **kwargs: Subclass initialization arguments.
        """
        super().__init_subclass__(**kwargs)
        try:
            # Non-strict mode: warnings are allowed during validation.
            cls.validate_and_report(strict=False)
        except Exception as exc:
            logger.warning("Configuration validation failed: %s", exc)

    @classmethod
    def get_extensions_by_category(
        cls, category: DangerousExtensionCategory
    ) -> set[str]:
        """
        Look up the extensions of one dangerous extension category.

        Args:
            category: The dangerous extension category.

        Returns:
            Copy of the category's extensions, so changes made by the
            caller do not touch the category's own value.
        """
        members = category.value
        return members.copy()

    @classmethod
    def get_compound_extensions_by_category(
        cls, category: CompoundExtensionCategory
    ) -> set[str]:
        """
        Look up the compound extensions of one category.

        Args:
            category: The compound extension category.

        Returns:
            Copy of the category's compound extensions, so changes
            made by the caller do not touch the category's own value.
        """
        members = category.value
        return members.copy()

    @classmethod
    def get_unicode_chars_by_category(
        cls, category: UnicodeAttackCategory
    ) -> set[int]:
        """
        Look up the Unicode code points of one attack category.

        Args:
            category: The Unicode attack category.

        Returns:
            Copy of the category's code points, so changes made by
            the caller do not touch the category's own value.
        """
        code_points = category.value
        return code_points.copy()

    @classmethod
    def is_extension_in_category(
        cls, extension: str, category: DangerousExtensionCategory
    ) -> bool:
        """
        Tell whether an extension belongs to a dangerous category.

        The extension is lower-cased before the membership test.

        Args:
            extension: File extension to evaluate.
            category: Category to check against.

        Returns:
            True if extension is in the category, False otherwise.
        """
        normalized = extension.lower()
        return normalized in category.value

    @classmethod
    def get_extension_category(
        cls, extension: str
    ) -> DangerousExtensionCategory | None:
        """
        Find the dangerous extension category containing an extension.

        The extension is lower-cased before lookup. Categories are
        searched in enum iteration order and the first match wins.

        Args:
            extension: The file extension to evaluate.

        Returns:
            Matching category if dangerous, None otherwise.
        """
        needle = extension.lower()
        return next(
            (
                category
                for category in DangerousExtensionCategory
                if needle in category.value
            ),
            None,
        )

    @classmethod
    def validate_configuration(
        cls, strict: bool = True
    ) -> list[ConfigValidationError]:
        """
        Collect the findings of every configuration validator.

        The validators run in a fixed order and their findings are
        concatenated in that order.

        Args:
            strict: Reserved for future behavior adjustments; it is
                not read by this method.

        Returns:
            List of detected validation errors.
        """
        return [
            # File size limits
            *cls._validate_file_size_limits(),
            # MIME type configurations
            *cls._validate_mime_configurations(),
            # File extension configurations
            *cls._validate_extension_configurations(),
            # ZIP compression settings
            *cls._validate_compression_settings(),
            # Enum consistency
            *cls._validate_enum_consistency(),
            # Cross-configuration dependencies
            *cls._validate_cross_dependencies(),
        ]

    @classmethod
    def _validate_file_size_limits(cls) -> list[ConfigValidationError]:
        """
        Check the image and ZIP size limits for sane values.

        A non-positive limit is reported with severity "error". A
        limit above 100MB (images) or 2GB (ZIPs), and a ZIP limit
        that does not exceed the image limit, are reported with
        severity "warning".

        Returns:
            List of detected configuration issues.
        """
        mib = 1024 * 1024
        image_cap = cls.limits.max_image_size
        zip_cap = cls.limits.max_zip_size
        issues = []

        def report(error_type, message, severity, recommendation):
            # Every finding of this validator shares one component.
            issues.append(
                ConfigValidationError(
                    error_type=error_type,
                    message=message,
                    severity=severity,
                    component="file_sizes",
                    recommendation=recommendation,
                )
            )

        # Image size limit
        if image_cap <= 0:
            report(
                "invalid_size_limit",
                "max_image_size must be greater than 0",
                "error",
                "Set max_image_size to a positive value (e.g., 20MB)",
            )
        if image_cap > 100 * mib:  # 100MB
            report(
                "excessive_size_limit",
                f"max_image_size ({image_cap // mib}MB) is very large",
                "warning",
                "Consider reducing image size limit"
                " to prevent resource exhaustion",
            )

        # ZIP size limit
        if zip_cap <= 0:
            report(
                "invalid_size_limit",
                "max_zip_size must be greater than 0",
                "error",
                "Set max_zip_size to a positive value (e.g., 500MB)",
            )
        if zip_cap > 2 * 1024 * mib:  # 2GB
            report(
                "excessive_size_limit",
                f"max_zip_size ({zip_cap // mib}MB) is very large",
                "warning",
                "Consider reducing ZIP size limit"
                " to prevent resource exhaustion",
            )

        # Relationship between the two limits
        if zip_cap <= image_cap:
            report(
                "inconsistent_size_limits",
                "max_zip_size should typically be"
                " larger than max_image_size",
                "warning",
                "ZIP files usually contain multiple files"
                " and should have higher limits",
            )

        return issues

    @classmethod
    def _validate_mime_configurations(cls) -> list[ConfigValidationError]:
        """
        Validate MIME type configurations.

        Checks that ``ALLOWED_IMAGE_MIMES`` and ``ALLOWED_ZIP_MIMES`` are
        non-empty, that every image MIME type starts with ``image/``, and
        that no MIME type occurs more than once across the two
        collections combined.

        Returns:
            List of detected configuration issues.
        """
        errors = []

        # Check image MIME types
        if not cls.ALLOWED_IMAGE_MIMES:
            errors.append(
                ConfigValidationError(
                    error_type="empty_mime_set",
                    message="ALLOWED_IMAGE_MIMES cannot be empty",
                    severity="error",
                    component="mime_types",
                    recommendation="Add at least one allowed image MIME type",
                )
            )

        # Validate image MIME type format
        for mime_type in cls.ALLOWED_IMAGE_MIMES:
            if not mime_type.startswith("image/"):
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_image_mime",
                        message=(
                            "Image MIME type"
                            f" '{mime_type}' should"
                            " start with 'image/'"
                        ),
                        severity="warning",
                        component="mime_types",
                        recommendation=(
                            "Use standard image MIME"
                            " types like 'image/jpeg',"
                            " 'image/png'"
                        ),
                    )
                )

        # Check ZIP MIME types
        if not cls.ALLOWED_ZIP_MIMES:
            errors.append(
                ConfigValidationError(
                    error_type="empty_mime_set",
                    message="ALLOWED_ZIP_MIMES cannot be empty",
                    severity="error",
                    component="mime_types",
                    recommendation="Add at least one allowed ZIP MIME type",
                )
            )

        # Check for duplicate MIME types. A single pass with a "seen" set
        # replaces one list.count() call per element (quadratic work).
        all_mimes = list(cls.ALLOWED_IMAGE_MIMES) + list(cls.ALLOWED_ZIP_MIMES)
        seen = set()
        duplicates = set()
        for mime in all_mimes:
            if mime in seen:
                duplicates.add(mime)
            else:
                seen.add(mime)
        if duplicates:
            errors.append(
                ConfigValidationError(
                    error_type="duplicate_mime_types",
                    message=f"Duplicate MIME types found: {duplicates}",
                    severity="warning",
                    component="mime_types",
                    recommendation=(
                        "Remove duplicate MIME types to avoid confusion"
                    ),
                )
            )

        return errors

    @classmethod
    def _validate_extension_configurations(cls) -> list[ConfigValidationError]:
        """
        Check the allowed and blocked extension sets for problems.

        Reports, in this order: empty allowed sets and allowed extensions
        without a leading dot (image set first, then ZIP set), an empty
        ``BLOCKED_EXTENSIONS``, extensions that are both allowed and
        blocked, and entries shared by ``BLOCKED_EXTENSIONS`` and
        ``COMPOUND_BLOCKED_EXTENSIONS``.

        Returns:
            List of detected configuration issues.
        """
        issues = []

        def report(kind, text, level, advice):
            # Every finding from this validator uses component "extensions".
            issues.append(
                ConfigValidationError(
                    error_type=kind,
                    message=text,
                    severity=level,
                    component="extensions",
                    recommendation=advice,
                )
            )

        # Emptiness and leading-dot format of the allowed sets
        allowed_sets = (
            ("ALLOWED_IMAGE_EXTENSIONS", cls.ALLOWED_IMAGE_EXTENSIONS),
            ("ALLOWED_ZIP_EXTENSIONS", cls.ALLOWED_ZIP_EXTENSIONS),
        )
        for set_name, extensions in allowed_sets:
            if not extensions:
                report(
                    "empty_extension_set",
                    f"{set_name} cannot be empty",
                    "error",
                    f"Add at least one extension to {set_name}",
                )

            for candidate in extensions:
                if candidate.startswith("."):
                    continue
                report(
                    "invalid_extension_format",
                    f"Extension '{candidate}' in {set_name}"
                    " should start with '.'",
                    "error",
                    "Use format '.ext' for file extensions",
                )

        # An empty block list is reported as an error
        if not cls.BLOCKED_EXTENSIONS:
            report(
                "empty_blocked_extensions",
                "BLOCKED_EXTENSIONS is empty - security risk",
                "error",
                "Ensure dangerous extensions are properly blocked",
            )

        # Extensions that are simultaneously allowed and blocked
        for label, allowed in (
            ("Image", cls.ALLOWED_IMAGE_EXTENSIONS),
            ("ZIP", cls.ALLOWED_ZIP_EXTENSIONS),
        ):
            conflicting = allowed.intersection(cls.BLOCKED_EXTENSIONS)
            if conflicting:
                report(
                    "extension_conflict",
                    f"{label} extensions {conflicting}"
                    " are both allowed and blocked",
                    "error",
                    "Remove conflicts between allowed and blocked extensions",
                )

        # Entries listed in both the plain and the compound block lists
        shared_with_compound = cls.BLOCKED_EXTENSIONS.intersection(
            cls.COMPOUND_BLOCKED_EXTENSIONS
        )
        if shared_with_compound:
            report(
                "compound_extension_overlap",
                f"Extensions {shared_with_compound}"
                " appear in both blocked and compound blocked lists",
                "warning",
                "Compound extensions should only be in"
                " COMPOUND_BLOCKED_EXTENSIONS",
            )

        return issues

    @classmethod
    def _validate_compression_settings(cls) -> list[ConfigValidationError]:
        """
        Check the ZIP/compression limits stored on ``cls.limits``.

        Non-positive values are reported as errors. Values outside the
        thresholds below, and an individual-file limit larger than the
        total uncompressed limit, are reported as warnings.

        Returns:
            List of detected configuration issues.
        """
        found = []
        limits = cls.limits
        mebibyte = 1024 * 1024

        def report(kind, text, level, advice):
            # Every finding from this validator uses component "compression".
            found.append(
                ConfigValidationError(
                    error_type=kind,
                    message=text,
                    severity=level,
                    component="compression",
                    recommendation=advice,
                )
            )

        # Compression ratio: must be positive; below 10 or above 1000 warns.
        # The checks are independent, so a non-positive ratio triggers both
        # the error and the "very strict" warning.
        ratio = limits.max_compression_ratio
        if ratio <= 0:
            report(
                "invalid_compression_ratio",
                "max_compression_ratio must be greater than 0",
                "error",
                "Set a reasonable compression ratio limit (e.g., 100:1)",
            )
        if ratio < 10:
            report(
                "too_strict_compression",
                f"max_compression_ratio ({ratio}) is very strict",
                "warning",
                "Consider allowing higher compression ratios"
                " for legitimate files",
            )
        if ratio > 1000:
            report(
                "too_permissive_compression",
                f"max_compression_ratio ({ratio}) may allow zip bombs",
                "warning",
                "Reduce compression ratio limit to prevent zip bomb attacks",
            )

        # Total uncompressed size
        uncompressed_cap = limits.max_uncompressed_size
        if uncompressed_cap <= 0:
            report(
                "invalid_uncompressed_size",
                "max_uncompressed_size must be greater than 0",
                "error",
                "Set a reasonable uncompressed size limit",
            )

        # Per-file size inside the archive
        per_file_cap = limits.max_individual_file_size
        if per_file_cap <= 0:
            report(
                "invalid_individual_file_size",
                "max_individual_file_size must be greater than 0",
                "error",
                "Set a reasonable individual file size limit",
            )

        # A single file should not be allowed to exceed the archive total
        if per_file_cap > uncompressed_cap:
            ind_mb = per_file_cap // mebibyte
            uncomp_mb = uncompressed_cap // mebibyte
            report(
                "inconsistent_size_limits",
                f"max_individual_file_size ({ind_mb}MB) exceeds"
                f" max_uncompressed_size ({uncomp_mb}MB)",
                "warning",
                "Individual file size limit should not exceed"
                " total uncompressed size limit",
            )

        # Number of entries in an archive
        entry_cap = limits.max_zip_entries
        if entry_cap <= 0:
            report(
                "invalid_zip_entries",
                "max_zip_entries must be greater than 0",
                "error",
                "Set a reasonable limit for ZIP file entries",
            )
        if entry_cap > 100000:
            report(
                "excessive_zip_entries",
                f"max_zip_entries ({entry_cap}) is very high",
                "warning",
                "High entry limits may impact performance",
            )

        # Analysis timeout
        timeout = limits.zip_analysis_timeout
        if timeout <= 0:
            report(
                "invalid_timeout",
                "zip_analysis_timeout must be greater than 0",
                "error",
                "Set a reasonable timeout for ZIP analysis",
            )
        if timeout > 30:
            report(
                "excessive_timeout",
                f"zip_analysis_timeout ({timeout}s) is very long",
                "warning",
                "Long timeouts may impact user experience",
            )

        return found

    @classmethod
    def _validate_enum_consistency(cls) -> list[ConfigValidationError]:
        """
        Validate enum categories for emptiness and overlaps.

        Every member of ``DangerousExtensionCategory``,
        ``CompoundExtensionCategory`` and ``UnicodeAttackCategory`` whose
        value is falsy is reported as a warning. Each unordered pair of
        ``DangerousExtensionCategory`` members that share extensions is
        reported once at "info" severity.

        Returns:
            List of detected configuration issues.
        """
        from itertools import combinations

        errors = []

        # (enum, label used in the message, noun used in the recommendation)
        category_groups = (
            (
                DangerousExtensionCategory,
                "Extension category",
                "extensions",
            ),
            (
                CompoundExtensionCategory,
                "Compound extension category",
                "extensions",
            ),
            (
                UnicodeAttackCategory,
                "Unicode attack category",
                "Unicode characters",
            ),
        )

        # Check for empty enum categories
        for enum_cls, label, noun in category_groups:
            for category in enum_cls:
                if not category.value:
                    errors.append(
                        ConfigValidationError(
                            error_type="empty_enum_category",
                            message=f"{label} {category.name} is empty",
                            severity="warning",
                            component="enums",
                            recommendation=(
                                f"Add {noun} to {category.name}"
                                " or remove unused category"
                            ),
                        )
                    )

        # Check for overlapping extensions between categories.
        # combinations() yields each unordered pair exactly once, so an
        # overlap between A and B is not reported a second time as B and A.
        for first, second in combinations(DangerousExtensionCategory, 2):
            overlap = first.value.intersection(second.value)
            if overlap:
                errors.append(
                    ConfigValidationError(
                        error_type="category_overlap",
                        message=(
                            f"Categories {first.name}"
                            f" and {second.name}"
                            " share extensions:"
                            f" {overlap}"
                        ),
                        severity="info",
                        component="enums",
                        recommendation=(
                            "Consider if extensions"
                            " should belong to"
                            " multiple categories"
                        ),
                    )
                )

        return errors

    @classmethod
    def _validate_cross_dependencies(cls) -> list[ConfigValidationError]:
        """
        Validate cross-field configuration constraints.

        Checks that every entry of ``WINDOWS_RESERVED_NAMES`` is already in
        lowercase form and that every entry of ``DANGEROUS_UNICODE_CHARS``
        is an integer within the Unicode code point range 0-0x10FFFF.

        Returns:
            List of detected configuration issues.
        """
        errors = []

        # Check Windows reserved names format.
        # Compare against name.lower() rather than using str.islower():
        # islower() is False for strings without any cased character
        # (e.g. "" or "123"), which would produce a spurious warning.
        for name in cls.WINDOWS_RESERVED_NAMES:
            if name != name.lower():
                errors.append(
                    ConfigValidationError(
                        error_type="case_sensitive_reserved_name",
                        message=(
                            "Windows reserved name"
                            f" '{name}' should be"
                            " lowercase"
                        ),
                        severity="warning",
                        component="reserved_names",
                        recommendation=(
                            "Use lowercase for"
                            " consistent"
                            " case-insensitive matching"
                        ),
                    )
                )

        # Validate Unicode character ranges
        for char_code in cls.DANGEROUS_UNICODE_CHARS:
            if not isinstance(char_code, int):
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_unicode_char",
                        message=(
                            "Unicode character code"
                            f" {char_code} is not"
                            " an integer"
                        ),
                        severity="error",
                        component="unicode",
                        recommendation="Use integer Unicode code points",
                    )
                )
            elif char_code < 0 or char_code > 0x10FFFF:
                errors.append(
                    ConfigValidationError(
                        error_type="invalid_unicode_range",
                        message=(
                            "Unicode character code"
                            f" {char_code} is outside"
                            " valid range"
                        ),
                        severity="error",
                        component="unicode",
                        recommendation=(
                            "Use valid Unicode code points (0-0x10FFFF)"
                        ),
                    )
                )

        return errors

    @classmethod
    def validate_and_report(cls, strict: bool = True) -> None:
        """
        Run configuration validation, log each finding, optionally raise.

        Findings are logged grouped by severity: all errors first, then
        warnings, then info entries. In strict mode an exception is raised
        carrying the errors when any exist, otherwise carrying the
        warnings when any exist.

        Args:
            strict: If True, raise on errors/warnings.

        Raises:
            FileSecurityConfigurationError: If strict and issues found.
        """
        findings = cls.validate_configuration(strict=strict)
        if not findings:
            logger.info("File security configuration validation passed")
            return

        # Partition the findings by severity, keeping their original order.
        failures, cautions, notices = [], [], []
        for finding in findings:
            if finding.severity == "error":
                failures.append(finding)
            elif finding.severity == "warning":
                cautions.append(finding)
            elif finding.severity == "info":
                notices.append(finding)

        # One logging call per finding, at the level matching its severity.
        channels = (
            (logger.error, "Configuration error in %s: %s. %s", failures),
            (logger.warning, "Configuration warning in %s: %s. %s", cautions),
            (logger.info, "Configuration info in %s: %s. %s", notices),
        )
        for emit, template, group in channels:
            for item in group:
                emit(
                    template,
                    item.component,
                    item.message,
                    item.recommendation,
                )

        # Only strict mode raises. Errors take precedence over warnings.
        if not strict:
            return
        if failures:
            raise FileSecurityConfigurationError(failures)
        if cautions:
            raise FileSecurityConfigurationError(failures + cautions)

__init_subclass__ classmethod

__init_subclass__(**kwargs)

Validate configuration on subclass creation.

Parameters:

Name Type Description Default
**kwargs

Subclass initialization arguments.

{}
Source code in safeuploads/config.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
@classmethod
def __init_subclass__(cls, **kwargs):
    """
    Run a non-strict configuration check whenever a subclass is created.

    Any exception raised by the check is caught and logged as a warning
    rather than propagated.

    Args:
        **kwargs: Subclass initialization arguments, forwarded to the
            parent implementation.
    """
    super().__init_subclass__(**kwargs)
    # strict=False: findings are logged and do not raise
    try:
        cls.validate_and_report(strict=False)
    except Exception as exc:
        logger.warning("Configuration validation failed: %s", exc)

get_compound_extensions_by_category classmethod

get_compound_extensions_by_category(category)

Return compound extensions for a category.

Parameters:

Name Type Description Default
category CompoundExtensionCategory

The compound extension category.

required

Returns:

Type Description
set[str]

Copy of compound extensions in the specified category.

Source code in safeuploads/config.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
@classmethod
def get_compound_extensions_by_category(
    cls, category: CompoundExtensionCategory
) -> set[str]:
    """
    Look up the compound extensions stored on a category.

    Args:
        category: The compound extension category.

    Returns:
        A copy of ``category.value``, so changes made by the caller do
        not affect the category itself.
    """
    members = category.value
    return members.copy()

get_extension_category classmethod

get_extension_category(extension)

Return the dangerous extension category for an extension.

Parameters:

Name Type Description Default
extension str

The file extension to evaluate.

required

Returns:

Type Description
DangerousExtensionCategory | None

Matching category if dangerous, None otherwise.

Source code in safeuploads/config.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
@classmethod
def get_extension_category(
    cls, extension: str
) -> DangerousExtensionCategory | None:
    """
    Find the first dangerous category containing an extension.

    The extension is lowercased before the lookup, and categories are
    searched in the iteration order of ``DangerousExtensionCategory``.

    Args:
        extension: The file extension to evaluate.

    Returns:
        Matching category if dangerous, None otherwise.
    """
    needle = extension.lower()
    return next(
        (
            candidate
            for candidate in DangerousExtensionCategory
            if needle in candidate.value
        ),
        None,
    )

get_extensions_by_category classmethod

get_extensions_by_category(category)

Return extensions for a dangerous extension category.

Parameters:

Name Type Description Default
category DangerousExtensionCategory

The dangerous extension category.

required

Returns:

Type Description
set[str]

Copy of extensions in the specified category.

Source code in safeuploads/config.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
@classmethod
def get_extensions_by_category(
    cls, category: DangerousExtensionCategory
) -> set[str]:
    """
    Look up the extensions stored on a dangerous extension category.

    Args:
        category: The dangerous extension category.

    Returns:
        A copy of ``category.value``, so changes made by the caller do
        not affect the category itself.
    """
    members = category.value
    return members.copy()

get_unicode_chars_by_category classmethod

get_unicode_chars_by_category(category)

Return Unicode code points for an attack category.

Parameters:

Name Type Description Default
category UnicodeAttackCategory

The Unicode attack category.

required

Returns:

Type Description
set[int]

Copy of code points in the specified category.

Source code in safeuploads/config.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
@classmethod
def get_unicode_chars_by_category(
    cls, category: UnicodeAttackCategory
) -> set[int]:
    """
    Look up the code points stored on a Unicode attack category.

    Args:
        category: The Unicode attack category.

    Returns:
        A copy of ``category.value``, so changes made by the caller do
        not affect the category itself.
    """
    code_points = category.value
    return code_points.copy()

is_extension_in_category classmethod

is_extension_in_category(extension, category)

Check if extension belongs to a dangerous category.

Parameters:

Name Type Description Default
extension str

File extension to evaluate.

required
category DangerousExtensionCategory

Category to check against.

required

Returns:

Type Description
bool

True if extension is in the category, False otherwise.

Source code in safeuploads/config.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
@classmethod
def is_extension_in_category(
    cls, extension: str, category: DangerousExtensionCategory
) -> bool:
    """
    Test membership of an extension in one dangerous category.

    The extension is lowercased before it is looked up in
    ``category.value``.

    Args:
        extension: File extension to evaluate.
        category: Category to check against.

    Returns:
        True if extension is in the category, False otherwise.
    """
    normalized = extension.lower()
    return normalized in category.value

validate_and_report classmethod

validate_and_report(strict=True)

Validate configuration and log outcomes.

Parameters:

Name Type Description Default
strict bool

If True, raise on errors/warnings.

True

Raises:

Type Description
FileSecurityConfigurationError

If strict and issues found.

Source code in safeuploads/config.py
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
@classmethod
def validate_and_report(cls, strict: bool = True) -> None:
    """
    Run configuration validation, log each finding, optionally raise.

    Findings are logged grouped by severity: all errors first, then
    warnings, then info entries. In strict mode an exception is raised
    carrying the errors when any exist, otherwise carrying the warnings
    when any exist.

    Args:
        strict: If True, raise on errors/warnings.

    Raises:
        FileSecurityConfigurationError: If strict and issues found.
    """
    findings = cls.validate_configuration(strict=strict)
    if not findings:
        logger.info("File security configuration validation passed")
        return

    # Partition the findings by severity, keeping their original order.
    failures, cautions, notices = [], [], []
    for finding in findings:
        if finding.severity == "error":
            failures.append(finding)
        elif finding.severity == "warning":
            cautions.append(finding)
        elif finding.severity == "info":
            notices.append(finding)

    # One logging call per finding, at the level matching its severity.
    channels = (
        (logger.error, "Configuration error in %s: %s. %s", failures),
        (logger.warning, "Configuration warning in %s: %s. %s", cautions),
        (logger.info, "Configuration info in %s: %s. %s", notices),
    )
    for emit, template, group in channels:
        for item in group:
            emit(
                template,
                item.component,
                item.message,
                item.recommendation,
            )

    # Only strict mode raises. Errors take precedence over warnings.
    if not strict:
        return
    if failures:
        raise FileSecurityConfigurationError(failures)
    if cautions:
        raise FileSecurityConfigurationError(failures + cautions)

validate_configuration classmethod

validate_configuration(strict=True)

Run all configuration validation routines.

Parameters:

Name Type Description Default
strict bool

Reserved for future behavior adjustments.

True

Returns:

Type Description
list[ConfigValidationError]

List of detected validation errors.

Source code in safeuploads/config.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
@classmethod
def validate_configuration(
    cls, strict: bool = True
) -> list[ConfigValidationError]:
    """
    Run every configuration validator and gather the findings.

    The validators run in a fixed order: file size limits, MIME types,
    file extensions, compression settings, enum consistency, and
    cross-configuration dependencies.

    Args:
        strict: Reserved for future behavior adjustments; not read here.

    Returns:
        List of detected validation errors, in validator order.
    """
    validators = (
        cls._validate_file_size_limits,
        cls._validate_mime_configurations,
        cls._validate_extension_configurations,
        cls._validate_compression_settings,
        cls._validate_enum_consistency,
        cls._validate_cross_dependencies,
    )
    # Flatten the per-validator lists into one list
    return [issue for run_check in validators for issue in run_check()]

FileSecurityConfigurationError

Bases: Exception

Configuration validation failed with aggregated errors.

Parameters:

Name Type Description Default
errors list[ConfigValidationError]

List of ConfigValidationError instances.

required

Attributes:

Name Type Description
errors

List of validation errors that caused failure.

Source code in safeuploads/exceptions.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class FileSecurityConfigurationError(Exception):
    """
    Raised when configuration validation fails.

    The exception message lists every supplied error as
    ``SEVERITY: message``, with entries joined by ``"; "``.

    Args:
        errors: List of ConfigValidationError instances.

    Attributes:
        errors: The list passed in by the caller (stored as given).
    """

    def __init__(self, errors: list[ConfigValidationError]):
        """Store the errors and build the combined message."""
        self.errors = errors
        summary = "; ".join(
            f"{item.severity.upper()}: {item.message}" for item in errors
        )
        super().__init__(f"Configuration validation failed: {summary}")

__init__

__init__(errors)

Initialize with validation errors.

Source code in safeuploads/exceptions.py
41
42
43
44
45
46
47
48
49
def __init__(self, errors: list[ConfigValidationError]):
    """
    Store the errors and build the combined exception message.

    Args:
        errors: Validation errors; each contributes one
            ``SEVERITY: message`` entry to the message.
    """
    self.errors = errors
    summary = "; ".join(
        f"{item.severity.upper()}: {item.message}" for item in errors
    )
    super().__init__(f"Configuration validation failed: {summary}")

FileSecurityError

Bases: Exception

Base exception for all file security validation failures.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
error_code str | None

Optional machine-readable error code.

None

Attributes:

Name Type Description
message

Human-readable error message.

error_code

Machine-readable error code from ErrorCode.

Source code in safeuploads/exceptions.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class FileSecurityError(Exception):
    """
    Root of the file security exception hierarchy.

    Args:
        message: Human-readable error description.
        error_code: Optional machine-readable error code.

    Attributes:
        message: The message passed to the constructor.
        error_code: The error code passed to the constructor, or None.
    """

    def __init__(self, message: str, error_code: str | None = None):
        """Record the message and optional error code."""
        # The message also becomes the sole entry of ``args``
        super().__init__(message)
        self.error_code = error_code
        self.message = message

__init__

__init__(message, error_code=None)

Initialize with message and error code.

Source code in safeuploads/exceptions.py
146
147
148
149
150
def __init__(self, message: str, error_code: str | None = None):
    """Record the message and optional error code."""
    # The message also becomes the sole entry of ``args``
    super().__init__(message)
    self.error_code = error_code
    self.message = message

FileSignatureError

Bases: FileValidationError

File header signature invalid or mismatched.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename with signature issue.

None
expected_type str | None

Optional expected file type based on extension.

None

Attributes:

Name Type Description
expected_type

The expected file type based on extension.

Source code in safeuploads/exceptions.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
class FileSignatureError(FileValidationError):
    """
    Raised when a file's header signature is invalid or mismatched.

    The error code is always ``ErrorCode.FILE_SIGNATURE_MISMATCH``.

    Args:
        message: Human-readable error description.
        filename: Optional filename with signature issue.
        expected_type: Optional expected file type based on extension.

    Attributes:
        expected_type: The expected file type passed to the constructor.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        expected_type: str | None = None,
    ):
        """Set up the base error, then record the expected type."""
        super().__init__(
            message,
            filename=filename,
            error_code=ErrorCode.FILE_SIGNATURE_MISMATCH,
        )
        self.expected_type = expected_type

__init__

__init__(message, filename=None, expected_type=None)

Initialize with expected type.

Source code in safeuploads/exceptions.py
362
363
364
365
366
367
368
369
370
371
372
373
374
def __init__(
    self,
    message: str,
    filename: str | None = None,
    expected_type: str | None = None,
):
    """Set up the base error, then record the expected type."""
    super().__init__(
        message,
        filename=filename,
        error_code=ErrorCode.FILE_SIGNATURE_MISMATCH,
    )
    self.expected_type = expected_type

FileSizeError

Bases: FileValidationError

File exceeds configured size limits.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename that exceeded size limits.

None
size int | None

Optional actual file size in bytes.

None
max_size int | None

Optional maximum allowed size in bytes.

None

Attributes:

Name Type Description
size

The actual file size in bytes.

max_size

The maximum allowed size in bytes.

Source code in safeuploads/exceptions.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
class FileSizeError(FileValidationError):
    """
    Raised when a file violates the configured size limits.

    The error code is always ``ErrorCode.FILE_TOO_LARGE``.

    Args:
        message: Human-readable error description.
        filename: Optional filename that exceeded size limits.
        size: Optional actual file size in bytes.
        max_size: Optional maximum allowed size in bytes.

    Attributes:
        size: The actual file size passed to the constructor.
        max_size: The maximum allowed size passed to the constructor.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        size: int | None = None,
        max_size: int | None = None,
    ):
        """Set up the base error, then record the size details."""
        super().__init__(
            message,
            filename=filename,
            error_code=ErrorCode.FILE_TOO_LARGE,
        )
        self.max_size = max_size
        self.size = size

__init__

__init__(message, filename=None, size=None, max_size=None)

Initialize with size details.

Source code in safeuploads/exceptions.py
299
300
301
302
303
304
305
306
307
308
309
310
311
def __init__(
    self,
    message: str,
    filename: str | None = None,
    size: int | None = None,
    max_size: int | None = None,
):
    """Set up the base error, then record the size details."""
    super().__init__(
        message,
        filename=filename,
        error_code=ErrorCode.FILE_TOO_LARGE,
    )
    self.max_size = max_size
    self.size = size

FileValidationError

Bases: FileSecurityError

File validation failed.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional name of the file that failed validation.

None
error_code str | None

Optional machine-readable error code.

None

Attributes:

Name Type Description
filename

Name of the file that failed validation.

Source code in safeuploads/exceptions.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class FileValidationError(FileSecurityError):
    """
    Raised when validation of a specific file fails.

    Args:
        message: Human-readable error description.
        filename: Optional name of the file that failed validation.
        error_code: Optional machine-readable error code.

    Attributes:
        filename: The filename passed to the constructor, or None.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        error_code: str | None = None,
    ):
        """Forward message and code to the base, then keep the filename."""
        super().__init__(message, error_code=error_code)
        self.filename = filename

__init__

__init__(message, filename=None, error_code=None)

Initialize with message and filename.

Source code in safeuploads/exceptions.py
171
172
173
174
175
176
177
178
179
def __init__(
    self,
    message: str,
    filename: str | None = None,
    error_code: str | None = None,
):
    """Forward message and code to the base, then keep the filename."""
    super().__init__(message, error_code=error_code)
    self.filename = filename

FileValidator

Coordinated security validation for uploaded files.

Attributes:

Name Type Description
config

Active security configuration.

unicode_validator

Validator for Unicode-related checks.

extension_validator

Validator for file extension rules.

windows_validator

Validator enforcing Windows-specific constraints.

compression_validator

Validator handling compressed file limits.

zip_inspector

Inspector for ZIP archive contents.

magic_mime

MIME type detector based on python-magic.

magic_available

Whether python-magic was successfully initialized.

Source code in safeuploads/file_validator.py
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
class FileValidator:
    """
    Coordinated security validation for uploaded files.

    Attributes:
        config: Active security configuration.
        unicode_validator: Validator for Unicode-related checks.
        extension_validator: Validator for file extension rules.
        windows_validator: Validator enforcing Windows-specific constraints.
        compression_validator: Validator handling compressed file limits.
        zip_inspector: Inspector for ZIP archive contents.
        magic_mime: MIME type detector based on python-magic.
        magic_available: Whether python-magic was successfully initialized.
    """

    def __init__(self, config: FileSecurityConfig | None = None):
        """
        Initialize file validator with configuration and detection utilities.

        Args:
            config: Optional configuration object defining file security
                rules. Defaults to new FileSecurityConfig instance.

        Attributes:
            config: Active security configuration.
            unicode_validator: Validator for Unicode-related checks.
            extension_validator: Validator for file extension rules.
            windows_validator: Validator enforcing Windows constraints.
            compression_validator: Validator for compressed file limits.
            zip_inspector: Inspector for ZIP archive contents.
            xml_validator: XmlSecurityValidator built from the config.
            gzip_inspector: GzipContentInspector built from the config.
            content_inspector: ContentSecurityInspector used for the
                optional content analysis step.
            magic_mime: MIME type detector based on python-magic, or
                None when python-magic could not be initialized.
            magic_available: Whether python-magic initialized successfully.
        """
        self.config = config or FileSecurityConfig()

        # Initialize specialized validators
        self.unicode_validator = UnicodeSecurityValidator(self.config)
        self.extension_validator = ExtensionSecurityValidator(self.config)
        self.windows_validator = WindowsSecurityValidator(self.config)
        self.compression_validator = CompressionSecurityValidator(self.config)
        self.zip_inspector = ZipContentInspector(self.config)
        self.xml_validator = XmlSecurityValidator(self.config)
        self.gzip_inspector = GzipContentInspector(self.config)
        self.content_inspector = ContentSecurityInspector(self.config)

        # Initialize audit logger
        self._audit = SecurityAuditLogger(
            enabled=self.config.limits.enable_audit_logging
        )

        # Initialize python-magic for content-based detection
        try:
            self.magic_mime = magic.Magic(mime=True)
            self.magic_available = True
            logger.debug("File content detection (python-magic) initialized")
        except Exception as err:
            # Keep the documented attribute defined so that reading
            # ``magic_mime`` never raises AttributeError after a failed
            # initialization; ``magic_available`` stays the switch that
            # decides whether the detector is used.
            self.magic_mime = None
            self.magic_available = False
            logger.warning(
                "python-magic not available for content detection: %s",
                err,
            )

    def _detect_mime_type(self, file_content: bytes, filename: str) -> str:
        """
        Determine MIME type for file content.

        Args:
            file_content: Raw bytes of the file to inspect.
            filename: Original filename for fallback MIME detection.

        Returns:
            Detected MIME type or "application/octet-stream" if detection
            fails.
        """
        detected_mime = None

        # Content-based detection using python-magic (most reliable)
        if self.magic_available:
            try:
                detected_mime = self.magic_mime.from_buffer(file_content)
            except Exception as err:
                logger.warning("Magic MIME detection failed: %s", err)

        # Fallback to filename-based detection
        if not detected_mime:
            logger.info("Fallback to filename-based MIME detection")
            detected_mime = self._guess_mime_by_name(filename)

        return detected_mime or "application/octet-stream"

    @staticmethod
    @functools.lru_cache(maxsize=64)
    def _guess_mime_by_name(filename: str) -> str | None:
        """
        Guess MIME type from filename extension with caching.

        Uses only the file extension as input to
        ``mimetypes.guess_type`` to keep the cache keyspace
        small and prevent attacker-controlled filenames from
        bloating the LRU cache.

        Args:
            filename: Filename to guess MIME type for.

        Returns:
            Guessed MIME type or None.
        """
        ext = os.path.splitext(filename)[1].lower()
        if not ext:
            return None
        mime, _ = mimetypes.guess_type(f"file{ext}")
        return mime

    def _validate_file_signature(
        self, file_content: bytes, expected_type: str
    ) -> None:
        """
        Verify file content begins with known signature for expected type.

        Args:
            file_content: Raw bytes of the uploaded file.
            expected_type: Logical file category ("image" or "zip").

        Raises:
            FileSignatureError: File header doesn't match expected type
                signatures.
        """
        if len(file_content) < 4:
            raise FileSignatureError(
                f"File too small to verify {expected_type} signature",
                expected_type=expected_type,
            )

        # Common file signatures
        signatures = {
            "image": [
                b"\xff\xd8\xff",  # JPEG
                b"\xff\xd8\xff\xe1",  # JPEG EXIF (additional JPEG variant)
                b"\x89PNG\r\n\x1a\n",  # PNG
            ],
            "zip": [
                b"PK\x03\x04",  # ZIP file
                b"PK\x05\x06",  # Empty ZIP
                b"PK\x07\x08",  # ZIP with spanning
            ],
            "gzip": [
                b"\x1f\x8b",  # gzip magic number
            ],
            "activity": [
                b"<?xml",  # XML header (GPX/TCX)
                b"\xef\xbb\xbf<?xml",  # XML with BOM
            ],
            "fit": [
                # FIT header: size byte, protocol, profile,
                # data size (4 bytes), then ".FIT" at byte 8
            ],
        }

        expected_signatures = signatures.get(expected_type, [])

        for signature in expected_signatures:
            if file_content.startswith(signature):
                return  # Signature matched

        # FIT files: ".FIT" at bytes 8-11
        if (
            expected_type == "fit"
            and len(file_content) >= 12
            and file_content[8:12] == b".FIT"
        ):
            return

        # No matching signature found
        raise FileSignatureError(
            f"File content does not match expected {expected_type} format",
            expected_type=expected_type,
        )

    def _sanitize_filename(self, filename: str) -> str:
        """
        Sanitize user-provided filename to prevent security risks.

        Args:
            filename: Original filename supplied by the user.

        Returns:
            Sanitized filename safe for storage and processing.

        Raises:
            UnicodeSecurityError: Filename contains dangerous Unicode
                characters or fails normalization checks.
            WindowsReservedNameError: Filename uses Windows reserved
                device names.
            ExtensionSecurityError: Filename contains blocked or
                dangerous file extensions.
            ValueError: Filename is empty string.
        """
        if not filename:
            raise ValueError("Filename cannot be empty")

        # Unicode security validation (must be first)
        # This detects and blocks Unicode-based attacks
        # before any other processing
        filename = self.unicode_validator.validate_unicode_security(filename)

        # Remove path components to prevent directory traversal
        filename = os.path.basename(filename)

        # Remove null bytes and control characters
        filename = "".join(
            char for char in filename if ord(char) >= 32 and char != "\x7f"
        )

        # Remove dangerous characters that could be used
        # for path traversal or command injection
        dangerous_chars = '<>:"/\\|?*\x00'
        for char in dangerous_chars:
            filename = filename.replace(char, "_")

        # Check for Windows reserved names before any other processing
        # This must be done early to prevent reserved names from being created
        self.windows_validator.validate_windows_reserved_names(filename)

        # Handle compound and double extensions security risk
        # This also checks all dangerous extensions
        self.extension_validator.validate_extensions(filename)

        # Limit filename length (preserve extension)
        name_part, ext_part = os.path.splitext(filename)
        if len(name_part) > 100:
            name_part = name_part[:100]
            filename = name_part + ext_part

        # Ensure we don't end up with just an extension or empty name
        if not name_part or name_part.strip() == "":
            filename = f"file_{int(time.time())}{ext_part}"

        # Final check: ensure the sanitized filename
        # doesn't become a reserved name
        self.windows_validator.validate_windows_reserved_names(filename)

        logger.debug(
            "Filename sanitized: original='%s' -> sanitized='%s'",
            os.path.basename(filename if filename else "None"),
            filename,
        )

        return filename

    def _validate_filename(self, file: UploadFile) -> None:
        """
        Validate filename of uploaded file and sanitize it in place.

        Args:
            file: Uploaded file whose filename should be validated and
                sanitized.

        Raises:
            FilenameSecurityError: Filename is empty, invalid, or fails
                sanitization.
            FileProcessingError: Unexpected error during filename
                validation.
        """
        # Check filename
        if not file.filename:
            raise FilenameSecurityError(
                "Filename is required",
                error_code=ErrorCode.FILENAME_EMPTY,
            )

        # Sanitize the filename to prevent security issues
        try:
            sanitized_filename = self._sanitize_filename(file.filename)

            # Update the file object with sanitized filename
            file.filename = sanitized_filename

            # Additional validation after sanitization
            if not sanitized_filename or sanitized_filename.strip() == "":
                raise FilenameSecurityError(
                    "Invalid filename after sanitization",
                    filename=file.filename,
                    error_code=ErrorCode.FILENAME_INVALID,
                )
        except FileValidationError:
            # Let FileValidationError and subclasses propagate
            raise
        except Exception as err:
            logger.exception(
                "Unexpected error during filename validation: %s", err
            )
            raise FileProcessingError(
                "Filename validation failed due to internal error",
                original_error=err,
            ) from err

    def _validate_file_extension(
        self, file: UploadFile, allowed_extensions: set[str]
    ) -> None:
        """
        Validate extension of uploaded file against allowed and blocked lists.

        Args:
            file: File whose extension will be validated.
            allowed_extensions: Set of allowed file extensions.

        Raises:
            FilenameSecurityError: Filename is missing.
            ExtensionSecurityError: Extension is not allowed or is blocked.
        """
        # Check file extension
        if not file.filename:
            raise FilenameSecurityError(
                "Filename is required for extension validation",
                error_code=ErrorCode.FILENAME_EMPTY,
            )

        _, ext = os.path.splitext(file.filename.lower())
        if ext not in allowed_extensions:
            raise ExtensionSecurityError(
                (
                    "Invalid file extension."
                    " Allowed:"
                    f" {', '.join(allowed_extensions)}"
                ),
                filename=file.filename,
                extension=ext,
                error_code=ErrorCode.EXTENSION_NOT_ALLOWED,
            )

        # Check for blocked extensions
        if ext in self.config.BLOCKED_EXTENSIONS:
            raise ExtensionSecurityError(
                f"File extension {ext} is blocked for security reasons",
                filename=file.filename,
                extension=ext,
                error_code=ErrorCode.EXTENSION_BLOCKED,
            )

    async def _validate_file_size(
        self, file: UploadFile, max_file_size: int
    ) -> tuple[bytes, int]:
        """
        Validate uploaded file size by sampling content.

        Determine total bytes from the uploaded file.

        Args:
            file: Uploaded file supporting asynchronous read and seek.
            max_file_size: Maximum allowed file size in bytes.

        Returns:
            Tuple containing first 8 KB of file content and detected file
            size in bytes.

        Raises:
            FileSizeError: File size exceeds maximum or file is empty.
        """
        # Read first chunk for content analysis
        file_content = await file.read(8192)  # Read first 8KB

        # Reset file position
        await file.seek(0)

        # Check file size
        file_size = len(file_content)
        if hasattr(file, "size") and file.size:
            file_size = file.size
        else:
            # Determine size via chunked reads to prevent
            # memory exhaustion when Content-Length is absent
            chunk_size = self.config.limits.chunk_size
            file_size = 0
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                file_size += len(chunk)
                if file_size > max_file_size:
                    await file.seek(0)
                    raise FileSizeError(
                        f"File too large. "
                        f"Maximum: "
                        f"{max_file_size // (1024 * 1024)}MB",
                        size=file_size,
                        max_size=max_file_size,
                    )
            await file.seek(0)

        if file_size > max_file_size:
            raise FileSizeError(
                (
                    f"File too large. File size:"
                    f" {file_size // (1024 * 1024)}MB,"
                    f" maximum:"
                    f" {max_file_size // (1024 * 1024)}MB"
                ),
                size=file_size,
                max_size=max_file_size,
            )

        if file_size == 0:
            raise FileSizeError(
                "Empty file not allowed",
                size=0,
                max_size=max_file_size,
            )

        return file_content, file_size

    async def _stream_to_temp_file(
        self, file: UploadFile, max_file_size: int
    ) -> tuple[tempfile.SpooledTemporaryFile, int]:
        """
        Stream uploaded file to a SpooledTemporaryFile with size validation.

        Reads the upload in chunks to avoid loading the entire file
        into memory. The SpooledTemporaryFile stays in memory for
        files smaller than max_memory_buffer_size and spills to
        disk for larger files.

        Args:
            file: Uploaded file supporting asynchronous read/seek.
            max_file_size: Maximum allowed file size in bytes.

        Returns:
            Tuple of SpooledTemporaryFile positioned at start and
            total bytes written.

        Raises:
            FileSizeError: File exceeds maximum or is empty.
        """
        temp = tempfile.SpooledTemporaryFile(  # noqa: SIM115
            max_size=self.config.limits.max_memory_buffer_size
        )
        total_bytes = 0
        chunk_size = self.config.limits.chunk_size

        await file.seek(0)

        try:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                total_bytes += len(chunk)
                if total_bytes > max_file_size:
                    temp.close()
                    raise FileSizeError(
                        f"File too large. "
                        f"Maximum: "
                        f"{max_file_size // (1024 * 1024)}MB",
                        size=total_bytes,
                        max_size=max_file_size,
                    )
                temp.write(chunk)

            if total_bytes == 0:
                temp.close()
                raise FileSizeError(
                    "Empty file not allowed",
                    size=0,
                    max_size=max_file_size,
                )

            temp.seek(0)
            return temp, total_bytes
        except FileSizeError:
            raise
        except Exception:
            temp.close()
            raise

    async def validate_image_file(self, file: UploadFile) -> None:
        """
        Run the full validation pipeline for an uploaded image.

        Checks, in order: filename (sanitized in place), extension,
        size, MIME type, file signature and, when enabled in the
        configuration, content analysis. Start, success and failure
        are reported to the audit logger together with the elapsed
        time in milliseconds.

        Args:
            file: Uploaded file to validate.

        Raises:
            FilenameSecurityError: Filename is empty, invalid, or fails
                security checks.
            ExtensionSecurityError: File extension is not allowed or is
                blocked.
            FileSizeError: File size exceeds maximum or file is empty.
            MimeTypeError: MIME type is not in allowed image types.
            FileSignatureError: File signature doesn't match expected image
                format.
            ResourceLimitError: Re-raised unchanged after audit logging
                (presumably raised by ResourceMonitor when a time or
                memory limit is exceeded; confirm against its
                implementation).
            FileProcessingError: Content analysis reported threats, or
                an unexpected error occurred during validation.
        """
        cid = set_correlation_id()
        # Name as supplied by the client; used for auditing until the
        # sanitized name replaces it further down
        filename = file.filename or "unknown"
        self._audit.start(filename, cid)
        t0 = time.monotonic()
        try:
            # Validate filename (raises exceptions on failure)
            self._validate_filename(file)

            # Validate file extension (raises exceptions on failure)
            self._validate_file_extension(
                file, self.config.ALLOWED_IMAGE_EXTENSIONS
            )

            # Size, MIME, signature and content checks run under the
            # configured time and memory limits
            with ResourceMonitor(
                max_time_seconds=(
                    self.config.limits.max_validation_time_seconds
                ),
                max_memory_mb=(self.config.limits.max_validation_memory_mb),
            ):
                # Validate file size (raises on failure,
                # returns content and size on success)
                file_content, file_size = await self._validate_file_size(
                    file, self.config.limits.max_image_size
                )

                # Detect MIME type
                # (file.filename now holds the sanitized name)
                filename = file.filename or "unknown"
                detected_mime = self._detect_mime_type(file_content, filename)

                if detected_mime not in self.config.ALLOWED_IMAGE_MIMES:
                    raise MimeTypeError(
                        (
                            "Invalid file type."
                            f" Detected: {detected_mime}."
                            " Allowed:"
                            f" {', '.join(self.config.ALLOWED_IMAGE_MIMES)}"
                        ),
                        filename=filename,
                        detected_mime=detected_mime,
                        allowed_mimes=list(self.config.ALLOWED_IMAGE_MIMES),
                    )

                # Validate file signature (raises exceptions on failure)
                self._validate_file_signature(file_content, "image")

                # Optional content analysis
                if self.config.limits.enable_content_analysis:
                    # Only the leading sample returned by the size
                    # check is scanned, capped at content_scan_max_size
                    scan_size = self.config.limits.content_scan_max_size
                    sample = file_content[:scan_size]
                    threats = self.content_inspector.scan_content(
                        sample,
                        filename,
                        "image",
                    )
                    if threats:
                        raise FileProcessingError(
                            "Content analysis threats"
                            " detected:"
                            f" {'; '.join(threats)}"
                        )

                logger.debug(
                    "Image file validation passed: %s (%s, %s bytes)",
                    filename,
                    detected_mime,
                    file_size,
                )
            # Elapsed wall time in milliseconds for the audit record
            ms = (time.monotonic() - t0) * 1000
            self._audit.success(filename, cid, ms)
        except (
            FileValidationError,
            ResourceLimitError,
            FileProcessingError,
        ) as exc:
            # Known validation outcomes: audit with the error text and
            # re-raise unchanged
            ms = (time.monotonic() - t0) * 1000
            self._audit.failure(
                filename,
                cid,
                ms,
                str(exc),
            )
            raise
        except Exception as err:
            # Anything unexpected is audited under a generic reason and
            # wrapped so callers only see FileProcessingError
            ms = (time.monotonic() - t0) * 1000
            self._audit.failure(
                filename,
                cid,
                ms,
                "internal_error",
            )
            logger.exception("Error during image file validation: %s", err)
            raise FileProcessingError(
                "File validation failed due to internal error",
                original_error=err,
            ) from err
        finally:
            # Always clear the correlation id set at the top
            reset_correlation_id()

    async def validate_zip_file(self, file: UploadFile) -> None:
        """
        Run the ZIP upload validation pipeline for ``file``.

        Filename and extension are checked first. The upload is then
        streamed to a temporary file under a ``ResourceMonitor`` and
        checked for ZIP signature, MIME type, and compression ratio,
        followed by the archive inspection and content analysis steps
        when they are enabled in ``self.config.limits``. The outcome is
        reported to the audit logger under a fresh correlation ID.

        Args:
            file: Uploaded ZIP archive to validate.

        Raises:
            FilenameSecurityError: Filename is empty, invalid, or fails
                security checks.
            ExtensionSecurityError: File extension is not allowed or is
                blocked.
            FileSizeError: File size exceeds maximum or file is empty.
            MimeTypeError: Detected MIME type is neither an allowed ZIP
                type nor ``application/octet-stream``.
            FileSignatureError: File signature doesn't match the ZIP
                format.
            CompressionSecurityError: ZIP compression validation failed
                (zip bomb detected).
            FileProcessingError: Content analysis reported threats, or
                an unexpected error occurred during validation.
        """
        correlation_id = set_correlation_id()
        filename = file.filename or "unknown"
        self._audit.start(filename, correlation_id)
        started_at = time.monotonic()
        try:
            # Filename and extension checks (both raise on failure).
            self._validate_filename(file)
            allowed_extensions = self.config.ALLOWED_ZIP_EXTENSIONS
            self._validate_file_extension(file, allowed_extensions)

            time_budget = self.config.limits.max_validation_time_seconds
            memory_budget = self.config.limits.max_validation_memory_mb
            with ResourceMonitor(
                max_time_seconds=time_budget,
                max_memory_mb=memory_budget,
            ):
                # Spool the upload to a temporary file; the size limit
                # is passed along to the streaming helper.
                max_size = self.config.limits.max_zip_size
                spooled, total_bytes = await self._stream_to_temp_file(
                    file, max_size
                )

                try:
                    # The leading bytes feed the MIME and signature checks.
                    head = spooled.read(8192)
                    spooled.seek(0)

                    filename = file.filename or "unknown"
                    mime = self._detect_mime_type(head, filename)

                    # The signature is verified first so that a generic
                    # MIME result can be tolerated below.
                    try:
                        self._validate_file_signature(head, "zip")
                    except FileSignatureError as sig_err:
                        raise FileSignatureError(
                            "File content does not match ZIP format",
                            filename=filename,
                            expected_type="zip",
                        ) from sig_err

                    mime_listed = mime in self.config.ALLOWED_ZIP_MIMES
                    is_generic = mime == "application/octet-stream"
                    if not mime_listed and not is_generic:
                        raise MimeTypeError(
                            "Invalid file type. "
                            f"Detected: {mime}. "
                            "Expected ZIP file.",
                            filename=filename,
                            detected_mime=mime,
                            allowed_mimes=list(self.config.ALLOWED_ZIP_MIMES),
                        )
                    if not mime_listed:
                        # Generic type, but the signature already matched.
                        logger.debug(
                            "ZIP file detected as application/octet-stream, "
                            "but signature is valid: %s",
                            filename,
                        )

                    # Compression ratio check on the spooled archive.
                    self.compression_validator.validate_zip_compression_ratio(
                        spooled, total_bytes
                    )

                    # Optional inspection of the archive contents.
                    if self.config.limits.scan_zip_content:
                        spooled.seek(0)
                        self.zip_inspector.inspect_zip_content(spooled)

                    # Optional content analysis on a bounded sample.
                    if self.config.limits.enable_content_analysis:
                        spooled.seek(0)
                        sample = spooled.read(
                            self.config.limits.content_scan_max_size
                        )
                        spooled.seek(0)
                        findings = self.content_inspector.scan_content(
                            sample, filename, "zip"
                        )
                        if findings:
                            joined = "; ".join(findings)
                            raise FileProcessingError(
                                f"Content analysis threats detected: {joined}"
                            )

                    logger.debug(
                        "ZIP file validation passed: %s (%s, %s bytes)",
                        filename,
                        mime,
                        total_bytes,
                    )
                finally:
                    spooled.close()
            elapsed_ms = (time.monotonic() - started_at) * 1000
            self._audit.success(filename, correlation_id, elapsed_ms)
        except (
            FileValidationError,
            ResourceLimitError,
            FileProcessingError,
        ) as known_err:
            # Expected validation outcomes: audit and propagate unchanged.
            elapsed_ms = (time.monotonic() - started_at) * 1000
            self._audit.failure(
                filename, correlation_id, elapsed_ms, str(known_err)
            )
            raise
        except Exception as err:
            # Anything else is reported as an internal error and wrapped.
            elapsed_ms = (time.monotonic() - started_at) * 1000
            self._audit.failure(
                filename, correlation_id, elapsed_ms, "internal_error"
            )
            logger.exception("Error during ZIP file validation: %s", err)
            raise FileProcessingError(
                "File validation failed due to internal error",
                original_error=err,
            ) from err
        finally:
            reset_correlation_id()

    async def validate_activity_file(self, file: UploadFile) -> None:
        """
        Validate uploaded activity file (GPX, TCX, FIT).

        For XML-based formats (GPX/TCX) performs XXE-safe
        parsing via ``defusedxml``. For FIT files validates
        the binary signature.

        A ``FileProcessingError`` raised by one of the validation
        steps is audited with its own message and re-raised
        unchanged, consistent with ``validate_image_file`` and
        ``validate_zip_file``; only other, unexpected exceptions
        are wrapped as an internal error.

        Args:
            file: Uploaded activity file to validate.

        Raises:
            FilenameSecurityError: Filename fails security.
            ExtensionSecurityError: Extension not allowed.
            FileSizeError: File exceeds size limit or empty.
            MimeTypeError: MIME type not allowed.
            FileSignatureError: Signature mismatch.
            FileProcessingError: XML parsing or other error.
        """
        cid = set_correlation_id()
        filename = file.filename or "unknown"
        self._audit.start(filename, cid)
        t0 = time.monotonic()
        try:
            self._validate_filename(file)
            self._validate_file_extension(
                file,
                self.config.ALLOWED_ACTIVITY_EXTENSIONS,
            )

            with ResourceMonitor(
                max_time_seconds=(
                    self.config.limits.max_validation_time_seconds
                ),
                max_memory_mb=(self.config.limits.max_validation_memory_mb),
            ):
                temp_file, file_size = await self._stream_to_temp_file(
                    file,
                    self.config.limits.max_activity_file_size,
                )

                try:
                    # Only the leading bytes are used for MIME and
                    # signature detection; rewind for later readers.
                    header = temp_file.read(8192)
                    temp_file.seek(0)

                    filename = file.filename or "unknown"
                    detected_mime = self._detect_mime_type(header, filename)

                    # FIT handling is selected by the (lower-cased)
                    # file extension.
                    _, ext = os.path.splitext(filename.lower())
                    is_fit = ext == ".fit"

                    # Signature check
                    sig_type = "fit" if is_fit else "activity"
                    try:
                        self._validate_file_signature(header, sig_type)
                    except FileSignatureError as err:
                        raise FileSignatureError(
                            f"File content does not match expected {sig_type}"
                            " format",
                            filename=filename,
                            expected_type=sig_type,
                        ) from err

                    if not is_fit:
                        # MIME check — skipped for FIT (lenient)
                        allowed = self.config.ALLOWED_ACTIVITY_MIMES
                        if detected_mime not in allowed:
                            raise MimeTypeError(
                                "Invalid file type."
                                f" Detected: {detected_mime}."
                                " Expected activity file.",
                                filename=filename,
                                detected_mime=detected_mime,
                                allowed_mimes=list(allowed),
                            )

                        # XXE-safe XML validation for GPX/TCX
                        self.xml_validator.validate_xml_safety(temp_file)

                    logger.debug(
                        "Activity file validation passed: %s (%s, %s bytes)",
                        filename,
                        detected_mime,
                        file_size,
                    )
                finally:
                    temp_file.close()
            ms = (time.monotonic() - t0) * 1000
            self._audit.success(filename, cid, ms)
        except (
            FileValidationError,
            ResourceLimitError,
            FileProcessingError,
        ) as exc:
            # Expected validation outcomes, including FileProcessingError
            # from a validation step: keep the original message instead of
            # re-wrapping it as an internal error.
            ms = (time.monotonic() - t0) * 1000
            self._audit.failure(
                filename,
                cid,
                ms,
                str(exc),
            )
            raise
        except Exception as err:
            ms = (time.monotonic() - t0) * 1000
            self._audit.failure(
                filename,
                cid,
                ms,
                "internal_error",
            )
            logger.exception(
                "Error during activity file validation: %s",
                err,
            )
            raise FileProcessingError(
                "File validation failed due to internal error",
                original_error=err,
            ) from err
        finally:
            reset_correlation_id()

    async def validate_gzip_file(self, file: UploadFile) -> None:
        """
        Validate uploaded gzip archive.

        Checks filename, extension, size, MIME type,
        signature, and performs decompression bomb detection.

        A ``FileProcessingError`` raised by one of the validation
        steps is audited with its own message and re-raised
        unchanged, consistent with ``validate_image_file`` and
        ``validate_zip_file``; only other, unexpected exceptions
        are wrapped as an internal error.

        Args:
            file: Uploaded gzip file to validate.

        Raises:
            FilenameSecurityError: Filename fails security.
            ExtensionSecurityError: Extension not allowed.
            FileSizeError: File exceeds size limit or empty.
            MimeTypeError: MIME type not allowed.
            FileSignatureError: Signature mismatch.
            ZipBombError: Decompression bomb detected.
            CompressionSecurityError: Invalid gzip structure.
            FileProcessingError: Unexpected error.
        """
        cid = set_correlation_id()
        filename = file.filename or "unknown"
        self._audit.start(filename, cid)
        t0 = time.monotonic()
        try:
            self._validate_filename(file)
            self._validate_file_extension(
                file,
                self.config.ALLOWED_GZIP_EXTENSIONS,
            )

            with ResourceMonitor(
                max_time_seconds=(
                    self.config.limits.max_validation_time_seconds
                ),
                max_memory_mb=(self.config.limits.max_validation_memory_mb),
            ):
                temp_file, file_size = await self._stream_to_temp_file(
                    file,
                    self.config.limits.max_gzip_size,
                )

                try:
                    # Only the leading bytes are used for MIME and
                    # signature detection; rewind for later readers.
                    header = temp_file.read(8192)
                    temp_file.seek(0)

                    filename = file.filename or "unknown"
                    detected_mime = self._detect_mime_type(header, filename)

                    # Signature check
                    try:
                        self._validate_file_signature(header, "gzip")
                    except FileSignatureError as err:
                        raise FileSignatureError(
                            "File content does not match gzip format",
                            filename=filename,
                            expected_type="gzip",
                        ) from err

                    # MIME check — octet-stream is tolerated because the
                    # signature check above has already passed.
                    allowed = self.config.ALLOWED_GZIP_MIMES
                    if (
                        detected_mime not in allowed
                        and detected_mime != "application/octet-stream"
                    ):
                        raise MimeTypeError(
                            "Invalid file type."
                            f" Detected: {detected_mime}."
                            " Expected gzip file.",
                            filename=filename,
                            detected_mime=detected_mime,
                            allowed_mimes=list(allowed),
                        )

                    # Decompression bomb check
                    self.gzip_inspector.inspect_gzip_content(
                        temp_file, file_size
                    )

                    logger.debug(
                        "Gzip file validation passed: %s (%s, %s bytes)",
                        filename,
                        detected_mime,
                        file_size,
                    )
                finally:
                    temp_file.close()
            ms = (time.monotonic() - t0) * 1000
            self._audit.success(filename, cid, ms)
        except (
            FileValidationError,
            ResourceLimitError,
            FileProcessingError,
        ) as exc:
            # Expected validation outcomes, including FileProcessingError
            # from a validation step: keep the original message instead of
            # re-wrapping it as an internal error.
            ms = (time.monotonic() - t0) * 1000
            self._audit.failure(
                filename,
                cid,
                ms,
                str(exc),
            )
            raise
        except Exception as err:
            ms = (time.monotonic() - t0) * 1000
            self._audit.failure(
                filename,
                cid,
                ms,
                "internal_error",
            )
            logger.exception(
                "Error during gzip file validation: %s",
                err,
            )
            raise FileProcessingError(
                "File validation failed due to internal error",
                original_error=err,
            ) from err
        finally:
            reset_correlation_id()

__init__

__init__(config=None)

Initialize file validator with configuration and detection utilities.

Parameters:

Name Type Description Default
config FileSecurityConfig | None

Optional configuration object defining file security rules. Defaults to new FileSecurityConfig instance.

None

Attributes:

Name Type Description
config

Active security configuration.

unicode_validator

Validator for Unicode-related checks.

extension_validator

Validator for file extension rules.

windows_validator

Validator enforcing Windows constraints.

compression_validator

Validator for compressed file limits.

zip_inspector

Inspector for ZIP archive contents.

magic_mime

MIME type detector based on python-magic.

magic_available

Whether python-magic initialized successfully.

Source code in safeuploads/file_validator.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def __init__(self, config: FileSecurityConfig | None = None) -> None:
    """
    Initialize file validator with configuration and detection utilities.

    Args:
        config: Optional configuration object defining file security
            rules. Defaults to new FileSecurityConfig instance.

    Attributes:
        config: Active security configuration.
        unicode_validator: Validator for Unicode-related checks.
        extension_validator: Validator for file extension rules.
        windows_validator: Validator enforcing Windows constraints.
        compression_validator: Validator for compressed file limits.
        zip_inspector: Inspector for ZIP archive contents.
        xml_validator: Validator used for XML safety checks on
            activity files.
        gzip_inspector: Inspector for gzip archive contents.
        content_inspector: Inspector used for content analysis scans.
        magic_mime: MIME type detector based on python-magic, or
            ``None`` when python-magic failed to initialize.
        magic_available: Whether python-magic initialized successfully.
    """
    self.config = config or FileSecurityConfig()

    # Initialize specialized validators
    self.unicode_validator = UnicodeSecurityValidator(self.config)
    self.extension_validator = ExtensionSecurityValidator(self.config)
    self.windows_validator = WindowsSecurityValidator(self.config)
    self.compression_validator = CompressionSecurityValidator(self.config)
    self.zip_inspector = ZipContentInspector(self.config)
    self.xml_validator = XmlSecurityValidator(self.config)
    self.gzip_inspector = GzipContentInspector(self.config)
    self.content_inspector = ContentSecurityInspector(self.config)

    # Initialize audit logger (enabled/disabled via configuration)
    self._audit = SecurityAuditLogger(
        enabled=self.config.limits.enable_audit_logging
    )

    # Initialize python-magic for content-based detection.
    # magic_mime is defined up front so the documented attribute always
    # exists, even when python-magic cannot be initialized.
    self.magic_mime = None
    try:
        self.magic_mime = magic.Magic(mime=True)
        self.magic_available = True
        logger.debug("File content detection (python-magic) initialized")
    except Exception as err:
        # Best effort: record that content detection is unavailable
        # instead of failing construction.
        self.magic_available = False
        logger.warning(
            "python-magic not available for content detection: %s",
            err,
        )

validate_activity_file async

validate_activity_file(file)

Validate uploaded activity file (GPX, TCX, FIT).

For XML-based formats (GPX/TCX) performs XXE-safe parsing via defusedxml. For FIT files validates the binary signature.

Parameters:

Name Type Description Default
file UploadFileProtocol

Uploaded activity file to validate.

required

Raises:

Type Description
FilenameSecurityError

Filename fails security.

ExtensionSecurityError

Extension not allowed.

FileSizeError

File exceeds size limit or empty.

MimeTypeError

MIME type not allowed.

FileSignatureError

Signature mismatch.

FileProcessingError

XML parsing or other error.

Source code in safeuploads/file_validator.py
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
async def validate_activity_file(self, file: UploadFile) -> None:
    """
    Validate uploaded activity file (GPX, TCX, FIT).

    For XML-based formats (GPX/TCX) performs XXE-safe
    parsing via ``defusedxml``. For FIT files validates
    the binary signature.

    A ``FileProcessingError`` raised by one of the validation
    steps is audited with its own message and re-raised
    unchanged, consistent with ``validate_image_file`` and
    ``validate_zip_file``; only other, unexpected exceptions
    are wrapped as an internal error.

    Args:
        file: Uploaded activity file to validate.

    Raises:
        FilenameSecurityError: Filename fails security.
        ExtensionSecurityError: Extension not allowed.
        FileSizeError: File exceeds size limit or empty.
        MimeTypeError: MIME type not allowed.
        FileSignatureError: Signature mismatch.
        FileProcessingError: XML parsing or other error.
    """
    cid = set_correlation_id()
    filename = file.filename or "unknown"
    self._audit.start(filename, cid)
    t0 = time.monotonic()
    try:
        self._validate_filename(file)
        self._validate_file_extension(
            file,
            self.config.ALLOWED_ACTIVITY_EXTENSIONS,
        )

        with ResourceMonitor(
            max_time_seconds=(
                self.config.limits.max_validation_time_seconds
            ),
            max_memory_mb=(self.config.limits.max_validation_memory_mb),
        ):
            temp_file, file_size = await self._stream_to_temp_file(
                file,
                self.config.limits.max_activity_file_size,
            )

            try:
                # Only the leading bytes are used for MIME and
                # signature detection; rewind for later readers.
                header = temp_file.read(8192)
                temp_file.seek(0)

                filename = file.filename or "unknown"
                detected_mime = self._detect_mime_type(header, filename)

                # FIT handling is selected by the (lower-cased)
                # file extension.
                _, ext = os.path.splitext(filename.lower())
                is_fit = ext == ".fit"

                # Signature check
                sig_type = "fit" if is_fit else "activity"
                try:
                    self._validate_file_signature(header, sig_type)
                except FileSignatureError as err:
                    raise FileSignatureError(
                        f"File content does not match expected {sig_type}"
                        " format",
                        filename=filename,
                        expected_type=sig_type,
                    ) from err

                if not is_fit:
                    # MIME check — skipped for FIT (lenient)
                    allowed = self.config.ALLOWED_ACTIVITY_MIMES
                    if detected_mime not in allowed:
                        raise MimeTypeError(
                            "Invalid file type."
                            f" Detected: {detected_mime}."
                            " Expected activity file.",
                            filename=filename,
                            detected_mime=detected_mime,
                            allowed_mimes=list(allowed),
                        )

                    # XXE-safe XML validation for GPX/TCX
                    self.xml_validator.validate_xml_safety(temp_file)

                logger.debug(
                    "Activity file validation passed: %s (%s, %s bytes)",
                    filename,
                    detected_mime,
                    file_size,
                )
            finally:
                temp_file.close()
        ms = (time.monotonic() - t0) * 1000
        self._audit.success(filename, cid, ms)
    except (
        FileValidationError,
        ResourceLimitError,
        FileProcessingError,
    ) as exc:
        # Expected validation outcomes, including FileProcessingError
        # from a validation step: keep the original message instead of
        # re-wrapping it as an internal error.
        ms = (time.monotonic() - t0) * 1000
        self._audit.failure(
            filename,
            cid,
            ms,
            str(exc),
        )
        raise
    except Exception as err:
        ms = (time.monotonic() - t0) * 1000
        self._audit.failure(
            filename,
            cid,
            ms,
            "internal_error",
        )
        logger.exception(
            "Error during activity file validation: %s",
            err,
        )
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=err,
        ) from err
    finally:
        reset_correlation_id()

validate_gzip_file async

validate_gzip_file(file)

Validate uploaded gzip archive.

Checks filename, extension, size, MIME type, signature, and performs decompression bomb detection.

Parameters:

Name Type Description Default
file UploadFileProtocol

Uploaded gzip file to validate.

required

Raises:

Type Description
FilenameSecurityError

Filename fails security.

ExtensionSecurityError

Extension not allowed.

FileSizeError

File exceeds size limit or empty.

MimeTypeError

MIME type not allowed.

FileSignatureError

Signature mismatch.

ZipBombError

Decompression bomb detected.

CompressionSecurityError

Invalid gzip structure.

FileProcessingError

Unexpected error.

Source code in safeuploads/file_validator.py
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
async def validate_gzip_file(self, file: UploadFile) -> None:
    """
    Validate uploaded gzip archive.

    Checks filename, extension, size, MIME type,
    signature, and performs decompression bomb detection.

    A ``FileProcessingError`` raised by one of the validation
    steps is audited with its own message and re-raised
    unchanged, consistent with ``validate_image_file`` and
    ``validate_zip_file``; only other, unexpected exceptions
    are wrapped as an internal error.

    Args:
        file: Uploaded gzip file to validate.

    Raises:
        FilenameSecurityError: Filename fails security.
        ExtensionSecurityError: Extension not allowed.
        FileSizeError: File exceeds size limit or empty.
        MimeTypeError: MIME type not allowed.
        FileSignatureError: Signature mismatch.
        ZipBombError: Decompression bomb detected.
        CompressionSecurityError: Invalid gzip structure.
        FileProcessingError: Unexpected error.
    """
    cid = set_correlation_id()
    filename = file.filename or "unknown"
    self._audit.start(filename, cid)
    t0 = time.monotonic()
    try:
        self._validate_filename(file)
        self._validate_file_extension(
            file,
            self.config.ALLOWED_GZIP_EXTENSIONS,
        )

        with ResourceMonitor(
            max_time_seconds=(
                self.config.limits.max_validation_time_seconds
            ),
            max_memory_mb=(self.config.limits.max_validation_memory_mb),
        ):
            temp_file, file_size = await self._stream_to_temp_file(
                file,
                self.config.limits.max_gzip_size,
            )

            try:
                # Only the leading bytes are used for MIME and
                # signature detection; rewind for later readers.
                header = temp_file.read(8192)
                temp_file.seek(0)

                filename = file.filename or "unknown"
                detected_mime = self._detect_mime_type(header, filename)

                # Signature check
                try:
                    self._validate_file_signature(header, "gzip")
                except FileSignatureError as err:
                    raise FileSignatureError(
                        "File content does not match gzip format",
                        filename=filename,
                        expected_type="gzip",
                    ) from err

                # MIME check — octet-stream is tolerated because the
                # signature check above has already passed.
                allowed = self.config.ALLOWED_GZIP_MIMES
                if (
                    detected_mime not in allowed
                    and detected_mime != "application/octet-stream"
                ):
                    raise MimeTypeError(
                        "Invalid file type."
                        f" Detected: {detected_mime}."
                        " Expected gzip file.",
                        filename=filename,
                        detected_mime=detected_mime,
                        allowed_mimes=list(allowed),
                    )

                # Decompression bomb check
                self.gzip_inspector.inspect_gzip_content(
                    temp_file, file_size
                )

                logger.debug(
                    "Gzip file validation passed: %s (%s, %s bytes)",
                    filename,
                    detected_mime,
                    file_size,
                )
            finally:
                temp_file.close()
        ms = (time.monotonic() - t0) * 1000
        self._audit.success(filename, cid, ms)
    except (
        FileValidationError,
        ResourceLimitError,
        FileProcessingError,
    ) as exc:
        # Expected validation outcomes, including FileProcessingError
        # from a validation step: keep the original message instead of
        # re-wrapping it as an internal error.
        ms = (time.monotonic() - t0) * 1000
        self._audit.failure(
            filename,
            cid,
            ms,
            str(exc),
        )
        raise
    except Exception as err:
        ms = (time.monotonic() - t0) * 1000
        self._audit.failure(
            filename,
            cid,
            ms,
            "internal_error",
        )
        logger.exception(
            "Error during gzip file validation: %s",
            err,
        )
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=err,
        ) from err
    finally:
        reset_correlation_id()

validate_image_file async

validate_image_file(file)

Validate an uploaded image file.

Checks filename, extension, size, MIME type, and signature.

Parameters:

Name Type Description Default
file UploadFileProtocol

Uploaded file to validate.

required

Raises:

Type Description
FilenameSecurityError

Filename is empty, invalid, or fails security checks.

ExtensionSecurityError

File extension is not allowed or is blocked.

FileSizeError

File size exceeds maximum or file is empty.

MimeTypeError

MIME type is not in allowed image types.

FileSignatureError

File signature doesn't match expected image format.

FileProcessingError

Unexpected error during validation.

Source code in safeuploads/file_validator.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
async def validate_image_file(self, file: UploadFile) -> None:
    """
    Run the image validation pipeline on an uploaded file.

    Checks run in a fixed order: filename, extension, size, detected
    MIME type, file signature and, when enabled in the configuration,
    content analysis. The start, success or failure of the run is
    reported to the audit logger together with the elapsed time.

    Args:
        file: Uploaded file to validate.

    Raises:
        FilenameSecurityError: Filename is empty, invalid, or fails
            security checks.
        ExtensionSecurityError: File extension is not allowed or is
            blocked.
        FileSizeError: File size exceeds maximum or file is empty.
        MimeTypeError: MIME type is not in allowed image types.
        FileSignatureError: File signature doesn't match expected image
            format.
        FileProcessingError: Content analysis reported threats, or an
            unexpected error occurred during validation.
    """
    correlation_id = set_correlation_id()
    filename = file.filename or "unknown"
    self._audit.start(filename, correlation_id)
    started_at = time.monotonic()

    def elapsed_ms() -> float:
        # Milliseconds since validation began, for the audit record.
        return (time.monotonic() - started_at) * 1000

    try:
        # Both helpers raise on failure.
        self._validate_filename(file)
        self._validate_file_extension(
            file, self.config.ALLOWED_IMAGE_EXTENSIONS
        )

        limits = self.config.limits
        with ResourceMonitor(
            max_time_seconds=limits.max_validation_time_seconds,
            max_memory_mb=limits.max_validation_memory_mb,
        ):
            # Size validation raises on failure and otherwise hands
            # back the content together with its size.
            payload, payload_size = await self._validate_file_size(
                file, limits.max_image_size
            )

            # The name is read again here, after the earlier checks.
            filename = file.filename or "unknown"
            detected_mime = self._detect_mime_type(payload, filename)

            permitted_mimes = self.config.ALLOWED_IMAGE_MIMES
            if detected_mime not in permitted_mimes:
                raise MimeTypeError(
                    "Invalid file type."
                    f" Detected: {detected_mime}."
                    f" Allowed: {', '.join(permitted_mimes)}",
                    filename=filename,
                    detected_mime=detected_mime,
                    allowed_mimes=list(permitted_mimes),
                )

            # Raises when the leading bytes are not an image signature.
            self._validate_file_signature(payload, "image")

            # Content analysis is opt-in and only sees a bounded prefix.
            if limits.enable_content_analysis:
                sample = payload[: limits.content_scan_max_size]
                threats = self.content_inspector.scan_content(
                    sample,
                    filename,
                    "image",
                )
                if threats:
                    raise FileProcessingError(
                        "Content analysis threats detected:"
                        f" {'; '.join(threats)}"
                    )

            logger.debug(
                "Image file validation passed: %s (%s, %s bytes)",
                filename,
                detected_mime,
                payload_size,
            )
        self._audit.success(filename, correlation_id, elapsed_ms())
    except (
        FileValidationError,
        ResourceLimitError,
        FileProcessingError,
    ) as known_error:
        # Library errors are audited with their message and re-raised
        # unchanged so callers see the specific failure type.
        self._audit.failure(
            filename,
            correlation_id,
            elapsed_ms(),
            str(known_error),
        )
        raise
    except Exception as unexpected:
        # Anything else is audited generically, logged with traceback,
        # and wrapped in FileProcessingError.
        self._audit.failure(
            filename,
            correlation_id,
            elapsed_ms(),
            "internal_error",
        )
        logger.exception(
            "Error during image file validation: %s", unexpected
        )
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=unexpected,
        ) from unexpected
    finally:
        reset_correlation_id()

validate_zip_file async

validate_zip_file(file)

Validate uploaded ZIP archive against service configuration.

Parameters:

Name Type Description Default
file UploadFileProtocol

Incoming ZIP file-like object to validate.

required

Raises:

Type Description
FilenameSecurityError

Filename is empty, invalid, or fails security checks.

ExtensionSecurityError

File extension is not allowed or is blocked.

FileSizeError

File size exceeds maximum or file is empty.

MimeTypeError

MIME type is not in allowed ZIP types.

FileSignatureError

File signature doesn't match expected ZIP format.

CompressionSecurityError

ZIP compression validation failed (zip bomb detected).

FileProcessingError

Unexpected error during validation.

Source code in safeuploads/file_validator.py
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
async def validate_zip_file(self, file: UploadFile) -> None:
    """
    Run the ZIP validation pipeline on an uploaded archive.

    The upload is streamed to a temporary file, then checked in this
    order: file signature, detected MIME type, compression ratio,
    optional ZIP content inspection and optional content analysis.
    The start, success or failure of the run is reported to the audit
    logger together with the elapsed time.

    Args:
        file: Incoming ZIP file-like object to validate.

    Raises:
        FilenameSecurityError: Filename is empty, invalid, or fails
            security checks.
        ExtensionSecurityError: File extension is not allowed or is
            blocked.
        FileSizeError: File size exceeds maximum or file is empty.
        MimeTypeError: MIME type is not in allowed ZIP types.
        FileSignatureError: File signature doesn't match expected ZIP
            format.
        CompressionSecurityError: ZIP compression validation failed
            (zip bomb detected).
        FileProcessingError: Content analysis reported threats, or an
            unexpected error occurred during validation.
    """
    correlation_id = set_correlation_id()
    filename = file.filename or "unknown"
    self._audit.start(filename, correlation_id)
    started_at = time.monotonic()

    def elapsed_ms() -> float:
        # Milliseconds since validation began, for the audit record.
        return (time.monotonic() - started_at) * 1000

    try:
        # Both helpers raise on failure.
        self._validate_filename(file)
        self._validate_file_extension(
            file, self.config.ALLOWED_ZIP_EXTENSIONS
        )

        limits = self.config.limits
        with ResourceMonitor(
            max_time_seconds=limits.max_validation_time_seconds,
            max_memory_mb=limits.max_validation_memory_mb,
        ):
            # The size limit is enforced while streaming to the
            # temporary file.
            spooled, archive_size = await self._stream_to_temp_file(
                file, limits.max_zip_size
            )

            try:
                # Only the leading bytes are needed for the MIME and
                # signature checks; rewind afterwards.
                header = spooled.read(8192)
                spooled.seek(0)

                # The name is read again here, after the earlier checks.
                filename = file.filename or "unknown"
                detected_mime = self._detect_mime_type(header, filename)

                # The signature is verified before the MIME decision so
                # a generic MIME type can be tolerated below.
                try:
                    self._validate_file_signature(header, "zip")
                except FileSignatureError as signature_error:
                    raise FileSignatureError(
                        "File content does not match ZIP format",
                        filename=filename,
                        expected_type="zip",
                    ) from signature_error

                permitted_mimes = self.config.ALLOWED_ZIP_MIMES
                if detected_mime not in permitted_mimes:
                    if detected_mime != "application/octet-stream":
                        raise MimeTypeError(
                            "Invalid file type. "
                            f"Detected: {detected_mime}. "
                            "Expected ZIP file.",
                            filename=filename,
                            detected_mime=detected_mime,
                            allowed_mimes=list(permitted_mimes),
                        )
                    # octet-stream is accepted because the signature
                    # check above already passed.
                    logger.debug(
                        "ZIP file detected as application/octet-stream, "
                        "but signature is valid: %s",
                        filename,
                    )

                self.compression_validator.validate_zip_compression_ratio(
                    spooled, archive_size
                )

                # Member-level inspection is opt-in.
                if limits.scan_zip_content:
                    spooled.seek(0)
                    self.zip_inspector.inspect_zip_content(spooled)

                # Content analysis is opt-in and only sees a bounded
                # prefix of the archive.
                if limits.enable_content_analysis:
                    spooled.seek(0)
                    sample = spooled.read(limits.content_scan_max_size)
                    spooled.seek(0)
                    threats = self.content_inspector.scan_content(
                        sample,
                        filename,
                        "zip",
                    )
                    if threats:
                        raise FileProcessingError(
                            "Content analysis threats detected:"
                            f" {'; '.join(threats)}"
                        )

                logger.debug(
                    "ZIP file validation passed: %s (%s, %s bytes)",
                    filename,
                    detected_mime,
                    archive_size,
                )
            finally:
                # The temporary file is released on every path.
                spooled.close()
        self._audit.success(filename, correlation_id, elapsed_ms())
    except (
        FileValidationError,
        ResourceLimitError,
        FileProcessingError,
    ) as known_error:
        # Library errors are audited with their message and re-raised
        # unchanged so callers see the specific failure type.
        self._audit.failure(
            filename,
            correlation_id,
            elapsed_ms(),
            str(known_error),
        )
        raise
    except Exception as unexpected:
        # Anything else is audited generically, logged with traceback,
        # and wrapped in FileProcessingError.
        self._audit.failure(
            filename,
            correlation_id,
            elapsed_ms(),
            "internal_error",
        )
        logger.exception(
            "Error during ZIP file validation: %s", unexpected
        )
        raise FileProcessingError(
            "File validation failed due to internal error",
            original_error=unexpected,
        ) from unexpected
    finally:
        reset_correlation_id()

FilenameSecurityError

Bases: FileValidationError

Filename failed security checks.

Source code in safeuploads/exceptions.py
187
188
class FilenameSecurityError(FileValidationError):
    """
    Filename failed security checks.

    Raised when an uploaded file's name is empty, invalid, or
    otherwise rejected by the filename security checks.
    """

GzipContentInspector

Inspects gzip archives for decompression bomb attacks.

Reads the compressed stream in chunks, tracking the compression ratio and total uncompressed size against configurable limits.

Attributes:

Name Type Description
config

File security configuration.

Source code in safeuploads/inspectors/gzip_inspector.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class GzipContentInspector:
    """
    Detects decompression bombs in gzip archives.

    The compressed stream is decompressed chunk by chunk while the
    running uncompressed size and compression ratio are compared
    against the configured limits.

    Attributes:
        config: File security configuration.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Store the configuration and create the audit logger.

        Args:
            config: File security configuration.
        """
        self.config = config
        audit_enabled = config.limits.enable_audit_logging
        self._audit = SecurityAuditLogger(enabled=audit_enabled)

    def inspect_gzip_content(
        self,
        file_obj: SeekableFile,
        compressed_size: int,
    ) -> None:
        """
        Decompress a gzip stream incrementally and enforce limits.

        The file is rewound before reading and again after a
        successful inspection. The ratio check is skipped when
        ``compressed_size`` is not positive.

        Args:
            file_obj: Seekable file containing gzip data.
            compressed_size: Size of the compressed file in bytes.

        Raises:
            ZipBombError: If compression ratio or uncompressed
                size exceeds configured limits, or decompression
                runs out of memory.
            CompressionSecurityError: If the gzip structure is
                invalid, corrupted or truncated.
            FileProcessingError: If an unexpected error occurs.
        """
        mib = 1024 * 1024
        file_obj.seek(0)
        limits = self.config.limits
        read_size = limits.chunk_size
        ratio_cap = limits.max_compression_ratio
        size_cap = limits.max_uncompressed_size
        inflated = 0

        try:
            with gzip.open(file_obj, "rb") as stream:
                while chunk := stream.read(read_size):
                    inflated += len(chunk)

                    # Absolute cap on the decompressed size.
                    if inflated > size_cap:
                        inflated_mb = inflated // mib
                        cap_mb = size_cap // mib
                        logger.error(
                            "Gzip uncompressed size exceeded: %dMB > %dMB",
                            inflated_mb,
                            cap_mb,
                            extra=log_extra(),
                        )
                        correlation_id = get_correlation_id()
                        if correlation_id:
                            self._audit.threat(
                                "",
                                correlation_id,
                                "Gzip decompression bomb — size exceeded",
                            )
                        raise ZipBombError(
                            message=(
                                "Gzip uncompressed size too large:"
                                f" {inflated_mb}MB."
                                f" Maximum: {cap_mb}MB"
                            ),
                            compression_ratio=0,
                            uncompressed_size=inflated,
                            max_size=size_cap,
                        )

                    # Running ratio, evaluated after every chunk so a
                    # bomb is rejected before it is fully expanded.
                    if compressed_size > 0:
                        ratio = inflated / compressed_size
                        if ratio > ratio_cap:
                            logger.error(
                                "Gzip compression ratio exceeded:"
                                " %.1f:1 > %d:1",
                                ratio,
                                ratio_cap,
                                extra=log_extra(),
                            )
                            correlation_id = get_correlation_id()
                            if correlation_id:
                                self._audit.threat(
                                    "",
                                    correlation_id,
                                    "Gzip decompression bomb — ratio exceeded",
                                )
                            raise ZipBombError(
                                message=(
                                    "Gzip compression ratio too high:"
                                    f" {ratio:.1f}:1."
                                    f" Maximum: {ratio_cap}:1"
                                ),
                                compression_ratio=ratio,
                                max_ratio=float(ratio_cap),
                            )

        except ZipBombError:
            # Already the right type; keep the generic handler below
            # from wrapping it.
            raise
        except gzip.BadGzipFile as bad_gzip:
            logger.error(
                "Invalid or corrupted gzip file",
                exc_info=True,
            )
            raise CompressionSecurityError(
                message="Invalid or corrupted gzip file",
                error_code=ErrorCode.ZIP_CORRUPT,
            ) from bad_gzip
        except EOFError as truncated:
            logger.error("Truncated gzip file", exc_info=True)
            raise CompressionSecurityError(
                message="Truncated gzip file",
                error_code=ErrorCode.ZIP_CORRUPT,
            ) from truncated
        except MemoryError as out_of_memory:
            logger.error(
                "Gzip requires excessive memory",
                exc_info=True,
            )
            raise ZipBombError(
                message=(
                    "Gzip requires too much memory"
                    " — potential decompression bomb"
                ),
                compression_ratio=0,
            ) from out_of_memory
        except Exception as unexpected:
            logger.error(
                "Unexpected error during gzip inspection",
                exc_info=True,
            )
            raise FileProcessingError(
                message="Gzip inspection failed",
                original_error=unexpected,
            ) from unexpected

        # Summary of the overall ratio; this only logs, the limits
        # were already enforced inside the loop.
        if compressed_size > 0 and inflated > 0:
            logger.debug(
                "Gzip analysis: %dMB uncompressed, ratio %.1f:1",
                inflated // mib,
                inflated / compressed_size,
            )

        file_obj.seek(0)

__init__

__init__(config)

Initialize gzip inspector with configuration.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security configuration.

required
Source code in safeuploads/inspectors/gzip_inspector.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, config: FileSecurityConfig):
    """
    Store the configuration and create the audit logger.

    Args:
        config: File security configuration.
    """
    self.config = config
    # Audit logging follows the flag in the configured limits.
    audit_enabled = config.limits.enable_audit_logging
    self._audit = SecurityAuditLogger(enabled=audit_enabled)

inspect_gzip_content

inspect_gzip_content(file_obj, compressed_size)

Inspect gzip archive for decompression bombs.

Parameters:

Name Type Description Default
file_obj SeekableFile

Seekable file containing gzip data.

required
compressed_size int

Size of the compressed file in bytes.

required

Raises:

Type Description
ZipBombError

If compression ratio or uncompressed size exceeds configured limits.

CompressionSecurityError

If the gzip structure is invalid or corrupted.

FileProcessingError

If an unexpected error occurs.

Source code in safeuploads/inspectors/gzip_inspector.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def inspect_gzip_content(
    self,
    file_obj: SeekableFile,
    compressed_size: int,
) -> None:
    """
    Decompress a gzip stream incrementally and enforce limits.

    The file is rewound before reading and again after a successful
    inspection. The ratio check is skipped when ``compressed_size``
    is not positive.

    Args:
        file_obj: Seekable file containing gzip data.
        compressed_size: Size of the compressed file in bytes.

    Raises:
        ZipBombError: If compression ratio or uncompressed
            size exceeds configured limits, or decompression
            runs out of memory.
        CompressionSecurityError: If the gzip structure is
            invalid, corrupted or truncated.
        FileProcessingError: If an unexpected error occurs.
    """
    mib = 1024 * 1024
    file_obj.seek(0)
    limits = self.config.limits
    read_size = limits.chunk_size
    ratio_cap = limits.max_compression_ratio
    size_cap = limits.max_uncompressed_size
    inflated = 0

    try:
        with gzip.open(file_obj, "rb") as stream:
            while chunk := stream.read(read_size):
                inflated += len(chunk)

                # Absolute cap on the decompressed size.
                if inflated > size_cap:
                    inflated_mb = inflated // mib
                    cap_mb = size_cap // mib
                    logger.error(
                        "Gzip uncompressed size exceeded: %dMB > %dMB",
                        inflated_mb,
                        cap_mb,
                        extra=log_extra(),
                    )
                    correlation_id = get_correlation_id()
                    if correlation_id:
                        self._audit.threat(
                            "",
                            correlation_id,
                            "Gzip decompression bomb — size exceeded",
                        )
                    raise ZipBombError(
                        message=(
                            "Gzip uncompressed size too large:"
                            f" {inflated_mb}MB."
                            f" Maximum: {cap_mb}MB"
                        ),
                        compression_ratio=0,
                        uncompressed_size=inflated,
                        max_size=size_cap,
                    )

                # Running ratio, evaluated after every chunk so a bomb
                # is rejected before it is fully expanded.
                if compressed_size > 0:
                    ratio = inflated / compressed_size
                    if ratio > ratio_cap:
                        logger.error(
                            "Gzip compression ratio exceeded:"
                            " %.1f:1 > %d:1",
                            ratio,
                            ratio_cap,
                            extra=log_extra(),
                        )
                        correlation_id = get_correlation_id()
                        if correlation_id:
                            self._audit.threat(
                                "",
                                correlation_id,
                                "Gzip decompression bomb — ratio exceeded",
                            )
                        raise ZipBombError(
                            message=(
                                "Gzip compression ratio too high:"
                                f" {ratio:.1f}:1."
                                f" Maximum: {ratio_cap}:1"
                            ),
                            compression_ratio=ratio,
                            max_ratio=float(ratio_cap),
                        )

    except ZipBombError:
        # Already the right type; keep the generic handler below from
        # wrapping it.
        raise
    except gzip.BadGzipFile as bad_gzip:
        logger.error(
            "Invalid or corrupted gzip file",
            exc_info=True,
        )
        raise CompressionSecurityError(
            message="Invalid or corrupted gzip file",
            error_code=ErrorCode.ZIP_CORRUPT,
        ) from bad_gzip
    except EOFError as truncated:
        logger.error("Truncated gzip file", exc_info=True)
        raise CompressionSecurityError(
            message="Truncated gzip file",
            error_code=ErrorCode.ZIP_CORRUPT,
        ) from truncated
    except MemoryError as out_of_memory:
        logger.error(
            "Gzip requires excessive memory",
            exc_info=True,
        )
        raise ZipBombError(
            message=(
                "Gzip requires too much memory"
                " — potential decompression bomb"
            ),
            compression_ratio=0,
        ) from out_of_memory
    except Exception as unexpected:
        logger.error(
            "Unexpected error during gzip inspection",
            exc_info=True,
        )
        raise FileProcessingError(
            message="Gzip inspection failed",
            original_error=unexpected,
        ) from unexpected

    # Summary of the overall ratio; this only logs, the limits were
    # already enforced inside the loop.
    if compressed_size > 0 and inflated > 0:
        logger.debug(
            "Gzip analysis: %dMB uncompressed, ratio %.1f:1",
            inflated // mib,
            inflated / compressed_size,
        )

    file_obj.seek(0)

MalwareSignatureCategory

Bases: Enum

Byte-level signatures for embedded malware detection.

Attributes:

Name Type Description
PE_EXECUTABLE

Windows PE/MZ executable headers.

ELF_EXECUTABLE

Linux ELF executable headers.

MACHO_EXECUTABLE

macOS Mach-O executable headers.

JAVA_CLASS

Java class file headers.

WINDOWS_SHORTCUT

Windows .lnk shortcut headers.

WEBSHELL_PATTERNS

Common web shell script markers.

POLYGLOT_SIGNATURES

Multi-format file signatures.

Source code in safeuploads/enums.py
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
class MalwareSignatureCategory(Enum):
    """
    Byte-level signatures for embedded malware detection.

    Each member's value is an immutable ``frozenset`` of byte
    patterns. The sets are frozen so that no consumer can add or
    remove a signature at runtime and thereby change detection
    for the whole process; membership tests, iteration and
    equality with plain sets keep working as before.

    Attributes:
        PE_EXECUTABLE: Windows PE/MZ executable headers.
        ELF_EXECUTABLE: Linux ELF executable headers.
        MACHO_EXECUTABLE: macOS Mach-O executable headers.
        JAVA_CLASS: Java class file headers.
        WINDOWS_SHORTCUT: Windows .lnk shortcut headers.
        WEBSHELL_PATTERNS: Common web shell script markers.
        POLYGLOT_SIGNATURES: Multi-format file signatures.
    """

    # Windows PE executables
    PE_EXECUTABLE = frozenset(
        {
            b"MZ",
            b"PE\x00\x00",
        }
    )

    # Linux ELF executables
    ELF_EXECUTABLE = frozenset(
        {
            b"\x7fELF",
        }
    )

    # macOS Mach-O executables (32/64, big/little endian)
    MACHO_EXECUTABLE = frozenset(
        {
            b"\xfe\xed\xfa\xce",
            b"\xfe\xed\xfa\xcf",
            b"\xce\xfa\xed\xfe",
            b"\xcf\xfa\xed\xfe",
        }
    )

    # Java class files
    JAVA_CLASS = frozenset(
        {
            b"\xca\xfe\xba\xbe",
        }
    )

    # Windows shortcuts (.lnk)
    WINDOWS_SHORTCUT = frozenset(
        {
            b"L\x00\x00\x00",
        }
    )

    # Common web shell / script injection markers
    WEBSHELL_PATTERNS = frozenset(
        {
            b"<?php",
            b"<%",
            b"<script",
        }
    )

    # Polyglot detection: secondary format signatures
    # that should not appear in image/activity files.
    # The Java magic is repeated here on purpose; the set differs
    # from JAVA_CLASS, so the two members stay distinct.
    POLYGLOT_SIGNATURES = frozenset(
        {
            b"PK\x03\x04",  # ZIP/JAR inside image (GIFAR)
            b"\xca\xfe\xba\xbe",  # Java class inside image
            b"Rar!\x1a\x07",  # RAR inside image
        }
    )

MimeTypeError

Bases: FileValidationError

File MIME type not allowed or mismatches extension.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename with MIME type issue.

None
detected_mime str | None

Optional detected MIME type string.

None
allowed_mimes list[str] | None

Optional list of allowed MIME types.

None
error_code str | None

Optional error code (defaults to MIME_TYPE_INVALID).

None

Attributes:

Name Type Description
detected_mime

The detected MIME type string.

allowed_mimes

List of allowed MIME types.

Source code in safeuploads/exceptions.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
class MimeTypeError(FileValidationError):
    """
    Raised when a file's MIME type is not allowed or mismatches
    its extension.

    Args:
        message: Human-readable error description.
        filename: Optional filename with MIME type issue.
        detected_mime: Optional detected MIME type string.
        allowed_mimes: Optional list of allowed MIME types.
        error_code: Optional error code (defaults to
            MIME_TYPE_INVALID).

    Attributes:
        detected_mime: The detected MIME type string.
        allowed_mimes: List of allowed MIME types; empty when
            none was supplied.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        detected_mime: str | None = None,
        allowed_mimes: list[str] | None = None,
        error_code: str | None = None,
    ):
        """Record the MIME details, then initialise the base error."""
        self.detected_mime = detected_mime
        # A missing or empty list is replaced by a new empty list.
        self.allowed_mimes = allowed_mimes if allowed_mimes else []
        # Fall back to the MIME-specific code when none is given.
        resolved_code = (
            error_code if error_code else ErrorCode.MIME_TYPE_INVALID
        )
        super().__init__(
            message,
            filename=filename,
            error_code=resolved_code,
        )

__init__

__init__(
    message,
    filename=None,
    detected_mime=None,
    allowed_mimes=None,
    error_code=None,
)

Initialize with MIME type details.

Source code in safeuploads/exceptions.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def __init__(
    self,
    message: str,
    filename: str | None = None,
    detected_mime: str | None = None,
    allowed_mimes: list[str] | None = None,
    error_code: str | None = None,
):
    """Record the MIME details, then initialise the base error."""
    self.detected_mime = detected_mime
    # A missing or empty list is replaced by a new empty list.
    self.allowed_mimes = allowed_mimes if allowed_mimes else []
    # Fall back to the MIME-specific code when none is given.
    resolved_code = (
        error_code if error_code else ErrorCode.MIME_TYPE_INVALID
    )
    super().__init__(
        message,
        filename=filename,
        error_code=resolved_code,
    )

ResourceLimitError

Bases: FileProcessingError

Validation exceeded configured resource limits.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
error_code str

Machine-readable error code.

RESOURCE_LIMIT_EXCEEDED
elapsed_seconds float | None

Optional wall-clock time consumed.

None
memory_bytes int | None

Optional peak memory consumed in bytes.

None
original_error Exception | None

Optional original exception.

None

Attributes:

Name Type Description
elapsed_seconds

Wall-clock seconds consumed.

memory_bytes

Peak memory consumed in bytes.

Source code in safeuploads/exceptions.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
class ResourceLimitError(FileProcessingError):
    """
    Validation exceeded configured resource limits.

    Args:
        message: Human-readable error description.
        error_code: Machine-readable error code. Defaults to
            ``ErrorCode.RESOURCE_LIMIT_EXCEEDED``.
        elapsed_seconds: Optional wall-clock time consumed.
        memory_bytes: Optional peak memory consumed in bytes.
        original_error: Optional original exception, forwarded to
            the base class initializer.

    Attributes:
        elapsed_seconds: Wall-clock seconds consumed, or ``None``
            when not supplied.
        memory_bytes: Peak memory consumed in bytes, or ``None``
            when not supplied.
        error_code: The ``error_code`` argument, stored on the
            instance after the base initializer has run.
    """

    def __init__(
        self,
        message: str,
        error_code: str = ErrorCode.RESOURCE_LIMIT_EXCEEDED,
        elapsed_seconds: float | None = None,
        memory_bytes: int | None = None,
        original_error: Exception | None = None,
    ):
        """
        Initialize with resource limit details.

        Only ``message`` and ``original_error`` are handed to the
        base initializer; the remaining values are stored directly
        on this instance.
        """
        self.elapsed_seconds = elapsed_seconds
        self.memory_bytes = memory_bytes
        super().__init__(message, original_error=original_error)
        # Set last so this value is what remains on the instance,
        # whatever the base initializer did with ``error_code``.
        self.error_code = error_code

__init__

__init__(
    message,
    error_code=ErrorCode.RESOURCE_LIMIT_EXCEEDED,
    elapsed_seconds=None,
    memory_bytes=None,
    original_error=None,
)

Initialize with resource limit details.

Source code in safeuploads/exceptions.py
507
508
509
510
511
512
513
514
515
516
517
518
519
def __init__(
    self,
    message: str,
    error_code: str = ErrorCode.RESOURCE_LIMIT_EXCEEDED,
    elapsed_seconds: float | None = None,
    memory_bytes: int | None = None,
    original_error: Exception | None = None,
):
    """
    Initialize with resource limit details.

    Only ``message`` and ``original_error`` are handed to the base
    initializer; the remaining values are stored directly on this
    instance.

    Args:
        message: Human-readable error description.
        error_code: Machine-readable error code.
        elapsed_seconds: Optional wall-clock time consumed.
        memory_bytes: Optional peak memory consumed in bytes.
        original_error: Optional original exception.
    """
    self.elapsed_seconds = elapsed_seconds
    self.memory_bytes = memory_bytes
    super().__init__(message, original_error=original_error)
    # Set last so this value is what remains on the instance,
    # whatever the base initializer did with ``error_code``.
    self.error_code = error_code

ResourceMonitor

Context manager that enforces wall-clock and memory limits.

Measures elapsed wall-clock time from context entry (checked on exit, and on demand via check_time) and samples peak memory usage (ru_maxrss) via resource.getrusage on entry and exit. Raises ResourceLimitError when either limit is exceeded.

Attributes:

Name Type Description
max_time_seconds

Maximum allowed wall-clock seconds.

max_memory_bytes

Maximum allowed memory delta in bytes.

start_time float

Timestamp when the context was entered.

start_memory int

Peak RSS (ru_maxrss) in bytes at context entry.

Source code in safeuploads/utils.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class ResourceMonitor:
    """
    Context manager that enforces wall-clock and memory limits.

    Elapsed time is measured from context entry; it is checked on
    exit and can be checked mid-operation with ``check_time``.
    Memory is sampled via ``resource.getrusage`` on entry and exit.
    The sample is ``ru_maxrss`` (the process's peak resident set
    size), so the memory delta measures how far the peak grew while
    the block ran. Raises ``ResourceLimitError`` when either limit
    is exceeded.

    Attributes:
        max_time_seconds: Maximum allowed wall-clock seconds.
        max_memory_bytes: Maximum allowed memory delta in bytes.
        start_time: Timestamp when the context was entered
            (``0.0`` until then).
        start_memory: Peak RSS in bytes at context entry
            (``0`` until then).
    """

    def __init__(
        self,
        max_time_seconds: float = 30.0,
        max_memory_mb: int = 512,
    ):
        """
        Initialize the resource monitor.

        Args:
            max_time_seconds: Wall-clock timeout in seconds.
            max_memory_mb: Maximum memory delta in megabytes.
        """
        self.max_time_seconds = max_time_seconds
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.start_time: float = 0.0
        self.start_memory: int = 0
        self._elapsed: float = 0.0
        self._memory_delta: int = 0

    def __enter__(self) -> "ResourceMonitor":
        """
        Record baseline time and memory on context entry.

        Returns:
            Self for use in ``with`` statements.
        """
        self.start_time = time.monotonic()
        self.start_memory = self._get_rss_bytes()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Check resource usage on context exit.

        Nothing is measured when the block itself raised; that
        exception propagates unchanged.

        Args:
            exc_type: Exception type if raised inside block.
            exc_val: Exception value if raised inside block.
            exc_tb: Exception traceback if raised inside block.

        Raises:
            ResourceLimitError: If time or memory limits were
                exceeded during the monitored block.
        """
        if exc_type is not None:
            return

        self._elapsed = time.monotonic() - self.start_time
        current_memory = self._get_rss_bytes()
        self._memory_delta = current_memory - self.start_memory

        # The time limit is checked first, so it wins when both
        # limits were exceeded.
        if self._elapsed > self.max_time_seconds:
            self._raise_time_exceeded(self._elapsed)

        if (
            self._memory_delta > 0
            and self._memory_delta > self.max_memory_bytes
        ):
            delta_mb = self._memory_delta // (1024 * 1024)
            max_mb = self.max_memory_bytes // (1024 * 1024)
            logger.error(
                "Validation memory limit exceeded: %dMB > %dMB",
                delta_mb,
                max_mb,
            )
            raise ResourceLimitError(
                message=(
                    f"Validation memory limit exceeded: "
                    f"{delta_mb}MB (max {max_mb}MB)"
                ),
                error_code=(ErrorCode.RESOURCE_MEMORY_EXCEEDED),
                memory_bytes=self._memory_delta,
            )

    def check_time(self) -> None:
        """
        Check elapsed time mid-operation.

        Before the context has been entered there is no baseline,
        so the elapsed time is treated as zero instead of being
        measured from the ``0.0`` placeholder (which would compare
        the raw monotonic clock against the limit).

        Raises:
            ResourceLimitError: If the wall-clock time limit has
                been exceeded since context entry.
        """
        elapsed = self.elapsed
        if elapsed > self.max_time_seconds:
            self._raise_time_exceeded(elapsed)

    def _raise_time_exceeded(self, elapsed: float) -> None:
        """
        Log and raise the time-limit error for ``elapsed`` seconds.

        Shared by ``__exit__`` and ``check_time`` so both report the
        violation identically.

        Args:
            elapsed: Wall-clock seconds consumed so far.

        Raises:
            ResourceLimitError: Always.
        """
        logger.error(
            "Validation time limit exceeded: %.2fs > %.2fs",
            elapsed,
            self.max_time_seconds,
        )
        raise ResourceLimitError(
            message=(
                f"Validation time limit exceeded: "
                f"{elapsed:.1f}s "
                f"(max {self.max_time_seconds:.1f}s)"
            ),
            error_code=ErrorCode.RESOURCE_TIME_EXCEEDED,
            elapsed_seconds=elapsed,
        )

    @property
    def elapsed(self) -> float:
        """
        Return elapsed seconds since context entry.

        Returns:
            Elapsed wall-clock seconds, or ``0.0`` if the context
            has not been entered yet.
        """
        if self.start_time == 0.0:
            return 0.0
        return time.monotonic() - self.start_time

    @property
    def memory_delta(self) -> int:
        """
        Return memory delta since context entry.

        Returns:
            Growth of the process's peak RSS in bytes since entry,
            or ``0`` if no baseline has been recorded. Because the
            sample is a high-water mark, the value is not expected
            to be negative.
        """
        if self.start_memory == 0:
            return 0
        return self._get_rss_bytes() - self.start_memory

    @staticmethod
    def _get_rss_bytes() -> int:
        """
        Return the process's peak RSS in bytes.

        Reads ``ru_maxrss`` from ``resource.getrusage``, which is
        the maximum resident set size reached so far, not the
        current one.

        Returns:
            Peak resident set size in bytes.
        """
        usage = resource.getrusage(resource.RUSAGE_SELF)
        # macOS reports in bytes, Linux in kilobytes
        import sys

        if sys.platform == "darwin":
            return usage.ru_maxrss
        return usage.ru_maxrss * 1024

elapsed property

elapsed

Return elapsed seconds since context entry.

Returns:

Type Description
float

Elapsed wall-clock seconds.

memory_delta property

memory_delta

Return memory delta since context entry.

Returns:

Type Description
int

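Memory delta in bytes. The underlying sample is peak RSS (ru_maxrss), so the value reflects growth of the peak since entry and is not expected to be negative.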

__enter__

__enter__()

Record baseline time and memory on context entry.

Returns:

Type Description
ResourceMonitor

Self for use in with statements.

Source code in safeuploads/utils.py
46
47
48
49
50
51
52
53
54
55
def __enter__(self) -> "ResourceMonitor":
    """
    Record baseline time and memory on context entry.

    Stores ``time.monotonic()`` in ``start_time`` and the result of
    ``_get_rss_bytes()`` in ``start_memory``; later measurements
    are taken relative to these two values.

    Returns:
        Self for use in ``with`` statements.
    """
    self.start_time = time.monotonic()
    self.start_memory = self._get_rss_bytes()
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Check resource usage on context exit.

Parameters:

Name Type Description Default
exc_type

Exception type if raised inside block.

required
exc_val

Exception value if raised inside block.

required
exc_tb

Exception traceback if raised inside block.

required

Raises:

Type Description
ResourceLimitError

If time or memory limits were exceeded during the monitored block.

Source code in safeuploads/utils.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """
    Enforce the time and memory limits when the block finishes.

    If the block raised, nothing is measured and that exception
    propagates unchanged. Otherwise the elapsed time and memory
    delta are stored on the instance and compared with the limits,
    time first.

    Args:
        exc_type: Exception type if raised inside block.
        exc_val: Exception value if raised inside block.
        exc_tb: Exception traceback if raised inside block.

    Raises:
        ResourceLimitError: If time or memory limits were
            exceeded during the monitored block.
    """
    block_raised = exc_type is not None
    if block_raised:
        return

    finished_at = time.monotonic()
    self._elapsed = finished_at - self.start_time
    self._memory_delta = self._get_rss_bytes() - self.start_memory

    took = self._elapsed
    if took > self.max_time_seconds:
        logger.error(
            "Validation time limit exceeded: %.2fs > %.2fs",
            took,
            self.max_time_seconds,
        )
        time_message = (
            f"Validation time limit exceeded: "
            f"{took:.1f}s "
            f"(max {self.max_time_seconds:.1f}s)"
        )
        raise ResourceLimitError(
            message=time_message,
            error_code=ErrorCode.RESOURCE_TIME_EXCEEDED,
            elapsed_seconds=took,
        )

    grew_by = self._memory_delta
    # Within budget (or no growth at all): nothing more to do.
    if grew_by <= 0 or grew_by <= self.max_memory_bytes:
        return

    bytes_per_mb = 1024 * 1024
    delta_mb = grew_by // bytes_per_mb
    max_mb = self.max_memory_bytes // bytes_per_mb
    logger.error(
        "Validation memory limit exceeded: %dMB > %dMB",
        delta_mb,
        max_mb,
    )
    memory_message = (
        f"Validation memory limit exceeded: "
        f"{delta_mb}MB (max {max_mb}MB)"
    )
    raise ResourceLimitError(
        message=memory_message,
        error_code=ErrorCode.RESOURCE_MEMORY_EXCEEDED,
        memory_bytes=grew_by,
    )

__init__

__init__(max_time_seconds=30.0, max_memory_mb=512)

Initialize the resource monitor.

Parameters:

Name Type Description Default
max_time_seconds float

Wall-clock timeout in seconds.

30.0
max_memory_mb int

Maximum memory delta in megabytes.

512
Source code in safeuploads/utils.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(
    self,
    max_time_seconds: float = 30.0,
    max_memory_mb: int = 512,
):
    """
    Set the limits and reset all measurements.

    Args:
        max_time_seconds: Wall-clock timeout in seconds.
        max_memory_mb: Maximum memory delta in megabytes; stored
            internally in bytes.
    """
    bytes_per_mb = 1024 * 1024

    # Configured limits.
    self.max_time_seconds = max_time_seconds
    self.max_memory_bytes = max_memory_mb * bytes_per_mb

    # Baselines, filled in when the context is entered.
    self.start_time: float = 0.0
    self.start_memory: int = 0

    # Measurements recorded when the context exits.
    self._elapsed: float = 0.0
    self._memory_delta: int = 0

check_time

check_time()

Check elapsed time mid-operation.

Raises:

Type Description
ResourceLimitError

If the wall-clock time limit has been exceeded since context entry.

Source code in safeuploads/utils.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def check_time(self) -> None:
    """
    Check elapsed time mid-operation.

    Does nothing until the context has been entered: before then
    ``start_time`` still holds its ``0.0`` placeholder, and
    measuring from it would compare the raw monotonic clock against
    the limit and raise spuriously.

    Raises:
        ResourceLimitError: If the wall-clock time limit has
            been exceeded since context entry.
    """
    if self.start_time == 0.0:
        return

    elapsed = time.monotonic() - self.start_time
    if elapsed > self.max_time_seconds:
        logger.error(
            "Validation time limit exceeded: %.2fs > %.2fs",
            elapsed,
            self.max_time_seconds,
        )
        raise ResourceLimitError(
            message=(
                f"Validation time limit exceeded: "
                f"{elapsed:.1f}s "
                f"(max {self.max_time_seconds:.1f}s)"
            ),
            error_code=ErrorCode.RESOURCE_TIME_EXCEEDED,
            elapsed_seconds=elapsed,
        )

SecurityAuditLogger

Emits structured audit log records.

All records are emitted under the safeuploads.audit logger with extra fields containing the structured AuditEvent data. The application configures handlers on this logger (or a parent) to capture events.

Attributes:

Name Type Description
enabled

Whether audit logging is active.

Source code in safeuploads/audit.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
class SecurityAuditLogger:
    """
    Publishes security audit events as structured log records.

    Every record goes to the ``safeuploads.audit`` logger and
    carries the ``AuditEvent`` fields in ``extra``. Handlers are
    expected to be attached by the application, either on that
    logger or on one of its parents.

    Attributes:
        enabled: Whether audit logging is active.
    """

    def __init__(self, enabled: bool = False):
        """
        Create the audit logger.

        Args:
            enabled: Whether to emit audit events.
        """
        self.enabled = enabled

    def log_event(self, event: AuditEvent) -> None:
        """
        Write one audit event to the audit logger.

        Does nothing when logging is disabled. Threat, failure and
        resource-limit events are logged at WARNING; every other
        event type is logged at INFO.

        Args:
            event: The audit event to record.
        """
        if not self.enabled:
            return

        kind = event.event_type
        needs_attention = kind in (
            AuditEventType.THREAT_DETECTED,
            AuditEventType.VALIDATION_FAILURE,
            AuditEventType.RESOURCE_LIMIT,
        )
        severity = logging.WARNING if needs_attention else logging.INFO

        structured_fields = {
            "audit_event_type": kind.value,
            "audit_correlation_id": event.correlation_id,
            "audit_filename": event.filename,
            "audit_result": event.result,
            "audit_details": event.details,
            "audit_duration_ms": event.duration_ms,
            "audit_source_ip": event.source_ip or "",
        }

        # Only the first 12 characters of the correlation ID appear
        # in the message text; the full ID travels in ``extra``.
        short_id = event.correlation_id[:12]
        _audit_logger.log(
            severity,
            "[%s] %s file=%s result=%s",
            short_id,
            kind.value,
            event.filename,
            event.result,
            extra=structured_fields,
        )

    def start(
        self,
        filename: str,
        correlation_id: str,
    ) -> None:
        """
        Record that validation of a file has begun.

        Args:
            filename: Name of the file being validated.
            correlation_id: Unique operation identifier.
        """
        started = AuditEvent(
            event_type=AuditEventType.VALIDATION_START,
            correlation_id=correlation_id,
            filename=filename,
            result="started",
        )
        self.log_event(started)

    def success(
        self,
        filename: str,
        correlation_id: str,
        duration_ms: float,
    ) -> None:
        """
        Record that a file passed validation.

        Args:
            filename: Name of the validated file.
            correlation_id: Unique operation identifier.
            duration_ms: Validation duration in milliseconds.
        """
        passed = AuditEvent(
            event_type=AuditEventType.VALIDATION_SUCCESS,
            correlation_id=correlation_id,
            filename=filename,
            result="passed",
            duration_ms=duration_ms,
        )
        self.log_event(passed)

    def failure(
        self,
        filename: str,
        correlation_id: str,
        duration_ms: float,
        error: str,
        details: str = "",
    ) -> None:
        """
        Record that validation of a file failed.

        Args:
            filename: Name of the failed file.
            correlation_id: Unique operation identifier.
            duration_ms: Validation duration in milliseconds.
            error: Short error description, stored as the result.
            details: Additional failure context.
        """
        failed = AuditEvent(
            event_type=AuditEventType.VALIDATION_FAILURE,
            correlation_id=correlation_id,
            filename=filename,
            result=error,
            details=details,
            duration_ms=duration_ms,
        )
        self.log_event(failed)

    def threat(
        self,
        filename: str,
        correlation_id: str,
        threat_description: str,
    ) -> None:
        """
        Record that a threat was detected in a file.

        Args:
            filename: Name of the threatening file.
            correlation_id: Unique operation identifier.
            threat_description: Description of the threat, stored
                as the event details.
        """
        detected = AuditEvent(
            event_type=AuditEventType.THREAT_DETECTED,
            correlation_id=correlation_id,
            filename=filename,
            result="threat_detected",
            details=threat_description,
        )
        self.log_event(detected)

__init__

__init__(enabled=False)

Initialize the audit logger.

Parameters:

Name Type Description Default
enabled bool

Whether to emit audit events.

False
Source code in safeuploads/audit.py
153
154
155
156
157
158
159
160
def __init__(self, enabled: bool = False):
    """
    Initialize the audit logger.

    Args:
        enabled: Whether to emit audit events. Defaults to
            ``False``, so a logger built without arguments
            emits nothing.
    """
    self.enabled = enabled

failure

failure(
    filename, correlation_id, duration_ms, error, details=""
)

Log a validation failure event.

Parameters:

Name Type Description Default
filename str

Name of the failed file.

required
correlation_id str

Unique operation identifier.

required
duration_ms float

Validation duration in milliseconds.

required
error str

Short error description.

required
details str

Additional failure context.

''
Source code in safeuploads/audit.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def failure(
    self,
    filename: str,
    correlation_id: str,
    duration_ms: float,
    error: str,
    details: str = "",
) -> None:
    """
    Record that validation of a file failed.

    Builds a ``VALIDATION_FAILURE`` audit event and hands it to
    ``log_event``.

    Args:
        filename: Name of the failed file.
        correlation_id: Unique operation identifier.
        duration_ms: Validation duration in milliseconds.
        error: Short error description, stored as the result.
        details: Additional failure context.
    """
    failed = AuditEvent(
        event_type=AuditEventType.VALIDATION_FAILURE,
        correlation_id=correlation_id,
        filename=filename,
        result=error,
        details=details,
        duration_ms=duration_ms,
    )
    self.log_event(failed)

log_event

log_event(event)

Emit an audit event as a structured log record.

Parameters:

Name Type Description Default
event AuditEvent

The audit event to record.

required
Source code in safeuploads/audit.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def log_event(self, event: AuditEvent) -> None:
    """
    Write one audit event to the audit logger.

    Does nothing when logging is disabled. Threat, failure and
    resource-limit events are logged at WARNING; every other event
    type is logged at INFO.

    Args:
        event: The audit event to record.
    """
    if not self.enabled:
        return

    kind = event.event_type
    needs_attention = kind in (
        AuditEventType.THREAT_DETECTED,
        AuditEventType.VALIDATION_FAILURE,
        AuditEventType.RESOURCE_LIMIT,
    )
    severity = logging.WARNING if needs_attention else logging.INFO

    structured_fields = {
        "audit_event_type": kind.value,
        "audit_correlation_id": event.correlation_id,
        "audit_filename": event.filename,
        "audit_result": event.result,
        "audit_details": event.details,
        "audit_duration_ms": event.duration_ms,
        "audit_source_ip": event.source_ip or "",
    }

    # Only the first 12 characters of the correlation ID appear in
    # the message text; the full ID travels in ``extra``.
    short_id = event.correlation_id[:12]
    _audit_logger.log(
        severity,
        "[%s] %s file=%s result=%s",
        short_id,
        kind.value,
        event.filename,
        event.result,
        extra=structured_fields,
    )

start

start(filename, correlation_id)

Log a validation start event.

Parameters:

Name Type Description Default
filename str

Name of the file being validated.

required
correlation_id str

Unique operation identifier.

required
Source code in safeuploads/audit.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def start(
    self,
    filename: str,
    correlation_id: str,
) -> None:
    """
    Record that validation of a file has begun.

    Builds a ``VALIDATION_START`` audit event and hands it to
    ``log_event``.

    Args:
        filename: Name of the file being validated.
        correlation_id: Unique operation identifier.
    """
    started = AuditEvent(
        event_type=AuditEventType.VALIDATION_START,
        correlation_id=correlation_id,
        filename=filename,
        result="started",
    )
    self.log_event(started)

success

success(filename, correlation_id, duration_ms)

Log a validation success event.

Parameters:

Name Type Description Default
filename str

Name of the validated file.

required
correlation_id str

Unique operation identifier.

required
duration_ms float

Validation duration in milliseconds.

required
Source code in safeuploads/audit.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def success(
    self,
    filename: str,
    correlation_id: str,
    duration_ms: float,
) -> None:
    """
    Record that a file passed validation.

    Builds a ``VALIDATION_SUCCESS`` audit event and hands it to
    ``log_event``.

    Args:
        filename: Name of the validated file.
        correlation_id: Unique operation identifier.
        duration_ms: Validation duration in milliseconds.
    """
    passed = AuditEvent(
        event_type=AuditEventType.VALIDATION_SUCCESS,
        correlation_id=correlation_id,
        filename=filename,
        result="passed",
        duration_ms=duration_ms,
    )
    self.log_event(passed)

threat

threat(filename, correlation_id, threat_description)

Log a threat detection event.

Parameters:

Name Type Description Default
filename str

Name of the threatening file.

required
correlation_id str

Unique operation identifier.

required
threat_description str

Description of the threat.

required
Source code in safeuploads/audit.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def threat(
    self,
    filename: str,
    correlation_id: str,
    threat_description: str,
) -> None:
    """
    Record that a threat was detected in a file.

    Builds a ``THREAT_DETECTED`` audit event and hands it to
    ``log_event``.

    Args:
        filename: Name of the threatening file.
        correlation_id: Unique operation identifier.
        threat_description: Description of the threat, stored as
            the event details.
    """
    detected = AuditEvent(
        event_type=AuditEventType.THREAT_DETECTED,
        correlation_id=correlation_id,
        filename=filename,
        result="threat_detected",
        details=threat_description,
    )
    self.log_event(detected)

SecurityLimits dataclass

Security constraints for file submissions.

Attributes:

Name Type Description
max_image_size int

Maximum size in bytes for image files.

max_zip_size int

Maximum size in bytes for ZIP archives.

max_compression_ratio int

Maximum expansion ratio for ZIP files.

max_uncompressed_size int

Maximum cumulative size of ZIP contents.

max_individual_file_size int

Maximum size of single file in ZIP.

max_zip_entries int

Maximum number of file entries in ZIP.

zip_analysis_timeout float

Maximum seconds for ZIP analysis.

max_zip_depth int

Maximum directory nesting depth in ZIP.

max_filename_length int

Maximum length for filenames in ZIP.

max_path_length int

Maximum length for full paths in ZIP.

allow_nested_archives bool

Whether nested archives are permitted.

allow_symlinks bool

Whether symbolic links are permitted.

allow_absolute_paths bool

Whether absolute paths are permitted.

scan_zip_content bool

Whether deep content inspection is enabled.

Source code in safeuploads/config.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@dataclass
class SecurityLimits:
    """
    Tunable limits and switches applied to file submissions.

    Attributes:
        max_image_size: Maximum size in bytes for image files.
        max_zip_size: Maximum size in bytes for ZIP archives.
        max_activity_file_size: Maximum size in bytes for
            GPX/TCX/FIT activity files.
        max_gzip_size: Maximum size in bytes for gzip files.
        max_memory_buffer_size: Bytes buffered in memory before
            spilling to disk.
        chunk_size: Chunk size in bytes for streaming reads.
        max_validation_memory_mb: Maximum megabytes during
            validation.
        max_validation_time_seconds: Overall validation timeout in
            seconds.
        max_compression_ratio: Maximum expansion ratio for ZIP files.
        max_uncompressed_size: Maximum cumulative size of ZIP
            contents.
        max_individual_file_size: Maximum size of single file in ZIP.
        max_zip_entries: Maximum number of file entries in ZIP.
        zip_analysis_timeout: Maximum seconds for ZIP analysis.
        max_zip_depth: Maximum directory nesting depth in ZIP.
        max_filename_length: Maximum length for filenames in ZIP.
        max_path_length: Maximum length for full paths in ZIP.
        max_number_files_same_type: Maximum number of files of the
            same type.
        allow_nested_archives: Whether nested archives are permitted.
        allow_symlinks: Whether symbolic links are permitted.
        allow_absolute_paths: Whether absolute paths are permitted.
        scan_zip_content: Whether deep content inspection is enabled.
        max_total_entries_recursive: Maximum total entries across
            all nesting levels.
        enable_audit_logging: Whether structured security event
            logging is enabled.
        enable_content_analysis: Whether the optional
            malware/script/polyglot scan is enabled.
        content_scan_max_size: Maximum number of bytes to scan.
    """

    # --- Upload size caps per file kind (bytes) ---
    max_image_size: int = 20 * 1024**2  # 20MB
    max_zip_size: int = 500 * 1024**2  # 500MB
    max_activity_file_size: int = 50 * 1024**2  # 50MB (GPX/TCX/FIT)
    max_gzip_size: int = 500 * 1024**2  # 500MB

    # --- Streaming validation ---
    max_memory_buffer_size: int = 10 * 1024**2  # 10MB, then disk
    chunk_size: int = 64 * 1024  # 64KB per streaming read

    # --- Resource monitoring ---
    max_validation_memory_mb: int = 512
    max_validation_time_seconds: float = 30.0

    # --- ZIP compression safety ---
    max_compression_ratio: int = 100  # e.g. 100:1
    max_uncompressed_size: int = 1024**3  # 1GB in total
    max_individual_file_size: int = 500 * 1024**2  # 500MB per file
    max_zip_entries: int = 10000
    zip_analysis_timeout: float = 5.0  # seconds

    # --- ZIP content inspection ---
    max_zip_depth: int = 10
    max_filename_length: int = 255
    max_path_length: int = 1024
    max_number_files_same_type: int = 1000
    allow_nested_archives: bool = False
    allow_symlinks: bool = False
    allow_absolute_paths: bool = False
    scan_zip_content: bool = True

    # --- Recursive ZIP inspection ---
    max_total_entries_recursive: int = 50000

    # --- Audit logging ---
    enable_audit_logging: bool = False

    # --- Optional deep content analysis ---
    enable_content_analysis: bool = False
    content_scan_max_size: int = 50 * 1024**2  # 50MB

SeekableFile

Bases: Protocol

Protocol for seekable binary file-like objects.

Implemented by BytesIO, SpooledTemporaryFile, and regular file objects opened in binary mode.

Attributes:

Name Type Description
read bytes

Read bytes from the file.

seek int

Move file pointer to specified position.

tell int

Return current file pointer position.

Source code in safeuploads/protocols.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@runtime_checkable
class SeekableFile(Protocol):
    """
    Protocol for seekable binary file-like objects.

    Implemented by BytesIO, SpooledTemporaryFile, and regular
    file objects opened in binary mode.

    Methods:
        read: Read bytes from the file.
        seek: Move file pointer to specified position.
        tell: Return current file pointer position.
    """

    def read(self, size: int = -1) -> bytes:
        """
        Read bytes from the file.

        Args:
            size: Number of bytes to read. -1 reads all.

        Returns:
            Bytes read from the file.
        """
        # Protocol stub: only the signature matters.
        ...  # pragma: no cover

    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Move file pointer to specified position.

        Args:
            offset: Position offset in bytes.
            whence: Reference point for offset (0=start,
                1=current, 2=end).

        Returns:
            New absolute position in the file.
        """
        # Protocol stub: only the signature matters.
        ...  # pragma: no cover

    def tell(self) -> int:
        """
        Return current file pointer position.

        Returns:
            Current position in the file.
        """
        # Protocol stub: only the signature matters.
        ...  # pragma: no cover

read

read(size=-1)

Read bytes from the file.

Parameters:

Name Type Description Default
size int

Number of bytes to read. -1 reads all.

-1

Returns:

Type Description
bytes

Bytes read from the file.

Source code in safeuploads/protocols.py
26
27
28
29
30
31
32
33
34
35
36
def read(self, size: int = -1) -> bytes:
    """
    Read bytes from the file.

    Args:
        size: Number of bytes to read. -1 reads all.

    Returns:
        Bytes read from the file.
    """
    # Protocol stub: only the signature matters.
    ...  # pragma: no cover

seek

seek(offset, whence=0)

Move file pointer to specified position.

Parameters:

Name Type Description Default
offset int

Position offset in bytes.

required
whence int

Reference point for offset (0=start, 1=current, 2=end).

0

Returns:

Type Description
int

New absolute position in the file.

Source code in safeuploads/protocols.py
38
39
40
41
42
43
44
45
46
47
48
49
50
def seek(self, offset: int, whence: int = 0) -> int:
    """
    Move file pointer to specified position.

    Interface stub: the body is only ``...``; the concrete file
    object supplies the real behavior.

    Args:
        offset: Position offset in bytes.
        whence: Reference point for offset (0=start,
            1=current, 2=end). These are the values of
            ``os.SEEK_SET``, ``os.SEEK_CUR`` and ``os.SEEK_END``.

    Returns:
        New absolute position in the file.
    """
    ...  # pragma: no cover

tell

tell()

Return current file pointer position.

Returns:

Type Description
int

Current position in the file.

Source code in safeuploads/protocols.py
52
53
54
55
56
57
58
59
def tell(self) -> int:
    """
    Return current file pointer position.

    Interface stub: the body is only ``...``; the concrete file
    object supplies the real behavior.

    Returns:
        Current position in the file.
    """
    ...  # pragma: no cover

SuspiciousFilePattern

Bases: Enum

Categorized patterns used to flag potentially malicious uploads.

Attributes:

Name Type Description
DIRECTORY_TRAVERSAL

Directory traversal attack patterns.

SUSPICIOUS_NAMES

Suspicious filename patterns.

EXECUTABLE_SIGNATURES

Dangerous file content signatures.

SUSPICIOUS_PATHS

Suspicious path components.

Source code in safeuploads/enums.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
class SuspiciousFilePattern(Enum):
    """
    Categorized patterns used to flag potentially malicious uploads.

    Each member's value is a plain ``set`` literal (of ``str`` or, for
    ``EXECUTABLE_SIGNATURES``, of ``bytes``). Every string entry is
    written in lowercase.
    NOTE(review): that presumably means consumers lowercase the
    filename/path before matching - confirm against the validators
    that read these sets.

    Attributes:
        DIRECTORY_TRAVERSAL: Directory traversal attack patterns.
        SUSPICIOUS_NAMES: Suspicious filename patterns.
        EXECUTABLE_SIGNATURES: Dangerous file content signatures.
        SUSPICIOUS_PATHS: Suspicious path components.
    """

    # Directory traversal attack patterns
    DIRECTORY_TRAVERSAL = {
        # Plain "parent directory" sequences, with both slash styles,
        # plus padded variants with extra dots or doubled separators.
        "../",
        "..\\",
        ".../",
        "...\\",
        "....//",
        "....\\\\",
        # URL-encoded forms of "../" and of ".." followed by a backslash
        "%2e%2e%2f",
        "%2e%2e%5c",
        # The same two sequences URL-encoded twice ("%25" encodes "%")
        "%252e%252e%252f",
        "%252e%252e%255c",
    }

    # Suspicious filename patterns
    SUSPICIOUS_NAMES = {
        # Windows system files that shouldn't be in user uploads
        "autorun.inf",
        "desktop.ini",
        "thumbs.db",
        ".ds_store",
        # Common malware names
        "install.exe",
        "setup.exe",
        "update.exe",
        "patch.exe",
        "crack.exe",
        "keygen.exe",
        "loader.exe",
        "activator.exe",
        # Hidden or system-like files
        ".htaccess",
        ".htpasswd",
        "web.config",
        "robots.txt",
    }

    # Dangerous file content signatures (magic bytes)
    # NOTE(review): how these are matched (prefix at offset 0 vs.
    # search anywhere) is not visible here - confirm in the consumer.
    EXECUTABLE_SIGNATURES = {
        # Windows PE executables
        b"MZ",
        # NOTE(review): in a PE file this marker normally follows the
        # DOS stub rather than sitting at offset 0, so a pure prefix
        # check would presumably never hit it - verify.
        b"PE\x00\x00",
        # ELF executables (Linux)
        b"\x7fELF",
        # Mach-O executables (macOS): two magic values, each listed in
        # both byte orders
        b"\xfe\xed\xfa\xce",
        b"\xfe\xed\xfa\xcf",
        b"\xce\xfa\xed\xfe",
        b"\xcf\xfa\xed\xfe",
        # Java class files
        b"\xca\xfe\xba\xbe",
        # Windows shortcuts (.lnk)
        b"L\x00\x00\x00",
    }

    # Suspicious path components
    SUSPICIOUS_PATHS = {
        # Windows system directories
        "windows/",
        "system32/",
        "syswow64/",
        "programfiles/",
        # Unix system directories
        "/bin/",
        "/sbin/",
        "/usr/bin/",
        "/usr/sbin/",
        "/etc/",
        # Web server directories
        "cgi-bin/",
        "htdocs/",
        "www/",
        "wwwroot/",
        # Development/build directories
        ".git/",
        ".svn/",
        "node_modules/",
        "__pycache__/",
    }

UnicodeAttackCategory

Bases: Enum

Categorized Unicode code points used in obfuscation attacks.

Attributes:

Name Type Description
DIRECTIONAL_OVERRIDES

Right-to-left and directional controls.

ZERO_WIDTH_CHARACTERS

Zero-width and invisible characters.

LANGUAGE_MARKS

Language and format specific characters.

CONFUSING_PUNCTUATION

Punctuation that can disguise extensions.

Source code in safeuploads/enums.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
class UnicodeAttackCategory(Enum):
    """
    Categorized Unicode code points used in obfuscation attacks.

    Each member's value is a ``set`` of integer code points, so a
    character is tested with ``ord(char) in member.value``.
    NOTE(review): whether ``FileSecurityConfig.DANGEROUS_UNICODE_CHARS``
    is built from the union of these sets is not visible here -
    confirm in the configuration module.

    Attributes:
        DIRECTIONAL_OVERRIDES: Right-to-left and directional controls.
        ZERO_WIDTH_CHARACTERS: Zero-width and invisible characters.
        LANGUAGE_MARKS: Language and format specific characters.
        CONFUSING_PUNCTUATION: Punctuation that can disguise extensions.
    """

    # Right-to-Left and directional override characters
    DIRECTIONAL_OVERRIDES = {
        0x202E,  # U+202E RIGHT-TO-LEFT OVERRIDE
        0x202D,  # U+202D LEFT-TO-RIGHT OVERRIDE
        0x202A,  # U+202A LEFT-TO-RIGHT EMBEDDING
        0x202B,  # U+202B RIGHT-TO-LEFT EMBEDDING
        0x202C,  # U+202C POP DIRECTIONAL FORMATTING
        0x2066,  # U+2066 LEFT-TO-RIGHT ISOLATE
        0x2067,  # U+2067 RIGHT-TO-LEFT ISOLATE
        0x2068,  # U+2068 FIRST STRONG ISOLATE
        0x2069,  # U+2069 POP DIRECTIONAL ISOLATE
    }

    # Zero-width and invisible characters
    ZERO_WIDTH_CHARACTERS = {
        0x200B,  # U+200B ZERO WIDTH SPACE
        0x200C,  # U+200C ZERO WIDTH NON-JOINER
        0x200D,  # U+200D ZERO WIDTH JOINER
        0x2060,  # U+2060 WORD JOINER
        0xFEFF,  # U+FEFF ZERO WIDTH NO-BREAK SPACE (BOM)
        0x034F,  # U+034F COMBINING GRAPHEME JOINER
    }

    # Language and format specific characters
    LANGUAGE_MARKS = {
        0x061C,  # U+061C ARABIC LETTER MARK
        0x180E,  # U+180E MONGOLIAN VOWEL SEPARATOR
    }

    # Confusing punctuation that can disguise extensions
    # (dot look-alikes that are not the ASCII "." at U+002E)
    CONFUSING_PUNCTUATION = {
        0x2024,  # U+2024 ONE DOT LEADER
        0x2025,  # U+2025 TWO DOT LEADER
        0x2026,  # U+2026 HORIZONTAL ELLIPSIS
        0xFF0E,  # U+FF0E FULLWIDTH FULL STOP
    }

UnicodeSecurityError

Bases: FilenameSecurityError

Dangerous Unicode characters detected in filename.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename containing dangerous Unicode.

None
dangerous_chars list[tuple[str, int, int]] | None

Optional list of (char, code_point, position) tuples for each dangerous character found.

None

Attributes:

Name Type Description
dangerous_chars

List of dangerous character tuples.

Source code in safeuploads/exceptions.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class UnicodeSecurityError(FilenameSecurityError):
    """
    Raised when a filename contains dangerous Unicode characters.

    Args:
        message: Human-readable error description.
        filename: Optional filename containing dangerous Unicode.
        dangerous_chars: Optional list of (char, code_point, position)
            tuples, one per dangerous character found.

    Attributes:
        dangerous_chars: List of dangerous character tuples; an empty
            list when none (or a falsy value) was supplied.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        dangerous_chars: list[tuple[str, int, int]] | None = None,
    ):
        """Store the offending characters, then initialize the base error."""
        # Falsy input (None or an empty list) is replaced by a new list.
        offenders = dangerous_chars if dangerous_chars else []
        self.dangerous_chars = offenders

        # The error code is fixed for this exception type.
        base_kwargs = {
            "filename": filename,
            "error_code": ErrorCode.UNICODE_DANGEROUS_CHARS,
        }
        super().__init__(message, **base_kwargs)

__init__

__init__(message, filename=None, dangerous_chars=None)

Initialize with dangerous characters.

Source code in safeuploads/exceptions.py
205
206
207
208
209
210
211
212
213
214
215
216
217
def __init__(
    self,
    message: str,
    filename: str | None = None,
    dangerous_chars: list[tuple[str, int, int]] | None = None,
):
    """Store the offending characters, then initialize the base error."""
    # Falsy input (None or an empty list) is replaced by a new list.
    offenders = dangerous_chars if dangerous_chars else []
    self.dangerous_chars = offenders

    # The error code is fixed for this exception type.
    base_kwargs = {
        "filename": filename,
        "error_code": ErrorCode.UNICODE_DANGEROUS_CHARS,
    }
    super().__init__(message, **base_kwargs)

UnicodeSecurityValidator

Bases: BaseValidator

Validates filenames for Unicode security threats.

Attributes:

Name Type Description
config

Runtime configuration for file security rules.

Source code in safeuploads/validators/unicode_validator.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class UnicodeSecurityValidator(BaseValidator):
    """
    Validates filenames for Unicode security threats.

    Attributes:
        config: Runtime configuration for file security rules.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Initialize the Unicode validator.

        Args:
            config: Runtime configuration that controls
                file security rules.
        """
        super().__init__(config)
        # Pre-compile as frozenset for O(1) code point lookup
        self._dangerous_chars: frozenset[int] = frozenset(
            config.DANGEROUS_UNICODE_CHARS
        )

    def validate_unicode_security(self, filename: str) -> str:
        """
        Validate filename for unsafe Unicode characters.

        The raw filename is scanned for code points listed in
        ``self._dangerous_chars``; any hit rejects the file. Otherwise
        the name is NFC-normalized and the result is scanned again,
        because composition can yield a code point that was absent
        from the raw input.

        Args:
            filename: The filename to validate and normalize.

        Returns:
            The NFC-normalized filename. An empty filename is
            returned unchanged.

        Raises:
            UnicodeSecurityError: If dangerous Unicode characters are
                detected in the filename or result from normalization.
                ``dangerous_chars`` holds ``(char, code_point,
                position)`` tuples; in the post-normalization case the
                position is the index in the normalized string.
        """
        if not filename:
            return filename

        # Collect every offender with its index so the error can
        # report all of them at once.
        dangerous_chars_found = [
            (char, ord(char), position)
            for position, char in enumerate(filename)
            if ord(char) in self._dangerous_chars
        ]

        # If dangerous characters found, reject the file entirely
        if dangerous_chars_found:
            char_details = []
            for char, code, pos in dangerous_chars_found:
                # Unnamed code points fall back to their U+XXXX form.
                char_name = unicodedata.name(char, f"U+{code:04X}")
                char_details.append(
                    f"'{char}' (U+{code:04X}: {char_name}) at position {pos}"
                )

            logger.warning(
                "Dangerous Unicode characters detected",
                extra={
                    "error_type": "unicode_security",
                    "file_name": filename,
                    "char_codes": [
                        code for _, code, _ in dangerous_chars_found
                    ],
                    "positions": [pos for _, _, pos in dangerous_chars_found],
                },
            )
            raise UnicodeSecurityError(
                message=(
                    "Dangerous Unicode characters"
                    " detected in filename:"
                    f" {', '.join(char_details)}."
                    " These characters can be used"
                    " to disguise file extensions"
                    " or create security"
                    " vulnerabilities."
                ),
                filename=filename,
                dangerous_chars=dangerous_chars_found,
            )

        # Normalize to NFC so that different Unicode representations
        # of the same text collapse to one canonical form.
        normalized_filename = unicodedata.normalize("NFC", filename)

        # Record any change made by normalization.
        if normalized_filename != filename:
            logger.info(
                "Unicode normalization applied: '%s' -> '%s'",
                filename,
                normalized_filename,
            )

        # Re-scan the normalized name: normalization may introduce a
        # dangerous code point. The first offender is reported with
        # its real index (previously this was hard-coded to 0).
        for position, char in enumerate(normalized_filename):
            char_code = ord(char)
            if char_code not in self._dangerous_chars:
                continue

            char_name = unicodedata.name(char, f"U+{char_code:04X}")
            logger.error(
                "Unicode normalization resulted in dangerous character",
                extra={
                    "error_type": "unicode_normalization_error",
                    "file_name": filename,
                    "normalized_filename": normalized_filename,
                    "char_code": char_code,
                },
            )
            raise UnicodeSecurityError(
                message=(
                    "Unicode normalization resulted"
                    " in dangerous character:"
                    f" '{char}'"
                    f" (U+{char_code:04X}:"
                    f" {char_name})"
                ),
                filename=filename,
                dangerous_chars=[(char, char_code, position)],
            )

        return normalized_filename

    def validate(self, filename: str) -> str:
        """
        Validate a filename for Unicode security issues.

        Args:
            filename: The name of the file to assess.

        Returns:
            The validated and normalized filename.

        Raises:
            UnicodeSecurityError: Propagated from
                ``validate_unicode_security``.
        """
        return self.validate_unicode_security(filename)

__init__

__init__(config)

Initialize the Unicode validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

Runtime configuration that controls file security rules.

required
Source code in safeuploads/validators/unicode_validator.py
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, config: FileSecurityConfig) -> None:
    """
    Initialize the Unicode validator.

    Args:
        config: Runtime configuration that controls
            file security rules. Its ``DANGEROUS_UNICODE_CHARS``
            iterable is copied into a frozenset here.
    """
    super().__init__(config)
    # Pre-compile as frozenset for O(1) code point lookup
    self._dangerous_chars: frozenset[int] = frozenset(
        config.DANGEROUS_UNICODE_CHARS
    )

validate

validate(filename)

Validate a filename for Unicode security issues.

Parameters:

Name Type Description Default
filename str

The name of the file to assess.

required

Returns:

Type Description
str

The validated and normalized filename.

Source code in safeuploads/validators/unicode_validator.py
145
146
147
148
149
150
151
152
153
154
155
def validate(self, filename: str) -> str:
    """
    Validate a filename for Unicode security issues.

    Thin wrapper that delegates to ``validate_unicode_security``.

    Args:
        filename: The name of the file to assess.

    Returns:
        The validated and normalized filename.

    Raises:
        UnicodeSecurityError: Propagated from
            ``validate_unicode_security``.
    """
    return self.validate_unicode_security(filename)

validate_unicode_security

validate_unicode_security(filename)

Validate filename for unsafe Unicode characters.

Parameters:

Name Type Description Default
filename str

The filename to validate and normalize.

required

Returns:

Type Description
str

The NFC-normalized filename.

Raises:

Type Description
UnicodeSecurityError

If dangerous Unicode characters are detected in the filename or result from normalization.

Source code in safeuploads/validators/unicode_validator.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def validate_unicode_security(self, filename: str) -> str:
    """
    Validate filename for unsafe Unicode characters.

    The raw filename is scanned for code points listed in
    ``self._dangerous_chars``; any hit rejects the file. Otherwise the
    name is NFC-normalized and the result is scanned again, because
    composition can yield a code point that was absent from the raw
    input.

    Args:
        filename: The filename to validate and normalize.

    Returns:
        The NFC-normalized filename. An empty filename is returned
        unchanged.

    Raises:
        UnicodeSecurityError: If dangerous Unicode characters are
            detected in the filename or result from normalization.
            ``dangerous_chars`` holds ``(char, code_point, position)``
            tuples; in the post-normalization case the position is
            the index in the normalized string.
    """
    if not filename:
        return filename

    # Collect every offender with its index so the error can report
    # all of them at once.
    dangerous_chars_found = [
        (char, ord(char), position)
        for position, char in enumerate(filename)
        if ord(char) in self._dangerous_chars
    ]

    # If dangerous characters found, reject the file entirely
    if dangerous_chars_found:
        char_details = []
        for char, code, pos in dangerous_chars_found:
            # Unnamed code points fall back to their U+XXXX form.
            char_name = unicodedata.name(char, f"U+{code:04X}")
            char_details.append(
                f"'{char}' (U+{code:04X}: {char_name}) at position {pos}"
            )

        logger.warning(
            "Dangerous Unicode characters detected",
            extra={
                "error_type": "unicode_security",
                "file_name": filename,
                "char_codes": [
                    code for _, code, _ in dangerous_chars_found
                ],
                "positions": [pos for _, _, pos in dangerous_chars_found],
            },
        )
        raise UnicodeSecurityError(
            message=(
                "Dangerous Unicode characters"
                " detected in filename:"
                f" {', '.join(char_details)}."
                " These characters can be used"
                " to disguise file extensions"
                " or create security"
                " vulnerabilities."
            ),
            filename=filename,
            dangerous_chars=dangerous_chars_found,
        )

    # Normalize to NFC so that different Unicode representations of
    # the same text collapse to one canonical form.
    normalized_filename = unicodedata.normalize("NFC", filename)

    # Record any change made by normalization.
    if normalized_filename != filename:
        logger.info(
            "Unicode normalization applied: '%s' -> '%s'",
            filename,
            normalized_filename,
        )

    # Re-scan the normalized name: normalization may introduce a
    # dangerous code point. The first offender is reported with its
    # real index (previously this was hard-coded to 0).
    for position, char in enumerate(normalized_filename):
        char_code = ord(char)
        if char_code not in self._dangerous_chars:
            continue

        char_name = unicodedata.name(char, f"U+{char_code:04X}")
        logger.error(
            "Unicode normalization resulted in dangerous character",
            extra={
                "error_type": "unicode_normalization_error",
                "file_name": filename,
                "normalized_filename": normalized_filename,
                "char_code": char_code,
            },
        )
        raise UnicodeSecurityError(
            message=(
                "Unicode normalization resulted"
                " in dangerous character:"
                f" '{char}'"
                f" (U+{char_code:04X}:"
                f" {char_name})"
            ),
            filename=filename,
            dangerous_chars=[(char, char_code, position)],
        )

    return normalized_filename

WindowsReservedNameError

Bases: FilenameSecurityError

Windows reserved device name used.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename using a reserved name.

None
reserved_name str | None

Optional specific reserved name detected.

None

Attributes:

Name Type Description
reserved_name

The specific reserved name that was detected.

Source code in safeuploads/exceptions.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class WindowsReservedNameError(FilenameSecurityError):
    """
    Windows reserved device name used.

    Args:
        message: Human-readable error description.
        filename: Optional filename using a reserved name.
        reserved_name: Optional specific reserved name detected.

    Attributes:
        reserved_name: The specific reserved name that was detected,
            stored exactly as passed in (``None`` if not given).
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        reserved_name: str | None = None,
    ) -> None:
        """Initialize with reserved name."""
        # Set before delegating to the base initializer.
        self.reserved_name = reserved_name
        # The error code is fixed for this exception type.
        super().__init__(
            message,
            filename=filename,
            error_code=ErrorCode.WINDOWS_RESERVED_NAME,
        )

__init__

__init__(message, filename=None, reserved_name=None)

Initialize with reserved name.

Source code in safeuploads/exceptions.py
264
265
266
267
268
269
270
271
272
273
274
275
276
def __init__(
    self,
    message: str,
    filename: str | None = None,
    reserved_name: str | None = None,
) -> None:
    """
    Initialize with reserved name.

    Args:
        message: Human-readable error description.
        filename: Optional filename using a reserved name.
        reserved_name: Optional specific reserved name detected;
            stored on the instance exactly as passed in.
    """
    # Set before delegating to the base initializer.
    self.reserved_name = reserved_name
    # The error code is fixed for this exception type.
    super().__init__(
        message,
        filename=filename,
        error_code=ErrorCode.WINDOWS_RESERVED_NAME,
    )

WindowsSecurityValidator

Bases: BaseValidator

Validator for Windows reserved device names.

Attributes:

Name Type Description
config

File security configuration settings.

Source code in safeuploads/validators/windows_validator.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class WindowsSecurityValidator(BaseValidator):
    """
    Validator for Windows reserved device names.

    Attributes:
        config: File security configuration settings.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Initialize the validator.

        Args:
            config: File security configuration settings.
        """
        super().__init__(config)

    def validate_windows_reserved_names(self, filename: str) -> None:
        """
        Validate filename against Windows reserved device names.

        Extensions are peeled off one at a time so that compound
        names such as "CON.tar.gz" are checked as "con.tar" and then
        "con". Each candidate is lower-cased, stripped of surrounding
        whitespace and of leading/trailing dots, then looked up in
        ``self.config.WINDOWS_RESERVED_NAMES``.

        Args:
            filename: The filename to validate.

        Raises:
            WindowsReservedNameError: If filename matches a Windows
                reserved device name.
        """
        current_name = filename

        while current_name:
            # Split off the last extension only.
            name_without_ext, ext = os.path.splitext(current_name)

            # Normalize: lowercase, drop surrounding whitespace, then
            # drop leading dots (hidden files like ".CON.jpg") and
            # trailing dots ("con." or "con..").
            name_to_check = name_without_ext.lower().strip().strip(".")

            if name_to_check in self.config.WINDOWS_RESERVED_NAMES:
                reserved_name = name_to_check.upper()
                reserved_listing = ", ".join(
                    sorted(self.config.WINDOWS_RESERVED_NAMES)
                ).upper()
                logger.warning(
                    "Windows reserved name detected",
                    extra={
                        "error_type": "windows_reserved_name",
                        "file_name": filename,
                        "reserved_name": reserved_name,
                    },
                )
                # The message is built from f-strings only. Calling
                # str.format() on it would re-parse the interpolated
                # filename, so a name such as "CON.{}.txt" raised
                # IndexError instead of WindowsReservedNameError.
                raise WindowsReservedNameError(
                    message=(
                        f"Filename '{filename}' uses"
                        " Windows reserved name"
                        f" '{reserved_name}'."
                        f" Reserved names: {reserved_listing}"
                    ),
                    filename=filename,
                    reserved_name=reserved_name,
                )

            # If no extension was removed, we're done
            if not ext or name_without_ext == current_name:
                break

            current_name = name_without_ext

    def validate(self, filename: str) -> None:
        """
        Validate filename against Windows reserved naming rules.

        Args:
            filename: The filename to validate.

        Raises:
            WindowsReservedNameError: If filename matches a Windows
                reserved device name.
        """
        return self.validate_windows_reserved_names(filename)

__init__

__init__(config)

Initialize the validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security configuration settings.

required
Source code in safeuploads/validators/windows_validator.py
27
28
29
30
31
32
33
34
def __init__(self, config: FileSecurityConfig) -> None:
    """
    Initialize the validator.

    Only delegates to the base-class initializer; no extra state is
    set up here.

    Args:
        config: File security configuration settings.
    """
    super().__init__(config)

validate

validate(filename)

Validate filename against Windows reserved naming rules.

Parameters:

Name Type Description Default
filename str

The filename to validate.

required

Raises:

Type Description
WindowsReservedNameError

If filename matches a Windows reserved device name.

Source code in safeuploads/validators/windows_validator.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def validate(self, filename: str) -> None:
    """
    Validate filename against Windows reserved naming rules.

    Thin wrapper that delegates to
    ``validate_windows_reserved_names``.

    Args:
        filename: The filename to validate.

    Raises:
        WindowsReservedNameError: If filename matches a Windows
            reserved device name.
    """
    return self.validate_windows_reserved_names(filename)

validate_windows_reserved_names

validate_windows_reserved_names(filename)

Validate filename against Windows reserved device names.

Parameters:

Name Type Description Default
filename str

The filename to validate.

required

Raises:

Type Description
WindowsReservedNameError

If filename matches a Windows reserved device name.

Source code in safeuploads/validators/windows_validator.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def validate_windows_reserved_names(self, filename: str) -> None:
    """
    Validate filename against Windows reserved device names.

    Extensions are peeled off one at a time so that compound names
    such as "CON.tar.gz" are checked as "con.tar" and then "con".
    Each candidate is lower-cased, stripped of surrounding whitespace
    and of leading/trailing dots, then looked up in
    ``self.config.WINDOWS_RESERVED_NAMES``.

    Args:
        filename: The filename to validate.

    Raises:
        WindowsReservedNameError: If filename matches a Windows
            reserved device name.
    """
    current_name = filename

    while current_name:
        # Split off the last extension only.
        name_without_ext, ext = os.path.splitext(current_name)

        # Normalize: lowercase, drop surrounding whitespace, then drop
        # leading dots (hidden files like ".CON.jpg") and trailing
        # dots ("con." or "con..").
        name_to_check = name_without_ext.lower().strip().strip(".")

        if name_to_check in self.config.WINDOWS_RESERVED_NAMES:
            reserved_name = name_to_check.upper()
            reserved_listing = ", ".join(
                sorted(self.config.WINDOWS_RESERVED_NAMES)
            ).upper()
            logger.warning(
                "Windows reserved name detected",
                extra={
                    "error_type": "windows_reserved_name",
                    "file_name": filename,
                    "reserved_name": reserved_name,
                },
            )
            # The message is built from f-strings only. Calling
            # str.format() on it would re-parse the interpolated
            # filename, so a name such as "CON.{}.txt" raised
            # IndexError instead of WindowsReservedNameError.
            raise WindowsReservedNameError(
                message=(
                    f"Filename '{filename}' uses"
                    " Windows reserved name"
                    f" '{reserved_name}'."
                    f" Reserved names: {reserved_listing}"
                ),
                filename=filename,
                reserved_name=reserved_name,
            )

        # If no extension was removed, we're done
        if not ext or name_without_ext == current_name:
            break

        current_name = name_without_ext

XmlSecurityValidator

Bases: BaseValidator

Validates XML-based activity files for XXE and entity attacks.

Uses defusedxml to parse XML safely. Rejects files containing DTD declarations, external entities, or excessive entity expansion.

Attributes:

Name Type Description
config

Security configuration for validation limits.

Source code in safeuploads/validators/xml_validator.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class XmlSecurityValidator(BaseValidator):
    """
    Validates XML-based activity files for XXE and entity attacks.

    Uses ``defusedxml`` to parse XML safely. Rejects files
    containing DTD declarations, external entities, or
    excessive entity expansion.

    Attributes:
        config: Security configuration for validation limits.
    """

    def __init__(self, config: FileSecurityConfig) -> None:
        """
        Initialize the XML security validator.

        Args:
            config: Security configuration with file limits.
        """
        super().__init__(config)

    def validate_xml_safety(self, file_obj: SeekableFile) -> None:
        """
        Parse XML with XXE protections and validate structure.

        The document is parsed only to detect problems; the parse
        result is discarded. On success the file pointer is rewound
        to the start; on failure it is left wherever the parser
        stopped.

        Args:
            file_obj: Seekable file containing XML data.

        Raises:
            FileProcessingError: If the XML is malformed, contains
                XXE attacks, or fails safety checks. The original
                exception is chained as ``__cause__``.
        """
        # Parse from the beginning regardless of prior reads.
        file_obj.seek(0)

        try:
            # defusedxml blocks external entities and
            # entity expansion by default.
            # forbid_dtd=True rejects ALL DTD declarations.
            DefusedET.parse(file_obj, forbid_dtd=True)
        except DefusedET.DTDForbidden as err:
            logger.warning("XML contains forbidden DTD declaration")
            raise FileProcessingError(
                "XML contains forbidden DTD declaration"
            ) from err
        except DefusedET.EntitiesForbidden as err:
            # NOTE(review): the log text says "entity reference" while
            # the raised message says "external entity" - confirm
            # which wording is intended.
            logger.warning("XML contains forbidden entity reference")
            raise FileProcessingError(
                "XML contains forbidden external entity"
            ) from err
        except DefusedET.ExternalReferenceForbidden as err:
            logger.warning("XML contains forbidden external reference")
            raise FileProcessingError(
                "XML contains forbidden external reference"
            ) from err
        except ParseError as err:
            logger.warning("Malformed XML: %s", err)
            raise FileProcessingError("Malformed XML content") from err
        except Exception as err:
            # Catch-all kept last so the specific handlers above win;
            # any other failure is still reported as a
            # FileProcessingError.
            logger.warning("XML validation failed: %s", err)
            raise FileProcessingError("XML validation failed") from err

        # Rewind so later consumers read from the start.
        file_obj.seek(0)

    def validate(self, file_obj: SeekableFile) -> None:
        """
        Validate XML file for security threats.

        Thin wrapper that delegates to ``validate_xml_safety``.

        Args:
            file_obj: Seekable file containing XML data.

        Raises:
            FileProcessingError: If the XML fails safety checks.
        """
        return self.validate_xml_safety(file_obj)

__init__

__init__(config)

Initialize the XML security validator.

Parameters:

Name Type Description Default
config FileSecurityConfig

Security configuration with file limits.

required
Source code in safeuploads/validators/xml_validator.py
34
35
36
37
38
39
40
41
def __init__(self, config: FileSecurityConfig):
    """
    Initialize the XML security validator.

    Args:
        config: Security configuration with file limits.
    """
    # No validator-specific state is set up here; the configuration
    # is forwarded unchanged to the base-class initializer.
    super().__init__(config)

validate

validate(file_obj)

Validate XML file for security threats.

Parameters:

Name Type Description Default
file_obj SeekableFile

Seekable file containing XML data.

required

Raises:

Type Description
FileProcessingError

If the XML fails safety checks.

Source code in safeuploads/validators/xml_validator.py
85
86
87
88
89
90
91
92
93
94
95
def validate(self, file_obj: SeekableFile) -> None:
    """
    Validate XML file for security threats.

    Thin entry point: the work is done by
    ``validate_xml_safety``, whose result is returned unchanged.

    Args:
        file_obj: Seekable file containing XML data.

    Raises:
        FileProcessingError: If the XML fails safety checks.
    """
    # Delegate so both method names share a single implementation.
    return self.validate_xml_safety(file_obj)

validate_xml_safety

validate_xml_safety(file_obj)

Parse XML with XXE protections and validate structure.

Parameters:

Name Type Description Default
file_obj SeekableFile

Seekable file containing XML data.

required

Raises:

Type Description
FileProcessingError

If the XML is malformed, contains XXE attacks, or fails safety checks.

Source code in safeuploads/validators/xml_validator.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def validate_xml_safety(self, file_obj: SeekableFile) -> None:
    """
    Parse the XML through defusedxml and reject unsafe documents.

    The stream is rewound before parsing and rewound again once
    parsing has succeeded. The parsed tree itself is discarded;
    only the absence of an error matters.

    Args:
        file_obj: Seekable file containing XML data.

    Raises:
        FileProcessingError: If defusedxml reports a forbidden DTD,
            forbidden entities or a forbidden external reference,
            if the document is malformed, or if parsing fails for
            any other reason.
    """
    file_obj.seek(0)

    try:
        # defusedxml already refuses external entities and entity
        # expansion; forbid_dtd=True additionally rejects every
        # DTD declaration.
        DefusedET.parse(file_obj, forbid_dtd=True)
    except DefusedET.DTDForbidden as exc:
        reason = "XML contains forbidden DTD declaration"
        logger.warning(reason)
        raise FileProcessingError(reason) from exc
    except DefusedET.EntitiesForbidden as exc:
        logger.warning("XML contains forbidden entity reference")
        raise FileProcessingError(
            "XML contains forbidden external entity"
        ) from exc
    except DefusedET.ExternalReferenceForbidden as exc:
        reason = "XML contains forbidden external reference"
        logger.warning(reason)
        raise FileProcessingError(reason) from exc
    except ParseError as exc:
        logger.warning("Malformed XML: %s", exc)
        raise FileProcessingError("Malformed XML content") from exc
    except Exception as exc:
        # Catch-all so callers only ever see FileProcessingError.
        logger.warning("XML validation failed: %s", exc)
        raise FileProcessingError("XML validation failed") from exc
    else:
        # Leave the stream at the start for whoever reads it next.
        file_obj.seek(0)

ZipBombError

Bases: CompressionSecurityError

Zip archive exceeds compression ratio or uncompressed size limits.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename of zip bomb.

None
compression_ratio float | None

Optional actual compression ratio detected.

None
uncompressed_size int | None

Optional total uncompressed size in bytes.

None
max_ratio float | None

Optional maximum allowed compression ratio.

None
max_size int | None

Optional maximum allowed uncompressed size in bytes.

None

Attributes:

Name Type Description
compression_ratio

Actual compression ratio detected.

uncompressed_size

Total uncompressed size in bytes.

max_ratio

Maximum allowed compression ratio.

max_size

Maximum allowed uncompressed size in bytes.

Source code in safeuploads/exceptions.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
class ZipBombError(CompressionSecurityError):
    """
    Raised when a zip archive breaks the compression-ratio or
    uncompressed-size limits.

    The measured values and the limits they were compared against
    are kept on the instance so handlers can report them.

    Args:
        message: Human-readable error description.
        filename: Optional filename of zip bomb.
        compression_ratio: Optional actual compression ratio detected.
        uncompressed_size: Optional total uncompressed size in bytes.
        max_ratio: Optional maximum allowed compression ratio.
        max_size: Optional maximum allowed uncompressed size in bytes.

    Attributes:
        compression_ratio: Actual compression ratio detected.
        uncompressed_size: Total uncompressed size in bytes.
        max_ratio: Maximum allowed compression ratio.
        max_size: Maximum allowed uncompressed size in bytes.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        compression_ratio: float | None = None,
        uncompressed_size: int | None = None,
        max_ratio: float | None = None,
        max_size: int | None = None,
    ):
        """Store the compression details and initialize the base error."""
        # Configured limits.
        self.max_ratio = max_ratio
        self.max_size = max_size
        # Values actually observed in the archive.
        self.compression_ratio = compression_ratio
        self.uncompressed_size = uncompressed_size

        # The error code is fixed for this exception type.
        bomb_code = ErrorCode.ZIP_BOMB_DETECTED
        super().__init__(message, filename=filename, error_code=bomb_code)

__init__

__init__(
    message,
    filename=None,
    compression_ratio=None,
    uncompressed_size=None,
    max_ratio=None,
    max_size=None,
)

Initialize with compression details.

Source code in safeuploads/exceptions.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
def __init__(
    self,
    message: str,
    filename: str | None = None,
    compression_ratio: float | None = None,
    uncompressed_size: int | None = None,
    max_ratio: float | None = None,
    max_size: int | None = None,
):
    """Store the compression details and initialize the base error."""
    # Configured limits.
    self.max_ratio = max_ratio
    self.max_size = max_size
    # Values actually observed in the archive.
    self.compression_ratio = compression_ratio
    self.uncompressed_size = uncompressed_size

    # The error code is fixed for this exception type.
    bomb_code = ErrorCode.ZIP_BOMB_DETECTED
    super().__init__(message, filename=filename, error_code=bomb_code)

ZipContentError

Bases: CompressionSecurityError

Zip archive contains dangerous content or structure.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
filename str | None

Optional filename of problematic archive.

None
threats list[str] | None

Optional list of detected threat descriptions.

None
error_code str | None

Optional error code (defaults to ZIP_CONTENT_THREAT).

None

Attributes:

Name Type Description
threats

List of detected threat descriptions.

Source code in safeuploads/exceptions.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
class ZipContentError(CompressionSecurityError):
    """
    Raised when a zip archive holds dangerous content or has a
    dangerous structure.

    Args:
        message: Human-readable error description.
        filename: Optional filename of problematic archive.
        threats: Optional list of detected threat descriptions.
        error_code: Optional error code (defaults to
            ZIP_CONTENT_THREAT).

    Attributes:
        threats: List of detected threat descriptions.
    """

    def __init__(
        self,
        message: str,
        filename: str | None = None,
        threats: list[str] | None = None,
        error_code: str | None = None,
    ):
        """Record the threats and initialize the base error."""
        # A missing or empty threat list becomes a fresh empty list.
        self.threats = threats if threats else []

        # Fall back to the generic content-threat code when the
        # caller did not pick a more specific one.
        resolved_code = (
            error_code if error_code else ErrorCode.ZIP_CONTENT_THREAT
        )
        super().__init__(
            message, filename=filename, error_code=resolved_code
        )

__init__

__init__(
    message, filename=None, threats=None, error_code=None
)

Initialize with threat details.

Source code in safeuploads/exceptions.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def __init__(
    self,
    message: str,
    filename: str | None = None,
    threats: list[str] | None = None,
    error_code: str | None = None,
):
    """Record the threats and initialize the base error."""
    # A missing or empty threat list becomes a fresh empty list.
    self.threats = threats if threats else []

    # Fall back to the generic content-threat code when the
    # caller did not pick a more specific one.
    resolved_code = (
        error_code if error_code else ErrorCode.ZIP_CONTENT_THREAT
    )
    super().__init__(message, filename=filename, error_code=resolved_code)

ZipContentInspector

Inspects ZIP archive contents for security threats.

Attributes:

Name Type Description
config

File security configuration.

Source code in safeuploads/inspectors/zip_inspector.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
class ZipContentInspector:
    """
    Inspects ZIP archive contents for security threats.

    Attributes:
        config: File security configuration.
    """

    def __init__(self, config: FileSecurityConfig):
        """
        Set up the inspector and cache its lookup tables.

        Args:
            config: File security configuration.
        """
        self.config = config
        self._audit = SecurityAuditLogger(
            enabled=config.limits.enable_audit_logging
        )

        def lowered(values):
            """Yield each string of ``values`` in lower case."""
            return (value.lower() for value in values)

        patterns = SuspiciousFilePattern

        # Substring patterns are kept as tuples and scanned in order.
        self._traversal_patterns: tuple[str, ...] = tuple(
            lowered(patterns.DIRECTORY_TRAVERSAL.value)
        )
        self._suspicious_paths: tuple[str, ...] = tuple(
            lowered(patterns.SUSPICIOUS_PATHS.value)
        )
        self._exec_signatures: tuple[bytes, ...] = tuple(
            patterns.EXECUTABLE_SIGNATURES.value
        )

        # Exact-match tables are frozensets for membership tests.
        self._suspicious_names: frozenset[str] = frozenset(
            lowered(patterns.SUSPICIOUS_NAMES.value)
        )
        self._nested_archive_exts: frozenset[str] = frozenset(
            ZipThreatCategory.NESTED_ARCHIVES.value
        )
        self._binary_exts: frozenset[str] = frozenset(
            extension
            for category in BinaryFileCategory
            for extension in category.value
        )

    def inspect_zip_content(self, file_obj: SeekableFile) -> None:
        """
        Scan every entry of a ZIP archive and fail on any threat.

        Each entry is checked individually, then the archive
        structure as a whole. All findings are collected and
        reported together in a single ``ZipContentError``. When
        nested archives are allowed, a recursive inspection runs
        afterwards.

        Args:
            file_obj: Seekable file-like object containing ZIP data.

        Raises:
            ZipContentError: If security threats are detected in ZIP
                content such as directory traversal, symlinks, nested
                archives, or suspicious patterns, or if the analysis
                exceeds the configured timeout.
            FileProcessingError: If ZIP structure is invalid or
                unexpected error occurs during inspection.
        """
        try:
            file_obj.seek(0)
            findings = []

            # Reference point for the per-archive time budget.
            began = time.monotonic()

            with zipfile.ZipFile(file_obj, "r") as archive:
                members = archive.infolist()

                for member in members:
                    # Abort once the time budget has been used up.
                    elapsed = time.monotonic() - began
                    if elapsed > self.config.limits.zip_analysis_timeout:
                        logger.error(
                            "ZIP content inspection timeout",
                            extra=log_extra(
                                {
                                    "error_type": "zip_analysis_timeout",
                                    "timeout": (
                                        self.config.limits.zip_analysis_timeout
                                    ),
                                }
                            ),
                        )
                        raise ZipContentError(
                            message=(
                                "ZIP content inspection timeout after"
                                f" {self.config.limits.zip_analysis_timeout}s"
                            ),
                            threats=["Analysis timeout - potential zip bomb"],
                            error_code=ErrorCode.ZIP_ANALYSIS_TIMEOUT,
                        )

                    findings.extend(self._inspect_zip_entry(member, archive))

                # Archive-wide checks (depth, file-type distribution).
                findings.extend(self._inspect_zip_structure(members))

                if findings:
                    logger.warning(
                        "ZIP content threats detected",
                        extra=log_extra(
                            {
                                "error_type": "zip_content_threat",
                                "threats": findings,
                                "threat_count": len(findings),
                            }
                        ),
                    )
                    summary = "; ".join(findings)
                    # Audit only when a correlation id is available.
                    cid = get_correlation_id()
                    if cid:
                        self._audit.threat("", cid, summary)
                    raise ZipContentError(
                        message=f"ZIP content threats detected: {summary}",
                        threats=findings,
                    )

                logger.debug(
                    "ZIP content inspection passed: %s entries analyzed",
                    len(members),
                )

            # With nesting allowed, entries that are themselves
            # archives still need a recursive look.
            if self.config.limits.allow_nested_archives:
                file_obj.seek(0)
                self.inspect_nested_archives(file_obj)

        except ZipContentError:
            # Our own findings pass through untouched.
            raise
        except zipfile.BadZipFile as err:
            logger.error(
                "Invalid or corrupted ZIP file structure", exc_info=True
            )
            raise FileProcessingError(
                message="Invalid or corrupted ZIP file structure",
                original_error=err,
            ) from err
        except Exception as err:
            logger.error(
                "Unexpected error during ZIP content inspection",
                exc_info=True,
            )
            raise FileProcessingError(
                message="ZIP content inspection failed "
                "due to an internal error",
                original_error=err,
            ) from err

    def _inspect_zip_entry(
        self, entry: zipfile.ZipInfo, zip_file: zipfile.ZipFile
    ) -> list[str]:
        """
        Inspect single ZIP entry for security threats.

        Args:
            entry: ZIP entry metadata.
            zip_file: Parent ZIP archive.

        Returns:
            List of threat descriptions.
        """
        threats = []
        filename = entry.filename

        # 1. Check for null bytes (truncation attacks)
        if "\x00" in filename:
            threats.append(f"Null byte in filename: '{filename}'")

        # 2. Check for directory traversal attacks
        if self._has_directory_traversal(filename):
            threats.append(f"Directory traversal attack in '{filename}'")

        # 3. Check for absolute paths
        if (
            not self.config.limits.allow_absolute_paths
            and self._has_absolute_path(filename)
        ):
            threats.append(f"Absolute path detected in '{filename}'")

        # 4. Check for symbolic links
        if not self.config.limits.allow_symlinks and self._is_symlink(entry):
            threats.append(f"Symbolic link detected: '{filename}'")

        # 5. Check filename length limits
        if (
            len(os.path.basename(filename))
            > self.config.limits.max_filename_length
        ):
            threats.append(
                f"Filename too long: '{filename}'"
                f" ({len(os.path.basename(filename))}"
                " chars)"
            )

        # 6. Check path length limits
        if len(filename) > self.config.limits.max_path_length:
            threats.append(
                f"Path too long: '{filename}' ({len(filename)} chars)"
            )

        # 7. Check for suspicious filename patterns
        suspicious_patterns = self._check_suspicious_patterns(filename)
        threats.extend(suspicious_patterns)

        # 8. Check for nested archives
        if (
            not self.config.limits.allow_nested_archives
            and self._is_nested_archive(filename)
        ):
            threats.append(f"Nested archive detected: '{filename}'")

        # 9. Check file content if enabled
        # Only first 512 bytes are read, so no size gate needed
        if self.config.limits.scan_zip_content and not entry.is_dir():
            content_threats = self._inspect_entry_content(entry, zip_file)
            threats.extend(content_threats)

        return threats

    def _inspect_zip_structure(
        self, entries: list[zipfile.ZipInfo]
    ) -> list[str]:
        """
        Inspect ZIP structure for anomalies.

        Args:
            entries: All ZIP entries to analyze.

        Returns:
            List of structural threat descriptions.
        """
        threats = []

        # Check directory depth
        max_depth = 0
        for entry in entries:
            depth = entry.filename.count("/") + entry.filename.count("\\")
            max_depth = max(max_depth, depth)

        if max_depth > self.config.limits.max_zip_depth:
            threats.append(
                f"Excessive directory depth: {max_depth}"
                f" (max: {self.config.limits.max_zip_depth})"
            )

        # Check for suspicious file distribution
        file_types = {}
        for entry in entries:
            if not entry.is_dir():
                ext = os.path.splitext(entry.filename)[1].lower()
                file_types[ext] = file_types.get(ext, 0) + 1

        # Check for excessive number of same-type files (potential spam/bomb)
        for ext, count in file_types.items():
            if count > self.config.limits.max_number_files_same_type:
                threats.append(
                    f"Excessive number of {ext} files:"
                    f" {self.config.limits.max_number_files_same_type}"
                )

        return threats

    def _has_directory_traversal(self, filename: str) -> bool:
        """
        Check for directory traversal indicators.

        Args:
            filename: Filename to check.

        Returns:
            True if traversal detected.
        """
        filename_lower = filename.lower()

        for pattern in self._traversal_patterns:
            if pattern in filename_lower:
                return True

        # Additional checks for normalized paths
        normalized = os.path.normpath(filename)
        return (
            normalized.startswith("..")
            or "/.." in normalized
            or "\\.." in normalized
        )

    def _has_absolute_path(self, filename: str) -> bool:
        """
        Check if filename is an absolute path.

        Args:
            filename: Path to check.

        Returns:
            True if absolute path detected.
        """
        return (
            filename.startswith(("/", "\\"))  # Unix/Windows path
            or (len(filename) > 1 and filename[1] == ":")  # Windows drive path
        )

    def _is_symlink(self, entry: zipfile.ZipInfo) -> bool:
        """
        Check if entry is a symbolic link.

        Args:
            entry: ZIP entry to check.

        Returns:
            True if entry is a symlink.
        """
        # Check if entry has symlink attributes
        return (entry.external_attr >> 16) & 0o120000 == 0o120000

    def _check_suspicious_patterns(self, filename: str) -> list[str]:
        """
        Check filename for suspicious patterns.

        Args:
            filename: Filename to check.

        Returns:
            List of pattern warnings.
        """
        threats = []
        filename_lower = filename.lower()
        basename = os.path.basename(filename_lower)

        # Check suspicious names
        for pattern in self._suspicious_names:
            if basename == pattern:
                threats.append(f"Suspicious filename pattern: '{filename}'")
                break

        # Check suspicious path components
        for pattern in self._suspicious_paths:
            if pattern in filename_lower:
                threats.append(
                    "Suspicious path component:"
                    f" '{filename}' contains"
                    f" '{pattern}'"
                )
                break

        return threats

    def _is_nested_archive(self, filename: str) -> bool:
        """
        Check if filename represents a nested archive.

        Args:
            filename: Filename to check.

        Returns:
            True if nested archive detected.
        """
        ext = os.path.splitext(filename)[1].lower()
        return ext in self._nested_archive_exts

    def _inspect_entry_content(
        self, entry: zipfile.ZipInfo, zip_file: zipfile.ZipFile
    ) -> list[str]:
        """
        Inspect ZIP entry content for malicious signatures.

        Args:
            entry: ZIP entry to inspect.
            zip_file: Parent ZIP archive.

        Returns:
            List of content threat descriptions.
        """
        threats = []

        try:
            # Read first few bytes to check for executable signatures
            with zip_file.open(entry, "r") as file:
                content_sample = file.read(512)  # Read first 512 bytes

                # Check for executable signatures
                for signature in self._exec_signatures:
                    if content_sample.startswith(signature):
                        threats.append(
                            "Executable content"
                            f" detected in"
                            f" '{entry.filename}'"
                        )
                        break

                ext = os.path.splitext(entry.filename)[1].lower()
                if (
                    ext not in self._binary_exts
                    and self._contains_script_patterns(
                        content_sample, entry.filename
                    )
                ):
                    threats.append(
                        f"Script content detected in '{entry.filename}'"
                    )

        except Exception as err:
            logger.warning(
                "Could not inspect content of '%s': %s",
                entry.filename,
                err,
            )

        return threats

    def _contains_script_patterns(self, content: bytes, filename: str) -> bool:
        """
        Check content for malicious script patterns.

        Args:
            content: Raw bytes to inspect.
            filename: Filename for context.

        Returns:
            True if script patterns found.
        """
        try:
            # Try to decode as text
            text_content = content.decode("utf-8", errors="ignore").lower()

            # Check for common script patterns
            script_patterns = [
                "#!/bin/",
                "#!/usr/bin/",
                "powershell",
                "cmd.exe",
                "eval(",
                "exec(",
                "system(",
                "shell_exec(",
                "<script",
                "<?php",
                "<%",
                "import os",
                "import subprocess",
            ]

            for pattern in script_patterns:
                if pattern in text_content:
                    return True

        except Exception:
            # If we can't decode as text, it's probably binary
            logger.debug(
                "Could not decode content of '%s' as text",
                filename,
            )

        return False

    # ----------------------------------------------------------------
    # Recursive / quine / complexity detection
    # ----------------------------------------------------------------

    def _compute_archive_hash(self, file_obj: SeekableFile) -> str:
        """
        Compute SHA-256 hash of archive content.

        Args:
            file_obj: Seekable file containing the archive.

        Returns:
            Hex digest string.
        """
        file_obj.seek(0)
        h = hashlib.sha256()
        while True:
            chunk = file_obj.read(65536)
            if not chunk:
                break
            h.update(chunk)
        file_obj.seek(0)
        return h.hexdigest()

    def inspect_nested_archives(
        self,
        file_obj: SeekableFile,
        *,
        depth: int = 0,
        seen_hashes: set[str] | None = None,
        total_entries: int = 0,
        start_time: float | None = None,
    ) -> None:
        """
        Recursively inspect nested archives.

        Only called when ``allow_nested_archives`` is True.
        Tracks depth, cumulative entry count, elapsed time,
        and archive hashes to detect recursive/quine
        structures.

        Args:
            file_obj: Seekable file containing ZIP data.
            depth: Current nesting depth (0 = outermost).
            seen_hashes: Set of SHA-256 hashes already seen.
            total_entries: Cumulative entry count so far.
            start_time: Monotonic timestamp of initial call.

        Raises:
            ZipContentError: If recursive structure, quine,
                or complexity attack is detected.
        """
        if seen_hashes is None:
            seen_hashes = set()
        if start_time is None:
            start_time = time.monotonic()

        max_depth = self.config.limits.max_zip_depth
        timeout = self.config.limits.zip_analysis_timeout
        max_entries = self.config.limits.max_total_entries_recursive

        # Depth check
        if depth > max_depth:
            raise ZipContentError(
                message=(
                    f"Excessive nesting depth: {depth} (max {max_depth})"
                ),
                threats=[f"Nesting depth {depth} exceeds limit {max_depth}"],
                error_code=(ErrorCode.ZIP_RECURSIVE_STRUCTURE),
            )

        # Quine / recursive check via hash
        archive_hash = self._compute_archive_hash(file_obj)
        if archive_hash in seen_hashes:
            raise ZipContentError(
                message=(
                    "Recursive ZIP structure detected"
                    " — archive contains itself"
                ),
                threats=["Quine/recursive ZIP detected"],
                error_code=ErrorCode.ZIP_QUINE_DETECTED,
            )
        seen_hashes.add(archive_hash)

        file_obj.seek(0)

        try:
            with zipfile.ZipFile(file_obj, "r") as zf:
                entries = zf.infolist()
                total_entries += len(entries)

                # Complexity check
                if total_entries > max_entries:
                    raise ZipContentError(
                        message=(
                            "Total recursive entries"
                            f" ({total_entries})"
                            " exceeds limit"
                            f" ({max_entries})"
                        ),
                        threats=[
                            f"Complexity attack: {total_entries} entries"
                        ],
                        error_code=(ErrorCode.ZIP_COMPLEXITY_ATTACK),
                    )

                for entry in entries:
                    # Timeout
                    elapsed = time.monotonic() - start_time
                    if elapsed > timeout:
                        raise ZipContentError(
                            message=(
                                "Recursive inspection"
                                " timeout after"
                                f" {timeout}s"
                            ),
                            threats=["Recursive inspection timeout"],
                            error_code=(ErrorCode.ZIP_ANALYSIS_TIMEOUT),
                        )

                    if entry.is_dir():
                        continue

                    ext = os.path.splitext(entry.filename)[1].lower()
                    is_archive = any(
                        ext == a
                        for a in (
                            ".zip",
                            ".jar",
                            ".war",
                            ".ear",
                        )
                    )
                    if not is_archive:
                        continue

                    # Size guard for nested archive
                    if (
                        entry.file_size
                        > self.config.limits.max_individual_file_size
                    ):
                        continue

                    try:
                        data = zf.read(entry.filename)
                    except Exception:
                        logger.warning(
                            "Could not read nested archive '%s'",
                            entry.filename,
                        )
                        continue

                    nested_buf = io.BytesIO(data)
                    if not zipfile.is_zipfile(nested_buf):
                        continue

                    # Recurse
                    self.inspect_nested_archives(
                        nested_buf,
                        depth=depth + 1,
                        seen_hashes=seen_hashes,
                        total_entries=total_entries,
                        start_time=start_time,
                    )

        except ZipContentError:
            raise
        except zipfile.BadZipFile:
            pass  # Let outer handler deal with it
        except Exception as err:
            logger.warning(
                "Error during recursive inspection at depth %d: %s",
                depth,
                err,
            )

__init__

__init__(config)

Initialize ZIP inspector with configuration.

Parameters:

Name Type Description Default
config FileSecurityConfig

File security configuration.

required
Source code in safeuploads/inspectors/zip_inspector.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(self, config: FileSecurityConfig):
    """
    Build a ZIP inspector bound to a security configuration.

    Stores the configuration, creates the audit logger and
    materialises the pattern collections used during inspection so
    they are built once per inspector instead of once per entry.

    Args:
        config: File security configuration. Its
            ``limits.enable_audit_logging`` flag decides whether the
            audit logger is enabled.
    """
    self.config = config
    audit_enabled = config.limits.enable_audit_logging
    self._audit = SecurityAuditLogger(enabled=audit_enabled)

    # Lower-cased copies of the pattern tables. The frozensets give
    # constant-time membership tests; the tuples are scanned in order.
    patterns = SuspiciousFilePattern
    self._traversal_patterns: tuple[str, ...] = tuple(
        pattern.lower() for pattern in patterns.DIRECTORY_TRAVERSAL.value
    )
    self._suspicious_names: frozenset[str] = frozenset(
        name.lower() for name in patterns.SUSPICIOUS_NAMES.value
    )
    self._suspicious_paths: tuple[str, ...] = tuple(
        path.lower() for path in patterns.SUSPICIOUS_PATHS.value
    )

    # Extension tables are taken from the enums unchanged.
    nested_exts = ZipThreatCategory.NESTED_ARCHIVES.value
    self._nested_archive_exts: frozenset[str] = frozenset(nested_exts)
    self._binary_exts: frozenset[str] = frozenset(
        extension
        for category in BinaryFileCategory
        for extension in category.value
    )

    # Byte signatures are kept in their declared order.
    signatures = patterns.EXECUTABLE_SIGNATURES.value
    self._exec_signatures: tuple[bytes, ...] = tuple(signatures)

inspect_nested_archives

inspect_nested_archives(
    file_obj,
    *,
    depth=0,
    seen_hashes=None,
    total_entries=0,
    start_time=None,
)

Recursively inspect nested archives.

Only called when allow_nested_archives is True. Tracks depth, cumulative entry count, elapsed time, and archive hashes to detect recursive/quine structures.

Parameters:

Name Type Description Default
file_obj SeekableFile

Seekable file containing ZIP data.

required
depth int

Current nesting depth (0 = outermost).

0
seen_hashes set[str] | None

Set of SHA-256 hashes already seen.

None
total_entries int

Cumulative entry count so far.

0
start_time float | None

Monotonic timestamp of initial call.

None

Raises:

Type Description
ZipContentError

If recursive structure, quine, or complexity attack is detected.

Source code in safeuploads/inspectors/zip_inspector.py
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
def inspect_nested_archives(
    self,
    file_obj: SeekableFile,
    *,
    depth: int = 0,
    seen_hashes: set[str] | None = None,
    total_entries: int = 0,
    start_time: float | None = None,
) -> None:
    """
    Recursively inspect nested archives.

    Only called when ``allow_nested_archives`` is True. Walks
    ``file_obj`` and every ``.zip``/``.jar``/``.war``/``.ear`` member
    that is itself a valid ZIP, enforcing the configured depth,
    entry-count and time budgets and rejecting any archive whose
    hash was already seen during the walk.

    The entry budget is shared by the whole walk: the entries of
    every visited archive, siblings included, are added to a single
    running total that is compared with
    ``max_total_entries_recursive``.

    Args:
        file_obj: Seekable file containing ZIP data.
        depth: Nesting depth of ``file_obj`` (0 = outermost).
        seen_hashes: Archive hashes already seen; updated in place.
            A fresh set is created when omitted.
        total_entries: Entry count accumulated before this call.
        start_time: Monotonic timestamp the timeout is measured
            from. Defaults to the time of this call.

    Raises:
        ZipContentError: If the nesting depth, the cumulative entry
            count or the time budget is exceeded, or if an archive
            with an already seen hash is encountered.
    """
    if seen_hashes is None:
        seen_hashes = set()
    if start_time is None:
        start_time = time.monotonic()

    limits = self.config.limits
    max_depth = limits.max_zip_depth
    timeout = limits.zip_analysis_timeout
    max_entries = limits.max_total_entries_recursive
    max_nested_size = limits.max_individual_file_size
    archive_exts = (".zip", ".jar", ".war", ".ear")

    def _walk(archive: SeekableFile, level: int) -> None:
        """Inspect one archive level and descend into nested ZIPs."""
        # One counter for the whole walk, so sibling archives add up
        # instead of each starting again from its parent's count.
        nonlocal total_entries

        # Depth check
        if level > max_depth:
            raise ZipContentError(
                message=(
                    f"Excessive nesting depth: {level} (max {max_depth})"
                ),
                threats=[f"Nesting depth {level} exceeds limit {max_depth}"],
                error_code=ErrorCode.ZIP_RECURSIVE_STRUCTURE,
            )

        # Quine / recursive check via hash
        archive_hash = self._compute_archive_hash(archive)
        if archive_hash in seen_hashes:
            raise ZipContentError(
                message=(
                    "Recursive ZIP structure detected"
                    " — archive contains itself"
                ),
                threats=["Quine/recursive ZIP detected"],
                error_code=ErrorCode.ZIP_QUINE_DETECTED,
            )
        seen_hashes.add(archive_hash)

        archive.seek(0)

        try:
            with zipfile.ZipFile(archive, "r") as zf:
                entries = zf.infolist()
                total_entries += len(entries)

                # Complexity check
                if total_entries > max_entries:
                    raise ZipContentError(
                        message=(
                            "Total recursive entries"
                            f" ({total_entries})"
                            " exceeds limit"
                            f" ({max_entries})"
                        ),
                        threats=[
                            f"Complexity attack: {total_entries} entries"
                        ],
                        error_code=ErrorCode.ZIP_COMPLEXITY_ATTACK,
                    )

                for entry in entries:
                    # Timeout is measured from the start of the whole
                    # walk, not from the start of this level.
                    if time.monotonic() - start_time > timeout:
                        raise ZipContentError(
                            message=(
                                "Recursive inspection"
                                " timeout after"
                                f" {timeout}s"
                            ),
                            threats=["Recursive inspection timeout"],
                            error_code=ErrorCode.ZIP_ANALYSIS_TIMEOUT,
                        )

                    if entry.is_dir():
                        continue

                    ext = os.path.splitext(entry.filename)[1].lower()
                    if ext not in archive_exts:
                        continue

                    # Size guard: oversized nested archives are not
                    # loaded into memory and are left uninspected.
                    if entry.file_size > max_nested_size:
                        continue

                    try:
                        # Read through the ZipInfo, not the name: a
                        # name lookup returns the last member with
                        # that name, so earlier duplicates would never
                        # be inspected and the repeated read would
                        # trip the hash check above.
                        data = zf.read(entry)
                    except Exception:
                        logger.warning(
                            "Could not read nested archive '%s'",
                            entry.filename,
                        )
                        continue

                    nested_buf = io.BytesIO(data)
                    if not zipfile.is_zipfile(nested_buf):
                        continue

                    _walk(nested_buf, level + 1)

        except ZipContentError:
            raise
        except zipfile.BadZipFile:
            # An archive that cannot be parsed at this level is
            # skipped here without raising.
            pass
        except Exception as err:
            # Best effort: unexpected errors end this level only.
            logger.warning(
                "Error during recursive inspection at depth %d: %s",
                level,
                err,
            )

    _walk(file_obj, depth)

inspect_zip_content

inspect_zip_content(file_obj)

Inspect ZIP archive for potential security threats.

Parameters:

Name Type Description Default
file_obj SeekableFile

Seekable file-like object containing ZIP data.

required

Raises:

Type Description
ZipContentError

If security threats are detected in ZIP content such as directory traversal, symlinks, nested archives, or suspicious patterns.

FileProcessingError

If ZIP structure is invalid or unexpected error occurs during inspection.

Source code in safeuploads/inspectors/zip_inspector.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def inspect_zip_content(self, file_obj: SeekableFile) -> None:
    """
    Scan a ZIP archive and reject it when threats are found.

    Every entry is passed to ``_inspect_zip_entry`` and the full
    entry list to ``_inspect_zip_structure``; all reported threats
    are collected and raised together. When nested archives are
    allowed by the configuration, the archive is afterwards handed
    to ``inspect_nested_archives``.

    Args:
        file_obj: Seekable file-like object containing ZIP data.

    Raises:
        ZipContentError: If any threat is reported for the archive
            or the analysis time budget is exceeded.
        FileProcessingError: If the ZIP structure is invalid or an
            unexpected error occurs during inspection.
    """
    try:
        file_obj.seek(0)
        detected = []

        # Reference point for the analysis time budget
        began = time.monotonic()

        with zipfile.ZipFile(file_obj, "r") as archive:
            members = archive.infolist()

            for member in members:
                # Abort once the time budget is used up
                budget = self.config.limits.zip_analysis_timeout
                if time.monotonic() - began > budget:
                    timeout_info = {
                        "error_type": "zip_analysis_timeout",
                        "timeout": budget,
                    }
                    logger.error(
                        "ZIP content inspection timeout",
                        extra=log_extra(timeout_info),
                    )
                    raise ZipContentError(
                        message=(
                            "ZIP content inspection timeout after"
                            f" {budget}s"
                        ),
                        threats=["Analysis timeout - potential zip bomb"],
                        error_code=ErrorCode.ZIP_ANALYSIS_TIMEOUT,
                    )

                # Per-entry checks
                detected.extend(self._inspect_zip_entry(member, archive))

            # Archive-wide checks
            detected.extend(self._inspect_zip_structure(members))

            if detected:
                threat_info = {
                    "error_type": "zip_content_threat",
                    "threats": detected,
                    "threat_count": len(detected),
                }
                logger.warning(
                    "ZIP content threats detected",
                    extra=log_extra(threat_info),
                )
                # Audit only when a correlation ID is available
                correlation = get_correlation_id()
                if correlation:
                    self._audit.threat(
                        "",
                        correlation,
                        "; ".join(detected),
                    )
                summary = "; ".join(detected)
                raise ZipContentError(
                    message=f"ZIP content threats detected: {summary}",
                    threats=detected,
                )

            logger.debug(
                "ZIP content inspection passed: %s entries analyzed",
                len(members),
            )

        # Optional recursive pass over nested archives
        if self.config.limits.allow_nested_archives:
            file_obj.seek(0)
            self.inspect_nested_archives(file_obj)

    except ZipContentError:
        # Our own findings propagate unchanged
        raise
    except zipfile.BadZipFile as err:
        logger.error(
            "Invalid or corrupted ZIP file structure", exc_info=True
        )
        raise FileProcessingError(
            message="Invalid or corrupted ZIP file structure",
            original_error=err,
        ) from err
    except Exception as err:
        logger.error(
            "Unexpected error during ZIP content inspection",
            exc_info=True,
        )
        raise FileProcessingError(
            message=(
                "ZIP content inspection failed due to an internal error"
            ),
            original_error=err,
        ) from err

ZipThreatCategory

Bases: Enum

Categories of potentially harmful contents within ZIP archives.

Attributes:

Name Type Description
NESTED_ARCHIVES

Archive format threats.

EXECUTABLE_FILES

Executable content threats.

SCRIPT_FILES

Script and code threats.

SYSTEM_FILES

System and configuration threats.

RECURSIVE_STRUCTURE

Self-referencing ZIP structures.

QUINE_ARCHIVE

ZIP that contains a copy of itself.

COMPLEXITY_ATTACK

Algorithmic complexity exploits.

Source code in safeuploads/enums.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
class ZipThreatCategory(Enum):
    """
    Groups of content considered risky inside a ZIP archive.

    The first four members hold sets of file extensions (lower
    case, leading dot included); the last three hold a single
    marker string each.

    Attributes:
        NESTED_ARCHIVES: Extensions of archive formats.
        EXECUTABLE_FILES: Extensions of executable content.
        SCRIPT_FILES: Extensions of scripts and source code.
        SYSTEM_FILES: Extensions of system and configuration files.
        RECURSIVE_STRUCTURE: Marker for self-referencing ZIPs.
        QUINE_ARCHIVE: Marker for a ZIP containing a copy of itself.
        COMPLEXITY_ATTACK: Marker for algorithmic complexity exploits.
    """

    # Archive formats, including compound tarball suffixes
    NESTED_ARCHIVES = {
        ".zip", ".rar", ".7z",
        ".tar", ".gz", ".bz2", ".xz",
        ".tar.gz", ".tar.bz2", ".tar.xz",
        ".tgz", ".tbz2",
    }

    # Executables, command files and installer packages
    EXECUTABLE_FILES = {
        ".exe", ".com", ".bat", ".cmd", ".scr", ".pif",
        ".bin", ".run", ".app",
        ".deb", ".rpm", ".msi",
    }

    # Script and source code files
    SCRIPT_FILES = {
        ".js", ".vbs", ".ps1",
        ".sh", ".bash",
        ".py", ".php", ".pl", ".rb", ".lua",
        ".asp", ".jsp",
    }

    # Libraries, drivers and configuration files
    SYSTEM_FILES = {
        ".dll", ".so", ".dylib",
        ".sys", ".drv", ".inf", ".reg",
        ".cfg", ".conf", ".ini",
    }

    # Marker: self-referencing ZIP structure
    RECURSIVE_STRUCTURE = {"recursive_zip"}

    # Marker: ZIP that contains a copy of itself
    QUINE_ARCHIVE = {"quine_zip"}

    # Marker: algorithmic complexity exploit
    COMPLEXITY_ATTACK = {"complexity_attack"}

get_correlation_id

get_correlation_id()

Return the current correlation ID.

Returns:

Type Description
str | None

Correlation ID string or None if not set.

Source code in safeuploads/audit.py
33
34
35
36
37
38
39
40
def get_correlation_id() -> str | None:
    """
    Look up the correlation ID of the current context.

    Returns:
        The value currently held by ``correlation_id_var``: a
        correlation ID string, or None if not set.
    """
    current = correlation_id_var.get()
    return current

set_correlation_id

set_correlation_id(cid=None)

Set a new correlation ID for the current context.

Parameters:

Name Type Description Default
cid str | None

Explicit ID to use. Generates a UUID4 if None.

None

Returns:

Type Description
str

The correlation ID that was set.

Source code in safeuploads/audit.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def set_correlation_id(cid: str | None = None) -> str:
    """
    Store a correlation ID for the current context.

    Args:
        cid: Explicit ID to use. When None, the hex form of a
            freshly generated UUID4 is used instead.

    Returns:
        The correlation ID that was stored.
    """
    # Only None triggers generation; any other value is kept as is.
    assigned = uuid.uuid4().hex if cid is None else cid
    correlation_id_var.set(assigned)
    return assigned