标签搜索
侧边栏壁纸
  • 累计撰写 2 篇文章
  • 累计收到 1 条评论

常用代码复用

mistyrain
2024-10-17 / 0 评论 / 14 阅读 / 正在检测是否收录...
  1. json相关
# Merge JSON Lines files
def merge_jsons(json_names, merged_name):
    """Merge several JSON Lines files into one JSON Lines file.

    Args:
        json_names: Iterable of paths to JSON Lines files (one object per line).
        merged_name: Path of the merged output file; overwritten if it exists.

    Raises:
        ValueError: Propagated from ``pd.concat`` when ``json_names`` is empty.
    """
    frames = [pd.read_json(json_name, lines=True) for json_name in json_names]
    df_merge = pd.concat(frames)

    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # pinned to UTF-8 instead of relying on the platform default. The context
    # manager closes the file even if serialising a row fails.
    with open(merged_name, 'w', encoding='utf-8') as fw_all:
        for _, row in df_merge.iterrows():
            fw_all.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
   
   
# Save a DataFrame as JSON Lines, written row by row through a file handle.
# NOTE(review): the original comment also mentions a path "missing one \ or /";
# its exact meaning is unclear — confirm with the author.
def save_df_to_json(df, save_json_name):
    """Write every row of ``df`` to ``save_json_name`` as one JSON object per line.

    Args:
        df: DataFrame to serialise.
        save_json_name: Output path; overwritten if it exists.
    """
    # UTF-8 is explicit because ensure_ascii=False writes raw non-ASCII text;
    # the context manager closes the file even when a row fails to serialise.
    with open(save_json_name, 'w', encoding='utf-8') as fw:
        for _, row in df.iterrows():
            fw.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
    

# Conversion between line-delimited JSON (jsonl) and the human-readable
# indented format (indent=4)
def lines_json_to_indent_json(source_json, target_json):
    """Convert a JSON Lines file into a single indented JSON array.

    Args:
        source_json: Path to the JSON Lines input.
        target_json: Output path; receives a JSON list with ``indent=4``.
    """
    df_s = pd.read_json(source_json, lines=True)
    records = [row.to_dict() for _, row in df_s.iterrows()]

    # Explicit UTF-8 because ensure_ascii=False writes raw non-ASCII text;
    # the context manager guarantees the file is flushed and closed.
    with open(target_json, 'w', encoding='utf-8') as fw_t:
        json.dump(records, fw_t, indent=4, ensure_ascii=False)

def indent_json_to_lines_json(source_json, target_json):
    """Convert a regular JSON file (read via ``pd.read_json``) into JSON Lines.

    Args:
        source_json: Path to the JSON input, e.g. an indented list of records.
        target_json: Output path; receives one JSON object per line.
    """
    df_s = pd.read_json(source_json)

    # Explicit UTF-8 because ensure_ascii=False writes raw non-ASCII text;
    # the context manager closes the file even if a row fails to serialise.
    with open(target_json, 'w', encoding='utf-8') as fw_t:
        for _, row in df_s.iterrows():
            fw_t.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
  1. 可视化图片
# 可视化图片
from aoss_client.client import Client
from PIL import Image
import io
import os

# Module-level storage client built from 'aoss.conf' (presumably a config file
# path — confirm); vis_picture calls client.get() on it for 's3://' paths.
client = Client('aoss.conf')

def vis_picture(img_path, vis_width=224):
    """Display an image inline, scaled to ``vis_width`` pixels wide.

    The aspect ratio is preserved. Paths containing ``'s3://'`` are fetched
    through the module-level ``client``; any other path is opened from the
    local filesystem.

    Args:
        img_path: Local file path or an ``s3://`` style path.
        vis_width: Target display width in pixels.

    Returns:
        None. If the remote read fails, the error is printed and nothing is
        displayed.
    """
    if 's3://' in img_path:
        try:
            image = Image.open(io.BytesIO(client.get(img_path)))
        except Exception as e:
            # Best-effort display: report the failure and stop here. Falling
            # through would hit an unbound `image` and raise UnboundLocalError.
            print(e)
            return
    else:
        image = Image.open(img_path)

    width, height = image.size
    # Keep at least one pixel of height; very wide images would otherwise
    # round down to 0, which resize() rejects.
    vis_height = max(1, int(height / width * vis_width))
    image = image.resize((vis_width, vis_height))

    from IPython.display import display
    display(image)
  1. Prompt
