@dataclass(repr=False)
class SpanNode:
    """A node in the span tree; provides references to parents/children for easy traversal and queries.

    Besides the dataclass fields, every instance carries two attributes set in
    `__post_init__`: `parent` (the parent node, or `None`) and `children_by_id`
    (child nodes keyed by their `node_key`). Both are populated by `add_child`.
    """

    name: str
    trace_id: int
    span_id: int
    parent_span_id: int | None
    start_timestamp: datetime
    end_timestamp: datetime
    attributes: dict[str, AttributeValue]

    @property
    def duration(self) -> timedelta:
        """Return the span's duration (`end_timestamp - start_timestamp`) as a timedelta."""
        return self.end_timestamp - self.start_timestamp

    @property
    def children(self) -> list[SpanNode]:
        """Return the immediate children as a new list, in `children_by_id` insertion order."""
        return list(self.children_by_id.values())

    @property
    def descendants(self) -> list[SpanNode]:
        """Return all descendants of this node in DFS order."""
        return self.find_descendants(lambda _: True)

    @property
    def ancestors(self) -> list[SpanNode]:
        """Return all ancestors of this node, closest first."""
        return self.find_ancestors(lambda _: True)

    @property
    def node_key(self) -> str:
        """Return the key identifying this node: zero-padded hex trace id and span id joined by ':'."""
        return f'{self.trace_id:032x}:{self.span_id:016x}'

    @property
    def parent_node_key(self) -> str | None:
        """Return the `node_key` of the parent span, or `None` when `parent_span_id` is `None`."""
        return None if self.parent_span_id is None else f'{self.trace_id:032x}:{self.parent_span_id:016x}'

    # -------------------------------------------------------------------------
    # Construction
    # -------------------------------------------------------------------------
    def __post_init__(self):
        # Tree links are not dataclass fields; they start empty and are filled by `add_child`.
        self.parent: SpanNode | None = None
        self.children_by_id: dict[str, SpanNode] = {}

    @staticmethod
    def from_readable_span(span: ReadableSpan) -> SpanNode:
        """Build a `SpanNode` from a finished span.

        The span's `start_time` / `end_time` are divided by 1e9 and interpreted as
        UTC POSIX timestamps. Asserts that the span has a context, a start time
        and an end time.
        """
        assert span.context is not None, 'Span has no context'
        assert span.start_time is not None, 'Span has no start time'
        assert span.end_time is not None, 'Span has no end time'
        return SpanNode(
            name=span.name,
            trace_id=span.context.trace_id,
            span_id=span.context.span_id,
            parent_span_id=span.parent.span_id if span.parent else None,
            start_timestamp=datetime.fromtimestamp(span.start_time / 1e9, tz=timezone.utc),
            end_timestamp=datetime.fromtimestamp(span.end_time / 1e9, tz=timezone.utc),
            attributes=dict(span.attributes or {}),
        )

    def add_child(self, child: SpanNode) -> None:
        """Attach a child node to this node's list of children.

        Asserts that the child belongs to the same trace and that its
        `parent_span_id` equals this node's `span_id`.
        """
        assert child.trace_id == self.trace_id, f"traces don't match: {child.trace_id:032x} != {self.trace_id:032x}"
        # `parent_span_id` may be None, which cannot be rendered with ':016x'; format it
        # separately so a mismatch always surfaces as AssertionError rather than TypeError.
        assert child.parent_span_id == self.span_id, 'parent span mismatch: {} != {:016x}'.format(
            'None' if child.parent_span_id is None else f'{child.parent_span_id:016x}',
            self.span_id,
        )
        self.children_by_id[child.node_key] = child
        child.parent = self

    # -------------------------------------------------------------------------
    # Child queries
    # -------------------------------------------------------------------------
    def find_children(self, predicate: SpanQuery | SpanPredicate) -> list[SpanNode]:
        """Return all immediate children that satisfy the given predicate."""
        return list(self._filter_children(predicate))

    def first_child(self, predicate: SpanQuery | SpanPredicate) -> SpanNode | None:
        """Return the first immediate child that satisfies the given predicate, or None if none match."""
        return next(self._filter_children(predicate), None)

    def any_child(self, predicate: SpanQuery | SpanPredicate) -> bool:
        """Returns True if there is at least one child that satisfies the predicate."""
        return self.first_child(predicate) is not None

    def _filter_children(self, predicate: SpanQuery | SpanPredicate) -> Iterator[SpanNode]:
        return (child for child in self.children if child.matches(predicate))

    # -------------------------------------------------------------------------
    # Descendant queries (DFS)
    # -------------------------------------------------------------------------
    def find_descendants(
        self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
    ) -> list[SpanNode]:
        """Return all descendant nodes that satisfy the given predicate in DFS order."""
        return list(self._filter_descendants(predicate, stop_recursing_when))

    def first_descendant(
        self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
    ) -> SpanNode | None:
        """DFS: Return the first descendant (in DFS order) that satisfies the given predicate, or `None` if none match."""
        return next(self._filter_descendants(predicate, stop_recursing_when), None)

    def any_descendant(
        self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
    ) -> bool:
        """Returns `True` if there is at least one descendant that satisfies the predicate."""
        return self.first_descendant(predicate, stop_recursing_when) is not None

    def _filter_descendants(
        self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None
    ) -> Iterator[SpanNode]:
        # Iterative DFS with an explicit stack. Nodes are popped from the end, so
        # siblings are visited last-to-first.
        stack = list(self.children)
        while stack:
            node = stack.pop()
            if node.matches(predicate):
                yield node
            # A node matching `stop_recursing_when` is still yielded above, but its
            # subtree is not explored.
            if stop_recursing_when is not None and node.matches(stop_recursing_when):
                continue
            stack.extend(node.children)

    # -------------------------------------------------------------------------
    # Ancestor queries (DFS "up" the chain)
    # -------------------------------------------------------------------------
    def find_ancestors(
        self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
    ) -> list[SpanNode]:
        """Return all ancestors that satisfy the given predicate."""
        return list(self._filter_ancestors(predicate, stop_recursing_when))

    def first_ancestor(
        self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
    ) -> SpanNode | None:
        """Return the closest ancestor that satisfies the given predicate, or `None` if none match."""
        return next(self._filter_ancestors(predicate, stop_recursing_when), None)

    def any_ancestor(
        self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
    ) -> bool:
        """Returns True if any ancestor satisfies the predicate."""
        return self.first_ancestor(predicate, stop_recursing_when) is not None

    def _filter_ancestors(
        self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None
    ) -> Iterator[SpanNode]:
        # Walk the parent chain upwards; a node matching `stop_recursing_when` is
        # still yielded (if it matches the predicate) but ends the walk.
        node = self.parent
        while node:
            if node.matches(predicate):
                yield node
            if stop_recursing_when is not None and node.matches(stop_recursing_when):
                break
            node = node.parent

    # -------------------------------------------------------------------------
    # Query matching
    # -------------------------------------------------------------------------
    def matches(self, query: SpanQuery | SpanPredicate) -> bool:
        """Check if the span node matches the query conditions or predicate."""
        if callable(query):
            return query(self)

        return self._matches_query(query)

    def _matches_query(self, query: SpanQuery) -> bool:  # noqa C901
        """Check if the span matches the query conditions.

        Raises:
            ValueError: if a non-empty `or_` is combined with any other key at the same level.
        """
        # Logical combinations
        if or_ := query.get('or_'):
            if len(query) > 1:
                raise ValueError("Cannot combine 'or_' conditions with other conditions at the same level")
            return any(self._matches_query(q) for q in or_)
        if not_ := query.get('not_'):
            if self._matches_query(not_):
                return False
        if and_ := query.get('and_'):
            results = [self._matches_query(q) for q in and_]
            if not all(results):
                return False
        # At this point, all existing ANDs and no existing ORs have passed, so it comes down to this condition

        # Name conditions
        if (name_equals := query.get('name_equals')) and self.name != name_equals:
            return False
        if (name_contains := query.get('name_contains')) and name_contains not in self.name:
            return False
        if (name_matches_regex := query.get('name_matches_regex')) and not re.match(name_matches_regex, self.name):
            return False

        # Attribute conditions
        if (has_attributes := query.get('has_attributes')) and not all(
            self.attributes.get(key) == value for key, value in has_attributes.items()
        ):
            return False
        if (has_attributes_keys := query.get('has_attribute_keys')) and not all(
            key in self.attributes for key in has_attributes_keys
        ):
            return False

        # Timing conditions
        if (min_duration := query.get('min_duration')) is not None:
            if not isinstance(min_duration, timedelta):
                min_duration = timedelta(seconds=min_duration)
            if self.duration < min_duration:
                return False
        if (max_duration := query.get('max_duration')) is not None:
            if not isinstance(max_duration, timedelta):
                max_duration = timedelta(seconds=max_duration)
            if self.duration > max_duration:
                return False

        # Children conditions
        # Count limits are compared with `is not None` so that a limit of 0 is honoured
        # (a truthiness test would silently skip `max_child_count=0`).
        if (min_child_count := query.get('min_child_count')) is not None and len(self.children) < min_child_count:
            return False
        if (max_child_count := query.get('max_child_count')) is not None and len(self.children) > max_child_count:
            return False
        if (some_child_has := query.get('some_child_has')) and not any(
            child._matches_query(some_child_has) for child in self.children
        ):
            return False
        if (all_children_have := query.get('all_children_have')) and not all(
            child._matches_query(all_children_have) for child in self.children
        ):
            return False
        if (no_child_has := query.get('no_child_has')) and any(
            child._matches_query(no_child_has) for child in self.children
        ):
            return False

        # Descendant conditions
        # The following local functions with cache decorators are used to avoid repeatedly evaluating these properties
        @cache
        def descendants():
            return self.descendants

        @cache
        def pruned_descendants():
            stop_recursing_when = query.get('stop_recursing_when')
            if not stop_recursing_when:
                return descendants()
            # Materialise the generator: the cached value is iterated once per
            # descendant condition, and a generator would be exhausted after the first.
            return list(self._filter_descendants(lambda _: True, stop_recursing_when))

        if (min_descendant_count := query.get('min_descendant_count')) is not None and len(
            descendants()
        ) < min_descendant_count:
            return False
        if (max_descendant_count := query.get('max_descendant_count')) is not None and len(
            descendants()
        ) > max_descendant_count:
            return False
        if (some_descendant_has := query.get('some_descendant_has')) and not any(
            descendant._matches_query(some_descendant_has) for descendant in pruned_descendants()
        ):
            return False
        if (all_descendants_have := query.get('all_descendants_have')) and not all(
            descendant._matches_query(all_descendants_have) for descendant in pruned_descendants()
        ):
            return False
        if (no_descendant_has := query.get('no_descendant_has')) and any(
            descendant._matches_query(no_descendant_has) for descendant in pruned_descendants()
        ):
            return False

        # Ancestor conditions
        # The following local functions with cache decorators are used to avoid repeatedly evaluating these properties
        @cache
        def ancestors():
            return self.ancestors

        @cache
        def pruned_ancestors():
            stop_recursing_when = query.get('stop_recursing_when')
            if not stop_recursing_when:
                return ancestors()
            # Materialised for the same reason as `pruned_descendants`.
            return list(self._filter_ancestors(lambda _: True, stop_recursing_when))

        # Depth is the ancestor count, so `max_depth=0` must restrict the match to roots.
        if (min_depth := query.get('min_depth')) is not None and len(ancestors()) < min_depth:
            return False
        if (max_depth := query.get('max_depth')) is not None and len(ancestors()) > max_depth:
            return False
        if (some_ancestor_has := query.get('some_ancestor_has')) and not any(
            ancestor._matches_query(some_ancestor_has) for ancestor in pruned_ancestors()
        ):
            return False
        if (all_ancestors_have := query.get('all_ancestors_have')) and not all(
            ancestor._matches_query(all_ancestors_have) for ancestor in pruned_ancestors()
        ):
            return False
        if (no_ancestor_has := query.get('no_ancestor_has')) and any(
            ancestor._matches_query(no_ancestor_has) for ancestor in pruned_ancestors()
        ):
            return False

        return True

    # -------------------------------------------------------------------------
    # String representation
    # -------------------------------------------------------------------------
    def repr_xml(
        self,
        include_children: bool = True,
        include_trace_id: bool = False,
        include_span_id: bool = False,
        include_start_timestamp: bool = False,
        include_duration: bool = False,
    ) -> str:
        """Return an XML-like string representation of the node.

        Optionally includes children, trace_id, span_id, start_timestamp, and duration.
        When children exist but are not rendered, a `children=...` marker is emitted instead.
        """
        first_line_parts = [f'<SpanNode name={self.name!r}']
        if include_trace_id:
            first_line_parts.append(f"trace_id='{self.trace_id:032x}'")
        if include_span_id:
            first_line_parts.append(f"span_id='{self.span_id:016x}'")
        if include_start_timestamp:
            first_line_parts.append(f'start_timestamp={self.start_timestamp.isoformat()!r}')
        if include_duration:
            first_line_parts.append(f"duration='{self.duration}'")

        extra_lines: list[str] = []
        if include_children and self.children:
            first_line_parts.append('>')
            for child in self.children:
                extra_lines.append(
                    indent(
                        child.repr_xml(
                            include_children=include_children,
                            include_trace_id=include_trace_id,
                            include_span_id=include_span_id,
                            include_start_timestamp=include_start_timestamp,
                            include_duration=include_duration,
                        ),
                        ' ',
                    )
                )
            extra_lines.append('</SpanNode>')
        else:
            if self.children:
                first_line_parts.append('children=...')
            first_line_parts.append('/>')
        return '\n'.join([' '.join(first_line_parts), *extra_lines])

    def __str__(self) -> str:
        if self.children:
            return f"<SpanNode name={self.name!r} span_id='{self.span_id:016x}'>...</SpanNode>"
        else:
            return f"<SpanNode name={self.name!r} span_id='{self.span_id:016x}' />"

    def __repr__(self) -> str:
        return self.repr_xml()
