Source code for dagster._core.definitions.time_window_partitions

import functools
import hashlib
import json
import re
from datetime import datetime
from enum import Enum
from typing import (

import pendulum

import dagster._check as check
from dagster._annotations import PublicAttr, public
from dagster._core.instance import DynamicPartitionsStore
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (

from ..errors import (
from .partition import (
from .partition_key_range import PartitionKeyRange

[docs]class TimeWindow(NamedTuple): """An interval that is closed at the start and open at the end. Attributes: start (datetime): A pendulum datetime that marks the start of the window. end (datetime): A pendulum datetime that marks the end of the window. """ start: PublicAttr[datetime] end: PublicAttr[datetime]
[docs]class TimeWindowPartitionsDefinition( PartitionsDefinition, NamedTuple( "_TimeWindowPartitionsDefinition", [ ("start", PublicAttr[datetime]), ("timezone", PublicAttr[str]), ("end", PublicAttr[Optional[datetime]]), ("fmt", PublicAttr[str]), ("end_offset", PublicAttr[int]), ("cron_schedule", PublicAttr[str]), ], ), ): r"""A set of partitions where each partitions corresponds to a time window. The provided cron_schedule determines the bounds of the time windows. E.g. a cron_schedule of "0 0 \\* \\* \\*" will result in daily partitions that start at midnight and end at midnight of the following day. The string partition_key associated with each partition corresponds to the start of the partition's time window. The first partition in the set will start on at the first cron_schedule tick that is equal to or after the given start datetime. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. Args: cron_schedule (str): Determines the bounds of the time windows. start (datetime): The first partition in the set will start on at the first cron_schedule tick that is equal to or after this value. timezone (Optional[str]): The timezone in which each time should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". end (datetime): The last partition (excluding) in the set. fmt (str): The date format to use for partition_keys. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. """ def __new__( cls, start: Union[datetime, str], fmt: str, end: Union[datetime, str, None] = None, schedule_type: Optional[ScheduleType] = None, timezone: Optional[str] = None, end_offset: int = 0, minute_offset: Optional[int] = None, hour_offset: Optional[int] = None, day_offset: Optional[int] = None, cron_schedule: Optional[str] = None, ): check.opt_str_param(timezone, "timezone") timezone = timezone or "UTC" if isinstance(start, datetime): start_dt = pendulum.instance(start, tz=timezone) else: start_dt = pendulum.instance(datetime.strptime(start, fmt), tz=timezone) if not end: end_dt = None elif isinstance(end, datetime): end_dt = pendulum.instance(end, tz=timezone) else: end_dt = pendulum.instance(datetime.strptime(end, fmt), tz=timezone) if cron_schedule is not None: check.invariant( schedule_type is None and not minute_offset and not hour_offset and not day_offset, ( "If cron_schedule argument is provided, then schedule_type, minute_offset, " "hour_offset, and day_offset can't also be provided" ), ) else: if schedule_type is None: check.failed("One of schedule_type and cron_schedule must be provided") cron_schedule = cron_schedule_from_schedule_type_and_offsets( schedule_type=schedule_type, minute_offset=minute_offset or 0, hour_offset=hour_offset or 0, day_offset=day_offset or 0, ) if not is_valid_cron_schedule(cron_schedule): raise DagsterInvalidDefinitionError( f"Found invalid cron schedule '{cron_schedule}' for a" " TimeWindowPartitionsDefinition." ) return super(TimeWindowPartitionsDefinition, cls).__new__( cls, start_dt, timezone, end_dt, fmt, end_offset, cron_schedule ) def get_current_timestamp(self, current_time: Optional[datetime] = None) -> float: return ( pendulum.instance(current_time, tz=self.timezone) if current_time else ).timestamp() def get_num_partitions( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> int: # Method added for performance reasons. # Fetching partition keys requires significantly more compute time to # string format datetimes. current_timestamp = self.get_current_timestamp(current_time=current_time) partitions_past_current_time = 0 num_partitions = 0 for time_window in self._iterate_time_windows(self.start): if self.end and time_window.end.timestamp() > self.end.timestamp(): break if ( time_window.end.timestamp() <= current_timestamp or partitions_past_current_time < self.end_offset ): num_partitions += 1 if time_window.end.timestamp() > current_timestamp: partitions_past_current_time += 1 else: break if self.end_offset < 0: num_partitions += self.end_offset return num_partitions def get_partition_keys_between_indexes( self, start_idx: int, end_idx: int, current_time: Optional[datetime] = None ) -> List[str]: # Fetches the partition keys between the given start and end indices. # Start index is inclusive, end index is exclusive. # Method added for performance reasons, to only string format # partition keys included within the indices. current_timestamp = self.get_current_timestamp(current_time=current_time) partitions_past_current_time = 0 partition_keys = [] reached_end = False for idx, time_window in enumerate(self._iterate_time_windows(self.start)): if time_window.end.timestamp() >= current_timestamp: reached_end = True if self.end and time_window.end.timestamp() > self.end.timestamp(): reached_end = True if ( time_window.end.timestamp() <= current_timestamp or partitions_past_current_time < self.end_offset ): if idx >= start_idx and idx < end_idx: partition_keys.append(time_window.start.strftime(self.fmt)) if time_window.end.timestamp() > current_timestamp: partitions_past_current_time += 1 else: break if len(partition_keys) >= end_idx - start_idx: break if reached_end and self.end_offset < 0: partition_keys = partition_keys[: self.end_offset] return partition_keys def get_partition_keys( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[str]: current_timestamp = self.get_current_timestamp(current_time=current_time) partitions_past_current_time = 0 partition_keys: List[str] = [] for time_window in self._iterate_time_windows(self.start): if self.end and time_window.end.timestamp() > self.end.timestamp(): break if ( time_window.end.timestamp() <= current_timestamp or partitions_past_current_time < self.end_offset ): partition_keys.append(time_window.start.strftime(self.fmt)) if time_window.end.timestamp() > current_timestamp: partitions_past_current_time += 1 else: break if self.end_offset < 0: partition_keys = partition_keys[: self.end_offset] return partition_keys def _get_validated_time_window_for_partition_key( self, partition_key: str, current_time: Optional[datetime] = None ) -> Optional[TimeWindow]: """Returns a TimeWindow for the given partition key if it is valid, otherwise returns None. """ try: time_window = self.time_window_for_partition_key(partition_key) except ValueError: return None first_partition_window = self.get_first_partition_window(current_time=current_time) last_partition_window = self.get_last_partition_window(current_time=current_time) if ( first_partition_window is None or last_partition_window is None or time_window.start < first_partition_window.start or time_window.start > last_partition_window.start or time_window.start.strftime(self.fmt) != partition_key ): return None return time_window def __str__(self) -> str: schedule_str = ( self.schedule_type.value.capitalize() if self.schedule_type else self.cron_schedule ) partition_def_str = ( f"{schedule_str}, starting {self.start.strftime(self.fmt)} {self.timezone}." ) if self.end_offset != 0: partition_def_str += ( " End offsetted by" f" {self.end_offset} partition{'' if self.end_offset == 1 else 's'}." ) return partition_def_str def __repr__(self): # Between python 3.8 and 3.9 the repr of a datetime object changed. # Replaces start time with timestamp as a workaround to make sure the repr is consistent across versions. return ( f"TimeWindowPartitionsDefinition(start={self.start.timestamp()}," f" timezone='{self.timezone}', fmt='{self.fmt}', end_offset={self.end_offset}," f" cron_schedule='{self.cron_schedule}')" ) def __hash__(self): return hash(tuple(self.__repr__())) @functools.lru_cache(maxsize=100) def _time_window_for_partition_key(self, *, partition_key: str) -> TimeWindow: partition_key_dt = pendulum.instance( datetime.strptime(partition_key, self.fmt), tz=self.timezone ) return next(iter(self._iterate_time_windows(partition_key_dt))) def time_window_for_partition_key(self, partition_key: str) -> TimeWindow: return self._time_window_for_partition_key(partition_key=partition_key) def time_windows_for_partition_keys( self, partition_keys: Sequence[str], ) -> Sequence[TimeWindow]: if len(partition_keys) == 0: return [] sorted_pks = sorted(partition_keys, key=lambda pk: datetime.strptime(pk, self.fmt)) cur_windows_iterator = iter( self._iterate_time_windows( pendulum.instance(datetime.strptime(sorted_pks[0], self.fmt), tz=self.timezone) ) ) partition_key_time_windows: List[TimeWindow] = [] for partition_key in sorted_pks: next_window = next(cur_windows_iterator) if next_window.start.strftime(self.fmt) == partition_key: partition_key_time_windows.append(next_window) else: cur_windows_iterator = iter( self._iterate_time_windows( pendulum.instance( datetime.strptime(partition_key, self.fmt), tz=self.timezone ) ) ) partition_key_time_windows.append(next(cur_windows_iterator)) start_time_window = self.get_first_partition_window() end_time_window = self.get_last_partition_window() if end_time_window is None or start_time_window is None: check.failed("No partitions in the PartitionsDefinition") start_timestamp = start_time_window.start.timestamp() end_timestamp = end_time_window.end.timestamp() partition_key_time_windows = [ tw for tw in partition_key_time_windows if tw.start.timestamp() >= start_timestamp and tw.end.timestamp() <= end_timestamp ] return partition_key_time_windows def start_time_for_partition_key(self, partition_key: str) -> datetime: partition_key_dt = pendulum.instance( datetime.strptime(partition_key, self.fmt), tz=self.timezone ) # the datetime format might not include granular components, so we need to recover them # we make the assumption that the parsed partition key is <= the start datetime return next(iter(self._iterate_time_windows(partition_key_dt))).start def get_next_partition_key( self, partition_key: str, current_time: Optional[datetime] = None ) -> Optional[str]: last_partition_window = self.get_last_partition_window(current_time) if last_partition_window is None: return None partition_key_dt = pendulum.instance( datetime.strptime(partition_key, self.fmt), tz=self.timezone ) windows_iter = iter(self._iterate_time_windows(partition_key_dt)) next(windows_iter) start_time = next(windows_iter).start if start_time >= last_partition_window.end: return None else: return start_time.strftime(self.fmt) def get_next_partition_window( self, end_dt: datetime, current_time: Optional[datetime] = None ) -> Optional[TimeWindow]: last_partition_window = self.get_last_partition_window(current_time) if last_partition_window is None: return None windows_iter = iter(self._iterate_time_windows(end_dt)) next_window = next(windows_iter) if next_window.start >= last_partition_window.end: return None else: return next_window def get_prev_partition_window(self, start_dt: datetime) -> Optional[TimeWindow]: windows_iter = iter(self._reverse_iterate_time_windows(start_dt)) prev_window = next(windows_iter) first_partition_window = self.get_first_partition_window() if first_partition_window is None or prev_window.start < first_partition_window.start: return None else: return prev_window @functools.lru_cache(maxsize=5) def _get_first_partition_window(self, *, current_time: datetime) -> Optional[TimeWindow]: current_timestamp = current_time.timestamp() time_window = next(iter(self._iterate_time_windows(self.start))) if self.end_offset == 0: return time_window if time_window.end.timestamp() <= current_timestamp else None elif self.end_offset > 0: iterator = iter(self._iterate_time_windows(current_time)) # first returned time window is time window of current time curr_window_plus_offset = next(iterator) for _ in range(self.end_offset): curr_window_plus_offset = next(iterator) return ( time_window if time_window.end.timestamp() <= curr_window_plus_offset.start.timestamp() else None ) else: # end offset < 0 end_window = None iterator = iter(self._reverse_iterate_time_windows(current_time)) for _ in range(abs(self.end_offset)): end_window = next(iterator) if end_window is None: check.failed("end_window should not be None") return ( time_window if time_window.end.timestamp() <= end_window.start.timestamp() else None ) def get_first_partition_window( self, current_time: Optional[datetime] = None ) -> Optional[TimeWindow]: current_time = cast( datetime, pendulum.instance(current_time, tz=self.timezone) if current_time else, ) return self._get_first_partition_window(current_time=current_time) @functools.lru_cache(maxsize=5) def _get_last_partition_window(self, *, current_time: datetime) -> Optional[TimeWindow]: if self.get_first_partition_window(current_time) is None: return None current_time = ( pendulum.instance(current_time, tz=self.timezone) if current_time else ) if self.end and self.end < current_time: current_time = self.end if self.end_offset == 0: return next(iter(self._reverse_iterate_time_windows(current_time))) else: # TODO: make this efficient last_partition_key = super().get_last_partition_key(current_time) return ( self.time_window_for_partition_key(last_partition_key) if last_partition_key else None ) def get_last_partition_window( self, current_time: Optional[datetime] = None ) -> Optional[TimeWindow]: current_time = cast( datetime, pendulum.instance(current_time, tz=self.timezone) if current_time else, ) return self._get_last_partition_window(current_time=current_time) def get_first_partition_key( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Optional[str]: first_window = self.get_first_partition_window(current_time) if first_window is None: return None return first_window.start.strftime(self.fmt) def get_last_partition_key( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Optional[str]: last_window = self.get_last_partition_window(current_time) if last_window is None: return None return last_window.start.strftime(self.fmt) def end_time_for_partition_key(self, partition_key: str) -> datetime: return self.time_window_for_partition_key(partition_key).end def get_partition_keys_in_time_window(self, time_window: TimeWindow) -> Sequence[str]: result: List[str] = [] for partition_time_window in self._iterate_time_windows(time_window.start): if partition_time_window.start < time_window.end: result.append(partition_time_window.start.strftime(self.fmt)) else: break return result def get_partition_key_range_for_time_window(self, time_window: TimeWindow) -> PartitionKeyRange: start_partition_key = self.get_partition_key_for_timestamp(time_window.start.timestamp()) end_partition_key = self.get_partition_key_for_timestamp( cast(TimeWindow, self.get_prev_partition_window(time_window.end)).start.timestamp() ) return PartitionKeyRange(start_partition_key, end_partition_key) def get_partition_keys_in_range( self, partition_key_range: PartitionKeyRange, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[str]: start_time = self.start_time_for_partition_key(partition_key_range.start) end_time = self.end_time_for_partition_key(partition_key_range.end) return self.get_partition_keys_in_time_window(TimeWindow(start_time, end_time)) @public @property def schedule_type(self) -> Optional[ScheduleType]: """Optional[ScheduleType]: An enum representing the partition cadence (hourly, daily, weekly, or monthly). """ if re.fullmatch(r"\d+ \* \* \* \*", self.cron_schedule): return ScheduleType.HOURLY elif re.fullmatch(r"\d+ \d+ \* \* \*", self.cron_schedule): return ScheduleType.DAILY elif re.fullmatch(r"\d+ \d+ \* \* \d+", self.cron_schedule): return ScheduleType.WEEKLY elif re.fullmatch(r"\d+ \d+ \d+ \* \*", self.cron_schedule): return ScheduleType.MONTHLY else: return None @public @property def minute_offset(self) -> int: """int: Number of minutes past the hour to "split" partitions. Defaults to 0. For example, returns 15 if each partition starts at 15 minutes past the hour. """ match = re.fullmatch(r"(\d+) (\d+|\*) (\d+|\*) (\d+|\*) (\d+|\*)", self.cron_schedule) if match is None: check.failed(f"{self.cron_schedule} has no minute offset") return int(match.groups()[0]) @public @property def hour_offset(self) -> int: """int: Number of hours past 00:00 to "split" partitions. Defaults to 0. For example, returns 1 if each partition starts at 01:00. """ match = re.fullmatch(r"(\d+|\*) (\d+) (\d+|\*) (\d+|\*) (\d+|\*)", self.cron_schedule) if match is None: check.failed(f"{self.cron_schedule} has no hour offset") return int(match.groups()[1]) @public @property def day_offset(self) -> int: """int: For a weekly or monthly partitions definition, returns the day to "split" partitions by. Each partition will start on this day, and end before this day in the following week/month. Returns 0 if the day_offset parameter is unset in the WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or the provided cron schedule. For weekly partitions, returns a value between 0 (representing Sunday) and 6 (representing Saturday). Providing a value of 1 means that a partition will exist weekly from Monday to the following Sunday. For monthly partitions, returns a value between 0 (the first day of the month) and 31 (the last possible day of the month). """ schedule_type = self.schedule_type if schedule_type == ScheduleType.WEEKLY: match = re.fullmatch(r"(\d+|\*) (\d+|\*) (\d+|\*) (\d+|\*) (\d+)", self.cron_schedule) if match is None: check.failed(f"{self.cron_schedule} has no day offset") return int(match.groups()[4]) elif schedule_type == ScheduleType.MONTHLY: match = re.fullmatch(r"(\d+|\*) (\d+|\*) (\d+) (\d+|\*) (\d+|\*)", self.cron_schedule) if match is None: check.failed(f"{self.cron_schedule} has no day offset") return int(match.groups()[2]) else: check.failed(f"Unsupported schedule type for day_offset: {schedule_type}")
[docs] @public def get_cron_schedule( self, minute_of_hour: Optional[int] = None, hour_of_day: Optional[int] = None, day_of_week: Optional[int] = None, day_of_month: Optional[int] = None, ) -> str: """The schedule executes at the cadence specified by the partitioning, but may overwrite the minute/hour/day offset of the partitioning. This is useful e.g. if you have partitions that span midnight to midnight but you want to schedule a job that runs at 2 am. """ if ( minute_of_hour is None and hour_of_day is None and day_of_week is None and day_of_month is None ): return self.cron_schedule schedule_type = self.schedule_type if schedule_type is None: check.failed( f"{self.cron_schedule} does not support" " minute_of_hour/hour_of_day/day_of_week/day_of_month arguments" ) minute_of_hour = cast( int, check.opt_int_param(minute_of_hour, "minute_of_hour", default=self.minute_offset), ) if schedule_type == ScheduleType.HOURLY: check.invariant( hour_of_day is None, "Cannot set hour parameter with hourly partitions." ) else: hour_of_day = cast( int, check.opt_int_param(hour_of_day, "hour_of_day", default=self.hour_offset) ) if schedule_type == ScheduleType.DAILY: check.invariant( day_of_week is None, "Cannot set day of week parameter with daily partitions." ) check.invariant( day_of_month is None, "Cannot set day of month parameter with daily partitions." ) if schedule_type == ScheduleType.MONTHLY: default = self.day_offset or 1 day_offset = check.opt_int_param(day_of_month, "day_of_month", default=default) elif schedule_type == ScheduleType.WEEKLY: default = self.day_offset or 0 day_offset = check.opt_int_param(day_of_week, "day_of_week", default=default) else: day_offset = 0 return cron_schedule_from_schedule_type_and_offsets( schedule_type, minute_offset=minute_of_hour, hour_offset=hour_of_day or 0, day_offset=day_offset, )
def _iterate_time_windows(self, start: datetime) -> Iterable[TimeWindow]: """Returns an infinite generator of time windows that start after the given start time.""" start_timestamp = pendulum.instance(start, tz=self.timezone).timestamp() iterator = cron_string_iterator( start_timestamp=start_timestamp, cron_string=self.cron_schedule, execution_timezone=self.timezone, ) prev_time = next(iterator) while prev_time.timestamp() < start_timestamp: prev_time = next(iterator) while True: next_time = next(iterator) yield TimeWindow(prev_time, next_time) prev_time = next_time def _reverse_iterate_time_windows(self, end: datetime) -> Iterable[TimeWindow]: """Returns an infinite generator of time windows that end before the given end time.""" end_timestamp = pendulum.instance(end, tz=self.timezone).timestamp() iterator = reverse_cron_string_iterator( end_timestamp=end_timestamp, cron_string=self.cron_schedule, execution_timezone=self.timezone, ) prev_time = next(iterator) while prev_time.timestamp() > end_timestamp: prev_time = next(iterator) while True: next_time = next(iterator) yield TimeWindow(next_time, prev_time) prev_time = next_time def get_partition_key_for_timestamp(self, timestamp: float, end_closed: bool = False) -> str: """Args: timestamp (float): Timestamp from the unix epoch, UTC. end_closed (bool): Whether the interval is closed at the end or at the beginning. """ iterator = cron_string_iterator( timestamp, self.cron_schedule, self.timezone, start_offset=-1 ) # prev will be < timestamp prev = next(iterator) # prev_next will be >= timestamp prev_next = next(iterator) if end_closed or prev_next.timestamp() > timestamp: return prev.strftime(self.fmt) else: return prev_next.strftime(self.fmt) def less_than(self, partition_key1: str, partition_key2: str) -> bool: """Returns true if the partition_key1 is earlier than partition_key2.""" return self.start_time_for_partition_key( partition_key1 ) < self.start_time_for_partition_key(partition_key2) @property def partitions_subset_class(self) -> Type["PartitionsSubset"]: return TimeWindowPartitionsSubset def empty_subset(self) -> "PartitionsSubset": return self.partitions_subset_class.empty_subset(self) def is_valid_partition_key(self, partition_key: str) -> bool: try: partition_time = pendulum.instance( datetime.strptime(partition_key, self.fmt), tz=self.timezone ) return partition_time >= self.start except ValueError: return False def get_serializable_unique_identifier( self, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None ) -> str: return hashlib.sha1(self.__repr__().encode("utf-8")).hexdigest() def has_partition_key( self, partition_key: str, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> bool: return bool(self._get_validated_time_window_for_partition_key(partition_key, current_time))
[docs]class DailyPartitionsDefinition(TimeWindowPartitionsDefinition): """A set of daily partitions. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. end_date (Union[datetime.datetime, str, None]): The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. .. code-block:: python DailyPartitionsDefinition(start_date="2022-03-12") # creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ... DailyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=16) # creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ... """ def __new__( cls, start_date: Union[datetime, str], end_date: Union[datetime, str, None] = None, minute_offset: int = 0, hour_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ): _fmt = fmt or DEFAULT_DATE_FORMAT return super(DailyPartitionsDefinition, cls).__new__( cls, schedule_type=ScheduleType.DAILY, start=start_date, end=end_date, minute_offset=minute_offset, hour_offset=hour_offset, timezone=timezone, fmt=_fmt, end_offset=end_offset, )
def wrap_time_window_run_config_fn( run_config_fn: Optional[Callable[[datetime, datetime], Mapping[str, Any]]], partitions_def: TimeWindowPartitionsDefinition, ) -> Callable[[str], Mapping[str, Any]]: def _run_config_wrapper(key: str) -> Mapping[str, Any]: if not run_config_fn: return {} time_window = partitions_def.time_window_for_partition_key(key) return run_config_fn(time_window.start, time_window.end) return _run_config_wrapper def wrap_time_window_tags_fn( tags_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]], partitions_def: TimeWindowPartitionsDefinition, ) -> Callable[[str], Mapping[str, str]]: def _tag_wrapper(key: str) -> Mapping[str, str]: if not tags_fn: return {} time_window = partitions_def.time_window_for_partition_key(key) return tags_fn(time_window.start, time_window.end) return _tag_wrapper
[docs]def daily_partitioned_config( start_date: Union[datetime, str], minute_offset: int = 0, hour_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, tags_for_partition_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]] = None, ) -> Callable[ [Callable[[datetime, datetime], Mapping[str, Any]]], PartitionedConfig[DailyPartitionsDefinition], ]: """Defines run config over a set of daily partitions. The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]): A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition. .. code-block:: python @daily_partitioned_config(start_date="2022-03-12") # creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ... @daily_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=16) # creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ... """ def inner( fn: Callable[[datetime, datetime], Mapping[str, Any]] ) -> PartitionedConfig[DailyPartitionsDefinition]: check.callable_param(fn, "fn") partitions_def = DailyPartitionsDefinition( start_date=start_date, minute_offset=minute_offset, hour_offset=hour_offset, timezone=timezone, fmt=fmt, end_offset=end_offset, ) return PartitionedConfig( run_config_for_partition_key_fn=wrap_time_window_run_config_fn(fn, partitions_def), partitions_def=partitions_def, decorated_fn=fn, tags_for_partition_key_fn=wrap_time_window_tags_fn( tags_for_partition_fn, partitions_def ), ) return inner
[docs]class HourlyPartitionsDefinition(TimeWindowPartitionsDefinition): """A set of hourly partitions. The first partition in the set will start on the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. end_date (Union[datetime.datetime, str, None]): The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. .. code-block:: python HourlyPartitionsDefinition(start_date=datetime(2022, 03, 12)) # creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ... HourlyPartitionsDefinition(start_date=datetime(2022, 03, 12), minute_offset=15) # creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ... """ def __new__( cls, start_date: Union[datetime, str], end_date: Union[datetime, str, None] = None, minute_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ): _fmt = fmt or DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE return super(HourlyPartitionsDefinition, cls).__new__( cls, schedule_type=ScheduleType.HOURLY, start=start_date, end=end_date, minute_offset=minute_offset, timezone=timezone, fmt=_fmt, end_offset=end_offset, )
[docs]def hourly_partitioned_config( start_date: Union[datetime, str], minute_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, tags_for_partition_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]] = None, ) -> Callable[ [Callable[[datetime, datetime], Mapping[str, Any]]], PartitionedConfig[HourlyPartitionsDefinition], ]: """Defines run config over a set of hourly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]): A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition. .. code-block:: python @hourly_partitioned_config(start_date=datetime(2022, 03, 12)) # creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ... @hourly_partitioned_config(start_date=datetime(2022, 03, 12), minute_offset=15) # creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ... """ def inner( fn: Callable[[datetime, datetime], Mapping[str, Any]] ) -> PartitionedConfig[HourlyPartitionsDefinition]: check.callable_param(fn, "fn") partitions_def = HourlyPartitionsDefinition( start_date=start_date, minute_offset=minute_offset, timezone=timezone, fmt=fmt, end_offset=end_offset, ) return PartitionedConfig( run_config_for_partition_key_fn=wrap_time_window_run_config_fn(fn, partitions_def), partitions_def=partitions_def, decorated_fn=fn, tags_for_partition_key_fn=wrap_time_window_tags_fn( tags_for_partition_fn, partitions_def ), ) return inner
[docs]class MonthlyPartitionsDefinition(TimeWindowPartitionsDefinition): """A set of monthly partitions. The first partition in the set will start at the soonest first of the month after start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions will be midnight the sonnest first of the month following start_date. Can provide in either a datetime or string format. end_date (Union[datetime.datetime, str, None]): The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. day_offset (int): Day of the month to "split" the partition. Defaults to 1. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. .. code-block:: python MonthlyPartitionsDefinition(start_date="2022-03-12") # creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ... MonthlyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5) # creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ... """ def __new__( cls, start_date: Union[datetime, str], end_date: Union[datetime, str, None] = None, minute_offset: int = 0, hour_offset: int = 0, day_offset: int = 1, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ): _fmt = fmt or DEFAULT_DATE_FORMAT return super(MonthlyPartitionsDefinition, cls).__new__( cls, schedule_type=ScheduleType.MONTHLY, start=start_date, end=end_date, minute_offset=minute_offset, hour_offset=hour_offset, day_offset=day_offset, timezone=timezone, fmt=_fmt, end_offset=end_offset, )
[docs]def monthly_partitioned_config( start_date: Union[datetime, str], minute_offset: int = 0, hour_offset: int = 0, day_offset: int = 1, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, tags_for_partition_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]] = None, ) -> Callable[ [Callable[[datetime, datetime], Mapping[str, Any]]], PartitionedConfig[MonthlyPartitionsDefinition], ]: """Defines run config over a set of monthly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at midnight on the soonest first of the month after start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions will be midnight the sonnest first of the month following start_date. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. day_offset (int): Day of the month to "split" the partition. Defaults to 1. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]): A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition. .. code-block:: python @monthly_partitioned_config(start_date="2022-03-12") # creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ... @monthly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5) # creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ... """ def inner( fn: Callable[[datetime, datetime], Mapping[str, Any]] ) -> PartitionedConfig[MonthlyPartitionsDefinition]: check.callable_param(fn, "fn") partitions_def = MonthlyPartitionsDefinition( start_date=start_date, minute_offset=minute_offset, hour_offset=hour_offset, day_offset=day_offset, timezone=timezone, fmt=fmt, end_offset=end_offset, ) return PartitionedConfig( run_config_for_partition_key_fn=wrap_time_window_run_config_fn(fn, partitions_def), partitions_def=partitions_def, decorated_fn=fn, tags_for_partition_key_fn=wrap_time_window_tags_fn( tags_for_partition_fn, partitions_def ), ) return inner
[docs]class WeeklyPartitionsDefinition(TimeWindowPartitionsDefinition): """Defines a set of weekly partitions. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions will Sunday at midnight following start_date. Can provide in either a datetime or string format. end_date (Union[datetime.datetime, str, None]): The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. day_offset (int): Day of the week to "split" the partition. Defaults to 0 (Sunday). timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. .. code-block:: python WeeklyPartitionsDefinition(start_date="2022-03-12") # creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ... WeeklyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6) # creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ... """ def __new__( cls, start_date: Union[datetime, str], end_date: Union[datetime, str, None] = None, minute_offset: int = 0, hour_offset: int = 0, day_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, ): _fmt = fmt or DEFAULT_DATE_FORMAT return super(WeeklyPartitionsDefinition, cls).__new__( cls, schedule_type=ScheduleType.WEEKLY, start=start_date, end=end_date, minute_offset=minute_offset, hour_offset=hour_offset, day_offset=day_offset, timezone=timezone, fmt=_fmt, end_offset=end_offset, )
[docs]def weekly_partitioned_config( start_date: Union[datetime, str], minute_offset: int = 0, hour_offset: int = 0, day_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, tags_for_partition_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]] = None, ) -> Callable[ [Callable[[datetime, datetime], Mapping[str, Any]]], PartitionedConfig[WeeklyPartitionsDefinition], ]: """Defines run config over a set of weekly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions will Sunday at midnight following start_date. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. day_offset (int): Day of the week to "split" the partition. Defaults to 0 (Sunday). timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]): A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition. .. code-block:: python @weekly_partitioned_config(start_date="2022-03-12") # creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ... @weekly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6) # creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ... """ def inner( fn: Callable[[datetime, datetime], Mapping[str, Any]] ) -> PartitionedConfig[WeeklyPartitionsDefinition]: check.callable_param(fn, "fn") partitions_def = WeeklyPartitionsDefinition( start_date=start_date, minute_offset=minute_offset, hour_offset=hour_offset, day_offset=day_offset, timezone=timezone, fmt=fmt, end_offset=end_offset, ) return PartitionedConfig( run_config_for_partition_key_fn=wrap_time_window_run_config_fn(fn, partitions_def), partitions_def=partitions_def, decorated_fn=fn, tags_for_partition_key_fn=wrap_time_window_tags_fn( tags_for_partition_fn, partitions_def ), ) return inner
class TimeWindowPartitionsSubset(PartitionsSubset): # Every time we change the serialization format, we should increment the version number. # This will ensure that we can gracefully degrade when deserializing old data. SERIALIZATION_VERSION = 1 def __init__( self, partitions_def: TimeWindowPartitionsDefinition, num_partitions: int, included_time_windows: Optional[Sequence[TimeWindow]] = None, included_partition_keys: Optional[AbstractSet[str]] = None, ): self._partitions_def = check.inst_param( partitions_def, "partitions_def", TimeWindowPartitionsDefinition ) self._included_time_windows = included_time_windows self._num_partitions = num_partitions check.param_invariant( not (included_partition_keys and included_time_windows), "Cannot specify both included_partition_keys and included_time_windows", ) self._included_time_windows = check.opt_nullable_sequence_param( included_time_windows, "included_time_windows", of_type=TimeWindow ) self._included_partition_keys = check.opt_nullable_set_param( included_partition_keys, "included_partition_keys", of_type=str ) @property def included_time_windows(self) -> Sequence[TimeWindow]: if self._included_time_windows is None: result_time_windows, _ = self._add_partitions_to_time_windows( initial_windows=[], partition_keys=list(check.not_none(self._included_partition_keys)), ) self._included_time_windows = result_time_windows return self._included_time_windows def _get_partition_time_windows_not_in_subset( self, current_time: Optional[datetime] = None, ) -> Sequence[TimeWindow]: """Returns a list of partition time windows that are not in the subset. Each time window is a single partition. """ first_tw = self._partitions_def.get_first_partition_window(current_time=current_time) last_tw = self._partitions_def.get_last_partition_window(current_time=current_time) if not first_tw or not last_tw: check.failed("No partitions found") if len(self.included_time_windows) == 0: return [TimeWindow(first_tw.start, last_tw.end)] time_windows = [] if first_tw.start < self.included_time_windows[0].start: time_windows.append(TimeWindow(first_tw.start, self.included_time_windows[0].start)) for i in range(len(self.included_time_windows) - 1): if self.included_time_windows[i].start >= last_tw.end: break if self.included_time_windows[i].end < last_tw.end: if self.included_time_windows[i + 1].start <= last_tw.end: time_windows.append( TimeWindow( self.included_time_windows[i].end, self.included_time_windows[i + 1].start, ) ) else: time_windows.append( TimeWindow( self.included_time_windows[i].end, last_tw.end, ) ) if last_tw.end > self.included_time_windows[-1].end: time_windows.append(TimeWindow(self.included_time_windows[-1].end, last_tw.end)) return time_windows def get_partition_keys_not_in_subset( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: partition_keys: List[str] = [] for tw in self._get_partition_time_windows_not_in_subset(current_time): partition_keys.extend(self._partitions_def.get_partition_keys_in_time_window(tw)) return partition_keys @public def get_partition_keys(self, current_time: Optional[datetime] = None) -> Iterable[str]: if self._included_partition_keys is None: return [ pk for time_window in self.included_time_windows for pk in self._partitions_def.get_partition_keys_in_time_window(time_window) ] return list(self._included_partition_keys) if self._included_partition_keys else [] def get_partition_key_ranges( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: return [ self._partitions_def.get_partition_key_range_for_time_window(window) for window in self.included_time_windows ] def _add_partitions_to_time_windows( self, initial_windows: Sequence[TimeWindow], partition_keys: Sequence[str] ) -> Tuple[Sequence[TimeWindow], int]: """Merges a set of partition keys into an existing set of time windows, returning the minimized set of time windows and the number of partitions added. """ result_windows = [*initial_windows] time_windows = self._partitions_def.time_windows_for_partition_keys( list(partition_keys), ) num_added_partitions = 0 for window in sorted(time_windows): # go in reverse order because it's more common to add partitions at the end than the # beginning for i in reversed(range(len(result_windows))): included_window = result_windows[i] lt_end_of_range = window.start < included_window.end gte_start_of_range = window.start >= included_window.start if lt_end_of_range and gte_start_of_range: break if not lt_end_of_range: merge_with_range = included_window.end == window.start merge_with_later_range = i + 1 < len(result_windows) and ( window.end == result_windows[i + 1].start ) if merge_with_range and merge_with_later_range: result_windows[i] = TimeWindow( included_window.start, result_windows[i + 1].end ) del result_windows[i + 1] elif merge_with_range: result_windows[i] = TimeWindow(included_window.start, window.end) elif merge_with_later_range: result_windows[i + 1] = TimeWindow(window.start, result_windows[i + 1].end) else: result_windows.insert(i + 1, window) num_added_partitions += 1 break else: if result_windows and window.start == result_windows[0].start: result_windows[0] = TimeWindow(window.start, included_window.end) # type: ignore else: result_windows.insert(0, window) num_added_partitions += 1 return result_windows, num_added_partitions def with_partition_keys(self, partition_keys: Iterable[str]) -> "TimeWindowPartitionsSubset": # if we are representing things as a static set of keys, continue doing so if self._included_partition_keys is not None: new_partitions = {*self._included_partition_keys, *partition_keys} return TimeWindowPartitionsSubset( self._partitions_def, num_partitions=len(new_partitions), included_partition_keys=new_partitions, ) result_windows, added_partitions = self._add_partitions_to_time_windows( self.included_time_windows, list(partition_keys) ) return TimeWindowPartitionsSubset( self._partitions_def, num_partitions=self._num_partitions + added_partitions, included_time_windows=result_windows, ) @classmethod def from_serialized( cls, partitions_def: PartitionsDefinition, serialized: str ) -> "PartitionsSubset": if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed("Partitions definition must be a TimeWindowPartitionsDefinition") partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) loaded = json.loads(serialized) def tuples_to_time_windows(tuples): return [ TimeWindow( pendulum.from_timestamp(tup[0], tz=partitions_def.timezone), pendulum.from_timestamp(tup[1], tz=partitions_def.timezone), ) for tup in tuples ] if isinstance(loaded, list): # backwards compatibility time_windows = tuples_to_time_windows(loaded) num_partitions = sum( len(partitions_def.get_partition_keys_in_time_window(time_window)) for time_window in time_windows ) elif isinstance(loaded, dict) and ( "version" not in loaded or loaded["version"] == cls.SERIALIZATION_VERSION ): # version 1 time_windows = tuples_to_time_windows(loaded["time_windows"]) num_partitions = loaded["num_partitions"] else: raise DagsterInvalidDeserializationVersionError( f"Attempted to deserialize partition subset with version {loaded.get('version')}," f" but only version {cls.SERIALIZATION_VERSION} is supported." ) return TimeWindowPartitionsSubset( partitions_def, num_partitions=num_partitions, included_time_windows=time_windows ) @classmethod def can_deserialize( cls, partitions_def: PartitionsDefinition, serialized: str, serialized_partitions_def_unique_id: Optional[str], serialized_partitions_def_class_name: Optional[str], ) -> bool: if ( serialized_partitions_def_class_name and serialized_partitions_def_class_name != partitions_def.__class__.__name__ ): return False if serialized_partitions_def_unique_id: return ( partitions_def.get_serializable_unique_identifier() == serialized_partitions_def_unique_id ) data = json.loads(serialized) return isinstance(data, list) or ( isinstance(data, dict) and data.get("time_windows") is not None and data.get("num_partitions") is not None ) @classmethod def empty_subset(cls, partitions_def: PartitionsDefinition) -> "PartitionsSubset": if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed("Partitions definition must be a TimeWindowPartitionsDefinition") partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) return cls(partitions_def, 0, [], set()) def serialize(self) -> str: return json.dumps( { "version": self.SERIALIZATION_VERSION, "time_windows": [ (window.start.timestamp(), window.end.timestamp()) for window in self.included_time_windows ], "num_partitions": self._num_partitions, } ) @property def partitions_def(self) -> PartitionsDefinition: return self._partitions_def def __eq__(self, other): return ( isinstance(other, TimeWindowPartitionsSubset) and self._partitions_def == other._partitions_def and ( # faster comparison, but will not catch all cases ( self._included_time_windows == other._included_time_windows and self._included_partition_keys == other._included_partition_keys ) # slower comparison, catches all cases or self.included_time_windows == other.included_time_windows ) ) def __len__(self) -> int: return self._num_partitions def __contains__(self, partition_key: str) -> bool: if self._included_partition_keys is not None: return partition_key in self._included_partition_keys time_window = self._partitions_def.time_window_for_partition_key(partition_key) return any( time_window.start >= included_time_window.start and time_window.start < included_time_window.end for included_time_window in self.included_time_windows ) def __repr__(self) -> str: return f"TimeWindowPartitionsSubset({self.get_partition_key_ranges()})" class PartitionRangeStatus(Enum): MATERIALIZING = "MATERIALIZING" MATERIALIZED = "MATERIALIZED" FAILED = "FAILED" PARTITION_RANGE_STATUS_PRIORITY = [ PartitionRangeStatus.MATERIALIZING, PartitionRangeStatus.FAILED, PartitionRangeStatus.MATERIALIZED, ] class PartitionTimeWindowStatus: def __init__(self, time_window: TimeWindow, status: PartitionRangeStatus): self.time_window = time_window self.status = status def __repr__(self): return f"({self.time_window.start} - {self.time_window.end}): {self.status.value}" def __eq__(self, other): return ( isinstance(other, PartitionTimeWindowStatus) and self.time_window == other.time_window and self.status == other.status ) def _flatten( high_pri_time_windows: List[PartitionTimeWindowStatus], low_pri_time_windows: List[PartitionTimeWindowStatus], ) -> List[PartitionTimeWindowStatus]: high_pri_time_windows = sorted(high_pri_time_windows, key=lambda t: t.time_window.start) low_pri_time_windows = sorted(low_pri_time_windows, key=lambda t: t.time_window.start) high_pri_idx = 0 low_pri_idx = 0 filtered_low_pri: List[PartitionTimeWindowStatus] = [] # slice and dice the low pri time windows so there's no overlap with high pri while True: if low_pri_idx >= len(low_pri_time_windows): # reached end of materialized break if high_pri_idx >= len(high_pri_time_windows): # reached end of failed, add all remaining materialized bc there's no overlap filtered_low_pri.extend(low_pri_time_windows[low_pri_idx:]) break low_pri_tw = low_pri_time_windows[low_pri_idx] high_pri_tw = high_pri_time_windows[high_pri_idx] if low_pri_tw.time_window.start < high_pri_tw.time_window.start: if low_pri_tw.time_window.end <= high_pri_tw.time_window.start: # low_pri_tw is entirely before high pri filtered_low_pri.append(low_pri_tw) low_pri_idx += 1 else: # high pri cuts the low pri short filtered_low_pri.append( PartitionTimeWindowStatus( TimeWindow( low_pri_tw.time_window.start, high_pri_tw.time_window.start, ), low_pri_tw.status, ) ) if low_pri_tw.time_window.end > high_pri_tw.time_window.end: # the low pri time window will continue on the other end of the high pri # and get split in two. Modify low_pri[low_pri_idx] to be # the second half of the low pri time window. It will be added in the next iteration. # (don't add it now, because we need to check if it overlaps with the next high pri) low_pri_time_windows[low_pri_idx] = PartitionTimeWindowStatus( TimeWindow(high_pri_tw.time_window.end, low_pri_tw.time_window.end), low_pri_tw.status, ) high_pri_idx += 1 else: # the rest of the low pri time window is inside the high pri time window low_pri_idx += 1 else: if low_pri_tw.time_window.start >= high_pri_tw.time_window.end: # high pri is entirely before low pri. The next high pri may overlap high_pri_idx += 1 elif low_pri_tw.time_window.end <= high_pri_tw.time_window.end: # low pri is entirely within high pri, skip it low_pri_idx += 1 else: # high pri cuts out the start of the low pri. It will continue on the other end. # Modify low_pri[low_pri_idx] to shorten the start. It will be added # in the next iteration. (don't add it now, because we need to check if it overlaps with the next high pri) low_pri_time_windows[low_pri_idx] = PartitionTimeWindowStatus( TimeWindow(high_pri_tw.time_window.end, low_pri_tw.time_window.end), low_pri_tw.status, ) high_pri_idx += 1 # combine the high pri windwos with the filtered low pri windows flattened_time_windows = high_pri_time_windows flattened_time_windows.extend(filtered_low_pri) flattened_time_windows.sort(key=lambda t: t.time_window.start) return flattened_time_windows def fetch_flattened_time_window_ranges( subsets: Mapping[PartitionRangeStatus, TimeWindowPartitionsSubset] ) -> Sequence[PartitionTimeWindowStatus]: """Given potentially overlapping subsets, return a flattened list of timewindows where the highest priority status wins on overlaps. """ prioritized_subsets = sorted( [(status, subset) for status, subset in subsets.items()], key=lambda t: PARTITION_RANGE_STATUS_PRIORITY.index(t[0]), ) # progressively add lower priority time windows to the list of higher priority time windows flattened_time_window_statuses = [] for status, subset in prioritized_subsets: subset_time_window_statuses = [ PartitionTimeWindowStatus(tw, status) for tw in subset.included_time_windows ] flattened_time_window_statuses = _flatten( flattened_time_window_statuses, subset_time_window_statuses ) return flattened_time_window_statuses def has_one_dimension_time_window_partitioning( partitions_def: PartitionsDefinition, ) -> bool: from .multi_dimensional_partitions import MultiPartitionsDefinition if isinstance(partitions_def, TimeWindowPartitionsDefinition): return True if isinstance(partitions_def, MultiPartitionsDefinition): time_window_dims = [ dim for dim in partitions_def.partitions_defs if isinstance(dim.partitions_def, TimeWindowPartitionsDefinition) ] if len(time_window_dims) == 1: return True return False def get_time_partitions_def( partitions_def: Optional[PartitionsDefinition], ) -> Optional[TimeWindowPartitionsDefinition]: """For a given PartitionsDefinition, return the associated TimeWindowPartitionsDefinition if it exists. """ from .multi_dimensional_partitions import MultiPartitionsDefinition if partitions_def is None: return None elif isinstance(partitions_def, TimeWindowPartitionsDefinition): return partitions_def elif isinstance( partitions_def, MultiPartitionsDefinition ) and has_one_dimension_time_window_partitioning(partitions_def): return cast( TimeWindowPartitionsDefinition, partitions_def.time_window_dimension.partitions_def ) else: return None def get_time_partition_key( partitions_def: Optional[PartitionsDefinition], partition_key: Optional[str] ) -> str: from .multi_dimensional_partitions import MultiPartitionsDefinition if partitions_def is None or partition_key is None: check.failed( "Cannot get time partitions key from when partitions def is None or partition key is" " None" ) elif isinstance(partitions_def, TimeWindowPartitionsDefinition): return partition_key elif isinstance(partitions_def, MultiPartitionsDefinition): return partitions_def.get_partition_key_from_str(partition_key).keys_by_dimension[ ] else: check.failed(f"Cannot get time partition from non-time partitions def {partitions_def}")