Click here for back issues.
https://www.simulationroom999.com/blog/diagnostic-communication-en-back-issue/
Introduction
This is an installment of the "Let's simulate ISO-TP" series.
In this article, we will try to create a pseudo-ECU with the Python package can-isotp.
Is the previous communication method incomplete?
Last time, I sent a multi-frame request with can-isotp.
It worked because the destination answered with a Flow Control (FC) frame.
However, as communication this is incomplete.
It is fine as ISO 15765-2 communication, but it is incomplete once ISO 14229-1 is taken into account.
Vehicle diagnostic communication generally consists of a request paired with a response.
The previous exchange was incomplete because it contained only the request.
Reproducing the response message with python-can alone is difficult.
Therefore, I will use can-isotp for the response as well.
That said, I won't know whether it actually works until I try it.
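To make the request/response pairing concrete, here is a minimal sketch of how an ISO 14229-1 (UDS) response relates to its request: a positive response carries the request's service ID plus 0x40, and a negative response starts with 0x7F followed by the service ID and a negative response code. The helper name classify_response is hypothetical and is not part of can-isotp; the example bytes are illustrative and are not the test messages used later in this article.

```python
def classify_response(request: bytes, response: bytes) -> str:
    """Classify a UDS response relative to its request (illustrative helper)."""
    if not request or not response:
        return "invalid"
    sid = request[0]
    # Positive response: service ID of the request + 0x40
    if response[0] == (sid + 0x40) & 0xFF:
        return "positive"
    # Negative response: 0x7F, echoed service ID, negative response code (NRC)
    if len(response) >= 3 and response[0] == 0x7F and response[1] == sid:
        return "negative (NRC 0x%02X)" % response[2]
    return "unrelated"


if __name__ == "__main__":
    # ReadDataByIdentifier (0x22) request and a matching positive response
    print(classify_response(b"\x22\xF1\x90", b"\x62\xF1\x90\x01"))
    # The same request answered with a negative response
    print(classify_response(b"\x22\xF1\x90", b"\x7F\x22\x31"))
```

A request without such a response is only half of the exchange, which is why the response side is added in this article.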
request-response with can-isotp
First, let’s draw an experimental configuration.
Both sides use can-isotp to carry out the request-response exchange.
The tricky points this time are as follows.
- Two scripts, one for the request side and one for the response side, are required.
- Both need a send function and a receive function.
can-isotp request side script
The request side script is as follows.
import isotp
import logging
import time
import threading
from can.interfaces.vector import VectorBus

class ThreadedApp:
    def __init__(self):
        # ISO-TP parameters for classic CAN with 8-byte frames
        isotp_params = {
            'stmin' : 0,
            'blocksize' : 0,
            'wftmax' : 0,
            'll_data_length' : 8,
            'tx_padding' : 0xCC,
            'rx_flowcontrol_timeout' : 1000,
            'rx_consecutive_frame_timeout' : 1000,
            'squash_stmin_requirement' : False,
            'can_fd' : False,
            'tx_data_min_length' : 8
        }
        self.exit_requested = False
        self.bus = VectorBus(channel='0', bitrate=500000)
        # Request side: tester (0xF1) -> ECU (0x10)
        addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10)
        self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

    def start(self):
        self.exit_requested = False
        self.thread = threading.Thread(target=self.thread_task)
        self.thread.start()

    def stop(self):
        self.exit_requested = True
        # is_alive() replaces isAlive(), which was removed in Python 3.9
        if self.thread.is_alive():
            self.thread.join()

    def send(self, msg):
        self.stack.send(msg)

    def my_error_handler(self, error):
        logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

    def thread_task(self):
        # Drive the ISO-TP state machine (both sending and receiving)
        while not self.exit_requested:
            self.stack.process() # Non-blocking
            time.sleep(0.001)

    def shutdown(self):
        self.stop()
        self.bus.shutdown()

def sendrecv(app, msg):
    print("Send msg : %s" % (msg.hex()))
    app.send(msg)
    # Wait up to 5 seconds for the response
    t1 = time.time()
    while time.time() - t1 < 5:
        if app.stack.available():
            payload = app.stack.recv()
            print("Recv msg : %s" % (payload.hex()))
            break
        time.sleep(0.2)

if __name__ == '__main__':
    app = ThreadedApp()
    app.start()
    #sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07')
    sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10')
    print("Exiting")
    app.shutdown()
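The script runs the ISO-TP stack in a separate thread, which is what makes receiving possible.
When you pass a message to the sendrecv function, the stack handles sending and receiving in accordance with ISO 15765-2, and the function waits up to 5 seconds for a reply.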
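To show what "in accordance with ISO 15765-2" means for the 30-byte test message, the following is a minimal sketch of the segmentation into a First Frame and Consecutive Frames, assuming classic CAN with 8 data bytes, no address byte in the payload, and the 0xCC padding from the script. The function segment_isotp is a hypothetical illustration; can-isotp performs this internally, and the real exchange also includes a Flow Control frame (first byte 0x30) sent by the receiver after the First Frame.

```python
def segment_isotp(payload: bytes, padding: int = 0xCC) -> list:
    """Split a payload into classic-CAN ISO-TP frames (illustrative only)."""
    n = len(payload)
    if n == 0:
        raise ValueError("empty payload")
    if n <= 7:
        # Single Frame: PCI byte 0x0N (N = length), then the data
        frames = [bytes([n]) + payload]
    elif n <= 0xFFF:
        # First Frame: PCI 0x1N NN (12-bit length), then the first 6 data bytes
        frames = [bytes([0x10 | (n >> 8), n & 0xFF]) + payload[:6]]
        seq = 1
        for i in range(6, n, 7):
            # Consecutive Frame: PCI 0x2S, sequence number S wraps at 16
            frames.append(bytes([0x20 | (seq & 0x0F)]) + payload[i:i + 7])
            seq += 1
    else:
        raise ValueError("payload too long for a 12-bit First Frame length")
    # Pad every frame to 8 bytes
    return [f.ljust(8, bytes([padding])) for f in frames]


if __name__ == "__main__":
    msg = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10' * 3
    for frame in segment_isotp(msg):
        print(frame.hex())
```

The 7-byte message that is commented out in the script fits into a Single Frame, so it needs neither Flow Control nor Consecutive Frames.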
can-isotp response side script
The overall structure is the same as the request-side script. The response-side script is shown below.
import isotp
import logging
import time
import threading
from can.interfaces.vector import VectorBus

class ThreadedApp:
    def __init__(self):
        isotp_params = {
            'stmin' : 0,
            'blocksize' : 0,
            'wftmax' : 0,
            'll_data_length' : 8,
            'tx_padding' : 0xCC,
            'rx_flowcontrol_timeout' : 1000,
            'rx_consecutive_frame_timeout' : 1000,
            'squash_stmin_requirement' : False,
            'can_fd' : True, # note: the request-side script sets this to False
            'tx_data_min_length' : 8
        }
        self.exit_requested = False
        self.bus = VectorBus(channel='0', bitrate=500000)
        # Response side: ECU (0x10) -> tester (0xF1)
        addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x10, target_address=0xF1)
        self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

    def start(self):
        self.exit_requested = False
        self.thread = threading.Thread(target=self.thread_task)
        self.thread.start()

    def stop(self):
        self.exit_requested = True
        # is_alive() replaces isAlive(), which was removed in Python 3.9
        if self.thread.is_alive():
            self.thread.join()

    def send(self, msg):
        self.stack.send(msg)

    def my_error_handler(self, error):
        logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

    def thread_task(self):
        # Drive the ISO-TP state machine (both sending and receiving)
        while not self.exit_requested:
            self.stack.process() # Non-blocking
            #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
            time.sleep(0.001) # Fixed sleep time

    def shutdown(self):
        self.stop()
        self.bus.shutdown()

def recvsend(app, msg):
    # Wait (without timeout) until a complete request has been received
    while True:
        if app.stack.available():
            payload = app.stack.recv()
            print("Recv msg : %s" % (payload.hex()))
            break
        time.sleep(0.0001)
    print("Send msg : %s" % (msg.hex()))
    app.send(msg)
    # Keep the stack running for up to 5 seconds so the response can go out;
    # leave early if another message arrives
    t1 = time.time()
    while time.time() - t1 < 5:
        if app.stack.available():
            break
        time.sleep(0.001) # avoid a busy loop

if __name__ == '__main__':
    app = ThreadedApp()
    app.start()
    #recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
    recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10')
    print("Exiting")
    app.shutdown()
The receiving thread is created in the same way.
Apart from the swapped source and target addresses, the main difference is that the previous sendrecv function has become recvsend, a function that receives and then sends.
The message passed to this function is sent as the response once a request has been received.
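The two scripts also use mirrored addresses (0xF1 to 0x10 on the request side, 0x10 to 0xF1 on the response side). As a rough sketch of what that means on the bus, the function below builds the 29-bit CAN ID used by ISO 15765-2 normal fixed addressing, assuming the default priority of 6 and physical addressing. The function name normal_fixed_29bit_id is hypothetical; can-isotp derives the IDs itself from isotp.Address.

```python
def normal_fixed_29bit_id(source_address: int, target_address: int,
                          functional: bool = False, priority: int = 6) -> int:
    """Build a 29-bit CAN ID for normal fixed addressing (illustrative only)."""
    # 0xDA marks physical addressing, 0xDB functional addressing
    pf = 0xDB if functional else 0xDA
    return ((priority << 26) | (pf << 16)
            | ((target_address & 0xFF) << 8) | (source_address & 0xFF))


if __name__ == "__main__":
    # Request side: source 0xF1, target 0x10
    print("request  ID: 0x%08X" % normal_fixed_29bit_id(0xF1, 0x10))
    # Response side: source 0x10, target 0xF1
    print("response ID: 0x%08X" % normal_fixed_29bit_id(0x10, 0xF1))
```

Each side transmits on one of these IDs and listens on the other, which is why the source and target addresses have to be swapped between the two scripts.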
In other words, most of the complexity lies in the implementation of the receiving thread.
Wrapping everything in a class adds to it as well.
A class makes the code more readable, but it may not be the best way to explain the behavior.
We will check the actual behavior in the next issue.
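For explanation purposes, the core of both scripts can be reduced to one polling loop: drive the stack, check whether a complete message is available, and give up after a timeout. The following is a minimal sketch of that loop without a thread or a class around the application. FakeStack is a hypothetical stand-in so the example runs without CAN hardware; it only mimics the process(), available() and recv() calls used in the scripts above, and wait_for_message is likewise an assumed helper name.

```python
import time


class FakeStack:
    """Hypothetical stand-in for the ISO-TP stack (no CAN hardware needed)."""

    def __init__(self, message: bytes, ready_after: int):
        self._message = message
        self._calls_left = ready_after  # process() calls until the message is complete
        self._ready = False

    def process(self):
        # The real stack would read and write CAN frames here
        if self._calls_left > 0:
            self._calls_left -= 1
            if self._calls_left == 0:
                self._ready = True

    def available(self):
        return self._ready

    def recv(self):
        self._ready = False
        return self._message


def wait_for_message(stack, timeout=5.0, poll_interval=0.001):
    """Drive the stack and return the first complete message, or None on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        stack.process()
        if stack.available():
            return stack.recv()
        time.sleep(poll_interval)
    return None


if __name__ == "__main__":
    payload = wait_for_message(FakeStack(b'\x01\x02\x03', ready_after=3))
    print("Recv msg : %s" % payload.hex())
```

The trade-off is that nothing is processed while the caller is busy elsewhere, which is the problem the receiving thread in the scripts above avoids.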
Conclusion
- Most vehicle diagnostic communication is a single request-response communication.
- There are some exceptions such as request-only and response-only communication, but they are quite rare.
- On both the request and response sides, the scripts are bloated by the receive-thread mechanism.