首页
关于
Search
1
欢迎使用 Typecho
17 阅读
2
常用代码复用
14 阅读
默认分类
python
生活记录
服务器
登录
Search
标签搜索
代码复用
Typecho
累计撰写
2
篇文章
累计收到
1
条评论
首页
栏目
默认分类
python
生活记录
服务器
页面
关于
搜索到
1
篇与
的结果
2024-10-17
常用代码复用
# ===========================================================================
# Frequently reused code snippets
# ===========================================================================
import base64
import concurrent.futures
import json
import math
import textwrap

import pandas as pd


# ---------------------------------------------------------------------------
# JSON helpers
# ---------------------------------------------------------------------------
def _write_jsonl(df, fw):
    """Write every row of *df* to the open text file *fw*, one JSON object per line."""
    for _, row in df.iterrows():
        fw.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')


def merge_jsons(json_names, merged_name):
    """Merge several JSON Lines files into a single JSON Lines file.

    Args:
        json_names: Paths of the JSON Lines files to concatenate, in order.
        merged_name: Path of the merged output file (overwritten).

    Raises:
        ValueError: From ``pd.concat`` when *json_names* is empty.
    """
    frames = [pd.read_json(json_name, lines=True) for json_name in json_names]
    df_merge = pd.concat(frames)
    # UTF-8 is explicit because ensure_ascii=False emits raw non-ASCII text
    # and pd.read_json reads the result back as UTF-8 by default.
    with open(merged_name, 'w', encoding='utf-8') as fw_all:
        _write_jsonl(df_merge, fw_all)


def save_df_to_json(df, save_json_name):
    """Save *df* as a JSON Lines file at *save_json_name* (overwritten).

    NOTE(review): the original note next to this function mentions a path
    "missing a backslash / missing a slash"; its exact meaning is unclear.
    """
    with open(save_json_name, 'w', encoding='utf-8') as fw:
        _write_jsonl(df, fw)


def lines_json_to_indent_json(source_json, target_json):
    """Convert a JSON Lines file into one readable JSON array (indent=4)."""
    df_s = pd.read_json(source_json, lines=True)
    records = [row.to_dict() for _, row in df_s.iterrows()]
    with open(target_json, 'w', encoding='utf-8') as fw_t:
        json.dump(records, fw_t, indent=4, ensure_ascii=False)


def indent_json_to_lines_json(source_json, target_json):
    """Convert a JSON array file into JSON Lines (one object per line)."""
    df_s = pd.read_json(source_json)
    with open(target_json, 'w', encoding='utf-8') as fw_t:
        _write_jsonl(df_s, fw_t)


# ---------------------------------------------------------------------------
# Visualize an image
# ---------------------------------------------------------------------------
from aoss_client.client import Client
from PIL import Image, ImageDraw, ImageFont
import io
import os

client = Client('aoss.conf')


def _is_s3_path(path):
    """Return True when *path* contains an ``s3://`` URL scheme."""
    return 's3://' in path


def vis_picture(img_path, vis_width=224):
    """Display an image (local or ``s3://``) scaled to *vis_width* pixels wide.

    The aspect ratio is preserved. If the remote read fails, the error is
    printed and nothing is displayed.
    """
    if _is_s3_path(img_path):
        try:
            image = Image.open(io.BytesIO(client.get(img_path)))
        except Exception as e:
            # Best effort: report and stop, instead of failing later on an
            # unbound `image`.
            print(e)
            return
    else:
        image = Image.open(img_path)
    width, height = image.size
    image = image.resize((vis_width, int(height / width * vis_width)))
    from IPython.display import display
    display(image)


