发布于2019-08-22 19:49 阅读(383) 评论(0) 点赞(4) 收藏(3)
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image
from numpy import linalg as LA
import keras
import numpy as np
class VGGNet:
    """Image feature extractor built on the Keras VGG16 application model.

    The network is created without its classification head
    (``include_top=False``) and with a global pooling layer, so ``predict``
    yields one feature vector per input image.
    """

    def __init__(self, input_shape=(224, 224, 3), weights='imagenet', pooling='max'):
        """Build the VGG16 model and run one warm-up prediction.

        Args:
            input_shape: 3-tuple handed to VGG16 as ``input_shape``. The
                original author's note says the two spatial sizes should be
                >= 48 -- TODO confirm against the installed Keras version.
            weights: value passed to VGG16 as ``weights`` (e.g. 'imagenet').
            pooling: value passed to VGG16 as ``pooling`` ('max' or 'avg').
        """
        self.input_shape = tuple(input_shape)
        self.weight = weights
        self.pooling = pooling
        self.model = VGG16(
            weights=self.weight,
            input_shape=self.input_shape,
            pooling=self.pooling,
            include_top=False,
        )
        # Warm-up call on an all-zero batch shaped like the configured input,
        # presumably so the predict function is built before the first real
        # request. The shape is derived from self.input_shape so it can never
        # disagree with the model.
        self.model.predict(np.zeros((1,) + self.input_shape))

    def extract_feat(self, img_path):
        """Extract an L2-normalized VGG16 feature vector for one image.

        Args:
            img_path: path of the image file to load.

        Returns:
            The feature vector of the single input image divided by its
            Euclidean norm. If the norm is zero the vector is returned
            unchanged instead of dividing by zero and producing NaNs.
        """
        img = image.load_img(
            img_path,
            target_size=(self.input_shape[0], self.input_shape[1]),
        )
        img = image.img_to_array(img)
        img = np.expand_dims(img, axis=0)  # add the batch axis
        img = preprocess_input(img)
        feat = self.model.predict(img)
        vec = feat[0]
        norm = LA.norm(vec)
        if norm == 0:
            return vec
        return vec / norm
#-*-coding:utf-8-*-
import datetime
import random
class Pic_str:
    """Helper that builds name stems for uploaded pictures."""

    def create_uuid(self):
        """Return a 17-digit string: current timestamp plus a random suffix.

        The result is ``YYYYmmddHHMMSS`` (local time) followed by a random
        integer in ``0..100`` zero-padded to exactly three digits. It is used
        to avoid name clashes between uploaded pictures.

        The fixed-width padding replaces the earlier ``<= 10`` check, which
        turned 10 into "010" while 11..99 stayed two digits and 100 was three,
        so the result length varied.

        NOTE(review): two calls within the same second can still collide
        (only 101 suffixes); use ``uuid.uuid4()`` if real uniqueness matters.
        """
        now_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        random_num = random.randint(0, 100)  # inclusive on both ends
        return now_time + str(random_num).zfill(3)
