Merge pull request #25 from kurenai-ryu/master

added support for 32byte on live capture
This commit is contained in:
kurenai-ryu 2018-12-07 19:41:04 -04:00 committed by GitHub
commit 0d04f61e1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 8 deletions

View File

@ -42,6 +42,7 @@ Complete documentation of the original project can be found at [Readthedocs](htt
Just create a ZK object and you will be ready to call api. Just create a ZK object and you will be ready to call api.
* Basic Usage * Basic Usage
```python ```python
from zk import ZK, const from zk import ZK, const
@ -170,7 +171,7 @@ zk.enroll_user('23')
* Attendance Record * Attendance Record
```python ```python
# Get attendances (will return list of Attendance object) # Get attendances (will return list of Attendance object)
attendances = conn.get_attendance() attendances = conn.get_attendance()
# Clear attendances record # Clear attendances record
@ -323,6 +324,10 @@ DeviceName : (unknown device) (broken info but at least the important data was r
Firmware Version : Ver 6.60 Jun 9 2017 Firmware Version : Ver 6.60 Jun 9 2017
Platform : JZ4725_TFT Platform : JZ4725_TFT
DeviceName : K20 (latest checked correctly!) DeviceName : K20 (latest checked correctly!)
Firmware Version : Ver 6.60 Aug 23 2014
Platform : ZEM600_TFT
DeviceName : VF680 (face device only, but we read the user and attendance list!)
``` ```
@ -359,6 +364,12 @@ DeviceName : iClock260 (no capture data - probably similar problem as the latest
If you have another version tested and it worked, please inform me to update this list! If you have another version tested and it worked, please inform me to update this list!
# Related Project
* [zkcluster](https://github.com/kurenai-ryu/zkcluster/ "zkcluster project") is a django apps to manage multiple fingerprint devices. (Initial support form the [original project](https://github.com/fananimi/zkcluster/))
# Related Project (TODO: check compatibility with this fork)
* [Driji](https://github.com/fananimi/driji/ "Driji project") is an attendance apps based fingerprint for school
# Todo # Todo
* Create better documentation * Create better documentation

60
test.py
View File

@ -417,7 +417,7 @@ class PYZKTest(unittest.TestCase):
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex')) socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
template = conn.get_user_template(14, 1) template = conn.get_user_template(14, 1)
self.assertEqual(template.size, 1243, "incorrect size %s" % template.size) self.assertEqual(template.size, 1243, "incorrect size %s" % template.size)
self.assertEqual(template.mark, "4d98535332310000...feff03d56454ccc1", "incorrect mark %s" % template.mark) self.assertEqual(template.mark, b"4d98535332310000...feff03d56454ccc1", "incorrect mark %s" % template.mark)
self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid) self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid)
conn.disconnect() conn.disconnect()
@ -442,14 +442,68 @@ class PYZKTest(unittest.TestCase):
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
] ]
#begin #begin
zk = ZK('192.168.1.201', verbose=True) zk = ZK('192.168.1.201')#, verbose=True)
conn = zk.connect() conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex')) socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
template = conn.get_user_template(14, 1) template = conn.get_user_template(14, 1)
self.assertEqual(template.size, 1010, "incorrect size %s" % template.size) self.assertEqual(template.size, 1010, "incorrect size %s" % template.size)
self.assertEqual(template.mark, "4ab1535332310000...81c0c1ffc2fea057", "incorrect mark %s" % template.mark) self.assertEqual(template.mark, b"4ab1535332310000...81c0c1ffc2fea057", "incorrect mark %s" % template.mark)
self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid) self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid)
conn.disconnect() conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_live_connect(self, helper, socket):
""" check live_capture 12 bytes"""
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d64000000d007a3159663130000000000000000000000000000000000070000000000000006000000000000005d020000000000000f0c0000000000000100000000000000b80b000010270000a0860100b20b00000927000043840100000000000000', 'hex'), #sizes
codecs.decode('5050827d04020000dd05942c96631500f801000001000e0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003830380000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003832310000000000000000000000000000000000000000000300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833350000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833310000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833320000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003836000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000383432000000000000000000000000000000000000000000','hex'), #DATA directly(not ok)
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
codecs.decode('5050827d10000000dc053b59d0983500f401ae4301000000f19449000000120c07130906', 'hex'), # tcp PREPARE_DATA 1011
codecs.decode('5050827df8030000f401ae4301000000f19449000000120c07130906', 'hex'), # reg_event!
codecs.decode('5050827d08000000d007fcf701003200', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201')#, verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
for att in conn.live_capture():
#print att
conn.end_live_capture = True
self.assertEqual(att.user_id, "4822257", "incorrect user_id %s" % att.user_id)
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_live_connect_small(self, helper, socket):
""" check live_capture 32 bytes"""
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d64000000d007a3159663130000000000000000000000000000000000070000000000000006000000000000005d020000000000000f0c0000000000000100000000000000b80b000010270000a0860100b20b00000927000043840100000000000000', 'hex'), #sizes
codecs.decode('5050827d04020000dd05942c96631500f801000001000e0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003830380000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003832310000000000000000000000000000000000000000000300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833350000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833310000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833320000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003836000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000383432000000000000000000000000000000000000000000','hex'), #DATA directly(not ok)
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
codecs.decode('5050827d10000000dc053b59d0983500f401ae4301000000f19449000000120c07130906', 'hex'), # tcp PREPARE_DATA 1011
codecs.decode('5050827df8030000f401ae43010000003131343030363400000000000000000000000000000000000f00120b1d0c3703', 'hex'), # reg_event!
codecs.decode('5050827d08000000d007fcf701003200', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201')#, verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
for att in conn.live_capture():
#print att
conn.end_live_capture = True
self.assertEqual(att.user_id, "1140064", "incorrect user_id %s" % att.user_id)
conn.disconnect()
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -491,7 +491,7 @@ class ZK(object):
cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024) cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024)
cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024) cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024)
cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024) cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024)
def get_extend_fmt(self): def get_extend_fmt(self):
''' '''
determine extend fmt determine extend fmt
@ -1213,6 +1213,7 @@ class ZK(object):
self.verify_user() self.verify_user()
if not self.is_enabled: if not self.is_enabled:
self.enable_device() self.enable_device()
if self.verbose: print ("start live_capture")
self.reg_event(const.EF_ATTLOG) #0xFFFF self.reg_event(const.EF_ATTLOG) #0xFFFF
self.__sock.settimeout(new_timeout) # default 1 minute test? self.__sock.settimeout(new_timeout) # default 1 minute test?
self.end_live_capture = False self.end_live_capture = False
@ -1230,7 +1231,7 @@ class ZK(object):
header = unpack('<4H', data_recv[:8]) header = unpack('<4H', data_recv[:8])
data = data_recv[8:] data = data_recv[8:]
if not header[0] == const.CMD_REG_EVENT: if not header[0] == const.CMD_REG_EVENT:
if self.verbose: print("not event!") if self.verbose: print("not event! %x" % header[0])
continue # or raise error? continue # or raise error?
if not len(data): if not len(data):
if self.verbose: print ("empty") if self.verbose: print ("empty")
@ -1248,8 +1249,8 @@ class ZK(object):
else: else:
uid = tuser[0].uid uid = tuser[0].uid
yield Attendance(user_id, timestamp, status, punch, uid)#punch test? yield Attendance(user_id, timestamp, status, punch, uid)#punch test?
elif len(data) == 36: #class 2 attendance elif len(data) == 36 or len(data) == 32: #class 2 attendance
user_id, status, punch, timehex, res = unpack('<24sBB6sI', data) user_id, status, punch, timehex = unpack('<24sBB6s', data[:32])
user_id = (user_id.split(b'\x00')[0]).decode(errors='ignore') user_id = (user_id.split(b'\x00')[0]).decode(errors='ignore')
timestamp = self.__decode_timehex(timehex) timestamp = self.__decode_timehex(timehex)
tuser = list(filter(lambda x: x.user_id == user_id, users)) tuser = list(filter(lambda x: x.user_id == user_id, users))