# -*- coding: utf-8 -*- import sys #from builtins import str from datetime import datetime from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, socket, timeout from struct import pack, unpack import codecs from zk import const from zk.attendance import Attendance from zk.exception import ZKErrorResponse, ZKNetworkError from zk.user import User from zk.finger import Finger def safe_cast(val, to_type, default=None): #https://stackoverflow.com/questions/6330071/safe-casting-in-python try: return to_type(val) except (ValueError, TypeError): return default def make_commkey(key, session_id, ticks=50): """take a password and session_id and scramble them to send to the time clock. copied from commpro.c - MakeKey""" key = int(key) session_id = int(session_id) k = 0 for i in range(32): if (key & (1 << i)): k = (k << 1 | 1) else: k = k << 1 k += session_id k = pack(b'I', k) k = unpack(b'BBBB', k) k = pack( b'BBBB', k[0] ^ ord('Z'), k[1] ^ ord('K'), k[2] ^ ord('S'), k[3] ^ ord('O')) k = unpack(b'HH', k) k = pack(b'HH', k[1], k[0]) B = 0xff & ticks k = unpack(b'BBBB', k) k = pack( b'BBBB', k[0] ^ B, k[1] ^ B, B, k[3] ^ B) return k class ZK_helper(object): """ helper class """ def __init__(self, ip, port=4370): self.address = (ip, port) self.ip = ip self.port = port #self.timeout = timeout #self.password = password # passint #self.firmware = int(firmware) #TODO check minor version? #self.tcp = tcp def test_ping(self): """ Returns True if host responds to a ping request """ import subprocess, platform # Ping parameters as function of OS ping_str = "-n 1" if platform.system().lower()=="windows" else "-c 1 -W 5" args = "ping " + " " + ping_str + " " + self.ip need_sh = False if platform.system().lower()=="windows" else True # Ping return subprocess.call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=need_sh) == 0 def test_tcp(self): self.client = socket(AF_INET, SOCK_STREAM) self.client.settimeout(10) # fixed test res = self.client.connect_ex(self.address) self.client.close() return res def test_udp(self): # WIP: self.client = socket(AF_INET, SOCK_DGRAM) self.client.settimeout(10) # fixed test class ZK(object): """ Clase ZK """ def __init__(self, ip, port=4370, timeout=60, password=0, force_udp=False, ommit_ping=False, verbose=False, encoding='UTF-8'): """ initialize instance """ self.is_connect = False self.is_enabled = True #let's asume self.helper = ZK_helper(ip, port) self.__address = (ip, port) self.__sock = socket(AF_INET, SOCK_DGRAM) self.__sock.settimeout(timeout) self.__timeout = timeout self.__password = password # passint #self.firmware = int(firmware) #dummy self.force_udp = force_udp self.ommit_ping = ommit_ping self.verbose = verbose self.encoding = encoding User.encoding = encoding self.tcp = False self.users = 0 self.fingers = 0 self.records = 0 self.dummy = 0 self.cards = 0 self.fingers_cap = 0 self.users_cap = 0 self.rec_cap = 0 self.faces = 0 self.faces_cap = 0 self.fingers_av = 0 self.users_av = 0 self.rec_av = 0 self.next_uid = 1 self.next_user_id='1' self.user_packet_size = 28 # default zk6 self.end_live_capture = False self.__session_id = 0 self.__reply_id = const.USHRT_MAX-1 self.__data_recv = None self.__data = None def __nonzero__(self): """ for boolean test""" return self.is_connect def __create_socket(self): """ based on self.tcp""" if self.tcp: self.__sock = socket(AF_INET, SOCK_STREAM) self.__sock.settimeout(self.__timeout) self.__sock.connect_ex(self.__address) else: self.__sock = socket(AF_INET, SOCK_DGRAM) self.__sock.settimeout(self.__timeout) def __create_tcp_top(self, packet): """ witch the complete packet set top header """ length = len(packet) top = pack('= const.USHRT_MAX: reply_id -= const.USHRT_MAX buf = pack('<4H', command, checksum, session_id, reply_id) return buf + command_string def __create_checksum(self, p): ''' Calculates the checksum of the packet to be sent to the time clock Copied from zkemsdk.c ''' l = len(p) checksum = 0 while l > 1: checksum += unpack('H', pack('BB', p[0], p[1]))[0] p = p[2:] if checksum > const.USHRT_MAX: checksum -= const.USHRT_MAX l -= 2 if l: checksum = checksum + p[-1] while checksum > const.USHRT_MAX: checksum -= const.USHRT_MAX checksum = ~checksum while checksum < 0: checksum += const.USHRT_MAX return pack('H', checksum) def __test_tcp_top(self, packet): """ return size!""" if len(packet)<=8: return 0 # invalid packet tcp_header = unpack('= 80: fields = unpack('20i', self.__data[:80]) self.users = fields[4] self.fingers = fields[6] self.records = fields[8] self.dummy = fields[10] #??? self.cards = fields[12] self.fingers_cap = fields[14] self.users_cap = fields[15] self.rec_cap = fields[16] self.fingers_av = fields[17] self.users_av = fields[18] self.rec_av = fields[19] self.__data = self.__data[80:] if len(self.__data) >= 12: #face info fields = unpack('3i', self.__data[:12]) #dirty hack! we need more information self.faces = fields[0] self.faces_cap = fields[2] return True else: raise ZKErrorResponse("can't read sizes") def unlock(self, time=3): ''' :param time: define time in seconds :return: thanks to https://github.com/SoftwareHouseMerida/pyzk/ ''' command = const.CMD_UNLOCK command_string = pack("I",int(time)*10) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: raise ZKErrorResponse("Can't open door") def __str__(self): """ for debug""" return "ZK %s://%s:%s users[%i]:%i/%i fingers:%i/%i, records:%i/%i faces:%i/%i" % ( "tcp" if self.tcp else "udp", self.__address[0], self.__address[1], self.user_packet_size, self.users, self.users_cap, self.fingers, self.fingers_cap, self.records, self.rec_cap, self.faces, self.faces_cap ) def restart(self): ''' restart the device ''' command = const.CMD_RESTART cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't restart device") def get_time(self): """get Device Time""" command = const.CMD_GET_TIME response_size = 1032 cmd_response = self.__send_command(command, b'', response_size) if cmd_response.get('status'): return self.__decode_time(self.__data[:4]) else: raise ZKErrorResponse("can't get time") def set_time(self, timestamp): """ set Device time (pass datetime object)""" command = const.CMD_SET_TIME command_string = pack(b'I', self.__encode_time(timestamp)) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't set time") def poweroff(self): ''' shutdown the device ''' command = const.CMD_POWEROFF command_string = b'' response_size = 1032 cmd_response = self.__send_command(command, command_string, response_size) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't poweroff") def refresh_data(self): ''' shutdown the device ''' command = const.CMD_REFRESHDATA cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't refresh data") def test_voice(self, index=0): ''' play test voice 0 acceso correcto / acceso correcto 1 password incorrecto / clave incorrecta 2 la memoria del terminal está llena / acceso denegado 3 usuario invalido /codigo no valido 4 intente de nuevo por favor / intente de nuevo por favor * 5 reintroduszca codigo de usuario /reintroduszca codigo 6 memoria del terminal llena /- 7 memoria de alm fich llena /- 8 huella duplicada / huella duplicada 9 acceso denegado / ya ha sido registrado 10 *beep* / beep kuko 11 el sistema vuelve al modo de verificacion / beep siren 12 por favor coloque su dedo o acerque tarjeta /- 13 acerca su tarjeta de nuevo /beep bell 14 excedido tiempo p esta operacion /- 15 coloque su dedo de nuevo /- 16 coloque su dedo por ultima vez /- 17 ATN numero de tarjeta está repetida /- 18 proceso de registro correcto * /- 19 borrado correcto /- 20 Numero de usuario / ponga la caja de ojos 21 ATN se ha llegado al max num usuarios /- 22 verificacion de usuarios /- 23 usuario no registrado /- 24 ATN se ha llegado al num max de registros /- 25 ATN la puerta no esta cerrada /- 26 registro de usuarios /- 27 borrado de usuarios /- 28 coloque su dedo /- 29 registre la tarjeta de administrador /- 30 0 /- 31 1 /- 32 2 /- 33 3 /- 34 4 /- 35 5 /- 36 6 /- 37 7 /- 38 8 /- 39 9 /- 40 PFV seleccione numero de usuario /- 41 registrar /- 42 operacion correcta /- 43 PFV acerque su tarjeta /- 43 la tarjeta ha sido registrada /- 45 error en operacion /- 46 PFV acerque tarjeta de administracion, p confirmacion /- 47 descarga de fichajes /- 48 descarga de usuarios /- 49 carga de usuarios /- 50 actualizan de firmware /- 51 ejeuctar ficheros de configuracion /- 52 confirmación de clave de acceso correcta /- 53 error en operacion de tclado /- 54 borrar todos los usuarios /- 55 restaurar terminal con configuracion por defecto /- 56 introduzca numero de usuario /- 57 teclado bloqueado /- 58 error en la gestión de la tarjeta /- 59 establezca una clave de acceso /- 60 pulse el teclado /- 61 zona de accceso invalida /- 62 acceso combinado invĺlido /- 63 verificación multiusuario /- 64 modo de verificación inválido /- 65 - /- ''' command = const.CMD_TESTVOICE command_string = pack("I", index) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: return False #some devices doesn't support sound #raise ZKErrorResponse("can't test voice") def set_user(self, uid=None, name='', privilege=0, password='', group_id='', user_id='', card=0): ''' create or update user by uid ''' command = const.CMD_USER_WRQ if uid is None: uid = self.next_uid # keeps uid=0 if not user_id: user_id = self.next_user_id # else... if not user_id: user_id = str(uid) #ZK6 needs uid2 == uid #TODO: check what happens if name is missing... if privilege not in [const.USER_DEFAULT, const.USER_ADMIN]: privilege = const.USER_DEFAULT privilege = int(privilege) if self.user_packet_size == 28: #self.firmware == 6: if not group_id: group_id = 0 try: command_string = pack('HB5s8sIxBHI', uid, privilege, password.encode(self.encoding, errors='ignore'), name.encode(self.encoding, errors='ignore'), card, int(group_id), 0, int(user_id)) except Exception as e: if self.verbose: print("s_h Error pack: %s" % e) if self.verbose: print("Error pack: %s" % sys.exc_info()[0]) raise ZKErrorResponse("Can't pack user") else: name_pad = name.encode(self.encoding, errors='ignore').ljust(24, b'\x00')[:24] card_str = pack('i', int(card))[:4] command_string = pack('HB8s24s4sx7sx24s', uid, privilege, password.encode(self.encoding, errors='ignore'), name_pad, card_str, group_id.encode(), user_id.encode()) response_size = 1024 #TODO check response? cmd_response = self.__send_command(command, command_string, response_size) if not cmd_response.get('status'): raise ZKErrorResponse("Can't set user") self.refresh_data() if self.next_uid == uid: self.next_uid += 1 # better recalculate again if self.next_user_id == user_id: self.next_user_id = str(self.next_uid) def save_user_template(self, user, fingers=[]): """ save user and template """ #TODO: grabado global # armar paquete de huellas if not isinstance(user, User): #try uid users = self.get_users() tusers = list(filter(lambda x: x.uid==user, users)) if len(tusers) == 1: user = tusers[0] else: tusers = list(filter(lambda x: x.user_id==str(user), users)) if len(tusers) == 1: user = tusers[0] else: raise ZKErrorResponse("Can't find user") if isinstance(fingers, Finger): fingers = [fingers] fpack = b"" table = b"" fnum = 0x10 # possibly flag tstart = 0 for finger in fingers: tfp = finger.repack_only() table += pack("= 28: uid, privilege, password, name, card, group_id, timezone, user_id = unpack(' max_uid: max_uid = uid password = (password.split(b'\x00')[0]).decode(self.encoding, errors='ignore') name = (name.split(b'\x00')[0]).decode(self.encoding, errors='ignore').strip() #card = unpack('I', card)[0] #or hex value? group_id = str(group_id) user_id = str(user_id) #TODO: check card value and find in ver8 if not name: name = "NN-%s" % user_id user = User(uid, name, privilege, password, group_id, user_id, card) users.append(user) if self.verbose: print("[6]user:",uid, privilege, password, name, card, group_id, timezone, user_id) userdata = userdata[28:] else: while len(userdata) >= 72: uid, privilege, password, name, card, group_id, user_id = unpack(' max_uid: max_uid = uid #card = int(unpack('I', separator)[0]) if not name: name = "NN-%s" % user_id user = User(uid, name, privilege, password, group_id, user_id, card) users.append(user) userdata = userdata[72:] #get limits! max_uid += 1 self.next_uid = max_uid self.next_user_id = str(max_uid) #re check while True: if any(u for u in users if u.user_id == self.next_user_id): max_uid += 1 self.next_user_id = str(max_uid) else: break return users def cancel_capture(self): ''' cancel capturing finger ''' command = const.CMD_CANCELCAPTURE cmd_response = self.__send_command(command) return bool(cmd_response.get('status')) def verify_user(self): ''' start verify finger mode (after capture) ''' command = const.CMD_STARTVERIFY # uid = chr(uid % 256) + chr(uid >> 8) cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("Cant Verify") def reg_event(self, flags): """ reg events, """ command = const.CMD_REG_EVENT command_string = pack ("I", flags) cmd_response = self.__send_command(command, command_string) if not cmd_response.get('status'): raise ZKErrorResponse("cant' reg events %i" % flags) def set_sdk_build_1(self): """ """ command = const.CMD_OPTIONS_WRQ command_string = b"SDKBuild=1" cmd_response = self.__send_command(command, command_string) if not cmd_response.get('status'): return False #raise ZKErrorResponse("can't set sdk build ") return True def enroll_user(self, uid=0, temp_id=0, user_id=''): ''' start enroll user we need user_id (uid2) ''' command = const.CMD_STARTENROLL done = False if not user_id: #we need user_id (uid2) users = self.get_users() users = list(filter(lambda x: x.uid==uid, users)) if len(users) >= 1: user_id = users[0].user_id else: #double? posibly empty return False #can't enroll if self.tcp: command_string = pack('<24sbb',str(user_id).encode(), temp_id, 1) # el 1 es misterio else: command_string = pack(' 16: #not empty res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] if self.verbose: print("res %i" % res) if res == 0 or res == 6 or res == 4: # 6 timeout, 4 mismatch error, 0 can't start(why?) if self.verbose: print ("posible timeout o reg Fallido") break else: if len(data_recv) > 8: #not empty res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break if self.verbose: print ("A:%i esperando 2do regevent" % attempts) data_recv = self.__sock.recv(1032) # timeout? tarda bastante... self.__ack_ok() if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: if len(data_recv) > 8: #not empty res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break elif res == 0x64: if self.verbose: print ("ok, continue?") attempts -= 1 else: if len(data_recv) > 8: #not empty res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break elif res == 0x64: if self.verbose: print ("ok, continue?") attempts -= 1 if attempts == 0: if self.verbose: print ("esperando 3er regevent") data_recv = self.__sock.recv(1032) # timeout? tarda bastante... self.__ack_ok() if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] else: res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 5: if self.verbose: print ("huella duplicada") if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") if res == 0: size = unpack("H", data_recv.ljust(16,b"\x00")[10:12])[0] pos = unpack("H", data_recv.ljust(16,b"\x00")[12:14])[0] if self.verbose: print("enroll ok", size, pos) done = True self.__sock.settimeout(self.__timeout) self.reg_event(0) # TODO: test self.cancel_capture() self.verify_user() return done def live_capture(self, new_timeout=10):# generator! """ try live capture of events""" was_enabled = self.is_enabled users = self.get_users() self.cancel_capture() self.verify_user() if not self.is_enabled: self.enable_device() self.reg_event(const.EF_ATTLOG) #0xFFFF self.__sock.settimeout(new_timeout) # default 1 minute test? self.end_live_capture = False while not self.end_live_capture: try: if self.verbose: print ("esperando event") data_recv = self.__sock.recv(1032) self.__ack_ok() if self.tcp: size = unpack(' (recieved - 16): #broken first DATA # #reparse as more data packets? # if self.verbose: print ("trying".format(recieved, size)) # _data, bh = self.__recieve_tcp_data(data_recv, tcp_length-16) #analize first response response = unpack('HHHH', data_recv[8:16])[0] if recieved >= (size + 32): #complete with ACK_OK included if response == const.CMD_DATA: resp = data_recv[16 : size + 16] # no ack? if self.verbose: print ("resp complete len {}".format(len(resp))) return resp, data_recv[size + 16:] else: if self.verbose: print("incorrect response!!! {}".format(response)) return None, b"" #broken else: # response DATA incomplete (or missing ACK_OK) if self.verbose: print ("try DATA incomplete (actual valid {})".format(recieved-16)) data.append(data_recv[16 : size + 16 ]) # w/o DATA tcp and header size -= recieved - 16 # w/o DATA tcp and header broken_header = b"" if size < 0: #broken ack header? broken_header = data_recv[size:] if self.verbose: print ("broken", (broken_header).encode('hex')) #TODO python3 if size > 0: #need raw data to complete data_recv = self.__recieve_raw_data(size) data.append(data_recv) # w/o tcp and header return b''.join(data), broken_header #get cmd_ack_ok on __rchunk def __recieve_raw_data(self, size): """ partial data ? """ data = [] if self.verbose: print ("expecting {} bytes raw data".format(size)) while size > 0: data_recv = self.__sock.recv(size) #ideal limit? recieved = len(data_recv) if self.verbose: print ("partial recv {}".format(recieved)) data.append(data_recv) # w/o tcp and header size -= recieved if self.verbose: print ("still need {}".format(size)) return b''.join(data) def __recieve_chunk(self): """ recieve a chunk """ if self.__response == const.CMD_DATA: # less than 1024!!! if self.tcp: #MUST CHECK TCP SIZE if self.verbose: print ("_rc_DATA! is {} bytes, tcp length is {}".format(len(self.__data), self.__tcp_length)) if len(self.__data) < (self.__tcp_length - 8): need = (self.__tcp_length - 8) - len(self.__data) if self.verbose: print ("need more data: {}".format(need)) more_data = self.__recieve_raw_data(need) return b''.join([self.__data, more_data]) else: #enough data if self.verbose: print ("Enough data") return self.__data else: #UDP if self.verbose: print ("_rc len is {}".format(len(self.__data))) return self.__data #without headers elif self.__response == const.CMD_PREPARE_DATA: data = [] size = self.__get_data_size() if self.verbose: print ("recieve chunk: prepare data size is {}".format(size)) if self.tcp: if len(self.__data) >= (8 + size): #prepare data with actual data! should be 8+size+32 data_recv = self.__data[8:] # test, maybe -32 else: data_recv = self.__sock.recv(size + 32) #could have two commands resp, broken_header = self.__recieve_tcp_data(data_recv, size) data.append(resp) # get CMD_ACK_OK if len(broken_header) < 16: data_recv = broken_header + self.__sock.recv(16) else: data_recv = broken_header #could be broken if len(data_recv) < 16: print ("trying to complete broken ACK %s /16" % len(data_recv)) if self.verbose: print (data_recv.encode('hex')) #todo python3 data_recv += self.__sock.recv(16 - len(data_recv)) #TODO: CHECK HERE_! if not self.__test_tcp_top(data_recv): if self.verbose: print ("invalid chunk tcp ACK OK") return None #b''.join(data) # incomplete? response = unpack('HHHH', data_recv[8:16])[0] if response == const.CMD_ACK_OK: if self.verbose: print ("chunk tcp ACK OK!") return b''.join(data) if self.verbose: print("bad response %s" % data_recv) if self.verbose: print (codecs.encode(data,'hex')) return None return resp #else udp while True: #limitado por respuesta no por tamaño data_recv = self.__sock.recv(1024+8) response = unpack('<4H', data_recv[:8])[0] if self.verbose: print ("# packet response is: {}".format(response)) if response == const.CMD_DATA: data.append(data_recv[8:]) #header turncated size -= 1024 #UDP elif response == const.CMD_ACK_OK: break #without problem. else: #truncado! continuar? if self.verbose: print ("broken!") break if self.verbose: print ("still needs %s" % size) return b''.join(data) else: if self.verbose: print ("invalid response %s" % self.__response) return None #("can't get user template") def __read_chunk(self, start, size): """ read a chunk from buffer """ for _retries in range(3): command = 1504 #CMD_READ_BUFFER command_string = pack('= 8:#TODO RETEST ZK6!!! uid, status, timestamp, punch = unpack('HB4sB', attendance_data.ljust(8, b'\x00')[:8]) if self.verbose: print (codecs.encode(attendance_data[:8], 'hex')) attendance_data = attendance_data[8:] tuser = list(filter(lambda x: x.uid == uid, users)) if not tuser: user_id = str(uid) #TODO revisar pq else: user_id = tuser[0].user_id timestamp = self.__decode_time(timestamp) attendance = Attendance(user_id, timestamp, status, punch, uid) # punch? attendances.append(attendance) elif record_size == 16: # extended while len(attendance_data) >= 16: #TODO RETEST ZK6 user_id, timestamp, status, punch, reserved, workcode = unpack('= 40: uid, user_id, status, timestamp, punch, space = unpack('