Source code for algebra.interval_operations

"""
Interval algebra primitives.
"""

from functools import cmp_to_key

from clothesline.algebra.symbols import x_cmp
from clothesline.interval_peg import IntervalPeg
from clothesline.exceptions import InvalidCombineEndState


def _mark_coverage(interval_lists, markers):
    """Return which operands cover each marker point and each following range."""
    position_of = {value: position for position, value in enumerate(markers)}
    point_cover = {marker: set() for marker in markers}
    range_cover = {marker: set() for marker in markers}
    for operand_index, intervals in enumerate(interval_lists):
        for interval in intervals:
            # Endpoints count as covered points only when their peg is included.
            for peg in interval.pegs():
                if peg.included:
                    point_cover[peg.value].add(operand_index)
            first = position_of[interval.begin.value]
            last = position_of[interval.end.value]
            # Markers strictly inside the interval are covered as points.
            for inner_marker in markers[first + 1 : last]:
                point_cover[inner_marker].add(operand_index)
            # Each range starting at a marker in [begin, end) is covered.
            for range_marker in markers[first:last]:
                range_cover[range_marker].add(operand_index)
    return point_cover, range_cover


def _project(cover, markers, n_operands, combiner_function):
    """Apply `combiner_function` to the per-operand flags of each given marker."""
    return {
        marker: combiner_function(
            [operand_index in cover[marker] for operand_index in range(n_operands)]
        )
        for marker in markers
    }


def _merge_markers(int_maker, markers, point_merged, range_merged):
    """Stitch the projected points and ranges into maximal intervals."""
    merged = []
    # (marker, included) of the interval being built, or None when idle.
    start = None
    for marker in markers:
        has_point = point_merged[marker]
        range_follows = range_merged[marker]
        if start is None:
            if range_follows:
                start = (marker, has_point)
            elif has_point:
                # An isolated point is a zero-length, closed interval.
                merged.append(
                    int_maker(
                        IntervalPeg(marker, True),
                        IntervalPeg(marker, True),
                    )
                )
            continue
        if range_follows and has_point:
            # Interior of the interval being built: keep extending.
            continue
        # Either the interval ends here or there is a one-point hole:
        # in both cases the interval built so far is emitted.
        merged.append(
            int_maker(
                IntervalPeg(*start),
                IntervalPeg(marker, has_point),
            )
        )
        # After a hole a new interval starts right away at the same marker.
        start = (marker, has_point) if range_follows else None
    if start is not None:
        raise InvalidCombineEndState("Inconsistent end state in merge phase")
    return merged


def combine_intervals(
    int_maker,
    interval_iterables,
    combiner_function=lambda q: q[0],
):
    """
    Combine N collections of intervals into a normalized list of intervals.

    This is the main workhorse for interval algebra. It works in three
    phases: 'split' (collect all peg values as markers and record which
    operand covers each marker point and each open range between
    consecutive markers), 'project' (reduce the per-operand flags to a
    single flag with `combiner_function`) and 'merge' (stitch the
    resulting points and ranges into intervals).

    `int_maker` is a callable from (begin_peg, end_peg) to an instance of
    the desired interval class (e.g. the interval constructor itself).

    `interval_iterables` is an iterable of N iterables of intervals, each
    being one operand of the combination. Every interval must expose
    `pegs()`, `begin` and `end`; every peg must expose `value` and
    `included`.

    `combiner_function` maps a list of N booleans (one per operand, telling
    whether that operand contains the point or open range at hand) to the
    flag deciding whether it belongs to the result, for instance:
        - lambda q: q[0] or q[1]           # union
        - lambda q: q[0] and (not q[1])    # set difference
        - lambda q: q[0] ^ q[1]            # symmetric difference
    The default, `lambda q: q[0]`, keeps exactly what the first operand
    covers: with a single operand this normalizes it, while with several
    operands a combiner must be passed explicitly (e.g. to get a union).

    Returns the list of intervals built by `int_maker`, in marker order
    (markers are sorted with `x_cmp`). The list is empty when the operands
    contain no interval at all.

    Raises InvalidCombineEndState if the merge phase ends while an interval
    is still being built.
    """
    interval_lists = [list(intervals) for intervals in interval_iterables]
    n_operands = len(interval_lists)

    # 1. 'split' phase
    marker_values = {
        peg.value
        for intervals in interval_lists
        for interval in intervals
        for peg in interval.pegs()
    }
    if not marker_values:
        # No peg at all: nothing to combine.
        return []
    markers = sorted(marker_values, key=cmp_to_key(x_cmp))
    point_cover, range_cover = _mark_coverage(interval_lists, markers)

    # 2. 'project' phase, using combiner_function
    point_merged = _project(point_cover, markers, n_operands, combiner_function)
    range_merged = _project(
        range_cover, markers[:-1], n_operands, combiner_function
    )
    # No range starts at the last marker, so it is never part of the result.
    range_merged[markers[-1]] = False

    # 3. 'merge' phase
    return _merge_markers(int_maker, markers, point_merged, range_merged)