pyztk/zk/base.py

1631 lines
59 KiB
Python
Raw Normal View History

2016-05-23 13:50:54 +07:00
# -*- coding: utf-8 -*-
import sys
2016-06-12 15:46:20 +07:00
from datetime import datetime
from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, socket, timeout
2016-05-23 13:50:54 +07:00
from struct import pack, unpack
2018-05-05 02:26:46 +07:00
import codecs
2016-05-23 13:50:54 +07:00
from . import const
from .attendance import Attendance
from .exception import ZKErrorConnection, ZKErrorResponse, ZKNetworkError
from .user import User
from .finger import Finger
2016-05-23 13:50:54 +07:00
2018-12-27 13:57:49 +07:00
2018-06-08 21:17:27 +07:00
def safe_cast(val, to_type, default=None):
    """
    Convert ``val`` with ``to_type`` and fall back to ``default`` on failure.

    Based on https://stackoverflow.com/questions/6330071/safe-casting-in-python

    :param val: value to convert
    :param to_type: callable performing the conversion (e.g. ``int``)
    :param default: returned when ``to_type`` raises ValueError or TypeError
    :return: the converted value, or ``default``
    """
    try:
        converted = to_type(val)
    except (ValueError, TypeError):
        converted = default
    return converted
def make_commkey(key, session_id, ticks=50):
    """
    Scramble a password and session id into the 4-byte key sent to the machine.

    Port of MakeKey from commpro.c.

    :param key: numeric password (anything ``int()`` accepts)
    :param session_id: session id (anything ``int()`` accepts)
    :param ticks: only the low byte is used; it is XOR-ed into the result and
        stored verbatim as the third byte
    :return: 4 bytes
    """
    key = int(key)
    session_id = int(session_id)

    # Mirror the low 32 bits of the password (bit 0 becomes bit 31, ...).
    k = 0
    for i in range(32):
        k <<= 1
        if key & (1 << i):
            k |= 1

    # The C original adds into an unsigned 32-bit integer, so the sum wraps.
    # Without the mask pack('I') raises struct.error when the sum overflows.
    k = (k + session_id) & 0xFFFFFFFF

    # XOR each byte with the constant "ZKSO".
    b0, b1, b2, b3 = unpack(b'BBBB', pack(b'I', k))
    k = pack(
        b'BBBB',
        b0 ^ ord('Z'),
        b1 ^ ord('K'),
        b2 ^ ord('S'),
        b3 ^ ord('O'))

    # Swap the two 16-bit halves.
    low, high = unpack(b'HH', k)
    k = pack(b'HH', high, low)

    # Mix in the tick byte; the third byte is replaced by the tick itself.
    B = 0xff & ticks
    k = unpack(b'BBBB', k)
    return pack(
        b'BBBB',
        k[0] ^ B,
        k[1] ^ B,
        B,
        k[3] ^ B)
2018-04-30 21:25:43 +07:00
2018-12-27 13:57:49 +07:00
class ZK_helper(object):
    """
    ZK helper class: connectivity probes (ping / TCP / UDP) for a device.
    """

    def __init__(self, ip, port=4370):
        """
        Construct a new 'ZK_helper' object.

        :param ip: device address
        :param port: device port
        """
        self.address = (ip, port)
        self.ip = ip
        self.port = port

    def test_ping(self):
        """
        Returns True if host responds to a ping request

        The command is run as an argument list without a shell, so ``self.ip``
        can never be interpreted as shell syntax. Output is discarded through
        the null device rather than pipes that nobody reads.

        :return: bool (False as well when the ``ping`` program cannot be run)
        """
        import os
        import platform
        import subprocess

        # Ping parameters as function of OS
        if platform.system().lower() == "windows":
            ping_args = ["-n", "1"]
        else:
            ping_args = ["-c", "1", "-W", "5"]
        args = ["ping"] + ping_args + [str(self.ip)]
        try:
            with open(os.devnull, "wb") as devnull:
                return subprocess.call(args, stdout=devnull, stderr=devnull) == 0
        except OSError:
            # ping executable missing or not runnable: report "unreachable",
            # as the shell-based invocation did via a non-zero exit status
            return False

    def test_tcp(self):
        """
        test TCP connection

        :return: result of ``connect_ex`` (0 when the connection succeeded)
        """
        self.client = socket(AF_INET, SOCK_STREAM)
        try:
            self.client.settimeout(10)
            return self.client.connect_ex(self.address)
        finally:
            # always release the probe socket, even if connect_ex raises
            self.client.close()

    def test_udp(self):
        """
        test UDP connection

        Only creates the UDP socket (kept in ``self.client``) and sets its
        timeout; nothing is sent and nothing is returned.
        """
        self.client = socket(AF_INET, SOCK_DGRAM)
        self.client.settimeout(10)
2018-04-30 21:25:43 +07:00
2016-05-23 13:50:54 +07:00
class ZK(object):
2018-12-27 13:57:49 +07:00
"""
ZK main class
"""
def __init__(self, ip, port=4370, timeout=60, password=0, force_udp=False, ommit_ping=False, verbose=False, encoding='UTF-8'):
    """
    Construct a new 'ZK' object.

    :param ip: machine's IP address
    :param port: machine's port
    :param timeout: socket timeout
    :param password: numeric communication password
    :param force_udp: use UDP instead of TCP
    :param ommit_ping: when true, connect() skips the ping check
    :param verbose: print log messages while running commands
    :param encoding: user encoding (also stored on ``User.encoding``)
    """
    User.encoding = encoding

    # connection parameters and the initial (UDP) socket
    self.__address = (ip, port)
    self.__timeout = timeout
    self.__password = password # passint
    self.__sock = socket(AF_INET, SOCK_DGRAM)
    self.__sock.settimeout(timeout)

    # protocol state
    self.__session_id = 0
    self.__reply_id = const.USHRT_MAX - 1
    self.__data_recv = None
    self.__data = None
    self.is_connect = False
    self.is_enabled = True

    # options
    self.helper = ZK_helper(ip, port)
    self.force_udp = force_udp
    self.ommit_ping = ommit_ping
    self.verbose = verbose
    self.encoding = encoding
    self.tcp = not force_udp

    # usage/capacity counters, all starting at zero (filled by read_sizes)
    for counter in ('users', 'fingers', 'records', 'dummy', 'cards',
                    'fingers_cap', 'users_cap', 'rec_cap',
                    'faces', 'faces_cap',
                    'fingers_av', 'users_av', 'rec_av'):
        setattr(self, counter, 0)

    self.next_uid = 1
    self.next_user_id = '1'
    self.user_packet_size = 28 # default zk6
    self.end_live_capture = False
2018-04-30 21:25:43 +07:00
2018-07-03 03:25:51 +07:00
def __nonzero__(self):
    """
    for boolean test: the object is truthy while it is connected

    :return: value of ``self.is_connect``
    """
    return self.is_connect

# Python 3 uses __bool__ for truth testing and ignores __nonzero__ (the
# Python 2 name); without this alias every instance would be truthy.
__bool__ = __nonzero__
def __create_socket(self):
    """
    Create the communication socket according to ``self.tcp``.

    A TCP socket is additionally connected to the device address with
    ``connect_ex`` (its result code is not inspected); a UDP socket is only
    created. Both get the configured timeout.
    """
    sock_type = SOCK_STREAM if self.tcp else SOCK_DGRAM
    self.__sock = socket(AF_INET, sock_type)
    self.__sock.settimeout(self.__timeout)
    if self.tcp:
        self.__sock.connect_ex(self.__address)
2018-04-30 21:25:43 +07:00
def __create_tcp_top(self, packet):
    """
    Prefix a complete packet with the TCP top header.

    The top header is the two MACHINE_PREPARE_DATA marker words followed by
    the packet length, all little-endian.

    :param packet: packet bytes
    :return: top header + packet
    """
    return pack(
        '<HHI',
        const.MACHINE_PREPARE_DATA_1,
        const.MACHINE_PREPARE_DATA_2,
        len(packet)) + packet
