"""Source code for ``async_storages.s3``: asynchronous S3-compatible storage backend."""

from io import BytesIO
import mimetypes
from pathlib import PurePosixPath
from typing import Any, BinaryIO, override
from urllib.parse import quote

from async_storages.base import BaseStorage
from async_storages.utils import secure_filename

try:
    import aioboto3
    from botocore.exceptions import ClientError
except ImportError:
    raise ImportError(
        "'aioboto3' is not installed. Install with 'fastapi-async-storages[s3]'."
    )


class S3Storage(BaseStorage):
    """
    Asynchronous storage backend for Amazon S3-compatible object storage.

    This class provides async methods for uploading, retrieving, and deleting
    files in an S3 bucket using the ``aioboto3`` client.

    Credentials can be provided explicitly via ``aws_access_key_id`` and
    ``aws_secret_access_key`` (useful in development with MinIO or similar),
    or omitted to let ``aioboto3`` resolve them automatically through the
    standard AWS credential chain (environment variables, IAM roles, instance
    metadata, etc.).

    Every public method passes the object name through :meth:`get_name`, so a
    given input name always addresses the same object key, whether it is
    uploaded, opened, sized, linked, or deleted.

    :param bucket_name: Name of the S3 bucket.
    :type bucket_name: str
    :param endpoint_url: The S3 endpoint hostname (without protocol).
        Optional; when omitted the SDK uses the default AWS S3 endpoint for
        the region.
    :type endpoint_url: str or None
    :param aws_access_key_id: AWS access key ID for authentication. Optional;
        when omitted, credentials are resolved via the AWS credential chain.
    :type aws_access_key_id: str or None
    :param aws_secret_access_key: AWS secret access key for authentication.
        Optional; when omitted, credentials are resolved via the AWS
        credential chain.
    :type aws_secret_access_key: str or None
    :param region_name: AWS region name (optional). Also used to build the
        regional virtual-hosted URL returned by :meth:`get_path` when no
        ``endpoint_url`` or ``custom_domain`` is set.
    :type region_name: str or None
    :param use_ssl: Whether to use HTTPS (True) or HTTP (False).
    :type use_ssl: bool
    :param default_acl: Default Access Control List (ACL) to apply when
        uploading files.
    :type default_acl: str or None
    :param custom_domain: Custom domain for serving files (e.g. CDN).
    :type custom_domain: str or None
    :param querystring_auth: Whether to generate presigned URLs with query
        parameters.
    :type querystring_auth: bool
    :raises ValueError: If ``endpoint_url`` contains a protocol scheme
        (for example ``"https://"``).

    .. note::
        Importing this module raises :class:`ImportError` when ``aioboto3``
        is not installed.
    """

    def __init__(
        self,
        bucket_name: str,
        endpoint_url: str | None = None,
        aws_access_key_id: str | None = None,
        aws_secret_access_key: str | None = None,
        region_name: str | None = None,
        use_ssl: bool = True,
        default_acl: str | None = None,
        custom_domain: str | None = None,
        querystring_auth: bool = False,
    ) -> None:
        # A real exception rather than ``assert``: asserts are stripped under
        # ``python -O``. Looking for "://" instead of a leading "http" avoids
        # rejecting legitimate hostnames such as "http-minio.internal".
        if endpoint_url is not None and "://" in endpoint_url:
            raise ValueError("Endpoint should not contain protocol")

        self.bucket_name: str = bucket_name
        self.endpoint_url: str | None = (
            endpoint_url.rstrip("/") if endpoint_url else None
        )
        self.aws_access_key_id: str | None = aws_access_key_id
        self.aws_secret_access_key: str | None = aws_secret_access_key
        self.region_name: str | None = region_name
        self.use_ssl: bool = use_ssl
        self.default_acl: str | None = default_acl
        self.custom_domain: str | None = custom_domain
        self.querystring_auth: bool = querystring_auth

        self._http_scheme: str = "https" if self.use_ssl else "http"
        # Full endpoint URL (scheme + host), or None to use the SDK default.
        self._url: str | None = (
            f"{self._http_scheme}://{self.endpoint_url}" if self.endpoint_url else None
        )
        self._session: "aioboto3.Session" = aioboto3.Session()

    def _get_s3_client(self) -> Any:
        """
        Build an S3 client from this instance's configuration.

        Optional settings (endpoint, explicit credentials) are only passed
        when set, so the SDK can fall back to its own defaults and the AWS
        credential chain.

        :return: The object returned by ``aioboto3.Session.client("s3", ...)``;
            callers use it as an async context manager.
        :rtype: Any
        """
        kwargs: dict[str, Any] = {
            "region_name": self.region_name,
            "use_ssl": self.use_ssl,
        }
        if self._url is not None:
            kwargs["endpoint_url"] = self._url
        if self.aws_access_key_id is not None:
            kwargs["aws_access_key_id"] = self.aws_access_key_id
        if self.aws_secret_access_key is not None:
            kwargs["aws_secret_access_key"] = self.aws_secret_access_key
        return self._session.client("s3", **kwargs)

    @staticmethod
    def _error_code(error: ClientError) -> str | None:
        """Return the S3 error code carried by a ``ClientError``, if any."""
        return error.response.get("Error", {}).get("Code")

    @override
    def get_name(self, name: str) -> str:
        """
        Sanitize and normalize a file path before using it as an S3 key.

        Removes unsafe path components (``..`` and ``.``), the root anchor of
        absolute paths, and ensures each remaining segment is a secure
        filename. Segments that sanitize to nothing are dropped.

        :param name: Original file name or path.
        :type name: str
        :return: Sanitized file path.
        :rtype: str
        :raises ValueError: If no usable path segment remains.
        """
        safe_parts: list[str] = []
        for part in PurePosixPath(name).parts:
            # Skip traversal components and the "/" (or "//") root anchor
            # that PurePosixPath reports for absolute paths.
            if part in ("..", ".") or not part.strip("/"):
                continue
            cleaned = secure_filename(part)
            # secure_filename may reduce a segment to an empty string; keeping
            # it would let the key collapse to "." or contain empty segments.
            if cleaned and cleaned not in ("..", "."):
                safe_parts.append(cleaned)
        if not safe_parts:
            raise ValueError("Invalid object key")
        return str(PurePosixPath(*safe_parts))

    @override
    async def get_size(self, name: str) -> int:
        """
        Retrieve the size of an S3 object in bytes.

        :param name: The object key (path) in the S3 bucket; sanitized with
            :meth:`get_name`.
        :type name: str
        :return: The file size in bytes, or ``0`` if the object does not exist.
        :rtype: int
        :raises ValueError: If ``name`` sanitizes to an empty key.
        :raises botocore.exceptions.ClientError: If an unexpected S3 error occurs.
        """
        key = self.get_name(name)
        async with self._get_s3_client() as s3_client:
            try:
                res = await s3_client.head_object(Bucket=self.bucket_name, Key=key)
            except ClientError as e:
                # HEAD responses carry no error body, so the HTTP status is
                # checked as well as the error code.
                status = e.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
                if self._error_code(e) in ("NoSuchKey", "NotFound") or status == 404:
                    return 0
                raise
        return int(res.get("ContentLength", 0))

    @override
    async def get_path(self, name: str) -> str:
        """
        Generate a URL for accessing an S3 object.

        Resolution order:

        1. ``custom_domain`` set: a static URL on that domain.
        2. ``querystring_auth`` True: a presigned ``get_object`` URL.
        3. ``endpoint_url`` set: a path-style URL on that endpoint.
        4. Otherwise: an AWS virtual-hosted-style URL. It is regional when
           ``region_name`` is set and uses the legacy global host when it is not.

        The key is sanitized with :meth:`get_name`, like every other method,
        and percent-encoded in static URLs.

        :param name: The object key (path) in the S3 bucket.
        :type name: str
        :return: A direct or presigned URL for the file.
        :rtype: str
        :raises ValueError: If ``name`` sanitizes to an empty key.
        """
        key = self.get_name(name)

        if self.custom_domain:
            domain = self.custom_domain.rstrip("/")
            return f"{self._http_scheme}://{domain}/{quote(key)}"

        if self.querystring_auth:
            async with self._get_s3_client() as s3_client:
                return await s3_client.generate_presigned_url(
                    "get_object",
                    Params={"Bucket": self.bucket_name, "Key": key},
                )

        if self._url is not None:
            return f"{self._url}/{self.bucket_name}/{quote(key)}"

        # Buckets in newer AWS regions are not reachable through the legacy
        # global host, so prefer the regional host whenever a region is known.
        if self.region_name:
            host = f"{self.bucket_name}.s3.{self.region_name}.amazonaws.com"
        else:
            host = f"{self.bucket_name}.s3.amazonaws.com"
        return f"{self._http_scheme}://{host}/{quote(key)}"

    @override
    async def open(self, name: str) -> BytesIO:
        """
        Open an object from S3 and return it as an in-memory binary stream.

        The whole object body is read into memory. The returned ``BytesIO``
        is positioned at the start of the data.

        :param name: The object key (path) in the S3 bucket; sanitized with
            :meth:`get_name`.
        :type name: str
        :return: A BytesIO object containing the file's contents.
        :rtype: BytesIO
        :raises ValueError: If ``name`` sanitizes to an empty key.
        :raises FileNotFoundError: If the object is not found.
        :raises botocore.exceptions.ClientError: If the object cannot be fetched.
        """
        key = self.get_name(name)
        async with self._get_s3_client() as s3_client:
            try:
                response = await s3_client.get_object(Bucket=self.bucket_name, Key=key)
            except ClientError as e:
                if self._error_code(e) in ("NoSuchKey", "NotFound"):
                    raise FileNotFoundError(
                        f"Object not found in bucket: {key}"
                    ) from e
                raise
            # Read the body while the client (and its connection) is still open.
            async with response["Body"] as stream:
                data = await stream.read()
        return BytesIO(data)

    @override
    async def upload(self, file: BinaryIO, name: str) -> str:
        """
        Upload a file object to the configured S3 bucket.

        The file is rewound to offset 0 before upload. ``ContentType`` is
        guessed from the key's extension and falls back to
        ``application/octet-stream``. ``default_acl`` is applied when set.

        :param file: Binary file-like object to upload.
        :type file: BinaryIO
        :param name: Target object key (path) in the S3 bucket; sanitized with
            :meth:`get_name`.
        :type name: str
        :return: The sanitized key of the uploaded object.
        :rtype: str
        :raises ValueError: If ``name`` sanitizes to an empty key.
        :raises botocore.exceptions.ClientError: If the upload fails.
        """
        key = self.get_name(name)
        content_type, _ = mimetypes.guess_type(key)
        extra_args: dict[str, str] = {
            "ContentType": content_type or "application/octet-stream"
        }
        if self.default_acl:
            extra_args["ACL"] = self.default_acl

        async with self._get_s3_client() as s3_client:
            file.seek(0)
            await s3_client.put_object(
                Bucket=self.bucket_name, Key=key, Body=file, **extra_args
            )
        return key

    @override
    async def delete(self, name: str) -> None:
        """
        Delete an object from the S3 bucket.

        The name is sanitized with :meth:`get_name`, so the key that is
        deleted is the same one :meth:`upload` stored for that name. A
        ``NoSuchKey`` error is ignored.

        :param name: The object key (path) to delete.
        :type name: str
        :return: None
        :rtype: None
        :raises ValueError: If ``name`` sanitizes to an empty key.
        :raises botocore.exceptions.ClientError: If the delete operation fails.
        """
        key = self.get_name(name)
        async with self._get_s3_client() as s3_client:
            try:
                await s3_client.delete_object(Bucket=self.bucket_name, Key=key)
            except ClientError as e:
                if self._error_code(e) != "NoSuchKey":
                    raise