python中dtype的使用规范_Python numpy.dtype() 使用实例

python中dtype的使用规范_Python numpy.dtype() 使用实例Thefollowingarecodeexamplesforshowinghowtouse.TheyareextractedfromopensourcePythonprojects.Youcanvoteuptheexamplesyoulikeorvotedowntheexmaplesyoudon’tlike.Youcanal…

大家好,又见面了,我是你们的朋友全栈君。

The following are code examples for showing how to use . They are extracted from open source Python projects. You can vote up the examples you like or vote down the exmaples you don’t like. You can also save this page to your account.

Example 1

def extract_images(filename):

“””Extract the images into a 4D uint8 numpy array [index, y, x, depth].”””

print(‘Extracting’, filename)

with gzip.open(filename) as bytestream:

magic = _read32(bytestream)

if magic != 2051:

raise ValueError(

‘Invalid magic number %d in MNIST image file: %s’ %

(magic, filename))

num_images = _read32(bytestream)

rows = _read32(bytestream)

cols = _read32(bytestream)

buf = bytestream.read(rows * cols * num_images)

data = numpy.frombuffer(buf, dtype=numpy.uint8)

data = data.reshape(num_images, rows, cols, 1)

return data

Example 2

def gl_init(self,array_table):

self.gl_hide = False

self.gl_vertex_array = gl.VertexArray()

glBindVertexArray(self.gl_vertex_array)

self.gl_vertex_buffer = gl.Buffer()

glBindBuffer(GL_ARRAY_BUFFER,self.gl_vertex_buffer)

self.gl_element_count = 3*gl_count_triangles(self)

self.gl_element_buffer = gl.Buffer()

glBindBuffer(GL_ELEMENT_ARRAY_BUFFER,self.gl_element_buffer)

vertex_type = numpy.dtype([array_table[attribute].field() for attribute in self.attributes])

vertex_count = sum(len(primitive.vertices) for primitive in self.primitives)

vertex_array = numpy.empty(vertex_count,vertex_type)

for attribute in self.attributes:

array_table[attribute].load(self,vertex_array)

vertex_array,element_map = numpy.unique(vertex_array,return_inverse=True)

element_array = gl_create_element_array(self,element_map,self.gl_element_count)

glBufferData(GL_ARRAY_BUFFER,vertex_array.nbytes,vertex_array,GL_STATIC_DRAW)

glBufferData(GL_ELEMENT_ARRAY_BUFFER,element_array.nbytes,element_array,GL_STATIC_DRAW)

Example 3

def extract_images(filename):

“””Extract the images into a 4D uint8 numpy array [index, y, x, depth].”””

print(‘Extracting’, filename)

with gzip.open(filename) as bytestream:

magic = _read32(bytestream)

if magic != 2051:

raise ValueError(

‘Invalid magic number %d in MNIST image file: %s’ %

(magic, filename))

num_images = _read32(bytestream)

rows = _read32(bytestream)

cols = _read32(bytestream)

buf = bytestream.read(rows * cols * num_images)

data = numpy.frombuffer(buf, dtype=numpy.uint8)

data = data.reshape(num_images, rows, cols, 1)

return data

Example 4

def __keytransform__(self, key):

if isinstance(key[0], np.ndarray):

shape = key[0].shape

dtype = key[0].dtype

i = key[1]

zero = True if len(key) == 2 else key[2]

elif isinstance(key[0], tuple):

if len(key) == 3:

shape, dtype, i = key

zero = True

elif len(key) == 4:

shape, dtype, i, zero = key

else:

raise TypeError(“Wrong type of key for work array”)

assert isinstance(zero, bool)

assert isinstance(i, int)

self.fillzero = zero

return (shape, np.dtype(dtype), i)

Example 5

def accumulate_strings(values, name=”strings”):

“””Accumulates strings into a vector.

Args:

values: A 1-d string tensor that contains values to add to the accumulator.

Returns:

A tuple (value_tensor, update_op).

“””

tf.assert_type(values, tf.string)

strings = tf.Variable(

name=name,

initial_value=[],

dtype=tf.string,

trainable=False,

collections=[],

validate_shape=True)

value_tensor = tf.identity(strings)

update_op = tf.assign(

ref=strings, value=tf.concat([strings, values], 0), validate_shape=False)

return value_tensor, update_op

Example 6

def test_expect_dtypes_with_tuple(self):

allowed_dtypes = (dtype(‘datetime64[ns]’), dtype(‘float’))

@expect_dtypes(a=allowed_dtypes)

def foo(a, b):

return a, b

for d in allowed_dtypes:

good_a = arange(3).astype(d)

good_b = object()

ret_a, ret_b = foo(good_a, good_b)

self.assertIs(good_a, ret_a)

self.assertIs(good_b, ret_b)

with self.assertRaises(TypeError) as e:

foo(arange(3, dtype=’uint32′), object())

expected_message = (

“{qualname}() expected a value with dtype ‘datetime64[ns]’ “

“or ‘float64’ for argument ‘a’, but got ‘uint32’ instead.”

).format(qualname=qualname(foo))

self.assertEqual(e.exception.args[0], expected_message)

Example 7

def _classify_gems(counts0, counts1):