2018-04-30 21:25:43 +07:00
2018-04-12 06:38:37 +07:00
def __create_header(self, command, command_string, session_id, reply_id):
    """
    Puts the parts that make up a packet together and packs them into a byte string

    :param command: command code
    :param command_string: payload bytes appended after the 8-byte header
    :param session_id: session id
    :param reply_id: current reply id; the packet carries ``reply_id + 1``
        (reduced by USHRT_MAX once it reaches that value)
    :return: header + payload
    """
    # The checksum covers the packet built with a zero checksum field and the
    # reply id as passed in (before it is advanced).
    unsigned = pack('<4H', command, 0, session_id, reply_id) + command_string
    byte_values = unpack('%dB' % (8 + len(command_string)), unsigned)
    checksum = unpack('H', self.__create_checksum(byte_values))[0]

    next_reply_id = reply_id + 1
    if next_reply_id >= const.USHRT_MAX:
        next_reply_id -= const.USHRT_MAX

    header = pack('<4H', command, checksum, session_id, next_reply_id)
    return header + command_string
def __create_checksum(self, p):
    """
    Calculates the checksum of the packet to be sent to the time clock
    Copied from zkemsdk.c

    :param p: sequence of byte values (integers)
    :return: the checksum packed as an unsigned short ('H')
    """
    total = 0
    count = len(p)

    # add up consecutive byte pairs as 16-bit words, folding on overflow
    for i in range(0, count - 1, 2):
        total += unpack('H', pack('BB', p[i], p[i + 1]))[0]
        if total > const.USHRT_MAX:
            total -= const.USHRT_MAX

    # an odd trailing byte is added on its own
    if count % 2:
        total = total + p[-1]

    while total > const.USHRT_MAX:
        total -= const.USHRT_MAX

    # complement, then bring the (negative) result back into range
    total = ~total
    while total < 0:
        total += const.USHRT_MAX

    return pack('H', total)
2018-05-05 02:26:46 +07:00
def __test_tcp_top(self, packet):
    """
    Validate the TCP top header of a received packet.

    :param packet: received bytes
    :return: the size stored in the top header, or 0 when the packet is
        8 bytes or shorter or does not start with the two
        MACHINE_PREPARE_DATA marker words
    """
    if len(packet) <= 8:
        return 0
    marker_1, marker_2, size = unpack('<HHI', packet[:8])
    if marker_1 != const.MACHINE_PREPARE_DATA_1:
        return 0
    if marker_2 != const.MACHINE_PREPARE_DATA_2:
        return 0
    return size
2018-05-05 02:26:46 +07:00
def __send_command(self, command, command_string=b'', response_size=8):
    """
    send command to the terminal and read its reply

    :param command: command code
    :param command_string: payload bytes
    :param response_size: number of payload bytes to request from recv
        (8 more are requested over TCP for the top header)
    :return: dict with 'status' (True for ACK_OK, PREPARE_DATA or DATA
        replies) and 'code' (the reply command code)
    :raises ZKErrorConnection: when not connected and the command is neither
        CMD_CONNECT nor CMD_AUTH
    :raises ZKNetworkError: on any error while sending/receiving/parsing
    """
    if not self.is_connect and command not in [const.CMD_CONNECT, const.CMD_AUTH]:
        raise ZKErrorConnection("instance are not connected.")

    buf = self.__create_header(command, command_string, self.__session_id, self.__reply_id)
    try:
        if not self.tcp:
            self.__sock.sendto(buf, self.__address)
            self.__data_recv = self.__sock.recv(response_size)
            self.__header = unpack('<4H', self.__data_recv[:8])
        else:
            self.__sock.send(self.__create_tcp_top(buf))
            self.__tcp_data_recv = self.__sock.recv(response_size + 8)
            self.__tcp_length = self.__test_tcp_top(self.__tcp_data_recv)
            if self.__tcp_length == 0:
                raise ZKNetworkError("TCP packet invalid")
            # skip the 8-byte top header; the regular packet follows it
            self.__header = unpack('<4H', self.__tcp_data_recv[8:16])
            self.__data_recv = self.__tcp_data_recv[8:]
    except Exception as e:
        raise ZKNetworkError(str(e))

    self.__response, self.__reply_id = self.__header[0], self.__header[3]
    self.__data = self.__data_recv[8:]

    succeeded = self.__response in [const.CMD_ACK_OK, const.CMD_PREPARE_DATA, const.CMD_DATA]
    return {
        'status': succeeded,
        'code': self.__response
    }
2018-04-30 21:25:43 +07:00
2018-04-12 06:38:37 +07:00
def __ack_ok(self):
    """
    event ack ok: send a CMD_ACK_OK packet without waiting for a reply

    :raises ZKNetworkError: when sending fails
    """
    buf = self.__create_header(const.CMD_ACK_OK, b'', self.__session_id, const.USHRT_MAX - 1)
    try:
        if not self.tcp:
            self.__sock.sendto(buf, self.__address)
        else:
            self.__sock.send(self.__create_tcp_top(buf))
    except Exception as e:
        raise ZKNetworkError(str(e))
2016-05-27 10:01:00 +07:00
def __get_data_size(self):
    """
    Checks whether the last reply was CMD_PREPARE_DATA, indicating that data
    packets are to be sent.

    :return: the amount of bytes that are going to be sent (first 4 bytes of
        the reply data), or 0 for any other reply
    """
    if self.__response != const.CMD_PREPARE_DATA:
        return 0
    return unpack('I', self.__data[:4])[0]
2016-06-12 15:46:20 +07:00
def __reverse_hex(self, hex):
    """
    Reverse a hex string two characters (one byte) at a time.

    An odd trailing character is dropped.

    :param hex: hex string, e.g. ``"aabbcc"``
    :return: string with the character pairs in reverse order (``"ccbbaa"``)
    """
    # floor division: with "/" the pair count is a float on Python 3 and
    # range() raises TypeError
    pair_count = len(hex) // 2
    return ''.join(hex[i * 2:(i * 2) + 2] for i in reversed(range(pair_count)))
def __decode_time(self, t):
    """
    Decode a timestamp retrieved from the timeclock
    copied from zkemsdk.c - DecodeTime

    The value is a mixed-radix count of seconds, minutes, hours, days
    (31 per month), months (12 per year) and years since 2000.

    :param t: 4 bytes, little-endian unsigned integer
    :return: datetime
    """
    remaining = unpack("<I", t)[0]
    remaining, second = divmod(remaining, 60)
    remaining, minute = divmod(remaining, 60)
    remaining, hour = divmod(remaining, 24)
    remaining, day_index = divmod(remaining, 31)
    years, month_index = divmod(remaining, 12)
    return datetime(years + 2000, month_index + 1, day_index + 1, hour, minute, second)
2018-12-27 13:57:49 +07:00
def __decode_timehex(self, timehex):
    """
    Decode a six-byte timestamp: year (offset from 2000), month, day, hour,
    minute, second - one byte each.

    :param timehex: string of six bytes
    :return: datetime
    """
    fields = unpack("6B", timehex)
    return datetime(fields[0] + 2000, *fields[1:])
2018-12-27 13:57:49 +07:00
2017-12-07 06:28:41 +07:00
def __encode_time(self, t):
    """
    Encode a timestamp so that it can be read on the timeclock

    Formula taken from zkemsdk.c - EncodeTime (also found in the technical
    manual): months count as 31 days and years as 12 such months, with the
    year taken modulo 100.

    :param t: object with year, month, day, hour, minute and second attributes
    :return: encoded integer
    """
    day_count = (t.year % 100) * 12 * 31 + (t.month - 1) * 31 + t.day - 1
    second_of_day = (t.hour * 60 + t.minute) * 60 + t.second
    return day_count * (24 * 60 * 60) + second_of_day
2017-12-08 07:02:18 +07:00
2016-05-23 13:50:54 +07:00
def connect(self):
    """
    connect to the device

    Optionally pings the device, probes TCP (a successful probe switches the
    user packet size to 72), opens the socket, sends CMD_CONNECT and, when the
    device answers CMD_ACK_UNAUTH, authenticates with the scrambled password.

    :return: self
    :raises ZKNetworkError: when the ping check fails
    :raises ZKErrorResponse: when the device refuses the connection
    """
    self.end_live_capture = False

    reachable = self.ommit_ping or self.helper.test_ping()
    if not reachable:
        raise ZKNetworkError("can't reach device (ping %s)" % self.__address[0])

    if not self.force_udp and self.helper.test_tcp() == 0:
        self.user_packet_size = 72 # default zk8

    self.__create_socket()
    self.__session_id = 0
    self.__reply_id = const.USHRT_MAX - 1

    reply = self.__send_command(const.CMD_CONNECT)
    # the device assigns the session id in its reply header
    self.__session_id = self.__header[2]

    if reply.get('code') == const.CMD_ACK_UNAUTH:
        if self.verbose: print ("try auth")
        auth_key = make_commkey(self.__password, self.__session_id)
        reply = self.__send_command(const.CMD_AUTH, auth_key)

    if reply.get('status'):
        self.is_connect = True
        return self

    if reply["code"] == const.CMD_ACK_UNAUTH:
        raise ZKErrorResponse("Unauthenticated")
    if self.verbose: print ("connect err response {} ".format(reply["code"]))
    raise ZKErrorResponse("Invalid response: Can't connect")