# Prompt template for an LLM judge that decides whether the "gpt" side of a
# dialog refused to answer. Literal braces are doubled ({{ }}) and {dialog} is
# the only single-brace field — presumably filled via prompt.format(dialog=...);
# confirm against the caller.
# NOTE(review): the requested output key is spelled "reson"; any parser of the
# model output must use the same spelling.
prompt ='''
// sys prompt
你是一名《角色定义》,根据[变量],并依据<request>来《解决问题》。

# request #
《示例》
拒绝回答主要指模型不回答或拒绝回答用户的问题,例如以下情况:
1. 表示抱歉,随后表示无法回答/解释;
2. 表示无法评价/回答;
3. 表示不能回答用户的问题;

# input format #
[对话]以以下格式给出:
[
    {{"from": "human", "value": "人类问题"}}, 
    {{"from": "gpt", "value": "模型的回答"}},
    ...
]

# output format #
请输出的内容遵守严格的json格式,且只有以下的json内容,具体的json格式如下:
{{
    "is_refused": 0或1(出现拒绝回答的情况为1,0表示正常回答),
    "reson": "原因,给出拒绝回答的原文内容所在"
}}without code block

// usr prompt
请根据以下对话内容,判断模型gpt的回答是否出现拒答。
[对话]: {dialog}
'''
  1. api接口
    4.1 本地/ceph图片读取用于api

    def encode_image_to_base64(image_path):
        """Return the image at ``image_path`` as a base64-encoded UTF-8 string.

        Paths containing ``'s3://'`` are fetched through the module-level
        ``client``; any other path is read from the local filesystem.

        Args:
            image_path: Local file path or an ``s3://`` style path.

        Returns:
            The base64 text of the raw image bytes.
        """
        import base64  # local import keeps the snippet self-contained

        # Match on the 's3://' scheme (as vis_picture does), not the bare
        # substring 's3', which also matches local paths such as
        # '/data/s3_dump/a.png'.
        if 's3://' in image_path:
            return base64.b64encode(client.get(image_path)).decode("utf-8")

        with open(image_path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")
  2. api断点重跑,去重

    def get_result(save_json, deduplication_key : str, df_source, ):
        """Skeleton for a resumable run that skips rows already saved.

        Rows of ``df_source`` whose ``deduplication_key`` value already appears
        in ``save_json`` (JSON Lines) are reported and skipped. The per-row
        processing is left to be filled in.

        Args:
            save_json: JSON Lines file holding previous results; its parent
                directory is created when missing.
            deduplication_key: Column used to recognise processed rows.
            df_source: DataFrame with the rows to process.
        """
        save_dir = os.path.dirname(save_json)
        # dirname is '' for a bare filename, and os.makedirs('') would raise.
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)

        # Collect keys that were already written. A missing or empty file, or
        # a file without the key column, simply means nothing was processed.
        exist_name = []
        if os.path.exists(save_json) and os.path.getsize(save_json) > 0:
            exist_save = pd.read_json(save_json, lines=True)
            if deduplication_key in exist_save.columns:
                exist_name = exist_save[deduplication_key].to_list()

        # Append mode keeps earlier results; the context manager closes the
        # handle even if processing raises.
        with open(save_json, 'a+', encoding='utf-8') as fw:
            for i in df_source.index:
                row = df_source.loc[i]
                if row[deduplication_key] in exist_name:
                    print(f'{i} has been processed.')
                    continue
                # TODO: process `row` and append the result to `fw`.
    
  3. 判断文件所在路径是否存在,不存在新建