Attach a child node to this node's list of children.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
151152153154155156157158
def add_child(self, child: SpanNode) -> None:
    """Attach a child node to this node's list of children.

    Asserts that the child belongs to the same trace and that its
    `parent_span_id` equals this node's `span_id`; on success the child is
    stored in `children_by_id` under its `node_key` and its `parent` is set.
    """
    assert child.trace_id == self.trace_id, f"traces don't match: {child.trace_id:032x} != {self.trace_id:032x}"
    # `parent_span_id` may be None, which cannot be rendered with ':016x'; format it
    # separately so a mismatch always surfaces as AssertionError rather than TypeError.
    assert child.parent_span_id == self.span_id, 'parent span mismatch: {} != {:016x}'.format(
        'None' if child.parent_span_id is None else f'{child.parent_span_id:016x}',
        self.span_id,
    )
    self.children_by_id[child.node_key] = child
    child.parent = self
Return all immediate children that satisfy the given predicate.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
163164165
def find_children(self, predicate: SpanQuery | SpanPredicate) -> list[SpanNode]:
    """Collect every immediate child accepted by `predicate` into a list."""
    return [*self._filter_children(predicate)]
Return the first immediate child that satisfies the given predicate, or None if none match.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
167168169
def first_child(self, predicate: SpanQuery | SpanPredicate) -> SpanNode | None:
    """Return the first immediate child accepted by `predicate`, or None when no child matches."""
    for child in self._filter_children(predicate):
        # Stop at the very first match; later children are never examined.
        return child
    return None
