project Utilities / Python Influxdb Downsample

utilities/python_influxdb_downsample#17: Container becomes stuck



Issue Information

Issue Type: issue
Status: closed
Reported By: btasker
Assigned To: btasker

Created: 09-Feb-23 00:00



Description

One of my cron tasks got itself stuck.

The underlying cause was actually my own fault: I accidentally removed the Docker container I was using as an InfluxDB output and forgot to create a retention policy when I brought the new one up.

2023-02-08 23:15:39.056887: Failed to write back to: websites/analytics, exception: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Request-Id': '80150e03-a806-11ed-802a-0242ac110002', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Error': 'retention policy not found: analytics', 'X-Influxdb-Version': '1.8.10', 'X-Request-Id': '80150e03-a806-11ed-802a-0242ac110002', 'Date': 'Wed, 08 Feb 2023 23:15:39 GMT', 'Content-Length': '50'})
HTTP response body: {"error":"retention policy not found: analytics"}

But shit goes down sometimes, and the downsampler needs to be able to handle it.

I'm guessing the issue has prevented close() from completing on the output




Activity


assigned to @btasker

I've created the retention policy now; I want to see whether the container ever actually manages to complete (I assume not).

If I strace it, I just get

clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, {tv_sec=1438366, tv_nsec=978518494}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=1438366, tv_nsec=979255844}) = 0
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, {tv_sec=1438367, tv_nsec=79255844}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=1438367, tv_nsec=79775567}) = 0
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, {tv_sec=1438367, tv_nsec=179775567}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=1438367, tv_nsec=180567114}) = 0

I'm guessing that that's close() backing off on checking whether the queue has been flushed yet.

We can be reasonably sure it's not a write retry backoff because the loglines we have are only printed when all tries have been exhausted.

Yeah, so looking at the code, close() is defined here, but simply calls __del__ (defined here)

That function is

    def __del__(self):
        """Close WriteApi."""
        if self._subject:
            self._subject.on_completed()
            self._subject.dispose()
            self._subject = None

            # Wait for finish writing
            while not self._disposable.is_disposed:
                sleep(0.1)

        if self._disposable:
            self._disposable = None
        pass

I'd be willing to bet we're stuck in that while loop
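That would also square with the strace output: a Python `sleep(0.1)` polling loop shows up as exactly that `clock_nanosleep(CLOCK_MONOTONIC, ...)` pattern at roughly 100ms intervals. A throwaway sketch (mine, not part of the downsampler; assumes a Linux host with a reasonably recent CPython/glibc, and `sleep_loop.py` is just a made-up filename) to confirm the signature:

```python
# Run under strace to compare against the stuck container, e.g.:
#   strace -f -e clock_nanosleep python3 sleep_loop.py
import time

while True:
    # mirrors the `while not self._disposable.is_disposed: sleep(0.1)` loop in __del__;
    # each iteration appears as a clock_nanosleep() call roughly 100ms after the last
    time.sleep(0.1)
```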

No movement overnight

OK, quick notes

The attribute _disposable is populated when in batching mode (here) and is a reactivex Subject (details on that here, but basically, an observable).
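For anyone who hasn't used reactivex before: a Subject is both an observer and an observable, so the client can push points into it and have a background pipeline consume them. A minimal sketch (mine, not the client's code; assumes the reactivex package that current client versions depend on):

```python
from reactivex.subject import Subject

subject = Subject()                          # acts as both observer and observable
subscription = subject.subscribe(
    on_next=lambda item: print("consumed", item),
    on_completed=lambda: print("stream completed"),
)

subject.on_next("point 1")                   # push items into the stream
subject.on_next("point 2")
subject.on_completed()                       # signal end-of-stream to any subscribers
subject.dispose()                            # release the subject's resources
```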

Going back to the InfluxDB client then, the call chain is

         self._subject.on_completed()

Which calls this function.

Then

self._subject.dispose()

Calling this function; this is where is_disposed should get flipped:

    def dispose(self) -> None:
        """Unsubscribe all observers and release resources."""

        with self.lock:
            self.is_disposed = True
            self.observers = []
            self.exception = None
            super().dispose()

self.lock is a threading RLock, so perhaps we're stuck waiting for that lock?
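If it is the lock, the hang would just be ordinary lock contention: dispose() can't take self.lock because some other thread is still holding it. A contrived sketch of that hypothesis (mine, not the client's code):

```python
import threading
import time

lock = threading.RLock()          # stands in for the Subject's self.lock

def other_thread():
    # some other thread (e.g. one still handling a failed write) holds the lock
    with lock:
        time.sleep(30)

threading.Thread(target=other_thread, daemon=True).start()
time.sleep(0.1)

# dispose() does `with self.lock:` - if another thread holds it, this blocks
with lock:
    print("dispose() would only proceed once the other thread lets go")
```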

I'm wondering if the implementation of #16 might somehow have contributed to this.

With the configuration that was in use for the broken run, there were two client objects created to point to the same InfluxDB endpoint: one for writes, and another for stats.

I wonder if that might have led to some non-thread-safe behaviour somewhere.

It'd make more sense than the client simply locking up when write failures happen; issue trackers would be full of complaints if that were the case.

If that's the case, then we should be able to repro this

  • Create a new InfluxDB 1.x instance
  • Create a DB/RP to write stats into
  • Don't create one for downsampled data to go into
  • Run a config with stats enabled, writing output data into a non-existent bucket

If the theory is correct, this should lock up. If we then turn stats off and re-run, it should still fail to write, but exit cleanly. Then, as a final check, we should create the output bucket and re-enable stats just to prove it all works.

OK, repro setup.

config.yaml

influxes: 
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60
    outx:
         url: http://192.168.3.20:18086
         org: "myorg"
         token: ""
         timeout: 60



stats:
    active: True
    influx: outx
    bucket: telegraf/autogen
    measurement: downsample_stats
    tags:
        role: "hourlydownsamples"
        host: "$HOSTNAME"


downsamples: 
    downsample_website_bunnycdn_stats:
        # Name for the task
        name: "Downsample Website Response Times"
        influx: home1x

        # Taken from range()
        period: 60

        # Take from aggregate Window
        window: 15

        # taken from in_bucket
        bucket: websites/autogen

        measurement:
            - bunnycdn
            - website_response_times

        # We want to generate each of the aggregates
        # and append the suffixes created in the map() calls
        aggregates: 
            max:

        # The query writes to multiple outputs
        output_influx: 
            - influx: outx
        output_bucket: websites/analytics

InfluxDB setup

ben@optimus:~/tmp/ds17$ docker run -d --rm -p 18086:8086 influxdb:1.8.10
ben@optimus:~/tmp/ds17$ docker exec -it 9284e794a223 influx
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.8.10
> create database telegraf

Running a downsample

docker run \
--name="ds17_test" \
--rm \
-h $HOSTNAME \
-v $PWD/config.yaml:/config.yaml \
bentasker12/downsample:0.2-rc1

We get the failure back nice and quickly

2023-02-09 09:17:46.268223: Starting Job: Downsample Website Response Times
2023-02-09 09:17:46.269003: Running Query: from(bucket: "websites/autogen" )\n|> range(start: 2023-02-09T08:17:46.267124Z, stop: 2023-02-09T09:17:46.267124Z)\n|> filter(fn:(r) =>  r._measurement == "bunnycdn" or r._measurement == "website_response_times" )\n|> window(every: 15m)
2023-02-09 09:17:46.393499: Processing Results
2023-02-09 09:17:46.394700: Inserting 234 new rows
2023-02-09 09:17:46.400809: Job Complete: Downsample Website Response Times
2023-02-09 09:17:46.400850: Flushing queue for outx
2023-02-09 09:17:46.415684: Failed to write back to: websites/analytics, exception: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Request-Id': '9daa5e38-a85a-11ed-8003-0242ac110005', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Error': 'database not found: "websites"', 'X-Influxdb-Version': '1.8.10', 'X-Request-Id': '9daa5e38-a85a-11ed-8003-0242ac110005', 'Date': 'Thu, 09 Feb 2023 09:17:46 GMT', 'Content-Length': '45'})
HTTP response body: {"error":"database not found: \"websites\""}

Worth noting, the status code is different this time. In this test, the database is missing, whereas the thing that started this was a missing RP. It shouldn't make a difference, but if this doesn't repro we'll need to correct that.

root@optimus:/home/ben/tmp/ds17# strace -p 2548042
strace: Process 2548042 attached
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, {tv_sec=1471513, tv_nsec=131218788}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=1471513, tv_nsec=131630057}) = 0
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, {tv_sec=1471513, tv_nsec=231630057}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=1471513, tv_nsec=232111936}) = 0
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, {tv_sec=1471513, tv_nsec=332111936}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=1471513, tv_nsec=332533542}) = 0
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, {tv_sec=1471513, tv_nsec=432533542}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {tv_sec=1471513, tv_nsec=433081003}) = 0

We've ended up in the same state. Repro successful

OK, test 2: same test, different config.

config.yaml

influxes: 
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60
    outx:
         url: http://192.168.3.20:18086
         org: "myorg"
         token: ""
         timeout: 60



stats:
    # Stats disabled
    active: False
    influx: outx
    bucket: telegraf/autogen
    measurement: downsample_stats
    tags:
        role: "hourlydownsamples"
        host: "$HOSTNAME"


downsamples: 
    downsample_website_bunnycdn_stats:
        # Name for the task
        name: "Downsample Website Response Times"
        influx: home1x

        # Taken from range()
        period: 60

        # Take from aggregate Window
        window: 15

        # taken from in_bucket
        bucket: websites/autogen

        measurement:
            - bunnycdn
            - website_response_times

        # We want to generate each of the aggregates
        # and append the suffixes created in the map() calls
        aggregates: 
            max:

        # The query writes to multiple outputs
        output_influx: 
            - influx: outx
        output_bucket: websites/analytics

Running the sample

ben@optimus:~/tmp/ds17$ docker run --name="ds17_test" --rm -h $HOSTNAME -v $PWD/config.yaml:/config.yaml bentasker12/downsample:0.2-rc1
2023-02-09 09:20:58.246022: Starting Job: Downsample Website Response Times
2023-02-09 09:20:58.266272: Running Query: from(bucket: "websites/autogen" )\n|> range(start: 2023-02-09T08:20:58.246013Z, stop: 2023-02-09T09:20:58.246013Z)\n|> filter(fn:(r) =>  r._measurement == "bunnycdn" or r._measurement == "website_response_times" )\n|> window(every: 15m)
2023-02-09 09:20:58.388956: Processing Results
2023-02-09 09:20:58.390190: Inserting 234 new rows
2023-02-09 09:20:58.396909: Job Complete: Downsample Website Response Times
2023-02-09 09:20:58.396940: Flushing queue for outx
2023-02-09 09:20:58.403550: Failed to write back to: websites/analytics, exception: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Request-Id': '101a8590-a85b-11ed-8005-0242ac110005', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Error': 'database not found: "websites"', 'X-Influxdb-Version': '1.8.10', 'X-Request-Id': '101a8590-a85b-11ed-8005-0242ac110005', 'Date': 'Thu, 09 Feb 2023 09:20:58 GMT', 'Content-Length': '45'})
HTTP response body: {"error":"database not found: \"websites\""}


The script exited correctly.

Third test - stats are being sent to a different URL (but the same underlying instance, it's just an aliased IP on the box)

influxes: 
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60
    outx:
         url: http://192.168.3.20:18086
         org: "myorg"
         token: ""
         timeout: 60
    outx2:
         # Same box, just an aliased IP
         url: http://192.168.3.21:18086
         org: "myorg"
         token: ""
         timeout: 60


stats:
    active: True
    influx: outx2
    bucket: telegraf/autogen
    measurement: downsample_stats
    tags:
        role: "hourlydownsamples"
        host: "$HOSTNAME"


downsamples: 
    downsample_website_bunnycdn_stats:
        # Name for the task
        name: "Downsample Website Response Times"
        influx: home1x

        # Taken from range()
        period: 60

        # Take from aggregate Window
        window: 15

        # taken from in_bucket
        bucket: websites/autogen

        measurement:
            - bunnycdn
            - website_response_times

        # We want to generate each of the aggregates
        # and append the suffixes created in the map() calls
        aggregates: 
            max:

        # The query writes to multiple outputs
        output_influx: 
            - influx: outx
        output_bucket: websites/analytics

Oooo, interesting... it locks up:

ben@optimus:~/tmp/ds17$ docker run --name="ds17_test" --rm -h $HOSTNAME -v $PWD/config.yaml:/config.yaml bentasker12/downsample:0.2-rc1
2023-02-09 10:41:27.949809: Starting Job: Downsample Website Response Times
2023-02-09 10:41:27.950710: Running Query: from(bucket: "websites/autogen" )\n|> range(start: 2023-02-09T09:41:27.948875Z, stop: 2023-02-09T10:41:27.948875Z)\n|> filter(fn:(r) =>  r._measurement == "bunnycdn" or r._measurement == "website_response_times" )\n|> window(every: 15m)
2023-02-09 10:41:28.070193: Processing Results
2023-02-09 10:41:28.071439: Inserting 249 new rows
2023-02-09 10:41:28.077946: Job Complete: Downsample Website Response Times
2023-02-09 10:41:28.077974: Flushing queue for outx
2023-02-09 10:41:28.093438: Failed to write back to: websites/analytics, exception: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Request-Id': '4ed29142-a866-11ed-8006-0242ac110005', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Error': 'database not found: "websites"', 'X-Influxdb-Version': '1.8.10', 'X-Request-Id': '4ed29142-a866-11ed-8006-0242ac110005', 'Date': 'Thu, 09 Feb 2023 10:41:28 GMT', 'Content-Length': '45'})
HTTP response body: {"error":"database not found: \"websites\""}

That's unexpected.

I'll have to do a run writing stats out to an entirely different instance to prove whether it's the existence of the stats object or something else

Sending the stats write to the source instance

influxes: 
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60
    outx:
         url: http://192.168.3.20:18086
         org: "myorg"
         token: ""
         timeout: 60
    outx2:
         # Same box, just an aliased IP
         url: http://192.168.3.21:18086
         org: "myorg"
         token: ""
         timeout: 60


stats:
    active: True
    influx: home1x
    bucket: testing_db
    measurement: downsample_stats
    tags:
        role: "hourlydownsamples"
        host: "$HOSTNAME"

downsamples: 
    downsample_website_bunnycdn_stats:
        # Name for the task
        name: "Downsample Website Response Times"
        influx: home1x

        # Taken from range()
        period: 60

        # Take from aggregate Window
        window: 15

        # taken from in_bucket
        bucket: websites/autogen

        measurement:
            - bunnycdn
            - website_response_times

        # We want to generate each of the aggregates
        # and append the suffixes created in the map() calls
        aggregates: 
            max:

        # The query writes to multiple outputs
        output_influx: 
            - influx: outx
        output_bucket: websites/analytics

Still hangs.

So, somehow it's the existence of the stats writer which is the issue.

I wonder if it's a timing issue?

If we step through the command output and look at where in the call chain we are:

# Query being run
2023-02-09 10:48:33.042108: Running Query: from(bucket: "websites/autogen" )\n|> range(start: 2023-02-09T09:48:33.040384Z, stop: 2023-02-09T10:48:33.040384Z)\n|> filter(fn:(r) => r._measurement == "bunnycdn" or r._measurement == "website_response_times" )\n|> window(every: 15m)

# Aggregates calculated and writes being buffered
2023-02-09 10:48:33.160557: Processing Results
2023-02-09 10:48:33.161849: Inserting 264 new rows

# The job finished
2023-02-09 10:48:33.168631: Job Complete: Downsample Website Response Times

# We've called flush_outputs, so `.close()` is being called on the outx handle
2023-02-09 10:48:33.168660: Flushing queue for outx

# We've run out of retries for our writes (to outx, which is now trying to close)
2023-02-09 10:48:33.176413: Failed to write back to: websites/analytics, exception: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Request-Id': '4c312b64-a867-11ed-8008-0242ac110005', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Error': 'database not found: "websites"', 'X-Influxdb-Version': '1.8.10', 'X-Request-Id': '4c312b64-a867-11ed-8008-0242ac110005', 'Date': 'Thu, 09 Feb 2023 10:48:33 GMT', 'Content-Length': '45'})
HTTP response body: {"error":"database not found: \"websites\""}

# As part of handling/recording the failure, we'll attempt to queue a stat write

We print the logline before calling close() and before closing the stats handler:

        section_run_start = time.time_ns()
        for op_influx_id in self.handles['write']:
            self.output_line(f"Flushing queue for {op_influx_id}")
            self.handles['write'][op_influx_id].close()



        fields["flush_run_time"] = time.time_ns() - section_run_start
        self.write_stat(fields, tags)
        # Close the stats output too
        if self.stats_active:
            self.output_line("Flushing stats queue")
            self.stats_writer.close()

So we know that at the time of the lockup

  • close() has only been called on output outx
  • The stats handle is still active, so it should be possible to write to it

We can add certainty to the second point though by disabling the failure stats write and seeing whether we can still repro

Updated to

    def recordFailure(self, conf: (str, str, str), data: str, exception):
        ''' This used to write out a gzip'd YAML file containing details of a batch

        Now, it just logs an error
        '''
        self.output_line(f"Failed to write back to: {conf[0]}, exception: {exception}", True)
        tags = {
                "bucket" : conf[0]
            }
        fields = {
                "failures" : 1,
                "message" : exception
            }

        #self.write_stat(fields, tags)

Running (same config as above)

ben@optimus:~/tmp/ds17$ docker run --name="ds17_test" --rm -h $HOSTNAME -v $PWD/config.yaml:/config.yaml bentasker12/downsample:0.2-rc1
2023-02-09 10:57:17.207540: Starting Job: Downsample Website Response Times
2023-02-09 10:57:17.208293: Running Query: from(bucket: "websites/autogen" )\n|> range(start: 2023-02-09T09:57:17.206714Z, stop: 2023-02-09T10:57:17.206714Z)\n|> filter(fn:(r) =>  r._measurement == "bunnycdn" or r._measurement == "website_response_times" )\n|> window(every: 15m)
2023-02-09 10:57:17.329006: Processing Results
2023-02-09 10:57:17.330239: Inserting 256 new rows
2023-02-09 10:57:17.337102: Job Complete: Downsample Website Response Times
2023-02-09 10:57:17.337137: Flushing queue for outx
2023-02-09 10:57:17.354882: Failed to write back to: websites/analytics, exception: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Request-Id': '84a06e41-a868-11ed-8009-0242ac110005', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Error': 'database not found: "websites"', 'X-Influxdb-Version': '1.8.10', 'X-Request-Id': '84a06e41-a868-11ed-8009-0242ac110005', 'Date': 'Thu, 09 Feb 2023 10:57:17 GMT', 'Content-Length': '45'})
HTTP response body: {"error":"database not found: \"websites\""}


2023-02-09 10:57:17.450304: Flushing stats queue

There we have it.

So to summarise:

  • It's not possible to write into the stats output whilst another is being closed
  • It's not possible for an output to close whilst retries are in process (or we're still in the error_callback)

It's obviously racy: the output won't close because we haven't finished handling its error, but we can't finish handling the error because that involves writing a stat to another handler, and that's not possible whilst this one's in close().

I assume it's not specific to the stats output, and that it's not possible to write into any output whilst another is being closed. There isn't an easy way to test that in this codebase though.

We should be able to prevent the issue by setting a flag within the downsample object to note that we're in a closing state and then checking the truthiness of that flag in recordFailure

Implementing that change:

```diff
diff --git a/app/downsample.py b/app/downsample.py
index 0cefbb1..6e15d80 100755
--- a/app/downsample.py
+++ b/app/downsample.py
@@ -52,6 +52,8 @@ class Downsample:
             }
         self.dry_run = dry_run
         self.quiet = False
+        # Added for utilities/python_influxdb_downsample#17
+        self.in_closing = False
 
         self.record_failures = config['failures']['record']
         self.record_path = config['failures']['path']
@@ -575,15 +577,17 @@ class Downsample:
         Now, it just logs an error
         '''
         self.output_line(f"Failed to write back to: {conf[0]}, exception: {exception}", True)
-        tags = {
-                "bucket" : conf[0]
-            }
-        fields = {
-                "failures" : 1,
-                "message" : exception
-            }
-        self.write_stat(fields, tags)
+        if not self.in_closing:
+            tags = {
+                    "bucket" : conf[0]
+                }
+            fields = {
+                    "failures" : 1,
+                    "message" : exception
+                }
+
+            self.write_stat(fields, tags)
 
     def write_stat(self, fields, tags):
@@ -621,6 +625,9 @@ class Downsample:
         ''' Iterate over the InfluxDB writers and ensure their buffers are flushed
         '''
 
+        # Mark that we're trying to close
+        self.in_closing = True
+
         # Write the closing stats
         fields = {
             "points_processed" : self.points_processed,
```

Running with the same config as above

ben@optimus:~/tmp/ds17$ docker run --name="ds17_test" --rm -h $HOSTNAME -v $PWD/config.yaml:/config.yaml bentasker12/downsample:0.2-rc1
2023-02-09 11:09:37.651988: Starting Job: Downsample Website Response Times
2023-02-09 11:09:37.652905: Running Query: from(bucket: "websites/autogen" )\n|> range(start: 2023-02-09T10:09:37.650843Z, stop: 2023-02-09T11:09:37.650843Z)\n|> filter(fn:(r) => r._measurement == "bunnycdn" or r._measurement == "website_response_times" )\n|> window(every: 15m)
2023-02-09 11:09:37.763448: Processing Results
2023-02-09 11:09:37.764622: Inserting 212 new rows
2023-02-09 11:09:37.770243: Job Complete: Downsample Website Response Times
2023-02-09 11:09:37.770272: Flushing queue for outx
2023-02-09 11:09:37.783855: Failed to write back to: websites/analytics, exception: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Request-Id': '3df50489-a86a-11ed-800a-0242ac110005', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Error': 'database not found: "websites"', 'X-Influxdb-Version': '1.8.10', 'X-Request-Id': '3df50489-a86a-11ed-800a-0242ac110005', 'Date': 'Thu, 09 Feb 2023 11:09:37 GMT', 'Content-Length': '45'})
HTTP response body: {"error":"database not found: \"websites\""}

2023-02-09 11:09:37.880352: Flushing stats queue

Awesome

verified

mentioned in commit c9456395ebe07709b3c88445d1a65afd5779a8d7

Commit: c9456395ebe07709b3c88445d1a65afd5779a8d7
Author: B Tasker
Date: 2023-02-09T11:11:53.000+00:00

Message

fix: Prevent failure stats being written when outputs are closing (utilities/python_influxdb_downsample#17)

This addresses a race condition which can lead to process lockup

+15 -8 (23 lines changed)

I think I've now got to the bottom of the behaviour.

It's not actually caused by there being two client instances active; that was me misleading myself.

The crucial bit is that the error_callback needs to result in an exception being thrown.

That exception will never be seen by the user, but will result in the writer thread dying in the background; as a result, this while loop will continue indefinitely.

I created a simple repro of the behaviour: repro.py
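repro.py itself isn't included here, but the behaviour it demonstrates boils down to something like the sketch below (my own approximation rather than the actual repro.py; the URL, token, org and bucket are placeholders): a batching write_api whose error_callback raises, pointed at a bucket that doesn't exist.

```python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions

def error_cb(conf, data, exception):
    # The crucial part: an exception escaping the error callback kills the background
    # writer thread, so the disposal that close() busy-waits on never happens
    raise RuntimeError(f"write to {conf} failed: {exception}")

client = InfluxDBClient(url="http://localhost:8086", token="", org="")
write_api = client.write_api(
    write_options=WriteOptions(batch_size=10, flush_interval=1_000, max_retries=1),
    error_callback=error_cb,
)

write_api.write(bucket="does_not_exist/autogen", record="m,t=1 f=1")
write_api.close()   # hangs here, stuck in the __del__ while loop shown earlier
```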

I've raised https://github.com/influxdata/influxdb-client-python/issues/558 to improve handling in the client.

I do also need to work out what the exception was in this case, to ensure that it won't be thrown at other points.

Pull Request with fixes in the client lib is at https://github.com/influxdata/influxdb-client-python/pull/559

I'm no longer able to reproduce the original failure even with the fixes/workaround removed.

If I call recordFailure() directly it completes successfully and doesn't throw an exception. Whatever the original failure was, it was either transient or has since been fixed. Either way, the new changes should protect against repeats.