Python Multi-threading en un conjunto de registros

Tengo un conjunto de registros de la base de datos (aproximadamente 1000 filas) y actualmente estoy iterando a través de ellos, para integrar más datos usando la consulta de db adicional para cada registro.

Al hacerlo, aumenta el tiempo total del proceso a unos 100 segundos.

Lo que quiero hacer es compartir la funcionalidad con 2-4 procesos.

Estoy usando Python 2.7 para tener compatibilidad con AWS Lambda.

def handler(event, context): try: records = connection.get_users() mandrill_client = open_mandrill_connection() mandrill_messages = get_mandrill_messages() mandrill_template = 'POINTS weekly-report-to-user' start_time = time.time() messages = build_messages(mandrill_messages, records) print("OVERALL: %s seconds ---" % (time.time() - start_time)) send_mandrill_message(mandrill_client, mandrill_template, messages) connection.close_database_connection() return "Process Completed" except Exception as e: print(e) 

La siguiente es la función que quiero poner en hilos:

 def build_messages(messages, records): for record in records: record = dict(record) stream = get_user_stream(record) data = compile_loyalty_stream(stream) messages['to'].append({ 'email': record['email'], 'type': 'to' }) messages['merge_vars'].append({ 'rcpt': record['email'], 'vars': [ { 'name': 'total_points', 'content': record['total_points'] }, { 'name': 'total_week', 'content': record['week_points'] }, { 'name': 'stream_greek', 'content': data['el'] }, { 'name': 'stream_english', 'content': data['en'] } ] }) return messages 

Lo que he intentado es importar la biblioteca de multiprocesamiento:

 from multiprocessing.pool import ThreadPool 

Creó un grupo dentro del bloque try y asignó la función dentro de este grupo:

 pool = ThreadPool(4) messages = pool.map(build_messages_in, itertools.izip(itertools.repeat(mandrill_messages), records)) def build_messages_in(a_b): build_msg(*a_b) def build_msg(a, b): return build_messages(a, b) def get_user_stream(record): response = [] i = 0 for mod, mod_id, act, p, act_created in izip(record['models'], record['model_ids'], record['actions'], record['points'], record['action_creation']): information = get_reference(mod, mod_id) if information: response.append({ 'action': act, 'points': p, 'created': act_created, 'info': information }) if (act == 'invite_friend') \ or (act == 'donate') \ or (act == 'bonus_500_general') \ or (act == 'bonus_1000_general') \ or (act == 'bonus_500_cancel') \ or (act == 'bonus_1000_cancel'): response[i]['info']['date_ref'] = act_created response[i]['info']['slug'] = 'attiki' if (act == 'bonus_500_general') \ or (act == 'bonus_1000_general') \ or (act == 'bonus_500_cancel') \ or (act == 'bonus_1000_cancel'): response[i]['info']['title'] = '' i += 1 return response 

Finalmente quité el bucle for de la función build_message.

Lo que obtengo como resultado es que un objeto ‘Ninguno’ no es iterable.

¿Es esta la forma correcta de hacer esto?

Su código parece bastante profundo y, por lo tanto, no puede estar seguro de que el multithreading conduzca a un aumento de rendimiento cuando se aplique en un nivel alto. Por lo tanto, vale la pena profundizar hasta el punto que le dé la mayor latencia y considerar cómo abordar el cuello de botella específico. Vea aquí para una mayor discusión sobre las limitaciones de los hilos.

Si, por ejemplo, como comentamos en los comentarios, puede identificar una sola tarea que lleva mucho tiempo, entonces podría intentar paralelizarla utilizando multiprocessing lugar, para aprovechar más la potencia de su CPU. Este es un ejemplo genérico que, con suerte, es lo suficientemente simple de entender para reflejar sus consultas de Postgres sin entrar en su propia base de código; Creo que es una cantidad de esfuerzo inviable tbh.

 import multiprocessing as mp import time import random import datetime as dt MAILCHIMP_RESPONSE = [x for x in range(1000)] def chunks(l, n): n = max(1, n) return [l[i:i + n] for i in range(0, len(l), n)] def db_query(): ''' Delayed response from database ''' time.sleep(0.01) return random.random() def do_queries(query_list): ''' The function that takes all your query ids and executes them sequentially for each id ''' results = [] for item in query_list: query = db_query() # Your super-quick processing of the Postgres response processing_result = query * 2 results.append([item, processing_result]) return results def single_processing(): ''' As you do now - equivalent to get_reference ''' result_of_process = do_queries(MAILCHIMP_RESPONSE) return result_of_process def multi_process(chunked_data, queue): ''' Same as single_processing, except we put our results in queue rather than returning them ''' result_of_process = do_queries(chunked_data) queue.put(result_of_process) def multiprocess_handler(): ''' Divide and conquor on our db requests. We split the mailchimp response into a series of chunks and fire our queries simultaneously. Thus, each concurrent process has a smaller number of queries to make ''' num_processes = 4 # depending on cores/resources size_chunk = len(MAILCHIMP_RESPONSE) / num_processes chunked_queries = chunks(MAILCHIMP_RESPONSE, size_chunk) queue = mp.Queue() # This is going to combine all the results processes = [mp.Process(target=multi_process, args=(chunked_queries[x], queue)) for x in range(num_processes)] for p in processes: p.start() divide_and_conquor_result = [] for p in processes: divide_and_conquor_result.extend(queue.get()) return divide_and_conquor_result if __name__ == '__main__': start_single = dt.datetime.now() single_process = single_processing() print "Single process took {}".format(dt.datetime.now() - start_single) print "Number of records processed = {}".format(len(single_process)) start_multi = dt.datetime.now() multi = multiprocess_handler() print "Multi process took {}".format(dt.datetime.now() - start_multi) print "Number of records processed = {}".format(len(multi))