Returns True if there is at least one child that satisfies the predicate.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
171172173
def any_child(self, predicate: SpanQuery | SpanPredicate) -> bool:
    """Report whether at least one immediate child is accepted by `predicate`."""
    first_match = self.first_child(predicate)
    return first_match is not None
Return all descendant nodes that satisfy the given predicate in DFS order.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
181182183184185
def find_descendants(
    self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
) -> list[SpanNode]:
    """Collect every descendant accepted by `predicate`, in the order `_filter_descendants` yields them.

    `stop_recursing_when` is forwarded unchanged to `_filter_descendants`.
    """
    return [*self._filter_descendants(predicate, stop_recursing_when)]
DFS: Return the first descendant (in DFS order) that satisfies the given predicate, or None if none match.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
187188189190191
def first_descendant(
    self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
) -> SpanNode | None:
    """Return the first descendant yielded by `_filter_descendants` for `predicate`, or `None` if there is none.

    `stop_recursing_when` is forwarded unchanged to `_filter_descendants`.
    """
    for descendant in self._filter_descendants(predicate, stop_recursing_when):
        # Only the first yielded node is needed.
        return descendant
    return None
Returns True if there is at least one descendant that satisfies the predicate.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
193194195196197
def any_descendant(
    self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
) -> bool:
    """Report whether `first_descendant` finds a descendant accepted by `predicate`."""
    first_match = self.first_descendant(predicate, stop_recursing_when)
    return first_match is not None