# file_name: path of the file about to be written.
_parent_dir = os.path.dirname(file_name)
# dirname is '' for a bare filename, and os.makedirs('') would raise.
# exist_ok=True avoids the check-then-create race of exists() + makedirs().
if _parent_dir:
    os.makedirs(_parent_dir, exist_ok=True)
  1. Pandas的iloc和loc
# iloc is positional: it only accepts integer positions from 0 to len(df) - 1.
# loc is label-based: it looks rows up by index label, which may be non-numeric.

import pandas as pd
df1 = pd.DataFrame({
    'A': [1, 1, 3],
    'B': [1, 5, 6],
    'C': [7, 8, 9]
})

# drop_duplicates keeps the original labels, so df2 has two rows labelled 0 and 2.
df2 = df1.drop_duplicates(subset=['A'])

# Values taken from df2.index are labels and must go through loc.
# df2.iloc[2] would raise IndexError because df2 only has positions 0 and 1.
for i in df2.index:
    print(df2.loc[i])
  1. 多线程调用api
def get_gpt_result_multi_process(json_path, save_json, deduplication_key, save_key="gpt_result", max_workers=5):
    """Call the API for every row of a JSON Lines file with a thread pool.

    Results are appended to ``save_json`` so an interrupted run can resume:
    rows whose ``deduplication_key`` value is already present are skipped.

    Args:
        json_path: JSON Lines input file.
        save_json: JSON Lines output file, opened in append mode; its parent
            directory is created when missing.
        deduplication_key: Column used to recognise processed rows.
        save_key: Column under which the API result is stored.
        max_workers: Number of worker threads.
    """
    import concurrent.futures  # local import keeps the snippet self-contained

    df = pd.read_json(json_path, lines=True)

    save_dir = os.path.dirname(save_json)
    # dirname is '' for a bare filename, and os.makedirs('') would raise.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    # Keys already written by a previous run. A missing or empty file, or a
    # file without the key column, means nothing was processed yet.
    exist_name = []
    if os.path.exists(save_json) and os.path.getsize(save_json) > 0:
        exist_save = pd.read_json(save_json, lines=True)
        if deduplication_key in exist_save.columns:
            exist_name = exist_save[deduplication_key].to_list()

    def get_answer_i(i):
        """Process row ``i``; return ``[row, i]`` or ``[None, i, reason]``."""
        print(f"{i} of {len(df)} is start")
        row = df.iloc[i]
        if row[deduplication_key] in exist_name:
            return [None, i, f"{i} has been processed"]

        # Fetch the question data.
        # TODO: build `messages` from `row` here — it is not defined anywhere
        # in this snippet, so api_request(messages) fails until it is.
        question = ""
        answer = ""

        result = api_request(messages)

        row_new = row.copy()
        if result is None:
            return [None, i, f"{i}'s result is None"]
        row_new[save_key] = result
        return [row_new, i]

    # The writer is the main thread only: executor.map yields results in input
    # order, so lines are appended sequentially. The context managers close
    # both the pool and the file even when a worker raises.
    with open(save_json, 'a+', encoding='utf-8') as fw:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            for result in executor.map(get_answer_i, range(len(df))):
                if result[0] is None:
                    print(result[2])
                    continue
                fw.write(json.dumps(result[0].to_dict(), ensure_ascii=False) + '\n')
  1. 添加控制台运行参数/命令参数

    import argparse
    
    # Command-line interface: a single optional string flag, --input-id,
    # which defaults to "1". Parsed values are available as attributes of args.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input-id',
        default="1",
        type=str,
    )
    args = parser.parse_args()
  2. 终端后台启动命令,按顺序执行命令
# Start job.py in the background, detached from hangups (nohup). -u makes
# Python output unbuffered; stdout and stderr both go to log.log.
nohup python -u job.py > log.log 2>&1 &

