""" 检查剩余的未处理字段,并生成合适的field_code """ import os import pymysql import re from typing import Dict, List # 数据库连接配置 DB_CONFIG = { 'host': os.getenv('DB_HOST', '152.136.177.240'), 'port': int(os.getenv('DB_PORT', 5012)), 'user': os.getenv('DB_USER', 'finyx'), 'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'), 'database': os.getenv('DB_NAME', 'finyx'), 'charset': 'utf8mb4' } TENANT_ID = 615873064429507639 def is_chinese(text: str) -> bool: """判断字符串是否包含中文字符""" if not text: return False return bool(re.search(r'[\u4e00-\u9fff]', text)) def generate_field_code(field_name: str) -> str: """根据字段名称生成field_code""" # 移除常见前缀 name = field_name.replace('被核查人员', 'target_').replace('被核查人', 'target_') # 转换为小写并替换特殊字符 code = name.lower() code = re.sub(r'[^\w\u4e00-\u9fff]', '_', code) code = re.sub(r'_+', '_', code).strip('_') # 如果还是中文,尝试更智能的转换 if is_chinese(code): # 简单的拼音映射(这里只是示例,实际应该使用拼音库) # 暂时使用更简单的规则 code = field_name.lower() code = code.replace('被核查人员', 'target_') code = code.replace('被核查人', 'target_') code = code.replace('谈话', 'interview_') code = code.replace('审批', 'approval_') code = code.replace('核查', 'investigation_') code = code.replace('人员', '') code = code.replace('时间', '_time') code = code.replace('地点', '_location') code = code.replace('部门', '_department') code = code.replace('姓名', '_name') code = code.replace('号码', '_number') code = code.replace('情况', '_situation') code = code.replace('问题', '_issue') code = code.replace('描述', '_description') code = re.sub(r'[^\w]', '_', code) code = re.sub(r'_+', '_', code).strip('_') return code def check_remaining_fields(): """检查剩余的未处理字段""" conn = pymysql.connect(**DB_CONFIG) cursor = conn.cursor(pymysql.cursors.DictCursor) print("="*80) print("检查剩余的未处理字段") print("="*80) # 查询所有包含中文field_code的字段 cursor.execute(""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s AND ( filed_code REGEXP '[\\u4e00-\\u9fff]' OR filed_code IS NULL OR filed_code = '' ) ORDER BY name """, (TENANT_ID,)) fields = cursor.fetchall() print(f"\n找到 {len(fields)} 个仍需要处理的字段:\n") suggestions = [] for field in fields: suggested_code = generate_field_code(field['name']) suggestions.append({ 'id': field['id'], 'name': field['name'], 'current_code': field['filed_code'], 'suggested_code': suggested_code, 'field_type': field['field_type'] }) print(f" ID: {field['id']}") print(f" 名称: {field['name']}") print(f" 当前field_code: {field['filed_code']}") print(f" 建议field_code: {suggested_code}") print(f" field_type: {field['field_type']}") print() # 询问是否更新 if suggestions: print("="*80) choice = input("是否更新这些字段的field_code?(y/n,默认n): ").strip().lower() if choice == 'y': print("\n开始更新...") for sug in suggestions: cursor.execute(""" UPDATE f_polic_field SET filed_code = %s, updated_time = NOW(), updated_by = %s WHERE id = %s """, (sug['suggested_code'], 655162080928945152, sug['id'])) print(f" ✓ 更新字段 ID {sug['id']}: {sug['name']} -> {sug['suggested_code']}") conn.commit() print("\n✓ 更新完成") else: print("未执行更新") cursor.close() conn.close() if __name__ == '__main__': check_remaining_fields()