Return all ancestors that satisfy the given predicate.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
214215216217218
def find_ancestors(
    self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
) -> list[SpanNode]:
    """Collect every ancestor accepted by `predicate`, in the order `_filter_ancestors` yields them.

    `stop_recursing_when` is forwarded unchanged to `_filter_ancestors`.
    """
    return [*self._filter_ancestors(predicate, stop_recursing_when)]
Return the closest ancestor that satisfies the given predicate, or None if none match.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
220221222223224
def first_ancestor(
    self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
) -> SpanNode | None:
    """Return the first ancestor yielded by `_filter_ancestors` for `predicate`, or `None` if there is none.

    `stop_recursing_when` is forwarded unchanged to `_filter_ancestors`.
    """
    for ancestor in self._filter_ancestors(predicate, stop_recursing_when):
        # Only the first yielded node is needed.
        return ancestor
    return None
Returns True if any ancestor satisfies the predicate.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
226227228229230
def any_ancestor(
    self, predicate: SpanQuery | SpanPredicate, stop_recursing_when: SpanQuery | SpanPredicate | None = None
) -> bool:
    """Report whether `first_ancestor` finds an ancestor accepted by `predicate`."""
    first_match = self.first_ancestor(predicate, stop_recursing_when)
    return first_match is not None
Check if the span node matches the query conditions or predicate.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
246247248249250251
def matches(self, query: SpanQuery | SpanPredicate) -> bool:
    """Evaluate `query` against this node.

    A callable is invoked with the node itself; anything else is handed to
    `_matches_query`.
    """
    return query(self) if callable(query) else self._matches_query(query)
