Skip to content

qdrant_vector

AsyncQdrantVector

Source code in src/agere/addons/qdrant_vector.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
class AsyncQdrantVector:

    def __init__(
        self,
        position: str,
        position_type: Literal["memory", "disk", "server", "cloud"],
        api_key: str | None = None,
        text_splitter: TextSplitterInterface | None = None,
        logger: Logger | None = None,
    ):
        _import_qdrant_client()
        self.logger = logger or get_null_logger()
        if position_type == "memory":
            self.async_qdrant_client = AsyncQdrantClient(location=":memory:")
        elif position_type == "disk":
            self.async_qdrant_client = AsyncQdrantClient(path=position)
        elif position_type == "server":
            if api_key:
                self.async_qdrant_client = AsyncQdrantClient(url=position, api_key=api_key)
            else:
                self.async_qdrant_client = AsyncQdrantClient(url=position)
        elif position_type == "cloud":
            self.async_qdrant_client = AsyncQdrantClient(url=position, api_key=api_key)
        self.text_splitter = text_splitter

    def set_embedding_model(self, embedding_model_name: str) -> None:
        """Select the embedding model the underlying client should use.

        Args:
            embedding_model_name: Name of the model, forwarded to the client's ``set_model``.
        """
        _import_fastembed()
        client = self.async_qdrant_client
        client.set_model(embedding_model_name)

    @property
    def default_vector_size(self) -> int:
        """Vector size of the embedding model currently set on the client."""
        _import_fastembed()
        current_model = self.async_qdrant_client.embedding_model_name
        model_params = self._get_fastembed_model_params(model_name=current_model)
        # The first element of the parameter tuple is the vector size.
        return model_params[0]

    def split(self, text: str) -> Iterable[str]:
        """Split the text.

        When specified a splitter, it will use that splitter to split the text,
        otherwise, return the original text as a list.
        """
        if self.text_splitter is None:
            return [text]
        return self.text_splitter.split(text)

    def _get_fastembed_model_params(self, model_name: str) -> tuple[int, models.Distance]:
        """Look up the parameters registered for a fastembed model.

        Args:
            model_name: Name of the embedding model.

        Returns:
            The entry stored for the model in ``SUPPORTED_EMBEDDING_MODELS``.

        Raises:
            ValueError: If the model name is not in ``SUPPORTED_EMBEDDING_MODELS``.
        """
        _import_fastembed()
        from qdrant_client.async_qdrant_fastembed import SUPPORTED_EMBEDDING_MODELS
        if model_name in SUPPORTED_EMBEDDING_MODELS:
            return SUPPORTED_EMBEDDING_MODELS[model_name]
        raise ValueError(
            f"Unsupported embedding model: {model_name}. Supported models: {SUPPORTED_EMBEDDING_MODELS}"
        )

    async def create_collection(
        self,
        collection_name: str,
        vectors_config: types.VectorParams | Mapping[str, types.VectorParams] | None = None,
        sparse_vectors_config: Mapping[str, types.SparseVectorParams] | None = None,
        init_from_collection_name: str | None = None,
        **kwargs,
    ) -> bool:
        """Create an empty collection.

        Arguments:
            collection_name: The name of the collection to create.
            vectors_config:
                Vectors config. When omitted (or falsy) the client's fastembed
                vector parameters are used instead.
            sparse_vectors_config: The sparse vectors config.
            init_from_collection_name: Name of an existing collection whose data initializes the new one.
            **kwargs: Forwarded unchanged to the client's ``create_collection``.

        Returns:
            Operation result.
        """
        client = self.async_qdrant_client
        effective_vectors_config = vectors_config or client.get_fastembed_vector_params()

        # Only build an InitFrom descriptor when a source collection was named.
        init_from = None
        if init_from_collection_name is not None:
            init_from = models.InitFrom(collection=init_from_collection_name)

        return await client.create_collection(
            collection_name=collection_name,
            vectors_config=effective_vectors_config,
            sparse_vectors_config=sparse_vectors_config,
            init_from=init_from,
            **kwargs
        )

    async def recreate_collection(
        self,
        collection_name: str,
        vectors_config: types.VectorParams | Mapping[str, types.VectorParams] | None = None,
        sparse_vectors_config: Mapping[str, types.SparseVectorParams] | None = None,
        **kwargs,
    ) -> bool:
        """Delete the collection and create it again, empty, with the given parameters.

        Arguments:
            collection_name: The name of the collection to recreate.
            vectors_config:
                Vectors config. When omitted (or falsy) the client's fastembed
                vector parameters are used instead.
            sparse_vectors_config: The sparse vectors config.
            **kwargs: Forwarded unchanged to the client's ``recreate_collection``.

        Returns:
            Operation result.
        """
        client = self.async_qdrant_client
        effective_vectors_config = vectors_config or client.get_fastembed_vector_params()
        return await client.recreate_collection(
            collection_name=collection_name,
            vectors_config=effective_vectors_config,
            sparse_vectors_config=sparse_vectors_config,
            **kwargs
        )

    async def get_collection(self, collection_name: str) -> types.CollectionInfo:
        """Get a collection by name.

        Arguments:
            collection_name: The name of the collection to fetch.

        Returns:
            CollectionInfo: Information from Qdrant about the collection.
        """
        client = self.async_qdrant_client
        return await client.get_collection(collection_name=collection_name)

    async def delete_collection(self, collection_name: str) -> None:
        """Delete a collection.

        The client's own result is discarded; this method always returns None.

        Arguments:
            collection_name: The name of the collection to delete.
        """
        client = self.async_qdrant_client
        await client.delete_collection(collection_name=collection_name)

    async def update_collection(self, collection_name: str, **kwargs):
        """Update parameters of the collection.

        Arguments:
            collection_name: The name of the collection to update.
            **kwargs: Forwarded unchanged to the client's ``update_collection``.
        """
        client = self.async_qdrant_client
        await client.update_collection(collection_name=collection_name, **kwargs)

    async def get_all_collections(
        self,
    ) -> list[str]:
        """List the names of all collections.

        Returns:
            The name of every collection reported by the client.
        """
        listing = await self.async_qdrant_client.get_collections()
        return [entry.name for entry in listing.collections]

    async def does_collection_exist(self, collection_name: str) -> bool:
        """Check whether a collection exists.

        Arguments:
            collection_name: The name of the collection to check.

        Returns:
            bool: True if the collection exists; otherwise, False.
        """
        client = self.async_qdrant_client
        exists = await client.collection_exists(collection_name=collection_name)
        return exists

    async def get_collection_info(self, collection_name: str) -> types.CollectionInfo:
        """Get the collection information.

        Arguments:
            collection_name: The name of the collection to describe.

        Returns:
            The collection information returned by the client's ``get_collection``.
        """
        client = self.async_qdrant_client
        info = await client.get_collection(collection_name=collection_name)
        return info

    async def add(
        self,
        collection_name: str,
        documents: Iterable[str],
        names: Iterable[str | None] | None = None,
        categories: Iterable[str | None] | None = None,
        kinds: Iterable[str | None] | None = None,
        created_datetimes: Iterable[datetime | None] | None = None,
        updated_datetimes: Iterable[datetime | None] | None = None,
        metadata: Iterable[dict[str, Any]] | None = None,
        ids: Iterable[ExtendedPointId] | None = None,
        batch_size: int = 32,
        parallel: int | None = None,
        **kwargs,
    ) -> list[str | int]:
        """
        Adds text documents into qdrant collection.
        If collection does not exist, it will be created with default parameters.
        Metadata in combination with documents will be added as payload.
        Documents will be embedded using the specified embedding model.

        Each document is passed through `split` first; every resulting chunk is
        added as its own entry and carries the metadata of its source document.

        If you want to use your own vectors, use `upsert` method instead.

        Args:
            collection_name (str):
                Name of the collection to add documents to.
            documents (Iterable[str]):
                Documents to embed and add to the collection. Any iterable is
                accepted, including one-shot iterators; it is consumed exactly once.
            names (Iterable[str | None] | None):
                Specify the corresponding name. It is part of the metadata.
                Default to None.
            categories (Iterable[str | None] | None):
                Specify the corresponding category. It is part of the metadata.
                Default to None.
            kinds (Iterable[str | None] | None):
                Specify the corresponding kind. It is part of the metadata.
                Default to None.
            created_datetimes (Iterable[datetime | None] | None):
                The time of creation. When the argument is omitted, or an
                individual entry is None, the current UTC time (ISO 8601
                string) is used. Supplied values are stored unchanged.
            updated_datetimes (Iterable[datetime | None] | None):
                The time of modification. Same defaulting as `created_datetimes`.
            metadata (Iterable[dict[str, Any]] | None):
                Other metadata dicts, one per document. Keys in these dicts
                override the generated ones on collision. Defaults to None.
            ids (Iterable[ExtendedPointId] | None):
                Ids forwarded unchanged to the client alongside the chunk list.
                NOTE(review): when a text splitter yields several chunks per
                document, ids presumably have to be supplied per chunk - confirm.
                If not specified, UUIDs will be generated. Defaults to None.
            batch_size (int):
                How many documents to embed and upload in single request. Defaults to 32.
            parallel (int | None):
                How many parallel workers to use for embedding. Defaults to None.
                If number is specified, data-parallel process will be used.
            **kwargs: Forwarded unchanged to the client's ``add``.

        Raises:
            ImportError: If fastembed is not installed.
            ValueError: If a per-document argument has fewer entries than `documents`.

        Returns:
            List of IDs of added documents. If no ids provided, UUIDs will be randomly generated on client side.
        """
        _import_fastembed()
        time_now_rfc3339 = datetime.now(timezone.utc).isoformat()

        # Omitted columns yield None forever; supplied ones are consumed in
        # lockstep with the documents.
        names_ = iter(names) if names is not None else cycle([None])
        categories_ = iter(categories) if categories is not None else cycle([None])
        kinds_ = iter(kinds) if kinds is not None else cycle([None])
        created_datetimes_ = iter(created_datetimes) if created_datetimes is not None else cycle([None])
        updated_datetimes_ = iter(updated_datetimes) if updated_datetimes is not None else cycle([None])
        metadata_ = iter(metadata) if metadata is not None else cycle([{}])

        texts_list: list[str] = []
        metadata_list: list[dict[str, Any]] = []
        # `documents` is walked exactly once. The previous implementation
        # iterated it both in a metadata generator and in zip(), which skipped
        # every other document whenever a one-shot iterator was passed in.
        for index, doc in enumerate(documents):
            try:
                name = next(names_)
                category = next(categories_)
                kind = next(kinds_)
                created = next(created_datetimes_)
                updated = next(updated_datetimes_)
                extra = next(metadata_)
            except StopIteration:
                raise ValueError(
                    f"Per-document arguments ran out at document index {index}; "
                    "names, categories, kinds, created_datetimes, updated_datetimes "
                    "and metadata must each provide one entry per document."
                ) from None

            meta = {
                "name": name,
                "category": category,
                "kind": kind,
                # A None entry means "not specified": fall back to the current time.
                "created_datetime": time_now_rfc3339 if created is None else created,
                "updated_datetime": time_now_rfc3339 if updated is None else updated,
                **extra,
            }

            texts = list(self.split(doc))
            texts_list.extend(texts)
            # One independent (shallow) copy per chunk so entries do not alias each other.
            metadata_list.extend(dict(meta) for _ in texts)

        return await self.async_qdrant_client.add(
            collection_name=collection_name,
            documents=texts_list,
            metadata=metadata_list,
            ids=ids,
            batch_size=batch_size,
            parallel=parallel,
            **kwargs
        )

    async def query(
        self,
        collection_name: str,
        query_text: str,
        query_filter: Filter | None = None,
        limit: int = 5,
        score_threshold : float | None = None,
        return_text: bool = True,
        **kwargs,
    ) -> list[QueryResponse] | list[str]:
        """
        Search for documents in a collection.
        This method automatically embeds the query text using the specified embedding model.
        If you want to use your own query vector, use `search` method instead.

        Args:
            collection_name: Collection to search in
            query_text:
                Text to search for. This text will be embedded using the specified embedding model.
                And then used as a query vector.
            query_filter:
                Exclude vectors which doesn't fit given conditions.
                If `None` - search among all vectors
            limit: How many results to request from the client.
            score_threshold:
                Keep only results whose score is greater than or equal to this value.
                The filtering happens after the client returned its results, so fewer
                than `limit` entries may come back.
                If it is None, no score filtering is applied. Default to None.
            return_text: Only return document text if True.
            **kwargs: Additional search parameters, forwarded to the client's ``query``.

        Returns:
            list[QueryResponse] | list[str]:
                The document texts when `return_text` is True, otherwise the
                response objects returned by the client.

        """
        _import_fastembed()
        result = await self.async_qdrant_client.query(
            collection_name=collection_name,
            query_text=query_text,
            query_filter=query_filter,
            limit=limit,
            **kwargs,
        )
        # Compare against None explicitly: a threshold of 0.0 is falsy but is
        # still a threshold the caller asked for and must be applied.
        if score_threshold is not None:
            result = [point for point in result if point.score >= score_threshold]
        if return_text:
            return [point.document for point in result]
        return result

    async def query_batch(
        self,
        collection_name: str,
        query_texts: list[str],
        query_filter: Filter | None = None,
        limit: int = 5,
        score_threshold : float | None = None,
        return_text: bool = True,
        **kwargs,
    ) -> list[list[QueryResponse]] | list[list[str]]:
        """
        Search for documents in a collection with batched query.
        This method automatically embeds the query text using the specified embedding model.

        Args:
            collection_name: Collection to search in
            query_texts:
                A list of texts to search for. Each text will be embedded using the specified embedding model.
                And then used as a query vector for a separate search requests.
            query_filter:
                Exclude vectors which doesn't fit given conditions.
                If `None` - search among all vectors
                This filter will be applied to all search requests.
            limit: How many results to request from the client per query text.
            score_threshold:
                Keep only results whose score is greater than or equal to this value.
                The filtering happens after the client returned its results, so an
                inner list may hold fewer than `limit` entries.
                If it is None, no score filtering is applied. Default to None.
            return_text: Only return document text if True.
            **kwargs: Additional search parameters, forwarded to the client's ``query_batch``.

        Returns:
            list[list[QueryResponse]] | list[list[str]]: List of lists of responses for each query text.

        """
        _import_fastembed()
        result = await self.async_qdrant_client.query_batch(
            collection_name=collection_name,
            query_texts=query_texts,
            query_filter=query_filter,
            limit=limit,
            **kwargs,
        )
        # Compare against None explicitly: a threshold of 0.0 is falsy but is
        # still a threshold the caller asked for and must be applied.
        if score_threshold is not None:
            result = [
                [point for point in inner_list if point.score >= score_threshold]
                for inner_list in result
            ]
        if return_text:
            return [[point.document for point in inner_list] for inner_list in result]
        return result

    async def delete(self, collection_name: str, filter: Filter) -> None:
        """Delete the records selected by the filter.

        Args:
            collection_name: Name of the collection to delete records from.
            filter: Filter selecting the records to delete.
        """
        selector = models.FilterSelector(filter=filter)
        await self.async_qdrant_client.delete(
            collection_name=collection_name,
            points_selector=selector,
        )

    async def scroll(
        self,
        collection_name: str,
        scroll_filter: types.Filter | None = None,
        limit: int = 10,
        with_payload: bool | Sequence[str] | types.PayloadSelector = True,
        with_vectors: bool | Sequence[str]= False,
        order_by: types.OrderBy | None = None,
        offset: types.PointId | None = None,
    ) -> tuple[list[types.Record], types.PointId | None]:
        """Scroll over all (matching) points in the collection.

        This method provides a way to iterate over all stored points with some optional filtering condition.
        Scroll does not apply any similarity estimations, it will return points sorted by id in ascending order.

        Args:
            collection_name: Name of the collection
            scroll_filter: If provided - only returns points matching filtering conditions
            limit: How many points to return
            with_payload:
                - Specify which stored payload should be attached to the result.
                - If `True` - attach all payload
                - If `False` - do not attach any payload
                - If List of string - include only specified fields
                - If `PayloadSelector` - use explicit rules
            with_vectors:
                - If `True` - Attach stored vector to the search result.
                - If `False` (default) - Do not attach vector.
                - If List of string - include only specified fields
            order_by: Order the records by a payload key. If `None` - order by id
            offset:
                Point id to start the page from; pass the offset returned by the
                previous call to fetch the next page. If `None` (default) -
                start from the beginning.
                NOTE(review): Qdrant may not support `offset` together with
                `order_by` - confirm before combining them.

        Returns:
            A pair of (List of points) and (optional offset for the next scroll request).
            If next page offset is `None` - there is no more points in the collection to scroll.
        """
        # `offset` makes the returned next-page offset usable: without it a
        # caller could only ever read the first page.
        return await self.async_qdrant_client.scroll(
            collection_name=collection_name,
            scroll_filter=scroll_filter,
            limit=limit,
            order_by=order_by,
            offset=offset,
            with_payload=with_payload,
            with_vectors=with_vectors,
        )

    async def count(
        self,
        collection_name: str,
        count_filter: types.Filter | None = None,
        exact: bool = True
    ) -> int:
        """Count the points in a collection that match a filter.

        Args:
            collection_name: name of the collection to count points in
            count_filter: filtering conditions; forwarded unchanged to the client
            exact:
                If `True` - provide the exact count of points matching the filter.
                If `False` - provide the approximate count of points matching the filter. Works faster.
        Returns:
            Amount of points in the collection matching the filter.
        """
        client = self.async_qdrant_client
        count_response = await client.count(
            collection_name=collection_name,
            count_filter=count_filter,
            exact=exact,
        )
        # The client returns a result object; only its `count` field is exposed.
        return count_response.count

    def metadata_filter(
        self,
        names: list[str] | None = None,
        categories: list[str] | None = None,
        kinds: list[str] | None = None,
        created_datetime_range: tuple[datetime | None, datetime | None] = (None, None,),
        updated_datetime_range: tuple[datetime | None, datetime | None] = (None, None,),
        document_texts: list[str] | None = None,
    ) -> Filter:
        """Generate the corresponding filter based on the conditions.

        Within each option, the logic is 'OR', and between multiple options, the
        logic is 'AND'.

        Every option always contributes a condition, even when it is left
        unset: name/category/kind fall back to an "except []" match, the
        datetime ranges to a range with open bounds, and the document text to
        "the document field is not empty".
        NOTE(review): the unset fallbacks presumably still require the payload
        key to hold a value - confirm against Qdrant's filter semantics.

        Args:
            names: Filter by name.
            categories: Filter by category.
            kinds: Filter by kind.
            created_datetime_range:
                Filter by creation time, which is a tuple with the first time as the start time
                and the second time as the end time. Both bounds are inclusive.
            updated_datetime_range:
                Filter by modification time, which is a tuple with the first time as the start
                time and the second time as the end time. Both bounds are inclusive.
            document_texts:
                Filter by text content.
                Note that when there are multiple text contents, it means the entries that match
                must contain all these text contents simultaneously.

        Returns:
            The filter.
        """
        def _any_of(key, values):
            # Match any of the given values, or "except nothing" when none were given.
            if values:
                matcher = models.MatchAny(any=values)
            else:
                matcher = models.MatchExcept(**{"except": []})
            return models.FieldCondition(key=key, match=matcher)

        def _within(key, bounds):
            # Inclusive range on both ends; a None bound leaves that side open.
            return models.FieldCondition(
                key=key,
                range=models.DatetimeRange(
                    gt=None,
                    gte=bounds[0],
                    lt=None,
                    lte=bounds[1],
                ),
            )

        if document_texts:
            # Every requested fragment has to be present in the document.
            text_matches = [
                models.FieldCondition(key="document", match=models.MatchText(text=fragment))
                for fragment in document_texts
            ]
            document_condition = models.Filter(must=text_matches)
        else:
            not_empty = models.IsEmptyCondition(is_empty=models.PayloadField(key="document"))
            document_condition = models.Filter(must_not=[not_empty])

        combined_filter = models.Filter(
            must=[
                _any_of("name", names),
                _any_of("category", categories),
                _any_of("kind", kinds),
                _within("created_datetime", created_datetime_range),
                _within("updated_datetime", updated_datetime_range),
                document_condition,
            ],
        )
        return combined_filter

