# -*- coding: utf-8 -*- import sys from datetime import datetime from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, socket, timeout from struct import pack, unpack import codecs from zk import const from zk.attendance import Attendance from zk.exception import ZKErrorConnection, ZKErrorResponse, ZKNetworkError from zk.user import User from zk.finger import Finger def safe_cast(val, to_type, default=None): #https://stackoverflow.com/questions/6330071/safe-casting-in-python try: return to_type(val) except (ValueError, TypeError): return default def make_commkey(key, session_id, ticks=50): """ take a password and session_id and scramble them to send to the machine. copied from commpro.c - MakeKey """ key = int(key) session_id = int(session_id) k = 0 for i in range(32): if (key & (1 << i)): k = (k << 1 | 1) else: k = k << 1 k += session_id k = pack(b'I', k) k = unpack(b'BBBB', k) k = pack( b'BBBB', k[0] ^ ord('Z'), k[1] ^ ord('K'), k[2] ^ ord('S'), k[3] ^ ord('O')) k = unpack(b'HH', k) k = pack(b'HH', k[1], k[0]) B = 0xff & ticks k = unpack(b'BBBB', k) k = pack( b'BBBB', k[0] ^ B, k[1] ^ B, B, k[3] ^ B) return k class ZK_helper(object): """ helper class """ def __init__(self, ip, port=4370): self.address = (ip, port) self.ip = ip self.port = port def test_ping(self): """ Returns True if host responds to a ping request """ import subprocess, platform # Ping parameters as function of OS ping_str = "-n 1" if platform.system().lower()=="windows" else "-c 1 -W 5" args = "ping " + " " + ping_str + " " + self.ip need_sh = False if platform.system().lower()=="windows" else True # Ping return subprocess.call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=need_sh) == 0 def test_tcp(self): self.client = socket(AF_INET, SOCK_STREAM) self.client.settimeout(10) res = self.client.connect_ex(self.address) self.client.close() return res def test_udp(self): self.client = socket(AF_INET, SOCK_DGRAM) self.client.settimeout(10) class ZK(object): """ ZK main class """ def __init__(self, ip, port=4370, timeout=60, password=0, force_udp=False, ommit_ping=False, verbose=False, encoding='UTF-8'): """ initialize instance """ User.encoding = encoding self.__address = (ip, port) self.__sock = socket(AF_INET, SOCK_DGRAM) self.__sock.settimeout(timeout) self.__timeout = timeout self.__password = password # passint self.__session_id = 0 self.__reply_id = const.USHRT_MAX - 1 self.__data_recv = None self.__data = None self.is_connect = False self.is_enabled = True self.helper = ZK_helper(ip, port) self.force_udp = force_udp self.ommit_ping = ommit_ping self.verbose = verbose self.encoding = encoding self.tcp = not force_udp self.users = 0 self.fingers = 0 self.records = 0 self.dummy = 0 self.cards = 0 self.fingers_cap = 0 self.users_cap = 0 self.rec_cap = 0 self.faces = 0 self.faces_cap = 0 self.fingers_av = 0 self.users_av = 0 self.rec_av = 0 self.next_uid = 1 self.next_user_id='1' self.user_packet_size = 28 # default zk6 self.end_live_capture = False def __nonzero__(self): """ for boolean test """ return self.is_connect def __create_socket(self): if self.tcp: self.__sock = socket(AF_INET, SOCK_STREAM) self.__sock.settimeout(self.__timeout) self.__sock.connect_ex(self.__address) else: self.__sock = socket(AF_INET, SOCK_DGRAM) self.__sock.settimeout(self.__timeout) def __create_tcp_top(self, packet): """ witch the complete packet set top header """ length = len(packet) top = pack('= const.USHRT_MAX: reply_id -= const.USHRT_MAX buf = pack('<4H', command, checksum, session_id, reply_id) return buf + command_string def __create_checksum(self, p): """ Calculates the checksum of the packet to be sent to the time clock Copied from zkemsdk.c """ l = len(p) checksum = 0 while l > 1: checksum += unpack('H', pack('BB', p[0], p[1]))[0] p = p[2:] if checksum > const.USHRT_MAX: checksum -= const.USHRT_MAX l -= 2 if l: checksum = checksum + p[-1] while checksum > const.USHRT_MAX: checksum -= const.USHRT_MAX checksum = ~checksum while checksum < 0: checksum += const.USHRT_MAX return pack('H', checksum) def __test_tcp_top(self, packet): """ return size! """ if len(packet)<=8: return 0 tcp_header = unpack('= 80: fields = unpack('20i', self.__data[:80]) self.users = fields[4] self.fingers = fields[6] self.records = fields[8] self.dummy = fields[10] #??? self.cards = fields[12] self.fingers_cap = fields[14] self.users_cap = fields[15] self.rec_cap = fields[16] self.fingers_av = fields[17] self.users_av = fields[18] self.rec_av = fields[19] self.__data = self.__data[80:] if len(self.__data) >= 12: #face info fields = unpack('3i', self.__data[:12]) #dirty hack! we need more information self.faces = fields[0] self.faces_cap = fields[2] return True else: raise ZKErrorResponse("can't read sizes") def unlock(self, time=3): """ :param time: define time in seconds :return: thanks to https://github.com/SoftwareHouseMerida/pyzk/ """ command = const.CMD_UNLOCK command_string = pack("I",int(time)*10) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: raise ZKErrorResponse("Can't open door") def __str__(self): """ for debug """ return "ZK %s://%s:%s users[%i]:%i/%i fingers:%i/%i, records:%i/%i faces:%i/%i" % ( "tcp" if self.tcp else "udp", self.__address[0], self.__address[1], self.user_packet_size, self.users, self.users_cap, self.fingers, self.fingers_cap, self.records, self.rec_cap, self.faces, self.faces_cap ) def restart(self): """ restart the device """ command = const.CMD_RESTART cmd_response = self.__send_command(command) if cmd_response.get('status'): self.is_connect = False self.next_uid = 1 return True else: raise ZKErrorResponse("can't restart device") def get_time(self): """ get Device Time """ command = const.CMD_GET_TIME response_size = 1032 cmd_response = self.__send_command(command, b'', response_size) if cmd_response.get('status'): return self.__decode_time(self.__data[:4]) else: raise ZKErrorResponse("can't get time") def set_time(self, timestamp): """ set Device time (pass datetime object) """ command = const.CMD_SET_TIME command_string = pack(b'I', self.__encode_time(timestamp)) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't set time") def poweroff(self): """ shutdown the device """ command = const.CMD_POWEROFF command_string = b'' response_size = 1032 cmd_response = self.__send_command(command, command_string, response_size) if cmd_response.get('status'): self.is_connect = False self.next_uid = 1 return True else: raise ZKErrorResponse("can't poweroff") def refresh_data(self): """ shutdown the device """ command = const.CMD_REFRESHDATA cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't refresh data") def test_voice(self, index=0): """ play test voice: 0 Thank You 1 Incorrect Password 2 Access Denied 3 Invalid ID 4 Please try again 5 Re-enter ID 6 The clock is full 7 The clock is full 8 Duplicate finger 9 Accepted. Thank you 10 beep kuko 11 beep siren 12 - 13 beep bell /* ** *** HELP! TRANSLATE TO ENGLISH THE FOLLOWING ITEMS *** ** * */ 14 excedido tiempo p esta operacion /- 15 coloque su dedo de nuevo /- 16 coloque su dedo por ultima vez /- 17 ATN numero de tarjeta está repetida /- 18 proceso de registro correcto * /- 19 borrado correcto /- 20 Numero de usuario / ponga la caja de ojos 21 ATN se ha llegado al max num usuarios /- 22 verificacion de usuarios /- 23 usuario no registrado /- 24 ATN se ha llegado al num max de registros /- 25 ATN la puerta no esta cerrada /- 26 registro de usuarios /- 27 borrado de usuarios /- 28 coloque su dedo /- 29 registre la tarjeta de administrador /- 30 0 /- 31 1 /- 32 2 /- 33 3 /- 34 4 /- 35 5 /- 36 6 /- 37 7 /- 38 8 /- 39 9 /- 40 PFV seleccione numero de usuario /- 41 registrar /- 42 operacion correcta /- 43 PFV acerque su tarjeta /- 43 la tarjeta ha sido registrada /- 45 error en operacion /- 46 PFV acerque tarjeta de administracion, p confirmacion /- 47 descarga de fichajes /- 48 descarga de usuarios /- 49 carga de usuarios /- 50 actualizan de firmware /- 51 ejeuctar ficheros de configuracion /- 52 confirmación de clave de acceso correcta /- 53 error en operacion de tclado /- 54 borrar todos los usuarios /- 55 restaurar terminal con configuracion por defecto /- 56 introduzca numero de usuario /- 57 teclado bloqueado /- 58 error en la gestión de la tarjeta /- 59 establezca una clave de acceso /- 60 pulse el teclado /- 61 zona de accceso invalida /- 62 acceso combinado invĺlido /- 63 verificación multiusuario /- 64 modo de verificación inválido /- 65 - /- """ command = const.CMD_TESTVOICE command_string = pack("I", index) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: return False def set_user(self, uid=None, name='', privilege=0, password='', group_id='', user_id='', card=0): """ create or update user by uid """ command = const.CMD_USER_WRQ if uid is None: uid = self.next_uid if not user_id: user_id = self.next_user_id if not user_id: user_id = str(uid) #ZK6 needs uid2 == uid #TODO: check what happens if name is missing... if privilege not in [const.USER_DEFAULT, const.USER_ADMIN]: privilege = const.USER_DEFAULT privilege = int(privilege) if self.user_packet_size == 28: #self.firmware == 6: if not group_id: group_id = 0 try: command_string = pack('HB5s8sIxBHI', uid, privilege, password.encode(self.encoding, errors='ignore'), name.encode(self.encoding, errors='ignore'), card, int(group_id), 0, int(user_id)) except Exception as e: if self.verbose: print("s_h Error pack: %s" % e) if self.verbose: print("Error pack: %s" % sys.exc_info()[0]) raise ZKErrorResponse("Can't pack user") else: name_pad = name.encode(self.encoding, errors='ignore').ljust(24, b'\x00')[:24] card_str = pack('i', int(card))[:4] command_string = pack('HB8s24s4sx7sx24s', uid, privilege, password.encode(self.encoding, errors='ignore'), name_pad, card_str, group_id.encode(), user_id.encode()) response_size = 1024 #TODO check response? cmd_response = self.__send_command(command, command_string, response_size) if not cmd_response.get('status'): raise ZKErrorResponse("Can't set user") self.refresh_data() if self.next_uid == uid: self.next_uid += 1 # better recalculate again if self.next_user_id == user_id: self.next_user_id = str(self.next_uid) def save_user_template(self, user, fingers=[]): """ save user and template """ if not isinstance(user, User): users = self.get_users() tusers = list(filter(lambda x: x.uid==user, users)) if len(tusers) == 1: user = tusers[0] else: tusers = list(filter(lambda x: x.user_id==str(user), users)) if len(tusers) == 1: user = tusers[0] else: raise ZKErrorResponse("Can't find user") if isinstance(fingers, Finger): fingers = [fingers] fpack = b"" table = b"" fnum = 0x10 tstart = 0 for finger in fingers: tfp = finger.repack_only() table += pack("= 28: uid, privilege, password, name, card, group_id, timezone, user_id = unpack(' max_uid: max_uid = uid password = (password.split(b'\x00')[0]).decode(self.encoding, errors='ignore') name = (name.split(b'\x00')[0]).decode(self.encoding, errors='ignore').strip() group_id = str(group_id) user_id = str(user_id) #TODO: check card value and find in ver8 if not name: name = "NN-%s" % user_id user = User(uid, name, privilege, password, group_id, user_id, card) users.append(user) if self.verbose: print("[6]user:",uid, privilege, password, name, card, group_id, timezone, user_id) userdata = userdata[28:] else: while len(userdata) >= 72: uid, privilege, password, name, card, group_id, user_id = unpack(' max_uid: max_uid = uid if not name: name = "NN-%s" % user_id user = User(uid, name, privilege, password, group_id, user_id, card) users.append(user) userdata = userdata[72:] max_uid += 1 self.next_uid = max_uid self.next_user_id = str(max_uid) while True: if any(u for u in users if u.user_id == self.next_user_id): max_uid += 1 self.next_user_id = str(max_uid) else: break return users def cancel_capture(self): """ cancel capturing finger """ command = const.CMD_CANCELCAPTURE cmd_response = self.__send_command(command) return bool(cmd_response.get('status')) def verify_user(self): """ start verify finger mode (after capture) """ command = const.CMD_STARTVERIFY cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("Cant Verify") def reg_event(self, flags): """ reg events """ command = const.CMD_REG_EVENT command_string = pack ("I", flags) cmd_response = self.__send_command(command, command_string) if not cmd_response.get('status'): raise ZKErrorResponse("cant' reg events %i" % flags) def set_sdk_build_1(self): command = const.CMD_OPTIONS_WRQ command_string = b"SDKBuild=1" cmd_response = self.__send_command(command, command_string) if not cmd_response.get('status'): return False return True def enroll_user(self, uid=0, temp_id=0, user_id=''): """ start enroll user we need user_id (uid2) """ command = const.CMD_STARTENROLL done = False if not user_id: users = self.get_users() users = list(filter(lambda x: x.uid==uid, users)) if len(users) >= 1: user_id = users[0].user_id else: return False if self.tcp: command_string = pack('<24sbb',str(user_id).encode(), temp_id, 1) else: command_string = pack(' 16: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] if self.verbose: print("res %i" % res) if res == 0 or res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break else: if len(data_recv) > 8: res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout") break if self.verbose: print ("A:%i esperando 2do regevent" % attempts) data_recv = self.__sock.recv(1032) self.__ack_ok() if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: if len(data_recv) > 8: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break elif res == 0x64: if self.verbose: print ("ok, continue?") attempts -= 1 else: if len(data_recv) > 8: res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break elif res == 0x64: if self.verbose: print ("ok, continue?") attempts -= 1 if attempts == 0: data_recv = self.__sock.recv(1032) self.__ack_ok() if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] else: res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 5: if self.verbose: print ("finger duplicate") if res == 6 or res == 4: if self.verbose: print ("posible timeout") if res == 0: size = unpack("H", data_recv.ljust(16,b"\x00")[10:12])[0] pos = unpack("H", data_recv.ljust(16,b"\x00")[12:14])[0] if self.verbose: print("enroll ok", size, pos) done = True self.__sock.settimeout(self.__timeout) self.reg_event(0) # TODO: test self.cancel_capture() self.verify_user() return done def live_capture(self, new_timeout=10): """ try live capture of events""" was_enabled = self.is_enabled users = self.get_users() self.cancel_capture() self.verify_user() if not self.is_enabled: self.enable_device() if self.verbose: print ("start live_capture") self.reg_event(const.EF_ATTLOG) self.__sock.settimeout(new_timeout) self.end_live_capture = False while not self.end_live_capture: try: if self.verbose: print ("esperando event") data_recv = self.__sock.recv(1032) self.__ack_ok() if self.tcp: size = unpack('= 12: if len(data) == 12: user_id, status, punch, timehex = unpack('= 52: user_id, status, punch, timehex, _other = unpack('<24sBB6s20s', data[:52]) data = data[52:] if isinstance(user_id, int): user_id = str(user_id) else: user_id = (user_id.split(b'\x00')[0]).decode(errors='ignore') timestamp = self.__decode_timehex(timehex) tuser = list(filter(lambda x: x.user_id == user_id, users)) if not tuser: uid = int(user_id) else: uid = tuser[0].uid yield Attendance(user_id, timestamp, status, punch, uid) except timeout: if self.verbose: print ("time out") yield None # return to keep watching except (KeyboardInterrupt, SystemExit): if self.verbose: print ("break") break if self.verbose: print ("exit gracefully") self.__sock.settimeout(self.__timeout) self.reg_event(0) if not was_enabled: self.disable_device() def clear_data(self): """ clear all data (include: user, attendance report, finger database ) """ command = const.CMD_CLEAR_DATA command_string = '' cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): self.is_connect = False self.next_uid = 1 return True else: raise ZKErrorResponse("can't clear data") def __recieve_tcp_data(self, data_recv, size): """ data_recv, raw tcp packet must analyze tcp_length must return data, broken """ data = [] tcp_length = self.__test_tcp_top(data_recv) if self.verbose: print ("tcp_length {}, size {}".format(tcp_length, size)) if tcp_length <= 0: if self.verbose: print ("Incorrect tcp packet") return None, b"" if (tcp_length - 8) < size: if self.verbose: print ("tcp length too small... retrying") resp, bh = self.__recieve_tcp_data(data_recv, tcp_length - 8) data.append(resp) size -= len(resp) if self.verbose: print ("new tcp DATA packet to fill misssing {}".format(size)) data_recv = bh + self.__sock.recv(size + 16 ) if self.verbose: print ("new tcp DATA starting with {} bytes".format(len(data_recv))) resp, bh = self.__recieve_tcp_data(data_recv, size) data.append(resp) if self.verbose: print ("for misssing {} recieved {} with extra {}".format(size, len(resp), len(bh))) return b''.join(data), bh recieved = len(data_recv) if self.verbose: print ("recieved {}, size {}".format(recieved, size)) response = unpack('HHHH', data_recv[8:16])[0] if recieved >= (size + 32): if response == const.CMD_DATA: resp = data_recv[16 : size + 16] if self.verbose: print ("resp complete len {}".format(len(resp))) return resp, data_recv[size + 16:] else: if self.verbose: print("incorrect response!!! {}".format(response)) return None, b"" else: if self.verbose: print ("try DATA incomplete (actual valid {})".format(recieved-16)) data.append(data_recv[16 : size + 16 ]) size -= recieved - 16 broken_header = b"" if size < 0: broken_header = data_recv[size:] if self.verbose: print ("broken", (broken_header).encode('hex')) if size > 0: data_recv = self.__recieve_raw_data(size) data.append(data_recv) return b''.join(data), broken_header def __recieve_raw_data(self, size): """ partial data ? """ data = [] if self.verbose: print ("expecting {} bytes raw data".format(size)) while size > 0: data_recv = self.__sock.recv(size) recieved = len(data_recv) if self.verbose: print ("partial recv {}".format(recieved)) if recieved < 100 and self.verbose: print (" recv {}".format(codecs.encode(data_recv, 'hex'))) data.append(data_recv) size -= recieved if self.verbose: print ("still need {}".format(size)) return b''.join(data) def __recieve_chunk(self): """ recieve a chunk """ if self.__response == const.CMD_DATA: if self.tcp: if self.verbose: print ("_rc_DATA! is {} bytes, tcp length is {}".format(len(self.__data), self.__tcp_length)) if len(self.__data) < (self.__tcp_length - 8): need = (self.__tcp_length - 8) - len(self.__data) if self.verbose: print ("need more data: {}".format(need)) more_data = self.__recieve_raw_data(need) return b''.join([self.__data, more_data]) else: if self.verbose: print ("Enough data") return self.__data else: if self.verbose: print ("_rc len is {}".format(len(self.__data))) return self.__data elif self.__response == const.CMD_PREPARE_DATA: data = [] size = self.__get_data_size() if self.verbose: print ("recieve chunk: prepare data size is {}".format(size)) if self.tcp: if len(self.__data) >= (8 + size): data_recv = self.__data[8:] else: data_recv = self.__data[8:] + self.__sock.recv(size + 32) resp, broken_header = self.__recieve_tcp_data(data_recv, size) data.append(resp) # get CMD_ACK_OK if len(broken_header) < 16: data_recv = broken_header + self.__sock.recv(16) else: data_recv = broken_header if len(data_recv) < 16: print ("trying to complete broken ACK %s /16" % len(data_recv)) if self.verbose: print (data_recv.encode('hex')) data_recv += self.__sock.recv(16 - len(data_recv)) #TODO: CHECK HERE_! if not self.__test_tcp_top(data_recv): if self.verbose: print ("invalid chunk tcp ACK OK") return None response = unpack('HHHH', data_recv[8:16])[0] if response == const.CMD_ACK_OK: if self.verbose: print ("chunk tcp ACK OK!") return b''.join(data) if self.verbose: print("bad response %s" % data_recv) if self.verbose: print (codecs.encode(data,'hex')) return None return resp while True: data_recv = self.__sock.recv(1024+8) response = unpack('<4H', data_recv[:8])[0] if self.verbose: print ("# packet response is: {}".format(response)) if response == const.CMD_DATA: data.append(data_recv[8:]) size -= 1024 elif response == const.CMD_ACK_OK: break else: if self.verbose: print ("broken!") break if self.verbose: print ("still needs %s" % size) return b''.join(data) else: if self.verbose: print ("invalid response %s" % self.__response) return None def __read_chunk(self, start, size): """ read a chunk from buffer """ for _retries in range(3): command = 1504 command_string = pack('= 8: uid, status, timestamp, punch = unpack('HB4sB', attendance_data.ljust(8, b'\x00')[:8]) if self.verbose: print (codecs.encode(attendance_data[:8], 'hex')) attendance_data = attendance_data[8:] tuser = list(filter(lambda x: x.uid == uid, users)) if not tuser: user_id = str(uid) else: user_id = tuser[0].user_id timestamp = self.__decode_time(timestamp) attendance = Attendance(user_id, timestamp, status, punch, uid) attendances.append(attendance) elif record_size == 16: while len(attendance_data) >= 16: user_id, timestamp, status, punch, reserved, workcode = unpack('= 40: uid, user_id, status, timestamp, punch, space = unpack('