# ---------------------------------------------------------------------------
# Prompt template
# ---------------------------------------------------------------------------
# Literal braces are doubled because the template is filled with
# str.format(dialog=...).
# NOTE(review): the output key "reson" looks like a typo of "reason"; it is
# kept verbatim because downstream parsers may depend on it.
# NOTE(review): line breaks inside the template were reconstructed; confirm
# against the original prompt.
prompt = '''
// sys prompt
你是一名《角色定义》,根据[变量],并依据<request>来《解决问题》。
# request #
《示例》
拒绝回答主要指模型不回答或拒绝回答用户的问题,例如以下情况:
1. 表示抱歉,随后表示无法回答/解释;
2. 表示无法评价/回答;
3. 表示不能回答用户的问题;
# input format #
[对话]以以下格式给出:
[
    {{"from": "human", "value": "人类问题"}},
    {{"from": "gpt", "value": "模型的回答"}},
    ...
]
# output format #
请输出的内容遵守严格的json格式,且只有以下的json内容,具体的json格式如下:
{{
    "is_refused": 0或1(出现拒绝回答的情况为1,0表示正常回答),
    "reson": "原因,给出拒绝回答的原文内容所在"
}}without code block
// usr prompt
请根据以下对话内容,判断模型gpt的回答是否出现拒答。
[对话]:
{dialog}
'''


# ---------------------------------------------------------------------------
# API helpers
# 4.1 Read a local / Ceph image for use in an API request
# ---------------------------------------------------------------------------
def encode_image_to_base64(image_path):
    """Return the image at *image_path* as a base64-encoded ASCII string.

    ``s3://`` paths are fetched through the module-level ``client``; any
    other path is read from the local file system.
    """
    # Match on the full scheme: a bare 's3' test also hits local paths such
    # as '/data/vis3/a.png'.
    if _is_s3_path(image_path):
        return base64.b64encode(client.get(image_path)).decode("utf-8")
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


# ---------------------------------------------------------------------------
# Resumable API runs with de-duplication
# ---------------------------------------------------------------------------
def _load_processed_keys(save_json, deduplication_key):
    """Collect the *deduplication_key* values already stored in *save_json*.

    Returns an empty set when the file is missing, empty, or lacks the key
    column. Falls back to a list when the stored values are unhashable.
    """
    if not os.path.exists(save_json) or os.path.getsize(save_json) == 0:
        return set()
    saved = pd.read_json(save_json, lines=True)
    if deduplication_key not in saved.columns:
        return set()
    values = saved[deduplication_key].to_list()
    try:
        return set(values)
    except TypeError:
        return values


def _is_processed(value, processed):
    """Return True when *value* is among the already processed keys."""
    try:
        return value in processed
    except TypeError:
        # An unhashable value (list/dict) cannot equal any of the hashable
        # JSON scalars held in the set.
        return False


def get_result(save_json, deduplication_key: str, df_source):
    """Template for a resumable loop that skips rows already in *save_json*.

    Args:
        save_json: JSON Lines file that results are appended to.
        deduplication_key: Column whose value identifies a processed row.
        df_source: DataFrame with the rows to process.
    """
    ensure_parent_dir(save_json)
    processed = _load_processed_keys(save_json, deduplication_key)
    with open(save_json, 'a', encoding='utf-8') as fw:
        for i in df_source.index:
            row = df_source.loc[i]
            if _is_processed(row[deduplication_key], processed):
                print(f'{i} has been processed.')
                continue
            # Template placeholder: compute the result for `row` here and
            # append it to `fw` as one JSON line.


# ---------------------------------------------------------------------------
# Create a file's parent directory when it does not exist
# ---------------------------------------------------------------------------
def ensure_parent_dir(file_name):
    """Create the directory that will contain *file_name*, if needed."""
    parent = os.path.dirname(file_name)
    # A bare file name has no directory part; os.makedirs('') would raise.
    if parent:
        os.makedirs(parent, exist_ok=True)


# ---------------------------------------------------------------------------
# pandas: iloc vs. loc
# ---------------------------------------------------------------------------
# iloc indexes by position: integers only, from 0 to len(df) - 1.
# loc indexes by label: whatever the index holds (may be non-numeric).
import pandas as pd

df1 = pd.DataFrame({
    'A': [1, 1, 3],
    'B': [1, 5, 6],
    'C': [7, 8, 9]
})
df2 = df1.drop_duplicates(subset=['A'])
for i in df2.index:
    # df2.index is [0, 2] after de-duplication, so labels need .loc;
    # df2.iloc[2] would raise IndexError because df2 has only two rows.
    print(df2.loc[i])


