智慧医疗:FHIR R5、联邦学习与MLOps三位一体的AI产品化实战指南(案例)

  • 时间:2025-11-20 20:53 作者: 来源: 阅读:0
  • 扫一扫,手机访问
摘要: 为 Python + On-Premise 环境设计的FHIR R5映射模板。这个模板将聚焦于一个常见的临床预测场景:“患者30天内再入院风险预测”,因为它需要整合患者基本信息、诊断和实验室观测等多维度数据,是展示FHIR强大关联能力的绝佳案例。 交付内容:FHIR R5 映射 Python 模板 一个完整、注释清晰的 Python 脚本 fhir_mapping_template.py。

Python + On-Premise 环境设计的FHIR R5映射模板。这个模板将聚焦于一个常见的临床预测场景:“患者30天内再入院风险预测”,因为它需要整合患者基本信息、诊断和实验室观测等多维度数据,是展示FHIR强大关联能力的绝佳案例。


交付内容:FHIR R5 映射 Python 模板

一个完整、注释清晰的 Python 脚本 fhir_mapping_template.py。这个脚本将演示两个关键转换步骤:

第一步:从类EHR表格数据(DataFrame) -> FHIR R5 JSON资源

模拟从医院信息系统(HIS)或数据仓库中导出的两张表: patients.csv encounters_diagnoses.csv。提供一个核心函数,将每一行表格数据转换为符合R5规范的 Patient, Encounter, 和 Condition 资源。展示如何使用标准编码系统(如SNOMED-CT, LOINC)和本地标识符。

第二步:从FHIR R5 JSON资源 -> 机器学习特征向量(DataFrame)

提供核心函数,解析生成的FHIR JSON资源。演示如何“反解析”FHIR资源,提取并聚合特征(如计算年龄、统计特定诊断的数量),最终形成一个可直接用于模型训练的、扁平化的特征表。

这个模板清晰地看到数据在标准化前后的形态,以及如何在其间进行双向转换。


fhir_mapping_template.py - 完整代码模板


# -*- coding: utf-8 -*-
"""
FHIR R5 映射模板:从EHR数据到FHIR资源,再到ML特征

本模板演示了在On-Premise环境下,如何使用Python(主要是Pandas库):
1. 将模拟的EHR表格数据转换为符合FHIR R5标准的JSON资源(Patient, Encounter, Condition)。
2. 将这些FHIR JSON资源解析、聚合,转换为可用于机器学习的特征DataFrame。

适用场景: 患者30天内再入院风险预测
环境: Python 3.8+, on-premise
依赖: pip install pandas
"""

import pandas as pd
import json
from datetime import datetime

# ==============================================================================
# 第一部分:模拟EHR数据源 (在实际项目中,请用 pd.read_sql 或 pd.read_csv 替换)
# ==============================================================================