add(collection_name, documents, names=None, categories=None, kinds=None, created_datetimes=None, updated_datetimes=None, metadata=None, ids=None, batch_size=32, parallel=None, **kwargs) async

Adds text documents into qdrant collection. If collection does not exist, it will be created with default parameters. Metadata in combination with documents will be added as payload. Documents will be embedded using the specified embedding model.

If you want to use your own vectors, use upsert method instead.

Parameters:

Name Type Description Default
collection_name str

Name of the collection to add documents to.

required
documents Iterable[str]

List of documents to embed and add to the collection.

required
names Iterable[str | None]

Specify the corresponding name. It is part of the metadata. Default to None.

None
categories Iterable[str | None] | None

Specify the corresponding category. It is part of the metadata. Default to None.

None
kinds Iterable[str | None] | None

Specify the corresponding kind. It is part of the metadata. Default to None.

None
created_datetimes Iterable[datetime | None]

The time of creation. If not specified, it will be generated automatically.

None
updated_datetimes Iterable[datetime | None] | None

The time of modification. If not specified, it will be generated automatically.

None
metadata Iterable[Dict[str, Any]] | None

List of other metadata dicts. Defaults to None.

None
ids Iterable[ExtendedPointId] | None

List of ids to assign to documents. If not specified, UUIDs will be generated. Defaults to None.

