chinese_llama2_demo/sample/aipass_client.py

94 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import copy
import json
import requests
import jsonpath_rw
from sample import ne_utils
from urllib import parse
response_path_list = ['$..payload.result', ]
media_type_list = ["text", "audio", "image", "video"]
# 准备请求数据
def prepare_req_data(request_data):
new_request_data = copy.deepcopy(request_data)
media_path2name = {}
for media_type in media_type_list:
media_expr = jsonpath_rw.parse("$..payload.*.{}".format(media_type))
media_match = media_expr.find(new_request_data)
if len(media_match) > 0:
for media in media_match:
media_path2name[str(media.full_path)] = media.value
for media_path, media_name in media_path2name.items():
payload_path_list = media_path.split(".")
f_data = ne_utils.get_file_bytes(media_name)
new_request_data['header']['status'] = 3
new_request_data['payload'][payload_path_list[1]][payload_path_list[2]] = base64.b64encode(f_data).decode()
new_request_data['payload'][payload_path_list[1]]['status'] = 3
return new_request_data
# 执行http请求
def execute(request_url, request_data, method, app_id, api_key, api_secret):
# 清除文件
print("calling %s"%request_url)
ne_utils.del_file('./resource/output')
# 获取请求url
auth_request_url = ne_utils.build_auth_request_url(request_url, method, api_key, api_secret)
url_result = parse.urlparse(request_url)
headers = {'content-type': "application/json", 'host': url_result.hostname, 'app_id': app_id}
# 准备待发送的数据
new_request_data = prepare_req_data(request_data)
# print("请求数据:{}\n".format(new_request_data))
response = requests.post(auth_request_url, data=json.dumps(new_request_data), headers=headers)
new_respnse = deal_response(response)
return new_respnse
# 处理响应数据
def deal_response(response):
temp_result = json.loads(response.content.decode())
# print("响应数据:{}\n".format(temp_result))
header = temp_result.get('header')
if header is None:
return
code = header.get('code')
if header is None or code != 0:
print("获取结果失败请根据code查证问题原因")
return
print("sid:{}".format(header.get('sid')))
# 打印Base64解码后数据并生成文件
if response_path_list is None or len(response_path_list) == 0:
return
for response_path in response_path_list:
response_expr = jsonpath_rw.parse(response_path)
response_match = response_expr.find(temp_result)
if len(response_match) > 0:
for response_item in response_match:
if response_item.value is None:
continue
encoding = response_item.value.get('encoding')
if encoding is None or len(encoding) == 0:
continue
for media_type in media_type_list:
media_value = response_item.value.get(media_type)
if media_value is None or len(media_value) == 0:
continue
real_data = base64.b64decode(media_value)
# print(real_data.decode('utf-8'))
return real_data.decode('utf-8')
# pprint.pprint(json.loads(real_data))
# response_path_split_list = response_path.split('.')
# write_file_path = "./resource/output/{}.{}".\
# format(response_path_split_list[len(response_path_split_list)-1], encoding)
# with open(write_file_path, "ab") as file:
# file.write(real_data)