# ---------------------------------------------------------------------------
# Call the API from multiple threads
# ---------------------------------------------------------------------------
def get_gpt_result_multi_process(json_path, save_json, deduplication_key,
                                 save_key="gpt_result", max_workers=5,
                                 build_messages=None):
    """Query the API for every row of a JSON Lines file using a thread pool.

    Rows whose *deduplication_key* already appears in *save_json* are
    skipped, so an interrupted run can be resumed.

    Args:
        json_path: Input JSON Lines file.
        save_json: Output JSON Lines file (appended to).
        deduplication_key: Column identifying an already processed row.
        save_key: Column under which the API result is stored.
        max_workers: Number of worker threads.
        build_messages: Optional callable ``row -> messages`` that builds
            the request payload. When omitted, a module-level ``messages``
            is used, as in the original template.

    NOTE(review): ``api_request`` is not defined in the visible snippets and
    must be provided by the surrounding module.
    """
    df = pd.read_json(json_path, lines=True)
    ensure_parent_dir(save_json)
    processed = _load_processed_keys(save_json, deduplication_key)

    def get_answer_i(i):
        print(f"{i} of {len(df)} is start")
        row = df.iloc[i]
        if _is_processed(row[deduplication_key], processed):
            return [None, i, f"{i} has been processed"]
        if build_messages is not None:
            payload = build_messages(row)
        else:
            payload = messages  # noqa: F821 - template placeholder
        result = api_request(payload)  # noqa: F821 - defined elsewhere
        if result is None:
            return [None, i, f"{i}'s result is None"]
        row_new = row.copy()
        row_new[save_key] = result
        # Optional: parse `result` into a structured column here. Prefer
        # json.loads over eval for model output.
        return [row_new, i]

    with open(save_json, 'a', encoding='utf-8') as fw:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map yields in input order and all writes happen on
            # this thread, so the file needs no lock.
            for result in executor.map(get_answer_i, range(len(df))):
                if result[0] is None:
                    print(result[2])
                    continue
                fw.write(json.dumps(result[0].to_dict(), ensure_ascii=False) + '\n')


# ---------------------------------------------------------------------------
# Command-line arguments
# ---------------------------------------------------------------------------
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input-id', type=str, default="1")
args = parser.parse_args()


# ---------------------------------------------------------------------------
# Shell: start jobs in the background and run them one after another
# ---------------------------------------------------------------------------
#   nohup python -u job.py > log.log 2>&1 &
#   pid=$!
#   wait $pid
#   nohup python -u job.py > log.log 2>&1 &


# ---------------------------------------------------------------------------
# Side-by-side rendering for model comparison
# ---------------------------------------------------------------------------
def _text_size(draw, text, font):
    """Return the integer ``(width, height)`` needed to draw *text* at the origin."""
    if hasattr(draw, 'textbbox'):
        # Right/bottom extents of a box anchored at (0, 0) also cover the
        # font's left/top offset, so drawn text never overflows the size.
        _, _, right, bottom = draw.textbbox((0, 0), text, font=font)
        return math.ceil(right), math.ceil(bottom)
    # Older Pillow releases without textbbox.
    return draw.textsize(text, font=font)