None
batch_size int | None

How many documents to embed and upload in single request. Defaults to 32.

32
parallel int | None

How many parallel workers to use for embedding. Defaults to None. If number is specified, data-parallel process will be used.

None

Raises:

Type Description
ImportError

If fastembed is not installed.

Returns:

Type Description
list[str | int]

List of IDs of added documents. If no ids provided, UUIDs will be randomly generated on client side.

Source code in src/agere/addons/qdrant_vector.py
async def add(
    self,
    collection_name: str,
    documents: Iterable[str],
    names: Iterable[str | None] | None = None,
    categories: Iterable[str | None] | None = None,
    kinds: Iterable[str | None] | None = None,
    created_datetimes: Iterable[datetime | None] | None = None,
    updated_datetimes: Iterable[datetime | None] | None = None,
    metadata: Iterable[dict[str, Any]] | None = None,
    ids: Iterable[ExtendedPointId] | None = None,
    batch_size: int = 32,
    parallel: int | None = None,
    **kwargs,
) -> list[str | int]:
    """
    Adds text documents into qdrant collection.
    If collection does not exist, it will be created with default parameters.
    Metadata in combination with documents will be added as payload.
    Documents will be embedded using the specified embedding model.

    Each document is passed through `split` first; every resulting chunk is
    added as its own entry and carries the metadata of its source document.

    If you want to use your own vectors, use `upsert` method instead.

    Args:
        collection_name (str):
            Name of the collection to add documents to.
        documents (Iterable[str]):
            Documents to embed and add to the collection. Any iterable is
            accepted, including one-shot iterators; it is consumed exactly once.
        names (Iterable[str | None] | None):
            Specify the corresponding name. It is part of the metadata.
            Default to None.
        categories (Iterable[str | None] | None):
            Specify the corresponding category. It is part of the metadata.
            Default to None.
        kinds (Iterable[str | None] | None):
            Specify the corresponding kind. It is part of the metadata.
            Default to None.
        created_datetimes (Iterable[datetime | None] | None):
            The time of creation. When the argument is omitted, or an
            individual entry is None, the current UTC time (ISO 8601
            string) is used. Supplied values are stored unchanged.
        updated_datetimes (Iterable[datetime | None] | None):
            The time of modification. Same defaulting as `created_datetimes`.
        metadata (Iterable[dict[str, Any]] | None):
            Other metadata dicts, one per document. Keys in these dicts
            override the generated ones on collision. Defaults to None.
        ids (Iterable[ExtendedPointId] | None):
            Ids forwarded unchanged to the client alongside the chunk list.
            NOTE(review): when a text splitter yields several chunks per
            document, ids presumably have to be supplied per chunk - confirm.
            If not specified, UUIDs will be generated. Defaults to None.
        batch_size (int):
            How many documents to embed and upload in single request. Defaults to 32.
        parallel (int | None):
            How many parallel workers to use for embedding. Defaults to None.
            If number is specified, data-parallel process will be used.
        **kwargs: Forwarded unchanged to the client's ``add``.

    Raises:
        ImportError: If fastembed is not installed.
        ValueError: If a per-document argument has fewer entries than `documents`.

    Returns:
        List of IDs of added documents. If no ids provided, UUIDs will be randomly generated on client side.
    """
    _import_fastembed()
    time_now_rfc3339 = datetime.now(timezone.utc).isoformat()

    # Omitted columns yield None forever; supplied ones are consumed in
    # lockstep with the documents.
    names_ = iter(names) if names is not None else cycle([None])
    categories_ = iter(categories) if categories is not None else cycle([None])
    kinds_ = iter(kinds) if kinds is not None else cycle([None])
    created_datetimes_ = iter(created_datetimes) if created_datetimes is not None else cycle([None])
    updated_datetimes_ = iter(updated_datetimes) if updated_datetimes is not None else cycle([None])
    metadata_ = iter(metadata) if metadata is not None else cycle([{}])

    texts_list: list[str] = []
    metadata_list: list[dict[str, Any]] = []
    # `documents` is walked exactly once. The previous implementation
    # iterated it both in a metadata generator and in zip(), which skipped
    # every other document whenever a one-shot iterator was passed in.
    for index, doc in enumerate(documents):
        try:
            name = next(names_)
            category = next(categories_)
            kind = next(kinds_)
            created = next(created_datetimes_)
            updated = next(updated_datetimes_)
            extra = next(metadata_)
        except StopIteration:
            raise ValueError(
                f"Per-document arguments ran out at document index {index}; "
                "names, categories, kinds, created_datetimes, updated_datetimes "
                "and metadata must each provide one entry per document."
            ) from None

        meta = {
            "name": name,
            "category": category,
            "kind": kind,
            # A None entry means "not specified": fall back to the current time.
            "created_datetime": time_now_rfc3339 if created is None else created,
            "updated_datetime": time_now_rfc3339 if updated is None else updated,
            **extra,
        }

        texts = list(self.split(doc))
        texts_list.extend(texts)
        # One independent (shallow) copy per chunk so entries do not alias each other.
        metadata_list.extend(dict(meta) for _ in texts)

    return await self.async_qdrant_client.add(
        collection_name=collection_name,
        documents=texts_list,
        metadata=metadata_list,
        ids=ids,
        batch_size=batch_size,
        parallel=parallel,
        **kwargs
    )

