ai-business-write/check_remaining_fields.py

132 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
检查剩余的未处理字段并生成合适的field_code
"""
import os
import pymysql
import re
from typing import Dict, List
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
def is_chinese(text: str) -> bool:
"""判断字符串是否包含中文字符"""
if not text:
return False
return bool(re.search(r'[\u4e00-\u9fff]', text))
def generate_field_code(field_name: str) -> str:
"""根据字段名称生成field_code"""
# 移除常见前缀
name = field_name.replace('被核查人员', 'target_').replace('被核查人', 'target_')
# 转换为小写并替换特殊字符
code = name.lower()
code = re.sub(r'[^\w\u4e00-\u9fff]', '_', code)
code = re.sub(r'_+', '_', code).strip('_')
# 如果还是中文,尝试更智能的转换
if is_chinese(code):
# 简单的拼音映射(这里只是示例,实际应该使用拼音库)
# 暂时使用更简单的规则
code = field_name.lower()
code = code.replace('被核查人员', 'target_')
code = code.replace('被核查人', 'target_')
code = code.replace('谈话', 'interview_')
code = code.replace('审批', 'approval_')
code = code.replace('核查', 'investigation_')
code = code.replace('人员', '')
code = code.replace('时间', '_time')
code = code.replace('地点', '_location')
code = code.replace('部门', '_department')
code = code.replace('姓名', '_name')
code = code.replace('号码', '_number')
code = code.replace('情况', '_situation')
code = code.replace('问题', '_issue')
code = code.replace('描述', '_description')
code = re.sub(r'[^\w]', '_', code)
code = re.sub(r'_+', '_', code).strip('_')
return code
def check_remaining_fields():
"""检查剩余的未处理字段"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("="*80)
print("检查剩余的未处理字段")
print("="*80)
# 查询所有包含中文field_code的字段
cursor.execute("""
SELECT id, name, filed_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s AND (
filed_code REGEXP '[\\u4e00-\\u9fff]'
OR filed_code IS NULL
OR filed_code = ''
)
ORDER BY name
""", (TENANT_ID,))
fields = cursor.fetchall()
print(f"\n找到 {len(fields)} 个仍需要处理的字段:\n")
suggestions = []
for field in fields:
suggested_code = generate_field_code(field['name'])
suggestions.append({
'id': field['id'],
'name': field['name'],
'current_code': field['filed_code'],
'suggested_code': suggested_code,
'field_type': field['field_type']
})
print(f" ID: {field['id']}")
print(f" 名称: {field['name']}")
print(f" 当前field_code: {field['filed_code']}")
print(f" 建议field_code: {suggested_code}")
print(f" field_type: {field['field_type']}")
print()
# 询问是否更新
if suggestions:
print("="*80)
choice = input("是否更新这些字段的field_code(y/n默认n): ").strip().lower()
if choice == 'y':
print("\n开始更新...")
for sug in suggestions:
cursor.execute("""
UPDATE f_polic_field
SET filed_code = %s, updated_time = NOW(), updated_by = %s
WHERE id = %s
""", (sug['suggested_code'], 655162080928945152, sug['id']))
print(f" ✓ 更新字段 ID {sug['id']}: {sug['name']} -> {sug['suggested_code']}")
conn.commit()
print("\n✓ 更新完成")
else:
print("未执行更新")
cursor.close()
conn.close()
if __name__ == '__main__':
check_remaining_fields()