qijo.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # This Source Code Form is subject to the terms of the Mozilla Public
  2. # License, v. 2.0. If a copy of the MPL was not distributed with this
  3. # file, You can obtain one at http://mozilla.org/MPL/2.0/.
from ctypes import (
    POINTER,
    Structure,
    WINFUNCTYPE,
    WinError,
    addressof,
    c_size_t,
    c_ulong,
    c_ulonglong,
    c_void_p,
    sizeof,
    windll,
)
from ctypes.wintypes import BOOL, BYTE, DWORD, HANDLE, LARGE_INTEGER
  6. LPVOID = c_void_p
  7. LPDWORD = POINTER(DWORD)
  8. SIZE_T = c_size_t
  9. ULONG_PTR = POINTER(c_ulong)
  10. # A ULONGLONG is a 64-bit unsigned integer.
  11. # Thus there are 8 bytes in a ULONGLONG.
  12. # XXX why not import c_ulonglong ?
  13. ULONGLONG = BYTE * 8
  14. class IO_COUNTERS(Structure):
  15. # The IO_COUNTERS struct is 6 ULONGLONGs.
  16. # TODO: Replace with non-dummy fields.
  17. _fields_ = [('dummy', ULONGLONG * 6)]
  18. class JOBOBJECT_BASIC_ACCOUNTING_INFORMATION(Structure):
  19. _fields_ = [('TotalUserTime', LARGE_INTEGER),
  20. ('TotalKernelTime', LARGE_INTEGER),
  21. ('ThisPeriodTotalUserTime', LARGE_INTEGER),
  22. ('ThisPeriodTotalKernelTime', LARGE_INTEGER),
  23. ('TotalPageFaultCount', DWORD),
  24. ('TotalProcesses', DWORD),
  25. ('ActiveProcesses', DWORD),
  26. ('TotalTerminatedProcesses', DWORD)]
  27. class JOBOBJECT_BASIC_AND_IO_ACCOUNTING_INFORMATION(Structure):
  28. _fields_ = [('BasicInfo', JOBOBJECT_BASIC_ACCOUNTING_INFORMATION),
  29. ('IoInfo', IO_COUNTERS)]
  30. # see http://msdn.microsoft.com/en-us/library/ms684147%28VS.85%29.aspx
  31. class JOBOBJECT_BASIC_LIMIT_INFORMATION(Structure):
  32. _fields_ = [('PerProcessUserTimeLimit', LARGE_INTEGER),
  33. ('PerJobUserTimeLimit', LARGE_INTEGER),
  34. ('LimitFlags', DWORD),
  35. ('MinimumWorkingSetSize', SIZE_T),
  36. ('MaximumWorkingSetSize', SIZE_T),
  37. ('ActiveProcessLimit', DWORD),
  38. ('Affinity', ULONG_PTR),
  39. ('PriorityClass', DWORD),
  40. ('SchedulingClass', DWORD)
  41. ]
  42. # see http://msdn.microsoft.com/en-us/library/ms684156%28VS.85%29.aspx
  43. class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(Structure):
  44. _fields_ = [('BasicLimitInformation', JOBOBJECT_BASIC_LIMIT_INFORMATION),
  45. ('IoInfo', IO_COUNTERS),
  46. ('ProcessMemoryLimit', SIZE_T),
  47. ('JobMemoryLimit', SIZE_T),
  48. ('PeakProcessMemoryUsed', SIZE_T),
  49. ('PeakJobMemoryUsed', SIZE_T)]
  50. # XXX Magical numbers like 8 should be documented
  51. JobObjectBasicAndIoAccountingInformation = 8
  52. # ...like magical number 9 comes from
  53. # http://community.flexerasoftware.com/archive/index.php?t-181670.html
  54. # I wish I had a more canonical source
  55. JobObjectExtendedLimitInformation = 9
  56. class JobObjectInfo(object):
  57. mapping = { 'JobObjectBasicAndIoAccountingInformation': 8,
  58. 'JobObjectExtendedLimitInformation': 9
  59. }
  60. structures = { 8: JOBOBJECT_BASIC_AND_IO_ACCOUNTING_INFORMATION,
  61. 9: JOBOBJECT_EXTENDED_LIMIT_INFORMATION
  62. }
  63. def __init__(self, _class):
  64. if isinstance(_class, basestring):
  65. assert _class in self.mapping, 'Class should be one of %s; you gave %s' % (self.mapping, _class)
  66. _class = self.mapping[_class]
  67. assert _class in self.structures, 'Class should be one of %s; you gave %s' % (self.structures, _class)
  68. self.code = _class
  69. self.info = self.structures[_class]()
  70. QueryInformationJobObjectProto = WINFUNCTYPE(
  71. BOOL, # Return type
  72. HANDLE, # hJob
  73. DWORD, # JobObjectInfoClass
  74. LPVOID, # lpJobObjectInfo
  75. DWORD, # cbJobObjectInfoLength
  76. LPDWORD # lpReturnLength
  77. )
  78. QueryInformationJobObjectFlags = (
  79. (1, 'hJob'),
  80. (1, 'JobObjectInfoClass'),
  81. (1, 'lpJobObjectInfo'),
  82. (1, 'cbJobObjectInfoLength'),
  83. (1, 'lpReturnLength', None)
  84. )
  85. _QueryInformationJobObject = QueryInformationJobObjectProto(
  86. ('QueryInformationJobObject', windll.kernel32),
  87. QueryInformationJobObjectFlags
  88. )
  89. class SubscriptableReadOnlyStruct(object):
  90. def __init__(self, struct):
  91. self._struct = struct
  92. def _delegate(self, name):
  93. result = getattr(self._struct, name)
  94. if isinstance(result, Structure):
  95. return SubscriptableReadOnlyStruct(result)
  96. return result
  97. def __getitem__(self, name):
  98. match = [fname for fname, ftype in self._struct._fields_
  99. if fname == name]
  100. if match:
  101. return self._delegate(name)
  102. raise KeyError(name)
  103. def __getattr__(self, name):
  104. return self._delegate(name)
  105. def QueryInformationJobObject(hJob, JobObjectInfoClass):
  106. jobinfo = JobObjectInfo(JobObjectInfoClass)
  107. result = _QueryInformationJobObject(
  108. hJob=hJob,
  109. JobObjectInfoClass=jobinfo.code,
  110. lpJobObjectInfo=addressof(jobinfo.info),
  111. cbJobObjectInfoLength=sizeof(jobinfo.info)
  112. )
  113. if not result:
  114. raise WinError()
  115. return SubscriptableReadOnlyStruct(jobinfo.info)
  116. def test_qijo():
  117. from killableprocess import Popen
  118. popen = Popen('c:\\windows\\notepad.exe')
  119. try:
  120. result = QueryInformationJobObject(0, 8)
  121. raise AssertionError('throw should occur')
  122. except WindowsError, e:
  123. pass
  124. try:
  125. result = QueryInformationJobObject(0, 1)
  126. raise AssertionError('throw should occur')
  127. except NotImplementedError, e:
  128. pass
  129. result = QueryInformationJobObject(popen._job, 8)
  130. if result['BasicInfo']['ActiveProcesses'] != 1:
  131. raise AssertionError('expected ActiveProcesses to be 1')
  132. popen.kill()
  133. result = QueryInformationJobObject(popen._job, 8)
  134. if result.BasicInfo.ActiveProcesses != 0:
  135. raise AssertionError('expected ActiveProcesses to be 0')
  136. if __name__ == '__main__':
  137. print "testing."
  138. test_qijo()
  139. print "success!"