- JSON utilities
 
# Merge several JSONL files into one
import json
import pandas as pd
def merge_jsons(json_names, merged_name):
    df_jsons = []
    for json_name in json_names:
        df_jsons.append(pd.read_json(json_name, lines=True))
    df_merge = pd.concat(df_jsons)
    fw_all = open(merged_name, 'w+')
    for index, row in df_merge.iterrows():
        fw_all.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
    fw_all.close()
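
A minimal usage sketch; the shard names are placeholders, not files from the original note:

merge_jsons(['part_a.jsonl', 'part_b.jsonl'], 'merged.jsonl')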
   
   
# Save a DataFrame as JSONL via a file writer (a failure here usually means the path is missing a "/")
def save_df_to_json(df, save_json_name):
    fw = open(save_json_name, 'w+')
    for index, row in df.iterrows():
        fw.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
    fw.close()
    
# Convert between line-based JSON (jsonl) and human-readable JSON (indent=4)
def lines_json_to_indent_json(source_json, target_json):
    df_s = pd.read_json(source_json, lines=True)
    fw_t = open(target_json, 'w+')
    lists = []
    for index, row in df_s.iterrows():
        lists.append(row.to_dict())
    json.dump(lists, fw_t, indent=4, ensure_ascii=False)
    fw_t.close()

def indent_json_to_lines_json(source_json, target_json):
    df_s = pd.read_json(source_json)
    fw_t = open(target_json, 'w+')
    for index, row in df_s.iterrows():
        fw_t.write(json.dumps(row.to_dict(), ensure_ascii=False) + '\n')
    fw_t.close()
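
A round-trip usage sketch of the two converters above; the file names are placeholders:

lines_json_to_indent_json('data.jsonl', 'data_indent.json')
indent_json_to_lines_json('data_indent.json', 'data.jsonl')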
 
- Visualize images

# Visualize images
from aoss_client.client import Client
from PIL import Image
import io
import os
client = Client('aoss.conf')  # Ceph/S3 client configured via aoss.conf
def vis_picture(img_path, vis_width=224):
    if 's3://' in img_path:
        try:
            image = Image.open(io.BytesIO(client.get(img_path)))
        except Exception as e:
            print(e)
            return
    else:
        image = Image.open(img_path)
    # Scale to vis_width, keeping the aspect ratio
    image = image.resize((vis_width, int(image.size[1] / image.size[0] * vis_width)))
    
    from IPython.display import display
    display(image)
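
A quick usage sketch; the path is a placeholder (local paths work too):

vis_picture('s3://bucket/example.jpg', vis_width=224)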
 
- Prompt

prompt = '''
// sys prompt
You are a 《role definition》. Based on [variables] and following <request>, 《solve the problem》.
# request #
《Example》
A refusal means the model does not answer or declines to answer the user's question, for example:
1. apologizing and then saying it cannot answer/explain;
2. saying it cannot comment/answer;
3. saying it is unable to answer the user's question;
# input format #
The [dialog] is given in the following format:
[
    {{"from": "human", "value": "the human's question"}},
    {{"from": "gpt", "value": "the model's answer"}},
    ...
]
# output format #
The output must be strictly valid JSON, containing only the JSON below, without a code block:
{{
    "is_refused": 0 or 1 (1 if the answer is a refusal, 0 if it answers normally),
    "reason": "the reason, quoting the refusal text from the answer"
}}
// usr prompt
Based on the dialog below, judge whether the gpt model's answer is a refusal.
[dialog]: {dialog}
'''
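
A sketch of filling the template: the doubled braces {{ }} survive str.format as literal braces, so only {dialog} is substituted. The sample dialog and the final call are illustrative assumptions:

dialog = [
    {"from": "human", "value": "Please do something against policy."},
    {"from": "gpt", "value": "Sorry, I can't help with that."},
]
filled = prompt.format(dialog=json.dumps(dialog, ensure_ascii=False))
# result = api_request(...)  # send `filled` through whatever chat API wrapper is in use
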
- API calls

# Read local/Ceph images for API use
import base64
def encode_image_to_base64(image_path):
    if 's3' in image_path:
        return base64.b64encode(client.get(image_path)).decode("utf-8")
    else:
        with open(image_path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")
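
A usage sketch assuming an OpenAI-style vision payload; the path and message layout are assumptions, not from the original note:

b64 = encode_image_to_base64('s3://bucket/example.jpg')
messages = [{
    "role": "user",
    "content": [
        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
        {"type": "text", "text": "Describe this image."},
    ],
}]
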
- Resume API runs after interruption, with deduplication

def get_result(save_json, deduplication_key: str, df_source):
    if not os.path.exists(os.path.dirname(save_json)):
        os.makedirs(os.path.dirname(save_json))
    fw = open(save_json, 'a+')
    exist_save = pd.read_json(save_json, lines=True)
    try:
        exist_name = exist_save[deduplication_key].to_list()
    except:
        exist_name = []
    for i in df_source.index:
        row = df_source.loc[i]
        if row[deduplication_key] in exist_name:
            print(f'{i} has been processed.')
            continue
        # ... request the API for this row and append the result to fw
        # (the rest of the loop is elided in the original note)
 
- Create a file's parent directory if it does not exist

# file_name
if not os.path.exists(os.path.dirname(file_name)):
    os.makedirs(os.path.dirname(file_name))
# equivalently: os.makedirs(os.path.dirname(file_name), exist_ok=True)
 
- Pandas iloc vs. loc

# iloc indexes by position: integers only, from 0 to len(df) - 1
# loc indexes by label: looks values up in df.index, which may be non-contiguous or non-numeric
import pandas as pd
df1 = pd.DataFrame({
    'A': [1, 1, 3],
    'B': [1, 5, 6],
    'C': [7, 8, 9]
})
df2 = df1.drop_duplicates(subset=['A'])  # keeps index labels [0, 2]
for i in df2.index:
    print(df2.loc[i])  # loc, not iloc: i is a label, and df2.iloc[2] would raise IndexError
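
To see why loc is required above, a small demonstration of the pitfall (using the df2 just built):

print(df2.loc[2])       # fine: 2 is an index label that drop_duplicates kept
try:
    print(df2.iloc[2])  # IndexError: df2 only has positions 0 and 1
except IndexError as e:
    print('iloc is positional:', e)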
- Multithreaded API calls
 
import concurrent.futures
def get_gpt_result_multi_process(json_path, save_json, deduplication_key, save_key="gpt_result", max_workers=5):
    df = pd.read_json(json_path, lines=True)
    if not os.path.exists(os.path.dirname(save_json)):
        os.makedirs(os.path.dirname(save_json))
    fw = open(save_json, 'a+')
    exist_save = pd.read_json(save_json, lines=True)
    
    try:
        exist_name = exist_save[deduplication_key].to_list()
    except:
        exist_name = []
    def get_answer_i(i):
        print(f"{i} of {len(df)} started")
        row = df.iloc[i]
        if row[deduplication_key] in exist_name:
            return [None, i, f"{i} has been processed"]
         
        # Build the API messages from this row's data (construction elided in the original note)
        question = ""
        answer = ""
        messages = []  # assemble the prompt from question/answer here

        result = api_request(messages)
        
        row_new = row.copy()
        if result is None:
            return [None, i, f"{i}'s result is None"]
        row_new[save_key] = result
        # try:
        #     row_new['low_quality_result'] = eval(result)
        # except:
        #     return [None, i, f"{i} output result is not a json"]
        return [row_new, i]
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for result in executor.map(get_answer_i, range(len(df))):
            if result[0] is None:
                print(result[2])
                continue
            fw.write(json.dumps(result[0].to_dict(), ensure_ascii=False) + '\n')
    fw.close()
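
A call sketch; every path and key name below is a placeholder, and an api_request helper is assumed to be defined:

get_gpt_result_multi_process(
    json_path='input.jsonl',
    save_json='out/result.jsonl',
    deduplication_key='id',
    save_key='gpt_result',
    max_workers=5,
)
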
- Console/command-line arguments

import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--input-id', type=str, default="1")
args = parser.parse_args()  # read as args.input_id (dashes become underscores)

- Launch background jobs from the terminal, running commands in sequence
 
# Run job.py in the background, detached from the terminal; stdout/stderr go to log.log
nohup python -u job.py > log.log 2>&1 &
pid=$!      # PID of the job just launched
wait $pid   # block until the first job finishes, then start the next one
nohup python -u job.py > log.log 2>&1 &   # note: ">" truncates log.log; use ">>" to append
- Visualize model comparisons side by side
 
import io
import os
import textwrap
import img2pdf
from PIL import Image, ImageDraw, ImageFont

def create_image_with_texts_with_scale(image_path, texts, font_path, img_width=800):
    # Load image if available, otherwise create a blank image
    if image_path and os.path.exists(image_path):
        image = Image.open(image_path)
    else:
        image = Image.new('RGB', (400, 400), color='white')
    image = resize_image(image, img_width)
    font = ImageFont.truetype(font_path, 20)
    draw = ImageDraw.Draw(image)
    # Calculate text dimensions and wrap texts
    wrapped_texts = []
    for text in texts:
        wrapped_lines = [textwrap.fill(line, width=40) for line in text.splitlines()]
        # Re-join the wrapped lines into a single text
        wrapped_text = "\n".join(wrapped_lines)
        wrapped_texts.append(wrapped_text)
    # draw.textsize was removed in Pillow 10; measure with textbbox instead
    def text_size(text):
        l, t, r, b = draw.textbbox((0, 0), text, font=font)
        return (r - l, b - t)
    text_sizes = [text_size(text) for text in wrapped_texts]
    max_text_height = max([size[1] for size in text_sizes])
    total_text_width = sum([size[0] for size in text_sizes]) + 10 * (len(wrapped_texts) - 1)
    new_image_height = max(image.height, max_text_height)
    new_image_width = image.width + total_text_width + 20  # 20 for padding
    new_image = Image.new('RGB', (new_image_width, new_image_height), color='white')
    new_image.paste(image, (0, 0))
    draw = ImageDraw.Draw(new_image)
    x_text = image.width + 10
    y_text = (new_image_height - max_text_height) // 2
    for text in wrapped_texts:
        draw.text((x_text, y_text), text, font=font, fill='black')
        x_text += text_size(text)[0] + 10
    return new_image
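
resize_image is called above but not defined in this note; a minimal sketch that scales to a target width while keeping the aspect ratio:

def resize_image(image, target_width):
    w, h = image.size
    return image.resize((target_width, int(h * target_width / w)))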
    
def save_images_to_pdf2(image_text_array, pdf_path, font_path):
    image_bytes_list = []
    for i, item in enumerate(image_text_array):
        image_path = item.get('image', None)
        texts = item.get('texts', [])
        img = create_image_with_texts_with_scale(image_path, texts, font_path)
        # img.save(f'./pdf_review/{i}.png')
        img = resize_image(img, 2156)
        image_bytes = io.BytesIO()
        img.save(image_bytes, format='JPEG')
        image_bytes_list.append(image_bytes.getvalue())
    with open(pdf_path, "wb") as f:
        f.write(img2pdf.convert(image_bytes_list))
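
A usage sketch; image path, texts, and font path are placeholders:

image_text_array = [
    {'image': 'imgs/0001.jpg', 'texts': ['model A: a cat on a mat', 'model B: a dog on a rug']},
]
save_images_to_pdf2(image_text_array, 'compare.pdf', 'NotoSansCJK-Regular.ttc')
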
- Append an _r{row count} suffix to file names

def rename_with_r_len(json_name):
    df_t = pd.read_json(json_name, lines=True)
    new_name = json_name.rpartition('.')[0] + f"_r{len(df_t)}" + '.' + json_name.rpartition('.')[-1]
    os.rename(json_name, new_name)

- Git file submission workflow
 
# Standardize layout (one folder per json file)
def split_on_one_dir_json_data_to_every_dir(jsons):
    for json_name in jsons:
        basename = os.path.basename(json_name)
        new_name = os.path.join(os.path.dirname(json_name), basename.rpartition('.')[0], basename)
        if not os.path.exists(os.path.dirname(new_name)):
            os.makedirs(os.path.dirname(new_name))
        os.rename(json_name, new_name)
import glob
import os
jsons = glob.glob('/*.jsonl')
split_on_one_dir_json_data_to_every_dir(jsons)

- Generate readme.md files
content = '''
# readme body; placeholders such as {basename}, {date}, {subject}, {size} are filled below
'''
def gen_readme_md_file(json_name):
    base_name = os.path.basename(json_name)
    subject = base_name.split('_')[1]
    df_t = pd.read_json(json_name, lines=True)
    size = len(df_t)
    dir_name = os.path.dirname(json_name)
    fw = open(os.path.join(dir_name, 'readme.md'), 'w+', encoding='utf-8')
    fw.write(content.format(basename=base_name, date="20240809", subject=subject, size=size))
    fw.close()
import pandas as pd
import glob
import os
jsons = glob.glob('/*/*.jsonl')
for json_name in jsons:
    gen_readme_md_file(json_name)

- Generate .ipynb files
import nbformat as nbf
data_check_func_code = '''
import json
def check_format(data, verbose=False):
    # check valid turn
    if len(data['conversations']) < 2:
        return "invalid turn"
    # check contains height, width if contain image
    if 'image' not in data:
        pass
    else:
        if 'width' not in data or 'height' not in data:
            return "no height/width"
            
        if type(data['image']) is list:
            if type(data['width']) is not list or type(data['height']) is not list:
                return "no height/width"
            for x in data['width']:
                if x<=0:
                    return "no height/width"
            for x in data['height']:
                if x <= 0:
                    return "no height/width"
        elif type(data['image']) is str:
            if type(data['width']) is not int or type(data['height']) is not int or data['width'] <= 0 or data['height'] <=0:
                return "no height/width"
        else:
            return "unknown error"  
    
    # check no empty
    turn = 0
    num_image = 0
    for i, message in enumerate(data['conversations']):
        num_image += message['value'].count("<image>")
        # check is valid type
        if not isinstance(message['value'], str):
            return "invalid type"
        # check no empty
        if not (len(message['value']) > 0):
            return f"empty {message['from']} message"
        if message['from'] == 'human':
            if turn % 2 != 0:
                return "not conversation"
            turn += 1
        if message['from'] == 'gpt':
            if turn % 2 != 1:
                return "not conversation"
            turn += 1
    if 'image' not in data:
        pass
    elif type(data['image']) is list:
        if num_image != len(data['image']):
            return "wrong image number"
    elif type(data['image']) is str:
        if num_image != 1:
            print(data['image'])
            return "wrong image number"
    else:
        return "unknown error"      
    return None 
'''
data_check_run_code = '''
from datasets import load_dataset
from collections import defaultdict
def filter_dataset(dataset):
    new_dataset = []
    invalid_type2idx = defaultdict(list)
    for idx, data in enumerate(dataset):
        try:
            ret = check_format(data)
            if ret is None:
                new_dataset.append(data)
            else:
                invalid_type2idx[ret].append(idx)
        except:
            invalid_type2idx["unknown error"].append(idx)
    return new_dataset, invalid_type2idx
    
data_json = "{json_name}"
dataset = load_dataset('json', data_files=data_json, split='train')
new_dataset, invalid_type2idx = filter_dataset(dataset)
'''
def gen_ipynb_file(json_name):
    basename = os.path.basename(json_name)
    dirname = os.path.dirname(json_name)
    fw = open(os.path.join(dirname, basename.rpartition('.')[0] + '.ipynb'), 'w+', encoding='utf-8')
    nb = nbf.v4.new_notebook()
    markdown_cell = nbf.v4.new_markdown_cell("### Data checks")
    code_cell1 = nbf.v4.new_code_cell(data_check_func_code)
    code_cell2 = nbf.v4.new_code_cell(data_check_run_code.format(json_name=json_name))
    nb.cells.append(markdown_cell)
    nb.cells.append(code_cell1)
    nb.cells.append(code_cell2)
    nbf.write(nb, fw)
    fw.close()
jsons = glob.glob('/*/*.jsonl')
for json_name in jsons:
    gen_ipynb_file(json_name)

- Common bash commands
# nvidia-smi: monitor GPUs continuously, highlighting changes
watch -n 0.1 -d nvidia-smi
                            
                            