Intro
I need a data structure with these properties:
- I can add an element as a key-value pair. If the key is already present, the element in the queue is replaced. If not, the element is appended to the queue (FIFO).
- It is not a big problem if the consumer processes an element twice: the operations may be expensive, but they are idempotent.
- No update should be missed, as updated elements are worth more than their original counterparts.
- The consumer has to acknowledge that it successfully processed an element. Processing it may take a while.
- In my case: There are many producers but only one consumer.
Potential problems, challenges and notes:
- What happens if a producer adds a key whose element is currently taken by the consumer but not yet acknowledged? In this case the new value must not replace the taken element; we have to enqueue it as a new element.
- Take has to be atomic.
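Why atomicity matters here is presumably this: if a producer replaces a row between the consumer's read and the consumer's lock, the consumer ends up acknowledging data it never saw. Here is a minimal sketch of an atomic take, assuming SQLite and the standard-library sqlite3 module. The function name atomic_take and the table layout are my own choices for illustration. Unlike the draft further down, it picks the oldest unlocked row instead of raising when the head of the queue is locked.

```python
import sqlite3
from typing import Optional, Tuple


def atomic_take(conn: sqlite3.Connection) -> Optional[Tuple[str, bytes]]:
    """Select and lock the oldest unlocked row inside one write transaction."""
    # BEGIN IMMEDIATE acquires SQLite's write lock up front, so no other
    # connection can modify the table between the SELECT and the UPDATE.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, key, data FROM queue_elements "
            "WHERE locked = 0 ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return None
        element_id, key, data = row
        conn.execute(
            "UPDATE queue_elements SET locked = 1 WHERE id = ?", (element_id,)
        )
        conn.execute("COMMIT")
        return key, data
    except Exception:
        conn.execute("ROLLBACK")
        raise


# isolation_level=None puts sqlite3 into autocommit mode, so the explicit
# BEGIN/COMMIT statements above are the only transaction control.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE queue_elements ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "key TEXT, data BLOB, locked INTEGER DEFAULT 0)"
)
conn.execute("INSERT INTO queue_elements (key, data) VALUES ('a', x'01')")
conn.execute("INSERT INTO queue_elements (key, data) VALUES ('b', x'02')")

first = atomic_take(conn)  # ('a', b'\x01')
```

Other databases offer different tools for the same job (for example row-level locks), so treat this as one option rather than the only way to do it.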
Features:
- Queue-Like Behavior: The basic functionality of adding and removing elements in a first-in, first-out (FIFO) order.
- Key-Based Access: Each element in the queue has a unique key, similar to elements in a map or dictionary.
- Replacement on Duplication: If an element with a given key is added and that key already exists in the queue, the existing element is replaced.
- Acknowledgment of Processing: The consumer of the queue must explicitly acknowledge successful processing of each element.
In computer science terms, this structure could be thought of as a "Keyed Acknowledging Queue" or a "Map-Backed Acknowledging Queue". However, these names are descriptive rather than standard terms.
Implementing this data structure would likely require a combination of a queue for the ordering and a hash map (or similar structure) for the key-based access and replacement functionality. The acknowledgment aspect would need to be handled with additional logic to track the processing status of each element.
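To make the combination of queue, map and acknowledgment tracking concrete, here is a minimal in-memory sketch. It assumes that Python's collections.OrderedDict can play both roles at once (insertion order for FIFO, key lookup for replacement). The class name KeyedAckQueue and the method names ack and nack are hypothetical. One design choice of mine, not a requirement from the list above: if a newer value for the same key was queued while the element was in flight, nack drops the stale value instead of putting it back.

```python
import threading
from collections import OrderedDict
from typing import Any, Hashable, Optional, Tuple


class KeyedAckQueue:
    """In-memory keyed FIFO queue with explicit acknowledgment (illustrative)."""

    def __init__(self) -> None:
        # The OrderedDict gives FIFO order and key lookup in one structure.
        self._pending: "OrderedDict[Hashable, Any]" = OrderedDict()
        # The element that was taken but not yet acknowledged, if any.
        self._in_flight: Optional[Tuple[Hashable, Any]] = None
        # Producers and the consumer may run in different threads.
        self._lock = threading.Lock()

    def add(self, key: Hashable, value: Any) -> None:
        with self._lock:
            # Assigning to an existing key replaces the value and keeps its
            # position; a new key is appended at the end of the queue.
            self._pending[key] = value

    def take(self) -> Optional[Tuple[Hashable, Any]]:
        with self._lock:
            if self._in_flight is not None:
                raise RuntimeError("Previous element not acknowledged.")
            if not self._pending:
                return None
            # popitem(last=False) removes and returns the oldest entry.
            self._in_flight = self._pending.popitem(last=False)
            return self._in_flight

    def ack(self) -> None:
        with self._lock:
            self._in_flight = None

    def nack(self) -> None:
        with self._lock:
            if self._in_flight is None:
                return
            key, value = self._in_flight
            self._in_flight = None
            if key not in self._pending:
                # No newer value arrived: put the element back at the front.
                self._pending[key] = value
                self._pending.move_to_end(key, last=False)


q = KeyedAckQueue()
q.add('a', 1)
q.add('b', 2)
q.add('a', 3)        # replaces the value of 'a', keeps its position
print(q.take())      # ('a', 3)
q.ack()
print(q.take())      # ('b', 2)
```

This version lives in a single process and loses its content on restart, which is why the draft in the next section stores the queue in a database instead.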
Implementation:
Here is a quick and dirty draft:
import json
from typing import Any, Optional

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


class Inbox:
    '''
    Multi-producer, single-consumer key-value queue with commit and rollback.
    '''

    def __init__(self, db_uri: str) -> None:
        self.engine = create_engine(db_uri)
        self.Session = sessionmaker(bind=self.engine)
        self.session = self.Session()

    def __enter__(self) -> 'Inbox':
        # SQLite dialect, matching the usage example below.
        # SERIAL is PostgreSQL syntax and does not auto-increment in SQLite,
        # so the id column uses INTEGER PRIMARY KEY AUTOINCREMENT instead.
        create_table_sql = text("""
            CREATE TABLE IF NOT EXISTS queue_elements (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                key VARCHAR,
                data BLOB,
                locked BOOLEAN DEFAULT FALSE
            )
        """)
        self.session.execute(create_table_sql)
        self.session.commit()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.session.close()

    def add(self, key: str, data: Any) -> None:
        payload = json.dumps(data).encode('utf-8')
        # First, try updating the most recent (highest ID) row with the given key,
        # but only if it is unlocked. A locked row is being processed and must not change.
        update_sql = text("""
            UPDATE queue_elements
            SET data = :data
            WHERE key = :key AND locked = false
            AND id = (
                SELECT MAX(id)
                FROM queue_elements
                WHERE key = :key
            )
        """)
        result = self.session.execute(update_sql, {'key': key, 'data': payload})
        # If no row was updated, insert a new row
        if result.rowcount == 0:
            insert_sql = text("""
                INSERT INTO queue_elements (key, data, locked)
                VALUES (:key, :data, false)
            """)
            self.session.execute(insert_sql, {'key': key, 'data': payload})
        self.session.commit()

    def commit(self) -> None:
        # Acknowledge: delete the element that was taken
        sql = text("""
            DELETE FROM queue_elements
            WHERE locked = true
        """)
        self.session.execute(sql)
        self.session.commit()

    def rollback(self) -> None:
        # Give up: unlock the taken element so it can be taken again
        sql = text("""
            UPDATE queue_elements
            SET locked = false
        """)
        self.session.execute(sql)
        self.session.commit()

    def take(self) -> Optional[tuple[str, Any]]:
        # SQL to select the oldest element
        take_sql = text("""
            SELECT id, key, data, locked
            FROM queue_elements
            ORDER BY id
            LIMIT 1
        """)
        result = self.session.execute(take_sql).fetchone()
        if not result:
            # Queue is empty: return None if no elements are available
            return None
        element_id, key, data, locked = result
        if locked:
            # Raise an error if the selected element is locked
            raise RuntimeError("Inbox is locked. Call commit() or rollback() first.")
        # Update the element to set locked = true
        lock_sql = text("""
            UPDATE queue_elements
            SET locked = true
            WHERE id = :id
        """)
        self.session.execute(lock_sql, {'id': element_id})
        # Commit the transaction
        self.session.commit()
        # Return key and decoded data
        return key, json.loads(data.decode('utf-8'))


with Inbox('sqlite:///inbox.db') as inbox:
    inbox.add('a', {'a': 1})
    inbox.add('b', {'b': 2})
    print(inbox.take())  # ('a', {'a': 1})
    try:
        print(inbox.take())
        print('this should not happen')
    except Exception as e:
        print('as expected: locked --', e)
    inbox.commit()
    print(inbox.take())  # ('b', {'b': 2})
    inbox.rollback()
    inbox.take()  # ('b', {'b': 2})
    inbox.commit()
    print(inbox.take())  # None