Skip to content

Commit

Permalink
Merge pull request #63 from jasonacox/token
Browse files Browse the repository at this point in the history
Add Bearer Token Auth
  • Loading branch information
jasonacox authored Jan 4, 2024
2 parents 20e4ec0 + 433f43e commit ceabac5
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 32 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ and call function to poll data. Here is an example:
Classes
Powerwall(host, password, email, timezone, pwcacheexpire, timeout, poolmaxsize,
cloudmode, siteid, authpath)
cloudmode, siteid, authpath, authmode)
Parameters
host # Hostname or IP of the Tesla gateway
Expand All @@ -136,6 +136,7 @@ and call function to poll data. Here is an example:
cloudmode = False # If True, use Tesla cloud for data (default is False)
siteid # If cloudmode is True, use this siteid (default is None)
authpath # Path to cloud auth and site cache files (default is "")
authmode = "cookie" # "cookie" (default) or "token" - use cookie or bearer token for auth
Functions
poll(api, json, force) # Return data from Powerwall api (dict if json=True, bypass cache force=True)
Expand Down
17 changes: 17 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
# RELEASE NOTES

## v0.7.4 - Bearer Token Auth

pyPowerwall Updates
* This release adds the ability to use a Bearer Token for Authentication for the local Powerwall gateway API calls. This is selectable by defining `authmode='token'` in the initialization. The default mode uses the existing `AuthCookie` and `UserRecord` method.

```python
import pypowerwall

pw = pypowerwall.Powerwall(HOST, PASSWORD, EMAIL, TIMEZONE, authmode="token")
```

Proxy
* The above option is extended to the pyPowerwall Proxy via the envrionmental variable `PW_AUTH_MODE` set to cookie (default) or token.

Powerwall Network Scanner
* Added optional IP address argument to network scanner by @mcbirse in https://github.com/jasonacox/pypowerwall/pull/63. The Scan Function can now accept an additional argument `-ip=` to override the host IP address detection (`python -m pypowerwall scan -ip=192.168.1.100`). This may be useful where the host IP address/network cannot be detected correctly, for instance if pypowerwall is running inside a container.

## v0.7.3 - Cloud Mode Setup

* Setup will now check for `PW_AUTH_PATH` environmental variable to set the path for `.pypowerwall.auth` and `.pypowerwall.site` by @mcbirse in https://github.com/jasonacox/pypowerwall/pull/62
Expand Down
40 changes: 29 additions & 11 deletions proxy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import ssl
from transform import get_static, inject_js