count(collection_name, count_filter=None, exact=True) async

Count points in the collection.

Count points in the collection matching the given filter.

Parameters:

Name Type Description Default
collection_name str

name of the collection to count points in

required
count_filter Filter | None

filtering conditions

None
exact bool

If True - provide the exact count of points matching the filter. If False - provide the approximate count of points matching the filter. Works faster.

True

Returns: Amount of points in the collection matching the filter.

Source code in src/agere/addons/qdrant_vector.py
async def count(
    self,
    collection_name: str,
    count_filter: types.Filter | None = None,
    exact: bool = True
) -> int:
    """Count the points in a collection that match a filter.

    Args:
        collection_name: name of the collection to count points in
        count_filter: filtering conditions; forwarded unchanged to the client
        exact:
            If `True` - provide the exact count of points matching the filter.
            If `False` - provide the approximate count of points matching the filter. Works faster.
    Returns:
        Amount of points in the collection matching the filter.
    """
    client = self.async_qdrant_client
    count_response = await client.count(
        collection_name=collection_name,
        count_filter=count_filter,
        exact=exact,
    )
    # The client returns a result object; only its `count` field is exposed.
    return count_response.count

create_collection(collection_name, vectors_config=None, sparse_vectors_config=None, init_from_collection_name=None, **kwargs) async

