发布于2024-11-30 17:59 阅读(520) 评论(0) 点赞(17) 收藏(3)
我使用 upsert 函数多次插入约 7k 行具有唯一主键值的行,以查看 upsert 的工作原理。插入后,我尝试使用以下方法查找集合中的行数:
collection.num_entities
num_entities = client.get_collection_stats(collection_name)["row_count"]
print(f"Number of entities in the collection: {num_entities}")
collection.query(output_fields=["count(*)"])
它们每个都返回 46k 个计数,其中包括应该删除的行。而这个返回的正是 7k 个唯一 ID:
query_result = collection.query(
expr="id != ''",
output_fields=["id"]
)
在进行搜索或查询时,我也看不到重复的行。这种现象的原因是什么?如果我想获取集合中实际的行数(不包括已删除的行数),该如何获取?
collection.num_entities
内部调用client.get_collection_stats()
我们知道,milvus 是按段来管理数据的,每个密封段的元数据(包括初始行数、s3 路径)都记录在 Etcd 中。删除操作不会直接影响密封段,所有删除的 id 都存储在 S3 中的“delta_log”中,而初始行数在 Etcd 中不会改变。
client.get_collection_stats()
快速迭代 etcd 中的段,将每个段的初始行数相加,并返回总和值。所以,它不计算已删除的项目。当用户不想加载集合并快速获取原始数字时,此接口有时很有用。collection.query(output_fields=["count(*)"])
这是一个真正的查询操作,需要加载集合。它会扫描所有段,包括已密封的段和正在增长的段,并统计已删除的项目。通常,如果您需要精确的数字,则需要将一致性级别设置为“强”,以确保 Pulsar 中的所有数据都被查询节点使用,以便数据可查询。results = collection.query(expr="", output_fields=["count(*)"], consistency_level="Strong")
query_result = collection.query(expr="id != ''", output_fields=["id"])
也是一个真正的查询操作,返回集合中所有唯一的id。
为了清楚地理解,请阅读此示例:
import random
import time
from pymilvus import (
connections, utility, Collection, FieldSchema, CollectionSchema, DataType)
connections.connect(host="localhost", port=19530)
dim = 128
metric_type = "COSINE"
collection_name = "test"
schema = CollectionSchema(
fields=[
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
FieldSchema(name="vector", dtype = DataType.FLOAT_VECTOR, dim=dim),
])
utility.drop_collection(collection_name)
collection = Collection(collection_name, schema)
print(collection_name, "created")
index_params = {
'metric_type': metric_type,
'index_type': "FLAT",
'params': {},
}
collection.create_index(field_name="vector", index_params=index_params)
collection.load()
# insert 7k items with unique ids
batch = 7000
data = [
[k for k in range(batch)],
[[random.random() for _ in range(dim)] for _ in range(batch)],
]
collection.insert(data=data)
print(collection_name, "data inserted")
# call upsert to repeat update the 7k items
for i in range(6):
collection.upsert(data=data)
print(collection_name, "data upserted")
collection.flush() # call flush() here to force the buffer to be flushed to sealed segments. In practice, we don't recommend manually call this method
print("num_entities =", collection.num_entities) # expect 49k
# consistency_level = Stong to ensure all the data is consumed by querynode and queryable
# see https://milvus.io/docs/consistency.md#Consistency-levels
results = collection.query(expr="", output_fields=["count(*)"], consistency_level="Strong")
print("query((count(*)) =", results) # expect 7k
query_result = collection.query(expr="id >= 0", output_fields=["id"], consistency_level="Strong")
print("query(id >= 0) =", len(query_result)) # expect 7k
该脚本的运行结果:
test created
test data inserted
test data upserted
num_entities = 49000
query((count(*)) = data: ["{'count(*)': 7000}"]
query(id >= 0) = 7000
作者:黑洞官方问答小能手
链接:https://www.pythonheidong.com/blog/article/2046397/56ee814b17291da088a9/
来源:python黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 python黑洞网 All Rights Reserved 版权所有,并保留所有权利。 京ICP备18063182号-1
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!