def create_image_with_texts_with_scale(image_path, texts, font_path, img_width=800):
    """Render an image with text columns placed to its right.

    Args:
        image_path: Image to show; a blank 400x400 white image is used when
            the path is empty or does not exist.
        texts: Strings to render, one column each, wrapped at 40 characters.
        font_path: TrueType font file used for the text (size 20).
        img_width: Width passed to ``resize_image`` for the image.

    Returns:
        A new RGB image holding the picture followed by the text columns.

    NOTE(review): ``resize_image`` is not defined in the visible snippets.
    """
    if image_path and os.path.exists(image_path):
        image = Image.open(image_path)
    else:
        image = Image.new('RGB', (400, 400), color='white')
    image = resize_image(image, img_width)  # noqa: F821 - defined elsewhere
    print(image.size)

    font = ImageFont.truetype(font_path, 20)
    draw = ImageDraw.Draw(image)

    # Wrap each original line separately so explicit line breaks survive.
    wrapped_texts = [
        "\n".join(textwrap.fill(line, width=40) for line in text.splitlines())
        for text in texts
    ]
    text_sizes = [_text_size(draw, text, font) for text in wrapped_texts]

    # default=0 and the clamped gap count keep an empty `texts` valid.
    max_text_height = max((height for _, height in text_sizes), default=0)
    gap_total = 10 * max(len(wrapped_texts) - 1, 0)
    total_text_width = sum(width for width, _ in text_sizes) + gap_total

    new_image_height = max(image.height, max_text_height)
    new_image_width = image.width + total_text_width + 20  # 20 for padding
    new_image = Image.new('RGB', (new_image_width, new_image_height), color='white')
    new_image.paste(image, (0, 0))

    draw = ImageDraw.Draw(new_image)
    x_text = image.width + 10
    y_text = (new_image_height - max_text_height) // 2
    for text, (width, _) in zip(wrapped_texts, text_sizes):
        draw.text((x_text, y_text), text, font=font, fill='black')
        x_text += width + 10
    return new_image


def save_images_to_pdf2(image_text_array, pdf_path, font_path):
    """Render each ``{'image': ..., 'texts': [...]}`` item and save all as a PDF.

    NOTE(review): ``create_image_with_texts_no_scale``, ``resize_image`` and
    the ``img2pdf`` module are not defined/imported in the visible snippets;
    they must be provided by the surrounding module.
    """
    image_bytes_list = []
    for item in image_text_array:
        image_path = item.get('image', None)
        texts = item.get('texts', [])
        img = create_image_with_texts_no_scale(image_path, texts, font_path)  # noqa: F821
        img = resize_image(img, 2156)  # noqa: F821
        # JPEG cannot store alpha or palette images.
        if img.mode not in ('RGB', 'L', 'CMYK'):
            img = img.convert('RGB')
        buffer = io.BytesIO()
        img.save(buffer, format='JPEG')
        image_bytes_list.append(buffer.getvalue())
    with open(pdf_path, "wb") as f:
        f.write(img2pdf.convert(image_bytes_list))  # noqa: F821


# ---------------------------------------------------------------------------
# Append the row count ("_r<len>") to a file name
# ---------------------------------------------------------------------------
def rename_with_r_len(json_name):
    """Rename a JSON Lines file so its name ends with ``_r<row count>``.

    Example: ``data.jsonl`` with 120 rows becomes ``data_r120.jsonl``.

    Returns:
        The new file path.
    """
    df_t = pd.read_json(json_name, lines=True)
    # splitext only looks at the last path component, so names without an
    # extension or with dots in a directory name are handled correctly.
    root, ext = os.path.splitext(json_name)
    new_name = f"{root}_r{len(df_t)}{ext}"
    os.rename(json_name, new_name)
    return new_name


# ---------------------------------------------------------------------------
# Dataset submission workflow (git)
# ---------------------------------------------------------------------------
# Step 1 - standard layout: one folder per JSON file.
def split_on_one_dir_json_data_to_every_dir(jsons):
    """Move every file in *jsons* into a sibling folder named after its stem."""
    for json_name in jsons:
        basename = os.path.basename(json_name)
        stem = os.path.splitext(basename)[0]
        new_name = os.path.join(os.path.dirname(json_name), stem, basename)
        os.makedirs(os.path.dirname(new_name), exist_ok=True)
        os.rename(json_name, new_name)


import glob
import os

jsons = glob.glob('/*.jsonl')
split_on_one_dir_json_data_to_every_dir(jsons)

# Step 2 - generate readme.md.
content = '''
# readme内容
'''


def gen_readme_md_file(json_name, date="20240809"):
    """Write ``readme.md`` next to *json_name*, filled in from ``content``.

    The subject is the second ``_``-separated token of the file name, so a
    name without an underscore raises ``IndexError``.

    Args:
        json_name: Path of the JSON Lines data file.
        date: Date string substituted into the template.
    """
    base_name = os.path.basename(json_name)
    subject = base_name.split('_')[1]
    df_t = pd.read_json(json_name, lines=True)
    size = len(df_t)
    dir_name = os.path.dirname(json_name)
    with open(os.path.join(dir_name, 'readme.md'), 'w', encoding='utf-8') as fw:
        fw.write(content.format(basename=base_name, date=date, subject=subject, size=size))


