- JSON utilities
# Merge multiple JSONL files into one
def merge_jsons(json_names, merged_name):
    df_jsons = []
    for json_name in json_names:
        df_jsons.append(pd.read_json(json_name, lines=True))
    df_merge = pd.concat(df_jsons)
    fw_all = open(merged_name, 'w+')
    for index, row in df_merge.iterrows():
        fw_all.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
    fw_all.close()
# Save a DataFrame to a JSONL file via a file writer (note-to-self: watch for a missing / in the path)
def save_df_to_json(df, save_json_name):
    fw = open(save_json_name, 'w+')
    for index, row in df.iterrows():
        fw.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
    fw.close()
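Equivalent, and usually faster for large frames: pandas can write JSONL directly, so the loop above collapses to one call.
# One-call equivalent of save_df_to_json using pandas' built-in writer;
# force_ascii=False matches ensure_ascii=False above
def save_df_to_json_fast(df, save_json_name):
    df.to_json(save_json_name, orient='records', lines=True, force_ascii=False)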
# Convert between line-oriented JSON (jsonl) and human-readable JSON (indent=4)
def lines_json_to_indent_json(source_json, target_json):
    df_s = pd.read_json(source_json, lines=True)
    fw_t = open(target_json, 'w+')
    lists = []
    for index, row in df_s.iterrows():
        lists.append(row.to_dict())
    json.dump(lists, fw_t, indent=4, ensure_ascii=False)
    fw_t.close()

def indent_json_to_lines_json(source_json, target_json):
    df_s = pd.read_json(source_json)
    fw_t = open(target_json, 'w+')
    for index, row in df_s.iterrows():
        fw_t.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
    fw_t.close()
- Image visualization
# Display an image (local or s3/Ceph path) inline in a notebook
from aoss_client.client import Client
from PIL import Image
from IPython.display import display
import io
import os

client = Client('aoss.conf')

def vis_picture(img_path, vis_width=224):
    if 's3://' in img_path:
        try:
            image = Image.open(io.BytesIO(client.get(img_path)))
        except Exception as e:
            print(e)
            return  # give up on unreadable images
    else:
        image = Image.open(img_path)
    # Resize to vis_width, preserving the aspect ratio
    image = image.resize((vis_width, int(image.size[1] / image.size[0] * vis_width)))
    display(image)
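Quick usage in a notebook cell (the paths below are hypothetical examples):
vis_picture('./samples/0001.jpg', vis_width=336)  # vis_width controls the displayed width in pixels
vis_picture('s3://my-bucket/images/0001.jpg')  # read through the aoss client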
- Prompt
prompt = '''
// sys prompt
You are a 《role definition》; based on the [variables] and following the <request>, 《solve the problem》.
# request #
《examples》
A refusal means the model does not answer or declines to answer the user's question, for example:
1. It apologizes, then says it cannot answer/explain;
2. It says it cannot comment/answer;
3. It says it is unable to answer the user's question;
# input format #
The [dialog] is given in the following format:
[
{{"from": "human", "value": "the human's question"}},
{{"from": "gpt", "value": "the model's answer"}},
...
]
# output format #
The output must strictly follow JSON format and contain only the JSON below, in exactly this format:
{{
"is_refused": 0 or 1 (1 if the model refuses to answer, 0 for a normal answer),
"reason": "the reason, quoting the text of the refusal in the answer"
}}without code block
// usr prompt
Based on the dialog below, judge whether the model (gpt) refused to answer.
[dialog]: {dialog}
'''
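A minimal sketch of using this prompt and parsing the strict-JSON reply, assuming an api_request helper (not defined in this note) that sends the messages and returns the reply text:
import json

def judge_refusal(dialog, api_request):
    # {{ }} in the template are literal braces; format() only substitutes {dialog}
    filled = prompt.format(dialog=json.dumps(dialog, ensure_ascii=False))
    raw = api_request([{"role": "user", "content": filled}])
    try:
        return json.loads(raw)  # expects {"is_refused": 0/1, "reason": "..."}
    except (json.JSONDecodeError, TypeError):
        return None  # the model ignored the strict-JSON instruction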
- API calls
# Read a local/Ceph image and encode it as base64 for an API request
def encode_image_to_base64(image_path):
    if 's3' in image_path:
        return base64.b64encode(client.get(image_path)).decode("utf-8")
    else:
        with open(image_path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")
- API checkpoint-resume with deduplication
def get_result(save_json, deduplication_key: str, df_source):
    if not os.path.exists(os.path.dirname(save_json)):
        os.makedirs(os.path.dirname(save_json))
    fw = open(save_json, 'a+')
    exist_save = pd.read_json(save_json, lines=True)
    try:
        exist_name = exist_save[deduplication_key].to_list()
    except Exception:  # the save file may be empty or missing the key
        exist_name = []
    for i in df_source.index:
        row = df_source.loc[i]
        if row[deduplication_key] in exist_name:
            print(f'{i} has been processed.')
            continue
        # the rest of the loop body (API call + write) is truncated in the original
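The loop body is truncated in the original; a sketch of what typically goes there, assuming build_messages and api_request helpers that are not defined in this note (it mirrors the multithreaded version further down):
        # Hypothetical continuation of the loop above
        messages = build_messages(row)  # assumed helper: build the API payload from the row
        result = api_request(messages)  # assumed helper: call the API
        if result is None:
            continue  # failed rows are retried on the next run
        row_new = row.copy()
        row_new['api_result'] = result
        fw.write(json.dumps(row_new.to_dict(), ensure_ascii=False) + '\n')
        fw.flush()  # flush each line so an interrupted run can resume cleanly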
- Check whether a file's parent directory exists; create it if not
# file_name is the path of the file about to be written
if not os.path.exists(os.path.dirname(file_name)):
    os.makedirs(os.path.dirname(file_name))
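The same check collapses to one line with exist_ok=True, which also avoids the race between the exists() check and makedirs():
# One-line equivalent; no error if the directory already exists
os.makedirs(os.path.dirname(file_name), exist_ok=True)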
- Pandas iloc vs loc
# iloc is positional indexing: it only accepts integers from 0 to len(df) - 1
# loc indexes by label: it looks rows up by the index, which may be non-contiguous or even strings
import pandas as pd
df1 = pd.DataFrame({
    'A': [1, 1, 3],
    'B': [1, 5, 6],
    'C': [7, 8, 9]
})
df2 = df1.drop_duplicates(subset=['A'])  # keeps index labels 0 and 2
for i in df2.index:
    # df2.iloc[i] would raise IndexError at i=2, because df2 only has 2 rows;
    # loc works, since 2 is still a valid index *label* after drop_duplicates
    print(df2.loc[i])
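If positional access is what you want, reset the index after dropping rows so labels and positions agree again:
df3 = df2.reset_index(drop=True)  # labels become 0..len(df3)-1 again
for i in range(len(df3)):
    print(df3.iloc[i])  # now iloc is safe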
- Multithreaded API calls (checkpoint-resume with deduplication)
# Note: despite the name, this uses a thread pool, not processes
def get_gpt_result_multi_process(json_path, save_json, deduplication_key, save_key="gpt_result", max_workers=5):
    df = pd.read_json(json_path, lines=True)
    if not os.path.exists(os.path.dirname(save_json)):
        os.makedirs(os.path.dirname(save_json))
    fw = open(save_json, 'a+')
    exist_save = pd.read_json(save_json, lines=True)
    try:
        exist_name = exist_save[deduplication_key].to_list()
    except Exception:  # the save file may be empty or missing the key
        exist_name = []

    def get_answer_i(i):
        print(f"{i} of {len(df)} is start")
        row = df.iloc[i]
        if row[deduplication_key] in exist_name:
            return [None, i, f"{i} has been processed"]
        # Build the request from the row; question/answer/messages are
        # placeholders in the original and must be filled in per task
        question = ""
        answer = ""
        result = api_request(messages)
        row_new = row.copy()
        if result is None:
            return [None, i, f"{i}'s result is None"]
        row_new[save_key] = result
        return [row_new, i]

    # Workers run concurrently; executor.map yields results in submission
    # order, and only the main thread writes to the file
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for result in executor.map(get_answer_i, range(len(df))):
            if result[0] is None:
                print(result[2])
                continue
            fw.write(json.dumps(result[0].to_dict(), ensure_ascii=False) + '\n')
    fw.close()
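A minimal usage sketch (paths are hypothetical; api_request must be defined as above):
# Rerunning after a crash skips rows already present in the save file
get_gpt_result_multi_process(
    json_path='./data/questions.jsonl',
    save_json='./results/answers.jsonl',
    deduplication_key='id',
    max_workers=8,
)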
- Command-line arguments (argparse)
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input-id', type=str, default="1")
args = parser.parse_args()
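Dashes in flag names become underscores on the namespace; passing argv explicitly makes this testable without a shell:
args = parser.parse_args(['--input-id', '42'])
print(args.input_id)  # -> '42' (a string, because type=str)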
- Start a command in the background from the terminal; run commands in sequence
# -u: unbuffered output; > log.log 2>&1: stdout and stderr to the log; &: run in background
nohup python -u job.py > log.log 2>&1 &
pid=$!       # PID of the job just launched
wait $pid    # block until it finishes, then launch the next one
nohup python -u job.py > log.log 2>&1 &
- Side-by-side model-comparison visualization
from PIL import Image, ImageDraw, ImageFont
import textwrap
import os
import io
import img2pdf

def text_size(draw, text, font):
    # draw.textsize was removed in Pillow 10; textbbox is the replacement
    l, t, r, b = draw.textbbox((0, 0), text, font=font)
    return r - l, b - t

def create_image_with_texts_with_scale(image_path, texts, font_path, img_width=800):
    # Load the image if available, otherwise create a blank canvas
    if image_path and os.path.exists(image_path):
        image = Image.open(image_path)
    else:
        image = Image.new('RGB', (400, 400), color='white')
    image = resize_image(image, img_width)  # resize_image: width-preserving resize, assumed defined elsewhere
    print(image.size)
    font = ImageFont.truetype(font_path, 20)
    draw = ImageDraw.Draw(image)
    # Wrap each text to 40 characters per line, preserving existing line breaks
    wrapped_texts = []
    for text in texts:
        wrapped_lines = [textwrap.fill(line, width=40) for line in text.splitlines()]
        wrapped_texts.append("\n".join(wrapped_lines))
    text_sizes = [text_size(draw, text, font) for text in wrapped_texts]
    max_text_height = max([size[1] for size in text_sizes])
    total_text_width = sum([size[0] for size in text_sizes]) + 10 * (len(wrapped_texts) - 1)
    # Paste the image on the left and lay the text columns out to its right
    new_image_height = max(image.height, max_text_height)
    new_image_width = image.width + total_text_width + 20  # 20 for padding
    new_image = Image.new('RGB', (new_image_width, new_image_height), color='white')
    new_image.paste(image, (0, 0))
    draw = ImageDraw.Draw(new_image)
    x_text = image.width + 10
    y_text = (new_image_height - max_text_height) // 2
    for text in wrapped_texts:
        draw.text((x_text, y_text), text, font=font, fill='black')
        x_text += text_size(draw, text, font)[0] + 10
    return new_image
def save_images_to_pdf2(image_text_array, pdf_path, font_path):
    image_bytes_list = []
    for i, item in enumerate(image_text_array):
        image_path = item.get('image', None)
        texts = item.get('texts', [])
        # The original called create_image_with_texts_no_scale, which is not defined in this note
        img = create_image_with_texts_with_scale(image_path, texts, font_path)
        img = resize_image(img, 2156)
        image_bytes = io.BytesIO()
        img.save(image_bytes, format='JPEG')
        image_bytes_list.append(image_bytes.getvalue())
    with open(pdf_path, "wb") as f:
        f.write(img2pdf.convert(image_bytes_list))
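A minimal usage sketch (paths and texts are hypothetical; pick a font that covers the characters you render):
# One image plus one text column per model
pages = [
    {'image': './samples/0001.jpg',
     'texts': ['model A: a cat on a sofa', 'model B: a dog on a sofa']},
]
save_images_to_pdf2(pages, './review.pdf', font_path='./fonts/NotoSansCJK-Regular.ttc')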
- Append the row count (_r{len}) to a file name
def rename_with_r_len(json_name):
    df_t = pd.read_json(json_name, lines=True)
    new_name = json_name.rpartition('.')[0] + f"_r{len(df_t)}" + '.' + json_name.rpartition('.')[-1]
    os.rename(json_name, new_name)
- Git file submission workflow
# Standardize the layout: move each JSONL in one directory into its own subdirectory
def split_on_one_dir_json_data_to_every_dir(jsons):
    for json_name in jsons:
        basename = os.path.basename(json_name)
        new_name = os.path.join(os.path.dirname(json_name), basename.rpartition('.')[0], basename)
        if not os.path.exists(os.path.dirname(new_name)):
            os.makedirs(os.path.dirname(new_name))
        os.rename(json_name, new_name)

import glob
import os
jsons = glob.glob('/*.jsonl')
split_on_one_dir_json_data_to_every_dir(jsons)
# Generate readme.md
# The template body is elided here; the format() call below expects
# {basename}, {date}, {subject} and {size} placeholders in it
content = '''
# readme content
'''

def gen_readme_md_file(json_name):
    base_name = os.path.basename(json_name)
    subject = base_name.split('_')[1]  # assumes names like <prefix>_<subject>_... .jsonl
    df_t = pd.read_json(json_name, lines=True)
    size = len(df_t)
    dir_name = os.path.dirname(json_name)
    fw = open(os.path.join(dir_name, 'readme.md'), 'w+', encoding='utf-8')
    fw.write(content.format(basename=base_name, date="20240809", subject=subject, size=size))
    fw.close()
import pandas as pd
import glob
import os
jsons = glob.glob('/*/*.jsonl')
for json_name in jsons:
    gen_readme_md_file(json_name)
# Generate .ipynb files
import nbformat as nbf

data_check_func_code = '''
import json

def check_format(data, verbose=False):
    # check there is at least one full turn
    if len(data['conversations']) < 2:
        return "invalid turn"
    # check height/width are present and positive if the sample contains an image
    if 'image' not in data:
        pass
    else:
        if 'width' not in data or 'height' not in data:
            return "no height/width"
        if type(data['image']) is list:
            if type(data['width']) is not list or type(data['height']) is not list:
                return "no height/width"
            for x in data['width']:
                if x <= 0:
                    return "no height/width"
            for x in data['height']:
                if x <= 0:
                    return "no height/width"
        elif type(data['image']) is str:
            if type(data['width']) is not int or type(data['height']) is not int or data['width'] <= 0 or data['height'] <= 0:
                return "no height/width"
        else:
            return "unknown error"
    # check messages are non-empty strings and strictly alternate human/gpt
    turn = 0
    num_image = 0
    for i, message in enumerate(data['conversations']):
        num_image += message['value'].count("<image>")
        if not isinstance(message['value'], str):
            return "invalid type"
        if not (len(message['value']) > 0):
            return f"empty {message['from']} message"
        if message['from'] == 'human':
            if turn % 2 != 0:
                return "not conversation"
            turn += 1
        if message['from'] == 'gpt':
            if turn % 2 != 1:
                return "not conversation"
            turn += 1
    # check the number of <image> tags matches the number of images
    if 'image' not in data:
        pass
    elif type(data['image']) is list:
        if num_image != len(data['image']):
            return "wrong image number"
    elif type(data['image']) is str:
        if num_image != 1:
            print(data['image'])
            return "wrong image number"
    else:
        return "unknown error"
    return None
'''
data_check_run_code = '''
from datasets import load_dataset
from collections import defaultdict

def filter_dataset(dataset):
    new_dataset = []
    invalid_type2idx = defaultdict(list)
    for idx, data in enumerate(dataset):
        try:
            ret = check_format(data)
            if ret is None:
                new_dataset.append(data)
            else:
                invalid_type2idx[ret].append(idx)
        except Exception:
            invalid_type2idx["unknown error"].append(idx)
    return new_dataset, invalid_type2idx

data_json = "{json_name}"
dataset = load_dataset('json', data_files=data_json, split='train')
new_dataset, invalid_type2idx = filter_dataset(dataset)
'''
def gen_ipynb_file(json_name):
    basename = os.path.basename(json_name)
    dirname = os.path.dirname(json_name)
    fw = open(os.path.join(dirname, basename.rpartition('.')[0] + '.ipynb'), 'w+', encoding='utf-8')
    nb = nbf.v4.new_notebook()
    markdown_cell = nbf.v4.new_markdown_cell("### Data check")
    code_cell1 = nbf.v4.new_code_cell(data_check_func_code)
    code_cell2 = nbf.v4.new_code_cell(data_check_run_code.format(json_name=json_name))
    nb.cells.append(markdown_cell)
    nb.cells.append(code_cell1)
    nb.cells.append(code_cell2)
    nbf.write(nb, fw)

jsons = glob.glob('/*/*.jsonl')
for json_name in jsons:
    gen_ipynb_file(json_name)
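Inside the generated notebook, a quick way to summarize what failed after running the two cells above:
# Per-error-type counts plus a few offending row indices
for err, idxs in invalid_type2idx.items():
    print(f"{err}: {len(idxs)} rows, e.g. {idxs[:5]}")
print(f"{len(new_dataset)} valid rows remain")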
- Common bash commands
# show GPU status once
nvidia-smi
# refresh GPU status every 0.1 s, highlighting changes
watch -n 0.1 -d nvidia-smi