diff --git a/README.md b/README.md index 83772aa..934447e 100644 --- a/README.md +++ b/README.md @@ -17,17 +17,17 @@ Complete documentation can be found at [Readthedocs](http://pyzk.readthedocs.io/ Just create a ZK object and you will ready to call api. * Basic Usage -``` +```python from zk import ZK, const conn = None -zk = ZK('192.168.1.10', port=4370, timeout=5) +zk = ZK('192.168.1.201', port=4370, timeout=5) try: - print 'Connecting to device ...' + print ('Connecting to device ...') conn = zk.connect() - print 'Disabling device ...' + print ('Disabling device ...') conn.disable_device() - print 'Firmware Version: : {}'.format(conn.get_firmware_version()) + print ('Firmware Version: : {}'.format(conn.get_firmware_version())) # print '--- Get User ---' users = conn.get_users() for user in users: @@ -35,19 +35,19 @@ try: if user.privilege == const.USER_ADMIN: privilege = 'Admin' - print '- UID #{}'.format(user.uid) - print ' Name : {}'.format(user.name) - print ' Privilege : {}'.format(privilege) - print ' Password : {}'.format(user.password) - print ' Group ID : {}'.format(user.group_id) - print ' User ID : {}'.format(user.user_id) + print ('- UID #{}'.format(user.uid)) + print (' Name : {}'.format(user.name)) + print (' Privilege : {}'.format(privilege)) + print (' Password : {}'.format(user.password)) + print (' Group ID : {}'.format(user.group_id)) + print (' User ID : {}'.format(user.user_id)) - print "Voice Test ..." + print ("Voice Test ...") conn.test_voice() - print 'Enabling device ...' + print ('Enabling device ...') conn.enable_device() -except Exception, e: - print "Process terminate : {}".format(e) +except Exception as e: + print ("Process terminate : {}".format(e)) finally: if conn: conn.disconnect() @@ -100,7 +100,7 @@ conn.get_pin_width() * Get Device use and free Space -``` +```python conn.read_sizes() print(conn) #also: @@ -114,7 +114,7 @@ conn.records_cap * User Operation -``` +```python # Create user conn.set_user(uid=1, name='Fanani M. Ihsan', privilege=const.USER_ADMIN, password='12345678', group_id='', user_id='123', card=0) # Get all users (will return list of User object) @@ -122,12 +122,12 @@ users = conn.get_users() # Delete User conn.delete_user(uid=1) ``` -there is also an enroll_user() (but it doesn't work with some tcp ZK8 devices) +there is also an `enroll_user()` (but it doesn't work with some tcp ZK8 devices) * Fingerprints -``` +```python # Get a single Fingerprint (will return a Finger object) template = conn.get_user_template(uid=1, temp_id=0) #temp_id is the finger to read 0~9 # Get all fingers from DB (will return a list of Finger objects) @@ -141,7 +141,7 @@ conn.save_user_template(user, [fing1 ,fing2]) * Attendance Record -``` +```python # Get attendances (will return list of Attendance object) attendances = conn.get_attendance() # Clear attendances record @@ -150,13 +150,13 @@ conn.clear_attendance() * Test voice -``` +```python conn.test_voice(index=10) # beep or chirp ``` * Device Maintenance -``` +```python # shutdown connected device conn.power_off() # restart connected device @@ -165,6 +165,43 @@ conn.restart() conn.free_data() ``` +Test Machine + +```sh +usage: ./test_machine.py [-h] [-a ADDRESS] [-p PORT] [-T TIMEOUT] [-P PASSWORD] + [-f] [-t] [-r] [-u] [-l] [-D DELETEUSER] [-A ADDUSER] + [-E ENROLLUSER] [-F FINGER] + +ZK Basic Reading Tests + +optional arguments: + -h, --help show this help message and exit + -a ADDRESS, --address ADDRESS + ZK device Address [192.168.1.201] + -p PORT, --port PORT ZK device port [4370] + -T TIMEOUT, --timeout TIMEOUT + Default [10] seconds (0: disable timeout) + -P PASSWORD, --password PASSWORD + Device code/password + -f, --force-udp Force UDP communication + -t, --templates Get templates / fingers + -r, --records Get attendance records + -u, --updatetime Update Date/Time + -l, --live-capture Live Event Capture + -D DELETEUSER, --deleteuser DELETEUSER + Delete a User (uid) + -A ADDUSER, --adduser ADDUSER + Add a User (uid) (and enroll) + -E ENROLLUSER, --enrolluser ENROLLUSER + Enroll a User (uid) + -F FINGER, --finger FINGER + Finger for enroll (fid=0) + + +``` + + + # Related Project * [zkcluster](https://github.com/fananimi/zkcluster/ "zkcluster project") is a django apps to manage multiple fingerprint devices. diff --git a/test.py b/test.py index ab19bd7..d5f1f75 100644 --- a/test.py +++ b/test.py @@ -1,39 +1,36 @@ # -*- coding: utf-8 -*- import sys +sys.path.append("zk") from zk import ZK, const -sys.path.append("zk") - conn = None zk = ZK('192.168.1.201', port=4370, timeout=5) try: - print 'Connecting to device ...' + print ('Connecting to device ...') conn = zk.connect() - print 'Disabling device ...' + print ('Disabling device ...') conn.disable_device() - print 'Firmware Version: : {}'.format(conn.get_firmware_version()) + print ('Firmware Version: : {}'.format(conn.get_firmware_version())) # print '--- Get User ---' users = conn.get_users() for user in users: privilege = 'User' if user.privilege == const.USER_ADMIN: privilege = 'Admin' + print ('- UID #{}'.format(user.uid)) + print (' Name : {}'.format(user.name)) + print (' Privilege : {}'.format(privilege)) + print (' Password : {}'.format(user.password)) + print (' Group ID : {}'.format(user.group_id)) + print (' User ID : {}'.format(user.user_id)) - print '- UID #{}'.format(user.uid) - print ' Name : {}'.format(user.name) - print ' Privilege : {}'.format(privilege) - print ' Password : {}'.format(user.password) - print ' Group ID : {}'.format(user.group_id) - print ' User ID : {}'.format(user.user_id) - print ' Card : {}'.format(user.card) - - print "Voice Test ..." + print ("Voice Test ...") conn.test_voice() - print 'Enabling device ...' + print ('Enabling device ...') conn.enable_device() -except Exception, e: - print "Process terminate : {}".format(e) +except Exception as e: + print ("Process terminate : {}".format(e)) finally: if conn: conn.disconnect() diff --git a/test_machine.py b/test_machine.py index 31a325d..d260a84 100755 --- a/test_machine.py +++ b/test_machine.py @@ -20,35 +20,37 @@ conn = None parser = argparse.ArgumentParser(description='ZK Basic Reading Tests') parser.add_argument('-a', '--address', - help='ZK device Addres [192.168.1.201]', default='192.168.1.201') + help='ZK device Address [192.168.1.201]', default='192.168.1.201') parser.add_argument('-p', '--port', type=int, - help='device port', default=4370) + help='ZK device port [4370]', default=4370) parser.add_argument('-T', '--timeout', type=int, - help='timeout', default=10) + help='Default [10] seconds (0: disable timeout)', default=10) parser.add_argument('-P', '--password', type=int, help='Device code/password', default=0) parser.add_argument('-f', '--force-udp', action="store_true", help='Force UDP communication') +parser.add_argument('-v', '--verbose', action="store_true", + help='Print debug information') parser.add_argument('-t', '--templates', action="store_true", - help='get templates') + help='Get templates / fingers') parser.add_argument('-r', '--records', action="store_true", - help='get records') + help='Get attendance records') parser.add_argument('-u', '--updatetime', action="store_true", - help='Update Date / Time') + help='Update Date/Time') parser.add_argument('-l', '--live-capture', action="store_true", help='Live Event Capture') parser.add_argument('-D', '--deleteuser', type=int, help='Delete a User (uid)', default=0) parser.add_argument('-A', '--adduser', type=int, - help='Add a User (uid)', default=0) + help='Add a User (uid) (and enroll)', default=0) parser.add_argument('-E', '--enrolluser', type=int, help='Enroll a User (uid)', default=0) parser.add_argument('-F', '--finger', type=int, - help='Finger for register', default=0) + help='Finger for enroll (fid=0)', default=0) args = parser.parse_args() -zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp) # , firmware=args.firmware +zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp, verbose=args.verbose) try: print('Connecting to device ...') conn = zk.connect() @@ -167,7 +169,12 @@ try: print ("Read Templates...") templates = conn.get_templates() for tem in templates: - print (tem) + tem2 =conn.get_user_template(tem.uid,tem.fid) + if tem == tem2: # compare with alternative method + print ("OK! %s" % tem) + else: + print ("dif-1 %s" % tem) + print ("dif-2 %s" % tem2) if args.records: print ("Read Records...") attendance = conn.get_attendance() @@ -181,11 +188,11 @@ try: print (conn) if args.live_capture: print ('') - print ('--- Live Capture! (press ctrl+C to break)---') #TODO how? + print ('--- Live Capture! (press ctrl+C to break) ---') counter = 0 - for att in conn.live_capture(): + for att in conn.live_capture():# using a generator! if att is None: - #counter += 1 + #counter += 1 #enable to implemet a poorman timeout print ("timeout {}".format(counter)) else: print ("ATT {:>6}: uid:{:>3}, user_id:{:>8} t: {}, s:{}".format(counter, att.uid, att.user_id, att.timestamp, att.status)) diff --git a/test_voice.py b/test_voice.py index 293c3c8..352e64e 100755 --- a/test_voice.py +++ b/test_voice.py @@ -3,10 +3,11 @@ import sys import argparse +sys.path.append("zk") + from time import sleep from zk import ZK, const -sys.path.append("zk") parser = argparse.ArgumentParser(description='ZK Basic Reading Tests') parser.add_argument('-a', '--address', help='ZK device Addres [192.168.1.201]', default='192.168.1.201') @@ -18,12 +19,13 @@ parser.add_argument('-P', '--password', type=int, help='Device code/password', default=0) parser.add_argument('-f', '--force-udp', action="store_true", help='Force UDP communication') - +parser.add_argument('-v', '--verbose', action="store_true", + help='Print debug information') args = parser.parse_args() conn = None -zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp) +zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp, verbose=args.verbose) try: print ('Connecting to device ...') conn = zk.connect() diff --git a/zk/base.py b/zk/base.py index c0814e3..b949c2b 100644 --- a/zk/base.py +++ b/zk/base.py @@ -80,7 +80,7 @@ class ZK_helper(object): self.client.close() return res - def test_udp(self): + def test_udp(self): # WIP: self.client = socket(AF_INET, SOCK_DGRAM) self.client.settimeout(10) # fixed test @@ -470,7 +470,7 @@ class ZK(object): def get_user_extend_fmt(self): ''' - determine extend fmt + determine user extend fmt ''' command = const.CMD_OPTIONS_RRQ command_string = b'~UserExtFmt' @@ -502,7 +502,7 @@ class ZK(object): def get_compat_old_firmware(self): ''' - determine extend fmt + determine old firmware ''' command = const.CMD_OPTIONS_RRQ command_string = b'CompatOldFirmware' @@ -538,7 +538,6 @@ class ZK(object): command = const.CMD_GET_PINWIDTH command_string = b' P' response_size = 9 - cmd_response = self.__send_command(command, command_string, response_size) if cmd_response.get('status'): width = self.__data.split(b'\x00')[0] @@ -559,7 +558,6 @@ class ZK(object): """ read sizes """ command = const.CMD_GET_FREE_SIZES response_size = 1024 - cmd_response = self.__send_command(command,b'', response_size) if cmd_response.get('status'): size = len(self.__data) @@ -607,7 +605,7 @@ class ZK(object): raise ZKErrorResponse("can't restart device") def get_time(self): - """obtener la hora del equipo""" + """get Device Time""" command = const.CMD_GET_TIME response_size = 1032 cmd_response = self.__send_command(command, b'', response_size) @@ -617,7 +615,7 @@ class ZK(object): raise ZKErrorResponse("can't get time") def set_time(self, timestamp): - """ colocar la hora del sistema al zk """ + """ set Device time (pass datetime object)""" command = const.CMD_SET_TIME command_string = pack(b'I', self.__encode_time(timestamp)) cmd_response = self.__send_command(command, command_string) @@ -653,7 +651,7 @@ class ZK(object): def test_voice(self, index=0): ''' play test voice - 0 acceso correcto + 0 acceso correcto / acceso correcto 1 password incorrecto / clave incorrecta 2 la memoria del terminal está llena / acceso denegado 3 usuario invalido /codigo no valido @@ -865,8 +863,8 @@ class ZK(object): def get_user_template(self, uid, temp_id=0): """ ZKFinger VX10.0 for tcp: - command = const.CMD_USERTEMP_RRQ - command_string = pack('hb', uid, temp_id) + command = const.CMD_USERTEMP_RRQ (doesn't work always) + command_string = pack('hb', uid, temp_id) """ command = 88 # comando secreto!!! @@ -878,15 +876,16 @@ class ZK(object): return None #("can't get user template") #else if cmd_response.get('code') == const.CMD_DATA: # less than 1024!!! + resp = self.__data[:-1] if self.tcp: - size = len(self.__data[8:]) - return Finger(uid, temp_id, 1, self.__data[8:]) - else: - size = len(self.__data) - return Finger(uid, temp_id, 1, self.__data) + if resp[-6:] == b'\x00\x00\x00\x00\x00\x00': # padding? bug? + resp = resp[:-6] + if self.verbose: print("tcp too small!") + return Finger(uid, temp_id, 1, resp) if cmd_response.get('code') == const.CMD_PREPARE_DATA: data = [] bytes = self.__get_data_size() #TODO: check with size + size = bytes if self.tcp: data_recv = self.__sock.recv(bytes + 32) recieved = len(data_recv) @@ -896,9 +895,11 @@ class ZK(object): response = unpack('HHHH', data_recv[8:16])[0] if recieved >= (bytes + 32): #complete if response == const.CMD_DATA: - resp = data_recv[16:bytes+16] # no ack? - if self.verbose: print ("resp len", len(resp)) - return Finger(uid, temp_id, 1, resp) + resp = data_recv[16:bytes+16][:size-1] # no ack? + if resp[-6:] == b'\x00\x00\x00\x00\x00\x00': # padding? bug? + resp = resp[:-6] + if self.verbose: print ("resp len1", len(resp)) + return Finger(uid, temp_id, 1, resp) #mistery else: if self.verbose: print("broken packet!!!") return None #broken @@ -913,7 +914,10 @@ class ZK(object): data_recv = self.__sock.recv(16) response = unpack('HHHH', data_recv[8:16])[0] if response == const.CMD_ACK_OK: - resp = b''.join(data) + resp = b''.join(data)[:size-1] # testing + if resp[-6:] == b'\x00\x00\x00\x00\x00\x00': + resp = resp[:-6] + if self.verbose: print ("resp len2", len(resp)) return Finger(uid, temp_id, 1, resp) #data_recv[bytes+16:].encode('hex') #included CMD_ACK_OK if self.verbose: print("bad response %s" % data_recv) @@ -936,9 +940,9 @@ class ZK(object): break if self.verbose: print("still needs %s" % bytes) data = b''.join(data) - self.free_data() #uid 32 fing 03, starts with 4d-9b-53-53-32-31 - return Finger(uid, temp_id, 1, data) + #CMD_USERTEMP_RRQ desn't need [:-1] + return Finger(uid, temp_id, 1, data[:-1]) #udp def get_templates(self): """ return array of all fingers """ @@ -948,9 +952,11 @@ class ZK(object): if self.verbose: print("WRN: no user data") # debug return [] total_size = unpack('i', templatedata[0:4])[0] + print ("get template total size {}, size {} len {}".format(total_size, size, len(templatedata))) templatedata = templatedata[4:] #total size not used # ZKFinger VX10.0 the only finger firmware tested while total_size: + #print ("total_size {}".format(total_size)) size, uid, fid, valid = unpack('HHbb',templatedata[:6]) template = unpack("%is" % (size-6), templatedata[6:size])[0] finger = Finger(uid, fid, valid, template) @@ -1077,7 +1083,7 @@ class ZK(object): if self.verbose: print("A:%i esperando primer regevent" % attempts) data_recv = self.__sock.recv(1032) # timeout? tarda bastante... self.__ack_ok() - if self.verbose: print((data_recv).encode('hex')) + if self.verbose: print(codecs.encode(data_recv,'hex')) if self.tcp: if len(data_recv) > 16: #not empty res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] @@ -1096,7 +1102,7 @@ class ZK(object): if self.verbose: print ("A:%i esperando 2do regevent" % attempts) data_recv = self.__sock.recv(1032) # timeout? tarda bastante... self.__ack_ok() - if self.verbose: print ((data_recv).encode('hex')) + if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: if len(data_recv) > 8: #not empty res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] @@ -1121,7 +1127,7 @@ class ZK(object): if self.verbose: print ("esperando 3er regevent") data_recv = self.__sock.recv(1032) # timeout? tarda bastante... self.__ack_ok() - if self.verbose: print ((data_recv).encode('hex')) + if self.verbose: print (codecs.encode(data_recv, 'hex')) if self.tcp: res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0] else: @@ -1195,8 +1201,8 @@ class ZK(object): uid = tuser[0].uid yield Attendance(uid, user_id, timestamp, status) else: - if self.verbose: print ((data).encode('hex')), len(data) - yield data + if self.verbose: print (codecs.encode(data, 'hex')), len(data) + yield codecs.encode(data, 'hex') except timeout: if self.verbose: print ("time out") yield None # return to keep watching @@ -1232,10 +1238,8 @@ class ZK(object): raise ZKErrorResponse("can't read chunk %i:[%i]" % (start, size)) #else if cmd_response.get('code') == const.CMD_DATA: # less than 1024!!! - if self.tcp: - return self.__data[8:] #TODO: check size? - else: - return self.__data + if self.verbose: print ("size was {} len is {}".format(size, len(self.__data))) + return self.__data if cmd_response.get('code') == const.CMD_PREPARE_DATA: data = [] bytes = self.__get_data_size() #TODO: check with size diff --git a/zk/finger.py b/zk/finger.py index 1359af2..2ab0ab0 100644 --- a/zk/finger.py +++ b/zk/finger.py @@ -9,15 +9,19 @@ class Finger(object): self.valid = valid self.template = template #self.mark = str().encode("hex") - self.mark = codecs.encode(template[:6], 'hex') + self.mark = codecs.encode(template[:8], 'hex') + b'...' + codecs.encode(template[-8:], 'hex') + def repack(self): #full return pack("HHbb%is" % (self.size), self.size+6, self.uid, self.fid, self.valid, self.template) def repack_only(self): #only template return pack("H%is" % (self.size), self.size+2, self.template) - + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + def __str__(self): - return " [uid:%i, fid:%i, size:%i v:%i t:%s...]" % (self.uid, self.fid, self.size, self.valid, self.mark) + return " [uid:{:>3}, fid:{}, size:{:>4} v:{} t:{}]".format(self.uid, self.fid, self.size, self.valid, self.mark) def __repr__(self): - return " [uid:%i, fid:%i, size:%i v:%i t:%s...]" % (self.uid, self.fid, self.size, self.valid, self.mark) #.encode('hex') + return " [uid:{:>3}, fid:{}, size:{:>4} v:{} t:{}]".format(self.uid, self.fid, self.size, self.valid, self.mark)