def repr_xml(
    self,
    include_children: bool = True,
    include_trace_id: bool = False,
    include_span_id: bool = False,
    include_start_timestamp: bool = False,
    include_duration: bool = False,
) -> str:
    """Render the node as an XML-like string.

    The opening tag always carries the name; trace id, span id, start timestamp
    and duration are added when the matching flag is set. Children are rendered
    recursively (with the same flags) when `include_children` is true; otherwise
    a node that has children gets a `children=...` marker.
    """
    opening = [f'<SpanNode name={self.name!r}']
    if include_trace_id:
        opening.append(f"trace_id='{self.trace_id:032x}'")
    if include_span_id:
        opening.append(f"span_id='{self.span_id:016x}'")
    if include_start_timestamp:
        opening.append(f'start_timestamp={self.start_timestamp.isoformat()!r}')
    if include_duration:
        opening.append(f"duration='{self.duration}'")

    kids = self.children

    # Self-closing form: either there are no children or they are not being expanded.
    if not (include_children and kids):
        if kids:
            opening.append('children=...')
        opening.append('/>')
        return ' '.join(opening)

    opening.append('>')
    flags = {
        'include_children': include_children,
        'include_trace_id': include_trace_id,
        'include_span_id': include_span_id,
        'include_start_timestamp': include_start_timestamp,
        'include_duration': include_duration,
    }
    rendered_children = [indent(kid.repr_xml(**flags), ' ') for kid in kids]
    return '\n'.join([' '.join(opening), *rendered_children, '</SpanNode>'])
