1
0
Fork 0

release 1.0

Initial release with code cleanup and some required changes for use in HA integration
This commit is contained in:
Thomas Luther 2024-02-06 10:24:29 +01:00
parent 072ce0e731
commit d8bbb6e3c5
22 changed files with 1084 additions and 659 deletions

View File

@ -1,15 +1,15 @@
<img src="https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/anker-power/e9478c2d-e665-4d84-95d7-dd4844f82055/20230719-144818.png" alt="Solarbank E1600 Logo" title="Anker Solix API" align="right" height="100" /> <img src="https://public-aiot-fra-prod.s3.dualstack.eu-central-1.amazonaws.com/anker-power/public/product/anker-power/e9478c2d-e665-4d84-95d7-dd4844f82055/20230719-144818.png" alt="Solarbank E1600 Logo" title="Anker Solix Api" align="right" height="100" />
# Anker Solix API # Anker Solix Api
[![github licence](https://img.shields.io/badge/Licence-MIT-orange)](https://github.com/thomluther/anker-solix-api/blob/main/LICENSE) [![github licence](https://img.shields.io/badge/Licence-MIT-orange)](https://github.com/thomluther/anker-solix-api/blob/main/LICENSE)
![python badge](https://img.shields.io/badge/Made%20with-Python-orange) ![python badge](https://img.shields.io/badge/Made%20with-Python-orange)
This is an experimental Python library for Anker Solix Power devices (Solarbank, Inverter etc). This is an experimental Python library for Anker Solix Power devices (Solarbank, Inverter etc).
🚨 This is by no means an official Anker API. 🚨 🚨 This is by no means an official Anker Api. 🚨
🚨 It can break at any time, or API request can be removed/added/changed and break some of the endpoint methods used in this API.🚨 🚨 It can break at any time, or Api request can be removed/added/changed and break some of the endpoint methods used in this Api.🚨
# Python Versions # Python Versions
@ -27,13 +27,13 @@ pip install aiohttp
# Anker Account Information # Anker Account Information
Because of the way the Anker Solix API works, one account with email/password cannot be used for the Anker mobile App and this API in parallel. Because of the way the Anker Solix Api works, one account with email/password cannot be used for the Anker mobile App and this Api in parallel.
The Anker Cloud allows only one request token per account at any time. Each new authentication request by a client will create a new token and drop a previous token. The Anker Cloud allows only one request token per account at any time. Each new authentication request by a client will create a new token and drop a previous token.
Therefore usage of this API may kick out your account login in the mobile app. Therefore usage of this Api may kick out your account login in the mobile app.
However, starting with Anker mobile app release 2.0, you can share your defined system(s) with 'family members'. However, starting with Anker mobile app release 2.0, you can share your defined system(s) with 'family members'.
Therefore it is recommended to create a second Anker account with a different email address and share your defined system(s) with the second account. Therefore it is recommended to create a second Anker account with a different email address and share your defined system(s) with the second account.
Attention: A shared account is only a member of the shared system, and as such currently has no permissions to access or query device details of the shared system. Attention: A shared account is only a member of the shared system, and as such currently has no permissions to access or query device details of the shared system.
Therefore an API homepage query will neither display any data for a shared account. However, a shared account can receive API scene/site details of shared systems (App system = API site), Therefore an Api homepage query will neither display any data for a shared account. However, a shared account can receive Api scene/site details of shared systems (App system = Api site),
which is equivalent to what is displayed in the mobile app on the home screen for the selected system. which is equivalent to what is displayed in the mobile app on the home screen for the selected system.
# Usage # Usage
@ -48,13 +48,13 @@ from aiohttp import ClientSession
from api import api, errors from api import api, errors
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
#_LOGGER.setLevel(logging.DEBUG) # enable for detailed API output #_LOGGER.setLevel(logging.DEBUG) # enable for detailed Api output
async def main() -> None: async def main() -> None:
"""Create the aiohttp session and run the example.""" """Create the aiohttp session and run the example."""
async with ClientSession() as websession: async with ClientSession() as websession:
"""put your code here, example""" """put your code here, example"""
myapi = api.API("username@domain.com","password","de",websession, _LOGGER) myapi = api.AnkerSolixApi("username@domain.com","password","de",websession, _LOGGER)
await myapi.update_sites() await myapi.update_sites()
await myapi.update_device_details() await myapi.update_device_details()
print("System Overview:") print("System Overview:")
@ -62,7 +62,7 @@ async def main() -> None:
print("Device Overview:") print("Device Overview:")
print(json.dumps(myapi.devices, indent=2)) print(json.dumps(myapi.devices, indent=2))
"""run async main""" # run async main
if __name__ == '__main__': if __name__ == '__main__':
try: try:
asyncio.run(main()) asyncio.run(main())
@ -70,36 +70,36 @@ if __name__ == '__main__':
print(f'{type(err)}: {err}') print(f'{type(err)}: {err}')
``` ```
The API class provides 2 main methods: The AnkerSolixApi class provides 2 main methods:
- `API.update_sites()` to query overview data for all accessible sites and store data in API dictionaries `API.sites` and `API.devices` for quick access. - `AnkerSolixApi.update_sites()` to query overview data for all accessible sites and store data in dictionaries `AnkerSolixApi.sites` and `AnkerSolixApi.devices` for quick access.
This method could be run in regular intervals (30s or more) to fetch new data of the systems This method could be run in regular intervals (30sec or more) to fetch new data of the systems
- `API.update_device_details()` to query further settings for the device serials as found in the sites query. - `AnkerSolixApi.update_device_details()` to query further settings for the device serials as found in the sites query.
This method should be run less frequently since this will mostly fetch various device configuration settings and needs multiple queries. This method should be run less frequently since this will mostly fetch various device configuration settings and needs multiple queries.
It currently is developped for Solarbank devices only, further device types such as Inverters or Power Stations could be added once example data is available. It currently is developped for Solarbank devices only, further device types such as Inverters or Power Stations could be added once example data is available.
Check out `test_api.py` and other python executable tools that may help to leverage and explore the API for your Anker power system. Check out `test_api.py` and other python executable tools that may help to leverage and explore the Api for your Anker power system.
The subfolder [`examples`](https://github.com/thomluther/anker-solix-api/tree/main/examples) contains json files with anonymized responses of the The subfolder [`examples`](https://github.com/thomluther/anker-solix-api/tree/main/examples) contains json files with anonymized responses of the
`export_system.py` module giving you an idea of how various API responses look like. (Note that the Solarbank was switched off when the data were pulled, so some fields may be empty) `export_system.py` module giving you an idea of how various Api responses look like. (Note that the Solarbank was switched off when the data were pulled, so some fields may be empty)
Those json files can also be used to develop/debug the API for system constellations not available to the developper. Those json files can also be used to develop/debug the Api for system constellations not available to the developper.
# API Tools # AnkerSolixApi Tools
## test_api.py ## test_api.py
Example exec module that can be used to explore and test API methods or direct enpoint requests with parameters. Example exec module that can be used to explore and test AnkerSolixApi methods or direct enpoint requests with parameters.
## export_system.py ## export_system.py
Example exec module to use the Anker API for export of defined system data and device details. Example exec module to use the Anker Api for export of defined system data and device details.
This module will prompt for the Anker account details if not pre-set in the header. This module will prompt for the Anker account details if not pre-set in the header.
Upon successfull authentication, you can specify a subfolder for the exported JSON files received as API query response, defaulting to your nick name Upon successfull authentication, you can specify a subfolder for the exported JSON files received as Api query response, defaulting to your nick name
Optionally you can specify whether personalized information in the response data should be randomized in the files, like SNs, Site IDs, Trace IDs etc. Optionally you can specify whether personalized information in the response data should be randomized in the files, like SNs, Site IDs, Trace IDs etc.
You can review the response files afterwards. They can be used as examples for dedicated data extraction from the devices. You can review the response files afterwards. They can be used as examples for dedicated data extraction from the devices.
Optionally the API class can use the json files for debugging and testing on various system outputs. Optionally the AnkerSolixApi class can use the json files for debugging and testing on various system outputs.
## solarbank_monitor.py ## solarbank_monitor.py
Example exec module to use the Anker API for continously querying and displaying important solarbank parameters Example exec module to use the Anker Api for continously querying and displaying important solarbank parameters
This module will prompt for the Anker account details if not pre-set in the header. This module will prompt for the Anker account details if not pre-set in the header.
Upon successfull authentication, you will see the solarbank parameters displayed and refreshed at reqular interval. Upon successfull authentication, you will see the solarbank parameters displayed and refreshed at reqular interval.
Note: When the system owning account is used, more details for the solarbank can be queried and displayed. Note: When the system owning account is used, more details for the solarbank can be queried and displayed.
@ -107,12 +107,12 @@ Attention: During executiion of this module, the used account cannot be used in
## energy_csv.py ## energy_csv.py
Example exec module to use the Anker API for export of daily Solarbank Energy Data. Example exec module to use the Anker Api for export of daily Solarbank Energy Data.
This method will prompt for the Anker account details if not pre-set in the header. This method will prompt for the Anker account details if not pre-set in the header.
Then you can specify a start day and the number of days for data extraction from the Anker Cloud. Then you can specify a start day and the number of days for data extraction from the Anker Cloud.
Note: The Solar production and Solarbank discharge can be queried across the full range. The solarbank Note: The Solar production and Solarbank discharge can be queried across the full range. The solarbank
charge however can be queried only as total for an interval (e.g. day). Therefore when solarbank charge charge however can be queried only as total for an interval (e.g. day). Therefore when solarbank charge
data is also selected for export, an additional API query per day is required. data is also selected for export, an additional Api query per day is required.
The received daily values will be exported into a csv file. The received daily values will be exported into a csv file.
@ -122,18 +122,17 @@ The received daily values will be exported into a csv file.
![last commit](https://img.shields.io/github/last-commit/thomluther/anker-solix-api?color=orange) ![last commit](https://img.shields.io/github/last-commit/thomluther/anker-solix-api?color=orange)
[![Community Discussion](https://img.shields.io/badge/Home%20Assistant%20Community-Discussion-orange)](https://community.home-assistant.io/t/feature-request-integration-or-addon-for-anker-solix-e1600-solarbank/641086) [![Community Discussion](https://img.shields.io/badge/Home%20Assistant%20Community-Discussion-orange)](https://community.home-assistant.io/t/feature-request-integration-or-addon-for-anker-solix-e1600-solarbank/641086)
Github is used to host code, to track issues and feature requests, as well as accept pull requests.
Pull requests are the best way to propose changes to the codebase.
1. [Check for open features/bugs](https://github.com/thomluther/anker-solix-api/issues) 1. [Check for open features/bugs](https://github.com/thomluther/anker-solix-api/issues)
or [initiate a discussion on one](https://github.com/thomluther/anker-solix-api/issues/new). or [initiate a discussion on one](https://github.com/thomluther/anker-solix-api/issues/new).
2. [Fork the repository](https://github.com/thomluther/anker-solix-api/fork). 1. [Fork the repository](https://github.com/thomluther/anker-solix-api/fork).
3. Install the dev environment: `make init`. 1. Fork the repo and create your branch from `main`.
4. Enter the virtual environment: `source ./venv/bin/activate` 1. If you've changed something, update the documentation.
5. Code your new feature or bug fix. 1. Test your contribution.
6. Write a test that covers your new functionality. 1. Issue that pull request!
7. Update `README.md` with any new documentation.
8. Run tests and ensure 100% code coverage: `make coverage`
9. Ensure you have no linting errors: `make lint`
10. Ensure you have typed your code correctly: `make typing`
11. Submit a pull request!
# Acknowledgements / Credits # Acknowledgements / Credits
@ -144,4 +143,4 @@ The received daily values will be exported into a csv file.
# Showing Your Appreciation # Showing Your Appreciation
If you like this project, please give it a star on [GitHub](https://github.com/thomluther/anker-solix-api) If you like this project, please give it a star on [GitHub](https://github.com/thomluther/anker-solix-api)

File diff suppressed because it is too large Load Diff

View File

@ -1,73 +1,75 @@
"""Define package errors.""" """Define Anker Solix API errors."""
from typing import Dict, Type
from __future__ import annotations
class AnkerSolixError(Exception): class AnkerSolixError(Exception):
"""Define a base error.""" """Define a base error."""
pass
class AuthorizationError(AnkerSolixError): class AuthorizationError(AnkerSolixError):
"""Authorization error.""" """Authorization error."""
pass
class ConnectError(AnkerSolixError): class ConnectError(AnkerSolixError):
"""Connection error.""" """Connection error."""
pass
class NetworkError(AnkerSolixError): class NetworkError(AnkerSolixError):
"""Network error.""" """Network error."""
pass
class ServerError(AnkerSolixError): class ServerError(AnkerSolixError):
"""Server error.""" """Server error."""
pass
class RequestError(AnkerSolixError): class RequestError(AnkerSolixError):
"""Request error.""" """Request error."""
pass
class VerifyCodeError(AnkerSolixError): class VerifyCodeError(AnkerSolixError):
"""Verify code error.""" """Verify code error."""
pass
class VerifyCodeExpiredError(AnkerSolixError): class VerifyCodeExpiredError(AnkerSolixError):
"""Verification code has expired.""" """Verification code has expired."""
pass
class NeedVerifyCodeError(AnkerSolixError): class NeedVerifyCodeError(AnkerSolixError):
"""Need verification code error.""" """Need verification code error."""
pass
class VerifyCodeMaxError(AnkerSolixError): class VerifyCodeMaxError(AnkerSolixError):
"""Maximum attempts of verications error.""" """Maximum attempts of verications error."""
pass
class VerifyCodeNoneMatchError(AnkerSolixError): class VerifyCodeNoneMatchError(AnkerSolixError):
"""Verify code none match error.""" """Verify code none match error."""
pass
class VerifyCodePasswordError(AnkerSolixError): class VerifyCodePasswordError(AnkerSolixError):
"""Verify code password error.""" """Verify code password error."""
pass
class ClientPublicKeyError(AnkerSolixError): class ClientPublicKeyError(AnkerSolixError):
"""Define an error for client public key error.""" """Define an error for client public key error."""
pass
class TokenKickedOutError(AnkerSolixError): class TokenKickedOutError(AnkerSolixError):
"""Define an error for token does not exist because it was kicked out.""" """Define an error for token does not exist because it was kicked out."""
pass
class InvalidCredentialsError(AnkerSolixError): class InvalidCredentialsError(AnkerSolixError):
"""Define an error for unauthenticated accounts.""" """Define an error for unauthenticated accounts."""
pass
class RetryExceeded(AnkerSolixError): class RetryExceeded(AnkerSolixError):
"""Define an error for exceeded retry attempts. Please try again in 24 hours.""" """Define an error for exceeded retry attempts. Please try again in 24 hours."""
pass
ERRORS: Dict[int, Type[AnkerSolixError]] = {
ERRORS: dict[int, type[AnkerSolixError]] = {
401: AuthorizationError, 401: AuthorizationError,
403: AuthorizationError,
997: ConnectError, 997: ConnectError,
998: NetworkError, 998: NetworkError,
999: ServerError, 999: ServerError,
@ -83,6 +85,7 @@ ERRORS: Dict[int, Type[AnkerSolixError]] = {
26070: ClientPublicKeyError, 26070: ClientPublicKeyError,
26084: TokenKickedOutError, 26084: TokenKickedOutError,
26108: InvalidCredentialsError, 26108: InvalidCredentialsError,
26156: InvalidCredentialsError,
100053: RetryExceeded, 100053: RetryExceeded,
} }

View File

@ -1,5 +1,5 @@
""" """Example exec module to use the Anker API for export of daily Solarbank Energy Data.
Example exec module to use the Anker API for export of daily Solarbank Energy Data.
This method will prompt for the Anker account details if not pre-set in the header. This method will prompt for the Anker account details if not pre-set in the header.
Then you can specify a start day and the number of days for data extraction from the Anker Cloud. Then you can specify a start day and the number of days for data extraction from the Anker Cloud.
Note: The Solar production and Solarbank discharge can be queried across the full range. The solarbank Note: The Solar production and Solarbank discharge can be queried across the full range. The solarbank
@ -9,15 +9,22 @@ The received daily values will be exported into a csv file.
""" """
import asyncio import asyncio
from aiohttp import ClientSession import csv
from datetime import datetime from datetime import datetime
from api import api
from getpass import getpass from getpass import getpass
import json, logging, sys, csv import json
import logging
import sys
from aiohttp import ClientSession
from api import api
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
_LOGGER.addHandler(logging.StreamHandler(sys.stdout)) _LOGGER.addHandler(logging.StreamHandler(sys.stdout))
#_LOGGER.setLevel(logging.DEBUG) # enable for debug output # _LOGGER.setLevel(logging.DEBUG) # enable for debug output
CONSOLE: logging.Logger = logging.getLogger("console")
CONSOLE.addHandler(logging.StreamHandler(sys.stdout))
CONSOLE.setLevel(logging.INFO)
# Optional default Anker Account credentials to be used # Optional default Anker Account credentials to be used
USER = "" USER = ""
@ -26,14 +33,15 @@ COUNTRY = ""
async def main() -> None: async def main() -> None:
global USER, PASSWORD, COUNTRY """Run main to export energy history from cloud."""
print("Exporting daily Energy data for Anker Solarbank:") global USER, PASSWORD, COUNTRY # noqa: PLW0603
CONSOLE.info("Exporting daily Energy data for Anker Solarbank:")
if USER == "": if USER == "":
print("\nEnter Anker Account credentials:") CONSOLE.info("\nEnter Anker Account credentials:")
USER = input("Username (email): ") USER = input("Username (email): ")
if USER == "": if USER == "":
return False return False
PASSWORD = getpass("Password: ") PASSWORD = getpass("Password: ")
if PASSWORD == "": if PASSWORD == "":
return False return False
COUNTRY = input("Country ID (e.g. DE): ") COUNTRY = input("Country ID (e.g. DE): ")
@ -41,66 +49,87 @@ async def main() -> None:
return False return False
try: try:
async with ClientSession() as websession: async with ClientSession() as websession:
print("\nTrying authentication...",end="") CONSOLE.info("\nTrying authentication...")
myapi = api.API(USER,PASSWORD,COUNTRY,websession, _LOGGER) myapi = api.AnkerSolixApi(USER, PASSWORD, COUNTRY, websession, _LOGGER)
if await myapi.async_authenticate(): if await myapi.async_authenticate():
print("OK") CONSOLE.info("OK")
else: else:
print("CACHED") # Login validation will be done during first API call CONSOLE.info(
"CACHED"
) # Login validation will be done during first API call
# Refresh the site and device info of the API # Refresh the site and device info of the API
print("\nUpdating Site Info...", end="") CONSOLE.info("\nUpdating Site Info...")
if (await myapi.update_sites()) == {}: if (await myapi.update_sites()) == {}:
print("NO INFO") CONSOLE.info("NO INFO")
return False return False
print("OK") CONSOLE.info("OK")
print(f"\nDevices: {len(myapi.devices)}") CONSOLE.info(f"\nDevices: {len(myapi.devices)}")
_LOGGER.debug(json.dumps(myapi.devices, indent=2)) _LOGGER.debug(json.dumps(myapi.devices, indent=2))
for sn, device in myapi.devices.items(): for sn, device in myapi.devices.items():
if device.get("type") == "solarbank": if device.get("type") == "solarbank":
print(f"Found {device.get('name')} SN: {sn}") CONSOLE.info(f"Found {device.get('name')} SN: {sn}")
try: try:
daystr = input("\nEnter start day for daily energy data (yyyy-mm-dd) or enter to skip: ") daystr = input(
"\nEnter start day for daily energy data (yyyy-mm-dd) or enter to skip: "
)
if daystr == "": if daystr == "":
print(f"Skipped SN: {sn}, checking for next Solarbank...") CONSOLE.info(
f"Skipped SN: {sn}, checking for next Solarbank..."
)
continue continue
startday = datetime.fromisoformat(daystr) startday = datetime.fromisoformat(daystr)
numdays = int(input("How many days to query (1-366): ")) numdays = int(input("How many days to query (1-366): "))
daytotals = input("Do you want to include daily total data (e.g. solarbank charge) which require API query per day? (Y/N): ") daytotals = input(
daytotals = daytotals.upper() in ["Y","YES","TRUE",1] "Do you want to include daily total data (e.g. solarbank charge) which require API query per day? (Y/N): "
filename = input(f"CSV filename for export (daily_energy_{daystr}.csv): ") )
daytotals = daytotals.upper() in ["Y", "YES", "TRUE", 1]
filename = input(
f"CSV filename for export (daily_energy_{daystr}.csv): "
)
if filename == "": if filename == "":
filename = f"daily_energy_{daystr}.csv" filename = f"daily_energy_{daystr}.csv"
except ValueError: except ValueError:
return False return False
print(f"Queries may take up to {numdays*daytotals + 2} seconds...please wait...") CONSOLE.info(
data = await myapi.energy_daily(siteId=device.get("site_id"),deviceSn=sn,startDay=startday,numDays=numdays,dayTotals=daytotals) f"Queries may take up to {numdays*daytotals + 2} seconds...please wait..."
)
data = await myapi.energy_daily(
siteId=device.get("site_id"),
deviceSn=sn,
startDay=startday,
numDays=numdays,
dayTotals=daytotals,
)
_LOGGER.debug(json.dumps(data, indent=2)) _LOGGER.debug(json.dumps(data, indent=2))
# Write csv file # Write csv file
if len(data) > 0: if len(data) > 0:
with open(filename, 'w', newline='') as csvfile: with open(
filename, "w", newline="", encoding="utf-8"
) as csvfile:
fieldnames = (next(iter(data.values()))).keys() fieldnames = (next(iter(data.values()))).keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader() writer.writeheader()
writer.writerows(data.values()) writer.writerows(data.values())
print(f"\nCompleted: Successfully exported data to {filename}") CONSOLE.info(
f"\nCompleted: Successfully exported data to {filename}"
)
return True return True
print("No data received for device") CONSOLE.info("No data received for device")
return False return False
print("No accepted Solarbank device found.") CONSOLE.info("No accepted Solarbank device found.")
return False return False
except Exception as err: except Exception as err:
print(f'{type(err)}: {err}') CONSOLE.info(f"{type(err)}: {err}")
return False return False
"""run async main""" # run async main
if __name__ == '__main__': if __name__ == "__main__":
try: try:
if not asyncio.run(main()): if not asyncio.run(main()):
print("Aborted!") CONSOLE.info("Aborted!")
except Exception as err: except Exception as exception:
print(f'{type(err)}: {err}') CONSOLE.info(f"{type(exception)}: {exception}")

View File

@ -0,0 +1,5 @@
SETUP:
HM 600 Inverter
2x 395 W panels.
1x solarbank e1600
1 System in Anker App

View File

@ -1,5 +1,5 @@
""" """Example exec module to use the Anker API for export of defined system data and device details.
Example exec module to use the Anker API for export of defined system data and device details.
This module will prompt for the Anker account details if not pre-set in the header. This module will prompt for the Anker account details if not pre-set in the header.
Upon successfull authentication, you can specify a subfolder for the exported JSON files received as API query response, defaulting to your nick name. Upon successfull authentication, you can specify a subfolder for the exported JSON files received as API query response, defaulting to your nick name.
Optionally you can specify whether personalized information in the response data should be randomized in the files, like SNs, Site IDs, Trace IDs etc. Optionally you can specify whether personalized information in the response data should be randomized in the files, like SNs, Site IDs, Trace IDs etc.
@ -8,102 +8,133 @@ Optionally the API class can use the json files for debugging and testing on var
""" """
import asyncio import asyncio
from aiohttp import ClientSession
from datetime import datetime
from getpass import getpass
from contextlib import suppress from contextlib import suppress
import json, logging, sys, csv, os, time, string, random from getpass import getpass
import json
import logging
import os
import random
import string
import sys
import time
from aiohttp import ClientSession
from api import api from api import api
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
_LOGGER.addHandler(logging.StreamHandler(sys.stdout)) _LOGGER.addHandler(logging.StreamHandler(sys.stdout))
#_LOGGER.setLevel(logging.DEBUG) # enable for debug output # _LOGGER.setLevel(logging.DEBUG) # enable for debug output
CONSOLE: logging.Logger = logging.getLogger("console")
CONSOLE.addHandler(logging.StreamHandler(sys.stdout))
CONSOLE.setLevel(logging.INFO)
# Optional default Anker Account credentials to be used # Optional default Anker Account credentials to be used
USER = "" USER = ""
PASSWORD = "" PASSWORD = ""
COUNTRY = "" COUNTRY = ""
RANDOMIZE = True # Global flag to save randomize decission RANDOMIZE = True # Global flag to save randomize decission
RANDOMDATA = {} # Global dict for randomized data, printed at the end RANDOMDATA = {} # Global dict for randomized data, printed at the end
def randomize(val, key: str = "") -> str: def randomize(val, key: str = "") -> str:
"""Randomize a given string while maintaining its format if format is known for given key name. """Randomize a given string while maintaining its format if format is known for given key name.
Reuse same randomization if value was already randomized"""
global RANDOMDATA Reuse same randomization if value was already randomized
"""
global RANDOMDATA # noqa: PLW0602
if not RANDOMIZE: if not RANDOMIZE:
return str(val) return str(val)
randomstr = RANDOMDATA.get(val,"") randomstr = RANDOMDATA.get(val, "")
if not randomstr and val: if not randomstr and val:
if "_sn" in key: if "_sn" in key:
randomstr = "".join(random.choices(string.ascii_uppercase+string.digits, k=len(val))) randomstr = "".join(
random.choices(string.ascii_uppercase + string.digits, k=len(val))
)
elif "bt_ble_" in key: elif "bt_ble_" in key:
"""Handle values with and without : """ # Handle values with and without ':'
temp = val.replace(":","") temp = val.replace(":", "")
randomstr = RANDOMDATA.get(temp) # retry existing randomized value without : randomstr = RANDOMDATA.get(
temp
) # retry existing randomized value without :
if not randomstr: if not randomstr:
randomstr = "".join(random.choices(string.hexdigits.upper(), k=len(temp))) randomstr = "".join(
random.choices(string.hexdigits.upper(), k=len(temp))
)
if ":" in val: if ":" in val:
RANDOMDATA.update({temp: randomstr}) # save also key value without : RANDOMDATA.update({temp: randomstr}) # save also key value without :
randomstr = ':'.join(a+b for a,b in zip(randomstr[::2], randomstr[1::2])) randomstr = ":".join(
a + b for a, b in zip(randomstr[::2], randomstr[1::2])
)
elif "_id" in key: elif "_id" in key:
for part in val.split("-"): for part in val.split("-"):
if randomstr: if randomstr:
randomstr = "-".join([randomstr,"".join(random.choices(string.hexdigits.lower(), k=len(part)))]) randomstr = "-".join(
[
randomstr,
"".join(
random.choices(string.hexdigits.lower(), k=len(part))
),
]
)
else: else:
randomstr = "".join(random.choices(string.hexdigits.lower(), k=len(part))) randomstr = "".join(
random.choices(string.hexdigits.lower(), k=len(part))
)
elif "wifi_name" in key: elif "wifi_name" in key:
idx = sum(1 for s in RANDOMDATA.values() if 'wifi-network-' in s) idx = sum(1 for s in RANDOMDATA.values() if "wifi-network-" in s)
randomstr = f'wifi-network-{idx+1}' randomstr = f"wifi-network-{idx+1}"
else: else:
# default randomize format # default randomize format
randomstr = "".join(random.choices(string.ascii_letters, k=len(val))) randomstr = "".join(random.choices(string.ascii_letters, k=len(val)))
RANDOMDATA.update({val: randomstr}) RANDOMDATA.update({val: randomstr})
return randomstr return randomstr
def check_keys(data): def check_keys(data):
"""Recursive traversal of complex nested objects to randomize value for certain keys""" """Recursive traversal of complex nested objects to randomize value for certain keys."""
if isinstance(data, str) or isinstance(data, int): if isinstance(data, int | str):
return data return data
for k, v in data.copy().items(): for k, v in data.copy().items():
if isinstance(v, dict): if isinstance(v, dict):
v = check_keys(v) v = check_keys(v)
if isinstance(v, list): if isinstance(v, list):
v = [check_keys(i) for i in v] v = [check_keys(i) for i in v]
"""Randomize value for certain keys""" # Randomize value for certain keys
if any(x in k for x in ["_sn","site_id","trace_id","bt_ble_","wifi_name"]): if any(x in k for x in ["_sn", "site_id", "trace_id", "bt_ble_", "wifi_name"]):
data[k] = randomize(v,k) data[k] = randomize(v, k)
return data return data
def export(filename: str, d: dict = {}) -> None: def export(filename: str, d: dict = None) -> None:
"""Save dict data to given file""" """Save dict data to given file."""
time.sleep(1) # central delay between multiple requests if not d:
d = {}
time.sleep(1) # central delay between multiple requests
if len(d) == 0: if len(d) == 0:
print(f"WARNING: File {filename} not saved because JSON is empty") CONSOLE.info(f"WARNING: File {filename} not saved because JSON is empty")
return return
elif RANDOMIZE: elif RANDOMIZE:
d = check_keys(d) d = check_keys(d)
try: try:
with open(filename, 'w') as file: with open(filename, "w", encoding="utf-8") as file:
json.dump(d, file, indent=2) json.dump(d, file, indent=2)
print(f"Saved JSON to file {filename}") CONSOLE.info(f"Saved JSON to file {filename}")
except Exception as err: except Exception as err:
print(f"ERROR: Failed to save JSON to file {filename}") CONSOLE.error(f"ERROR: Failed to save JSON to file {filename}: {err}")
return return
async def main() -> bool: async def main() -> bool: # noqa: C901
global USER, PASSWORD, COUNTRY, RANDOMIZE """Run main function to export config."""
print("Exporting found Anker Solix system data for all assigned sites:") global USER, PASSWORD, COUNTRY, RANDOMIZE # noqa: PLW0603
CONSOLE.info("Exporting found Anker Solix system data for all assigned sites:")
if USER == "": if USER == "":
print("\nEnter Anker Account credentials:") CONSOLE.info("\nEnter Anker Account credentials:")
USER = input("Username (email): ") USER = input("Username (email): ")
if USER == "": if USER == "":
return False return False
PASSWORD = getpass("Password: ") PASSWORD = getpass("Password: ")
if PASSWORD == "": if PASSWORD == "":
return False return False
COUNTRY = input("Country ID (e.g. DE): ") COUNTRY = input("Country ID (e.g. DE): ")
@ -111,119 +142,237 @@ async def main() -> bool:
return False return False
try: try:
async with ClientSession() as websession: async with ClientSession() as websession:
print("\nTrying authentication...",end="") CONSOLE.info("\nTrying authentication...")
myapi = api.API(USER,PASSWORD,COUNTRY,websession, _LOGGER) myapi = api.AnkerSolixApi(USER, PASSWORD, COUNTRY, websession, _LOGGER)
if await myapi.async_authenticate(): if await myapi.async_authenticate():
print("OK") CONSOLE.info("OK")
else: else:
print("CACHED") # Login validation will be done during first API call CONSOLE.info(
"CACHED"
random = input(f"\nDo you want to randomize unique IDs and SNs in exported files? (default: {'YES' if RANDOMIZE else 'NO'}) (Y/N): ") ) # Login validation will be done during first API call
if random != "" or not isinstance(RANDOMIZE,bool):
RANDOMIZE = random.upper() in ["Y","YES","TRUE",1] resp = input(
nickname = myapi.nickname.replace("*","#") # avoid filesystem problems with * in user nicknames f"\nDo you want to randomize unique IDs and SNs in exported files? (default: {'YES' if RANDOMIZE else 'NO'}) (Y/N): "
)
if resp != "" or not isinstance(RANDOMIZE, bool):
RANDOMIZE = resp.upper() in ["Y", "YES", "TRUE", 1]
nickname = myapi.nickname.replace(
"*", "#"
) # avoid filesystem problems with * in user nicknames
folder = input(f"Subfolder for export (default: {nickname}): ") folder = input(f"Subfolder for export (default: {nickname}): ")
if folder == "": if folder == "":
if nickname == "": if nickname == "":
return False return False
else: folder = nickname
folder = nickname os.makedirs(folder, exist_ok=True)
os.makedirs(folder, exist_ok=True)
# first update sites in API object # first update sites in API object
print("\nQuerying site information...") CONSOLE.info("\nQuerying site information...")
await myapi.update_sites() await myapi.update_sites()
print(f"Sites: {len(myapi.sites)}, Devices: {len(myapi.devices)}") CONSOLE.info(f"Sites: {len(myapi.sites)}, Devices: {len(myapi.devices)}")
_LOGGER.debug(json.dumps(myapi.devices, indent=2)) _LOGGER.debug(json.dumps(myapi.devices, indent=2))
# Query API using direct endpoints to save full response of each query in json files # Query API using direct endpoints to save full response of each query in json files
print("\nExporting homepage...") CONSOLE.info("\nExporting homepage...")
export(os.path.join(folder,f"homepage.json"), await myapi.request("post", api._API_ENDPOINTS["homepage"],json={})) export(
print("Exporting site list...") os.path.join(folder, "homepage.json"),
export(os.path.join(folder,f"site_list.json"), await myapi.request("post", api._API_ENDPOINTS["site_list"],json={})) await myapi.request("post", api._API_ENDPOINTS["homepage"], json={}),
print("Exporting bind devices...") )
export(os.path.join(folder,f"bind_devices.json"), await myapi.request("post", api._API_ENDPOINTS["bind_devices"],json={})) # shows only owner devices CONSOLE.info("Exporting site list...")
print("Exporting user devices...") export(
export(os.path.join(folder,f"user_devices.json"), await myapi.request("post", api._API_ENDPOINTS["user_devices"],json={})) # shows only owner devices os.path.join(folder, "site_list.json"),
print("Exporting charging devices...") await myapi.request("post", api._API_ENDPOINTS["site_list"], json={}),
export(os.path.join(folder,f"charging_devices.json"), await myapi.request("post", api._API_ENDPOINTS["charging_devices"],json={})) # shows only owner devices )
print("Exporting auto upgrade settings...") CONSOLE.info("Exporting bind devices...")
export(os.path.join(folder,f"auto_upgrade.json"), await myapi.request("post", api._API_ENDPOINTS["get_auto_upgrade"],json={})) # shows only owner devices export(
os.path.join(folder, "bind_devices.json"),
await myapi.request(
"post", api._API_ENDPOINTS["bind_devices"], json={}
),
) # shows only owner devices
CONSOLE.info("Exporting user devices...")
export(
os.path.join(folder, "user_devices.json"),
await myapi.request(
"post", api._API_ENDPOINTS["user_devices"], json={}
),
) # shows only owner devices
CONSOLE.info("Exporting charging devices...")
export(
os.path.join(folder, "charging_devices.json"),
await myapi.request(
"post", api._API_ENDPOINTS["charging_devices"], json={}
),
) # shows only owner devices
CONSOLE.info("Exporting auto upgrade settings...")
export(
os.path.join(folder, "auto_upgrade.json"),
await myapi.request(
"post", api._API_ENDPOINTS["get_auto_upgrade"], json={}
),
) # shows only owner devices
for siteId, site in myapi.sites.items(): for siteId, site in myapi.sites.items():
print(f"\nExporting site specific data for site {siteId}...") CONSOLE.info(f"\nExporting site specific data for site {siteId}...")
print("Exporting scene info...") CONSOLE.info("Exporting scene info...")
export(os.path.join(folder,f"scene_{randomize(siteId,'site_id')}.json"), await myapi.request("post", api._API_ENDPOINTS["scene_info"],json={"site_id": siteId})) export(
print("Exporting solar info...") os.path.join(folder, f"scene_{randomize(siteId,'site_id')}.json"),
await myapi.request(
"post",
api._API_ENDPOINTS["scene_info"],
json={"site_id": siteId},
),
)
CONSOLE.info("Exporting solar info...")
with suppress(Exception): with suppress(Exception):
export(os.path.join(folder,f"solar_info_{randomize(siteId,'site_id')}.json"), await myapi.request("post", api._API_ENDPOINTS["solar_info"],json={"site_id": siteId})) # PARAMETERS UNKNOWN, site id not sufficient export(
print("Exporting site detail...") os.path.join(
folder, f"solar_info_{randomize(siteId,'site_id')}.json"
),
await myapi.request(
"post",
api._API_ENDPOINTS["solar_info"],
json={"site_id": siteId},
),
) # PARAMETERS UNKNOWN, site id not sufficient
CONSOLE.info("Exporting site detail...")
admin = site.get("site_admin") admin = site.get("site_admin")
try: try:
export(os.path.join(folder,f"site_detail_{randomize(siteId,'site_id')}.json"), await myapi.request("post", api._API_ENDPOINTS["site_detail"],json={"site_id": siteId})) export(
except Exception as err: os.path.join(
folder, f"site_detail_{randomize(siteId,'site_id')}.json"
),
await myapi.request(
"post",
api._API_ENDPOINTS["site_detail"],
json={"site_id": siteId},
),
)
except Exception:
if not admin: if not admin:
print("Query requires account of site owner!") CONSOLE.warning("Query requires account of site owner!")
print("Exporting wifi list...") CONSOLE.info("Exporting wifi list...")
try: try:
export(os.path.join(folder,f"wifi_list_{randomize(siteId,'site_id')}.json"), await myapi.request("post", api._API_ENDPOINTS["wifi_list"],json={"site_id": siteId})) # works only for site owners export(
except Exception as err: os.path.join(
folder, f"wifi_list_{randomize(siteId,'site_id')}.json"
),
await myapi.request(
"post",
api._API_ENDPOINTS["wifi_list"],
json={"site_id": siteId},
),
) # works only for site owners
except Exception:
if not admin: if not admin:
print("Query requires account of site owner!") CONSOLE.warning("Query requires account of site owner!")
print("Exporting site price...") CONSOLE.info("Exporting site price...")
try: try:
export(os.path.join(folder,f"price_{randomize(siteId,'site_id')}.json"), await myapi.request("post", api._API_ENDPOINTS["get_site_price"],json={"site_id": siteId})) # works only for site owners export(
except Exception as err: os.path.join(
folder, f"price_{randomize(siteId,'site_id')}.json"
),
await myapi.request(
"post",
api._API_ENDPOINTS["get_site_price"],
json={"site_id": siteId},
),
) # works only for site owners
except Exception:
if not admin: if not admin:
print("Query requires account of site owner!") CONSOLE.warning("Query requires account of site owner!")
print("Exporting device parameter settings...") CONSOLE.info("Exporting device parameter settings...")
try: try:
export(os.path.join(folder,f"device_parm_{randomize(siteId,'site_id')}.json"), await myapi.request("post", api._API_ENDPOINTS["get_device_parm"],json={"site_id": siteId, "param_type": "4"})) # works only for site owners export(
except Exception as err: os.path.join(
folder, "device_parm_{randomize(siteId,'site_id')}.json"
),
await myapi.request(
"post",
api._API_ENDPOINTS["get_device_parm"],
json={"site_id": siteId, "param_type": "4"},
),
) # works only for site owners
except Exception:
if not admin: if not admin:
print("Query requires account of site owner!") CONSOLE.warning("Query requires account of site owner!")
for sn, device in myapi.devices.items(): for sn, device in myapi.devices.items():
print(f"\nExporting device specific data for device {device.get('name','')} SN {sn}...") CONSOLE.info(
siteId = device.get('site_id','') f"\nExporting device specific data for device {device.get('name','')} SN {sn}..."
admin = site.get('is_admin') )
print("Exporting power cutoff settings...") siteId = device.get("site_id", "")
admin = site.get("is_admin")
CONSOLE.info("Exporting power cutoff settings...")
try: try:
export(os.path.join(folder,f"power_cutoff_{randomize(sn,'_sn')}.json"), await myapi.request("post", api._API_ENDPOINTS["get_cutoff"],json={"site_id": siteId, "device_sn": sn})) # works only for site owners export(
except Exception as err: os.path.join(
folder, f"power_cutoff_{randomize(sn,'_sn')}.json"
),
await myapi.request(
"post",
api._API_ENDPOINTS["get_cutoff"],
json={"site_id": siteId, "device_sn": sn},
),
) # works only for site owners
except Exception:
if not admin: if not admin:
print("Query requires account of site owner!") CONSOLE.warning("Query requires account of site owner!")
print("Exporting fittings...") CONSOLE.info("Exporting fittings...")
try: try:
export(os.path.join(folder,f"device_fittings_{randomize(sn,'_sn')}.json"), await myapi.request("post", api._API_ENDPOINTS["get_device_fittings"],json={"site_id": siteId, "device_sn": sn})) # works only for site owners export(
except Exception as err: os.path.join(
folder, f"device_fittings_{randomize(sn,'_sn')}.json"
),
await myapi.request(
"post",
api._API_ENDPOINTS["get_device_fittings"],
json={"site_id": siteId, "device_sn": sn},
),
) # works only for site owners
except Exception:
if not admin: if not admin:
print("Query requires account of site owner!") CONSOLE.warning("Query requires account of site owner!")
print("Exporting load...") CONSOLE.info("Exporting load...")
try: try:
export(os.path.join(folder,f"device_load_{randomize(sn,'_sn')}.json"), await myapi.request("post", api._API_ENDPOINTS["get_device_load"],json={"site_id": siteId, "device_sn": sn})) # works only for site owners export(
except Exception as err: os.path.join(folder, f"device_load_{randomize(sn,'_sn')}.json"),
await myapi.request(
"post",
api._API_ENDPOINTS["get_device_load"],
json={"site_id": siteId, "device_sn": sn},
),
) # works only for site owners
except Exception:
if not admin: if not admin:
print("Query requires account of site owner!") CONSOLE.warning("Query requires account of site owner!")
print(f"\nCompleted export of Anker Solix system data for user {USER}") CONSOLE.info(
f"\nCompleted export of Anker Solix system data for user {USER}"
)
if RANDOMIZE: if RANDOMIZE:
print(f"Folder {os.path.abspath(folder)} contains the randomized JSON files. Pls check and update fields that may contain unrecognized personalized data.") CONSOLE.info(
print(f"Following trace or site IDs, SNs and MAC addresses have been randomized in files (from -> to):") f"Folder {os.path.abspath(folder)} contains the randomized JSON files. Pls check and update fields that may contain unrecognized personalized data."
print(json.dumps(RANDOMDATA, indent=2)) )
CONSOLE.info(
"Following trace or site IDs, SNs and MAC addresses have been randomized in files (from -> to):"
)
CONSOLE.info(json.dumps(RANDOMDATA, indent=2))
else: else:
print(f"Folder {os.path.abspath(folder)} contains the JSON files.") CONSOLE.info(
f"Folder {os.path.abspath(folder)} contains the JSON files."
)
return True return True
except Exception as err: except Exception as err:
print(f'{type(err)}: {err}') CONSOLE.info(f"{type(err)}: {err}")
return False return False
"""run async main""" # run async main
if __name__ == '__main__': if __name__ == "__main__":
try: try:
if not asyncio.run(main()): if not asyncio.run(main()):
print("Aborted!") CONSOLE.info("Aborted!")
except KeyboardInterrupt: except KeyboardInterrupt:
print("Aborted!") CONSOLE.info("Aborted!")
except Exception as err: except Exception as exception:
print(f'{type(err)}: {err}') CONSOLE.info(f"{type(exception)}: {exception}")

View File

@ -1,44 +1,55 @@
""" """Example exec module to use the Anker API for continously querying and displaying important solarbank parameters
Example exec module to use the Anker API for continously querying and displaying important solarbank parameters
This module will prompt for the Anker account details if not pre-set in the header. This module will prompt for the Anker account details if not pre-set in the header.
Upon successfull authentication, you will see the solarbank parameters displayed and refreshed at reqular interval. Upon successfull authentication, you will see the solarbank parameters displayed and refreshed at reqular interval.
Note: When the system owning account is used, more details for the solarbank can be queried and displayed. Note: When the system owning account is used, more details for the solarbank can be queried and displayed.
Attention: During executiion of this module, the used account cannot be used in the Anker App since it will be kicked out on each refresh. Attention: During executiion of this module, the used account cannot be used in the Anker App since it will be kicked out on each refresh.
""" """ # noqa: D205
import asyncio import asyncio
from aiohttp import ClientSession
from datetime import datetime, timedelta from datetime import datetime, timedelta
from getpass import getpass from getpass import getpass
import json, logging, sys, time, os import logging
import os
import sys
import time
from aiohttp import ClientSession
from api import api from api import api
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
_LOGGER.addHandler(logging.StreamHandler(sys.stdout)) _LOGGER.addHandler(logging.StreamHandler(sys.stdout))
#_LOGGER.setLevel(logging.DEBUG) # enable for debug output #_LOGGER.setLevel(logging.DEBUG) # enable for debug output
CONSOLE: logging.Logger = logging.getLogger("console")
CONSOLE.addHandler(logging.StreamHandler(sys.stdout))
CONSOLE.setLevel(logging.INFO)
# Optional default Anker Account credentials to be used # Optional default Anker Account credentials to be used
USER = "" USER = ""
PASSWORD = "" PASSWORD = ""
COUNTRY = "" COUNTRY = ""
REFRESH = 30 # refresh interval in seconds REFRESH = 30 # default refresh interval in seconds
def clearscreen(): def clearscreen():
"""Clear the terminal screen."""
if sys.stdin is sys.__stdin__: # check if not in IDLE shell if sys.stdin is sys.__stdin__: # check if not in IDLE shell
os.system("cls") if os.name == "nt" else os.system("clear") if os.name == "nt":
#print("\033[H\033[2J", end="") # ESC characters to clear screen, system independent? os.system("cls")
else:
os.system("clear")
#CONSOLE.info("\033[H\033[2J", end="") # ESC characters to clear terminal screen, system independent?
async def main() -> None: async def main() -> None:
global USER, PASSWORD, COUNTRY, REFRESH """Run Main routine to start Solarbank monitor in a loop."""
print("Solarbank Monitor:") global USER, PASSWORD, COUNTRY, REFRESH # noqa: PLW0603
CONSOLE.info("Solarbank Monitor:")
if USER == "": if USER == "":
print("\nEnter Anker Account credentials:") CONSOLE.info("\nEnter Anker Account credentials:")
USER = input("Username (email): ") USER = input("Username (email): ")
if USER == "": if USER == "":
return False return False
PASSWORD = getpass("Password: ") PASSWORD = getpass("Password: ")
if PASSWORD == "": if PASSWORD == "":
return False return False
COUNTRY = input("Country ID (e.g. DE): ") COUNTRY = input("Country ID (e.g. DE): ")
@ -46,12 +57,13 @@ async def main() -> None:
return False return False
try: try:
async with ClientSession() as websession: async with ClientSession() as websession:
print("\nTrying authentication...",end="") CONSOLE.info("\nTrying authentication...")
myapi = api.API(USER,PASSWORD,COUNTRY,websession, _LOGGER) myapi = api.AnkerSolixApi(USER,PASSWORD,COUNTRY,websession, _LOGGER)
if await myapi.async_authenticate(): if await myapi.async_authenticate():
print("OK") CONSOLE.info("OK")
else: else:
print("CACHED") # Login validation will be done during first API call # Login validation will be done during first API call
CONSOLE.info("CACHED")
while True: while True:
resp = input(f"\nHow many seconds refresh interval should be used? (10-600, default: {REFRESH}): ") resp = input(f"\nHow many seconds refresh interval should be used? (10-600, default: {REFRESH}): ")
@ -74,81 +86,81 @@ async def main() -> None:
t5 = 6 t5 = 6
t6 = 10 t6 = 10
while True: while True:
print("\n") CONSOLE.info("\n")
now = datetime.now().astimezone() now = datetime.now().astimezone()
if next_refr <= now: if next_refr <= now:
print("Running site refresh...") CONSOLE.info("Running site refresh...")
await myapi.update_sites() await myapi.update_sites()
next_refr = now + timedelta(seconds=REFRESH) next_refr = now + timedelta(seconds=REFRESH)
if next_dev_refr <= now: if next_dev_refr <= now:
print("Running device details refresh...") CONSOLE.info("Running device details refresh...")
await myapi.update_device_details() await myapi.update_device_details()
next_dev_refr = next_refr + timedelta(seconds=REFRESH*9) next_dev_refr = next_refr + timedelta(seconds=REFRESH*9)
schedules = {} schedules = {}
clearscreen() clearscreen()
print(f"Solarbank Monitor (refresh {REFRESH} s, details refresh {10*REFRESH} s):") CONSOLE.info(f"Solarbank Monitor (refresh {REFRESH} s, details refresh {10*REFRESH} s):")
print(f"Sites: {len(myapi.sites)}, Devices: {len(myapi.devices)}") CONSOLE.info(f"Sites: {len(myapi.sites)}, Devices: {len(myapi.devices)}")
for sn, dev in myapi.devices.items(): for sn, dev in myapi.devices.items():
devtype = dev.get('type','unknown') devtype = dev.get('type','unknown')
admin = dev.get('is_admin',False) admin = dev.get('is_admin',False)
print(f"{'Device':<{col1}}: {(dev.get('name','NoName')):<{col2}} (Admin: {'YES' if admin else 'NO'})") CONSOLE.info(f"{'Device':<{col1}}: {(dev.get('name','NoName')):<{col2}} (Admin: {'YES' if admin else 'NO'})")
print(f"{'SN':<{col1}}: {sn}") CONSOLE.info(f"{'SN':<{col1}}: {sn}")
print(f"{'PN':<{col1}}: {dev.get('pn','')}") CONSOLE.info(f"{'PN':<{col1}}: {dev.get('pn','')}")
print(f"{'Type':<{col1}}: {devtype.capitalize()}") CONSOLE.info(f"{'Type':<{col1}}: {devtype.capitalize()}")
if devtype == "solarbank": if devtype == "solarbank":
siteid = dev.get('site_id','') siteid = dev.get('site_id','')
print(f"{'Site ID':<{col1}}: {siteid}") CONSOLE.info(f"{'Site ID':<{col1}}: {siteid}")
online = dev.get('wifi_online') online = dev.get('wifi_online')
print(f"{'Wifi state':<{col1}}: {('Unknown' if online == None else 'Online' if online else 'Offline'):<{col2}} (Charging Status: {dev.get('charging_status','')})") CONSOLE.info(f"{'Wifi state':<{col1}}: {('Unknown' if online is None else 'Online' if online else 'Offline'):<{col2}} (Charging Status: {dev.get('charging_status','')})")
upgrade = dev.get('auto_upgrade') upgrade = dev.get('auto_upgrade')
print(f"{'SW Version':<{col1}}: {dev.get('sw_version','Unknown'):<{col2}} (Auto-Upgrade: {'Unknown' if upgrade == None else 'Enabled' if upgrade else 'Disabled'})") CONSOLE.info(f"{'SW Version':<{col1}}: {dev.get('sw_version','Unknown'):<{col2}} (Auto-Upgrade: {'Unknown' if upgrade is None else 'Enabled' if upgrade else 'Disabled'})")
soc = f"{dev.get('battery_soc','---'):>3} %" soc = f"{dev.get('battery_soc','---'):>3} %"
print(f"{'State Of Charge':<{col1}}: {soc:<{col2}} (Min SOC: {str(dev.get('power_cutoff','--'))+' %'})") CONSOLE.info(f"{'State Of Charge':<{col1}}: {soc:<{col2}} (Min SOC: {str(dev.get('power_cutoff','--'))+' %'})")
unit = dev.get('power_unit','W') unit = dev.get('power_unit','W')
print(f"{'Input Power':<{col1}}: {dev.get('input_power',''):>3} {unit}") CONSOLE.info(f"{'Input Power':<{col1}}: {dev.get('input_power',''):>3} {unit}")
print(f"{'Charge Power':<{col1}}: {dev.get('charging_power',''):>3} {unit}") CONSOLE.info(f"{'Charge Power':<{col1}}: {dev.get('charging_power',''):>3} {unit}")
print(f"{'Output Power':<{col1}}: {dev.get('output_power',''):>3} {unit}") CONSOLE.info(f"{'Output Power':<{col1}}: {dev.get('output_power',''):>3} {unit}")
preset = dev.get('set_output_power') preset = dev.get('set_output_power')
if not preset: if not preset:
preset = '---' preset = '---'
print(f"{'Output Preset':<{col1}}: {preset:>3} {unit}") CONSOLE.info(f"{'Output Preset':<{col1}}: {preset:>3} {unit}")
"""update schedule with device details refresh and print it""" # update schedule with device details refresh and print it
if admin: if admin:
if not schedules.get(sn) and siteid: if not schedules.get(sn) and siteid:
schedules.update({sn: await myapi.get_device_load(siteId=siteid,deviceSn=sn)}) schedules.update({sn: await myapi.get_device_load(siteId=siteid,deviceSn=sn)})
data = schedules.get(sn,{}) data = schedules.get(sn,{})
print(f"{'Schedule':<{col1}}: {now.strftime('%H:%M UTC %z'):<{col2}} (Current Preset: {data.get('current_home_load','')})") CONSOLE.info(f"{'Schedule':<{col1}}: {now.strftime('%H:%M UTC %z'):<{col2}} (Current Preset: {data.get('current_home_load','')})")
print(f"{'ID':<{t1}} {'Start':<{t2}} {'End':<{t3}} {'Discharge':<{t4}} {'Output':<{t5}} {'ChargePrio':<{t6}}") CONSOLE.info(f"{'ID':<{t1}} {'Start':<{t2}} {'End':<{t3}} {'Discharge':<{t4}} {'Output':<{t5}} {'ChargePrio':<{t6}}")
for slot in (data.get("home_load_data",{})).get("ranges",[]): for slot in (data.get("home_load_data",{})).get("ranges",[]):
enabled = slot.get('turn_on') enabled = slot.get('turn_on')
load = slot.get('appliance_loads',[]) load = slot.get('appliance_loads',[])
load = load[0] if len(load) > 0 else {} load = load[0] if len(load) > 0 else {}
print(f"{str(slot.get('id','')):>{t1}} {slot.get('start_time',''):<{t2}} {slot.get('end_time',''):<{t3}} {('---' if enabled == None else 'YES' if enabled else 'NO'):^{t4}} {str(load.get('power',''))+' W':>{t5}} {str(slot.get('charge_priority',''))+' %':>{t6}}") CONSOLE.info(f"{str(slot.get('id','')):>{t1}} {slot.get('start_time',''):<{t2}} {slot.get('end_time',''):<{t3}} {('---' if enabled is None else 'YES' if enabled else 'NO'):^{t4}} {str(load.get('power',''))+' W':>{t5}} {str(slot.get('charge_priority',''))+' %':>{t6}}")
else: else:
print(f"Not a Solarbank device, further details skipped") sys.stdoutf("Not a Solarbank device, further details skipped")
print("") CONSOLE.info("")
#print(json.dumps(myapi.devices, indent=2)) #CONSOLE.info(json.dumps(myapi.devices, indent=2))
for sec in range(0,REFRESH): for sec in range(0,REFRESH):
now = datetime.now().astimezone() now = datetime.now().astimezone()
if sys.stdin is sys.__stdin__: if sys.stdin is sys.__stdin__:
print(f"Site refresh: {int((next_refr-now).total_seconds()):>3} sec, Device details refresh: {int((next_dev_refr-now).total_seconds()):>3} sec (CTRL-C to abort)", end = "\r", flush=True) print(f"Site refresh: {int((next_refr-now).total_seconds()):>3} sec, Device details refresh: {int((next_dev_refr-now).total_seconds()):>3} sec (CTRL-C to abort)", end = "\r", flush=True) # noqa: T201
elif sec == 0: elif sec == 0:
# IDLE may be used and does not support cursor placement, skip time progress display # IDLE may be used and does not support cursor placement, skip time progress display
print(f"Site refresh: {int((next_refr-now).total_seconds()):>3} sec, Device details refresh: {int((next_dev_refr-now).total_seconds()):>3} sec (CTRL-C to abort)", end = "", flush=True) print(f"Site refresh: {int((next_refr-now).total_seconds()):>3} sec, Device details refresh: {int((next_dev_refr-now).total_seconds()):>3} sec (CTRL-C to abort)", end = "", flush=True) # noqa: T201
time.sleep(1) time.sleep(1)
return False return False
except Exception as err: except Exception as exception:
print(f'{type(err)}: {err}') CONSOLE.info(f'{type(exception)}: {exception}')
return False return False
"""run async main""" # run async main
if __name__ == '__main__': if __name__ == '__main__':
try: try:
if not asyncio.run(main()): if not asyncio.run(main()):
print("\nAborted!") CONSOLE.info("\nAborted!")
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nAborted!") CONSOLE.info("\nAborted!")
except Exception as err: except Exception as err:
print(f'{type(err)}: {err}') CONSOLE.info(f'{type(err)}: {err}')

View File

@ -1,103 +1,108 @@
""" """Example exec module to test the Anker API for various methods or direct endpoint requests with various parameters."""
Example exec module to test the Anker API for various methods or direct endpoint requests with various parameters.
"""
import asyncio import asyncio
from aiohttp import ClientSession
from datetime import datetime from datetime import datetime
from api import api, errors import json
import json, logging, sys import logging
import sys
from aiohttp import ClientSession
from api import api
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
_LOGGER.addHandler(logging.StreamHandler(sys.stdout)) _LOGGER.addHandler(logging.StreamHandler(sys.stdout))
#_LOGGER.setLevel(logging.DEBUG) # enable for detailed API output # _LOGGER.setLevel(logging.DEBUG) # enable for detailed API output
CONSOLE: logging.Logger = logging.getLogger("console")
CONSOLE.addHandler(logging.StreamHandler(sys.stdout))
CONSOLE.setLevel(logging.INFO)
async def main() -> None: async def main() -> None:
"""Create the aiohttp session and run the example.""" """Create the aiohttp session and run the example."""
print("Testing Solix API:") CONSOLE.info("Testing Solix API:")
try: try:
async with ClientSession() as websession: async with ClientSession() as websession:
myapi = api.API("username@domain.com","password","de",websession, _LOGGER) myapi = api.AnkerSolixApi(
"username@domain.com", "password", "de", websession, _LOGGER
)
#show login response # show login response
''' """
#new = await myapi.async_authenticate(restart=True) # enforce new login data from server #new = await myapi.async_authenticate(restart=True) # enforce new login data from server
new = await myapi.async_authenticate() # receive new or load cached login data new = await myapi.async_authenticate() # receive new or load cached login data
if new: if new:
print("Received Login response:") CONSOLE.info("Received Login response:")
else: else:
print("Cached Login response:") CONSOLE.info("Cached Login response:")
print(json.dumps(myapi._login_response, indent=2)) # show used login response for API reqests CONSOLE.info(json.dumps(myapi._login_response, indent=2)) # show used login response for API reqests
''' """
# test site api methods # test site api methods
await myapi.update_sites() await myapi.update_sites()
await myapi.update_device_details() await myapi.update_device_details()
print("System Overview:") CONSOLE.info("System Overview:")
print(json.dumps(myapi.sites, indent=2)) CONSOLE.info(json.dumps(myapi.sites, indent=2))
print("Device Overview:") CONSOLE.info("Device Overview:")
print(json.dumps(myapi.devices, indent=2)) CONSOLE.info(json.dumps(myapi.devices, indent=2))
# test api methods # test api methods
''' """
print(json.dumps(await myapi.get_site_list(), indent=2)) CONSOLE.info(json.dumps(await myapi.get_site_list(), indent=2))
print(json.dumps(await myapi.get_homepage(), indent=2)) CONSOLE.info(json.dumps(await myapi.get_homepage(), indent=2))
print(json.dumps(await myapi.get_bind_devices(), indent=2)) CONSOLE.info(json.dumps(await myapi.get_bind_devices(), indent=2))
print(json.dumps(await myapi.get_user_devices(), indent=2)) CONSOLE.info(json.dumps(await myapi.get_user_devices(), indent=2))
print(json.dumps(await myapi.get_charging_devices(), indent=2)) CONSOLE.info(json.dumps(await myapi.get_charging_devices(), indent=2))
print(json.dumps(await myapi.get_auto_upgrade(), indent=2)) CONSOLE.info(json.dumps(await myapi.get_auto_upgrade(), indent=2))
print(json.dumps(await myapi.get_scene_info(siteId='efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'), indent=2)) CONSOLE.info(json.dumps(await myapi.get_scene_info(siteId='efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'), indent=2))
print(json.dumps(await myapi.get_wifi_list(siteId='efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'), indent=2)) CONSOLE.info(json.dumps(await myapi.get_wifi_list(siteId='efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'), indent=2))
print(json.dumps(await myapi.get_solar_info(siteId='efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'), indent=2)) # json parameters unknown: site_id not sifficient, or works only with Anker Inverters? CONSOLE.info(json.dumps(await myapi.get_solar_info(siteId='efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'), indent=2)) # json parameters unknown: site_id not sifficient, or works only with Anker Inverters?
print(json.dumps(await myapi.get_device_parm(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",paramType="4"), indent=2)) CONSOLE.info(json.dumps(await myapi.get_device_parm(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",paramType="4"), indent=2))
print(json.dumps(await myapi.get_power_cutoff(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY"), indent=2)) CONSOLE.info(json.dumps(await myapi.get_power_cutoff(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY"), indent=2))
print(json.dumps(await myapi.get_device_load(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY"), indent=2)) CONSOLE.info(json.dumps(await myapi.get_device_load(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY"), indent=2))
print(json.dumps(await myapi.energy_analysis(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY",rangeType="week",startDay=datetime.fromisoformat("2023-10-10"),endDay=datetime.fromisoformat("2023-10-10"),devType="solar_production"), indent=2)) CONSOLE.info(json.dumps(await myapi.energy_analysis(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY",rangeType="week",startDay=datetime.fromisoformat("2023-10-10"),endDay=datetime.fromisoformat("2023-10-10"),devType="solar_production"), indent=2))
print(json.dumps(await myapi.energy_analysis(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY",rangeType="week",startDay=datetime.fromisoformat("2023-10-10"),endDay=datetime.fromisoformat("2023-10-10"),devType="solarbank"), indent=2)) CONSOLE.info(json.dumps(await myapi.energy_analysis(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY",rangeType="week",startDay=datetime.fromisoformat("2023-10-10"),endDay=datetime.fromisoformat("2023-10-10"),devType="solarbank"), indent=2))
print(json.dumps(await myapi.energy_daily(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY",startDay=datetime.fromisoformat("2024-01-10"),numDays=10), indent=2)) CONSOLE.info(json.dumps(await myapi.energy_daily(siteId="efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c",deviceSn="9JVB42LJK8J0P5RY",startDay=datetime.fromisoformat("2024-01-10"),numDays=10), indent=2))
print(json.dumps(await myapi.home_load_chart(siteId='efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'), indent=2)) CONSOLE.info(json.dumps(await myapi.home_load_chart(siteId='efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'), indent=2))
''' """
# test api endpoints directly # test api endpoints directly
''' """
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["homepage"],json={})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["homepage"],json={})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["site_list"],json={})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["site_list"],json={})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["bind_devices"],json={})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["bind_devices"],json={})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["user_devices"],json={})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["user_devices"],json={})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["charging_devices"],json={})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["charging_devices"],json={})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_auto_upgrade"],json={})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_auto_upgrade"],json={})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["site_detail"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["site_detail"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["wifi_list"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["wifi_list"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_site_price"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_site_price"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["solar_info"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2)) # json parameters unknown: site_id not sifficient, or works only with Anker Inverters? CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["solar_info"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2)) # json parameters unknown: site_id not sifficient, or works only with Anker Inverters?
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_cutoff"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "device_sn": "9JVB42LJK8J0P5RY"})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_cutoff"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "device_sn": "9JVB42LJK8J0P5RY"})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_device_fittings"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "device_sn": "9JVB42LJK8J0P5RY"})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_device_fittings"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "device_sn": "9JVB42LJK8J0P5RY"})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_device_load"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "device_sn": "9JVB42LJK8J0P5RY"})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_device_load"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "device_sn": "9JVB42LJK8J0P5RY"})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_device_parm"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "param_type": "4"})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["get_device_parm"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c', "param_type": "4"})), indent=2))
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["compatible_process"],json={})), indent=2)) # json parameters unknown CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["compatible_process"],json={})), indent=2)) # json parameters unknown
print(json.dumps((await myapi.request("post", api._API_ENDPOINTS["home_load_chart"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2)) CONSOLE.info(json.dumps((await myapi.request("post", api._API_ENDPOINTS["home_load_chart"],json={"site_id": 'efaca6b5-f4a0-e82e-3b2e-6b9cf90ded8c'})), indent=2))
''' """
# test api from json files # test api from json files
''' """
myapi.testDir("examples") myapi.testDir("examples")
await myapi.update_sites(fromFile=True) await myapi.update_sites(fromFile=True)
await myapi.update_device_details(fromFile=True) await myapi.update_device_details(fromFile=True)
print(json.dumps(myapi.sites,indent=2)) CONSOLE.info(json.dumps(myapi.sites,indent=2))
print(json.dumps(myapi.devices,indent=2)) CONSOLE.info(json.dumps(myapi.devices,indent=2))
''' """
except Exception as err: except Exception as exception:
print(f'{type(err)}: {err}') CONSOLE.info(f"{type(exception)}: {exception}")
# run async main
"""run async main""" if __name__ == "__main__":
if __name__ == '__main__':
try: try:
asyncio.run(main()) asyncio.run(main())
except Exception as err: except Exception as err:
print(f'{type(err)}: {err}') CONSOLE.info(f"{type(err)}: {err}")