
为 Python + On-Premise 环境设计的FHIR R5映射模板。这个模板将聚焦于一个常见的临床预测场景:“患者30天内再入院风险预测”,因为它需要整合患者基本信息、诊断和实验室观测等多维度数据,是展示FHIR强大关联能力的绝佳案例。
一个完整、注释清晰的 Python 脚本
fhir_mapping_template.py。这个脚本将演示两个关键转换步骤:
第一步:从类EHR表格数据(DataFrame) -> FHIR R5 JSON资源
模拟从医院信息系统(HIS)或数据仓库中导出的两张表:
patients.csv 和
encounters_diagnoses.csv。提供一个核心函数,将每一行表格数据转换为符合R5规范的
Patient,
Encounter, 和
Condition 资源。展示如何使用标准编码系统(如SNOMED-CT, LOINC)和本地标识符。
第二步:从FHIR R5 JSON资源 -> 机器学习特征向量(DataFrame)
提供核心函数,解析生成的FHIR JSON资源。演示如何“反解析”FHIR资源,提取并聚合特征(如计算年龄、统计特定诊断的数量),最终形成一个可直接用于模型训练的、扁平化的特征表。这个模板清晰地看到数据在标准化前后的形态,以及如何在其间进行双向转换。
fhir_mapping_template.py - 完整代码模板
# -*- coding: utf-8 -*-
"""
FHIR R5 映射模板:从EHR数据到FHIR资源,再到ML特征
本模板演示了在On-Premise环境下,如何使用Python(主要是Pandas库):
1. 将模拟的EHR表格数据转换为符合FHIR R5标准的JSON资源(Patient, Encounter, Condition)。
2. 将这些FHIR JSON资源解析、聚合,转换为可用于机器学习的特征DataFrame。
适用场景: 患者30天内再入院风险预测
环境: Python 3.8+, on-premise
依赖: pip install pandas
"""
import pandas as pd
import json
from datetime import datetime
# ==============================================================================
# 第一部分:模拟EHR数据源 (在实际项目中,请用 pd.read_sql 或 pd.read_csv 替换)
# ==============================================================================
def create_mock_ehr_data():
"""
创建模拟的EHR表格数据,模拟从数据库导出的CSV文件。
返回两个DataFrame: patients_df 和 encounters_df。
"""
print("🔧 步骤 1: 创建模拟的EHR数据...")
# 模拟患者基本信息表
patients_data = {
'local_patient_id': ['P001', 'P002', 'P003'],
'first_name': ['三', '四', '五'],
'last_name': ['张', '李', '王'],
'gender': ['male', 'female', 'male'],
'birth_date': ['1965-03-12', '1978-07-22', '1990-11-30'],
'ssn': ['111-22-3333', '222-33-4444', '333-44-5555']
}
patients_df = pd.DataFrame(patients_data)
# 模拟就诊和诊断信息表
encounters_data = {
'local_encounter_id': ['E001', 'E002', 'E003'],
'local_patient_id': ['P001', 'P001', 'P002'],
'encounter_start_date': ['2023-10-15', '2023-10-25', '2023-11-05'],
'encounter_end_date': ['2023-10-18', '2023-10-26', '2023-11-08'],
'diagnosis_code': ['440240005', '22298006', '386661006'], # 使用SNOMED-CT编码
'diagnosis_display': ['Myocardial Infarction', 'Myocardial Infarction', 'Fever']
}
encounters_df = pd.DataFrame(encounters_data)
print(" - 模拟患者数据 ({} 条):".format(len(patients_df)))
print(patients_df)
print("
- 模拟就诊/诊断数据 ({} 条):".format(len(encounters_df)))
print(encounters_df)
return patients_df, encounters_df
# ==============================================================================
# 第二部分:EHR数据 -> FHIR R5 资源
# ==============================================================================
def map_row_to_patient_fhir(row: pd.Series) -> dict:
"""
将患者数据行映射为FHIR R5 Patient资源。
"""
patient_resource = {
"resourceType": "Patient",
"identifier": [
{
"type": {
"coding": [
{
"system": "http://terminology.hl7.org/CodeSystem/v2-0203",
"code": "MR",
"display": "Medical Record Number"
}
]
},
"system": "http://hospital.example/mrn",
"value": row['local_patient_id']
},
{
"type": {
"coding": [
{
"system": "http://terminology.hl7.org/CodeSystem/v2-0203",
"code": "SS",
"display": "Social Security Number"
}
]
},
"system": "http://hl7.org/fhir/sid/us-ssn",
"value": row['ssn']
}
],
"name": [
{
"use": "official",
"family": row['last_name'],
"given": [row['first_name']]
}
],
"gender": row['gender'],
"birthDate": row['birth_date'],
# active: 默认假设患者档案是活跃的
"active": True
}
return patient_resource
def map_rows_to_encounter_condition_fhir(encounter_row: pd.Series, patient_reference: str) -> dict:
"""
将就诊和诊断行映射为FHIR Encounter和Condition资源,并打包进一个Bundle。
Encounter和Condition通过Patient reference链接。
"""
encounter_resource = {
"resourceType": "Encounter",
"id": encounter_row['local_encounter_id'], # 使用本地ID作为逻辑ID
"status": "finished",
"class": { "system": "http://terminology.hl7.org/CodeSystem/v3-ActCode", "code": "IMP" },
"subject": {"reference": patient_reference},
"period": {
"start": encounter_row['encounter_start_date'],
"end": encounter_row['encounter_end_date']
}
}
condition_resource = {
"resourceType": "Condition",
"verificationStatus": {"coding": [{"system": "http://terminology.hl7.org/CodeSystem/condition-ver-status", "code": "confirmed"}]},
"category": [{"coding": [{"system": "http://terminology.hl7.org/CodeSystem/condition-category", "code": "encounter-diagnosis", "display": "Encounter Diagnosis"}]}],
"code": {
"coding": [
{
"system": "http://snomed.info/sct",
"code": encounter_row['diagnosis_code'],
"display": encounter_row['diagnosis_display']
}
]
},
"subject": {"reference": patient_reference},
"encounter": {"reference": f"Encounter/{encounter_row['local_encounter_id']}"}
}
return encounter_resource, condition_resource
def convert_ehr_to_fhir_bundle(patients_df: pd.DataFrame, encounters_df: pd.DataFrame) -> dict:
"""
将整个EHR DataFrame转换为一个FHIR Bundle资源。
"""
print("
🔧 步骤 2: 将EHR数据转换为FHIR R5 Bundle...")
bundle = {
"resourceType": "Bundle",
"id": "bundle-example-ehr-to-fhir",
"type": "collection",
"entry": []
}
# 为保持患者和就诊的关联,先创建一个患者字典以便快速查找
patient_id_to_fhir_id_map = {}
# 1. 遍历并转换患者数据
for _, patient_row in patients_df.iterrows():
patient_fhir = map_row_to_patient_fhir(patient_row)
# 使用本地ID作为FHIR逻辑ID,方便后续引用
fhir_patient_id = f"patient-{patient_row['local_patient_id']}"
patient_fhir['id'] = fhir_patient_id
patient_id_to_fhir_id_map[patient_row['local_patient_id']] = fhir_patient_id
bundle["entry"].append({
"fullUrl": f"urn:uuid:{fhir_patient_id}",
"resource": patient_fhir
})
# 2. 遍历并转换就诊/诊断数据
for _, encounter_row in encounters_df.iterrows():
# 确保关联的患者存在
if encounter_row['local_patient_id'] in patient_id_to_fhir_id_map:
patient_ref = f"Patient/{patient_id_to_fhir_id_map[encounter_row['local_patient_id']]}"
encounter_fhir, condition_fhir = map_rows_to_encounter_condition_fhir(encounter_row, patient_ref)
bundle["entry"].extend([
{"fullUrl": f"urn:uuid:{encounter_fhir['id']}", "resource": encounter_fhir},
{"fullUrl": f"urn:uuid:condition-{encounter_fhir['id']}", "resource": condition_fhir}
])
print(" - FHIR Bundle 已生成,包含 {} 个资源。".format(len(bundle['entry'])))
# (可选) 将FHIR Bundle保存为JSON文件用于检查
# with open('output_fhir_bundle.json', 'w', encoding='utf-8') as f:
# json.dump(bundle, f, indent=2, ensure_ascii=False)
# print(" - 已保存到 'output_fhir_bundle.json'")
return bundle
# ==============================================================================
# 第三部分:FHIR R5 资源 -> ML 特征 DataFrame
# ==============================================================================
def extract_features_from_fhir_bundle(bundle: dict) -> pd.DataFrame:
"""
从FHIR Bundle中解析并提取特征,构建ML训练用的DataFrame。
这是联邦学习客户端在本地数据上进行特征工程的关键步骤。
"""
print("
🔧 步骤 3: 从FHIR Bundle中提取ML特征...")
# 1. 将Bundle中的资源按类型分类,方便处理
patients = {res['id']: res for entry in bundle['entry'] if res := entry.get('resource') if res.get('resourceType') == 'Patient'}
encounters = {res['id']: res for entry in bundle['entry'] if res := entry.get('resource') if res.get('resourceType') == 'Encounter'}
conditions = [res for entry in bundle['entry'] if res := entry.get('resource') if res.get('resourceType') == 'Condition']
# 2. 聚合特征
feature_rows = []
for patient_id, patient_res in patients.items():
features = {}
# --- 特征提取:从 Patient 资源 ---
features['patient_id'] = patient_res['identifier'][0]['value'] # 获取MRN
# 计算年龄 (这是一个非常有用的特征工程示例)
birth_year = int(patient_res['birthDate'][:4])
current_year = datetime.now().year
features['age'] = current_year - birth_year
features['gender'] = 1 if patient_res['gender'] == 'male' else 0 # 性别编码
# --- 特征提取:关联 Encounter 和 Condition 资源 ---
patient_encounters = [e for e in encounters.values() if e['subject']['reference'] == f"Patient/{patient_id}"]
# 特征: 总就诊次数
features['total_encounters'] = len(patient_encounters)
# 特征: 诊断计数 (以心梗为例)
mi_conditions = [c for c in conditions if c['subject']['reference'] == f"Patient/{patient_id}" and '440240005' in [coding['code'] for coding in c['code']['coding']]]
features['count_mi_diagnosis'] = len(mi_conditions)
# 特征: 是否有发热诊断
fever_conditions = [c for c in conditions if c['subject']['reference'] == f"Patient/{patient_id}" and '386661006' in [coding['code'] for coding in c['code']['coding']]]
features['has_fever_diagnosis'] = 1 if fever_conditions else 0
feature_rows.append(features)
ml_df = pd.DataFrame(feature_rows)
print(" - ML 特征 DataFrame 已创建:")
print(ml_df)
return ml_df
# ==============================================================================
# 第四部分:主执行流程
# ==============================================================================
if __name__ == '__main__':
"""
演示完整的EHR -> FHIR -> ML特征转换流程。
"""
print("🚀 开始 FHIR R5 映射模板演示...")
# 步骤1: 模拟或加载您的EHR数据
patients_df, encounters_df = create_mock_ehr_data()
# 步骤2: 将EHR数据转换为FHIR R5 Bundle
fhir_bundle = convert_ehr_to_fhir_bundle(patients_df, encounters_df)
# 步骤3: 从FHIR Bundle中提取机器学习特征
ml_features_df = extract_features_from_fhir_bundle(fhir_bundle)
# 最终输出: ml_features_df 可以直接用于后续的联邦学习客户端训练
print("
✅ 演示完成! 最终生成的ML特征 DataFrame 已准备好用于模型训练。")
fhir_mapping_template.py。安装依赖:确保Python环境已安装Pandas:
pip install pandas。运行演示:直接在终端运行
python fhir_mapping_template.py,看到从模拟数据到最终ML特征表的完整转换过程和打印结果。定制项目(最重要的步骤):
替换数据源:在
create_mock_ehr_data() 函数中,自己的数据加载逻辑替换掉模拟数据的创建。例如:
# 从数据库加载
import sqlalchemy
engine = sqlalchemy.create_engine('your-database-connection-string')
patients_df = pd.read_sql("SELECT * FROM patient_table", engine)
# 从CSV加载
encounters_df = pd.read_csv('path/to/your/encounters.csv')
调整映射逻辑:
修改
map_row_to_patient_fhir 和
map_rows_to_encounter_condition_fhir 函数,使其中的字段名(如
row['local_patient_id'])与真实表列名对应。根据Profile要求,添加或删除
identifier、
name、
address 等字段。关键:确保使用的编码系统(如SNOMED-CT, LOINC)与机构标准一致。
定制特征工程:在
extract_features_from_fhir_bundle 函数中,根据模型需求,添加、删除或修改特征。例如,如果有
Observation 资源(如实验室检查),可以编写逻辑来提取患者最近的肌酐值或平均血压。
$validate 请求,以确保生成的数据100%符合Profile定义,从源头保证数据质量。
# 伪代码示例
# validation_url = "https://your-fhir-server.example/fhir/Patient/$validate"
# response = requests.post(validation_url, headers=HEADERS, json=patient_fhir)
# response.raise_for_status()
依赖管理:将项目依赖(
pandas,
sqlalchemy等)写入
requirements.txt 文件,并使用虚拟环境(如venv或conda)来隔离项目环境,保证可复现性。
这个模板提供了一个坚实的起点。现在,可以将医院的真实数据“喂”给它,并观察它如何一步步变为标准、可用的FHIR资源和宝贵的模型特征。
当将此模板应用于实际数据后,如果遇到任何具体问题(例如,如何处理某个复杂的嵌套结构,或者如何映射某个特定的EHR字段),随时可以再来找我。我们可以一起优化它,让它完美适配需求!