851 lines
32 KiB
Python
851 lines
32 KiB
Python
"""Sensor platform for garbage_collection."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import date, datetime, time, timedelta
|
|
from typing import Any, Dict, Generator
|
|
|
|
from dateutil.easter import easter
|
|
from dateutil.relativedelta import relativedelta
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import (
|
|
ATTR_DEVICE_CLASS,
|
|
ATTR_HIDDEN,
|
|
CONF_ENTITIES,
|
|
CONF_NAME,
|
|
WEEKDAYS,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import device_registry as dr
|
|
from homeassistant.helpers.entity import DeviceInfo
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
from homeassistant.helpers.restore_state import RestoreEntity
|
|
|
|
from . import const, helpers
|
|
from .calendar import EntitiesCalendarData
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
SCAN_INTERVAL = timedelta(seconds=10)
|
|
THROTTLE_INTERVAL = timedelta(seconds=60)
|
|
|
|
|
|
async def async_setup_entry(
|
|
_: HomeAssistant, config_entry: ConfigEntry, async_add_devices: AddEntitiesCallback
|
|
) -> None:
|
|
"""Create garbage collection entities defined in config_flow and add them to HA."""
|
|
frequency = config_entry.options.get(const.CONF_FREQUENCY)
|
|
name = (
|
|
config_entry.title
|
|
if config_entry.title is not None
|
|
else config_entry.data.get(CONF_NAME)
|
|
)
|
|
_frequency_function = {
|
|
"weekly": WeeklyCollection,
|
|
"even-weeks": WeeklyCollection,
|
|
"odd-weeks": WeeklyCollection,
|
|
"every-n-weeks": WeeklyCollection,
|
|
"every-n-days": DailyCollection,
|
|
"monthly": MonthlyCollection,
|
|
"annual": AnnualCollection,
|
|
"group": GroupCollection,
|
|
"blank": BlankCollection,
|
|
}
|
|
if frequency in _frequency_function:
|
|
add_devices = _frequency_function[frequency]
|
|
async_add_devices([add_devices(config_entry)], True)
|
|
else:
|
|
_LOGGER.error("(%s) Unknown frequency %s", name, frequency)
|
|
raise ValueError
|
|
|
|
|
|
class GarbageCollection(RestoreEntity):
|
|
"""GarbageCollection Sensor class."""
|
|
|
|
__slots__ = (
|
|
"_attr_icon",
|
|
"_attr_name",
|
|
"_attr_state",
|
|
"_collection_dates",
|
|
"_date_format",
|
|
"_days",
|
|
"_first_month",
|
|
"_hidden",
|
|
"_holiday_dates",
|
|
"_move_holidays",
|
|
"_icon_normal",
|
|
"_icon_today",
|
|
"_icon_tomorrow",
|
|
"_last_month",
|
|
"_last_updated",
|
|
"_manual",
|
|
"_next_date",
|
|
"_verbose_format",
|
|
"_verbose_state",
|
|
"config_entry",
|
|
"expire_after",
|
|
"last_collection",
|
|
)
|
|
|
|
def __init__(self, config_entry: ConfigEntry) -> None:
|
|
"""Read configuration and initialise class variables."""
|
|
config = config_entry.options
|
|
self.config_entry = config_entry
|
|
self._attr_name = (
|
|
config_entry.title
|
|
if config_entry.title is not None
|
|
else config.get(CONF_NAME)
|
|
)
|
|
self._hidden = config.get(ATTR_HIDDEN, False)
|
|
self._manual = config.get(const.CONF_MANUAL)
|
|
first_month = config.get(const.CONF_FIRST_MONTH, const.DEFAULT_FIRST_MONTH)
|
|
months = [m["value"] for m in const.MONTH_OPTIONS]
|
|
self._first_month: int = (
|
|
months.index(first_month) + 1 if first_month in months else 1
|
|
)
|
|
last_month = config.get(const.CONF_LAST_MONTH, const.DEFAULT_LAST_MONTH)
|
|
self._last_month: int = (
|
|
months.index(last_month) + 1 if last_month in months else 12
|
|
)
|
|
self._verbose_state = config.get(const.CONF_VERBOSE_STATE)
|
|
self._icon_normal = config.get(const.CONF_ICON_NORMAL)
|
|
self._icon_today = config.get(const.CONF_ICON_TODAY)
|
|
self._icon_tomorrow = config.get(const.CONF_ICON_TOMORROW)
|
|
exp = config.get(const.CONF_EXPIRE_AFTER)
|
|
self.expire_after: time | None = (
|
|
None
|
|
if (
|
|
exp is None
|
|
or datetime.strptime(exp, "%H:%M:%S").time() == time(0, 0, 0)
|
|
)
|
|
else datetime.strptime(exp, "%H:%M:%S").time()
|
|
)
|
|
self._date_format = config.get(
|
|
const.CONF_DATE_FORMAT, const.DEFAULT_DATE_FORMAT
|
|
)
|
|
self._verbose_format = config.get(
|
|
const.CONF_VERBOSE_FORMAT, const.DEFAULT_VERBOSE_FORMAT
|
|
)
|
|
self._holiday_dates: set[tuple[int, int]] = set()
|
|
self._move_holidays = bool(config.get(const.CONF_MOVE_COUNTRY_HOLIDAYS, False))
|
|
if self._move_holidays:
|
|
raw_dates = config.get(
|
|
const.CONF_HOLIDAY_DATES, const.DEFAULT_HOLIDAY_DATES
|
|
)
|
|
for chunk in raw_dates.split(","):
|
|
chunk = chunk.strip()
|
|
if not chunk:
|
|
continue
|
|
try:
|
|
month_str, day_str = chunk.split("-")
|
|
self._holiday_dates.add((int(month_str), int(day_str)))
|
|
except ValueError:
|
|
_LOGGER.error(
|
|
"(%s) Invalid holiday date '%s', expected format MM-DD",
|
|
self._attr_name,
|
|
chunk,
|
|
)
|
|
self._collection_dates: list[date] = []
|
|
self._next_date: date | None = None
|
|
self._last_updated: datetime | None = None
|
|
self.last_collection: datetime | None = None
|
|
self._days: int | None = None
|
|
self._attr_state = "" if bool(self._verbose_state) else 2
|
|
self._attr_icon = self._icon_normal
|
|
|
|
async def async_added_to_hass(self) -> None:
|
|
"""When sensor is added to hassio, add it to calendar."""
|
|
await super().async_added_to_hass()
|
|
self.hass.data[const.DOMAIN][const.SENSOR_PLATFORM][self.entity_id] = self
|
|
|
|
# Restore stored state
|
|
if (state := await self.async_get_last_state()) is not None:
|
|
self._last_updated = None # Unblock update - after options change
|
|
self._attr_state = state.state
|
|
self._days = (
|
|
state.attributes[const.ATTR_DAYS]
|
|
if const.ATTR_DAYS in state.attributes
|
|
else None
|
|
)
|
|
next_date = (
|
|
helpers.parse_datetime(state.attributes[const.ATTR_NEXT_DATE])
|
|
if const.ATTR_NEXT_DATE in state.attributes
|
|
else None
|
|
)
|
|
self._next_date = None if next_date is None else next_date.date()
|
|
self.last_collection = (
|
|
helpers.parse_datetime(state.attributes[const.ATTR_LAST_COLLECTION])
|
|
if const.ATTR_LAST_COLLECTION in state.attributes
|
|
else None
|
|
)
|
|
|
|
# Create device
|
|
device_registry = dr.async_get(self.hass)
|
|
device_registry.async_get_or_create(
|
|
config_entry_id=self.config_entry.entry_id,
|
|
identifiers={(const.DOMAIN, self.unique_id)},
|
|
name=self._attr_name,
|
|
manufacturer="bruxy70",
|
|
)
|
|
|
|
# Create or add to calendar
|
|
if not self.hidden:
|
|
if const.CALENDAR_PLATFORM not in self.hass.data[const.DOMAIN]:
|
|
self.hass.data[const.DOMAIN][
|
|
const.CALENDAR_PLATFORM
|
|
] = EntitiesCalendarData(self.hass)
|
|
_LOGGER.debug("Creating garbage_collection calendar")
|
|
await self.hass.config_entries.async_forward_entry_setups(
|
|
self.config_entry, [const.CALENDAR_PLATFORM]
|
|
)
|
|
|
|
self.hass.data[const.DOMAIN][const.CALENDAR_PLATFORM].add_entity(
|
|
self.entity_id
|
|
)
|
|
|
|
async def async_will_remove_from_hass(self) -> None:
|
|
"""When sensor is added to hassio, remove it."""
|
|
await super().async_will_remove_from_hass()
|
|
del self.hass.data[const.DOMAIN][const.SENSOR_PLATFORM][self.entity_id]
|
|
self.hass.data[const.DOMAIN][const.CALENDAR_PLATFORM].remove_entity(
|
|
self.entity_id
|
|
)
|
|
|
|
@property
|
|
def unique_id(self) -> str:
|
|
"""Return a unique ID to use for this sensor."""
|
|
if "unique_id" in self.config_entry.data: # From legacy config
|
|
return self.config_entry.data["unique_id"]
|
|
return self.config_entry.entry_id
|
|
|
|
@property
|
|
def device_info(self) -> DeviceInfo | None:
|
|
"""Return device info."""
|
|
return {
|
|
"identifiers": {(const.DOMAIN, self.unique_id)},
|
|
"name": self.config_entry.data.get("name"),
|
|
"manufacturer": "bruxy70",
|
|
}
|
|
|
|
@property
|
|
def name(self) -> str | None:
|
|
"""Return the name of the sensor."""
|
|
return self._attr_name
|
|
|
|
@property
|
|
def next_date(self) -> date | None:
|
|
"""Return next date attribute."""
|
|
return self._next_date
|
|
|
|
@property
|
|
def hidden(self) -> bool:
|
|
"""Return the hidden attribute."""
|
|
return self._hidden
|
|
|
|
@property
|
|
def native_unit_of_measurement(self) -> str | None:
|
|
"""Return unit of measurement - None for numerical value."""
|
|
return None
|
|
|
|
@property
|
|
def native_value(self) -> object:
|
|
"""Return the state of the sensor."""
|
|
return self._attr_state
|
|
|
|
@property
|
|
def last_updated(self) -> datetime | None:
|
|
"""Return when the sensor was last updated."""
|
|
return self._last_updated
|
|
|
|
@property
|
|
def icon(self) -> str:
|
|
"""Return the entity icon."""
|
|
return self._attr_icon
|
|
|
|
@property
|
|
def extra_state_attributes(self) -> Dict[str, Any]:
|
|
"""Return the state attributes."""
|
|
state_attr = {
|
|
const.ATTR_DAYS: self._days,
|
|
const.ATTR_LAST_COLLECTION: self.last_collection,
|
|
const.ATTR_LAST_UPDATED: self._last_updated,
|
|
const.ATTR_NEXT_DATE: None
|
|
if self._next_date is None
|
|
else datetime(
|
|
self._next_date.year, self._next_date.month, self._next_date.day
|
|
).astimezone(),
|
|
# Needed for translations to work
|
|
ATTR_DEVICE_CLASS: self.DEVICE_CLASS,
|
|
}
|
|
return state_attr
|
|
|
|
@property
|
|
def DEVICE_CLASS(self) -> str: # pylint: disable=C0103
|
|
"""Return the class of the sensor."""
|
|
return const.DEVICE_CLASS
|
|
|
|
def __repr__(self) -> str:
|
|
"""Return main sensor parameters."""
|
|
return (
|
|
f"{self.__class__.__name__}(name={self._attr_name}, "
|
|
f"entity_id={self.entity_id}, "
|
|
f"state={self.state}, "
|
|
f"attributes={self.extra_state_attributes})"
|
|
)
|
|
|
|
def _find_candidate_date(self, day1: date) -> date | None:
|
|
"""Find the next possible date starting from day1.
|
|
|
|
Only based on calendar, not looking at include/exclude days.
|
|
Must be implemented for each child class.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
async def _async_ready_for_update(self) -> bool:
|
|
"""Check if the entity is ready for the update.
|
|
|
|
Skip the update if the sensor was updated today
|
|
Except for the sensors with with next date today and after the expiration time
|
|
"""
|
|
current_date_time = helpers.now()
|
|
today = current_date_time.date()
|
|
try:
|
|
ready_for_update = bool(self._last_updated.date() != today) # type: ignore
|
|
except AttributeError:
|
|
return True
|
|
try:
|
|
if self._next_date == today and (
|
|
(
|
|
isinstance(self.expire_after, time)
|
|
and current_date_time.time() >= self.expire_after
|
|
)
|
|
or (
|
|
isinstance(self.last_collection, datetime)
|
|
and self.last_collection.date() == today
|
|
)
|
|
):
|
|
return True
|
|
except (AttributeError, TypeError):
|
|
pass
|
|
return ready_for_update
|
|
|
|
def date_inside(self, dat: date) -> bool:
|
|
"""Check if the date is inside first and last date."""
|
|
month = dat.month
|
|
if self._first_month <= self._last_month:
|
|
return bool(self._first_month <= month <= self._last_month)
|
|
return bool(self._first_month <= month or month <= self._last_month)
|
|
|
|
def move_to_range(self, day: date) -> date:
|
|
"""If the date is not in range, move to the range."""
|
|
if not self.date_inside(day):
|
|
year = day.year
|
|
month = day.month
|
|
months = [m["label"] for m in const.MONTH_OPTIONS]
|
|
if self._first_month <= self._last_month < month:
|
|
_LOGGER.debug(
|
|
"(%s) %s outside the range, lookig from %s next year",
|
|
self._attr_name,
|
|
day,
|
|
months[self._first_month - 1],
|
|
)
|
|
return date(year + 1, self._first_month, 1)
|
|
_LOGGER.debug(
|
|
"(%s) %s outside the range, searching from %s",
|
|
self._attr_name,
|
|
day,
|
|
months[self._first_month - 1],
|
|
)
|
|
return date(year, self._first_month, 1)
|
|
return day
|
|
|
|
def collection_schedule(
|
|
self, date1: date | None = None, date2: date | None = None
|
|
) -> Generator[date, None, None]:
|
|
"""Get dates within configured date range."""
|
|
today = helpers.now().date()
|
|
first_date: date = date(today.year - 1, 1, 1) if date1 is None else date1
|
|
last_date: date = date(today.year + 1, 12, 31) if date2 is None else date2
|
|
first_date = self.move_to_range(first_date)
|
|
while True:
|
|
try:
|
|
next_date = self._find_candidate_date(first_date)
|
|
except (TypeError, ValueError):
|
|
return
|
|
if next_date is None or next_date > last_date:
|
|
return
|
|
if (new_date := self.move_to_range(next_date)) != next_date:
|
|
first_date = new_date # continue from next year
|
|
else:
|
|
yield next_date
|
|
first_date = next_date + relativedelta(days=1) # look from the next day
|
|
|
|
def _is_exception_date(self, day: date) -> bool:
|
|
"""Check if a date matches a configured fixed exception, or a movable one.
|
|
|
|
Fixed exceptions come from the configured MM-DD list (e.g. Christmas,
|
|
New Year, Labour Day for collectors that only shift on those).
|
|
Movable exceptions are always checked too: Easter Monday and Whit
|
|
Monday (Pentecost Monday) are the only French public holidays that
|
|
ever fall on a Monday, which only matters for Monday-based schedules
|
|
and is harmless to check for any other day.
|
|
"""
|
|
if (day.month, day.day) in self._holiday_dates:
|
|
return True
|
|
easter_sunday = easter(day.year)
|
|
easter_monday = easter_sunday + timedelta(days=1)
|
|
pentecost_monday = easter_sunday + timedelta(days=50)
|
|
return day in (easter_monday, pentecost_monday)
|
|
|
|
def _shift_for_holiday(self, collection_date: date) -> date:
|
|
"""Move the date forward by one day for each matching exception date.
|
|
|
|
Cascades forward if the new day also matches an exception date.
|
|
"""
|
|
if not self._move_holidays:
|
|
return collection_date
|
|
shifted = collection_date
|
|
while self._is_exception_date(shifted):
|
|
shifted = shifted + timedelta(days=1)
|
|
if shifted != collection_date:
|
|
_LOGGER.debug(
|
|
"(%s) %s shifted to %s because of a configured exception date",
|
|
self._attr_name,
|
|
collection_date,
|
|
shifted,
|
|
)
|
|
return shifted
|
|
|
|
async def _async_load_collection_dates(self) -> None:
|
|
"""Fill the collection dates list."""
|
|
self._collection_dates.clear()
|
|
for collection_date in self.collection_schedule():
|
|
collection_date = self._shift_for_holiday(collection_date)
|
|
if collection_date not in self._collection_dates:
|
|
self._collection_dates.append(collection_date)
|
|
self._collection_dates.sort()
|
|
|
|
async def add_date(self, collection_date: date) -> None:
|
|
"""Add date to _collection_dates."""
|
|
if collection_date not in self._collection_dates:
|
|
self._collection_dates.append(collection_date)
|
|
self._collection_dates.sort()
|
|
else:
|
|
_LOGGER.warning(
|
|
"%s not added to %s - already on the collection schedule",
|
|
collection_date,
|
|
self.name,
|
|
)
|
|
|
|
async def remove_date(self, collection_date: date) -> None:
|
|
"""Remove date from _collection dates."""
|
|
try:
|
|
self._collection_dates.remove(collection_date)
|
|
except ValueError:
|
|
_LOGGER.warning(
|
|
"%s not removed from %s - not in the collection schedule",
|
|
collection_date,
|
|
self.name,
|
|
)
|
|
|
|
def get_next_date(self, first_date: date, ignore_today=False) -> date | None:
|
|
"""Get next date from self._collection_dates."""
|
|
current_date_time = helpers.now()
|
|
for d in self._collection_dates: # pylint: disable=invalid-name
|
|
if d < first_date:
|
|
continue
|
|
if not ignore_today and d == current_date_time.date():
|
|
expiration = (
|
|
self.expire_after
|
|
if self.expire_after is not None
|
|
else time(23, 59, 59)
|
|
)
|
|
if current_date_time.time() > expiration or (
|
|
self.last_collection is not None
|
|
and self.last_collection.date() == current_date_time.date()
|
|
and current_date_time.time() >= self.last_collection.time()
|
|
):
|
|
continue
|
|
return d
|
|
return None
|
|
|
|
async def async_update(self) -> None:
|
|
"""Get the latest data and updates the states."""
|
|
if not await self._async_ready_for_update() or not self.hass.is_running:
|
|
return
|
|
|
|
_LOGGER.debug("(%s) Calling update", self._attr_name)
|
|
await self._async_load_collection_dates()
|
|
_LOGGER.debug(
|
|
"(%s) Dates loaded, firing a garbage_collection_loaded event",
|
|
self._attr_name,
|
|
)
|
|
event_data = {
|
|
"entity_id": self.entity_id,
|
|
"collection_dates": helpers.dates_to_texts(self._collection_dates),
|
|
}
|
|
self.hass.bus.async_fire("garbage_collection_loaded", event_data)
|
|
if not self._manual:
|
|
self.update_state()
|
|
|
|
def update_state(self) -> None:
|
|
"""Pick the first event from collection dates, update attributes."""
|
|
_LOGGER.debug("(%s) Looking for next collection", self._attr_name)
|
|
self._last_updated = helpers.now()
|
|
today = self._last_updated.date()
|
|
self._next_date = self.get_next_date(today)
|
|
if self._next_date is not None:
|
|
_LOGGER.debug(
|
|
"(%s) next_date (%s), today (%s)",
|
|
self._attr_name,
|
|
self._next_date,
|
|
today,
|
|
)
|
|
self._days = (self._next_date - today).days
|
|
next_date_txt = self._next_date.strftime(self._date_format)
|
|
_LOGGER.debug(
|
|
"(%s) Found next collection date: %s, that is in %d days",
|
|
self._attr_name,
|
|
next_date_txt,
|
|
self._days,
|
|
)
|
|
if self._days > 1:
|
|
if bool(self._verbose_state):
|
|
self._attr_state = self._verbose_format.format(
|
|
date=next_date_txt, days=self._days
|
|
)
|
|
# self._attr_state = "on_date"
|
|
else:
|
|
self._attr_state = 2
|
|
self._attr_icon = self._icon_normal
|
|
else:
|
|
if self._days == 0:
|
|
if bool(self._verbose_state):
|
|
self._attr_state = const.STATE_TODAY
|
|
else:
|
|
self._attr_state = self._days
|
|
self._attr_icon = self._icon_today
|
|
elif self._days == 1:
|
|
if bool(self._verbose_state):
|
|
self._attr_state = const.STATE_TOMORROW
|
|
else:
|
|
self._attr_state = self._days
|
|
self._attr_icon = self._icon_tomorrow
|
|
else:
|
|
self._days = None
|
|
self._attr_state = None
|
|
self._attr_icon = self._icon_normal
|
|
|
|
|
|
class WeeklyCollection(GarbageCollection):
|
|
"""Collection every n weeks, odd weeks or even weeks."""
|
|
|
|
__slots__ = "_collection_days", "_first_week", "_period"
|
|
|
|
def __init__(self, config_entry: ConfigEntry) -> None:
|
|
"""Read parameters specific for Weekly Collection Frequency."""
|
|
super().__init__(config_entry)
|
|
config = config_entry.options
|
|
self._collection_days = config.get(const.CONF_COLLECTION_DAYS, [])
|
|
self._period: int
|
|
self._first_week: int
|
|
frequency = config.get(const.CONF_FREQUENCY)
|
|
if frequency == "weekly":
|
|
self._period = 1
|
|
self._first_week = 1
|
|
elif frequency == "even-weeks":
|
|
self._period = 2
|
|
self._first_week = 2
|
|
elif frequency == "odd-weeks":
|
|
self._period = 2
|
|
self._first_week = 1
|
|
else:
|
|
self._period = config.get(const.CONF_PERIOD, 1)
|
|
self._first_week = config.get(const.CONF_FIRST_WEEK, 1)
|
|
|
|
def _find_candidate_date(self, day1: date) -> date | None:
|
|
"""Calculate possible date, for weekly frequency."""
|
|
week = day1.isocalendar()[1]
|
|
weekday = day1.weekday()
|
|
offset = -1
|
|
if (week - self._first_week) % self._period == 0: # Collection this week
|
|
for day_name in self._collection_days:
|
|
day_index = WEEKDAYS.index(day_name)
|
|
if day_index >= weekday: # Collection still did not happen
|
|
offset = day_index - weekday
|
|
break
|
|
iterate_by_week = 7 - weekday + WEEKDAYS.index(self._collection_days[0])
|
|
while offset == -1: # look in following weeks
|
|
candidate = day1 + relativedelta(days=iterate_by_week)
|
|
week = candidate.isocalendar()[1]
|
|
if (week - self._first_week) % self._period == 0:
|
|
offset = iterate_by_week
|
|
break
|
|
iterate_by_week += 7
|
|
return day1 + relativedelta(days=offset)
|
|
|
|
|
|
class DailyCollection(GarbageCollection):
|
|
"""Collection every n days."""
|
|
|
|
__slots__ = "_first_date", "_period"
|
|
|
|
def __init__(self, config_entry: ConfigEntry) -> None:
|
|
"""Read parameters specific for Daily Collection Frequency."""
|
|
super().__init__(config_entry)
|
|
config = config_entry.options
|
|
self._period = config.get(const.CONF_PERIOD)
|
|
self._first_date: date | None
|
|
try:
|
|
self._first_date = helpers.to_date(config.get(const.CONF_FIRST_DATE))
|
|
except ValueError:
|
|
self._first_date = None
|
|
|
|
def _find_candidate_date(self, day1: date) -> date | None:
|
|
"""Calculate possible date, for every-n-days frequency."""
|
|
try:
|
|
if (day1 - self._first_date).days % self._period == 0: # type: ignore
|
|
return day1
|
|
offset = self._period - (
|
|
(day1 - self._first_date).days % self._period # type: ignore
|
|
)
|
|
except TypeError as error:
|
|
raise ValueError(
|
|
f"({self._attr_name}) Please configure first_date and period "
|
|
"for every-n-days collection frequency."
|
|
) from error
|
|
return day1 + relativedelta(days=offset)
|
|
|
|
|
|
class MonthlyCollection(GarbageCollection):
|
|
"""Collection every nth weekday of each month."""
|
|
|
|
__slots__ = (
|
|
"_collection_days",
|
|
"_monthly_force_week_numbers",
|
|
"_period",
|
|
"_weekday_order_numbers",
|
|
"_week_order_numbers",
|
|
)
|
|
|
|
def __init__(self, config_entry: ConfigEntry) -> None:
|
|
"""Read parameters specific for Monthly Collection Frequency."""
|
|
super().__init__(config_entry)
|
|
config = config_entry.options
|
|
self._collection_days = config.get(const.CONF_COLLECTION_DAYS, [])
|
|
self._monthly_force_week_numbers = config.get(
|
|
const.CONF_FORCE_WEEK_NUMBERS, False
|
|
)
|
|
self._weekday_order_numbers: list
|
|
self._week_order_numbers: list
|
|
order_numbers: list = []
|
|
if const.CONF_WEEKDAY_ORDER_NUMBER in config:
|
|
order_numbers = list(map(int, config[const.CONF_WEEKDAY_ORDER_NUMBER]))
|
|
if self._monthly_force_week_numbers:
|
|
self._weekday_order_numbers = []
|
|
self._week_order_numbers = order_numbers
|
|
else:
|
|
self._weekday_order_numbers = order_numbers
|
|
self._week_order_numbers = []
|
|
self._period = config.get(const.CONF_PERIOD, 1)
|
|
|
|
@staticmethod
|
|
def nth_week_date(
|
|
week_number: int, date_of_month: date, collection_day: int
|
|
) -> date:
|
|
"""Find weekday in the nth week of the month."""
|
|
first_of_month = date(date_of_month.year, date_of_month.month, 1)
|
|
return first_of_month + relativedelta(
|
|
days=collection_day - first_of_month.weekday() + (week_number - 1) * 7
|
|
)
|
|
|
|
@staticmethod
|
|
def nth_weekday_date(
|
|
weekday_number: int, date_of_month: date, collection_day: int
|
|
) -> date:
|
|
"""Find nth weekday of the month."""
|
|
first_of_month = date(date_of_month.year, date_of_month.month, 1)
|
|
# 1st of the month is before the day of collection
|
|
# (so 1st collection week the week when month starts)
|
|
if collection_day >= first_of_month.weekday():
|
|
return first_of_month + relativedelta(
|
|
days=collection_day
|
|
- first_of_month.weekday()
|
|
+ (weekday_number - 1) * 7
|
|
)
|
|
return first_of_month + relativedelta(
|
|
days=7
|
|
- first_of_month.weekday()
|
|
+ collection_day
|
|
+ (weekday_number - 1) * 7
|
|
)
|
|
|
|
def _monthly_candidate(self, day1: date) -> date:
|
|
"""Calculate possible date, for monthly frequency."""
|
|
if self._monthly_force_week_numbers:
|
|
for week_order_number in self._week_order_numbers:
|
|
candidate_date = MonthlyCollection.nth_week_date(
|
|
week_order_number, day1, WEEKDAYS.index(self._collection_days[0])
|
|
)
|
|
# date is today or in the future -> we have the date
|
|
if candidate_date >= day1:
|
|
return candidate_date
|
|
else:
|
|
for weekday_order_number in self._weekday_order_numbers:
|
|
candidate_date = MonthlyCollection.nth_weekday_date(
|
|
weekday_order_number,
|
|
day1,
|
|
WEEKDAYS.index(self._collection_days[0]),
|
|
)
|
|
# date is today or in the future -> we have the date
|
|
if candidate_date >= day1:
|
|
return candidate_date
|
|
if day1.month == 12:
|
|
next_collection_month = date(day1.year + 1, 1, 1)
|
|
else:
|
|
next_collection_month = date(day1.year, day1.month + 1, 1)
|
|
if self._monthly_force_week_numbers:
|
|
return MonthlyCollection.nth_week_date(
|
|
self._week_order_numbers[0],
|
|
next_collection_month,
|
|
WEEKDAYS.index(self._collection_days[0]),
|
|
)
|
|
return MonthlyCollection.nth_weekday_date(
|
|
self._weekday_order_numbers[0],
|
|
next_collection_month,
|
|
WEEKDAYS.index(self._collection_days[0]),
|
|
)
|
|
|
|
def _find_candidate_date(self, day1: date) -> date | None:
|
|
if self._period is None or self._period == 1:
|
|
return self._monthly_candidate(day1)
|
|
else:
|
|
candidate_date = self._monthly_candidate(day1)
|
|
while (candidate_date.month - self._first_month) % self._period != 0:
|
|
candidate_date = self._monthly_candidate(
|
|
candidate_date + relativedelta(days=1)
|
|
)
|
|
return candidate_date
|
|
|
|
|
|
class AnnualCollection(GarbageCollection):
|
|
"""Collection every year."""
|
|
|
|
__slots__ = ("_date",)
|
|
|
|
def __init__(self, config_entry: ConfigEntry) -> None:
|
|
"""Read parameters specific for Annual Collection Frequency."""
|
|
super().__init__(config_entry)
|
|
config = config_entry.options
|
|
self._date = config.get(const.CONF_DATE)
|
|
|
|
def _find_candidate_date(self, day1: date) -> date | None:
|
|
"""Calculate possible date, for annual frequency."""
|
|
year = day1.year
|
|
try:
|
|
conf_date = datetime.strptime(self._date, "%m/%d").date()
|
|
except TypeError as error:
|
|
raise ValueError(
|
|
f"({self._attr_name}) Please configure the date "
|
|
"for annual collection frequency."
|
|
) from error
|
|
if (candidate_date := date(year, conf_date.month, conf_date.day)) < day1:
|
|
candidate_date = date(year + 1, conf_date.month, conf_date.day)
|
|
return candidate_date
|
|
|
|
|
|
class GroupCollection(GarbageCollection):
|
|
"""Group number of sensors."""
|
|
|
|
__slots__ = ("_entities",)
|
|
|
|
def __init__(self, config_entry: ConfigEntry) -> None:
|
|
"""Read parameters specific for Group Collection Frequency."""
|
|
super().__init__(config_entry)
|
|
config = config_entry.options
|
|
self._entities = config.get(CONF_ENTITIES, [])
|
|
|
|
def _find_candidate_date(self, day1: date) -> date | None:
|
|
"""Calculate possible date, for group frequency."""
|
|
candidate_date = None
|
|
try:
|
|
for entity_id in self._entities:
|
|
entity: GarbageCollection = self.hass.data[const.DOMAIN][
|
|
const.SENSOR_PLATFORM
|
|
][entity_id]
|
|
next_date = entity.get_next_date(day1)
|
|
if next_date is not None and (
|
|
candidate_date is None or next_date < candidate_date
|
|
):
|
|
candidate_date = next_date
|
|
except KeyError as error:
|
|
raise ValueError from error
|
|
except TypeError as error:
|
|
_LOGGER.error("(%s) Please add entities for the group.", self._attr_name)
|
|
raise ValueError from error
|
|
return candidate_date
|
|
|
|
async def _async_ready_for_update(self) -> bool:
|
|
"""Check if the entity is ready for the update.
|
|
|
|
For group sensors wait for update of the sensors in the group
|
|
"""
|
|
current_date_time = helpers.now()
|
|
today = current_date_time.date()
|
|
try:
|
|
ready_for_update = bool(self._last_updated.date() != today) # type: ignore
|
|
except AttributeError:
|
|
ready_for_update = True
|
|
members_ready = True
|
|
for entity_id in self._entities:
|
|
try:
|
|
entity: GarbageCollection = self.hass.data[const.DOMAIN][
|
|
const.SENSOR_PLATFORM
|
|
][entity_id]
|
|
await entity.async_update()
|
|
except KeyError:
|
|
members_ready = False
|
|
break
|
|
if (last_updated := entity.last_updated) is None:
|
|
ready_for_update = True
|
|
continue
|
|
# Wait for all members to get updated
|
|
if last_updated.date() != today:
|
|
members_ready = False
|
|
break
|
|
# A member got updated after the group update
|
|
if self._last_updated is None or last_updated > self._last_updated:
|
|
ready_for_update = True
|
|
if ready_for_update and not members_ready:
|
|
ready_for_update = False
|
|
return ready_for_update
|
|
|
|
|
|
class BlankCollection(GarbageCollection):
|
|
"""No collection - for mnual update."""
|
|
|
|
def _find_candidate_date(self, day1: date) -> date | None:
|
|
"""Do not return any date for blank frequency."""
|
|
return None
|
|
|
|
async def _async_load_collection_dates(self) -> None:
|
|
"""Clear collection dates (filled in by the blueprint)."""
|
|
self._collection_dates.clear()
|
|
return
|
|
|
|
async def async_update(self) -> None:
|
|
"""Get the latest data and updates the states."""
|
|
if not await self._async_ready_for_update() or not self.hass.is_running:
|
|
return
|
|
|
|
_LOGGER.debug("(%s) Calling update", self._attr_name)
|
|
await self._async_load_collection_dates()
|
|
_LOGGER.debug(
|
|
"(%s) Dates loaded, firing a garbage_collection_loaded event",
|
|
self._attr_name,
|
|
)
|
|
event_data = {
|
|
"entity_id": self.entity_id,
|
|
"collection_dates": [],
|
|
}
|
|
self.hass.bus.async_fire("garbage_collection_loaded", event_data) |