Data Sources Integration Tutorial for Embedding Studio¶
This tutorial will guide you through integrating various data sources with Embedding Studio. You'll learn how to load data from PostgreSQL databases, cloud storage providers like Amazon S3 and Google Cloud Storage, and how to combine multiple data sources in a unified architecture.
1. Introduction to Data Loaders¶
Data loaders are the foundation of Embedding Studio's data integration layer. They handle:
- Connecting to data sources
- Fetching raw data
- Transforming data into a format suitable for embedding
- Managing pagination and batching for large datasets
All data loaders implement the DataLoader abstract base class, which defines a standard interface:
class DataLoader(ABC):
@property
@abstractmethod
def item_meta_cls(self) -> Type[ItemMeta]:
"""Returns the ItemMeta class used by this loader."""
@abstractmethod
def load(self, items_data: List[ItemMeta]) -> Dataset:
"""Loads items as a Hugging Face Dataset."""
@abstractmethod
def load_items(self, items_data: List[ItemMeta]) -> List[DownloadedItem]:
"""Loads specific items by their metadata."""
@abstractmethod
def _load_batch_with_offset(self, offset: int, batch_size: int, **kwargs) -> List[DownloadedItem]:
"""Loads a batch of items with pagination."""
@abstractmethod
def total_count(self, **kwargs) -> Optional[int]:
"""Returns the total count of available items."""
def load_all(self, batch_size: int, **kwargs) -> Generator[List[DownloadedItem], None, None]:
"""Generator that loads all items in batches."""
2. Core Data Loader Components¶
ItemMeta Classes¶
Each data loader has a corresponding ItemMeta class that stores metadata about items:
- S3FileMeta: Metadata for files in Amazon S3 buckets
- GCPFileMeta: Metadata for files in Google Cloud Storage
- PgsqlFileMeta: Metadata for rows in PostgreSQL tables
These classes provide a consistent way to identify and reference items across different data sources:
class BucketFileMeta(ItemMeta):
bucket: str # Storage bucket name
file: str # File path within the bucket
index: Optional[int] # Optional index for sub-items
@property
def derived_id(self) -> str:
"""Creates a unique identifier from bucket and file."""
if self.index is None:
return f"{self.bucket}/{self.file}"
else:
return f"{self.bucket}/{self.file}:{self.index}"
Query Generators¶
SQL-based loaders use query generators to build database queries:
class QueryGenerator:
def __init__(self, table_name: str, engine: Engine) -> None:
"""Initialize with a table name and database engine."""
self.table_name = table_name
self.engine = engine
self.metadata = MetaData()
def fetch_all(self, row_ids: List[str]) -> Select:
"""Generate a query to fetch multiple rows by ID."""
def one(self, row_id: str) -> Select:
"""Generate a query to fetch a single row by ID."""
def all(self, offset: int, batch_size: int) -> Select:
"""Generate a paginated query."""
def count(self) -> Select:
"""Generate a count query."""
3. PostgreSQL Integration¶
PostgreSQL integration allows you to load data directly from relational databases.
Basic Setup¶
from embedding_studio.data_storage.loaders.sql.pgsql.pgsql_text_loader import PgsqlTextLoader
from embedding_studio.data_storage.loaders.sql.query_generator import QueryGenerator
# Define database connection
connection_string = "postgresql://username:password@hostname:5432/database"
# Create loader
data_loader = PgsqlTextLoader(
connection_string=connection_string,
query_generator=QueryGenerator, # Use default query generator
text_column="content" # Column containing text data
)
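Once configured, the loader exposes the standard DataLoader interface, so you can stream the whole table in batches. A minimal sketch (any additional keyword arguments depend on the specific loader):
# Stream all rows from the configured table in batches of 100
for batch in data_loader.load_all(batch_size=100):
    for item in batch:
        print(item.id, item.data[:80])  # item.data holds the value of the text column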
Custom Query Generator¶
For more advanced database schemas, create a custom query generator:
from sqlalchemy import Engine, MetaData, Select, Table, func, select
from embedding_studio.data_storage.loaders.sql.query_generator import QueryGenerator
class ArticleQueryGenerator(QueryGenerator):
def __init__(self, engine: Engine) -> None:
super().__init__("articles", engine)
self.metadata = MetaData()
self.articles_table = None
self.authors_table = None
self.categories_table = None
def _init_tables(self):
if self.articles_table is None:
# Load tables with reflection
self.articles_table = Table(self.table_name, self.metadata, autoload_with=self.engine)
self.authors_table = Table("authors", self.metadata, autoload_with=self.engine)
self.categories_table = Table("categories", self.metadata, autoload_with=self.engine)
def all(self, offset: int, batch_size: int) -> Select:
"""Join articles with authors and categories."""
self._init_tables()
return (
select(
self.articles_table,
self.authors_table.c.name.label("author_name"),
self.categories_table.c.name.label("category_name"),
# Create a rich text field combining multiple columns
func.concat(
"Title: ", self.articles_table.c.title, "\n",
"Author: ", self.authors_table.c.name, "\n",
"Category: ", self.categories_table.c.name, "\n",
"Content: ", self.articles_table.c.content
).label("rich_text")
)
.join(
self.authors_table,
self.articles_table.c.author_id == self.authors_table.c.id
)
.join(
self.categories_table,
self.articles_table.c.category_id == self.categories_table.c.id
)
.order_by(self.articles_table.c.id)
.limit(batch_size)
.offset(offset)
)
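To put the custom generator to work, pass it to a loader the same way the default QueryGenerator is passed in the basic setup; since the query above labels the combined column rich_text, that is the column to point the loader at (a sketch under that assumption):
# Wire the custom query generator into a text loader
data_loader = PgsqlTextLoader(
    connection_string=connection_string,
    query_generator=ArticleQueryGenerator,  # custom generator class
    text_column="rich_text"  # column produced by the generator's label()
)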
Working with Multiple Text Columns¶
When your data has multiple text fields, use PgsqlMultiTextColumnLoader:
from embedding_studio.data_storage.loaders.sql.pgsql.pgsql_multi_text_column_loader import PgsqlMultiTextColumnLoader
# Load multiple text columns
data_loader = PgsqlMultiTextColumnLoader(
connection_string=connection_string,
query_generator=QueryGenerator,
text_columns=["title", "summary", "content"] # Process all these columns
)
Loading JSON Data¶
For JSON data stored in PostgreSQL:
from embedding_studio.data_storage.loaders.sql.pgsql.pgsql_jsonb_loader import PgsqlJSONBLoader
# Load JSONB data
data_loader = PgsqlJSONBLoader(
connection_string=connection_string,
query_generator=QueryGenerator,
jsonb_column="data", # Column containing JSONB
fields_to_keep=["title", "tags"] # Extract only these fields
)
4. Amazon S3 Integration¶
Amazon S3 integration allows you to work with files stored in S3 buckets.
Authentication¶
from embedding_studio.data_storage.loaders.cloud_storage.s3.s3_text_loader import AwsS3TextLoader
# With explicit credentials
data_loader = AwsS3TextLoader(
aws_access_key_id="YOUR_ACCESS_KEY",
aws_secret_access_key="YOUR_SECRET_KEY"
)
# With IAM role (for EC2/ECS/Lambda)
data_loader = AwsS3TextLoader(
role_arn="arn:aws:iam::123456789012:role/your-role",
external_id="optional-external-id" # If required by the role
)
# With credentials resolved from the environment or instance profile
data_loader = AwsS3TextLoader(
use_system_info=True # Use credentials from environment or instance profile
)
Loading Text Files¶
from embedding_studio.data_storage.loaders.cloud_storage.s3.item_meta import S3FileMeta
# Create loader
text_loader = AwsS3TextLoader(
aws_access_key_id="YOUR_ACCESS_KEY",
aws_secret_access_key="YOUR_SECRET_KEY",
encoding="utf-8" # Specify text encoding
)
# Load specific text files
items = text_loader.load_items([
S3FileMeta(bucket="my-data-bucket", file="documents/doc1.txt"),
S3FileMeta(bucket="my-data-bucket", file="documents/doc2.txt")
])
# Work with loaded items
for item in items:
print(f"ID: {item.id}")
print(f"Content: {item.data[:100]}...") # First 100 chars
Loading JSON Files¶
from embedding_studio.data_storage.loaders.cloud_storage.s3.s3_json_loader import AwsS3JSONLoader
# Create loader with field filtering
json_loader = AwsS3JSONLoader(
aws_access_key_id="YOUR_ACCESS_KEY",
aws_secret_access_key="YOUR_SECRET_KEY",
fields_to_keep=["id", "title", "description", "tags"],
encoding="utf-8"
)
# Load JSON files
items = json_loader.load_items([
S3FileMeta(bucket="my-data-bucket", file="data/item1.json"),
S3FileMeta(bucket="my-data-bucket", file="data/item2.json")
])
Loading Images¶
from embedding_studio.data_storage.loaders.cloud_storage.s3.s3_image_loader import AwsS3ImageLoader
from PIL import Image
# Create image loader
image_loader = AwsS3ImageLoader(
aws_access_key_id="YOUR_ACCESS_KEY",
aws_secret_access_key="YOUR_SECRET_KEY"
)
# Load image files (returns PIL Image objects)
items = image_loader.load_items([
S3FileMeta(bucket="my-data-bucket", file="images/img1.jpg"),
S3FileMeta(bucket="my-data-bucket", file="images/img2.png")
])
# Process images
for item in items:
img = item.data # PIL Image object
width, height = img.size
print(f"Image {item.id}: {width}x{height}")
Loading Entire Buckets¶
# Load all text files from a bucket in batches
for batch in text_loader.load_all(batch_size=100, buckets=["my-data-bucket"]):
for item in batch:
# Process each item in the batch
process_text(item.data)
5. Google Cloud Storage Integration¶
Google Cloud Storage integration is similar to Amazon S3 but uses GCP-specific authentication.
Authentication¶
from embedding_studio.data_storage.loaders.cloud_storage.gcp.gcp_text_loader import GCPTextLoader
# With service account credentials file
data_loader = GCPTextLoader(
credentials_path="./path/to/service-account.json",
use_system_info=False
)
# With application default credentials
data_loader = GCPTextLoader(
use_system_info=True # Use ADC from environment
)
Loading Text Files¶
from embedding_studio.data_storage.loaders.cloud_storage.gcp.item_meta import GCPFileMeta
# Create loader
text_loader = GCPTextLoader(
use_system_info=True,
encoding="utf-8"
)
# Load specific text files
items = text_loader.load_items([
GCPFileMeta(bucket="my-gcp-bucket", file="documents/doc1.txt"),
GCPFileMeta(bucket="my-gcp-bucket", file="documents/doc2.txt")
])
Loading JSON and Images¶
The pattern is similar to S3 loaders:
from embedding_studio.data_storage.loaders.cloud_storage.gcp.gcp_json_loader import GCPJSONLoader
from embedding_studio.data_storage.loaders.cloud_storage.gcp.gcp_image_loader import GCPImageLoader
# JSON loader
json_loader = GCPJSONLoader(
use_system_info=True,
fields_to_keep=["name", "description", "metadata"],
encoding="utf-8"
)
# Image loader
image_loader = GCPImageLoader(
use_system_info=True
)
6. Aggregated Data Loading¶
The AggregatedDataLoader is a powerful component that lets you combine multiple data loaders into a single unified interface. This is particularly useful when your data is distributed across different systems or when you want to create embeddings from heterogeneous data sources.
Basic Usage¶
Here's how to combine multiple data sources:
from embedding_studio.data_storage.loaders.aggregated_data_loader import AggregatedDataLoader
from embedding_studio.data_storage.loaders.item_meta import ItemMetaWithSourceInfo
class CustomItemMeta(ItemMetaWithSourceInfo):
"""Custom item metadata with source tracking."""
pass
# Create specialized loaders for each source
postgres_loader = PgsqlTextLoader(
connection_string="postgresql://user:pass@host:5432/db",
query_generator=QueryGenerator,
text_column="content"
)
s3_loader = AwsS3TextLoader(
aws_access_key_id="YOUR_ACCESS_KEY",
aws_secret_access_key="YOUR_SECRET_KEY",
encoding="utf-8"
)
gcp_loader = GCPTextLoader(
use_system_info=True,
encoding="utf-8"
)
# Combine loaders into a single interface
aggregated_loader = AggregatedDataLoader(
{
"postgres": postgres_loader,
"s3": s3_loader,
"gcp": gcp_loader
},
item_meta_cls=CustomItemMeta
)
# Load items from different sources through one interface
items = aggregated_loader.load_items([
CustomItemMeta(source_name="postgres", object_id="row_123"),
CustomItemMeta(source_name="s3", bucket="my-bucket", file="doc.txt"),
CustomItemMeta(source_name="gcp", bucket="gcp-bucket", file="file.txt")
])
Real-World Example with PostgreSQL Sources¶
A common scenario is loading data from multiple database tables using specialized query generators. Here's a real-world example that loads data from both models and datasets tables in PostgreSQL:
from embedding_studio.data_storage.loaders.aggregated_data_loader import AggregatedDataLoader
from embedding_studio.data_storage.loaders.sql.pgsql.pgsql_multi_text_column_loader import PgsqlMultiTextColumnLoader
from plugins.custom.data_storage.loaders.hf_models_query_generator import HugsearchModelsQueryGenerator
from plugins.custom.data_storage.loaders.hf_datasets_query_generator import HugsearchDatasetsQueryGenerator
from plugins.custom.data_storage.loaders.item_meta import PgsqlItemMetaWithSourceInfo
# Database connection string
connection_string = "postgresql://username:password@hostname:5432/database"
# Inside a plugin's __init__, create an aggregated loader that combines
# the models and datasets sources
self.data_loader = AggregatedDataLoader(
{
"hf_models": PgsqlMultiTextColumnLoader(
connection_string=connection_string,
query_generator=HugsearchModelsQueryGenerator,
text_columns=["id", "readme", "tags"],
),
"hf_datasets": PgsqlMultiTextColumnLoader(
connection_string=connection_string,
query_generator=HugsearchDatasetsQueryGenerator,
text_columns=["id", "readme", "tags"],
),
},
item_meta_cls=PgsqlItemMetaWithSourceInfo,
)
In this example:
- Two specialized query generators (HugsearchModelsQueryGenerator and HugsearchDatasetsQueryGenerator) build tailored SQL queries for different tables; a minimal sketch of such a generator follows this list
- Both loaders use PgsqlMultiTextColumnLoader to load multiple text columns from each table
- The PgsqlItemMetaWithSourceInfo class tracks which source each item came from
- All of these loaders are combined into a single AggregatedDataLoader with source name keys
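The specialized query generators live in the plugin code and are not shown here; a hypothetical sketch, following the same pattern as ArticleQueryGenerator in section 3, might simply bind a generator to its table (table name assumed for illustration):
from sqlalchemy import Engine

from embedding_studio.data_storage.loaders.sql.query_generator import QueryGenerator

class HugsearchModelsQueryGenerator(QueryGenerator):
    """Hypothetical sketch: a generator bound to a models table."""

    def __init__(self, engine: Engine) -> None:
        # Table name is assumed for illustration; the real plugin may differ
        super().__init__("models", engine)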
How AggregatedDataLoader Works¶
The AggregatedDataLoader routes requests to the appropriate loader based on the source_name property in each item's metadata:
- When you call load_items(), it groups items by their source_name
- It then dispatches each group to the corresponding loader
- Results from all loaders are combined and returned as a single list
- The same process happens for all other loader methods like load() and load_all()
This allows transparent access to multiple data sources while maintaining the standard DataLoader interface.
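Conceptually, the routing can be pictured like this (an illustrative sketch, not the library's actual code):
from collections import defaultdict

def route_load_items(loaders, items_data):
    # Group requested items by the source they belong to
    groups = defaultdict(list)
    for item in items_data:
        groups[item.source_name].append(item)

    # Dispatch each group to its loader and merge the results
    results = []
    for source_name, group in groups.items():
        results.extend(loaders[source_name].load_items(group))
    return results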
Custom ItemMeta with Source Information¶
To use AggregatedDataLoader, your ItemMeta class must extend ItemMetaWithSourceInfo, which adds a source_name field:
class PgsqlItemMetaWithSourceInfo(ItemMetaWithSourceInfo):
"""Metadata for a PostgreSQL item with source tracking."""
@property
def derived_id(self) -> str:
"""
Return unique ID combining source name and object ID.
Format: source_name:object_id
"""
return f"{self.source_name}:{self.object_id}"
The source_name field tells the aggregated loader which sub-loader should handle each item.
Benefits of Using AggregatedDataLoader¶
- Unified Interface: Access multiple data sources through a single consistent API
- Source Tracking: Each item maintains information about its origin
- Scalability: Easily add new data sources without changing client code
- Heterogeneous Data: Combine different types of data (text, images, structured) in one pipeline
7. Using Data Loaders in Plugins¶
To integrate a data loader into your Embedding Studio plugin:
from embedding_studio.core.plugin import FineTuningMethod
from embedding_studio.data_storage.loaders.data_loader import DataLoader
class MyPlugin(FineTuningMethod):
def __init__(self):
# Configure your data loader in the plugin initialization
self.data_loader = PgsqlTextLoader(
connection_string="postgresql://user:pass@host:5432/db",
query_generator=QueryGenerator,
text_column="content"
)
# ... other initializations
def get_data_loader(self) -> DataLoader:
"""Return the configured data loader."""
return self.data_loader
8. Best Practices¶
Error Handling¶
Data loaders include built-in retry logic for transient failures:
# Configure custom retry settings
from embedding_studio.workers.fine_tuning.utils.config import RetryConfig, RetryParams
retry_config = RetryConfig(
default_params=RetryParams(
max_attempts=5,
wait_time_seconds=2.0
)
)
data_loader = PgsqlTextLoader(
connection_string=connection_string,
query_generator=QueryGenerator,
retry_config=retry_config # Custom retry configuration
)
Credentials Management¶
Always store credentials securely:
- Use environment variables when possible (see the example below)
- Consider AWS Secrets Manager, GCP Secret Manager, or HashiCorp Vault
- For development, use .env files (but exclude them from version control)
- For production, use IAM roles/service accounts with minimal permissions
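For example, the S3 loader from section 4 can read its keys from environment variables rather than hard-coded strings:
import os

# Pull credentials from the environment instead of source code
data_loader = AwsS3TextLoader(
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    encoding="utf-8"
)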
Performance Optimization¶
For large datasets:
- Use appropriate batch sizes (typically 100-1000 items)
- Consider implementing pagination in custom query generators
- Monitor memory usage when processing large files
- For very large datasets, use streaming approaches rather than loading all at once
import gc

# Process a large bucket in manageable batches
for batch in loader.load_all(batch_size=500, buckets=["large-data-bucket"]):
# Process each batch incrementally
process_batch(batch)
# Explicitly free memory if needed
del batch
gc.collect()
Caching Strategies¶
For infrequently changing data:
- Implement a local cache of downloaded files
- Use ETags or last-modified timestamps to detect changes
- Consider Redis or a similar service for distributed caching
- For cloud storage, check if CDN caching is appropriate
# Simple caching example (for illustration)
import os
import pickle

class CachedLoader:
def __init__(self, loader, cache_dir):
self.loader = loader
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def load_items(self, items_data):
results = []
for item in items_data:
cache_path = os.path.join(self.cache_dir, f"{item.id}.cache")
if os.path.exists(cache_path):
# Load from cache
with open(cache_path, 'rb') as f:
results.append(pickle.load(f))
else:
# Load from source
loaded_item = self.loader.load_items([item])[0]
# Cache for next time
with open(cache_path, 'wb') as f:
pickle.dump(loaded_item, f)
results.append(loaded_item)
return results
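Wrapping an existing loader is then straightforward; for instance, with the S3 text loader from section 4:
# Cache downloaded items locally between runs
cached_loader = CachedLoader(text_loader, cache_dir="./loader_cache")

items = cached_loader.load_items([
    S3FileMeta(bucket="my-data-bucket", file="documents/doc1.txt")
])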