[Dcm] Vehicle Diagnostic Communication Part 74 [Simulation 13]

[Dcm] Vehicle Diagnostic Communication Part 74 [Simulation 13] 車両診断通信
[Dcm] Vehicle Diagnostic Communication Part 74 [Simulation 13]

Click here for back issues.
https://www.simulationroom999.com/blog/diagnostic-communication-en-back-issue/

Introduction.

AUTOSAR-Dcm simulation explanation.
In this article, I will explain the Python code for the simulation of DiagnosticSessionControl.

Python code for simulation of DiagnosticSessionControl

The first step is to simulate DiagnosticSessionControl.
The Python code to be used is as follows.

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0, 
         'blocksize' : 4,
         'wftmax' : 0,
         'll_data_length' : 8,
         'tx_padding' : 0xCC,
         'rx_flowcontrol_timeout' : 1000,
         'rx_consecutive_frame_timeout' : 1000,
         'squash_stmin_requirement' : False,
         'can_fd' : False,
         'tx_data_min_length' : 8
      }
      self.exit_requested = False
      #self.bus = VectorBus(channel='0', bitrate=500000)
      self.bus = VectorBus(channel='0', bitrate=500000, fd=True)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
         time.sleep(0.001) # Variable sleep time based on state machine state

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def sendrecv( app, msg ):
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.2)


if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   datas=[
      bytes([0x10, 0x01]),
      bytes([0x10, 0x02]),
      bytes([0x10, 0x03]),
      bytes([0x10, 0x04]),
      bytes([0x10, 0x05]),
      bytes([0x10, 0x01, 0x02]),
   ]
   
   for i in range(len(datas)):
      sendrecv(app, datas[i])

   print("Exiting")
   app.shutdown()

Python code explanation for simulation of DiagnosticSessionControl

What this code is trying to do is the following flow.

(1). Transition to defaultSession
(2). Transition to programmingSession
(3). Transit to extendDiagnosticSession
(4). Transit to safetySystemSession
(5). Transit to nonexistentSession
(6). DiagnosticSessionControl request with wrong message length

(5) and (6) are intentionally wrong communication.

In the next article, let’s look at the communication results from the simulation.

Conclusion

  • I wrote a Python code for simulation of DiagnosticSessionControl.
  • Communication patterns include error patterns.
    • Non-existent session.
    • Wrong message length for a DiagnosticSessionControl request.

Click here for back issues.

コメント

タイトルとURLをコピーしました