2016-05-23 13:50:54 +07:00
def disconnect(self):
    """
    disconnect from the connected device (CMD_EXIT) and close the socket

    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_EXIT)
    if not reply.get('status'):
        raise ZKErrorResponse("can't disconnect")
    self.is_connect = False
    if self.__sock:
        self.__sock.close()
    return True
2016-05-23 13:55:09 +07:00
2016-05-29 20:27:01 +07:00
def enable_device(self):
    """
    re-enable the connected device and allow user activity in device again

    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_ENABLEDEVICE)
    if not reply.get('status'):
        raise ZKErrorResponse("Can't enable device")
    self.is_enabled = True
    return True
2016-05-23 13:55:09 +07:00
2018-12-27 13:57:49 +07:00
def disable_device(self):
    """
    disable (lock) device, to ensure no user activity in device while some process run

    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_DISABLEDEVICE)
    if not reply.get('status'):
        raise ZKErrorResponse("Can't disable device")
    self.is_enabled = False
    return True
2016-05-29 20:27:01 +07:00
def get_firmware_version(self):
    """
    Read the firmware version (reply data up to the first NUL byte).

    :return: the firmware version as str
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_GET_VERSION, b'', 1024)
    if not reply.get('status'):
        raise ZKErrorResponse("Can't read frimware version")
    raw_version = self.__data.split(b'\x00')[0]
    return raw_version.decode()
2016-05-23 21:00:43 +07:00
2016-06-09 11:20:39 +07:00
def get_serialnumber(self):
    """
    Read the '~SerialNumber' option.

    :return: the serial number as str
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, b'~SerialNumber\x00', 1024)
    if not reply.get('status'):
        raise ZKErrorResponse("Can't read serial number")
    # reply looks like b'name=value\x00...': keep what follows the first '='
    value = self.__data.split(b'=', 1)[-1]
    value = value.split(b'\x00')[0]
    # drop any further '=' characters left in the value
    value = value.replace(b'=', b'')
    return value.decode() # string?
2016-06-09 11:20:39 +07:00
2018-03-17 07:00:24 +07:00
def get_platform(self):
    """
    Read the '~Platform' option.

    :return: the platform name as str
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, b'~Platform\x00', 1024)
    if not reply.get('status'):
        raise ZKErrorResponse("Can't read platform name")
    # keep what follows the first '=' up to the first NUL byte
    value = self.__data.split(b'=', 1)[-1]
    value = value.split(b'\x00')[0]
    value = value.replace(b'=', b'')
    return value.decode()
2018-03-17 07:00:24 +07:00
2018-04-20 07:23:23 +07:00
def get_mac(self):
    """
    Read the 'MAC' option.

    :return: the machine's mac address as str
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, b'MAC\x00', 1024)
    if not reply.get('status'):
        raise ZKErrorResponse("can't read mac address")
    # keep what follows the first '=' up to the first NUL byte
    value = self.__data.split(b'=', 1)[-1]
    return value.split(b'\x00')[0].decode()
2018-04-20 07:23:23 +07:00
2018-03-17 07:00:24 +07:00
def get_device_name(self):
    """
    Read the '~DeviceName' option.

    :return: the device name as str, or "" when the device does not
        acknowledge the request
    """
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, b'~DeviceName\x00', 1024)
    if not reply.get('status'):
        return ""
    # keep what follows the first '=' up to the first NUL byte
    value = self.__data.split(b'=', 1)[-1]
    return value.split(b'\x00')[0].decode()
2018-03-17 07:00:24 +07:00
2018-04-27 02:35:54 +07:00
def get_face_version(self):
    """
    Read the 'ZKFaceVersion' option.

    :return: the face version as int (0 when empty or not numeric), or None
        when the device does not acknowledge the request
    """
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, b'ZKFaceVersion\x00', 1024)
    if not reply.get('status'):
        return None
    value = self.__data.split(b'=', 1)[-1].split(b'\x00')[0]
    if not value:
        return 0
    return safe_cast(value, int, 0)
def get_fp_version(self):
    """
    Read the '~ZKFPVersion' option.

    :return: the fingerprint version as int (0 when empty or not numeric)
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, b'~ZKFPVersion\x00', 1024)
    if not reply.get('status'):
        raise ZKErrorResponse("can't read fingerprint version")
    value = self.__data.split(b'=', 1)[-1].split(b'\x00')[0]
    value = value.replace(b'=', b'')
    if not value:
        return 0
    return safe_cast(value, int, 0)
2018-09-13 23:22:47 +07:00
def _clear_error(self, command_string=b''):
    """
    clear ACK_error

    Sends CMD_ACK_ERROR once and then CMD_ACK_UNKNOWN three times, each with
    the given command string; the replies are discarded.

    :param command_string: payload sent with every command
    """
    sequence = (const.CMD_ACK_ERROR,) + (const.CMD_ACK_UNKNOWN,) * 3
    for command in sequence:
        self.__send_command(command, command_string, 1024)
def get_extend_fmt(self):
    """
    determine extend fmt (the '~ExtendFmt' option)

    :return: option value as int (0 when empty or not numeric), or None when
        the device does not acknowledge (the error state is then cleared)
    """
    option = b'~ExtendFmt\x00'
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, option, 1024)
    if not reply.get('status'):
        self._clear_error(option)
        return None
    value = self.__data.split(b'=', 1)[-1].split(b'\x00')[0]
    if not value:
        return 0
    return safe_cast(value, int, 0)
2018-04-27 02:35:54 +07:00
def get_user_extend_fmt(self):
    """
    determine user extend fmt (the '~UserExtFmt' option)

    :return: option value as int (0 when empty or not numeric), or None when
        the device does not acknowledge (the error state is then cleared)
    """
    option = b'~UserExtFmt\x00'
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, option, 1024)
    if not reply.get('status'):
        self._clear_error(option)
        return None
    value = self.__data.split(b'=', 1)[-1].split(b'\x00')[0]
    if not value:
        return 0
    return safe_cast(value, int, 0)
def get_face_fun_on(self):
    """
    read the 'FaceFunOn' option

    :return: option value as int (0 when empty or not numeric), or None when
        the device does not acknowledge (the error state is then cleared)
    """
    option = b'FaceFunOn\x00'
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, option, 1024)
    if not reply.get('status'):
        self._clear_error(option)
        return None
    value = self.__data.split(b'=', 1)[-1].split(b'\x00')[0]
    if not value:
        return 0
    return safe_cast(value, int ,0)
def get_compat_old_firmware(self):
    """
    determine old firmware (the 'CompatOldFirmware' option)

    :return: option value as int (0 when empty or not numeric), or None when
        the device does not acknowledge (the error state is then cleared)
    """
    option = b'CompatOldFirmware\x00'
    reply = self.__send_command(const.CMD_OPTIONS_RRQ, option, 1024)
    if not reply.get('status'):
        self._clear_error(option)
        return None
    value = self.__data.split(b'=', 1)[-1].split(b'\x00')[0]
    if not value:
        return 0
    return safe_cast(value, int, 0)
2018-04-27 02:35:54 +07:00
def get_network_params(self):
    """
    get network params

    Queries the 'IPAddress', 'NetMask' and 'GATEIPAddress' options in that
    order. A value whose query is not acknowledged keeps its default: the
    address this object was created with for 'ip', an empty string for the
    other two.

    :return: dict with str values under 'ip', 'mask' and 'gateway'
    """
    # Defaults are already text. Only values read from the device are bytes
    # and get decoded; calling .decode() on the str default would raise
    # AttributeError on Python 3 whenever the IPAddress query failed.
    params = {'ip': self.__address[0], 'mask': '', 'gateway': ''}
    queries = (
        ('ip', b'IPAddress\x00'),
        ('mask', b'NetMask\x00'),
        ('gateway', b'GATEIPAddress\x00'),
    )
    for key, option in queries:
        cmd_response = self.__send_command(const.CMD_OPTIONS_RRQ, option, 1024)
        if cmd_response.get('status'):
            # keep what follows the first '=' up to the first NUL byte
            value = self.__data.split(b'=', 1)[-1].split(b'\x00')[0]
            params[key] = value.decode()
    return params