def create_mock_ehr_data():
    """
    创建模拟的EHR表格数据,模拟从数据库导出的CSV文件。
    返回两个DataFrame: patients_df 和 encounters_df。
    """
    print("🔧 步骤 1: 创建模拟的EHR数据...")
    
    # 模拟患者基本信息表
    patients_data = {
        'local_patient_id': ['P001', 'P002', 'P003'],
        'first_name': ['三', '四', '五'],
        'last_name': ['张', '李', '王'],
        'gender': ['male', 'female', 'male'],
        'birth_date': ['1965-03-12', '1978-07-22', '1990-11-30'],
        'ssn': ['111-22-3333', '222-33-4444', '333-44-5555']
    }
    patients_df = pd.DataFrame(patients_data)

    # 模拟就诊和诊断信息表
    encounters_data = {
        'local_encounter_id': ['E001', 'E002', 'E003'],
        'local_patient_id': ['P001', 'P001', 'P002'],
        'encounter_start_date': ['2023-10-15', '2023-10-25', '2023-11-05'],
        'encounter_end_date': ['2023-10-18', '2023-10-26', '2023-11-08'],
        'diagnosis_code': ['440240005', '22298006', '386661006'], # 使用SNOMED-CT编码
        'diagnosis_display': ['Myocardial Infarction', 'Myocardial Infarction', 'Fever']
    }
    encounters_df = pd.DataFrame(encounters_data)
    
    print("   - 模拟患者数据 ({} 条):".format(len(patients_df)))
    print(patients_df)
    print("
   - 模拟就诊/诊断数据 ({} 条):".format(len(encounters_df)))
    print(encounters_df)
    
    return patients_df, encounters_df

# ==============================================================================
# 第二部分:EHR数据 -> FHIR R5 资源
# ==============================================================================

def map_row_to_patient_fhir(row: pd.Series) -> dict:
    """
    将患者数据行映射为FHIR R5 Patient资源。
    """
    patient_resource = {
        "resourceType": "Patient",
        "identifier": [
            {
                "type": {
                    "coding": [
                        {
                            "system": "http://terminology.hl7.org/CodeSystem/v2-0203",
                            "code": "MR",
                            "display": "Medical Record Number"
                        }
                    ]
                },
                "system": "http://hospital.example/mrn",
                "value": row['local_patient_id']
            },
            {
                "type": {
                    "coding": [
                        {
                            "system": "http://terminology.hl7.org/CodeSystem/v2-0203",
                            "code": "SS",
                            "display": "Social Security Number"
                        }
                    ]
                },
                "system": "http://hl7.org/fhir/sid/us-ssn",
                "value": row['ssn']
            }
        ],
        "name": [
            {
                "use": "official",
                "family": row['last_name'],
                "given": [row['first_name']]
            }
        ],
        "gender": row['gender'],
        "birthDate": row['birth_date'],
        # active: 默认假设患者档案是活跃的
        "active": True
    }
    return patient_resource

def map_rows_to_encounter_condition_fhir(encounter_row: pd.Series, patient_reference: str) -> dict:
    """
    将就诊和诊断行映射为FHIR Encounter和Condition资源,并打包进一个Bundle。
    Encounter和Condition通过Patient reference链接。
    """
    encounter_resource = {
        "resourceType": "Encounter",
        "id": encounter_row['local_encounter_id'], # 使用本地ID作为逻辑ID
        "status": "finished",
        "class": { "system": "http://terminology.hl7.org/CodeSystem/v3-ActCode", "code": "IMP" },
        "subject": {"reference": patient_reference},
        "period": {
            "start": encounter_row['encounter_start_date'],
            "end": encounter_row['encounter_end_date']
        }
    }

    condition_resource = {
        "resourceType": "Condition",
        "verificationStatus": {"coding": [{"system": "http://terminology.hl7.org/CodeSystem/condition-ver-status", "code": "confirmed"}]},
        "category": [{"coding": [{"system": "http://terminology.hl7.org/CodeSystem/condition-category", "code": "encounter-diagnosis", "display": "Encounter Diagnosis"}]}],
        "code": {
            "coding": [
                {
                    "system": "http://snomed.info/sct",
                    "code": encounter_row['diagnosis_code'],
                    "display": encounter_row['diagnosis_display']
                }
            ]
        },
        "subject": {"reference": patient_reference},
        "encounter": {"reference": f"Encounter/{encounter_row['local_encounter_id']}"}
    }
    
    return encounter_resource, condition_resource


def convert_ehr_to_fhir_bundle(patients_df: pd.DataFrame, encounters_df: pd.DataFrame) -> dict:
    """
    将整个EHR DataFrame转换为一个FHIR Bundle资源。
    """
    print("
🔧 步骤 2: 将EHR数据转换为FHIR R5 Bundle...")
    
    bundle = {
        "resourceType": "Bundle",
        "id": "bundle-example-ehr-to-fhir",
        "type": "collection",
        "entry": []
    }
    
    # 为保持患者和就诊的关联,先创建一个患者字典以便快速查找
    patient_id_to_fhir_id_map = {}
    
    # 1. 遍历并转换患者数据
    for _, patient_row in patients_df.iterrows():
        patient_fhir = map_row_to_patient_fhir(patient_row)
        # 使用本地ID作为FHIR逻辑ID,方便后续引用
        fhir_patient_id = f"patient-{patient_row['local_patient_id']}"
        patient_fhir['id'] = fhir_patient_id
        
        patient_id_to_fhir_id_map[patient_row['local_patient_id']] = fhir_patient_id
        
        bundle["entry"].append({
            "fullUrl": f"urn:uuid:{fhir_patient_id}",
            "resource": patient_fhir
        })

    # 2. 遍历并转换就诊/诊断数据
    for _, encounter_row in encounters_df.iterrows():
        # 确保关联的患者存在
        if encounter_row['local_patient_id'] in patient_id_to_fhir_id_map:
            patient_ref = f"Patient/{patient_id_to_fhir_id_map[encounter_row['local_patient_id']]}"
            
            encounter_fhir, condition_fhir = map_rows_to_encounter_condition_fhir(encounter_row, patient_ref)
            
            bundle["entry"].extend([
                {"fullUrl": f"urn:uuid:{encounter_fhir['id']}", "resource": encounter_fhir},
                {"fullUrl": f"urn:uuid:condition-{encounter_fhir['id']}", "resource": condition_fhir}
            ])
            
    print("   - FHIR Bundle 已生成,包含 {} 个资源。".format(len(bundle['entry'])))
    # (可选) 将FHIR Bundle保存为JSON文件用于检查
    # with open('output_fhir_bundle.json', 'w', encoding='utf-8') as f:
    #     json.dump(bundle, f, indent=2, ensure_ascii=False)
    # print("   - 已保存到 'output_fhir_bundle.json'")
        
    return bundle

# ==============================================================================
# 第三部分:FHIR R5 资源 -> ML 特征 DataFrame
# ==============================================================================

def extract_features_from_fhir_bundle(bundle: dict) -> pd.DataFrame:
    """
    从FHIR Bundle中解析并提取特征,构建ML训练用的DataFrame。
    这是联邦学习客户端在本地数据上进行特征工程的关键步骤。
    """
    print("
🔧 步骤 3: 从FHIR Bundle中提取ML特征...")
    
    # 1. 将Bundle中的资源按类型分类,方便处理
    patients = {res['id']: res for entry in bundle['entry'] if res := entry.get('resource') if res.get('resourceType') == 'Patient'}
    encounters = {res['id']: res for entry in bundle['entry'] if res := entry.get('resource') if res.get('resourceType') == 'Encounter'}
    conditions = [res for entry in bundle['entry'] if res := entry.get('resource') if res.get('resourceType') == 'Condition']
    
    # 2. 聚合特征
    feature_rows = []
    
    for patient_id, patient_res in patients.items():
        features = {}
        
        # --- 特征提取:从 Patient 资源 ---
        features['patient_id'] = patient_res['identifier'][0]['value'] # 获取MRN
        
        # 计算年龄 (这是一个非常有用的特征工程示例)
        birth_year = int(patient_res['birthDate'][:4])
        current_year = datetime.now().year
        features['age'] = current_year - birth_year
        
        features['gender'] = 1 if patient_res['gender'] == 'male' else 0 # 性别编码
        
        # --- 特征提取:关联 Encounter 和 Condition 资源 ---
        patient_encounters = [e for e in encounters.values() if e['subject']['reference'] == f"Patient/{patient_id}"]
        
        # 特征: 总就诊次数
        features['total_encounters'] = len(patient_encounters)
        
        # 特征: 诊断计数 (以心梗为例)
        mi_conditions = [c for c in conditions if c['subject']['reference'] == f"Patient/{patient_id}" and '440240005' in [coding['code'] for coding in c['code']['coding']]]
        features['count_mi_diagnosis'] = len(mi_conditions)
        
        # 特征: 是否有发热诊断
        fever_conditions = [c for c in conditions if c['subject']['reference'] == f"Patient/{patient_id}" and '386661006' in [coding['code'] for coding in c['code']['coding']]]
        features['has_fever_diagnosis'] = 1 if fever_conditions else 0
        
        feature_rows.append(features)

    ml_df = pd.DataFrame(feature_rows)
    print("   - ML 特征 DataFrame 已创建:")
    print(ml_df)
    
    return ml_df

# ==============================================================================
# 第四部分:主执行流程
# ==============================================================================

if __name__ == '__main__':
    """
    演示完整的EHR -> FHIR -> ML特征转换流程。
    """
    print("🚀 开始 FHIR R5 映射模板演示...")
    
    # 步骤1: 模拟或加载您的EHR数据
    patients_df, encounters_df = create_mock_ehr_data()
    
    # 步骤2: 将EHR数据转换为FHIR R5 Bundle
    fhir_bundle = convert_ehr_to_fhir_bundle(patients_df, encounters_df)
    
    # 步骤3: 从FHIR Bundle中提取机器学习特征
    ml_features_df = extract_features_from_fhir_bundle(fhir_bundle)
    
    # 最终输出: ml_features_df 可以直接用于后续的联邦学习客户端训练
    print("
✅ 演示完成! 最终生成的ML特征 DataFrame 已准备好用于模型训练。")


如何使用和定制这个模板

保存文件:将以上代码保存为 fhir_mapping_template.py安装依赖:确保Python环境已安装Pandas: pip install pandas运行演示:直接在终端运行 python fhir_mapping_template.py,看到从模拟数据到最终ML特征表的完整转换过程和打印结果。定制项目(最重要的步骤)替换数据源:在 create_mock_ehr_data() 函数中,自己的数据加载逻辑替换掉模拟数据的创建。例如:

# 从数据库加载
import sqlalchemy
engine = sqlalchemy.create_engine('your-database-connection-string')
patients_df = pd.read_sql("SELECT * FROM patient_table", engine)
# 从CSV加载
encounters_df = pd.read_csv('path/to/your/encounters.csv')
调整映射逻辑: 修改 map_row_to_patient_fhir map_rows_to_encounter_condition_fhir 函数,使其中的字段名(如 row['local_patient_id'])与真实表列名对应。根据Profile要求,添加或删除 identifier name address 等字段。关键:确保使用的编码系统(如SNOMED-CT, LOINC)与机构标准一致。 定制特征工程:在 extract_features_from_fhir_bundle 函数中,根据模型需求,添加、删除或修改特征。例如,如果有 Observation 资源(如实验室检查),可以编写逻辑来提取患者最近的肌酐值或平均血压。

On-Premise 生产环境建议

性能:如果处理的是百万级患者数据,使用Pandas可能会遇到内存瓶颈。可以考虑使用 DaskPolars 库,它们提供了与Pandas相似的API但具有更好的并行处理和内存管理能力。验证:在生产代码中,每当生成一个FHIR资源后,都应该向FHIR服务器发送一个 $validate 请求,以确保生成的数据100%符合Profile定义,从源头保证数据质量。

# 伪代码示例
# validation_url = "https://your-fhir-server.example/fhir/Patient/$validate"
# response = requests.post(validation_url, headers=HEADERS, json=patient_fhir)
# response.raise_for_status()
依赖管理:将项目依赖( pandas, sqlalchemy等)写入 requirements.txt 文件,并使用虚拟环境(如venv或conda)来隔离项目环境,保证可复现性。

这个模板提供了一个坚实的起点。现在,可以将医院的真实数据“喂”给它,并观察它如何一步步变为标准、可用的FHIR资源和宝贵的模型特征。

当将此模板应用于实际数据后,如果遇到任何具体问题(例如,如何处理某个复杂的嵌套结构,或者如何映射某个特定的EHR字段),随时可以再来找我。我们可以一起优化它,让它完美适配需求!

  • 全部评论(0)
手机二维码手机访问领取大礼包
返回顶部