2848 lines
138 KiB
Python
2848 lines
138 KiB
Python
"""Class for interacting with the Anker Power / Solix API.
|
|
|
|
Required Python modules:
|
|
pip install cryptography
|
|
pip install aiohttp
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from asyncio import sleep
|
|
from base64 import b64encode
|
|
import contextlib
|
|
import copy
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time as systime
|
|
|
|
from aiohttp import ClientSession
|
|
from aiohttp.client_exceptions import ClientError
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes, padding, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
|
from . import errors
|
|
|
|
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
|
|
|
"""Default definitions required for the Anker Power/Solix Cloud API"""
|
|
# API servers per region. Country assignment not clear, defaulting to EU server
|
|
_API_SERVERS = {
|
|
"eu": "https://ankerpower-api-eu.anker.com",
|
|
"com": "https://ankerpower-api.anker.com",
|
|
}
|
|
_API_LOGIN = "passport/login"
|
|
_API_HEADERS = {
|
|
"Content-Type": "application/json",
|
|
"Model-Type": "DESKTOP",
|
|
"App-Name": "anker_power",
|
|
"Os-Type": "android",
|
|
}
|
|
_API_COUNTRIES = {
|
|
"com": [
|
|
"DZ",
|
|
"LB",
|
|
"SY",
|
|
"EG",
|
|
"LY",
|
|
"TN",
|
|
"IL",
|
|
"MA",
|
|
"JO",
|
|
"PS",
|
|
"AR",
|
|
"AU",
|
|
"BR",
|
|
"HK",
|
|
"IN",
|
|
"JP",
|
|
"MX",
|
|
"NG",
|
|
"NZ",
|
|
"RU",
|
|
"SG",
|
|
"ZA",
|
|
"KR",
|
|
"TW",
|
|
"US",
|
|
"CA",
|
|
],
|
|
"eu": [
|
|
"BE",
|
|
"EL",
|
|
"LT",
|
|
"PT",
|
|
"BG",
|
|
"ES",
|
|
"LU",
|
|
"RO",
|
|
"CZ",
|
|
"FR",
|
|
"HU",
|
|
"SI",
|
|
"DK",
|
|
"HR",
|
|
"MT",
|
|
"SK",
|
|
"DE",
|
|
"IT",
|
|
"NL",
|
|
"FI",
|
|
"EE",
|
|
"CY",
|
|
"AT",
|
|
"SE",
|
|
"IE",
|
|
"LV",
|
|
"PL",
|
|
"UK",
|
|
"IS",
|
|
"NO",
|
|
"LI",
|
|
"CH",
|
|
"BA",
|
|
"ME",
|
|
"MD",
|
|
"MK",
|
|
"GE",
|
|
"AL",
|
|
"RS",
|
|
"TR",
|
|
"UA",
|
|
"XK",
|
|
"AM",
|
|
"BY",
|
|
"AZ",
|
|
],
|
|
} # TODO(2): Expand or update list once ID assignments are wrong or missing
|
|
|
|
"""Following are the Anker Power/Solix Cloud API endpoints known so far"""
|
|
_API_ENDPOINTS = {
|
|
"homepage": "power_service/v1/site/get_site_homepage", # Scene info for configured site(s), content as preseneted on App Home Page (mostly empty for shared accounts)
|
|
"site_list": "power_service/v1/site/get_site_list", # List of available site ids for the user, will also show sites shared withe the account
|
|
"site_detail": "power_service/v1/site/get_site_detail", # Information for given site_id, can also be used by shared accounts
|
|
"site_rules": "power_service/v1/site/get_site_rules", # Information for supported power site types and their min and max qty per device model types
|
|
"scene_info": "power_service/v1/site/get_scen_info", # Scene info for provided site id (contains most information as the App home screen, with some but not all device details)
|
|
"user_devices": "power_service/v1/site/list_user_devices", # List Device details of owned devices, not all device details information included
|
|
"charging_devices": "power_service/v1/site/get_charging_device", # List of Portable Power Station devices?
|
|
"get_device_parm": "power_service/v1/site/get_site_device_param", # Get settings of a device for the provided site id and param type (e.g. Schedules)
|
|
"set_device_parm": "power_service/v1/site/set_site_device_param", # Apply provided settings to a device for the provided site id and param type (e.g. Schedules), NOT IMPLEMENTED YET
|
|
"wifi_list": "power_service/v1/site/get_wifi_info_list", # List of available networks for provided site id
|
|
"get_site_price": "power_service/v1/site/get_site_price", # List defined power price and CO2 for given site, works only for site owner account
|
|
"update_site_price": "power_service/v1/site/update_site_price", # Update power price and CO2 for given site, works only for site owner account
|
|
"get_auto_upgrade": "power_service/v1/app/get_auto_upgrade", # List of Auto-Upgrade configuration and enabled devices, only works for site owner accout
|
|
"set_auto_upgrade": "power_service/v1/app/set_auto_upgrade", # Set/Enable Auto-Upgrade configuration, works only for site owner account
|
|
"bind_devices": "power_service/v1/app/get_relate_and_bind_devices", # List with details of locally connected/bound devices, includes firmware version, works only for owner account
|
|
"get_device_load": "power_service/v1/app/device/get_device_home_load", # Get defined device schedule (same data as provided with device param query)
|
|
"set_device_load": "power_service/v1/app/device/set_device_home_load", # Set defined device schedule, Accepts the new schedule, but does NOT change it? Maybe future use for schedules per device
|
|
"get_ota_info": "power_service/v1/app/compatible/get_ota_info", # Get OTA status for solarbank and/or inverter serials
|
|
"get_ota_update": "power_service/v1/app/compatible/get_ota_update", # Not clear what this does, shows some OTA settings
|
|
"solar_info": "power_service/v1/app/compatible/get_compatible_solar_info", # Solar inverter definition for solarbanks, works only with owner account
|
|
"get_cutoff": "power_service/v1/app/compatible/get_power_cutoff", # Get Power Cutoff settings (Min SOC) for provided site id and device sn, works only with owner account
|
|
"set_cutoff": "power_service/v1/app/compatible/set_power_cutoff", # Set Min SOC for device, only works for onwer accounts
|
|
"compatible_process": "power_service/v1/app/compatible/get_compatible_process", # contains solar_info plus OTA processing codes, works only with owner account
|
|
"get_device_fittings": "power_service/v1/app/get_relate_device_fittings", # Device fittings for given site id and device sn. Shows Accessories like Solarbank 0W Switch info
|
|
"energy_analysis": "power_service/v1/site/energy_analysis", # Fetch energy data for given time frames
|
|
"home_load_chart": "power_service/v1/site/get_home_load_chart", # Fetch data as displayed in home load chart for schedule adjustments for given site_id and optional device SN (empty if solarbank not connected)
|
|
"get_upgrade_record": "power_service/v1/app/get_upgrade_record", # get list of firmware update history
|
|
"check_upgrade_record": "power_service/v1/app/check_upgrade_record", # show an upgrade record for the device, types 1-3 show different info, only works for owner account
|
|
"get_message_unread": "power_service/v1/get_message_unread", # GET method to show if there are unread messages for account
|
|
"get_message": "power_service/v1/get_message", # get list of max Messages from certain time, last_time format unknown
|
|
"get_mqtt_info": "app/devicemanage/get_user_mqtt_info", # get mqtt server and certificates, not explored or used
|
|
}
|
|
|
|
""" Other endpoints neither implemented nor explored:
|
|
'power_service/v1/site/can_create_site',
|
|
'power_service/v1/site/create_site',
|
|
'power_service/v1/site/update_site',
|
|
'power_service/v1/site/delete_site',
|
|
'power_service/v1/site/add_charging_device',
|
|
'power_service/v1/site/update_charging_device',
|
|
'power_service/v1/site/reset_charging_device',
|
|
'power_service/v1/site/delete_charging_device',
|
|
'power_service/v1/site/add_site_devices',
|
|
'power_service/v1/site/delete_site_devices',
|
|
'power_service/v1/site/update_site_devices',
|
|
'power_service/v1/site/get_addable_site_list, # show to which defined site a given model type can be added
|
|
'power_service/v1/app/compatible/set_ota_update',
|
|
'power_service/v1/app/compatible/save_ota_complete_status',
|
|
'power_service/v1/app/compatible/check_third_sn',
|
|
'power_service/v1/app/compatible/save_compatible_solar',
|
|
'power_service/v1/app/compatible/get_confirm_permissions',
|
|
'power_service/v1/app/compatible/confirm_permissions_settings',
|
|
'power_service/v1/app/after_sale/check_popup',
|
|
'power_service/v1/app/after_sale/check_sn',
|
|
'power_service/v1/app/after_sale/mark_sn',
|
|
'power_service/v1/app/share_site/delete_site_member',
|
|
'power_service/v1/app/share_site/invite_member',
|
|
'power_service/v1/app/share_site/delete_inviting_member',
|
|
'power_service/v1/app/share_site/get_invited_list',
|
|
'power_service/v1/app/share_site/join_site',
|
|
'power_service/v1/app/upgrade_event_report', # post an entry to upgrade event report
|
|
'power_service/v1/app/get_phonecode_list',
|
|
'power_service/v1/get_message_not_disturb', # get do not disturb messages settings
|
|
'power_service/v1/message_not_disturb', # change do not disurb messages settings
|
|
'power_service/v1/read_message',
|
|
'power_service/v1/del_message',
|
|
'power_service/v1/product_categories', # GET method to list all supported products with details and web picture links
|
|
'power_service/v1/product_accessories', # GET method to list all supported products accessories with details and web picture links
|
|
'app/devicemanage/update_relate_device_info',
|
|
'app/cloudstor/get_app_up_token_general',
|
|
'app/logging/get_device_logging',
|
|
'app/devicerelation/up_alias_name', # Update Alias name of device? Fails with (10003) Failed to request
|
|
'app/devicerelation/un_relate_and_unbind_device',
|
|
'app/devicerelation/relate_device',
|
|
|
|
|
|
|
|
|
|
|
|
Structure of the JSON response for an API Login Request:
|
|
An unexpired token_id must be used for API request, along with the gtoken which is an MD5 hash of the returned(encrypted) user_id.
|
|
The combination of the provided token and MD5 hashed user_id authenticate the client to the server.
|
|
The Login Response is cached in a JSON file per email user account and can be reused by this API class without further login request.
|
|
|
|
ATTENTION: Anker allows only 1 active token on the server per user account. Any login for the same account (e.g. via Anker mobile App) will kickoff the token used in this Api instance and further requests are no longer authorized.
|
|
Currently, the Api will re-authenticate automatically and therefore may kick off the other user that obtained the actual access token (e.g. kick out the App user again when used for regular Api requests)
|
|
|
|
NOTES: Parallel Api instances should use different user accounts. They may work in parallel when all using the same cached authentication data. The first API instance with failed authorization will restart a new Login request and updates
|
|
the cached JSON file. Other instances should recognize an update of the cached JSON file and will refresh their login credentials in the instance for the actual token and gtoken without new login request.
|
|
"""
|
|
|
|
LOGIN_RESPONSE: dict = {
|
|
"user_id": str,
|
|
"email": str,
|
|
"nick_name": str,
|
|
"auth_token": str,
|
|
"token_expires_at": int,
|
|
"avatar": str,
|
|
"mac_addr": str,
|
|
"domain": str,
|
|
"ab_code": str,
|
|
"token_id": int,
|
|
"geo_key": str,
|
|
"privilege": int,
|
|
"phone": str,
|
|
"phone_number": str,
|
|
"phone_code": str,
|
|
"server_secret_info": {"public_key": str},
|
|
"params": list,
|
|
"trust_list": list,
|
|
"fa_info": {"step": int, "info": str},
|
|
"country_code": str,
|
|
}
|
|
|
|
|
|
class SolixDeviceType(Enum):
|
|
"""Enumuration for Anker Solix device types."""
|
|
|
|
SYSTEM = "system"
|
|
SOLARBANK = "solarbank"
|
|
INVERTER = "inverter"
|
|
PPS = "pps"
|
|
POWERPANEL = "powerpanel"
|
|
POWERCOOLER = "powercooler"
|
|
|
|
|
|
class SolixParmType(Enum):
|
|
"""Enumuration for Anker Solix Parameter types."""
|
|
|
|
SOLARBANK_SCHEDULE = "4"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ApiCategories:
|
|
"""Dataclass to specify supported Api categorties for regular Api cache refresh cycles."""
|
|
|
|
site_price: str = "site_price"
|
|
device_auto_upgrade: str = "device_auto_upgrade"
|
|
solarbank_energy: str = "solarbank_energy"
|
|
solarbank_fittings: str = "solarbank_fittings"
|
|
solarbank_cutoff: str = "solarbank_cutoff"
|
|
solarbank_solar_info: str = "solarbank_solar_info"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SolixDeviceCapacity:
|
|
"""Dataclass for Anker Solix device capacities in Wh by Part Number."""
|
|
|
|
A17C0: int = 1600 # SOLIX E1600 Solarbank
|
|
A1720: int = 256 # Anker PowerHouse 521 Portable Power Station
|
|
A1751: int = 512 # Anker PowerHouse 535 Portable Power Station
|
|
A1753: int = 768 # SOLIX C800 Portable Power Station
|
|
A1754: int = 768 # SOLIX C800 Plus Portable Power Station
|
|
A1755: int = 768 # SOLIX C800X Portable Power Station
|
|
A1760: int = 1024 # Anker PowerHouse 555 Portable Power Station
|
|
A1761: int = 1056 # SOLIX C1000(X) Portable Power Station
|
|
A17C1: int = 1056 # SOLIX C1000 Expansion Battery
|
|
A1770: int = 1229 # Anker PowerHouse 757 Portable Power Station
|
|
A1771: int = 1229 # SOLIX F1200 Portable Power Station
|
|
A1772: int = 1536 # SOLIX F1500 Portable Power Station
|
|
A1780: int = 2048 # SOLIX F2000 Portable Power Station (PowerHouse 767)
|
|
A1780_1: int = 2048 # Expansion Battery for F2000
|
|
A1781: int = 2560 # SOLIX F2600 Portable Power Station
|
|
A1790: int = 3840 # SOLIX F3800 Portable Power Station
|
|
A1790_1: int = 3840 # SOLIX BP3800 Expansion Battery for F3800
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SolixDeviceCategory:
|
|
"""Dataclass for Anker Solix device types by Part Number to be used for standalone/unbound device categorization."""
|
|
|
|
# Solarbanks
|
|
A17C0: str = SolixDeviceType.SOLARBANK.value # SOLIX E1600 Solarbank
|
|
# Inverter
|
|
A5140: str = SolixDeviceType.INVERTER.value # MI60 Inverter
|
|
A5143: str = SolixDeviceType.INVERTER.value # MI80 Inverter
|
|
# Portable Power Stations (PPS)
|
|
A1720: str = (
|
|
SolixDeviceType.PPS.value
|
|
) # Anker PowerHouse 521 Portable Power Station
|
|
A1751: str = (
|
|
SolixDeviceType.PPS.value
|
|
) # Anker PowerHouse 535 Portable Power Station
|
|
A1753: str = SolixDeviceType.PPS.value # SOLIX C800 Portable Power Station
|
|
A1754: str = SolixDeviceType.PPS.value # SOLIX C800 Plus Portable Power Station
|
|
A1755: str = SolixDeviceType.PPS.value # SOLIX C800X Portable Power Station
|
|
A1760: str = (
|
|
SolixDeviceType.PPS.value
|
|
) # Anker PowerHouse 555 Portable Power Station
|
|
A1761: str = SolixDeviceType.PPS.value # SOLIX C1000(X) Portable Power Station
|
|
A1770: str = (
|
|
SolixDeviceType.PPS.value
|
|
) # Anker PowerHouse 757 Portable Power Station
|
|
A1771: str = SolixDeviceType.PPS.value # SOLIX F1200 Portable Power Station
|
|
A1772: str = SolixDeviceType.PPS.value # SOLIX F1500 Portable Power Station
|
|
A1780: str = (
|
|
SolixDeviceType.PPS.value
|
|
) # SOLIX F2000 Portable Power Station (PowerHouse 767)
|
|
A1781: str = SolixDeviceType.PPS.value # SOLIX F2600 Portable Power Station
|
|
A1790: str = SolixDeviceType.PPS.value # SOLIX F3800 Portable Power Station
|
|
# Home Power Panels
|
|
A17B1: str = SolixDeviceType.POWERPANEL.value # SOLIX Home Power Panel
|
|
# Power Cooler
|
|
A17A0: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 30
|
|
A17A1: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 40
|
|
A17A2: str = SolixDeviceType.POWERCOOLER.value # SOLIX Power Cooler 50
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SolixDefaults:
|
|
"""Dataclass for Anker Solix defaults to be used."""
|
|
|
|
# Output Power presets for Solarbank schedule timeslot settings
|
|
PRESET_MIN: int = 0
|
|
PRESET_MAX: int = 800
|
|
PRESET_DEF: int = 100
|
|
# Export Switch preset for Solarbank schedule timeslot settings
|
|
ALLOW_EXPORT: bool = True
|
|
# Charge Priority limit preset for Solarbank schedule timeslot settings
|
|
CHARGE_PRIORITY_MIN: int = 0
|
|
CHARGE_PRIORITY_MAX: int = 100
|
|
CHARGE_PRIORITY_DEF: int = 80
|
|
# Seconds delay for subsequent Api requests in methods to update the Api cache dictionaries
|
|
REQUEST_DELAY_MIN: float = 0.0
|
|
REQUEST_DELAY_MAX: float = 5.0
|
|
REQUEST_DELAY_DEF: float = 0.3
|
|
|
|
|
|
class SolixDeviceStatus(Enum):
|
|
"""Enumuration for Anker Solix Device status."""
|
|
|
|
# The device status code seems to be used for cloud connection status.
|
|
offline = "0"
|
|
online = "1"
|
|
unknown = "unknown"
|
|
|
|
|
|
class SolarbankStatus(Enum):
|
|
"""Enumuration for Anker Solix Solarbank status."""
|
|
|
|
detection = "0"
|
|
bypass = "1"
|
|
discharge = "2"
|
|
charge = "3"
|
|
charge_bypass = "35" # pseudo state, the solarbank does not distinguish this
|
|
charge_priority = "37" # pseudo state, the solarbank does not distinguish this but reports 3 as seen so far
|
|
wakeup = "4" # Not clear what happens during this state, but observed short intervals during night as well
|
|
# TODO(3): Add descriptions once status code usage is observed/known
|
|
# code 5 was not observed yet
|
|
full_bypass = "6" # seen at cold temperature, when battery must not be charged and the Solarbank bypasses all directly to inverter, also solar power < 25 W
|
|
standby = "7"
|
|
unknown = "unknown"
|
|
|
|
|
|
@dataclass
|
|
class SolarbankTimeslot:
|
|
"""Dataclass to define customizable attributes of an Anker Solix Solarbank time slot as used for the schedule definition or update."""
|
|
|
|
start_time: datetime
|
|
end_time: datetime
|
|
appliance_load: int | None = (
|
|
None # mapped to appliance_loads setting using a default 50% share for dual solarbank setups
|
|
)
|
|
allow_export: bool | None = None # mapped to the turn_on boolean
|
|
charge_priority_limit: int | None = None # mapped to charge_priority setting
|
|
|
|
|
|
class RequestCounter:
|
|
"""Counter for datetime entries in last minute and last hour."""
|
|
|
|
def __init__(
|
|
self,
|
|
) -> None:
|
|
"""Initialize."""
|
|
self.elements: list = []
|
|
|
|
def __str__(self) -> str:
|
|
"""Print the counters."""
|
|
return f"{self.last_hour()} last hour, {self.last_minute()} last minute"
|
|
|
|
def add(self, request_time: datetime = datetime.now()) -> None:
|
|
"""Add new timestamp to end of counter."""
|
|
self.elements.append(request_time)
|
|
# limit the counter entries to 1 hour when adding new
|
|
self.recycle()
|
|
|
|
def recycle(
|
|
self, last_time: datetime = datetime.now() - timedelta(hours=1)
|
|
) -> None:
|
|
"""Remove oldest timestamps from beginning of counter until last_time is reached, default is 1 hour ago."""
|
|
self.elements = [x for x in self.elements if x > last_time]
|
|
|
|
def last_minute(self) -> int:
|
|
"""Get numnber of timestamps for last minute."""
|
|
last_time = datetime.now() - timedelta(minutes=1)
|
|
return len([x for x in self.elements if x > last_time])
|
|
|
|
def last_hour(self) -> int:
|
|
"""Get numnber of timestamps for last minute."""
|
|
last_time = datetime.now() - timedelta(hours=1)
|
|
return len([x for x in self.elements if x > last_time])
|
|
|
|
|
|
class AnkerSolixApi:
|
|
"""Define the API class to handle Anker server authentication and API requests, along with the last state of queried site details and Device information."""
|
|
|
|
def __init__(
|
|
self,
|
|
email: str,
|
|
password: str,
|
|
countryId: str,
|
|
websession: ClientSession,
|
|
logger=None,
|
|
) -> None:
|
|
"""Initialize."""
|
|
self._countryId: str = countryId.upper()
|
|
self._api_base: str | None = None
|
|
for region, countries in _API_COUNTRIES.items():
|
|
if self._countryId in countries:
|
|
self._api_base = _API_SERVERS.get(region)
|
|
# default to EU server
|
|
if not self._api_base:
|
|
self._api_base = _API_SERVERS.get("eu")
|
|
self._email: str = email
|
|
self._password: str = password
|
|
self._session: ClientSession = websession
|
|
self._loggedIn: bool = False
|
|
self._testdir: str = os.path.join(
|
|
os.path.dirname(__file__), "..", "examples", "example1"
|
|
)
|
|
self._retry_attempt: bool = False # Flag for retry after any token error
|
|
os.makedirs(
|
|
os.path.join(os.path.dirname(__file__), "authcache"), exist_ok=True
|
|
) # ensure folder for authentication caching exists
|
|
self._authFile: str = os.path.join(
|
|
os.path.dirname(__file__), "authcache", f"{email}.json"
|
|
) # filename for authentication cache
|
|
self._authFileTime: float = 0
|
|
# initialize logger for object
|
|
if logger:
|
|
self._logger = logger
|
|
else:
|
|
self._logger = _LOGGER
|
|
self._logger.setLevel(logging.WARNING)
|
|
if not self._logger.hasHandlers():
|
|
self._logger.addHandler(logging.StreamHandler(sys.stdout))
|
|
|
|
self._timezone: str = (
|
|
self._getTimezoneGMTString()
|
|
) # Timezone format: 'GMT+01:00'
|
|
self._gtoken: str | None = None
|
|
self._token: str | None = None
|
|
self._token_expiration: datetime | None = None
|
|
self._login_response: dict = {}
|
|
self.mask_credentials: bool = True
|
|
self.encrypt_body: bool = False
|
|
self.request_count: RequestCounter = RequestCounter()
|
|
self._request_delay: float = SolixDefaults.REQUEST_DELAY_DEF
|
|
self._last_request_time: datetime | None = None
|
|
|
|
# Define Encryption for password, using ECDH assymetric key exchange for shared secret calculation, which must be used to encrypt the password using AES-256-CBC with seed of 16
|
|
# uncompressed public key from EU Anker server in the format 04 [32 byte x vlaue] [32 byte y value]
|
|
# Both, the EU and COM Anker server public key is the same and login response is provided for both upon an authentication request
|
|
# However, if country ID assignment is to wrong server, no sites or devices will be listed for the authenticated account.
|
|
self._api_public_key_hex = "04c5c00c4f8d1197cc7c3167c52bf7acb054d722f0ef08dcd7e0883236e0d72a3868d9750cb47fa4619248f3d83f0f662671dadc6e2d31c2f41db0161651c7c076"
|
|
self._curve = (
|
|
ec.SECP256R1()
|
|
) # Encryption curve SECP256R1 (identical to prime256v1)
|
|
self._ecdh = ec.generate_private_key(
|
|
self._curve, default_backend()
|
|
) # returns EllipticCurvePrivateKey
|
|
self._public_key = self._ecdh.public_key() # returns EllipticCurvePublicKey
|
|
self._shared_key = self._ecdh.exchange(
|
|
ec.ECDH(),
|
|
ec.EllipticCurvePublicKey.from_encoded_point(
|
|
self._curve, bytes.fromhex(self._api_public_key_hex)
|
|
),
|
|
) # returns bytes of shared secret
|
|
|
|
# track active devices bound to any site
|
|
self._site_devices: set = set()
|
|
|
|
# Define class variables saving the most recent site and device data
|
|
self.nickname: str = ""
|
|
self.sites: dict = {}
|
|
self.devices: dict = {}
|
|
|
|
def _md5(self, text: str) -> str:
|
|
h = hashes.Hash(hashes.MD5())
|
|
h.update(text.encode("utf-8"))
|
|
return h.finalize().hex()
|
|
|
|
def _getTimezoneGMTString(self) -> str:
|
|
"""Construct timezone GMT string with offset, e.g. GMT+01:00."""
|
|
tzo = datetime.now().astimezone().strftime("%z")
|
|
return f"GMT{tzo[:3]}:{tzo[3:5]}"
|
|
|
|
def _encryptApiData(self, raw: str) -> str:
|
|
"""Return Base64 encoded secret as utf-8 decoded string using the shared secret with seed of 16 for the encryption."""
|
|
# Password must be UTF-8 encoded and AES-256-CBC encrypted with block size of 16
|
|
aes = Cipher(
|
|
algorithms.AES(self._shared_key),
|
|
modes.CBC(self._shared_key[0:16]),
|
|
backend=default_backend(),
|
|
)
|
|
encryptor = aes.encryptor()
|
|
# Use default PKCS7 padding for incomplete AES blocks
|
|
padder = padding.PKCS7(128).padder()
|
|
raw_padded = padder.update(raw.encode("utf-8")) + padder.finalize()
|
|
return (b64encode(encryptor.update(raw_padded) + encryptor.finalize())).decode(
|
|
"utf-8"
|
|
)
|
|
|
|
def mask_values(self, data: dict | str, *args: str) -> dict | str:
|
|
"""Mask values in dictionary for provided keys or the given string."""
|
|
if self.mask_credentials:
|
|
if isinstance(data, str):
|
|
datacopy: dict = {"text": data}
|
|
args: list = ["text"]
|
|
else:
|
|
datacopy = data.copy()
|
|
for key in args:
|
|
if old := datacopy.get(key):
|
|
new = ""
|
|
for idx in range(0, len(old), 16):
|
|
new = new + (
|
|
f"{old[idx:idx+2]}###masked###{old[idx+14:idx+16]}"
|
|
)
|
|
new = new[: len(old)]
|
|
datacopy[key] = new
|
|
if isinstance(data, str):
|
|
return datacopy.get("text")
|
|
return datacopy
|
|
return data
|
|
|
|
def _loadFromFile(self, filename: str) -> dict:
|
|
"""Load json data from given file for testing."""
|
|
if self.mask_credentials:
|
|
masked_filename = filename.replace(
|
|
self._email, self.mask_values(self._email)
|
|
)
|
|
else:
|
|
masked_filename = filename
|
|
try:
|
|
if os.path.isfile(filename):
|
|
with open(filename, encoding="utf-8") as file:
|
|
data = json.load(file)
|
|
self._logger.debug("Loaded JSON from file %s:", masked_filename)
|
|
self._logger.debug(
|
|
"Data: %s",
|
|
self.mask_values(
|
|
data, "user_id", "auth_token", "email", "geo_key"
|
|
),
|
|
)
|
|
return data
|
|
except OSError as err:
|
|
self._logger.error(
|
|
"ERROR: Failed to load JSON from file %s", masked_filename
|
|
)
|
|
self._logger.error(err)
|
|
return {}
|
|
|
|
def _saveToFile(self, filename: str, data: dict | None = None) -> bool:
|
|
"""Save json data to given file for testing."""
|
|
if self.mask_credentials:
|
|
masked_filename = filename.replace(
|
|
self._email, self.mask_values(self._email)
|
|
)
|
|
else:
|
|
masked_filename = filename
|
|
if not data:
|
|
data = {}
|
|
try:
|
|
with open(filename, "w", encoding="utf-8") as file:
|
|
json.dump(data, file, indent=2)
|
|
self._logger.debug("Saved JSON to file %s:", masked_filename)
|
|
return True
|
|
except OSError as err:
|
|
self._logger.error("ERROR: Failed to save JSON to file %s", masked_filename)
|
|
self._logger.error(err)
|
|
return False
|
|
|
|
def _update_site( # noqa: C901
|
|
self,
|
|
siteId: str,
|
|
details: dict,
|
|
) -> None:
|
|
"""Update the internal sites dictionary with data provided for the nested site details dictionary.
|
|
|
|
This method is used to consolidate site details from various less frequent requests that are not covered with the update_sites method.
|
|
"""
|
|
# lookup old site details if any
|
|
if siteId in self.sites:
|
|
site_details = (self.sites[siteId]).get("site_details") or {}
|
|
site_details.update(details)
|
|
else:
|
|
site_details = details
|
|
self.sites[siteId] = {}
|
|
self.sites[siteId]["site_details"] = site_details
|
|
|
|
def _update_dev( # noqa: C901
|
|
self,
|
|
devData: dict,
|
|
devType: str | None = None,
|
|
siteId: str | None = None,
|
|
isAdmin: bool | None = None,
|
|
) -> str | None:
|
|
"""Update the internal device details dictionary with the given data. The device_sn key must be set in the data dict for the update to be applied.
|
|
|
|
This method is used to consolidate various device related key values from various requests under a common set of device keys.
|
|
"""
|
|
sn = devData.get("device_sn")
|
|
if sn:
|
|
device = self.devices.get(sn, {}) # lookup old device info if any
|
|
device.update({"device_sn": str(sn)})
|
|
if devType:
|
|
device.update({"type": devType.lower()})
|
|
if siteId:
|
|
device.update({"site_id": str(siteId)})
|
|
if isAdmin:
|
|
device.update({"is_admin": True})
|
|
elif isAdmin is False and device.get("is_admin") is None:
|
|
device.update({"is_admin": False})
|
|
calc_capacity = False # Flag whether capacity may need recalculation
|
|
for key, value in devData.items():
|
|
try:
|
|
if key in ["product_code", "device_pn"] and value:
|
|
device.update({"device_pn": str(value)})
|
|
# try to get type for standalone device from category definitions if not defined yet
|
|
if "type" not in device and hasattr(
|
|
SolixDeviceCategory, str(value)
|
|
):
|
|
device.update(
|
|
{"type": getattr(SolixDeviceCategory, str(value))}
|
|
)
|
|
elif key in ["device_name"] and value:
|
|
if value != device.get("name", ""):
|
|
calc_capacity = True
|
|
device.update({"name": str(value)})
|
|
elif key in ["alias_name"] and value:
|
|
device.update({"alias": str(value)})
|
|
elif key in ["device_sw_version"] and value:
|
|
device.update({"sw_version": str(value)})
|
|
elif key in ["wifi_online"]:
|
|
device.update({"wifi_online": bool(value)})
|
|
elif key in ["wireless_type"]:
|
|
device.update({"wireless_type": str(value)})
|
|
elif key in ["wifi_name"] and value:
|
|
# wifi_name can be empty in details if device connected, avoid clearing name
|
|
device.update({"wifi_name": str(value)})
|
|
elif key in ["wifi_signal"]:
|
|
device.update({"wifi_signal": str(value)})
|
|
elif key in ["bt_ble_mac"] and value:
|
|
device.update({"bt_ble_mac": str(value)})
|
|
elif key in ["battery_power"] and value:
|
|
# This is a percentage value for the battery state of charge, not power
|
|
device.update({"battery_soc": str(value)})
|
|
elif key in ["charging_power"]:
|
|
device.update({"charging_power": str(value)})
|
|
elif key in ["photovoltaic_power"]:
|
|
device.update({"input_power": str(value)})
|
|
elif key in ["output_power"]:
|
|
device.update({"output_power": str(value)})
|
|
# solarbank info shows the load preset per device, which is identical to device parallel_home_load for 2 solarbanks, or current homeload for single solarbank
|
|
elif key in ["set_load_power", "parallel_home_load"] and value:
|
|
# Value may include unit, remove unit to have content consistent
|
|
device.update({"set_output_power": str(value).replace("W", "")})
|
|
# The current_home_load from get_device_load always shows the system wide settings made via the schedule
|
|
elif key in ["current_home_load"] and value:
|
|
# Value may include unit, remove unit to have content consistent
|
|
device.update(
|
|
{"set_system_output_power": str(value).replace("W", "")}
|
|
)
|
|
# Value for device home load may be empty for single solarbank, use this setting also for device preset in this case
|
|
if not device.get("set_output_power"):
|
|
device.update(
|
|
{"set_output_power": str(value).replace("W", "")}
|
|
)
|
|
elif key in ["power_unit"]:
|
|
device.update({"power_unit": str(value)})
|
|
elif key in ["status"]:
|
|
device.update({"status": str(value)})
|
|
# decode the status into a description
|
|
description = SolixDeviceStatus.unknown.name
|
|
for status in SolixDeviceStatus:
|
|
if str(value) == status.value:
|
|
description = status.name
|
|
break
|
|
device.update({"status_desc": description})
|
|
elif key in ["charging_status"]:
|
|
device.update({"charging_status": str(value)})
|
|
# decode the status into a description
|
|
description = SolarbankStatus.unknown.name
|
|
for status in SolarbankStatus:
|
|
if str(value) == status.value:
|
|
description = status.name
|
|
break
|
|
# check if battery has bypass during charge (if output during charge)
|
|
# NOTE: charging power may be updated after initial device details update
|
|
# NOTE: If status is 3=charging and larger than preset but nothing goes out, the charge priority is active (e.g. 0 Watt switch)
|
|
# This key can be passed separately, make sure the other values are looked up in passed data first, then in device details
|
|
preset = devData.get("set_load_power") or device(
|
|
"set_output_power"
|
|
)
|
|
out = devData.get("output_power") or device.get("output_power")
|
|
solar = devData.get("photovoltaic_power") or device.get(
|
|
"input_power"
|
|
)
|
|
if (
|
|
description == SolarbankStatus.charge.name
|
|
and preset is not None
|
|
and out is not None
|
|
and solar is not None
|
|
):
|
|
with contextlib.suppress(ValueError):
|
|
if int(out) == 0 and int(solar) > int(preset):
|
|
# Bypass but 0 W output must be active charge priority
|
|
description = SolarbankStatus.charge_priority.name
|
|
elif int(out) > 0:
|
|
# Charge with output must be bypass charging
|
|
description = SolarbankStatus.charge_bypass.name
|
|
device.update({"charging_status_desc": description})
|
|
elif key in ["bws_surplus"]:
|
|
device.update({"bws_surplus": str(value)})
|
|
elif key in ["charge"]:
|
|
device.update({"charge": bool(value)})
|
|
elif key in ["auto_upgrade"]:
|
|
device.update({"auto_upgrade": bool(value)})
|
|
elif (
|
|
key in ["power_cutoff", "output_cutoff_data"]
|
|
and str(value).isdigit()
|
|
):
|
|
device.update({"power_cutoff": int(value)})
|
|
elif key in ["power_cutoff_data"] and value:
|
|
device.update({"power_cutoff_data": list(value)})
|
|
elif key in ["fittings"]:
|
|
# update nested dictionary
|
|
if "fittings" in device:
|
|
device["fittings"].update(dict(value))
|
|
else:
|
|
device["fittings"] = dict(value)
|
|
elif key in ["solar_info"] and isinstance(value, dict) and value:
|
|
# remove unnecessary keys from solar_info
|
|
keylist = value.keys()
|
|
for key in [
|
|
x
|
|
for x in ("brand_id", "model_img", "version", "ota_status")
|
|
if x in keylist
|
|
]:
|
|
value.pop(key, None)
|
|
device.update({"solar_info": dict(value)})
|
|
# schedule is currently a site wide setting. However, we save this with device details to retain info across site updates
|
|
# When individual device schedules are support in future, this info is needed per device anyway
|
|
elif key in ["schedule"] and isinstance(value, dict) and value:
|
|
device.update({"schedule": dict(value)})
|
|
# default active presets to None
|
|
device.pop("preset_system_output_power", None)
|
|
device.pop("preset_allow_export", None)
|
|
device.pop("preset_charge_priority", None)
|
|
# get actual presets from current slot
|
|
now = datetime.now().time().replace(microsecond=0)
|
|
# set now to new daytime if close to end of day
|
|
if now >= datetime.strptime("23:59:58", "%H:%M:%S").time():
|
|
now = datetime.strptime("00:00", "%H:%M").time()
|
|
for slot in value.get("ranges") or []:
|
|
with contextlib.suppress(ValueError):
|
|
start_time = datetime.strptime(
|
|
slot.get("start_time") or "00:00", "%H:%M"
|
|
).time()
|
|
end_time = slot.get("end_time") or "00:00"
|
|
# "24:00" format not supported in strptime
|
|
if end_time == "24:00":
|
|
end_time = datetime.strptime(
|
|
"23:59:59", "%H:%M:%S"
|
|
).time()
|
|
else:
|
|
end_time = datetime.strptime(
|
|
end_time, "%H:%M"
|
|
).time()
|
|
if start_time <= now < end_time:
|
|
device.update(
|
|
{
|
|
"preset_system_output_power": (
|
|
slot.get("appliance_loads") or [{}]
|
|
)[0].get("power"),
|
|
"preset_allow_export": slot.get("turn_on"),
|
|
"preset_charge_priority": slot.get(
|
|
"charge_priority"
|
|
),
|
|
}
|
|
)
|
|
|
|
# inverter specific keys
|
|
elif key in ["generate_power"]:
|
|
device.update({"generate_power": str(value)})
|
|
|
|
# generate extra values when certain conditions are met
|
|
if key in ["battery_power"] or calc_capacity:
|
|
# generate battery values when soc updated or device name changed or PN is known
|
|
if not (cap := device.get("battery_capacity")):
|
|
pn = device.get("device_pn") or ""
|
|
if hasattr(SolixDeviceCapacity, pn):
|
|
# get battery capacity from known PNs
|
|
cap = getattr(SolixDeviceCapacity, pn)
|
|
elif device.get("type") == SolixDeviceType.SOLARBANK.value:
|
|
# Derive battery capacity in Wh from latest solarbank name or alias if available
|
|
cap = (
|
|
device.get("name", "")
|
|
or devData.get("device_name", "")
|
|
or device.get("alias", "")
|
|
).replace("Solarbank E", "")
|
|
soc = devData.get("battery_power", "") or device.get(
|
|
"battery_soc", ""
|
|
)
|
|
# Calculate remaining energy in Wh and add values
|
|
if cap and soc and str(cap).isdigit() and str(soc).isdigit():
|
|
device.update(
|
|
{
|
|
"battery_capacity": str(cap),
|
|
"battery_energy": str(
|
|
int(int(cap) * int(soc) / 100)
|
|
),
|
|
}
|
|
)
|
|
except Exception as err: # pylint: disable=broad-exception-caught
|
|
self._logger.error(
|
|
"%s occured when updating device details for key %s with value %s: %s",
|
|
type(err),
|
|
key,
|
|
value,
|
|
err,
|
|
)
|
|
|
|
self.devices.update({str(sn): device})
|
|
return sn
|
|
|
|
def testDir(self, subfolder: str | None = None) -> str:
|
|
"""Get or set the subfolder for local API test files."""
|
|
if not subfolder or subfolder == self._testdir:
|
|
return self._testdir
|
|
if not os.path.isdir(subfolder):
|
|
self._logger.error("Specified test folder does not exist: %s", subfolder)
|
|
else:
|
|
self._testdir = subfolder
|
|
self._logger.info("Set Api test folder to: %s", subfolder)
|
|
return self._testdir
|
|
|
|
def logLevel(self, level: int | None = None) -> int:
|
|
"""Get or set the logger log level."""
|
|
if level is not None and isinstance(level, int):
|
|
self._logger.setLevel(level)
|
|
self._logger.info("Set log level to: %s", level)
|
|
return self._logger.getEffectiveLevel()
|
|
|
|
def requestDelay(self, delay: float | None = None) -> float:
|
|
"""Get or set the api request delay in seconds."""
|
|
if (
|
|
delay is not None
|
|
and isinstance(delay, float | int)
|
|
and float(delay) != float(self._request_delay)
|
|
):
|
|
self._request_delay = float(
|
|
min(
|
|
SolixDefaults.REQUEST_DELAY_MAX,
|
|
max(SolixDefaults.REQUEST_DELAY_MIN, delay),
|
|
)
|
|
)
|
|
self._logger.info(
|
|
"Set api request delay to %.3f seconds", self._request_delay
|
|
)
|
|
return self._request_delay
|
|
|
|
async def _wait_delay(self, delay: float | None = None) -> None:
|
|
"""Wait at least for the defined Api request delay or for the provided delay in seconds since the last request occured."""
|
|
if delay is not None and isinstance(delay, float | int):
|
|
delay = float(
|
|
min(
|
|
SolixDefaults.REQUEST_DELAY_MAX,
|
|
max(SolixDefaults.REQUEST_DELAY_MIN, delay),
|
|
)
|
|
)
|
|
else:
|
|
delay = self._request_delay
|
|
if isinstance(self._last_request_time, datetime):
|
|
await sleep(
|
|
max(
|
|
0,
|
|
delay - (datetime.now() - self._last_request_time).total_seconds(),
|
|
)
|
|
)
|
|
|
|
async def async_authenticate(self, restart: bool = False) -> bool:
|
|
"""Authenticate with server and get an access token. If restart is not enforced, cached login data may be used to obtain previous token."""
|
|
if restart:
|
|
self._token = None
|
|
self._token_expiration = None
|
|
self._gtoken = None
|
|
self._loggedIn = False
|
|
self._authFileTime = 0
|
|
self.nickname = ""
|
|
if os.path.isfile(self._authFile):
|
|
with contextlib.suppress(Exception):
|
|
os.remove(self._authFile)
|
|
# First check if cached login response is availble and login params can be filled, otherwise query server for new login tokens
|
|
if os.path.isfile(self._authFile):
|
|
data = self._loadFromFile(self._authFile)
|
|
self._authFileTime = os.path.getmtime(self._authFile)
|
|
self._logger.debug(
|
|
"Cached Login for %s from %s:",
|
|
self.mask_values(self._email),
|
|
datetime.fromtimestamp(self._authFileTime).isoformat(),
|
|
)
|
|
self._logger.debug(
|
|
"%s",
|
|
self.mask_values(data, "user_id", "auth_token", "email", "geo_key"),
|
|
)
|
|
# clear retry attempt to allow retry for authentication refresh
|
|
self._retry_attempt = False
|
|
else:
|
|
self._logger.debug("Fetching new Login credentials from server")
|
|
now = datetime.now().astimezone()
|
|
# set retry attempt to avoid retry on failed authentication
|
|
self._retry_attempt = True
|
|
auth_resp = await self.request(
|
|
"post",
|
|
_API_LOGIN,
|
|
json={
|
|
"ab": self._countryId,
|
|
"client_secret_info": {
|
|
"public_key": self._public_key.public_bytes(
|
|
serialization.Encoding.X962,
|
|
serialization.PublicFormat.UncompressedPoint,
|
|
).hex() # Uncompressed format of points in hex (0x04 + 32 Byte + 32 Byte)
|
|
},
|
|
"enc": 0,
|
|
"email": self._email,
|
|
"password": self._encryptApiData(
|
|
self._password
|
|
), # AES-256-CBC encrypted by the ECDH shared key derived from server public key and local private key
|
|
"time_zone": round(
|
|
datetime.utcoffset(now).total_seconds() * 1000
|
|
), # timezone offset in ms, e.g. 'GMT+01:00' => 3600000
|
|
"transaction": str(
|
|
int(systime.mktime(now.timetuple()) * 1000)
|
|
), # Unix Timestamp in ms as string
|
|
},
|
|
)
|
|
data = auth_resp.get("data", {})
|
|
self._logger.debug(
|
|
"Login Response: %s",
|
|
self.mask_values(data, "user_id", "auth_token", "email", "geo_key"),
|
|
)
|
|
self._loggedIn = True
|
|
# Cache login response in file for reuse
|
|
with open(self._authFile, "w", encoding="utf-8") as authfile:
|
|
json.dump(data, authfile, indent=2, skipkeys=True)
|
|
self._logger.debug("Response cached in file: %s", self._authFile)
|
|
self._authFileTime = os.path.getmtime(self._authFile)
|
|
|
|
# Update the login params
|
|
self._login_response = {}
|
|
self._login_response.update(data)
|
|
self._token = data.get("auth_token")
|
|
self.nickname = data.get("nick_name")
|
|
if data.get("token_expires_at"):
|
|
self._token_expiration = datetime.fromtimestamp(
|
|
data.get("token_expires_at")
|
|
)
|
|
else:
|
|
self._token_expiration = None
|
|
self._loggedIn = False
|
|
if data.get("user_id"):
|
|
self._gtoken = self._md5(
|
|
data.get("user_id")
|
|
) # gtoken is MD5 hash of user_id from login response
|
|
else:
|
|
self._gtoken = None
|
|
self._loggedIn = False
|
|
return self._loggedIn
|
|
|
|
async def request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
*,
|
|
headers: dict | None = None,
|
|
json: dict | None = None, # pylint: disable=redefined-outer-name
|
|
) -> dict:
|
|
"""Handle all requests to the API. This is also called recursively by login requests if necessary."""
|
|
if not headers:
|
|
headers = {}
|
|
if not json:
|
|
data = {}
|
|
if (
|
|
self._token_expiration
|
|
and (self._token_expiration - datetime.now()).total_seconds() < 60
|
|
):
|
|
self._logger.warning("WARNING: Access token expired, fetching a new one")
|
|
await self.async_authenticate(restart=True)
|
|
# For non-Login requests, ensure authentication will be updated if not logged in yet or cached file was refreshed
|
|
if endpoint != _API_LOGIN and (
|
|
not self._loggedIn
|
|
or (
|
|
os.path.isfile(self._authFile)
|
|
and self._authFileTime != os.path.getmtime(self._authFile)
|
|
)
|
|
):
|
|
await self.async_authenticate()
|
|
|
|
url: str = f"{self._api_base}/{endpoint}"
|
|
mergedHeaders = _API_HEADERS
|
|
mergedHeaders.update(headers)
|
|
if self._countryId:
|
|
mergedHeaders.update({"Country": self._countryId})
|
|
if self._timezone:
|
|
mergedHeaders.update({"Timezone": self._timezone})
|
|
if self._token:
|
|
mergedHeaders.update({"x-auth-token": self._token})
|
|
mergedHeaders.update({"gtoken": self._gtoken})
|
|
if self.encrypt_body:
|
|
# TODO(#70): Test and Support optional encryption for body
|
|
# Unknowns: Which string is signed? Method + Request?
|
|
# How does the signing work?
|
|
# What is the key-ident? Ident of the shared secret?
|
|
# What is request-once?
|
|
# Is the separate timestamp relevant for encryption?
|
|
pass
|
|
# Extra Api header arguments used by the mobile App for request encryption
|
|
# Response will return encrypted boy and a signature
|
|
# mergedHeaders.update({
|
|
# "x-replay-info": "replay",
|
|
# "x-encryption-info": "algo_ecdh",
|
|
# "x-signature": "", # 32 Bit hex
|
|
# "x-request-once": "", # 16 Bit hex
|
|
# "x-key-ident": "", # 16 Bit hex
|
|
# "x-request-ts": str(
|
|
# int(systime.mktime(datetime.now().timetuple()) * 1000)
|
|
# ), # Unix Timestamp in ms as string
|
|
# })
|
|
|
|
self._logger.debug("Request Url: %s %s", method.upper(), url)
|
|
self._logger.debug(
|
|
"Request Headers: %s",
|
|
self.mask_values(mergedHeaders, "x-auth-token", "gtoken"),
|
|
)
|
|
if endpoint == _API_LOGIN:
|
|
self._logger.debug(
|
|
"Request Body: %s",
|
|
self.mask_values(json, "user_id", "auth_token", "email", "geo_key"),
|
|
)
|
|
else:
|
|
self._logger.debug("Request Body: %s", json)
|
|
body_text = ""
|
|
# enforce configured delay between any subsequent request
|
|
await self._wait_delay()
|
|
async with self._session.request(
|
|
method, url, headers=mergedHeaders, json=json
|
|
) as resp:
|
|
try:
|
|
self._last_request_time = datetime.now()
|
|
self.request_count.add(self._last_request_time)
|
|
self._logger.debug(
|
|
"%s request %s %s response received", self.nickname, method, url
|
|
)
|
|
# print response headers
|
|
self._logger.debug("Response Headers: %s", resp.headers)
|
|
# get first the body text for usage in error detail logging if necessary
|
|
body_text = await resp.text()
|
|
data = {}
|
|
resp.raise_for_status() # any response status >= 400
|
|
if (data := await resp.json(content_type=None)) and self.encrypt_body:
|
|
# TODO(#70): Test and Support optional encryption for body
|
|
# data dict has to be decoded when encrypted
|
|
# if signature := data.get("signature"):
|
|
# pass
|
|
pass
|
|
if not data:
|
|
self._logger.error("Response Text: %s", body_text)
|
|
raise ClientError(f"No data response while requesting {endpoint}")
|
|
|
|
if endpoint == _API_LOGIN:
|
|
self._logger.debug(
|
|
"Response Data: %s",
|
|
self.mask_values(
|
|
data, "user_id", "auth_token", "email", "geo_key"
|
|
),
|
|
)
|
|
else:
|
|
self._logger.debug("Response Data: %s", data)
|
|
# reset retry flag only when valid token received and not another login request
|
|
self._retry_attempt = False
|
|
|
|
# check the Api response status code in the data
|
|
errors.raise_error(data)
|
|
|
|
# valid response at this point, mark login and return data
|
|
self._loggedIn = True
|
|
return data # noqa: TRY300
|
|
|
|
# Exception from ClientSession based on standard response status codes
|
|
except ClientError as err:
|
|
self._logger.error("Api Request Error: %s", err)
|
|
self._logger.error("Response Text: %s", body_text)
|
|
# Prepare data dict for Api error lookup
|
|
if not data:
|
|
data = {}
|
|
if not hasattr(data, "code"):
|
|
data["code"] = resp.status
|
|
if not hasattr(data, "msg"):
|
|
data["msg"] = body_text
|
|
if resp.status in [401, 403]:
|
|
# Unauthorized or forbidden request
|
|
# reattempt autentication with same credentials if cached token was kicked out
|
|
# retry attempt is set if login response data were not cached to fail immediately
|
|
if not self._retry_attempt:
|
|
self._logger.warning("Login failed, retrying authentication")
|
|
if await self.async_authenticate(restart=True):
|
|
return await self.request(
|
|
method, endpoint, headers=headers, json=json
|
|
)
|
|
self._logger.error("Re-Login failed for user %s", self._email)
|
|
errors.raise_error(
|
|
data, prefix=f"Login failed for user {self._email}"
|
|
)
|
|
# catch error if Api code not defined
|
|
raise errors.AuthorizationError(
|
|
f"Login failed for user {self._email}"
|
|
) from err
|
|
if resp.status in [429]:
|
|
# Too Many Requests, add stats to message
|
|
errors.raise_error(
|
|
data, prefix=f"Too Many Requests: {self.request_count}"
|
|
)
|
|
else:
|
|
# raise Anker Solix error if code is known
|
|
errors.raise_error(data)
|
|
# raise Client error otherwise
|
|
raise ClientError(
|
|
f"Api Request Error: {err}", f"response={body_text}"
|
|
) from err
|
|
except errors.AnkerSolixError as err: # Other Exception from API
|
|
self._logger.error("%s", err)
|
|
self._logger.error("Response Text: %s", body_text)
|
|
raise
|
|
|
|
async def update_sites(self, fromFile: bool = False) -> dict:
|
|
"""Get the latest info for all accessible sites and update class site and device variables.
|
|
|
|
Example data:
|
|
{'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c':
|
|
{'site_info': {'site_id': 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', 'site_name': 'BKW', 'site_img': '', 'device_type_list': [3], 'ms_type': 1, 'power_site_type': 2, 'is_allow_delete': True},
|
|
'site_admin': True,
|
|
'home_info': {'home_name': 'Home', 'home_img': '', 'charging_power': '0.00', 'power_unit': 'W'},
|
|
'solar_list': [],
|
|
'pps_info': {'pps_list': [], 'total_charging_power': '0.00', 'power_unit': 'W', 'total_battery_power': '0.00', 'updated_time': '', 'pps_status': 0},
|
|
'statistics': [{'type': '1', 'total': '89.75', 'unit': 'kwh'}, {'type': '2', 'total': '89.48', 'unit': 'kg'}, {'type': '3', 'total': '35.90', 'unit': '€'}],
|
|
'topology_type': '1',
|
|
'solarbank_info': {'solarbank_list': [{'device_pn': 'A17C0', 'device_sn': '9JVB42LJK8J0P5RY', 'device_name': 'Solarbank E1600',
|
|
'device_img': 'https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/anker-power/e9478c2d-e665-4d84-95d7-dd4844f82055/20230719-144818.png',
|
|
'battery_power': '75', 'bind_site_status': '', 'charging_power': '0', 'power_unit': 'W', 'charging_status': '2', 'status': '0', 'wireless_type': '1', 'main_version': '', 'photovoltaic_power': '0',
|
|
'output_power': '0', 'create_time': 1695392386, 'set_load_power': ''}],
|
|
'total_charging_power': '0', 'power_unit': 'W', 'charging_status': '0', 'total_battery_power': '0.00', 'updated_time': '2023-12-28 18:53:27', 'total_photovoltaic_power': '0', 'total_output_power': '0.00',
|
|
'display_set_power': False},
|
|
'retain_load': '0W',
|
|
'updated_time': '01-01-0001 00:00:00',
|
|
'power_site_type': 2,
|
|
'site_id': 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c',
|
|
'powerpanel_list': []}}
|
|
"""
|
|
self._logger.debug("Updating Sites data")
|
|
new_sites = {}
|
|
self._logger.debug("Getting site list")
|
|
sites = await self.get_site_list(fromFile=fromFile)
|
|
self._site_devices = set()
|
|
for site in sites.get("site_list", []):
|
|
if myid := site.get("site_id"):
|
|
# Update site info
|
|
mysite = self.sites.get(myid, {})
|
|
siteInfo = mysite.get("site_info", {})
|
|
siteInfo.update(site)
|
|
mysite.update(
|
|
{"type": SolixDeviceType.SYSTEM.value, "site_info": siteInfo}
|
|
)
|
|
admin = (
|
|
siteInfo.get("ms_type", 0) in [0, 1]
|
|
) # add boolean key to indicate whether user is site admin (ms_type 1 or not known) and can query device details
|
|
mysite.update({"site_admin": admin})
|
|
# Update scene info for site
|
|
self._logger.debug("Getting scene info for site")
|
|
scene = await self.get_scene_info(myid, fromFile=fromFile)
|
|
mysite.update(scene)
|
|
new_sites.update({myid: mysite})
|
|
# Update device details from scene info
|
|
sb_total_charge = (mysite.get("solarbank_info", {})).get(
|
|
"total_charging_power", ""
|
|
)
|
|
sb_total_output = (mysite.get("solarbank_info", {})).get(
|
|
"total_output_power", ""
|
|
)
|
|
sb_total_solar = (mysite.get("solarbank_info", {})).get(
|
|
"total_photovoltaic_power", ""
|
|
)
|
|
sb_total_charge_calc = 0
|
|
sb_charges = {}
|
|
sb_list = (mysite.get("solarbank_info", {})).get("solarbank_list", [])
|
|
for index, solarbank in enumerate(sb_list):
|
|
# work around for device_name which is actually the device_alias in scene info
|
|
if "device_name" in solarbank:
|
|
# modify only a copy of the device dict to prevent changing the scene info dict
|
|
solarbank = dict(solarbank).copy()
|
|
solarbank.update({"alias_name": solarbank.pop("device_name")})
|
|
# work around for system and device output presets, which are not set correctly and cannot be queried with load schedule for shared accounts
|
|
if not str(solarbank.get("set_load_power")).isdigit():
|
|
total_preset = str(mysite.get("retain_load", "")).replace(
|
|
"W", ""
|
|
)
|
|
if total_preset.isdigit():
|
|
solarbank.update(
|
|
{
|
|
"set_load_power": f"{(int(total_preset)/len(sb_list)):.0f}",
|
|
"current_home_load": total_preset,
|
|
}
|
|
)
|
|
# Work around for weird charging power fields in SB totals and device list: They have same names, but completely different usage
|
|
# SB total charging power shows only power into the battery. At this time, charging power in device list seems to reflect the output power. This is seen for status 3
|
|
# SB total charging power show 0 when discharging, but then device charging power shows correct value. This is seen for status 2
|
|
# Conclusion: SB total charging power is correct total power INTO the batteries. When discharging it is 0
|
|
# Device list charging power is ONLY correct power OUT of the batteries. When charging it is 0 or shows the output power.
|
|
# Need to simplify this per device details and SB totals, will use positive value on both for charging power and negative for discharging power
|
|
# calculate estimate based on total for proportional split across available solarbanks and their calculated charge power
|
|
with contextlib.suppress(ValueError):
|
|
charge_calc = 0
|
|
power_in = int(solarbank.get("photovoltaic_power", ""))
|
|
power_out = int(solarbank.get("output_power", ""))
|
|
# power_charge = int(solarbank.get("charging_power", "")) # This value seems to reflect the output power, which is corect for status 2, but may be wrong for other states
|
|
charge_calc = power_in - power_out
|
|
solarbank["charging_power"] = str(
|
|
charge_calc
|
|
) # allow negative values
|
|
sb_total_charge_calc += charge_calc
|
|
mysite["solarbank_info"]["solarbank_list"][index] = solarbank
|
|
new_sites.update({myid: mysite})
|
|
sn = self._update_dev(
|
|
solarbank,
|
|
devType=SolixDeviceType.SOLARBANK.value,
|
|
siteId=myid,
|
|
isAdmin=admin,
|
|
)
|
|
if sn:
|
|
self._site_devices.add(sn)
|
|
sb_charges[sn] = charge_calc
|
|
# as time progressed, update actual schedule slot presets from a cached schedule if available
|
|
if schedule := (self.devices.get(sn, {})).get("schedule"):
|
|
self._update_dev(
|
|
{
|
|
"device_sn": sn,
|
|
"schedule": schedule,
|
|
}
|
|
)
|
|
# adjust calculated SB charge to match total
|
|
if len(sb_charges) == len(sb_list) and str(sb_total_charge).isdigit():
|
|
sb_total_charge = int(sb_total_charge)
|
|
if sb_total_charge_calc < 0:
|
|
with contextlib.suppress(ValueError):
|
|
# discharging, adjust sb total charge value in scene info and allow negativ value to indicate discharge
|
|
sb_total_charge = float(sb_total_solar) - float(
|
|
sb_total_output
|
|
)
|
|
mysite["solarbank_info"]["total_charging_power"] = str(
|
|
sb_total_charge
|
|
)
|
|
for sn, charge in sb_charges.items():
|
|
self.devices[sn]["charging_power"] = str(
|
|
0
|
|
if sb_total_charge_calc == 0
|
|
else int(sb_total_charge / sb_total_charge_calc * charge)
|
|
)
|
|
# Update also the charge status description which may change after charging power correction
|
|
charge_status = self.devices[sn].get("charging_status")
|
|
if charge_status == SolarbankStatus.charge:
|
|
self._update_dev(
|
|
{
|
|
"device_sn": sn,
|
|
"charging_status": charge_status,
|
|
}
|
|
)
|
|
# make sure to write back any changes to the solarbank info in sites dict
|
|
new_sites.update({myid: mysite})
|
|
|
|
for pps in (mysite.get("pps_info", {})).get("pps_list", []):
|
|
# work around for device_name which is actually the device_alias in scene info
|
|
if "device_name" in pps:
|
|
# modify only a copy of the device dict to prevent changing the scene info dict
|
|
pps = dict(pps).copy()
|
|
pps.update({"alias_name": pps.pop("device_name")})
|
|
sn = self._update_dev(
|
|
pps,
|
|
devType=SolixDeviceType.PPS.value,
|
|
siteId=myid,
|
|
isAdmin=admin,
|
|
)
|
|
if sn:
|
|
self._site_devices.add(sn)
|
|
for solar in mysite.get("solar_list", []):
|
|
# work around for device_name which is actually the device_alias in scene info
|
|
if "device_name" in solar:
|
|
# modify only a copy of the device dict to prevent changing the scene info dict
|
|
solar = dict(solar).copy()
|
|
solar.update({"alias_name": solar.pop("device_name")})
|
|
sn = self._update_dev(
|
|
solar,
|
|
devType=SolixDeviceType.INVERTER.value,
|
|
siteId=myid,
|
|
isAdmin=admin,
|
|
)
|
|
if sn:
|
|
self._site_devices.add(sn)
|
|
for powerpanel in mysite.get("powerpanel_list", []):
|
|
# work around for device_name which is actually the device_alias in scene info
|
|
if "device_name" in powerpanel:
|
|
# modify only a copy of the device dict to prevent changing the scene info dict
|
|
powerpanel = dict(powerpanel).copy()
|
|
powerpanel.update({"alias_name": powerpanel.pop("device_name")})
|
|
sn = self._update_dev(
|
|
powerpanel,
|
|
devType=SolixDeviceType.POWERPANEL.value,
|
|
siteId=myid,
|
|
isAdmin=admin,
|
|
)
|
|
if sn:
|
|
self._site_devices.add(sn)
|
|
# Write back the updated sites
|
|
self.sites = new_sites
|
|
return self.sites
|
|
|
|
async def update_site_details(
|
|
self, fromFile: bool = False, exclude: set | None = None
|
|
) -> dict:
|
|
"""Get the latest updates for additional site related details updated less frequently.
|
|
|
|
Most of theses requests return data only when user has admin rights for sites owning the devices.
|
|
To limit API requests, this update site details method should be called less frequently than update site method,
|
|
and it updates just the nested site_details dictionary in the sites dictionary.
|
|
"""
|
|
# define excluded categories to skip for queries
|
|
if not exclude:
|
|
exclude = set()
|
|
self._logger.debug("Updating Sites Details")
|
|
# Fetch unread account messages once and put in site details for all sites
|
|
self._logger.debug("Getting unread messages indicator")
|
|
await self.get_message_unread(fromFile=fromFile)
|
|
for site_id, site in self.sites.items():
|
|
# Fetch details that only work for site admins
|
|
if site.get("site_admin", False):
|
|
# Fetch site price and CO2 settings
|
|
if {ApiCategories.site_price} - exclude:
|
|
self._logger.debug("Getting price and CO2 settings for site")
|
|
await self.get_site_price(siteId=site_id, fromFile=fromFile)
|
|
return self.sites
|
|
|
|
async def update_device_details(
|
|
self, fromFile: bool = False, exclude: set | None = None
|
|
) -> dict:
|
|
"""Get the latest updates for additional device info updated less frequently.
|
|
|
|
Most of theses requests return data only when user has admin rights for sites owning the devices.
|
|
To limit API requests, this update device details method should be called less frequently than update site method,
|
|
which will also update most device details as found in the site data response.
|
|
"""
|
|
# define excluded device types or categories to skip for queries
|
|
if not exclude:
|
|
exclude = set()
|
|
self._logger.debug("Updating Device Details")
|
|
# Fetch firmware version of device
|
|
# This response will also contain unbound / standalone devices not added to a site
|
|
self._logger.debug("Getting bind devices")
|
|
await self.get_bind_devices(fromFile=fromFile)
|
|
# Get the setting for effective automated FW upgrades
|
|
if {ApiCategories.device_auto_upgrade} - exclude:
|
|
self._logger.debug("Getting OTA settings")
|
|
await self.get_auto_upgrade(fromFile=fromFile)
|
|
# Fetch other relevant device information that requires site id and/or SN
|
|
site_wifi: dict[str, list[dict | None]] = {}
|
|
for sn, device in self.devices.items():
|
|
site_id = device.get("site_id", "")
|
|
dev_Type = device.get("type", "")
|
|
|
|
# Fetch details that only work for site admins
|
|
if device.get("is_admin", False) and site_id:
|
|
# Fetch wifi networks and signal strength and map to usage of device
|
|
if wifi_index := device.get("wireless_type", ""):
|
|
self._logger.debug(
|
|
"Getting wifi list of site for mapping to device"
|
|
)
|
|
if str(wifi_index).isdigit():
|
|
wifi_index = int(wifi_index)
|
|
else:
|
|
wifi_index = 0
|
|
# fetch site wifi list if not queried yet
|
|
if site_id not in site_wifi:
|
|
site_wifi[site_id] = (
|
|
await self.get_wifi_list(siteId=site_id, fromFile=fromFile)
|
|
).get("wifi_info_list") or []
|
|
wifi_list = site_wifi.get(site_id, [{}])
|
|
if 0 < wifi_index <= len(wifi_list):
|
|
device.update(wifi_list[wifi_index - 1])
|
|
|
|
# Fetch device type specific details, if device type not excluded
|
|
|
|
if dev_Type in ({SolixDeviceType.SOLARBANK.value} - exclude):
|
|
# Fetch active Power Cutoff setting for solarbanks
|
|
if {ApiCategories.solarbank_cutoff} - exclude:
|
|
self._logger.debug("Getting Power Cutoff settings for device")
|
|
await self.get_power_cutoff(
|
|
siteId=site_id, deviceSn=sn, fromFile=fromFile
|
|
)
|
|
|
|
# Fetch defined inverter details for solarbanks
|
|
if {ApiCategories.solarbank_solar_info} - exclude:
|
|
self._logger.debug("Getting inverter settings for device")
|
|
await self.get_solar_info(solarbankSn=sn, fromFile=fromFile)
|
|
|
|
# Fetch schedule for device types supporting it
|
|
self._logger.debug("Getting schedule details for device")
|
|
await self.get_device_load(
|
|
siteId=site_id, deviceSn=sn, fromFile=fromFile
|
|
)
|
|
|
|
# Fetch device fittings for device types supporting it
|
|
if {ApiCategories.solarbank_fittings} - exclude:
|
|
self._logger.debug("Getting fittings for device")
|
|
await self.get_device_fittings(
|
|
siteId=site_id, deviceSn=sn, fromFile=fromFile
|
|
)
|
|
|
|
# update entry in devices
|
|
self.devices.update({sn: device})
|
|
|
|
# TODO(#0): Fetch other details of specific device types as known and relevant
|
|
|
|
return self.devices
|
|
|
|
async def update_device_energy(self, exclude: set | None = None) -> dict:
|
|
"""Get the energy statistics for given device types from today and yesterday.
|
|
|
|
Yesterday energy will be queried only once if not available yet, but not updated in subsequent refreshes.
|
|
Energy data can also be fetched by shared accounts.
|
|
"""
|
|
# define allowed device types to query, default to no energy data
|
|
if not exclude:
|
|
exclude = set()
|
|
for sn, device in self.devices.items():
|
|
site_id = device.get("site_id", "")
|
|
dev_Type = device.get("type", "")
|
|
if (
|
|
dev_Type in ({SolixDeviceType.SOLARBANK.value} - exclude)
|
|
and {ApiCategories.solarbank_energy} - exclude
|
|
):
|
|
self._logger.debug("Getting Energy details for device")
|
|
energy = device.get("energy_details") or {}
|
|
today = datetime.today().strftime("%Y-%m-%d")
|
|
yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
|
|
# Fetch energy from today
|
|
data = await self.energy_daily(
|
|
siteId=site_id,
|
|
deviceSn=sn,
|
|
startDay=datetime.fromisoformat(today),
|
|
numDays=1,
|
|
dayTotals=True,
|
|
)
|
|
energy["today"] = data.get(today) or {}
|
|
if yesterday != (energy.get("last_period") or {}).get("date"):
|
|
# Fetch energy from previous day once
|
|
data = await self.energy_daily(
|
|
siteId=site_id,
|
|
deviceSn=sn,
|
|
startDay=datetime.fromisoformat(yesterday),
|
|
numDays=1,
|
|
dayTotals=True,
|
|
)
|
|
energy["last_period"] = data.get(yesterday) or {}
|
|
device["energy_details"] = energy
|
|
self.devices[sn] = device
|
|
|
|
async def get_site_rules(self, fromFile: bool = False) -> dict:
|
|
"""Get the site rules supported by the api.
|
|
|
|
Example data:
|
|
{'rule_list': [
|
|
{'power_site_type': 1, 'main_device_models': ['A5143'], 'device_models': ['A5143', 'A1771'], 'can_empty_site': False,
|
|
'quantity_min_limit_map': {'A1771': 1, 'A5143': 1},'quantity_max_limit_map': {'A1771': 2, 'A5143': 1}},
|
|
{'power_site_type': 2, 'main_device_models': ['A17C0'], 'device_models': ['A17C0', 'A5143', 'A1771'], 'can_empty_site': False,
|
|
'quantity_min_limit_map': {'A17C0': 1}, 'quantity_max_limit_map': {'A1771': 2, 'A17C0': 2, 'A5143': 1}},
|
|
{'power_site_type': 4, 'main_device_models': ['A17B1'], 'device_models': ['A17B1'], 'can_empty_site': True,
|
|
'quantity_min_limit_map': None, 'quantity_max_limit_map': {'A17B1': 1}},
|
|
{'power_site_type': 5, 'main_device_models': ['A17C1'], 'device_models': ['A17C1', 'A17X7'], 'can_empty_site': True,
|
|
'quantity_min_limit_map': None, 'quantity_max_limit_map': {'A17C1': 1}},
|
|
{'power_site_type': 6, 'main_device_models': ['A5341'], 'device_models': ['A5341', 'A5101', 'A5220'], 'can_empty_site': False,
|
|
'quantity_min_limit_map': {'A5341': 1}, 'quantity_max_limit_map': {'A5341': 1}},
|
|
{'power_site_type': 7, 'main_device_models': ['A5101'], 'device_models': ['A5101', 'A5220'], 'can_empty_site': False,
|
|
'quantity_min_limit_map': {'A5101': 1}, 'quantity_max_limit_map': {'A5101': 6}},
|
|
{'power_site_type': 8, 'main_device_models': ['A5102'], 'device_models': ['A5102', 'A5220'], 'can_empty_site': False,
|
|
'quantity_min_limit_map': {'A5102': 1}, 'quantity_max_limit_map': {'A5102': 6}},
|
|
{'power_site_type': 9, 'main_device_models': ['A5103'], 'device_models': ['A5103', 'A5220'], 'can_empty_site': False,
|
|
'quantity_min_limit_map': {'A5103': 1}, 'quantity_max_limit_map': {'A5103': 6}}]}
|
|
"""
|
|
if fromFile:
|
|
resp = self._loadFromFile(os.path.join(self._testdir, "site_rules.json"))
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["site_rules"])
|
|
return resp.get("data", {})
|
|
|
|
async def get_site_list(self, fromFile: bool = False) -> dict:
|
|
"""Get the site list.
|
|
|
|
Example data:
|
|
{'site_list': [{'site_id': 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', 'site_name': 'BKW', 'site_img': '', 'device_type_list': [3], 'ms_type': 2, 'power_site_type': 2, 'is_allow_delete': True}]}
|
|
"""
|
|
if fromFile:
|
|
resp = self._loadFromFile(os.path.join(self._testdir, "site_list.json"))
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["site_list"])
|
|
return resp.get("data", {})
|
|
|
|
async def get_scene_info(self, siteId: str, fromFile: bool = False) -> dict:
|
|
"""Get scene info. This can be queried for each siteId listed in the homepage info site_list. It shows also data for accounts that are only site members.
|
|
|
|
Example data for provided site_id:
|
|
{"home_info":{"home_name":"Home","home_img":"","charging_power":"0.00","power_unit":"W"},
|
|
"solar_list":[],
|
|
"pps_info":{"pps_list":[],"total_charging_power":"0.00","power_unit":"W","total_battery_power":"0.00","updated_time":"","pps_status":0},
|
|
"statistics":[{"type":"1","total":"89.75","unit":"kwh"},{"type":"2","total":"89.48","unit":"kg"},{"type":"3","total":"35.90","unit":"€"}],
|
|
"topology_type":"1","solarbank_info":{"solarbank_list":
|
|
[{"device_pn":"A17C0","device_sn":"9JVB42LJK8J0P5RY","device_name":"Solarbank E1600",
|
|
"device_img":"https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/anker-power/e9478c2d-e665-4d84-95d7-dd4844f82055/20230719-144818.png",
|
|
"battery_power":"75","bind_site_status":"","charging_power":"0","power_unit":"W","charging_status":"2","status":"0","wireless_type":"1","main_version":"","photovoltaic_power":"0",
|
|
"output_power":"0","create_time":1695392386}],
|
|
"total_charging_power":"0","power_unit":"W","charging_status":"0","total_battery_power":"0.00","updated_time":"2023-12-28 18:53:27","total_photovoltaic_power":"0","total_output_power":"0.00"},
|
|
"retain_load":"0W","updated_time":"01-01-0001 00:00:00","power_site_type":2,"site_id":"efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c"}
|
|
"""
|
|
data = {"site_id": siteId}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"scene_{siteId}.json")
|
|
)
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["scene_info"], json=data)
|
|
return resp.get("data", {})
|
|
|
|
async def get_homepage(self, fromFile: bool = False) -> dict:
|
|
"""Get the latest homepage info.
|
|
|
|
NOTE: This returns only data if the site is owned by the account. No data returned for site member accounts
|
|
Example data:
|
|
{"site_list":[{"site_id":"efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c","site_name":"BKW","site_img":"","device_type_list":[3],"ms_type":0,"power_site_type":0,"is_allow_delete":false}],
|
|
"solar_list":[],"pps_list":[],
|
|
"solarbank_list":[{"device_pn":"","device_sn":"9JVB42LJK8J0P5RY","device_name":"Solarbank E1600",
|
|
"device_img":"https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/anker-power/e9478c2d-e665-4d84-95d7-dd4844f82055/20230719-144818.png",
|
|
"battery_power":"75","bind_site_status":"1","charging_power":"","power_unit":"","charging_status":"","status":"","wireless_type":"","main_version":"","photovoltaic_power":"","output_power":"","create_time":0}],
|
|
"powerpanel_list":[]}
|
|
"""
|
|
if fromFile:
|
|
resp = self._loadFromFile(os.path.join(self._testdir, "homepage.json"))
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["homepage"])
|
|
return resp.get("data", {})
|
|
|
|
async def get_bind_devices(self, fromFile: bool = False) -> dict:
|
|
"""Get the bind device information, contains firmware level of devices.
|
|
|
|
Example data:
|
|
{"data": [{"device_sn":"9JVB42LJK8J0P5RY","product_code":"A17C0","bt_ble_id":"BC:A2:AF:C7:55:F9","bt_ble_mac":"BCA2AFC755F9","device_name":"Solarbank E1600","alias_name":"Solarbank E1600",
|
|
"img_url":"https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/anker-power/e9478c2d-e665-4d84-95d7-dd4844f82055/20230719-144818.png",
|
|
"link_time":1695392302068,"wifi_online":false,"wifi_name":"","relate_type":["ble","wifi"],"charge":false,"bws_surplus":0,"device_sw_version":"v1.4.4","has_manual":false}]}
|
|
"""
|
|
if fromFile:
|
|
resp = self._loadFromFile(os.path.join(self._testdir, "bind_devices.json"))
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["bind_devices"])
|
|
data = resp.get("data", {})
|
|
active_devices = set()
|
|
for device in data.get("data", []):
|
|
if sn := self._update_dev(device):
|
|
active_devices.add(sn)
|
|
# recycle api device list and remove devices no longer used in sites or bind devices
|
|
rem_devices = [
|
|
dev
|
|
for dev in self.devices
|
|
if dev not in (self._site_devices | active_devices)
|
|
]
|
|
for dev in rem_devices:
|
|
self.devices.pop(dev, None)
|
|
return data
|
|
|
|
async def get_user_devices(self, fromFile: bool = False) -> dict:
|
|
"""Get device details of all devices owned by user.
|
|
|
|
Example data: (Information is mostly empty when device is bound to site)
|
|
{'solar_list': [], 'pps_list': [], 'solarbank_list': [{'device_pn': 'A17C0', 'device_sn': '9JVB42LJK8J0P5RY', 'device_name': 'Solarbank E1600',
|
|
'device_img': 'https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/anker-power/e9478c2d-e665-4d84-95d7-dd4844f82055/20230719-144818.png',
|
|
'battery_power': '', 'bind_site_status': '1', 'charging_power': '', 'power_unit': '', 'charging_status': '', 'status': '', 'wireless_type': '1', 'main_version': '',
|
|
'photovoltaic_power': '', 'output_power': '', 'create_time': 0}]}
|
|
"""
|
|
if fromFile:
|
|
resp = self._loadFromFile(os.path.join(self._testdir, "user_devices.json"))
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["user_devices"])
|
|
return resp.get("data", {})
|
|
|
|
async def get_charging_devices(self, fromFile: bool = False) -> dict:
|
|
"""Get the charging devices (Power stations?).
|
|
|
|
Example data:
|
|
{'device_list': None, 'guide_txt': ''}
|
|
"""
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, "charging_devices.json")
|
|
)
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["charging_devices"])
|
|
return resp.get("data", {})
|
|
|
|
async def get_solar_info(self, solarbankSn: str, fromFile: bool = False) -> dict:
|
|
"""Get the solar info that is condigured for a solarbank.
|
|
|
|
Example data:
|
|
{"brand_id": "3a9930f5-74ef-4e41-a797-04e6b33d3f0f","solar_brand": "ANKER","solar_model": "A5140","solar_sn": "","solar_model_name": "MI60 Microinverter"}
|
|
"""
|
|
data = {"solarbank_sn": solarbankSn}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"solar_info_{solarbankSn}.json")
|
|
)
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["solar_info"], json=data)
|
|
data = resp.get("data", {})
|
|
if data:
|
|
self._update_dev({"device_sn": solarbankSn, "solar_info": data})
|
|
return data
|
|
|
|
async def get_compatible_info(
|
|
self, solarbankSn: str, fromFile: bool = False
|
|
) -> dict:
|
|
"""Get the solar info and OTA processing info for a solarbank.
|
|
|
|
Example data:
|
|
{"ota_complete_status": 2,"process_skip_type": 1,"solar_info": {
|
|
"solar_sn": "","solar_brand": "ANKER","solar_model": "A5140","brand_id": "3a9930f5-74ef-4e41-a797-04e6b33d3f0f",
|
|
"model_img": "https://public-aiot-ore-qa.s3.us-west-2.amazonaws.com/product/870cd979-95d8-4cc1-89c4-04a26511c9b1/picl_A1771_normal.png",
|
|
"version": "","ota_status": 1,"solar_model_name": "MI60 Microinverter"}
|
|
"""
|
|
data = {"solarbank_sn": solarbankSn}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"compatible_process_{solarbankSn}.json")
|
|
)
|
|
else:
|
|
resp = await self.request(
|
|
"post", _API_ENDPOINTS["compatible_process"], json=data
|
|
)
|
|
data = resp.get("data", {})
|
|
if info := data.get("solar_info", {}):
|
|
self._update_dev({"device_sn": solarbankSn, "solar_info": info})
|
|
return data
|
|
|
|
async def get_auto_upgrade(self, fromFile: bool = False) -> dict:
|
|
"""Get auto upgrade settings and devices enabled for auto upgrade.
|
|
|
|
Example data:
|
|
{'main_switch': True, 'device_list': [{'device_sn': '9JVB42LJK8J0P5RY', 'device_name': 'Solarbank E1600', 'auto_upgrade': True, 'alias_name': 'Solarbank E1600',
|
|
'icon': 'https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/anker-power/e9478c2d-e665-4d84-95d7-dd4844f82055/20230719-144818.png'}]}
|
|
"""
|
|
if fromFile:
|
|
resp = self._loadFromFile(os.path.join(self._testdir, "auto_upgrade.json"))
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["get_auto_upgrade"])
|
|
data = resp.get("data", {})
|
|
main = data.get("main_switch")
|
|
devicelist = data.get("device_list", []) # could be null for non owning account
|
|
if not devicelist:
|
|
devicelist = []
|
|
for device in devicelist:
|
|
dev_ota = device.get("auto_upgrade")
|
|
if isinstance(dev_ota, bool):
|
|
# update device setting based on main setting if available
|
|
if isinstance(main, bool):
|
|
device.update({"auto_upgrade": main and dev_ota})
|
|
self._update_dev(device)
|
|
return data
|
|
|
|
async def set_auto_upgrade(self, devices: dict[str, bool]) -> bool:
|
|
"""Set auto upgrade switches for given device dictonary.
|
|
|
|
Example input:
|
|
devices = {'9JVB42LJK8J0P5RY': True}
|
|
The main switch must be set True if any device switch is set True. The main switch does not need to be changed to False if no device is True.
|
|
But if main switch is set to False, all devices will automatically be set to False and individual setting is ignored by Api.
|
|
"""
|
|
# get actual settings
|
|
settings = await self.get_auto_upgrade()
|
|
if (main_switch := settings.get("main_switch")) is None:
|
|
return False
|
|
dev_switches = {}
|
|
main = None
|
|
change_list = []
|
|
for dev_setting in settings.get("device_list") or []:
|
|
if (
|
|
isinstance(dev_setting, dict)
|
|
and (device_sn := dev_setting.get("device_sn"))
|
|
and (dev_upgrade := dev_setting.get("auto_upgrade")) is not None
|
|
):
|
|
dev_switches[device_sn] = dev_upgrade
|
|
# Loop through provided device list and compose the request data device list that needs to be send
|
|
for sn, upgrade in devices.items():
|
|
if sn in dev_switches:
|
|
if upgrade != dev_switches[sn]:
|
|
change_list.append({"device_sn": sn, "auto_upgrade": upgrade})
|
|
if upgrade:
|
|
main = True
|
|
|
|
if change_list:
|
|
# json example for endpoint
|
|
# {"main_switch": False, "device_list": [{"device_sn": "9JVB42LJK8J0P5RY","auto_upgrade": True}]}
|
|
data = {
|
|
"main_switch": main if main is not None else main_switch,
|
|
"device_list": change_list,
|
|
}
|
|
# Make the Api call and check for return code
|
|
code = (
|
|
await self.request(
|
|
"post", _API_ENDPOINTS["set_auto_upgrade"], json=data
|
|
)
|
|
).get("code")
|
|
if not isinstance(code, int) or int(code) != 0:
|
|
return False
|
|
# update the data in api dict
|
|
await self.get_auto_upgrade()
|
|
|
|
return True
|
|
|
|
async def get_wifi_list(self, siteId: str, fromFile: bool = False) -> dict:
|
|
"""Get the wifi list.
|
|
|
|
Example data:
|
|
{'wifi_info_list': [{'wifi_name': 'wifi-network-1', 'wifi_signal': '100'}]}
|
|
"""
|
|
data = {"site_id": siteId}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"wifi_list_{siteId}.json")
|
|
)
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["wifi_list"], json=data)
|
|
return resp.get("data", {})
|
|
|
|
async def get_power_cutoff(
|
|
self, deviceSn: str, siteId: str = "", fromFile: bool = False
|
|
) -> dict:
|
|
"""Get power cut off settings.
|
|
|
|
Example data:
|
|
{'power_cutoff_data': [
|
|
{'id': 1, 'is_selected': 1, 'output_cutoff_data': 10, 'lowpower_input_data': 5, 'input_cutoff_data': 10},
|
|
{'id': 2, 'is_selected': 0, 'output_cutoff_data': 5, 'lowpower_input_data': 4, 'input_cutoff_data': 5}]}
|
|
"""
|
|
data = {"site_id": siteId, "device_sn": deviceSn}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"power_cutoff_{deviceSn}.json")
|
|
)
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["get_cutoff"], json=data)
|
|
data = resp.get("data", {})
|
|
# add whole list to device details to provide option selection capabilities
|
|
details = {
|
|
"device_sn": deviceSn,
|
|
"power_cutoff_data": data.get("power_cutoff_data") or [],
|
|
}
|
|
for setting in data.get("power_cutoff_data") or []:
|
|
if (
|
|
int(setting.get("is_selected", 0)) > 0
|
|
and int(setting.get("output_cutoff_data", 0)) > 0
|
|
):
|
|
details.update(
|
|
{
|
|
"power_cutoff": int(setting.get("output_cutoff_data")),
|
|
}
|
|
)
|
|
self._update_dev(details)
|
|
return data
|
|
|
|
async def set_power_cutoff(self, deviceSn: str, setId: int) -> bool:
|
|
"""Set power cut off settings.
|
|
|
|
Example input:
|
|
{'device_sn': '9JVB42LJK8J0P5RY', 'cutoff_data_id': 1}
|
|
The id must be one of the ids listed with the get_power_cutoff endpoint
|
|
"""
|
|
data = {
|
|
"device_sn": deviceSn,
|
|
"cutoff_data_id": setId,
|
|
}
|
|
# Make the Api call and check for return code
|
|
code = (
|
|
await self.request("post", _API_ENDPOINTS["set_cutoff"], json=data)
|
|
).get("code")
|
|
if not isinstance(code, int) or int(code) != 0:
|
|
return False
|
|
# update the data in api dict
|
|
await self.get_power_cutoff(deviceSn=deviceSn)
|
|
return True
|
|
|
|
async def get_site_price(self, siteId: str, fromFile: bool = False) -> dict:
|
|
"""Get the power price set for the site.
|
|
|
|
Example data:
|
|
{"site_id": "efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c","price": 0.4,"site_co2": 0,"site_price_unit": "\u20ac"}
|
|
"""
|
|
data = {"site_id": siteId}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"price_{siteId}.json")
|
|
)
|
|
else:
|
|
resp = await self.request(
|
|
"post", _API_ENDPOINTS["get_site_price"], json=data
|
|
)
|
|
data = resp.get("data", {})
|
|
# update site details in sites dict
|
|
details = data.copy()
|
|
if "site_id" in details:
|
|
details.pop("site_id")
|
|
self._update_site(siteId, details)
|
|
return data
|
|
|
|
async def set_site_price(
|
|
self,
|
|
siteId: str,
|
|
price: float | None = None,
|
|
unit: str | None = None,
|
|
co2: float | None = None,
|
|
) -> bool:
|
|
"""Set the power price, the unit and/or CO2 for a site.
|
|
|
|
Example input:
|
|
{"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "price": 0.325, "site_price_unit": "\u20ac", "site_co2": 0}
|
|
The id must be one of the ids listed with the get_power_cutoff endpoint
|
|
"""
|
|
# First get the old settings if only single setting should be updated
|
|
details = {}
|
|
if siteId in self.sites:
|
|
details = (self.sites.get(siteId) or {}).get("site_details") or {}
|
|
new_price = details.get("price") if price is None else price
|
|
new_unit = details.get("site_price_unit") if unit is None else unit
|
|
new_co2 = details.get("site_co2") if co2 is None else co2
|
|
data = {}
|
|
# Need to query old setting to avoid changing them if parameter not provided
|
|
if new_price is None or new_unit is None or new_co2 is None:
|
|
data = await self.get_site_price(siteId=siteId)
|
|
if new_price is not None:
|
|
data["price"] = new_price
|
|
if new_unit is not None:
|
|
data["site_price_unit"] = new_unit
|
|
if new_co2 is not None:
|
|
data["site_co2"] = new_co2
|
|
else:
|
|
data.update(
|
|
{
|
|
"site_id": siteId,
|
|
"price": new_price,
|
|
"site_price_unit": new_unit,
|
|
"site_co2": new_co2,
|
|
}
|
|
)
|
|
# Make the Api call and check for return code
|
|
code = (
|
|
await self.request("post", _API_ENDPOINTS["update_site_price"], json=data)
|
|
).get("code")
|
|
if not isinstance(code, int) or int(code) != 0:
|
|
return False
|
|
# update the data in api dict
|
|
await self.get_site_price(siteId=siteId)
|
|
return True
|
|
|
|
async def get_device_load(
|
|
self, siteId: str, deviceSn: str, fromFile: bool = False
|
|
) -> dict:
|
|
r"""Get device load settings.
|
|
|
|
Example data:
|
|
{"site_id": "efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",
|
|
"home_load_data": "{\"ranges\":[
|
|
{\"id\":0,\"start_time\":\"00:00\",\"end_time\":\"08:30\",\"turn_on\":true,\"appliance_loads\":[{\"id\":0,\"name\":\"Benutzerdefiniert\",\"power\":300,\"number\":1}],\"charge_priority\":80},
|
|
{\"id\":0,\"start_time\":\"08:30\",\"end_time\":\"17:00\",\"turn_on\":false,\"appliance_loads\":[{\"id\":0,\"name\":\"Benutzerdefiniert\",\"power\":100,\"number\":1}],\"charge_priority\":80},
|
|
{\"id\":0,\"start_time\":\"17:00\",\"end_time\":\"24:00\",\"turn_on\":true,\"appliance_loads\":[{\"id\":0,\"name\":\"Benutzerdefiniert\",\"power\":300,\"number\":1}],\"charge_priority\":0}],
|
|
\"min_load\":100,\"max_load\":800,\"step\":0,\"is_charge_priority\":0,\"default_charge_priority\":0,\"is_zero_output_tips\":1}",
|
|
"current_home_load": "300W","parallel_home_load": "","parallel_display": false}
|
|
"""
|
|
data = {"site_id": siteId, "device_sn": deviceSn}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"device_load_{deviceSn}.json")
|
|
)
|
|
else:
|
|
resp = await self.request(
|
|
"post", _API_ENDPOINTS["get_device_load"], json=data
|
|
)
|
|
# The home_load_data is provided as string instead of object...Convert into object for proper handling
|
|
# It must be converted back to a string when passing this as input to set home load
|
|
string_data = (resp.get("data") or {}).get("home_load_data") or {}
|
|
if isinstance(string_data, str):
|
|
resp["data"].update({"home_load_data": json.loads(string_data)})
|
|
data = resp.get("data") or {}
|
|
# update schedule also for all device serials found in schedule
|
|
schedule = data.get("home_load_data") or {}
|
|
dev_serials = []
|
|
for slot in schedule.get("ranges") or []:
|
|
for dev in slot.get("device_power_loads") or []:
|
|
if (sn := dev.get("device_sn")) and sn not in dev_serials:
|
|
dev_serials.append(sn)
|
|
# add the given serial to list if not existing yet
|
|
if deviceSn and deviceSn not in dev_serials:
|
|
dev_serials.append(deviceSn)
|
|
for sn in dev_serials:
|
|
self._update_dev(
|
|
{
|
|
"device_sn": sn,
|
|
"schedule": schedule,
|
|
"current_home_load": data.get("current_home_load") or "",
|
|
"parallel_home_load": data.get("parallel_home_load") or "",
|
|
}
|
|
)
|
|
return data
|
|
|
|
async def set_device_load(
|
|
self,
|
|
siteId: str,
|
|
deviceSn: str,
|
|
loadData: dict,
|
|
) -> dict:
|
|
"""Set device home load (e.g. solarbank schedule).
|
|
|
|
Example input for system with single solarbank:
|
|
{'site_id': 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', 'device_sn': '9JVB42LJK8J0P5RY',
|
|
'home_load_data': '{"ranges":['
|
|
'{"id":0,"start_time":"00:00","end_time":"06:30","turn_on":true,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":300,"number":1}],'
|
|
'"charge_priority":0,"power_setting_mode":1,"device_power_loads":[{"device_sn":"9JVB42LJK8J0P5RY","power":150}]},'
|
|
'{"id":0,"start_time":"07:30","end_time":"18:00","turn_on":false,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":100,"number":1}],'
|
|
'"charge_priority":80,"power_setting_mode":1,"device_power_loads":[{"device_sn":"9JVB42LJK8J0P5RY","power":50}]},'
|
|
'{"id":0,"start_time":"18:00","end_time":"24:00","turn_on":true,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":300,"number":1}],'
|
|
'"charge_priority":0,"power_setting_mode":1,"device_power_loads":[{"device_sn":"9JVB42LJK8J0P5RY","power":150}]}],'
|
|
'"min_load":100,"max_load":800,"step":0,"is_charge_priority":0,"default_charge_priority":0,"is_zero_output_tips":1,"display_advanced_mode":0,"advanced_mode_min_load":0}'
|
|
}
|
|
Attention: This method and endpoint actually accepts the inputs, but does not change anything. The set_device_parm endpoint may have to be used
|
|
"""
|
|
data = {
|
|
"site_id": siteId,
|
|
"device_sn": deviceSn,
|
|
"home_load_data": json.dumps(loadData),
|
|
}
|
|
# Make the Api call and check for return code
|
|
code = (
|
|
await self.request("post", _API_ENDPOINTS["set_device_load"], json=data)
|
|
).get("code")
|
|
if not isinstance(code, int) or int(code) != 0:
|
|
return False
|
|
# update the data in api dict
|
|
await self.get_device_load(siteId=siteId, deviceSn=deviceSn)
|
|
return True
|
|
|
|
async def get_device_parm(
|
|
self,
|
|
siteId: str,
|
|
paramType: str = SolixParmType.SOLARBANK_SCHEDULE.value,
|
|
deviceSn: str | None = None,
|
|
fromFile: bool = False,
|
|
) -> dict:
|
|
r"""Get device parameters (e.g. solarbank schedule). This can be queried for each siteId listed in the homepage info site_list. The paramType is always 4, but can be modified if necessary.
|
|
|
|
Example data for provided site_id:
|
|
{"param_data": "{\"ranges\":[
|
|
{\"id\":0,\"start_time\":\"00:00\",\"end_time\":\"08:30\",\"turn_on\":true,\"appliance_loads\":[{\"id\":0,\"name\":\"Benutzerdefiniert\",\"power\":300,\"number\":1}],\"charge_priority\":80},
|
|
{\"id\":0,\"start_time\":\"08:30\",\"end_time\":\"17:00\",\"turn_on\":false,\"appliance_loads\":[{\"id\":0,\"name\":\"Benutzerdefiniert\",\"power\":100,\"number\":1}],\"charge_priority\":80},
|
|
{\"id\":0,\"start_time\":\"17:00\",\"end_time\":\"24:00\",\"turn_on\":true,\"appliance_loads\":[{\"id\":0,\"name\":\"Benutzerdefiniert\",\"power\":300,\"number\":1}],\"charge_priority\":0}],
|
|
\"min_load\":100,\"max_load\":800,\"step\":0,\"is_charge_priority\":0,\"default_charge_priority\":0,\"is_zero_output_tips\":1}"}
|
|
"""
|
|
data = {"site_id": siteId, "param_type": paramType}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"device_parm_{siteId}.json")
|
|
)
|
|
else:
|
|
resp = await self.request(
|
|
"post", _API_ENDPOINTS["get_device_parm"], json=data
|
|
)
|
|
# The home_load_data is provided as string instead of object...Convert into object for proper handling
|
|
# It must be converted back to a string when passing this as input to set home load
|
|
string_data = (resp.get("data", {})).get("param_data", {})
|
|
if isinstance(string_data, str):
|
|
resp["data"].update({"param_data": json.loads(string_data)})
|
|
|
|
# update api device dict with latest data if optional device SN was provided, e.g. when called by set_device_parm for device details update
|
|
data = resp.get("data") or {}
|
|
# update schedule also for all device serials found in schedule
|
|
schedule = data.get("param_data") or {}
|
|
dev_serials = []
|
|
for slot in schedule.get("ranges") or []:
|
|
for dev in slot.get("device_power_loads") or []:
|
|
if (sn := dev.get("device_sn")) and sn not in dev_serials:
|
|
dev_serials.append(sn)
|
|
# add the given serial to list if not existing yet
|
|
if deviceSn and deviceSn not in dev_serials:
|
|
dev_serials.append(deviceSn)
|
|
for sn in dev_serials:
|
|
self._update_dev(
|
|
{
|
|
"device_sn": sn,
|
|
"schedule": schedule,
|
|
"current_home_load": data.get("current_home_load") or "",
|
|
"parallel_home_load": data.get("parallel_home_load") or "",
|
|
}
|
|
)
|
|
return data
|
|
|
|
async def set_device_parm(
|
|
self,
|
|
siteId: str,
|
|
paramData: dict,
|
|
paramType: str = SolixParmType.SOLARBANK_SCHEDULE.value,
|
|
command: int = 17,
|
|
deviceSn: str | None = None,
|
|
) -> dict:
|
|
"""Set device parameters (e.g. solarbank schedule).
|
|
|
|
command: Must be 17 for solarbank schedule.
|
|
paramType: was always string "4"
|
|
Example paramData:
|
|
{"param_data": '{"ranges":['
|
|
'{"id":0,"start_time":"00:00","end_time":"08:30","turn_on":true,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":300,"number":1}],"charge_priority":80},'
|
|
'{"id":0,"start_time":"08:30","end_time":"17:00","turn_on":false,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":100,"number":1}],"charge_priority":80},'
|
|
'{"id":0,"start_time":"17:00","end_time":"24:00","turn_on":true,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":300,"number":1}],"charge_priority":0}],'
|
|
'"min_load":100,"max_load":800,"step":0,"is_charge_priority":0,default_charge_priority":0}}'
|
|
"""
|
|
data = {
|
|
"site_id": siteId,
|
|
"param_type": paramType,
|
|
"cmd": command,
|
|
"param_data": json.dumps(paramData), # Must be string type
|
|
}
|
|
code = (
|
|
await self.request("post", _API_ENDPOINTS["set_device_parm"], json=data)
|
|
).get("code")
|
|
if not isinstance(code, int) or int(code) != 0:
|
|
return False
|
|
# update the data in api dict
|
|
await self.get_device_parm(siteId=siteId, deviceSn=deviceSn)
|
|
return True
|
|
|
|
async def set_home_load( # noqa: C901
|
|
self,
|
|
siteId: str,
|
|
deviceSn: str,
|
|
all_day: bool = False,
|
|
preset: int | None = None,
|
|
export: bool | None = None,
|
|
charge_prio: int | None = None,
|
|
set_slot: SolarbankTimeslot = None,
|
|
insert_slot: SolarbankTimeslot = None,
|
|
) -> bool:
|
|
"""Set the home load parameters for a given site id and device for actual or all slots in the existing schedule.
|
|
|
|
If no time slot is defined for current time, a new slot will be inserted for the gap. This will result in full day definition when no slot is defined.
|
|
Optionally when set_slot SolarbankTimeslot is provided, the given slot will replace the existing schedule completely.
|
|
When insert_slot SolarbankTimeslot is provided, the given slot will be incoorporated into existing schedule. Adjacent overlapping slot times will be updated and overlayed slots will be replaced.
|
|
|
|
Example schedule as provided via Api:
|
|
{{"ranges":[
|
|
{"id":0,"start_time":"00:00","end_time":"08:30","turn_on":true,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":300,"number":1}],"charge_priority":80},
|
|
{"id":0,"start_time":"08:30","end_time":"17:00","turn_on":false,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":100,"number":1}],"charge_priority":80},
|
|
{"id":0,"start_time":"17:00","end_time":"24:00","turn_on":true,"appliance_loads":[{"id":0,"name":"Benutzerdefiniert","power":300,"number":1}],"charge_priority":0}],
|
|
"min_load":100,"max_load":800,"step":0,"is_charge_priority":0,default_charge_priority":0}}
|
|
"""
|
|
# fast quit if nothing to change
|
|
if not str(charge_prio).isdigit():
|
|
charge_prio = None
|
|
if not str(preset).isdigit():
|
|
preset = None
|
|
if (
|
|
preset is None
|
|
and export is None
|
|
and charge_prio is None
|
|
and set_slot is None
|
|
and insert_slot is None
|
|
):
|
|
return False
|
|
# set flag for required current parameter update
|
|
if set_slot is None and insert_slot is None:
|
|
pending_now_update = True
|
|
else:
|
|
pending_now_update = False
|
|
# obtain actual device schedule from internal dict or fetch via api
|
|
if not (schedule := (self.devices.get(deviceSn) or {}).get("schedule") or {}):
|
|
schedule = (
|
|
await self.get_device_load(siteId=siteId, deviceSn=deviceSn)
|
|
).get("home_load_data") or {}
|
|
if (min_load := str(schedule.get("min_load"))).isdigit():
|
|
# min_load = int(min_load)
|
|
# Allow lower min setting as defined by API minimum. This however may be ignored if outsite of appliance defined slot boundaries.
|
|
min_load = SolixDefaults.PRESET_MIN
|
|
else:
|
|
min_load = SolixDefaults.PRESET_MIN
|
|
if (max_load := str(schedule.get("max_load"))).isdigit():
|
|
max_load = int(max_load)
|
|
else:
|
|
max_load = SolixDefaults.PRESET_MAX
|
|
ranges = schedule.get("ranges") or []
|
|
# get appliance load name from first existing slot to avoid mixture
|
|
# NOTE: The solarbank may behave weird if a mixture is found or the name does not match with some internal settings
|
|
# The name cannot be queried, but seems to be 'custom' by default. However, the mobile app translates it to whather language is defined in the App
|
|
appliance_name = None
|
|
pending_insert = False
|
|
if len(ranges) > 0:
|
|
appliance_name = (ranges[0].get("appliance_loads") or [{}])[0].get("name")
|
|
if insert_slot:
|
|
# set flag for pending insert slot
|
|
pending_insert = True
|
|
elif insert_slot:
|
|
# use insert_slot for set_slot to define a single new slot when no slots exist
|
|
set_slot = insert_slot
|
|
|
|
new_ranges = []
|
|
# update individual values in current slot or insert SolarbankTimeslot and adjust adjacent slots
|
|
if not set_slot:
|
|
now = datetime.now().time().replace(microsecond=0)
|
|
last_time = datetime.strptime("00:00", "%H:%M").time()
|
|
# set now to new daytime if close to end of day to determine which slot to modify
|
|
if now >= datetime.strptime("23:59:58", "%H:%M:%S").time():
|
|
now = datetime.strptime("00:00", "%H:%M").time()
|
|
next_start = None
|
|
split_slot = {}
|
|
for idx, slot in enumerate(ranges, start=1):
|
|
with contextlib.suppress(ValueError):
|
|
start_time = datetime.strptime(
|
|
slot.get("start_time") or "00:00", "%H:%M"
|
|
).time()
|
|
# "24:00" format not supported in strptime
|
|
end_time = datetime.strptime(
|
|
(
|
|
str(slot.get("end_time") or "00:00").replace(
|
|
"24:00", "23:59"
|
|
)
|
|
),
|
|
"%H:%M",
|
|
).time()
|
|
# check slot timings to update current, or insert new and modify adjacent slots
|
|
insert = {}
|
|
|
|
# Check if parameter update required for current time but it falls into gap of no defined slot.
|
|
# Create insert slot for the gap and add before or after current slot at the end of the current slot checks/modifications required for allday usage
|
|
if (
|
|
not insert_slot
|
|
and pending_now_update
|
|
and (
|
|
last_time <= now < start_time
|
|
or (idx == len(ranges) and now >= end_time)
|
|
)
|
|
):
|
|
# Use daily end time if now after last slot
|
|
insert = copy.deepcopy(slot)
|
|
insert.update(
|
|
{"start_time": next_start.isoformat(timespec="minutes")}
|
|
)
|
|
insert.update(
|
|
{
|
|
"end_time": (
|
|
start_time.isoformat(timespec="minutes")
|
|
).replace("23:59", "24:00")
|
|
if now < start_time
|
|
else "24:00"
|
|
}
|
|
)
|
|
(insert.get("appliance_loads") or [{}])[0].update(
|
|
{
|
|
"power": min(
|
|
max(
|
|
int(
|
|
SolixDefaults.PRESET_DEF
|
|
if preset is None
|
|
else preset
|
|
),
|
|
min_load,
|
|
),
|
|
max_load,
|
|
),
|
|
}
|
|
)
|
|
insert.update(
|
|
{
|
|
"turn_on": SolixDefaults.ALLOW_EXPORT
|
|
if export is None
|
|
else export
|
|
}
|
|
)
|
|
insert.update(
|
|
{
|
|
"charge_priority": min(
|
|
max(
|
|
int(
|
|
SolixDefaults.CHARGE_PRIORITY_DEF
|
|
if charge_prio is None
|
|
else charge_prio
|
|
),
|
|
SolixDefaults.CHARGE_PRIORITY_MIN,
|
|
),
|
|
SolixDefaults.CHARGE_PRIORITY_MAX,
|
|
)
|
|
}
|
|
)
|
|
|
|
# if gap is before current slot, insert now
|
|
if now < start_time:
|
|
new_ranges.append(insert)
|
|
last_time = start_time
|
|
insert = {}
|
|
|
|
if pending_insert and (
|
|
insert_slot.start_time.time() <= start_time
|
|
or idx == len(ranges)
|
|
):
|
|
# copy slot, update and insert the new slot
|
|
# re-use old slot parms if insert slot has not defined optional parms
|
|
insert = copy.deepcopy(slot)
|
|
insert.update(
|
|
{
|
|
"start_time": datetime.strftime(
|
|
insert_slot.start_time, "%H:%M"
|
|
)
|
|
}
|
|
)
|
|
insert.update(
|
|
{
|
|
"end_time": datetime.strftime(
|
|
insert_slot.end_time, "%H:%M"
|
|
).replace("23:59", "24:00")
|
|
}
|
|
)
|
|
if insert_slot.appliance_load is not None or (
|
|
insert_slot.start_time.time() < start_time
|
|
and insert_slot.end_time.time() != end_time
|
|
):
|
|
(insert.get("appliance_loads") or [{}])[0].update(
|
|
{
|
|
"power": min(
|
|
max(
|
|
int(
|
|
SolixDefaults.PRESET_DEF
|
|
if insert_slot.appliance_load is None
|
|
else insert_slot.appliance_load
|
|
),
|
|
min_load,
|
|
),
|
|
max_load,
|
|
),
|
|
}
|
|
)
|
|
if insert_slot.allow_export is not None or (
|
|
insert_slot.start_time.time() < start_time
|
|
and insert_slot.end_time.time() != end_time
|
|
):
|
|
insert.update(
|
|
{
|
|
"turn_on": SolixDefaults.ALLOW_EXPORT
|
|
if insert_slot.allow_export is None
|
|
else insert_slot.allow_export
|
|
}
|
|
)
|
|
if insert_slot.charge_priority_limit is not None or (
|
|
insert_slot.start_time.time() < start_time
|
|
and insert_slot.end_time.time() != end_time
|
|
):
|
|
insert.update(
|
|
{
|
|
"charge_priority": min(
|
|
max(
|
|
int(
|
|
SolixDefaults.CHARGE_PRIORITY_DEF
|
|
if insert_slot.charge_priority_limit
|
|
is None
|
|
else insert_slot.charge_priority_limit
|
|
),
|
|
SolixDefaults.CHARGE_PRIORITY_MIN,
|
|
),
|
|
SolixDefaults.CHARGE_PRIORITY_MAX,
|
|
)
|
|
}
|
|
)
|
|
# insert slot before current slot if not last
|
|
if insert_slot.start_time.time() <= start_time:
|
|
new_ranges.append(insert)
|
|
insert = {}
|
|
pending_insert = False
|
|
if insert_slot.end_time.time() >= end_time:
|
|
# set start of next slot if not end of day
|
|
if (
|
|
end_time
|
|
< datetime.strptime("23:59", "%H:%M").time()
|
|
):
|
|
next_start = insert_slot.end_time.time()
|
|
last_time = insert_slot.end_time.time()
|
|
# skip current slot since overlapped by insert slot
|
|
continue
|
|
if split_slot:
|
|
# insert second part of a preceeding slot that was split
|
|
new_ranges.append(split_slot)
|
|
split_slot = {}
|
|
# delay start time of current slot not needed if previous slot was split
|
|
else:
|
|
# delay start time of current slot
|
|
slot.update(
|
|
{
|
|
"start_time": datetime.strftime(
|
|
insert_slot.end_time, "%H:%M"
|
|
).replace("23:59", "24:00")
|
|
}
|
|
)
|
|
else:
|
|
# create copy of slot when insert slot will split last slot to add it later as well
|
|
if insert_slot.end_time.time() < end_time:
|
|
split_slot = copy.deepcopy(slot)
|
|
split_slot.update(
|
|
{
|
|
"start_time": datetime.strftime(
|
|
insert_slot.end_time, "%H:%M"
|
|
).replace("23:59", "24:00")
|
|
}
|
|
)
|
|
if insert_slot.start_time.time() < end_time:
|
|
# shorten end time of current slot when appended at the end
|
|
slot.update(
|
|
{
|
|
"end_time": datetime.strftime(
|
|
insert_slot.start_time, "%H:%M"
|
|
).replace("23:59", "24:00")
|
|
}
|
|
)
|
|
|
|
elif pending_insert and insert_slot.start_time.time() <= end_time:
|
|
# create copy of slot when insert slot will split current slot to add it later
|
|
if insert_slot.end_time.time() < end_time:
|
|
split_slot = copy.deepcopy(slot)
|
|
split_slot.update(
|
|
{
|
|
"start_time": datetime.strftime(
|
|
insert_slot.end_time, "%H:%M"
|
|
).replace("23:59", "24:00")
|
|
}
|
|
)
|
|
# shorten end of preceeding slot
|
|
slot.update(
|
|
{
|
|
"end_time": datetime.strftime(
|
|
insert_slot.start_time, "%H:%M"
|
|
)
|
|
}
|
|
)
|
|
# re-use old slot parms for insert if end time of insert slot is same as original slot
|
|
if insert_slot.end_time.time() == end_time:
|
|
if insert_slot.appliance_load is None:
|
|
insert_slot.appliance_load = int(
|
|
(slot.get("appliance_loads") or [{}])[0].get(
|
|
"power"
|
|
)
|
|
or SolixDefaults.PRESET_DEF
|
|
)
|
|
if insert_slot.allow_export is None:
|
|
insert_slot.allow_export = bool(
|
|
slot.get("turn_on") or SolixDefaults.ALLOW_EXPORT
|
|
)
|
|
if insert_slot.charge_priority_limit is None:
|
|
insert_slot.charge_priority_limit = int(
|
|
slot.get("charge_priority")
|
|
or SolixDefaults.CHARGE_PRIORITY_DEF
|
|
)
|
|
|
|
elif next_start and next_start < end_time:
|
|
# delay start of slot following an insert
|
|
slot.update(
|
|
{
|
|
"start_time": (
|
|
next_start.isoformat(timespec="minutes")
|
|
).replace("23:59", "24:00")
|
|
}
|
|
)
|
|
next_start = None
|
|
|
|
elif not insert_slot and (all_day or start_time <= now < end_time):
|
|
# update parameters in current slot or all slots
|
|
if preset is not None:
|
|
(slot.get("appliance_loads") or [{}])[0].update(
|
|
{
|
|
"power": min(
|
|
max(int(preset), min_load),
|
|
max_load,
|
|
)
|
|
}
|
|
)
|
|
if export is not None:
|
|
slot.update({"turn_on": export})
|
|
if charge_prio is not None:
|
|
slot.update(
|
|
{
|
|
"charge_priority": min(
|
|
max(
|
|
int(charge_prio),
|
|
SolixDefaults.CHARGE_PRIORITY_MIN,
|
|
),
|
|
SolixDefaults.CHARGE_PRIORITY_MAX,
|
|
)
|
|
}
|
|
)
|
|
# clear flag for pending parameter update for actual time
|
|
if start_time <= now < end_time:
|
|
pending_now_update = False
|
|
|
|
if (
|
|
last_time
|
|
<= datetime.strptime(
|
|
(slot.get("start_time") or "00:00").replace("24:00", "23:59"),
|
|
"%H:%M",
|
|
).time()
|
|
):
|
|
new_ranges.append(slot)
|
|
|
|
# fill gap after last slot for current time parameter changes or insert slots
|
|
if insert:
|
|
slot = insert
|
|
new_ranges.append(slot)
|
|
if split_slot:
|
|
# insert second part of a preceeding slot that was split
|
|
new_ranges.append(split_slot)
|
|
split_slot = {}
|
|
|
|
# Track end time of last appended slot in list
|
|
last_time = datetime.strptime(
|
|
(
|
|
str(new_ranges[-1].get("end_time") or "00:00").replace(
|
|
"24:00", "23:59"
|
|
)
|
|
),
|
|
"%H:%M",
|
|
).time()
|
|
|
|
# If no slot exists or new slot to be set, set defaults or given set_slot parameters
|
|
if len(new_ranges) == 0:
|
|
if not set_slot:
|
|
# define parameters to be used for a new slot
|
|
set_slot = SolarbankTimeslot(
|
|
start_time=datetime.strptime("00:00", "%H:%M"),
|
|
end_time=datetime.strptime("23:59", "%H:%M"),
|
|
appliance_load=SolixDefaults.PRESET_DEF
|
|
if preset is None
|
|
else preset,
|
|
allow_export=SolixDefaults.ALLOW_EXPORT
|
|
if export is None
|
|
else export,
|
|
charge_priority_limit=SolixDefaults.CHARGE_PRIORITY_DEF
|
|
if charge_prio is None
|
|
else charge_prio,
|
|
)
|
|
# generate the new slot
|
|
slot = {
|
|
"start_time": datetime.strftime(set_slot.start_time, "%H:%M"),
|
|
"end_time": datetime.strftime(set_slot.end_time, "%H:%M").replace(
|
|
"23:59", "24:00"
|
|
),
|
|
"turn_on": SolixDefaults.ALLOW_EXPORT
|
|
if set_slot.allow_export is None
|
|
else set_slot.allow_export,
|
|
"appliance_loads": [
|
|
{
|
|
"power": min(
|
|
max(
|
|
int(
|
|
SolixDefaults.PRESET_DEF
|
|
if set_slot.appliance_load is None
|
|
else set_slot.appliance_load
|
|
),
|
|
min_load,
|
|
),
|
|
max_load,
|
|
),
|
|
}
|
|
],
|
|
"charge_priority": min(
|
|
max(
|
|
int(
|
|
SolixDefaults.CHARGE_PRIORITY_DEF
|
|
if set_slot.charge_priority_limit is None
|
|
else set_slot.charge_priority_limit
|
|
),
|
|
SolixDefaults.CHARGE_PRIORITY_MIN,
|
|
),
|
|
SolixDefaults.CHARGE_PRIORITY_MAX,
|
|
),
|
|
}
|
|
# use previous appliance name if a slot was defined originally
|
|
if appliance_name:
|
|
(slot.get("appliance_loads") or [{}])[0].update(
|
|
{"name": appliance_name}
|
|
)
|
|
new_ranges.append(slot)
|
|
self._logger.debug(
|
|
"Ranges to apply: %s",
|
|
new_ranges,
|
|
)
|
|
# Make the Api call with final schedule and check for return code, the set call will also update api dict
|
|
# NOTE: set_device_load does not seem to be usable yet for changing the home load, or is only usable in dual bank setups for changing the appliance load share as well?
|
|
schedule.update({"ranges": new_ranges})
|
|
return await self.set_device_parm(
|
|
siteId=siteId,
|
|
paramData=schedule,
|
|
deviceSn=deviceSn,
|
|
)
|
|
|
|
async def get_device_fittings(
|
|
self, siteId: str, deviceSn: str, fromFile: bool = False
|
|
) -> dict:
|
|
r"""Get device fittings.
|
|
|
|
Example data:
|
|
{"data": [{
|
|
"device_sn": "ZDL32D6A3HKXUTN1","product_code": "A17Y0","device_name": "E1600 0W Output Switch","alias_name": "E1600 0W Output Switch",
|
|
"img_url": "https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/2024/01/10/iot-admin/EPBsJ3a5JyMGqA1j/picl_A17Y0_normal.png",
|
|
"bt_ble_id": "","bt_ble_mac": "FC1CEA253CDB","link_time": 1707127936
|
|
}]}
|
|
"""
|
|
data = {"site_id": siteId, "device_sn": deviceSn}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"device_fittings_{deviceSn}.json")
|
|
)
|
|
else:
|
|
resp = await self.request(
|
|
"post", _API_ENDPOINTS["get_device_fittings"], json=data
|
|
)
|
|
data = resp.get("data") or {}
|
|
# update devices dict with new fittings data
|
|
fittings = {}
|
|
for fitting in [
|
|
x
|
|
for x in data.get("data") or []
|
|
if isinstance(x, dict) and x.get("device_sn")
|
|
]:
|
|
# remove unnecessary keys from fitting device
|
|
keylist = fitting.keys()
|
|
for key in [
|
|
x for x in ("img_url", "bt_ble_id", "link_time") if x in keylist
|
|
]:
|
|
fitting.pop(key, None)
|
|
fittings[fitting.get("device_sn")] = fitting
|
|
self._update_dev({"device_sn": deviceSn, "fittings": fittings})
|
|
return data
|
|
|
|
async def get_ota_info(
|
|
self, solarbankSn: str = "", inverterSn: str = "", fromFile: bool = False
|
|
) -> dict:
|
|
"""Get the solar info and OTA processing info for a solarbank.
|
|
|
|
Example data:
|
|
{"ota_status": 3,"current_version": "EZ1 2.0.5","timestamp": 1708277846,"version_type": 3}
|
|
"""
|
|
data = {"solar_bank_sn": solarbankSn, "solar_sn": inverterSn}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(
|
|
self._testdir, f"ota_info_{solarbankSn or inverterSn}.json"
|
|
)
|
|
)
|
|
else:
|
|
resp = await self.request("post", _API_ENDPOINTS["get_ota_info"], json=data)
|
|
return resp.get("data", {})
|
|
|
|
async def get_ota_update(
|
|
self, deviceSn: str, insertSn: str = "", fromFile: bool = False
|
|
) -> dict:
|
|
"""Usage not Clear, process OTA update with confirmation in insertSn?.
|
|
|
|
Example data:
|
|
{"is_ota_update": true,"need_retry": true,"retry_interval": 2000,"device_list": null}
|
|
"""
|
|
data = {"device_sn": deviceSn, "insert_sn": insertSn}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"ota_update_{deviceSn}.json")
|
|
)
|
|
else:
|
|
resp = await self.request(
|
|
"post", _API_ENDPOINTS["get_ota_update"], json=data
|
|
)
|
|
return resp.get("data", {})
|
|
|
|
async def check_upgrade_record(
|
|
self, recordType: int = 2, fromFile: bool = False
|
|
) -> dict:
|
|
"""Check upgrade record, shows device updates with their last version. Type 0-3 work.
|
|
|
|
Example data:
|
|
{"is_record": true,"device_list": [{
|
|
"device_sn": "9JVB42LJK8J0P5RY","device_name": "","icon": "","last_version": "v1.4.4","device_pn": ""}
|
|
]}
|
|
"""
|
|
data = {"type": recordType}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"check_upgrade_record_{recordType}.json")
|
|
)
|
|
else:
|
|
resp = await self.request(
|
|
"post", _API_ENDPOINTS["check_upgrade_record"], json=data
|
|
)
|
|
return resp.get("data", {})
|
|
|
|
async def get_upgrade_record(
|
|
self, deviceSn: str | None = None, siteId: str | None = None, recordType: int | None = None, fromFile: bool = False
|
|
) -> dict:
|
|
"""Get upgrade record for a device serial or site ID, shows update history. Type 1 works for solarbank, type 2 for site ID.
|
|
|
|
Example data:
|
|
{"device_sn": "9JVB42LJK8J0P5RY", "site_id": "", "upgrade_record_list": [
|
|
{"upgrade_time": "2024-02-29 12:38:23","upgrade_version": "v1.5.6","pre_version": "v1.4.4","upgrade_type": "1","upgrade_desc": "",
|
|
"device_sn": "9JVB42LJK8J0P5RY","device_name": "9JVB42LJK8J0P5RY","child_upgrade_records": null},
|
|
{"upgrade_time": "2023-12-29 10:23:06","upgrade_version": "v1.4.4","pre_version": "v0.0.6.6","upgrade_type": "1","upgrade_desc": "",
|
|
"device_sn": "9JVB42LJK8J0P5RY","device_name": "9JVB42LJK8J0P5RY","child_upgrade_records": null},
|
|
{"upgrade_time": "2023-11-02 13:43:09","upgrade_version": "v1.4.1","pre_version": "v0.0.6.5","upgrade_type": "1","upgrade_desc": "",
|
|
"device_sn": "9JVB42LJK8J0P5RY","device_name": "9JVB42LJK8J0P5RY","child_upgrade_records": null}]},
|
|
"""
|
|
if deviceSn:
|
|
data = {"device_sn": deviceSn, "type": 1 if recordType is None else recordType}
|
|
elif siteId:
|
|
data = {"site_id": siteId, "type": 2 if recordType is None else recordType}
|
|
else:
|
|
recordType = 0 if recordType is None else recordType
|
|
data = {"type": recordType}
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, f"get_upgrade_record_{deviceSn if deviceSn else siteId if siteId else recordType}.json")
|
|
)
|
|
else:
|
|
resp = await self.request(
|
|
"post", _API_ENDPOINTS["get_upgrade_record"], json=data
|
|
)
|
|
return resp.get("data", {})
|
|
|
|
async def energy_analysis(
|
|
self,
|
|
siteId: str,
|
|
deviceSn: str,
|
|
rangeType: str | None = None,
|
|
startDay: datetime | None = None,
|
|
endDay: datetime | None = None,
|
|
devType: str | None = None,
|
|
) -> dict:
|
|
"""Fetch Energy data for given device and optional time frame.
|
|
|
|
siteId: site ID of device
|
|
deviceSn: Device to fetch data
|
|
rangeType: "day" | "week" | "year"
|
|
startTime: optional start Date and time
|
|
endTime: optional end Date and time
|
|
devType: "solar_production" | "solarbank"
|
|
Example Data for solar_production:
|
|
{'power': [{'time': '2023-10-01', 'value': '3.67'}, {'time': '2023-10-02', 'value': '3.29'}, {'time': '2023-10-03', 'value': '0.55'}],
|
|
'charge_trend': None, 'charge_level': [], 'power_unit': 'wh', 'charge_total': '3.67', 'charge_unit': 'kwh', 'discharge_total': '3.11', 'discharge_unit': 'kwh',
|
|
'charging_pre': '0.49', 'electricity_pre': '0.51', 'others_pre': '0',
|
|
'statistics': [{'type': '1', 'total': '7.51', 'unit': 'kwh'}, {'type': '2', 'total': '7.49', 'unit': 'kg'}, {'type': '3', 'total': '3.00', 'unit': '€'}]}
|
|
"""
|
|
data = {
|
|
"site_id": siteId,
|
|
"device_sn": deviceSn,
|
|
"type": rangeType if rangeType in ["day", "week", "year"] else "day",
|
|
"start_time": startDay.strftime("%Y-%m-%d")
|
|
if startDay
|
|
else datetime.today().strftime("%Y-%m-%d"),
|
|
"device_type": devType
|
|
if devType in ["solar_production", "solarbank"]
|
|
else "solar_production",
|
|
"end_time": endDay.strftime("%Y-%m-%d") if endDay else "",
|
|
}
|
|
resp = await self.request("post", _API_ENDPOINTS["energy_analysis"], json=data)
|
|
return resp.get("data", {})
|
|
|
|
async def energy_daily(
|
|
self,
|
|
siteId: str,
|
|
deviceSn: str,
|
|
startDay: datetime = datetime.today(),
|
|
numDays: int = 1,
|
|
dayTotals: bool = False,
|
|
) -> dict:
|
|
"""Fetch daily Energy data for given interval and provide it in a table format dictionary.
|
|
|
|
Example:
|
|
{"2023-09-29": {"date": "2023-09-29", "solar_production": "1.21", "solarbank_discharge": "0.47", "solarbank_charge": "0.56"},
|
|
"2023-09-30": {"date": "2023-09-30", "solar_production": "3.07", "solarbank_discharge": "1.06", "solarbank_charge": "1.39"}}
|
|
""" # noqa: D413
|
|
table = {}
|
|
today = datetime.today()
|
|
# check daily range and limit to 1 year max and avoid future days
|
|
if startDay > today:
|
|
startDay = today
|
|
numDays = 1
|
|
elif (startDay + timedelta(days=numDays)) > today:
|
|
numDays = (today - startDay).days + 1
|
|
numDays = min(366, max(1, numDays))
|
|
# first get solarbank export
|
|
resp = await self.energy_analysis(
|
|
siteId=siteId,
|
|
deviceSn=deviceSn,
|
|
rangeType="week",
|
|
startDay=startDay,
|
|
endDay=startDay + timedelta(days=numDays - 1),
|
|
devType="solarbank",
|
|
)
|
|
for item in resp.get("power", []):
|
|
daystr = item.get("time", None)
|
|
if daystr:
|
|
table.update(
|
|
{
|
|
daystr: {
|
|
"date": daystr,
|
|
"solarbank_discharge": item.get("value", ""),
|
|
}
|
|
}
|
|
)
|
|
# Add solar production which contains percentages
|
|
resp = await self.energy_analysis(
|
|
siteId=siteId,
|
|
deviceSn=deviceSn,
|
|
rangeType="week",
|
|
startDay=startDay,
|
|
endDay=startDay + timedelta(days=numDays - 1),
|
|
devType="solar_production",
|
|
)
|
|
for item in resp.get("power", []):
|
|
daystr = item.get("time", None)
|
|
if daystr:
|
|
entry = table.get(daystr, {})
|
|
entry.update(
|
|
{"date": daystr, "solar_production": item.get("value", "")}
|
|
)
|
|
table.update({daystr: entry})
|
|
# Solarbank charge and percentages are only received as total value for given interval. If requested, make daily queries for given interval with some delay
|
|
if dayTotals:
|
|
if numDays == 1:
|
|
daystr = startDay.strftime("%Y-%m-%d")
|
|
entry = table.get(daystr, {})
|
|
entry.update(
|
|
{
|
|
"date": daystr,
|
|
"solarbank_charge": resp.get("charge_total", ""),
|
|
"battery_percentage": resp.get("charging_pre", ""),
|
|
"solar_percentage": resp.get("electricity_pre", ""),
|
|
"other_percentage": resp.get("others_pre", ""),
|
|
}
|
|
)
|
|
table.update({daystr: entry})
|
|
else:
|
|
daylist = [startDay + timedelta(days=x) for x in range(numDays)]
|
|
for day in daylist:
|
|
daystr = day.strftime("%Y-%m-%d")
|
|
resp = await self.energy_analysis(
|
|
siteId=siteId,
|
|
deviceSn=deviceSn,
|
|
rangeType="week",
|
|
startDay=day,
|
|
endDay=day,
|
|
devType="solar_production",
|
|
)
|
|
entry = table.get(daystr, {})
|
|
entry.update(
|
|
{
|
|
"date": daystr,
|
|
"solarbank_charge": resp.get("charge_total", ""),
|
|
"battery_percentage": resp.get("charging_pre", ""),
|
|
"solar_percentage": resp.get("electricity_pre", ""),
|
|
"other_percentage": resp.get("others_pre", ""),
|
|
}
|
|
)
|
|
table.update({daystr: entry})
|
|
return table
|
|
|
|
async def home_load_chart(self, siteId: str, deviceSn: str | None = None) -> dict:
|
|
"""Get home load chart data.
|
|
|
|
Example data:
|
|
{"data": []}
|
|
"""
|
|
data = {"site_id": siteId}
|
|
if deviceSn:
|
|
data.update({"device_sn": deviceSn})
|
|
resp = await self.request("post", _API_ENDPOINTS["home_load_chart"], json=data)
|
|
return resp.get("data", {})
|
|
|
|
async def get_message_unread(self, fromFile: bool = False) -> dict:
|
|
"""Get the unread messages for account.
|
|
|
|
Example data:
|
|
{"has_unread_msg": false}
|
|
"""
|
|
if fromFile:
|
|
resp = self._loadFromFile(
|
|
os.path.join(self._testdir, "message_unread.json")
|
|
)
|
|
else:
|
|
resp = await self.request("get", _API_ENDPOINTS["get_message_unread"])
|
|
# save unread msg flag in each known site
|
|
data = resp.get("data", {})
|
|
for siteId in self.sites:
|
|
self._update_site(siteId, data)
|
|
return data
|