latest test for smaller TCP DATA packet (on JZ4725_TFT)

This commit is contained in:
Arturo Hernandez 2018-08-31 20:16:10 -04:00
parent 99b677a9da
commit 5826aac263
3 changed files with 70 additions and 5 deletions

View File

@ -283,6 +283,7 @@ DeviceName : iClock260
Firmware Version : Ver 6.60 Nov 6 2017 (remote tested with correct results)
Platform : ZMM220_TFT
DeviceName : (unknown device) (broken info but at least the important data was read)
```
@ -297,6 +298,10 @@ DeviceName : iClock260
Firmware Version : Ver 6.60 Jun 16 2015
Platform : JZ4725_TFT
DeviceName : K14 (not tested, but same behavior like the other one)
Firmware Version : Ver 6.60 Jun 9 2017
Platform : JZ4725_TFT
DeviceName : K20 (Active testing! latest fix)
```
### Not Working (needs more tests, more information)

39
test.py
View File

@ -185,7 +185,7 @@ class PYZKTest(unittest.TestCase):
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_users_small(self, helper, socket):
def test_tcp_get_users_small_data(self, helper, socket):
""" can get empty? """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
@ -215,6 +215,43 @@ class PYZKTest(unittest.TestCase):
self.assertEqual(usu.name, "NN-831", "incorrect uid %s" % usu.name) # generated
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_users_broken_data(self, helper, socket):
""" test case for K20 """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d007d7d758200000','hex'), #ACK Ok
codecs.decode('5050827d58000000d0074c49582013000000000000000000000000000000000002000000000000000000000000000000000000000000000007000000000000000000000000000000f4010000f401000050c30000f4010000f201000050c30000','hex'),#Sizes
codecs.decode('5050827d9c000000dd053c87582015009000000001000000000000000000006366756c616e6f0000000000000000000000000000000000000000000000000000000000003130303030316c70000000000000000000000000000000000200000000000000000000726d656e67616e6f0000000000000000000000000000000000','hex'),#DATA112
codecs.decode('000000000000000000000000323232323232636200000000000000000000000000000000','hex'), #extra data 36
#codecs.decode('','hex'), #
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for get_users TODO: generate proper sequenced response
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for free_data TODO: generate proper sequenced response
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for exit TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201') #, verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
users = conn.get_users()
#print (users) #debug
socket.return_value.send.assert_called_with(codecs.decode('5050827d13000000df05b3cb582014000109000500000000000000', 'hex')) #get users
self.assertEqual(len(users), 2, "incorrect size %s" % len(users))
#assert one user
usu = users[1]
self.assertIsInstance(usu.uid, int, "uid should be int() %s" % type(usu.uid))
if sys.version_info >= (3, 0):
self.assertIsInstance(usu.user_id, (str, bytes), "user_id should be str() or bytes() %s" % type(usu.user_id))
else:
self.assertIsInstance(usu.user_id, (str, unicode), "user_id should be str() or unicode() %s" % type(usu.user_id))
self.assertEqual(usu.uid, 2, "incorrect uid %s" % usu.uid)
self.assertEqual(usu.user_id, "222222cb", "incorrect user_id %s" % usu.user_id)
self.assertEqual(usu.name, "rmengano", "incorrect uid %s" % usu.name) # check test case
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_users_broken_tcp(self, helper, socket):

View File

@ -1336,8 +1336,19 @@ class ZK(object):
def __recieve_chunk(self):
""" recieve a chunk """
if self.__response == const.CMD_DATA: # less than 1024!!!
if self.verbose: print ("_rc len is {}".format(len(self.__data)))
return self.__data #without headers
if self.tcp: #MUST CHECK TCP SIZE
if self.verbose: print ("_rc_DATA! is {} bytes, tcp length is {}".format(len(self.__data), self.__tcp_length))
if len(self.__data) < (self.__tcp_length - 8):
need = (self.__tcp_length - 8) - len(self.__data)
more_data = self.__recieve_raw_data(need)
if self.verbose: print ("need more data: {}".format(need))
return b''.join([self.__data, more_data])
else: #enough data
if self.verbose: print ("Enough data")
return self.__data
else: #UDP
if self.verbose: print ("_rc len is {}".format(len(self.__data)))
return self.__data #without headers
elif self.__response == const.CMD_PREPARE_DATA:
data = []
size = self.__get_data_size()
@ -1423,8 +1434,20 @@ class ZK(object):
raise ZKErrorResponse("RWB Not supported")
if cmd_response['code'] == const.CMD_DATA:
#direct!!! small!!!
size = len(self.__data)
return self.__data, size
if self.tcp: #MUST CHECK TCP SIZE
if self.verbose: print ("DATA! is {} bytes, tcp length is {}".format(len(self.__data), self.__tcp_length))
if len(self.__data) < (self.__tcp_length - 8):
need = (self.__tcp_length - 8) - len(self.__data)
more_data = self.__recieve_raw_data(need)
if self.verbose: print ("need more data: {}".format(need))
return b''.join([self.__data, more_data]), len(self.__data) + len(more_data)
else: #enough data
if self.verbose: print ("Enough data")
size = len(self.__data)
return self.__data, size
else: #udp is direct
size = len(self.__data)
return self.__data, size
#else ACK_OK with size
size = unpack('I', self.__data[1:5])[0] # extra info???
if self.verbose: print ("size fill be %i" % size)