# $! holds the PID of the most recently started background job, so it must be
# captured right after the launch above.
pid=$!
# Block until that job has exited.
wait $pid

# Then launch the next job. With '>' this run truncates log.log again.
nohup python -u job.py > log.log 2>&1 &
  1. 模型对比可视化显示
def create_image_with_texts_with_scale(image_path, texts, font_path, img_width=800):
    """Render an image with several text columns laid out to its right.

    Args:
        image_path: Path of the image to show; a blank white 400x400 image is
            used when the path is empty or does not exist.
        texts: Sequence of strings; each becomes one column, wrapped at 40
            characters per line.
        font_path: Path to a TrueType font file, loaded at size 20.
        img_width: Width passed to ``resize_image`` for the picture.

    Returns:
        A new RGB image holding the picture on the left and the text columns,
        vertically centred, on the right.
    """
    import math  # local import keeps the snippet self-contained

    # Load image if available, otherwise create a blank image
    if image_path and os.path.exists(image_path):
        image = Image.open(image_path)
    else:
        image = Image.new('RGB', (400, 400), color='white')
    image = resize_image(image, img_width)
    print(image.size)
    font = ImageFont.truetype(font_path, 20)

    # Wrap each original line separately so existing line breaks survive,
    # then join the wrapped lines back into one text per column.
    wrapped_texts = [
        "\n".join(textwrap.fill(line, width=40) for line in text.splitlines())
        for text in texts
    ]

    def measure(draw_ctx, text):
        """Return the (width, height) extent of ``text`` drawn at the origin."""
        # ImageDraw.textsize() was removed in Pillow 10; textbbox() is its
        # replacement. The right/bottom edges are measured from the anchor so
        # the canvas is never smaller than what draw.text() paints. ceil()
        # keeps the sizes integral for Image.new().
        _, _, right, bottom = draw_ctx.textbbox((0, 0), text, font=font)
        return math.ceil(right), math.ceil(bottom)

    measuring_draw = ImageDraw.Draw(image)
    text_sizes = [measure(measuring_draw, text) for text in wrapped_texts]

    # default=0 and the max(..., 0) guard keep an empty `texts` from crashing.
    max_text_height = max((height for _, height in text_sizes), default=0)
    gaps = 10 * max(len(wrapped_texts) - 1, 0)
    total_text_width = sum(width for width, _ in text_sizes) + gaps

    new_image_height = max(image.height, max_text_height)
    new_image_width = image.width + total_text_width + 20  # 20 for padding

    new_image = Image.new('RGB', (new_image_width, new_image_height), color='white')
    new_image.paste(image, (0, 0))

    draw = ImageDraw.Draw(new_image)
    x_text = image.width + 10
    y_text = (new_image_height - max_text_height) // 2

    # Columns go left to right with a 10 px gap between them.
    for text, (text_width, _) in zip(wrapped_texts, text_sizes):
        draw.text((x_text, y_text), text, font=font, fill='black')
        x_text += text_width + 10

    return new_image
    
   def save_images_to_pdf2(image_text_array, pdf_path, font_path):
       """Render each image/text entry to a JPEG page and write them as one PDF.

       Each item of ``image_text_array`` is a mapping with optional keys
       ``'image'`` (a path, default None) and ``'texts'`` (default empty list).
       The rendered page is resized via ``resize_image(img, 2156)`` before
       being encoded.
       """
       def render_page(entry):
           # One entry -> JPEG bytes of the composed, resized page.
           picture = create_image_with_texts_no_scale(
               entry.get('image', None),
               entry.get('texts', []),
               font_path,
           )
           picture = resize_image(picture, 2156)
           buffer = io.BytesIO()
           picture.save(buffer, format='JPEG')
           return buffer.getvalue()

       pages = [render_page(entry) for entry in image_text_array]

       with open(pdf_path, "wb") as pdf_file:
           pdf_file.write(img2pdf.convert(pages))
  1. 文件名添加r_len格式

    def rename_with_r_len(json_name):
        """Rename a JSON Lines file so its name carries the row count.

        ``data.jsonl`` with 120 rows becomes ``data_r120.jsonl``.

        Args:
            json_name: Path to a JSON Lines file.

        Returns:
            The new path of the renamed file.
        """
        df_t = pd.read_json(json_name, lines=True)
        # splitext only splits on a dot inside the final path component, so
        # names without an extension and directories containing dots are safe
        # (rpartition('.') mangled both).
        root, ext = os.path.splitext(json_name)
        new_name = f"{root}_r{len(df_t)}{ext}"
        os.rename(json_name, new_name)
        return new_name
  2. git文件提交流程
