Thursday, 4 January 2024

Snippet, Python: Persistent Key Acknowledged Queue - An Inbox


I need a data-structure with these properties:

  • I can add an element via key-value. If the key is present, the element in the queue should be replaced. If not the element should be queued (FIFO).
  • It is not a big problem if the consumer consumes twice as operations are maybe expensive but idempotent.
  • No update should be missed, as updated elements are more worth than their original counterparts.
  • The consumer has to acknowledge that it successfully processed an element. Processing it may take a while.
  • In my case: There are many producers but only one consumer.
Potential problems, challenges and notes.
  • What happens if an element is currently taken by a consumer, but not acknowledged. In this case we have to enqueue it.
  • Take has to be atomic.


  • Queue-Like Behavior: The basic functionality of adding and removing elements in a first-in, first-out (FIFO) order.
  • Key-Based Access: Each element in the queue has a unique key, similar to elements in a map or dictionary.
  • Replacement on Duplication: If an element with a given key is added and that key already exists in the queue, the existing element is replaced.
  • Acknowledgment of Processing: The consumer of the queue must explicitly acknowledge successful processing of each element.

In computer science terms, this structure could be thought of as a "Keyed Acknowledging Queue" or a "Map-Backed Acknowledging Queue". However, these names are descriptive rather than standard terms.

Implementing this data structure would likely require a combination of a queue for the ordering and a hash map (or similar structure) for the key-based access and replacement functionality. The acknowledgment aspect would need to be handled with additional logic to track the processing status of each element.


Here is a quick and dirty draft:

from typing import Any
from sqlalchemy import create_engine, Column, String, Integer, JSON, Boolean, text
from sqlalchemy.orm import sessionmaker, declarative_base
import json

Base = declarative_base()

class Inbox:
    Multi-Producers, Single-Consumer key-value queue with commit and rollback.

    def __init__(self, db_uri: str) -> None:
        self.engine = create_engine(db_uri)
        self.Session = sessionmaker(bind=self.engine)
        self.session = self.Session()

    def __enter__(self) -> 'Inbox':
        create_table_sql = text("""
            CREATE TABLE IF NOT EXISTS queue_elements (
                id SERIAL PRIMARY KEY,
                key VARCHAR,
                data BLOB,
                locked BOOLEAN DEFAULT FALSE
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:

    def add(self, key: str, data: Any) -> None:

        # First, try updating the most recent (highest ID) unlocked row with the given key
        update_sql = text("""
            UPDATE queue_elements
            SET data = :data
            WHERE key = :key AND locked = false
            AND id = (
                SELECT MAX(id)
                FROM queue_elements
                WHERE key = :key
        result = self.session.execute(update_sql, {'key': key, 'data': json.dumps(data).encode('utf-8')})
        # If no row was updated, insert a new row
        if result.rowcount == 0:
            insert_sql = text("""
                INSERT INTO queue_elements (key, data, locked)
                VALUES (:key, :data, false)
            self.session.execute(insert_sql, {'key': key, 'data': json.dumps(data).encode('utf-8')})


    def commit(self) -> None:
        sql = text("""
            DELETE FROM queue_elements
            WHERE locked = true

    def rollback(self) -> None:
        sql = text("""
            UPDATE queue_elements
            SET locked = false

    def take(self) -> tuple[str, Any]:

        # SQL to select the oldest element
        take_sql = text("""
            SELECT id, key, data, locked
            FROM queue_elements
            ORDER BY id
            LIMIT 1
        result = self.session.execute(take_sql).fetchone()

        if not result: 
            # Queue is empty: Return None if no elements are available
            return None

        element_id, key, data, locked = result

        if locked:
            # Raise an error if the selected element is locked
            raise Exception("Inbox is locked. Call commit() or rollback() first.")

        # Update the element to set locked = true
        lock_sql = text("""
            UPDATE queue_elements
            SET locked = true
            WHERE id = :id
        self.session.execute(lock_sql, {'id': element_id})

        # Commit the transaction

        # Return key and data if the element is not locked
        return key, json.loads(data.decode('utf-8'))

with Inbox('sqlite:///inbox.db') as inbox:
    inbox.add('a', {'a': 1})
    inbox.add('b', {'b': 2})
    print(inbox.take()) # ('a', {'a': 1})
        print('this should not happen')
    except Exception as e:
        print('as expected: locked --', e)
    print(inbox.take()) # ('b', {'b': 2})
    inbox.take() # ('b', {'b': 2})
    print(inbox.take()) # None

No comments:

Post a Comment

Parse Wikipedia dump

""" This module processes Wikipedia dump files by extracting individual articles and parsing them into a structured format, ...