程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长

+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

暂无数据

查找集合中的行数

发布于2024-11-30 17:59     阅读(520)     评论(0)     点赞(17)     收藏(3)


我使用 upsert 函数多次插入约 7k 行具有唯一主键值的行,以查看 upsert 的工作原理。插入后,我尝试使用以下方法查找集合中的行数:

collection.num_entities

num_entities = client.get_collection_stats(collection_name)["row_count"]

print(f"Number of entities in the collection: {num_entities}")

collection.query(output_fields=["count(*)"])

它们每个都返回 46k 个计数,其中包括应该删除的行。而这个返回的正是 7k 个唯一 ID:

query_result = collection.query(
    expr="id != ''",
    output_fields=["id"]
)

在进行搜索或查询时,我也看不到重复的行。这种现象的原因是什么?如果我想获取集合中实际的行数(不包括已删除的行数),该如何获取?


解决方案


  • collection.num_entities内部调用client.get_collection_stats() 我们知道,milvus 是按段来管理数据的,每个密封段的元数据(包括初始行数、s3 路径)都记录在 Etcd 中。删除操作不会直接影响密封段,所有删除的 id 都存储在 S3 中的“delta_log”中,而初始行数在 Etcd 中不会改变。 client.get_collection_stats()快速迭代 etcd 中的段,将每个段的初始行数相加,并返回总和值。所以,它不计算已删除的项目。当用户不想加载集合并快速获取原始数字时,此接口有时很有用。
  • collection.query(output_fields=["count(*)"]) 这是一个真正的查询操作,需要加载集合。它会扫描所有段,包括已密封的段和正在增长的段,并统计已删除的项目。通常,如果您需要精确的数字,则需要将一致性级别设置为“强”,以确保 Pulsar 中的所有数据都被查询节点使用,以便数据可查询。
results = collection.query(expr="", output_fields=["count(*)"], consistency_level="Strong")

query_result = collection.query(expr="id != ''", output_fields=["id"])也是一个真正的查询操作,返回集合中所有唯一的id。

为了清楚地理解,请阅读此示例:

import random
import time

from pymilvus import (
    connections, utility, Collection, FieldSchema, CollectionSchema, DataType)

connections.connect(host="localhost", port=19530)

dim = 128
metric_type = "COSINE"

collection_name = "test"

schema = CollectionSchema(
    fields=[
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="vector", dtype = DataType.FLOAT_VECTOR, dim=dim),
    ])
utility.drop_collection(collection_name)
collection = Collection(collection_name, schema)
print(collection_name, "created")

index_params = {
    'metric_type': metric_type,
    'index_type': "FLAT",
    'params': {},
}
collection.create_index(field_name="vector", index_params=index_params)
collection.load()

# insert 7k items with unique ids
batch = 7000
data = [
    [k for k in range(batch)],
    [[random.random() for _ in range(dim)] for _ in range(batch)],
]
collection.insert(data=data)
print(collection_name, "data inserted")

# call upsert to repeat update the 7k items
for i in range(6):
    collection.upsert(data=data)
print(collection_name, "data upserted")

collection.flush() # call flush() here to force the buffer to be flushed to sealed segments. In practice, we don't recommend manually call this method
print("num_entities =", collection.num_entities) # expect 49k

# consistency_level = Stong to ensure all the data is consumed by querynode and queryable
# see https://milvus.io/docs/consistency.md#Consistency-levels
results = collection.query(expr="", output_fields=["count(*)"], consistency_level="Strong")
print("query((count(*)) =", results) # expect 7k

query_result = collection.query(expr="id >= 0", output_fields=["id"], consistency_level="Strong")
print("query(id >= 0) =", len(query_result)) # expect 7k

该脚本的运行结果:

test created
test data inserted
test data upserted
num_entities = 49000
query((count(*)) = data: ["{'count(*)': 7000}"] 
query(id >= 0) = 7000


所属网站分类: 技术文章 > 问答

作者:黑洞官方问答小能手

链接:https://www.pythonheidong.com/blog/article/2046397/56ee814b17291da088a9/

来源:python黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

17 0
收藏该文
已收藏

评论内容:(最多支持255个字符)