“”” Infer number of distinct transcriptomes present in each GEM (1 or 2) and

report cr_constants.GEM_CLASS_GENOME0 for a single cell w/ transcriptome 0,

report cr_constants.GEM_CLASS_GENOME1 for a single cell w/ transcriptome 1,

report cr_constants.GEM_CLASS_MULTIPLET for multiple transcriptomes “””

# Assumes that most of the GEMs are single-cell; model counts independently

thresh0, thresh1 = [cr_constants.DEFAULT_MULTIPLET_THRESHOLD] * 2

if sum(counts0 > counts1) >= 1 and sum(counts1 > counts0) >= 1:

thresh0 = np.percentile(counts0[counts0 > counts1], cr_constants.MULTIPLET_PROB_THRESHOLD)

thresh1 = np.percentile(counts1[counts1 > counts0], cr_constants.MULTIPLET_PROB_THRESHOLD)

doublet = np.logical_and(counts0 >= thresh0, counts1 >= thresh1)

dtype = np.dtype(‘|S%d’ % max(len(cls) for cls in cr_constants.GEM_CLASSES))

result = np.where(doublet, cr_constants.GEM_CLASS_MULTIPLET, cr_constants.GEM_CLASS_GENOME0).astype(dtype)

result[np.logical_and(np.logical_not(result == cr_constants.GEM_CLASS_MULTIPLET), counts1 > counts0)] = cr_constants.GEM_CLASS_GENOME1

return result

Example 8

def widen_cat_column(old_ds, new_type):

name = old_ds.name

tmp_name = “__tmp_” + old_ds.name

grp = old_ds.parent

ds = grp.create_dataset(tmp_name,

data = old_ds[:],

shape = old_ds.shape,

maxshape = (None,),

dtype = new_type,

compression = COMPRESSION,

shuffle = True,

chunks = (CHUNK_SIZE,))

del grp[name]

grp.move(tmp_name, name)

return ds

Example 9

def create_levels(ds, levels):

# Create a dataset in the LEVEL_GROUP

# and store as native numpy / h5py types

level_grp = ds.file.get(LEVEL_GROUP)

if level_grp is None:

# Create a LEVEL_GROUP

level_grp = ds.file.create_group(LEVEL_GROUP)

ds_name = ds.name.split(“/”)[-1]

dt = h5py.special_dtype(vlen=str)

level_grp.create_dataset(ds_name,

shape = [len(levels)],

maxshape = (None,),

dtype = dt,

data = levels,

compression = COMPRESSION,

chunks = (CHUNK_SIZE,))

Example 10

def reg2bin_vector(begin, end):

”’Vectorized tabix reg2bin — much faster than reg2bin”’

result = np.zeros(begin.shape)

# Entries filled

done = np.zeros(begin.shape, dtype=np.bool)

for (bits, bins) in rev_bit_bins:

begin_shift = begin >> bits

new_done = (begin >> bits) == (end >> bits)

mask = np.logical_and(new_done, np.logical_not(done))

offset = ((1 << (29 – bits)) – 1) / 7

result[mask] = offset + begin_shift[mask]

done = new_done

return result.astype(np.int32)

Example 11

def flip_code(code):

if isinstance(code, (numpy.dtype,type)):

# since several things map to complex64 we must carefully select

# the opposite that is an exact match (ticket 1518)

if code == numpy.int8:

return gdalconst.GDT_Byte

if code == numpy.complex64:

return gdalconst.GDT_CFloat32

for key, value in codes.items():

if value == code:

return key

return None

else:

try:

return codes[code]

except KeyError:

return None

Example 12

def make2d(array, cols=None, dtype=None):

”’

Make a 2D array from an array of arrays. The `cols’ and `dtype’

arguments can be omitted if the array is not empty.

”’

if (cols is None or dtype is None) and not len(array):

raise RuntimeError(“cols and dtype must be specified for empty “

“array”)

if cols is None:

cols = len(array[0])

if dtype is None:

dtype = array[0].dtype

return _np.fromiter(array, [(‘_’, dtype, (cols,))],

count=len(array))[‘_’]

Example 13

def _read(self, stream, text, byte_order):

”’

Read the actual data from a PLY file.

”’

if text:

self._read_txt(stream)

else:

if self._have_list:

# There are list properties, so a simple load is

# impossible.

self._read_bin(stream, byte_order)

else:

# There are no list properties, so loading the data is

# much more straightforward.

self._data = _np.fromfile(stream,

self.dtype(byte_order),

self.count)

if len(self._data) < self.count:

k = len(self._data)

del self._data

raise PlyParseError(“early end-of-file”, self, k)

self._check_sanity()

Example 14

def _read_bin(self, stream, byte_order):

”’

Load a PLY element from a binary PLY file. The element may

contain list properties.

”’

self._data = _np.empty(self.count, dtype=self.dtype(byte_order))

for k in _range(self.count):

for prop in self.properties:

try:

self._data[prop.name][k] = \

prop._read_bin(stream, byte_order)

except StopIteration:

raise PlyParseError(“early end-of-file”,

self, k, prop)

Example 15

def _merge_all(parts, dtype):

if len(parts) == 1:

return parts[0]

else:

nparts = []

for i in xrange(0, len(parts), 2):