Creates empty collection with given parameters.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to create.

required
vectors_config VectorParams | Mapping[str, VectorParams] | None

Specify vectors config. Left to be None if using fastembed.

None
sparse_vectors_config Mapping[str, SparseVectorParams] | None

Specify the sparse vectors config.

None
init_from_collection_name str | None

Use data stored in another collection to initialize this collection.

None

Returns:

Type Description
bool

Operation result.

Source code in src/agere/addons/qdrant_vector.py
async def create_collection(
    self,
    collection_name: str,
    vectors_config: types.VectorParams | Mapping[str, types.VectorParams] | None = None,
    sparse_vectors_config: Mapping[str, types.SparseVectorParams] | None = None,
    init_from_collection_name: str | None = None,
    **kwargs,
) -> bool:
    """Create a new, empty collection.

    Arguments:
        collection_name: The name of the collection to create.
        vectors_config:
            Vectors configuration. When it is omitted (or otherwise falsy) the
            client's fastembed vector parameters are used instead.
        sparse_vectors_config: Sparse vectors configuration.
        init_from_collection_name:
            Name of an existing collection whose data initializes the new one.
            `None` means no source collection is used.
        **kwargs: Extra arguments forwarded unchanged to the client.

    Returns:
        Operation result reported by the client.
    """
    # Fall back to the fastembed defaults only when no config was supplied.
    if not vectors_config:
        vectors_config = self.async_qdrant_client.get_fastembed_vector_params()

    if init_from_collection_name is None:
        init_from = None
    else:
        init_from = models.InitFrom(collection=init_from_collection_name)

    return await self.async_qdrant_client.create_collection(
        collection_name=collection_name,
        vectors_config=vectors_config,
        sparse_vectors_config=sparse_vectors_config,
        init_from=init_from,
        **kwargs,
    )

