How can I achieve a throughput of 50k inserts/sec into Cassandra while reading the input from a CSV file?

My goal is to increase the throughput of versioning data in Cassandra. I have used concurrent reads and writes and have also increased the chunk size that my code reads from the file. My machine has 16 GB of RAM and 8 cores, and yes, I have changed Cassandra's yaml file to allow 10k concurrent reads and writes; when I timed it, I found that 10000 writes/reads take less than a second. My minimal, viable code is:

import json
import logging
import os
import sys
from datetime import datetime
from hashlib import sha256, sha512, sha1

import pandas as pd
from cassandra import ConsistencyLevel, WriteTimeout
from cassandra.cluster import (EXEC_PROFILE_DEFAULT, BatchStatement, Cluster,
                               ExecutionProfile)
from cassandra.concurrent import (execute_concurrent,
                                  execute_concurrent_with_args)
from cassandra.query import SimpleStatement, dict_factory


class PythonCassandraExample:
    def __init__(self, file_to_be_versioned, working_dir=os.getcwd(), mode='append'):
        self.cluster = None
        self.session = None
        self.keyspace = None
        self.log = None
        self.mode = mode
        self.file_to_be_versioned = file_to_be_versioned
        self.insert_patch = []
        self.delete_patch = []
        self.update_patch = []
        self.working_dir = working_dir

    def __del__(self):
        self.cluster.shutdown()

    def createsession(self):
        profile = ExecutionProfile(
            row_factory=dict_factory,
            request_timeout=6000
        )
        self.cluster = Cluster(
            ['localhost'],
            connect_timeout=50,
            execution_profiles={
                EXEC_PROFILE_DEFAULT: profile
            }
        )
        self.session = self.cluster.connect(self.keyspace)

    def getsession(self):
        return self.session

    # How about Adding some log info to see what went wrong
    def setlogger(self):
        log = logging.getLogger()
        log.setLevel('INFO')
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        log.addHandler(handler)
        self.log = log

    # Create Keyspace based on Given Name
    def handle_error(self, exception):
        self.log.error("Failed to fetch user info: %s", exception)

    def createkeyspace(self, keyspace):
        """
        :param keyspace: The Name of Keyspace to be created
        :return:
        """
        # Before we create new lets check if exiting keyspace; we will drop that and create new
        self.log.info("creating keyspace...")
        self.session.execute("""
            CREATE KEYSPACE IF NOT EXISTS %s
            WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }
            """ % keyspace)
        self.log.info("setting keyspace...")
        self.keyspace = keyspace
        self.session.set_keyspace(self.keyspace)

    def create_table_and_set_version(self, table_name):
        self.table_name = table_name.lower()
        table_select_query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name='{}' AND table_name='{}'".format(
            self.keyspace, self.table_name)
        print(table_select_query)
        table_exists = self.session.execute(table_select_query).one()
        self.log.info("Table exists: {}".format(table_exists))
        if table_exists:
            self.log.info(
                "Datapackage already exists! Checking the last version")
            self.version = self.session.execute(
                "SELECT version FROM {} LIMIT 1".format(self.table_name)).one()
            self.log.info(
                "The version fetched is: {} of type: {}".format(
                    self.version, type(self.version)
                )
            )
            if not self.version:
                self.version = 0
            else:
                self.version = self.version['version']
        else:
            self.log.info("Table didn't exist!")
            self.version = 0
        self.target_version = int(str(self.version)) + 1
        self.log.info(
            "Current and candidate versions are: {}, {}".format(
                self.version, self.target_version
            )
        )
        # c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version int, row varchar, row_hash varchar, PRIMARY KEY(id, version)) with clustering order by (version desc)".format(
        #     self.table_name)
        c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version int, row_hash varchar, PRIMARY KEY(version, id))".format(
            self.table_name
        )
        self.session.execute(c_sql)
        self.log.info("DP Table Created !!!")
        self.log.info("Current and candidate versions are: {}, {}".format(
            self.version, self.target_version))

    def push_to_update_patch(self, update_patch_file, last_patch=False):
        if len(self.update_patch) >= 10000:
            with open(update_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.update_patch
                )
            del self.update_patch[:]
        if last_patch is True and len(self.update_patch) > 0:
            with open(update_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.update_patch
                )
            del self.update_patch[:]

    def push_to_insert_patch(self, insert_patch_file, last_patch=False):
        if len(self.insert_patch) >= 10000:
            with open(insert_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.insert_patch
                )
            del self.insert_patch[:]
        if last_patch is True and len(self.update_patch) > 0:
            with open(insert_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.insert_patch
                )
            del self.insert_patch[:]

    def push_to_delete_patch(self, delete_patch_file, last_patch=False):
        if len(self.delete_patch) >= 10000:
            with open(delete_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.delete_patch
                )
            del self.delete_patch[:]
        if last_patch is True and len(self.delete_patch) > 0:
            with open(delete_patch_file, mode='a') as json_file:
                json_file.writelines(
                    self.delete_patch
                )
            del self.delete_patch[:]

    def push_to_patch(self, key, value, mode='update'):
        return
        if key is None or value is None:
            raise ValueError(
                "Key or value or both not specified for making a patch. Exiting now."
            )
        data = {}
        data["id"] = str(key)
        data["data"] = json.dumps(value, default=str)
        # convert dict to json str so that the patch is a list of line jsons.
        data = json.dumps(data, default=str)
        json_patch_file = os.path.join(
            self.working_dir,
            "version_{}_{}.json".format(
                self.target_version, mode
            )
        )
        if mode == 'update':
            self.update_patch.append(
                data + "\n"
            )
            self.push_to_update_patch(
                json_patch_file
            )
        if mode == 'insert':
            self.insert_patch.append(
                data + "\n"
            )
            self.push_to_insert_patch(
                json_patch_file
            )
        if mode == 'delete':
            self.delete_patch.append(
                data + "\n"
            )
            self.push_to_delete_patch(
                json_patch_file
            )

    def clone_version(self):
        if self.mode == 'replace':
            return
        self.log.info("Cloning version")
        start_time = datetime.utcnow()
        if self.version == 0:
            return
        insert_sql = self.session.prepare(
            (
                "INSERT INTO {} ({}, {}, {}) VALUES (?,?,?)"
            ).format(
                self.table_name, "id", "version", "row_hash"
            )
        )
        futures = []
        current_version_query = "SELECT id, row_hash FROM {} WHERE version={}".format(
            self.table_name, self.version
        )
        current_version_rows = self.session.execute(
            current_version_query
        )
        for current_version_row in current_version_rows:
            params = (
                current_version_row['id'],
                self.target_version,
                current_version_row['row_hash']
            )
            futures.append(
                (
                    insert_sql,
                    params
                )
            )
        self.log.info(
            "Time taken to clone the version is: {}".format(
                datetime.utcnow() - start_time
            )
        )

    def hash_string(self, value):
        return (sha1(str(value).encode('utf-8')).hexdigest())

    def hash_row(self, row):
        row_json = json.dumps(row, default=str)
        return (self.hash_string(row_json))

    def insert_data(self, generate_diff=False):
        self.generate_diff = generate_diff
        destination = self.file_to_be_versioned
        chunksize = 100000
        concurrency_value = 1000
        patch_length_for_cql = chunksize
        chunks = pd.read_csv(destination, chunksize=chunksize)
        chunk_counter = 0
        insert_sql = self.session.prepare(
            (
                "INSERT INTO {} ({}, {}, {}) VALUES (?,?,?)"
            ).format(
                self.table_name, "id", "version", "row_hash"
            )
        )
        select_sql = self.session.prepare(
            (
                "SELECT id, version, row_hash FROM {} WHERE version=? AND id=?"
            ).format(
                self.table_name
            )
        )
        futures = []
        check_for_patch = []  # this list comprises rows with ids and values for checking whether its an update/insert
        rows_for_checking_patch = []
        start_time = datetime.utcnow()
        for df in chunks:
            rows_for_checking_patch = df.values.tolist()
            chunk_counter += 1
            df["row_hash"] = df.apply(
                self.hash_row
            )
            df["key"] = df["column_test_3"].apply(
                self.hash_string
            )
            keys = list(df["key"])
            row_hashes = list(df["row_hash"])
            start_time_de_params = datetime.utcnow()
            for i in range(chunksize):
                row_check = None
                params = (
                    str(keys[i]),
                    self.target_version,
                    str(row_hashes[i])
                )
                check_for_patch_params = (
                    self.version,
                    str(keys[i])
                )
                check_for_patch.append(
                    (
                        select_sql,
                        check_for_patch_params
                    )
                )
                futures.append(
                    (
                        insert_sql,
                        params
                    )
                )
            self.log.info("Time for params: {}".format(datetime.utcnow() - start_time_de_params))
            if len(check_for_patch) >= patch_length_for_cql:
                start_time_de_update = datetime.utcnow()
                results = execute_concurrent(
                    self.session, check_for_patch, concurrency=concurrency_value, raise_on_first_error=False
                )
                self.log.info("Time for just the query: {}".format(datetime.utcnow() - start_time_de_update))
                row_counter_for_patch = 0
                for (success, result) in results:
                    if not result:
                        self.push_to_patch(
                            keys[row_counter_for_patch],
                            rows_for_checking_patch[row_counter_for_patch],
                            mode='insert'
                        )
                        row_counter_for_patch += 1
                        continue
                    if not success:
                        # result will be an Exception
                        self.log.error("Error has occurred in insert cql")
                        self.handle_error(result)
                    id_to_be_compared = result[0]["id"]
                    row_hash_to_be_compared = result[0]["row_hash"]
                    if (row_hash_to_be_compared != row_hashes[row_counter_for_patch]):
                        self.push_to_patch(
                            id_to_be_compared,
                            rows_for_checking_patch[row_counter_for_patch]["row"],
                            mode='update'
                        )
                    row_counter_for_patch += 1
                del check_for_patch[:]
                del rows_for_checking_patch[:]
                row_counter_for_patch = 0
                self.log.info("Time for check patch: {}".format(
                    datetime.utcnow() - start_time_de_update
                ))
            if (len(futures) >= patch_length_for_cql):
                start_time_de_insert = datetime.utcnow()
                results = execute_concurrent(
                    self.session, futures, concurrency=concurrency_value, raise_on_first_error=False
                )
                for (success, result) in results:
                    if not success:
                        # result will be an Exception
                        self.log.error("Error has occurred in insert cql")
                        self.handle_error(result)
                del futures[:]
                self.log.info("Time for insert patch: {}".format(
                    datetime.utcnow() - start_time_de_insert
                ))
            self.log.info(chunk_counter)
            # self.log.info("This chunk got over in {}".format(datetime.utcnow() - start_time))
        if len(check_for_patch) > 0:
            results = execute_concurrent(
                self.session, check_for_patch, concurrency=concurrency_value, raise_on_first_error=False
            )
            row_counter_for_patch = 0
            for (success, result) in results:
                if not result:
                    self.push_to_patch(
                        rows_for_checking_patch[row_counter_for_patch]["id"],
                        rows_for_checking_patch[row_counter_for_patch]["row"],
                        mode='insert'
                    )
                    row_counter_for_patch += 1
                    continue
                if not success:
                    # result will be an Exception
                    self.log.error("Error has occurred in insert cql")
                    self.handle_error(result)
                id_to_be_compared = result[0]["id"]
                row_hash_to_be_compared = result[0]["row_hash"]
                if (row_hash_to_be_compared != rows_for_checking_patch[row_counter_for_patch]["row_hash"]):
                    self.push_to_patch(
                        id_to_be_compared,
                        rows_for_checking_patch[row_counter_for_patch]["row"],
                        mode='update'
                    )
                row_counter_for_patch += 1
            del check_for_patch[:]
            del rows_for_checking_patch[:]
        if len(futures) > 0:  # in case the last dataframe has #rows < 10k.
            results = execute_concurrent(
                self.session, futures, concurrency=concurrency_value, raise_on_first_error=False
            )
            for (success, result) in results:
                if not success:
                    self.handle_error(result)
            del futures[:]
            self.log.info(chunk_counter)
        # Check the delete patch
        if self.generate_diff is True and self.mode is 'replace' and self.version is not 0:
            self.log.info("We got to find the delete patch!")
            start_time = datetime.utcnow()
            current_version_query = "SELECT id, row, row_hash FROM {} WHERE version={}".format(
                self.table_name, self.version
            )
            current_version_rows = self.session.execute(
                current_version_query
            )
            for current_version_row in current_version_rows:
                row_check_query = "SELECT {} FROM {} WHERE {}={} AND {}='{}' ".format(
                    "id", self.table_name, "version", self.target_version, "id", current_version_row.id
                )
                row_check = self.session.execute(row_check_query).one()
                if row_check is not None:  # row exists in both version.
                    continue
                self.push_to_patch(
                    current_version_row.id,
                    current_version_row.id,
                    mode="delete"
                )
        print("Complete insert's duration is: {}".format(
            datetime.utcnow() - start_time)
        )
        # Calling last_patch for all remaining diffs
        modes = [
            'update',
            'insert',
            'delete'
        ]
        for mode in modes:
            json_patch_file = os.path.join(
                self.working_dir,
                "version_{}_{}.json".format(
                    self.target_version, mode
                )
            )
            if mode == 'update':
                self.push_to_update_patch(
                    json_patch_file,
                    last_patch=True
                )
            if mode == 'insert':
                self.push_to_insert_patch(
                    json_patch_file,
                    last_patch=True
                )
            if mode == 'delete':
                self.push_to_delete_patch(
                    json_patch_file,
                    last_patch=True
                )


if __name__ == '__main__':
    example1 = PythonCassandraExample(
        file_to_be_versioned="hundred_million_eleven_columns.csv"
    )
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('sat_athena_one')
    example1.create_table_and_set_version('five_hundred_rows')
    example1.clone_version()
    example1.insert_data(generate_diff=True)

I have a CSV file with 100M rows and 11 columns. The script used to generate that file is:

import csv
import sys
import os

import pandas as pd

file_name = "hundred_million_eleven_columns.csv"
rows_list = []
chunk_counter = 1
headers = [
    "column_test_1",
    "column_test_2",
    "column_test_3",
    "column_test_4",
    "column_test_5",
    "column_test_6",
    "column_test_7",
    "column_test_8",
    "column_test_9",
    "column_test_10",
    "column_test_11",
]
file_exists = os.path.isfile(file_name)
with open(file_name, 'a') as csvfile:
    writer = csv.DictWriter(csvfile, delimiter=',', lineterminator='\n', fieldnames=headers)
    if not file_exists:
        writer.writeheader()  # file doesn't exist yet, write a header
    for i in range(100000000):
        dict1 = [
            i, i+1, i+2, i+3, i+4, i+5, i+6, i+7, i+8, i+9, i+10
        ]
        # get input row in dictionary format
        # key = col_name
        rows_list.append(dict1)
        if len(rows_list) == 100000:
            df = pd.DataFrame(rows_list)
            df.to_csv(file_name, mode='a', index=False, header=False)
            del rows_list[:]
            del df
            print(chunk_counter)
            chunk_counter += 1
    if len(rows_list) > 0:
        df = pd.DataFrame(rows_list)
        df.to_csv(file_name, mode='a', index=False, header=False)
        del rows_list[:]
        del df
        print(chunk_counter)
        chunk_counter += 1

My Cassandra yaml file is here.

First make sure your code can even produce data at 50k/s. If you strip out the executes, can it even read the CSV and compute the sha that fast? A C* instance on a host of that size with SSDs should be able to do 50k writes/sec, but a lot happens outside of the C* writes that is probably part of the problem.
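To get a feel for that, something like the following (a rough sketch, assuming the same file name and chunk size as in the question) times only the pandas read plus the sha1 hashing, with no Cassandra calls at all:

# Baseline check: how fast can the CSV be read and hashed with no writes involved?
# Assumes the same file and chunksize as in the question.
import json
from datetime import datetime
from hashlib import sha1

import pandas as pd

start = datetime.utcnow()
rows = 0
for df in pd.read_csv("hundred_million_eleven_columns.csv", chunksize=100000):
    # Roughly the same hashing work the insert path does, but nothing is sent to C*.
    hashes = df.apply(
        lambda row: sha1(json.dumps(row.to_dict(), default=str).encode("utf-8")).hexdigest(),
        axis=1,
    )
    rows += len(df)
elapsed = max((datetime.utcnow() - start).total_seconds(), 1e-9)
print("{} rows in {:.1f}s -> {:.0f} rows/s before any Cassandra work".format(rows, elapsed, rows / elapsed))

If this alone comes in well under 50k rows/s, no amount of yaml tuning on the Cassandra side will get you there.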

If your concurrent reads/writes are set above 128, you are going to have serious problems; on a system that can handle it, 64 is enough to push past 200k writes per second. You will actually make things worse with that setting that high. There is no IO involved in those stages, so as the documentation says, 8x your core count is a good value. I would also recommend lowering the driver-side concurrency from 10k to 1024 or even less. You can play with different settings to see how they affect things.
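On the driver side that just means passing a much smaller concurrency to execute_concurrent. A minimal sketch, assuming the connected session and the prepared insert_sql from the question's code:

# Sketch: same execute_concurrent pattern as in the question, but with far fewer
# in-flight requests. `session` and `insert_sql` are assumed to come from the
# question's code; the statement/params list here is just a stand-in.
from cassandra.concurrent import execute_concurrent

statements_and_params = [(insert_sql, (str(i), 1, "somehash")) for i in range(10000)]

results = execute_concurrent(
    session,
    statements_and_params,
    concurrency=64,              # try 64-1024 instead of 10k and compare timings
    raise_on_first_error=False,
)
for success, result in results:
    if not success:              # result is the Exception when a statement fails
        print("write failed:", result)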

Make sure the driver install compiled its Cython extensions, otherwise the run will be dominated by serialization. Speaking of which, the Python driver is the slowest one, so keep that in mind.
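One crude, hypothetical way to check is to look for compiled extension files inside the installed package (the assumption here is simply that the Cython-built parts show up as .so/.pyd files; this is a heuristic, not an official switch):

# Heuristic check: if the driver's optional Cython extensions were built, compiled
# modules (.so on Linux/macOS, .pyd on Windows) exist somewhere under the package.
import os
import cassandra

pkg_dir = os.path.dirname(cassandra.__file__)
compiled = []
for root, _dirs, files in os.walk(pkg_dir):
    compiled.extend(f for f in files if f.endswith((".so", ".pyd")))

print("driver version:", cassandra.__version__)
print("compiled extensions found:", compiled if compiled else "none - reinstall with Cython available")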

Your blocking on the sha may account for most of the time. Short of proper profiling, just try it with a fixed value and see the difference.
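For example, a sketch that stubs the two hash methods on the question's PythonCassandraExample with cheap stand-ins before running the same flow (assuming the class is importable in the same script); any speedup you see is time the sha was costing:

# Sketch: temporarily replace the sha1 hashing with cheap stand-ins to measure its
# share of the runtime. Everything else stays exactly as in the question.
PythonCassandraExample.hash_string = lambda self, value: str(value)   # no sha, keys stay distinct
PythonCassandraExample.hash_row = lambda self, row: "fixed_row_hash"  # fixed value, as suggested

example = PythonCassandraExample(file_to_be_versioned="hundred_million_eleven_columns.csv")
example.createsession()
example.setlogger()
example.createkeyspace('sat_athena_one')
example.create_table_and_set_version('five_hundred_rows')
example.clone_version()
example.insert_data(generate_diff=True)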

"My machine" - is this a single-node cluster? If you are throwing availability out the window anyway, you can also disable durable_writes on the keyspace to speed up the writes a bit. The heap settings are missing, but make sure you have a minimum of 8 GB; even though this is a fairly small host, Cassandra needs memory. If you are not reading, consider disabling the key cache, and maybe disable compactions while the job runs and re-enable them afterwards.
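If you go down that road, a sketch of those knobs (assuming the keyspace name from the question and an existing session); remember to turn everything back on once the load finishes:

# Sketch: trade durability for ingest speed during a one-off bulk load on a
# single-node setup. `session` is assumed to be the connected Session from the question.
session.execute("ALTER KEYSPACE sat_athena_one WITH durable_writes = false")

# On the node itself (shell, not Python):
#   nodetool disableautocompaction   # pause compactions during the load
#   nodetool enableautocompaction    # turn them back on afterwards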

The comment in cassandra.yaml recommends 8 * the number of cores:

On the other hand, since writes are almost never IO bound, the ideal number of "concurrent_writes" depends on the number of cores in your system; (8 * number_of_cores) is a good rule of thumb.

So 64 is the right value for an 8-core machine:

  • concurrent_reads: 64
  • concurrent_writes: 64
  • concurrent_counter_writes: 64

These limits are probably recommended because a lot of other IO happens besides the normal read/write path, e.g. commit log writes, caching, compaction, replication, and views (if any).

Some rules of thumb:

  • disk_optimization_strategy: ssd // if your disk is an HDD, change the value to spinning
  • Use a dedicated commit log disk; SSD recommended.
  • More disks = better performance.