From 80070941adce5e60d149a6d3a5e6c323f651a10c Mon Sep 17 00:00:00 2001 From: Arturo Hernandez Date: Fri, 7 Dec 2018 19:26:58 -0400 Subject: [PATCH] added test for 32 bytes live_capture response --- test.py | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++--- zk/base.py | 9 ++++---- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/test.py b/test.py index 63adf64..977a17e 100755 --- a/test.py +++ b/test.py @@ -417,7 +417,7 @@ class PYZKTest(unittest.TestCase): socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex')) template = conn.get_user_template(14, 1) self.assertEqual(template.size, 1243, "incorrect size %s" % template.size) - self.assertEqual(template.mark, "4d98535332310000...feff03d56454ccc1", "incorrect mark %s" % template.mark) + self.assertEqual(template.mark, b"4d98535332310000...feff03d56454ccc1", "incorrect mark %s" % template.mark) self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid) conn.disconnect() @@ -442,14 +442,68 @@ class PYZKTest(unittest.TestCase): codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response ] #begin - zk = ZK('192.168.1.201', verbose=True) + zk = ZK('192.168.1.201')#, verbose=True) conn = zk.connect() socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex')) template = conn.get_user_template(14, 1) self.assertEqual(template.size, 1010, "incorrect size %s" % template.size) - self.assertEqual(template.mark, "4ab1535332310000...81c0c1ffc2fea057", "incorrect mark %s" % template.mark) + self.assertEqual(template.mark, b"4ab1535332310000...81c0c1ffc2fea057", "incorrect mark %s" % template.mark) self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid) conn.disconnect() + @patch('zk.base.socket') + @patch('zk.base.ZK_helper') + def test_tcp_live_connect(self, helper, socket): + """ check live_capture 12 bytes""" + helper.return_value.test_ping.return_value = True # ping simulated + helper.return_value.test_tcp.return_value = 0 # helper tcp ok + socket.return_value.recv.side_effect = [ + codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK + codecs.decode('5050827d64000000d007a3159663130000000000000000000000000000000000070000000000000006000000000000005d020000000000000f0c0000000000000100000000000000b80b000010270000a0860100b20b00000927000043840100000000000000', 'hex'), #sizes + codecs.decode('5050827d04020000dd05942c96631500f801000001000e0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003830380000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003832310000000000000000000000000000000000000000000300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833350000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833310000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833320000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003836000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000383432000000000000000000000000000000000000000000','hex'), #DATA directly(not ok) + codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response + codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response + codecs.decode('5050827d10000000dc053b59d0983500f401ae4301000000f19449000000120c07130906', 'hex'), # tcp PREPARE_DATA 1011 + codecs.decode('5050827df8030000f401ae4301000000f19449000000120c07130906', 'hex'), # reg_event! + codecs.decode('5050827d08000000d007fcf701003200', 'hex'), # tcp CMD_ACK_OK + codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response + ] + #begin + zk = ZK('192.168.1.201')#, verbose=True) + conn = zk.connect() + socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex')) + for att in conn.live_capture(): + #print att + conn.end_live_capture = True + self.assertEqual(att.user_id, "4822257", "incorrect user_id %s" % att.user_id) + conn.disconnect() + + @patch('zk.base.socket') + @patch('zk.base.ZK_helper') + def test_tcp_live_connect_small(self, helper, socket): + """ check live_capture 32 bytes""" + helper.return_value.test_ping.return_value = True # ping simulated + helper.return_value.test_tcp.return_value = 0 # helper tcp ok + socket.return_value.recv.side_effect = [ + codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK + codecs.decode('5050827d64000000d007a3159663130000000000000000000000000000000000070000000000000006000000000000005d020000000000000f0c0000000000000100000000000000b80b000010270000a0860100b20b00000927000043840100000000000000', 'hex'), #sizes + codecs.decode('5050827d04020000dd05942c96631500f801000001000e0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003830380000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003832310000000000000000000000000000000000000000000300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833350000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833310000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833320000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003836000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000383432000000000000000000000000000000000000000000','hex'), #DATA directly(not ok) + codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response + codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response + codecs.decode('5050827d10000000dc053b59d0983500f401ae4301000000f19449000000120c07130906', 'hex'), # tcp PREPARE_DATA 1011 + codecs.decode('5050827df8030000f401ae43010000003131343030363400000000000000000000000000000000000f00120b1d0c3703', 'hex'), # reg_event! + codecs.decode('5050827d08000000d007fcf701003200', 'hex'), # tcp CMD_ACK_OK + codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response + ] + #begin + zk = ZK('192.168.1.201')#, verbose=True) + conn = zk.connect() + socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex')) + for att in conn.live_capture(): + #print att + conn.end_live_capture = True + self.assertEqual(att.user_id, "1140064", "incorrect user_id %s" % att.user_id) + conn.disconnect() + if __name__ == '__main__': unittest.main() diff --git a/zk/base.py b/zk/base.py index 4724df6..3abe842 100644 --- a/zk/base.py +++ b/zk/base.py @@ -491,7 +491,7 @@ class ZK(object): cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024) cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024) cmd_response = self.__send_command(const.CMD_ACK_UNKNOWN, command_string, 1024) - + def get_extend_fmt(self): ''' determine extend fmt @@ -1213,6 +1213,7 @@ class ZK(object): self.verify_user() if not self.is_enabled: self.enable_device() + if self.verbose: print ("start live_capture") self.reg_event(const.EF_ATTLOG) #0xFFFF self.__sock.settimeout(new_timeout) # default 1 minute test? self.end_live_capture = False @@ -1230,7 +1231,7 @@ class ZK(object): header = unpack('<4H', data_recv[:8]) data = data_recv[8:] if not header[0] == const.CMD_REG_EVENT: - if self.verbose: print("not event!") + if self.verbose: print("not event! %x" % header[0]) continue # or raise error? if not len(data): if self.verbose: print ("empty") @@ -1248,8 +1249,8 @@ class ZK(object): else: uid = tuser[0].uid yield Attendance(user_id, timestamp, status, punch, uid)#punch test? - elif len(data) == 36: #class 2 attendance - user_id, status, punch, timehex, res = unpack('<24sBB6sI', data) + elif len(data) == 36 or len(data) == 32: #class 2 attendance + user_id, status, punch, timehex = unpack('<24sBB6s', data[:32]) user_id = (user_id.split(b'\x00')[0]).decode(errors='ignore') timestamp = self.__decode_timehex(timehex) tuser = list(filter(lambda x: x.user_id == user_id, users))