delete(collection_name, filter) async

Delete the records selected by the filter.

Source code in src/agere/addons/qdrant_vector.py
async def delete(self, collection_name: str, filter: Filter) -> None:
    """Delete the points of a collection that match a filter.

    Arguments:
        collection_name: The name of the collection to delete points from.
        filter: Conditions selecting the points to delete.
    """
    # The client expects a points selector, so wrap the filter in one.
    selector = models.FilterSelector(filter=filter)
    await self.async_qdrant_client.delete(
        collection_name=collection_name,
        points_selector=selector,
    )

delete_collection(collection_name) async

Deletes a collection.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to delete.

required
Source code in src/agere/addons/qdrant_vector.py
async def delete_collection(self, collection_name: str) -> None:
    """Remove a collection.

    The client's own result is awaited and then discarded.

    Arguments:
        collection_name: The name of the collection to delete.
    """
    client = self.async_qdrant_client
    await client.delete_collection(collection_name=collection_name)

does_collection_exist(collection_name) async

Checks if a collection exists.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to check.

required

Returns:

Name Type Description
bool bool

True if the collection exists; otherwise, False.

Source code in src/agere/addons/qdrant_vector.py
async def does_collection_exist(self, collection_name: str) -> bool:
    """Tell whether a collection with the given name exists.

    Arguments:
        collection_name: The name of the collection to look for.

    Returns:
        bool: The answer of the client's `collection_exists` call.
    """
    exists = await self.async_qdrant_client.collection_exists(
        collection_name=collection_name,
    )
    return exists

get_all_collections() async

Gets the list of collections.

Returns: The list of collections.

Source code in src/agere/addons/qdrant_vector.py
async def get_all_collections(
    self,
) -> list[str]:
    """List the names of all collections.

    Returns:
        The `name` of every entry in the client's collections response,
        in the order the client returned them.
    """
    response = await self.async_qdrant_client.get_collections()
    return [described.name for described in response.collections]

get_collection(collection_name) async

Gets a collection based upon the collection name.

Returns:

Name Type Description
CollectionInfo CollectionInfo

Collection Information from Qdrant about collection.

Source code in src/agere/addons/qdrant_vector.py
async def get_collection(self, collection_name: str) -> types.CollectionInfo:
    """Get a collection by its name.

    Arguments:
        collection_name: The name of the collection to look up.

    Returns:
        CollectionInfo: The information Qdrant holds about the collection.
    """
    return await self.async_qdrant_client.get_collection(
        collection_name=collection_name,
    )

get_collection_info(collection_name) async

Get the collection information.

Source code in src/agere/addons/qdrant_vector.py
async def get_collection_info(self, collection_name: str) -> types.CollectionInfo:
    """Fetch the information of a collection.

    Arguments:
        collection_name: The name of the collection to describe.

    Returns:
        The client's `get_collection` result for that collection.
    """
    info = await self.async_qdrant_client.get_collection(collection_name=collection_name)
    return info

metadata_filter(names=None, categories=None, kinds=None, created_datetime_range=(None, None), updated_datetime_range=(None, None), document_texts=None)

Generate the corresponding filter based on the conditions.

Within each option, the logic is 'OR', and between multiple options, the logic is 'AND'.

Parameters:

Name Type Description Default
names list[str] | None

Filter by name.

None
categories list[str] | None

Filter by category.

None
kinds list[str] | None

Filter by kind.

None
created_datetime_range tuple[datetime | None, datetime | None]

Filter by creation time, which is a tuple with the first time as the start time and the second time as the end time.

(None, None)
updated_datetime_range tuple[datetime | None, datetime | None]

Filter by modification time, which is a tuple with the first time as the start time and the second time as the end time.

(None, None)
document_texts list[str] | None

Filter by text content. Note that when there are multiple text contents, it means the entries that match must contain all these text contents simultaneously.

None

Returns:

Type Description
Filter

The filter.

Source code in src/agere/addons/qdrant_vector.py
def metadata_filter(
    self,
    names: list[str] | None = None,
    categories: list[str] | None = None,
    kinds: list[str] | None = None,
    created_datetime_range: tuple[datetime | None, datetime | None] = (None, None,),
    updated_datetime_range: tuple[datetime | None, datetime | None] = (None, None,),
    document_texts: list[str] | None = None,
) -> Filter:
    """Build a filter from the given metadata conditions.

    Values inside one option are combined with 'OR'; the options themselves
    are combined with 'AND'.

    Args:
        names: Filter by name.
        categories: Filter by category.
        kinds: Filter by kind.
        created_datetime_range:
            Filter by creation time. The first element is the start time and the
            second element is the end time; both bounds are inclusive.
        updated_datetime_range:
            Filter by modification time. The first element is the start time and
            the second element is the end time; both bounds are inclusive.
        document_texts:
            Filter by text content.
            When several texts are given, a matching entry must contain all of
            them at the same time.

    Returns:
        The filter.
    """
    def _membership(key, values):
        # One condition per keyword-like payload field.
        if values:
            matcher = models.MatchAny(any=values)
        else:
            # Option not set: exclude nothing.
            # NOTE(review): presumably meant as a "match everything" placeholder;
            # confirm how Qdrant treats points that lack the field.
            matcher = models.MatchExcept(**{"except": []})
        return models.FieldCondition(key=key, match=matcher)

    def _time_window(key, window):
        # Inclusive bounds; a `None` bound is passed through as-is.
        return models.FieldCondition(
            key=key,
            range=models.DatetimeRange(
                gt=None,
                gte=window[0],
                lt=None,
                lte=window[1],
            ),
        )

    by_name = _membership("name", names)
    by_category = _membership("category", categories)
    by_kind = _membership("kind", kinds)
    by_created = _time_window("created_datetime", created_datetime_range)
    by_updated = _time_window("updated_datetime", updated_datetime_range)

    if document_texts:
        # Every requested text must be present in the document.
        by_text = models.Filter(
            must=[
                models.FieldCondition(
                    key="document", match=models.MatchText(text=snippet),
                ) for snippet in document_texts
            ]
        )
    else:
        # No text requested: only require that the document field is not empty.
        by_text = models.Filter(
            must_not=[
                models.IsEmptyCondition(is_empty=models.PayloadField(key="document"))
            ]
        )

    return models.Filter(
        must=[by_name, by_category, by_kind, by_created, by_updated, by_text],
    )

