Merge pull request #18 from kurenai-ryu/master

major changes (with tcp support)
This commit is contained in:
Fanani M. Ihsan 2018-10-30 08:40:28 +07:00 committed by GitHub
commit 0cebf52d6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 3502 additions and 606 deletions

27
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@ -0,0 +1,27 @@
---
name: Bug report
about: Create a report to help us improve
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. initialize data '...'
2. execute command '....'
3. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Capture Data**
Try to always include captured data (pcap files from wireshark [(tutorial)](https://youtu.be/AsDedCgkhnA), and verbose output from test_machine if applicable)
**System (please complete the following information):**
- OS: [e.g. iOS, Debian 9]
- Python version [e.g. 2.7, 3.5]]
**Additional context**
Add any other context about the problem here.

7
.github/ISSUE_TEMPLATE/custom.md vendored Normal file
View File

@ -0,0 +1,7 @@
---
name: Custom issue template
about: Describe this issue template's purpose here.
---

View File

@ -0,0 +1,17 @@
---
name: Feature request
about: Suggest an idea for this project
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

2
.gitignore vendored
View File

@ -3,3 +3,5 @@
build/
dist/
*.egg-info/
*.test
*.bak

View File

@ -1,5 +1,22 @@
Changelog
=========
Version 0.9
-----------
* Initial Python 3 Support
* major changes
Version 0.8
-----------
* test suite tool (test_machine.py)
* Initial TCP support
Version 0.7
-----------
* internal major changes
Version 0.6
-----------
* device password support
Version 0.5
-----------

306
README.md
View File

@ -4,30 +4,55 @@ pyzk is an unofficial library of zksoftware the fingerprint attendance machine.
# Installation
[![Build Status](https://travis-ci.org/fananimi/pyzk.svg?branch=master)](https://travis-ci.org/fananimi/pyzk)
[![Build Status](https://travis-ci.org/kurenai-ryu/pyzk.svg?branch=master)](https://travis-ci.org/kurenai-ryu/pyzk)
`pip install pyzk`
replace original pyzk, if it was installed
```sh
pip install -U git+https://github.com/kurenai-ryu/pyzk.git
```
or using pipenv:
```sh
pipenv install git+https://gith.com/kurenai-ryu/pyzk#egg=pyzk
```
or clone and execute:
```sh
python setup.py install
```
or in your project, append the path of this project
```python
import sys
import os
sys.path.insert(1,os.path.abspath("./pyzk"))
from zk import ZK, const
```
# Documentation
Complete documentation can be found at [Readthedocs](http://pyzk.readthedocs.io/en/latest/ "pyzk's readthedocs") .
Complete documentation of the original project can be found at [Readthedocs](http://pyzk.readthedocs.io/en/latest/ "pyzk's readthedocs") .
# Api Usage
Just create a ZK object and you will ready to call api.
Just create a ZK object and you will be ready to call api.
* Basic Usage
```
```python
from zk import ZK, const
conn = None
zk = ZK('192.168.1.10', port=4370, timeout=5)
zk = ZK('192.168.1.201', port=4370, timeout=5, password=0, force_udp=False, ommit_ping=False)
try:
print 'Connecting to device ...'
print ('Connecting to device ...')
conn = zk.connect()
print 'Disabling device ...'
print ('Disabling device ...')
conn.disable_device()
print 'Firmware Version: : {}'.format(conn.get_firmware_version())
print ('Firmware Version: : {}'.format(conn.get_firmware_version()))
# print '--- Get User ---'
users = conn.get_users()
for user in users:
@ -35,19 +60,19 @@ try:
if user.privilege == const.USER_ADMIN:
privilege = 'Admin'
print '- UID #{}'.format(user.uid)
print ' Name : {}'.format(user.name)
print ' Privilege : {}'.format(privilege)
print ' Password : {}'.format(user.password)
print ' Group ID : {}'.format(user.group_id)
print ' User ID : {}'.format(user.user_id)
print ('- UID #{}'.format(user.uid))
print (' Name : {}'.format(user.name))
print (' Privilege : {}'.format(privilege))
print (' Password : {}'.format(user.password))
print (' Group ID : {}'.format(user.group_id))
print (' User ID : {}'.format(user.user_id))
print "Voice Test ..."
print ("Voice Test ...")
conn.test_voice()
print 'Enabling device ...'
print ('Enabling device ...')
conn.enable_device()
except Exception, e:
print "Process terminate : {}".format(e)
except Exception as e:
print ("Process terminate : {}".format(e))
finally:
if conn:
conn.disconnect()
@ -55,42 +80,96 @@ finally:
* Connect/Disconnect
```
```python
conn = zk.connect()
conn.disconnect()
```
* Disable/Enable Connected Device
```
```python
conn.disable_device()
conn.enable_device()
```
* Ger Firmware Version
* Get and Set Time
```python
from datetime import datetime
zktime = conn.get_time()
print zktime
newtime = datetime.today()
conn.set_time(newtime)
```
* Ger Firmware Version and extra information
```python
conn.get_firmware_version()
conn.get_serialnumber()
conn.get_platform()
conn.get_device_name()
conn.get_face_version()
conn.get_fp_version()
conn.get_extend_fmt()
conn.get_user_extend_fmt()
conn.get_face_fun_on()
conn.get_compat_old_firmware()
conn.get_network_params()
conn.get_mac()
conn.get_pin_width()
```
* Get Device use and free Space
```python
conn.read_sizes()
print(conn)
#also:
conn.users
conn.fingers
conn.records
conn.users_cap
conn.fingers_cap
conn.records_cap
```
* User Operation
```
```python
# Create user
conn.set_user(uid=1, name='Fanani M. Ihsan', privilege=const.USER_ADMIN, password='12345678', group_id='', user_id='123')
conn.set_user(uid=1, name='Fanani M. Ihsan', privilege=const.USER_ADMIN, password='12345678', group_id='', user_id='123', card=0)
# Get all users (will return list of User object)
users = conn.get_users()
# Delete User
conn.delete_user(uid=1)
```
there is also an `enroll_user()` (but it doesn't work with some tcp ZK8 devices)
* Fingerprints
```python
# Get a single Fingerprint (will return a Finger object)
template = conn.get_user_template(uid=1, temp_id=0) #temp_id is the finger to read 0~9
# Get all fingers from DB (will return a list of Finger objects)
fingers = conn.get_templates()
# to restore a finger, we need to assemble with the corresponding user
# pass a User object and a list of finger (max 10) to save
conn.save_user_template(user, [fing1 ,fing2])
* Remote Fingerprint Enrollment
```
zk.enroll_user('23')
```
* Attendance Record
```
```python
# Get attendances (will return list of Attendance object)
attendances = conn.get_attendance()
# Clear attendances record
@ -99,29 +178,190 @@ conn.clear_attendance()
* Test voice
```
conn.test_voice()
```python
conn.test_voice(index=10) # beep or chirp
```
* Device Maintenance
```
```python
# shutdown connected device
conn.power_off()
# restart connected device
conn.restart()
# clear buffer
conn.free_data()
```
# Related Project
* Live Capture!
* [zkcluster](https://github.com/fananimi/zkcluster/ "zkcluster project") is a django apps to manage multiple fingerprint devices.
```python
# live capture! (timeout at 10s)
for attendance in conn.live_capture():
if attendance is None:
# implement here timeout logic
pass
else:
print (attendance) # Attendance object
#
#if you need to break gracefully just set
# conn.end_live_capture = True
#
# On interactive mode,
# use Ctrl+C to break gracefully
# this way it restores timeout
# and disables live capture
```
* [Driji](https://github.com/fananimi/driji/ "Driji project") is an attendance apps based fingerprint for school
**Test Machine**
```sh
usage: ./test_machine.py [-h] [-a ADDRESS] [-p PORT] [-T TIMEOUT] [-P PASSWORD]
[-f] [-t] [-r] [-u] [-l] [-D DELETEUSER] [-A ADDUSER]
[-E ENROLLUSER] [-F FINGER]
ZK Basic Reading Tests
optional arguments:
-h, --help show this help message and exit
-a ADDRESS, --address ADDRESS
ZK device Address [192.168.1.201]
-p PORT, --port PORT ZK device port [4370]
-T TIMEOUT, --timeout TIMEOUT
Default [10] seconds (0: disable timeout)
-P PASSWORD, --password PASSWORD
Device code/password
-b, --basic get Basic Information only. (no bulk read, ie: users)
-f, --force-udp Force UDP communication
-v, --verbose Print debug information
-t, --templates Get templates / fingers (compare bulk and single read)
-tr, --templates-raw Get raw templates (dump templates)
-r, --records Get attendance records
-u, --updatetime Update Date/Time
-l, --live-capture Live Event Capture
-o, --open-door Open door
-D DELETEUSER, --deleteuser DELETEUSER
Delete a User (uid)
-A ADDUSER, --adduser ADDUSER
Add a User (uid) (and enroll)
-E ENROLLUSER, --enrolluser ENROLLUSER
Enroll a User (uid)
-F FINGER, --finger FINGER
Finger for enroll (fid=0)
```
**Backup/Restore (Users and fingers only!!!)** *(WARNING! destructive test! do it at your own risk!)*
```sh
usage: ./test_backup_restore.py [-h] [-a ADDRESS] [-p PORT] [-T TIMEOUT]
[-P PASSWORD] [-f] [-v] [-r]
[filename]
ZK Basic Backup/Restore Tool
positional arguments:
filename backup filename (default [serialnumber].bak)
optional arguments:
-h, --help show this help message and exit
-a ADDRESS, --address ADDRESS
ZK device Address [192.168.1.201]
-p PORT, --port PORT ZK device port [4370]
-T TIMEOUT, --timeout TIMEOUT
Default [10] seconds (0: disable timeout)
-P PASSWORD, --password PASSWORD
Device code/password
-f, --force-udp Force UDP communication
-v, --verbose Print debug information
-E, --erase clean the device after writting backup!
-r, --restore Restore from backup
-c, --clear-attendance
On Restore, also clears the attendance [default keep
attendance]
```
to restore on a different device, make sure to specify the `filename`. on restoring, it asks for the serial number of the destination device (to make sure it was correct, as it deletes all data) WARNING. there is no way to restore attendance data, you can keep it or clear it, but once cleared, there is no way to restore it.
# Compatible devices
```
Firmware Version : Ver 6.21 Nov 19 2008
Platform : ZEM500
DeviceName : U580
Firmware Version : Ver 6.60 Oct 29 2012
Platform : ZEM800_TFT
DeviceName : iFace402/ID
Firmware Version : Ver 6.60 Dec 27 2014
Platform : ZEM600_TFT
DeviceName : iFace800/ID
Firmware Version : Ver 6.60 Mar 18 2013
Platform : ZEM560
DeviceName : MA300
Firmware Version : Ver 6.60 Dec 1 2010
Platform : ZEM510_TFT
DeviceName : T4-C
Firmware Version : Ver 6.60 Apr 9 2010
Platform : ZEM510_TFT
DeviceName : T4-C
Firmware Version : Ver 6.60 Mar 18 2011
Platform : ZEM600_TFT
DeviceName : iClock260
Firmware Version : Ver 6.60 Nov 6 2017 (remote tested with correct results)
Platform : ZMM220_TFT
DeviceName : (unknown device) (broken info but at least the important data was read)
Firmware Version : Ver 6.60 Jun 9 2017
Platform : JZ4725_TFT
DeviceName : K20 (latest checked correctly!)
```
### Latest tested (not really confirmed)
```
Firmware Version : Ver 6.60 Jun 16 2015
Platform : JZ4725_TFT
DeviceName : iClock260
Firmware Version : Ver 6.60 Jun 16 2015
Platform : JZ4725_TFT
DeviceName : K14 (not tested, but same behavior like the other one)
Firmware Version : Ver 6.60 Jun 5 2015
Platform : ZMM200_TFT
DeviceName : iClock3000/ID (Active testing! latest fix)
Firmware Version : Ver 6.70 Jul 12 2013
Platform : ZEM600_TFT
DeviceName : iClock880-H/ID (Active testing! latest fix)
```
### Not Working (needs more tests, more information)
```
Firmware Version : Ver 6.4.1 (build 99) (display version 2012-08-31)
Platform :
DeviceName : iClock260 (no capture data - probably similar problem as the latest TESTED)
```
If you have another version tested and it worked, please inform me to update this list!
# Todo
* Create better documentation
* Finger template downloader & uploader
* ~~Finger template downloader & uploader~~
* HTTP Rest api
* Create real time api (if possible)
* ~~Create real time api (if possible)~~
* and much more ...

36
basic_test.py Normal file
View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
import sys
sys.path.append("zk")
from zk import ZK, const
conn = None
zk = ZK('192.168.1.201', port=4370, timeout=5, password=0, force_udp=False, ommit_ping=False)
try:
print ('Connecting to device ...')
conn = zk.connect()
print ('Disabling device ...')
conn.disable_device()
print ('Firmware Version: : {}'.format(conn.get_firmware_version()))
# print '--- Get User ---'
users = conn.get_users()
for user in users:
privilege = 'User'
if user.privilege == const.USER_ADMIN:
privilege = 'Admin'
print ('- UID #{}'.format(user.uid))
print (' Name : {}'.format(user.name))
print (' Privilege : {}'.format(privilege))
print (' Password : {}'.format(user.password))
print (' Group ID : {}'.format(user.group_id))
print (' User ID : {}'.format(user.user_id))
print ("Voice Test ...")
conn.test_voice()
print ('Enabling device ...')
conn.enable_device()
except Exception as e:
print ("Process terminate : {}".format(e))
finally:
if conn:
conn.disconnect()

View File

@ -3,7 +3,7 @@ from setuptools import setup
setup(
name='pyzk',
version='0.6',
version='0.9',
description='an unofficial library of zksoftware fingerprint device',
url='https://github.com/fananimi/pyzk',
author='Fanani M. Ihsan',
@ -19,5 +19,6 @@ setup(
'biometrics',
'security'
],
install_requires=['future'],
zip_safe=False
)

481
test.py Normal file → Executable file
View File

@ -1,38 +1,455 @@
# -*- coding: utf-8 -*-
#!/usr/bin/env python2
# # -*- coding: utf-8 -*-
import sys
import os
import unittest
import codecs
from mock import patch, Mock, MagicMock
mock_socket = MagicMock(name='zk.socket')
sys.modules['zk.socket'] = mock_socket
from zk import ZK, const
from zk.base import ZK_helper
from zk.user import User
from zk.finger import Finger
from zk.attendance import Attendance
from zk.exception import ZKErrorResponse, ZKNetworkError
sys.path.append("zk")
conn = None
zk = ZK('192.168.1.10', port=4370, timeout=5)
try:
print 'Connecting to device ...'
unittest.TestCase.assertRaisesRegex
except AttributeError:
unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
def dump(obj, nested_level=0, output=sys.stdout):
spacing = ' '
if type(obj) == dict:
print >> output, '%s{' % ((nested_level) * spacing)
for k, v in obj.items():
if hasattr(v, '__iter__'):
print >> output, '%s%s:' % ((nested_level + 1) * spacing, k)
dump(v, nested_level + 1, output)
else:
print >> output, '%s%s: %s' % ((nested_level + 1) * spacing, k, v)
print >> output, '%s}' % (nested_level * spacing)
elif type(obj) == list:
print >> output, '%s[' % ((nested_level) * spacing)
for v in obj:
if hasattr(v, '__iter__'):
dump(v, nested_level + 1, output)
else:
print >> output, '%s%s' % ((nested_level + 1) * spacing, v)
print >> output, '%s]' % ((nested_level) * spacing)
else:
print >> output, '%s%s' % (nested_level * spacing, obj)
class PYZKTest(unittest.TestCase):
def setup(self):
pass
def tearDown(self):
pass
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_no_ping(self,helper, socket):
""" what if ping doesn't response """
helper.return_value.test_ping.return_value = False #no ping simulated
#begin
zk = ZK('192.168.1.201')
helper.assert_called_with('192.168.1.201', 4370) # called correctly
self.assertRaisesRegex(ZKNetworkError, "can't reach device", zk.connect)
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_correct_ping(self,helper, socket):
""" what if ping is ok """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 1 # helper tcp ok
socket.return_value.recv.return_value = b''
#begin
zk = ZK('192.168.1.201')
helper.assert_called_with('192.168.1.201', 4370) # called correctly
self.assertRaisesRegex(ZKNetworkError, "unpack requires", zk.connect) # no data...?
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_invalid(self, helper, socket):
""" Basic tcp invalid """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.return_value = b'Invalid tcp data'
#begin
zk = ZK('192.168.1.201')
helper.assert_called_with('192.168.1.201', 4370) # called correctly
self.assertRaisesRegex(ZKNetworkError, "TCP packet invalid", zk.connect)
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_connect(self, helper, socket):
""" Basic connection test """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.return_value = codecs.decode('5050827d08000000d007fffc2ffb0000','hex') # tcp CMD_ACK_OK
#begin
zk = ZK('192.168.1.201') # already tested
conn = zk.connect()
print 'Disabling device ...'
conn.disable_device()
print 'Firmware Version: : {}'.format(conn.get_firmware_version())
# print '--- Get User ---'
users = conn.get_users()
for user in users:
privilege = 'User'
if user.privilege == const.USER_ADMIN:
privilege = 'Admin'
print '- UID #{}'.format(user.uid)
print ' Name : {}'.format(user.name)
print ' Privilege : {}'.format(privilege)
print ' Password : {}'.format(user.password)
print ' Group ID : {}'.format(user.group_id)
print ' User ID : {}'.format(user.user_id)
print "Voice Test ..."
conn.test_voice()
print 'Enabling device ...'
conn.enable_device()
except Exception, e:
print "Process terminate : {}".format(e)
finally:
if conn:
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
conn.disconnect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e903e6002ffb0100', 'hex'))
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_force_udp_connect(self, helper, socket):
""" Force UDP connection test """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.return_value = codecs.decode('d007fffc2ffb0000','hex') # tcp CMD_ACK_OK
#begin
zk = ZK('192.168.1.201', force_udp=True)
conn = zk.connect()
socket.return_value.sendto.assert_called_with(codecs.decode('e80317fc00000000', 'hex'), ('192.168.1.201', 4370))
conn.disconnect()
socket.return_value.sendto.assert_called_with(codecs.decode('e903e6002ffb0100', 'hex'), ('192.168.1.201', 4370))
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_udp_connect(self, helper, socket):
""" Basic auto UDP connection test """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 1 # helper tcp nope
socket.return_value.recv.return_value = codecs.decode('d007fffc2ffb0000','hex') # tcp CMD_ACK_OK
#begin
zk = ZK('192.168.1.201')
conn = zk.connect()
socket.return_value.sendto.assert_called_with(codecs.decode('e80317fc00000000', 'hex'), ('192.168.1.201', 4370))
conn.disconnect()
socket.return_value.sendto.assert_called_with(codecs.decode('e903e6002ffb0100', 'hex'), ('192.168.1.201', 4370))
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_unauth(self, helper, socket):
""" Basic unauth test """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d5075bb2cf450000', 'hex'), # tcp CMD_UNAUTH
codecs.decode('5050827d08000000d5075ab2cf450100', 'hex') # tcp CMD_UNAUTH
]
#begin
zk = ZK('192.168.1.201', password=12)
self.assertRaisesRegex(ZKErrorResponse, "Unauthenticated", zk.connect)
socket.return_value.send.assert_called_with(codecs.decode('5050827d0c0000004e044e2ccf450100614d323c', 'hex')) # try with password 12
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_auth(self, helper, socket):
""" Basic auth test """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d5075bb2cf450000', 'hex'), # tcp CMD_UNAUTH
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex') # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201', password=45)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d0c0000004e044db0cf45010061c9323c', 'hex')) #auth with pass 45
conn.disconnect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e90345b6cf450200', 'hex')) #exit
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_size(self, helper, socket):
""" can read sizes? """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d64000000d007a3159663130000000000000000000000000000000000070000000000000006000000000000005d020000000000000f0c0000000000000100000000000000b80b000010270000a0860100b20b00000927000043840100000000000000', 'hex'), #sizes
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201') # already tested
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
conn.read_sizes()
socket.return_value.send.assert_called_with(codecs.decode('5050827d080000003200fcb9cf450200', 'hex'))
conn.disconnect()
self.assertEqual(conn.users, 7, "missed user data %s" % conn.users)
self.assertEqual(conn.fingers, 6, "missed finger data %s" % conn.fingers)
self.assertEqual(conn.records, 605, "missed record data %s" % conn.records)
self.assertEqual(conn.users_cap, 10000, "missed user cap %s" % conn.users_cap)
self.assertEqual(conn.fingers_cap, 3000, "missed finger cap %s" % conn.fingers_cap)
self.assertEqual(conn.rec_cap, 100000, "missed record cap %s" % conn.rec_cap)
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_users_small_data(self, helper, socket):
""" can get empty? """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d64000000d007a3159663130000000000000000000000000000000000070000000000000006000000000000005d020000000000000f0c0000000000000100000000000000b80b000010270000a0860100b20b00000927000043840100000000000000', 'hex'), #sizes
codecs.decode('5050827d04020000dd05942c96631500f801000001000e0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003830380000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003832310000000000000000000000000000000000000000000300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833350000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833310000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003833320000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003836000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000383432000000000000000000000000000000000000000000','hex'), #DATA directly(not ok)
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
#codecs.decode('5050827d08000000d00745b2cf451b00', 'hex') # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201' )
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
users = conn.get_users()
socket.return_value.send.assert_called_with(codecs.decode('5050827d13000000df053ca6cf4514000109000500000000000000', 'hex')) #get users
self.assertEqual(len(users), 7, "incorrect size %s" % len(users))
#assert one user
usu = users[3]
self.assertIsInstance(usu.uid, int, "uid should be int() %s" % type(usu.uid))
if sys.version_info >= (3, 0):
self.assertIsInstance(usu.user_id, (str, bytes), "user_id should be str() or bytes() %s" % type(usu.user_id))
else:
self.assertIsInstance(usu.user_id, (str, unicode), "user_id should be str() or unicode() %s" % type(usu.user_id))
self.assertEqual(usu.uid, 4, "incorrect uid %s" % usu.uid)
self.assertEqual(usu.user_id, "831", "incorrect user_id %s" % usu.user_id)
self.assertEqual(usu.name, "NN-831", "incorrect uid %s" % usu.name) # generated
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_users_broken_data(self, helper, socket):
""" test case for K20 """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d007d7d758200000','hex'), #ACK Ok
codecs.decode('5050827d58000000d0074c49582013000000000000000000000000000000000002000000000000000000000000000000000000000000000007000000000000000000000000000000f4010000f401000050c30000f4010000f201000050c30000','hex'),#Sizes
codecs.decode('5050827d9c000000dd053c87582015009000000001000000000000000000006366756c616e6f0000000000000000000000000000000000000000000000000000000000003130303030316c70000000000000000000000000000000000200000000000000000000726d656e67616e6f0000000000000000000000000000000000','hex'),#DATA112
codecs.decode('000000000000000000000000323232323232636200000000000000000000000000000000','hex'), #extra data 36
#codecs.decode('','hex'), #
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for get_users TODO: generate proper sequenced response
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for free_data TODO: generate proper sequenced response
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for exit TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201') #, verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
users = conn.get_users()
#print (users) #debug
socket.return_value.send.assert_called_with(codecs.decode('5050827d13000000df05b3cb582014000109000500000000000000', 'hex')) #get users
self.assertEqual(len(users), 2, "incorrect size %s" % len(users))
#assert one user
usu = users[1]
self.assertIsInstance(usu.uid, int, "uid should be int() %s" % type(usu.uid))
if sys.version_info >= (3, 0):
self.assertIsInstance(usu.user_id, (str, bytes), "user_id should be str() or bytes() %s" % type(usu.user_id))
else:
self.assertIsInstance(usu.user_id, (str, unicode), "user_id should be str() or unicode() %s" % type(usu.user_id))
self.assertEqual(usu.uid, 2, "incorrect uid %s" % usu.uid)
self.assertEqual(usu.user_id, "222222cb", "incorrect user_id %s" % usu.user_id)
self.assertEqual(usu.name, "rmengano", "incorrect uid %s" % usu.name) # check test case
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_users_broken_tcp(self, helper, socket):
""" tst case for https://github.com/fananimi/pyzk/pull/18#issuecomment-406250746 """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d09000000d007babb5c3c100009', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d58000000d007292c5c3c13000000000000000000000000000000000046000000000000004600000000000000990c0000000000001a010000000000000600000006000000f4010000f401000050c30000ae010000ae010000b7b60000', 'hex'), #sizes
codecs.decode('5050827d15000000d007a7625c3c150000b4130000b4130000cdef2300','hex'), #PREPARE_BUFFER -> OK 5044
codecs.decode('5050827d10000000dc050da65c3c1600b4130000f0030000', 'hex'), # read_buffer -> Prepare_data 5044
codecs.decode('5050827df8030000dd05d05800001600b013000001000e35313437393833004a6573757353616c646976617200000000000000000000000000000001000000000000000035313437393833000000000000000000000000000000000002000e33343934383636004e69657665734c6f70657a00000000000000000000000000000000000100000000000000003334393438363600000000000000000000000000000000000300000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003337333139333600000000000000000000000000', 'hex'), # DATA 1016 -8 (util 216)
codecs.decode('0000000100000000000000003734383433330000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003433333939353800000000000000000000000000000000000900000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003333373335313100000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003337373535363100000000000000000000000000000000000b000000', 'hex'), # raw data 256
codecs.decode('0000000004000e00000000000000000000000000000000000000000000000000000000000000000000000001000000000000000032333338323035000000000000000000000000000000000005000e000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000333632363439300000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000313838343633340000000000000000000000000000000000070000000000000000000000000000000000000000000000000000000000000000000000', 'hex'), #raw data 256
codecs.decode('00000000000000000000000000000000000000000000000000000000000000000000000100000000000000003131313336333200000000000000000000000000000000000c00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003130353233383900000000000000000000000000000000000d00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003135333538333600000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000000000000000000100000000', 'hex'), #raw data 256
codecs.decode('000000003933313637300000000000000000000000000000', 'hex'), #raw data 24
codecs.decode('5050827df8030000dd0520b601001600000000000f00003334323931343800000000000000000000000000000000000000000000000000000000000100000000000000003334323931343800000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003139303636393700000000000000000000000000000000001100000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003139333831333500000000000000000000000000', 'hex'), # DATA 1016 -8 (util216
codecs.decode('00000000120000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000393231303537000000000000000000000000000000000000130000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000333634383739340000000000000000000000000000000000140000323831353732000000000000000000000000000000000000000000000000000000000000010000000000000000323831353732000000000000000000000000000000000000150000000000000000000000000000000000000000000000000000000000000000000000', 'hex'), #raw data 256
codecs.decode('00000001000000000000000031383133323236000000000000000000000000000000000016000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000035393037353800000000000000000000000000000000000017000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000031363933373232000000000000000000000000000000000018000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000033363430323131000000000000000000000000000000000019000000', 'hex'), #raw data 256
codecs.decode('00000000000000000000000000000000000000000000000000000000000000000000000100000000000000003331303733390000000000000000000000000000000000001a00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003433353430393400000000000000000000000000000000001b00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003338303736333200000000000000000000000000000000001c00000000000000000000000000000000000000000000000000000000000000000000000000000100000000', 'hex'), #raw data 256
codecs.decode('000000003231333938313700000000000000000000000000', 'hex'), #raw data 24
codecs.decode('5050827df8030000dd059a2102001600000000001d00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003333383738313900000000000000000000000000000000001e00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003439353634363800000000000000000000000000000000001f00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003832343030300000000000000000000000000000', 'hex'), #DATA 1016 -8 (util 216)
codecs.decode('00000000200000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000333937373437370000000000000000000000000000000000210000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000343435383038340000000000000000000000000000000000220000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000343430353130390000000000000000000000000000000000230000000000000000000000000000000000000000000000000000000000000000000000', 'hex'), #raw data 256
codecs.decode('00000001000000000000000033353732363931000000000000000000000000000000000024000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000033363336333832000000000000000000000000000000000025000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000033333232353432000000000000000000000000000000000026000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000039393437303800000000000000000000000000000000000027000000', 'hex'), #raw data 256
codecs.decode('00000000000000000000000000000000000000000000000000000000000000000000000100000000000000003836333539380000000000000000000000000000000000002800000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003338383736383000000000000000000000000000000000002900000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003739393434350000000000000000000000000000000000002a00000000000000000000000000000000000000000000000000000000000000000000000000000100000000', 'hex'), # raw data 256
codecs.decode('000000003532313136340000000000000000000000000000', 'hex'), # raw data 24
codecs.decode('5050827df8030000dd053da903001600000000002b00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003439373033323400000000000000000000000000000000002c0000000000000000000000000000000000000000000000000000000000000000000000', 'hex'), # DATA 1016 -8 (util 112)
codecs.decode('0000000100000000000000003134363732353100000000000000000000000000000000002d000e32363635373336006d61726368756b0000000000000000000000000000000000000000000100000000000000003236363537333600000000000000000000000000', 'hex'), # raw data 104
codecs.decode('000000002e00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003136383133353200000000000000000000000000000000002f000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000034393633363732000000000000000000000000000000000030000000', 'hex'), # raw data 152
codecs.decode('00000000000000000000000000000000000000000000000000000000000000000000000100000000000000003337363137373100000000000000000000000000000000003100000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003231353939353100000000000000000000000000000000003200000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003136393734323700000000000000000000000000000000003300000000000000000000000000000000000000000000000000000000000000000000000000000100000000', 'hex'), # raw data 256
codecs.decode('0000000033373336323437000000000000000000000000000000000034000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000031323930313635000000000000000000000000000000000035000000000000000000000000000000000000000000000000000000', 'hex'), # raw data 128
codecs.decode('0000000000000000000000010000000000000000333236333636330000000000000000000000000000000000360000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000393031353036000000000000000000000000000000000000370000000000000000000000', 'hex'), # raw data 128
codecs.decode('0000000000000000000000000000000000000000000000000000000100000000000000003238313732393300000000000000000000000000000000003800000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003437303630333800000000000000000000000000', 'hex'), # raw data 128
codecs.decode('5050827df8030000dd05037d04001600000000003900000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003136343731353600000000000000000000000000000000003a0000000000000000000000000000000000000000000000000000000000000000000000', 'hex'), # DATA 1016 -8 (util 112)
codecs.decode('0000000100000000000000003530313435310000000000000000000000000000000000003b00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003534363236373300000000000000000000000000000000003c00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003533363730310000000000000000000000000000000000003d00000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003435383033303700000000000000000000000000000000003e000000', 'hex'), # raw data 256
codecs.decode('00000000000000000000000000000000000000000000000000000000000000000000000100000000000000003136333835333200000000000000000000000000000000003f000e3336323634313900000000000000000000000000000000000000000000000000000000000100000000000000003336323634313900000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003233323331383500000000000000000000000000000000004100000000000000000000000000000000000000000000000000000000000000000000000000000100000000', 'hex'), # raw data 256
codecs.decode('0000000035323930373337000000000000000000000000000000000042000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000033393839303636000000000000000000000000000000000043000000000000000000000000000000000000000000000000000000', 'hex'), # raw data 128
codecs.decode('0000000000000000000000010000000000000000343033323930390000000000000000000000000000000000440000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000323034363338380000000000000000000000000000000000450000000000000000000000', 'hex'), # raw data 128
codecs.decode('0000000000000000000000000000000000000000000000000000000100000000000000003733383730330000000000000000000000000000000000004600000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000003239313836333600000000000000000000000000', 'hex'), # raw data 128
codecs.decode('5050827d0c000000dd0507fa0500160000000000', 'hex'), # DATA 12-8 (util 4 ok) and ACK OK!!!
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for get_users TODO: generate proper sequenced response
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for free_data TODO: generate proper sequenced response
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # CMD_ACK_OK for exit TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201') # , verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
users = conn.get_users()
#print (users) #debug
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000de05aebd5c3c1700', 'hex')) #get users
self.assertEqual(len(users), 70, "incorrect size %s" % len(users))
#assert one user
usu = users[1]
self.assertIsInstance(usu.uid, int, "uid should be int() %s" % type(usu.uid))
if sys.version_info >= (3, 0):
self.assertIsInstance(usu.user_id, (str, bytes), "user_id should be str() or bytes() %s" % type(usu.user_id))
else:
self.assertIsInstance(usu.user_id, (str, unicode), "user_id should be str() or unicode() %s" % type(usu.user_id))
self.assertEqual(usu.uid, 2, "incorrect uid %s" % usu.uid)
self.assertEqual(usu.user_id, "3494866", "incorrect user_id %s" % usu.user_id)
self.assertEqual(usu.name, "NievesLopez", "incorrect uid %s" % usu.name) # check test case
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def _test_tcp_get_template(self, helper, socket):
""" can get empty? """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d15000000d007acf93064160000941d0000941d0000b400be00', 'hex'), # ack ok with size 7572
codecs.decode('5050827d10000000dc05477830641700941d000000000100', 'hex'), #prepare data
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
#codecs.decode('5050827d08000000d00745b2cf451b00', 'hex') # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201', verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
templates = conn.get_templates()
self.assertEqual(len(templates), 6, "incorrect size %s" % len(templates))
#assert one user
usu = users[3]
self.assertIsInstance(usu.uid, int, "uid should be int() %s" % type(usu.uid))
if sys.version_info >= (3, 0):
self.assertIsInstance(usu.user_id, (str, bytes), "user_id should be str() or bytes() %s" % type(usu.user_id))
else:
self.assertIsInstance(usu.user_id, (str, unicode), "user_id should be str() or unicode() %s" % type(usu.user_id))
self.assertEqual(usu.uid, 4, "incorrect uid %s" % usu.uid)
self.assertEqual(usu.user_id, "831", "incorrect user_id %s" % usu.user_id)
self.assertEqual(usu.name, "NN-831", "incorrect uid %s" % usu.name) # generated
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def _test_tcp_get_template_1(self, helper, socket):
""" cchekc correct template 1 """
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d10000000dc055558d0983200dc040000f0030000', 'hex'), # tcp PREPARE_DATA 1244
codecs.decode('5050827df8030000dd0500f4000032004d9853533231000004dbda0408050709ced000001cda69010000008406316adb0c0012062900d000aad221001600390caf001cdbb106240031007e033bdb3b00e9067700850083d42b004300c503f40043dbd6037b005000460ea7db5900910f90009f0012d5e7005c00970a5f006ddb', 'hex'), # DATA (tcp 1016, actual 112?)
codecs.decode('930fa1009a00560f86db9d00820e86006f007dd3f400ab00a60fcd01b7dbb00b4b00bd0079083adbc00045035d000600c1df7300cc0039049e00dddb380e8c00da00e30dd8dbdc00220e130027004dd9f500e3009d0a6a00e9db26090001ef00ea03c5dbf0002306', 'hex'), #raw data 104
codecs.decode('d000380028d83400ff00430f6200fbdba70dfb0002016203c5db0201a5044b00c10132d4de0006019f080a000cdab70541000f01fe0f19db1901c902e600dc0198d839002f01360ed80037dabd04d4003301520104da38014f01a100830196d5f5004b015c0411005cdacd03bc67ab8d162b48ad18f7fec7448e448387afa1a3', 'hex'), # raw 128
codecs.decode('062b37ca3cf9f53c8087f9150926e03335df1b71aedbd0f2', 'hex'), # raw 24
codecs.decode('b40da90541168df1551f70fc15b51bf26d7d4501bf12915e6485fd966f0ba2072728987dc1018a12ab105ec7aa003508fef08a49b923f3e85e42edf5ea861bd1600d23151787fc78d522f38431883e809f0e4dd2008ecd8ed97670035acf0c763503f27c37ec76d982806986c6016bf952d01e0673820570a87e1a236005ad81', 'hex'), # raw 128
codecs.decode('7d8734949952bb929d81e5fdbcf99ca0c4886d8c65098c0e9aa6ac81e103c684607951d03b0ce9f0cd785885ad27d4f61bfc5de8bc7411de8d8f5910c518e004e9229304f90f9a891395912680ebc6f4c57fd3fceeb684f7c18ba78107fc2e16073e89f6d6b67fbb', 'hex'), # raw 104
codecs.decode('fb11e2feb3effd0e5391c61da77176359f7e4d8a0ff3090a01204501c76a19af07002b003ac0042300dbab0113c2fa07c56e02cbc32bc10400a1c31349df0008102d2a04c5120c9b8904008f0810fb0404c20f3a6407006fd709fbecfe0400041529f60304fd1931fb0b006ede0c391bc1c0c0460e00a3210b1a34c2ffffc3fd', 'hex'), # raw 128
codecs.decode('980f04832806404a5bc1940505da86292d0f0056f600f925', 'hex'), # raw 24
codecs.decode('5c43c243ff06c5733a5d85c7080040473f3d31dd01774d8983c4c000778982750b009459d551c426c3c0170900929b17fba3fc780800376135fefbe0ff1100396aed3b3146265ac0c1ffff15c5357232fffdc0fdc03f3bc141914514003f85e738fdfa2441ff5cc0ff45951504ec7ee9c0fac1fc053dc424c0554affc103c5f8', 'hex'), # raw 128
codecs.decode('94f2fd0e00668b06eac1f9b3c3fdc2fd08008388f3ef460a00869e13a56079cf013fb82d22c394c2c619c3c33ac45304c527e19d4d0c008aab1305c0fa1aff6050110083687dc713c396c0c2c1c104c1c6b10f0072b54cc14d83c519c1760e0055b9f8c1f8187486', 'hex'), # raw 104
codecs.decode('750d00797ff0fdee593bc1090086781657267f11004cc1375050827df4000000dd0548b10100320038ffc024c2fec4c1c18c05c4fad0013ec54051c2879d00cb56521cc2c204c50fc2e62506008eca1a05fec5250d0072d23dc344c2c45cc10a008bd31a3afefa1a92c0080034e68642c45d0d005bdd376707c08da002008ede', 'hex'), # raw 128
codecs.decode('24ffc100e405213306002de78637c4de011de846ff98c100', 'hex'), # raw 24
codecs.decode('07283b590300fef3f5f800da10f5494b031000071819061035084365650b14900834c0c1c4c104c1c5a302100e1134c1c01045c83c8806110e2185c22edd11082424fec006ff02cb052834c3c073c910d4eb965b3833ff0bc582cce18d876a051106f337f826c00410013d2b05c200ca003f4cfeff03d56454ccc101', 'hex'), # raw 124
codecs.decode('5050827d08000000d007fcf701003200', 'hex'), # tcp CMD_ACK_OK
#codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201', verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
template = conn.get_user_template(14, 1)
self.assertEqual(template.size, 1243, "incorrect size %s" % template.size)
self.assertEqual(template.mark, "4d98535332310000...feff03d56454ccc1", "incorrect mark %s" % template.mark)
self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid)
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_template_1f(self, helper, socket):
""" cchekc correct template 1 fixed"""
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d10000000dc055558d0983200dc040000f0030000', 'hex'), # tcp PREPARE_DATA 1244
codecs.decode('5050827df8030000dd0500f4000032004d9853533231000004dbda0408050709ced000001cda69010000008406316adb0c0012062900d000aad221001600390caf001cdbb106240031007e033bdb3b00e9067700850083d42b004300c503f40043dbd6037b005000460ea7db5900910f90009f0012d5e7005c00970a5f006ddb930fa1009a00560f86db9d00820e86006f007dd3f400ab00a60fcd01b7dbb00b4b00bd0079083adbc00045035d000600c1df7300cc0039049e00dddb380e8c00da00e30dd8dbdc00220e130027004dd9f500e3009d0a6a00e9db26090001ef00ea03c5dbf0002306', 'hex'), # DATA (tcp 1016, actual 112 +104
codecs.decode('d000380028d83400ff00430f6200fbdba70dfb0002016203c5db0201a5044b00c10132d4de0006019f080a000cdab70541000f01fe0f19db1901c902e600dc0198d839002f01360ed80037dabd04d4003301520104da38014f01a100830196d5f5004b015c0411005cdacd03bc67ab8d162b48ad18f7fec7448e448387afa1a3062b37ca3cf9f53c8087f9150926e03335df1b71aedbd0f2', 'hex'), # raw 128 + 24
codecs.decode('b40da90541168df1551f70fc15b51bf26d7d4501bf12915e6485fd966f0ba2072728987dc1018a12ab105ec7aa003508fef08a49b923f3e85e42edf5ea861bd1600d23151787fc78d522f38431883e809f0e4dd2008ecd8ed97670035acf0c763503f27c37ec76d982806986c6016bf952d01e0673820570a87e1a236005ad817d8734949952bb929d81e5fdbcf99ca0c4886d8c65098c0e9aa6ac81e103c684607951d03b0ce9f0cd785885ad27d4f61bfc5de8bc7411de8d8f5910c518e004e9229304f90f9a891395912680ebc6f4c57fd3fceeb684f7c18ba78107fc2e16073e89f6d6b67fbb', 'hex'), # raw 128 +104
codecs.decode('fb11e2feb3effd0e5391c61da77176359f7e4d8a0ff3090a01204501c76a19af07002b003ac0042300dbab0113c2fa07c56e02cbc32bc10400a1c31349df0008102d2a04c5120c9b8904008f0810fb0404c20f3a6407006fd709fbecfe0400041529f60304fd1931fb0b006ede0c391bc1c0c0460e00a3210b1a34c2ffffc3fd980f04832806404a5bc1940505da86292d0f0056f600f925', 'hex'), # raw 128 +24
codecs.decode('5c43c243ff06c5733a5d85c7080040473f3d31dd01774d8983c4c000778982750b009459d551c426c3c0170900929b17fba3fc780800376135fefbe0ff1100396aed3b3146265ac0c1ffff15c5357232fffdc0fdc03f3bc141914514003f85e738fdfa2441ff5cc0ff45951504ec7ee9c0fac1fc053dc424c0554affc103c5f894f2fd0e00668b06eac1f9b3c3fdc2fd08008388f3ef460a00869e13a56079cf013fb82d22c394c2c619c3c33ac45304c527e19d4d0c008aab1305c0fa1aff6050110083687dc713c396c0c2c1c104c1c6b10f0072b54cc14d83c519c1760e0055b9f8c1f8187486', 'hex'), # raw 128 +104
codecs.decode('750d00797ff0fdee593bc1090086781657267f11004cc137', 'hex'), # raw 24?
codecs.decode('5050827df4000000dd0548b10100320038ffc024c2fec4c1c18c05c4fad0013ec54051c2879d00cb56521cc2c204c50fc2e62506008eca1a05fec5250d0072d23dc344c2c45cc10a008bd31a3afefa1a92c0080034e68642c45d0d005bdd376707c08da002008ede24ffc100e405213306002de78637c4de011de846ff98c100', 'hex'), # raw 128-24 (104) +24
codecs.decode('07283b590300fef3f5f800da10f5494b031000071819061035084365650b14900834c0c1c4c104c1c5a302100e1134c1c01045c83c8806110e2185c22edd11082424fec006ff02cb052834c3c073c910d4eb965b3833ff0bc582cce18d876a051106f337f826c00410013d2b05c200ca003f4cfeff03d56454ccc101', 'hex'), # raw 124
codecs.decode('5050827d08000000d007fcf701003200', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201') #, verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
template = conn.get_user_template(14, 1)
self.assertEqual(template.size, 1243, "incorrect size %s" % template.size)
self.assertEqual(template.mark, "4d98535332310000...feff03d56454ccc1", "incorrect mark %s" % template.mark)
self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid)
conn.disconnect()
@patch('zk.base.socket')
@patch('zk.base.ZK_helper')
def test_tcp_get_template_2f(self, helper, socket):
""" cchekc correct template 2 fixed"""
helper.return_value.test_ping.return_value = True # ping simulated
helper.return_value.test_tcp.return_value = 0 # helper tcp ok
socket.return_value.recv.side_effect = [
codecs.decode('5050827d08000000d0075fb2cf450100', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d10000000dc053b59d0983500f3030000f0030000', 'hex'), # tcp PREPARE_DATA 1011
codecs.decode('5050827df8030000dd056855000035004ab153533231000003f2f10408050709ced000001bf36901000000831f256cf23e00740f4c008900f2f879005500fe0fe3005bf2d30a60005c00a00f32f26600580a2700ad00e3fd98007500800f000082f21a0f68008300300e5bf28d00570930004b00dafd4c009a00dd090900a8f2270f8600ad008a0b1ff2b000480f4400730040fc5400b800430f4400c6f2370ab100ca00f30ecbf2cb002f0f4a001300c7fdaa00e400b50c4300e6f2b706bf00ea00f90668f2f2002e0dad003000b7f7cf00f600350cbe0008f31f0dd0000c017101cbf20f019c01', 'hex'), # DATA (tcp 1016, actual 112 +104
codecs.decode('5e00d4012dfdda001301a408e00019f3400c12002201fc0c4ff2570193096d0092018dfc3c7a62107e85688f818ff39a358ef99acb0fee06d47da2e2116a7c77f102a57bd1890a6a598b5ee2db0a0f64a384b28da105f29ca7eff9a137194560847d1565aa827ffc69705ffa8189f19f1f9ca10abbf2160f791a6e0dd8af0f723e062b6e84000a997780c100f6684b8016188780d7f44d0a', 'hex'), # raw 128 + 24
codecs.decode('5083790fd0fa1a089ef44b807572db9b0900d9795083397a8780ca0161091489ae7b7c134278a6004c00b68bcf80e9f98982509a0e01dbf02e6a441a21138a70ddeaf1f9b16a8f1025f2ceef74f369094b70b2fb3a176bb339f9860f6459f304bb679757b3fca891ba733c4c6444c72032f303131c9705004b3079bc0600a03a89c405fdc03205004b456254c6006fb276c20a00a94343c2fc30779505001b4f862804f27d51faff31c2cd007fa50141c12f1800085a9431c181c4fe83c10674c33275300600245c89fcc0ad07005b5c6b88040503a96267c1830700e9695d30c1c2510a0031ae57', 'hex'), # raw 128 +104
codecs.decode('5fa47a04007c7574510f039e80f0fd3bfefe9d55c3fa01c7841746ff06fa1ff2ee8ea07e787e0689c133c1c3c0c2ffc004c1fcae07005990578c040d03dc9350c0c4376a3a8623f2f29ea2c17c67b0928330726b6a83ff08c582afa8c5c3c3c1c3fec300895f0efdfd2809000bae21be5afd0c001cb68c59c20dc3fefda205004fb8150cfbc1030089bbffc30ef245bc467bc07404c288fd', 'hex'), # raw 128 +24
codecs.decode('0155bd46786445c3c130c0040091c52938c320f305c8a4c1ff7b05c08a63c3c2c1c2c3c13ac1c132c1ffc2c0c0c205c3c336050084c9306ec100b13f352c0700cacdf56b72f611f61a2d1605d5ef41a4fec0f818004c17c63e0dfef9c0fdfffe3b3649a0fac00c004ada856a6464c20b006cf83145c1c032c23d04109804d57617e28f07a0fe3bff3bfbfe0afc2ac0fdc138c01095f91bc543281101cbb0c19758fe9282c3c26270737997c1c0c2c0c204c70be27f0f2084c5fc070913ad1731c2c1c37b0125130c1ba958c049ff4e9bc6529262c1c290c2076ac2ed11e718a9554b068bc730b196', 'hex'), # raw 128 +104
codecs.decode('c2c1c2c1077dfc830210074929c1c910c5af81c0c1ffc2fe', 'hex'), # raw 24?
codecs.decode('5050827d0b000000dd054ba201003500a05701', 'hex'), # raw 43-24 (104)
codecs.decode('5050827d08000000d007fcf701003200', 'hex'), # tcp CMD_ACK_OK
codecs.decode('5050827d08000000d00745b2cf451b00', 'hex'), # tcp random CMD_ACK_OK TODO: generate proper sequenced response
]
#begin
zk = ZK('192.168.1.201', verbose=True)
conn = zk.connect()
socket.return_value.send.assert_called_with(codecs.decode('5050827d08000000e80317fc00000000', 'hex'))
template = conn.get_user_template(14, 1)
self.assertEqual(template.size, 1010, "incorrect size %s" % template.size)
self.assertEqual(template.mark, "4ab1535332310000...81c0c1ffc2fea057", "incorrect mark %s" % template.mark)
self.assertEqual(template.uid, 14, "incorrect uid %s" % template.uid)
conn.disconnect()
if __name__ == '__main__':
unittest.main()

151
test_backup_restore.py Executable file
View File

@ -0,0 +1,151 @@
#!/usr/bin/env python2
# # -*- coding: utf-8 -*-
import sys
import traceback
import argparse
import time
import datetime
import codecs
from builtins import input
import json
sys.path.append("zk")
from zk import ZK, const
from zk.user import User
from zk.finger import Finger
from zk.attendance import Attendance
from zk.exception import ZKErrorResponse, ZKNetworkError
class BasicException(Exception):
pass
conn = None
parser = argparse.ArgumentParser(description='ZK Basic Backup/Restore Tool')
parser.add_argument('-a', '--address',
help='ZK device Address [192.168.1.201]', default='192.168.1.201')
parser.add_argument('-p', '--port', type=int,
help='ZK device port [4370]', default=4370)
parser.add_argument('-T', '--timeout', type=int,
help='Default [10] seconds (0: disable timeout)', default=10)
parser.add_argument('-P', '--password', type=int,
help='Device code/password', default=0)
parser.add_argument('-f', '--force-udp', action="store_true",
help='Force UDP communication')
parser.add_argument('-v', '--verbose', action="store_true",
help='Print debug information')
parser.add_argument('-E', '--erase', action="store_true",
help='clean the device after writting backup!')
parser.add_argument('-r', '--restore', action="store_true",
help='Restore from backup')
parser.add_argument('-c', '--clear-attendance', action="store_true",
help='On Restore, also clears the attendance [default keep attendance]')
parser.add_argument('filename', nargs='?',
help='backup filename (default [serialnumber].bak)', default='')
args = parser.parse_args()
def erase_device(conn, serialnumber, clear_attendance=False):
"""input serial number to corroborate."""
print ('WARNING! the next step will erase the current device content.')
print ('Please input the serialnumber of this device [{}] to acknowledge the ERASING!'.format(serialnumber))
new_serial = input ('Serial Number : ')
if new_serial != serialnumber:
raise BasicException('Serial number mismatch')
conn.disable_device()
print ('Erasing device...')
conn.clear_data()
if clear_attendance:
print ('Clearing attendance too!')
conn.clear_attendance()
conn.read_sizes()
print (conn)
zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp, verbose=args.verbose)
try:
print('Connecting to device ...')
conn = zk.connect()
serialnumber = conn.get_serialnumber()
fp_version = conn.get_fp_version()
print ('Serial Number : {}'.format(serialnumber))
print ('Finger Version : {}'.format(fp_version))
filename = args.filename if args.filename else "{}.json.bak".format(serialnumber)
print ('')
if not args.restore:
print ('--- sizes & capacity ---')
conn.read_sizes()
print (conn)
print ('--- Get User ---')
inicio = time.time()
users = conn.get_users()
final = time.time()
print ('Read {} users took {:.3f}[s]'.format(len(users), final - inicio))
if len(users) == 0:
raise BasicException("Empty user list, aborting...")
print ("Read Templates...")
inicio = time.time()
templates = conn.get_templates()
final = time.time()
print ('Read {} templates took {:.3f}[s]'.format(len(templates), final - inicio))
#save to file!
print ('')
print ('Saving to file {} ...'.format(filename))
output = open(filename, 'w')
data = {
'version':'1.00jut',
'serial': serialnumber,
'fp_version': fp_version,
'users': [u.__dict__ for u in users],
'templates':[t.json_pack() for t in templates]
}
json.dump(data, output, indent=1)
output.close()
if args.erase:
erase_device(conn, serialnumber, args.clear_attendance)
else:
print ('Reading file {}'.format(filename))
infile = open(filename, 'r')
data = json.load(infile)
infile.close()
#compare versions...
if data['version'] != '1.00jut':
raise BasicException("file with different version... aborting!")
if data['fp_version'] != fp_version:
raise BasicException("fingerprint version mismmatch {} != {} ... aborting!".format(fp_version, data['fp_version']))
#TODO: check data consistency...
users = [User.json_unpack(u) for u in data['users']]
#print (users)
print ("INFO: ready to write {} users".format(len(users)))
templates = [Finger.json_unpack(t) for t in data['templates']]
#print (templates)
print ("INFO: ready to write {} templates".format(len(templates)))
erase_device(conn, serialnumber, args.clear_attendance)
print ('Restoring Data...')
for u in users:
#look for Templates
temps = list(filter(lambda f: f.uid ==u.uid, templates))
#print ("user {} has {} fingers".format(u.uid, len(temps)))
conn.save_user_template(u,temps)
conn.enable_device()
print ('--- final sizes & capacity ---')
conn.read_sizes()
print (conn)
except BasicException as e:
print (e)
print ('')
except Exception as e:
print ("Process terminate : {}".format(e))
print ("Error: %s" % sys.exc_info()[0])
print ('-'*60)
traceback.print_exc(file=sys.stdout)
print ('-'*60)
finally:
if conn:
print ('Enabling device ...')
conn.enable_device()
conn.disconnect()
print ('ok bye!')
print ('')

271
test_machine.py Executable file
View File

@ -0,0 +1,271 @@
#!/usr/bin/env python2
# # -*- coding: utf-8 -*-
import sys
import traceback
import argparse
import time
import datetime
import codecs
from builtins import input
sys.path.append("zk")
from zk import ZK, const
from zk.user import User
from zk.finger import Finger
from zk.attendance import Attendance
from zk.exception import ZKErrorResponse, ZKNetworkError
class BasicException(Exception):
pass
conn = None
parser = argparse.ArgumentParser(description='ZK Basic Reading Tests')
parser.add_argument('-a', '--address',
help='ZK device Address [192.168.1.201]', default='192.168.1.201')
parser.add_argument('-p', '--port', type=int,
help='ZK device port [4370]', default=4370)
parser.add_argument('-T', '--timeout', type=int,
help='Default [10] seconds (0: disable timeout)', default=10)
parser.add_argument('-P', '--password', type=int,
help='Device code/password', default=0)
parser.add_argument('-b', '--basic', action="store_true",
help='get Basic Information only. (no bulk read, ie: users)')
parser.add_argument('-f', '--force-udp', action="store_true",
help='Force UDP communication')
parser.add_argument('-v', '--verbose', action="store_true",
help='Print debug information')
parser.add_argument('-t', '--templates', action="store_true",
help='Get templates / fingers (compare bulk and single read)')
parser.add_argument('-tr', '--templates-raw', action="store_true",
help='Get raw templates (dump templates)')
parser.add_argument('-ti', '--templates-index', type=int,
help='Get specific template', default=0)
parser.add_argument('-r', '--records', action="store_true",
help='Get attendance records')
parser.add_argument('-u', '--updatetime', action="store_true",
help='Update Date/Time')
parser.add_argument('-l', '--live-capture', action="store_true",
help='Live Event Capture')
parser.add_argument('-o', '--open-door', action="store_true",
help='Open door')
parser.add_argument('-D', '--deleteuser', type=int,
help='Delete a User (uid)', default=0)
parser.add_argument('-A', '--adduser', type=int,
help='Add a User (uid) (and enroll)', default=0)
parser.add_argument('-E', '--enrolluser', type=int,
help='Enroll a User (uid)', default=0)
parser.add_argument('-F', '--finger', type=int,
help='Finger for enroll (fid=0)', default=0)
args = parser.parse_args()
zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp, verbose=args.verbose)
try:
print('Connecting to device ...')
conn = zk.connect()
print('SDK build=1 : %s' % conn.set_sdk_build_1()) # why?
print ('Disabling device ...')
conn.disable_device()
fmt = conn.get_extend_fmt()
print ('ExtendFmt : {}'.format(fmt))
fmt = conn.get_user_extend_fmt()
print ('UsrExtFmt : {}'.format(fmt))
print ('Face FunOn : {}'.format(conn.get_face_fun_on()))
print ('Face Version : {}'.format(conn.get_face_version()))
print ('Finger Version : {}'.format(conn.get_fp_version()))
print ('Old Firm compat : {}'.format(conn.get_compat_old_firmware()))
net = conn.get_network_params()
print ('IP:{} mask:{} gateway:{}'.format(net['ip'],net['mask'], net['gateway']))
now = datetime.datetime.today().replace(microsecond=0)
if args.updatetime:
print ('--- Updating Time---')
conn.set_time(now)
zk_time = conn.get_time()
dif = abs(zk_time - now).total_seconds()
print ('Time : {}'.format(zk_time))
if dif > 120:
print("WRN: TIME IS NOT SYNC!!!!!! (local: %s) use command -u to update" % now)
print ('Firmware Version : {}'.format(conn.get_firmware_version()))
print ('Platform : %s' % conn.get_platform())
print ('DeviceName : %s' % conn.get_device_name())
print ('Pin Width : %i' % conn.get_pin_width())
print ('Serial Number : %s' % conn.get_serialnumber())
print ('MAC: %s' % conn.get_mac())
print ('')
print ('--- sizes & capacity ---')
conn.read_sizes()
print (conn)
print ('')
if args.basic:
raise BasicException("Basic Info... Done!")
print ('--- Get User ---')
inicio = time.time()
users = conn.get_users()
final = time.time()
print (' took {:.3f}[s]'.format(final - inicio))
max_uid = 0
prev = None
if not args.deleteuser:
for user in users:
privilege = 'User'
if user.uid > max_uid:
max_uid = user.uid
privilege = 'User' if user.privilege == const.USER_DEFAULT else 'Admin-%s' % user.privilege
print ('-> UID #{:<5} Name : {:<27} Privilege : {}'.format(user.uid, user.name, privilege))
print (' Group ID : {:<8} User ID : {:<8} Password : {:<8} Card : {}'.format(user.group_id, user.user_id, user.password, user.card))
#print (len (user.repack73()), user.repack73().encode('hex'))
#print ('')
if args.adduser and user.uid == args.adduser:
prev = user
if args.deleteuser:
print ('')
print ('-- Delete User UID#%s ---' % args.deleteuser)
#TODO implementar luego
conn.delete_user(args.deleteuser)
users = conn.get_users() #update
for user in users:
if user.uid > max_uid:
max_uid = user.uid
privilege = 'User' if user.privilege == const.USER_DEFAULT else 'Admin-%s' % user.privilege
print ('-> UID #{:<5} Name : {:<27} Privilege : {}'.format(user.uid, user.name, privilege))
print (' Group ID : {:<8} User ID : {:<8} Password : {:<8} Card : {}'.format(user.group_id, user.user_id, user.password, user.card))
#print len (user.repack73()), user.repack73().encode('hex')
#print ''
if args.adduser and user.uid == args.adduser:
prev = user
print (' took {:.3f}[s]'.format(final - inicio))
if args.adduser:
uid = int(args.adduser)
if prev:
user = prev
privilege = 'User' if user.privilege == const.USER_DEFAULT else 'Admin-%s' % user.privilege
print ('')
print ('--- Modify User %i ---' % user.uid)
print ('-> UID #{:<5} Name : {:<27} Privilege : {}'.format(user.uid, user.name, privilege))
print (' Group ID : {:<8} User ID : {:<8} Password : {:<8} Card : {}'.format(user.group_id, user.user_id, user.password, user.card))
#discard prev
else:
print ('--- Add new User %i ---' % uid)
name = input('Name :')
admin = input('Admin (y/N):')
privilege = 14 if admin == 'y' else 0
password = input('Password :')
user_id = input('User ID2 :')
card = input('Card :')
card = int(card) if card else 0
#if prev:
# conn.delete_user(uid) #borrado previo
try:
conn.set_user(uid, name, privilege, password, '', user_id, card)
args.enrolluser = uid
except ZKErrorResponse as e:
print ("error: %s" % e)
#try new format
zk_user = User(uid, name, privilege, password, '', user_id, card)
conn.save_user_template(zk_user)# forced creation
args.enrolluser = uid
conn.refresh_data()
if args.enrolluser:
uid = int(args.enrolluser)
print ('--- Enrolling User #{} ---'.format(uid))
conn.delete_user_template(uid, args.finger)
conn.reg_event(0xFFFF) #
if conn.enroll_user(uid, args.finger):
conn.test_voice(18) # register ok
tem = conn.get_user_template(uid, args.finger)
print (tem)
else:
conn.test_voice(23) # not registered
conn.refresh_data()
#print ("Voice Test ...")
#conn.test_voice(10)
if args.templates_index:
print ("Read Single template... {}".format(args.templates_index))
inicio = time.time()
template = conn.get_user_template(args.templates_index, args.finger)
final = time.time()
print (' took {:.3f}[s]'.format(final - inicio))
print (" single! {}".format(template))
elif args.templates or args.templates_raw:
print ("Read Templates...")
inicio = time.time()
templates = conn.get_templates()
final = time.time()
print (' took {:.3f}[s]'.format(final - inicio))
if args.templates:
print ('now checking individually...')
i = 0
for tem in templates:
i += 1
tem2 =conn.get_user_template(tem.uid,tem.fid)
if tem2 is None:
print ("%i: bulk! %s" % (i, tem))
elif tem == tem2: # compare with alternative method
print ("%i: OK! %s" % (i, tem))
else:
print ("%i: dif-1 %s" % (i, tem))
print ("%i: dif-2 %s" % (i, tem2))
print (' took {:.3f}[s]'.format(final - inicio))
else:
print ('template dump')
i = 0
for tem in templates:
i += 1
print ("%i: %s" % (i, tem.dump()))
print (' took {:.3f}[s]'.format(final - inicio))
if args.records:
print ("Read Records...")
inicio = time.time()
attendance = conn.get_attendance()
final = time.time()
print (' took {:.3f}[s]'.format(final - inicio))
i = 0
for att in attendance:
i += 1
print ("ATT {:>6}: uid:{:>3}, user_id:{:>8} t: {}, s:{} p:{}".format(i, att.uid, att.user_id, att.timestamp, att.status, att.punch))
print (' took {:.3f}[s]'.format(final - inicio))
print ('')
print ('--- sizes & capacity ---')
conn.read_sizes()
print (conn)
if args.open_door:
print ('')
print ('--- Opening door 10s ---')
conn.unlock(10)
print (' -- done!---')
if args.live_capture:
print ('')
print ('--- Live Capture! (press ctrl+C to break) ---')
counter = 0
for att in conn.live_capture():# using a generator!
if att is None:
#counter += 1 #enable to implemet a poorman timeout
print ("timeout {}".format(counter))
else:
print ("ATT {:>6}: uid:{:>3}, user_id:{:>8} t: {}, s:{} p:{}".format(counter, att.uid, att.user_id, att.timestamp, att.status, att.punch))
if counter >= 10:
conn.end_live_capture = True
print('')
print('--- capture End!---')
print ('')
except BasicException as e:
print (e)
print ('')
except Exception as e:
print ("Process terminate : {}".format(e))
print ("Error: %s" % sys.exc_info()[0])
print ('-'*60)
traceback.print_exc(file=sys.stdout)
print ('-'*60)
finally:
if conn:
print ('Enabling device ...')
conn.enable_device()
conn.disconnect()
print ('ok bye!')
print ('')

45
test_voice.py Executable file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
import sys
import argparse
sys.path.append("zk")
from time import sleep
from zk import ZK, const
parser = argparse.ArgumentParser(description='ZK Basic Reading Tests')
parser.add_argument('-a', '--address',
help='ZK device Addres [192.168.1.201]', default='192.168.1.201')
parser.add_argument('-p', '--port', type=int,
help='device port [4370]', default=4370)
parser.add_argument('-T', '--timeout', type=int,
help='timeout [60]', default=60)
parser.add_argument('-P', '--password', type=int,
help='Device code/password', default=0)
parser.add_argument('-f', '--force-udp', action="store_true",
help='Force UDP communication')
parser.add_argument('-v', '--verbose', action="store_true",
help='Print debug information')
args = parser.parse_args()
conn = None
zk = ZK(args.address, port=args.port, timeout=args.timeout, password=args.password, force_udp=args.force_udp, verbose=args.verbose)
try:
print ('Connecting to device ...')
conn = zk.connect()
print ('Disabling device ...')
conn.disable_device()
print ('Firmware Version: : {}'.format(conn.get_firmware_version()))
for i in range(0,65):
print ("test_voice, %i" % i)
zk.test_voice(i)
sleep(3)
print ('Enabling device ...')
conn.enable_device()
except Exception as e:
print ("Process terminate : {}".format(e))
finally:
if conn:
conn.disconnect()

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from base import ZK
from .base import ZK
VERSION = (0, 6)
VERSION = (0, 9)
__all__ = ['ZK']

View File

@ -1,12 +1,14 @@
# -*- coding: utf-8 -*-
class Attendance(object):
def __init__(self, user_id, timestamp, status):
def __init__(self, user_id, timestamp, status, punch=0, uid=0):
self.uid = uid # not really used any more
self.user_id = user_id
self.timestamp = timestamp
self.status = status
self.punch = punch
def __str__(self):
return '<Attendance>: {}'.format(self.user_id)
return '<Attendance>: {} : {} ({}, {})'.format(self.user_id, self.timestamp, self.status, self.punch)
def __repr__(self):
return '<Attendance>: {}'.format(self.user_id)
return '<Attendance>: {} : {} ({}, {})'.format(self.user_id, self.timestamp,self.status, self.punch)

2045
zk/base.py

File diff suppressed because it is too large Load Diff

View File

@ -94,6 +94,8 @@ EF_FPFTR = (1<<8)# be real-time capture fingerprint minutia
EF_ALARM = (1<<9)# Alarm signal
USER_DEFAULT = 0
USER_ENROLLER = 2
USER_MANAGER = 6
USER_ADMIN = 14
FCT_ATTLOG = 1
@ -103,3 +105,6 @@ FCT_OPLOG = 4
FCT_USER = 5
FCT_SMS = 6
FCT_UDATA = 7
MACHINE_PREPARE_DATA_1 = 20560 # 0x5050
MACHINE_PREPARE_DATA_2 = 32130 # 0x7282

46
zk/finger.py Normal file
View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
from struct import pack #, unpack
import codecs
class Finger(object):
def __init__(self, uid, fid, valid, template):
self.size = len(template) # template only
self.uid = int(uid)
self.fid = int(fid)
self.valid = int(valid)
self.template = template
#self.mark = str().encode("hex")
self.mark = codecs.encode(template[:8], 'hex') + b'...' + codecs.encode(template[-8:], 'hex')
def repack(self): #full
return pack("HHbb%is" % (self.size), self.size+6, self.uid, self.fid, self.valid, self.template)
def repack_only(self): #only template
return pack("H%is" % (self.size), self.size, self.template)
@staticmethod
def json_unpack(json):
return Finger(
uid=json['uid'],
fid=json['fid'],
valid=json['valid'],
template=codecs.decode(json['template'],'hex')
)
def json_pack(self): #packs for json
return {
"size": self.size,
"uid": self.uid,
"fid": self.fid,
"valid": self.valid,
"template": codecs.encode(self.template, 'hex').decode('ascii')
}
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __str__(self):
return "<Finger> [uid:{:>3}, fid:{}, size:{:>4} v:{} t:{}]".format(self.uid, self.fid, self.size, self.valid, self.mark)
def __repr__(self):
return "<Finger> [uid:{:>3}, fid:{}, size:{:>4} v:{} t:{}]".format(self.uid, self.fid, self.size, self.valid, self.mark)
def dump(self):
return "<Finger> [uid:{:>3}, fid:{}, size:{:>4} v:{} t:{}]".format(self.uid, self.fid, self.size, self.valid, codecs.encode(self.template, 'hex'))

View File

@ -1,16 +1,40 @@
# -*- coding: utf-8 -*-
from struct import pack #, unpack
class User(object):
encoding = 'UTF-8'
def __init__(self, uid, name, privilege, password='', group_id='', user_id=''):
def __init__(self, uid, name, privilege, password='', group_id='', user_id='', card=0):
self.uid = uid
self.name = name
self.name = str(name)
self.privilege = privilege
self.password = password
self.group_id = group_id
self.password = str(password)
self.group_id = str(group_id)
self.user_id = user_id
self.card = int(card) # 64 int to 40 bit int
@staticmethod
def json_unpack(json):
#validate?
return User(
uid=json['uid'],
name=json['name'],
privilege=json['privilege'],
password=json['password'],
group_id=json['group_id'],
user_id=json['user_id'],
card=json['card']
)
def repack29(self): # with 02 for zk6 (size 29)
return pack("<BHB5s8sIxBhI", 2, self.uid, self.privilege, self.password.encode(User.encoding, errors='ignore'), self.name.encode(User.encoding, errors='ignore'), self.card, int(self.group_id) if self.group_id else 0, 0, int(self.user_id))
def repack73(self): #with 02 for zk8 (size73)
#password 6s + 0x00 + 0x77
# 0,0 => 7sx group id, timezone?
return pack("<BHB8s24sIB7sx24s", 2, self.uid, self.privilege,self.password.encode(User.encoding, errors='ignore'), self.name.encode(User.encoding, errors='ignore'), self.card, 1, str(self.group_id).encode(User.encoding, errors='ignore'), str(self.user_id).encode(User.encoding, errors='ignore'))
def __str__(self):
return '<User>: {}'.format(self.name)
return '<User>: [uid:{}, name:{} user_id:{}]'.format(self.uid, self.name, self.user_id)
def __repr__(self):
return '<User>: {}'.format(self.name)
return '<User>: [uid:{}, name:{} user_id:{}]'.format(self.uid, self.name, self.user_id)

593
zk6.lua Normal file
View File

@ -0,0 +1,593 @@
----------------------------------------
-- script-name: zk6_udp_dissector.lua
--
-- author: Arturo Hernandez
-- Copyright (c) 2018
-- This code is in the Public Domain, or the BSD (3 clause) license if Public Domain does not apply
-- in your country.
--
-- Version: 1.0
--
-- BACKGROUND:
-- based on the example dns_dissector.lua from Hadriel Kaplan
--
-- OVERVIEW:
-- This script creates an dissector for the UDP protocol on ZK products.
--
-- HOW TO RUN THIS SCRIPT:
-- Wireshark and Tshark support multiple ways of loading Lua scripts: through a dofile() call in init.lua,
-- through the file being in either the global or personal plugins directories, or via the command line.
-- See the Wireshark USer's Guide chapter on Lua (http://www.wireshark.org/docs/wsug_html_chunked/wsluarm.html).
-- Once the script is loaded, it creates a new protocol named "MyDNS" (or "MYDNS" in some places). If you have
-- a capture file with DNS packets in it, simply select one in the Packet List pane, right-click on it, and
-- select "Decode As ...", and then in the dialog box that shows up scroll down the list of protocols to one
-- called "MYDNS", select that and click the "ok" or "apply" button. Voila`, you're now decoding DNS packets
-- using the simplistic dissector in this script. Another way is to download the capture file made for
-- this script, and open that - since the DNS packets in it use UDP port 65333 (instead of the default 53),
-- and since the MyDNS protocol in this script has been set to automatically decode UDP port 65333, it will
-- automagically do it without doing "Decode As ...".
--
----------------------------------------
print("hello world!")
-- do not modify this table
local debug_level = {
DISABLED = 0,
LEVEL_1 = 1,
LEVEL_2 = 2
}
-- set this DEBUG to debug_level.LEVEL_1 to enable printing debug_level info
-- set it to debug_level.LEVEL_2 to enable really verbose printing
-- note: this will be overridden by user's preference settings
local DEBUG = debug_level.LEVEL_1
local default_settings = {
debug_level = DEBUG,
port = 4370,
heur_enabled = false,
}
-- for testing purposes, we want to be able to pass in changes to the defaults
-- from the command line; because you can't set lua preferences from the command
-- line using the '-o' switch (the preferences don't exist until this script is
-- loaded, so the command line thinks they're invalid preferences being set)
-- so we pass them in as command arguments insetad, and handle it here:
local args={...} -- get passed-in args
if args and #args > 0 then
for _, arg in ipairs(args) do
local name, value = arg:match("(.+)=(.+)")
if name and value then
if tonumber(value) then
value = tonumber(value)
elseif value == "true" or value == "TRUE" then
value = true
elseif value == "false" or value == "FALSE" then
value = false
elseif value == "DISABLED" then
value = debug_level.DISABLED
elseif value == "LEVEL_1" then
value = debug_level.LEVEL_1
elseif value == "LEVEL_2" then
value = debug_level.LEVEL_2
else
error("invalid commandline argument value")
end
else
error("invalid commandline argument syntax")
end
default_settings[name] = value
end
end
local dprint = function() end
local dprint2 = function() end
local function reset_debug_level()
if default_settings.debug_level > debug_level.DISABLED then
dprint = function(...)
print(table.concat({"Lua:", ...}," "))
end
if default_settings.debug_level > debug_level.LEVEL_1 then
dprint2 = dprint
end
end
end
-- call it now
reset_debug_level()
dprint2("Wireshark version = ", get_version())
dprint2("Lua version = ", _VERSION)
----------------------------------------
-- Unfortunately, the older Wireshark/Tshark versions have bugs, and part of the point
-- of this script is to test those bugs are now fixed. So we need to check the version
-- end error out if it's too old.
local major, minor, micro = get_version():match("(%d+)%.(%d+)%.(%d+)")
if major and tonumber(major) <= 1 and ((tonumber(minor) <= 10) or (tonumber(minor) == 11 and tonumber(micro) < 3)) then
error( "Sorry, but your Wireshark/Tshark version ("..get_version()..") is too old for this script!\n"..
"This script needs Wireshark/Tshark version 1.11.3 or higher.\n" )
end
-- more sanity checking
-- verify we have the ProtoExpert class in wireshark, as that's the newest thing this file uses
assert(ProtoExpert.new, "Wireshark does not have the ProtoExpert class, so it's too old - get the latest 1.11.3 or higher")
----------------------------------------
----------------------------------------
-- creates a Proto object, but doesn't register it yet
local zk = Proto("zk6","ZK600 UDP Protocol")
local zk_tcp = Proto("zk8","ZK800 TCP Protocol")
local rfct = {
[1] = "FCT_ATTLOG",
[8] = "FCT_WORKCODE",
[2] = "FCT_FINGERTMP",
[4] = "FCT_OPLOG",
[5] = "FCT_USER",
[6] = "FCT_SMS",
[7] = "FCT_UDATA"
}
local rcomands = {
[7] = "CMD_DB_RRQ",
[8] = "CMD_USER_WRQ",
[9] = "CMD_USERTEMP_RRQ",
[10] = "CMD_USERTEMP_WRQ",
[11] = "CMD_OPTIONS_RRQ",
[12] = "CMD_OPTIONS_WRQ",
[13] = "CMD_ATTLOG_RRQ",
[14] = "CMD_CLEAR_DATA",
[15] = "CMD_CLEAR_ATTLOG",
[18] = "CMD_DELETE_USER",
[19] = "CMD_DELETE_USERTEMP",
[20] = "CMD_CLEAR_ADMIN",
[21] = "CMD_USERGRP_RRQ",
[22] = "CMD_USERGRP_WRQ",
[23] = "CMD_USERTZ_RRQ",
[24] = "CMD_USERTZ_WRQ",
[25] = "CMD_GRPTZ_RRQ",
[26] = "CMD_GRPTZ_WRQ",
[27] = "CMD_TZ_RRQ",
[28] = "CMD_TZ_WRQ",
[29] = "CMD_ULG_RRQ",
[30] = "CMD_ULG_WRQ",
[31] = "CMD_UNLOCK",
[32] = "CMD_CLEAR_ACC",
[33] = "CMD_CLEAR_OPLOG",
[34] = "CMD_OPLOG_RRQ",
[50] = "CMD_GET_FREE_SIZES",
[57] = "CMD_ENABLE_CLOCK",
[60] = "CMD_STARTVERIFY",
[61] = "CMD_STARTENROLL",
[62] = "CMD_CANCELCAPTURE",
[64] = "CMD_STATE_RRQ",
[66] = "CMD_WRITE_LCD",
[67] = "CMD_CLEAR_LCD",
[69] = "CMD_GET_PINWIDTH",
[70] = "CMD_SMS_WRQ",
[71] = "CMD_SMS_RRQ",
[72] = "CMD_DELETE_SMS",
[73] = "CMD_UDATA_WRQ",
[74] = "CMD_DELETE_UDATA",
[75] = "CMD_DOORSTATE_RRQ",
[76] = "CMD_WRITE_MIFARE",
[78] = "CMD_EMPTY_MIFARE",
[88] = "_CMD_GET_USER_TEMPLATE",
[201] = "CMD_GET_TIME",
[202] = "CMD_SET_TIME",
[500] = "CMD_REG_EVENT",
[1000] = "CMD_CONNECT",
[1001] = "CMD_EXIT",
[1002] = "CMD_ENABLEDEVICE",
[1003] = "CMD_DISABLEDEVICE",
[1004] = "CMD_RESTART",
[1005] = "CMD_POWEROFF",
[1006] = "CMD_SLEEP",
[1007] = "CMD_RESUME",
[1009] = "CMD_CAPTUREFINGER",
[1011] = "CMD_TEST_TEMP",
[1012] = "CMD_CAPTUREIMAGE",
[1013] = "CMD_REFRESHDATA",
[1014] = "CMD_REFRESHOPTION",
[1017] = "CMD_TESTVOICE",
[1100] = "CMD_GET_VERSION",
[1101] = "CMD_CHANGE_SPEED",
[1102] = "CMD_AUTH",
[1500] = "CMD_PREPARE_DATA",
[1501] = "CMD_DATA",
[1502] = "CMD_FREE_DATA",
[1503] = "CMD_PREPARE_BUFFER",
[1504] = "CMD_READ_BUFFER",
[2000] = "CMD_ACK_OK",
[2001] = "CMD_ACK_ERROR",
[2002] = "CMD_ACK_DATA",
[2003] = "CMD_ACK_RETRY",
[2004] = "CMD_ACK_REPEAT",
[2005] = "CMD_ACK_UNAUTH",
[65535] = "CMD_ACK_UNKNOWN",
[65533] = "CMD_ACK_ERROR_CMD",
[65532] = "CMD_ACK_ERROR_INIT",
[65531] = "CMD_ACK_ERROR_DATA"
}
local rmachines = {
[20560] = "MACHINE_PREPARE_DATA_1",
[32130] = "MACHINE_PREPARE_DATA_2"
}
----------------------------------------
local pf_machine1 = ProtoField.new ("Machine Data 1", "zk8.machine1", ftypes.UINT16, rmachines, base.DEC)
local pf_machine2 = ProtoField.new ("Machine Data 2", "zk8.machine2", ftypes.UINT16, rmachines, base.DEC)
local pf_length = ProtoField.new ("Length", "zk8.length", ftypes.UINT32, nil, base.DEC)
local pf_command = ProtoField.new ("Command", "zk6.command", ftypes.UINT16, rcomands, base.DEC)
local pf_checksum = ProtoField.new ("CheckSum", "zk6.checksum", ftypes.UINT16, nil, base.HEX)
local pf_sesion_id = ProtoField.uint16("zk6.session_id", "ID session", base.HEX)
local pf_reply_id = ProtoField.uint16("zk6.reply_id", "ID Reply")
local pf_commkey = ProtoField.new ("Communication key", "zk6.commkey", ftypes.UINT32, nil, base.HEX)
local pf_data = ProtoField.new ("Data", "zk6.data", ftypes.BYTES, nil, base.DOT)
local pf_string = ProtoField.new ("Data", "zk6.string", ftypes.STRING)
local pf_time = ProtoField.new ("Time", "zk6.time", ftypes.UINT32, nil)
local pf_start = ProtoField.new ("Data offset", "zk6.start", ftypes.UINT32, nil)
local pf_size = ProtoField.new ("Data Size", "zk6.size", ftypes.UINT32, nil)
local pf_psize = ProtoField.new ("Packet Size", "zk6.psize", ftypes.UINT32, nil)
local pf_fsize0 = ProtoField.new ("null #1", "zk6.fsize0", ftypes.UINT32, nil)
local pf_fsize1 = ProtoField.new ("null #2", "zk6.fsize1", ftypes.UINT32, nil)
local pf_fsize2 = ProtoField.new ("null #3", "zk6.fsize2", ftypes.UINT32, nil)
local pf_fsize3 = ProtoField.new ("null #4", "zk6.fsize3", ftypes.UINT32, nil)
local pf_fsizeu = ProtoField.new ("users", "zk6.fsizeu", ftypes.UINT32, nil)
local pf_fsize4 = ProtoField.new ("null #5", "zk6.fsize4", ftypes.UINT32, nil)
local pf_fsizef = ProtoField.new ("fingers", "zk6.fsizef", ftypes.UINT32, nil)
local pf_fsize5 = ProtoField.new ("null #6", "zk6.fsize5", ftypes.UINT32, nil)
local pf_fsizer = ProtoField.new ("records", "zk6.fsizer", ftypes.UINT32, nil)
local pf_fsize6 = ProtoField.new ("null #7", "zk6.fsize6", ftypes.UINT32, nil)
local pf_fsize7 = ProtoField.new ("null 4096", "zk6.fsize7", ftypes.UINT32, nil)
local pf_fsize8 = ProtoField.new ("null #8", "zk6.fsize8", ftypes.UINT32, nil)
local pf_fsizec = ProtoField.new ("cards", "zk6.fsizec", ftypes.UINT32, nil)
local pf_fsize9 = ProtoField.new ("null #9", "zk6.fsize9", ftypes.UINT32, nil)
local pf_fsizefc = ProtoField.new ("finger capacity", "zk6.fsizefc", ftypes.UINT32, nil)
local pf_fsizeuc = ProtoField.new ("user capacity", "zk6.fsizeuc", ftypes.UINT32, nil)
local pf_fsizerc = ProtoField.new ("record capacity", "zk6.fsizerc", ftypes.UINT32, nil)
local pf_fsizefa = ProtoField.new ("finger available", "zk6.fsizefa", ftypes.UINT32, nil)
local pf_fsizeua = ProtoField.new ("user available", "zk6.fsizeua", ftypes.UINT32, nil)
local pf_fsizera = ProtoField.new ("record available", "zk6.fsizera", ftypes.UINT32, nil)
local pf_fsizeff = ProtoField.new ("face", "zk6.fsizerff", ftypes.UINT32, nil)
local pf_fsize10 = ProtoField.new ("nul #10", "zk6.fsize10", ftypes.UINT32, nil)
local pf_fsizeffc = ProtoField.new ("face capacity", "zk6.fsizeffc", ftypes.UINT32, nil)
local pf_pbfill = ProtoField.new ("null 01", "zk6.pbfill", ftypes.UINT8, nil)
local pf_pbcmd = ProtoField.new ("command", "zk6.pbcmd", ftypes.UINT16, rcomands)
local pf_pbarg = ProtoField.new ("argument", "zk6.pbarg", ftypes.UINT64, rfct)
local pf_pbfill0 = ProtoField.new ("null 0", "zk6.pbfill0", ftypes.UINT8, nil)
local pf_pbfree = ProtoField.new ("free space", "zk6.pbfree", ftypes.UINT32, nil)
local pf_uid = ProtoField.new ("User ID", "zk6.uid", ftypes.UINT16, nil)
----------------------------------------
-- this actually registers the ProtoFields above, into our new Protocol
-- in a real script I wouldn't do it this way; I'd build a table of fields programmatically
-- and then set dns.fields to it, so as to avoid forgetting a field
zk.fields = { pf_command, pf_checksum, pf_sesion_id, pf_reply_id, pf_commkey, pf_data, pf_string,
pf_time, pf_start, pf_size, pf_psize, pf_fsize0, pf_fsize1, pf_fsize2, pf_fsize3,
pf_fsizeu, pf_fsize4, pf_fsizef, pf_fsize5,pf_fsizer,pf_fsize6,pf_fsize7,
pf_fsize8,pf_fsizec,pf_fsize9,pf_fsizefc,pf_fsizeuc,pf_fsizerc, pf_uid,
pf_fsizefa,pf_fsizeua,pf_fsizera, pf_fsizeff, pf_fsize10, pf_fsizeffc,
pf_pbfill, pf_pbcmd, pf_pbarg, pf_pbfill0, pf_pbfree}
zk_tcp.fields = { pf_machine1, pf_machine2, pf_length }
----------------------------------------
-- we don't just want to display our protocol's fields, we want to access the value of some of them too!
-- There are several ways to do that. One is to just parse the buffer contents in Lua code to find
-- the values. But since ProtoFields actually do the parsing for us, and can be retrieved using Field
-- objects, it's kinda cool to do it that way. So let's create some Fields to extract the values.
-- The following creates the Field objects, but they're not 'registered' until after this script is loaded.
-- Also, these lines can't be before the 'dns.fields = ...' line above, because the Field.new() here is
-- referencing fields we're creating, and they're not "created" until that line above.
-- Furthermore, you cannot put these 'Field.new()' lines inside the dissector function.
-- Before Wireshark version 1.11, you couldn't even do this concept (of using fields you just created).
local machine1_field = Field.new("zk8.machine1")
local machine2_field = Field.new("zk8.machine2")
local length_field = Field.new("zk8.length")
local command_field = Field.new("zk6.command")
local checksum_field = Field.new("zk6.checksum")
local session_id_field = Field.new("zk6.session_id")
local reply_id_field = Field.new("zk6.reply_id")
local commkey_field = Field.new("zk6.commkey")
local data_field = Field.new("zk6.data")
local string_field = Field.new("zk6.string")
local time_field = Field.new("zk6.time")
local size_field = Field.new("zk6.size")
local start_field = Field.new("zk6.start")
local psize_field = Field.new("zk6.psize")
local fsize0_field = Field.new("zk6.fsize0")
local fsize1_field = Field.new("zk6.fsize1")
local fsize2_field = Field.new("zk6.fsize2")
local fsize3_field = Field.new("zk6.fsize3")
local fsize4_field = Field.new("zk6.fsize4")
local fsize5_field = Field.new("zk6.fsize5")
local fsize6_field = Field.new("zk6.fsize6")
local fsize7_field = Field.new("zk6.fsize7")
local fsize8_field = Field.new("zk6.fsize8")
local fsize9_field = Field.new("zk6.fsize9")
local fsizef_field = Field.new("zk6.fsizef")
local fsizeu_field = Field.new("zk6.fsizeu")
local fsizer_field = Field.new("zk6.fsizer")
local fsizec_field = Field.new("zk6.fsizec")
local pbfill_field = Field.new("zk6.pbfill")
local pbcmd_field = Field.new("zk6.pbcmd")
local pbarg_field = Field.new("zk6.pbarg")
local pbfill0_field = Field.new("zk6.pbfill0")
local pbfree_field = Field.new("zk6.pbfree")
local uid_field = Field.new("zk6.uid")
-- here's a little helper function to access the response_field value later.
-- Like any Field retrieval, you can't retrieve a field's value until its value has been
-- set, which won't happen until we actually use our ProtoFields in TreeItem:add() calls.
-- So this isResponse() function can't be used until after the pf_flag_response ProtoField
-- has been used inside the dissector.
-- Note that calling the Field object returns a FieldInfo object, and calling that
-- returns the value of the field - in this case a boolean true/false, since we set the
-- "mydns.flags.response" ProtoField to ftype.BOOLEAN way earlier when we created the
-- pf_flag_response ProtoField. Clear as mud?
--
-- A shorter version of this function would be:
-- local function isResponse() return response_field()() end
-- but I though the below is easier to understand.
local function isResponse()
local response_fieldinfo = response_field()
return response_fieldinfo()
end
--------------------------------------------------------------------------------
-- preferences handling stuff
--------------------------------------------------------------------------------
-- a "enum" table for our enum pref, as required by Pref.enum()
-- having the "index" number makes ZERO sense, and is completely illogical
-- but it's what the code has expected it to be for a long time. Ugh.
local debug_pref_enum = {
{ 1, "Disabled", debug_level.DISABLED },
{ 2, "Level 1", debug_level.LEVEL_1 },
{ 3, "Level 2", debug_level.LEVEL_2 },
}
zk.prefs.debug = Pref.enum("Debug", default_settings.debug_level,
"The debug printing level", debug_pref_enum)
zk.prefs.port = Pref.uint("Port number", default_settings.port,
"The UDP port number for MyDNS")
zk.prefs.heur = Pref.bool("Heuristic enabled", default_settings.heur_enabled,
"Whether heuristic dissection is enabled or not")
----------------------------------------
-- a function for handling prefs being changed
function zk.prefs_changed()
dprint2("prefs_changed called")
default_settings.debug_level = zk.prefs.debug
reset_debug_level()
default_settings.heur_enabled = zk.prefs.heur
if default_settings.port ~= zk.prefs.port then
-- remove old one, if not 0
if default_settings.port ~= 0 then
dprint2("removing ZK6 from port",default_settings.port)
DissectorTable.get("udp.port"):remove(default_settings.port, zk)
end
-- set our new default
default_settings.port = dns.prefs.port
-- add new one, if not 0
if default_settings.port ~= 0 then
dprint2("adding ZK6 to port",default_settings.port)
DissectorTable.get("udp.port"):add(default_settings.port, zk)
end
end
end
dprint2("ZK6 Prefs registered")
----------------------------------------
---- some constants for later use ----
-- the DNS header size
local ZK_HDR_LEN = 8
-- the smallest possible DNS query field size
-- has to be at least a label length octet, label character, label null terminator, 2-bytes type and 2-bytes class
local MIN_QUERY_LEN = 7
----------------------------------------
-- some forward "declarations" of helper functions we use in the dissector
-- I don't usually use this trick, but it'll help reading/grok'ing this script I think
-- if we don't focus on them.
local getQueryName
local prevCommand = 0
----------------------------------------
-- The following creates the callback function for the dissector.
-- It's the same as doing "dns.dissector = function (tvbuf,pkt,root)"
-- The 'tvbuf' is a Tvb object, 'pktinfo' is a Pinfo object, and 'root' is a TreeItem object.
-- Whenever Wireshark dissects a packet that our Proto is hooked into, it will call
-- this function and pass it these arguments for the packet it's dissecting.
function zk.dissector(tvbuf, pktinfo, root)
dprint2("zk.dissector called")
-- set the protocol column to show our protocol name
pktinfo.cols.protocol:set("ZK6")
-- We want to check that the packet size is rational during dissection, so let's get the length of the
-- packet buffer (Tvb).
-- Because DNS has no additional payload data other than itself, and it rides on UDP without padding,
-- we can use tvb:len() or tvb:reported_len() here; but I prefer tvb:reported_length_remaining() as it's safer.
local pktlen = tvbuf:reported_length_remaining()
-- We start by adding our protocol to the dissection display tree.
-- A call to tree:add() returns the child created, so we can add more "under" it using that return value.
-- The second argument is how much of the buffer/packet this added tree item covers/represents - in this
-- case (DNS protocol) that's the remainder of the packet.
local tree = root:add(zk, tvbuf:range(0,pktlen))
-- now let's check it's not too short
if pktlen < ZK_HDR_LEN then
-- since we're going to add this protocol to a specific UDP port, we're going to
-- assume packets in this port are our protocol, so the packet being too short is an error
-- the old way: tree:add_expert_info(PI_MALFORMED, PI_ERROR, "packet too short")
-- the correct way now:
tree:add_proto_expert_info(ef_too_short)
dprint("packet length",pktlen,"too short")
return
end
-- Now let's add our transaction id under our dns protocol tree we just created.
-- The transaction id starts at offset 0, for 2 bytes length.
tree:add_le(pf_command, tvbuf:range(0,2))
tree:add_le(pf_checksum, tvbuf:range(2,2))
tree:add_le(pf_sesion_id, tvbuf:range(4,2))
tree:add_le(pf_reply_id, tvbuf:range(6,2))
local command = tvbuf:range(0,2):le_uint()
if rcomands[command] ~= nil then
--pktinfo.cols.info:set(rcomands[command])
pktinfo.cols.info = string.sub(rcomands[command], 5)
else
--pktinfo.cols.info:set("CMD:" .. tostring(command))
pktinfo.cols.info = "CMD:" .. tostring(command)
end
if pktlen > ZK_HDR_LEN then
remain = pktlen - ZK_HDR_LEN -- TODO: no funciona el prevCommand,
if (command == 1102) then --CMD_AUTH
tree:add_le(pf_commkey, tvbuf:range(8,4))
elseif (command == 1500) then --CMD_PREPARE_DATA
tree:add_le(pf_size, tvbuf:range(8,4))
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " - " .. tvbuf:range(8,4):le_uint() .. " Bytes"
if remain > 8 then
tree:add_le(pf_psize, tvbuf:range(12,4))
end
elseif (command == 12) or (command == 11) then --CMD_OPTIONS_RRQ CMD_OPTIONS_WRQ
tree:add(pf_string, tvbuf:range(8,remain))
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " - " .. tvbuf:range(8,remain):string()
elseif (command == 18) then -- CMD_DELETE_USER
tree:add_le(pf_uid, tvbuf(8,2))
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " UID: " .. tvbuf:range(8,2):le_uint()
elseif (command == 88) then -- CMD_get_user_Template
tree:add_le(pf_uid, tvbuf(8,2))
tree:add_le(pf_pbfill0, tvbuf(10,1))
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " UID: " .. tvbuf:range(8,2):le_uint()
elseif (command == 1503) then -- CMD_PREPARE_BUFFER
tree:add(pf_pbfill, tvbuf:range(8,1))
tree:add_le(pf_pbcmd, tvbuf:range(9,2))
tree:add_le(pf_pbarg, tvbuf:range(11,8))
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " - " .. rcomands[tvbuf:range(9,2):le_uint()]
elseif (command == 1504) then --CMD_READ_BUFFER
tree:add_le(pf_start, tvbuf:range(8,4))
tree:add_le(pf_size, tvbuf:range(12,4))
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " [" .. tvbuf:range(8,4):le_uint() .. "] -> " .. tvbuf:range(12,4):le_uint()
elseif (command == 1501) then --CMD_DATA
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " " .. (remain) .. " Bytes"
tree:add(pf_string, tvbuf:range(8,remain))
elseif (prevCommand == 1503) then -- CMD_PREPARE_BUFFER OK!
tree:add_le(pf_pbfill0, tvbuf:range(8,1))
tree:add_le(pf_size, tvbuf:range(9,4))
tree:add_le(pf_psize, tvbuf:range(13,4))
tree:add_le(pf_pbfree, tvbuf:range(17,4))
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " BUFFER [" .. tvbuf:range(9,4):le_uint() .. "] (" .. tvbuf:range(13,4):le_uint() .. ")"
elseif (prevCommand == 12) or (prevCommand == 11) or (prevCommand == 1100) then --CMD_OPTIONS_RRQ CMD_OPTIONS_WRQ OK
tree:add(pf_string, tvbuf:range(8,remain))
pktinfo.cols.info = tostring(pktinfo.cols.info) .. " RESP " .. tvbuf:range(8,remain):string()
elseif (prevCommand == 201) or (prevCommand == 202) then
local ts = tvbuf:range(8,4):le_uint()
tree:add_le(pf_time, tvbuf:range(8,4))
elseif (prevCommand == 50) then
tree:add_le(pf_fsize0, tvbuf:range(8,4))
tree:add_le(pf_fsize1, tvbuf:range(12,4))
tree:add_le(pf_fsize2, tvbuf:range(16,4))
tree:add_le(pf_fsize3, tvbuf:range(20,4))
tree:add_le(pf_fsizeu, tvbuf:range(24,4))
tree:add_le(pf_fsize4, tvbuf:range(28,4))
tree:add_le(pf_fsizef, tvbuf:range(32,4))
tree:add_le(pf_fsize5, tvbuf:range(36,4))
tree:add_le(pf_fsizer, tvbuf:range(40,4))
tree:add_le(pf_fsize6, tvbuf:range(44,4))
tree:add_le(pf_fsize7, tvbuf:range(48,4))
tree:add_le(pf_fsize8, tvbuf:range(52,4))
tree:add_le(pf_fsizec, tvbuf:range(56,4))
tree:add_le(pf_fsize9, tvbuf:range(60,4))
tree:add_le(pf_fsizefc, tvbuf:range(64,4))
tree:add_le(pf_fsizeuc, tvbuf:range(68,4))
tree:add_le(pf_fsizerc, tvbuf:range(72,4))
tree:add_le(pf_fsizefa, tvbuf:range(76,4))
tree:add_le(pf_fsizeua, tvbuf:range(80,4))
tree:add_le(pf_fsizera, tvbuf:range(84,4))
if remain > 80 then
tree:add_le(pf_fsizeff, tvbuf:range(88,4))
tree:add_le(pf_fsize10, tvbuf:range(92,4))
tree:add_le(pf_fsizeffc, tvbuf:range(96,4))
end
else
-- tree:add_le(pf_data, tvbuf:range(8,remain)) most time we need strings
tree:add(pf_string, tvbuf:range(8,remain))
end
end
dprint2("zk.dissector returning",pktlen)
prevCommand = command
-- tell wireshark how much of tvbuff we dissected
return pktlen
end
----------------------------------------
-- we want to have our protocol dissection invoked for a specific UDP port,
-- so get the udp dissector table and add our protocol to it
DissectorTable.get("udp.port"):add(default_settings.port, zk)
function zk_tcp.dissector(tvbuf, pktinfo, root)
dprint2("zk_tcp.dissector called")
local pktlen = tvbuf:reported_length_remaining()
-- We start by adding our protocol to the dissection display tree.
-- A call to tree:add() returns the child created, so we can add more "under" it using that return value.
-- The second argument is how much of the buffer/packet this added tree item covers/represents - in this
-- case (DNS protocol) that's the remainder of the packet.
local tree = root:add(zk_tcp, tvbuf:range(0,pktlen))
-- now let's check it's not too short
if pktlen < ZK_HDR_LEN then
-- since we're going to add this protocol to a specific UDP port, we're going to
-- assume packets in this port are our protocol, so the packet being too short is an error
-- the old way: tree:add_expert_info(PI_MALFORMED, PI_ERROR, "packet too short")
-- the correct way now:
tree:add_proto_expert_info(ef_too_short)
dprint("packet length",pktlen,"too short")
return
end
-- tell wireshark how much of tvbuff we dissected
dprint2("zk_tcp.dissector returning", pktlen)
local machine1 = tvbuf:range(0,2):le_uint()
local machine2 = tvbuf:range(2,2):le_uint()
if (machine1 == 20560) and (machine2 == 32130) then
local tcp_length = tvbuf:range(4,4):le_uint64()
tree:add_le(pf_machine1, tvbuf:range(0,2))
tree:add_le(pf_machine2, tvbuf:range(2,2))
tree:add_le(pf_length, tvbuf:range(4,4))
if pktlen > ZK_HDR_LEN then
remain = pktlen - ZK_HDR_LEN
-- zk_tree = tree:add(zk, tvbuf:range(8, remain))
zk.dissector(tvbuf:range(8,remain):tvb(), pktinfo, tree)
end
-- set the protocol column to show our protocol name
pktinfo.cols.protocol:set("ZK8")
else
pktinfo.cols.protocol:set("ZK8")
pktinfo.cols.info:set("--- data " .. pktlen .. " Bytes")
end
return pktlen
end
DissectorTable.get("tcp.port"):add(default_settings.port, zk_tcp)
-- We're done!
-- our protocol (Proto) gets automatically registered after this script finishes loading
----------------------------------------