Tuesday, 2 January 2024

Playground, Script: Python Tika

import subprocess
import os
import time
import requests
import hashlib
import logging
from tika import parser
import socket
from pathlib import Path

# Attach a handler to the root logger if none is configured yet. Without any
# handler, Python falls back to its "last resort" handler, which only emits
# WARNING and above, so the INFO progress messages in this module would be
# silently dropped.
logging.basicConfig(level=logging.INFO)
# basicConfig() does nothing when the root logger already has handlers, so the
# level is still set explicitly to keep INFO enabled in that case as well.
logging.getLogger().setLevel(logging.INFO)


# Module-level logger used by everything below.
logger = logging.getLogger(__name__)

class TikaServerManager:
    """
    Manages an Apache Tika server instance. Downloads the Tika server JAR if not present,
    checks for an open port to start the server, and provides file parsing capabilities.
    """

    TIKA_VERSION = "3.0.0-BETA"
    TIKA_SERVER_JAR_URL = f"https://dlcdn.apache.org/tika/{TIKA_VERSION}/tika-server-standard-{TIKA_VERSION}.jar"

    def __init__(self, tika_jar_path:Path|str, server_port:int=9998):
        """
        :param tika_jar_path: Path to the Tika server JAR file.
        :param server_port: Port number for the Tika server.
        """

        if isinstance(tika_jar_path, str):
            tika_jar_path = Path(tika_jar_path)

        self._tika_jar_path = tika_jar_path
        self._server_port = server_port
        self._server_url = f'http://localhost:{self._server_port}/'
        self._server_process = None


    def parse_file(self, path:Path|str):
        if isinstance(path, str):
            path = Path(path)
        assert path.is_file() 
        parser.TikaClientOnly = True
        return parser.from_file(str(path), self._server_url)


    def __enter__(self):
        """
        Context manager entry point. Starts the Tika server.
        """
        logger.info("Starting Tika server...")
        if not self._tika_jar_path.is_file():
            self._download_tika_server_jar()
        if not self._tika_jar_path.with_suffix('.md5').is_file():
            self._check_or_create_tika_jar_md5()


        self._check_port_and_start_server()
        self._wait_for_server_to_start()

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context manager exit point. Stops the Tika server.
        """

        logger.info("Stopping Tika server...")
        if self._server_process:
            self._server_process.terminate()
            self._server_process.wait()

    def _check_port_and_start_server(self):
        """
        Checks if the specified server port is in use. If in use, checks if it's by a Tika server.
        If not by a Tika server, finds a free port and starts the Tika server.
        """
        if self._is_port_in_use(self._server_port):
            logger.info(f"Port {self._server_port} is in use. Checking if it's a Tika server...")
            if not self._is_tika_server(self._server_port):
                self._server_port = self._find_free_port()
                logger.info(f"Using free port {self._server_port} for Tika server.")
            else:
                logger.info(f"Tika server is already running on port {self._server_port}.")
                return
        self._server_url = f'http://localhost:{self._server_port}/'
        self._server_process = subprocess.Popen(['java', '-jar', str(self._tika_jar_path.absolute()), '--port', str(self._server_port)])

    def _is_port_in_use(self, port:int) -> bool:
        """
        :param port: Port number to check.
        :return: True if port is in use, False otherwise.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex(('localhost', port)) == 0
    
    def _is_tika_server(self, port:int) -> bool:
        """
        Checks if a Tika server is running on the specified port.

        :param port: Port number to check.
        :return: True if a Tika server is running, False otherwise.
        """
        try:
            response = requests.get(f'http://localhost:{port}/')
            return "Welcome to the Apache Tika" in response.text
        except requests.ConnectionError:
            return False

    def _find_free_port(self) -> int:
        """
        Finds a free port by checking ports randomly within the dynamic/private range.

        :return: A free port number.
        """
        import random
        while True:
            port = random.randint(49152, 65535)  # Dynamic and/or Private Ports range
            if not self._is_port_in_use(port):
                return port

    def _download_tika_server_jar(self):
        """
        Downloads the Tika server JAR file.
        """
        logger.info("Downloading Tika server jar...")
        self._tika_jar_path.parent.mkdir(parents=True, exist_ok=True)
        response = requests.get(self.TIKA_SERVER_JAR_URL, stream=True)
        if response.status_code == 200:
            with self._tika_jar_path.open('wb') as f:
                f.write(response.content)
        else:
            logger.error("Failed to download Tika server JAR")
            raise Exception("Failed to download Tika server JAR")

    def _check_or_create_tika_jar_md5(self):
        """
        Checks if an MD5 checksum file exists for the Tika server JAR.
        If not, creates it.
        """
        md5_file = self._tika_jar_path.with_suffix('.md5')
        if not os.path.isfile(md5_file):
            logger.info("Creating MD5 checksum file...")
            md5_hash = hashlib.md5()
            with self._tika_jar_path.open('rb') as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    md5_hash.update(chunk)
            with md5_file.open('w') as f:
                f.write(md5_hash.hexdigest())

    def _wait_for_server_to_start(self, timeout_seconds:float=60, check_every_seconds:float=1):
        """
        :param timeout_seconds: Maximum time in seconds to wait for the server to start.
        """
        logger.info("Waiting for Tika server to start...")
        start_time = time.time()
        while True:
            try:
                response = requests.get(self._server_url)
                if response.status_code == 200:
                    logger.info("Tika server started successfully.")
                    break
            except requests.ConnectionError:
                pass
            if time.time() - start_time > timeout_seconds:
                logger.error("Timed out waiting for Tika server to start.")
                raise TimeoutError("Timed out waiting for Tika server to start.")
            time.sleep(check_every_seconds)

    

############################# USAGE EXAMPLE #############################

import click
from tqdm import tqdm


@click.command()
@click.argument('path', type=click.Path(exists=True, file_okay=True, dir_okay=True, path_type=Path))
@click.option('--tika-jar-path', type=click.Path(exists=False, file_okay=True, dir_okay=False, path_type=Path), default='./bin/tika-server.jar', help='Path to the Tika server JAR file. Downloaded automatically if missing.')
def main(path: Path, tika_jar_path: Path):
    """
    Parses files at the given path using Apache Tika and prints their metadata.

    :param path: A file or directory path. If a directory is provided, all files in the directory will be parsed.
    :param tika_jar_path: Path to the Apache Tika server jar. Default is './bin/tika-server.jar'.
        The file does not have to exist yet; TikaServerManager downloads it when missing.
    """
    # Collect all file paths if the given path is a directory, else just use the single file path
    if path.is_dir():
        paths = [p for p in path.glob('**/*') if p.is_file()]
    else:
        paths = [path]

    print(f"Found {len(paths)} files")
    if not paths:
        # Nothing to parse: do not download the JAR or start a server for no work.
        return

    with TikaServerManager(tika_jar_path) as tika_manager:
        for p in tqdm(paths):
            # The parse result may carry no metadata (missing key or None),
            # so fall back to an empty mapping instead of crashing on .get().
            metadata = tika_manager.parse_file(p).get('metadata') or {}
            print(p, metadata.get('Content-Type', 'Unknown content type'))

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    # main() is called without arguments because the click decorators supply
    # them from the command line; pylint cannot see that.
    # pylint: disable=no-value-for-parameter
    main()

No comments:

Post a Comment

Parse Wikipedia dump

""" This module processes Wikipedia dump files by extracting individual articles and parsing them into a structured format, ...