BUILD = "t37"
BUILD = "t38"
ALLOWLIST = [
'/api/status', '/api/site_info/site_name', '/api/meters/site',
'/api/meters/solar', '/api/sitemaster', '/api/powerwalls',
Expand Down Expand Up @@ -72,6 +72,7 @@
style = os.getenv("PW_STYLE", "clear") + ".js"
siteid = os.getenv("PW_SITEID", None)
authpath = os.getenv("PW_AUTH_PATH", "")
authmode = os.getenv("PW_AUTH_MODE", "cookie")

# Global Stats
proxystats = {}
Expand Down Expand Up @@ -135,7 +136,8 @@ def get_value(a, key):
# TODO: Add support for multiple Powerwalls
try:
pw = pypowerwall.Powerwall(host,password,email,timezone,cache_expire,
timeout,pool_maxsize,siteid=siteid,authpath=authpath)
timeout,pool_maxsize,siteid=siteid,
authpath=authpath,authmode=authmode)
except Exception as e:
log.error(e)
log.error("Fatal Error: Unable to connect. Please fix config and restart.")
Expand Down Expand Up @@ -222,6 +224,7 @@ def do_GET(self):
if pw.cloudmode and pw.Tesla is not None:
proxystats['siteid'] = pw.Tesla.siteid
proxystats['counter'] = pw.Tesla.counter
proxystats['authmode'] = pw.authmode
message = json.dumps(proxystats)
elif self.path == '/stats/clear':
# Clear Internal Stats
Expand Down Expand Up @@ -334,6 +337,7 @@ def do_GET(self):
if pw.cloudmode and pw.Tesla is not None:
proxystats['siteid'] = pw.Tesla.siteid
proxystats['counter'] = pw.Tesla.counter
proxystats['authmode'] = pw.authmode
contenttype = 'text/html'
message = '<html>\n<head><meta http-equiv="refresh" content="5" />\n'
message += '<style>p, td, th { font-family: Helvetica, Arial, sans-serif; font-size: 10px;}</style>\n'
Expand All @@ -355,8 +359,13 @@ def do_GET(self):
else:
# Everything else - Set auth headers required for web application
proxystats['gets'] = proxystats['gets'] + 1
self.send_header("Set-Cookie", "AuthCookie={};{}".format(pw.auth['AuthCookie'], cookiesuffix))
self.send_header("Set-Cookie", "UserRecord={};{}".format(pw.auth['UserRecord'], cookiesuffix))
if pw.authmode == "token":
# Create bogus cookies
self.send_header("Set-Cookie", "AuthCookie=1234567890;{}".format(cookiesuffix))
self.send_header("Set-Cookie", "UserRecord=1234567890;{}".format(cookiesuffix))
else:
self.send_header("Set-Cookie", "AuthCookie={};{}".format(pw.auth['AuthCookie'], cookiesuffix))
self.send_header("Set-Cookie", "UserRecord={};{}".format(pw.auth['UserRecord'], cookiesuffix))

# Serve static assets from web root first, if found.
if self.path == "/" or self.path == "":
Expand Down Expand Up @@ -388,13 +397,22 @@ def do_GET(self):
proxy_path = proxy_path[1:]
pw_url = "https://{}/{}".format(pw.host, proxy_path)
log.debug("Proxy request to: {}".format(pw_url))
r = pw.session.get(
url=pw_url,
cookies=pw.auth,
verify=False,
stream=True,
timeout=pw.timeout
)
if pw.authmode == "token":
r = pw.session.get(
url=pw_url,
headers=pw.auth,
verify=False,
stream=True,
timeout=pw.timeout
)
else:
r = pw.session.get(
url=pw_url,
cookies=pw.auth,
verify=False,
stream=True,
timeout=pw.timeout
)
fcontent = r.content
ftype = r.headers['content-type']

Expand Down
54 changes: 42 additions & 12 deletions pypowerwall/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
* Will cache responses for 5s to limit number of calls to Powerwall Gateway
* Will re-use http connections to Powerwall Gateway for reduced load and faster response times
* Can use Tesla Cloud API instead of local Powerwall Gateway (if enabled)
* Uses Auth Cookie or Bearer Token for authorization (configurable)
Classes
Powerwall(host, password, email, timezone, pwcacheexpire, timeout, poolmaxsize,
cloudmode, siteid, authpath)
cloudmode, siteid, authpath, authmode)
Parameters
host # Hostname or IP of the Tesla gateway
Expand All @@ -30,6 +31,7 @@
cloudmode = False # If True, use Tesla cloud for data (default is False)
siteid = None # If cloudmode is True, use this siteid (default is None)
authpath = "" # Path to cloud auth and site files (default current directory)
authmode = "cookie" # "cookie" (default) or "token" - use cookie or bearer token for auth
Functions
poll(api, json, force) # Return data from Powerwall api (dict if json=True, bypass cache force=True)
Expand Down Expand Up @@ -72,7 +74,7 @@
from . import tesla_pb2 # Protobuf definition for vitals
from . import cloud # Tesla Cloud API

version_tuple = (0, 7, 3)
version_tuple = (0, 7, 4)
version = __version__ = '%d.%d.%d' % version_tuple
__author__ = 'jasonacox'