import pandas as pd
import glob
import os

jsons = glob.glob('/*/*.jsonl')
for json_name in jsons:
    gen_readme_md_file(json_name)

# Step 3 - generate a data-check notebook (.ipynb).
import nbformat as nbf

# Source of the first notebook cell: validates one conversation record.
data_check_func_code = '''
import json

def check_format(data, verbose=False):
    # check valid turn
    if len(data['conversations']) < 2:
        return "invalid turn"
    # check contains height, width if contain image
    if 'image' not in data:
        pass
    else:
        if 'width' not in data or 'height' not in data:
            return "no height/width"
        if type(data['image']) is list:
            if type(data['width']) is not list or type(data['height']) is not list:
                return "no height/width"
            for x in data['width']:
                if x<=0:
                    return "no height/width"
            for x in data['height']:
                if x <= 0:
                    return "no height/width"
        elif type(data['image']) is str:
            if type(data['width']) is not int or type(data['height']) is not int or data['width'] <= 0 or data['height'] <=0:
                return "no height/width"
        else:
            return "unknown error"
    # check no empty
    turn = 0
    num_image = 0
    for i, message in enumerate(data['conversations']):
        num_image += message['value'].count("<image>")
        # check is valid type
        if not isinstance(message['value'], str):
            return "invalid type"
        # check no empty
        if not (len(message['value']) > 0):
            return f"empty {message['from']} message"
        if message['from'] == 'human':
            if turn % 2 != 0:
                return "not conversation"
            turn += 1
        if message['from'] == 'gpt':
            if turn % 2 != 1:
                return "not conversation"
            turn += 1
    if 'image' not in data:
        pass
    elif type(data['image']) is list:
        if num_image != len(data['image']):
            return "wrong image number"
    elif type(data['image']) is str:
        if num_image != 1:
            print(data['image'])
            return "wrong image number"
    else:
        return "unknown error"
    return None
'''

# Source of the second notebook cell; "{json_name}" is filled by str.format.
data_check_run_code = '''
from datasets import load_dataset
from collections import defaultdict

def filter_dataset(dataset):
    new_dataset = []
    invalid_type2idx = defaultdict(list)
    for idx, data in enumerate(dataset):
        try:
            ret = check_format(data)
            if ret is None:
                new_dataset.append(data)
            else:
                invalid_type2idx[ret].append(idx)
        except:
            invalid_type2idx["unknown error"].append(idx)
    return new_dataset, invalid_type2idx

data_json = "{json_name}"
dataset = ds = load_dataset('json', data_files=data_json, split='train')
new_dataset, invalid_type2idx = filter_dataset(dataset)
'''


def gen_ipynb_file(json_name):
    """Write a data-check notebook next to *json_name* (same stem, ``.ipynb``)."""
    basename = os.path.basename(json_name)
    dirname = os.path.dirname(json_name)
    notebook_path = os.path.join(dirname, os.path.splitext(basename)[0] + '.ipynb')

    nb = nbf.v4.new_notebook()
    nb.cells.append(nbf.v4.new_markdown_cell("### 数据检查"))
    nb.cells.append(nbf.v4.new_code_cell(data_check_func_code))
    nb.cells.append(nbf.v4.new_code_cell(data_check_run_code.format(json_name=json_name)))

    # Close the handle so the notebook is flushed before the next file.
    with open(notebook_path, 'w', encoding='utf-8') as fw:
        nbf.write(nb, fw)


jsons = glob.glob('/*/*.jsonl')
for json_name in jsons:
    gen_ipynb_file(json_name)


# ---------------------------------------------------------------------------
# Common bash commands
# ---------------------------------------------------------------------------
# nvidia-smi, refreshed every 0.1 s with changes highlighted:
#   watch -n 0.1 -d nvidia-smi
2024年10月17日
14 阅读
0 评论
0 点赞