Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions dingo/exec/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,24 @@ def evaluate_single_data(self, dingo_id: str, eval_fields: dict, eval_type: str,
model_cls = Model.rule_name_map.get(e_c_i.name)
model = model_cls() # 实例化类为对象,避免多线程配置覆盖
Model.set_config_rule(model, e_c_i.config)
if getattr(model_cls, "__module__", "").startswith("dingo.model.rule.scibase."):
if "dynamic_config" not in model.__dict__:
model.dynamic_config = model.dynamic_config.model_copy(deep=True)
if model.dynamic_config.parameters is None:
model.dynamic_config.parameters = {}
model.dynamic_config.parameters.setdefault(
"_dingo_dataset_sql_config",
self.input_args.dataset.sql_config.model_dump(),
)
model.dynamic_config.parameters.setdefault(
"_dingo_dataset_s3_config",
self.input_args.dataset.s3_config.model_dump(),
)
model.dynamic_config.parameters.setdefault("_dingo_dataset_source", self.input_args.dataset.source)
model.dynamic_config.parameters.setdefault("_dingo_dataset_format", self.input_args.dataset.format)
model.dynamic_config.parameters.setdefault("_dingo_input_path", self.input_args.input_path)
setattr(model_cls, "dynamic_config", model.dynamic_config)
model = model_cls
Comment on lines +179 to +196

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Modifying the class-level dynamic_config attribute globally via setattr(model_cls, "dynamic_config", model.dynamic_config) and then assigning the class back to model (model = model_cls) is extremely dangerous in concurrent or multi-threaded environments. This completely defeats the purpose of instantiating the class to avoid multi-thread config overwrite (as explicitly noted in the comment on line 177).\n\nIf multiple threads or concurrent tasks evaluate the same SciBase rules with different configurations, they will overwrite each other's dynamic_config class attribute, leading to severe race conditions and incorrect evaluations.\n\nRecommended Fix:\nInstead of using @classmethod for eval in the SciBase rules and hacking the executor to modify the class globally, define eval as a standard instance method (i.e., def eval(self, input_data: Data)) in all SciBase rule classes. This allows Model.set_config_rule(model, e_c_i.config) to set the configuration on the instance model safely and thread-safely, without any class-level modifications.

elif eval_type == 'llm':
model_cls = Model.llm_name_map.get(e_c_i.name)
model = model_cls()
Expand Down
26 changes: 24 additions & 2 deletions dingo/exec/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,30 @@ def evaluate_item(self, eval_fields: dict, eval_type: str, map_data: dict, eval_

for e_c_i in eval_list:
if eval_type == 'rule':
model = Model.rule_name_map.get(e_c_i.name)
Model.set_config_rule(model, e_c_i.config)
model_cls = Model.rule_name_map.get(e_c_i.name)
if getattr(model_cls, "__module__", "").startswith("dingo.model.rule.scibase."):
model = model_cls()
Model.set_config_rule(model, e_c_i.config)
if "dynamic_config" not in model.__dict__:
model.dynamic_config = model.dynamic_config.model_copy(deep=True)
if model.dynamic_config.parameters is None:
model.dynamic_config.parameters = {}
model.dynamic_config.parameters.setdefault(
"_dingo_dataset_sql_config",
self.input_args.dataset.sql_config.model_dump(),
)
model.dynamic_config.parameters.setdefault(
"_dingo_dataset_s3_config",
self.input_args.dataset.s3_config.model_dump(),
)
model.dynamic_config.parameters.setdefault("_dingo_dataset_source", self.input_args.dataset.source)
model.dynamic_config.parameters.setdefault("_dingo_dataset_format", self.input_args.dataset.format)
model.dynamic_config.parameters.setdefault("_dingo_input_path", self.input_args.input_path)
setattr(model_cls, "dynamic_config", model.dynamic_config)
model = model_cls
Comment on lines +240 to +259

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Modifying the class-level dynamic_config globally via setattr(model_cls, "dynamic_config", model.dynamic_config) is not thread-safe and will cause race conditions when running concurrent Spark tasks with different configurations.\n\nRecommended Fix:\nDefine eval as an instance method in the SciBase rules and keep the configuration isolated at the instance level, avoiding class-level modifications.

else:
model = model_cls
Model.set_config_rule(model, e_c_i.config)
elif eval_type == 'llm':
model = Model.llm_name_map.get(e_c_i.name)
Model.set_config_llm(model, e_c_i.config)
Expand Down
24 changes: 15 additions & 9 deletions dingo/model/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,22 @@ def load_model(cls):
if cls.module_loaded:
return
this_module_directory = os.path.dirname(os.path.abspath(__file__))
# rule auto register
for file in os.listdir(os.path.join(this_module_directory, "rule")):
path = os.path.join(this_module_directory, "rule", file)
if (
os.path.isfile(path)
and file.endswith(".py")
and not file == "__init__.py"
):
# rule auto register - recursively scan subdirectories
rule_base_dir = os.path.join(this_module_directory, "rule")
for root, dirs, files in os.walk(rule_base_dir):
dirs[:] = [d for d in dirs if d != "__pycache__"]

for file in files:
if not file.endswith(".py") or file == "__init__.py":
continue
rel_path = os.path.relpath(root, rule_base_dir)
if rel_path == ".":
module_name = f"dingo.model.rule.{file[:-3]}"
else:
rel_module = rel_path.replace(os.sep, ".")
module_name = f"dingo.model.rule.{rel_module}.{file[:-3]}"
try:
importlib.import_module("dingo.model.rule." + file.split(".")[0])
importlib.import_module(module_name)
except ModuleNotFoundError as e:
log.debug(e)

Expand Down
2 changes: 1 addition & 1 deletion dingo/model/rule/scibase/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""Quanliang/scibase rule implementations."""
"""SciBase QA rule implementations."""
21 changes: 21 additions & 0 deletions dingo/model/rule/scibase/assets/ebook_unique_mapping.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
字段名,数据类型,聚合策略,策略参数,源字段名,去重 / 聚合处理逻辑
isbns,array<string>,isbn_normalize,,,数组聚合,全部转换为13位ISBN格式,10位前面加978,13位保留,其他丢弃,全局去重
isbn13,string,isbn_min,,,唯一去重键;从isbns数组中取最小的归一化13位ISBN
title,string,freq_lex_max,min_len=2;max_len=1000,,在非空值里取词频最高;剔除长度<2或>1000极值;词频相同取字典序最大值
abstract,string,freq_lex_max,min_len=10;max_len=10000,,在非空值里取词频最高;剔除长度<10或>10000极值;词频相同取字典序最大值
language,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
type,array<string>,dedup_array,lower=true,,统一小写后数组聚合去重
author,array<string>,dedup_array,,,数组聚合去重
contributors,array<string>,dedup_array,,,数组聚合去重
indexed_in,array<string>,dedup_array,,,数组聚合去重
identifiers,"map<string,string>",merge_map,,,"key去重,相同key取max(value)"
publication_publisher,array<string>,dedup_array,,publisher,数组聚合去重;原字段名称publisher
publication_published_year,int,freq_int_max,min_val=1000;max_val=CURRENT_YEAR;extract_year=true,published_year,在非空值里取词频最高;剔除<1000或>当年极值;词频相同取最大值
publication_published_place,array<string>,dedup_array,,published_place,数组聚合去重;原字段名称published_place
publication_published_country,array<string>,dedup_array,,published_country,数组聚合去重;原字段名称published_country
publication_pages,int,max_int,,pages,取本书多版本中的最大页数
subjects,array<string>,dedup_array,,,数组聚合去重
genre,array<string>,dedup_array,,,数组聚合去重
category,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
access_oa_url,array<string>,dedup_array,,oa_url,数组聚合去重;原字段名称oa_url
dt,string,latest_dt,,,保留最新分区日期
61 changes: 61 additions & 0 deletions dingo/model/rule/scibase/assets/osi_arxiv_mapping.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
预期字段,arxiv对应字段,字段值数据类型
track_id,数仓自己赋予,String
title,title,String
abstract,abstract,String
language,无,String
doi,doi;在 doi 为空时,使用"10.48550/arxiv."拼接doc_id,String
type,无,List[string]
author,author 解析为作者数组(字符串拆分),List[string]
identifiers,oaiId->oai_identifier、"arxivId->paper_id去掉http前缀",Object
indexed_in,新增字符串"arxiv",List[string]
published_date,updated,String
published_year,updated中年份,s3是RFC 1123 时间格式,需要转化为 yyyy-mm-dd 格式和db做对比,Integer
venue,,Object
venue.name,journal_ref :从 journal_ref 解析期刊/会议名(后续处理),String
venue.type,无,String
venue.issn,无,List[string]
venue.publisher,无,List[string]
venue.biblio,,Object
venue.biblio.volume,从 journal_ref解析(后续处理),String
venue.biblio.issue,从 journal_ref解析(后续处理),String
venue.biblio.pages,从 journal_ref解析(后续处理),String
access_is_oa,布尔值true,String
access_oa_status,空字符串,String
access_oa_url,pdf_url(get_pdf=0 时为""),String
access_license,license_url 将协议链接映射为对应可选值填入,String
keywords,无,List[string]
fieldsOfStudy,无,List[object]
s2FieldsOfStudy,无,List[object]
primary_topic,无,Object
topics,无,List[object]
concepts,无,List[object]
subject,无,String
major,无,String
major_2,无,String
major_3,无,String
category,无,String
area,无,String
grade_class,无,String
grade,无,String
origin_id,doc_id,String
origin_osi,取值"arxiv",String
origin_db_source,无,String
reference_count,无,Integer
citation_count,无,Integer
influential_citation_count,无,Integer
references,无,List[string]
related_works,无,List[string]
citation_normalized_percentile,无,Object
cited_by_percentile_year,无,Object
fwci,无,Float
cited_by_api_url,无,String
locations,,List[object]
locations.type,对pdf_url来说,get_pdf值为1时,值为download,get_pdf值为0时,保持空字符串;对source_url来说,get_source值为1时值为download,get_source值为0时保持空字符串。,String
locations.url,"pdf_url,source_url",String
locations.license,license_url 将协议链接映射为对应可选值,String
locations.is_oa,对pdf_url来说,get_pdf值为1时,值为true,get_pdf值为0时为false;对source_url来说,get_source值为1时值为true,get_source值为0时为false。,String
classifications,,Object
mesh,无,List[object]
msc_class,msc_class,String
acm_class,acm_class,String
arxiv_category,category,List[string]
42 changes: 42 additions & 0 deletions dingo/model/rule/scibase/assets/paper_unique_mapping.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
字段名,数据类型,聚合策略,策略参数,源字段名,去重 / 聚合处理逻辑
doi,string,key_lower,,,唯一去重键,精确匹配,统一小写
identifiers,"map<string,string>",merge_identifiers,,,"MAP聚合,key去重,doi/DOI/mag/MAG小写后与origin_osi拼接,相同key取max(value)"
indexed_in,array<string>,dedup_array,,,数组聚合并去重
type,array<string>,dedup_array,lower=true,,统一小写后数组聚合去重
title,string,freq_lex_max,min_len=2;max_len=1000,,在非空值里取词频最高;剔除长度<2或>1000极值;词频相同取字典序最大值
abstract,string,freq_lex_max,min_len=10;max_len=10000,,在非空值里取词频最高;剔除长度<10或>10000极值;词频相同取字典序最大值
author,"array<struct<name:string,orcid:string>>",dedup_struct,,,数组聚合去重
language,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
published_year,int,freq_int_max,min_val=1000;max_val=CURRENT_YEAR,,在非空值里取词频最高;剔除<1000或>当年极值;词频相同取最大值
published_date,string,freq_date,,,在非空值里取词频最高的出版日期;剔除年份异常值;词频相同取字典序最大值
venue_name,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
venue_type,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
venue_issn,array<string>,dedup_array,,,数组聚合去重
venue_publisher,array<string>,dedup_array,,,数组聚合去重
access_license,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
biblio_volume,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
biblio_issue,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
biblio_pages,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
access_is_oa,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
access_oa_status,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
access_oa_url,array<string>,dedup_array,,,数组聚合去重
locations,"array<struct<type:string,url:string,license:string,is_oa:string>>",dedup_locations,,,"STRUCT转成STRING再去重,key已排序"
keywords,array<string>,dedup_array,,,数组聚合去重
fieldsOfStudy,"array<map<string,string>>",dedup_map,,,MAP转成STRING再去重
s2fieldsofstudy,"array<map<string,string>>",dedup_map,,,MAP转成STRING再去重
primary_topic,"STRUCT<id:STRING,display_name:STRING,score:DECIMAL(10,4),subfield:STRUCT<id:STRING,display_name:STRING>,field:STRUCT<id:STRING,display_name:STRING>,domain:STRUCT<id:STRING,display_name:STRING>>",freq_struct,,,在非空值里取词频最高;词频相同取字典序最大值
topics,"ARRAY<STRUCT<id:STRING,display_name:STRING,score:DECIMAL(10,4),subfield:STRUCT<id:STRING,display_name:STRING>,field:STRUCT<id:STRING,display_name:STRING>,domain:STRUCT<id:STRING,display_name:STRING>>>",dedup_struct,,,STRUCT转成STRING再去重
concepts,"array<map<string,string>>",dedup_map,,,MAP转成STRING再去重
category,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
reference_count,int,freq_int_max,,,在非空值里取词频最高;词频相同取最大值
citation_count,int,freq_int_max,,,在非空值里取词频最高;词频相同取最大值
influential_citation_count,int,freq_int_max,,,在非空值里取词频最高;词频相同取最大值
references,array<string>,dedup_array,,,数组聚合去重
related_works,array<string>,dedup_array,,,数组聚合去重
citation_normalized_percentile,"MAP<STRING,STRING>",merge_map,,,"MAP聚合,key去重,相同key取max(value)"
cited_by_percentile_year,"MAP<STRING,STRING>",merge_map,,,"MAP聚合,key去重,相同key取max(value)"
fwci,"decimal(15,4)",freq_decimal_max,,,在非空值里取词频最高;词频相同取最大值
cited_by_api_url,string,freq_lex_max,,,在非空值里取词频最高;词频相同取字典序最大值
mesh,"ARRAY<MAP<STRING,STRING>>",dedup_map,,,MAP转成STRING再去重
classifications,"STRUCT<mesh:ARRAY<MAP<STRING,STRING>>,msc_class:STRING,acm_class:STRING,arxiv_category:ARRAY<STRING>>",random_pick_cls,,,"mesh随机取一条不为空的值;msc_class/acm_class/arxiv_category从arxiv记录取"
dt,string,latest_dt,,,保留最新分区日期
Loading