No se puede pasar la tupla de cerdo a Python UDF

Tengo master.txt que tiene registros de 10K, por lo que cada línea será una tupla y toda la misma debe ser pasada a Python UDF. Dado que tiene varios registros, el almacenamiento de p2preportmap obtiene el siguiente error. Por favor ayuda

El error es el siguiente:

No se puede abrir el iterador para el alias p2preportmap. Error de backend: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Scalar tiene más de una fila en la salida. 1º: (010301, MTS, MM), 2º: (010B06, MTS, TN) (causa común: “ÚNETE”, luego “FOREACH … GENERATE foo.bar” debería ser “foo :: bar”)

Pig Script es el siguiente:

REGISTER 'smsiuc_udf.py' using streaming_python as smsiuc_udfs; cdrs = load '2016040111*' USING PigStorage('|','-tagFile') ; mastergtrec = load 'master.txt' USING PigStorage(',','-tagFile'); mastergt = FOREACH mastergtrec GENERATE (chararray) UPPER($1) as opcdpc, (chararray) UPPER($2) as gtoptname,(chararray) UPPER($3) as gtoptcircle; mastergttup = FOREACH mastergt generate TOTUPLE(opcdpc,gtoptname,gtoptcircle) as mstgttup; cdrrecord = FOREACH cdrs GENERATE (chararray) UPPER($1) as aparty, (chararray) UPPER($2) as bparty,$3 as smssentdate,$4 as smssenttime,($29=='6' ? 'S' : 'F') as status,(chararray) UPPER($26) as srcgt,(chararray) UPPER($27) as destgt,($12=='405899136999995' ? 'MTSDEL-CDMA' : ($12=='919875089998' ? 'MTSRAJ-GSM' : ($12=='405899150999995' ? 'MTSCHN-CDMA' : $12) ) ) as smscgt, (chararray)$0 as cdrfname,(chararray) $13 as prepost; filteredp2pcdrs = FILTER cdrrecord by smsiuc_udfs.pullp2pcdrs(aparty,bparty,srcgt,destgt) and status == 'S' and SUBSTRING(smssentdate,4,6) == '$MON'; groupp2pcdrs = GROUP filteredp2pcdrs by (srcgt,destgt,aparty,bparty,smscgt,status,prepost); distinctp2pcdrs= FOREACH groupp2pcdrs { uniq = DISTINCT filteredp2pcdrs.(srcgt,destgt,aparty,bparty,smscgt,status,prepost); GENERATE FLATTEN(group),COUNT(uniq) as cnt; }; p2preportmap = FOREACH distinctp2pcdrs GENERATE smsiuc_udfs.p2preport(srcgt,destgt,aparty,bparty,mastergttup ),smscgt,status,prepost,cnt 

Esto se puede hacer agregando una columna ficticia y luego agrupándola.

dummmy = foreach p2preportmap genera 1, $ 0, $ 1 ….

agrupado = grupo ficticio por $ 0

Déjame darte un ejemplo tengo dos relaciones A y B

UNA

 1,2,3 3,4,5 4,5,6 

segundo

 1 2 3 1 2 3 1 2 3 

Ahora quiero un udf de python que busque la primera columna de la salida de impresión A algo como esto a continuación.

  ((1,{(1,2,3)})) ((2,)) ((3,{(3,4,5)})) ((1,{(1,2,3)})) ((2,)) ((3,{(3,4,5)})) ((1,{(1,2,3)})) ((2,)) ((3,{(3,4,5)})) 

Así que primero agrupo A por primera columna y luego lo agrupo por 1 para que tenga una sola fila

 c = group A by $0 e = group c by 1 

Python udf es algo como abajo

 def pythonudf(value,map): print map temp = None for a in map: if a[0] == value: temp = a[1] return value,temp 

ahora usas este udf

 D = foreach B generate myudf.pythonudf($0,e.$1);