# Standard layout: one folder per JSON file
def split_on_one_dir_json_data_to_every_dir(jsons):
    """Move each file into a sibling folder named after the file's stem.

    ``/data/foo.jsonl`` is moved to ``/data/foo/foo.jsonl``. The folder is
    created when it does not exist yet.

    Args:
        jsons: Iterable of file paths to relocate.
    """
    for source_path in jsons:
        parent, file_name = os.path.split(source_path)
        # Stem is everything before the last dot of the file name.
        stem = file_name.rpartition('.')[0]
        target_path = os.path.join(parent, stem, file_name)
        target_dir = os.path.dirname(target_path)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        os.rename(source_path, target_path)
import glob
import os
# Collect every .jsonl file matching '/*.jsonl' (directly under the filesystem
# root as written — presumably a placeholder for the real data directory) and
# move each one into its own folder.
jsons = glob.glob('/*.jsonl')
split_on_one_dir_json_data_to_every_dir(jsons)
# readme.md 文件生成
# Template for the generated readme.md. gen_readme_md_file fills it with
# str.format using the fields basename, date, subject and size.
content = '''
# readme内容
'''
def gen_readme_md_file(json_name):
    """Write a ``readme.md`` next to ``json_name`` from the ``content`` template.

    The template receives ``basename`` (the file name), ``date`` (fixed to
    "20240809"), ``subject`` (second ``_``-separated token of the file name)
    and ``size`` (number of rows in the JSON Lines file).

    Args:
        json_name: Path to a JSON Lines file whose name has at least one ``_``.

    Raises:
        IndexError: If the file name contains no underscore.
    """
    base_name = os.path.basename(json_name)
    subject = base_name.split('_')[1]
    size = len(pd.read_json(json_name, lines=True))
    readme_path = os.path.join(os.path.dirname(json_name), 'readme.md')

    # The context manager closes the file even if formatting the template
    # fails, e.g. on an unknown placeholder.
    with open(readme_path, 'w', encoding='utf-8') as fw:
        fw.write(content.format(basename=base_name, date="20240809", subject=subject, size=size))
import pandas as pd
import glob
import os
# Generate a readme.md for every .jsonl file matching '/*/*.jsonl', i.e. one
# folder level below the root (presumably a placeholder for the data root).
jsons = glob.glob('/*/*.jsonl')
for json_name in jsons:
    gen_readme_md_file(json_name)