2018-03-17 07:00:24 +07:00
def get_pin_width(self):
    """
    Read the PIN width (CMD_GET_PINWIDTH).

    :return: the PIN width: value of the first byte of the reply data
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_GET_PINWIDTH, b' P', 9)
    if not reply.get('status'):
        raise ZKErrorResponse("can0t get pin width")
    leading = self.__data.split(b'\x00')[0]
    return bytearray(leading)[0]
2018-03-17 07:00:24 +07:00
def free_data(self):
    """
    clear buffer (CMD_FREE_DATA)

    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_FREE_DATA)
    if not reply.get('status'):
        raise ZKErrorResponse("can't free data")
    return True
def read_sizes(self):
    """
    read the memory usage (CMD_GET_FREE_SIZES) into the counter attributes

    The first 80 bytes of the reply are read as 20 integers; when at least 12
    more bytes follow they are read as 3 integers with face information.

    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_GET_FREE_SIZES, b'', 1024)
    if not reply.get('status'):
        raise ZKErrorResponse("can't read sizes")

    if self.verbose: print(codecs.encode(self.__data,'hex'))

    if len(self.__data) >= 80:
        fields = unpack('20i', self.__data[:80])
        # attribute name -> position in the 20-integer record
        layout = (
            ('users', 4),
            ('fingers', 6),
            ('records', 8),
            ('dummy', 10), #???
            ('cards', 12),
            ('fingers_cap', 14),
            ('users_cap', 15),
            ('rec_cap', 16),
            ('fingers_av', 17),
            ('users_av', 18),
            ('rec_av', 19),
        )
        for attribute, position in layout:
            setattr(self, attribute, fields[position])
        self.__data = self.__data[80:]

    if len(self.__data) >= 12: #face info
        face_fields = unpack('3i', self.__data[:12]) #dirty hack! we need more information
        self.faces = face_fields[0]
        self.faces_cap = face_fields[2]
    return True
def unlock(self, time=3):
    """
    unlock the door

    thanks to https://github.com/SoftwareHouseMerida/pyzk/

    :param time: define delay in seconds (sent to the device multiplied by 10)
    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    delay = pack("I", int(time)*10)
    reply = self.__send_command(const.CMD_UNLOCK, delay)
    if not reply.get('status'):
        raise ZKErrorResponse("Can't open door")
    return True
def __str__(self):
    """
    for debug: protocol, address, user packet size and the used/capacity
    counters for users, fingers, records and faces
    """
    scheme = "tcp" if self.tcp else "udp"
    host, port = self.__address[0], self.__address[1]
    template = "ZK %s://%s:%s users[%i]:%i/%i fingers:%i/%i, records:%i/%i faces:%i/%i"
    values = (
        scheme, host, port,
        self.user_packet_size, self.users, self.users_cap,
        self.fingers, self.fingers_cap,
        self.records, self.rec_cap,
        self.faces, self.faces_cap,
    )
    return template % values
2018-03-17 07:00:24 +07:00
2016-05-29 20:27:01 +07:00
def restart(self):
    """
    restart the device (CMD_RESTART)

    On success the object is marked as disconnected and ``next_uid`` is
    reset to 1.

    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_RESTART)
    if not reply.get('status'):
        raise ZKErrorResponse("can't restart device")
    self.is_connect = False
    self.next_uid = 1
    return True
2016-05-24 21:49:04 +07:00
2017-12-07 06:28:41 +07:00
def get_time(self):
    """
    Read the device clock (CMD_GET_TIME).

    :return: the machine's time as datetime, decoded from the first 4 bytes
        of the reply data
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_GET_TIME, b'', 1032)
    if not reply.get('status'):
        raise ZKErrorResponse("can't get time")
    return self.__decode_time(self.__data[:4])
2017-12-07 06:28:41 +07:00
def set_time(self, timestamp):
    """
    set Device time (pass datetime object)

    :param timestamp: python datetime object
    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    encoded = pack(b'I', self.__encode_time(timestamp))
    reply = self.__send_command(const.CMD_SET_TIME, encoded)
    if not reply.get('status'):
        raise ZKErrorResponse("can't set time")
    return True
2016-06-10 14:16:22 +07:00
def poweroff(self):
    """
    shutdown the machine (CMD_POWEROFF)

    On success the object is marked as disconnected and ``next_uid`` is
    reset to 1.

    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_POWEROFF, b'', 1032)
    if not reply.get('status'):
        raise ZKErrorResponse("can't poweroff")
    self.is_connect = False
    self.next_uid = 1
    return True
2016-05-24 21:49:04 +07:00
2018-04-07 07:01:35 +07:00
def refresh_data(self):
    """
    send CMD_REFRESHDATA to the device

    :return: True
    :raises ZKErrorResponse: when the device does not acknowledge
    """
    reply = self.__send_command(const.CMD_REFRESHDATA)
    if not reply.get('status'):
        raise ZKErrorResponse("can't refresh data")
    return True
2018-04-07 07:01:35 +07:00
2018-04-13 06:10:01 +07:00
def test_voice(self, index=0):
    """
    play test voice (CMD_TESTVOICE)

    Sound indexes as recorded by the original authors. Entries 14 and above
    were noted in Spanish and are translated here; the translations have not
    been checked against a device.

     0 Thank You
     1 Incorrect Password
     2 Access Denied
     3 Invalid ID
     4 Please try again
     5 Re-enter ID
     6 The clock is full
     7 The clock is full
     8 Duplicate finger
     9 Accepted. Thank you
    10 beep kuko
    11 beep siren
    12 -
    13 beep bell
    14 Operation timed out
    15 Place your finger again
    16 Place your finger one last time
    17 Attention: card number is duplicated
    18 Enrollment successful
    19 Deletion successful
    20 User number / "ponga la caja de ojos" (meaning unclear)
    21 Attention: maximum number of users reached
    22 User verification
    23 User not registered
    24 Attention: maximum number of records reached
    25 Attention: the door is not closed
    26 User enrollment
    27 User deletion
    28 Place your finger
    29 Register the administrator card
    30 0
    31 1
    32 2
    33 3
    34 4
    35 5
    36 6
    37 7
    38 8
    39 9
    40 Please select user number
    41 Register
    42 Operation successful
    43 Please present your card
    43 The card has been registered (listed as 43 in the original notes, probably 44)
    45 Operation error
    46 Please present the administration card to confirm
    47 Downloading attendance records
    48 Downloading users
    49 Uploading users
    50 Updating firmware
    51 Executing configuration files
    52 Access password confirmed
    53 Keypad operation error
    54 Delete all users
    55 Restore terminal to default configuration
    56 Enter user number
    57 Keypad locked
    58 Card management error
    59 Set an access password
    60 Press the keypad
    61 Invalid access zone
    62 Invalid combined access
    63 Multi-user verification
    64 Invalid verification mode
    65 -

    :param index: int sound index
    :return: bool, True when the device acknowledged the command
    """
    reply = self.__send_command(const.CMD_TESTVOICE, pack("I", index))
    if reply.get('status'):
        return True
    return False
2016-05-24 21:52:42 +07:00
def set_user(self, uid=None, name='', privilege=0, password='', group_id='', user_id='', card=0):
    """
    create or update user by uid

    :param uid: device-side user index; when None, self.next_uid is used
    :param name: name of the user
    :param privilege: check the const.py for reference
    :param password: password text (encoded with self.encoding)
    :param group_id: group ID (str or int)
    :param user_id: your own user ID (str or int); when empty it is
        derived from self.next_user_id (only if uid was None) or from uid
    :param card: card number
    :return: None
    :raises ZKErrorResponse: if the record can't be packed or the device
        does not acknowledge the command
    """
    command = const.CMD_USER_WRQ
    if uid is None:
        uid = self.next_uid
        if not user_id:
            user_id = self.next_user_id
    if not user_id:
        user_id = str(uid)  # ZK6 needs uid2 == uid
    # TODO: check what happens if name is missing...
    if privilege not in [const.USER_DEFAULT, const.USER_ADMIN]:
        privilege = const.USER_DEFAULT
    privilege = int(privilege)
    if self.user_packet_size == 28:  # self.firmware == 6:
        if not group_id:
            group_id = 0
        try:
            command_string = pack(
                'HB5s8sIxBHI',
                uid,
                privilege,
                password.encode(self.encoding, errors='ignore'),
                name.encode(self.encoding, errors='ignore'),
                card,
                int(group_id),
                0,
                int(user_id))
        except Exception as e:
            if self.verbose: print("s_h Error pack: %s" % e)
            if self.verbose: print("Error pack: %s" % sys.exc_info()[0])
            raise ZKErrorResponse("Can't pack user")
    else:
        name_pad = name.encode(self.encoding, errors='ignore').ljust(24, b'\x00')[:24]
        card_str = pack('i', int(card))[:4]
        # str() first: callers may pass integer ids, and int has no .encode()
        command_string = pack(
            'HB8s24s4sx7sx24s',
            uid,
            privilege,
            password.encode(self.encoding, errors='ignore'),
            name_pad,
            card_str,
            str(group_id).encode(),
            str(user_id).encode())
    response_size = 1024  # TODO check response?
    cmd_response = self.__send_command(command, command_string, response_size)
    if not cmd_response.get('status'):
        raise ZKErrorResponse("Can't set user")
    self.refresh_data()
    if self.next_uid == uid:
        self.next_uid += 1  # better recalculate again
    # next_user_id is kept as a string, so compare against the string form
    if self.next_user_id == str(user_id):
        self.next_user_id = str(self.next_uid)