4. 服务查询类 picture_upload_vgg.py
#encoding:utf-8
#!/usr/bin/env python
from argparse import ArgumentParser
import base64
from itertools import cycle, product
import json
import os
import pdb
from pprint import pformat, pprint
import random
import sys
from time import time, sleep
import time
from urllib.parse import quote_plus
from PIL import Image
from elasticsearch import Elasticsearch, helpers
import elasticsearch
from flask import Flask, render_template, jsonify, request, make_response, send_from_directory, abort
from more_itertools import chunked
import requests
from sklearn.neighbors import NearestNeighbors
from werkzeug.utils import secure_filename
from strUtil import Pic_str
import numpy as np
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image
from numpy import linalg as LA
import keras
import tensorflow as tf
from keras.models import load_model
import math
from extract_cnn_vgg16_keras import VGGNet
from fdfs_client.client import Fdfs_client
# Flask application object. The dunder names below were stripped by the blog's
# markdown rendering (``name`` / ``file``) and are restored here.
app = Flask(__name__)
# Directory name (relative to this file) where uploads are stored temporarily.
UPLOAD_FOLDER = 'upload'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# Absolute directory containing this source file.
basedir = os.path.abspath(os.path.dirname(__file__))
# Accepted upload extensions. The upper-case entries are kept for any other
# code that inspects this set directly; allowed_file() itself compares
# case-insensitively.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'JPG', 'PNG', 'gif', 'GIF'}


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last '.' and is compared
    case-insensitively, so "a.Jpg" is accepted as well as "a.jpg" and "a.JPG".
    Names without a '.' (including the empty string) are rejected.
    """
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/')
@app.route('/upload')
def upload_test():
    """Render the upload form template ``up.html`` for '/' and '/upload'."""
    return render_template('up.html')
# Upload a file
@app.route('/up_photo', methods=['POST'], strict_slashes=False)
def api_upload():
    """Handle a picture upload and render its nearest-neighbour images.

    Steps, in order:
      1. Save the uploaded ``photo`` file under the upload folder with a
         generated name.
      2. Extract a normalized VGG16 feature vector, then delete the file.
      3. POST the vector as a temporary document to ``/_aknn_index`` on one
         of the hard-coded hosts (port 9200).
      4. GET ``/_aknn_search`` for that document and split the hits into the
         query image (first hit) and its neighbours (remaining hits).
      5. DELETE the temporary document.

    Returns:
        The rendered ``index.html`` template on success, or the JSON object
        ``{"error": 1001, "msg": "上传失败"}`` when no file was sent or its
        extension is not allowed.
    """
    file_dir = os.path.join(basedir, app.config['UPLOAD_FOLDER'])
    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    f = request.files['photo']
    if not (f and allowed_file(f.filename)):
        return jsonify({"error": 1001, "msg": "上传失败"})

    fname = secure_filename(f.filename)
    print(fname)
    # Take the extension from the original name, which allowed_file() has just
    # validated. secure_filename() strips non-ASCII characters, so a name such
    # as "图片.png" collapses to "png" and has no '.' left to split on.
    ext = f.filename.rsplit('.', 1)[1]
    new_filename = Pic_str().create_uuid() + '.' + ext
    upload_path = os.path.join(file_dir, new_filename)
    f.save(upload_path)
    print("upload_file_path", upload_path)

    try:
        # A fresh Keras session and model are created for every request.
        # NOTE(review): this is slow; presumably a workaround for reusing a
        # model across Flask worker threads -- confirm before caching it.
        keras.backend.clear_session()
        model = VGGNet()
        norm_feat = model.extract_feat(upload_path)
    finally:
        # Remove the temporary upload even when feature extraction fails.
        if os.path.exists(upload_path):
            os.remove(upload_path)
            print("删除图片成功")
        else:
            print("未删除图片")

    img_name = new_filename.split(".")[0]
    img_dic = {
        '_id': img_name,
        '_source': {
            '_aknn_vector': norm_feat.tolist(),
            'url': new_filename,
        },
    }
    print("img_json ", json.dumps(img_dic))

    # Index request body: one document wrapped in the _aknn_index envelope
    # (_index, _type, _aknn_uri, _aknn_docs).
    body_data = {
        '_index': 'twitter_image',
        '_type': 'twitter_image',
        '_aknn_uri': 'aknn_models/aknn_model/twitter_image_search',
        '_aknn_docs': [img_dic],
    }
    body_json = json.dumps(body_data)
    print(body_json)

    # Pick one of the hosts at random for all three requests.
    host = random.choice(['172.16.1.8', '172.16.1.9', '172.16.1.10'])
    headers = {'Content-Type': 'application/json'}
    code = requests.post('http://' + host + ':9200/_aknn_index', headers=headers, data=body_json)
    print('code', code)

    doc_url = 'http://' + host + ':9200/twitter_image/twitter_image/' + img_name
    try:
        # Query the neighbours of the document that was just indexed.
        get_url = doc_url + '/_aknn_search?k1=10000&k2=10'
        print('get_url', get_url)
        req = requests.get(get_url)
        print('req', req)
        result = req.json()
        hits = result["hits"]["hits"]
        took_ms = result["took"]
        # The first hit is treated as the query image itself.
        query_img = hits[0]
        neighbor_imgs = hits[1:]
        print(query_img, ' ', neighbor_imgs)
    finally:
        # Always delete the temporary document so a failed search does not
        # leave it in the index.
        delete_code = requests.delete(doc_url)
        print('删除成功 ', delete_code)

    # NOTE(review): query_img_url points at the upload path, which has already
    # been deleted above -- confirm how index.html uses it.
    return render_template(
        "index.html",
        es_index="twitter_image",
        es_type="twitter_image",
        took_ms=took_ms,
        query_img_url=upload_path,
        query_img=query_img,
        neighbor_imgs=neighbor_imgs)
@app.route('/download/<string:filename>', methods=['GET'])
def download(filename):
    """Send ``upload/<filename>`` as an attachment, or respond 404.

    The ``<string:filename>`` URL variable (its angle brackets were lost in
    the blog rendering) supplies *filename*. When the file does not exist the
    view aborts with 404 instead of falling through and returning None.
    """
    if request.method == "GET":
        if os.path.isfile(os.path.join('upload', filename)):
            return send_from_directory('upload', filename, as_attachment=True)
    abort(404)
@app.route('/show/<string:filename>', methods=['GET'])
def show_photo(filename):
    """Return the bytes of an uploaded picture, or respond 404.

    The file is read from the upload folder next to this source file. The
    response always carries ``Content-Type: image/png``.

    NOTE(review): jpg/gif uploads are also served as image/png; consider
    deriving the type from the extension.
    """
    file_dir = os.path.join(basedir, app.config['UPLOAD_FOLDER'])
    if request.method != 'GET' or filename is None:
        abort(404)

    file_path = os.path.join(file_dir, '%s' % filename)
    if not os.path.isfile(file_path):
        # A missing file used to raise from open() and surface as a 500.
        abort(404)

    # Context manager closes the handle, which the original left open.
    with open(file_path, "rb") as fh:
        image_data = fh.read()
    response = make_response(image_data)
    response.headers['Content-Type'] = 'image/png'
    return response
# Script entry point; the dunder names were stripped by the blog rendering.
# NOTE(review): debug=True on a non-loopback address exposes the Werkzeug
# debugger -- disable it outside development.
if __name__ == '__main__':
    app.run(host='172.16.1.141', debug=True)
作者:短发越来越短
链接:https://www.pythonheidong.com/blog/article/53531/1192710e808d547da1ee/
来源:python黑洞网
任何形式的转载都请注明出处。如有侵权,一经发现,必将追究其法律责任。
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 python黑洞网 All Rights Reserved 版权所有,并保留所有权利。 京ICP备18063182号-1
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!