# -*- coding: utf-8 -*- import sys from datetime import datetime from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, socket, timeout from struct import pack, unpack import codecs from . import const from .attendance import Attendance from .exception import ZKErrorConnection, ZKErrorResponse, ZKNetworkError from .user import User from .finger import Finger def safe_cast(val, to_type, default=None): #https://stackoverflow.com/questions/6330071/safe-casting-in-python try: return to_type(val) except (ValueError, TypeError): return default def make_commkey(key, session_id, ticks=50): """ take a password and session_id and scramble them to send to the machine. copied from commpro.c - MakeKey """ key = int(key) session_id = int(session_id) k = 0 for i in range(32): if (key & (1 << i)): k = (k << 1 | 1) else: k = k << 1 k += session_id k = pack(b'I', k) k = unpack(b'BBBB', k) k = pack( b'BBBB', k[0] ^ ord('Z'), k[1] ^ ord('K'), k[2] ^ ord('S'), k[3] ^ ord('O')) k = unpack(b'HH', k) k = pack(b'HH', k[1], k[0]) B = 0xff & ticks k = unpack(b'BBBB', k) k = pack( b'BBBB', k[0] ^ B, k[1] ^ B, B, k[3] ^ B) return k class ZK_helper(object): """ ZK helper class """ def __init__(self, ip, port=4370): """ Construct a new 'ZK_helper' object. """ self.address = (ip, port) self.ip = ip self.port = port def test_ping(self): """ Returns True if host responds to a ping request :return: bool """ import subprocess, platform # Ping parameters as function of OS ping_str = "-n 1" if platform.system().lower()=="windows" else "-c 1 -W 5" args = "ping " + " " + ping_str + " " + self.ip need_sh = False if platform.system().lower()=="windows" else True # Ping return subprocess.call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=need_sh) == 0 def test_tcp(self): """ test TCP connection """ self.client = socket(AF_INET, SOCK_STREAM) self.client.settimeout(10) res = self.client.connect_ex(self.address) self.client.close() return res def test_udp(self): """ test UDP connection """ self.client = socket(AF_INET, SOCK_DGRAM) self.client.settimeout(10) class ZK(object): """ ZK main class """ def __init__(self, ip, port=4370, timeout=60, password=0, force_udp=False, ommit_ping=False, verbose=False, encoding='UTF-8'): """ Construct a new 'ZK' object. :param ip: machine's IP address :param port: machine's port :param timeout: timeout number :param password: passint :param force_udp: use UDP connection :param omit_ping: check ip using ping before connect :param verbose: showing log while run the commands :param encoding: user encoding """ User.encoding = encoding self.__address = (ip, port) self.__sock = socket(AF_INET, SOCK_DGRAM) self.__sock.settimeout(timeout) self.__timeout = timeout self.__password = password # passint self.__session_id = 0 self.__reply_id = const.USHRT_MAX - 1 self.__data_recv = None self.__data = None self.is_connect = False self.is_enabled = True self.helper = ZK_helper(ip, port) self.force_udp = force_udp self.ommit_ping = ommit_ping self.verbose = verbose self.encoding = encoding self.tcp = not force_udp self.users = 0 self.fingers = 0 self.records = 0 self.dummy = 0 self.cards = 0 self.fingers_cap = 0 self.users_cap = 0 self.rec_cap = 0 self.faces = 0 self.faces_cap = 0 self.fingers_av = 0 self.users_av = 0 self.rec_av = 0 self.next_uid = 1 self.next_user_id='1' self.user_packet_size = 28 # default zk6 self.end_live_capture = False def __nonzero__(self): """ for boolean test """ return self.is_connect def __create_socket(self): if self.tcp: self.__sock = socket(AF_INET, SOCK_STREAM) self.__sock.settimeout(self.__timeout) self.__sock.connect_ex(self.__address) else: self.__sock = socket(AF_INET, SOCK_DGRAM) self.__sock.settimeout(self.__timeout) def __create_tcp_top(self, packet): """ witch the complete packet set top header """ length = len(packet) top = pack('= const.USHRT_MAX: reply_id -= const.USHRT_MAX buf = pack('<4H', command, checksum, session_id, reply_id) return buf + command_string def __create_checksum(self, p): """ Calculates the checksum of the packet to be sent to the time clock Copied from zkemsdk.c """ l = len(p) checksum = 0 while l > 1: checksum += unpack('H', pack('BB', p[0], p[1]))[0] p = p[2:] if checksum > const.USHRT_MAX: checksum -= const.USHRT_MAX l -= 2 if l: checksum = checksum + p[-1] while checksum > const.USHRT_MAX: checksum -= const.USHRT_MAX checksum = ~checksum while checksum < 0: checksum += const.USHRT_MAX return pack('H', checksum) def __test_tcp_top(self, packet): """ return size! """ if len(packet)<=8: return 0 tcp_header = unpack('= 80: fields = unpack('20i', self.__data[:80]) self.users = fields[4] self.fingers = fields[6] self.records = fields[8] self.dummy = fields[10] #??? self.cards = fields[12] self.fingers_cap = fields[14] self.users_cap = fields[15] self.rec_cap = fields[16] self.fingers_av = fields[17] self.users_av = fields[18] self.rec_av = fields[19] self.__data = self.__data[80:] if len(self.__data) >= 12: #face info fields = unpack('3i', self.__data[:12]) #dirty hack! we need more information self.faces = fields[0] self.faces_cap = fields[2] return True else: raise ZKErrorResponse("can't read sizes") def unlock(self, time=3): """ unlock the door\n thanks to https://github.com/SoftwareHouseMerida/pyzk/ :param time: define delay in seconds :return: bool """ command = const.CMD_UNLOCK command_string = pack("I",int(time)*10) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: raise ZKErrorResponse("Can't open door") def __str__(self): """ for debug """ return "ZK %s://%s:%s users[%i]:%i/%i fingers:%i/%i, records:%i/%i faces:%i/%i" % ( "tcp" if self.tcp else "udp", self.__address[0], self.__address[1], self.user_packet_size, self.users, self.users_cap, self.fingers, self.fingers_cap, self.records, self.rec_cap, self.faces, self.faces_cap ) def restart(self): """ restart the device :return: bool """ command = const.CMD_RESTART cmd_response = self.__send_command(command) if cmd_response.get('status'): self.is_connect = False self.next_uid = 1 return True else: raise ZKErrorResponse("can't restart device") def get_time(self): """ :return: the machine's time """ command = const.CMD_GET_TIME response_size = 1032 cmd_response = self.__send_command(command, b'', response_size) if cmd_response.get('status'): return self.__decode_time(self.__data[:4]) else: raise ZKErrorResponse("can't get time") def set_time(self, timestamp): """ set Device time (pass datetime object) :param timestamp: python datetime object """ command = const.CMD_SET_TIME command_string = pack(b'I', self.__encode_time(timestamp)) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't set time") def poweroff(self): """ shutdown the machine """ command = const.CMD_POWEROFF command_string = b'' response_size = 1032 cmd_response = self.__send_command(command, command_string, response_size) if cmd_response.get('status'): self.is_connect = False self.next_uid = 1 return True else: raise ZKErrorResponse("can't poweroff") def refresh_data(self): command = const.CMD_REFRESHDATA cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("can't refresh data") def test_voice(self, index=0): """ play test voice:\n 0 Thank You\n 1 Incorrect Password\n 2 Access Denied\n 3 Invalid ID\n 4 Please try again\n 5 Dupicate ID\n 6 The clock is flow\n 7 The clock is full\n 8 Duplicate finger\n 9 Duplicated punch\n 10 Beep kuko\n 11 Beep siren\n 12 -\n 13 Beep bell\n 14 -\n 15 -\n 16 -\n 17 -\n 18 Windows(R) opening sound\n 19 -\n 20 Fingerprint not emolt\n 21 Password not emolt\n 22 Badges not emolt\n 23 Face not emolt\n 24 Beep standard\n 25 -\n 26 -\n 27 -\n 28 -\n 29 -\n 30 Invalid user\n 31 Invalid time period\n 32 Invalid combination\n 33 Illegal Access\n 34 Disk space full\n 35 Duplicate fingerprint\n 36 Fingerprint not registered\n 37 -\n 38 -\n 39 -\n 40 -\n 41 -\n 42 -\n 43 -\n 43 -\n 45 -\n 46 -\n 47 -\n 48 -\n 49 -\n 50 -\n 51 Focus eyes on the green box\n 52 -\n 53 -\n 54 -\n 55 -\n :param index: int sound index :return: bool """ command = const.CMD_TESTVOICE command_string = pack("I", index) cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): return True else: return False def set_user(self, uid=None, name='', privilege=0, password='', group_id='', user_id='', card=0): """ create or update user by uid :param name: name ot the user :param privilege: check the const.py for reference :param password: int password :param group_id: group ID :param user_id: your own user ID :param card: card :return: bool """ command = const.CMD_USER_WRQ if uid is None: uid = self.next_uid if not user_id: user_id = self.next_user_id if not user_id: user_id = str(uid) #ZK6 needs uid2 == uid #TODO: check what happens if name is missing... if privilege not in [const.USER_DEFAULT, const.USER_ADMIN]: privilege = const.USER_DEFAULT privilege = int(privilege) if self.user_packet_size == 28: #self.firmware == 6: if not group_id: group_id = 0 try: command_string = pack('HB5s8sIxBHI', uid, privilege, password.encode(self.encoding, errors='ignore'), name.encode(self.encoding, errors='ignore'), card, int(group_id), 0, int(user_id)) except Exception as e: if self.verbose: print("s_h Error pack: %s" % e) if self.verbose: print("Error pack: %s" % sys.exc_info()[0]) raise ZKErrorResponse("Can't pack user") else: name_pad = name.encode(self.encoding, errors='ignore').ljust(24, b'\x00')[:24] card_str = pack('= 28: uid, privilege, password, name, card, group_id, timezone, user_id = unpack(' max_uid: max_uid = uid password = (password.split(b'\x00')[0]).decode(self.encoding, errors='ignore') name = (name.split(b'\x00')[0]).decode(self.encoding, errors='ignore').strip() group_id = str(group_id) user_id = str(user_id) #TODO: check card value and find in ver8 if not name: name = "NN-%s" % user_id user = User(uid, name, privilege, password, group_id, user_id, card) users.append(user) if self.verbose: print("[6]user:",uid, privilege, password, name, card, group_id, timezone, user_id) userdata = userdata[28:] else: while len(userdata) >= 72: uid, privilege, password, name, card, group_id, user_id = unpack(' max_uid: max_uid = uid if not name: name = "NN-%s" % user_id user = User(uid, name, privilege, password, group_id, user_id, card) users.append(user) userdata = userdata[72:] max_uid += 1 self.next_uid = max_uid self.next_user_id = str(max_uid) while True: if any(u for u in users if u.user_id == self.next_user_id): max_uid += 1 self.next_user_id = str(max_uid) else: break return users def cancel_capture(self): """ cancel capturing finger :return: bool """ command = const.CMD_CANCELCAPTURE cmd_response = self.__send_command(command) return bool(cmd_response.get('status')) def verify_user(self): """ start verify finger mode (after capture) :return: bool """ command = const.CMD_STARTVERIFY cmd_response = self.__send_command(command) if cmd_response.get('status'): return True else: raise ZKErrorResponse("Cant Verify") def reg_event(self, flags): """ reg events """ command = const.CMD_REG_EVENT command_string = pack ("I", flags) cmd_response = self.__send_command(command, command_string) if not cmd_response.get('status'): raise ZKErrorResponse("cant' reg events %i" % flags) def set_sdk_build_1(self): command = const.CMD_OPTIONS_WRQ command_string = b"SDKBuild=1" cmd_response = self.__send_command(command, command_string) if not cmd_response.get('status'): return False return True def enroll_user(self, uid=0, temp_id=0, user_id=''): """ start enroll user :param uid: uid :param temp_id: template id :param user_id: user ID :return: bool """ command = const.CMD_STARTENROLL done = False if not user_id: users = self.get_users() users = list(filter(lambda x: x.uid==uid, users)) if len(users) >= 1: user_id = users[0].user_id else: return False if self.tcp: command_string = pack('<24sbb',str(user_id).encode(), temp_id, 1) else: command_string = pack(' 16: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] if self.verbose: print("res %i" % res) if res == 0 or res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break else: if len(data_recv) > 8: res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout") break if self.verbose: print ("A:%i esperando 2do regevent" % attempts) data_recv = self.__sock.recv(1032) self.__ack_ok() if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: if len(data_recv) > 8: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break elif res == 0x64: if self.verbose: print ("ok, continue?") attempts -= 1 else: if len(data_recv) > 8: res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 6 or res == 4: if self.verbose: print ("posible timeout o reg Fallido") break elif res == 0x64: if self.verbose: print ("ok, continue?") attempts -= 1 if attempts == 0: data_recv = self.__sock.recv(1032) self.__ack_ok() if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] else: res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0] if self.verbose: print("res %i" % res) if res == 5: if self.verbose: print ("finger duplicate") if res == 6 or res == 4: if self.verbose: print ("posible timeout") if res == 0: size = unpack("H", data_recv.ljust(16,b"\x00")[10:12])[0] pos = unpack("H", data_recv.ljust(16,b"\x00")[12:14])[0] if self.verbose: print("enroll ok", size, pos) done = True self.__sock.settimeout(self.__timeout) self.reg_event(0) # TODO: test self.cancel_capture() self.verify_user() return done def live_capture(self, new_timeout=10): """ try live capture of events """ was_enabled = self.is_enabled users = self.get_users() self.cancel_capture() self.verify_user() if not self.is_enabled: self.enable_device() if self.verbose: print ("start live_capture") self.reg_event(const.EF_ATTLOG) self.__sock.settimeout(new_timeout) self.end_live_capture = False while not self.end_live_capture: try: if self.verbose: print ("esperando event") data_recv = self.__sock.recv(1032) self.__ack_ok() if self.tcp: size = unpack('= 12: if len(data) == 12: user_id, status, punch, timehex = unpack('= 52: user_id, status, punch, timehex, _other = unpack('<24sBB6s20s', data[:52]) data = data[52:] if isinstance(user_id, int): user_id = str(user_id) else: user_id = (user_id.split(b'\x00')[0]).decode(errors='ignore') timestamp = self.__decode_timehex(timehex) tuser = list(filter(lambda x: x.user_id == user_id, users)) if not tuser: uid = int(user_id) else: uid = tuser[0].uid yield Attendance(user_id, timestamp, status, punch, uid) except timeout: if self.verbose: print ("time out") yield None # return to keep watching except (KeyboardInterrupt, SystemExit): if self.verbose: print ("break") break if self.verbose: print ("exit gracefully") self.__sock.settimeout(self.__timeout) self.reg_event(0) if not was_enabled: self.disable_device() def clear_data(self): """ clear all data (included: user, attendance report, finger database) :return: bool """ command = const.CMD_CLEAR_DATA command_string = b'' cmd_response = self.__send_command(command, command_string) if cmd_response.get('status'): self.next_uid = 1 return True else: raise ZKErrorResponse("can't clear data") def __recieve_tcp_data(self, data_recv, size): """ data_recv, raw tcp packet must analyze tcp_length must return data, broken """ data = [] tcp_length = self.__test_tcp_top(data_recv) if self.verbose: print ("tcp_length {}, size {}".format(tcp_length, size)) if tcp_length <= 0: if self.verbose: print ("Incorrect tcp packet") return None, b"" if (tcp_length - 8) < size: if self.verbose: print ("tcp length too small... retrying") resp, bh = self.__recieve_tcp_data(data_recv, tcp_length - 8) data.append(resp) size -= len(resp) if self.verbose: print ("new tcp DATA packet to fill misssing {}".format(size)) data_recv = bh + self.__sock.recv(size + 16 ) if self.verbose: print ("new tcp DATA starting with {} bytes".format(len(data_recv))) resp, bh = self.__recieve_tcp_data(data_recv, size) data.append(resp) if self.verbose: print ("for misssing {} recieved {} with extra {}".format(size, len(resp), len(bh))) return b''.join(data), bh recieved = len(data_recv) if self.verbose: print ("recieved {}, size {}".format(recieved, size)) response = unpack('HHHH', data_recv[8:16])[0] if recieved >= (size + 32): if response == const.CMD_DATA: resp = data_recv[16 : size + 16] if self.verbose: print ("resp complete len {}".format(len(resp))) return resp, data_recv[size + 16:] else: if self.verbose: print("incorrect response!!! {}".format(response)) return None, b"" else: if self.verbose: print ("try DATA incomplete (actual valid {})".format(recieved-16)) data.append(data_recv[16 : size + 16 ]) size -= recieved - 16 broken_header = b"" if size < 0: broken_header = data_recv[size:] if self.verbose: print ("broken", (broken_header).encode('hex')) if size > 0: data_recv = self.__recieve_raw_data(size) data.append(data_recv) return b''.join(data), broken_header def __recieve_raw_data(self, size): """ partial data ? """ data = [] if self.verbose: print ("expecting {} bytes raw data".format(size)) while size > 0: data_recv = self.__sock.recv(size) recieved = len(data_recv) if self.verbose: print ("partial recv {}".format(recieved)) if recieved < 100 and self.verbose: print (" recv {}".format(codecs.encode(data_recv, 'hex'))) data.append(data_recv) size -= recieved if self.verbose: print ("still need {}".format(size)) return b''.join(data) def __recieve_chunk(self): """ recieve a chunk """ if self.__response == const.CMD_DATA: if self.tcp: if self.verbose: print ("_rc_DATA! is {} bytes, tcp length is {}".format(len(self.__data), self.__tcp_length)) if len(self.__data) < (self.__tcp_length - 8): need = (self.__tcp_length - 8) - len(self.__data) if self.verbose: print ("need more data: {}".format(need)) more_data = self.__recieve_raw_data(need) return b''.join([self.__data, more_data]) else: if self.verbose: print ("Enough data") return self.__data else: if self.verbose: print ("_rc len is {}".format(len(self.__data))) return self.__data elif self.__response == const.CMD_PREPARE_DATA: data = [] size = self.__get_data_size() if self.verbose: print ("recieve chunk: prepare data size is {}".format(size)) if self.tcp: if len(self.__data) >= (8 + size): data_recv = self.__data[8:] else: data_recv = self.__data[8:] + self.__sock.recv(size + 32) resp, broken_header = self.__recieve_tcp_data(data_recv, size) data.append(resp) # get CMD_ACK_OK if len(broken_header) < 16: data_recv = broken_header + self.__sock.recv(16) else: data_recv = broken_header if len(data_recv) < 16: print ("trying to complete broken ACK %s /16" % len(data_recv)) if self.verbose: print (data_recv.encode('hex')) data_recv += self.__sock.recv(16 - len(data_recv)) #TODO: CHECK HERE_! if not self.__test_tcp_top(data_recv): if self.verbose: print ("invalid chunk tcp ACK OK") return None response = unpack('HHHH', data_recv[8:16])[0] if response == const.CMD_ACK_OK: if self.verbose: print ("chunk tcp ACK OK!") return b''.join(data) if self.verbose: print("bad response %s" % data_recv) if self.verbose: print (codecs.encode(data,'hex')) return None return resp while True: data_recv = self.__sock.recv(1024+8) response = unpack('<4H', data_recv[:8])[0] if self.verbose: print ("# packet response is: {}".format(response)) if response == const.CMD_DATA: data.append(data_recv[8:]) size -= 1024 elif response == const.CMD_ACK_OK: break else: if self.verbose: print ("broken!") break if self.verbose: print ("still needs %s" % size) return b''.join(data) else: if self.verbose: print ("invalid response %s" % self.__response) return None def __read_chunk(self, start, size): """ read a chunk from buffer """ for _retries in range(3): command = 1504 command_string = pack('= 8: uid, status, timestamp, punch = unpack('HB4sB', attendance_data.ljust(8, b'\x00')[:8]) if self.verbose: print (codecs.encode(attendance_data[:8], 'hex')) attendance_data = attendance_data[8:] tuser = list(filter(lambda x: x.uid == uid, users)) if not tuser: user_id = str(uid) else: user_id = tuser[0].user_id timestamp = self.__decode_time(timestamp) attendance = Attendance(user_id, timestamp, status, punch, uid) attendances.append(attendance) elif record_size == 16: while len(attendance_data) >= 16: user_id, timestamp, status, punch, reserved, workcode = unpack('= 40: uid, user_id, status, timestamp, punch, space = unpack('