<< return to Pixycam.com

Help read signatures, quantities, index with RPI I2C Python

Hello friends!
I am doing an interesting project for children, a cave made of lego in which 9 Pixy2.1 cameras are installed, which count the number of I2C signatures (colored lego men) in different places.

At first I wanted to count from the serial port from Arduino, but I faced the limitation of 4 cameras, then I decided directly with RPI4 (Python) with smbus following the example of pixycamev3 excluding ev3dev2._
https://github.com/charmedlabs/pixycamev3/blob/main/samples/python3/cp_display_block.py
by referring to the function def get_blocks(self, sigmap, max_blocks) of the module pixy2.py
https://github.com/charmedlabs/pixycamev3/blob/main/pixycamev3/pixy2.py
I can’t get the same data as (Arduino_I2C_example)

Please tell me how to correctly count the number of each signature, the total number of signatures and the color of the signature (given in the camera setup)

module pixy2.py:
import os, sys
from time import sleep
from smbus import SMBus

class Pixy2:

    def __init__(self, i2c_address):
        self.i2c_address = i2c_address
        self.pixy2 = SMBus(1)
      
    def pixy2_request(self, request, response_type):
        """ Send request to Pixy2, return header on success or error on fail."""
        times_tried = 0
        while True:
            times_tried += 1
            if times_tried == 10:
                # Tried sending 10 times without success, raise error
                msg = 'Fault in serial communication.'
                # raise Pixy2CommunicationError(msg, 'Pixy2CommunicationError')
            else:
                # Send requeat to Pixy2
                self._i2c_write(request)
                # Read header of response
                header = self._i2c_read(6)
                # Check header for success or fail
                err = self._check_header(header, response_type)
                if err == 'NO_ERROR':
                    # Successful request, break out of loop and return header
                    break
                else:
                    # Request not successful, try again
                    pass
        return header

    def _check_header(self, header, packet_type):
        err = 'NO_ERROR'
        return err

    def _i2c_write(self, data):
        self.pixy2.write_i2c_block_data(self.i2c_address, 0, data)
        
    def _i2c_read(self, length):
        """ Read data from Pixy2."""
        response = self.pixy2.read_i2c_block_data(self.i2c_address, 0, length)
        return response
    
    def set_lamp(self, upper, lower):
        """ Turn on/off upper and lower LED's of Pixy2 (0=off, 1=on)."""
        data = [174, 193, 22, 2, upper, lower]
        response_type = 1
        self.pixy2_request(data, response_type)

    def get_blocks(self, sigmap, max_blocks):
        """ Get blockdata for sigmap."""
        blocks = []
        # Request data
        data = [174, 193, 32, 2, sigmap, max_blocks]
        response_type = 33
        header = self.pixy2_request(data, response_type)
        length_of_payload = header[3]
        nr_detected_blocks = int(length_of_payload/14)
        # Read and parse data
        for b in range(0, nr_detected_blocks):
            data = self._i2c_read(14)
            blocks.append(Block())
            blocks[b].sig = data[1] << 8 | data[0]
            blocks[b].x_center = data[3] << 8 | data[2]
            blocks[b].y_center = data[5] << 8 | data[4]
            blocks[b].width= data[7] << 8 | data[6]
            blocks[b].height = data[9] << 8 | data[8]
            blocks[b].angle = data[11] << 8 | data[10]
            blocks[b].tracking_index = data[12]
            blocks[b].age = data[13]

        return nr_detected_blocks, blocks


class Block:
    """ Datablock with detected signature."""
    def __init__(self):
        self.sig = None
        self.x_center = None
        self.y_center = None
        self.width = None
        self.height = None
        self.angle = None
        self.tracking_index = None
        self.age = None

    def __str__(self):
        desc = 'sig: {}\nx: {}\ny: {}\nwidth:  {}\nheight: {}'.format(
            self.sig, self.x_center, self.y_center, self.width, self.height, self.angle, self.tracking_index, self.age)
        return desc

main.py:

##!/usr/bin/python
# -*- coding: utf-8 -*-
# !//usr/bin/env python

from time import sleep
from pixy2 import Pixy2
from pixy2 import Block

def main():
    block = Block()
    pixy2 = Pixy2(0x30)
    sigmap = 255
    max_blocks = 255
    sig = ''
    x = ''
    y = ''
    w = ''
    h = ''
    a = ''
    t = ''
    g = ''
    
  
    while (1):
        nr_blocks, block = pixy2.get_blocks(sigmap, max_blocks)
        if nr_blocks > 0:
            sig = block[0].sig
            x = block[0].x_center
            y = block[0].y_center
            w = block[0].width
            h = block[0].height
            a = block[0].angle
            t = block[0].tracking_index
            g = block[0].age
        print(sig, t, g)
        sleep(0.2)

if __name__ == '__main__':
    main()