if i+1 < len(parts):

npart = numpy.empty((len(parts[i])+len(parts[i+1]), 2), dtype)

merge_elements = index_merge(parts[i], parts[i+1], npart)

if merge_elements != len(npart):

npart = npart[:merge_elements]

nparts.append(npart)

else:

nparts.append(parts[i])

del parts

return _merge_all(nparts, dtype)

Example 16

def __init__(self, buf, offset = 0):

# Accelerate class attributes

self._encode = self.encode

self._dtype = self.dtype

self._xxh = self.xxh

# Initialize buffer

if offset:

self._buf = self._likebuf = buffer(buf, offset)

else:

self._buf = buf

self._likebuf = _likebuffer(buf)

# Parse header and map index

self.index_elements, self.index_offset = self._Header.unpack_from(self._buf, 0)

self.index = numpy.ndarray(buffer = self._buf,

offset = self.index_offset,

dtype = self.dtype,

shape = (self.index_elements, 3))

Example 17

def test_rescaleData():

dtypes = map(np.dtype, (‘ubyte’, ‘uint16’, ‘byte’, ‘int16’, ‘int’, ‘float’))

for dtype1 in dtypes:

for dtype2 in dtypes:

data = (np.random.random(size=10) * 2**32 – 2**31).astype(dtype1)

for scale, offset in [(10, 0), (10., 0.), (1, -50), (0.2, 0.5), (0.001, 0)]:

if dtype2.kind in ‘iu’:

lim = np.iinfo(dtype2)

lim = lim.min, lim.max

else:

lim = (-np.inf, np.inf)

s1 = np.clip(float(scale) * (data-float(offset)), *lim).astype(dtype2)

s2 = pg.rescaleData(data, scale, offset, dtype2)

assert s1.dtype == s2.dtype

if dtype2.kind in ‘iu’:

assert np.all(s1 == s2)

else:

assert np.allclose(s1, s2)

Example 18

def solve3DTransform(points1, points2):

“””

Find a 3D transformation matrix that maps points1 onto points2.

Points must be specified as either lists of 4 Vectors or

(4, 3) arrays.

“””

import numpy.linalg

pts = []

for inp in (points1, points2):

if isinstance(inp, np.ndarray):

A = np.empty((4,4), dtype=float)

A[:,:3] = inp[:,:3]

A[:,3] = 1.0

else:

A = np.array([[inp[i].x(), inp[i].y(), inp[i].z(), 1] for i in range(4)])

pts.append(A)

## solve 3 sets of linear equations to determine transformation matrix elements

matrix = np.zeros((4,4))

for i in range(3):

## solve Ax = B; x is one row of the desired transformation matrix

matrix[i] = numpy.linalg.solve(pts[0], pts[1][:,i])

return matrix

Example 19

def __init__(self, index, channel_names=None, channel_ids=None,

name=None, description=None, file_origin=None,

coordinates=None, **annotations):

”’

Initialize a new :class:`ChannelIndex` instance.

”’

# Inherited initialization

# Sets universally recommended attributes, and places all others

# in annotations

super(ChannelIndex, self).__init__(name=name,

description=description,

file_origin=file_origin,

**annotations)

# Defaults

if channel_names is None:

channel_names = np.array([], dtype=’S’)

if channel_ids is None:

channel_ids = np.array([], dtype=’i’)

# Store recommended attributes

self.channel_names = np.array(channel_names)

self.channel_ids = np.array(channel_ids)

self.index = np.array(index)

self.coordinates = coordinates

Example 20

