diff --git a/nsis.py b/nsis.py
index e752bc863..06ca07655 100644
--- a/nsis.py
+++ b/nsis.py
@@ -24,7 +24,7 @@
import os
import sys
from py2exe.build_exe import py2exe
-
+# import py2exe
nsi_base_script = r"""\
; base.nsi
diff --git a/pyfepdf.py b/pyfepdf.py
index acd518170..2ec035722 100644
--- a/pyfepdf.py
+++ b/pyfepdf.py
@@ -120,6 +120,9 @@ class FEPDF:
(1, 6, 11, 19, 51): 'Factura',
(2, 7, 12, 20, 52): 'Nota de Débito',
(3, 8, 13, 21, 53): 'Nota de Crédito',
+ (201, 206, 211): u'Factura de Crédito MiPyMEs',
+ (202, 207, 212): u'Nota de Débito MiPyMEs',
+ (203, 208, 213): u'Nota de Crédito MiPyMEs',
(4, 9, 15, 54): 'Recibo',
(10, 5): 'Nota de Venta al contado',
(60, 61): 'Cuenta de Venta y Líquido producto',
@@ -127,13 +130,13 @@ class FEPDF:
(91, ): 'Remito',
(39, 40): '???? (R.G. N° 3419)'}
- letras_fact = {(1, 2, 3, 4, 5, 39, 60, 63): 'A',
- (6, 7, 8, 9, 10, 40, 61, 64): 'B',
- (11, 12, 13, 15): 'C',
+ letras_fact = {(1, 2, 3, 4, 5, 39, 60, 63, 201, 202, 203): 'A',
+ (6, 7, 8, 9, 10, 40, 61, 64, 206, 207, 208): 'B',
+ (11, 12, 13, 15, 211, 212, 213): 'C',
(51, 52, 53, 54): 'M',
(19, 20, 21): 'E',
(91, ): 'R',
- }
+ }
def __init__(self):
self.Version = __version__
@@ -517,13 +520,13 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'):
fact[k] = ds.replace('
', '\n')
# divido las observaciones por linea:
- if fact.get('obs_generales') and 'obs' not in f and 'ObservacionesGenerales1' not in f:
+ if fact.get('obs_generales') and not f.has_key('obs') and not f.has_key('ObservacionesGenerales1'):
obs = "\nObservaciones:\n\n" + fact['obs_generales']
# limpiar texto (campos dbf) y reemplazar saltos de linea:
obs = obs.replace('\x00', '').replace('
', '\n')
for ds in f.split_multicell(obs, 'Item.Descripcion01'):
li_items.append(dict(codigo=None, ds=ds, qty=None, umed=None, precio=None, importe=None))
- if fact.get('obs_comerciales') and 'obs_comerciales' not in f and 'ObservacionesComerciales1' not in f:
+ if fact.get('obs_comerciales') and not f.has_key('obs_comerciales') and not f.has_key('ObservacionesComerciales1'):
obs = "\nObservaciones Comerciales:\n\n" + fact['obs_comerciales']
# limpiar texto (campos dbf) y reemplazar saltos de linea:
obs = obs.replace('\x00', '').replace('
', '\n')
@@ -534,13 +537,13 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'):
permisos = ['Codigo de Despacho %s - Destino de la mercadería: %s' % (
p['id_permiso'], self.paises.get(p['dst_merc'], p['dst_merc']))
for p in fact.get('permisos', [])]
- #import dbg; dbg.set_trace()
- if 'permiso.id1' in f and "permiso.delivery1" in f:
+
+ if f.has_key('permiso.id1') and f.has_key("permiso.delivery1"):
for i, p in enumerate(fact.get('permisos', [])):
self.AgregarDato("permiso.id%d" % (i + 1), p['id_permiso'])
pais_dst = self.paises.get(p['dst_merc'], p['dst_merc'])
self.AgregarDato("permiso.delivery%d" % (i + 1), pais_dst)
- elif 'permisos' not in f and permisos:
+ elif not f.has_key('permisos') and permisos:
obs = "\nPermisos de Embarque:\n\n" + '\n'.join(permisos)
for ds in f.split_multicell(obs, 'Item.Descripcion01'):
li_items.append(dict(codigo=None, ds=ds, qty=None, umed=None, precio=None, importe=None))
@@ -549,7 +552,7 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'):
# agrego comprobantes asociados
cmps_asoc = ['%s %s %s' % self.fmt_fact(c['cbte_tipo'], c['cbte_punto_vta'], c['cbte_nro'])
for c in fact.get('cbtes_asoc', [])]
- if 'cmps_asoc' not in f and cmps_asoc:
+ if not f.has_key('cmps_asoc') and cmps_asoc:
obs = "\nComprobantes Asociados:\n\n" + '\n'.join(cmps_asoc)
for ds in f.split_multicell(obs, 'Item.Descripcion01'):
li_items.append(dict(codigo=None, ds=ds, qty=None, umed=None, precio=None, importe=None))
@@ -572,7 +575,7 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'):
# mostrar las validaciones no excluyentes de AFIP (observaciones)
if fact.get('motivos_obs') and fact['motivos_obs'] != '00':
- if 'motivos_ds.L' not in f:
+ if not f.has_key('motivos_ds.L'):
motivos_ds = "Irregularidades observadas por AFIP (F136): %s" % fact['motivos_obs']
else:
motivos_ds = "%s" % fact['motivos_obs']
@@ -583,7 +586,7 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'):
if letra_fact in ('A', 'M'):
msg_no_iva = "\nEl IVA discriminado no puede computarse como Crédito Fiscal (RG2485/08 Art. 30 inc. c)."
- if 'leyenda_credito_fiscal' not in f and motivos_ds:
+ if not f.has_key('leyenda_credito_fiscal') and motivos_ds:
motivos_ds += msg_no_iva
copias = {1: 'Original', 2: 'Duplicado', 3: 'Triplicado'}
@@ -687,7 +690,7 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'):
if it['codigo'] is not None:
f.set('Item.Codigo%02d' % li, it['codigo'])
if it['umed'] is not None:
- if it['umed'] and "Item.Umed_ds01" in f:
+ if it['umed'] and f.has_key("Item.Umed_ds01"):
# recortar descripción:
umed_ds = self.umeds_ds.get(int(it['umed']))
s = f.split_multicell(umed_ds, 'Item.Umed_ds01')
@@ -829,9 +832,9 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'):
# Datos del pie de factura (obtenidos desde AFIP):
f.set('motivos_ds', motivos_ds)
- if 'motivos_ds1' in f and motivos_ds:
+ if f.has_key('motivos_ds1') and motivos_ds:
if letra_fact in ('A', 'M'):
- if 'leyenda_credito_fiscal' in f:
+ if f.has_key('leyenda_credito_fiscal'):
f.set('leyenda_credito_fiscal', msg_no_iva)
for i, txt in enumerate(f.split_multicell(motivos_ds, 'motivos_ds1')):
f.set('motivos_ds%d' % (i + 1), txt)
@@ -863,13 +866,13 @@ def ProcesarPlantilla(self, num_copias=3, lineas_max=36, qty_pos='izq'):
f.set('estado', "") # compatibilidad hacia atras
# colocar campos de observaciones (si no van en ds)
- if 'observacionesgenerales1' in f and 'obs_generales' in fact:
+ if f.has_key('observacionesgenerales1') and 'obs_generales' in fact:
for i, txt in enumerate(f.split_multicell(fact['obs_generales'], 'ObservacionesGenerales1')):
f.set('ObservacionesGenerales%d' % (i + 1), txt)
- if 'observacionescomerciales1' in f and 'obs_comerciales' in fact:
+ if f.has_key('observacionescomerciales1') and 'obs_comerciales' in fact:
for i, txt in enumerate(f.split_multicell(fact['obs_comerciales'], 'ObservacionesComerciales1')):
f.set('ObservacionesComerciales%d' % (i + 1), txt)
- if 'enletras1' in f and 'en_letras' in fact:
+ if f.has_key('enletras1') and 'en_letras' in fact:
for i, txt in enumerate(f.split_multicell(fact['en_letras'], 'EnLetras1')):
f.set('EnLetras%d' % (i + 1), txt)
diff --git a/pyqr.py b/pyqr.py
new file mode 100644
index 000000000..389126be4
--- /dev/null
+++ b/pyqr.py
@@ -0,0 +1,180 @@
+#!/usr/bin/python
+# -*- coding: latin-1 -*-
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation; either version 3, or (at your option) any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+
+"M?dulo para generar c?digos QR"
+
+__author__ = "Mariano Reingart "
+__copyright__ = "Copyright (C) 2011 Mariano Reingart"
+__license__ = "LGPL 3.0"
+__version__ = "1.03b"
+
+import base64
+import json
+import os
+import sys
+import tempfile
+
+import qrcode
+
+
+TEST_QR_DATA = """
+eyJ2ZXIiOjEsImZlY2hhIjoiMjAyMC0xMC0xMyIsImN1aXQiOjMwMDAwMDAwMDA3LCJwdG9WdGEiOj
+EwLCJ0aXBvQ21wIjoxLCJucm9DbXAiOjk0LCJpbXBvcnRlIjoxMjEwMCwibW9uZWRhIjoiRE9MIiwi
+Y3R6Ijo2NSwidGlwb0RvY1JlYyI6ODAsIm5yb0RvY1JlYyI6MjAwMDAwMDAwMDEsInRpcG9Db2RBdX
+QiOiJFIiwiY29kQXV0Ijo3MDQxNzA1NDM2NzQ3Nn0=""".replace("\n", "")
+
+
+class PyQR:
+ "Interfaz para generar Codigo QR de Factura Electr?nica"
+ _public_methods_ = ['GenerarImagen', 'CrearArchivo',
+ ]
+ _public_attrs_ = ['Version', 'Excepcion', 'Traceback', "URL", "Archivo",
+ 'qr_ver', 'box_size', 'border', 'error_correction',
+ ]
+
+ _reg_progid_ = "PyQR"
+ _reg_clsid_ = "{0868A2B6-2DC7-478D-8884-A10E92C588DE}"
+
+ URL = "https://www.afip.gob.ar/fe/qr/?p=%s"
+ Archivo = "qr.png"
+
+ # qrencode default parameters:
+ qr_ver = 1
+ box_size = 10
+ border = 4
+ error_correction = qrcode.constants.ERROR_CORRECT_L
+
+ def __init__(self):
+ self.Version = __version__
+ self.Exception = self.Traceback = ""
+
+ def CrearArchivo(self):
+ """Crea un nombre de archivo temporal"""
+ # para evitar errores de permisos y poder generar varios qr simultaneos
+ tmp = tempfile.NamedTemporaryFile(prefix="qr_afip_",
+ suffix=".png",
+ delete=False)
+ self.Archivo = tmp.name
+ return self.Archivo
+
+ def GenerarImagen(self, ver=1,
+ fecha="2020-10-13",
+ cuit=30000000007,
+ pto_vta=10, tipo_cmp=1, nro_cmp=94,
+ importe=12100, moneda="PES", ctz=1.000,
+ tipo_doc_rec=80, nro_doc_rec=20000000001,
+ tipo_cod_aut="E", cod_aut=70417054367476,
+ ):
+ "Generar una im?gen con el c?digo QR"
+ # basado en: https://www.afip.gob.ar/fe/qr/especificaciones.asp
+ datos_cmp = {
+ "ver": int(ver),
+ "fecha": fecha,
+ "cuit": int(cuit),
+ "ptoVta": int(pto_vta),
+ "tipoCmp": int(tipo_cmp),
+ "nroCmp": int(nro_cmp),
+ "importe": float(importe),
+ "moneda": moneda,
+ "ctz": float(ctz),
+ "tipoDocRec": int(tipo_doc_rec),
+ "nroDocRec": int(nro_doc_rec),
+ "tipoCodAut": tipo_cod_aut,
+ "codAut": int(cod_aut),
+ }
+
+ # convertir a representación json y codificar en base64:
+ datos_cmp_json = json.dumps(datos_cmp)
+ url = self.URL % (base64.b64encode(datos_cmp_json))
+
+ qr = qrcode.QRCode(
+ version=self.qr_ver,
+ error_correction=self.error_correction,
+ box_size=self.box_size,
+ border=self.border,
+ )
+ qr.add_data(url)
+ qr.make(fit=True)
+
+ img = qr.make_image(fill_color="black", back_color="white")
+
+ img.save(self.Archivo, "PNG")
+ return url
+
+
+if __name__ == '__main__':
+
+ if "--register" in sys.argv or "--unregister" in sys.argv:
+ import win32com.server.register
+ win32com.server.register.UseCommandLine(PyQR)
+ elif "/Automate" in sys.argv:
+ try:
+ # MS seems to like /automate to run the class factories.
+ import win32com.server.localserver
+ win32com.server.localserver.serve([PyQR._reg_clsid_])
+ except Exception:
+ raise
+ else:
+
+ pyqr = PyQR()
+
+ if '--datos' in sys.argv:
+ args = sys.argv[sys.argv.index("--datos")+1:]
+ (ver, fecha, cuit, pto_vta, tipo_cmp, nro_cmp, importe, moneda, ctz,
+ tipo_doc_rec, nro_doc_rec, tipo_cod_aut, cod_aut) = args
+ else:
+ ver = 1
+ fecha = "2020-10-13"
+ cuit = 30000000007
+ pto_vta = 10
+ tipo_cmp = 1
+ nro_cmp = 94
+ importe = 12100
+ moneda = "DOL"
+ ctz = 65.000
+ tipo_doc_rec = 80
+ nro_doc_rec = 20000000001
+ tipo_cod_aut = "E"
+ cod_aut = 70417054367476
+
+ if '--archivo' in sys.argv:
+ pyqr.Archivo = sys.argv[sys.argv.index("--archivo")+1]
+ else:
+ pyqr.CrearArchivo()
+
+ if '--url' in sys.argv:
+ pyqr.URL = sys.argv[sys.argv.index("--url")+1]
+
+ print("datos:", (ver, fecha, cuit, pto_vta, tipo_cmp, nro_cmp,
+ importe, moneda, ctz, tipo_doc_rec, nro_doc_rec,
+ tipo_cod_aut, cod_aut))
+ print("archivo", pyqr.Archivo)
+
+ url = pyqr.GenerarImagen(ver, fecha, cuit, pto_vta, tipo_cmp, nro_cmp,
+ importe, moneda, ctz, tipo_doc_rec, nro_doc_rec,
+ tipo_cod_aut, cod_aut)
+
+ print("url generada:", url)
+
+ if "--prueba" in sys.argv:
+ qr_data_test = json.loads(base64.b64decode(TEST_QR_DATA))
+ qr_data_gen = json.loads(base64.b64decode(url[33:]))
+ assert url.startswith("https://www.afip.gob.ar/fe/qr/?p=")
+ assert qr_data_test == qr_data_gen, "Diff: %r != %r" % (qr_data_test, qr_data_gen)
+ print("QR data ok:", qr_data_gen)
+
+ if not '--mostrar' in sys.argv:
+ pass
+ elif sys.platform=="linux2":
+ os.system("eog ""%s""" % pyqr.Archivo)
+ else:
+ os.startfile(pyqr.archivo)
\ No newline at end of file
diff --git a/setup.py b/setup.py
index b188ef275..47cbab0ed 100644
--- a/setup.py
+++ b/setup.py
@@ -25,7 +25,7 @@
__version__ = "%s.%s.%s" % (sys.version_info[0:2] + (rev, ))
-HOMO = True
+HOMO = False
# build a one-click-installer for windows:
if 'py2exe' in sys.argv:
@@ -38,14 +38,14 @@
#import pyrece
from . import wsaa
from . import wsfev1, rece1, rg3685
- #import wsfexv1, recex1
+ import wsfexv1, recex1
#import wsbfev1, receb1
#import wsmtx, recem
#import wsct, recet
- #import ws_sr_padron
- #import pyfepdf
- #import pyemail
- #import pyi25
+ import ws_sr_padron
+ import pyfepdf
+ import pyemail
+ import pyi25
#import wsctg
#import wslpg
#import wsltv
@@ -53,7 +53,7 @@
#import wslsp
#import wsremcarne
#import wscoc
- #import wscdc
+ import wscdc
#import cot
#import iibb
#import trazamed
@@ -61,8 +61,9 @@
#import trazarenpre
#import trazafito
#import trazavet
- #import padron
- #import sired
+ import padron
+ import sired
+ import pyqr
data_files = [
(".", ["licencia.txt", ]),
diff --git a/tests/wsfev1.py b/tests/wsfev1.py
index ead3251b3..f43b2135b 100644
--- a/tests/wsfev1.py
+++ b/tests/wsfev1.py
@@ -9,7 +9,7 @@
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
-
+from pyafipws import wsfev1
from pyafipws.wsaa import WSAA
from pyafipws.wsfev1 import WSFEv1
"Pruebas para WSFEv1 de AFIP (Factura Electrónica Mercado Interno sin detalle)"
diff --git a/wsaa.py b/wsaa.py
index eaa4cdc10..24a15b1d1 100644
--- a/wsaa.py
+++ b/wsaa.py
@@ -1,5 +1,5 @@
#!/usr/bin/python
-# -*- coding: latin-1 -*-
+# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3, or (at your option) any later
@@ -10,11 +10,11 @@
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
-"Módulo para obtener un ticket de autorización del web service WSAA de AFIP"
+"Módulo para obtener un ticket de autorización del web service WSAA de AFIP"
# Basado en wsaa-client.php de Gerardo Fisanotti - DvSHyS/DiOPIN/AFIP - 13-apr-07
# Definir WSDL, CERT, PRIVATEKEY, PASSPHRASE, SERVICE, WSAAURL
-# Devuelve TA.xml (ticket de autorización de WSAA)
+# Devuelve TA.xml (ticket de autorización de WSAA)
__author__ = "Mariano Reingart (reingart@gmail.com)"
__copyright__ = "Copyright (C) 2008-2011 Mariano Reingart"
@@ -38,17 +38,18 @@
except ImportError:
ex = exception_info()
warnings.warn("No es posible importar M2Crypto (OpenSSL)")
- warnings.warn(ex['msg']) # revisar instalación y DLLs de OpenSSL
+ warnings.warn(ex['msg']) # revisar instalación y DLLs de OpenSSL
BIO = Rand = SMIME = SSL = None
# utilizar alternativa (ejecutar proceso por separado)
from subprocess import Popen, PIPE
from base64 import b64encode
+ from tempfile import NamedTemporaryFile
# Constantes (si se usa el script de linea de comandos)
WSDL = "https://wsaahomo.afip.gov.ar/ws/services/LoginCms?wsdl" # El WSDL correspondiente al WSAA
CERT = "reingart.crt" # El certificado X.509 obtenido de Seg. Inf.
PRIVATEKEY = "reingart.key" # La clave privada del certificado CERT
-PASSPHRASE = "xxxxxxx" # La contraseña para firmar (si hay)
+PASSPHRASE = "xxxxxxx" # La contraseña para firmar (si hay)
SERVICE = "wsfe" # El nombre del web service al que se le pide el TA
# WSAAURL: la URL para acceder al WSAA, verificar http/https y wsaa/wsaahomo
@@ -57,7 +58,7 @@
SOAP_ACTION = 'http://ar.gov.afip.dif.facturaelectronica/' # Revisar WSDL
SOAP_NS = "http://wsaa.view.sua.dvadac.desein.afip.gov" # Revisar WSDL
-# Verificación del web server remoto, necesario para verificar canal seguro
+# Verificación del web server remoto, necesario para verificar canal seguro
CACERT = "conf/afip_ca_info.crt" # WSAA CA Cert (Autoridades de Confiaza)
HOMO = False
@@ -65,7 +66,7 @@
DEFAULT_TTL = 60 * 60 * 5 # five hours
DEBUG = False
-# No debería ser necesario modificar nada despues de esta linea
+# No deberÃa ser necesario modificar nada despues de esta linea
def create_tra(service=SERVICE, ttl=2400):
@@ -82,18 +83,19 @@ def create_tra(service=SERVICE, ttl=2400):
tra.header.add_child('generationTime', str(date('c', date('U') - ttl)))
tra.header.add_child('expirationTime', str(date('c', date('U') + ttl)))
tra.add_child('service', service)
- return tra.as_xml().decode("utf8")
+ return tra.as_xml()
def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""):
"Firmar PKCS#7 el TRA y devolver CMS (recortando los headers SMIME)"
if BIO:
+ print("pudo importar m2crypto")
# Firmar el texto (tra) usando m2crypto (openssl bindings para python)
buf = BIO.MemoryBuffer(tra) # Crear un buffer desde el texto
- # Rand.load_file('randpool.dat', -1) # Alimentar el PRNG
+ #Rand.load_file('randpool.dat', -1) # Alimentar el PRNG
s = SMIME.SMIME() # Instanciar un SMIME
- # soporte de contraseña de encriptación (clave privada, opcional)
+ # soporte de contraseña de encriptación (clave privada, opcional)
callback = lambda *args, **kwarg: passphrase
# Cargar clave privada y certificado
if not privatekey.startswith("-----BEGIN RSA PRIVATE KEY-----"):
@@ -104,8 +106,8 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""):
else:
raise RuntimeError("Archivos no encontrados: %s, %s" % (privatekey, cert))
# crear buffers en memoria de la clave privada y certificado:
- key_bio = BIO.MemoryBuffer(privatekey)
- crt_bio = BIO.MemoryBuffer(cert)
+ key_bio = BIO.MemoryBuffer(privatekey.encode('utf8'))
+ crt_bio = BIO.MemoryBuffer(cert.encode('utf8'))
s.load_key_bio(key_bio, crt_bio, callback) # (desde buffer)
p7 = s.sign(buf, 0) # Firmar el buffer
out = BIO.MemoryBuffer() # Crear un buffer para la salida
@@ -113,7 +115,7 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""):
# Rand.save_file('randpool.dat') # Guardar el estado del PRNG's
# extraer el cuerpo del mensaje (parte firmada)
- msg = email.message_from_string(out.read())
+ msg = email.message_from_string(out.read().decode('utf8'))
for part in msg.walk():
filename = part.get_filename()
if filename == "smime.p7m": # es la parte firmada?
@@ -128,10 +130,35 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""):
openssl = r"c:\OpenSSL-Win32\bin\openssl.exe"
else:
openssl = r"c:\OpenSSL-Win64\bin\openssl.exe"
- out = Popen([openssl, "smime", "-sign",
- "-signer", cert, "-inkey", privatekey,
- "-outform", "DER", "-nodetach"],
- stdin=PIPE, stdout=PIPE, stderr=PIPE).communicate(tra.encode("utf8"))[0]
+ # NOTE: workaround if certificate is not already stored in a file
+ # SECURITY WARNING: the private key will be exposed a bit in /tmp
+ # (in theory only for the current user)
+ if cert.startswith("-----BEGIN CERTIFICATE-----"):
+ cert_f = NamedTemporaryFile()
+ cert_f.write(cert.encode('utf-8'))
+ cert_f.flush()
+ cert = cert_f.name
+ else:
+ cert_f = None
+ if privatekey.startswith("-----BEGIN RSA PRIVATE KEY-----"):
+ key_f = NamedTemporaryFile()
+ key_f.write(privatekey.encode('utf-8'))
+ key_f.flush()
+ privatekey = key_f.name
+ else:
+ key_f = None
+ try:
+ out = Popen([openssl, "smime", "-sign",
+ "-signer", cert, "-inkey", privatekey,
+ "-outform","DER", "-nodetach"],
+ stdin=PIPE, stdout=PIPE,
+ stderr=PIPE).communicate(tra)[0]
+ finally:
+ # close temp files to delete them (just in case):
+ if cert_f:
+ cert_f.close()
+ if key_f:
+ key_f.close()
return b64encode(out).decode("utf8")
except OSError as e:
if e.errno == 2:
@@ -140,7 +167,7 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""):
def call_wsaa(cms, location=WSAAURL, proxy=None, trace=False, cacert=None):
- "Llamar web service con CMS para obtener ticket de autorización (TA)"
+ "Llamar web service con CMS para obtener ticket de autorización (TA)"
# creo la nueva clase
wsaa = WSAA()
@@ -156,7 +183,7 @@ def call_wsaa(cms, location=WSAAURL, proxy=None, trace=False, cacert=None):
class WSAA(BaseWS):
- "Interfaz para el WebService de Autenticación y Autorización"
+ "Interfaz para el WebService de Autenticación y Autorización"
_public_methods_ = ['CreateTRA', 'SignTRA', 'CallWSAA', 'LoginCMS', 'Conectar',
'AnalizarXml', 'ObtenerTagXml', 'Expirado', 'Autenticar',
'DebugLog', 'AnalizarCertificado',
@@ -180,7 +207,7 @@ class WSAA(BaseWS):
# Variables globales para BaseWS:
HOMO = HOMO
WSDL = WSDL
- Version = "%s %s" % (__version__, HOMO and 'Homologación' or '')
+ Version = "%s %s" % (__version__, HOMO and 'Homologación' or '')
@inicializar_y_capturar_excepciones
def CreateTRA(self, service="wsfe", ttl=2400):
@@ -189,15 +216,17 @@ def CreateTRA(self, service="wsfe", ttl=2400):
@inicializar_y_capturar_excepciones
def AnalizarCertificado(self, crt, binary=False):
- "Carga un certificado digital y extrae los campos más importantes"
+ "Carga un certificado digital y extrae los campos más importantes"
from M2Crypto import BIO, EVP, RSA, X509
if binary:
- bio = BIO.MemoryBuffer(cert)
- x509 = X509.load_cert_bio(bio, X509.FORMAT_DER)
+ bio = BIO.MemoryBuffer(cert.encode('utf8'))
+ x509 = X511.load_cert_bio(bio, X509.FORMAT_DER)
else:
if not crt.startswith("-----BEGIN CERTIFICATE-----"):
crt = open(crt).read()
- bio = BIO.MemoryBuffer(crt)
+ if isinstance(crt, str):
+ crt = crt.encode('utf-8')
+ bio = BIO.MemoryBuffer(crt.encode('utf8'))
x509 = X509.load_cert_bio(bio, X509.FORMAT_PEM)
if x509:
self.Identidad = x509.get_subject().as_text()
@@ -236,7 +265,7 @@ def CrearPedidoCertificado(self, cuit="", empresa="", nombre="pyafipws",
# create the certificate signing request (CSR):
self.x509_req = X509.Request()
- # normalizar encoding (reemplazar acentos, eñe, etc.)
+ # normalizar encoding (reemplazar acentos, eñe, etc.)
if isinstance(empresa, str):
empresa = unicodedata.normalize('NFKD', empresa).encode('ASCII', 'ignore')
if isinstance(nombre, str):
@@ -264,11 +293,11 @@ def CrearPedidoCertificado(self, cuit="", empresa="", nombre="pyafipws",
@inicializar_y_capturar_excepciones
def SignTRA(self, tra, cert, privatekey, passphrase=""):
"Firmar el TRA y devolver CMS"
- return sign_tra(str(tra), cert.encode('latin1'), privatekey.encode('latin1'), passphrase.encode("utf8"))
+ return sign_tra(tra, cert, privatekey, passphrase)
@inicializar_y_capturar_excepciones
def LoginCMS(self, cms):
- "Obtener ticket de autorización (TA)"
+ "Obtener ticket de autorización (TA)"
results = self.client.loginCms(in0=str(cms))
ta_xml = results['loginCmsReturn'] # .encode("utf-8")
self.xml = ta = SimpleXMLElement(ta_xml)
@@ -278,7 +307,7 @@ def LoginCMS(self, cms):
return ta_xml
def CallWSAA(self, cms, url="", proxy=None):
- "Obtener ticket de autorización (TA) -version retrocompatible-"
+ "Obtener ticket de autorización (TA) -version retrocompatible-"
self.Conectar("", url, proxy)
ta_xml = self.LoginCMS(cms)
if not ta_xml:
@@ -287,7 +316,7 @@ def CallWSAA(self, cms, url="", proxy=None):
@inicializar_y_capturar_excepciones
def Expirado(self, fecha=None):
- "Comprueba la fecha de expiración, devuelve si ha expirado"
+ "Comprueba la fecha de expiración, devuelve si ha expirado"
if not fecha:
fecha = self.ObtenerTagXml('expirationTime')
now = datetime.datetime.now()
@@ -295,7 +324,7 @@ def Expirado(self, fecha=None):
return now > d
def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cacert=None, cache=None, debug=False):
- "Método unificado para obtener el ticket de acceso (cacheado)"
+ "Método unificado para obtener el ticket de acceso (cacheado)"
self.LanzarExcepciones = True
try:
@@ -303,7 +332,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac
for filename in (crt, key):
if not os.access(filename, os.R_OK):
raise RuntimeError("Imposible abrir %s\n" % filename)
- # creo el nombre para el archivo del TA (según credenciales y ws)
+ # creo el nombre para el archivo del TA (según credenciales y ws)
ta_src = (service + crt + key).encode("utf8")
fn = "TA-%s.xml" % hashlib.md5(ta_src).hexdigest()
if cache:
@@ -318,7 +347,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac
if DEBUG:
print("Creando TRA...")
tra = self.CreateTRA(service=service, ttl=DEFAULT_TTL)
- # firmarlo criptográficamente
+ # firmarlo criptográficamente
if DEBUG:
print("Frimando TRA...")
cms = self.SignTRA(tra, crt, key)
@@ -327,8 +356,8 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac
print("Conectando a WSAA...")
ok = self.Conectar(cache, wsdl, proxy, wrapper, cacert)
if not ok or self.Excepcion:
- raise RuntimeError("Fallo la conexión: %s" % self.Excepcion)
- # llamar al método remoto para solicitar el TA
+ raise RuntimeError("Fallo la conexión: %s" % self.Excepcion)
+ # llamar al método remoto para solicitar el TA
if DEBUG:
print("Llamando WSAA...")
ta = self.LoginCMS(cms)
@@ -362,7 +391,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac
return ta
-# busco el directorio de instalación (global para que no cambie si usan otra dll)
+# busco el directorio de instalación (global para que no cambie si usan otra dll)
INSTALL_DIR = WSAA.InstallDir = get_install_dir()
@@ -395,21 +424,21 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac
# start the server.
win32com.server.localserver.serve([WSAA._reg_clsid_])
elif "--crear_pedido_cert" in sys.argv:
- # instanciar el helper y revisar los parámetros
+ # instanciar el helper y revisar los parámetros
wsaa = WSAA()
args = [arg for arg in sys.argv if not arg.startswith("--")]
# obtengo el CUIT y lo normalizo:
cuit = len(args) > 1 and args[1] or input("Ingrese un CUIT: ")
cuit = ''.join([c for c in cuit if c.isdigit()])
nombre = len(args) > 2 and args[2] or "PyAfipWs"
- # consultar el padrón online de AFIP si no se especificó razón social:
+ # consultar el padrón online de AFIP si no se especificó razón social:
empresa = len(args) > 3 and args[3] or ""
if not empresa:
from .padron import PadronAFIP
padron = PadronAFIP()
ok = padron.Consultar(cuit)
if ok and padron.denominacion:
- print("Denominación según AFIP:", padron.denominacion)
+ print("Denominación según AFIP:", padron.denominacion)
empresa = padron.denominacion
else:
print("CUIT %s no encontrado: %s..." % (cuit, padron.Excepcion))
@@ -430,7 +459,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac
print("Se crearon los archivos:")
print(clave_privada)
print(pedido_cert)
- # convertir a terminación de linea windows y abrir con bloc de notas
+ # convertir a terminación de linea windows y abrir con bloc de notas
if sys.platform == "win32":
txt = open(pedido_cert + ".txt", "wb")
for linea in open(pedido_cert, "r"):
@@ -449,7 +478,7 @@ def Autenticar(self, service, crt, key, wsdl=None, proxy=None, wrapper=None, cac
url = len(argv) > 5 and argv[5] or WSAAURL
wrapper = len(argv) > 6 and argv[6] or None
cacert = len(argv) > 7 and argv[7] or CACERT
- DEBUG = "--debug" in args
+ DEBUG = "--debug" in args or DEBUG
print("Usando CRT=%s KEY=%s URL=%s SERVICE=%s TTL=%s" % (crt, key, url, service, ttl), file=sys.stderr)
diff --git a/wsfecred.py b/wsfecred.py
new file mode 100644
index 000000000..1c14d8dca
--- /dev/null
+++ b/wsfecred.py
@@ -0,0 +1,592 @@
+#!/usr/bin/python
+# -*- coding: utf8 -*-
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 3, or (at your option) any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+
+"""Módulo para la Gestión de cuentas corrientes de Facturas Electrónicas de
+Crédito del servicio web FECredService versión 1.0.1-rc1 (RG4367/18)
+"""
+
+__author__ = "Mariano Reingart "
+__copyright__ = "Copyright (C) 2018-2019 Mariano Reingart"
+__license__ = "LGPL 3.0"
+__version__ = "1.02a"
+
+LICENCIA = """
+wsfecred.py: Interfaz para REGISTRO DE FACTURAS de CRÉDITO ELECTRÓNICA MiPyMEs
+Resolución General 4367/2018.
+Copyright (C) 2019 Mariano Reingart reingart@gmail.com
+http://www.sistemasagiles.com.ar/trac/wiki/FacturaCreditoElectronica
+
+Este progarma es software libre, se entrega ABSOLUTAMENTE SIN GARANTIA
+y es bienvenido a redistribuirlo bajo la licencia GPLv3.
+
+Para información adicional sobre garantÃa, soporte técnico comercial
+e incorporación/distribución en programas propietarios ver PyAfipWs:
+http://www.sistemasagiles.com.ar/trac/wiki/PyAfipWs
+"""
+
+AYUDA = """
+Opciones:
+ --ayuda: este mensaje
+
+ --debug: modo depuración (detalla y confirma las operaciones)
+ --prueba: genera y autoriza una rec de prueba (no usar en producción!)
+ --xml: almacena los requerimientos y respuestas XML (depuración)
+ --dummy: consulta estado de servidores
+
+ --obligado: consultar monto obligado a recepcion (según CUIT)
+ --ctasctes: consultar cuentas corrientes generadas a partir de facturación
+
+ --tipos_ajuste: tabla de parametros para tipo de ajuste
+ --tipos_cancelacion: tabla de parametros para formas cancelacion
+ --tipos_retencion: tabla de parametros para tipo de retenciones
+ --tipos_rechazo: tabla de parametros para tipo de motivos de rechazo
+
+Ver rece.ini para parámetros de configuración (URL, certificados, etc.)"
+"""
+
+from collections import OrderedDict
+import datetime
+import os, sys, time, base64
+import traceback
+from pysimplesoap.client import SoapFault
+from . import utils
+
+# importo funciones compartidas:
+from .utils import json, BaseWS, inicializar_y_capturar_excepciones, get_install_dir, json_serializer
+
+# constantes de configuración (producción/homologación):
+
+WSDL = ["https://serviciosjava.afip.gob.ar/wsfecred/FECredService?wsdl",
+ "https://fwshomo.afip.gov.ar/wsfecred/FECredService?wsdl"]
+
+DEBUG = False
+XML = False
+CONFIG_FILE = "rece.ini"
+HOMO = False
+ENCABEZADO = []
+
+
+class WSFECred(BaseWS):
+ "Interfaz para el WebService de Factura de Crédito Electronica"
+ _public_methods_ = ['Conectar', 'Dummy', 'SetTicketAcceso', 'DebugLog',
+ 'CrearFECred', 'AgregarFormasCancelacion', 'AgregarAjustesOperacion', 'AgregarRetenciones',
+ 'AgregarConfirmarNotasDC',
+ 'ConsultarCtasCtes', 'LeerCtaCte',
+ 'ConsultarTiposAjustesOperacion', 'ConsultarTiposFormasCancelacion',
+ 'ConsultarTiposMotivosRechazo', 'ConsultarTiposRetenciones',
+ 'ConsultarMontoObligadoRecepcion',
+ 'SetParametros', 'SetParametro', 'GetParametro', 'AnalizarXml', 'ObtenerTagXml', 'LoadTestXML',
+ ]
+ _public_attrs_ = ['XmlRequest', 'XmlResponse', 'Version', 'Traceback', 'Excepcion', 'LanzarExcepciones',
+ 'Token', 'Sign', 'Cuit', 'AppServerStatus', 'DbServerStatus', 'AuthServerStatus',
+ 'CodCtaCte', 'TipoComprobante', 'PuntoVenta',
+ 'NroComprobante', 'CodAutorizacion', 'FechaVencimiento', 'FechaEmision', 'Estado', 'Resultado',
+ 'QR',
+ 'ErrCode', 'ErrMsg', 'Errores', 'ErroresFormato', 'Observaciones', 'Obs', 'Evento', 'Eventos',
+ ]
+ _reg_progid_ = "WSFECred"
+ _reg_clsid_ = "{F4B2B652-C992-4E46-9134-121F62011C46}"
+
+ # Variables globales para BaseWS:
+ HOMO = HOMO
+ WSDL = WSDL[HOMO]
+ LanzarExcepciones = False
+ Version = "%s %s" % (__version__, HOMO and 'Homologación' or '')
+
+ def Conectar(self, *args, **kwargs):
+ ret = BaseWS.Conectar(self, *args, **kwargs)
+ return ret
+
+ def inicializar(self):
+ self.AppServerStatus = self.DbServerStatus = self.AuthServerStatus = None
+ self.CodCtaCte = self.TipoComprobante = self.PuntoVenta = None
+ self.NroComprobante = self.CUITEmisor = None
+ self.Resultado = None
+ self.Errores = []
+ self.ErroresFormato = []
+ self.Observaciones = []
+ self.Eventos = []
+ self.Evento = self.ErrCode = self.ErrMsg = self.Obs = ""
+
+ def __analizar_errores(self, ret):
+ "Comprueba y extrae errores si existen en la respuesta XML"
+ self.Errores = [err['codigoDescripcion'] for err in ret.get('arrayErrores', [])]
+ self.ErroresFormato = [err['codigoDescripcionString'] for err in ret.get('arrayErroresFormato', [])]
+ errores = self.Errores + self.ErroresFormato
+ self.ErrCode = ' '.join(["%(codigo)s" % err for err in errores])
+ self.ErrMsg = '\n'.join(["%(codigo)s: %(descripcion)s" % err for err in errores])
+
+ def __analizar_observaciones(self, ret):
+ "Comprueba y extrae observaciones si existen en la respuesta XML"
+ self.Observaciones = [obs["codigoDescripcion"] for obs in ret.get('arrayObservaciones', [])]
+ self.Obs = '\n'.join(["%(codigo)s: %(descripcion)s" % obs for obs in self.Observaciones])
+
+ def __analizar_evento(self, ret):
+ "Comprueba y extrae el wvento informativo si existen en la respuesta XML"
+ evt = ret.get('evento')
+ if evt:
+ self.Eventos = [evt]
+ self.Evento = "%(codigo)s: %(descripcion)s" % evt
+
+ @inicializar_y_capturar_excepciones
+ def Dummy(self):
+ "Obtener el estado de los servidores de la AFIP"
+ results = self.client.dummy()['dummyReturn']
+ self.AppServerStatus = str(results['appserver'])
+ self.DbServerStatus = str(results['dbserver'])
+ self.AuthServerStatus = str(results['authserver'])
+
+ @inicializar_y_capturar_excepciones
+ def CrearFECred(self, cuit_emisor, tipo_cbte, punto_vta, nro_cbte, cod_moneda="PES", ctz_moneda_ult=1,
+ importe_cancelado=0.00, importe_embargo_pesos=0.00, importe_total_ret_pesos=0.00,
+ saldo_aceptado=0.00, tipo_cancelacion="TOT",
+ **kwargs):
+ "Inicializa internamente los datos de una Factura de Crédito Electrónica para aceptacion/rechazo"
+ self.factura = {
+ 'idCtaCte': {
+ ## "codCtaCte": 2561,
+ "idFactura": {
+ 'CUITEmisor': cuit_emisor,
+ 'codTipoCmp': tipo_cbte,
+ 'ptoVta': punto_vta,
+ 'nroCmp': nro_cbte,
+ }
+ },
+ 'codMoneda': cod_moneda,
+ 'cotizacionMonedaUlt': ctz_moneda_ult,
+ 'importeCancelado': importe_cancelado,
+ 'importeEmbargoPesos': importe_embargo_pesos,
+ 'importeTotalRetPesos': importe_total_ret_pesos,
+ 'saldoAceptado': saldo_aceptado,
+ 'tipoCancelacion': tipo_cancelacion,
+ 'arrayAjustesOperacion': [],
+ 'arrayFormasCancelacion': [],
+ 'arrayRetenciones': [],
+ 'arrayConfirmarNotasDC': [],
+ }
+ return True
+
+ @inicializar_y_capturar_excepciones
+ def AgregarAjustesOperacion(self, codigo=None, importe=0.00, **kwargs):
+ "Agrega la información de los ajustes a la Factura de Crédito Electrónica"
+ self.factura['arrayAjustesOperacion'].append({
+ 'ajuste': {
+ 'codigo': codigo,
+ 'importe': importe,
+ }
+ })
+ return True
+
+ @inicializar_y_capturar_excepciones
+ def AgregarFormasCancelacion(self, codigo=None, descripcion=None, **kwargs):
+ "Agrega la información de las formas de cancelación a la Factura de Crédito Electrónica"
+ self.factura['arrayFormasCancelacion'].append({
+ 'codigoDescripcion': {
+ 'codigo': codigo,
+ 'descripcion': descripcion,
+ }
+ })
+ return True
+
+ @inicializar_y_capturar_excepciones
+ def AgregarRetenciones(self, cod_tipo=None, desc_motivo=None, importe=None, porcentaje=None, **kwargs):
+ "Agrega la información de las retenciones a la Factura de Crédito Electrónica"
+ self.factura['arrayRetenciones'].append({
+ 'retencion': {
+ 'codTipo': cod_tipo,
+ 'descMotivo': desc_motivo,
+ 'importe': importe,
+ 'porcentaje': porcentaje,
+ }
+ })
+ return True
+
+ @inicializar_y_capturar_excepciones
+ def AgregarConfirmarNotasDC(self, cuit_emisor, tipo_cbte, punto_vta, nro_cbte, **kwargs):
+ "Agrega la información referente al viaje del remito electrónico cárnico"
+ self.factura['arrayConfirmarNotasDC'].append({
+ 'confirmarNota': {
+ 'acepta': 'acepta',
+ 'idNota': {
+ 'CUITEmisor': cuit_emisor,
+ 'codTipoCmp': tipo_cbte,
+ 'ptoVta': punto_vta,
+ 'nroCmp': nro_cbte,
+ }
+ }
+ })
+ return True
+
+ @inicializar_y_capturar_excepciones
+ def AceptarFECred(self):
+ "Aceptar el saldo actual de la Cta. Cte. de una Factura de Crédito"
+ # pudiendo indicar: pagos parciales, retenciones y/o embargos
+ params = {
+ 'authRequest': {
+ 'cuitRepresentada': self.Cuit,
+ 'sign': self.Sign,
+ 'token': self.Token
+ },
+ }
+ params.update(self.factura)
+ response = self.client.aceptarFECred(**params)
+ ret = response.get("operacionFECredReturn")
+ if ret:
+ self.__analizar_errores(ret)
+ self.__analizar_observaciones(ret)
+ self.__analizar_evento(ret)
+ self.AnalizarFECred(ret)
+ return True
+
+ @inicializar_y_capturar_excepciones
+ def AnalizarFECred(self, ret, archivo=None):
+ "Extrae el resultado de la Factura de Crédito Electrónica, si existen en la respuesta XML"
+ if ret:
+ id_cta_cte = ret.get("idCtaCte", {})
+ self.CodCtaCte = id_cta_cte.get("codCtaCte")
+ id_factura = id_cta_cte.get("idFactura")
+ if id_factura:
+ self.CUITEmisor = ret.get("cuitEmisor")
+ self.TipoComprobante = ret.get("tipoComprobante")
+ self.PuntoVenta = ret.get("ptoVta")
+ self.NroComprobante = ret.get('NroComprobante')
+ self.Resultado = ret.get('resultado')
+
+ @inicializar_y_capturar_excepciones
+ def ConsultarTiposAjustesOperacion(self, sep="||"):
+ "Listar los tipos de ajustes disponibles"
+ # para informar la aceptación de una Factura Electrónica de Crédito y su Cuenta Corriente vinculada
+ ret = self.client.consultarTiposAjustesOperacion(
+ authRequest={
+ 'token': self.Token, 'sign': self.Sign,
+ 'cuitRepresentada': self.Cuit, },
+ )['codigoDescripcionReturn']
+ self.__analizar_errores(ret)
+ array = ret.get('arrayCodigoDescripcion', [])
+ lista = [it['codigoDescripcion'] for it in array]
+ return [(u"%s {codigo} %s {descripcion} %s" % (sep, sep, sep)).format(**it) if sep else it for it in lista]
+
+ @inicializar_y_capturar_excepciones
+ def ConsultarTiposFormasCancelacion(self, sep="||"):
+ "Listar los tipos de formas de cancelación habilitados para una Factura Electrónica de Crédito"
+ # para informar la aceptación de una Factura Electrónica de Crédito y su Cuenta Corriente vinculada
+ ret = self.client.consultarTiposFormasCancelacion(
+ authRequest={
+ 'token': self.Token, 'sign': self.Sign,
+ 'cuitRepresentada': self.Cuit, },
+ )['codigoDescripcionReturn']
+ self.__analizar_errores(ret)
+ array = ret.get('arrayCodigoDescripcion', [])
+ lista = [it['codigoDescripcion'] for it in array]
+ return [(u"%s {codigo} %s {descripcion} %s" % (sep, sep, sep)).format(**it) if sep else it for it in lista]
+
+ @inicializar_y_capturar_excepciones
+ def ConsultarTiposMotivosRechazo(self, sep="||"):
+ "Listar los tipos de motivos de rechazo habilitados para una cta. cte."
+ # para informar la aceptación de una Factura Electrónica de Crédito y su Cuenta Corriente vinculada
+ ret = self.client.consultarTiposMotivosRechazo(
+ authRequest={
+ 'token': self.Token, 'sign': self.Sign,
+ 'cuitRepresentada': self.Cuit, },
+ )['codigoDescripcionReturn']
+ self.__analizar_errores(ret)
+ array = ret.get('arrayCodigoDescripcion', [])
+ lista = [it['codigoDescripcion'] for it in array]
+ return [(u"%s {codigo} %s {descripcion} %s" % (sep, sep, sep)).format(**it) if sep else it for it in lista]
+
+ @inicializar_y_capturar_excepciones
+ def ConsultarTiposRetenciones(self, sep="||"):
+ "Listar los tipos de retenciones habilitados para una Factura Electrónica de Crédito"
+ # para informar la aceptación de una Factura Electrónica de Crédito y su Cuenta Corriente vinculada
+ ret = self.client.consultarTiposRetenciones(
+ authRequest={
+ 'token': self.Token, 'sign': self.Sign,
+ 'cuitRepresentada': self.Cuit, },
+ )['consultarTiposRetencionesReturn']
+ self.__analizar_errores(ret)
+ array = ret.get('arrayTiposRetenciones', [])
+ lista = [it['tipoRetencion'] for it in array]
+ return [(u"%s {codigoJurisdiccion} %s {descripcionJurisdiccion} %s {porcentajeRetencion} %s" %
+ (sep, sep, sep, sep)).format(**it) if sep else it for it in lista]
+
+ @inicializar_y_capturar_excepciones
+ def ConsultarMontoObligadoRecepcion(self, cuit_consultada, fecha_emision=None):
+ "Conocer la obligación respecto a la emisión o recepción de Facturas de Créditos"
+ if not fecha_emision:
+ fecha_emision = datetime.datetime.today().strftime("%Y-%m-%d")
+ response = self.client.consultarMontoObligadoRecepcion(
+ authRequest={
+ 'token': self.Token, 'sign': self.Sign,
+ 'cuitRepresentada': self.Cuit,
+ },
+ cuitConsultada=cuit_consultada,
+ fechaEmision=fecha_emision,
+ )
+ ret = response.get('consultarMontoObligadoRecepcionReturn')
+ if ret:
+ self.__analizar_errores(ret)
+ self.__analizar_observaciones(ret)
+ self.__analizar_evento(ret)
+ self.Resultado = ret['obligado']
+ return ret['montoDesde']
+
+ @inicializar_y_capturar_excepciones
+ def ConsultarCtasCtes(self, cuit_contraparte=None, rol="Receptor",
+ fecha_desde=None, fecha_hasta=None, fecha_tipo="Emision"):
+ """Obtener las cuentas corrientes que fueron generadas a partir de la facturación
+
+ Args:
+ cuit_contraparte (str): Cuit de la contraparte, que ocupa el rol opuesto (CUITContraparte)
+ rol (str): Identificar la CUIT Representada que origina la cuenta corriente (rolCUITRepresentada)
+ "Emisor" o "Receptor"
+ fecha_desde (str): Fecha Desde, si no se indica se usa "2019-01-01"
+ fecha_hasta (str): Fecha Hasta, si no se indica se usa la fecha de hoy
+ fecha_tipo (str): permite determinar sobre qué fecha vamos a hacer el filtro (TipoFechaSimpleType)
+ "Emision": Fecha de Emisión
+ "PuestaDispo": Fecha puesta a Disposición
+ "VenPago": Fecha vencimiento de pago
+ "VenAcep": Fecha vencimiento aceptación
+ "Acep": Fecha aceptación
+ "InfoAgDptoCltv": Fecha informada a Agente de Deposito
+
+ Returns:
+ int: cantidad de cuentas corrientes
+ """
+ if not fecha_desde:
+ fecha_desde = datetime.datetime.today().strftime("2019-01-01")
+ if not fecha_hasta:
+ fecha_hasta = datetime.datetime.today().strftime("%Y-%m-%d")
+ response = self.client.consultarCtasCtes(
+ authRequest={
+ 'token': self.Token, 'sign': self.Sign,
+ 'cuitRepresentada': self.Cuit,
+ },
+ CUITContraparte=cuit_contraparte,
+ rolCUITRepresentada=rol,
+ fecha={
+ 'desde': fecha_desde,
+ 'hasta': fecha_hasta,
+ 'tipo': fecha_tipo
+ },
+ )
+ ret = response.get('consultarCtasCtesReturn')
+ self.ctas_ctes = []
+ if ret:
+ self.__analizar_errores(ret)
+ self.__analizar_observaciones(ret)
+ self.__analizar_evento(ret)
+ array = ret.get('arrayInfosCtaCte', [])
+ for cc in [it['infoCtaCte'] for it in array]:
+ cc = {
+ 'cod_cta_cte': cc['codCtaCte'],
+ 'estado_cta_cte': cc['estadoCtaCte']['estado'],
+ 'fecha_hora_estado': cc['estadoCtaCte']['fechaHoraEstado'],
+ 'cuit_emisor': cc['idFacturaCredito']['CUITEmisor'],
+ 'tipo_cbte': cc['idFacturaCredito']['codTipoCmp'],
+ 'nro_cbte': cc['idFacturaCredito']['nroCmp'],
+ 'punto_vta': cc['idFacturaCredito']['ptoVta'],
+ 'cod_moneda': cc['codMoneda'],
+ 'importe_total_fc': cc['importeTotalFC'],
+ 'saldo': cc['saldo'],
+ 'saldo_aceptado': cc['saldoAceptado'],
+ }
+ self.ctas_ctes.append(cc)
+ return len(self.ctas_ctes)
+
+ @inicializar_y_capturar_excepciones
+ def LeerCtaCte(self, pos=0):
+ """Leer la cuenta corriente generada a partir de la facturación
+
+ Args:
+ pos (int): posición de la cuenta corriente (0 a n)
+
+ Returns:
+ dict: elemento de la cuenta corriente: {
+ 'cod_cta_cte': 2561,
+ 'estado_cta_cte': 'Modificable',
+ 'fecha_hora_estado': datetime.datetime(2019, 5, 13, 9, 25, 32),
+ 'cuit_emisor': 20267565393,
+ 'tipo_cbte': 201,
+ 'nro_cbte': 22,
+ 'punto_vta': 999
+ 'cod_moneda': 'PES',
+ 'importe_total_fc': Decimal('12850000'),
+ 'saldo': Decimal('12850000'),
+ 'saldo_aceptado': Decimal('0')
+ }
+ """
+ from win32com.client import Dispatch
+ d = Dispatch('Scripting.Dictionary')
+ cc = self.ctas_ctes.pop(pos) if pos < len(self.ctas_ctes) else {}
+ for k, v in cc.items():
+ d.Add(k, str(v))
+ return d
+
+
+# busco el directorio de instalación (global para que no cambie si usan otra dll)
+if not hasattr(sys, "frozen"):
+ basepath = __file__
+elif sys.frozen == 'dll':
+ import win32api
+
+ basepath = win32api.GetModuleFileName(sys.frozendllhandle)
+else:
+ basepath = sys.executable
+INSTALL_DIR = WSFECred.InstallDir = get_install_dir()
+
+if __name__ == '__main__':
+ if '--ayuda' in sys.argv:
+ print(LICENCIA)
+ print(AYUDA)
+ sys.exit(0)
+
+ if "--register" in sys.argv or "--unregister" in sys.argv:
+ import win32com.server.register
+
+ win32com.server.register.UseCommandLine(WSFECred)
+ sys.exit(0)
+
+ from configparser import SafeConfigParser
+
+ try:
+
+ if "--version" in sys.argv:
+ print("Versión: ", __version__)
+
+ for arg in sys.argv[1:]:
+ if arg.startswith("--"):
+ break
+ print("Usando configuración:", arg)
+ CONFIG_FILE = arg
+
+ config = SafeConfigParser()
+ config.read(CONFIG_FILE)
+ CERT = config.get('WSAA', 'CERT')
+ PRIVATEKEY = config.get('WSAA', 'PRIVATEKEY')
+ CUIT = config.get('WSFECred', 'CUIT')
+ ENTRADA = config.get('WSFECred', 'ENTRADA')
+ SALIDA = config.get('WSFECred', 'SALIDA')
+
+ if config.has_option('WSAA', 'URL') and not HOMO:
+ wsaa_url = config.get('WSAA', 'URL')
+ else:
+ wsaa_url = None
+ if config.has_option('WSFECred', 'URL') and not HOMO:
+ wsfecred_url = config.get('WSFECred', 'URL')
+ else:
+ wsfecred_url = WSDL[HOMO]
+
+ if config.has_section('DBF'):
+ conf_dbf = dict(config.items('DBF'))
+ if DEBUG: print("conf_dbf", conf_dbf)
+ else:
+ conf_dbf = {}
+
+ DEBUG = '--debug' in sys.argv
+ XML = '--xml' in sys.argv
+
+ if DEBUG:
+ print("Usando Configuración:")
+ print("wsaa_url:", wsaa_url)
+ print("wsfecred_url:", wsfecred_url)
+
+ # obteniendo el TA
+ from wsaa import WSAA
+
+ wsaa = WSAA()
+ ta = wsaa.Autenticar("wsfecred", CERT, PRIVATEKEY, wsaa_url, debug=DEBUG)
+ if not ta:
+ sys.exit("Imposible autenticar con WSAA: %s" % wsaa.Excepcion)
+
+ # cliente soap del web service
+ wsfecred = WSFECred()
+ wsfecred.Conectar(wsdl=wsfecred_url)
+ wsfecred.SetTicketAcceso(ta)
+ wsfecred.Cuit = CUIT
+ ok = None
+
+ if '--dummy' in sys.argv:
+ ret = wsfecred.Dummy()
+ print("AppServerStatus", wsfecred.AppServerStatus)
+ print("DbServerStatus", wsfecred.DbServerStatus)
+ print("AuthServerStatus", wsfecred.AuthServerStatus)
+ sys.exit(0)
+
+ if '--obligado' in sys.argv:
+ try:
+ cuit_consultar = int(sys.argv[sys.argv.index("--obligado") + 1])
+ except IndexError as ValueError:
+ cuit_consultar = raw_input("Cuit a Consultar: ")
+ ret = wsfecred.ConsultarMontoObligadoRecepcion(cuit_consultar)
+ print("Obligado:", wsfecred.Resultado)
+ print("Monto Desde:", ret)
+
+ if '--ctasctes' in sys.argv:
+ try:
+ cuit_contraparte = int(sys.argv[sys.argv.index("--ctasctes") + 1])
+ except IndexError as ValueError:
+ cuit_contraparte = None
+ ret = wsfecred.ConsultarCtasCtes(cuit_contraparte, rol="Emisor")
+ print("Observaciones:", wsfecred.Obs)
+ import pprint
+
+ for cc in ret:
+ pprint.pprint(cc)
+
+ if '--prueba' in sys.argv:
+ fec = dict(
+ cuit_emisor=30999999999,
+ tipo_cbte=201, punto_vta=99, nro_cbte=22,
+ cod_moneda="PES", ctz_moneda_ult=1,
+ importe_cancelado=1000.00, importe_embargo_pesos=0.00, importe_total_ret_pesos=0.00,
+ saldo_aceptado=1000.00, tipo_cancelacion="TOT",
+ )
+
+ wsfecred.CrearFECred(**fec)
+ wsfecred.AgregarFormasCancelacion(codigo=2, descripcion="Transferencia Bancaria")
+ wsfecred.AceptarFECred()
+
+ print("Resultado", wsfecred.Resultado)
+ print("CodCtaCte", wsfecred.CodCtaCte)
+
+ # Recuperar parámetros:
+
+ if '--tipos_ajuste' in sys.argv:
+ ret = wsfecred.ConsultarTiposAjustesOperacion()
+ print("\n".join(ret))
+
+ if '--tipos_cancelacion' in sys.argv:
+ ret = wsfecred.ConsultarTiposFormasCancelacion()
+ print("\n".join(ret))
+
+ if '--tipos_retencion' in sys.argv:
+ ret = wsfecred.ConsultarTiposRetenciones()
+ print("\n".join(ret))
+
+ if '--tipos_rechazo' in sys.argv:
+ ret = wsfecred.ConsultarTiposMotivosRechazo()
+ print("\n".join(ret))
+
+ if wsfecred.Errores or wsfecred.ErroresFormato:
+ print("Errores:", wsfecred.Errores, wsfecred.ErroresFormato)
+
+ print("hecho.")
+
+ except SoapFault as e:
+ print("Falla SOAP:", e.faultcode, e.faultstring.encode("ascii", "ignore"))
+ sys.exit(3)
+ except Exception as e:
+ ex = utils.exception_info()
+ print(ex)
+ if DEBUG:
+ raise
+ sys.exit(5)