Python, SQLite y threading.

Estoy trabajando en una aplicación que recostackrá datos a través de HTTP desde varios lugares, almacenará los datos localmente y luego los entregaré a través de HTTP.

Así que estaba mirando lo siguiente. Mi aplicación primero creará varios subprocesos que recostackrán datos en un intervalo específico y almacenarán esos datos localmente en una base de datos SQLite.

Luego, en el hilo principal, inicie una aplicación CherryPy que consultará esa base de datos SQLite y servirá los datos.

Mi problema es: ¿cómo manejo las conexiones a la base de datos SQLite desde mis hilos y desde la aplicación CherryPy?

Si hago una conexión por hilo a la base de datos, ¿también podré crear / usar una base de datos en memoria?

Respuesta corta: no utilice Sqlite3 en una aplicación de subprocesos.

Las bases de datos Sqlite3 se escalan bien para el tamaño, pero terriblemente para la concurrencia. Usted estará plagado de errores “La base de datos está bloqueada”.

Si lo hace, necesitará una conexión por subproceso, y debe asegurarse de que estas conexiones se limpien después de ellas. Esto se maneja tradicionalmente usando sesiones locales de subproceso, y se realiza bastante bien (por ejemplo) usando ScopedSession de SQLAlchemy. Yo usaría esto si fuera usted, incluso si no está usando las características ORM de SQLAlchemy.

Puedes usar algo así.

“… cree varios subprocesos que recostackrán datos en un intervalo específico y almacenará en caché esos datos localmente en una base de datos sqlite. Luego, en el hilo principal, inicie una aplicación CherryPy que consultará esa db sqlite y servirá los datos”.

No pierdas mucho tiempo en hilos. Las cosas que estás describiendo son simplemente procesos de SO. Simplemente inicie los procesos ordinarios para hacer la recostackción y ejecutar Cherry Py.

No tiene un uso real para subprocesos concurrentes en un solo proceso para esto. La recostackción de datos en un intervalo específico, cuando se realiza con procesos simples del sistema operativo, puede ser progtwigda de manera muy simple. Cron, por ejemplo, hace un gran trabajo de esto.

Una aplicación CherryPy, también, es un proceso de sistema operativo, no un solo hilo de un proceso mayor.

Solo usa procesos, los hilos no te ayudarán.

Dependiendo de la aplicación, la base de datos podría ser una sobrecarga real. Si estamos hablando de datos volátiles, tal vez podría omitir completamente la comunicación a través de DB y compartir los datos entre el proceso de recolección de datos y los procesos de servicio de datos a través de IPC. Esta no es una opción si los datos deben persistir, por supuesto.

Dependiendo de la velocidad de datos, sqlite podría ser exactamente la forma correcta de hacer esto. La base de datos completa está bloqueada para cada escritura, por lo que no va a escalar a miles de escrituras simultáneas por segundo. Pero si solo tiene unos pocos, es la forma más segura de asegurarse de no sobrescribirse unos a otros.

Esta prueba se realiza para determinar la mejor manera de escribir y leer desde la base de datos SQLite. Seguimos 3 enfoques a continuación

  1. Lee y escribe sin ningún hilo (los métodos con la palabra normal en él)
  2. Leer y escribir con hilos
  3. Leer y escribir con procesos.

Nuestro conjunto de datos de muestra es un conjunto de datos OHLC generado ficticio con un símbolo, marca de tiempo y 6 valores falsos para ohlc y volumefrom, volumeto

Lee

  1. El método normal toma alrededor de 0.25 segundos para leer
  2. El método roscado tarda 10 segundos.
  3. El procesamiento toma 0.25 segundos para leer

Ganador: Procesamiento y Normal

Escribe

  1. El método normal toma alrededor de 1.5 segundos para escribir
  2. Método roscado tarda unos 30 segundos
  3. El procesamiento tarda unos 30 segundos

Ganador: Normal

Nota: Todos los registros no se escriben utilizando los métodos de escritura procesados ​​y en subprocesos. Los métodos de escritura con hilos y procesados ​​obviamente se ejecutan en los errores de la base de datos bloqueados, ya que las escrituras se ponen en cola. SQlite solo pone en cola las escrituras en un determinado umbral y luego lanza la base de datos de indicación sqlite3.OperationalError. La forma ideal es volver a intentar insertar el mismo fragmento de nuevo, pero no tiene sentido, ya que la ejecución del método para la inserción paralela toma más tiempo que una lectura secuencial incluso sin volver a intentar las inserciones bloqueadas / fallidas Sin volver a intentarlo, el 97% de las filas se escribieron y aún tomó 10 veces más tiempo que una escritura secuencial

