程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长

+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

以图搜图实现六之web 在线查询服务

发布于2019-08-22 19:49     阅读(383)     评论(0)     点赞(4)     收藏(3)


  1. 创建一个python flask 项目
  2. 根据vgg16模型,将图片转为特征 extract_cnn_vgg16_keras.py
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image
from numpy import linalg as LA

import keras  
import numpy as np


class VGGNet:
    def __init__(self):
        # weights: 'imagenet'
        # pooling: 'max' or 'avg'
        # input_shape: (width, height, 3), width and height should >= 48
        self.input_shape = (224, 224, 3)
        self.weight = 'imagenet'
        self.pooling = 'max'
        self.model = VGG16(weights = self.weight, input_shape = (self.input_shape[0], self.input_shape[1], self.input_shape[2]), pooling = self.pooling, include_top = False)
        self.model.predict(np.zeros((1, 224, 224 , 3)))

    '''
    Use vgg16 model to extract features
    Output normalized feature vector
    '''
    def extract_feat(self, img_path):
        img = image.load_img(img_path, target_size=(self.input_shape[0], self.input_shape[1]))
        img = image.img_to_array(img)
        img = np.expand_dims(img, axis=0)
        img = preprocess_input(img)
        feat = self.model.predict(img)
        norm_feat = feat[0]/LA.norm(feat[0])
        return norm_feat

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  1. uuid 生成工具类 strUtil.py
#-*-coding:utf-8-*-
import datetime
import random

class Pic_str:
    def create_uuid(self): #生成唯一的图片的名称字符串,防止图片显示时的重名问题
        nowTime = datetime.datetime.now().strftime("%Y%m%d%H%M%S");  # 生成当前时间
        randomNum = random.randint(0, 100);  # 生成的随机整数n,其䶿0<=n<=100
        if randomNum <= 10:
            randomNum = str(0) + str(randomNum);
        uniqueNum = str(nowTime) + str(randomNum);
        return uniqueNum;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4.服务查询类 picture_upload_vgg.py
#encoding:utf-8
#!/usr/bin/env python
from argparse import ArgumentParser
import base64
from itertools import cycle, product
import json
import os
import pdb
from pprint import pformat, pprint
import random
import sys
from time import time, sleep
import time
from urllib.parse import quote_plus

from PIL import Image
from elasticsearch import Elasticsearch, helpers
import elasticsearch
from flask import Flask, render_template, jsonify, request, make_response, send_from_directory, abort
from more_itertools import chunked

import requests
from sklearn.neighbors import NearestNeighbors
from werkzeug.utils import secure_filename

from strUtil import Pic_str
import numpy as np

from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image
from numpy import linalg as LA
import keras
import tensorflow as tf
from keras.models import load_model

import math

from extract_cnn_vgg16_keras import VGGNet
from fdfs_client.client import Fdfs_client

app = Flask(name)
UPLOAD_FOLDER = ‘upload’
app.config[‘UPLOAD_FOLDER’] = UPLOAD_FOLDER
basedir = os.path.abspath(os.path.dirname(file))
ALLOWED_EXTENSIONS = set([‘png’, ‘jpg’, ‘JPG’, ‘PNG’, ‘gif’, ‘GIF’])

def allowed_file(filename):
return ‘.’ in filename and filename.rsplit(’.’, 1)[1] in ALLOWED_EXTENSIONS

@app.route(’/’)
@app.route(’/upload’)
def upload_test():
return render_template(‘up.html’)

#上传文件
@app.route(’/up_photo’, methods=[‘POST’], strict_slashes=False)
def api_upload():
file_dir = os.path.join(basedir, app.config[‘UPLOAD_FOLDER’])
if not os.path.exists(file_dir):
os.makedirs(file_dir)
f = request.files[‘photo’]

if f and allowed_file(f.filename):
fname = secure_filename(f.filename)
print(fname)
ext = fname.rsplit(’.’, 1)[1]
new_filename = Pic_str().create_uuid() + ‘.’ + ext

f.save(os.path.join(file_dir, new_filename))

print("upload_file_path" ,os.path.join(file_dir, new_filename))

#client = Fdfs_client('/mnt/lwf/client2.conf')
#pic_message =  client.upload_by_filename(os.path.join(file_dir, new_filename))
#print('pic_message' ,pic_message)
#query_img='http://ippic.zgg.com/'+ pic_message["Remote file_id"]  

