This commit is contained in:
Thomas Luther 2024-04-16 19:07:59 +02:00
parent 3c88d4152a
commit 336beacfe3
9 changed files with 248 additions and 93 deletions

2
Pipfile.lock generated
View File

@ -301,7 +301,7 @@
"sha256:c05567e9c24a6b9faaa835c4821bad0590fbb9d5779e7caa6e1cc4978e7eb24f" "sha256:c05567e9c24a6b9faaa835c4821bad0590fbb9d5779e7caa6e1cc4978e7eb24f"
], ],
"markers": "python_version >= '3.5'", "markers": "python_version >= '3.5'",
"version": "==3.6" "version": ">=3.7"
}, },
"multidict": { "multidict": {
"hashes": [ "hashes": [

View File

@ -0,0 +1 @@
"""Init for api."""

View File

@ -7,6 +7,7 @@ pip install aiohttp
from __future__ import annotations from __future__ import annotations
from asyncio import sleep
from base64 import b64encode from base64 import b64encode
import contextlib import contextlib
import copy import copy
@ -255,6 +256,18 @@ class SolixParmType(Enum):
SOLARBANK_SCHEDULE = "4" SOLARBANK_SCHEDULE = "4"
@dataclass(frozen=True)
class ApiCategories:
"""Dataclass to specify supported Api categorties for regular Api cache refresh cycles."""
site_price: str = "site_price"
device_auto_upgrade: str = "device_auto_upgrade"
solarbank_energy: str = "solarbank_energy"
solarbank_fittings: str = "solarbank_fittings"
solarbank_cutoff: str = "solarbank_cutoff"
solarbank_solar_info: str = "solarbank_solar_info"
@dataclass(frozen=True) @dataclass(frozen=True)
class SolixDeviceCapacity: class SolixDeviceCapacity:
"""Dataclass for Anker Solix device capacities in Wh by Part Number.""" """Dataclass for Anker Solix device capacities in Wh by Part Number."""
@ -282,11 +295,12 @@ class SolixDeviceCapacity:
class SolixDeviceCategory: class SolixDeviceCategory:
"""Dataclass for Anker Solix device types by Part Number to be used for standalone/unbound device categorization.""" """Dataclass for Anker Solix device types by Part Number to be used for standalone/unbound device categorization."""
# Solarbanks
A17C0: str = SolixDeviceType.SOLARBANK.value # SOLIX E1600 Solarbank A17C0: str = SolixDeviceType.SOLARBANK.value # SOLIX E1600 Solarbank
# Inverter
A5140: str = SolixDeviceType.INVERTER.value # MI60 Inverter A5140: str = SolixDeviceType.INVERTER.value # MI60 Inverter
A5143: str = SolixDeviceType.INVERTER.value # MI80 Inverter A5143: str = SolixDeviceType.INVERTER.value # MI80 Inverter
# Portable Power Stations (PPS)
A1720: str = ( A1720: str = (
SolixDeviceType.PPS.value SolixDeviceType.PPS.value
) # Anker PowerHouse 521 Portable Power Station ) # Anker PowerHouse 521 Portable Power Station
@ -310,9 +324,9 @@ class SolixDeviceCategory:
) # SOLIX F2000 Portable Power Station (PowerHouse 767) ) # SOLIX F2000 Portable Power Station (PowerHouse 767)
A1781: str = SolixDeviceType.PPS.value # SOLIX F2600 Portable Power Station A1781: str = SolixDeviceType.PPS.value # SOLIX F2600 Portable Power Station
A1790: str = SolixDeviceType.PPS.value # SOLIX F3800 Portable Power Station A1790: str = SolixDeviceType.PPS.value # SOLIX F3800 Portable Power Station
# Home Power Panels
A17B1: str = SolixDeviceType.POWERPANEL.value # SOLIX Home Power Panel A17B1: str = SolixDeviceType.POWERPANEL.value # SOLIX Home Power Panel
# Power Cooler
A17A0: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 30 A17A0: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 30
A17A1: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 40 A17A1: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 40
A17A2: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 50 A17A2: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 50
@ -322,13 +336,20 @@ class SolixDeviceCategory:
class SolixDefaults: class SolixDefaults:
"""Dataclass for Anker Solix defaults to be used.""" """Dataclass for Anker Solix defaults to be used."""
# Output Power presets for Solarbank schedule timeslot settings
PRESET_MIN: int = 0 PRESET_MIN: int = 0
PRESET_MAX: int = 800 PRESET_MAX: int = 800
PRESET_DEF: int = 100 PRESET_DEF: int = 100
# Export Switch preset for Solarbank schedule timeslot settings
ALLOW_EXPORT: bool = True ALLOW_EXPORT: bool = True
# Charge Priority limit preset for Solarbank schedule timeslot settings
CHARGE_PRIORITY_MIN: int = 0 CHARGE_PRIORITY_MIN: int = 0
CHARGE_PRIORITY_MAX: int = 100 CHARGE_PRIORITY_MAX: int = 100
CHARGE_PRIORITY_DEF: int = 80 CHARGE_PRIORITY_DEF: int = 80
# Seconds delay for subsequent Api requests in methods to update the Api cache dictionaries
REQUEST_DELAY_MIN: float = 0.0
REQUEST_DELAY_MAX: float = 5.0
REQUEST_DELAY_DEF: float = 0.3
class SolixDeviceStatus(Enum): class SolixDeviceStatus(Enum):
@ -363,11 +384,47 @@ class SolarbankTimeslot:
start_time: datetime start_time: datetime
end_time: datetime end_time: datetime
appliance_load: int | None = None # mapped to appliance_loads setting using a default 50% share for dual solarbank setups appliance_load: int | None = (
None # mapped to appliance_loads setting using a default 50% share for dual solarbank setups
)
allow_export: bool | None = None # mapped to the turn_on boolean allow_export: bool | None = None # mapped to the turn_on boolean
charge_priority_limit: int | None = None # mapped to charge_priority setting charge_priority_limit: int | None = None # mapped to charge_priority setting
class RequestCounter:
"""Counter for datetime entries in last minute and last hour."""
def __init__(
self,
) -> None:
"""Initialize."""
self.elements: list = []
def __str__(self) -> str:
"""Print the counters."""
return f"{self.last_hour()} last hour, {self.last_minute()} last minute"
def add(self, request_time: datetime = datetime.now()) -> None:
"""Add new timestamp to end of counter."""
self.elements.append(request_time)
# limit the counter entries to 1 hour when adding new
self.recycle()
def recycle(self, last_time: datetime = datetime.now() - timedelta(hours=1)) -> None:
"""Remove oldest timestamps from beginning of counter until last_time is reached, default is 1 hour ago."""
self.elements = [x for x in self.elements if x > last_time]
def last_minute(self) -> int:
"""Get numnber of timestamps for last minute."""
last_time = datetime.now() - timedelta(minutes=1)
return len([x for x in self.elements if x > last_time])
def last_hour(self) -> int:
"""Get numnber of timestamps for last minute."""
last_time = datetime.now() - timedelta(hours=1)
return len([x for x in self.elements if x > last_time])
class AnkerSolixApi: class AnkerSolixApi:
"""Define the API class to handle Anker server authentication and API requests, along with the last state of queried site details and Device information.""" """Define the API class to handle Anker server authentication and API requests, along with the last state of queried site details and Device information."""
@ -420,10 +477,15 @@ class AnkerSolixApi:
self._token_expiration: datetime | None = None self._token_expiration: datetime | None = None
self._login_response: dict = {} self._login_response: dict = {}
self.mask_credentials: bool = True self.mask_credentials: bool = True
self.encrypt_body: bool = False
self.request_count: RequestCounter = RequestCounter()
self._request_delay: float = SolixDefaults.REQUEST_DELAY_DEF
self._last_request_time: datetime | None = None
# Define Encryption for password, using ECDH assymetric key exchange for shared secret calculation, which must be used to encrypt the password using AES-256-CBC with seed of 16 # Define Encryption for password, using ECDH assymetric key exchange for shared secret calculation, which must be used to encrypt the password using AES-256-CBC with seed of 16
# uncompressed public key from EU Anker server in the format 04 [32 byte x vlaue] [32 byte y value] # uncompressed public key from EU Anker server in the format 04 [32 byte x vlaue] [32 byte y value]
# TODO(2): COM Anker server public key usage must still be validated # Both, the EU and COM Anker server public key is the same and login response is provided for both upon an authentication request
# However, if country ID assignment is to wrong server, no sites or devices will be listed for the authenticated account.
self._api_public_key_hex = "04c5c00c4f8d1197cc7c3167c52bf7acb054d722f0ef08dcd7e0883236e0d72a3868d9750cb47fa4619248f3d83f0f662671dadc6e2d31c2f41db0161651c7c076" self._api_public_key_hex = "04c5c00c4f8d1197cc7c3167c52bf7acb054d722f0ef08dcd7e0883236e0d72a3868d9750cb47fa4619248f3d83f0f662671dadc6e2d31c2f41db0161651c7c076"
self._curve = ( self._curve = (
ec.SECP256R1() ec.SECP256R1()
@ -515,13 +577,12 @@ class AnkerSolixApi:
), ),
) )
return data return data
return {}
except OSError as err: except OSError as err:
self._logger.error( self._logger.error(
"ERROR: Failed to load JSON from file %s", masked_filename "ERROR: Failed to load JSON from file %s", masked_filename
) )
self._logger.error(err) self._logger.error(err)
return {} return {}
def _saveToFile(self, filename: str, data: dict = None) -> bool: def _saveToFile(self, filename: str, data: dict = None) -> bool:
"""Save json data to given file for testing.""" """Save json data to given file for testing."""
@ -810,11 +871,44 @@ class AnkerSolixApi:
def logLevel(self, level: int = None) -> int: def logLevel(self, level: int = None) -> int:
"""Get or set the logger log level.""" """Get or set the logger log level."""
if level: if level is not None and isinstance(level, int):
self._logger.setLevel(level) self._logger.setLevel(level)
self._logger.info("Set log level to: %s", level) self._logger.info("Set log level to: %s", level)
return self._logger.getEffectiveLevel() return self._logger.getEffectiveLevel()
def requestDelay(self, delay: float = None) -> float:
"""Get or set the api request delay in seconds."""
if (
delay is not None
and isinstance(delay, (float,int))
and delay != self._request_delay
):
self._request_delay = min(
SolixDefaults.REQUEST_DELAY_MAX,
max(SolixDefaults.REQUEST_DELAY_MIN, delay),
)
self._logger.info(
"Set api request delay to %.3f seconds", self._request_delay
)
return self._request_delay
async def _wait_delay(self, delay: float = None) -> None:
"""Wait at least for the defined Api request delay or for the provided delay in seconds since the last request occured."""
if delay is not None and isinstance(delay, float):
delay = min(
SolixDefaults.REQUEST_DELAY_MAX,
max(SolixDefaults.REQUEST_DELAY_MIN, delay),
)
else:
delay = self._request_delay
if isinstance(self._last_request_time, datetime):
await sleep(
max(
0,
delay - (datetime.now() - self._last_request_time).total_seconds(),
)
)
async def async_authenticate(self, restart: bool = False) -> bool: async def async_authenticate(self, restart: bool = False) -> bool:
"""Authenticate with server and get an access token. If restart is not enforced, cached login data may be used to obtain previous token.""" """Authenticate with server and get an access token. If restart is not enforced, cached login data may be used to obtain previous token."""
if restart: if restart:
@ -945,6 +1039,26 @@ class AnkerSolixApi:
if self._token: if self._token:
mergedHeaders.update({"x-auth-token": self._token}) mergedHeaders.update({"x-auth-token": self._token})
mergedHeaders.update({"gtoken": self._gtoken}) mergedHeaders.update({"gtoken": self._gtoken})
if self.encrypt_body:
# TODO(#70): Test and Support optional encryption for body
# Unknowns: Which string is signed? Method + Request?
# How does the signing work?
# What is the key-ident? Ident of the shared secret?
# What is request-once?
# Is the separate timestamp relevant for encryption?
pass
# Extra Api header arguments used by the mobile App for request encryption
# Response will return encrypted boy and a signature
# mergedHeaders.update({
# "x-replay-info": "replay",
# "x-encryption-info": "algo_ecdh",
# "x-signature": "", # 32 Bit hex
# "x-request-once": "", # 16 Bit hex
# "x-key-ident": "", # 16 Bit hex
# "x-request-ts": str(
# int(systime.mktime(datetime.now().timetuple()) * 1000)
# ), # Unix Timestamp in ms as string
# })
self._logger.debug("Request Url: %s %s", method.upper(), url) self._logger.debug("Request Url: %s %s", method.upper(), url)
self._logger.debug( self._logger.debug(
@ -959,44 +1073,67 @@ class AnkerSolixApi:
else: else:
self._logger.debug("Request Body: %s", json) self._logger.debug("Request Body: %s", json)
body_text = "" body_text = ""
# enforce configured delay between any subsequent request
await self._wait_delay()
async with self._session.request( async with self._session.request(
method, url, headers=mergedHeaders, json=json method, url, headers=mergedHeaders, json=json
) as resp: ) as resp:
try: try:
self._last_request_time = datetime.now()
self.request_count.add(self._last_request_time)
self._logger.debug(
"%s request %s %s response received", self.nickname, method, url
)
# print response headers
self._logger.debug("Response Headers: %s", resp.headers)
# get first the body text for usage in error detail logging if necessary # get first the body text for usage in error detail logging if necessary
body_text = await resp.text() body_text = await resp.text()
resp.raise_for_status() data = {}
data: dict = await resp.json(content_type=None) resp.raise_for_status() # any response status >= 400
if (data := await resp.json(content_type=None)) and self.encrypt_body:
# TODO(#70): Test and Support optional encryption for body
# data dict has to be decoded when encrypted
# if signature := data.get("signature"):
# pass
pass
if not data:
self._logger.error("Response Text: %s", body_text)
raise ClientError(f"No data response while requesting {endpoint}")
if endpoint == _API_LOGIN: if endpoint == _API_LOGIN:
self._logger.debug( self._logger.debug(
"Request Response: %s", "Response Data: %s",
self.mask_values( self.mask_values(
data, "user_id", "auth_token", "email", "geo_key" data, "user_id", "auth_token", "email", "geo_key"
), ),
) )
else: else:
self._logger.debug("Request Response: %s", data) self._logger.debug("Response Data: %s", data)
if not data:
self._logger.error("Response Text: %s", body_text)
raise ClientError(f"No data response while requesting {endpoint}")
errors.raise_error(data) # check the response code in the data
if endpoint != _API_LOGIN:
self._retry_attempt = False # reset retry flag only when valid token received and not another login request self._retry_attempt = False # reset retry flag only when valid token received and not another login request
errors.raise_error(data) # check the Api response status code in the data
# valid response at this point, mark login and return data # valid response at this point, mark login and return data
self._loggedIn = True self._loggedIn = True
return data return data # noqa: TRY300
except ( except (
ClientError ClientError
) as err: # Exception from ClientSession based on standard response codes ) as err: # Exception from ClientSession based on standard response status codes
self._logger.error("Request Error: %s", err) self._logger.error("Api Request Error: %s", err)
self._logger.error("Response Text: %s", body_text) self._logger.error("Response Text: %s", body_text)
if "401" in str(err) or "403" in str(err): # Prepare data dict for Api error lookup
if not data:
data = {}
if not hasattr(data,"code"):
data["code"] = resp.status
if not hasattr(data,"msg"):
data["msg"] = body_text
if resp.status in [401,403]:
# Unauthorized or forbidden request # Unauthorized or forbidden request
if self._retry_attempt: if self._retry_attempt:
errors.raise_error(data, prefix=f"Login failed for user {self._email}")
# catch error if Api code not defined
raise errors.AuthorizationError( raise errors.AuthorizationError(
f"Login failed for user {self._email}" f"Login failed for user {self._email}"
) from err ) from err
@ -1005,36 +1142,21 @@ class AnkerSolixApi:
return await self.request( return await self.request(
method, endpoint, headers=headers, json=json method, endpoint, headers=headers, json=json
) )
self._logger.error("Login failed for user %s", self._email) self._logger.error("Re-Login failed for user %s", self._email)
raise errors.AuthorizationError( elif resp.status in [429]:
f"Login failed for user {self._email}" # Too Many Requests, add stats to message
) from err errors.raise_error(data, prefix=f"Too Many Requests: {self.request_count}")
else:
# raise Anker Solix error if code is known
errors.raise_error(data)
# raise Client error otherwise
raise ClientError( raise ClientError(
f"There was an error while requesting {endpoint}: {err}" f"Api Request Error: {err}", f"response={body_text}"
) from err
except (
errors.InvalidCredentialsError,
errors.TokenKickedOutError,
) as err: # Exception for API specific response codes
self._logger.error("API ERROR: %s", err)
self._logger.error("Response Text: %s", body_text)
if self._retry_attempt:
raise errors.AuthorizationError(
f"Login failed for user {self._email}"
) from err
self._logger.warning("Login failed, retrying authentication")
if await self.async_authenticate(restart=True):
return await self.request(
method, endpoint, headers=headers, json=json
)
self._logger.error("Login failed for user %s", self._email)
raise errors.AuthorizationError(
f"Login failed for user {self._email}"
) from err ) from err
except errors.AnkerSolixError as err: # Other Exception from API except errors.AnkerSolixError as err: # Other Exception from API
self._logger.error("ANKER API ERROR: %s", err) self._logger.error("%s", err)
self._logger.error("Response Text: %s", body_text) self._logger.error("Response Text: %s", body_text)
raise err raise
async def update_sites(self, fromFile: bool = False) -> dict: async def update_sites(self, fromFile: bool = False) -> dict:
"""Get the latest info for all accessible sites and update class site and device variables. """Get the latest info for all accessible sites and update class site and device variables.
@ -1066,9 +1188,8 @@ class AnkerSolixApi:
sites = await self.get_site_list(fromFile=fromFile) sites = await self.get_site_list(fromFile=fromFile)
self._site_devices = set() self._site_devices = set()
for site in sites.get("site_list", []): for site in sites.get("site_list", []):
if site.get("site_id"): if myid := site.get("site_id"):
# Update site info # Update site info
myid = site.get("site_id")
mysite = self.sites.get(myid, {}) mysite = self.sites.get(myid, {})
siteInfo = mysite.get("site_info", {}) siteInfo = mysite.get("site_info", {})
siteInfo.update(site) siteInfo.update(site)
@ -1104,7 +1225,7 @@ class AnkerSolixApi:
solarbank = dict(solarbank).copy() solarbank = dict(solarbank).copy()
solarbank.update({"alias_name": solarbank.pop("device_name")}) solarbank.update({"alias_name": solarbank.pop("device_name")})
# work around for system and device output presets, which are not set correctly and cannot be queried with load schedule for shared accounts # work around for system and device output presets, which are not set correctly and cannot be queried with load schedule for shared accounts
if not solarbank.get("set_load_power"): if not str(solarbank.get("set_load_power")).isdigit():
total_preset = str(mysite.get("retain_load", "")).replace( total_preset = str(mysite.get("retain_load", "")).replace(
"W", "" "W", ""
) )
@ -1143,6 +1264,14 @@ class AnkerSolixApi:
if sn: if sn:
self._site_devices.add(sn) self._site_devices.add(sn)
sb_charges[sn] = charge_calc sb_charges[sn] = charge_calc
# as time progressed, update actual schedule slot presets from a cached schedule if available
if schedule := (self.devices.get(sn, {})).get("schedule"):
self._update_dev(
{
"device_sn": sn,
"schedule": schedule,
}
)
# adjust calculated SB charge to match total # adjust calculated SB charge to match total
if len(sb_charges) == len(sb_list) and str(sb_total_charge).isdigit(): if len(sb_charges) == len(sb_list) and str(sb_total_charge).isdigit():
sb_total_charge = int(sb_total_charge) sb_total_charge = int(sb_total_charge)
@ -1219,13 +1348,18 @@ class AnkerSolixApi:
self.sites = new_sites self.sites = new_sites
return self.sites return self.sites
async def update_site_details(self, fromFile: bool = False) -> dict: async def update_site_details(
self, fromFile: bool = False, exclude: set = None
) -> dict:
"""Get the latest updates for additional site related details updated less frequently. """Get the latest updates for additional site related details updated less frequently.
Most of theses requests return data only when user has admin rights for sites owning the devices. Most of theses requests return data only when user has admin rights for sites owning the devices.
To limit API requests, this update site details method should be called less frequently than update site method, To limit API requests, this update site details method should be called less frequently than update site method,
and it updates just the nested site_details dictionary in the sites dictionary. and it updates just the nested site_details dictionary in the sites dictionary.
""" """
# define excluded categories to skip for queries
if not exclude:
exclude = set()
self._logger.debug("Updating Sites Details") self._logger.debug("Updating Sites Details")
# Fetch unread account messages once and put in site details for all sites # Fetch unread account messages once and put in site details for all sites
self._logger.debug("Getting unread messages indicator") self._logger.debug("Getting unread messages indicator")
@ -1234,12 +1368,13 @@ class AnkerSolixApi:
# Fetch details that only work for site admins # Fetch details that only work for site admins
if site.get("site_admin", False): if site.get("site_admin", False):
# Fetch site price and CO2 settings # Fetch site price and CO2 settings
self._logger.debug("Getting price and CO2 settings for site") if {ApiCategories.site_price} - exclude:
await self.get_site_price(siteId=site_id, fromFile=fromFile) self._logger.debug("Getting price and CO2 settings for site")
await self.get_site_price(siteId=site_id, fromFile=fromFile)
return self.sites return self.sites
async def update_device_details( async def update_device_details(
self, fromFile: bool = False, devtypes: set = None self, fromFile: bool = False, exclude: set = None
) -> dict: ) -> dict:
"""Get the latest updates for additional device info updated less frequently. """Get the latest updates for additional device info updated less frequently.
@ -1247,17 +1382,18 @@ class AnkerSolixApi:
To limit API requests, this update device details method should be called less frequently than update site method, To limit API requests, this update device details method should be called less frequently than update site method,
which will also update most device details as found in the site data response. which will also update most device details as found in the site data response.
""" """
# define allowed device types to query, default to all # define excluded device types or categories to skip for queries
if not devtypes: if not exclude:
devtypes = {d.value for d in SolixDeviceType} exclude = set()
self._logger.debug("Updating Device Details") self._logger.debug("Updating Device Details")
# Fetch firmware version of device # Fetch firmware version of device
# This response will also contain unbound / standalone devices not added to a site # This response will also contain unbound / standalone devices not added to a site
self._logger.debug("Getting bind devices") self._logger.debug("Getting bind devices")
await self.get_bind_devices(fromFile=fromFile) await self.get_bind_devices(fromFile=fromFile)
# Get the setting for effective automated FW upgrades # Get the setting for effective automated FW upgrades
self._logger.debug("Getting OTA settings") if {ApiCategories.device_auto_upgrade} - exclude:
await self.get_auto_upgrade(fromFile=fromFile) self._logger.debug("Getting OTA settings")
await self.get_auto_upgrade(fromFile=fromFile)
# Fetch other relevant device information that requires site id and/or SN # Fetch other relevant device information that requires site id and/or SN
site_wifi: dict[str, list[dict | None]] = {} site_wifi: dict[str, list[dict | None]] = {}
for sn, device in self.devices.items(): for sn, device in self.devices.items():
@ -1284,18 +1420,20 @@ class AnkerSolixApi:
if 0 < wifi_index <= len(wifi_list): if 0 < wifi_index <= len(wifi_list):
device.update(wifi_list[wifi_index - 1]) device.update(wifi_list[wifi_index - 1])
# Fetch device type specific details, if device type should be queried # Fetch device type specific details, if device type not excluded
if dev_Type in ({SolixDeviceType.SOLARBANK.value} & devtypes): if dev_Type in ({SolixDeviceType.SOLARBANK.value} - exclude):
# Fetch active Power Cutoff setting for solarbanks # Fetch active Power Cutoff setting for solarbanks
self._logger.debug("Getting Power Cutoff settings for device") if {ApiCategories.solarbank_cutoff} - exclude:
await self.get_power_cutoff( self._logger.debug("Getting Power Cutoff settings for device")
siteId=site_id, deviceSn=sn, fromFile=fromFile await self.get_power_cutoff(
) siteId=site_id, deviceSn=sn, fromFile=fromFile
)
# Fetch defined inverter details for solarbanks # Fetch defined inverter details for solarbanks
self._logger.debug("Getting inverter settings for device") if {ApiCategories.solarbank_solar_info} - exclude:
await self.get_solar_info(solarbankSn=sn, fromFile=fromFile) self._logger.debug("Getting inverter settings for device")
await self.get_solar_info(solarbankSn=sn, fromFile=fromFile)
# Fetch schedule for device types supporting it # Fetch schedule for device types supporting it
self._logger.debug("Getting schedule details for device") self._logger.debug("Getting schedule details for device")
@ -1304,10 +1442,11 @@ class AnkerSolixApi:
) )
# Fetch device fittings for device types supporting it # Fetch device fittings for device types supporting it
self._logger.debug("Getting fittings for device") if {ApiCategories.solarbank_fittings} - exclude:
await self.get_device_fittings( self._logger.debug("Getting fittings for device")
siteId=site_id, deviceSn=sn, fromFile=fromFile await self.get_device_fittings(
) siteId=site_id, deviceSn=sn, fromFile=fromFile
)
# update entry in devices # update entry in devices
self.devices.update({sn: device}) self.devices.update({sn: device})
@ -1316,19 +1455,22 @@ class AnkerSolixApi:
return self.devices return self.devices
async def update_device_energy(self, devtypes: set = None) -> dict: async def update_device_energy(self, exclude: set = None) -> dict:
"""Get the energy statistics for given device types from today and yesterday. """Get the energy statistics for given device types from today and yesterday.
Yesterday energy will be queried only once if not available yet, but not updated in subsequent refreshes. Yesterday energy will be queried only once if not available yet, but not updated in subsequent refreshes.
Energy data can also be fetched by shared accounts. Energy data can also be fetched by shared accounts.
""" """
# define allowed device types to query, default to no energy data # define allowed device types to query, default to no energy data
if not devtypes: if not exclude:
devtypes = set() exclude = set()
for sn, device in self.devices.items(): for sn, device in self.devices.items():
site_id = device.get("site_id", "") site_id = device.get("site_id", "")
dev_Type = device.get("type", "") dev_Type = device.get("type", "")
if dev_Type in ({SolixDeviceType.SOLARBANK.value} & devtypes): if (
dev_Type in ({SolixDeviceType.SOLARBANK.value} - exclude)
and {ApiCategories.solarbank_energy} - exclude
):
self._logger.debug("Getting Energy details for device") self._logger.debug("Getting Energy details for device")
energy = device.get("energy_details") or {} energy = device.get("energy_details") or {}
today = datetime.today().strftime("%Y-%m-%d") today = datetime.today().strftime("%Y-%m-%d")
@ -2607,8 +2749,6 @@ class AnkerSolixApi:
daylist = [startDay + timedelta(days=x) for x in range(numDays)] daylist = [startDay + timedelta(days=x) for x in range(numDays)]
for day in daylist: for day in daylist:
daystr = day.strftime("%Y-%m-%d") daystr = day.strftime("%Y-%m-%d")
if day != daylist[0]:
systime.sleep(1) # delay to avoid hammering API
resp = await self.energy_analysis( resp = await self.energy_analysis(
siteId=siteId, siteId=siteId,
deviceSn=deviceSn, deviceSn=deviceSn,

View File

@ -27,6 +27,10 @@ class RequestError(AnkerSolixError):
"""Request error.""" """Request error."""
class RequestLimitError(AnkerSolixError):
"""Request Limit exceeded error."""
class VerifyCodeError(AnkerSolixError): class VerifyCodeError(AnkerSolixError):
"""Verify code error.""" """Verify code error."""
@ -70,6 +74,7 @@ class RetryExceeded(AnkerSolixError):
ERRORS: dict[int, type[AnkerSolixError]] = { ERRORS: dict[int, type[AnkerSolixError]] = {
401: AuthorizationError, 401: AuthorizationError,
403: AuthorizationError, 403: AuthorizationError,
429: RequestLimitError,
997: ConnectError, 997: ConnectError,
998: NetworkError, 998: NetworkError,
999: ServerError, 999: ServerError,
@ -86,14 +91,17 @@ ERRORS: dict[int, type[AnkerSolixError]] = {
26084: TokenKickedOutError, 26084: TokenKickedOutError,
26108: InvalidCredentialsError, 26108: InvalidCredentialsError,
26156: InvalidCredentialsError, 26156: InvalidCredentialsError,
26161: RequestError,
100053: RetryExceeded, 100053: RetryExceeded,
} }
def raise_error(data: dict) -> None: def raise_error(data: dict, prefix: str = "Anker Api Error") -> None:
"""Raise the appropriate error based upon a response code.""" """Raise the appropriate Api error based upon a response code in json data."""
code = data.get("code", -1) if isinstance(data, dict) and "code" in data:
if code in [0]: # json dict, get code for server response
code = int(data.get("code"))
else:
return return
cls = ERRORS.get(code, AnkerSolixError) if error := ERRORS.get(code) or (AnkerSolixError if code >= 10000 else None):
raise cls(f'({code}) {data.get("msg","Error msg not found")}') raise error(f'({code}) {prefix}: {data.get("msg","Error msg not found")}')

View File

@ -1,4 +1,4 @@
"""A collection of helper functions for pyscripts.""" """A collection of helper functions for pyscripts.""" # noqa: INP001
import getpass import getpass
import logging import logging
import os import os

View File

@ -82,9 +82,14 @@ async def main() -> None:
filename = f"daily_energy_{daystr}.csv" filename = f"daily_energy_{daystr}.csv"
except ValueError: except ValueError:
return False return False
# delay requests, limit appears to be around 25 per minute
if numdays > 25:
myapi.requestDelay(2.5)
else:
myapi.requestDelay(.3)
CONSOLE.info( CONSOLE.info(
"Queries may take up to %s seconds...please wait...", "Queries may take up to %s seconds with %.3f seconds delay...please wait...",
numdays * daytotals + 2, round((numdays * daytotals + 1) * myapi.requestDelay()),myapi.requestDelay()
) )
data = await myapi.energy_daily( data = await myapi.energy_daily(
siteId=device.get("site_id"), siteId=device.get("site_id"),

View File

@ -25,7 +25,6 @@ import os
import random import random
import string import string
import sys import sys
import time
from aiohttp import ClientSession from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientError from aiohttp.client_exceptions import ClientError
@ -141,7 +140,6 @@ def export(
"""Save dict data to given file.""" """Save dict data to given file."""
if not d: if not d:
d = {} d = {}
time.sleep(1) # central delay between multiple requests
if len(d) == 0: if len(d) == 0:
CONSOLE.info("WARNING: File %s not saved because JSON is empty", filename) CONSOLE.info("WARNING: File %s not saved because JSON is empty", filename)
return return
@ -208,6 +206,8 @@ async def main() -> bool: # noqa: C901 # pylint: disable=too-many-branches,too-
# Ensure to use local subfolder # Ensure to use local subfolder
folder = os.path.join(os.path.dirname(__file__), "exports", folder) folder = os.path.join(os.path.dirname(__file__), "exports", folder)
os.makedirs(folder, exist_ok=True) os.makedirs(folder, exist_ok=True)
# define minimum delay in seconds between requests
myapi.requestDelay(0.5)
# first update sites and devices in API object # first update sites and devices in API object
CONSOLE.info("\nQuerying site information...") CONSOLE.info("\nQuerying site information...")

View File

@ -1,6 +1,6 @@
[project] [project]
name = "Anker-Solix-Api" name = "Anker-Solix-Api"
version = "1.5.0" version = "1.8.0"
description = "Python library for Anker Solix Power devices (Solarbank, Inverter etc)" description = "Python library for Anker Solix Power devices (Solarbank, Inverter etc)"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"

View File

@ -248,6 +248,7 @@ async def main() -> ( # noqa: C901 # pylint: disable=too-many-locals,too-many-b
"Neither Solarbank nor Inverter device, further details will be skipped" "Neither Solarbank nor Inverter device, further details will be skipped"
) )
CONSOLE.info("") CONSOLE.info("")
CONSOLE.info("Api Requests: %s", myapi.request_count)
CONSOLE.debug(json.dumps(myapi.devices, indent=2)) CONSOLE.debug(json.dumps(myapi.devices, indent=2))
for sec in range(REFRESH): for sec in range(REFRESH):
now = datetime.now().astimezone() now = datetime.now().astimezone()