# -*- coding: utf-8 -*- import sys #from builtins import str from datetime import datetime from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, socket, timeout from struct import pack, unpack import codecs from zk import const from zk.attendance import Attendance from zk.exception import ZKErrorResponse, ZKNetworkError from zk.user import User from zk.finger import Finger def make_commkey(key, session_id, ticks=50): """take a password and session_id and scramble them to send to the time clock. copied from commpro.c - MakeKey""" key = int(key) session_id = int(session_id) k = 0 for i in range(32): if (key & (1 << i)): k = (k << 1 | 1) else: k = k << 1 k += session_id k = pack(b'I', k) k = unpack(b'BBBB', k) k = pack( b'BBBB', k[0] ^ ord('Z'), k[1] ^ ord('K'), k[2] ^ ord('S'), k[3] ^ ord('O')) k = unpack(b'HH', k) k = pack(b'HH', k[1], k[0]) B = 0xff & ticks k = unpack(b'BBBB', k) k = pack( b'BBBB', k[0] ^ B, k[1] ^ B, B, k[3] ^ B) return k class ZK_helper(object): """ helper class """ def __init__(self, ip, port=4370): self.address = (ip, port) self.ip = ip self.port = port #self.timeout = timeout #self.password = password # passint #self.firmware = int(firmware) #TODO check minor version? #self.tcp = tcp def test_ping(self): """ Returns True if host responds to a ping request """ import subprocess, platform # Ping parameters as function of OS ping_str = "-n 1" if platform.system().lower()=="windows" else "-c 1" args = "ping " + " " + ping_str + " " + self.ip need_sh = False if platform.system().lower()=="windows" else True # Ping return subprocess.call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=need_sh) == 0 def test_tcp(self): self.client = socket(AF_INET, SOCK_STREAM) self.client.settimeout(10) # fixed test res = self.client.connect_ex(self.address) self.client.close() return res def test_udp(self): # WIP: self.client = socket(AF_INET, SOCK_DGRAM) self.client.settimeout(10) # fixed test class ZK(object): """ Clase ZK """ def __init__(self, ip, port=4370, timeout=60, password=0, force_udp=False, ommit_ping=False, verbose=False): """ initialize instance """ self.is_connect = False self.is_enabled = True #let's asume self.helper = ZK_helper(ip, port) self.__address = (ip, port) self.__sock = socket(AF_INET, SOCK_DGRAM) self.__sock.settimeout(timeout) self.__timeout = timeout self.__password = password # passint #self.firmware = int(firmware) #dummy self.force_udp = force_udp self.ommit_ping = ommit_ping self.verbose = verbose self.tcp = False self.users = 0 self.fingers = 0 self.records = 0 self.dummy = 0 self.cards = 0 self.fingers_cap = 0 self.users_cap = 0 self.rec_cap = 0 self.faces = 0 self.faces_cap = 0 self.fingers_av = 0 self.users_av = 0 self.rec_av = 0 self.user_packet_size = 28 # default zk6 self.__session_id = 0 self.__reply_id = const.USHRT_MAX-1 self.__data_recv = None self.__data = None def __create_socket(self): """ based on self.tcp""" if self.tcp: self.__sock = socket(AF_INET, SOCK_STREAM) self.__sock.connect_ex(self.__address) else: self.__sock = socket(AF_INET, SOCK_DGRAM) def __create_tcp_top(self, packet): """ witch the complete packet set top header """ length = len(packet) top = pack('= const.USHRT_MAX: reply_id -= const.USHRT_MAX buf = pack('HHHH', command, checksum, session_id, reply_id) return buf + command_string def __create_checksum(self, p): ''' Calculates the checksum of the packet to be sent to the time clock Copied from zkemsdk.c ''' l = len(p) checksum = 0 while l > 1: checksum += unpack('H', pack('BB', p[0], p[1]))[0] p = p[2:] if checksum > const.USHRT_MAX: checksum -= const.USHRT_MAX l -= 2 if l: checksum = checksum + p[-1] while checksum > const.USHRT_MAX: checksum -= const.USHRT_MAX checksum = ~checksum while checksum < 0: checksum += const.USHRT_MAX return pack('H', checksum) def __test_tcp_top(self, packet): """ return size!""" if len(packet)<=8: return 0 # invalid packet tcp_header = unpack(' 20: self.faces = fields[20] self.faces_cap = fields[22] return True else: raise ZKErrorResponse("can't read sizes") def __str__(self): """ for debug""" return "ZK %s://%s:%s users[%i]:%i/%i fingers:%i/%i, records:%i/%i faces:%i/%i" % ( "tcp" if self.tcp else "udp", self.__address[0], self.__address[1], self.user_packet_size, self.users, self.users_cap, self.fingers, self.fingers_cap, self.records, self.rec_cap, self.faces, self.faces_cap ) def restart(self): ''' restart the device ''' command = const.CMD_RESTART cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't restart device") def get_time(self): """get Device Time""" command = const.CMD_GET_TIME response_size = 1032 cmd_response = self.__send_command(command, b'', response_size) if cmd_response.get('status'): return self.__decode_time(self.__data[:4]) else: raise ZKErrorResponse("can't get time") def set_time(self, timestamp): """ set Device time (pass datetime object)""" command = const.CMD_SET_TIME command_string = pack(b'I', self.__encode_time(timestamp)) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't set time") def poweroff(self): ''' shutdown the device ''' command = const.CMD_POWEROFF command_string = b'' response_size = 1032 cmd_response = self.__send_command(command, command_string, response_size) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't poweroff") def refresh_data(self): ''' shutdown the device ''' command = const.CMD_REFRESHDATA cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't refresh data") def test_voice(self, index=0): ''' play test voice 0 acceso correcto / acceso correcto 1 password incorrecto / clave incorrecta 2 la memoria del terminal está llena / acceso denegado 3 usuario invalido /codigo no valido 4 intente de nuevo por favor / intente de nuevo por favor * 5 reintroduszca codigo de usuario /reintroduszca codigo 6 memoria del terminal llena /- 7 memoria de alm fich llena /- 8 huella duplicada / huella duplicada 9 acceso denegado / ya ha sido registrado 10 *beep* / beep kuko 11 el sistema vuelve al modo de verificacion / beep siren 12 por favor coloque su dedo o acerque tarjeta /- 13 acerca su tarjeta de nuevo /beep bell 14 excedido tiempo p esta operacion /- 15 coloque su dedo de nuevo /- 16 coloque su dedo por ultima vez /- 17 ATN numero de tarjeta está repetida /- 18 proceso de registro correcto * /- 19 borrado correcto /- 20 Numero de usuario / ponga la caja de ojos 21 ATN se ha llegado al max num usuarios /- 22 verificacion de usuarios /- 23 usuario no registrado /- 24 ATN se ha llegado al num max de registros /- 25 ATN la puerta no esta cerrada /- 26 registro de usuarios /- 27 borrado de usuarios /- 28 coloque su dedo /- 29 registre la tarjeta de administrador /- 30 0 /- 31 1 /- 32 2 /- 33 3 /- 34 4 /- 35 5 /- 36 6 /- 37 7 /- 38 8 /- 39 9 /- 40 PFV seleccione numero de usuario /- 41 registrar /- 42 operacion correcta /- 43 PFV acerque su tarjeta /- 43 la tarjeta ha sido registrada /- 45 error en operacion /- 46 PFV acerque tarjeta de administracion, p confirmacion /- 47 descarga de fichajes /- 48 descarga de usuarios /- 49 carga de usuarios /- 50 actualizan de firmware /- 51 ejeuctar ficheros de configuracion /- 52 confirmación de clave de acceso correcta /- 53 error en operacion de tclado /- 54 borrar todos los usuarios /- 55 restaurar terminal con configuracion por defecto /- 56 introduzca numero de usuario /- 57 teclado bloqueado /- 58 error en la gestión de la tarjeta /- 59 establezca una clave de acceso /- 60 pulse el teclado /- 61 zona de accceso invalida /- 62 acceso combinado invĺlido /- 63 verificación multiusuario /- 64 modo de verificación inválido /- 65 - /- ''' command = const.CMD_TESTVOICE command_string = pack("I", index) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: return False #some devices doesn't support sound #raise ZKErrorResponse("can't test voice") def set_user(self, uid, name, privilege=0, password='', group_id='', user_id='', card=0): ''' create or update user by uid ''' command = const.CMD_USER_WRQ if not user_id: user_id = str(uid) #ZK6 needs uid2 == uid #uid = chr(uid % 256) + chr(uid >> 8) if privilege not in [const.USER_DEFAULT, const.USER_ADMIN]: privilege = const.USER_DEFAULT privilege = int(privilege) if self.user_packet_size == 28: #self.firmware == 6: if not group_id: group_id = 0 try: command_string = pack('HB5s8sIxBHI', uid, privilege, password.encode(), name.encode(), card, int(group_id), 0, int(user_id)) except Exception as e: if self.verbose: print("s_h Error pack: %s" % e) if self.verbose: print("Error pack: %s" % sys.exc_info()[0]) raise ZKErrorResponse("Cant pack user") else: name_pad = name.encode().ljust(24, b'\x00')[:24] card_str = pack('i', int(card))[:4] command_string = pack('HB8s24s4sx7sx24s', uid, privilege, password.encode(), name_pad, card_str, group_id.encode(), user_id.encode()) response_size = 1024 #TODO check response? cmd_response = self.__send_command(command, command_string, response_size) if not cmd_response.get('status'): raise ZKErrorResponse("Cant set user") self.refresh_data() def save_user_template(self, user, fingers=[]): """ save user and template """ #TODO: grabado global # armar paquete de huellas if not isinstance(user, User): #try uid users = self.get_users() tusers = list(filter(lambda x: x.uid==user, users)) if len(tusers) == 1: user = tusers[0] else: tusers = list(filter(lambda x: x.user_id==str(user), users)) if len(tusers) == 1: user = tusers[0] else: raise ZKErrorResponse("Cant find user") if isinstance(fingers, Finger): fingers = [fingers] fpack = "" table = "" fnum = 0x10 # possibly flag tstart = 0 for finger in fingers: tfp = finger.repack_only() table += pack("= (bytes + 32): #complete if response == const.CMD_DATA: resp = data_recv[16:bytes+16][:size-1] # no ack? if resp[-6:] == b'\x00\x00\x00\x00\x00\x00': # padding? bug? if self.verbose: print ("cutting from",len(resp)) resp = resp[:-6] if self.verbose: print ("resp len1", len(resp)) return Finger(uid, temp_id, 1, resp) #mistery else: if self.verbose: print("broken packet!!!") return None #broken else: # incomplete if self.verbose: print ("try incomplete") data.append(data_recv[16:]) # w/o tcp and header bytes -= recieved-16 while bytes>0: #jic data_recv = self.__sock.recv(bytes) #ideal limit? recieved = len(data_recv) if self.verbose: print ("partial recv {}".format(recieved)) data.append(data_recv) # w/o tcp and header bytes -= recieved data_recv = self.__sock.recv(16) response = unpack('HHHH', data_recv[8:16])[0] if response == const.CMD_ACK_OK: resp = b''.join(data)[:size-1] # testing if resp[-6:] == b'\x00\x00\x00\x00\x00\x00': resp = resp[:-6] if self.verbose: print ("resp len2", len(resp)) return Finger(uid, temp_id, 1, resp) #data_recv[bytes+16:].encode('hex') #included CMD_ACK_OK if self.verbose: print("bad response %s" % data_recv) if self.verbose: print(data) return None #else udp size = bytes while True: #limitado por respuesta no por tamaño data_recv = self.__sock.recv(response_size) response = unpack('HHHH', data_recv[:8])[0] if self.verbose: print("# packet response is: {}".format(response)) if response == const.CMD_DATA: data.append(data_recv[8:]) #header turncated bytes -= 1024 elif response == const.CMD_ACK_OK: break #without problem. else: #truncado! continuar? if self.verbose: print("broken!") break if self.verbose: print("still needs %s" % bytes) data = b''.join(data)[:-1] if data[-6:] == b'\x00\x00\x00\x00\x00\x00': # padding? bug? data = data[:-6] #uid 32 fing 03, starts with 4d-9b-53-53-32-31 #CMD_USERTEMP_RRQ desn't need [:-1] return Finger(uid, temp_id, 1, data) def get_templates(self): """ return array of all fingers """ templates = [] templatedata, size = self.read_with_buffer(const.CMD_DB_RRQ, const.FCT_FINGERTMP) if size < 4: if self.verbose: print("WRN: no user data") # debug return [] total_size = unpack('i', templatedata[0:4])[0] print ("get template total size {}, size {} len {}".format(total_size, size, len(templatedata))) templatedata = templatedata[4:] #total size not used # ZKFinger VX10.0 the only finger firmware tested while total_size: #print ("total_size {}".format(total_size)) size, uid, fid, valid = unpack('HHbb',templatedata[:6]) template = unpack("%is" % (size-6), templatedata[6:size])[0] finger = Finger(uid, fid, valid, template) if self.verbose: print(finger) # test templates.append(finger) templatedata = templatedata[size:] total_size -= size return templates def get_users(self): #ALWAYS CALL TO GET correct user_packet_size """ return all user """ self.read_sizes() # last update if self.users == 0: #lazy return [] users = [] userdata, size = self.read_with_buffer(const.CMD_USERTEMP_RRQ, const.FCT_USER) if self.verbose: print("user size %i" % size) if size <= 4: if self.verbose: print("WRN: no user data")# debug return [] total_size = unpack("I",userdata[:4])[0] self.user_packet_size = total_size / self.users if not self.user_packet_size in [28, 72]: if self.verbose: print("WRN packet size would be %i" % self.user_packet_size) userdata = userdata[4:] if self.user_packet_size == 28: while len(userdata) >= 28: uid, privilege, password, name, card, group_id, timezone, user_id = unpack('= 72: uid, privilege, password, name, card, group_id, user_id = unpack('> 8) cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("Cant Verify") def reg_event(self, flags): """ reg events, """ command = const.CMD_REG_EVENT command_string = pack ("I", flags) cmd_response = self.__send_command(command, command_string) if not cmd_response.get('status'): raise ZKErrorResponse("cant' reg events %i" % flags) def set_sdk_build_1(self): """ """ command = const.CMD_OPTIONS_WRQ command_string = b"SDKBuild=1" cmd_response = self.__send_command(command, command_string) if not cmd_response.get('status'): return False #raise ZKErrorResponse("can't set sdk build ") return True def enroll_user(self, uid=0, temp_id=0, user_id=''): ''' start enroll user we need user_id (uid2) ''' command = const.CMD_STARTENROLL done = False if not user_id: #we need user_id (uid2) users = self.get_users() users = list(filter(lambda x: x.uid==uid, users)) if len(users) >= 1: user_id = users[0].user_id else: #double? posibly empty return False #can't enroll if self.tcp: command_string = pack('<24sbb',str(user_id).encode(), temp_id, 1) # el 1 es misterio else: command_string = pack(' 16: #not empty res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] if self.verbose: print("res %i" % res) if res == 0 or res == 6 or res == 4: # 6 timeout, 4 mismatch error, 0 can't start(why?) if self.verbose: print ("posible timeout o reg Fallido") break else: if len(data_recv) > 8: #not empty res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break if self.verbose: print ("A:%i esperando 2do regevent" % attempts) data_recv = self.__sock.recv(1032) # timeout? tarda bastante... self.__ack_ok() if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: if len(data_recv) > 8: #not empty res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break elif res == 0x64: if self.verbose: print ("ok, continue?") attempts -= 1 else: if len(data_recv) > 8: #not empty res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break elif res == 0x64: if self.verbose: print ("ok, continue?") attempts -= 1 if attempts == 0: if self.verbose: print ("esperando 3er regevent") data_recv = self.__sock.recv(1032) # timeout? tarda bastante... self.__ack_ok() if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] else: res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 5: if self.verbose: print ("huella duplicada") if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") if res == 0: size = unpack("H", data_recv.ljust(16,b"\x00")[10:12])[0] pos = unpack("H", data_recv.ljust(16,b"\x00")[12:14])[0] if self.verbose: print("enroll ok", size, pos) done = True self.__sock.settimeout(self.__timeout) self.reg_event(0) # TODO: test self.cancel_capture() self.verify_user() return done def live_capture(self, new_timeout=10):# generator! """ try live capture of events""" was_enabled = self.is_enabled users = self.get_users() self.cancel_capture() self.verify_user() if not self.is_enabled: self.enable_device() self.reg_event(const.EF_ATTLOG) #0xFFFF self.__sock.settimeout(new_timeout) # default 1 minute test? self.end_live_capture = False while not self.end_live_capture: try: if self.verbose: print ("esperando event") data_recv = self.__sock.recv(1032) self.__ack_ok() if self.tcp: size = unpack('= (size + 32): #complete if response == const.CMD_DATA: resp = data_recv[16 : size + 16] # no ack? if self.verbose: print ("resp complete len", len(resp)) return resp else: if self.verbose: print("broken packet!!! {}".format(response)) return None #broken else: # incomplete if self.verbose: print ("try incomplete") data.append(data_recv[16:]) # w/o tcp and header size -= recieved-16 while size>0: #jic data_recv = self.__sock.recv(size) #ideal limit? recieved = len(data_recv) if self.verbose: print ("partial recv {}".format(recieved)) data.append(data_recv) # w/o tcp and header size -= recieved #get cmd_ack_ok data_recv = self.__sock.recv(16) #could be broken if len(data_recv) < 16: print ("trying to complete broken ACK") data_recv += self.__sock.recv(16 - len(data_recv)) if not self.__test_tcp_top(data_recv): if self.verbose: print ("invalid tcp ACK OK") return None #b''.join(data) # incomplete? response = unpack('HHHH', data_recv[8:16])[0] if response == const.CMD_ACK_OK: return b''.join(data) #data_recv[bytes+16:].encode('hex') #included CMD_ACK_OK if self.verbose: print("bad response %s" % data_recv) if self.verbose: print (data) return None #else udp while True: #limitado por respuesta no por tamaño data_recv = self.__sock.recv(response_size) response = unpack('HHHH', data_recv[:8])[0] if self.verbose: print ("# packet response is: {}".format(response)) if response == const.CMD_DATA: data.append(data_recv[8:]) #header turncated size -= 1024 #UDP elif response == const.CMD_ACK_OK: break #without problem. else: #truncado! continuar? if self.verbose: print ("broken!") break if self.verbose: print ("still needs %s" % size) return b''.join(data) else: if self.verbose: print ("invalid response %s" % self.__response) return None #("can't get user template") def __read_chunk(self, start, size): """ read a chunk from buffer """ for _retries in range(3): command = 1504 #CMD_READ_BUFFER command_string = pack('= (bytes + 32): #complete if response == const.CMD_DATA: resp = data_recv[16:bytes+16] # no ack? if self.verbose: print ("resp len", len(resp)) return resp else: if self.verbose: print("broken packet!!!") return '' #broken else: # incomplete data.append(data_recv[16:]) # w/o tcp and header bytes -= recieved-16 while bytes>0: #jic data_recv = self.__sock.recv(bytes) #ideal limit? recieved = len(data_recv) data.append(data_recv) # w/o tcp and header bytes -= recieved data_recv = self.__sock.recv(16) response = unpack('HHHH', data_recv[8:16])[0] if response == const.CMD_ACK_OK: return b''.join(data) #data_recv[bytes+16:].encode('hex') #included CMD_ACK_OK if self.verbose: print("bad response %s" % data_recv) if self.verbose: print (data) return '' #else udp while True: #limitado por respuesta no por tamaño data_recv = self.__sock.recv(response_size) response = unpack('HHHH', data_recv[:8])[0] if self.verbose: print ("# packet response is: {}".format(response)) if response == const.CMD_DATA: data.append(data_recv[8:]) #header turncated bytes -= 1024 #UDP elif response == const.CMD_ACK_OK: break #without problem. else: #truncado! continuar? if self.verbose: print ("broken!") break if self.verbose: print ("still needs %s" % bytes) return b''.join(data) def read_with_buffer(self, command, fct=0 ,ext=0): """ Test read info with buffered command (ZK6: 1503) """ if self.tcp: MAX_CHUNK = 0xFFc0 #arbitrary, below 0x10008 else: MAX_CHUNK = 16 * 1024 command_string = pack('= 8: uid, status, timestamp = unpack('HH4s', attendance_data.ljust(8, b'\x00')[:8]) if self.verbose: print (codecs.encode(attendance_data[:8], 'hex')) attendance_data = attendance_data[8:] tuser = list(filter(lambda x: x.uid == uid, users)) if not tuser: user_id = str(uid) #TODO revisar pq else: user_id = tuser[0].user_id timestamp = self.__decode_time(timestamp) attendance = Attendance(uid, user_id, timestamp, status) attendances.append(attendance) elif record_size == 16: # extended while len(attendance_data) >= 16: user_id, timestamp, status, verified, reserved, workcode = unpack('= 40: uid, user_id, sparator, timestamp, status, space = unpack('