Estrategias para llevar:

  1. Prefiere leer SQLite y escribirlo en el mismo hilo

  2. Si debe realizar subprocesos múltiples, use el multiprocesamiento para leer que tenga más o menos el mismo rendimiento y difiera a las operaciones de escritura de un solo hilo.

  3. NO USE HILO para leer y escribir, ya que es 10 veces más lento en ambos, puede agradecerle a GIL por eso.

Aquí está el código para la prueba completa

import sqlite3 import time import random import string import os import timeit from functools import wraps from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import threading import os database_file = os.path.realpath('../files/ohlc.db') create_statement = 'CREATE TABLE IF NOT EXISTS database_threading_test (symbol TEXT, ts INTEGER, o REAL, h REAL, l REAL, c REAL, vf REAL, vt REAL, PRIMARY KEY(symbol, ts))' insert_statement = 'INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)' select = 'SELECT * from database_threading_test' def time_stuff(some_function): def wrapper(*args, **kwargs): t0 = timeit.default_timer() value = some_function(*args, **kwargs) print(timeit.default_timer() - t0, 'seconds') return value return wrapper def generate_values(count=100): end = int(time.time()) - int(time.time()) % 900 symbol = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10)) ts = list(range(end - count * 900, end, 900)) for i in range(count): yield (symbol, ts[i], random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1e9, random.random() * 1e5) def generate_values_list(symbols=1000,count=100): values = [] for _ in range(symbols): values.extend(generate_values(count)) return values @time_stuff def sqlite_normal_read(): """ 100k records in the database, 1000 symbols, 100 rows First run 0.25139795300037804 seconds Second run Third run """ conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) try: with conn: conn.execute(create_statement) results = conn.execute(select).fetchall() print(len(results)) except sqlite3.OperationalError as e: print(e) @time_stuff def sqlite_normal_write(): """ 1000 symbols, 100 rows First run 2.279409104000024 seconds Second run 2.3364172020001206 seconds Third run """ l = generate_values_list() conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) try: with conn: conn.execute(create_statement) conn.executemany(insert_statement, l) except sqlite3.OperationalError as e: print(e) @time_stuff def sequential_batch_read(): """ We read all the rows for each symbol one after the other in sequence First run 3.661222331999852 seconds Second run 2.2836898810001003 seconds Third run 0.24514851899994028 seconds Fourth run 0.24082150699996419 seconds """ conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) try: with conn: conn.execute(create_statement) symbols = conn.execute("SELECT DISTINCT symbol FROM database_threading_test").fetchall() for symbol in symbols: results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall() except sqlite3.OperationalError as e: print(e) def sqlite_threaded_read_task(symbol): results = [] conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) try: with conn: results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall() except sqlite3.OperationalError as e: print(e) finally: return results def sqlite_multiprocessed_read_task(symbol): results = [] conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) try: with conn: results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall() except sqlite3.OperationalError as e: print(e) finally: return results @time_stuff def sqlite_threaded_read(): """ 1000 symbols, 100 rows per symbol First run 9.429676861000189 seconds Second run 10.18928106400017 seconds Third run 10.382290903000467 seconds """ conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall() with ThreadPoolExecutor(max_workers=8) as e: results = e.map(sqlite_threaded_read_task, symbols, chunksize=50) for result in results: pass @time_stuff def sqlite_multiprocessed_read(): """ 1000 symbols, 100 rows First run 0.2484774920012569 seconds!!! Second run 0.24322178500005975 seconds Third run 0.2863524549993599 seconds """ conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall() with ProcessPoolExecutor(max_workers=8) as e: results = e.map(sqlite_multiprocessed_read_task, symbols, chunksize=50) for result in results: pass def sqlite_threaded_write_task(n): """ We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors """ conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) data = list(generate_values()) try: with conn: conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data) except sqlite3.OperationalError as e: print("Database locked",e) finally: conn.close() return len(data) def sqlite_multiprocessed_write_task(n): """ We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors """ conn = sqlite3.connect(os.path.realpath('../files/ohlc.db')) data = list(generate_values()) try: with conn: conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data) except sqlite3.OperationalError as e: print("Database locked",e) finally: conn.close() return len(data) @time_stuff def sqlite_threaded_write(): """ Did not write all the results but the outcome with 97400 rows written is still this... Takes 20x the amount of time as a normal write 1000 symbols, 100 rows First run 28.17819765000013 seconds Second run 25.557972323000058 seconds Third run """ symbols = [i for i in range(1000)] with ThreadPoolExecutor(max_workers=8) as e: results = e.map(sqlite_threaded_write_task, symbols, chunksize=50) for result in results: pass @time_stuff def sqlite_multiprocessed_write(): """ 1000 symbols, 100 rows First run 30.09209805699993 seconds Second run 27.502465319000066 seconds Third run """ symbols = [i for i in range(1000)] with ProcessPoolExecutor(max_workers=8) as e: results = e.map(sqlite_multiprocessed_write_task, symbols, chunksize=50) for result in results: pass sqlite_normal_write()