Skip to main content

Milvus

Overview

This page guides you through the process of setting up the Milvus destination connector.

There are three parts to this:

  • Processing - split up individual records in chunks so they will fit the context window and decide which fields to use as context and which are supplementary metadata.
  • Embedding - convert the text into a vector representation using a pre-trained model (Currently, OpenAI's text-embedding-ada-002 and Cohere's embed-english-light-v2.0 are supported.)
  • Indexing - store the vectors in a vector database for similarity search

Prerequisites

To use the Milvus destination, you'll need:

  • An account with API access for OpenAI or Cohere (depending on which embedding method you want to use)
  • Either a running self-managed Milvus instance or a Zilliz account

You'll need the following information to configure the destination:

  • Embedding service API Key - The API key for your OpenAI or Cohere account
  • Milvus Endpoint URL - The URL of your Milvus instance
  • Either Milvus API token or Milvus Instance Username and Password
  • Milvus Collection name - The name of the collection to load data into

Features

FeatureSupported?Notes
Full Refresh SyncYes
Incremental - Append SyncYes
Incremental - Append + DedupedYesDeleting records via CDC is not supported (see issue #29827)
PartitionsNo
Record-defined IDNoAuto-id needs to be enabled

Configuration

Processing

Each record will be split into text fields and meta fields as configured in the "Processing" section. All text fields are concatenated into a single string and then split into chunks of configured length. If specified, the metadata fields are stored as-is along with the embedded text chunks.

When specifying text fields, you can access nested fields in the record by using dot notation, e.g. user.name will access the name field in the user object. It's also possible to use wildcards to access all fields in an object, e.g. users.*.name will access all names fields in all entries of the users array.

The chunk length is measured in tokens produced by the tiktoken library. The maximum is 8191 tokens, which is the maximum length supported by the text-embedding-ada-002 model.

The stream name gets added as a metadata field _ab_stream to each document. If available, the primary key of the record is used to identify the document to avoid duplications when updated versions of records are indexed. It is added as the _ab_record_id metadata field.

Embedding

The connector can use one of the following embedding methods:

  1. OpenAI - using OpenAI API , the connector will produce embeddings using the text-embedding-ada-002 model with 1536 dimensions. This integration will be constrained by the speed of the OpenAI embedding API.

  2. Cohere - using the Cohere API, the connector will produce embeddings using the embed-english-light-v2.0 model with 1024 dimensions.

For testing purposes, it's also possible to use the Fake embeddings integration. It will generate random embeddings and is suitable to test a data pipeline without incurring embedding costs.

Indexing

To get started, create a new collection in your Milvus instance. Make sure that

  • The primary key field is set to auto_id
  • There is a vector field with the correct dimensionality (1536 for OpenAI, 1024 for Cohere) and a configured index

Setting up a collection

When using the Zilliz cloud, this can be done using the UI - in this case only the colleciton name and the vector dimensionality needs to be configured, the vector field with index will be automatically created under the name vector. Using the REST API, the following command will create the index:

POST /v1/vector/collections/create
{
"collectionName": "my-collection",
"dimension": 1536,
"metricType": "L2",
"vectorField": "vector",
“primaryField”: “pk”
}

When using a self-hosted Milvus clustger, the collection needs to be created using the Milvus CLI or Python client. The following commands will create a collection set up for loading data via Airbyte:

from pymilvus import CollectionSchema, FieldSchema, DataType

pk = FieldSchema(name="pk",dtype=DataType.INT64, is_primary=True, auto_id=True)
vector = FieldSchema(name="vector",dtype=DataType.FLOAT_VECTOR,dim=1536)
schema = CollectionSchema(fields=[id, vector], enable_dynamic_field=True)
collection = Collection(name="test_collection", schema=schema)
collection.create_index(field_name="vector", index_params={ "metric_type":"L2", "index_type":"IVF_FLAT", "params":{"nlist":1024} })

Langchain integration

To initialize a langchain vector store based on the indexed data, use the following code:

embeddings = OpenAIEmbeddings(openai_api_key="my-key")
vector_store = Milvus(embeddings=embeddings, collection_name="my-collection", connection_args={"uri": "my-zilliz-endpoint", "token": "my-api-key"})
vector_store.fields.append("text")
# call vs.fields.append() for all fields you need from the metadata

vector_store.similarity_search("test")

CHANGELOG

VersionDatePull RequestSubject
0.0.12023-08-12#29442Milvus connector with some embedders