ai-business-write/init_all_fields_from_excel.py

381 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
从Excel解析结果初始化所有字段到数据库
基于parsed_fields.json文件初始化所有模板和字段
"""
import pymysql
import json
from datetime import datetime
from pathlib import Path
# 数据库连接配置
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
# 固定值
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
CURRENT_TIME = datetime.now()
# 字段名称到字段编码的映射(用于将中文名称转换为标准编码)
FIELD_NAME_TO_CODE_MAP = {
# 输入字段
'线索信息': 'clue_info',
'被核查人员工作基本情况线索': 'target_basic_info_clue',
# 输出字段 - 基本信息
'被核查人姓名': 'target_name',
'被核查人员单位及职务': 'target_organization_and_position',
'被核查人员性别': 'target_gender',
'被核查人员出生年月': 'target_date_of_birth',
'被核查人员出生年月日': 'target_date_of_birth_full',
'被核查人员政治面貌': 'target_political_status',
'被核查人员职级': 'target_professional_rank',
# 输出字段 - 其他信息
'线索来源': 'clue_source',
'主要问题线索': 'target_issue_description',
'初步核实审批表承办部门意见': 'department_opinion',
'初步核实审批表填表人': 'filler_name',
# 其他常用字段根据Excel数据补充
'请示报告卡请示时间': 'report_card_request_time',
'被核查人员身份证件及号码': 'target_id_number',
'被核查人员身份证号': 'target_id_number',
'应到时间': 'appointment_time',
'应到地点': 'appointment_location',
'批准时间': 'approval_time',
'承办部门': 'handling_department',
'承办人': 'handler_name',
'谈话通知时间': 'notification_time',
'谈话通知地点': 'notification_location',
'被核查人员住址': 'target_address',
'被核查人员户籍住址': 'target_registered_address',
'被核查人员联系方式': 'target_contact',
'被核查人员籍贯': 'target_place_of_origin',
'被核查人员民族': 'target_ethnicity',
}
def generate_id():
"""生成ID使用时间戳+随机数的方式,模拟雪花算法)"""
import time
import random
timestamp = int(time.time() * 1000)
random_part = random.randint(100000, 999999)
return timestamp * 1000 + random_part
def get_field_code_from_name(field_name: str, default_prefix: str = 'field') -> str:
"""
根据字段名称获取字段编码
Args:
field_name: 字段名称(中文)
default_prefix: 默认前缀(如果找不到映射,使用此前缀)
Returns:
字段编码
"""
# 先查映射表
if field_name in FIELD_NAME_TO_CODE_MAP:
return FIELD_NAME_TO_CODE_MAP[field_name]
# 如果没有映射,尝试使用字段名称本身(如果已经是英文编码)
if field_name and not any('\u4e00' <= char <= '\u9fff' for char in field_name):
# 如果字段名称中不包含中文字符,直接使用
return field_name.lower().replace(' ', '_')
# 如果包含中文,转换为拼音或者使用默认格式
# 这里简化处理,直接使用字段名称(实际应该转换为拼音)
return field_name
def load_parsed_fields():
"""加载解析的字段数据"""
json_file = Path(__file__).parent / 'parsed_fields.json'
if not json_file.exists():
raise FileNotFoundError(f"解析结果文件不存在: {json_file}")
with open(json_file, 'r', encoding='utf-8') as f:
return json.load(f)
def init_all_fields(conn, parsed_data):
"""
初始化所有字段(输入字段和输出字段)
Returns:
字段编码到字段ID的映射字典
"""
cursor = conn.cursor()
field_map = {} # field_code -> field_id
print("="*60)
print("开始初始化所有字段...")
print("="*60)
# 收集所有唯一的字段
all_fields = {}
# 收集输入字段
for template_name, template_info in parsed_data.items():
for input_field in template_info.get('input_fields', []):
field_name = input_field['name']
field_code = input_field.get('field_code') or get_field_code_from_name(field_name)
if field_code not in all_fields:
all_fields[field_code] = {
'name': field_name,
'field_code': field_code,
'field_type': 1 # 输入字段
}
# 收集输出字段
for template_name, template_info in parsed_data.items():
for output_field in template_info.get('output_fields', []):
field_name = output_field['name']
field_code = output_field.get('field_code') or get_field_code_from_name(field_name)
if field_code not in all_fields:
all_fields[field_code] = {
'name': field_name,
'field_code': field_code,
'field_type': 2 # 输出字段
}
elif all_fields[field_code]['field_type'] == 1:
# 如果同一个字段编码既出现在输入字段也出现在输出字段,优先作为输出字段
all_fields[field_code]['field_type'] = 2
print(f"\n共找到 {len(all_fields)} 个唯一字段")
# 按字段类型分组
input_fields = [f for f in all_fields.values() if f['field_type'] == 1]
output_fields = [f for f in all_fields.values() if f['field_type'] == 2]
print(f" 输入字段: {len(input_fields)}")
print(f" 输出字段: {len(output_fields)}")
# 初始化字段
for field_code, field_info in all_fields.items():
# 检查字段是否已存在
check_sql = """
SELECT id FROM f_polic_field
WHERE tenant_id = %s AND filed_code = %s
"""
cursor.execute(check_sql, (TENANT_ID, field_code))
existing = cursor.fetchone()
if existing:
field_id = existing[0]
print(f" 字段 '{field_info['name']}' (code: {field_code}) 已存在ID: {field_id}")
else:
field_id = generate_id()
insert_sql = """
INSERT INTO f_polic_field
(id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_sql, (
field_id,
TENANT_ID,
field_info['name'],
field_code,
field_info['field_type'],
CURRENT_TIME,
CREATED_BY,
CURRENT_TIME,
CREATED_BY,
1 # state: 1表示启用
))
field_type_str = "输入字段" if field_info['field_type'] == 1 else "输出字段"
print(f" ✓ 创建{field_type_str}: {field_info['name']} (code: {field_code}), ID: {field_id}")
field_map[field_code] = field_id
conn.commit()
return field_map
def init_file_configs(conn, parsed_data):
"""
初始化所有文件配置
Returns:
文件配置名称到文件配置ID的映射字典
"""
cursor = conn.cursor()
config_map = {} # template_name -> file_config_id
print("\n" + "="*60)
print("开始初始化文件配置...")
print("="*60)
for template_name, template_info in parsed_data.items():
template_code = template_info.get('template_code', template_name)
# 检查文件配置是否已存在
check_sql = """
SELECT id, template_code FROM f_polic_file_config
WHERE tenant_id = %s AND name = %s
"""
cursor.execute(check_sql, (TENANT_ID, template_name))
existing = cursor.fetchone()
if existing:
file_config_id = existing[0]
existing_template_code = existing[1]
# 如果已存在但template_code不同更新它
if existing_template_code != template_code:
update_sql = """
UPDATE f_polic_file_config
SET template_code = %s, updated_time = %s, updated_by = %s
WHERE id = %s
"""
cursor.execute(update_sql, (template_code, CURRENT_TIME, CREATED_BY, file_config_id))
conn.commit()
print(f" ✓ 更新文件配置 '{template_name}' 的template_code: {existing_template_code} -> {template_code}")
print(f" 文件配置 '{template_name}' 已存在ID: {file_config_id}, template_code: {template_code}")
else:
file_config_id = generate_id()
# 注意不再写入input_data字段只写入template_code
insert_sql = """
INSERT INTO f_polic_file_config
(id, tenant_id, parent_id, name, template_code, file_path, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# 默认文件路径(可以根据实际情况调整)
default_file_path = f"/templates/{template_name}.docx"
cursor.execute(insert_sql, (
file_config_id,
TENANT_ID,
None, # parent_id
template_name,
template_code,
default_file_path,
CURRENT_TIME,
CREATED_BY,
CURRENT_TIME,
CREATED_BY,
1 # state: 1表示启用
))
print(f" ✓ 创建文件配置: {template_name} (template_code: {template_code}), ID: {file_config_id}")
config_map[template_name] = file_config_id
conn.commit()
return config_map
def init_file_field_relations(conn, parsed_data, field_map, config_map):
"""初始化文件和字段的关联关系(只关联输出字段)"""
cursor = conn.cursor()
print("\n" + "="*60)
print("开始初始化文件和字段的关联关系...")
print("="*60)
for template_name, template_info in parsed_data.items():
file_config_id = config_map.get(template_name)
if not file_config_id:
print(f" 警告: 模板 '{template_name}' 的文件配置不存在,跳过")
continue
output_fields = template_info.get('output_fields', [])
print(f"\n 处理模板: {template_name} ({len(output_fields)} 个输出字段)")
for output_field in output_fields:
field_name = output_field['name']
field_code = output_field.get('field_code') or get_field_code_from_name(field_name)
field_id = field_map.get(field_code)
if not field_id:
print(f" 警告: 字段 '{field_name}' (code: {field_code}) 不存在,跳过")
continue
# 检查关联关系是否已存在
check_sql = """
SELECT id FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s AND filed_id = %s
"""
cursor.execute(check_sql, (TENANT_ID, file_config_id, field_id))
existing = cursor.fetchone()
if existing:
print(f" 关联关系已存在: {field_name}")
else:
relation_id = generate_id()
insert_sql = """
INSERT INTO f_polic_file_field
(id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_sql, (
relation_id,
TENANT_ID,
file_config_id,
field_id,
CURRENT_TIME,
CREATED_BY,
CURRENT_TIME,
CREATED_BY,
1 # state: 1表示启用
))
print(f" ✓ 创建关联: {field_name} (code: {field_code})")
conn.commit()
def main():
"""主函数"""
try:
# 加载解析的字段数据
print("="*60)
print("加载Excel解析结果...")
print("="*60)
parsed_data = load_parsed_fields()
print(f"✓ 成功加载 {len(parsed_data)} 个模板的数据")
# 连接数据库
print("\n" + "="*60)
print("连接数据库...")
print("="*60)
conn = pymysql.connect(**DB_CONFIG)
print("✓ 数据库连接成功")
try:
# 1. 初始化所有字段
field_map = init_all_fields(conn, parsed_data)
# 2. 初始化文件配置
config_map = init_file_configs(conn, parsed_data)
# 3. 初始化关联关系
init_file_field_relations(conn, parsed_data, field_map, config_map)
print("\n" + "="*60)
print("✓ 所有数据初始化完成!")
print("="*60)
finally:
conn.close()
except Exception as e:
print(f"\n错误: {e}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
main()