Expand Down Expand Up @@ -101,7 +103,7 @@ class ConnectionError(Exception):
class Powerwall(object):
def __init__(self, host="", password="", email="nobody@nowhere.com",
timezone="America/Los_Angeles", pwcacheexpire=5, timeout=5, poolmaxsize=10,
cloudmode=False, siteid=None, authpath=""):
cloudmode=False, siteid=None, authpath="", authmode="cookie"):
"""
Represents a Tesla Energy Gateway Powerwall device.
Expand All @@ -117,6 +119,7 @@ def __init__(self, host="", password="", email="nobody@nowhere.com",
cloudmode = If True, use Tesla cloud for data (default is False)
siteid = If cloudmode is True, use this siteid (default is None)
authpath = Path to cloud auth and site cache files (default current directory)
authmode = "cookie" (default) or "token" - use cookie or bearer token for authorization
"""

Expand All @@ -128,14 +131,16 @@ def __init__(self, host="", password="", email="nobody@nowhere.com",
self.timezone = timezone
self.timeout = timeout # 5s timeout for http calls
self.poolmaxsize = poolmaxsize # pool max size for http connection re-use
self.auth = {} # caches authentication cookies
self.auth = {} # caches auth cookies
self.token = None # caches bearer token
self.pwcachetime = {} # holds the cached data timestamps for api
self.pwcache = {} # holds the cached data for api
self.pwcacheexpire = pwcacheexpire # seconds to expire cache
self.cloudmode = cloudmode # cloud mode or local mode (default)
self.siteid = siteid # siteid for cloud mode
self.authpath = authpath # path to auth and site cache files
self.Tesla = None # cloud object for cloud connection
self.authmode = authmode # cookie or token

# Check for cloud mode
if self.cloudmode or self.host == "":
Expand All @@ -158,11 +163,24 @@ def __init__(self, host="", password="", email="nobody@nowhere.com",
else:
# Disable http persistent connections
self.session = requests
# Enforce authmode
if self.authmode not in ['cookie', 'token']:
log.debug("Invalid value for parameter 'authmode' (%s) switching to default" % str(self.authmode))
self.authmode = 'cookie'
# Load cached auth session
try:
f = open(self.cachefile, "r")
self.auth = json.load(f)
log.debug('loaded auth from cache file %s' % self.cachefile)
# Check to see if we have a valid cached session for the mode
if self.authmode == "token":
if 'Authorization' in self.auth:
self.token = self.auth['Authorization'].split(' ')[1]
else:
self.auth = {}
else:
if 'AuthCookie' not in self.auth or 'UserRecord' not in self.auth:
self.auth = {}
log.debug('loaded auth from cache file %s (%s authmode)' % (self.cachefile, self.authmode))
except:
log.debug('no auth cache file')
pass
Expand All @@ -185,7 +203,11 @@ def _get_session(self):

# Save Auth cookies
try:
self.auth = {'AuthCookie': r.cookies['AuthCookie'], 'UserRecord': r.cookies['UserRecord']}
if self.authmode == "token":
self.token = r.json()['token']
self.auth = {'Authorization': 'Bearer ' + self.token}
else:
self.auth = {'AuthCookie': r.cookies['AuthCookie'], 'UserRecord': r.cookies['UserRecord']}
try:
f = open(self.cachefile, "w")
json.dump(self.auth,f)
Expand All @@ -202,7 +224,11 @@ def _close_session(self):
self.Tesla.logout()
return
url = "https://%s/api/logout" % self.host
g = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout)
if self.authmode == "token":
g = self.session.get(url, headers=self.auth, verify=False, timeout=self.timeout)
else:
g = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout)

self.auth = {}

