project jira-projects / Miscellaneous

jira-projects/MISC#2: Try running InfluxDB Client in Pyodide



Issue Information

Issue Type: issue
Status: closed
Reported By: btasker
Assigned To: btasker

Created: 05-May-22 16:59



Description

I saw a post on running Python in HTML pages earlier and wanted to fiddle about with it.

What I wanted to try was using Pyodide to run Python in the browser, load the Python InfluxDB client and connect back to cloud2




Activity


assigned to @btasker


There's a web console available at https://pyodide.org/en/stable/console.html which can be used to test.

Pyodide has micropip support, so the first thing to do is to try and install the client

import micropip
micropip.install("influxdb-client")

That worked.

Next thing to do is to draft the python script that we want to run

import influxdb_client

bucket = "Systemstats"
org = "<my_org>"
token = "<my_token>"
url = "https://eu-central-1-1.aws.cloud2.influxdata.com/"

client = influxdb_client.InfluxDBClient(
   url=url,
   token=token,
   org=org
)


query_api = client.query_api()
query = """
from(bucket: "Systemstats")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "power_watts")
  |> filter(fn: (r) => r["_field"] == "consumption")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
"""
result = query_api.query(org=org, query=query)
results = []
for table in result:
  for record in table.records:
    results.append((record.get_field(), record.get_value()))

print(results)

Giving that a go fails:

Screenshot_20220505_180305

Text of that traceback is

Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/lib/python3.10/site-packages/influxdb_client/client/influxdb_client.py", line 58, in __init__
    from .._sync.api_client import ApiClient
  File "/lib/python3.10/site-packages/influxdb_client/_sync/api_client.py", line 16, in <module>
    from multiprocessing.pool import ThreadPool
  File "/lib/python3.10/multiprocessing/pool.py", line 30, in <module>
    from .connection import wait
  File "/lib/python3.10/multiprocessing/connection.py", line 21, in <module>
    import _multiprocessing
ModuleNotFoundError: No module named '_multiprocessing'

Pyodide issue 1603 refers to this

A workaround is to monkeypatch the _multiprocessing module:

We're not going to use multiprocessing mode (as we now know it can't be supported), so let's patch things so that the import doesn't fail

import influxdb_client

# Pyodide doesn't support multiprocessing, create a dummy object so imports
# don't throw exceptions
import sys
sys.modules['_multiprocessing'] = object


bucket = "Systemstats"
org = "<my_org>"
token = "<my_token>"
url = "https://eu-central-1-1.aws.cloud2.influxdata.com/"

client = influxdb_client.InfluxDBClient(
   url=url,
   token=token,
   org=org
)


query_api = client.query_api()
query = """
from(bucket: "Systemstats")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "power_watts")
  |> filter(fn: (r) => r["_field"] == "consumption")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
"""
result = query_api.query(org=org, query=query)
results = []
for table in result:
  for record in table.records:
    results.append((record.get_field(), record.get_value()))

print(results)

We progress on to the next error:

Screenshot_20220505_180727

Text:

Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/lib/python3.10/site-packages/influxdb_client/client/influxdb_client.py", line 58, in __init__
    from .._sync.api_client import ApiClient
  File "/lib/python3.10/site-packages/influxdb_client/_sync/api_client.py", line 27, in <module>
    from influxdb_client._sync import rest
  File "/lib/python3.10/site-packages/influxdb_client/_sync/rest.py", line 19, in <module>
    import ssl
  File "/lib/python3.10/ssl.py", line 98, in <module>
    import _ssl             # if we can't import it, let the error propagate
ModuleNotFoundError: No module named '_ssl'

We need to install the SSL module

Installing the deps we need

import micropip
micropip.install("influxdb-client")
micropip.install("ssl")

The script itself stays the same as before

import influxdb_client

# Pyodide doesn't support multiprocessing, create a dummy object so imports
# don't throw exceptions
import sys
sys.modules['_multiprocessing'] = object


bucket = "Systemstats"
org = "<my_org>"
token = "<my_token>"
url = "https://eu-central-1-1.aws.cloud2.influxdata.com/"

client = influxdb_client.InfluxDBClient(
   url=url,
   token=token,
   org=org
)


query_api = client.query_api()
query = """
from(bucket: "Systemstats")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "power_watts")
  |> filter(fn: (r) => r["_field"] == "consumption")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
"""
result = query_api.query(org=org, query=query)
results = []
for table in result:
  for record in table.records:
    results.append((record.get_field(), record.get_value()))

print(results)

We hit our next (and unfortunately, final) error

Screenshot_20220505_181217

The text of that traceback is

Traceback (most recent call last):
  File "/lib/python3.10/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
  File "/lib/python3.10/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/lib/python3.10/site-packages/urllib3/util/connection.py", line 79, in create_connection
    _set_socket_options(sock, socket_options)
  File "/lib/python3.10/site-packages/urllib3/util/connection.py", line 105, in _set_socket_options
    sock.setsockopt(*opt)
OSError: [Errno 50] Protocol not available

Pyodide Issue 375 explains why we're getting this message.

Networking in Python does not work, since it relies on low-level networking primitives (socket) that aren't allowed in the browser sandbox.

So, we can't easily proceed with this: although urllib3 can be installed in Pyodide, it has no way to actually connect out to anything.

In theory it should be possible to patch things so that the relevant calls instead trigger the appropriate JavaScript calls (fetch or XMLHttpRequest), but that's further down the rabbit hole than I currently want to go (though it looks like Pyodide exposes a function called pyodide.open_url which can be used for external accesses).

FWIW, it looks like pyodide.http.pyfetch() could be used for this

Reopening as it seems this is an itch my brain wants to scratch

So, what I want to do is overload a function into influxdb_client so that we end up calling pyfetch rather than urllib.

We start with the basic boilerplate

import influxdb_client

bucket = "Systemstats"
org = "<my_org>"
token = "<my_token>"
url = "https://eu-central-1-1.aws.cloud2.influxdata.com/"

client = influxdb_client.InfluxDBClient(
   url=url,
   token=token,
   org=org
)

query_api = client.query_api()
q = "foo"

Then we define a function to check that we can overlay a method, and set it

def new_query(self, query: str, org=None, params: dict = None):
    return "ben's playing with fire"

query_api.query  = new_query

Finally, we call it (we need to pass the instance in explicitly, as I haven't figured out how to get self to work with an overlaid method)

>>> query_api.query(query_api, q)
"ben's playing with fire"

Marvellous...
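As an aside, the standard library's types.MethodType can bind self to an overlaid function, so the instance doesn't need passing in manually. A quick sketch, using a hypothetical Demo class (not from the client library):

```python
import types

class Demo:
    pass

def new_query(self, query):
    # self arrives already bound, just like a normal method
    return "got: " + query

d = Demo()
# MethodType binds the plain function to this specific instance
d.query = types.MethodType(new_query, d)
print(d.query("foo"))  # -> got: foo
```

With that, d.query("foo") works without the d.query(d, "foo") contortion used below.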

So what we need to do is write an implementation of query() which takes the info it needs.

The other option (and arguably the proper way to do it) would be to create a class which inherits from QueryApi and overrides the method, but that's much less fun and the aim of this ticket isn't really to be productive

We need our method to be able to authenticate its HTTP requests, so we need to find the following

  • Org
  • Auth token
  • url

Could just change the method signature to allow them to be passed in... but that's cheating.

If we poke down to here we see some auth related stuff, but if we call it

>>> query_api._query_api.api_client.configuration.auth_settings()
{'BasicAuthentication': {'type': 'basic', 'in': 'header', 'key': 'Authorization', 'value': 'Basic Og=='}, 'TokenAuthentication': {'type': 'api_key', 'in': 'header', 'key': 'Authorization', 'value': None}}
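(Incidentally, that BasicAuthentication value isn't leaking anything: 'Og==' is just base64 of ':', i.e. an empty username and password pair, which we can confirm:)

```python
import base64

# 'Og==' decodes to b':' - an empty "username:password" credential
print(base64.b64decode("Og=="))  # -> b':'
```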

We don't get our token back. Let's poke in Python instead

>>> dir(query_api)
['__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_build_flux_ast', '_create_query', '_get_query_options', '_influxdb_client', '_org_param', '_params_to_extern_ast', '_parm_to_extern_ast', '_query_api', '_query_options', '_to_csv', '_to_data_frame_stream', '_to_data_frame_stream_async', '_to_data_frame_stream_parser', '_to_data_frames', '_to_flux_record_stream', '_to_flux_record_stream_async', '_to_flux_record_stream_parser', '_to_tables', '_to_tables_async', '_to_tables_parser', 'default_dialect', 'query', 'query_csv', 'query_data_frame', 'query_data_frame_stream', 'query_raw', 'query_stream']

Of those, _influxdb_client looks most likely to have auth details in it somewhere

>>> dir(query_api._influxdb_client)
['__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_from_config_file', '_from_env_properties', '_version', 'api_client', 'auth_header_name', 'auth_header_value', 'authorizations_api', 'buckets_api', 'close', 'conf', 'default_tags', 'delete_api', 'from_config_file', 'from_env_properties', 'health', 'invocable_scripts_api', 'labels_api', 'org', 'organizations_api', 'ping', 'profilers', 'query_api', 'ready', 'retries', 'tasks_api', 'token', 'url', 'users_api', 'version', 'write_api']

We have some interesting attributes

>>> query_api._influxdb_client.auth_header_value
'Token <my_token>'
>>> query_api._influxdb_client.token
'<my_token>'
>>> query_api._influxdb_client.url
'https://eu-central-1-1.aws.cloud2.influxdata.com/'
>>> query_api._influxdb_client.org
'<my_org>'

So, we now have the bits we need to start to construct the request
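One small footgun worth noting: the request URL gets built later by plain string concatenation (url + "api/v2/query?org=" + org), which only works because the stored URL happens to end in a trailing slash and the org name needs no escaping. urllib.parse makes that sturdier. A sketch with hypothetical values:

```python
from urllib.parse import urljoin, urlencode

# Hypothetical stand-ins for the client's stored attributes
base = "https://eu-central-1-1.aws.cloud2.influxdata.com/"
org = "my org"  # spaces (or other reserved chars) would break naive concatenation

# urljoin copes with or without a trailing slash on base,
# and urlencode percent-escapes the org name for the query string
url = urljoin(base, "api/v2/query") + "?" + urlencode({"org": org})
print(url)  # -> https://eu-central-1-1.aws.cloud2.influxdata.com/api/v2/query?org=my+org
```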

So, there's a little bit of a headache here, because pyfetch is asynchronous.

But, from a quick scan, it's the only exposed call which allows us to pass arbitrary args in - pyodide.open_url just accepts a URL

We can trigger a call using custom headers with

import pyodide
pyodide.http.pyfetch(url="https://pfanalytics.bentasker.co.uk/bentest", method="POST", headers={"X-ff": "bar"})

So, we need to translate that into building a request to send across to the cloud service

import pyodide
import micropip
micropip.install("influxdb-client")
micropip.install("ssl")

# Pyodide doesn't support multiprocessing, create a dummy object so imports
# don't throw exceptions
import sys
sys.modules['_multiprocessing'] = object

import influxdb_client

# Define our override
async def new_query(self, query: str, org=None, params: dict = None):
    # We'll come back to this
    #q = self._create_query(query, self.default_dialect, params)
    #print(q)    
    headers = {
        "Authorization" : "Token " + self._influxdb_client.token,
        "content-type" : "application/vnd.flux"
    }
    url = self._influxdb_client.url  + "api/v2/query?org=" + self._influxdb_client.org
    a = await pyodide.http.pyfetch(url=url, 
                             method="POST", 
                             headers=headers,
                             body=query
                             )
    return a



bucket = "Systemstats"
org = "<my_org>"
token = "<my_token>"
url = "https://eu-central-1-1.aws.cloud2.influxdata.com/"

client = influxdb_client.InfluxDBClient(
   url=url,
   token=token,
   org=org
)

query_api = client.query_api()
q = """
from(bucket: "Systemstats")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "power_watts")
  |> filter(fn: (r) => r["_field"] == "consumption")
  |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
"""

# Overlay the function
query_api.query  = new_query  
r = await query_api.query(query_api,q)

We're most of the way there: cloud has sent back an annotated CSV.

Currently r is of type pyodide.http.FetchResponse: we "just" need to have our overlaid method extract the body from that and then call the client's method to translate into tables.
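For reference, the annotated CSV body iterates naturally with the stdlib csv module once the annotation rows are filtered out. A minimal sketch over a hypothetical fragment (the real response carries more columns, like _time and _start):

```python
import csv
from io import StringIO

# A hypothetical fragment of an annotated CSV response
body = """#group,false,false,true,true
#datatype,string,long,string,double
#default,_result,,,
,result,table,_field,_value
,_result,0,consumption,73.5
,_result,0,consumption,75
"""

rows = list(csv.reader(StringIO(body)))
# Annotation rows start with '#' in the first column; data rows have an empty first column
data = [r for r in rows if r and not r[0].startswith("#")]
# data[0] is the header row; data[1:] are the records
print(data[1:])
```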

My intention had been to pull the body as bytes (literally r.bytes()) and pass it into _to_tables as happens here but, digging down the call chain, it looks like FluxCsvParser is expecting a Response object rather than a string/bytes.

I tried it anyway, just in case, but no dice

r = await query_api.query(query_api,q)
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "<console>", line 15, in new_query
  File "/lib/python3.10/site-packages/influxdb_client/client/_base.py", line 179, in _to_tables
    list(_parser.generator())
  File "/lib/python3.10/site-packages/influxdb_client/client/flux_csv_parser.py", line 108, in generator
    for val in parser._parse_flux_response():
  File "/lib/python3.10/site-packages/influxdb_client/client/flux_csv_parser.py", line 118, in _parse_flux_response
    for csv in self._reader:
  File "/lib/python3.10/codecs.py", line 1053, in iterdecode
    for input in iterator:
TypeError: 'coroutine' object is not iterable
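The error makes sense: FetchResponse's body accessors are async, so calling one without await hands the parser a coroutine object rather than data, and iterating a coroutine raises exactly this TypeError. A minimal reproduction with a hypothetical coroutine standing in for r.bytes():

```python
# Stand-in for FetchResponse.bytes(), which is an async method
async def bytes_like():
    return b"never,actually,reached"

coro = bytes_like()  # calling it returns a coroutine, not bytes
try:
    for _ in coro:  # roughly what codecs.iterdecode attempts downstream
        pass
except TypeError as e:
    print(e)  # -> 'coroutine' object is not iterable
finally:
    coro.close()  # avoid a "coroutine was never awaited" warning
```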

So, it looks like we need to initialise FluxCsvParser ourselves and skip past the reader bits

from influxdb_client.client.flux_csv_parser import FluxResponseMetadataMode, FluxCsvParser, FluxSerializationMode, _FluxCsvParserMetadata
from io import StringIO
import csv 

async def new_query(self, query: str, org=None, params: dict = None):
    # We'll come back to this
    #q = self._create_query(query, self.default_dialect, params)
    #print(q)    
    headers = {
        "Authorization" : "Token " + self._influxdb_client.token,
        "content-type" : "application/vnd.flux"
    }
    url = self._influxdb_client.url  + "api/v2/query?org=" + self._influxdb_client.org
    a = await pyodide.http.pyfetch(url=url, 
                             method="POST", 
                             headers=headers,
                             body=query
                             )
    response = await a.string()
    # The client would call _parser.generator here, but that calls _parse_flux_response() which'll treat
    # us as a readable object, so we implement around it
    _parser = self._to_tables_parser(response, self._get_query_options(), FluxResponseMetadataMode.only_names)
    list(self.generator(self, response, _parser))
    return _parser.table_list()


def generator(self, csv, _parser):
    for val in self._parse_flux_response(self, csv, _parser):
        yield val

def _parse_flux_response(self, csv_str, _parser):
    metadata = _FluxCsvParserMetadata()
    f = StringIO(csv_str)
    rows = csv.reader(f)
    print(rows)
    for row in rows:
        print(row)
        for val in _parser._parse_flux_response_row(metadata, row):
            print(val)
            yield val


query_api.query  = new_query  
query_api.generator = generator
query_api._parse_flux_response = _parse_flux_response

r = await query_api.query(query_api,q)


results = []
for table in r:
  for record in table.records:
    results.append((record.get_field(), record.get_value()))

print(results)

This is a little over-engineered but it works

>>> results
[('consumption', '73.5'), ('consumption', '75'), ('consumption', '73.6'), ('consumption', '71.4'),
 ('consumption', '71.2'), ('consumption', '74.2'), ('consumption', '71.4'), ('consumption', '71.2'),
 ('consumption', '73.4'), ('consumption', '71.6'), ('consumption', '71.4'), ('consumption', '72'),
 ('consumption', '72.33333333333333'), ('consumption', '0'), ('consumption', '0'), ('consumption', '0'),
 ('consumption', '0'), ('consumption', '0'), ('consumption', '0'), ('consumption', '0'),
 ('consumption', '0'), ('consumption', '0'), ('consumption', '0'), ('consumption', '0'),
 ('consumption', '0'), ('consumption', '0'), ('consumption', '604.8461538461538'), ('consumption', '604.6'),
 ('consumption', '611.04'), ('consumption', '634.28'), ('consumption', '730.0833333333334'), ('consumption', '760.96'),
 ('consumption', '756'), ('consumption', '638.52'), ('consumption', '649.04'), ('consumption', '644.28'),
 ('consumption', '724.8'), ('consumption', '722.2'), ('consumption', '686.6666666666666')]
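Note the values come back as strings rather than floats, presumably because we run the parser with FluxResponseMetadataMode.only_names, so the #datatype annotations are never applied. If numbers are needed, a simple post-processing pass (shown here over a subset of the values above) covers it:

```python
# Values arrive as strings; convert to floats for downstream use
results = [('consumption', '73.5'), ('consumption', '0'), ('consumption', '604.6')]
converted = [(field, float(value)) for field, value in results]
print(converted)  # -> [('consumption', 73.5), ('consumption', 0.0), ('consumption', 604.6)]
```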

So, putting it all together into a fully copy-pastable version.

The crappy spacing inside functions is because my text editor doesn't seem to copy indents for empty lines, making Python bork.

# Creds etc
bucket = "Systemstats"
org = "<my_org>"
token = "<my_token>"
url = "https://eu-central-1-1.aws.cloud2.influxdata.com/"


import pyodide
import micropip
a = await micropip.install("influxdb-client")
a = await micropip.install("ssl")
del(a)

import influxdb_client
from io import StringIO
import csv 

# Pyodide doesn't support multiprocessing, create a dummy object so imports
# don't throw exceptions
import sys
sys.modules['_multiprocessing'] = object

# Get bits from the Influx DB client
from influxdb_client.client.flux_csv_parser import FluxResponseMetadataMode, FluxCsvParser, FluxSerializationMode, _FluxCsvParserMetadata


# We're going to overlay some queries into the class
async def new_query(self, query: str, org=None, params: dict = None):
    # We'll come back to this
    #q = self._create_query(query, self.default_dialect, params)
    #print(q)    
    headers = {
        "Authorization" : "Token " + self._influxdb_client.token,
        "content-type" : "application/vnd.flux"
    }
    url = self._influxdb_client.url  + "api/v2/query?org=" + self._influxdb_client.org
    a = await pyodide.http.pyfetch(url=url, 
                             method="POST", 
                             headers=headers,
                             body=query
                             )
    response = await a.string()
    # The client would call _parser.generator here, but that calls _parse_flux_response() which'll treat
    # us as a readable object, so we implement around it
    _parser = self._to_tables_parser(response, self._get_query_options(), FluxResponseMetadataMode.only_names)
    list(self.generator(self, response, _parser))
    return _parser.table_list()

# Replacement for 
# https://github.com/influxdata/influxdb-client-python/blob/5168a04983e3b70a9451fa3629fd54db08b91ecd/influxdb_client/client/flux_csv_parser.py#L105
def generator(self, csv, _parser):
    for val in self._parse_flux_response(self, csv, _parser):
        yield val

# Replaces line 115 in the same file
def _parse_flux_response(self, csv_str, _parser):
    metadata = _FluxCsvParserMetadata()
    f = StringIO(csv_str)
    rows = csv.reader(f)
    for row in rows:
        for val in _parser._parse_flux_response_row(metadata, row):
            print(val)
            yield val


# Set up the client
client = influxdb_client.InfluxDBClient(
   url=url,
   token=token,
   org=org
)


query_api = client.query_api()
query = """
from(bucket: "Systemstats")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "power_watts")
  |> filter(fn: (r) => r["_field"] == "consumption")
  |> aggregateWindow(every: 15m, fn: mean, createEmpty: false)
"""



# Overlay our functions into the object
query_api.query  = new_query  
query_api.generator = generator
query_api._parse_flux_response = _parse_flux_response


# Run the query
r = await query_api.query(query_api,query)

# Iterate through the result object
results = []
for table in r:
  for record in table.records:
    results.append((record.get_field(), record.get_value()))

print(results)

Screenshot_20220506_204649

Itch scratched, I might actually get to sleep tonight!

This morning, I've taken this a step further and turned it into something that can be run using PyScript rather than needing to be pasted into the REPL.

There's a simple web-page based on it published at https://projectsstatic.bentasker.co.uk/MISC/MISC.2/

I've also published a write-up of this and the pyscript stuff here: https://www.bentasker.co.uk/posts/blog/software-development/monkeying-about-with-pyodide-and-pyscript.html