query(collection_name, query_text, query_filter=None, limit=5, score_threshold=None, return_text=True, **kwargs) async

Search for documents in a collection. This method automatically embeds the query text using the specified embedding model. If you want to use your own query vector, use search method instead.

Parameters:

Name Type Description Default
collection_name str

Collection to search in

required
query_text str

Text to search for. This text will be embedded using the specified embedding model. And then used as a query vector.

required
query_filter Filter | None

Exclude vectors which don't fit the given conditions. If None - search among all vectors

None
limit int

How many results return

5
score_threshold float | None

Return only results whose score is greater than or equal to this value. If it is None, no score filtering is applied. Defaults to None.

None
return_text bool

Only return document text if True.

True
**kwargs

Additional search parameters. See qdrant_client.models.SearchRequest for details.

{}

Returns:

Type Description
list[QueryResponse] | list[str]

list[QueryResponse] | list[str]: The matching responses, or only their document texts when return_text is True.

Source code in src/agere/addons/qdrant_vector.py
async def query(
    self,
    collection_name: str,
    query_text: str,
    query_filter: Filter | None = None,
    limit: int = 5,
    score_threshold: float | None = None,
    return_text: bool = True,
    **kwargs,
) -> list[QueryResponse] | list[str]:
    """
    Search for documents in a collection.
    This method automatically embeds the query text using the specified embedding model.
    If you want to use your own query vector, use `search` method instead.

    Args:
        collection_name: Collection to search in
        query_text:
            Text to search for. This text will be embedded using the specified embedding model.
            And then used as a query vector.
        query_filter:
            Exclude vectors which don't fit the given conditions.
            If `None` - search among all vectors
        limit: How many results to request from the client.
        score_threshold:
            Keep only results whose score is greater than or equal to this value.
            The threshold is applied to the results the client returned, so fewer
            than `limit` items may come back.
            If it is None, no score filtering is applied. Default to None.
            A threshold of `0` (or `0.0`) is a real threshold and is applied.
        return_text: Only return document text if True.
        **kwargs: Additional search parameters. See `qdrant_client.models.SearchRequest` for details.

    Returns:
        list[QueryResponse] | list[str]:
            The matching responses, or only their `document` texts when
            `return_text` is True.

    """
    # NOTE(review): helper defined elsewhere in this module; presumably it checks
    # that fastembed is installed before the client is asked to embed the text.
    _import_fastembed()
    result = await self.async_qdrant_client.query(
        collection_name=collection_name,
        query_text=query_text,
        query_filter=query_filter,
        limit=limit,
        **kwargs,
    )
    # Compare against None explicitly: a plain truthiness test would silently
    # ignore a threshold of 0.
    if score_threshold is not None:
        result = [point for point in result if point.score >= score_threshold]
    if return_text:
        return [point.document for point in result]
    return result

query_batch(collection_name, query_texts, query_filter=None, limit=5, score_threshold=None, return_text=True, **kwargs) async

Search for documents in a collection with batched query. This method automatically embeds the query text using the specified embedding model.

Parameters:

Name Type Description Default
collection_name str

Collection to search in

required
query_texts list[str]

A list of texts to search for. Each text will be embedded using the specified embedding model. And then used as a query vector for a separate search requests.

required
query_filter Filter | None

Exclude vectors which don't fit the given conditions. If None - search among all vectors. This filter will be applied to all search requests.

None
limit int

How many results return

5
score_threshold float | None

Return only results whose score is greater than or equal to this value. If it is None, no score filtering is applied. Defaults to None.

None
return_text bool

Only return document text if True.

True
**kwargs

Additional search parameters. See qdrant_client.models.SearchRequest for details.

{}

Returns:

Type Description
list[list[QueryResponse]] | list[list[str]]

list[list[QueryResponse]] | list[list[str]]: List of lists of responses for each query text.

Source code in src/agere/addons/qdrant_vector.py
async def query_batch(
    self,
    collection_name: str,
    query_texts: list[str],
    query_filter: Filter | None = None,
    limit: int = 5,
    score_threshold: float | None = None,
    return_text: bool = True,
    **kwargs,
) -> list[list[QueryResponse]] | list[list[str]]:
    """
    Search for documents in a collection with batched query.
    This method automatically embeds the query text using the specified embedding model.

    Args:
        collection_name: Collection to search in
        query_texts:
            A list of texts to search for. Each text will be embedded using the specified embedding model.
            And then used as a query vector for a separate search request.
        query_filter:
            Exclude vectors which don't fit the given conditions.
            If `None` - search among all vectors
            This filter will be applied to all search requests.
        limit: How many results to request from the client.
        score_threshold:
            Keep only results whose score is greater than or equal to this value.
            The threshold is applied to the results the client returned, so an
            inner list may hold fewer than `limit` items.
            If it is None, no score filtering is applied. Default to None.
            A threshold of `0` (or `0.0`) is a real threshold and is applied.
        return_text: Only return document text if True.
        **kwargs: Additional search parameters. See `qdrant_client.models.SearchRequest` for details.

    Returns:
        list[list[QueryResponse]] | list[list[str]]:
            One inner list per query text, holding the matching responses or
            only their `document` texts when `return_text` is True.

    """
    # NOTE(review): helper defined elsewhere in this module; presumably it checks
    # that fastembed is installed before the client is asked to embed the texts.
    _import_fastembed()
    result = await self.async_qdrant_client.query_batch(
        collection_name=collection_name,
        query_texts=query_texts,
        query_filter=query_filter,
        limit=limit,
        **kwargs,
    )
    # Compare against None explicitly: a plain truthiness test would silently
    # ignore a threshold of 0.
    if score_threshold is not None:
        result = [
            [point for point in inner_list if point.score >= score_threshold]
            for inner_list in result
        ]
    if return_text:
        return [[point.document for point in inner_list] for inner_list in result]
    return result