class SpanQuery(TypedDict, total=False):
    """A serializable query for filtering SpanNodes based on various conditions.

    All fields are optional and combined with AND logic by default.
    """

    # These fields are grouped to mirror the sections of `SpanNode._matches_query` for easy review.
    # Note that `_matches_query` itself evaluates the logical combinations (`or_`, `not_`, `and_`)
    # first, then the individual span conditions, then child, descendant and ancestor conditions.
    # * Individual span conditions come first because these are generally the cheapest to evaluate
    # * Logical combinations come next because they may just be combinations of individual span conditions
    # * Related-span conditions come last because they may require the most work to evaluate

    # Individual span conditions
    ## Name conditions
    name_equals: str
    name_contains: str
    name_matches_regex: str  # regex pattern; applied with `re.match` in `SpanNode._matches_query`

    ## Attribute conditions
    has_attributes: dict[str, Any]
    has_attribute_keys: list[str]

    ## Timing conditions
    # Non-timedelta values are converted with `timedelta(seconds=...)` in `SpanNode._matches_query`.
    min_duration: timedelta | float
    max_duration: timedelta | float

    # Logical combinations of conditions
    not_: SpanQuery
    and_: list[SpanQuery]
    # `SpanNode._matches_query` raises ValueError when `or_` is combined with other keys at the same level.
    or_: list[SpanQuery]

    # Child conditions
    min_child_count: int
    max_child_count: int
    some_child_has: SpanQuery
    all_children_have: SpanQuery
    no_child_has: SpanQuery

    # Recursive conditions
    stop_recursing_when: SpanQuery
    """If present, stop recursing through ancestors or descendants at nodes that match this condition."""

    ## Descendant conditions
    min_descendant_count: int
    max_descendant_count: int
    some_descendant_has: SpanQuery
    all_descendants_have: SpanQuery
    no_descendant_has: SpanQuery

    ## Ancestor conditions
    min_depth: int  # depth is equivalent to ancestor count; roots have depth 0
    max_depth: int
    some_ancestor_has: SpanQuery
    all_ancestors_have: SpanQuery
    no_ancestor_has: SpanQuery
@dataclass(repr=False)
class SpanTree:
    """A container that builds a hierarchy of SpanNode objects from a list of finished spans.

    You can then search or iterate the tree to make your assertions. Iterating the
    tree (and therefore `find` / `first` / `any`) visits nodes in `nodes_by_id`
    order, which `_rebuild_tree` keeps sorted by start timestamp.

    Note that `roots` is recomputed from `nodes_by_id` whenever the tree is rebuilt,
    including right after construction.
    """

    roots: list[SpanNode] = field(default_factory=list)
    nodes_by_id: dict[str, SpanNode] = field(default_factory=dict)

    # -------------------------------------------------------------------------
    # Construction
    # -------------------------------------------------------------------------
    def __post_init__(self):
        self._rebuild_tree()

    def add_spans(self, spans: list[SpanNode]) -> None:
        """Add a list of spans to the tree, rebuilding the tree structure."""
        for span in spans:
            self.nodes_by_id[span.node_key] = span
        self._rebuild_tree()

    def add_readable_spans(self, readable_spans: list[ReadableSpan]):
        """Convert each readable span with `SpanNode.from_readable_span` and add the results via `add_spans`."""
        self.add_spans([SpanNode.from_readable_span(span) for span in readable_spans])

    def _rebuild_tree(self):
        """Re-sort the nodes by start timestamp, re-link parents/children and recompute `roots`."""
        # Ensure spans are ordered by start_timestamp so that roots and children end up in the right order
        nodes = sorted(self.nodes_by_id.values(), key=lambda node: node.start_timestamp or datetime.min)
        self.nodes_by_id = {node.node_key: node for node in nodes}

        # Build the parent/child relationships and determine the roots in one pass.
        # A node is a "root" if its parent is None or if its parent's span_id is not in the current set of spans.
        roots = []
        parents_touched = {}
        for node in nodes:
            parent_node_key = node.parent_node_key
            parent_node = None if parent_node_key is None else self.nodes_by_id.get(parent_node_key)
            if parent_node is None:
                roots.append(node)
            else:
                parent_node.add_child(node)
                parents_touched[parent_node_key] = parent_node

        # `add_child` keeps a child's original position in the parent's dict, so a child
        # added by a later `add_spans` call would otherwise sit after siblings that started
        # later. Re-sort each touched parent's children by start timestamp (stable sort).
        for parent_node in parents_touched.values():
            parent_node.children_by_id = dict(
                sorted(
                    parent_node.children_by_id.items(),
                    key=lambda item: item[1].start_timestamp or datetime.min,
                )
            )

        self.roots = roots

    # -------------------------------------------------------------------------
    # Node filtering and iteration
    # -------------------------------------------------------------------------
    def find(self, predicate: SpanQuery | SpanPredicate) -> list[SpanNode]:
        """Find all nodes in the entire tree that match the predicate, in the tree's iteration order."""
        return list(self._filter(predicate))

    def first(self, predicate: SpanQuery | SpanPredicate) -> SpanNode | None:
        """Find the first node (in the tree's iteration order) that matches a predicate. Returns `None` if not found."""
        return next(self._filter(predicate), None)

    def any(self, predicate: SpanQuery | SpanPredicate) -> bool:
        """Returns True if any node in the tree matches the predicate."""
        return self.first(predicate) is not None

    def _filter(self, predicate: SpanQuery | SpanPredicate) -> Iterator[SpanNode]:
        for node in self:
            if node.matches(predicate):
                yield node

    def __iter__(self) -> Iterator[SpanNode]:
        """Return an iterator over all nodes in the tree."""
        return iter(self.nodes_by_id.values())

    # -------------------------------------------------------------------------
    # String representation
    # -------------------------------------------------------------------------
    def repr_xml(
        self,
        include_children: bool = True,
        include_trace_id: bool = False,
        include_span_id: bool = False,
        include_start_timestamp: bool = False,
        include_duration: bool = False,
    ) -> str:
        """Return an XML-like string representation of the tree, optionally including children, trace_id, span_id, duration, and timestamps."""
        if not self.roots:
            return '<SpanTree />'
        repr_parts = [
            '<SpanTree>',
            *[
                indent(
                    root.repr_xml(
                        include_children=include_children,
                        include_trace_id=include_trace_id,
                        include_span_id=include_span_id,
                        include_start_timestamp=include_start_timestamp,
                        include_duration=include_duration,
                    ),
                    ' ',
                )
                for root in self.roots
            ],
            '</SpanTree>',
        ]
        return '\n'.join(repr_parts)

    def __str__(self):
        return f'<SpanTree num_roots={len(self.roots)} total_spans={len(self.nodes_by_id)} />'

    def __repr__(self):
        return self.repr_xml()