2018-04-07 07:01:35 +07:00
def save_user_template(self, user, fingers=[]):
    """
    save user and template

    :param user: a User object, or a uid / user_id used to look one up
        among the users currently stored on the device
    :param fingers: a Finger or a list of Finger. (The maximum index 0-9)
    :raises ZKErrorResponse: if the user can't be resolved to exactly one
        record, or the device does not acknowledge the save command
    """
    # NOTE: the list default is never mutated in place below.
    if not isinstance(user, User):
        known = self.get_users()
        by_uid = [u for u in known if u.uid == user]
        if len(by_uid) == 1:
            user = by_uid[0]
        else:
            by_user_id = [u for u in known if u.user_id == str(user)]
            if len(by_user_id) != 1:
                raise ZKErrorResponse("Can't find user")
            user = by_user_id[0]
    if isinstance(fingers, Finger):
        fingers = [fingers]

    # Build the template payload and its index table side by side; each
    # table entry records the offset of its template inside the payload.
    finger_flag = 0x10
    table_entries = []
    template_blobs = []
    offset = 0
    for fp in fingers:
        blob = fp.repack_only()
        table_entries.append(pack("<bHbI", 2, user.uid, finger_flag + fp.fid, offset))
        template_blobs.append(blob)
        offset += len(blob)
    table = b"".join(table_entries)
    fpack = b"".join(template_blobs)

    upack = user.repack29() if self.user_packet_size == 28 else user.repack73()
    header = pack("III", len(upack), len(table), len(fpack))
    self._send_with_buffer(header + upack + table + fpack)

    reply = self.__send_command(110, pack('<IHH', 12, 0, 8))
    if not reply.get('status'):
        raise ZKErrorResponse("Can't save utemp")
    self.refresh_data()
def _send_with_buffer(self, buffer):
    """
    announce a payload size to the device, then upload it in chunks

    :param buffer: bytes to upload
    :raises ZKErrorResponse: if the prepare command or any chunk is not
        acknowledged
    """
    MAX_CHUNK = 1024
    total = len(buffer)
    self.free_data()
    prepared = self.__send_command(const.CMD_PREPARE_DATA, pack('I', total))
    if not prepared.get('status'):
        raise ZKErrorResponse("Can't prepare data")
    # Full-size chunks first; the final slice is naturally shorter when
    # the payload is not a multiple of MAX_CHUNK.
    for offset in range(0, total, MAX_CHUNK):
        self.__send_chunk(buffer[offset:offset + MAX_CHUNK])
2016-06-11 13:58:49 +07:00
2018-04-07 07:01:35 +07:00
def __send_chunk(self, command_string):
    """
    send one CMD_DATA packet

    :param command_string: bytes payload of the chunk
    :return: True when the device acknowledges the chunk
    :raises ZKErrorResponse: otherwise
    """
    reply = self.__send_command(const.CMD_DATA, command_string)
    if not reply.get('status'):
        raise ZKErrorResponse("Can't send chunk")
    return True
2018-04-07 07:01:35 +07:00
2018-05-05 06:18:23 +07:00
def delete_user_template(self, uid=0, temp_id=0, user_id=''):
    """
    Delete specific template

    :param uid: user ID that are generated from device
    :param temp_id: index of the template (finger) to delete
    :param user_id: your own user ID
    :return: bool - True when the device acknowledges the deletion,
        False when it does not or when no user matches user_id
    """
    if self.tcp and user_id:
        command = 134
        # struct's 's' format needs bytes; a bare str raises struct.error
        # on Python 3.
        command_string = pack('<24sB', str(user_id).encode(), temp_id)
        cmd_response = self.__send_command(command, command_string)
        return bool(cmd_response.get('status'))  # False: probably empty!
    if not uid:
        wanted = str(user_id)
        users = [u for u in self.get_users() if u.user_id == wanted]
        if not users:
            return False
        uid = users[0].uid
    command = const.CMD_DELETE_USERTEMP
    command_string = pack('hb', uid, temp_id)
    cmd_response = self.__send_command(command, command_string)
    # refres_data (1013)?
    return bool(cmd_response.get('status'))  # False: probably empty!
2018-03-28 06:32:56 +07:00
def delete_user(self, uid=0, user_id=''):
    """
    delete specific user by uid or user_id

    :param uid: user ID that are generated from device
    :param user_id: your own user ID (used only when uid is not given)
    :return: False when no user matches user_id; None after a successful
        deletion
    :raises ZKErrorResponse: if the device does not acknowledge the command
    """
    if not uid:
        wanted = str(user_id)
        candidates = [u for u in self.get_users() if u.user_id == wanted]
        if not candidates:
            return False
        uid = candidates[0].uid
    reply = self.__send_command(const.CMD_DELETE_USER, pack('h', uid))
    if not reply.get('status'):
        raise ZKErrorResponse("Can't delete user")
    self.refresh_data()
    # Deleting the most recently allocated uid makes it available again.
    if self.next_uid - 1 == uid:
        self.next_uid = uid
2018-03-28 06:32:56 +07:00
2018-05-05 06:18:23 +07:00
def get_user_template(self, uid, temp_id=0, user_id=''):
    """
    read one fingerprint template of one user

    :param uid: user ID that are generated from device
    :param temp_id: index of the template (finger) to read
    :param user_id: your own user ID (used only when uid is falsy)
    :return: Finger object of the selected user; None when nothing could
        be read after three tries; False when no user matches user_id
    """
    if not uid:
        wanted = str(user_id)
        matches = [u for u in self.get_users() if u.user_id == wanted]
        if not matches:
            return False
        uid = matches[0].uid
    tries_left = 3
    while tries_left:
        # command secret!!! GET_USER_TEMPLATE
        self.__send_command(88, pack('hb', uid, temp_id), 1024 + 8)
        raw = self.__recieve_chunk()
        if raw is not None:
            template = raw[:-1]
            if template[-6:] == b'\x00\x00\x00\x00\x00\x00':  # padding? bug?
                template = template[:-6]
            return Finger(uid, temp_id, 1, template)
        if self.verbose: print ("retry get_user_template")
        tries_left -= 1
    if self.verbose: print ("Can't read/find finger")
    return None
def get_templates(self):
    """
    read every fingerprint template stored on the device

    :return: list of Finger object
    """
    self.read_sizes()
    if self.fingers == 0:
        return []
    raw, size = self.read_with_buffer(const.CMD_DB_RRQ, const.FCT_FINGERTMP)
    if size < 4:
        if self.verbose: print("WRN: no user data")
        return []
    # First 4 bytes: combined length of all records that follow.
    pending = unpack('i', raw[0:4])[0]
    if self.verbose: print ("get template total size {}, size {} len {}".format(pending, size, len(raw)))
    raw = raw[4:]
    found = []
    while pending:
        # Each record starts with its own length (header included).
        rec_len, uid, fid, valid = unpack('HHbb', raw[:6])
        body = unpack("%is" % (rec_len - 6), raw[6:rec_len])[0]
        entry = Finger(uid, fid, valid, body)
        if self.verbose: print(entry)
        found.append(entry)
        raw = raw[rec_len:]
        pending -= rec_len
    return found
