pyztk/zk/base.py

1035 lines
39 KiB
Python
Raw Normal View History

2016-05-23 13:50:54 +07:00
# -*- coding: utf-8 -*-
import sys
2016-06-12 15:46:20 +07:00
from datetime import datetime
2016-06-24 19:00:55 +07:00
from socket import AF_INET, SOCK_DGRAM, socket
2016-05-23 13:50:54 +07:00
from struct import pack, unpack
from zk import const
2016-06-12 16:35:02 +07:00
from zk.attendance import Attendance
2016-06-24 19:00:55 +07:00
from zk.exception import ZKErrorResponse, ZKNetworkError
2016-05-25 16:28:55 +07:00
from zk.user import User
from zk.finger import Finger
2016-05-23 13:50:54 +07:00
def make_commkey(key, session_id, ticks=50):
"""take a password and session_id and scramble them to send to the time
clock.
copied from commpro.c - MakeKey"""
key = int(key)
session_id = int(session_id)
k = 0
for i in range(32):
if (key & (1 << i)):
k = (k << 1 | 1)
else:
k = k << 1
k += session_id
k = pack(b'I', k)
k = unpack(b'BBBB', k)
k = pack(
b'BBBB',
k[0] ^ ord('Z'),
k[1] ^ ord('K'),
k[2] ^ ord('S'),
k[3] ^ ord('O'))
k = unpack(b'HH', k)
k = pack(b'HH', k[1], k[0])
B = 0xff & ticks
k = unpack(b'BBBB', k)
k = pack(
b'BBBB',
k[0] ^ B,
k[1] ^ B,
B,
k[3] ^ B)
return k
2016-06-24 19:00:55 +07:00
2016-05-23 13:50:54 +07:00
class ZK(object):
2018-04-12 06:38:37 +07:00
""" Clase ZK """
def __init__(self, ip, port=4370, timeout=60, password=0, firmware=8):
2018-04-12 06:38:37 +07:00
""" initialize instance """
2017-12-06 21:11:22 +07:00
self.is_connect = False
2016-05-23 13:50:54 +07:00
self.__address = (ip, port)
self.__sock = socket(AF_INET, SOCK_DGRAM)
self.__sock.settimeout(timeout)
self.__password = password # passint
self.__firmware = int(firmware) #TODO check minor version?
self.users = 0
self.fingers = 0
self.records = 0
self.dummy = 0
self.cards = 0
self.fingers_cap = 0
self.users_cap = 0
self.rec_cap = 0
self.fingers_av = 0
self.users_av = 0
self.rec_av = 0
2018-04-12 06:38:37 +07:00
self.__session_id = 0
self.__reply_id = const.USHRT_MAX-1
self.__data_recv = None
def __create_header(self, command, command_string, session_id, reply_id):
2016-05-23 13:50:54 +07:00
'''
Puts a the parts that make up a packet together and packs them into a byte string
2018-04-12 06:38:37 +07:00
MODIFIED now, without initial checksum
2016-05-23 13:50:54 +07:00
'''
2018-04-12 06:38:37 +07:00
#checksum = 0 always? for calculating
buf = pack('HHHH', command, 0, session_id, reply_id) + command_string
2016-06-24 19:00:55 +07:00
buf = unpack('8B' + '%sB' % len(command_string), buf)
2016-05-23 23:59:43 +07:00
checksum = unpack('H', self.__create_checksum(buf))[0]
2016-05-23 13:50:54 +07:00
reply_id += 1
if reply_id >= const.USHRT_MAX:
reply_id -= const.USHRT_MAX
2016-05-23 23:59:43 +07:00
buf = pack('HHHH', command, checksum, session_id, reply_id)
2016-05-23 13:50:54 +07:00
return buf + command_string
def __create_checksum(self, p):
'''
2016-05-23 23:59:43 +07:00
Calculates the checksum of the packet to be sent to the time clock
2016-05-23 13:50:54 +07:00
Copied from zkemsdk.c
'''
l = len(p)
2016-05-23 23:59:43 +07:00
checksum = 0
2016-05-23 13:50:54 +07:00
while l > 1:
2016-05-23 23:59:43 +07:00
checksum += unpack('H', pack('BB', p[0], p[1]))[0]
2016-05-23 13:50:54 +07:00
p = p[2:]
2016-05-23 23:59:43 +07:00
if checksum > const.USHRT_MAX:
checksum -= const.USHRT_MAX
2016-05-23 13:50:54 +07:00
l -= 2
if l:
2016-05-23 23:59:43 +07:00
checksum = checksum + p[-1]
2016-06-15 19:15:12 +07:00
2016-05-23 23:59:43 +07:00
while checksum > const.USHRT_MAX:
checksum -= const.USHRT_MAX
2016-06-15 19:15:12 +07:00
2016-05-23 23:59:43 +07:00
checksum = ~checksum
2016-06-15 19:15:12 +07:00
2016-05-23 23:59:43 +07:00
while checksum < 0:
checksum += const.USHRT_MAX
2016-05-23 13:50:54 +07:00
2016-05-23 23:59:43 +07:00
return pack('H', checksum)
2018-04-12 06:38:37 +07:00
def __send_command(self, command, command_string='', response_size=8):
2016-06-15 19:15:12 +07:00
'''
send command to the terminal
'''
2018-04-12 06:38:37 +07:00
buf = self.__create_header(command, command_string, self.__session_id, self.__reply_id)
2016-06-10 15:35:55 +07:00
try:
self.__sock.sendto(buf, self.__address)
self.__data_recv = self.__sock.recv(response_size)
except Exception, e:
raise ZKNetworkError(str(e))
2016-05-29 20:27:01 +07:00
self.__response = unpack('HHHH', self.__data_recv[:8])[0]
self.__reply_id = unpack('HHHH', self.__data_recv[:8])[3]
2018-04-12 06:38:37 +07:00
if self.__response in [const.CMD_ACK_OK, const.CMD_PREPARE_DATA, const.CMD_DATA]:
2016-05-29 20:27:01 +07:00
return {
'status': True,
'code': self.__response
}
2018-04-12 06:38:37 +07:00
return {
'status': False,
'code': self.__response
}
def __ack_ok(self):
""" ack ok """
buf = self.__create_header(const.CMD_ACK_OK, "", self.__session_id, const.USHRT_MAX - 1)
try:
self.__sock.sendto(buf, self.__address)
except Exception, e:
raise ZKNetworkError(str(e))
2016-05-23 13:50:54 +07:00
2016-05-27 10:01:00 +07:00
def __get_data_size(self):
"""Checks a returned packet to see if it returned CMD_PREPARE_DATA,
indicating that data packets are to be sent
Returns the amount of bytes that are going to be sent"""
response = self.__response
if response == const.CMD_PREPARE_DATA:
size = unpack('I', self.__data_recv[8:12])[0]
return size
else:
return 0
2016-06-12 15:46:20 +07:00
def __reverse_hex(self, hex):
data = ''
2016-06-24 19:00:55 +07:00
for i in reversed(xrange(len(hex) / 2)):
data += hex[i * 2:(i * 2) + 2]
2016-06-12 15:46:20 +07:00
return data
def __decode_time(self, t):
"""Decode a timestamp retrieved from the timeclock
copied from zkemsdk.c - DecodeTime"""
t = t.encode('hex')
t = int(self.__reverse_hex(t), 16)
2017-12-07 06:28:41 +07:00
#print "decode from %s "% format(t, '04x')
2016-06-12 15:46:20 +07:00
second = t % 60
t = t / 60
minute = t % 60
t = t / 60
hour = t % 24
t = t / 24
2016-06-24 19:00:55 +07:00
day = t % 31 + 1
2016-06-12 15:46:20 +07:00
t = t / 31
2016-06-24 19:00:55 +07:00
month = t % 12 + 1
2016-06-12 15:46:20 +07:00
t = t / 12
year = t + 2000
d = datetime(year, month, day, hour, minute, second)
return d
2017-12-07 06:28:41 +07:00
def __encode_time(self, t):
"""Encode a timestamp so that it can be read on the timeclock
"""
# formula taken from zkemsdk.c - EncodeTime
# can also be found in the technical manual
d = (
((t.year % 100) * 12 * 31 + ((t.month - 1) * 31) + t.day - 1) *
(24 * 60 * 60) + (t.hour * 60 + t.minute) * 60 + t.second
)
return d
2017-12-08 07:02:18 +07:00
2016-05-23 13:50:54 +07:00
def connect(self):
'''
2016-06-15 19:15:12 +07:00
connect to the device
2016-05-23 13:50:54 +07:00
'''
2018-04-12 06:38:37 +07:00
self.__session_id = 0
self.__reply_id = const.USHRT_MAX - 1
cmd_response = self.__send_command(const.CMD_CONNECT)
self.__session_id = unpack('HHHH', self.__data_recv[:8])[2]
if cmd_response.get('code')==const.CMD_ACK_UNAUTH:
2017-12-07 06:28:41 +07:00
#print "try auth"
2018-04-12 06:38:37 +07:00
command_string = make_commkey(self.__password, self.__session_id)
cmd_response = self.__send_command(const.CMD_AUTH, command_string)
2016-05-23 23:59:43 +07:00
if cmd_response.get('status'):
2016-05-26 12:45:28 +07:00
self.is_connect = True
# set the session iduid, privilege, password, name, card, group_id, timezone, user_id = unpack('HB5s8s5sBhI',userdata.ljust(28)[:28])
2016-06-24 19:00:55 +07:00
return self
2016-05-23 23:59:43 +07:00
else:
print "connect err {} ".format(cmd_response["code"])
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("Invalid response: Can't connect")
2016-05-23 13:50:54 +07:00
def disconnect(self):
'''
2016-06-15 19:15:12 +07:00
diconnect from the connected device
2016-05-23 13:50:54 +07:00
'''
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(const.CMD_EXIT)
2016-05-23 23:59:43 +07:00
if cmd_response.get('status'):
2017-12-06 21:11:22 +07:00
self.is_connect = False
2016-05-29 20:27:01 +07:00
return True
2016-05-23 23:59:43 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't disconnect")
2016-05-23 13:55:09 +07:00
2016-05-29 20:27:01 +07:00
def disable_device(self):
2016-05-27 08:54:17 +07:00
'''
2016-06-15 19:15:12 +07:00
disable (lock) device, ensure no activity when process run
2016-05-27 08:54:17 +07:00
'''
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(const.CMD_DISABLEDEVICE)
2016-05-23 23:59:43 +07:00
if cmd_response.get('status'):
2016-05-29 20:27:01 +07:00
return True
2016-05-23 23:59:43 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("Can't Disable")
2016-05-23 14:41:39 +07:00
2016-05-29 20:27:01 +07:00
def enable_device(self):
2016-05-23 13:55:09 +07:00
'''
2016-06-15 19:15:12 +07:00
re-enable the connected device
2016-05-23 13:55:09 +07:00
'''
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(const.CMD_ENABLEDEVICE)
2016-05-23 23:59:43 +07:00
if cmd_response.get('status'):
2016-05-29 20:27:01 +07:00
return True
2016-05-23 23:59:43 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("Can't enable")
2016-05-23 13:55:09 +07:00
2016-05-29 20:27:01 +07:00
def get_firmware_version(self):
2016-05-23 13:55:09 +07:00
'''
2016-06-15 19:15:12 +07:00
return the firmware version
2016-05-23 13:55:09 +07:00
'''
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(const.CMD_GET_VERSION,'', 1024)
2016-05-23 23:59:43 +07:00
if cmd_response.get('status'):
firmware_version = self.__data_recv[8:].split('\x00')[0]
2016-05-29 20:27:01 +07:00
return firmware_version
2016-05-23 23:59:43 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("Can't read frimware version")
2016-05-23 21:00:43 +07:00
2016-06-09 11:20:39 +07:00
def get_serialnumber(self):
2016-06-15 19:15:12 +07:00
'''
return the serial number
'''
2016-06-10 11:27:49 +07:00
command = const.CMD_OPTIONS_RRQ
2016-06-09 11:20:39 +07:00
command_string = '~SerialNumber'
response_size = 1024
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2016-06-09 11:20:39 +07:00
if cmd_response.get('status'):
serialnumber = self.__data_recv[8:].split('=')[-1].split('\x00')[0]
2016-06-09 13:37:19 +07:00
return serialnumber
2016-06-09 11:20:39 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't read serial number")
2016-06-09 11:20:39 +07:00
2018-03-17 07:00:24 +07:00
def get_platform(self):
'''
return the serial number
'''
command = const.CMD_OPTIONS_RRQ
command_string = '~Platform'
response_size = 1024
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2018-03-17 07:00:24 +07:00
if cmd_response.get('status'):
platform = self.__data_recv[8:].split('=')[-1].split('\x00')[0]
return platform
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't get platform")
2018-03-17 07:00:24 +07:00
def get_device_name(self):
'''
return the serial number
'''
command = const.CMD_OPTIONS_RRQ
command_string = '~DeviceName'
response_size = 1024
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2018-03-17 07:00:24 +07:00
if cmd_response.get('status'):
device = self.__data_recv[8:].split('=')[-1].split('\x00')[0]
return device
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't read device name")
2018-03-17 07:00:24 +07:00
def get_pin_width(self):
'''
return the serial number
'''
command = const.CMD_GET_PINWIDTH
command_string = ' P'
2018-04-12 06:38:37 +07:00
response_size = 9
2018-03-17 07:00:24 +07:00
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2018-03-17 07:00:24 +07:00
if cmd_response.get('status'):
width = self.__data_recv[8:].split('\x00')[0]
return bytearray(width)[0]
2018-03-17 07:00:24 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can0t get pin width")
2018-03-17 07:00:24 +07:00
def free_data(self):
""" clear buffer"""
command = const.CMD_FREE_DATA
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command)
if cmd_response.get('status'):
return True
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't free data")
def read_sizes(self):
""" read sizes """
command = const.CMD_GET_FREE_SIZES
response_size = 1024
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command,'', response_size)
if cmd_response.get('status'):
fields = unpack('iiiiiiiiiiiiiiiiiiii', self.__data_recv[8:])
self.users = fields[4]
self.fingers = fields[6]
self.records = fields[8]
self.dummy = fields[10] #???
self.cards = fields[12]
self.fingers_cap = fields[14]
self.users_cap = fields[15]
self.rec_cap = fields[16]
self.fingers_av = fields[17]
self.users_av = fields[18]
self.rec_av = fields[19]
#TODO: get faces size...
return True
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't read sizes")
def __str__(self):
""" for debug"""
return "ZK%i adr:%s:%s users:%i/%i fingers:%i/%i, records:%i/%i" % (
self.__firmware, self.__address[0], self.__address[1],
self.users, self.users_cap, self.fingers, self.fingers_cap,
self.records, self.rec_cap
)
2018-03-17 07:00:24 +07:00
2016-05-29 20:27:01 +07:00
def restart(self):
2016-05-27 08:54:17 +07:00
'''
2016-06-15 19:15:12 +07:00
restart the device
2016-05-27 08:54:17 +07:00
'''
2016-05-29 20:27:01 +07:00
command = const.CMD_RESTART
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command)
2016-05-24 21:49:04 +07:00
if cmd_response.get('status'):
2016-05-29 20:27:01 +07:00
return True
2016-05-24 21:49:04 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't restart device")
2016-05-24 21:49:04 +07:00
2017-12-07 06:28:41 +07:00
def get_time(self):
"""obtener la hora del equipo"""
command = const.CMD_GET_TIME
response_size = 1032
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, '', response_size)
2017-12-07 06:28:41 +07:00
if cmd_response.get('status'):
return self.__decode_time(self.__data_recv[8:12])
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't get time")
2017-12-07 06:28:41 +07:00
def set_time(self, timestamp):
""" colocar la hora del sistema al zk """
command = const.CMD_SET_TIME
command_string = pack(b'I', self.__encode_time(timestamp))
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string)
2017-12-07 06:28:41 +07:00
if cmd_response.get('status'):
return True
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't set time")
2017-12-07 06:28:41 +07:00
2016-06-10 14:16:22 +07:00
def poweroff(self):
2016-05-27 08:54:17 +07:00
'''
2016-06-15 19:15:12 +07:00
shutdown the device
2016-05-27 08:54:17 +07:00
'''
2016-05-29 20:27:01 +07:00
command = const.CMD_POWEROFF
command_string = ''
2017-12-07 06:28:41 +07:00
response_size = 1032
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2016-05-24 21:49:04 +07:00
if cmd_response.get('status'):
2016-05-29 20:27:01 +07:00
return True
2016-05-24 21:49:04 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't poweroff")
2016-05-24 21:49:04 +07:00
2018-04-07 07:01:35 +07:00
def refresh_data(self):
'''
shutdown the device
'''
command = const.CMD_REFRESHDATA
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command)
2018-04-07 07:01:35 +07:00
if cmd_response.get('status'):
return True
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't refresh data")
2018-04-07 07:01:35 +07:00
2018-04-13 06:10:01 +07:00
def test_voice(self, index=0):
2016-05-27 08:54:17 +07:00
'''
play test voice
2018-04-13 06:10:01 +07:00
0 acceso correcto
1 password incorrecto
2 la memoria del terminal estĆ” llena
3 usuario invalido
4 intente de nuevo por favor *
5 reintroduszca codigo de usuario
6 memoria del terminal llena
7 memoria de alm fich llena
8 huella duplicada
9 acceso denegado
10 *beep*
11 el sistema vuelve al modo de verificacion
12 por favor coloque su dedo o acerque tarjeta
13 acerca su tarjeta de nuevo
14 excedido tiempo p esta operacion
15 coloque su dedo de nuevo
16 coloque su dedo por ultima vez
17 ATN numero de tarjeta estĆ” repetida
18 proceso de registro correcto *
19 borrado correcto
20 Numero de usuario
21 ATN se ha llegado al max num usuarios
22 verificacion de usuarios
23 usuario no registrado
24 ATN se ha llegado al num max de registros
25 ATN la puerta no esta cerrada
26 registro de usuarios
27 borrado de usuarios
28 coloque su dedo
29 registre la tarjeta de administrador
30 0
31 1
32 2
33 3
34 4
35 5
36 6
37 7
38 8
39 9
40 PFV seleccione numero de usuario
41 registrar
42 operacion correcta
43 PFV acerque su tarjeta
43 la tarjeta ha sido registrada
45 error en operacion
46 PFV acerque tarjeta de administracion, p confirmacion
47 descarga de fichajes
48 descarga de usuarios
49 carga de usuarios
50 actualizan de firmware
51 ejeuctar ficheros de configuracion
52 confirmaciĆ³n de clave de acceso correcta
53 error en operacion de tclado
54 borrar todos los usuarios
55 restaurar terminal con configuracion por defecto
56 introduzca numero de usuario
57 teclado bloqueado
58 error en la gestiĆ³n de la tarjeta
59 establezca una clave de acceso
60 pulse el teclado
61 zona de accceso invalida
62 acceso combinado invÄŗlido
63 verificaciĆ³n multiusuario
64 modo de verificaciĆ³n invĆ”lido
65 -
2016-05-27 08:54:17 +07:00
'''
2018-04-12 06:38:37 +07:00
command = const.CMD_TESTVOICE
2018-04-13 06:10:01 +07:00
command_string = pack("I", index)
cmd_response = self.__send_command(command, command_string)
2016-05-24 21:52:42 +07:00
if cmd_response.get('status'):
2016-05-29 20:27:01 +07:00
return True
2016-05-24 21:52:42 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't test voice")
2016-05-24 21:52:42 +07:00
2018-03-28 06:32:56 +07:00
def set_user(self, uid, name, privilege=0, password='', group_id='', user_id='', card=0):
2016-05-27 08:54:17 +07:00
'''
create or update user by uid
'''
2016-05-25 01:32:06 +07:00
command = const.CMD_USER_WRQ
if not user_id:
user_id = str(uid) #ZK6 needs uid2 == uid
#uid = chr(uid % 256) + chr(uid >> 8)
2016-05-25 12:02:56 +07:00
if privilege not in [const.USER_DEFAULT, const.USER_ADMIN]:
privilege = const.USER_DEFAULT
privilege = chr(privilege)
if self.__firmware == 6:
print "uid : %i" % uid
print "pri : %c" % privilege
print "pass: %s" % str(password)
print "name: %s" % str(name)
print type(name)
print "group %i" % int(group_id)
print "uid2: %i" % int(user_id)
try:
command_string = pack('Hc5s8s5sBHI', uid, privilege, str(password), str(name), chr(0), int(group_id), 0, int(user_id))
print "cmd : %s" % command_string
except Exception, e:
print "s_h Error pack: %s" % e
print "Error pack: %s" % sys.exc_info()[0]
2018-04-07 07:01:35 +07:00
raise ZKErrorResponse("Cant pack user")
else:
command_string = pack('Hc8s28sc7sx24s', uid, privilege, password, name, chr(0), group_id, user_id)
2016-05-29 20:27:01 +07:00
response_size = 1024
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2016-05-25 01:32:06 +07:00
if cmd_response.get('status'):
2016-05-29 20:27:01 +07:00
return True
2016-05-25 01:32:06 +07:00
else:
2018-04-07 07:01:35 +07:00
raise ZKErrorResponse("Cant set user")
def save_user_template(self, user, fingers=[]):
""" save user and template """
# armar paquete de huellas
if isinstance(fingers, Finger):
fingers =[Finger]
fpack = ""
table = ""
fnum = 0x10 # possibly flag
tstart = 0
for finger in fingers:
tfp = finger.repack_only()
table += pack("<bHbI", 2, user.uid, fnum, tstart)
tstart += len(tfp)
fnum += 1 # hack
fpack += tfp
upack = user.repack29()
head = pack("III", len(upack), len(table), len(fpack))
packet = head + upack + table + fpack
self._send_with_buffer(packet)
command = 110 # Unknown
command_string = pack('<IHH', 12,0,8) # ??? write? WRQ user data?
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string)
2018-04-07 07:01:35 +07:00
if not cmd_response.get('status'):
raise ZKErrorResponse("Cant save utemp")
self.refresh_data()
def _send_with_buffer(self, buffer):
MAX_CHUNK = 1024
size = len(buffer)
#free_Data
self.free_data()
# send prepare_data
command = const.CMD_PREPARE_DATA
command_string = pack('I', size)
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string)
2018-04-07 07:01:35 +07:00
if not cmd_response.get('status'):
raise ZKErrorResponse("Cant prepare data")
remain = size % MAX_CHUNK
packets = (size - remain) / MAX_CHUNK
start = 0
for _wlk in range(packets):
self.__send_chunk(buffer[start:start+MAX_CHUNK])
start += MAX_CHUNK
if remain:
self.__send_chunk(buffer[start:start+remain])
2016-06-11 13:58:49 +07:00
2018-04-07 07:01:35 +07:00
def __send_chunk(self, command_string):
command = const.CMD_DATA
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string)
2018-04-07 07:01:35 +07:00
if cmd_response.get('status'):
return True #refres_data (1013)?
else:
raise ZKErrorResponse("Cant send chunk")
def delete_user_template(self, uid, temp_id=0):
2018-03-28 06:32:56 +07:00
"""
Delete specific template
"""
command = const.CMD_DELETE_USERTEMP
command_string = pack('hb', uid, temp_id)
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string)
2018-03-28 06:32:56 +07:00
if cmd_response.get('status'):
return True #refres_data (1013)?
else:
2018-04-07 07:01:35 +07:00
return False # probably empty!
2018-03-28 06:32:56 +07:00
2016-06-11 13:58:49 +07:00
def delete_user(self, uid):
2016-06-15 19:15:12 +07:00
'''
delete specific user by uid
'''
2016-06-11 13:58:49 +07:00
command = const.CMD_DELETE_USER
2018-04-07 07:01:35 +07:00
#uid = chr(uid % 256) + chr(uid >> 8)
#command_string = pack('2s', uid)
command_string = pack('h', uid)
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string)
2016-06-11 13:58:49 +07:00
if cmd_response.get('status'):
return True
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't delete user")
2018-03-28 06:32:56 +07:00
def get_user_template(self, uid, temp_id):
command = 88 # comando secreto!!!
command_string = pack('hb', uid, temp_id)
response_size = 1024
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2018-03-28 06:32:56 +07:00
data = []
if not cmd_response.get('status'):
2018-04-13 06:10:01 +07:00
return None #("can't get user template")
2018-03-28 06:32:56 +07:00
#else
if cmd_response.get('code') == const.CMD_PREPARE_DATA:
bytes = self.__get_data_size() #TODO: check with size
size = bytes
while True: #limitado por respuesta no por tamaƱo
data_recv = self.__sock.recv(1032)
response = unpack('HHHH', data_recv[:8])[0]
#print "# %s packet response is: %s" % (pac, response)
if response == const.CMD_DATA:
data.append(data_recv[8:]) #header turncated
bytes -= 1024
elif response == const.CMD_ACK_OK:
break #without problem.
else:
#truncado! continuar?
#print "broken!"
break
#print "still needs %s" % bytes
data = ''.join(data)
#uid 32 fing 03, starts with 4d-9b-53-53-32-31
2018-04-07 07:01:35 +07:00
return Finger(size, uid, temp_id, 1, data)
def get_templates(self):
2018-03-28 06:32:56 +07:00
""" return array of all fingers """
templates = []
templatedata, size = self.read_with_buffer(const.CMD_DB_RRQ, const.FCT_FINGERTMP)
if size < 4:
print "WRN: no user data" # debug
return []
total_size = unpack('i', templatedata[0:4])[0]
templatedata = templatedata[4:] #total size not used
if self.__firmware == 6: #tested!
while total_size:
size, uid, fid, valid = unpack('HHbb',templatedata[:6])
2018-03-28 06:32:56 +07:00
template = unpack("%is" % (size-6), templatedata[6:size])[0]
finger = Finger(size, uid, fid, valid, template)
print finger # test
templates.append(finger)
templatedata = templatedata[size:]
total_size -= size
else: # TODO: test!!!
while total_size:
size, uid, fid, valid = unpack('HHbb',templatedata[:6])
2018-03-28 06:32:56 +07:00
template = unpack("%is" % (size-6), templatedata[6:size])[0]
2018-04-07 07:01:35 +07:00
finger = Finger(size - 6, uid, fid, valid, template)
print finger # test
templates.append(finger)
templatedata = templatedata[(size):]
total_size -= size
return templates
2016-05-25 01:32:06 +07:00
2016-05-25 15:35:45 +07:00
def get_users(self):
""" return all user """
users = []
userdata, size = self.read_with_buffer(const.CMD_USERTEMP_RRQ, const.FCT_USER)
2018-04-18 07:15:59 +07:00
#print "user size %i" % size
if size < 4:
print "WRN: no user data" # debug
return []
userdata = userdata[4:] #total size not used
if self.__firmware == 6:
while len(userdata) >= 28:
uid, privilege, password, name, card, group_id, timezone, user_id = unpack('HB5s8s5sBhI',userdata.ljust(28)[:28])
password = unicode(password.split('\x00')[0], errors='ignore')
name = unicode(name.split('\x00')[0], errors='ignore').strip()
card = unpack('Q', card.ljust(8,'\x00'))[0] #or hex value?
group_id = str(group_id)
user_id = str(user_id)
#TODO: check card value and find in ver8
if not name:
name = "NN-%s" % user_id
2018-04-07 07:01:35 +07:00
user = User(uid, name, privilege, password, group_id, user_id, card)
users.append(user)
print "[6]user:",uid, privilege, password, name, card, group_id, timezone, user_id
userdata = userdata[28:]
else:
while len(userdata) >= 72:
uid, privilege, password, name, separator, group_id, user_id = unpack('Hc8s24s4sx7sx24s', userdata.ljust(72)[:72])
#u1 = int(uid[0].encode("hex"), 16)
#u2 = int(uid[1].encode("hex"), 16)
#uid = u1 + (u2 * 256)
privilege = int(privilege.encode("hex"), 16)
password = unicode(password.split('\x00')[0], errors='ignore')
name = unicode(name.split('\x00')[0], errors='ignore').strip()
group_id = unicode(group_id.split('\x00')[0], errors='ignore').strip()
user_id = unicode(user_id.split('\x00')[0], errors='ignore')
card = int(unpack('I', separator)[0])
if not name:
name = "NN-%s" % user_id
user = User(uid, name, privilege, password, group_id, user_id, card)
users.append(user)
userdata = userdata[72:]
return users
def _get_users(self):
2016-05-27 08:54:17 +07:00
'''
2016-06-15 19:15:12 +07:00
return all user
2016-05-27 08:54:17 +07:00
'''
2016-05-25 15:35:45 +07:00
command = const.CMD_USERTEMP_RRQ
2016-07-04 13:55:29 +07:00
command_string = chr(const.FCT_USER)
2016-05-29 20:27:01 +07:00
response_size = 1024
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2016-05-29 20:27:01 +07:00
users = []
2017-12-28 23:01:12 +07:00
pac = 0
2016-05-29 20:27:01 +07:00
if cmd_response.get('status'):
if cmd_response.get('code') == const.CMD_PREPARE_DATA:
bytes = self.__get_data_size()
userdata = []
2017-12-28 23:01:12 +07:00
while True:
2016-05-25 15:35:45 +07:00
data_recv = self.__sock.recv(1032)
2017-12-10 00:39:16 +07:00
response = unpack('HHHH', data_recv[:8])[0]
2017-12-28 23:01:12 +07:00
if response == const.CMD_DATA:
pac += 1
userdata.append(data_recv[8:]) #header turncated
bytes -= 1024
elif response == const.CMD_ACK_OK:
break #without problem.
else:
#truncado! continuar?
#print "broken! with %s" % response
#print "user still needs %s" % bytes
2017-12-28 23:01:12 +07:00
break
2016-05-25 16:28:55 +07:00
if response == const.CMD_ACK_OK:
2016-05-29 20:27:01 +07:00
if userdata:
2016-05-25 17:38:59 +07:00
# The first 4 bytes don't seem to be related to the user
userdata = ''.join(userdata)
2017-12-28 23:01:12 +07:00
userdata = userdata[4:]
if self.__firmware == 6:
while len(userdata) >= 28:
uid, privilege, password, name, card, group_id, timezone, user_id = unpack('HB5s8s5sBhI',userdata.ljust(28)[:28])
password = unicode(password.split('\x00')[0], errors='ignore')
name = unicode(name.split('\x00')[0], errors='ignore').strip()
card = unpack('Q', card.ljust(8,'\x00'))[0] #or hex value?
group_id = str(group_id)
user_id = str(user_id)
#TODO: check card value and find in ver8
if not name:
name = "NN-%s" % user_id
user = User(uid, name, privilege, password, group_id, user_id)
users.append(user)
print "[6]user:",uid, privilege, password, name, card, group_id, timezone, user_id
userdata = userdata[28:]
else:
while len(userdata) >= 72:
uid, privilege, password, name, sparator, group_id, user_id = unpack('Hc8s28sc7sx24s', userdata.ljust(72)[:72])
#u1 = int(uid[0].encode("hex"), 16)
#u2 = int(uid[1].encode("hex"), 16)
#uid = u1 + (u2 * 256)
privilege = int(privilege.encode("hex"), 16)
password = unicode(password.split('\x00')[0], errors='ignore')
name = unicode(name.split('\x00')[0], errors='ignore').strip()
group_id = unicode(group_id.split('\x00')[0], errors='ignore').strip()
user_id = unicode(user_id.split('\x00')[0], errors='ignore')
if not name:
name = "NN-%s" % user_id
user = User(uid, name, privilege, password, group_id, user_id)
users.append(user)
userdata = userdata[72:]
self.free_data()
2016-05-25 16:28:55 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't _get user")
2016-05-29 20:27:01 +07:00
return users
2016-05-25 21:35:50 +07:00
2016-05-26 12:45:28 +07:00
def cancel_capture(self):
2016-05-27 08:54:17 +07:00
'''
cancel capturing finger
'''
2016-05-26 12:45:28 +07:00
command = const.CMD_CANCELCAPTURE
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command)
return bool(cmd_response.get('status'))
2016-05-26 12:45:28 +07:00
def verify_user(self):
2016-05-27 08:54:17 +07:00
'''
2018-04-13 06:10:01 +07:00
start verify finger mode (after capture)
2016-05-27 08:54:17 +07:00
'''
2016-05-26 12:45:28 +07:00
command = const.CMD_STARTVERIFY
# uid = chr(uid % 256) + chr(uid >> 8)
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command)
2018-04-07 07:01:35 +07:00
if cmd_response.get('status'):
return True
else:
raise ZKErrorResponse("Cant Verify")
2018-04-12 06:38:37 +07:00
def reg_event(self, flags):
""" reg events, """
command = const.CMD_REG_EVENT
command_string = pack ("I", flags)
cmd_response = self.__send_command(command, command_string)
if not cmd_response.get('status'):
raise ZKErrorResponse("cant' reg events %i" % flags)
2016-05-26 12:45:28 +07:00
2018-03-28 06:32:56 +07:00
def enroll_user(self, uid, temp_id=0):
2016-05-27 08:54:17 +07:00
'''
start enroll user
'''
2016-05-26 12:45:28 +07:00
command = const.CMD_STARTENROLL
2018-03-28 06:32:56 +07:00
command_string = pack('hhb', uid, 0, temp_id) # el 0 es misterio
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string)
2018-04-07 07:01:35 +07:00
if not cmd_response.get('status'):
raise ZKErrorResponse("Cant Enroll user #%i [%i]" %(uid, temp_id))
print "enroll", cmd_response
#retorna rapido toca esperar un reg event
2018-04-12 06:38:37 +07:00
attempts = 3
while attempts:
print "A:%i esperando primer regevent" % attempts
data_recv = self.__sock.recv(1032) # timeout? tarda bastante...
self.__ack_ok()
print (data_recv).encode('hex')
if len(data_recv) > 8: #not empty
res = unpack("H", data_recv.ljust(16,"\x00")[8:10])[0]
print "res", res
if res == 6:
print ("posible timeout")
return False
print "A:%i esperando 2do regevent" % attempts
data_recv = self.__sock.recv(1032) # timeout? tarda bastante...
self.__ack_ok()
print (data_recv).encode('hex')
if len(data_recv) > 8: #not empty
res = unpack("H", data_recv.ljust(16,"\x00")[8:10])[0]
print "res", res
if res == 6:
print ("posible timeout")
return False
elif res == 0x64:
print ("ok, continue?")
attempts -= 1
print "esperando 3er regevent"
2018-04-07 07:01:35 +07:00
data_recv = self.__sock.recv(1032) # timeout? tarda bastante...
2018-04-12 06:38:37 +07:00
self.__ack_ok()
print (data_recv).encode('hex')
res = unpack("H", data_recv.ljust(16,"\x00")[8:10])[0]
if res == 4:
print "registro Fallido"
self.cancel_capture()
return False
if res == 0:
size = unpack("H", data_recv.ljust(16,"\x00")[10:12])[0]
pos = unpack("H", data_recv.ljust(16,"\x00")[12:14])[0]
print "enroll ok", size, pos
return True
2016-05-26 12:45:28 +07:00
2016-06-10 11:27:49 +07:00
def clear_data(self):
2016-05-25 21:35:50 +07:00
'''
2016-06-10 11:27:49 +07:00
clear all data (include: user, attendance report, finger database )
2016-05-25 21:35:50 +07:00
'''
2016-06-10 11:27:49 +07:00
command = const.CMD_CLEAR_DATA
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command)
2016-06-10 11:27:49 +07:00
if cmd_response.get('status'):
2016-06-15 19:41:59 +07:00
return True
2016-06-10 11:27:49 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't clear data")
2016-05-25 21:35:50 +07:00
def __read_chunk(self, start, size):
""" read a chunk from buffer """
command = 1504 #CMD_READ_BUFFER
command_string = pack('<ii', start, size)
2018-04-12 06:38:37 +07:00
response_size = 1024 + 8
cmd_response = self.__send_command(command, command_string, response_size)
if not cmd_response.get('status'):
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't read chunk %i:[%i]" % (start, size))
#else
2018-04-12 06:38:37 +07:00
if cmd_response.get('code') == const.CMD_DATA: # less than 1024!!!
return self.__data_recv[8:]
if cmd_response.get('code') == const.CMD_PREPARE_DATA:
2018-04-12 06:38:37 +07:00
data = []
bytes = self.__get_data_size() #TODO: check with size
while True: #limitado por respuesta no por tamaƱo
data_recv = self.__sock.recv(1032)
response = unpack('HHHH', data_recv[:8])[0]
#print "# %s packet response is: %s" % (pac, response)
if response == const.CMD_DATA:
data.append(data_recv[8:]) #header turncated
bytes -= 1024
elif response == const.CMD_ACK_OK:
break #without problem.
else:
#truncado! continuar?
#print "broken!"
break
#print "still needs %s" % bytes
return ''.join(data)
def read_with_buffer(self, command, fct=0 ,ext=0):
""" Test read info with buffered command (ZK6: 1503) """
2018-04-07 07:01:35 +07:00
MAX_CHUNK = 16 * 1024
command_string = pack('<bhii', 1, command, fct, ext)
#print "rwb cs", command_string
response_size = 1024
data = []
start = 0
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(1503, command_string, response_size)
if not cmd_response.get('status'):
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("RWB Not supported")
size = unpack('I', self.__data_recv[9:13])[0] # extra info???
#print "size fill be %i" % size
remain = size % MAX_CHUNK
packets = (size-remain) / MAX_CHUNK # should be size /16k
for _wlk in range(packets):
data.append(self.__read_chunk(start,MAX_CHUNK))
start += MAX_CHUNK
if remain:
data.append(self.__read_chunk(start, remain))
start += remain # Debug
self.free_data()
#print "_read w/chunk %i bytes" % start
return ''.join(data), start
2016-05-25 21:35:50 +07:00
def get_attendance(self):
""" return attendance record """
attendances = []
attendance_data, size = self.read_with_buffer(const.CMD_ATTLOG_RRQ)
if size < 4:
print "WRN: no attendance data" # debug
return []
attendance_data = attendance_data[4:] #total size not used
if self.__firmware == 6:
while len(attendance_data) >= 8:
uid, status, timestamp = unpack('HH4s', attendance_data.ljust(8)[:8])
2018-04-13 06:10:01 +07:00
attendance_data = attendance_data[8:]
if uid == 0: #(on zk it's 16 bytes, extrauid 0, status 255, timestamp 0x0)
continue # probably
user_id = str(uid) #TODO revisar posibles valores cruzar con userdata
timestamp = self.__decode_time(timestamp)
attendance = Attendance(uid, user_id, timestamp, status)
attendances.append(attendance)
else:
while len(attendance_data) >= 40:
uid, user_id, sparator, timestamp, status, space = unpack('H24sc4sc8s', attendance_data.ljust(40)[:40])
user_id = user_id.split('\x00')[0]
timestamp = self.__decode_time(timestamp)
status = int(status.encode("hex"), 16)
attendance = Attendance(uid, user_id, timestamp, status)
attendances.append(attendance)
attendance_data = attendance_data[40:]
return attendances
def _get_attendance(self):
2016-06-15 19:15:12 +07:00
'''
return all attendance record
'''
2016-06-12 15:46:20 +07:00
command = const.CMD_ATTLOG_RRQ
command_string = ''
response_size = 1024
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string, response_size)
2016-06-12 15:46:20 +07:00
attendances = []
if cmd_response.get('status'):
if cmd_response.get('code') == const.CMD_PREPARE_DATA:
bytes = self.__get_data_size()
attendance_data = []
2017-12-10 00:39:16 +07:00
pac = 1
2017-12-28 23:01:12 +07:00
while True: #limitado por respuesta no por tamaƱo
2016-06-12 15:46:20 +07:00
data_recv = self.__sock.recv(1032)
2017-12-10 00:39:16 +07:00
response = unpack('HHHH', data_recv[:8])[0]
2017-12-28 23:01:12 +07:00
#print "# %s packet response is: %s" % (pac, response)
if response == const.CMD_DATA:
pac += 1
attendance_data.append(data_recv[8:]) #header turncated
bytes -= 1024
elif response == const.CMD_ACK_OK:
break #without problem.
else:
2017-12-10 00:39:16 +07:00
#truncado! continuar?
#print "broken!"
2017-12-10 00:39:16 +07:00
break
#print "still needs %s" % bytes
2016-06-12 15:46:20 +07:00
if response == const.CMD_ACK_OK:
if attendance_data:
attendance_data = ''.join(attendance_data)
2017-12-28 23:01:12 +07:00
attendance_data = attendance_data[4:]
if self.__firmware == 6:
while len(attendance_data) >= 8:
uid, status, timestamp = unpack('HH4s', attendance_data.ljust(8)[:8])
2018-04-13 06:10:01 +07:00
attendance_data = attendance_data[8:]
if uid == 0: #(on zk it's 16 bytes, extrauid 0, status 255, timestamp 0x0)
continue # probably
user_id = str(uid) #TODO revisar posibles valores cruzar con userdata
timestamp = self.__decode_time(timestamp)
attendance = Attendance(uid, user_id, timestamp, status)
attendances.append(attendance)
else:
while len(attendance_data) >= 40:
uid, user_id, sparator, timestamp, status, space = unpack('H24sc4sc8s', attendance_data.ljust(40)[:40])
user_id = user_id.split('\x00')[0]
timestamp = self.__decode_time(timestamp)
status = int(status.encode("hex"), 16)
attendance = Attendance(uid, user_id, timestamp, status)
attendances.append(attendance)
attendance_data = attendance_data[40:]
self.free_data()
2016-06-12 15:46:20 +07:00
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't _get attendance")
2016-06-12 15:46:20 +07:00
return attendances
2016-05-30 09:47:58 +07:00
2016-05-25 21:35:50 +07:00
def clear_attendance(self):
'''
2016-06-15 19:15:12 +07:00
clear all attendance record
2016-05-25 21:35:50 +07:00
'''
2016-06-15 19:15:12 +07:00
command = const.CMD_CLEAR_ATTLOG
2018-04-12 06:38:37 +07:00
cmd_response = self.__send_command(command, command_string)
2016-06-15 19:15:12 +07:00
if cmd_response.get('status'):
return True
else:
2018-04-12 06:38:37 +07:00
raise ZKErrorResponse("can't clear response")