ai-business-write/enable_all_fields.py

232 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
启用f_polic_field表中所有字段将state更新为1
"""
import pymysql
import os
from datetime import datetime
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
UPDATED_BY = 655162080928945152
CURRENT_TIME = datetime.now()
def check_field_states(conn):
"""检查字段状态统计"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 统计各状态的字段数量使用CAST来正确处理二进制类型
sql = """
SELECT
CAST(state AS UNSIGNED) as state_int,
field_type,
COUNT(*) as count
FROM f_polic_field
WHERE tenant_id = %s
GROUP BY CAST(state AS UNSIGNED), field_type
ORDER BY field_type, CAST(state AS UNSIGNED)
"""
cursor.execute(sql, (TENANT_ID,))
stats = cursor.fetchall()
cursor.close()
return stats
def get_fields_by_state(conn, state):
"""获取指定状态的字段列表"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, filed_code, field_type, CAST(state AS UNSIGNED) as state_int
FROM f_polic_field
WHERE tenant_id = %s
AND CAST(state AS UNSIGNED) = %s
ORDER BY field_type, name
"""
cursor.execute(sql, (TENANT_ID, state))
fields = cursor.fetchall()
cursor.close()
return fields
def enable_all_fields(conn, dry_run=True):
"""启用所有字段"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 查询需要更新的字段使用CAST来正确处理二进制类型
sql = """
SELECT id, name, filed_code, field_type, CAST(state AS UNSIGNED) as state_int
FROM f_polic_field
WHERE tenant_id = %s
AND CAST(state AS UNSIGNED) != 1
ORDER BY field_type, name
"""
cursor.execute(sql, (TENANT_ID,))
fields_to_update = cursor.fetchall()
if not fields_to_update:
print("✓ 所有字段已经是启用状态,无需更新")
cursor.close()
return 0
print(f"\n找到 {len(fields_to_update)} 个需要启用的字段:")
for field in fields_to_update:
field_type_str = "输出字段" if field['field_type'] == 2 else "输入字段"
print(f" - {field['name']} ({field['filed_code']}) [{field_type_str}] (当前state={field['state_int']})")
if dry_run:
print("\n⚠ 这是预览模式dry_run=True不会实际更新数据库")
cursor.close()
return len(fields_to_update)
# 执行更新使用CAST来正确比较
update_sql = """
UPDATE f_polic_field
SET state = 1, updated_time = %s, updated_by = %s
WHERE tenant_id = %s
AND CAST(state AS UNSIGNED) != 1
"""
cursor.execute(update_sql, (CURRENT_TIME, UPDATED_BY, TENANT_ID))
updated_count = cursor.rowcount
conn.commit()
cursor.close()
return updated_count
def main():
"""主函数"""
print("="*80)
print("启用f_polic_field表中所有字段")
print("="*80)
print()
try:
conn = pymysql.connect(**DB_CONFIG)
print("✓ 数据库连接成功")
except Exception as e:
print(f"✗ 数据库连接失败: {str(e)}")
return
try:
# 1. 检查当前状态统计
print("\n正在检查字段状态统计...")
stats = check_field_states(conn)
print("\n字段状态统计:")
total_fields = 0
enabled_fields = 0
disabled_fields = 0
for stat in stats:
state_int = stat['state_int']
field_type = stat['field_type']
count = stat['count']
total_fields += count
state_str = "启用" if state_int == 1 else "未启用"
type_str = "输出字段" if field_type == 2 else "输入字段"
print(f" {type_str} - {state_str} (state={state_int}): {count}")
if state_int == 1:
enabled_fields += count
else:
disabled_fields += count
print(f"\n总计: {total_fields} 个字段")
print(f" 启用: {enabled_fields}")
print(f" 未启用: {disabled_fields}")
# 2. 显示未启用的字段详情
if disabled_fields > 0:
print(f"\n正在查询未启用的字段详情...")
disabled_fields_list = get_fields_by_state(conn, 0)
print(f"\n未启用的字段列表 ({len(disabled_fields_list)} 个):")
for field in disabled_fields_list:
field_type_str = "输出字段" if field['field_type'] == 2 else "输入字段"
print(f" - {field['name']} ({field['filed_code']}) [{field_type_str}]")
# 3. 预览更新dry_run
print("\n" + "="*80)
print("预览更新(不会实际修改数据库)")
print("="*80)
count_to_update = enable_all_fields(conn, dry_run=True)
if count_to_update == 0:
print("\n所有字段已经是启用状态,无需更新")
return
# 4. 确认是否执行更新
print("\n" + "="*80)
print("准备执行更新")
print("="*80)
print(f"将更新 {count_to_update} 个字段的状态为启用state=1")
# 实际执行更新
print("\n正在执行更新...")
updated_count = enable_all_fields(conn, dry_run=False)
print(f"\n✓ 更新成功!共更新 {updated_count} 个字段")
# 5. 验证更新结果
print("\n正在验证更新结果...")
final_stats = check_field_states(conn)
print("\n更新后的字段状态统计:")
final_enabled = 0
final_disabled = 0
for stat in final_stats:
state_int = stat['state_int']
field_type = stat['field_type']
count = stat['count']
state_str = "启用" if state_int == 1 else "未启用"
type_str = "输出字段" if field_type == 2 else "输入字段"
print(f" {type_str} - {state_str} (state={state_int}): {count}")
if state_int == 1:
final_enabled += count
else:
final_disabled += count
print(f"\n总计: {final_enabled + final_disabled} 个字段")
print(f" 启用: {final_enabled}")
print(f" 未启用: {final_disabled}")
if final_disabled == 0:
print("\n✓ 所有字段已成功启用!")
else:
print(f"\n⚠ 仍有 {final_disabled} 个字段未启用")
print("\n" + "="*80)
print("操作完成!")
print("="*80)
except Exception as e:
print(f"\n✗ 处理失败: {str(e)}")
import traceback
traceback.print_exc()
conn.rollback()
finally:
conn.close()
if __name__ == '__main__':
main()