def load_bytes(self, data_blocks, dtype=’

“””

Return list of bytes contained

in the specified set of blocks.

NB : load all data as files cannot exceed 4Gb

find later other solutions to spare memory.

“””

chunks = list()

raw = ”

# keep only data blocks having

# a size greater than zero

blocks = [k for k in data_blocks if k.size > 0]

for data_block in blocks :

self.file.seek(data_block.start)

raw = self.file.read(data_block.size)[0:expected_size]

databytes = np.frombuffer(raw, dtype=dtype)

chunks.append(databytes)

# concatenate all chunks and return

# the specified slice

if len(chunks)>0 :

databytes = np.concatenate(chunks)

return databytes[start:end]

else :

return np.array([])

Example 21

def load_channel_data(self, ep, ch):

“””

Return a numpy array containing the

list of bytes corresponding to the

specified episode and channel.

“””

#memorise the sample size and symbol

sample_size = self.sample_size(ep, ch)

sample_symbol = self.sample_symbol(ep, ch)

#create a bit mask to define which

#sample to keep from the file

bit_mask = self.create_bit_mask(ep, ch)

#load all bytes contained in an episode

data_blocks = self.get_data_blocks(ep)

databytes = self.load_bytes(data_blocks)

raw = self.filter_bytes(databytes, bit_mask)

#reshape bytes from the sample size

dt = np.dtype(numpy_map[sample_symbol])

dt.newbyteorder(‘

return np.frombuffer(raw.reshape([len(raw) / sample_size, sample_size]), dt)

Example 22

def get_signal_data(self, ep, ch):

“””

Return a numpy array containing all samples of a

signal, acquired on an Elphy analog channel, formatted

as a list of (time, value) tuples.

“””

#get data from the file

y_data = self.load_encoded_data(ep, ch)

x_data = np.arange(0, len(y_data))

#create a recarray

data = np.recarray(len(y_data), dtype=[(‘x’, b_float), (‘y’, b_float)])

#put in the recarray the scaled data

x_factors = self.x_scale_factors(ep, ch)

y_factors = self.y_scale_factors(ep, ch)

data[‘x’] = x_factors.scale(x_data)

data[‘y’] = y_factors.scale(y_data)

return data

Example 23

def get_tag_data(self, ep, tag_ch):

“””

Return a numpy array containing all samples of a

signal, acquired on an Elphy tag channel, formatted

as a list of (time, value) tuples.

“””

#get data from the file

y_data = self.load_encoded_tags(ep, tag_ch)

x_data = np.arange(0, len(y_data))

#create a recarray

data = np.recarray(len(y_data), dtype=[(‘x’, b_float), (‘y’, b_int)])

#put in the recarray the scaled data

factors = self.x_tag_scale_factors(ep)

data[‘x’] = factors.scale(x_data)

data[‘y’] = y_data

return data

Example 24

def get_event(self, ep, ch, marked_ks):

“””

Return a :class:`ElphyEvent` which is a

descriptor of the specified event channel.

“””

assert ep in range(1, self.n_episodes + 1)

assert ch in range(1, self.n_channels + 1)

# find the event channel number

evt_channel = np.where(marked_ks == -1)[0][0]

assert evt_channel in range(1, self.n_events(ep) + 1)

block = self.episode_block(ep)

ep_blocks = self.get_blocks_stored_in_episode(ep)

evt_blocks = [k for k in ep_blocks if k.identifier == ‘REVT’]

n_events = np.sum([k.n_events[evt_channel – 1] for k in evt_blocks], dtype=int)

x_unit = block.ep_block.x_unit

return ElphyEvent(self, ep, evt_channel, x_unit, n_events, ch_number=ch)

Example 25

def load_encoded_events(self, episode, evt_channel, identifier):

“””

Return times stored as a 4-bytes integer

in the specified event channel.

“””

data_blocks = self.group_blocks_of_type(episode, identifier)

ep_blocks = self.get_blocks_stored_in_episode(episode)

evt_blocks = [k for k in ep_blocks if k.identifier == identifier]

#compute events on each channel

n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)

pre_events = np.sum(n_events[0:evt_channel – 1], dtype=int)

start = pre_events

end = start + n_events[evt_channel – 1]

expected_size = 4 * np.sum(n_events, dtype=int)

return self.load_bytes(data_blocks, dtype=’

Example 26

def load_encoded_spikes(self, episode, evt_channel, identifier):

“””

Return times stored as a 4-bytes integer

in the specified spike channel.

NB: it is meant for Blackrock-type, having an additional byte for each event time as spike sorting label.

These additiona bytes are appended trailing the times.

“””

# to load the requested spikes for the specified episode and event channel:

# get all the elphy blocks having as identifier ‘RSPK’ (or whatever)

all_rspk_blocks = [k for k in self.blocks if k.identifier == identifier]

rspk_block = all_rspk_blocks[episode-1]

# RDATA(h?dI) REVT(NbVeV:I, NbEv:256I … spike data are 4byte integers

rspk_header = 4*( rspk_block.size – rspk_block.data_size-2 + len(rspk_block.n_events))

pre_events = np.sum(rspk_block.n_events[0:evt_channel-1], dtype=int, axis=0)

# the real start is after header, preceeding events (which are 4byte) and preceeding labels (1byte)

start = rspk_header + (4*pre_events) + pre_events

end = start + 4*rspk_block.n_events[evt_channel-1]

raw = self.load_bytes( [rspk_block], dtype=’

# re-encoding after reading byte by byte

res = np.frombuffer(raw[0:(4*rspk_block.n_events[evt_channel-1])], dtype=’

res.sort() # sometimes timings are not sorted

#print “load_encoded_data() – spikes:”,res

return res

Example 27

def get_waveform_data(self, episode, electrode_id):

“””

Return waveforms corresponding to the specified

spike channel. This function is triggered when the

“waveforms“ property of an :class:`Spike` descriptor

instance is accessed.

“””

block = self.episode_block(episode)

times, databytes = self.load_encoded_waveforms(episode, electrode_id)

n_events, = databytes.shape

wf_samples = databytes[‘waveform’].shape[1]

dtype = [

(‘time’, float),

(‘electrode_id’, int),

(‘unit_id’, int),

(‘waveform’, float, (wf_samples, 2))

]

data = np.empty(n_events, dtype=dtype)

data[‘electrode_id’] = databytes[‘channel_id’][:, 0]

data[‘unit_id’] = databytes[‘unit_id’][:, 0]

data[‘time’] = databytes[‘elphy_time’][:, 0] * block.ep_block.dX

data[‘waveform’][:, :, 0] = times * block.ep_block.dX

data[‘waveform’][:, :, 1] = databytes[‘waveform’] * block.ep_block.dY_wf + block.ep_block.Y0_wf

return data

Example 28

def get_rspk_data(self, spk_channel):

“””

Return times stored as a 4-bytes integer

in the specified event channel.

“””

evt_blocks = self.get_blocks_of_type(‘RSPK’)

#compute events on each channel

n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)

pre_events = np.sum(n_events[0:spk_channel], dtype=int) # sum of array values up to spk_channel-1!!!!

start = pre_events + (7 + len(n_events))# rspk header

end = start + n_events[spk_channel]

expected_size = 4 * np.sum(n_events, dtype=int) # constant

return self.load_bytes(evt_blocks, dtype=’

# ———————————————————

# factories.py

Example 29

def __mmap_ncs_packet_headers(self, filename):

“””

Memory map of the Neuralynx .ncs file optimized for extraction of

data packet headers

Reading standard dtype improves speed, but timestamps need to be

reconstructed

“””

filesize = getsize(self.sessiondir + sep + filename) # in byte

if filesize > 16384:

data = np.memmap(self.sessiondir + sep + filename,

dtype=’

shape=((filesize – 16384) / 4 / 261, 261),

mode=’r’, offset=16384)

ts = data[:, 0:2]

multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),

axis=0)

timestamps = np.sum(ts * multi, axis=1)

# timestamps = data[:,0] + (data[:,1] *2**32)

header_u4 = data[:, 2:5]

return timestamps, header_u4

else:

return None

Example 30

def __mmap_ncs_packet_timestamps(self, filename):

“””

Memory map of the Neuralynx .ncs file optimized for extraction of

data packet headers

Reading standard dtype improves speed, but timestamps need to be

reconstructed

“””

filesize = getsize(self.sessiondir + sep + filename) # in byte

if filesize > 16384:

data = np.memmap(self.sessiondir + sep + filename,

dtype=’

shape=(int((filesize – 16384) / 4 / 261), 261),

mode=’r’, offset=16384)

ts = data[:, 0:2]

multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),

axis=0)