Add a list of spans to the tree, rebuilding the tree structure.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
454455456457458
def add_spans(self, spans: list[SpanNode]) -> None:
    """Register each span in `nodes_by_id` under its `node_key`, then rebuild the tree once."""
    # Later spans with the same key replace earlier ones, exactly as repeated assignment would.
    self.nodes_by_id.update({span.node_key: span for span in spans})
    self._rebuild_tree()
Find all nodes in the entire tree that match the predicate, scanning from each root in DFS order.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
488489490
def find(self, predicate: SpanQuery | SpanPredicate) -> list[SpanNode]:
    """Collect every node that `_filter` yields for `predicate` into a list."""
    return [*self._filter(predicate)]
Find the first node that matches a predicate, scanning from each root in DFS order. Returns None if not found.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
492493494
def first(self, predicate: SpanQuery | SpanPredicate) -> SpanNode | None:
    """Return the first node that `_filter` yields for `predicate`, or `None` when it yields nothing."""
    for node in self._filter(predicate):
        # Only the first yielded node is needed.
        return node
    return None
Returns True if any node in the tree matches the predicate.
Source code in pydantic_evals/pydantic_evals/otel/span_tree.py
496497498
def any(self, predicate: SpanQuery | SpanPredicate) -> bool:
    """Report whether `first` finds a node in the tree that matches `predicate`."""
    first_match = self.first(predicate)
    return first_match is not None
def repr_xml(
    self,
    include_children: bool = True,
    include_trace_id: bool = False,
    include_span_id: bool = False,
    include_start_timestamp: bool = False,
    include_duration: bool = False,
) -> str:
    """Render the tree as an XML-like string.

    An empty tree renders as a self-closing tag; otherwise each root is rendered
    with its own `repr_xml` (receiving the same flags), indented, and wrapped in
    `<SpanTree>` ... `</SpanTree>`.
    """
    roots = self.roots
    if not roots:
        return '<SpanTree />'

    flags = {
        'include_children': include_children,
        'include_trace_id': include_trace_id,
        'include_span_id': include_span_id,
        'include_start_timestamp': include_start_timestamp,
        'include_duration': include_duration,
    }
    lines = ['<SpanTree>']
    lines.extend(indent(root.repr_xml(**flags), ' ') for root in roots)
    lines.append('</SpanTree>')
    return '\n'.join(lines)