s3_loader
Documentation for read_from_s3¶
Functionality¶
Reads a file from S3 and returns its content wrapped in a BytesIO object. If the file is not found (HTTP 404), it logs an error and returns None; any other S3 error re-raises the original exception.
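A minimal sketch of the error handling described above, assuming the standard boto3 download_fileobj call and botocore's ClientError (an illustration, not the module's actual source):
import io
import logging
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

def read_from_s3_sketch(client, bucket: str, file: str):
    # Hypothetical re-implementation for illustration only
    outfile = io.BytesIO()
    try:
        client.download_fileobj(bucket, file, outfile)
        outfile.seek(0)
        return outfile
    except ClientError as e:
        # download_fileobj surfaces a missing key as a 404 ClientError
        if e.response["Error"]["Code"] == "404":
            logger.error(f"File {file} not found in bucket {bucket}")
            return None
        raise  # other S3 errors propagate unchanged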
Parameters¶
client
: Boto3 S3 client used to download the file.
bucket
: Name of the S3 bucket.
file
: The key or path of the file in the bucket.
Usage¶
- Purpose: Download a file from S3 as a BytesIO stream.
Example¶
from boto3 import client
s3_client = client("s3")
file_stream = read_from_s3(s3_client, "mybucket", "file.txt")
if file_stream:
    content = file_stream.read()
    print(content)
Documentation for AwsS3DataLoader¶
Functionality¶
The AwsS3DataLoader class is a DataLoader implementation that loads data items from AWS S3 buckets. It supports retry strategies for handling failures during downloads and allows custom authentication methods such as role-based access, direct keys, or anonymous access.
Inheritance¶
Inherits from DataLoader.
Motivation¶
This class is designed to efficiently load data from AWS S3 by providing mechanisms for error handling and flexible authentication. Its retry capabilities ensure robustness in unstable network conditions.
Parameters¶
retry_config
: Configuration for retry logic during operations.
features
: Expected dataset features schema.
kwargs
: Additional parameters for AWS S3 credentials configuration.
Usage¶
- Purpose: Load and process data from AWS S3 buckets with reliable error handling and customizable authentication.
Example¶
loader = AwsS3DataLoader(retry_config=my_retry, features=my_features)
dataset = loader.load([S3FileMeta(bucket="bucket_name", file="file_key")])
Documentation for AwsS3DataLoader.item_meta_cls¶
Functionality¶
Returns the class used to represent metadata for files stored in AWS S3. This property returns the S3FileMeta
class, which encapsulates metadata associated with S3 objects.
Parameters¶
This is a property and does not accept any parameters.
Usage¶
- Purpose: To obtain the metadata class type configured for the S3 data loader.
Example¶
from embedding_studio.data_storage.loaders.cloud_storage.s3.s3_loader import AwsS3DataLoader
# create an instance (provide necessary parameters)
loader = AwsS3DataLoader()
meta_cls = loader.item_meta_cls
print(meta_cls)
Documentation for AwsS3DataLoader._get_default_retry_config¶
Functionality¶
Creates a default retry configuration for S3 operations. It sets up retry parameters for credential acquisition and data downloads using predefined timeout and attempt values from the settings.
Parameters¶
This method does not accept any parameters.
Usage¶
- Purpose: Provides a default retry configuration when no custom configuration is specified.
Example¶
config = AwsS3DataLoader._get_default_retry_config()
print(config)
Documentation for AwsS3DataLoader._read_from_s3¶
Functionality¶
Reads a file from an AWS S3 bucket using retry capabilities. This method calls a helper function to download the file and returns a BytesIO object with its contents.
Parameters¶
client
: A boto3 S3 client with valid AWS credentials.
bucket
: S3 bucket name as a non-empty string.
file
: File key or path in the bucket as a non-empty string.
Usage¶
- Purpose: Reliably download a file from S3 with automatic retries.
- Decorated with @retry_method to handle intermittent failures.
Example¶
client = s3_loader._get_client("task_id")
file_obj = s3_loader._read_from_s3(client, "my-bucket", "folder/data.txt")
Documentation for AwsS3DataLoader._get_client¶
Functionality¶
Obtains an S3 client with proper credentials. If access keys are not provided and use_system_info is False, an anonymous client is created; otherwise, role-based credentials are assumed. The method is retried on failure for reliability.
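A hedged sketch of that branching, using botocore's UNSIGNED config for the anonymous case and an STS assume-role call for the role-based case (attribute names such as role_arn and use_system_info here are illustrative, not the class's actual fields):
import boto3
from botocore import UNSIGNED
from botocore.client import Config

def make_s3_client(access_key, secret_key, role_arn, use_system_info, task_id):
    if access_key is None and not use_system_info:
        # No keys and no system credentials: unsigned (anonymous) client
        return boto3.client("s3", config=Config(signature_version=UNSIGNED))
    if role_arn:
        # Assume the configured role; task_id becomes the role session name
        sts = boto3.client("sts")
        creds = sts.assume_role(RoleArn=role_arn, RoleSessionName=task_id)["Credentials"]
        return boto3.client(
            "s3",
            aws_access_key_id=creds["AccessKeyId"],
            aws_secret_access_key=creds["SecretAccessKey"],
            aws_session_token=creds["SessionToken"],
        )
    return boto3.client(
        "s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key
    )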
Parameters¶
task_id
: Unique identifier for the session, used as the role session name.
Usage¶
- Purpose: Create a boto3 S3 client configured with either direct or role-based credentials.
Example¶
# Example usage of _get_client
loader = AwsS3DataLoader(credentials=aws_creds)
client = loader._get_client(task_id="session123")
Documentation for AwsS3DataLoader._get_item¶
Functionality¶
Processes a downloaded file stored as a BytesIO object and returns it. By default, it returns the file unmodified. This method acts as a hook for subclasses to override for custom processing, such as decoding JSON or other file formats.
Parameters¶
file
: An io.BytesIO object containing the downloaded file content.
Return Value¶
- The processed data item. By default, the original BytesIO object is returned.
Usage¶
- Purpose: To convert raw S3 file data into a format that can be further processed by the application.
Example¶
# Example subclass overriding _get_item
def _get_item(self, file: io.BytesIO) -> dict:
import json
data = json.loads(file.read().decode('utf-8'))
return data
Documentation for AwsS3DataLoader._get_data_from_s3¶
Functionality¶
Main method to download and process data from AWS S3. It accepts a list of S3FileMeta objects representing files to download and process. It connects to AWS S3 using a generated client, downloads files, and uses caching to avoid redundant downloads.
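A rough sketch of the flow, composed from the helper methods documented below (the uuid-based task id follows the pattern used in the total_count example at the end of this page):
import uuid

def get_data_from_s3_sketch(loader, files, ignore_failures=True):
    client = loader._get_client(str(uuid.uuid4()))
    uploaded = {}  # cache: (bucket, file) -> processed item
    for file_meta in files:
        yield from loader._process_file_meta(
            client, file_meta, ignore_failures=ignore_failures, uploaded=uploaded
        )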
Parameters¶
files
: A list of S3FileMeta objects with metadata for each file.
ignore_failures
: Boolean flag. If True, continues on failure; otherwise, raises an exception.
Usage¶
- Purpose: Retrieve and process data from AWS S3 with error handling.
Example¶
files = [
    S3FileMeta(bucket="mybucket", file="file1.csv"),
    S3FileMeta(bucket="mybucket", file="file2.csv"),
]
data = list(loader._get_data_from_s3(files))
for d, meta in data:
    print(d)
Documentation for AwsS3DataLoader._process_file_meta¶
Functionality¶
Processes a single file metadata to download a file from AWS S3 and prepare its associated data objects. It uses the provided S3 client to download data (if not cached), handles errors based on the ignore_failures flag, and yields tuples containing a data dictionary and its corresponding file metadata.
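The ignore_failures behavior roughly amounts to the following (a sketch, not the actual implementation; the logging details are assumed):
import logging

logger = logging.getLogger(__name__)

def process_file_meta_sketch(loader, s3_client, file_meta, ignore_failures, uploaded):
    try:
        item = loader._download_and_get_item(s3_client, file_meta, uploaded)
    except Exception as exc:
        if ignore_failures:
            # Log and skip this file instead of aborting the whole run
            logger.error(f"Failed to load {file_meta.file}: {exc}")
            return
        raise
    yield from loader._yield_item_objects(item, file_meta)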
Parameters¶
s3_client
: The boto3 S3 client configured to access AWS S3.
file_meta
: An instance of S3FileMeta containing metadata for the file.
ignore_failures
: A boolean flag that, if True, prevents the raising of exceptions on failure.
uploaded
: A dictionary cache for downloaded files, using a tuple of (bucket, file) as the key.
Usage¶
- Purpose: To download a file from S3 based on its metadata and yield processed data objects for further use.
Example¶
Assuming file_meta is a valid S3FileMeta object and s3_client is set up:
for data, meta in loader._process_file_meta(
    s3_client, file_meta, ignore_failures=True, uploaded={}
):
    print(data, meta)
Documentation for AwsS3DataLoader._download_and_get_item¶
Functionality¶
This method downloads a file from S3 if it is not already cached. It uses the provided S3 client to fetch the file as a stream and processes it into a usable data item. The downloaded item is then stored in a cache dictionary to avoid redundant downloads.
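The cache check described above can be pictured like this (a sketch built only from the methods documented on this page):
def download_and_get_item_sketch(loader, s3_client, file_meta, uploaded):
    key = (file_meta.bucket, file_meta.file)
    if key not in uploaded:
        # Only hit S3 when this (bucket, file) pair has not been seen yet
        stream = loader._read_from_s3(s3_client, file_meta.bucket, file_meta.file)
        uploaded[key] = loader._get_item(stream)
    return uploaded[key]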
Parameters¶
s3_client
: Configured boto3 S3 client for S3 operations.
file_meta
: S3FileMeta object containing bucket and file path info.
uploaded
: Cache dictionary with keys as (bucket, file) and values as the downloaded item.
Usage¶
- Purpose: Retrieve and cache file data from AWS S3 storage.
Example¶
Assuming s3_client is configured and file_meta is an S3FileMeta:
uploaded = {}
item = loader._download_and_get_item(s3_client, file_meta, uploaded)
Documentation for AwsS3DataLoader._yield_item_objects¶
Functionality¶
Yields data objects based on item data and its S3 metadata. It handles both lists and single items.
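The list-versus-single handling is essentially the following (a sketch; _create_item_object is the documented helper described below):
def yield_item_objects_sketch(loader, item, file_meta):
    if isinstance(item, list):
        # Lists fan out into one object per element
        for sub_item in item:
            yield loader._create_item_object(sub_item, file_meta)
    else:
        yield loader._create_item_object(item, file_meta)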
Parameters¶
item
: Downloaded or retrieved item data.
file_meta
: S3FileMeta object with item metadata.
Usage¶
- Purpose: Generates tuples of item dictionary and metadata. Use this method to iterate over items from S3 files.
Example¶
Assume item is a list:
for obj, meta in loader._yield_item_objects(item, file_meta):
    print(obj, meta)
Documentation for AwsS3DataLoader._create_item_object¶
Functionality¶
This method creates a dictionary object from the item data and associates it with its S3 metadata. If the loader's features property is not set or if the item is not a dictionary, the item is stored under the key "item". Otherwise, the item dictionary is merged into the resulting dictionary.
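A minimal sketch of that merging rule (hypothetical code; the real method may differ in detail):
def create_item_object_sketch(item, file_meta, features=None):
    obj = {"item_id": file_meta.id}
    if features is None or not isinstance(item, dict):
        obj["item"] = item  # wrap raw or non-dict data under "item"
    else:
        obj.update(item)  # merge dict items into the result
    return obj, file_meta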
Parameters¶
item
: The data content to be converted into a dictionary.
file_meta
: An S3FileMeta object containing metadata (e.g., the item's id).
Usage¶
- Purpose: Encapsulates raw item data and metadata into a uniform tuple for further processing.
Example¶
Suppose an item and metadata are provided as follows:
item = {"name": "example"}
file_meta.id = "123"
The method returns a tuple like:
({"item_id": "123", "name": "example"}, file_meta)
Documentation for AwsS3DataLoader._generate_dataset_from_s3¶
Functionality¶
This generator function converts a list of S3 file metadata into a stream of item dictionaries. It iterates over S3 data and yields each item, making it suitable for dataset creation.
Parameters¶
files
: List of S3FileMeta objects representing the files to load.
Usage¶
- Purpose: Transforms S3 file metadata into a dataset generator, enabling easy loading into a Hugging Face Dataset.
Example¶
s3_loader = AwsS3DataLoader(
    retry_config=my_retry_config,
    features=my_features
)
dataset = Dataset.from_generator(
    lambda: s3_loader._generate_dataset_from_s3(s3_files),
    features=my_features
)
Documentation for AwsS3DataLoader.load¶
Functionality¶
This method loads data from S3 files into a Hugging Face Dataset. It converts a list of S3FileMeta objects into a dataset using the Dataset.from_generator function. The method reads file contents and builds a dataset with the provided features.
Parameters¶
items_data (List[S3FileMeta])
: A list of S3FileMeta objects describing the files to load from AWS S3.
Return Value¶
Dataset
: A Hugging Face Dataset object containing the loaded data.
Usage¶
- Purpose: To efficiently load and structure data from AWS S3 for analysis or machine learning tasks.
Example¶
loader = AwsS3DataLoader(features=features)
dataset = loader.load(items_data)
Documentation for AwsS3DataLoader.load_items¶
Functionality¶
Loads individual items from S3 files and returns a list of DownloadedItem objects containing both data and metadata. Unlike load(), this method creates a list rather than a Dataset.
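Conceptually, load_items wraps the same download pipeline in a list (a sketch; the exact DownloadedItem fields beyond id and data are assumptions):
def load_items_sketch(loader, items_data):
    return [
        # meta field name is hypothetical; id and data appear in the batch example below
        DownloadedItem(id=meta.id, data=data, meta=meta)
        for data, meta in loader._get_data_from_s3(items_data)
    ]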
Parameters¶
items_data (List[S3FileMeta])
: List of S3 file metadata to load.
Usage¶
- Purpose: To load individual items from S3 and wrap them in a DownloadedItem structure.
Example¶
# Initialize loader with necessary configuration
loader = AwsS3DataLoader(retry_config=retry_conf, features=features)
items = loader.load_items(items_data)
Documentation for AwsS3DataLoader._load_batch_with_offset¶
Functionality¶
This method loads a batch of files from AWS S3 using pagination. It connects to S3, retrieves files starting at a specified offset, and returns a list of DownloadedItem objects containing the file key, content, and metadata.
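One plausible shape for the underlying listing call, assuming the offset maps onto S3's continuation-token mechanism (the class's actual pagination details may differ):
def list_batch_sketch(s3_client, bucket, offset_token, batch_size):
    params = {"Bucket": bucket, "MaxKeys": batch_size}
    if offset_token:
        # Resume listing from where the previous batch stopped
        params["ContinuationToken"] = offset_token
    response = s3_client.list_objects_v2(**params)
    keys = [obj["Key"] for obj in response.get("Contents", [])]
    return keys, response.get("NextContinuationToken")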
Parameters¶
offset
: The starting offset used as the token for pagination.
batch_size
: The maximum number of files to load in the batch.
**kwargs
: Additional keyword arguments, including the key bucket for the S3 bucket name.
Usage¶
- Purpose: Internally used to load files in manageable chunks from an S3 bucket with retry and error handling.
Example¶
loader = AwsS3DataLoader(...)
batch = loader._load_batch_with_offset(0, 10, bucket="my-s3-bucket")
for item in batch:
    print(item.id, item.data)
Documentation for AwsS3DataLoader.load_all¶
Functionality¶
This method is a generator that continuously retrieves data batches from one or more S3 buckets. Each iteration yields a batch of downloaded items, processing them in manageable chunks.
Parameters¶
batch_size
: Defines the number of items in each batch.
**kwargs
: Additional keyword arguments. Should include buckets, a list of S3 bucket names to load files from.
Usage¶
- Purpose: Load large S3 datasets in smaller batches, improving memory usage and enabling incremental processing.
Example¶
loader = AwsS3DataLoader()
for batch in loader.load_all(batch_size=10, buckets=["my-bucket"]):
    # Process each batch of downloaded items
    process(batch)
Documentation for AwsS3DataLoader.total_count¶
Functionality¶
Returns the total count of items available in the S3 bucket. The base method returns None because S3 does not provide an efficient way to count objects without listing them.
Parameters¶
kwargs
: Additional parameters for configuration, such as the S3 bucket name.
Usage¶
- Purpose: Retrieve the item count when available using S3 APIs.
Example¶
# Example override implementation (a sketch; it paginates because KeyCount
# only reflects the keys returned in a single response, so a single call
# with MaxKeys=0 would always yield 0)
import uuid
from typing import Optional

def total_count(self, **kwargs) -> Optional[int]:
    try:
        client = self._get_client(str(uuid.uuid4()))
        paginator = client.get_paginator("list_objects_v2")
        count = 0
        for page in paginator.paginate(Bucket=kwargs["bucket"]):
            count += page.get("KeyCount", 0)
        return count
    except Exception:
        return None