2016-05-25 01:32:06 +07:00
2018-12-27 13:57:49 +07:00
def get_users(self):
    """
    read every user record stored on the device

    Also refreshes self.user_packet_size, self.next_uid and
    self.next_user_id from what was read.

    :return: list of User object
    """
    self.read_sizes()
    if self.users == 0:
        self.next_uid = 1
        self.next_user_id = '1'
        return []
    users = []
    max_uid = 0
    userdata, size = self.read_with_buffer(const.CMD_USERTEMP_RRQ, const.FCT_USER)
    if self.verbose: print("user size {} (= {})".format(size, len(userdata)))
    if size <= 4:
        # Guarded like every other diagnostic here: a library should not
        # write to stdout unless verbose was requested.
        if self.verbose: print("WRN: missing user data")
        return []
    total_size = unpack("I", userdata[:4])[0]
    # NOTE: on Python 3 this is a float; comparisons with 28 / 72 still
    # hold when the division is exact.
    self.user_packet_size = total_size / self.users
    if self.user_packet_size not in [28, 72]:
        if self.verbose: print("WRN packet size would be %i" % self.user_packet_size)
    userdata = userdata[4:]
    if self.user_packet_size == 28:
        while len(userdata) >= 28:
            uid, privilege, password, name, card, group_id, timezone, user_id = unpack('<HB5s8sIxBhI', userdata[:28])
            if uid > max_uid: max_uid = uid
            password = (password.split(b'\x00')[0]).decode(self.encoding, errors='ignore')
            name = (name.split(b'\x00')[0]).decode(self.encoding, errors='ignore').strip()
            group_id = str(group_id)
            user_id = str(user_id)
            # TODO: check card value and find in ver8
            if not name:
                name = "NN-%s" % user_id
            user = User(uid, name, privilege, password, group_id, user_id, card)
            users.append(user)
            if self.verbose: print("[6]user:", uid, privilege, password, name, card, group_id, timezone, user_id)
            userdata = userdata[28:]
    else:
        while len(userdata) >= 72:
            uid, privilege, password, name, card, group_id, user_id = unpack('<HB8s24sIx7sx24s', userdata[:72])
            password = (password.split(b'\x00')[0]).decode(self.encoding, errors='ignore')
            name = (name.split(b'\x00')[0]).decode(self.encoding, errors='ignore').strip()
            group_id = (group_id.split(b'\x00')[0]).decode(self.encoding, errors='ignore').strip()
            user_id = (user_id.split(b'\x00')[0]).decode(self.encoding, errors='ignore')
            if uid > max_uid: max_uid = uid
            if not name:
                name = "NN-%s" % user_id
            user = User(uid, name, privilege, password, group_id, user_id, card)
            users.append(user)
            userdata = userdata[72:]
    max_uid += 1
    self.next_uid = max_uid
    self.next_user_id = str(max_uid)
    # Advance next_user_id past every id already in use; a set keeps the
    # search linear instead of rescanning the user list on every step.
    taken_ids = set(u.user_id for u in users)
    while self.next_user_id in taken_ids:
        max_uid += 1
        self.next_user_id = str(max_uid)
    return users
2016-05-25 21:35:50 +07:00
2016-05-26 12:45:28 +07:00
def cancel_capture(self):
    """
    cancel capturing finger

    :return: bool - whether the device acknowledged the command
    """
    reply = self.__send_command(const.CMD_CANCELCAPTURE)
    if reply.get('status'):
        return True
    return False
2016-05-26 12:45:28 +07:00
def verify_user(self):
    """
    start verify finger mode (after capture)

    :return: True when the device acknowledges the command
    :raises ZKErrorResponse: otherwise
    """
    reply = self.__send_command(const.CMD_STARTVERIFY)
    if not reply.get('status'):
        raise ZKErrorResponse("Cant Verify")
    return True
2018-04-12 06:38:37 +07:00
def reg_event(self, flags):
    """
    reg events

    :param flags: int event mask, sent to the device as a 32-bit value
    :raises ZKErrorResponse: if the device does not acknowledge the command
    """
    command = const.CMD_REG_EVENT
    reply = self.__send_command(command, pack("I", flags))
    if reply.get('status'):
        return
    raise ZKErrorResponse("cant' reg events %i" % flags)
2018-04-20 07:23:23 +07:00
def set_sdk_build_1(self):
    """
    write the option "SDKBuild=1" to the device

    :return: bool - whether the device acknowledged the command
    """
    reply = self.__send_command(const.CMD_OPTIONS_WRQ, b"SDKBuild=1")
    return True if reply.get('status') else False
def enroll_user(self, uid=0, temp_id=0, user_id=''):
    """
    start enroll user

    Sends CMD_STARTENROLL and then reads event packets from the socket
    (with a 60 second timeout) until the enrollment finishes or fails.
    Before returning, the socket timeout is restored and the device is
    put back into verify mode.

    :param uid: uid (used to look up user_id when user_id is empty)
    :param temp_id: template id
    :param user_id: user ID
    :return: bool - True only when the final event reports result code 0;
        False otherwise, or when no user matches uid
    :raises ZKErrorResponse: if the device rejects the enroll command
    """
    command = const.CMD_STARTENROLL
    done = False
    if not user_id:
        # resolve the user_id from the device-side uid
        users = self.get_users()
        users = list(filter(lambda x: x.uid==uid, users))
        if len(users) >= 1:
            user_id = users[0].user_id
        else:
            return False
    if self.tcp:
        command_string = pack('<24sbb',str(user_id).encode(), temp_id, 1)
    else:
        command_string = pack('<Ib', int(user_id), temp_id)
    self.cancel_capture()
    cmd_response = self.__send_command(command, command_string)
    if not cmd_response.get('status'):
        raise ZKErrorResponse("Cant Enroll user #%i [%i]" %(uid, temp_id))
    self.__sock.settimeout(60)
    # one "attempt" is consumed each time the second event of a round
    # carries result code 0x64
    attempts = 3
    while attempts:
        # (verbose text: "waiting for first regevent")
        if self.verbose: print("A:%i esperando primer regevent" % attempts)
        data_recv = self.__sock.recv(1032)
        self.__ack_ok()
        if self.verbose: print(codecs.encode(data_recv,'hex'))
        # the 16-bit result code is read at offset 16 for tcp packets and
        # at offset 8 otherwise
        if self.tcp:
            if len(data_recv) > 16:
                res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0]
                if self.verbose: print("res %i" % res)
                if res == 0 or res == 6 or res == 4:
                    # (verbose text: "possible timeout or failed registration")
                    if self.verbose: print ("posible timeout o reg Fallido")
                    break
        else:
            if len(data_recv) > 8:
                res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0]
                if self.verbose: print("res %i" % res)
                if res == 6 or res == 4:
                    if self.verbose: print ("posible timeout")
                    break
        # (verbose text: "waiting for 2nd regevent")
        if self.verbose: print ("A:%i esperando 2do regevent" % attempts)
        data_recv = self.__sock.recv(1032)
        self.__ack_ok()
        if self.verbose: print (codecs.encode(data_recv, 'hex'))
        if self.tcp:
            # NOTE(review): this length check uses 8 while the first tcp
            # check above uses 16 - confirm which is intended
            if len(data_recv) > 8:
                res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0]
                if self.verbose: print("res %i" % res)
                if res == 6 or res == 4:
                    if self.verbose: print ("posible timeout o reg Fallido")
                    break
                elif res == 0x64:
                    if self.verbose: print ("ok, continue?")
                    attempts -= 1
        else:
            if len(data_recv) > 8:
                res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0]
                if self.verbose: print("res %i" % res)
                if res == 6 or res == 4:
                    if self.verbose: print ("posible timeout o reg Fallido")
                    break
                elif res == 0x64:
                    if self.verbose: print ("ok, continue?")
                    attempts -= 1
    if attempts == 0:
        # all rounds were consumed without a break: read the final event
        data_recv = self.__sock.recv(1032)
        self.__ack_ok()
        if self.verbose: print (codecs.encode(data_recv, 'hex'))
        if self.tcp:
            res = unpack("H", data_recv.ljust(24,b"\x00")[16:18])[0]
        else:
            res = unpack("H", data_recv.ljust(16,b"\x00")[8:10])[0]
        if self.verbose: print("res %i" % res)
        if res == 5:
            if self.verbose: print ("finger duplicate")
        if res == 6 or res == 4:
            if self.verbose: print ("posible timeout")
        if res == 0:
            # NOTE(review): these offsets are the same for tcp and non-tcp
            # packets although the result code offset differs - the values
            # are only printed in verbose mode
            size = unpack("H", data_recv.ljust(16,b"\x00")[10:12])[0]
            pos = unpack("H", data_recv.ljust(16,b"\x00")[12:14])[0]
            if self.verbose: print("enroll ok", size, pos)
            done = True
    # NOTE(review): not reached if recv() raises (e.g. socket timeout), so
    # the 60 s timeout stays in effect in that case
    self.__sock.settimeout(self.__timeout)
    self.reg_event(0) # TODO: test
    self.cancel_capture()
    self.verify_user()
    return done
