Source code for narrow_down.sqlite

"""Storage backend based on SQLite."""
import sqlite3
from typing import Iterable, List, Optional

from narrow_down.storage import StorageBackend

QUERY_BATCH_SIZE = 500


class SQLiteStore(StorageBackend):
    """File-based storage backend for a SimilarityStore based on SQLite."""
    def __init__(self, db_filename: str, partitions: int = 128) -> None:
        """Create a new empty SQLite database or connect to an existing one."""
        self.db_filename = db_filename
        self._connection = sqlite3.connect(self.db_filename, isolation_level="IMMEDIATE")
        # On reopening we can read the number of partitions from the db
        partitions_from_db = self._query_setting_sync("__sqlite_partitions")
        self.partitions = int(partitions_from_db) if partitions_from_db else partitions
    async def initialize(self) -> "SQLiteStore":
        """Initialize the tables in the SQLite database file.

        Returns:
            self
        """
        with self._connection as conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS settings (key TEXT NOT NULL PRIMARY KEY, value TEXT)"
            )
            conn.execute(
                "CREATE TABLE IF NOT EXISTS documents (id INTEGER NOT NULL PRIMARY KEY, doc BLOB)"
            )
            for i in range(self.partitions):
                conn.execute(
                    f"CREATE TABLE IF NOT EXISTS buckets_{i} ("
                    "bucket INTEGER NOT NULL, "
                    "hash INTEGER NOT NULL, "
                    "doc_id INTEGER NOT NULL"
                    ")"
                )
            conn.execute("PRAGMA synchronous = OFF")
            conn.execute("PRAGMA journal_mode = MEMORY")
            conn.commit()
        await self.insert_setting("__sqlite_partitions", str(self.partitions))
        return self
    async def insert_setting(self, key: str, value: str):
        """Store a setting as a key-value pair."""
        with self._connection as conn:
            conn.execute(
                "INSERT INTO settings(key,value) VALUES (:key,:value) "
                "ON CONFLICT(key) DO UPDATE SET value=:value",
                dict(key=key, value=value),
            )
    async def query_setting(self, key: str) -> Optional[str]:
        """Query a setting with the given key.

        Args:
            key: The identifier of the setting.

        Returns:
            A string with the value. If the key does not exist or the storage is
            uninitialized, None is returned.

        Raises:
            sqlite3.OperationalError: In case the database query fails for any reason.
        """
        return self._query_setting_sync(key)
    def _query_setting_sync(self, key: str) -> Optional[str]:
        """Read a setting synchronously, returning None if the settings table does not exist yet."""
        try:
            cursor = self._connection.execute("SELECT value FROM settings WHERE key=?", (key,))
            setting = cursor.fetchone()
            if setting is not None:
                return setting[0]
            return None
        except sqlite3.OperationalError as e:
            if "no such table: settings" in e.args:
                return None
            raise
    async def insert_document(self, document: bytes, document_id: Optional[int] = None) -> int:
        """Add the data of a document to the storage and return its ID."""
        with self._connection as conn:
            if document_id:
                conn.execute(
                    "INSERT INTO documents(id,doc) VALUES (:id,:doc) "
                    "ON CONFLICT(id) DO UPDATE SET doc=:doc",
                    dict(id=document_id, doc=document),
                )
                return document_id
            else:
                cursor = conn.execute(
                    "INSERT INTO documents(doc) VALUES (:doc)",
                    dict(doc=document),
                )
                return cursor.lastrowid  # type: ignore # (this always works if the insert works)
    async def query_document(self, document_id: int) -> bytes:
        """Get the data belonging to a document.

        Args:
            document_id: The id of the document. This ID is created and returned by the
                `insert_document` method.

        Returns:
            The document stored under the key `document_id` as a bytes object.

        Raises:
            KeyError: If the document is not stored.
        """
        cursor = self._connection.execute("SELECT doc FROM documents WHERE id=?", (document_id,))
        doc = cursor.fetchone()
        if doc is None:
            raise KeyError(f"No document with id {document_id}")
        return doc[0]
    async def query_documents(self, document_ids: List[int]) -> List[bytes]:
        """Get the data belonging to multiple documents.

        Args:
            document_ids: The keys under which the data is stored.

        Returns:
            The documents stored under the keys in `document_ids` as bytes objects,
            in the same order.

        Raises:
            KeyError: If no document was found for at least one of the ids.
        """
        docs = {}
        for i in range(0, len(document_ids), QUERY_BATCH_SIZE):
            doc_id_batch = document_ids[i : i + QUERY_BATCH_SIZE]
            doc_ids_str = ",".join(map(str, map(int, doc_id_batch)))
            cursor = self._connection.execute(
                f"SELECT id, doc FROM documents WHERE id IN ({doc_ids_str})"
            )
            for id_, doc in cursor.fetchall():
                docs[id_] = doc
        return [docs[i] for i in document_ids]
    async def remove_document(self, document_id: int):
        """Remove a document given by ID from the list of documents."""
        with self._connection as conn:
            conn.execute("DELETE FROM documents WHERE id=?", (document_id,))
    async def add_document_to_bucket(self, bucket_id: int, document_hash: int, document_id: int):
        """Link a document to a bucket."""
        partition = int(document_hash % self.partitions)
        with self._connection as conn:
            conn.execute(
                f"INSERT INTO buckets_{partition}(bucket,hash,doc_id) VALUES (?,?,?)",
                (bucket_id, document_hash, document_id),
            )
    async def query_ids_from_bucket(self, bucket_id, document_hash: int) -> Iterable[int]:
        """Get all document IDs stored in a bucket for a certain hash value."""
        partition = int(document_hash % self.partitions)
        cursor = self._connection.execute(
            f"SELECT doc_id FROM buckets_{partition} WHERE bucket=? AND hash=?",
            (bucket_id, document_hash),
        )
        return [r[0] for r in cursor.fetchall()]
    async def remove_id_from_bucket(self, bucket_id: int, document_hash: int, document_id: int):
        """Remove a document from a bucket."""
        with self._connection as conn:
            partition = int(document_hash % self.partitions)
            conn.execute(
                f"DELETE FROM buckets_{partition} "
                "WHERE bucket=? AND hash=? AND doc_id=?",
                (bucket_id, document_hash, document_id),
            )
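
The sketch below is not part of the module itself; it is a minimal, hypothetical usage example that drives the backend directly through the methods defined above. The file name `example.sqlite`, the partition count, and the bucket/hash values are arbitrary choices for illustration.

# Minimal usage sketch (assumptions: arbitrary file name, partition count, and
# bucket/hash values). It exercises only the methods defined in this module.
import asyncio

from narrow_down.sqlite import SQLiteStore


async def main() -> None:
    # Create (or reopen) the database file and set up the tables.
    store = await SQLiteStore("example.sqlite", partitions=16).initialize()

    # Documents are stored as raw bytes; without an explicit ID the SQLite rowid is returned.
    doc_id = await store.insert_document(b"some serialized document")
    assert await store.query_document(doc_id) == b"some serialized document"

    # Bucket tables map (bucket, hash) pairs to document IDs; the partition is hash % partitions.
    await store.add_document_to_bucket(bucket_id=3, document_hash=42, document_id=doc_id)
    print(list(await store.query_ids_from_bucket(3, 42)))  # -> [doc_id]


asyncio.run(main())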