fixed reading tcp bug

This commit is contained in:
Arturo Hernandez 2018-05-03 15:58:52 -04:00
parent abb76d7ac4
commit 66448cef18
6 changed files with 140 additions and 89 deletions

View File

@ -17,17 +17,17 @@ Complete documentation can be found at [Readthedocs](http://pyzk.readthedocs.io/
Just create a ZK object and you will ready to call api.
* Basic Usage
```
```python
from zk import ZK, const
conn = None
zk = ZK('192.168.1.10', port=4370, timeout=5)
zk = ZK('192.168.1.201', port=4370, timeout=5)
try:
print 'Connecting to device ...'
print ('Connecting to device ...')
conn = zk.connect()
print 'Disabling device ...'
print ('Disabling device ...')
conn.disable_device()
print 'Firmware Version: : {}'.format(conn.get_firmware_version())
print ('Firmware Version: : {}'.format(conn.get_firmware_version()))
# print '--- Get User ---'
users = conn.get_users()
for user in users:
@ -35,19 +35,19 @@ try:
if user.privilege == const.USER_ADMIN:
privilege = 'Admin'
print '- UID #{}'.format(user.uid)
print ' Name : {}'.format(user.name)
print ' Privilege : {}'.format(privilege)
print ' Password : {}'.format(user.password)
print ' Group ID : {}'.format(user.group_id)
print ' User ID : {}'.format(user.user_id)
print ('- UID #{}'.format(user.uid))
print (' Name : {}'.format(user.name))
print (' Privilege : {}'.format(privilege))
print (' Password : {}'.format(user.password))
print (' Group ID : {}'.format(user.group_id))
print (' User ID : {}'.format(user.user_id))
print "Voice Test ..."
print ("Voice Test ...")
conn.test_voice()
print 'Enabling device ...'
print ('Enabling device ...')
conn.enable_device()
except Exception, e:
print "Process terminate : {}".format(e)
except Exception as e:
print ("Process terminate : {}".format(e))
finally:
if conn:
conn.disconnect()
@ -100,7 +100,7 @@ conn.get_pin_width()
* Get Device use and free Space
```
```python
conn.read_sizes()
print(conn)
#also:
@ -114,7 +114,7 @@ conn.records_cap
* User Operation
```
```python
# Create user
conn.set_user(uid=1, name='Fanani M. Ihsan', privilege=const.USER_ADMIN, password='12345678', group_id='', user_id='123', card=0)
# Get all users (will return list of User object)
@ -122,12 +122,12 @@ users = conn.get_users()
# Delete User
conn.delete_user(uid=1)
```
there is also an enroll_user() (but it doesn't work with some tcp ZK8 devices)
there is also an `enroll_user()` (but it doesn't work with some tcp ZK8 devices)
* Fingerprints
```
```python
# Get a single Fingerprint (will return a Finger object)
template = conn.get_user_template(uid=1, temp_id=0) #temp_id is the finger to read 0~9
# Get all fingers from DB (will return a list of Finger objects)
@ -141,7 +141,7 @@ conn.save_user_template(user, [fing1 ,fing2])
* Attendance Record
```
```python
# Get attendances (will return list of Attendance object)
attendances = conn.get_attendance()
# Clear attendances record
@ -150,13 +150,13 @@ conn.clear_attendance()
* Test voice
```
```python
conn.test_voice(index=10) # beep or chirp
```
* Device Maintenance
```
```python
# shutdown connected device
conn.power_off()
# restart connected device
@ -165,6 +165,43 @@ conn.restart()
conn.free_data()
```
Test Machine
```sh
usage: ./test_machine.py [-h] [-a ADDRESS] [-p PORT] [-T TIMEOUT] [-P PASSWORD]
[-f] [-t] [-r] [-u] [-l] [-D DELETEUSER] [-A ADDUSER]
[-E ENROLLUSER] [-F FINGER]
ZK Basic Reading Tests
optional arguments:
-h, --help show this help message and exit
-a ADDRESS, --address ADDRESS
ZK device Address [192.168.1.201]
-p PORT, --port PORT ZK device port [4370]
-T TIMEOUT, --timeout TIMEOUT
Default [10] seconds (0: disable timeout)
-P PASSWORD, --password PASSWORD
Device code/password
-f, --force-udp Force UDP communication
-t, --templates Get templates / fingers
-r, --records Get attendance records
-u, --updatetime Update Date/Time
-l, --live-capture Live Event Capture
-D DELETEUSER, --deleteuser DELETEUSER
Delete a User (uid)
-A ADDUSER, --adduser ADDUSER
Add a User (uid) (and enroll)
-E ENROLLUSER, --enrolluser ENROLLUSER
Enroll a User (uid)
-F FINGER, --finger FINGER
Finger for enroll (fid=0)
```
# Related Project
* [zkcluster](https://github.com/fananimi/zkcluster/ "zkcluster project") is a django apps to manage multiple fingerprint devices.

31
test.py
View File

@ -1,39 +1,36 @@
# -*- coding: utf-8 -*-
import sys
sys.path.append("zk")
from zk import ZK, const
sys.path.append("zk")
conn = None
zk = ZK('192.168.1.201', port=4370, timeout=5)
try:
print 'Connecting to device ...'
print ('Connecting to device ...')
conn = zk.connect()
print 'Disabling device ...'
print ('Disabling device ...')
conn.disable_device()
print 'Firmware Version: : {}'.format(conn.get_firmware_version())
print ('Firmware Version: : {}'.format(conn.get_firmware_version()))
# print '--- Get User ---'
users = conn.get_users()
for user in users:
privilege = 'User'
if user.privilege == const.USER_ADMIN:
privilege = 'Admin'
print ('- UID #{}'.format(user.uid))
print (' Name : {}'.format(user.name))
print (' Privilege : {}'.format(privilege))
print (' Password : {}'.format(user.password))
print (' Group ID : {}'.format(user.group_id))
print (' User ID : {}'.format(user.user_id))
print '- UID #{}'.format(user.uid)
print ' Name : {}'.format(user.name)
print ' Privilege : {}'.format(privilege)
print ' Password : {}'.format(user.password)
print ' Group ID : {}'.format(user.group_id)
print ' User ID : {}'.format(user.user_id)
print ' Card : {}'.format(user.card)
print "Voice Test ..."
print ("Voice Test ...")
conn.test_voice()
print 'Enabling device ...'
print ('Enabling device ...')
conn.enable_device()
except Exception, e:
print "Process terminate : {}".format(e)
except Exception as e:
print ("Process terminate : {}".format(e))
finally:
if conn:
conn.disconnect()

View File

@ -20,35 +20,37 @@ conn = None
parser = argparse.ArgumentParser(description='ZK Basic Reading Tests')
parser.add_argument('-a', '--address',
help='ZK device Addres [192.168.1.201]', default='192.168.1.201')
help='ZK device Address [192.168.1.201]', default='192.168.1.201')
parser.add_argument('-p', '--port', type=int,
help='device port', default=4370)
help='ZK device port [4370]', default=4370)
parser.add_argument('-T', '--timeout', type=int,
help='timeout', default=10)
help='Default [10] seconds (0: disable timeout)', default=10)
parser.add_argument('-P', '--password', type=int,
help='Device code/password', default=0)
parser.add_argument('-f', '--force-udp', action="store_true",
help='Force UDP communication')
parser.add_argument('-v', '--verbose', action="store_true",
help='Print debug information')
parser.add_argument('-t', '--templates', action="store_true",
help='get templates')
help='Get templates / fingers')
parser.add_argument('-r', '--records', action="store_true",
help='get records')
help='Get attendance records')
parser.add_argument('-u', '--updatetime', action="store_true",
help='Update Date / Time')
help='Update Date/Time')
parser.add_argument('-l', '--live-capture', action="store_true",
help='Live Event Capture')
parser.add_argument('-D', '--deleteuser', type=int,
help='Delete a User (uid)', default=0)
parser.add_argument('-A', '--adduser', type=int,
help='Add a User (uid)', default=0)
help='Add a User (uid) (and enroll)', default=0)
parser.add_argument('-E', '--enrolluser', type=int,
help='Enroll a User (uid)', default=0)
parser.add_argument('-F', '--finger', type=int,
help='Finger for register', default=0)
help='Finger for enroll (fid=0)', default=0)
args = parser.parse_args()
zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp) # , firmware=args.firmware
zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp, verbose=args.verbose)
try:
print('Connecting to device ...')
conn = zk.connect()
@ -167,7 +169,12 @@ try:
print ("Read Templates...")
templates = conn.get_templates()
for tem in templates:
print (tem)
tem2 =conn.get_user_template(tem.uid,tem.fid)
if tem == tem2: # compare with alternative method
print ("OK! %s" % tem)
else:
print ("dif-1 %s" % tem)
print ("dif-2 %s" % tem2)
if args.records:
print ("Read Records...")
attendance = conn.get_attendance()
@ -181,11 +188,11 @@ try:
print (conn)
if args.live_capture:
print ('')
print ('--- Live Capture! (press ctrl+C to break)---') #TODO how?
print ('--- Live Capture! (press ctrl+C to break) ---')
counter = 0
for att in conn.live_capture():
for att in conn.live_capture():# using a generator!
if att is None:
#counter += 1
#counter += 1 #enable to implemet a poorman timeout
print ("timeout {}".format(counter))
else:
print ("ATT {:>6}: uid:{:>3}, user_id:{:>8} t: {}, s:{}".format(counter, att.uid, att.user_id, att.timestamp, att.status))

View File

@ -3,10 +3,11 @@
import sys
import argparse
sys.path.append("zk")
from time import sleep
from zk import ZK, const
sys.path.append("zk")
parser = argparse.ArgumentParser(description='ZK Basic Reading Tests')
parser.add_argument('-a', '--address',
help='ZK device Addres [192.168.1.201]', default='192.168.1.201')
@ -18,12 +19,13 @@ parser.add_argument('-P', '--password', type=int,
help='Device code/password', default=0)
parser.add_argument('-f', '--force-udp', action="store_true",
help='Force UDP communication')
parser.add_argument('-v', '--verbose', action="store_true",
help='Print debug information')
args = parser.parse_args()
conn = None
zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp)
zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp, verbose=args.verbose)
try:
print ('Connecting to device ...')
conn = zk.connect()

View File

@ -80,7 +80,7 @@ class ZK_helper(object):
self.client.close()
return res
def test_udp(self):
def test_udp(self): # WIP:
self.client = socket(AF_INET, SOCK_DGRAM)
self.client.settimeout(10) # fixed test
@ -470,7 +470,7 @@ class ZK(object):
def get_user_extend_fmt(self):
'''
determine extend fmt
determine user extend fmt
'''
command = const.CMD_OPTIONS_RRQ
command_string = b'~UserExtFmt'
@ -502,7 +502,7 @@ class ZK(object):
def get_compat_old_firmware(self):
'''
determine extend fmt
determine old firmware
'''
command = const.CMD_OPTIONS_RRQ
command_string = b'CompatOldFirmware'
@ -538,7 +538,6 @@ class ZK(object):
command = const.CMD_GET_PINWIDTH
command_string = b' P'
response_size = 9
cmd_response = self.__send_command(command, command_string, response_size)
if cmd_response.get('status'):
width = self.__data.split(b'\x00')[0]
@ -559,7 +558,6 @@ class ZK(object):
""" read sizes """
command = const.CMD_GET_FREE_SIZES
response_size = 1024
cmd_response = self.__send_command(command,b'', response_size)
if cmd_response.get('status'):
size = len(self.__data)
@ -607,7 +605,7 @@ class ZK(object):
raise ZKErrorResponse("can't restart device")
def get_time(self):
"""obtener la hora del equipo"""
"""get Device Time"""
command = const.CMD_GET_TIME
response_size = 1032
cmd_response = self.__send_command(command, b'', response_size)
@ -617,7 +615,7 @@ class ZK(object):
raise ZKErrorResponse("can't get time")
def set_time(self, timestamp):
""" colocar la hora del sistema al zk """
""" set Device time (pass datetime object)"""
command = const.CMD_SET_TIME
command_string = pack(b'I', self.__encode_time(timestamp))
cmd_response = self.__send_command(command, command_string)
@ -653,7 +651,7 @@ class ZK(object):
def test_voice(self, index=0):
'''
play test voice
0 acceso correcto
0 acceso correcto / acceso correcto
1 password incorrecto / clave incorrecta
2 la memoria del terminal está llena / acceso denegado
3 usuario invalido /codigo no valido
@ -865,8 +863,8 @@ class ZK(object):
def get_user_template(self, uid, temp_id=0):
""" ZKFinger VX10.0
for tcp:
command = const.CMD_USERTEMP_RRQ
command_string = pack('hb', uid, temp_id)
command = const.CMD_USERTEMP_RRQ (doesn't work always)
command_string = pack('hb', uid, temp_id)
"""
command = 88 # comando secreto!!!
@ -878,15 +876,16 @@ class ZK(object):
return None #("can't get user template")
#else
if cmd_response.get('code') == const.CMD_DATA: # less than 1024!!!
resp = self.__data[:-1]
if self.tcp:
size = len(self.__data[8:])
return Finger(uid, temp_id, 1, self.__data[8:])
else:
size = len(self.__data)
return Finger(uid, temp_id, 1, self.__data)
if resp[-6:] == b'\x00\x00\x00\x00\x00\x00': # padding? bug?
resp = resp[:-6]
if self.verbose: print("tcp too small!")
return Finger(uid, temp_id, 1, resp)
if cmd_response.get('code') == const.CMD_PREPARE_DATA:
data = []
bytes = self.__get_data_size() #TODO: check with size
size = bytes
if self.tcp:
data_recv = self.__sock.recv(bytes + 32)
recieved = len(data_recv)
@ -896,9 +895,11 @@ class ZK(object):
response = unpack('HHHH', data_recv[8:16])[0]
if recieved >= (bytes + 32): #complete
if response == const.CMD_DATA:
resp = data_recv[16:bytes+16] # no ack?
if self.verbose: print ("resp len", len(resp))
return Finger(uid, temp_id, 1, resp)
resp = data_recv[16:bytes+16][:size-1] # no ack?
if resp[-6:] == b'\x00\x00\x00\x00\x00\x00': # padding? bug?
resp = resp[:-6]
if self.verbose: print ("resp len1", len(resp))
return Finger(uid, temp_id, 1, resp) #mistery
else:
if self.verbose: print("broken packet!!!")
return None #broken
@ -913,7 +914,10 @@ class ZK(object):
data_recv = self.__sock.recv(16)
response = unpack('HHHH', data_recv[8:16])[0]
if response == const.CMD_ACK_OK:
resp = b''.join(data)
resp = b''.join(data)[:size-1] # testing
if resp[-6:] == b'\x00\x00\x00\x00\x00\x00':
resp = resp[:-6]
if self.verbose: print ("resp len2", len(resp))
return Finger(uid, temp_id, 1, resp)
#data_recv[bytes+16:].encode('hex') #included CMD_ACK_OK
if self.verbose: print("bad response %s" % data_recv)
@ -936,9 +940,9 @@ class ZK(object):
break
if self.verbose: print("still needs %s" % bytes)
data = b''.join(data)
self.free_data()
#uid 32 fing 03, starts with 4d-9b-53-53-32-31
return Finger(uid, temp_id, 1, data)
#CMD_USERTEMP_RRQ desn't need [:-1]
return Finger(uid, temp_id, 1, data[:-1]) #udp
def get_templates(self):
""" return array of all fingers """
@ -948,9 +952,11 @@ class ZK(object):
if self.verbose: print("WRN: no user data") # debug
return []
total_size = unpack('i', templatedata[0:4])[0]
print ("get template total size {}, size {} len {}".format(total_size, size, len(templatedata)))
templatedata = templatedata[4:] #total size not used
# ZKFinger VX10.0 the only finger firmware tested
while total_size:
#print ("total_size {}".format(total_size))
size, uid, fid, valid = unpack('HHbb',templatedata[:6])
template = unpack("%is" % (size-6), templatedata[6:size])[0]
finger = Finger(uid, fid, valid, template)
@ -1077,7 +1083,7 @@ class ZK(object):
if self.verbose: print("A:%i esperando primer regevent" % attempts)
data_recv = self.__sock.recv(1032) # timeout? tarda bastante...
self.__ack_ok()
if self.verbose: print((data_recv).encode('hex'))
if self.verbose: print(codecs.encode(data_recv,'hex'))
if self.tcp:
if len(data_recv) > 16: #not empty
res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0]
@ -1096,7 +1102,7 @@ class ZK(object):
if self.verbose: print ("A:%i esperando 2do regevent" % attempts)
data_recv = self.__sock.recv(1032) # timeout? tarda bastante...
self.__ack_ok()
if self.verbose: print ((data_recv).encode('hex'))
if self.verbose: print (codecs.encode(data_recv, 'hex'))
if self.tcp:
if len(data_recv) > 8: #not empty
res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0]
@ -1121,7 +1127,7 @@ class ZK(object):
if self.verbose: print ("esperando 3er regevent")
data_recv = self.__sock.recv(1032) # timeout? tarda bastante...
self.__ack_ok()
if self.verbose: print ((data_recv).encode('hex'))
if self.verbose: print (codecs.encode(data_recv, 'hex'))
if self.tcp:
res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0]
else:
@ -1195,8 +1201,8 @@ class ZK(object):
uid = tuser[0].uid
yield Attendance(uid, user_id, timestamp, status)
else:
if self.verbose: print ((data).encode('hex')), len(data)
yield data
if self.verbose: print (codecs.encode(data, 'hex')), len(data)
yield codecs.encode(data, 'hex')
except timeout:
if self.verbose: print ("time out")
yield None # return to keep watching
@ -1232,10 +1238,8 @@ class ZK(object):
raise ZKErrorResponse("can't read chunk %i:[%i]" % (start, size))
#else
if cmd_response.get('code') == const.CMD_DATA: # less than 1024!!!
if self.tcp:
return self.__data[8:] #TODO: check size?
else:
return self.__data
if self.verbose: print ("size was {} len is {}".format(size, len(self.__data)))
return self.__data
if cmd_response.get('code') == const.CMD_PREPARE_DATA:
data = []
bytes = self.__get_data_size() #TODO: check with size

View File

@ -9,15 +9,19 @@ class Finger(object):
self.valid = valid
self.template = template
#self.mark = str().encode("hex")
self.mark = codecs.encode(template[:6], 'hex')
self.mark = codecs.encode(template[:8], 'hex') + b'...' + codecs.encode(template[-8:], 'hex')
def repack(self): #full
return pack("HHbb%is" % (self.size), self.size+6, self.uid, self.fid, self.valid, self.template)
def repack_only(self): #only template
return pack("H%is" % (self.size), self.size+2, self.template)
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __str__(self):
return "<Finger> [uid:%i, fid:%i, size:%i v:%i t:%s...]" % (self.uid, self.fid, self.size, self.valid, self.mark)
return "<Finger> [uid:{:>3}, fid:{}, size:{:>4} v:{} t:{}]".format(self.uid, self.fid, self.size, self.valid, self.mark)
def __repr__(self):
return "<Finger> [uid:%i, fid:%i, size:%i v:%i t:%s...]" % (self.uid, self.fid, self.size, self.valid, self.mark) #.encode('hex')
return "<Finger> [uid:{:>3}, fid:{}, size:{:>4} v:{} t:{}]".format(self.uid, self.fid, self.size, self.valid, self.mark)