2018-12-27 13:57:49 +07:00
def live_capture(self, new_timeout=10):
    """
    try live capture of events

    Generator. Yields an Attendance object for every attendance record
    received, and None each time the socket times out so the caller can
    decide whether to keep watching. Set self.end_live_capture = True to
    stop; the socket timeout, event registration and the enabled state
    are then restored.

    :param new_timeout: socket timeout (seconds) used while capturing
    """
    was_enabled = self.is_enabled
    users = self.get_users()
    self.cancel_capture()
    self.verify_user()
    if not self.is_enabled:
        self.enable_device()
    if self.verbose: print("start live_capture")
    self.reg_event(const.EF_ATTLOG)
    self.__sock.settimeout(new_timeout)
    self.end_live_capture = False
    # user_id -> uid of the first matching user, built once instead of
    # scanning the whole user list for every event.
    uid_by_user_id = {}
    for known in users:
        uid_by_user_id.setdefault(known.user_id, known.uid)
    while not self.end_live_capture:
        try:
            # (verbose text: "waiting for event")
            if self.verbose: print("esperando event")
            data_recv = self.__sock.recv(1032)
            self.__ack_ok()
            if self.tcp:
                header = unpack('HHHH', data_recv[8:16])
                data = data_recv[16:]
            else:
                header = unpack('<4H', data_recv[:8])
                data = data_recv[8:]
            if not header[0] == const.CMD_REG_EVENT:
                if self.verbose: print("not event! %x" % header[0])
                continue
            if not len(data):
                if self.verbose: print("empty")
                continue
            while len(data) >= 12:
                if len(data) == 12:
                    user_id, status, punch, timehex = unpack('<IBB6s', data)
                    data = data[12:]
                elif len(data) == 32:
                    user_id, status, punch, timehex = unpack('<24sBB6s', data[:32])
                    data = data[32:]
                elif len(data) == 36:
                    user_id, status, punch, timehex, _other = unpack('<24sBB6s4s', data[:36])
                    data = data[36:]
                elif len(data) >= 52:
                    user_id, status, punch, timehex, _other = unpack('<24sBB6s20s', data[:52])
                    data = data[52:]
                else:
                    # Unknown record layout: nothing was consumed, so
                    # looping again would never terminate (and would
                    # re-yield stale values). Drop the rest of the packet.
                    if self.verbose: print("unexpected event payload of %i bytes, skipping" % len(data))
                    break
                if isinstance(user_id, int):
                    user_id = str(user_id)
                else:
                    user_id = (user_id.split(b'\x00')[0]).decode(errors='ignore')
                timestamp = self.__decode_timehex(timehex)
                if user_id in uid_by_user_id:
                    uid = uid_by_user_id[user_id]
                else:
                    # NOTE(review): raises ValueError for a non-numeric
                    # user_id that is unknown to the device user list.
                    uid = int(user_id)
                yield Attendance(user_id, timestamp, status, punch, uid)
        except timeout:
            if self.verbose: print("time out")
            yield None  # return to keep watching
        except (KeyboardInterrupt, SystemExit):
            if self.verbose: print("break")
            break
    if self.verbose: print("exit gracefully")
    self.__sock.settimeout(self.__timeout)
    self.reg_event(0)
    if not was_enabled:
        self.disable_device()
2018-12-27 14:54:10 +07:00
def clear_data(self):
    """
    clear all data (included: user, attendance report, finger database)

    On success self.is_connect is set to False and self.next_uid is reset
    to 1.

    :return: True when the device acknowledges the command
    :raises ZKErrorResponse: otherwise
    """
    command = const.CMD_CLEAR_DATA
    # Every other command payload in this class is bytes (pack() results
    # or b'' literals); a str payload can't be joined to a packed header
    # on Python 3. b'' == '' on Python 2, so nothing changes there.
    command_string = b''
    cmd_response = self.__send_command(command, command_string)
    if not cmd_response.get('status'):
        raise ZKErrorResponse("can't clear data")
    self.is_connect = False
    self.next_uid = 1
    return True
2016-05-25 21:35:50 +07:00
def __recieve_tcp_data(self, data_recv, size):
    """
    extract `size` payload bytes from a raw tcp packet

    Reads more from the socket when the packet at hand holds fewer than
    `size` payload bytes.

    :param data_recv: raw tcp packet (8-byte tcp top + 8-byte header + data)
    :param size: number of payload bytes expected
    :return: tuple (data, broken) - the payload (None when the packet is
        invalid) and any trailing bytes that belong to the next packet
    """
    data = []
    tcp_length = self.__test_tcp_top(data_recv)
    if self.verbose: print("tcp_length {}, size {}".format(tcp_length, size))
    if tcp_length <= 0:
        if self.verbose: print("Incorrect tcp packet")
        return None, b""
    if (tcp_length - 8) < size:
        # This packet announces less payload than requested: take what it
        # has, then fetch a second packet for the remainder.
        if self.verbose: print("tcp length too small... retrying")
        resp, bh = self.__recieve_tcp_data(data_recv, tcp_length - 8)
        data.append(resp)
        size -= len(resp)
        if self.verbose: print("new tcp DATA packet to fill misssing {}".format(size))
        data_recv = bh + self.__sock.recv(size + 16 )
        if self.verbose: print("new tcp DATA starting with {} bytes".format(len(data_recv)))
        resp, bh = self.__recieve_tcp_data(data_recv, size)
        data.append(resp)
        if self.verbose: print("for misssing {} recieved {} with extra {}".format(size, len(resp), len(bh)))
        return b''.join(data), bh
    recieved = len(data_recv)
    if self.verbose: print("recieved {}, size {}".format(recieved, size))
    response = unpack('HHHH', data_recv[8:16])[0]
    if recieved >= (size + 32):
        if response == const.CMD_DATA:
            resp = data_recv[16 : size + 16]
            if self.verbose: print("resp complete len {}".format(len(resp)))
            return resp, data_recv[size + 16:]
        else:
            if self.verbose: print("incorrect response!!! {}".format(response))
            return None, b""
    else:
        if self.verbose: print("try DATA incomplete (actual valid {})".format(recieved-16))
        data.append(data_recv[16 : size + 16 ])
        size -= recieved - 16
        broken_header = b""
        if size < 0:
            # More bytes than payload arrived: the surplus is the start of
            # the following packet.
            broken_header = data_recv[size:]
            # bytes has no .encode() on Python 3; use codecs like the
            # other hex dumps in this file.
            if self.verbose: print("broken", codecs.encode(broken_header, 'hex'))
        if size > 0:
            data_recv = self.__recieve_raw_data(size)
            data.append(data_recv)
        return b''.join(data), broken_header
def __recieve_raw_data(self, size):
    """
    read exactly `size` raw bytes from the socket

    :param size: number of bytes still expected
    :return: the bytes read, joined in arrival order
    :raises ZKNetworkError: if recv() returns no data before `size` bytes
        arrived (the peer closed the connection)
    """
    data = []
    if self.verbose: print("expecting {} bytes raw data".format(size))
    while size > 0:
        data_recv = self.__sock.recv(size)
        recieved = len(data_recv)
        if self.verbose: print("partial recv {}".format(recieved))
        if recieved == 0:
            # An empty read never shrinks `size`; without this check the
            # loop would spin forever on a closed connection.
            raise ZKNetworkError("connection closed with {} bytes still expected".format(size))
        if recieved < 100 and self.verbose: print(" recv {}".format(codecs.encode(data_recv, 'hex')))
        data.append(data_recv)
        size -= recieved
        if self.verbose: print("still need {}".format(size))
    return b''.join(data)
