Intro
I need a data structure with these properties:
- I can add an element as a key-value pair. If the key is already present, the element in the queue should be replaced; if not, the element should be enqueued (FIFO).
- It is not a big problem if the consumer processes an element twice: the operations may be expensive, but they are idempotent.
- No update should be missed, as updated elements are worth more than their original counterparts.
- The consumer has to acknowledge that it successfully processed an element. Processing it may take a while.
- In my case, there are many producers but only one consumer.
Potential problems, challenges and notes:
- What happens if an element is currently taken by the consumer but not yet acknowledged? In this case, we have to enqueue it.
- The take operation has to be atomic.
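With SQLite, one way to make the take operation atomic is to run the SELECT of the oldest element and the UPDATE that marks it as taken inside a single write transaction. The sketch below uses the standard-library sqlite3 module and assumes a table shaped like the one in the draft further down (the helper name take_atomically is hypothetical). Unlike the draft, this variant skips rows that are already locked instead of raising an error.

```python
import sqlite3
from typing import Optional


def take_atomically(conn: sqlite3.Connection) -> Optional[tuple[int, str, bytes]]:
    """Claim the oldest unlocked row of queue_elements in one write transaction.

    Assumes (hypothetically) that the connection was opened with
    isolation_level=None, so the BEGIN/COMMIT statements below fully
    control the transaction.
    """
    # BEGIN IMMEDIATE acquires SQLite's write lock up front, so no other
    # connection can modify the table between the SELECT and the UPDATE.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, key, data FROM queue_elements "
            "WHERE locked = 0 ORDER BY id LIMIT 1"
        ).fetchone()
        if row is not None:
            # Mark the element as taken; it stays in the table until acknowledged.
            conn.execute("UPDATE queue_elements SET locked = 1 WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
        return row
    except BaseException:
        conn.execute("ROLLBACK")
        raise


# Usage with an in-memory database and an assumed schema similar to the draft's.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE queue_elements (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "key TEXT, data BLOB, locked INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO queue_elements (key, data) VALUES ('a', x'01'), ('b', x'02')")
print(take_atomically(conn))  # (1, 'a', b'\x01')
print(take_atomically(conn))  # (2, 'b', b'\x02')
print(take_atomically(conn))  # None
```

BEGIN IMMEDIATE is used rather than a plain BEGIN because a deferred transaction only takes the write lock at the UPDATE, which would leave a window after the SELECT in which another writer could interfere.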
Features:
- Queue-Like Behavior: The basic functionality of adding and removing elements in a first-in, first-out (FIFO) order.
- Key-Based Access: Each element in the queue has a unique key, similar to elements in a map or dictionary.
- Replacement on Duplication: If an element with a given key is added and that key already exists in the queue, the existing element is replaced.
- Acknowledgment of Processing: The consumer of the queue must explicitly acknowledge successful processing of each element.
In computer science terms, this structure could be thought of as a "Keyed Acknowledging Queue" or a "Map-Backed Acknowledging Queue". However, these names are descriptive rather than standard terms.
Implementing this data structure would likely require a combination of a queue for the ordering and a hash map (or similar structure) for the key-based access and replacement functionality. The acknowledgment aspect would need to be handled with additional logic to track the processing status of each element.
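For a single process, the queue-plus-hash-map combination described above can be sketched with Python's collections.OrderedDict, which plays both roles: it keeps keys in insertion (FIFO) order, and assigning to an existing key replaces the value without moving it. This is a minimal in-memory sketch, not a substitute for the persistent draft below; the class and method names (KeyedAckQueue, put, take, ack, nack) are hypothetical, and a single threading.Lock serializes the many producers and the one consumer.

```python
import threading
from collections import OrderedDict
from typing import Any, Optional


class KeyedAckQueue:
    """Hypothetical in-memory keyed acknowledging queue (sketch only)."""

    def __init__(self) -> None:
        self._pending: "OrderedDict[str, Any]" = OrderedDict()  # key -> latest value, FIFO order
        self._in_flight: Optional[tuple[str, Any]] = None      # taken but not yet acknowledged
        self._lock = threading.Lock()                          # guards all state

    def put(self, key: str, value: Any) -> None:
        with self._lock:
            # Existing pending key: value replaced, position kept. New key: appended.
            # The in-flight element is never touched, so an update for it is queued.
            self._pending[key] = value

    def take(self) -> Optional[tuple[str, Any]]:
        with self._lock:
            if self._in_flight is not None:
                raise RuntimeError("previous element not acknowledged")
            if not self._pending:
                return None
            self._in_flight = self._pending.popitem(last=False)  # oldest entry
            return self._in_flight

    def ack(self) -> None:
        with self._lock:
            self._in_flight = None  # processing succeeded: forget the element

    def nack(self) -> None:
        with self._lock:
            if self._in_flight is None:
                return
            key, value = self._in_flight
            self._in_flight = None
            if key not in self._pending:
                # Put the element back at the front so it is retried first.
                self._pending[key] = value
                self._pending.move_to_end(key, last=False)
            # Otherwise a newer value for this key is already pending and supersedes it.


q = KeyedAckQueue()
q.put("a", {"a": 1})
q.put("b", {"b": 2})
q.put("a", {"a": 3})  # replaces the value of "a", keeps its position
print(q.take())       # ('a', {'a': 3})
q.put("a", {"a": 4})  # "a" is in flight, so this is queued behind "b"
q.ack()
print(q.take())       # ('b', {'b': 2})
q.nack()              # processing failed: "b" goes back to the front
print(q.take())       # ('b', {'b': 2})
q.ack()
print(q.take())       # ('a', {'a': 4})
q.ack()
print(q.take())       # None
```

An update that arrives while its key is in flight is queued rather than overwriting the element being processed, which matches the behavior of add in the draft below. On nack, however, this sketch drops the old value when a newer one for the same key is already pending, whereas the draft's rollback keeps both rows.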
Implementation:
Here is a quick and dirty draft:
import json
from typing import Any, Optional

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


class Inbox:
    '''
    Multi-Producers, Single-Consumer key-value queue with commit and rollback.
    '''

    def __init__(self, db_uri: str) -> None:
        self.engine = create_engine(db_uri)
        self.Session = sessionmaker(bind=self.engine)
        self.session = self.Session()

    def __enter__(self) -> 'Inbox':
        # SERIAL is PostgreSQL syntax; in SQLite only INTEGER PRIMARY KEY assigns ids
        # automatically. AUTOINCREMENT prevents id reuse, which keeps the FIFO order.
        create_table_sql = text("""
            CREATE TABLE IF NOT EXISTS queue_elements (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                key VARCHAR,
                data BLOB,
                locked BOOLEAN DEFAULT FALSE
            )
        """)
        self.session.execute(create_table_sql)
        self.session.commit()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.session.close()

    def add(self, key: str, data: Any) -> None:
        payload = json.dumps(data).encode('utf-8')
        # First, try updating the most recent (highest ID) row with the given key,
        # but only if it is unlocked, i.e. not currently being processed
        update_sql = text("""
            UPDATE queue_elements SET data = :data
            WHERE key = :key AND locked = false AND id = (
                SELECT MAX(id) FROM queue_elements WHERE key = :key
            )
        """)
        result = self.session.execute(update_sql, {'key': key, 'data': payload})
        # If no row was updated (new key, or its latest row is locked), insert a new row
        if result.rowcount == 0:
            insert_sql = text("""
                INSERT INTO queue_elements (key, data, locked)
                VALUES (:key, :data, false)
            """)
            self.session.execute(insert_sql, {'key': key, 'data': payload})
        self.session.commit()

    def commit(self) -> None:
        # Acknowledge: delete the element taken by the consumer
        self.session.execute(text("DELETE FROM queue_elements WHERE locked = true"))
        self.session.commit()

    def rollback(self) -> None:
        # Unlock the taken element so that it can be taken again
        self.session.execute(text("UPDATE queue_elements SET locked = false"))
        self.session.commit()

    def take(self) -> Optional[tuple[str, Any]]:
        # SQL to select the oldest element
        take_sql = text("""
            SELECT id, key, data, locked FROM queue_elements
            ORDER BY id LIMIT 1
        """)
        result = self.session.execute(take_sql).fetchone()
        if not result:
            # Queue is empty: return None if no elements are available
            return None
        element_id, key, data, locked = result
        if locked:
            # Raise an error if the selected element is locked
            raise Exception("Inbox is locked. Call commit() or rollback() first.")
        # Update the element to set locked = true
        lock_sql = text("UPDATE queue_elements SET locked = true WHERE id = :id")
        self.session.execute(lock_sql, {'id': element_id})
        # Commit the transaction
        self.session.commit()
        # Return key and data of the now locked element
        return key, json.loads(data.decode('utf-8'))


with Inbox('sqlite:///inbox.db') as inbox:
    inbox.add('a', {'a': 1})
    inbox.add('b', {'b': 2})
    print(inbox.take())  # ('a', {'a': 1})
    try:
        print(inbox.take())
        print('this should not happen')
    except Exception as e:
        print('as expected: locked --', e)
    inbox.commit()
    print(inbox.take())  # ('b', {'b': 2})
    inbox.rollback()
    print(inbox.take())  # ('b', {'b': 2})
    inbox.commit()
    print(inbox.take())  # None