# ipynb文件生成
import nbformat as nbf
# Source text of the notebook's first code cell (inserted by gen_ipynb_file).
# It defines check_format(data, verbose=False), which returns None for a valid
# record or a short string naming the first problem found: too few turns,
# missing or non-positive width/height for image records, a non-string or
# empty message, human/gpt turns out of order, or a mismatch between the
# number of "<image>" tags and the images listed.
# NOTE(review): inside the loop, .count("<image>") runs before the isinstance
# check, so a non-string message value raises instead of returning
# "invalid type".
data_check_func_code = '''
import json

def check_format(data, verbose=False):
    # check valid turn
    if len(data['conversations']) < 2:
        return "invalid turn"
    # check contains height, width if contain image
    if 'image' not in data:
        pass
    else:
        if 'width' not in data or 'height' not in data:
            return "no height/width"
            
        if type(data['image']) is list:
            if type(data['width']) is not list or type(data['height']) is not list:
                return "no height/width"
            for x in data['width']:
                if x<=0:
                    return "no height/width"
            for x in data['height']:
                if x <= 0:
                    return "no height/width"
        elif type(data['image']) is str:
            if type(data['width']) is not int or type(data['height']) is not int or data['width'] <= 0 or data['height'] <=0:
                return "no height/width"
        else:
            return "unknown error"  
    
    # check no empty
    turn = 0
    num_image = 0
    for i, message in enumerate(data['conversations']):
        num_image += message['value'].count("<image>")
        # check is valid type
        if not isinstance(message['value'], str):
            return "invalid type"
        # check no empty
        if not (len(message['value']) > 0):
            return f"empty {message['from']} message"
        if message['from'] == 'human':
            if turn % 2 != 0:
                return "not conversation"
            turn += 1
        if message['from'] == 'gpt':
            if turn % 2 != 1:
                return "not conversation"
            turn += 1
    if 'image' not in data:
        pass
    elif type(data['image']) is list:
        if num_image != len(data['image']):
            return "wrong image number"
    elif type(data['image']) is str:
        if num_image != 1:
            print(data['image'])
            return "wrong image number"
    else:
        return "unknown error"      
    return None 
'''
# Source text of the notebook's second code cell (inserted by gen_ipynb_file
# after str.format(json_name=...)). It defines filter_dataset, which runs
# check_format on every record and returns the valid records plus a mapping
# from error label to record indices, then loads the JSON file with
# datasets.load_dataset and applies the filter.
# NOTE(review): json_name is pasted inside a double-quoted Python string, so a
# path containing backslashes or double quotes would break the generated cell.
data_check_run_code = '''
from datasets import load_dataset
from collections import defaultdict

def filter_dataset(dataset):
    new_dataset = []
    invalid_type2idx = defaultdict(list)
    for idx, data in enumerate(dataset):
        try:
            ret = check_format(data)
            if ret is None:
                new_dataset.append(data)
            else:
                invalid_type2idx[ret].append(idx)
        except:
            invalid_type2idx["unknown error"].append(idx)
    return new_dataset, invalid_type2idx
    
data_json = "{json_name}"
dataset = ds = load_dataset('json', data_files=data_json, split='train')
new_dataset, invalid_type2idx = filter_dataset(dataset)
'''

def gen_ipynb_file(json_name):
    """Create a data-check notebook next to ``json_name``.

    The notebook gets the same stem as the JSON file with an ``.ipynb``
    extension. It holds a markdown heading, the ``data_check_func_code`` cell
    and the ``data_check_run_code`` cell formatted with ``json_name``.

    Args:
        json_name: Path to the JSON Lines file the notebook should check.
    """
    basename = os.path.basename(json_name)
    dirname = os.path.dirname(json_name)
    notebook_path = os.path.join(dirname, basename.rpartition('.')[0] + '.ipynb')

    nb = nbf.v4.new_notebook()
    nb.cells.append(nbf.v4.new_markdown_cell("### 数据检查"))
    nb.cells.append(nbf.v4.new_code_cell(data_check_func_code))
    nb.cells.append(nbf.v4.new_code_cell(data_check_run_code.format(json_name=json_name)))

    # The handle was never closed before, so the notebook could stay
    # unflushed; the context manager guarantees it is written out.
    with open(notebook_path, 'w', encoding='utf-8') as fw:
        nbf.write(nb, fw)

# Generate a data-check notebook for every .jsonl file matching '/*/*.jsonl',
# i.e. one folder level below the root (presumably a placeholder path).
jsons = glob.glob('/*/*.jsonl')
for json_name in jsons:
    gen_ipynb_file(json_name)
  1. 常用bash命令

    # Re-run nvidia-smi every 0.1 s (-n 0.1) and highlight what changed
    # between refreshes (-d).
    watch -n 0.1 -d nvidia-smi 
0

评论 (0)

取消