Hello,
If you use sigmap of 255, it will return detected objects of all signatures. Are you getting data back from Pixy with your code?

Edward

I get it, but they are incomprehensible values. When I show sig1 I get 257, not 1.
I think I’m doing the wrong request-response, help me understand please.

Thank you Edward!
by disabling Pixy1 compatibility, I started getting the correct signature numbers, which parameter is responsible for counting all the signatures?
In arduino I read Serial.print(pixy1.ccc.numBlocks)? can I read the same in python?

Also in the terminal, it mostly outputs only one signature with a larger PixyMon court recognition area, it does not output two signatures at the same time. Maybe I’m not using the loop correctly?
main:

from time import sleep
from pixy2 import Pixy2
from pixy2 import Block
# from counter import Counter
from collections import Counter

def main():
    block = Block()
    pixy2 = Pixy2(0x30)
    sigmap = 255
    max_blocks = 255

    # sig = 0
    o = 0
    r = 0

    sleep(0.1)
  
    while (1):
        nr_blocks, block = pixy2.get_blocks(sigmap, max_blocks)
        sleep(0.1)
        if nr_blocks > 0:
            sig = block[0].sig
            if sig == 1:
                o += 1
            elif sig == 2:
                r += 1
            print (o, r) 

if __name__ == '__main__':
    main()

Hello,
The number of signatures should be in the nr_blocks (or you could do len(blocks). It looks like you are just looking at block[0]. To see the other blocks, you might try something like:

nr_blocks, blocks = pixy2.get_blocks(sigmap, max_blocks)
for b in blocks:
   print(b)

Edward

Thank you! I was able to count everything correctly, but now I need to count it from 9 cameras. What is the best way for me to count the same values from 9 cameras? Any ideas?

def main():
    pixy2 = Pixy2(0x30)
    sigmap = 255
    max_blocks = 255

    sleep(0.1)

    orange = 0
    red=0
    blue=0
    green=0
    
    while (1):
        o=0
        r=0
        b=0
        g=0
        orange=o
        red=r
        blue=b
        green=g
        nr_blocks, block = pixy2.get_blocks(sigmap, max_blocks)
        sleep(0.1)
        if nr_blocks > 0:
            for i in range(0,nr_blocks):
                sig = block[i].sig
                if sig == 1:
                    o += 1
                elif sig == 2:
                    r += 1
                elif sig == 3:
                    b += 1
                elif sig == 4:
                    g += 1
                print (o, r, b, g, nr_blocks)

Good day!
Maybe someone will find my code useful, but after a while the program closes, it complains about a lot of open files /dev/i2c-1
There is no bus.close in the Pixy2 module is this the intention?
Maybe someone has encountered a similar error with Pixy???:

Traceback (most recent call last):
  File "main.py", line 45, in <module>
  File "main.py", line 13, in main
  File "main.py", line 19, in pixyRead
  File "/home/pi/test/pixy2.py", line 9, in __init__
  File "/home/pi/.local/lib/python3.7/site-packages/smbus2/smbus2.py", line 280, in __init__
  File "/home/pi/.local/lib/python3.7/site-packages/smbus2/smbus2.py", line 310, in open
OSError: [Errno 24] Too many open files: '/dev/i2c-1'

main.py:

from time import sleep
from pixy2 import Pixy2


results = {0x30: {'o': 0,'r': 0,'b': 0,'g': 0,'nr_blocks': 0}, 0x31: {'o': 0,'r': 0,'b': 0,'g': 0,'nr_blocks': 0} }
def main():
    while True:
        for addr in results.keys():
            results[addr] = pixyRead(addr)
        print (results)
        

def pixyRead(addr):
    res = {'o': 0,'r': 0,'b': 0,'g': 0,'nr_blocks': 0}
    pixy2 = Pixy2(addr)
    pixy2.set_lamp(0, 0)
    sleep(0.1)
    nr_blocks, block = pixy2.get_blocks()
    res['nr_blocks'] = nr_blocks
    sleep(0.1)
    if nr_blocks > 0:
        for i in range(0,nr_blocks):
            sig = block[i].sig
            if sig == 1:
                res['o'] += 1
            elif sig == 2:
                res['r'] += 1
            elif sig == 3:
                res['b'] += 1
            elif sig == 4:
                res['g'] += 1
    # print (res)
    del pixy2
    return res

if __name__ == '__main__':
    main()

Hello,
You should move the instantiation to outside of PixyRead(). That is, move the line:

pixy2 = Pixy2(addr)

to your main() function, and pass the pixy2 object into pixyRead(). This way you won’t run into the “too many open files” error. What’s happening is every time you call pixyRead, it initializes a new Pixy2 object . After calling pixyRead several times, the numerous Pixy2 objects are not being cleaned up.

Edward

Thank you!!! works well