recreate_collection(collection_name, vectors_config=None, sparse_vectors_config=None, **kwargs) async

Delete and create empty collection with given parameters.

Parameters:

Name Type Description Default
collection_name str

The name of the collection to create.

required
vectors_config VectorParams | Mapping[str, VectorParams] | None

Specify vectors config. Left to be None if using fastembed.

None
sparse_vectors_config Mapping[str, SparseVectorParams] | None

Specify the sparse vectors config.

None

Returns:

Type Description
bool

Operation result.

Source code in src/agere/addons/qdrant_vector.py
async def recreate_collection(
    self,
    collection_name: str,
    vectors_config: types.VectorParams | Mapping[str, types.VectorParams] | None = None,
    sparse_vectors_config: Mapping[str, types.SparseVectorParams] | None = None,
    **kwargs,
) -> bool:
    """Drop a collection and create it again, empty, with the given parameters.

    Arguments:
        collection_name: The name of the collection to recreate.
        vectors_config:
            Vectors configuration. When it is omitted (or otherwise falsy) the
            client's fastembed vector parameters are used instead.
        sparse_vectors_config: Sparse vectors configuration.
        **kwargs: Extra arguments forwarded unchanged to the client.

    Returns:
        Operation result reported by the client.
    """
    # Fall back to the fastembed defaults only when no config was supplied.
    if not vectors_config:
        vectors_config = self.async_qdrant_client.get_fastembed_vector_params()

    return await self.async_qdrant_client.recreate_collection(
        collection_name=collection_name,
        vectors_config=vectors_config,
        sparse_vectors_config=sparse_vectors_config,
        **kwargs,
    )

scroll(collection_name, scroll_filter=None, limit=10, with_payload=True, with_vectors=False, order_by=None) async

Scroll over all (matching) points in the collection.

This method provides a way to iterate over all stored points with some optional filtering condition. Scroll does not apply any similarity estimations, it will return points sorted by id in ascending order.

Parameters:

Name Type Description Default
collection_name str

Name of the collection

required
scroll_filter Filter | None

If provided - only returns points matching filtering conditions

None
limit int

How many points to return

10
with_payload bool | Sequence[str] | PayloadSelector
  • Specify which stored payload should be attached to the result.
  • If True - attach all payload
  • If False - do not attach any payload
  • If List of string - include only specified fields
  • If PayloadSelector - use explicit rules
True
with_vectors bool | Sequence[str]
  • If True - Attach stored vector to the search result.
  • If False (default) - Do not attach vector.
  • If List of string - include only specified fields
False
order_by OrderBy | None

Order the records by a payload key. If None - order by id

None

Returns:

Type Description
list[Record]

A pair of (List of points) and (optional offset for the next scroll request).

PointId | None

If the next page offset is None, there are no more points in the collection to scroll.

Source code in src/agere/addons/qdrant_vector.py
async def scroll(
    self,
    collection_name: str,
    scroll_filter: types.Filter | None = None,
    limit: int = 10,
    with_payload: bool | Sequence[str] | types.PayloadSelector = True,
    with_vectors: bool | Sequence[str]= False,
    order_by: types.OrderBy | None = None,
) -> tuple[list[types.Record], types.PointId | None]:
    """Page through the (matching) points of a collection.

    Scrolling iterates over stored points, optionally restricted by a filter.
    No similarity estimation is involved; unless `order_by` is given, points
    come back sorted by id in ascending order.

    Args:
        collection_name: Name of the collection
        scroll_filter: If provided - only returns points matching filtering conditions
        limit: How many points to return
        with_payload:
            - Specify which stored payload should be attached to the result.
            - If `True` - attach all payload
            - If `False` - do not attach any payload
            - If List of string - include only specified fields
            - If `PayloadSelector` - use explicit rules
        with_vectors:
            - If `True` - Attach stored vector to the search result.
            - If `False` (default) - Do not attach vector.
            - If List of string - include only specified fields
        order_by: Order the records by a payload key. If `None` - order by id

    Returns:
        A pair of (List of points) and (optional offset for the next scroll request).
        If the next page offset is `None`, there are no more points in the collection to scroll.
    """
    page = await self.async_qdrant_client.scroll(
        collection_name=collection_name,
        scroll_filter=scroll_filter,
        limit=limit,
        order_by=order_by,
        with_payload=with_payload,
        with_vectors=with_vectors,
    )
    return page

split(text)

Split the text.

When a splitter is specified, it is used to split the text; otherwise, the original text is returned as a single-element list.

Source code in src/agere/addons/qdrant_vector.py
def split(self, text: str) -> Iterable[str]:
    """Split the text into pieces.

    If a text splitter is configured, its `split` result is returned.
    Without a splitter the text is returned unsplit, wrapped in a
    one-element list.
    """
    splitter = self.text_splitter
    if splitter is not None:
        return splitter.split(text)
    return [text]

update_collection(collection_name, **kwargs) async

Update parameters of the collection.

Source code in src/agere/addons/qdrant_vector.py
async def update_collection(self, collection_name: str, **kwargs):
    """Update the parameters of a collection.

    Arguments:
        collection_name: The name of the collection to update.
        **kwargs: Parameters forwarded unchanged to the client's `update_collection`.
    """
    client = self.async_qdrant_client
    await client.update_collection(collection_name=collection_name, **kwargs)