timestamps = np.sum(ts * multi, axis=1)

# timestamps = data[:,0] + data[:,1]*2**32

return timestamps

else:

return None

Example 31

def __mmap_nev_file(self, filename):

“”” Memory map the Neuralynx .nev file “””

nev_dtype = np.dtype([

(‘reserved’, ‘

(‘system_id’, ‘

(‘data_size’, ‘

(‘timestamp’, ‘

(‘event_id’, ‘

(‘ttl_input’, ‘

(‘crc_check’, ‘

(‘dummy1’, ‘

(‘dummy2’, ‘

(‘extra’, ‘

(‘event_string’, ‘a128’),

])

if getsize(self.sessiondir + sep + filename) > 16384:

return np.memmap(self.sessiondir + sep + filename,

dtype=nev_dtype, mode=’r’, offset=16384)

else:

return None

Example 32

def __extract_nev_file_spec(self):

“””

Extract file specification from an .nsx file

“””

filename = ‘.’.join([self._filenames[‘nsx’], ‘nev’])

# Header structure of files specification 2.2 and higher. For files 2.1

# and lower, the entries ver_major and ver_minor are not supported.

dt0 = [

(‘file_id’, ‘S8’),

(‘ver_major’, ‘uint8’),

(‘ver_minor’, ‘uint8’)]

nev_file_id = np.fromfile(filename, count=1, dtype=dt0)[0]

if nev_file_id[‘file_id’].decode() == ‘NEURALEV’:

spec = ‘{0}.{1}’.format(

nev_file_id[‘ver_major’], nev_file_id[‘ver_minor’])

else:

raise IOError(‘NEV file type {0} is not supported’.format(

nev_file_id[‘file_id’]))

return spec

Example 33

def __read_nsx_data_variant_a(self, nsx_nb):

“””

Extract nsx data from a 2.1 .nsx file

“””

filename = ‘.’.join([self._filenames[‘nsx’], ‘ns%i’ % nsx_nb])

# get shape of data

shape = (

self.__nsx_databl_param[‘2.1’](‘nb_data_points’, nsx_nb),

self.__nsx_basic_header[nsx_nb][‘channel_count’])

offset = self.__nsx_params[‘2.1’](‘bytes_in_headers’, nsx_nb)

# read nsx data

# store as dict for compatibility with higher file specs

data = {1: np.memmap(

filename, dtype=’int16′, shape=shape, offset=offset)}

return data

Example 34

def __read_nev_data(self, nev_data_masks, nev_data_types):

“””

Extract nev data from a 2.1 or 2.2 .nev file

“””

filename = ‘.’.join([self._filenames[‘nev’], ‘nev’])

data_size = self.__nev_basic_header[‘bytes_in_data_packets’]

header_size = self.__nev_basic_header[‘bytes_in_headers’]

# read all raw data packets and markers

dt0 = [

(‘timestamp’, ‘uint32’),

(‘packet_id’, ‘uint16’),

(‘value’, ‘S{0}’.format(data_size – 6))]

raw_data = np.memmap(filename, offset=header_size, dtype=dt0)

masks = self.__nev_data_masks(raw_data[‘packet_id’])

types = self.__nev_data_types(data_size)

data = {}

for k, v in nev_data_masks.items():

data[k] = raw_data.view(types[k][nev_data_types[k]])[masks[k][v]]

return data

Example 35

def __get_nev_rec_times(self):

“””

Extracts minimum and maximum time points from a nev file.

“””

filename = ‘.’.join([self._filenames[‘nev’], ‘nev’])

dt = [(‘timestamp’, ‘uint32’)]

offset = \

self.__get_file_size(filename) – \

self.__nev_params(‘bytes_in_data_packets’)

last_data_packet = np.memmap(filename, offset=offset, dtype=dt)[0]

n_starts = [0 * self.__nev_params(‘event_unit’)]

n_stops = [

last_data_packet[‘timestamp’] * self.__nev_params(‘event_unit’)]

return n_starts, n_stops

Example 36

def __get_waveforms_dtype(self):

“””

Extracts the actual waveform dtype set for each channel.

“””

# Blackrock code giving the approiate dtype

conv = {0: ‘int8’, 1: ‘int8’, 2: ‘int16’, 4: ‘int32’}

# get all electrode ids from nev ext header

all_el_ids = self.__nev_ext_header[b’NEUEVWAV’][‘electrode_id’]

# get the dtype of waveform (this is stupidly complicated)

if self.__is_set(

np.array(self.__nev_basic_header[‘additionnal_flags’]), 0):

dtype_waveforms = dict((k, ‘int16’) for k in all_el_ids)

else:

# extract bytes per waveform

waveform_bytes = \

self.__nev_ext_header[b’NEUEVWAV’][‘bytes_per_waveform’]

# extract dtype for waveforms fro each electrode

dtype_waveforms = dict(zip(all_el_ids, conv[waveform_bytes]))

return dtype_waveforms

Example 37

def __read_comment(self,n_start,n_stop,data,lazy=False):

event_unit = self.__nev_params(‘event_unit’)

if lazy:

times = []

labels = np.array([],dtype=’s’)

else:

times = data[‘timestamp’]*event_unit

labels = data[‘comment’].astype(str)

# mask for given time interval

mask = (times >= n_start) & (times < n_stop)

if np.sum(mask)>0:

ev = Event(

times = times[mask].astype(float),

labels = labels[mask],

name = ‘comment’)

if lazy:

ev.lazy_shape = np.sum(mask)

else:

ev = None

return ev

# ————–end——added by zhangbo 20170926——–

Example 38

def reformat_integer_v1(data, nbchannel, header):

“””

reformat when dtype is int16 for ABF version 1

“””

chans = [chan_num for chan_num in

header[‘nADCSamplingSeq’] if chan_num >= 0]

for n, i in enumerate(chans[:nbchannel]): # respect SamplingSeq

data[:, n] /= header[‘fInstrumentScaleFactor’][i]

data[:, n] /= header[‘fSignalGain’][i]

data[:, n] /= header[‘fADCProgrammableGain’][i]

if header[‘nTelegraphEnable’][i]:

data[:, n] /= header[‘fTelegraphAdditGain’][i]

data[:, n] *= header[‘fADCRange’]

data[:, n] /= header[‘lADCResolution’]

data[:, n] += header[‘fInstrumentOffset’][i]

data[:, n] -= header[‘fSignalOffset’][i]

Example 39

def solve3DTransform(points1, points2):

“””

Find a 3D transformation matrix that maps points1 onto points2.

Points must be specified as either lists of 4 Vectors or

(4, 3) arrays.

“””

import numpy.linalg

pts = []

for inp in (points1, points2):

if isinstance(inp, np.ndarray):

A = np.empty((4,4), dtype=float)

A[:,:3] = inp[:,:3]

A[:,3] = 1.0

else:

A = np.array([[inp[i].x(), inp[i].y(), inp[i].z(), 1] for i in range(4)])

pts.append(A)

## solve 3 sets of linear equations to determine transformation matrix elements

matrix = np.zeros((4,4))

for i in range(3):

## solve Ax = B; x is one row of the desired transformation matrix

matrix[i] = numpy.linalg.solve(pts[0], pts[1][:,i])

return matrix

Example 40

def __init__(self, index, channel_names=None, channel_ids=None,

name=None, description=None, file_origin=None,

coordinates=None, **annotations):

”’

Initialize a new :class:`ChannelIndex` instance.

”’

# Inherited initialization

# Sets universally recommended attributes, and places all others

# in annotations

super(ChannelIndex, self).__init__(name=name,

description=description,

file_origin=file_origin,

**annotations)

# Defaults

if channel_names is None:

channel_names = np.array([], dtype=’S’)

if channel_ids is None:

channel_ids = np.array([], dtype=’i’)

# Store recommended attributes

self.channel_names = np.array(channel_names)

self.channel_ids = np.array(channel_ids)

self.index = np.array(index)

self.coordinates = coordinates

Example 41

def load_bytes(self, data_blocks, dtype=’

“””

Return list of bytes contained

in the specified set of blocks.

NB : load all data as files cannot exceed 4Gb

find later other solutions to spare memory.

“””

chunks = list()

raw = ”

# keep only data blocks having

# a size greater than zero

blocks = [k for k in data_blocks if k.size > 0]

for data_block in blocks :

self.file.seek(data_block.start)

raw = self.file.read(data_block.size)[0:expected_size]

databytes = np.frombuffer(raw, dtype=dtype)

chunks.append(databytes)

# concatenate all chunks and return

# the specified slice

if len(chunks)>0 :

databytes = np.concatenate(chunks)

return databytes[start:end]

else :

return np.array([])

Example 42

def load_channel_data(self, ep, ch):

“””

Return a numpy array containing the

list of bytes corresponding to the

specified episode and channel.

“””

#memorise the sample size and symbol

sample_size = self.sample_size(ep, ch)

sample_symbol = self.sample_symbol(ep, ch)

#create a bit mask to define which

#sample to keep from the file

bit_mask = self.create_bit_mask(ep, ch)

#load all bytes contained in an episode

data_blocks = self.get_data_blocks(ep)

databytes = self.load_bytes(data_blocks)

raw = self.filter_bytes(databytes, bit_mask)

#reshape bytes from the sample size

dt = np.dtype(numpy_map[sample_symbol])

dt.newbyteorder(‘

return np.frombuffer(raw.reshape([len(raw) / sample_size, sample_size]), dt)

Example 43

def get_signal_data(self, ep, ch):

“””

Return a numpy array containing all samples of a

signal, acquired on an Elphy analog channel, formatted

as a list of (time, value) tuples.

“””

#get data from the file

y_data = self.load_encoded_data(ep, ch)

x_data = np.arange(0, len(y_data))

#create a recarray

data = np.recarray(len(y_data), dtype=[(‘x’, b_float), (‘y’, b_float)])

#put in the recarray the scaled data

x_factors = self.x_scale_factors(ep, ch)

y_factors = self.y_scale_factors(ep, ch)

data[‘x’] = x_factors.scale(x_data)

data[‘y’] = y_factors.scale(y_data)

return data

Example 44

def get_tag_data(self, ep, tag_ch):

“””

Return a numpy array containing all samples of a

signal, acquired on an Elphy tag channel, formatted

as a list of (time, value) tuples.

“””

#get data from the file

y_data = self.load_encoded_tags(ep, tag_ch)

x_data = np.arange(0, len(y_data))

#create a recarray

data = np.recarray(len(y_data), dtype=[(‘x’, b_float), (‘y’, b_int)])

#put in the recarray the scaled data

factors = self.x_tag_scale_factors(ep)

data[‘x’] = factors.scale(x_data)

data[‘y’] = y_data

return data

Example 45

def load_encoded_events(self, episode, evt_channel, identifier):

“””

Return times stored as a 4-bytes integer

in the specified event channel.

“””

data_blocks = self.group_blocks_of_type(episode, identifier)

ep_blocks = self.get_blocks_stored_in_episode(episode)

evt_blocks = [k for k in ep_blocks if k.identifier == identifier]

#compute events on each channel

n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)

pre_events = np.sum(n_events[0:evt_channel – 1], dtype=int)

start = pre_events

end = start + n_events[evt_channel – 1]

expected_size = 4 * np.sum(n_events, dtype=int)

return self.load_bytes(data_blocks, dtype=’

Example 46

def load_encoded_spikes(self, episode, evt_channel, identifier):

“””

Return times stored as a 4-bytes integer

in the specified spike channel.

NB: it is meant for Blackrock-type, having an additional byte for each event time as spike sorting label.

These additiona bytes are appended trailing the times.

“””

# to load the requested spikes for the specified episode and event channel:

# get all the elphy blocks having as identifier ‘RSPK’ (or whatever)

all_rspk_blocks = [k for k in self.blocks if k.identifier == identifier]

rspk_block = all_rspk_blocks[episode-1]

# RDATA(h?dI) REVT(NbVeV:I, NbEv:256I … spike data are 4byte integers

rspk_header = 4*( rspk_block.size – rspk_block.data_size-2 + len(rspk_block.n_events))

pre_events = np.sum(rspk_block.n_events[0:evt_channel-1], dtype=int, axis=0)

# the real start is after header, preceeding events (which are 4byte) and preceeding labels (1byte)

start = rspk_header + (4*pre_events) + pre_events

end = start + 4*rspk_block.n_events[evt_channel-1]

raw = self.load_bytes( [rspk_block], dtype=’

# re-encoding after reading byte by byte

res = np.frombuffer(raw[0:(4*rspk_block.n_events[evt_channel-1])], dtype=’

res.sort() # sometimes timings are not sorted

#print “load_encoded_data() – spikes:”,res

return res

Example 47

def get_spiketrain(self, episode, electrode_id):

“””

Return a :class:`Spike` which is a

descriptor of the specified spike channel.

“””

assert episode in range(1, self.n_episodes + 1)

assert electrode_id in range(1, self.n_spiketrains(episode) + 1)

# get some properties stored in the episode sub-block

block = self.episode_block(episode)

x_unit = block.ep_block.x_unit

x_unit_wf = getattr(block.ep_block, ‘x_unit_wf’, None)

y_unit_wf = getattr(block.ep_block, ‘y_unit_wf’, None)

# number of spikes in the entire episode

spk_blocks = [k for k in self.blocks if k.identifier == ‘RSPK’]

n_events = np.sum([k.n_events[electrode_id – 1] for k in spk_blocks], dtype=int)

# number of samples in a waveform

wf_sampling_frequency = 1.0 / block.ep_block.dX

wf_blocks = [k for k in self.blocks if k.identifier == ‘RspkWave’]

if wf_blocks :

wf_samples = wf_blocks[0].wavelength

t_start = wf_blocks[0].pre_trigger * block.ep_block.dX

else:

wf_samples = 0

t_start = 0

return ElphySpikeTrain(self, episode, electrode_id, x_unit, n_events, wf_sampling_frequency, wf_samples, x_unit_wf, y_unit_wf, t_start)

Example 48

def get_rspk_data(self, spk_channel):

“””

Return times stored as a 4-bytes integer

in the specified event channel.

“””

evt_blocks = self.get_blocks_of_type(‘RSPK’)

#compute events on each channel

n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)

pre_events = np.sum(n_events[0:spk_channel], dtype=int) # sum of array values up to spk_channel-1!!!!

start = pre_events + (7 + len(n_events))# rspk header

end = start + n_events[spk_channel]

expected_size = 4 * np.sum(n_events, dtype=int) # constant

return self.load_bytes(evt_blocks, dtype=’

# ———————————————————

# factories.py

Example 49

def __mmap_ncs_packet_headers(self, filename):

“””

Memory map of the Neuralynx .ncs file optimized for extraction of

data packet headers

Reading standard dtype improves speed, but timestamps need to be

reconstructed

“””

filesize = getsize(self.sessiondir + sep + filename) # in byte

if filesize > 16384:

data = np.memmap(self.sessiondir + sep + filename,

dtype=’

shape=((filesize – 16384) / 4 / 261, 261),

mode=’r’, offset=16384)

ts = data[:, 0:2]

multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),

axis=0)

timestamps = np.sum(ts * multi, axis=1)

# timestamps = data[:,0] + (data[:,1] *2**32)

header_u4 = data[:, 2:5]

return timestamps, header_u4

else:

return None

Example 50

def __mmap_nev_file(self, filename):

“”” Memory map the Neuralynx .nev file “””

nev_dtype = np.dtype([

(‘reserved’, ‘

(‘system_id’, ‘

(‘data_size’, ‘

(‘timestamp’, ‘

(‘event_id’, ‘

(‘ttl_input’, ‘

(‘crc_check’, ‘

(‘dummy1’, ‘

(‘dummy2’, ‘

(‘extra’, ‘

(‘event_string’, ‘a128’),

])

if getsize(self.sessiondir + sep + filename) > 16384:

return np.memmap(self.sessiondir + sep + filename,

dtype=nev_dtype, mode=’r’, offset=16384)

else:

return None

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/134696.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 大数据分析及工具应用总结「建议收藏」

    大数据分析及工具应用总结「建议收藏」概述数据分析即从数据、信息到知识的过程,数据分析需要数学理论、行业经验以及计算机工具三者结合数据分析工具:各种厂商开发了数据分析的工具、模块,将分析模型封装,使不了解技术的人也能够快捷的实现数学建模,快速响应分析需求传统分析:在数据量较少时,传统的数据分析已能够发现数据中包含的知识,包括结构分析、杜邦分析等模型,方法成熟,应用广泛。数据挖掘:就是充分利用了统计学和人工智能技术的应用程序,并把这些高深复杂的技术封装起来,使人们不用自己掌握这些技术也能完成同样的功能,并且…

  • 端口分类_宽带端口是什么样的

    端口分类_宽带端口是什么样的一、端口通俗地讲,端口(Port)就是电脑向网络开放的信息出入“门户”。和小区大门不同的是,在电脑上这种“门户”有个256×256(65535)个,而且它们还有多种状态。1.端口的分类根据端口和服务的绑定情况,端口可分为公认端口、注册端口和动态端口。公认端口:0~1023。这个范围内的端口系统一般保留给一些常用的系统服务,比如WEB服务使用80端口,FTP服务使用21端口

  • dp3003打印机怎么清零_最小宽度dp

    dp3003打印机怎么清零_最小宽度dp有台奇怪的打印机有以下两个特殊要求:打印机每次只能打印由 同一个字符 组成的序列。每次可以在任意起始和结束位置打印新字符,并且会覆盖掉原来已有的字符。给你一个字符串 s ,你的任务是计算这个打印机打印它需要的最少打印次数。示例 1:输入:s = “aaabbb”输出:2解释:首先打印 “aaa” 然后打印 “bbb”。示例 2:输入:s = “aba”输出:2解释:首先打印 “aaa” 然后在第二个位置打印 “b” 覆盖掉原来的字符 ‘a’。 提示:1 <= s.le

  • idea最新激活码2021破解方法

    idea最新激活码2021破解方法,https://javaforall.cn/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

  • javac使用「建议收藏」

    javac使用「建议收藏」javac使用

  • Spring Cloud GateWay网关集群搭建「建议收藏」

    Spring Cloud GateWay网关集群搭建「建议收藏」SpringCloudGateWay网关集群搭建1.环境nginx:1.19.0nacos:1.3.1openjdk:1.8.0_181nacos集群:192.168.8.81192.168.8.82192.168.8.832.实现网关注册nacos中心1)配置依赖pom.xml因为是搭建网关集群,每一个网关应用使用的依赖都是一致的2)修改配置文件配置网关服务gatewaya的nacos集群注册中心地

    2022年10月10日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号