2018-05-05 06:18:23 +07:00
def __recieve_chunk(self):
    """
    recieve a chunk

    Interprets the last response stored in self.__response /
    self.__data: a CMD_DATA response already carries the data (more is
    read from the socket when the tcp length says so); a CMD_PREPARE_DATA
    response announces a size, and the data packets are then read until
    CMD_ACK_OK.

    :return: the chunk as bytes, or None when the response is invalid
    """
    if self.__response == const.CMD_DATA:
        if self.tcp:
            if self.verbose: print("_rc_DATA! is {} bytes, tcp length is {}".format(len(self.__data), self.__tcp_length))
            if len(self.__data) < (self.__tcp_length - 8):
                need = (self.__tcp_length - 8) - len(self.__data)
                if self.verbose: print("need more data: {}".format(need))
                more_data = self.__recieve_raw_data(need)
                return b''.join([self.__data, more_data])
            else:
                if self.verbose: print("Enough data")
                return self.__data
        else:
            if self.verbose: print("_rc len is {}".format(len(self.__data)))
            return self.__data
    elif self.__response == const.CMD_PREPARE_DATA:
        data = []
        size = self.__get_data_size()
        if self.verbose: print("recieve chunk: prepare data size is {}".format(size))
        if self.tcp:
            if len(self.__data) >= (8 + size):
                data_recv = self.__data[8:]
            else:
                data_recv = self.__data[8:] + self.__sock.recv(size + 32)
            resp, broken_header = self.__recieve_tcp_data(data_recv, size)
            # get CMD_ACK_OK
            if len(broken_header) < 16:
                data_recv = broken_header + self.__sock.recv(16)
            else:
                data_recv = broken_header
            if len(data_recv) < 16:
                if self.verbose: print("trying to complete broken ACK %s /16" % len(data_recv))
                # bytes has no .encode() on Python 3; use codecs instead
                if self.verbose: print(codecs.encode(data_recv, 'hex'))
                data_recv += self.__sock.recv(16 - len(data_recv))  # TODO: CHECK HERE_!
            if not self.__test_tcp_top(data_recv):
                if self.verbose: print("invalid chunk tcp ACK OK")
                return None
            response = unpack('HHHH', data_recv[8:16])[0]
            if response == const.CMD_ACK_OK:
                if self.verbose: print("chunk tcp ACK OK!")
                # resp is None when the data packet was invalid; report
                # that as a failed chunk instead of joining None.
                return resp
            if self.verbose: print("bad response %s" % data_recv)
            if self.verbose and resp is not None: print(codecs.encode(resp, 'hex'))
            return None
        while True:
            data_recv = self.__sock.recv(1024+8)
            response = unpack('<4H', data_recv[:8])[0]
            if self.verbose: print("# packet response is: {}".format(response))
            if response == const.CMD_DATA:
                data.append(data_recv[8:])
                size -= 1024
            elif response == const.CMD_ACK_OK:
                break
            else:
                if self.verbose: print("broken!")
                break
            if self.verbose: print("still needs %s" % size)
        return b''.join(data)
    else:
        if self.verbose: print("invalid response %s" % self.__response)
        return None
2018-05-05 06:18:23 +07:00
def __read_chunk(self, start, size):
    """
    read a chunk from buffer

    :param start: offset of the chunk inside the device buffer
    :param size: number of bytes to read
    :return: the chunk as bytes
    :raises ZKErrorResponse: if three attempts in a row return nothing
    """
    attempt = 0
    while attempt < 3:
        request = pack('<ii', start, size)
        expected = size + 32 if self.tcp else 1024 + 8
        self.__send_command(1504, request, expected)
        chunk = self.__recieve_chunk()
        if chunk is not None:
            return chunk
        attempt += 1
    raise ZKErrorResponse("can't read chunk %i:[%i]" % (start, size))
def read_with_buffer(self, command, fct=0 ,ext=0):
    """
    Test read info with buffered command (ZK6: 1503)

    :param command: command whose result should be read
    :param fct: packed into the request after the command
    :param ext: packed into the request after fct
    :return: tuple (data, size) - the bytes read and their count
    :raises ZKErrorResponse: if the device does not acknowledge the request
    """
    MAX_CHUNK = 0xFFc0 if self.tcp else 16 * 1024
    command_string = pack('<bhii', 1, command, fct, ext)
    if self.verbose: print ("rwb cs", command_string)
    cmd_response = self.__send_command(1503, command_string, 1024)
    if not cmd_response.get('status'):
        raise ZKErrorResponse("RWB Not supported")

    if cmd_response['code'] == const.CMD_DATA:
        # Small result: the device answered with the data directly.
        if not self.tcp:
            return self.__data, len(self.__data)
        if self.verbose: print ("DATA! is {} bytes, tcp length is {}".format(len(self.__data), self.__tcp_length))
        if len(self.__data) >= (self.__tcp_length - 8):
            if self.verbose: print ("Enough data")
            return self.__data, len(self.__data)
        need = (self.__tcp_length - 8) - len(self.__data)
        if self.verbose: print ("need more data: {}".format(need))
        more_data = self.__recieve_raw_data(need)
        return b''.join([self.__data, more_data]), len(self.__data) + len(more_data)

    # Large result: the device announced a size; fetch it chunk by chunk.
    size = unpack('I', self.__data[1:5])[0]
    if self.verbose: print ("size fill be %i" % size)
    packets, remain = divmod(size, MAX_CHUNK)
    if self.verbose: print ("rwb: #{} packets of max {} bytes, and extra {} bytes remain".format(packets, MAX_CHUNK, remain))
    pieces = []
    start = 0
    while start < size:
        step = min(MAX_CHUNK, size - start)
        pieces.append(self.__read_chunk(start, step))
        start += step
    self.free_data()
    if self.verbose: print ("_read w/chunk %i bytes" % start)
    return b''.join(pieces), start
2016-05-25 21:35:50 +07:00
def get_attendance(self):
    """
    return attendance record

    The record layout (8, 16 or 40 bytes) is derived from the total size
    announced by the device divided by self.records.

    :return: List of Attendance object
    """
    self.read_sizes()
    if self.records == 0:
        return []
    users = self.get_users()
    if self.verbose: print(users)
    attendances = []
    attendance_data, size = self.read_with_buffer(const.CMD_ATTLOG_RRQ)
    if size < 4:
        if self.verbose: print("WRN: no attendance data")
        return []
    total_size = unpack("I", attendance_data[:4])[0]
    record_size = total_size/self.records
    if self.verbose: print("record_size is ", record_size)
    attendance_data = attendance_data[4:]
    # Lookup tables (first match wins) built once, instead of scanning the
    # whole user list for every attendance record.
    users_by_uid = {}
    users_by_user_id = {}
    for known in users:
        users_by_uid.setdefault(known.uid, known)
        users_by_user_id.setdefault(known.user_id, known)
    if record_size == 8:
        while len(attendance_data) >= 8:
            uid, status, timestamp, punch = unpack('HB4sB', attendance_data[:8])
            if self.verbose: print(codecs.encode(attendance_data[:8], 'hex'))
            attendance_data = attendance_data[8:]
            owner = users_by_uid.get(uid)
            user_id = str(uid) if owner is None else owner.user_id
            timestamp = self.__decode_time(timestamp)
            attendances.append(Attendance(user_id, timestamp, status, punch, uid))
    elif record_size == 16:
        while len(attendance_data) >= 16:
            user_id, timestamp, status, punch, reserved, workcode = unpack('<I4sBB2sI', attendance_data[:16])
            user_id = str(user_id)
            if self.verbose: print(codecs.encode(attendance_data[:16], 'hex'))
            attendance_data = attendance_data[16:]
            owner = users_by_user_id.get(user_id)
            if owner is None:
                if self.verbose: print("no uid {}".format(user_id))
                uid = str(user_id)
                # NOTE(review): user_id is a str here while uids read by
                # get_users() are ints, so this fallback looks like it can
                # never match - kept as in the original; confirm intent.
                owner = users_by_uid.get(user_id)
                if owner is not None:
                    uid = owner.uid
                    user_id = owner.user_id
            else:
                uid = owner.uid
            timestamp = self.__decode_time(timestamp)
            attendances.append(Attendance(user_id, timestamp, status, punch, uid))
    else:
        while len(attendance_data) >= 40:
            uid, user_id, status, timestamp, punch, space = unpack('<H24sB4sB8s', attendance_data[:40])
            if self.verbose: print(codecs.encode(attendance_data[:40], 'hex'))
            user_id = (user_id.split(b'\x00')[0]).decode(errors='ignore')
            timestamp = self.__decode_time(timestamp)
            attendances.append(Attendance(user_id, timestamp, status, punch, uid))
            attendance_data = attendance_data[40:]
    return attendances
2016-05-25 21:35:50 +07:00
def clear_attendance(self):
    """
    clear all attendance record

    :return: True when the device acknowledges the command
    :raises ZKErrorResponse: otherwise
    """
    reply = self.__send_command(const.CMD_CLEAR_ATTLOG)
    if not reply.get('status'):
        raise ZKErrorResponse("Can't clear response")
    return True