def is_connected(self):
Expand Down Expand Up @@ -251,15 +277,15 @@ def poll(self, api='/api/site_info/site_name', jsonformat=False, raw=False, recu

if(fetch):
if(api == '/api/devices/vitals'):
# Always want the raw output from the vitals call; protobuf binary payload
# Always want the raw stream output from the vitals call; protobuf binary payload
raw = True

url = "https://%s%s" % (self.host, api)
try:
if(raw):
r = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout, stream=True)
if self.authmode == "token":
r = self.session.get(url, headers=self.auth, verify=False, timeout=self.timeout, stream=raw)
else:
r = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout)
r = self.session.get(url, cookies=self.auth, verify=False, timeout=self.timeout, stream=raw)
except requests.exceptions.Timeout:
log.debug('ERROR Timeout waiting for Powerwall API %s' % url)
return None
Expand All @@ -271,11 +297,15 @@ def poll(self, api='/api/site_info/site_name', jsonformat=False, raw=False, recu
return None
if r.status_code >= 400 and r.status_code < 500:
# Session Expired - Try to get a new one unless we already tried
log.debug('Session Expired - Trying to get a new one')
if(not recursive):
if raw:
# Drain the stream before retrying
payload = r.raw.data
self._get_session()
return self.poll(api, jsonformat, raw, True)
else:
log.debug('ERROR Unable to establish session with Powerwall at %s - check password' % url)
log.error('Unable to establish session with Powerwall at %s - check password' % url)
return None
if(raw):
payload = r.raw.data
Expand Down
6 changes: 5 additions & 1 deletion pypowerwall/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
timeout = 1.0
state = 0
color = True
ip = None

for i in sys.argv:
if(i==sys.argv[0]):
Expand All @@ -34,6 +35,8 @@
state = 1
elif(i.lower() == "-nocolor"):
color = False
elif(i.lower()[0:4] == "-ip="):
ip = i[4:]
else:
try:
timeout = float(i)
Expand All @@ -42,7 +45,7 @@

# State 0 = Run Scan
if(state == 0):
scan.scan(color, timeout)
scan.scan(color, timeout, ip)

# State 1 = Cloud Mode Setup
if(state == 1):
Expand All @@ -65,6 +68,7 @@
print(" command = setup Setup Tesla Login for Cloud Mode access.")
print(" timeout Seconds to wait per host [Default=%0.1f]" % (timeout))
print(" -nocolor Disable color text output.")
print(" -ip=<ip> (Scan option) IP address within network to scan.")
print(" -h Show usage.")
print("")

Expand Down
11 changes: 6 additions & 5 deletions pypowerwall/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ def setup(self):
while True:
response = input("\n Email address: ").strip()
if "@" not in response:
print(" - Error: Invalid email address\n")
print(" - Error: Invalid email address")
else:
tuser = response
break
Expand All @@ -852,7 +852,7 @@ def setup(self):
code_verifier = tesla.new_code_verifier()

try:
print("Open the below address in your browser to login.\n")
print("\nOpen the below address in your browser to login.\n")
print(tesla.authorization_url(state=state, code_verifier=code_verifier))
except Exception as err:
log.error(f"Connection failure - {repr(err)}")
Expand Down Expand Up @@ -931,6 +931,7 @@ def setup(self):

# Test code
set_debug(False)
tuser = None
# Check for .pypowerwall.auth file
if os.path.isfile(AUTHFILE):
# Read the json file
Expand Down Expand Up @@ -961,9 +962,9 @@ def setup(self):

print("Connected to Tesla Cloud")

#print("\nSite Data")
#sites = cloud.getsites()
#print(sites)
print("\nSite Data")
sites = cloud.getsites()
print(sites)

#print("\Battery")
#r = cloud.get_battery()
Expand Down
5 changes: 3 additions & 2 deletions pypowerwall/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def getmyIP():
s.close()
return r

def scan(color=True, timeout=0.4):
def scan(color=True, timeout=0.4, ip=None):
"""
pyPowerwall Network Scanner
Expand All @@ -62,7 +62,8 @@ def scan(color=True, timeout=0.4):

# Fetch my IP address and assume /24 network
try:
ip = getmyIP()
if ip is None:
ip = getmyIP()
network = ipaddress.IPv4Interface(u''+ip+'/24').network
except:
print(alert + 'ERROR: Unable to get your IP address and network automatically.' + normal)
Expand Down

0 comments on commit ceabac5

Please sign in to comment.