-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathimports.py
More file actions
127 lines (97 loc) · 3.54 KB
/
imports.py
File metadata and controls
127 lines (97 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
'''External DLL imports used for implementing Python EPICS device support.
'''
import sys
from ctypes import *
# Use the libs with the right windows flags
from epicscorelibs.ioc import dbCore, Com
from . import _extension
# These are in the extension
def get_DBF_values():
'''Return {DBF_name: DBF_int_value} mapping'''
return _extension.get_DBF_values()
def get_field_offsets(record_type):
'''Return {field_name: (offset, size, field_type)}'''
return _extension.get_field_offsets(record_type)
def db_put_field_process(name, dbr_type, pbuffer, length, process):
'''Put field where pbuffer is void* pointer, conditionally processing
the record. Returns None.'''
return _extension.db_put_field_process(
name, dbr_type, pbuffer, length, process
)
def db_get_field(name, dbr_type, pbuffer, length):
'''Get field where pbuffer is void* pointer. Returns None.'''
return _extension.db_get_field(name, dbr_type, pbuffer, length)
def install_pv_logging(acf_file):
'''Install pv logging'''
_extension.install_pv_logging(acf_file)
def create_callback_capsule():
return _extension.create_callback_capsule()
def signal_processing_complete(record, callback):
'''Signal that asynchronous record processing has completed'''
_extension.signal_processing_complete(
record.PRIO,
record.record.value,
callback)
def expect_success(status, function, args):
assert status == 0, 'Expected success'
def expect_true(status, function, args):
assert status, 'Expected True'
# Encode all strings to c_char_p
class auto_encode(c_char_p):
@classmethod
def from_param(cls, value):
if value is None:
return value
else:
return value.encode()
# int registryDeviceSupportAdd(
# const char *name,const struct dset *pdset);
#
# Registers device support.
registryDeviceSupportAdd = dbCore.registryDeviceSupportAdd
registryDeviceSupportAdd.argtypes = (c_char_p, c_void_p)
registryDeviceSupportAdd.errcheck = expect_true
# void scanIoInit(IOSCANPVT *)
# void scanIoRequest(IOSCANPVT *)
#
# Initialise and trigger I/O Intr processing structure.
IOSCANPVT = c_void_p
scanIoInit = dbCore.scanIoInit
scanIoInit.argtypes = (IOSCANPVT,)
scanIoInit.restype = None
scanIoRequest = dbCore.scanIoRequest
scanIoRequest.argtypes = (IOSCANPVT,)
scanIoRequest.restype = None
dbLoadDatabase = dbCore.dbLoadDatabase
dbLoadDatabase.argtypes = (auto_encode, auto_encode, auto_encode)
dbLoadDatabase.errcheck = expect_success
callbackSetQueueSize = dbCore.callbackSetQueueSize
callbackSetQueueSize.argtypes = (c_int,)
callbackSetQueueSize.errcheck = expect_success
# unsigned short recGblResetAlarms(void *precord)
#
# Raises event processing if any alarm status has changed, and resets NSTA
# and NSEV fields for further processing.
recGblResetAlarms = dbCore.recGblResetAlarms
recGblResetAlarms.argtypes = (c_void_p,)
recGblResetAlarms.restype = c_short
iocInit = dbCore.iocInit
iocInit.argtypes = ()
epicsExit = Com.epicsExit
epicsExit.argtypes = (c_int,)
epicsExitCallAtExits = Com.epicsExitCallAtExits
epicsExitCallAtExits.argtypes = ()
epicsExitCallAtExits.restype = None
registerRecordDeviceDriver = dbCore.registerAllRecordDeviceDrivers
registerRecordDeviceDriver.argtypes = (c_void_p,)
registerRecordDeviceDriver.errcheck = expect_success
__all__ = [
'get_field_offsets',
'create_callback_capsule',
'signal_processing_complete',
'registryDeviceSupportAdd',
'IOSCANPVT', 'scanIoRequest', 'scanIoInit',
'dbLoadDatabase',
'callbackSetQueueSize',
'recGblResetAlarms',
]