img_dic={}




keras.backend.clear_session()
model = VGGNet()





norm_feat = model.extract_feat(os.path.join(file_dir, new_filename))

#print("norm_feat " + norm_feat.tolist())

if(os.path.exists(os.path.join(file_dir, new_filename))):
    os.remove(os.path.join(file_dir, new_filename))
    print("删除图片成功")
else:
    print("未删除图片")

       

img_name = new_filename.split(".")[0]

img_dic['_id']=img_name 
img_dic['_source']={}
img_dic['_source']['_aknn_vector']=norm_feat.tolist()
img_dic['_source']['url'] =new_filename 

 
        
img_json = json.dumps(img_dic)
print("img_json ",img_json)

#上传单个img_json 文档
body_data = {}
body_data['_index']='twitter_image'
body_data['_type'] ='twitter_image'
body_data['_aknn_uri']='aknn_models/aknn_model/twitter_image_search'
doc_data=[]
doc_data.append(img_dic);
body_data['_aknn_docs']=doc_data;

body_json = json.dumps(body_data)
print(body_json)


#上传格式
body = '''{
  "_index": "twitter_images",
  "_type": "twitter_image",
  "_aknn_uri": "aknn_models/aknn_model/twitter_image_search",
  "_aknn_docs": [
  {
  "_id": 1,
  "_source": {
    "_aknn_vector": [
      0.12,
      0.23
    ],
    "url":"http://a.jpg"
   }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

]
}’’’

hostList = ['172.16.1.8','172.16.1.9','172.16.1.10']
hostIndex = random.randint(0,2)
host = hostList[hostIndex]    
  
headers = {'Content-Type': 'application/json'}  
code  = requests.post('http://'+host+':9200/_aknn_index',  headers=headers,data = body_json)
print('code' ,code)

#查询
get_url = 'http://'+host+':9200/twitter_image/twitter_image/'+img_name+'/_aknn_search?k1=10000&k2=10'
print('get_url' ,get_url)
req = requests.get(get_url)
print('req' ,req)

hits = req.json()["hits"]["hits"]
took_ms = req.json()["took"]
#query_img, neighbor_imgs = hits[0], hits[1:]
#query_img, neighbor_imgs = hits[0], hits[1:]
neighbor_imgs = hits[1:]
query_img=hits[0]
print(query_img,' ',neighbor_imgs)


#删除
#http://192.168.43.100:9200/twitter_image/twitter_image/1
delete_code = requests.delete('http://'+host+':9200/twitter_image/twitter_image/' +img_name)
print('删除成功 ' ,delete_code)



#file_dir = os.path.join(basedir, app.config['UPLOAD_FOLDER'])
#image_data = open(os.path.join(file_dir, '%s' % new_filename), "rb").read()
#print("image_data" ,image_data)
#response = make_response(image_data)
#response.headers['Content-Type'] = 'image/png'



# Render template.'file:///' + 
return render_template(
    "index.html",
    es_index="twitter_image",
    es_type="twitter_image",
    took_ms=took_ms,
    query_img_url=os.path.join(file_dir, new_filename),
    query_img=query_img,
    neighbor_imgs=neighbor_imgs)





#return jsonify({"success": 0, "msg": "上传成功"})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

else:
return jsonify({“error”: 1001, “msg”: “上传失败”})

@app.route(’/download/string:filename’, methods=[‘GET’])
def download(filename):
if request.method == “GET”:
if os.path.isfile(os.path.join(‘upload’, filename)):
return send_from_directory(‘upload’, filename, as_attachment=True)
pass

@app.route(’/show/string:filename’, methods=[‘GET’])
def show_photo(filename):
file_dir = os.path.join(basedir, app.config[‘UPLOAD_FOLDER’])
if request.method == ‘GET’:
if filename is None:
pass
else:
image_data = open(os.path.join(file_dir, ‘%s’ % filename), “rb”).read()
response = make_response(image_data)
response.headers[‘Content-Type’] = ‘image/png’
return response
else:
pass

if name == ‘main’:
app.run( host=‘172.16.1.141’, debug=True)

  1. 网页端查询
    在这里插入图片描述


所属网站分类: 技术文章 > 博客

作者:短发越来越短

链接:https://www.pythonheidong.com/blog/article/53531/1192710e808d547da1ee/

来源:python黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

4 0
收藏该文
已收藏

评论内容:(最多支持255个字符)